A high-performance, thread-safe priority queue implementation in Go featuring dual-queue prioritization and dynamic worker management.
- Dual-Queue System: Separate fast (high-priority) and slow (regular) queues
- Priority-Based Routing: Automatic job routing based on worker priority
- Thread-Safe: Concurrent enqueue/dequeue operations with atomic statistics
- Blocking Operations: Support for blocking dequeue with timeout
- Dynamic Priority Management: Update priority workers at runtime
- External Configuration: Optional external endpoint for priority list updates
- Comprehensive Statistics: Real-time queue metrics and monitoring
go get github.com/priority-queue

package main
import (
	"fmt"
	"log"

	"github.com/priority-queue/internal/types"
	"github.com/priority-queue/pkg/queue"
)

func main() {
	// Create a priority queue with buffer sizes
	pq := queue.NewPriorityQueue(100, 100)
	defer pq.Close()

	// Create jobs
	highPriorityJob := types.NewJob("process", "job-123", "worker-001")
	highPriorityJob.Priority = 10
	regularJob := types.NewJob("analyze", "job-456", "worker-002")

	// Enqueue jobs
	if err := pq.Enqueue(highPriorityJob); err != nil {
		log.Fatal(err)
	}
	if err := pq.Enqueue(regularJob); err != nil {
		log.Fatal(err)
	}

	// Dequeue jobs (high priority first)
	job, err := pq.Dequeue()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Processing job: %s\n", job.UUID)
}

// Create priority manager with static list
priorityWorkers := []string{"vip-worker-1", "vip-worker-2"}
pm := queue.NewPriorityManagerWithDefaults(priorityWorkers)
defer pm.Stop()
// Alternatively, load the list from an external endpoint (second argument is the interval)
pm := queue.NewPriorityManager("https://api.example.com/priority-workers", 5*time.Minute)
defer pm.Stop()
// Route jobs based on worker priority
pq := queue.NewPriorityQueue(100, 100)
job := types.NewJob("task", "job-789", "vip-worker-1")
if pm.IsPriorityWorker(job.WorkerID) {
	pq.EnqueueFast(job)
} else {
	pq.EnqueueSlow(job)
}

// Blocking dequeue - waits until job available
job, err := pq.DequeueBlocking()
// Dequeue with timeout
job, err := pq.DequeueWithTimeout(5 * time.Second)

stats := pq.GetStats()
fmt.Printf("Fast queue depth: %d\n", stats.FastQueueDepth)
fmt.Printf("Slow queue depth: %d\n", stats.SlowQueueDepth)
fmt.Printf("Fast processed: %d\n", stats.FastProcessed)
fmt.Printf("Slow processed: %d\n", stats.SlowProcessed)

The priority queue maintains two separate channels:
- Fast Queue: For high-priority jobs
- Slow Queue: For regular jobs
Dequeue operations always check the fast queue first, ensuring high-priority jobs are processed before regular jobs.
The priority manager maintains a set of worker IDs that should receive priority processing. It supports:
- Static configuration with predefined worker lists
- Dynamic updates from external HTTP endpoints
- Thread-safe concurrent access
- Automatic periodic refresh
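One common way to get thread-safe, constant-time membership checks is a map guarded by a `sync.RWMutex`. The sketch below assumes that approach; `prioritySet`, `replace`, and `contains` are hypothetical names and are not the package's implementation. Periodic refresh is left out, but a goroutine driven by a `time.Ticker` could call `replace` with each freshly fetched list.

```go
package main

import (
	"fmt"
	"sync"
)

// prioritySet is a hypothetical, simplified stand-in for the priority manager.
type prioritySet struct {
	mu      sync.RWMutex
	workers map[string]struct{}
}

func newPrioritySet(ids []string) *prioritySet {
	s := &prioritySet{}
	s.replace(ids)
	return s
}

// replace builds the new set outside the lock, then swaps it in so
// readers never observe a half-updated list.
func (s *prioritySet) replace(ids []string) {
	next := make(map[string]struct{}, len(ids))
	for _, id := range ids {
		next[id] = struct{}{}
	}
	s.mu.Lock()
	s.workers = next
	s.mu.Unlock()
}

// contains is a map lookup under a read lock, so many goroutines can
// check membership concurrently.
func (s *prioritySet) contains(id string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.workers[id]
	return ok
}

func main() {
	s := newPrioritySet([]string{"vip-worker-1", "vip-worker-2"})
	fmt.Println(s.contains("vip-worker-1")) // true

	s.replace([]string{"vip-worker-3"})
	fmt.Println(s.contains("vip-worker-1")) // false
	fmt.Println(s.contains("vip-worker-3")) // true
}
```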
type Job struct {
	Type      string        // Job type identifier
	Arguments JobArguments  // Key-value parameters
	UUID      string        // Unique job identifier
	WorkerID  string        // Worker that submitted the job
	Priority  int           // Priority level (>0 for high priority)
	CreatedAt time.Time     // Job creation timestamp
	Timeout   time.Duration // Maximum processing time
}

PriorityQueue API:
- NewPriorityQueue(fastSize, slowSize int) *PriorityQueue
- EnqueueFast(job *types.Job) error
- EnqueueSlow(job *types.Job) error
- Enqueue(job *types.Job) error - Routes based on job.Priority
- Dequeue() (*types.Job, error) - Non-blocking
- DequeueBlocking() (*types.Job, error) - Blocks until job available
- DequeueWithTimeout(timeout time.Duration) (*types.Job, error)
- GetStats() QueueStats
- Close()
- IsClosed() bool

PriorityManager API:
- NewPriorityManager(endpoint string, interval time.Duration) *PriorityManager
- NewPriorityManagerWithDefaults(workers []string) *PriorityManager
- IsPriorityWorker(workerID string) bool
- GetPriorityWorkers() []string
- UpdatePriorityWorkers(workerIDs []string)
- AddPriorityWorker(workerID string)
- RemovePriorityWorker(workerID string)
- Stop()
The queue system returns specific errors for different scenarios:
var (
	ErrQueueClosed = errors.New("queue is closed")
	ErrQueueFull   = errors.New("queue is full")
	ErrQueueEmpty  = errors.New("all queues are empty")
)

See the cmd/demo/main.go file for comprehensive examples including:
- Basic queue operations
- Priority-based routing
- Concurrent processing
- Blocking operations
- Statistics monitoring
Run the test suite:
go test ./...

Run with coverage:
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out

- Queue operations use channels for high performance
- Statistics use atomic operations for lock-free updates
- Priority checks are O(1) using map lookups
- Blocking operations use efficient channel selects with periodic priority checks
Contributions are welcome! Please ensure:
- All tests pass
- Code follows Go conventions
- New features include tests
- Documentation is updated
This project is licensed under the MIT License - see the LICENSE file for details.