Building a Distributed Job Scheduler in Go: From Design to Production
A practical guide to building a distributed task scheduling system in Go using Machinery, Redis, and cron — covering task management, distributed locking, retry strategies, and monitoring.
Chinese Version: This article was originally published on CSDN. Read the Chinese original →
Every backend service eventually needs scheduled jobs. Send a weekly report, clean up expired sessions, sync data from an external API, retry failed payments. When you’re running a single instance, a simple cron job works fine. But the moment you scale to multiple instances, things break: jobs run multiple times, there’s no visibility into what’s running, and failures silently disappear.
This article walks through designing and building a production-grade distributed job scheduling system in Go. We’ll start with the architecture, implement core components with working code, and cover the production concerns that tutorials usually skip — distributed locking, retry strategies, graceful shutdown, and monitoring.
1. Why Not Just Use Cron?
A basic cron job (or Go’s time.Ticker) works fine on a single server. The problems appear when you deploy multiple instances:
- Duplicate execution: All instances fire the same job at the same time
- No visibility: You can’t tell which jobs are running, which failed, or when they last succeeded
- No retry logic: If a job fails, it’s gone until the next scheduled run
- Tight coupling: Job logic lives inside the service that schedules it, making it hard to scale workers independently
A distributed job scheduler solves all of these by separating scheduling (when and what to run) from execution (running the actual logic), with a message queue in between.
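That separation is the whole idea, and it's easy to see in a toy in-process sketch, with a buffered channel standing in for the message broker (names here are illustrative, not part of the final system):

```go
// Toy sketch: scheduling and execution separated by a queue.
// In the real system the queue is Redis/AMQP and the two sides
// run in different processes.
package main

import (
	"fmt"
	"sync"
)

// Task is the message the scheduler enqueues and a worker consumes.
type Task struct {
	Name string
	Arg  string
}

// runOnce wires a scheduler to a worker through a queue and
// returns the execution log, in dispatch order.
func runOnce() []string {
	queue := make(chan Task, 16) // stand-in for the message broker
	var (
		wg  sync.WaitGroup
		log []string
	)
	wg.Add(1)
	// Worker side: consumes tasks; knows nothing about schedules.
	go func() {
		defer wg.Done()
		for t := range queue {
			log = append(log, fmt.Sprintf("%s(%s)", t.Name, t.Arg))
		}
	}()
	// Scheduler side: decides when and what to run; knows nothing about execution.
	queue <- Task{Name: "send_report", Arg: "weekly"}
	queue <- Task{Name: "clean_expired_sessions", Arg: "30d"}
	close(queue)
	wg.Wait()
	return log
}

func main() {
	fmt.Println(runOnce()) // [send_report(weekly) clean_expired_sessions(30d)]
}
```

Scaling the worker side then just means adding consumers on the same queue, which is exactly what the architecture below does with real infrastructure.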
2. Architecture Overview
The system has two main components connected by a message queue:
Task Center — the brain of the system:
- Scheduler: Runs cron triggers with distributed locking to ensure each job fires exactly once
- Task Manager: CRUD API for managing job definitions
- Monitor: Tracks task status, collects metrics, sends alerts on failure
- Result Handler: Consumes execution results from the queue and updates task state
Workers — stateless, horizontally scalable execution units:
- Task Receiver: Pulls tasks from the queue and dispatches them
- Task Executor: Runs the actual job logic
- Result Reporter: Pushes execution results back to the queue
- Retry Handler: Implements exponential backoff for failed tasks
3. Task Lifecycle
Every task follows a well-defined state machine:
A task starts in the Created state, gets Queued when the scheduler pushes it to the message broker, enters Running when a worker picks it up, and ends in either Completed or Failed. Failed tasks with remaining retries enter the Retrying state and are re-enqueued with exponential backoff. Tasks that exhaust all retries go to a Dead Letter queue for manual investigation.
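The state machine is small enough to encode as an explicit transition table. A minimal sketch (state names mirror the prose above; this is not a Machinery API):

```go
// Hypothetical encoding of the task lifecycle as a transition table.
package main

import "fmt"

type State string

const (
	Created    State = "CREATED"
	Queued     State = "QUEUED"
	Running    State = "RUNNING"
	Completed  State = "COMPLETED"
	Failed     State = "FAILED"
	Retrying   State = "RETRYING"
	DeadLetter State = "DEAD_LETTER"
)

// transitions lists the legal next states for each state.
var transitions = map[State][]State{
	Created:  {Queued},
	Queued:   {Running},
	Running:  {Completed, Failed},
	Failed:   {Retrying, DeadLetter}, // retry if attempts remain, else dead-letter
	Retrying: {Queued},               // re-enqueued with backoff
}

// CanTransition reports whether moving from -> to is legal.
func CanTransition(from, to State) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(Failed, Retrying))  // true
	fmt.Println(CanTransition(Completed, Queued)) // false: terminal state
}
```

Making illegal transitions unrepresentable like this is a cheap guard against monitoring code marking a Completed task as Running after an out-of-order status update.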
4. Technology Selection
Machinery — The Foundation
Machinery is a Go framework for distributed task processing, inspired by Python’s Celery. It provides task registration, message broker abstraction, result storage, and retry logic out of the box.
However, Machinery alone doesn’t handle cron scheduling or distributed locking — we need to add those layers ourselves.
Framework Comparison
Before committing to Machinery, let’s compare the main options in the Go ecosystem:
| Feature | Machinery | Asynq | Temporal | go-cron |
|---|---|---|---|---|
| Task queue | Yes (Redis, AMQP, SQS) | Yes (Redis only) | Yes (built-in) | No |
| Cron scheduling | No (add manually) | Yes (built-in) | Yes (built-in) | Yes (core feature) |
| Distributed locking | No | Unique tasks via Redis | Built-in | No |
| Result storage | Yes (Redis, Memcache, MongoDB) | Yes (Redis) | Yes (built-in) | No |
| Retry with backoff | Yes | Yes | Yes (complex policies) | No |
| Workflow/chaining | Yes (groups, chains, chords) | No | Yes (full workflow engine) | No |
| Complexity | Medium | Low | High | Very low |
| Best for | General task queues | Simple async jobs | Complex workflows | Simple cron replacement |
Machinery is the right choice when you need a general-purpose task queue with broker flexibility and workflow support (chains, groups). Asynq is simpler if you’re committed to Redis. Temporal is the heavy-duty option for complex, long-running workflows with saga patterns. go-cron is fine for single-instance scheduling but doesn’t solve the distributed problem on its own.
For this article, we’ll use Machinery for the task queue and add cron scheduling with distributed locking on top.
5. Implementation
Project Structure
scheduler/
├── cmd/
│ ├── server/main.go # Task center entry point
│ └── worker/main.go # Worker entry point
├── internal/
│ ├── config/config.go # Configuration
│ ├── lock/redis.go # Distributed lock
│ ├── scheduler/cron.go # Cron scheduler
│ ├── tasks/registry.go # Task definitions
│ ├── tasks/handlers.go # Task handler implementations
│ └── monitor/metrics.go # Monitoring and metrics
├── go.mod
└── go.sum
Configuration
// internal/config/config.go
package config
import "time"
type Config struct {
RedisURL string `env:"REDIS_URL" default:"redis://localhost:6379"`
BrokerURL string `env:"BROKER_URL" default:"redis://localhost:6379"`
ResultBackend string `env:"RESULT_BACKEND" default:"redis://localhost:6379"`
LockTTL time.Duration `env:"LOCK_TTL" default:"30s"`
MaxRetries int `env:"MAX_RETRIES" default:"3"`
DefaultQueue string `env:"DEFAULT_QUEUE" default:"machinery_tasks"`
MetricsPort int `env:"METRICS_PORT" default:"9090"`
}
Task Definition and Registration
First, define the tasks that workers can execute. Each task is a regular Go function — Machinery handles serialization and dispatch.
// internal/tasks/handlers.go
package tasks
import (
"context"
"fmt"
"net/http"
"time"
)
// SendReport generates and emails a weekly report.
// Parameters are passed as primitive types (Machinery serialization requirement).
func SendReport(reportType string, recipientEmail string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
report, err := generateReport(ctx, reportType)
if err != nil {
return "", fmt.Errorf("generate report %s: %w", reportType, err)
}
if err := emailReport(ctx, recipientEmail, report); err != nil {
return "", fmt.Errorf("email report to %s: %w", recipientEmail, err)
}
return fmt.Sprintf("report_%s_sent_to_%s", reportType, recipientEmail), nil
}
// CleanExpiredSessions removes sessions older than the given threshold.
func CleanExpiredSessions(maxAgeDays int64) (int64, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
count, err := deleteExpiredSessions(ctx, time.Duration(maxAgeDays)*24*time.Hour)
if err != nil {
return 0, fmt.Errorf("clean sessions older than %d days: %w", maxAgeDays, err)
}
return count, nil
}
// SyncExternalData pulls data from an external API and upserts it locally.
func SyncExternalData(apiEndpoint string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiEndpoint, nil)
if err != nil {
return "", fmt.Errorf("create request for %s: %w", apiEndpoint, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", fmt.Errorf("fetch %s: %w", apiEndpoint, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("unexpected status %d from %s", resp.StatusCode, apiEndpoint)
}
count, err := upsertData(ctx, resp.Body)
if err != nil {
return "", fmt.Errorf("upsert data from %s: %w", apiEndpoint, err)
}
return fmt.Sprintf("synced_%d_records", count), nil
}
Now register these tasks with Machinery:
// internal/tasks/registry.go
package tasks
import (
"github.com/RichardKnop/machinery/v2"
)
// RegisterAll registers all available task handlers with the Machinery server.
func RegisterAll(server *machinery.Server) error {
return server.RegisterTasks(map[string]interface{}{
"send_report": SendReport,
"clean_expired_sessions": CleanExpiredSessions,
"sync_external_data": SyncExternalData,
})
}
Distributed Locking with Redis
The cron scheduler runs on every instance of the Task Center. Without distributed locking, every instance would fire the same job simultaneously. We use Redis SET NX (set-if-not-exists) with an expiry to ensure exactly one instance wins the lock.
// internal/lock/redis.go
package lock
import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
var ErrLockNotAcquired = errors.New("lock not acquired")
// RedisLock implements a distributed lock using Redis SET NX with automatic expiry.
type RedisLock struct {
client *redis.Client
key string
value string // unique value to prevent releasing someone else's lock
ttl time.Duration
}
// NewRedisLock creates a new distributed lock.
func NewRedisLock(client *redis.Client, key string, ttl time.Duration) *RedisLock {
// Generate a random value so only the holder can release the lock
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
panic(err) // crypto/rand failing means we can't generate a safe lock token
}
return &RedisLock{
client: client,
key: fmt.Sprintf("dlock:%s", key),
value: hex.EncodeToString(b),
ttl: ttl,
}
}
// Acquire attempts to acquire the lock. Returns ErrLockNotAcquired if
// another instance already holds it.
func (l *RedisLock) Acquire(ctx context.Context) error {
ok, err := l.client.SetNX(ctx, l.key, l.value, l.ttl).Result()
if err != nil {
return fmt.Errorf("redis SETNX: %w", err)
}
if !ok {
return ErrLockNotAcquired
}
return nil
}
// Release releases the lock, but only if we still own it.
// Uses a Lua script for atomic check-and-delete.
func (l *RedisLock) Release(ctx context.Context) error {
script := redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
_, err := script.Run(ctx, l.client, []string{l.key}, l.value).Result()
if err != nil {
return fmt.Errorf("release lock %s: %w", l.key, err)
}
return nil
}
// Extend resets the lock's TTL. Useful for long-running tasks that need
// to hold the lock beyond the initial TTL.
func (l *RedisLock) Extend(ctx context.Context, ttl time.Duration) error {
script := redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
end
return 0
`)
_, err := script.Run(ctx, l.client, []string{l.key},
l.value, ttl.Milliseconds()).Result()
if err != nil {
return fmt.Errorf("extend lock %s: %w", l.key, err)
}
return nil
}
Why the Lua script for Release? Without it, there’s a race condition: instance A checks the value, finds it matches, but before it can delete the key, the lock expires and instance B acquires it. Instance A then deletes instance B’s lock. The Lua script makes check-and-delete atomic.
Alternative: etcd Lease-Based Locking
For systems already running etcd (e.g., Kubernetes environments), etcd’s lease mechanism provides stronger guarantees than Redis:
// Alternative: etcd-based distributed lock
import (
"context"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
func acquireEtcdLock(client *clientv3.Client, lockName string) (*concurrency.Mutex, error) {
session, err := concurrency.NewSession(client, concurrency.WithTTL(30))
if err != nil {
return nil, err
}
mutex := concurrency.NewMutex(session, "/locks/"+lockName)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := mutex.TryLock(ctx); err != nil {
session.Close()
return nil, err
}
return mutex, nil
}
etcd uses Raft consensus, so it provides true linearizability. Redis, in contrast, can lose locks during failover in Redis Sentinel or Cluster setups. For most job scheduling use cases, Redis is sufficient — duplicate execution once in a while during failover is acceptable. For financial or safety-critical tasks, prefer etcd or use the Redlock algorithm with multiple independent Redis instances.
Cron Scheduler with Distributed Locking
Now we combine robfig/cron with the distributed lock to create a scheduler where, on each tick, only one instance dispatches a given job (barring the failover caveats noted above):
// internal/scheduler/cron.go
package scheduler
import (
"context"
"errors"
"log/slog"
"time"
"github.com/RichardKnop/machinery/v2"
"github.com/RichardKnop/machinery/v2/tasks"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
"scheduler/internal/lock"
)
// JobDefinition describes a scheduled job.
type JobDefinition struct {
Name string // unique job name, used as the lock key
CronExpr string // cron expression with seconds field, e.g. "0 */5 * * * *"
TaskName string // registered Machinery task name
Args []tasks.Arg // task arguments
Queue string // target queue (for routing to specific workers)
MaxRetries int // max retry count
}
// CronScheduler wraps robfig/cron with distributed locking.
type CronScheduler struct {
cron *cron.Cron
server *machinery.Server
redisClient *redis.Client
lockTTL time.Duration
logger *slog.Logger
}
func NewCronScheduler(
server *machinery.Server,
redisClient *redis.Client,
lockTTL time.Duration,
logger *slog.Logger,
) *CronScheduler {
return &CronScheduler{
cron: cron.New(cron.WithSeconds()),
server: server,
redisClient: redisClient,
lockTTL: lockTTL,
logger: logger,
}
}
// Schedule registers a job definition with the cron scheduler.
func (s *CronScheduler) Schedule(job JobDefinition) error {
_, err := s.cron.AddFunc(job.CronExpr, func() {
s.fireJob(job)
})
if err != nil {
return err
}
s.logger.Info("scheduled job",
"name", job.Name,
"cron", job.CronExpr,
"task", job.TaskName,
)
return nil
}
// fireJob attempts to acquire the distributed lock and dispatch the task.
func (s *CronScheduler) fireJob(job JobDefinition) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Try to acquire the distributed lock
dlock := lock.NewRedisLock(s.redisClient, job.Name, s.lockTTL)
if err := dlock.Acquire(ctx); err != nil {
if errors.Is(err, lock.ErrLockNotAcquired) {
s.logger.Debug("another instance holds the lock, skipping",
"job", job.Name)
return
}
s.logger.Error("failed to acquire lock", "job", job.Name, "error", err)
return
}
// Deliberately do NOT release the lock here: letting it expire via TTL
// means an instance with slight clock skew can't re-acquire it and
// dispatch a duplicate on the same tick. This requires LockTTL to be
// shorter than the job's cron interval.
// Build and send the Machinery task
signature := &tasks.Signature{
Name: job.TaskName,
Args: job.Args,
RetryCount: job.MaxRetries,
}
if job.Queue != "" {
signature.RoutingKey = job.Queue
}
result, err := s.server.SendTask(signature)
if err != nil {
s.logger.Error("failed to send task",
"job", job.Name,
"task", job.TaskName,
"error", err,
)
return
}
s.logger.Info("dispatched task",
"job", job.Name,
"task", job.TaskName,
"taskID", result.Signature.UUID,
)
}
// Start begins the cron scheduler.
func (s *CronScheduler) Start() {
s.cron.Start()
s.logger.Info("cron scheduler started")
}
// Stop gracefully stops the cron scheduler, waiting for running jobs to finish.
func (s *CronScheduler) Stop() context.Context {
return s.cron.Stop()
}
Worker Implementation
The worker is a separate binary that connects to the same message broker and processes tasks:
// cmd/worker/main.go
package main
import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"github.com/RichardKnop/machinery/v2"
redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
configMachinery "github.com/RichardKnop/machinery/v2/config"
eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
"github.com/RichardKnop/machinery/v2/tasks"
"scheduler/internal/config"
internalTasks "scheduler/internal/tasks"
)
func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
cfg := config.Load()
// Configure Machinery
mCfg := &configMachinery.Config{
DefaultQueue: cfg.DefaultQueue,
ResultsExpireIn: 3600,
Redis: &configMachinery.RedisConfig{
MaxIdle: 3,
IdleTimeout: 240,
ReadTimeout: 15,
WriteTimeout: 15,
ConnectTimeout: 15,
NormalTasksPollPeriod: 1000,
DelayedTasksPollPeriod: 500,
},
}
// Note: NewGR expects host:port addresses (e.g. "localhost:6379"),
// so strip any redis:// scheme from the configured URLs before use.
broker := redisbroker.NewGR(mCfg, []string{cfg.BrokerURL}, 0)
backend := redisbackend.NewGR(mCfg, []string{cfg.ResultBackend}, 0)
lock := eagerlock.New()
server := machinery.NewServer(mCfg, broker, backend, lock)
// Register task handlers
if err := internalTasks.RegisterAll(server); err != nil {
logger.Error("failed to register tasks", "error", err)
os.Exit(1)
}
// Create a worker with a unique tag (hostname + PID works well)
hostname, _ := os.Hostname()
workerTag := fmt.Sprintf("worker-%s-%d", hostname, os.Getpid())
worker := server.NewWorker(workerTag, 10) // concurrency = 10
// Set up pre/post task hooks for logging
worker.SetPreTaskHandler(func(signature *tasks.Signature) {
logger.Info("starting task",
"taskID", signature.UUID,
"name", signature.Name,
"retry", signature.RetryCount,
)
})
worker.SetPostTaskHandler(func(signature *tasks.Signature) {
logger.Info("completed task",
"taskID", signature.UUID,
"name", signature.Name,
)
})
worker.SetErrorHandler(func(err error) {
logger.Error("task error", "error", err)
})
// Graceful shutdown: stop consuming when SIGINT/SIGTERM arrives
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
go func() {
<-ctx.Done()
logger.Info("shutting down worker...")
worker.Quit()
}()
logger.Info("starting worker", "tag", workerTag, "concurrency", 10)
if err := worker.Launch(); err != nil {
logger.Error("worker stopped with error", "error", err)
os.Exit(1)
}
}
Task Center (Server) Entry Point
// cmd/server/main.go
package main
import (
"context"
"log/slog"
"os"
"os/signal"
"syscall"
"github.com/RichardKnop/machinery/v2"
"github.com/RichardKnop/machinery/v2/tasks"
"github.com/redis/go-redis/v9"
"scheduler/internal/config"
internalTasks "scheduler/internal/tasks"
"scheduler/internal/scheduler"
)
func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
cfg := config.Load()
// Initialize Machinery server (same setup as worker)
server := initMachineryServer(cfg)
if err := internalTasks.RegisterAll(server); err != nil {
logger.Error("failed to register tasks", "error", err)
os.Exit(1)
}
// Initialize Redis client for distributed locking
redisOpts, err := redis.ParseURL(cfg.RedisURL)
if err != nil {
logger.Error("invalid REDIS_URL", "error", err)
os.Exit(1)
}
redisClient := redis.NewClient(redisOpts)
// Create and configure the cron scheduler
cronScheduler := scheduler.NewCronScheduler(server, redisClient, cfg.LockTTL, logger)
// Register scheduled jobs
jobs := []scheduler.JobDefinition{
{
Name: "weekly-report",
CronExpr: "0 0 9 * * 1", // Every Monday at 9:00 AM
TaskName: "send_report",
Args: []tasks.Arg{
{Type: "string", Value: "weekly"},
{Type: "string", Value: "team@example.com"},
},
MaxRetries: 3,
},
{
Name: "session-cleanup",
CronExpr: "0 0 */6 * * *", // Every 6 hours
TaskName: "clean_expired_sessions",
Args: []tasks.Arg{
{Type: "int64", Value: 30}, // 30 days
},
MaxRetries: 2,
},
{
Name: "external-sync",
CronExpr: "0 */5 * * * *", // Every 5 minutes
TaskName: "sync_external_data",
Args: []tasks.Arg{
{Type: "string", Value: "https://api.example.com/data"},
},
Queue: "sync_workers",
MaxRetries: 5,
},
}
for _, job := range jobs {
if err := cronScheduler.Schedule(job); err != nil {
logger.Error("failed to schedule job", "name", job.Name, "error", err)
os.Exit(1)
}
}
// Start the scheduler
cronScheduler.Start()
// Wait for shutdown signal
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
<-ctx.Done()
logger.Info("shutting down task center...")
shutdownCtx := cronScheduler.Stop()
<-shutdownCtx.Done()
logger.Info("task center stopped")
}
Retry Mechanism with Exponential Backoff
Machinery provides built-in retries, but its default backoff is implicit: when RetryTimeout is unset, the delay before each attempt follows a Fibonacci sequence starting at one second, ramping up slowly. For production systems you usually want explicit exponential backoff with jitter, so a failing dependency isn’t hammered.
// Configure retry behavior on task signatures
func newTaskSignatureWithBackoff(taskName string, args []tasks.Arg, maxRetries int) *tasks.Signature {
return &tasks.Signature{
Name: taskName,
Args: args,
RetryCount: maxRetries,
RetryTimeout: 10, // initial retry delay in seconds
// After each failure, Machinery advances RetryTimeout to the next
// Fibonacci number, so delays grow: 10s, 13s, 21s, 34s, ...
}
}
Machinery also ships tasks.ErrRetryTaskLater, which a handler can return to request a retry after a specific delay. For finer-grained control over whether and when to retry, define your own error types:
// internal/tasks/retry.go
package tasks
import (
"fmt"
"math"
"math/rand"
"time"
)
// PermanentError wraps an error to indicate it should NOT be retried.
type PermanentError struct {
Err error
}
func (e *PermanentError) Error() string { return e.Err.Error() }
func (e *PermanentError) Unwrap() error { return e.Err }
// RetryableError wraps an error with a specific delay before the next retry.
type RetryableError struct {
Err error
RetryIn time.Duration
}
func (e *RetryableError) Error() string {
return fmt.Sprintf("%s (retry in %s)", e.Err.Error(), e.RetryIn)
}
// CalculateBackoff returns the delay for the given attempt using
// exponential backoff with jitter.
func CalculateBackoff(attempt int, baseDelay time.Duration, maxDelay time.Duration) time.Duration {
delay := time.Duration(float64(baseDelay) * math.Pow(2, float64(attempt)))
if delay > maxDelay {
delay = maxDelay
}
// Add up to 25% jitter to prevent thundering herd
jitter := time.Duration(float64(delay) * 0.25 * rand.Float64())
return delay + jitter
}
In your task handlers, use these error types to control retry behavior:
func SyncExternalData(apiEndpoint string) (string, error) {
// ... (fetch logic)
if resp.StatusCode == http.StatusNotFound {
// 404 won't fix itself — don't retry
return "", &PermanentError{Err: fmt.Errorf("endpoint %s returned 404", apiEndpoint)}
}
if resp.StatusCode == http.StatusTooManyRequests {
// Rate limited — retry with longer backoff
return "", &RetryableError{
Err: fmt.Errorf("rate limited by %s", apiEndpoint),
RetryIn: 60 * time.Second,
}
}
// Other errors: use default retry behavior
if resp.StatusCode >= 500 {
return "", fmt.Errorf("server error %d from %s", resp.StatusCode, apiEndpoint)
}
// ... (success path)
}
6. Task Status Monitoring
In a distributed system, visibility is not optional. You need to know which tasks are running, which failed, and how long they’re taking.
Querying Task State
Machinery stores task state in the result backend. You can query it programmatically:
// internal/monitor/status.go
package monitor
import (
"fmt"
"time"
"github.com/RichardKnop/machinery/v2"
"github.com/RichardKnop/machinery/v2/backends/result"
"github.com/RichardKnop/machinery/v2/tasks"
)
type TaskStatus struct {
ID string `json:"id"`
State string `json:"state"`
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
// GetTaskStatus retrieves the current state of a task by its ID.
func GetTaskStatus(server *machinery.Server, taskID string) (*TaskStatus, error) {
asyncResult := result.NewAsyncResult(&tasks.Signature{UUID: taskID}, server.GetBackend())
taskState := asyncResult.GetState()
status := &TaskStatus{
ID: taskID,
State: taskState.State,
CreatedAt: taskState.CreatedAt,
}
if taskState.Error != "" {
status.Error = taskState.Error
}
if taskState.IsSuccess() {
for _, r := range taskState.Results {
status.Result += fmt.Sprintf("%v ", r.Value)
}
}
return status, nil
}
// WaitForResult blocks until the task completes or the timeout expires.
func WaitForResult(server *machinery.Server, taskID string, timeout time.Duration) (*TaskStatus, error) {
asyncResult := result.NewAsyncResult(&tasks.Signature{UUID: taskID}, server.GetBackend())
results, err := asyncResult.GetWithTimeout(timeout, 500*time.Millisecond)
if err != nil {
return nil, fmt.Errorf("task %s: %w", taskID, err)
}
status := &TaskStatus{
ID: taskID,
State: "SUCCESS",
}
for _, r := range results {
status.Result += fmt.Sprintf("%v ", r.Interface())
}
return status, nil
}
Prometheus Metrics
Export metrics so your monitoring stack can track task health:
// internal/monitor/metrics.go
package monitor
import (
"fmt"
"net/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
TasksDispatched = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "scheduler_tasks_dispatched_total",
Help: "Total number of tasks dispatched to the queue",
},
[]string{"task_name"},
)
TasksCompleted = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "scheduler_tasks_completed_total",
Help: "Total number of tasks completed successfully",
},
[]string{"task_name"},
)
TasksFailed = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "scheduler_tasks_failed_total",
Help: "Total number of tasks that failed (including retries exhausted)",
},
[]string{"task_name"},
)
TaskDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "scheduler_task_duration_seconds",
Help: "Time taken to execute a task",
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10), // 0.1s to ~51s
},
[]string{"task_name"},
)
TaskRetries = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "scheduler_task_retries_total",
Help: "Total number of task retries",
},
[]string{"task_name"},
)
QueueDepth = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "scheduler_queue_depth",
Help: "Current number of tasks waiting in the queue",
},
[]string{"queue_name"},
)
)
// StartMetricsServer starts a Prometheus metrics endpoint.
func StartMetricsServer(port int) {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(fmt.Sprintf(":%d", port), mux)
}
You can then set up Grafana dashboards and Prometheus alerting rules:
# prometheus-alerts.yml
groups:
- name: scheduler
rules:
- alert: TaskFailureRateHigh
expr: rate(scheduler_tasks_failed_total[5m]) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "High task failure rate"
description: "Task {{ $labels.task_name }} failing at {{ $value }} per second"
- alert: QueueBacklog
expr: scheduler_queue_depth > 1000
for: 10m
labels:
severity: critical
annotations:
summary: "Task queue backlog growing"
description: "Queue {{ $labels.queue_name }} has {{ $value }} pending tasks"
7. Production Considerations
Graceful Shutdown
When deploying new versions, you need workers to finish their current tasks before stopping. The worker code above already handles SIGTERM via signal.NotifyContext. The key points:
- Stop accepting new tasks: call worker.Quit(), which stops consuming from the broker
- Wait for in-flight tasks: Machinery’s Quit() waits for currently running goroutines to finish
- Set a deadline: Kubernetes sends SIGTERM, then SIGKILL after terminationGracePeriodSeconds (default 30s). Set this higher than your longest-running task
# kubernetes deployment snippet
spec:
terminationGracePeriodSeconds: 300 # 5 minutes for long-running tasks
containers:
- name: worker
lifecycle:
preStop:
exec:
command: ["sleep", "5"] # Allow load balancer to drain
Task Idempotency
Network failures, worker crashes, and broker redelivery can cause a task to run more than once. Every task handler must be idempotent — running it twice with the same arguments should produce the same result.
Practical techniques:
- Use unique request IDs: pass the Machinery task UUID into your business logic and use it as a deduplication key
- Upsert, don’t insert: use INSERT ... ON DUPLICATE KEY UPDATE or its equivalent
- Check-before-act: verify the current state before making changes (e.g., don’t send an email if the email_sent flag is already set)
func SendReport(reportType string, recipientEmail string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
// Idempotency: check if this report was already sent today
// (redisClient is assumed to be a package-level *redis.Client)
key := fmt.Sprintf("report:%s:%s:%s", reportType, recipientEmail,
time.Now().Format("2006-01-02"))
exists, err := redisClient.Exists(ctx, key).Result()
if err == nil && exists > 0 {
return "already_sent", nil
}
// ... generate and send report ...
// Mark as sent (expire after 24h)
redisClient.Set(ctx, key, "1", 24*time.Hour)
return "sent", nil
}
Dead Letter Queue
Tasks that exhaust all retries should not be silently dropped. Push them to a dead letter queue for manual investigation:
// Register a post-task hook on the worker that detects tasks whose
// retries are exhausted and re-enqueues them on a dead letter queue.
worker.SetPostTaskHandler(func(signature *tasks.Signature) {
state, err := server.GetBackend().GetState(signature.UUID)
if err != nil {
logger.Error("failed to load task state", "taskID", signature.UUID, "error", err)
return
}
if state.IsFailure() && signature.RetryCount <= 0 {
// Copy the signature and route the copy to the dead letter queue
deadLetter := *signature
deadLetter.RoutingKey = "dead_letter"
deadLetter.RetryCount = 0
if _, err := server.SendTask(&deadLetter); err != nil {
logger.Error("failed to dead-letter task", "taskID", signature.UUID, "error", err)
return
}
logger.Error("task moved to dead letter queue",
"taskID", signature.UUID,
"name", signature.Name,
"error", state.Error,
)
}
})
Health Checks
Expose a health endpoint that verifies connectivity to Redis and the message broker:
func healthHandler(redisClient *redis.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
defer cancel()
if err := redisClient.Ping(ctx).Err(); err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
fmt.Fprintf(w, `{"status":"unhealthy","error":"%s"}`, err)
return
}
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, `{"status":"healthy"}`)
}
}
8. Putting It All Together
Here’s the deployment topology for a production setup:
| Component | Instances | Scaling Strategy |
|---|---|---|
| Task Center | 2-3 (HA) | Fixed — distributed lock prevents duplicate scheduling |
| General Workers | 3-10 | Auto-scale based on queue depth |
| Sync Workers | 2-5 | Auto-scale based on sync_workers queue depth |
| Redis | 1 primary + replicas | Standard Redis HA setup |
| Database | 1 primary + read replica | For task definitions and audit log |
| Prometheus + Grafana | 1 | Monitoring and alerting |
The Task Center instances all run the same cron schedules, but the distributed lock ensures only one instance dispatches each job. If one instance goes down, another takes over on the next cron tick — no manual failover needed.
Workers can be scaled independently per queue. If the sync_workers queue is backing up, add more sync workers without affecting general task processing.
Summary
Building a distributed job scheduler is not about any single technique — it’s about combining several well-understood patterns:
- Separate scheduling from execution via a message queue
- Distributed locking (Redis SETNX or etcd leases) to prevent duplicate scheduling
- Exponential backoff with jitter for retry logic
- Dead letter queues for tasks that exhaust retries
- Idempotent task handlers because exactly-once delivery is a myth
- Prometheus metrics for visibility into queue depth, failure rates, and task duration
- Graceful shutdown to avoid killing in-flight tasks during deployments
Machinery gives you a solid foundation for the task queue and result storage. Layer cron scheduling with distributed locking on top, add monitoring, and you have a system that’s production-ready.
The complete source code is structured to be modular — you can swap Redis for RabbitMQ as the broker, switch from Machinery to Asynq if you prefer a simpler API, or graduate to Temporal when your workflow complexity demands it. The architectural patterns remain the same regardless of which libraries you choose.