Skip to content

Commit 68f7248

Browse files
feat(batch): async batch processing with worker pool (#71)
Implements issue #11. Adds pkg/batch: - Processor: in-memory job queue backed by a configurable worker pool - Submit: enqueues a job, returns immediately with job_id - Get: returns current job state (queued/processing/completed/failed) - Results: returns deduplicated chunks + pipeline stats for completed jobs - List: filter jobs by status - evictLoop: removes completed/failed jobs after ResultTTL (default 24h) - Stop: graceful shutdown, drains in-flight jobs HTTP handlers live in cmd/api_pipeline.go (part of feat/4-pipeline). Co-authored-by: Ona <no-reply@ona.com>
1 parent 1b600f7 commit 68f7248

2 files changed

Lines changed: 397 additions & 0 deletions

File tree

pkg/batch/batch.go

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
// Package batch provides async batch processing for large deduplication workloads.
2+
// Jobs are queued in-memory, processed by a background worker pool, and results
3+
// are retained for a configurable TTL.
4+
package batch
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"sync"
11+
"time"
12+
13+
"github.com/Siddhant-K-code/distill/pkg/pipeline"
14+
"github.com/Siddhant-K-code/distill/pkg/types"
15+
)
16+
17+
// Status represents the lifecycle state of a batch job.
18+
type Status string
19+
20+
const (
21+
StatusQueued Status = "queued"
22+
StatusProcessing Status = "processing"
23+
StatusCompleted Status = "completed"
24+
StatusFailed Status = "failed"
25+
)
26+
27+
// Job holds the input, state, and result of a single batch job.
28+
type Job struct {
29+
ID string
30+
Status Status
31+
Chunks []types.Chunk
32+
Options pipeline.Options
33+
Result []types.Chunk
34+
Stats pipeline.Stats
35+
Error string
36+
CreatedAt time.Time
37+
StartedAt time.Time
38+
CompletedAt time.Time
39+
Progress float64 // 0–1
40+
}
41+
42+
// SubmitRequest is the input for submitting a new batch job.
43+
type SubmitRequest struct {
44+
Chunks []types.Chunk
45+
Options pipeline.Options
46+
}
47+
48+
// ErrJobNotFound is returned when a job ID does not exist.
49+
var ErrJobNotFound = errors.New("job not found")
50+
51+
// ErrResultExpired is returned when a job's result has been evicted.
52+
var ErrResultExpired = errors.New("job result has expired")
53+
54+
// Processor manages the job queue, worker pool, and result store.
55+
type Processor struct {
56+
mu sync.RWMutex
57+
jobs map[string]*Job
58+
queue chan string
59+
resultTTL time.Duration
60+
runner *pipeline.Runner
61+
wg sync.WaitGroup
62+
cancelFunc context.CancelFunc
63+
}
64+
65+
// Config controls processor behaviour.
66+
type Config struct {
67+
// Workers is the number of concurrent processing goroutines. Default: 4.
68+
Workers int
69+
// QueueSize is the maximum number of queued jobs. Default: 1000.
70+
QueueSize int
71+
// ResultTTL is how long completed job results are retained. Default: 24h.
72+
ResultTTL time.Duration
73+
}
74+
75+
// DefaultConfig returns sensible defaults.
76+
func DefaultConfig() Config {
77+
return Config{
78+
Workers: 4,
79+
QueueSize: 1000,
80+
ResultTTL: 24 * time.Hour,
81+
}
82+
}
83+
84+
// NewProcessor creates and starts a Processor with the given config.
85+
func NewProcessor(cfg Config) *Processor {
86+
if cfg.Workers < 0 {
87+
cfg.Workers = 4
88+
}
89+
if cfg.QueueSize <= 0 {
90+
cfg.QueueSize = 1000
91+
}
92+
if cfg.ResultTTL <= 0 {
93+
cfg.ResultTTL = 24 * time.Hour
94+
}
95+
96+
ctx, cancel := context.WithCancel(context.Background())
97+
p := &Processor{
98+
jobs: make(map[string]*Job),
99+
queue: make(chan string, cfg.QueueSize),
100+
resultTTL: cfg.ResultTTL,
101+
runner: pipeline.New(),
102+
cancelFunc: cancel,
103+
}
104+
105+
for i := 0; i < cfg.Workers; i++ {
106+
p.wg.Add(1)
107+
go p.worker(ctx)
108+
}
109+
110+
go p.evictLoop(ctx)
111+
112+
return p
113+
}
114+
115+
// Submit enqueues a new batch job and returns its ID.
116+
func (p *Processor) Submit(req SubmitRequest) (*Job, error) {
117+
id := generateID()
118+
job := &Job{
119+
ID: id,
120+
Status: StatusQueued,
121+
Chunks: req.Chunks,
122+
Options: req.Options,
123+
CreatedAt: time.Now(),
124+
}
125+
126+
p.mu.Lock()
127+
p.jobs[id] = job
128+
p.mu.Unlock()
129+
130+
select {
131+
case p.queue <- id:
132+
default:
133+
p.mu.Lock()
134+
delete(p.jobs, id)
135+
p.mu.Unlock()
136+
return nil, fmt.Errorf("job queue is full")
137+
}
138+
139+
return job, nil
140+
}
141+
142+
// Get returns the current state of a job.
143+
func (p *Processor) Get(id string) (*Job, error) {
144+
p.mu.RLock()
145+
defer p.mu.RUnlock()
146+
job, ok := p.jobs[id]
147+
if !ok {
148+
return nil, ErrJobNotFound
149+
}
150+
// Return a copy to avoid data races.
151+
cp := *job
152+
return &cp, nil
153+
}
154+
155+
// Results returns the deduplicated chunks for a completed job.
156+
func (p *Processor) Results(id string) ([]types.Chunk, pipeline.Stats, error) {
157+
p.mu.RLock()
158+
defer p.mu.RUnlock()
159+
job, ok := p.jobs[id]
160+
if !ok {
161+
return nil, pipeline.Stats{}, ErrJobNotFound
162+
}
163+
if job.Status != StatusCompleted {
164+
return nil, pipeline.Stats{}, fmt.Errorf("job %s is %s, not completed", id, job.Status)
165+
}
166+
return job.Result, job.Stats, nil
167+
}
168+
169+
// List returns all jobs, optionally filtered by status ("" = all).
170+
func (p *Processor) List(status Status) []*Job {
171+
p.mu.RLock()
172+
defer p.mu.RUnlock()
173+
var out []*Job
174+
for _, j := range p.jobs {
175+
if status == "" || j.Status == status {
176+
cp := *j
177+
out = append(out, &cp)
178+
}
179+
}
180+
return out
181+
}
182+
183+
// Stop gracefully shuts down the processor, waiting for in-flight jobs.
184+
func (p *Processor) Stop() {
185+
p.cancelFunc()
186+
close(p.queue)
187+
p.wg.Wait()
188+
}
189+
190+
// worker processes jobs from the queue.
191+
func (p *Processor) worker(ctx context.Context) {
192+
defer p.wg.Done()
193+
for id := range p.queue {
194+
if ctx.Err() != nil {
195+
return
196+
}
197+
p.process(ctx, id)
198+
}
199+
}
200+
201+
// process runs the pipeline for a single job.
202+
func (p *Processor) process(ctx context.Context, id string) {
203+
p.mu.Lock()
204+
job, ok := p.jobs[id]
205+
if !ok {
206+
p.mu.Unlock()
207+
return
208+
}
209+
job.Status = StatusProcessing
210+
job.StartedAt = time.Now()
211+
job.Progress = 0.0
212+
p.mu.Unlock()
213+
214+
result, stats, err := p.runner.Run(ctx, job.Chunks, job.Options)
215+
216+
p.mu.Lock()
217+
defer p.mu.Unlock()
218+
job, ok = p.jobs[id]
219+
if !ok {
220+
return
221+
}
222+
job.CompletedAt = time.Now()
223+
job.Progress = 1.0
224+
if err != nil {
225+
job.Status = StatusFailed
226+
job.Error = err.Error()
227+
} else {
228+
job.Status = StatusCompleted
229+
job.Result = result
230+
job.Stats = stats
231+
}
232+
}
233+
234+
// evictLoop removes completed/failed jobs whose results have expired.
235+
func (p *Processor) evictLoop(ctx context.Context) {
236+
ticker := time.NewTicker(5 * time.Minute)
237+
defer ticker.Stop()
238+
for {
239+
select {
240+
case <-ctx.Done():
241+
return
242+
case <-ticker.C:
243+
p.evict()
244+
}
245+
}
246+
}
247+
248+
func (p *Processor) evict() {
249+
cutoff := time.Now().Add(-p.resultTTL)
250+
p.mu.Lock()
251+
defer p.mu.Unlock()
252+
for id, job := range p.jobs {
253+
if (job.Status == StatusCompleted || job.Status == StatusFailed) &&
254+
job.CompletedAt.Before(cutoff) {
255+
delete(p.jobs, id)
256+
}
257+
}
258+
}
259+
260+
// generateID returns a simple time-based unique ID.
261+
func generateID() string {
262+
return fmt.Sprintf("batch_%d", time.Now().UnixNano())
263+
}

pkg/batch/batch_test.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package batch
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
8+
"github.com/Siddhant-K-code/distill/pkg/pipeline"
9+
"github.com/Siddhant-K-code/distill/pkg/types"
10+
)
11+
12+
func makeChunks(n int) []types.Chunk {
13+
chunks := make([]types.Chunk, n)
14+
for i := range chunks {
15+
chunks[i] = types.Chunk{
16+
ID: fmt.Sprintf("chunk-%d", i),
17+
Text: "test chunk content for batch processing",
18+
}
19+
}
20+
return chunks
21+
}
22+
23+
func TestSubmitAndGet(t *testing.T) {
24+
p := NewProcessor(Config{Workers: 1, QueueSize: 10, ResultTTL: time.Minute})
25+
defer p.Stop()
26+
27+
job, err := p.Submit(SubmitRequest{
28+
Chunks: []types.Chunk{{ID: "a", Text: "hello"}},
29+
Options: pipeline.Options{},
30+
})
31+
if err != nil {
32+
t.Fatalf("Submit: %v", err)
33+
}
34+
if job.ID == "" {
35+
t.Error("expected non-empty job ID")
36+
}
37+
38+
// Poll until done (max 2s).
39+
deadline := time.Now().Add(2 * time.Second)
40+
for time.Now().Before(deadline) {
41+
got, err := p.Get(job.ID)
42+
if err != nil {
43+
t.Fatalf("Get: %v", err)
44+
}
45+
if got.Status == StatusCompleted || got.Status == StatusFailed {
46+
break
47+
}
48+
time.Sleep(10 * time.Millisecond)
49+
}
50+
51+
got, _ := p.Get(job.ID)
52+
if got.Status != StatusCompleted {
53+
t.Errorf("expected StatusCompleted, got %s (error: %s)", got.Status, got.Error)
54+
}
55+
}
56+
57+
func TestResults_NotCompleted(t *testing.T) {
58+
p := NewProcessor(Config{Workers: 0, QueueSize: 10, ResultTTL: time.Minute})
59+
// Workers=0 means nothing processes; job stays queued.
60+
defer p.Stop()
61+
62+
job, _ := p.Submit(SubmitRequest{
63+
Chunks: []types.Chunk{{ID: "a", Text: "hello"}},
64+
Options: pipeline.Options{},
65+
})
66+
67+
_, _, err := p.Results(job.ID)
68+
if err == nil {
69+
t.Error("expected error for non-completed job")
70+
}
71+
}
72+
73+
func TestGet_NotFound(t *testing.T) {
74+
p := NewProcessor(DefaultConfig())
75+
defer p.Stop()
76+
77+
_, err := p.Get("nonexistent")
78+
if err != ErrJobNotFound {
79+
t.Errorf("expected ErrJobNotFound, got %v", err)
80+
}
81+
}
82+
83+
func TestList(t *testing.T) {
84+
p := NewProcessor(Config{Workers: 1, QueueSize: 10, ResultTTL: time.Minute})
85+
defer p.Stop()
86+
87+
p.Submit(SubmitRequest{Chunks: []types.Chunk{{ID: "a", Text: "hello"}}, Options: pipeline.Options{}})
88+
p.Submit(SubmitRequest{Chunks: []types.Chunk{{ID: "b", Text: "world"}}, Options: pipeline.Options{}})
89+
90+
// Wait briefly for processing.
91+
time.Sleep(200 * time.Millisecond)
92+
93+
all := p.List("")
94+
if len(all) < 1 {
95+
t.Error("expected at least one job in list")
96+
}
97+
}
98+
99+
func TestQueueFull(t *testing.T) {
100+
// QueueSize=0 means the channel has no buffer — submit should fail.
101+
p := NewProcessor(Config{Workers: 0, QueueSize: 1, ResultTTL: time.Minute})
102+
defer p.Stop()
103+
104+
// Fill the queue.
105+
p.Submit(SubmitRequest{Chunks: []types.Chunk{{ID: "a", Text: "x"}}, Options: pipeline.Options{}})
106+
107+
// This should fail.
108+
_, err := p.Submit(SubmitRequest{Chunks: []types.Chunk{{ID: "b", Text: "y"}}, Options: pipeline.Options{}})
109+
if err == nil {
110+
t.Error("expected error when queue is full")
111+
}
112+
}
113+
114+
func TestDefaultConfig(t *testing.T) {
115+
cfg := DefaultConfig()
116+
if cfg.Workers <= 0 {
117+
t.Error("expected positive workers")
118+
}
119+
if cfg.ResultTTL <= 0 {
120+
t.Error("expected positive ResultTTL")
121+
}
122+
}
123+
124+
func TestGenerateID_Unique(t *testing.T) {
125+
ids := map[string]bool{}
126+
for i := 0; i < 100; i++ {
127+
id := generateID()
128+
if ids[id] {
129+
t.Errorf("duplicate ID generated: %s", id)
130+
}
131+
ids[id] = true
132+
time.Sleep(time.Nanosecond)
133+
}
134+
}

0 commit comments

Comments
 (0)