dcron is a distributed cron system based on multiple registries (such as Consul, Redis, Etcd, ZooKeeper). It supports the management of both static and dynamic tasks, providing service node discovery and load balancing mechanisms, aimed at offering an efficient and reliable solution for scheduled tasks in microservices architectures.
- Features
- Installation
- Quick Start
- Registry Examples
- Core Concepts & Mechanisms
- Configuration and API Reference
- Frequently Asked Questions (FAQ) / Usage Suggestions
- Summary
- Multi-Registry Support: Seamlessly integrates with Consul, Redis, Etcd, and ZooKeeper, allowing for flexible selection and easy switching.
- Static and Dynamic Task Management: Supports both statically defined tasks in code and dynamically added, deleted, or modified tasks at runtime to meet various scheduling needs.
- Intelligent Node Change Monitoring: Combines Poll and Watch modes to perceive node changes in real time, ensuring high availability and timeliness of task scheduling.
- Rich Task Scheduling Strategies: Built-in strategies such as consistent hashing, average distribution, hash slots, range allocation, weighted distribution, and round-robin to optimize task distribution and enhance system load balancing capabilities.
- Fine-Grained Node State Management: Clearly manages the complete lifecycle of nodes, from starting (`starting`) through working (`working`) to leaving (`leaving`).
- Flexible Dynamic Task Configuration: Dynamic tasks support one-time execution (OneShot) and can carry custom data (Payload).
- Reliable Error Handling Mechanism: Provides callbacks for task execution errors to ensure stability and traceability of task execution.
- Intelligent Synchronization of Deleted Tasks: Automatically synchronizes deleted dynamic tasks to the registry's deleted list, effectively preventing dirty data and task re-execution.
- Built-in Distributed Lock: Implements distributed locks through the `CanRunTask` interface to ensure that the same task is executed only once in the cluster, avoiding concurrency conflicts.
- Complete Task Lifecycle Management: Supports automatic deletion or cleanup after task execution, especially suitable for one-time and temporary task scenarios.
Before using dcron, ensure that the following dependencies are installed in your project:
# Consul Registry (if you choose Consul)
go get github.com/hashicorp/consul/api
# Redis Registry (if you choose Redis)
go get github.com/redis/go-redis/v9
# Etcd Registry (if you choose Etcd)
go get go.etcd.io/etcd/client/v3
# ZooKeeper Registry (if you choose ZooKeeper)
go get github.com/go-zookeeper/zk
# Cron Scheduler (core scheduling library)
go get github.com/robfig/cron/v3

This section will guide you through quickly getting started with dcron using Consul as the registry.
First, you need to initialize a registry client. Here is an example using Consul:
import (
"log"
"github.com/kkangxu/dcron"
"github.com/hashicorp/consul/api"
)
func main() {
// Configure Consul client
config := api.DefaultConfig()
config.Address = "localhost:8500" // Consul service address
client, err := api.NewClient(config)
if err != nil {
log.Fatalf("Failed to create Consul client: %v", err)
}
// Create dcron Registry based on Consul client
registry := dcron.NewConsulRegistry(client)
// ... Next, create a Dcron instance
}

Tip: The initialization methods for other registries (Redis, Etcd, ZooKeeper) are similar; please refer to the Registry Examples section.
Next, you can create a Dcron instance and configure related options:
import (
"fmt"
"log"
"github.com/kkangxu/dcron"
"github.com/robfig/cron/v3" // For cron Options
// ... Other imports
)
// ... Continue in the main function
// Create Dcron instance
dc := dcron.NewDcron(
registry, // The registry instance created in the previous step
dcron.WithStrategy(dcron.StrategyConsistent), // Set task assignment strategy to consistent hashing
dcron.WithCronOptions(cron.WithSeconds()), // Set Cron expression to support second-level precision
dcron.WithTaskRunFunc(func(task *dcron.TaskMeta) error { // Set unified execution function for dynamic tasks
fmt.Println("Executing dynamic task:", task.Name, "Payload:", task.Payload)
return nil
}),
dcron.WithErrHandler(func(task *dcron.TaskMeta, err error) { // Set error handling function for task execution
fmt.Printf("Task %s execution error: %v\n", task.Name, err)
}),
)

Static tasks are usually defined in code and loaded when the service starts.
// ... Continue in the main function
// Add a static task that runs every 5 seconds
err = dc.AddFunc("static-task-example", "*/5 * * * * *", func() error {
fmt.Println("Static task runs every 5 seconds (static-task-example)")
return nil
})
if err != nil {
log.Fatalf("Failed to add static task: %v", err)
}

If you need to add a task regardless of whether it already exists or has been marked as deleted, use the ForceAdd series of methods to forcefully add or overwrite it:
// Forcefully add a normal task, even if it already exists or has been deleted
_ = dc.ForceAddFunc("force-task-example", "*/3 * * * * *", func() error {
fmt.Println("Forcefully added task (force-task-example), runs every 3 seconds")
return nil
})
// Forcefully add a one-time task
_ = dc.ForceAddOneShotFunc("force-oneshot-example", "*/10 * * * * *", func() error {
fmt.Println("Forcefully added one-time task (force-oneshot-example)")
return nil // The task will be automatically cleaned up after execution
})

Note: The ForceAdd series of methods directly overwrites tasks with the same name and clears their related records in the registry (such as deleted markers). Please use with caution.
Dynamic tasks can be added at runtime through the API, with their metadata stored in the registry.
// ... Continue in the main function
// Add a dynamic task that runs every 10 seconds
err = dc.AddTaskMeta(dcron.TaskMeta{
Name: "dynamic-task-example",
CronFormat: "*/10 * * * * *",
Payload: "Greetings from the dynamic task",
})
if err != nil {
log.Fatalf("Failed to add dynamic task: %v", err)
}

Dynamic tasks will be executed by the handler set in WithTaskRunFunc.
Finally, start the Dcron service to begin task scheduling:
// ... Continue in the main function
log.Println("Dcron service is about to start...")
if err := dc.Start(); err != nil {
log.Fatalf("Failed to start Dcron service: %v", err)
}

dcron supports multiple registries. Below are examples of how to initialize clients for each supported registry and create a Dcron instance.
package main
import (
"log"
"github.com/hashicorp/consul/api"
"github.com/kkangxu/dcron"
"github.com/robfig/cron/v3"
)
func main() {
// 1. Configure Consul client
config := api.DefaultConfig()
config.Address = "localhost:8500" // Consul service address
client, err := api.NewClient(config)
if err != nil {
log.Fatalf("Failed to create Consul client: %v", err)
}
// 2. Create Dcron instance using Consul Registry
dc := dcron.NewDcron(dcron.NewConsulRegistry(client), dcron.WithStrategy(dcron.StrategyConsistent), dcron.WithCronOptions(cron.WithSeconds()))
// 3. Add static task example
err = dc.AddFunc("consul-static-task", "*/5 * * * * *", func() error {
log.Println("Consul Registry: Executing static task (consul-static-task) every 5 seconds")
return nil
})
if err != nil {
log.Fatal(err)
}
// 4. Add dynamic task example (optional)
err = dc.AddTaskMeta(dcron.TaskMeta{
Name: "consul-dynamic-task",
CronFormat: "*/10 * * * * *",
Payload: "Hello from Consul dynamic task",
})
if err != nil {
log.Printf("Failed to add dynamic task consul-dynamic-task: %v", err) // Non-fatal error, can choose to log
}
// 5. Start the service
log.Println("Dcron (Consul) service is starting...")
if err := dc.Start(); err != nil {
log.Fatal(err)
}
}

package main
import (
"log"
"github.com/kkangxu/dcron"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)
func main() {
// 1. Initialize Redis client
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis service address
})
// 2. Create Dcron instance using Redis Registry
dc := dcron.NewDcron(dcron.NewRedisRegistry(rdb), dcron.WithStrategy(dcron.StrategyConsistent), dcron.WithCronOptions(cron.WithSeconds()))
// 3. Add static task example
err := dc.AddFunc("redis-static-task", "*/10 * * * * *", func() error {
log.Println("Redis Registry: Executing static task (redis-static-task) every 10 seconds")
return nil
})
if err != nil {
log.Fatal(err)
}
// 4. Start the service
log.Println("Dcron (Redis) service is starting...")
if err := dc.Start(); err != nil {
log.Fatal(err)
}
}

package main
import (
"log"
"time"
"github.com/kkangxu/dcron"
"github.com/robfig/cron/v3"
clientv3 "go.etcd.io/etcd/client/v3"
)
func main() {
// 1. Initialize Etcd client
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"}, // Etcd service address
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatalf("Failed to connect to Etcd: %v", err)
}
defer cli.Close() // Ensure the client is closed
// 2. Create Dcron instance using Etcd Registry
dc := dcron.NewDcron(dcron.NewEtcdRegistry(cli), dcron.WithStrategy(dcron.StrategyConsistent), dcron.WithCronOptions(cron.WithSeconds()))
// 3. Add static task example
err = dc.AddFunc("etcd-static-task", "*/3 * * * * *", func() error {
log.Println("Etcd Registry: Executing static task (etcd-static-task) every 3 seconds")
return nil
})
if err != nil {
log.Fatal(err)
}
// 4. Start the service
log.Println("Dcron (Etcd) service is starting...")
if err := dc.Start(); err != nil {
log.Fatal(err)
}
}

package main
import (
"log"
"time"
"github.com/go-zookeeper/zk"
"github.com/kkangxu/dcron"
"github.com/robfig/cron/v3"
)
func main() {
// 1. Connect to ZooKeeper
conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, time.Second*5) // ZooKeeper service address
if err != nil {
log.Fatalf("Failed to connect to ZooKeeper: %v", err)
}
// defer conn.Close() // Typically, zk.Conn is managed internally by dcron, unless you have special needs
// 2. Create Dcron instance using ZooKeeper Registry
dc := dcron.NewDcron(dcron.NewZookeeperRegistry(conn), dcron.WithStrategy(dcron.StrategyConsistent), dcron.WithCronOptions(cron.WithSeconds()))
// 3. Add static task example
err = dc.AddFunc("zk-static-task", "*/2 * * * * *", func() error {
log.Println("ZooKeeper Registry: Executing static task (zk-static-task) every 2 seconds")
return nil
})
if err != nil {
log.Fatal(err)
}
// 4. Start the service
log.Println("Dcron (ZooKeeper) service is starting...")
if err := dc.Start(); err != nil {
log.Fatal(err)
}
}

When you need to delete a dynamic task, call dc.MarkTaskDeleted("task-name"). This operation marks the task as "deleted" and synchronizes this state to the registry. The benefits of this approach include:
- Preventing Dirty Data: Ensures all nodes are aware that the task has been deleted.
- Avoiding Duplicate Scheduling: Other nodes will not attempt to schedule tasks that have been marked as deleted.
If you need to re-enable a task with the same name later, it is recommended to use ForceAddTaskMeta or first call CleanupTask to clear the old marker before adding it again.
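The interaction between the deleted marker and the different ways of re-adding a task can be sketched with a small in-memory model. This is only an illustration of the semantics described above: `taskStore` and its methods are hypothetical names invented for this sketch, and they do not reflect how dcron stores data in a real registry.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errExists  = errors.New("task already exists")
	errDeleted = errors.New("task is marked as deleted")
)

// taskStore is a hypothetical in-memory stand-in for registry state.
type taskStore struct {
	tasks   map[string]string // task name -> cron expression
	deleted map[string]bool   // "deleted" markers (tombstones)
}

func newTaskStore() *taskStore {
	return &taskStore{tasks: map[string]string{}, deleted: map[string]bool{}}
}

// Add refuses names that already exist or carry a deleted marker.
func (s *taskStore) Add(name, cronExpr string) error {
	if s.deleted[name] {
		return errDeleted
	}
	if _, ok := s.tasks[name]; ok {
		return errExists
	}
	s.tasks[name] = cronExpr
	return nil
}

// MarkDeleted keeps the metadata but records a marker so the task is no longer scheduled.
func (s *taskStore) MarkDeleted(name string) { s.deleted[name] = true }

// ForceAdd overwrites the task and clears any deleted marker.
func (s *taskStore) ForceAdd(name, cronExpr string) {
	delete(s.deleted, name)
	s.tasks[name] = cronExpr
}

// Cleanup removes both the metadata and the marker, freeing the name.
func (s *taskStore) Cleanup(name string) {
	delete(s.tasks, name)
	delete(s.deleted, name)
}

// Schedulable reports whether the task exists and is not marked as deleted.
func (s *taskStore) Schedulable(name string) bool {
	_, ok := s.tasks[name]
	return ok && !s.deleted[name]
}

func main() {
	s := newTaskStore()
	fmt.Println("add:", s.Add("report", "0 * * * *"))
	s.MarkDeleted("report")
	fmt.Println("add after delete:", s.Add("report", "0 * * * *"))
	s.Cleanup("report")
	fmt.Println("add after cleanup:", s.Add("report", "0 * * * *"))
}
```

In this model, a plain add fails while the marker is present, whereas a forced add or a cleanup followed by an add succeeds, which mirrors the recommendation above.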
In dcron, service nodes have clear lifecycle states:
- starting: Node is in the initialization phase.
- working: Node has successfully registered with the registry and is functioning normally (listening for tasks, participating in scheduling, etc.).
- leaving: Node is preparing to shut down or go offline, performing resource cleanup and state updates.
When a node exits, its information in the registry will be handled properly to ensure it does not affect the normal operation of the cluster.
One of the core designs of dcron is to ensure that, in a distributed environment, each scheduled run of a task is executed by only one node: for a given trigger time, the same task runs only once across the cluster. This is achieved through the built-in distributed lock mechanism, primarily relying on the CanRunTask method in the Registry interface:
// CanRunTask checks if the specified task can be executed by the current node at the given execution time.
// Returns: (can run, error)
CanRunTask(ctx context.Context, taskName string, execTime time.Time) (bool, error)

How It Works:
- When a node is ready to execute a task, it first calls CanRunTask.
- CanRunTask attempts to create a temporary, unique marker (i.e., acquire a lock) for the combination of taskName + execTime in the registry.
- If it successfully acquires the lock, it indicates that the current node can execute the task. After execution, the lock is typically released (or it expires automatically).
- If it fails to acquire the lock (usually meaning another node has already obtained the execution rights for that task at that time), the current node will not execute the task.
Why Is This Important?
- Avoiding Duplicate Execution: In a distributed system, multiple nodes may simultaneously meet the trigger conditions for a task. Without distributed locks, the same task may be executed by multiple nodes, leading to data inconsistency or other unexpected behaviors.
- Ensuring Data Consistency: For tasks that need to modify shared resources, it is crucial to ensure that only one executor is active.
- Applicable to Various Scenarios:
- Nodes frequently join or leave the cluster.
- Business scenarios that require strict uniqueness in task execution.
- Tasks with very sensitive execution times that cannot afford delays or failures due to conflicts.
dcron encapsulates the implementation details of distributed locks within the specific implementations of each Registry, so users do not need to worry about the underlying details, as all built-in registries provided by dcron support reliable distributed locks.
The dcron instance is configured through a series of Option functions. An Option is a function type func(*dcron).
| Option | Description | Default Behavior/Notes |
|---|---|---|
| `WithStrategy(strategy AssignerStrategy)` | Set the task assignment strategy. Possible values: `StrategyConsistent` (consistent hashing), `StrategyHashSharding` (average distribution), `StrategyHashSlot` (hash slots), `StrategyRange` (range allocation), `StrategyWeighted` (weighted distribution; requires node weights), `StrategyRoundRobin` (round-robin). | Default is `StrategyConsistent` (consistent hashing). |
| `WithAssigner(assigner Assigner)` | Set a custom task assigner or use the built-in assigner instance from dcron. | If this option is set, it will override the effect of `WithStrategy`. |
| `WithCronOptions(...)` | Customize `github.com/robfig/cron/v3` configuration, such as enabling second-level precision with `cron.WithSeconds()`. | Default does not support second-level precision. |
| `WithTaskRunFunc(handler TaskRunFunc)` | Set the unified execution function for dynamic tasks. `TaskRunFunc` type is `func(*TaskMeta) error`. | Must be set, otherwise dynamic tasks cannot be executed. |
| `WithErrHandler(handler ErrHandler)` | Set the error handling function when a task execution error occurs. `ErrHandler` type is `func(*TaskMeta, error)`. | Default will print errors to logs. |
| `WithLogger(log Logger)` | Set a custom logger that implements the `dcron.Logger` interface. | Default uses a simple logger based on the standard `log` package. |
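As background for the default strategy, the following is a generic sketch of consistent hashing with virtual nodes. It is not dcron's assigner: the `ring` type, the FNV-1a hash, and the replica count of 50 are all assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// ring is a hypothetical consistent-hash ring: sorted points, each owned by a node.
type ring struct {
	points []uint64
	owner  map[uint64]string
}

func hashKey(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return h.Sum64()
}

// newRing places `replicas` virtual points per node on the ring.
func newRing(nodes []string, replicas int) *ring {
	r := &ring{owner: make(map[uint64]string)}
	for _, n := range nodes {
		for i := 0; i < replicas; i++ {
			p := hashKey(fmt.Sprintf("%s#%d", n, i))
			r.points = append(r.points, p)
			r.owner[p] = n
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// assign returns the node owning the first point at or after the task's hash,
// wrapping around to the first point when the hash is past the last one.
func (r *ring) assign(task string) string {
	if len(r.points) == 0 {
		return ""
	}
	h := hashKey(task)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0
	}
	return r.owner[r.points[i]]
}

func main() {
	tasks := []string{"send-email-report", "cleanup-temp-files", "sync-inventory", "rotate-logs"}
	before := newRing([]string{"node-a", "node-b", "node-c"}, 50)
	after := newRing([]string{"node-a", "node-b"}, 50) // node-c has left the cluster
	for _, t := range tasks {
		fmt.Printf("%-20s %s -> %s\n", t, before.assign(t), after.assign(t))
	}
}
```

The property that makes this approach attractive for a cluster with changing membership is that when a node leaves, only the tasks it owned are reassigned; tasks owned by the remaining nodes stay where they were.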
Example of passing options to NewDcron:
import "github.com/robfig/cron/v3"
dc := dcron.NewDcron(
registry,
dcron.WithStrategy(dcron.StrategyRoundRobin), // Use round-robin strategy
dcron.WithCronOptions(cron.WithParser(cron.NewParser(
cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow, // Support optional second field
))),
// ... Other options
)

You can inject a custom error handling function for task execution through the WithErrHandler option. This is useful for centralized error logging, sending alerts, or executing specific recovery logic.
dc := dcron.NewDcron(
registry,
dcron.WithErrHandler(func(task *dcron.TaskMeta, err error) {
// Use your project's logging system to log the error
log.Printf("Task '%s' (Payload: %s) execution failed: %v", task.Name, task.Payload, err)
// You can add alert logic here, such as sending emails or webhooks
}),
// ... Other options
)

If you do not set a custom handler through WithErrHandler, dcron will default to printing error messages to its internal logs (if a logger is configured with WithLogger, it will use the custom logger; otherwise, it will use the standard log package).
All dynamic tasks (tasks added through AddTaskMeta or ForceAddTaskMeta) are executed by the function set through WithTaskRunFunc. This provides a centralized place to manage the execution of dynamic tasks.
dc := dcron.NewDcron(
registry,
dcron.WithTaskRunFunc(func(task *dcron.TaskMeta) error {
fmt.Printf("Starting execution of dynamic task: %s\n", task.Name)
fmt.Printf("Task Cron expression: %s\n", task.CronFormat)
fmt.Printf("Task Payload: %s\n", task.Payload)
// Execute different business logic based on task.Name or task.Payload
switch task.Name {
case "send-email-report":
// Logic for sending email report
fmt.Println("Sending email report...")
case "cleanup-temp-files":
// Logic for cleaning temporary files
fmt.Println("Cleaning temporary files...")
default:
fmt.Printf("Unknown dynamic task type: %s\n", task.Name)
}
// Return nil if the task executes successfully
// Return a specific error if the task execution fails
return nil
}),
// ... Other options
)

Important: If you plan to use dynamic tasks, you must provide a handler through WithTaskRunFunc, otherwise dynamic tasks will not be executed.
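Since Payload is a plain string, a handler will often decode it before acting on it. The sketch below shows one way to do that with the standard library. The local `TaskMeta` type only mirrors the two fields used here, and `reportPayload` and `handleTask` are hypothetical names; in real code the function would be passed to WithTaskRunFunc with `*dcron.TaskMeta` as its parameter.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TaskMeta is a local stand-in mirroring only the fields this sketch needs.
type TaskMeta struct {
	Name    string
	Payload string
}

// reportPayload is a hypothetical payload schema for the email report task.
type reportPayload struct {
	ReportType string `json:"report_type"`
}

// handleTask has the func(*TaskMeta) error shape described for TaskRunFunc.
// It returns an error for malformed payloads so the error handler can see them.
func handleTask(task *TaskMeta) error {
	switch task.Name {
	case "send-email-report":
		var p reportPayload
		if err := json.Unmarshal([]byte(task.Payload), &p); err != nil {
			return fmt.Errorf("task %q: invalid payload: %w", task.Name, err)
		}
		if p.ReportType == "" {
			return fmt.Errorf("task %q: payload is missing report_type", task.Name)
		}
		fmt.Printf("sending %s report\n", p.ReportType)
		return nil
	default:
		return fmt.Errorf("no handler registered for task %q", task.Name)
	}
}

func main() {
	err := handleTask(&TaskMeta{Name: "send-email-report", Payload: `{"report_type":"hourly"}`})
	fmt.Println("error:", err)
}
```

Returning an error for unknown task names, rather than only printing a message, is a design choice of this sketch: it routes unexpected tasks through whatever error handler is configured.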
The following are the main API methods for managing dynamic tasks (called through the dcron instance):
- AddTaskMeta(meta TaskMeta) error

  Adds a new dynamic task. If the task already exists or has been marked as deleted, an error will be returned.

      err := dc.AddTaskMeta(dcron.TaskMeta{
          Name:       "new-dynamic-task",
          CronFormat: "0 * * * *", // Every hour
          Payload:    "{\"report_type\":\"hourly\"}",
      })

- ForceAddTaskMeta(meta TaskMeta) error

  Forcefully adds a dynamic task. If the task already exists or has been marked as deleted, it will overwrite the existing task and clear the related markers.

      err := dc.ForceAddTaskMeta(dcron.TaskMeta{
          Name:       "new-dynamic-task", // Can be the same as above
          CronFormat: "0 */2 * * *",      // Every two hours
          Payload:    "{\"report_type\":\"bi_hourly\"}",
      })

- MarkTaskDeleted(name string) error

  Marks the specified dynamic task as deleted. The task will not be immediately removed from the system, but it will not be scheduled for execution anymore. This state will be synchronized to the registry.

      err := dc.MarkTaskDeleted("new-dynamic-task")
The following table lists the main methods provided by the dcron instance and their descriptions.
| Method Name | Parameters | Return Type | Description |
|---|---|---|---|
| `AddTask` | `name string, cronExpr string, tasker Tasker` | `error` | Adds a static task, where `Tasker` is an interface containing a `Run() error` method. |
| `AddFunc` | `name string, cronExpr string, cmd func() error` | `error` | Adds a static task, with execution logic provided by the passed function. |
| `AddOneShotTask` | `name string, cronExpr string, tasker Tasker` | `error` | Adds a one-time static task. The task will be automatically removed after execution. |
| `AddOneShotFunc` | `name string, cronExpr string, cmd func() error` | `error` | Adds a one-time static task (function form). |
| `AddTaskMeta` | `meta TaskMeta` | `error` | Adds a dynamic task. Task metadata is stored in the registry. If the task already exists or is deleted, an error will be returned. |
| `ForceAddTask` | `name string, cronExpr string, tasker Tasker` | `error` | Forcefully adds a static task. Ignores whether the task already exists or has been deleted, directly overwriting. |
| `ForceAddFunc` | `name string, cronExpr string, cmd func() error` | `error` | Forcefully adds a static task (function form). |
| `ForceAddOneShotTask` | `name string, cronExpr string, tasker Tasker` | `error` | Forcefully adds a one-time static task. |
| `ForceAddOneShotFunc` | `name string, cronExpr string, cmd func() error` | `error` | Forcefully adds a one-time static task (function form). |
| `ForceAddTaskMeta` | `meta TaskMeta` | `error` | Forcefully adds a dynamic task. Ignores whether the task already exists or has been deleted, directly overwriting. |
| `ReuseDeletedTask` | `name string` | `error` | Reuses a previously marked deleted task. This operation will clear the deleted marker. |
| `MarkTaskDeleted` | `name string` | `error` | Marks the specified dynamic task as deleted. This state will be synchronized to the registry, and the task will no longer be scheduled. |
| `CleanupTask` | `ctx context.Context, name string` | `error` | Thoroughly cleans all related data of the specified task, including its metadata, deleted markers, and other traces in the registry. If the service restarts and the task is re-added through the Add series of APIs, it will be treated as a new task. |
| `Start` | (none) | `error` | Starts the dcron service, beginning task scheduling and node registration. |
| `Stop` | (none) | `error` | Stops the dcron service, unregistering nodes, stopping the scheduler, and releasing resources. |
| `GetNodeID` | (none) | `string` | Gets the node ID of the current dcron service instance. |
| `GetAllTasks` | (none) | `[]string` | Gets a list of all task names known to the current node (including static and dynamic). |
| `GetMyselfRunningTasks` | (none) | `[]string` | Gets a list of task names currently running on the node (i.e., tasks assigned to the current node and within the scheduling period). |
| `ForceCleanupAllTasks` | `ctx context.Context` | `error` | !!! Extremely Dangerous Operation !!! Forcefully cleans all task metadata and deleted task markers in the registry. Only for testing environments, and must ensure only one node performs this operation, otherwise it may lead to data confusion. |
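The table describes Tasker as an interface with a single `Run() error` method. The sketch below shows what implementing it might look like for a task that carries its own configuration. The `Tasker` declaration here is a local mirror of that description, and `cleanupTask` and `funcTasker` are hypothetical types written for illustration.

```go
package main

import "fmt"

// Tasker mirrors the interface described in the table: a single Run() error method.
type Tasker interface {
	Run() error
}

// cleanupTask is a hypothetical task that carries its own configuration and state.
type cleanupTask struct {
	dir  string
	runs int
}

// Run validates the configuration and records one execution.
func (c *cleanupTask) Run() error {
	if c.dir == "" {
		return fmt.Errorf("cleanup: no directory configured")
	}
	c.runs++
	fmt.Printf("cleaning %s (run %d)\n", c.dir, c.runs)
	return nil
}

// funcTasker adapts a plain func() error to the Tasker interface.
type funcTasker func() error

func (f funcTasker) Run() error { return f() }

func main() {
	tasks := []Tasker{
		&cleanupTask{dir: "/tmp/cache"},
		funcTasker(func() error {
			fmt.Println("function-style task")
			return nil
		}),
	}
	for _, t := range tasks {
		if err := t.Run(); err != nil {
			fmt.Println("task failed:", err)
		}
	}
}
```

A struct-based Tasker is convenient when a task needs configuration or dependencies, while the function forms (such as AddFunc) are shorter for self-contained logic.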
TaskMeta is used to define the metadata of dynamic tasks.
type TaskMeta struct {
Name string `json:"name"` // Unique name of the task
CronFormat string `json:"cron_format"` // Cron expression (e.g., "*/5 * * * *")
OneShot bool `json:"one_shot,omitempty"` // Indicates if it is a one-time task. If true, the behavior after execution depends on the following two fields.
ExecutedAndMarkDeleted bool `json:"executed_and_mark_deleted,omitempty"` // If OneShot is true, this field being true will mark the task as deleted after execution.
ExecutedAndCleanup bool `json:"executed_and_cleanup,omitempty"` // If OneShot is true, this field being true will thoroughly clean the task after execution (calling CleanupTask). This option takes precedence over ExecutedAndMarkDeleted.
Payload string `json:"payload,omitempty"` // Custom data passed to the task execution function (usually a JSON string).
}

Node represents a service node in the distributed cluster.
type Node struct {
ID string `json:"id"` // Unique ID of the node (usually auto-generated)
IP string `json:"ip"` // IP address of the node
Hostname string `json:"hostname"` // Hostname of the node
LastAlive time.Time `json:"last_alive"` // Last heartbeat time of the node, used for health checks
CreateTime time.Time `json:"create_time"` // Registration creation time of the node
Status NodeStatus `json:"status"` // Current status of the node: "starting", "working", or "leaving"
}
// NodeStatus defines the type of node status
type NodeStatus string
const (
NodeStatusStarting NodeStatus = "starting"
NodeStatusWorking NodeStatus = "working"
NodeStatusLeaving NodeStatus = "leaving"
)

dcron allows you to listen for changes in node state and task metadata through the Registry interface.
// NodeEvent represents a node change event
type NodeEvent struct {
Type NodeEventType // Event type: NodeEventTypePut (add/update), NodeEventTypeDelete (delete), or NodeEventTypeChanged (changed; this event is sent after the node watch has started)
Node Node // Related node information
}
// NodeEventType defines the type of node event
type NodeEventType string
const (
NodeEventTypePut NodeEventType = "put" // Node added or updated
NodeEventTypeDelete NodeEventType = "delete" // Node deleted
NodeEventTypeChanged NodeEventType = "changed" // Node changed event (details not specified)
)
// TaskMetaEvent represents a task metadata change event
type TaskMetaEvent struct {
Type TaskEventType // Event type: TaskEventTypePut (add/update), TaskEventTypeDelete (delete)
Task TaskMeta // Related task metadata
}
// TaskEventType defines the type of task event
type TaskEventType string
const (
TaskEventTypePut TaskEventType = "put" // Task added or metadata updated
TaskEventTypeDelete TaskEventType = "delete" // Task marked as deleted (MarkTaskDeleted)
)

You can obtain channels for these events through the WatchNodes(ctx context.Context) (<-chan []NodeEvent, error) and WatchTaskEvent(ctx context.Context) (<-chan []TaskMetaEvent, error) methods of the Registry interface, allowing your application to respond to changes in the cluster's state.
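A consumer of these channels typically folds the event batches into its own view of the cluster. The sketch below shows that pattern with locally declared types that mirror a subset of the definitions above; `collectNodes` is a hypothetical helper, and the channel is fed by hand here rather than by a real registry.

```go
package main

import "fmt"

// Local mirrors of a subset of the types shown above, so the sketch is self-contained.
type NodeEventType string

const (
	NodeEventTypePut    NodeEventType = "put"
	NodeEventTypeDelete NodeEventType = "delete"
)

type Node struct {
	ID     string
	Status string
}

type NodeEvent struct {
	Type NodeEventType
	Node Node
}

// collectNodes drains event batches until the channel is closed and
// returns the resulting membership view keyed by node ID.
// Event types other than put and delete are ignored in this sketch.
func collectNodes(events <-chan []NodeEvent) map[string]Node {
	view := make(map[string]Node)
	for batch := range events {
		for _, ev := range batch {
			switch ev.Type {
			case NodeEventTypePut:
				view[ev.Node.ID] = ev.Node
			case NodeEventTypeDelete:
				delete(view, ev.Node.ID)
			}
		}
	}
	return view
}

func main() {
	ch := make(chan []NodeEvent, 2)
	ch <- []NodeEvent{
		{Type: NodeEventTypePut, Node: Node{ID: "node-a", Status: "working"}},
		{Type: NodeEventTypePut, Node: Node{ID: "node-b", Status: "starting"}},
	}
	ch <- []NodeEvent{{Type: NodeEventTypeDelete, Node: Node{ID: "node-b"}}}
	close(ch)
	for id, n := range collectNodes(ch) {
		fmt.Printf("%s is %s\n", id, n.Status)
	}
}
```

In a long-running service the same loop would run in its own goroutine and update shared state under a lock instead of returning a map.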
- Q: Why can't I directly add a task with the same name after calling MarkTaskDeleted?
  A: When you call MarkTaskDeleted("task-name"), the system records a "deleted" marker for that task name in the registry. This prevents other nodes from mistakenly scheduling a task that has been deleted due to network delays or other reasons. This marker acts as a "tombstone." If you need to reuse this task name, you can:
  - Use ForceAddTaskMeta: This method will ignore all existing markers (including "deleted" markers) and forcefully overwrite or create the task.
  - Call CleanupTask first, then AddTaskMeta: CleanupTask(ctx, "task-name") will thoroughly clear all traces of that task name in the registry, including metadata and "deleted" markers. After that, you can use AddTaskMeta as if it were a brand new task.
  - Use ReuseDeletedTask: This method is specifically designed to "revive" a task that has been marked as deleted, clearing the deletion marker so that the task can be rescheduled (if its metadata still exists).
- Q: What is the core difference between the ForceAdd series of methods and the regular Add series of methods?
  A: The core difference lies in how they handle existing tasks or markers:
  - Add series methods (like AddTask, AddTaskMeta): These methods check for existing tasks before adding. If a task with the same name exists or has been marked as deleted, they typically return an error to prevent accidental overwriting or conflicts with old states.
  - ForceAdd series methods (like ForceAddTask, ForceAddTaskMeta): As the name suggests, these methods "force" execution. They ignore whether the task already exists or has been deleted, directly creating or overwriting the task. This usually means they will first clean up any old records related to that task name (including metadata, deleted markers, and possible execution locks) before writing the new task information. This makes them very suitable for scenarios where you need to "reset" or "ensure the latest configuration takes effect."
- Q: How should I choose between dynamic and static tasks?
  A:
  - Static Tasks:
    - Definition: Typically defined directly in code through AddFunc or AddTask.
    - Lifecycle: Loaded with the service instance and stopped when the service instance stops. Their definitions are hardcoded in the application.
    - Management: Modifying static tasks usually requires recompiling and redeploying the code.
    - Use Cases: Suitable for tasks that are fixed, do not change often, and are closely related to core application functionality, such as periodic log rotation, system health checks, or fixed data synchronization.
  - Dynamic Tasks:
    - Definition: Added at runtime through the AddTaskMeta API, with their metadata stored in the registry.
    - Lifecycle: Independent of the deployment of service instances. Once added to the registry, dynamic tasks can be discovered and scheduled by dcron nodes. They can be controlled through APIs for addition, deletion, and modification.
    - Management: More flexible, allowing for dynamic control of task execution through external systems (like management dashboards or operational scripts).
    - Use Cases:
      - Tasks that need to be dynamically created and managed by operations or users (e.g., user-defined reminders, temporary tasks related to marketing activities).
      - Tasks that require frequent adjustments to execution times or parameters.
      - One-time or temporary data processing or operational tasks.
- Q: How can I thoroughly clean up all traces of a task in the registry?
  A: Call the dcron instance's CleanupTask(ctx context.Context, taskName string) method. This method will:
  - Delete the task's metadata (TaskMeta).
  - Clear the "deleted" marker for that task (if it exists).
  - Attempt to clean up other potential data related to that task, such as last execution time records, distributed lock markers, etc. (specifics depend on the registry implementation).

  After calling this method, the taskName will no longer have any associated information in the registry and can be considered a fully available task name. If you need to clean all tasks at once (very dangerous, only for testing!), you can use ForceCleanupAllTasks(ctx context.Context).
- Q: Why does dcron need a "deleted task synchronization" mechanism?
  A: In a distributed system, there may be delays in state synchronization among nodes. Without a clear "deleted task" list and synchronization across nodes, the following issues may arise:
  - A node deletes task A, but other nodes may still attempt to schedule it due to network delays or outdated information.
  - If a deleted task is re-added before its old instances are cleaned up, it may lead to conflicts or unexpected behavior.

  By maintaining a "deleted task list" in the registry and allowing all dcron nodes to be aware of this list, it ensures:
  - Global Consistency: All nodes reach a consensus on which tasks are in a "deleted" state.
  - Preventing "Zombie" Tasks: Avoids mistakenly reactivating or executing tasks that have been deleted.
  - Idempotency Assurance: Works in conjunction with the distributed lock mechanism to further enhance the accuracy of task scheduling.
- Q: How to gracefully shut down the dcron service and take nodes offline?
  A: When your application is ready to shut down, you should call the Stop() method of the dcron instance. This method will perform the following operations to ensure a graceful shutdown:
  - Update Node Status: Change the current node's status in the registry to leaving. This notifies other nodes that this node is about to go offline, and they will consider this when redistributing tasks.
  - Stop Local Scheduler: Stops the cron scheduler, preventing new task executions. Ongoing tasks are typically allowed to finish (but this depends on whether the task implementation can respond to interrupt signals).
  - Unregister Node Information: Removes the current node's registration information from the registry.
  - Release Resources: Closes connections to the registry (if applicable) and releases other internal resources.

  By executing these steps, you can minimize the impact of node shutdown on task interruptions or duplicate scheduling, ensuring smooth operation of the cluster.
- Q: What registries does dcron support? If I want to switch from Consul to Redis, will I need to change a lot of code?
  A: dcron currently supports built-in registries for Consul, Redis, Etcd, and ZooKeeper. Switching registries is very simple because the core logic of dcron is decoupled from the Registry interface. The main changes you need to make are:
  - Modify Dependencies: Ensure that your go.mod includes the client library for the target registry (e.g., switch from github.com/hashicorp/consul/api to github.com/redis/go-redis/v9).
  - Modify Initialization Code: When creating the Dcron instance, pass in the new Registry implementation. For example:

        // Original Consul initialization
        // import "github.com/hashicorp/consul/api"
        // consulClient, _ := api.NewClient(api.DefaultConfig())
        // registry := dcron.NewConsulRegistry(consulClient)

        // New Redis initialization
        // import "github.com/redis/go-redis/v9"
        rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
        registry := dcron.NewRedisRegistry(rdb) // Just change here
        dc := dcron.NewDcron(registry, /* ... options ... */)

  Your task definitions (static tasks' AddFunc calls, dynamic tasks' TaskMeta structure) and other dcron configuration options (like WithStrategy, WithErrHandler, etc.) typically do not require any modifications.
-
Q: How can I listen for changes in nodes or tasks in the cluster, such as implementing a monitoring dashboard?
A: The `Registry` interface provides a `Watch` mechanism to listen for these changes:
- Listen for Node Changes: `registry.WatchNodes(ctx context.Context) (<-chan []NodeEvent, error)` returns a channel that delivers `NodeEvent` slices when nodes join, leave, or update their status.
- Listen for Task Metadata Changes: `registry.WatchTaskEvent(ctx context.Context) (<-chan []TaskMetaEvent, error)` similarly returns a channel that delivers `TaskMetaEvent` slices when dynamic tasks are added, modified, or marked as deleted.

You can start a goroutine in your application to consume these channels and, based on the events, update your monitoring dashboard, send notifications, or perform other automated operational tasks.

```go
// Example: Listening for node events
go func() {
	// Use a context you can cancel so the watch stops on shutdown.
	nodeEventsChan, err := registry.WatchNodes(context.Background())
	if err != nil {
		log.Printf("Unable to listen for node events: %v", err)
		return
	}
	// Each receive delivers a batch of events.
	for events := range nodeEventsChan {
		for _, event := range events {
			log.Printf("Node event: Type=%v, NodeID=%v, Status=%v", event.Type, event.Node.ID, event.Node.Status)
			// Update your monitoring system here
		}
	}
}()
```
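For a dashboard, the event batches usually need to be folded into a current view of the cluster. The sketch below shows one way to do that under stated assumptions: `Node`, `NodeEvent`, and `applyEvents` are local, hypothetical stand-ins rather than dcron's real types, and the event type values `"put"` and `"delete"` are invented for illustration. Only the field names `Type`, `Node.ID`, and `Node.Status` come from the example above.

```go
package main

import "fmt"

// Node and NodeEvent are hypothetical stand-ins for the registry's types.
type Node struct {
	ID     string
	Status string
}

type NodeEvent struct {
	Type string // assumed values: "put" or "delete"
	Node Node
}

// applyEvents folds one batch of events into an id -> status view.
func applyEvents(view map[string]string, events []NodeEvent) {
	for _, e := range events {
		if e.Type == "delete" {
			delete(view, e.Node.ID)
			continue
		}
		view[e.Node.ID] = e.Node.Status
	}
}

func main() {
	// A buffered channel stands in for the channel returned by WatchNodes.
	ch := make(chan []NodeEvent, 2)
	ch <- []NodeEvent{
		{Type: "put", Node: Node{ID: "node-a", Status: "working"}},
		{Type: "put", Node: Node{ID: "node-b", Status: "starting"}},
	}
	ch <- []NodeEvent{
		{Type: "put", Node: Node{ID: "node-b", Status: "leaving"}},
		{Type: "delete", Node: Node{ID: "node-a"}},
	}
	close(ch)

	view := map[string]string{}
	for events := range ch {
		applyEvents(view, events)
	}
	fmt.Println(view) // prints map[node-b:leaving]
}
```

Keeping the fold in a plain function such as `applyEvents` makes the dashboard logic testable without a running registry.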
Q: How can I implement a unified execution logic for all dynamic tasks?
A: Use the `dcron.WithTaskRunFunc(handler func(*dcron.TaskMeta) error)` option. You provide a function that takes a `*dcron.TaskMeta` parameter and returns an `error`. When any dynamic task reaches its execution time and is selected for execution by the current node, `dcron` calls your handler and passes it the task's `TaskMeta`. Inside the handler, you can:
- Distinguish different dynamic tasks by `task.Name`.
- Access the custom data passed to the task via `task.Payload`.
- Execute the corresponding business logic.
- Return `nil` to indicate successful execution, or return an error to indicate failure (this triggers the error handler set through `WithErrHandler`).

This is the core extension point for implementing dynamic task scheduling.
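A common way to structure such a handler is a dispatch table keyed by task name. The following is a minimal sketch, assuming a local `TaskMeta` type that models only the `Name` and `Payload` fields mentioned above. The payload type (`map[string]string`), the `handlers` table, the `send-report` task, and `runTask` are hypothetical and not part of dcron's API.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskMeta is a hypothetical stand-in for dcron.TaskMeta. The Payload
// type is an assumption made for this sketch.
type TaskMeta struct {
	Name    string
	Payload map[string]string
}

// handlers maps a task name to its business logic.
var handlers = map[string]func(*TaskMeta) error{
	"send-report": func(t *TaskMeta) error {
		to := t.Payload["to"]
		if to == "" {
			return errors.New("send-report: payload is missing \"to\"")
		}
		fmt.Println("sending report to", to)
		return nil
	},
}

// runTask has the handler shape described above: it looks up the logic
// by task name and returns an error for unknown or failed tasks.
func runTask(t *TaskMeta) error {
	h, ok := handlers[t.Name]
	if !ok {
		return fmt.Errorf("no handler registered for task %q", t.Name)
	}
	return h(t)
}

func main() {
	tasks := []*TaskMeta{
		{Name: "send-report", Payload: map[string]string{"to": "ops@example.com"}},
		{Name: "unknown-task"},
	}
	for _, t := range tasks {
		if err := runTask(t); err != nil {
			fmt.Println("task failed:", err)
		}
	}
}
```

With the real library, a function of this shape (taking `*dcron.TaskMeta`) would be passed to `dcron.WithTaskRunFunc`, and the returned errors would reach the handler configured through `WithErrHandler`.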
Q: What task assignment strategies does `dcron` provide, and how should I choose?
A: The choice of strategy depends on your specific needs and cluster characteristics:
- `StrategyConsistent` (Consistent Hashing):
  - Advantages: When nodes are added or removed, only a small number of tasks are redistributed, while most assignments remain stable. Task distribution among nodes is relatively even.
  - Use Cases: Environments where nodes change frequently (e.g., elastically scaled clusters), the number of tasks is large, and smooth transitions in task distribution are desired.
- `StrategyHashSharding` (Average Distribution / Hash Modulus):
  - Advantages: Simple implementation with low computational overhead. When the number of nodes is fixed, task distribution is very even.
  - Disadvantages: When the number of nodes changes, most assignments change, potentially leading to significant task migration.
  - Use Cases: Stable node counts, moderate task volumes, and insensitivity to task migration during node changes.
- `StrategyHashSlot` (Hash Slots):
  - Advantages: Maps tasks to a fixed number of slots, then assigns slots to nodes. When nodes are added or removed, only a few slots and their tasks need to be migrated, resulting in smoother transitions than pure hash modulus. Can achieve finer-grained load balancing.
  - Use Cases: Scenarios where task distribution needs to be more stable during node changes while maintaining good uniformity, similar to the slot concept in Redis Cluster.
- `StrategyRange` (Range Allocation):
  - Advantages: Allocates contiguous blocks of tasks to nodes based on the sorted order of task names. Suitable for tasks with some ordering or business correlation.
  - Disadvantages: If task names are unevenly distributed, node loads may be uneven.
  - Use Cases: Tasks that can be logically partitioned by name, such as alphabetical or numerical ranges.
- `StrategyWeighted` (Weighted Distribution):
  - Advantages: Allows different nodes to be assigned different weights, so that more capable nodes handle more tasks.
  - Disadvantages: Requires prior assessment and configuration of each node's weight.
  - Use Cases: Scenarios where nodes have varying processing capabilities (e.g., different machine configurations).
- `StrategyRoundRobin` (Round Robin):
  - Advantages: Simple and fair, assigning tasks to each node in turn.
  - Disadvantages: Does not consider the characteristics of tasks or the current load of nodes. If task execution times vary significantly, some nodes may be idle while others are busy.
  - Use Cases: Scenarios where task execution times are similar, node capabilities are equal, and a simple, fair distribution is desired.

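To make the migration trade-off between hash modulus and consistent hashing concrete, the sketch below counts how many tasks change owner when a fourth node joins a three-node cluster. It is an independent illustration, not dcron's implementation: the FNV-1a hash, the virtual-node count of 100, and the names `hash32`, `modAssign`, `ring`, and `moved` are all assumptions made for this example.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// hash32 maps a string to a 32-bit FNV-1a hash.
func hash32(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// modAssign is plain hash modulus: hash(task) mod len(nodes).
// nodes must be non-empty.
func modAssign(task string, nodes []string) string {
	return nodes[hash32(task)%uint32(len(nodes))]
}

// ring is a minimal consistent-hash ring with virtual nodes.
type ring struct {
	points []uint32          // sorted positions on the ring
	owner  map[uint32]string // position -> node name
}

// newRing places replicas virtual nodes per node on the ring.
// nodes must be non-empty and replicas must be positive.
func newRing(nodes []string, replicas int) *ring {
	r := &ring{owner: make(map[uint32]string)}
	for _, n := range nodes {
		for i := 0; i < replicas; i++ {
			p := hash32(fmt.Sprintf("%s#%d", n, i))
			if _, taken := r.owner[p]; taken {
				continue // position collision: the earlier node keeps it
			}
			r.owner[p] = n
			r.points = append(r.points, p)
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// assign returns the owner of the first ring position at or after
// hash(task), wrapping around at the end of the ring.
func (r *ring) assign(task string) string {
	h := hash32(task)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0
	}
	return r.owner[r.points[i]]
}

// moved counts the tasks whose owner differs between two assignments.
func moved(tasks []string, before, after func(string) string) int {
	n := 0
	for _, t := range tasks {
		if before(t) != after(t) {
			n++
		}
	}
	return n
}

func main() {
	three := []string{"node-a", "node-b", "node-c"}
	four := []string{"node-a", "node-b", "node-c", "node-d"}

	tasks := make([]string, 1000)
	for i := range tasks {
		tasks[i] = fmt.Sprintf("task-%d", i)
	}

	r3, r4 := newRing(three, 100), newRing(four, 100)
	byMod := moved(tasks,
		func(t string) string { return modAssign(t, three) },
		func(t string) string { return modAssign(t, four) })
	byRing := moved(tasks, r3.assign, r4.assign)

	fmt.Printf("hash modulus:       %d of %d tasks moved\n", byMod, len(tasks))
	fmt.Printf("consistent hashing: %d of %d tasks moved\n", byRing, len(tasks))
}
```

With a well-mixed hash, going from three to four nodes is expected to move roughly three quarters of the tasks under hash modulus and roughly one quarter under consistent hashing. In this ring, every task that moves lands on the new node, so assignments among the existing nodes are never reshuffled.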
Selection Recommendations:
- For most general scenarios, `StrategyConsistent` (Consistent Hashing) is a good default choice because it balances load well and keeps assignments stable when nodes change.
- If your nodes are very stable, consider `StrategyHashSharding` or `StrategyRoundRobin` for simplicity.
- If you require better stability during node changes and want finer control than consistent hashing, consider `StrategyHashSlot`.
- Specific business scenarios, such as partitioning by name or heterogeneous nodes, call for `StrategyRange` or `StrategyWeighted`, respectively.

The dcron project aims to provide a powerful, flexible, and extensible distributed cron job solution. By supporting multiple mainstream registries and rich task management features, it can help you efficiently and reliably schedule and manage cron jobs in microservices architectures.
Recommended Uses:
- Unified scheduling and management of various cron jobs in microservices architectures.
- Scheduled tasks that require high availability and load balancing.
- Dynamically issuing, controlling, and monitoring scheduled tasks through a backend management interface or operational scripts.
- Implementing one-time, periodic, or temporary automated operations and data processing jobs.
We encourage you to choose the registry and configuration options that fit your project's actual needs and to make full use of the features dcron provides. If you have any questions or suggestions, feel free to reach out through GitHub Issues.