Anar Queue Library
A complete queue library for Go inspired by Laravel's Queue system. Supports multiple backends (in-memory, Redis), delayed jobs, job chaining, retries, and concurrent workers.
Features
- Multiple queue backends (Memory, Redis)
- Delayed job execution
- Job retry with exponential backoff
- Concurrent workers
- Job chaining
- Fluent interface for job dispatching
- Multiple queue support
- Job timeout handling
- Bulk job dispatching
Installation
go get codeberg.org/anarproject/anarqueue
Quick Start
1. Initialize Queue
import (
	"time"

	"codeberg.org/anarproject/anarqueue"
)

// Using in-memory queue
config := anarqueue.Config{
	DefaultQueue: "default",
	MaxAttempts:  3,
	Timeout:      60 * time.Second,
}
q := anarqueue.NewMemoryQueue(config)

// Or, instead of the memory queue, using Redis
// (pick one of the two; redis is your Redis client package)
redisClient := redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})
q := anarqueue.NewRedisQueue(redisClient, config)
2. Dispatch Jobs
dispatcher := anarqueue.NewDispatcher(q, config)
ctx := context.Background()

// Simple dispatch
payload := map[string]interface{}{
	"type":  "send_email",
	"email": "user@example.com",
}
dispatcher.DispatchNow(ctx, payload)

// With options
dispatcher.Dispatch(payload).
	OnQueue("high-priority").
	MaxAttempts(5).
	Timeout(300 * time.Second).
	Send(ctx)

// Delayed job
dispatcher.Dispatch(payload).
	Delay(1 * time.Hour).
	Send(ctx)
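The chained calls above follow the fluent builder pattern. As a rough illustration of how such an interface can be put together (this is not the library's implementation; the `pending` type, its fields, and its defaults are hypothetical names invented for this sketch):

```go
package main

import (
	"fmt"
	"time"
)

// pending is a hypothetical builder that collects options before sending.
type pending struct {
	payload     interface{}
	queue       string
	maxAttempts int
	delay       time.Duration
}

// newPending starts a builder with assumed defaults.
func newPending(payload interface{}) *pending {
	return &pending{payload: payload, queue: "default", maxAttempts: 3}
}

// Each setter mutates the builder and returns it so calls can be chained.
func (p *pending) OnQueue(name string) *pending {
	p.queue = name
	return p
}

func (p *pending) MaxAttempts(n int) *pending {
	p.maxAttempts = n
	return p
}

func (p *pending) Delay(d time.Duration) *pending {
	p.delay = d
	return p
}

func main() {
	p := newPending("report").
		OnQueue("high-priority").
		MaxAttempts(5).
		Delay(time.Hour)
	fmt.Printf("%+v\n", *p)
}
```

Returning the receiver from every setter is what lets options be applied in any order and omitted when the defaults are fine.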
3. Create Worker
worker := anarqueue.NewWorker(q, 5) // 5 concurrent workers
worker.Listen("default", "high-priority")

// Register handlers
worker.Register("send_email", func(ctx context.Context, job *anarqueue.Job) error {
	// Check the payload type instead of panicking on a bad assertion
	payload, ok := job.Payload.(map[string]interface{})
	if !ok {
		return fmt.Errorf("unexpected payload type %T", job.Payload)
	}
	// Process job
	_ = payload
	return nil
})

// Start processing
worker.Start(ctx)
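The example suggests that a handler registered under a name is matched against the payload's "type" field. A minimal sketch of that kind of lookup follows, assuming this routing rule; `registry`, `handlerFunc`, `dispatch`, and `sendEmail` are hypothetical names for illustration and are not taken from the library:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// bg is a shared background context used by this sketch.
var bg = context.Background()

// handlerFunc is a hypothetical handler signature for this sketch only.
type handlerFunc func(ctx context.Context, payload map[string]interface{}) error

// registry maps a payload "type" value to its handler.
type registry map[string]handlerFunc

var errNoHandler = errors.New("no handler registered")

// dispatch looks up the handler named by payload["type"] and runs it.
func (r registry) dispatch(ctx context.Context, payload map[string]interface{}) error {
	name, ok := payload["type"].(string)
	if !ok {
		return errors.New(`payload has no string "type" field`)
	}
	h, ok := r[name]
	if !ok {
		return fmt.Errorf("%w: %q", errNoHandler, name)
	}
	return h(ctx, payload)
}

// sendEmail is a stand-in handler that only prints the recipient.
func sendEmail(ctx context.Context, p map[string]interface{}) error {
	fmt.Println("sending email to", p["email"])
	return nil
}

func main() {
	r := registry{"send_email": sendEmail}
	err := r.dispatch(bg, map[string]interface{}{
		"type":  "send_email",
		"email": "user@example.com",
	})
	fmt.Println("error:", err)
}
```

Returning an error for an unknown type, rather than silently dropping the job, keeps misrouted jobs visible.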
Advanced Usage
Job Chaining
dispatcher.Chain(
	map[string]interface{}{"type": "step1"},
	map[string]interface{}{"type": "step2"},
	map[string]interface{}{"type": "step3"},
).OnQueue("processing").Dispatch(ctx)
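This README does not spell out what happens when a chained step fails. The sketch below shows one common interpretation, borrowed from Laravel-style chains, in which steps run in order and the chain stops at the first error. It is an assumption rather than confirmed library behavior, and `step`, `runChain`, and `errBoom` are hypothetical names:

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a sample failure used by the demo below.
var errBoom = errors.New("boom")

// step is a hypothetical unit of work in a chain.
type step struct {
	name string
	run  func() error
}

// runChain executes steps in order and stops at the first failure,
// returning the names of the steps that completed.
func runChain(steps []step) ([]string, error) {
	done := make([]string, 0, len(steps))
	for _, s := range steps {
		if err := s.run(); err != nil {
			return done, fmt.Errorf("chain stopped at %s: %w", s.name, err)
		}
		done = append(done, s.name)
	}
	return done, nil
}

func main() {
	ok := func() error { return nil }
	fail := func() error { return errBoom }

	done, err := runChain([]step{
		{"step1", ok},
		{"step2", fail},
		{"step3", ok},
	})
	fmt.Println("completed:", done)
	fmt.Println("error:", err)
}
```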
Bulk Dispatch
jobs := []interface{}{
	map[string]interface{}{"type": "task1"},
	map[string]interface{}{"type": "task2"},
}
dispatcher.Bulk(ctx, jobs)
Custom Job Configuration
job := &anarqueue.Job{
	Payload:     yourData,
	MaxAttempts: 5,
	Timeout:     300 * time.Second,
	Delay:       10 * time.Minute,
}
q.Push(ctx, job)
Queue Backends
Memory Queue
- Fast, lightweight
- Perfect for testing and development
- No external dependencies
- Data lost on restart
Redis Queue
- Persistent storage
- Distributed workers support
- Production-ready
- Requires Redis server
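To make the trade-offs between backends concrete, here is a sketch of a shared backend interface with a mutex-guarded in-memory implementation. The `backend` interface, the `job` type, and the `Pop` method are assumptions for illustration; only `Push` and `Size` appear in this README, and their real signatures may differ:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// bg is a shared background context used by this sketch.
var bg = context.Background()

// job is a trimmed-down, hypothetical job type.
type job struct {
	Queue   string
	Payload interface{}
}

// backend is a hypothetical interface that any storage could satisfy.
type backend interface {
	Push(ctx context.Context, j *job) error
	Pop(ctx context.Context, queue string) (*job, error)
	Size(ctx context.Context, queue string) (int, error)
}

// memoryBackend keeps jobs in process memory, so they vanish on restart.
type memoryBackend struct {
	mu     sync.Mutex
	queues map[string][]*job
}

func newMemoryBackend() *memoryBackend {
	return &memoryBackend{queues: make(map[string][]*job)}
}

// Push appends the job to the end of its queue.
func (m *memoryBackend) Push(ctx context.Context, j *job) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.queues[j.Queue] = append(m.queues[j.Queue], j)
	return nil
}

// Pop removes and returns the oldest job, or nil if the queue is empty.
func (m *memoryBackend) Pop(ctx context.Context, queue string) (*job, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	pending := m.queues[queue]
	if len(pending) == 0 {
		return nil, nil
	}
	j := pending[0]
	m.queues[queue] = pending[1:]
	return j, nil
}

// Size reports how many jobs are waiting in the queue.
func (m *memoryBackend) Size(ctx context.Context, queue string) (int, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.queues[queue]), nil
}

// Compile-time check that memoryBackend satisfies backend.
var _ backend = (*memoryBackend)(nil)

func main() {
	m := newMemoryBackend()
	_ = m.Push(bg, &job{Queue: "default", Payload: "hello"})
	n, _ := m.Size(bg, "default")
	fmt.Println("pending:", n)
}
```

A Redis-backed type could satisfy the same interface, which is what would let workers on different machines share one queue.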
Configuration Options
type Config struct {
	DefaultQueue string        // Default queue name
	MaxAttempts  int           // Maximum retry attempts
	Timeout      time.Duration // Job timeout
	RetryAfter   time.Duration // Delay before retry
}
Job Structure
type Job struct {
	ID          string        // Unique job ID
	Queue       string        // Queue name
	Payload     interface{}   // Job data
	Attempts    int           // Current attempt number
	MaxAttempts int           // Maximum attempts
	Timeout     time.Duration // Job timeout
	Delay       time.Duration // Initial delay
	CreatedAt   time.Time     // Creation time
	AvailableAt time.Time     // When job becomes available
}
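The relationship between Delay, CreatedAt, and AvailableAt is not stated explicitly. A plausible reading is that AvailableAt equals CreatedAt plus Delay, and that a job is handed to a worker only once that time has passed. The sketch below assumes exactly that; `newJob`, `ready`, `epoch`, and `tenMinutes` are hypothetical names:

```go
package main

import (
	"fmt"
	"time"
)

// epoch and tenMinutes are fixed sample values for the demo.
var epoch = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

const tenMinutes = 10 * time.Minute

// job holds only the timing fields relevant to this sketch.
type job struct {
	Delay       time.Duration
	CreatedAt   time.Time
	AvailableAt time.Time
}

// newJob stamps CreatedAt and derives AvailableAt from the delay.
func newJob(now time.Time, delay time.Duration) job {
	return job{Delay: delay, CreatedAt: now, AvailableAt: now.Add(delay)}
}

// ready reports whether the job may be handed to a worker at time now.
func (j job) ready(now time.Time) bool {
	return !now.Before(j.AvailableAt)
}

func main() {
	j := newJob(epoch, tenMinutes)
	fmt.Println("ready at creation:", j.ready(epoch))
	fmt.Println("ready after delay:", j.ready(epoch.Add(tenMinutes)))
}
```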
Worker Features
- Concurrent Processing: Run multiple workers in parallel
- Automatic Retries: Failed jobs are retried with exponential backoff
- Timeout Handling: Jobs are cancelled if they exceed timeout
- Multiple Queues: Listen to multiple queues with priorities
- Graceful Shutdown: Clean shutdown on context cancellation
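Exponential backoff means the wait before each retry doubles. The exact base delay and cap used by the library are not documented here, so the sketch below uses hypothetical constants (`baseDelay`, `maxDelay`) purely to show the shape of the schedule:

```go
package main

import (
	"fmt"
	"time"
)

const (
	baseDelay = 2 * time.Second  // hypothetical first retry delay
	maxDelay  = 60 * time.Second // hypothetical upper bound
)

// backoff returns the wait before retry number attempt (1-based),
// doubling each time: base, 2*base, 4*base, ... capped at maxDelay.
func backoff(attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	d := baseDelay
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay
		}
	}
	return d
}

func main() {
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoff(attempt))
	}
}
```

Capping the delay keeps a job with many attempts from waiting an unbounded amount of time between retries.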
Best Practices
- Always use context: Pass a context.Context to every call so jobs can be cancelled and timed out
- Handle errors properly: Return an error from the handler to trigger a retry, and log details for debugging
- Set appropriate timeouts: Prevent jobs from running indefinitely
- Use specific queues: Separate different job types for better management
- Monitor queue sizes: Track pending jobs to prevent backlog
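The context and timeout advice can be illustrated with the standard library alone. The sketch below wraps a handler in `context.WithTimeout` and returns either the handler's error or the deadline error. `runWithTimeout`, `slowJob`, and the sample durations are hypothetical, and the library's own timeout mechanism may work differently:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// bg and the two durations are sample values for the demo.
var bg = context.Background()

const (
	shortTimeout = 20 * time.Millisecond
	longTimeout  = time.Second
)

// runWithTimeout runs handler with a deadline and returns its error, or the
// context error if the deadline passes first.
func runWithTimeout(parent context.Context, timeout time.Duration, handler func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	done := make(chan error, 1) // buffered so the goroutine never blocks
	go func() { done <- handler(ctx) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// slowJob simulates 200ms of work that honours cancellation.
func slowJob(ctx context.Context) error {
	select {
	case <-time.After(200 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// isTimeout reports whether err is a deadline error.
func isTimeout(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("short timeout hit:", isTimeout(runWithTimeout(bg, shortTimeout, slowJob)))
	fmt.Println("long timeout error:", runWithTimeout(bg, longTimeout, slowJob))
}
```

A handler that checks `ctx.Done()` stops promptly when its deadline passes or the worker shuts down; one that ignores the context keeps running in the background even after the caller has given up.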
Testing
// Use memory queue for tests
q := anarqueue.NewMemoryQueue(anarqueue.DefaultConfig)
// Dispatch test job
dispatcher := anarqueue.NewDispatcher(q, anarqueue.DefaultConfig)
dispatcher.DispatchNow(ctx, testPayload)
// Verify job was queued
size, err := q.Size(ctx, "default")
assert.NoError(t, err)
assert.Equal(t, 1, size)
License
This package is licensed under the GNU Lesser General Public License (LGPL) v3.
Contributing
Contributions are welcome. Please open an issue or submit a pull request.