Skip to content

Commit f57900a

Browse files
feat(pipeline): unified dedup→compress→summarize pipeline
Implements issue #4. Adds: pkg/pipeline: - Runner.Run: chains ClusterByThreshold → ExtractiveCompressor → HierarchicalSummarizer; each stage independently enable/disable - StageStats: per-stage input/output tokens, reduction ratio, latency - DefaultOptions: dedup+compress enabled, summarize opt-in cmd/pipeline.go: - 'distill pipeline' CLI reads JSON chunks from stdin/file, writes optimised chunks to stdout/file; --stats prints per-stage breakdown cmd/api_pipeline.go: - POST /v1/pipeline: synchronous pipeline with full stats response - POST /v1/batch: submit async job, returns job_id - GET /v1/batch/{id}: poll status and progress - GET /v1/batch/{id}/results: retrieve completed results Co-authored-by: Ona <no-reply@ona.com>
1 parent 85cf702 commit f57900a

5 files changed

Lines changed: 869 additions & 0 deletions

File tree

cmd/api.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,10 @@ func runAPI(cmd *cobra.Command, args []string) error {
240240
sessAPI.RegisterSessionRoutes(mux, m.Middleware)
241241
}
242242

243+
// Pipeline and batch routes.
244+
pipelineAPI := NewPipelineAPI()
245+
pipelineAPI.RegisterPipelineRoutes(mux, m.Middleware)
246+
243247
mux.HandleFunc("/health", server.handleHealth)
244248
mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
245249
m.Handler().ServeHTTP(w, r)

cmd/api_pipeline.go

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
package cmd
2+
3+
import (
4+
"encoding/json"
5+
"net/http"
6+
"strings"
7+
8+
"github.com/Siddhant-K-code/distill/pkg/batch"
9+
"github.com/Siddhant-K-code/distill/pkg/pipeline"
10+
"github.com/Siddhant-K-code/distill/pkg/types"
11+
)
12+
13+
// PipelineRequest is the JSON body for POST /v1/pipeline.
14+
type PipelineRequest struct {
15+
Chunks []DedupeChunk `json:"chunks"`
16+
Options PipelineOptions `json:"options,omitempty"`
17+
}
18+
19+
// PipelineOptions mirrors pipeline.Options for JSON serialisation.
20+
type PipelineOptions struct {
21+
Dedup PipelineDedupOptions `json:"dedup,omitempty"`
22+
Compress PipelineCompressOptions `json:"compress,omitempty"`
23+
Summarize PipelineSummarizeOptions `json:"summarize,omitempty"`
24+
}
25+
26+
type PipelineDedupOptions struct {
27+
Enabled bool `json:"enabled"`
28+
Threshold float64 `json:"threshold,omitempty"`
29+
Lambda float64 `json:"lambda,omitempty"`
30+
TargetK int `json:"target_k,omitempty"`
31+
}
32+
33+
type PipelineCompressOptions struct {
34+
Enabled bool `json:"enabled"`
35+
TargetReduction float64 `json:"target_reduction,omitempty"`
36+
}
37+
38+
type PipelineSummarizeOptions struct {
39+
Enabled bool `json:"enabled"`
40+
MaxTokens int `json:"max_tokens,omitempty"`
41+
KeepRecent int `json:"keep_recent,omitempty"`
42+
}
43+
44+
// PipelineResponse is the JSON response for POST /v1/pipeline.
45+
type PipelineResponse struct {
46+
Chunks []DedupeChunk `json:"chunks"`
47+
Stats PipelineStatsPayload `json:"stats"`
48+
}
49+
50+
// PipelineStatsPayload is the serialisable form of pipeline.Stats.
51+
type PipelineStatsPayload struct {
52+
OriginalTokens int `json:"original_tokens"`
53+
FinalTokens int `json:"final_tokens"`
54+
TotalReduction float64 `json:"total_reduction"`
55+
LatencyMs float64 `json:"latency_ms"`
56+
Stages map[string]StageStatsPL `json:"stages"`
57+
}
58+
59+
// StageStatsPL is the serialisable form of pipeline.StageStats.
60+
type StageStatsPL struct {
61+
Enabled bool `json:"enabled"`
62+
InputTokens int `json:"input_tokens"`
63+
OutputTokens int `json:"output_tokens"`
64+
Reduction float64 `json:"reduction"`
65+
LatencyMs float64 `json:"latency_ms"`
66+
}
67+
68+
// BatchSubmitRequest is the JSON body for POST /v1/batch.
69+
type BatchSubmitRequest struct {
70+
Chunks []DedupeChunk `json:"chunks"`
71+
Options PipelineOptions `json:"options,omitempty"`
72+
}
73+
74+
// BatchSubmitResponse is the JSON response for POST /v1/batch.
75+
type BatchSubmitResponse struct {
76+
JobID string `json:"job_id"`
77+
Status string `json:"status"`
78+
}
79+
80+
// BatchStatusResponse is the JSON response for GET /v1/batch/{id}.
81+
type BatchStatusResponse struct {
82+
JobID string `json:"job_id"`
83+
Status string `json:"status"`
84+
Progress float64 `json:"progress"`
85+
Error string `json:"error,omitempty"`
86+
CreatedAt string `json:"created_at"`
87+
StartedAt string `json:"started_at,omitempty"`
88+
CompletedAt string `json:"completed_at,omitempty"`
89+
}
90+
91+
// BatchResultsResponse is the JSON response for GET /v1/batch/{id}/results.
92+
type BatchResultsResponse struct {
93+
JobID string `json:"job_id"`
94+
Status string `json:"status"`
95+
Chunks []DedupeChunk `json:"chunks"`
96+
Stats PipelineStatsPayload `json:"stats"`
97+
}
98+
99+
// PipelineAPI holds the pipeline runner and batch processor.
100+
type PipelineAPI struct {
101+
processor *batch.Processor
102+
}
103+
104+
// NewPipelineAPI creates a PipelineAPI with a default batch processor.
105+
func NewPipelineAPI() *PipelineAPI {
106+
return &PipelineAPI{
107+
processor: batch.NewProcessor(batch.DefaultConfig()),
108+
}
109+
}
110+
111+
// RegisterPipelineRoutes wires up /v1/pipeline and /v1/batch/* routes.
112+
func (a *PipelineAPI) RegisterPipelineRoutes(mux *http.ServeMux, middleware func(string, http.HandlerFunc) http.HandlerFunc) {
113+
mux.HandleFunc("/v1/pipeline", middleware("/v1/pipeline", a.handlePipeline))
114+
mux.HandleFunc("/v1/batch", middleware("/v1/batch", a.handleBatchSubmit))
115+
mux.HandleFunc("/v1/batch/", middleware("/v1/batch/", a.handleBatchLookup))
116+
}
117+
118+
// handlePipeline runs the full pipeline synchronously.
119+
func (a *PipelineAPI) handlePipeline(w http.ResponseWriter, r *http.Request) {
120+
if r.Method != http.MethodPost {
121+
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
122+
return
123+
}
124+
125+
var req PipelineRequest
126+
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
127+
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
128+
return
129+
}
130+
131+
chunks := dedupeChunksToTypes(req.Chunks)
132+
opts := pipelineOptsFromRequest(req.Options)
133+
134+
runner := pipeline.New()
135+
result, stats, err := runner.Run(r.Context(), chunks, opts)
136+
if err != nil {
137+
http.Error(w, "pipeline error: "+err.Error(), http.StatusInternalServerError)
138+
return
139+
}
140+
141+
resp := PipelineResponse{
142+
Chunks: typesToDedupeChunks(result),
143+
Stats: marshalStats(stats),
144+
}
145+
w.Header().Set("Content-Type", "application/json")
146+
json.NewEncoder(w).Encode(resp)
147+
}
148+
149+
// handleBatchSubmit accepts a new batch job.
150+
func (a *PipelineAPI) handleBatchSubmit(w http.ResponseWriter, r *http.Request) {
151+
if r.Method != http.MethodPost {
152+
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
153+
return
154+
}
155+
156+
var req BatchSubmitRequest
157+
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
158+
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
159+
return
160+
}
161+
162+
job, err := a.processor.Submit(batch.SubmitRequest{
163+
Chunks: dedupeChunksToTypes(req.Chunks),
164+
Options: pipelineOptsFromRequest(req.Options),
165+
})
166+
if err != nil {
167+
http.Error(w, "submit error: "+err.Error(), http.StatusServiceUnavailable)
168+
return
169+
}
170+
171+
w.Header().Set("Content-Type", "application/json")
172+
w.WriteHeader(http.StatusAccepted)
173+
json.NewEncoder(w).Encode(BatchSubmitResponse{
174+
JobID: job.ID,
175+
Status: string(job.Status),
176+
})
177+
}
178+
179+
// handleBatchLookup handles GET /v1/batch/{id} and GET /v1/batch/{id}/results.
180+
func (a *PipelineAPI) handleBatchLookup(w http.ResponseWriter, r *http.Request) {
181+
if r.Method != http.MethodGet {
182+
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
183+
return
184+
}
185+
186+
// Path: /v1/batch/{id} or /v1/batch/{id}/results
187+
path := strings.TrimPrefix(r.URL.Path, "/v1/batch/")
188+
parts := strings.SplitN(path, "/", 2)
189+
id := parts[0]
190+
sub := ""
191+
if len(parts) == 2 {
192+
sub = parts[1]
193+
}
194+
195+
if sub == "results" {
196+
a.handleBatchResults(w, r, id)
197+
return
198+
}
199+
a.handleBatchStatus(w, r, id)
200+
}
201+
202+
func (a *PipelineAPI) handleBatchStatus(w http.ResponseWriter, _ *http.Request, id string) {
203+
job, err := a.processor.Get(id)
204+
if err != nil {
205+
http.Error(w, err.Error(), http.StatusNotFound)
206+
return
207+
}
208+
resp := BatchStatusResponse{
209+
JobID: job.ID,
210+
Status: string(job.Status),
211+
Progress: job.Progress,
212+
Error: job.Error,
213+
}
214+
if !job.CreatedAt.IsZero() {
215+
resp.CreatedAt = job.CreatedAt.UTC().Format("2006-01-02T15:04:05Z")
216+
}
217+
if !job.StartedAt.IsZero() {
218+
resp.StartedAt = job.StartedAt.UTC().Format("2006-01-02T15:04:05Z")
219+
}
220+
if !job.CompletedAt.IsZero() {
221+
resp.CompletedAt = job.CompletedAt.UTC().Format("2006-01-02T15:04:05Z")
222+
}
223+
w.Header().Set("Content-Type", "application/json")
224+
json.NewEncoder(w).Encode(resp)
225+
}
226+
227+
func (a *PipelineAPI) handleBatchResults(w http.ResponseWriter, _ *http.Request, id string) {
228+
chunks, stats, err := a.processor.Results(id)
229+
if err != nil {
230+
code := http.StatusNotFound
231+
if err == batch.ErrJobNotFound {
232+
code = http.StatusNotFound
233+
} else {
234+
code = http.StatusConflict
235+
}
236+
http.Error(w, err.Error(), code)
237+
return
238+
}
239+
resp := BatchResultsResponse{
240+
JobID: id,
241+
Status: string(batch.StatusCompleted),
242+
Chunks: typesToDedupeChunks(chunks),
243+
Stats: marshalStats(stats),
244+
}
245+
w.Header().Set("Content-Type", "application/json")
246+
json.NewEncoder(w).Encode(resp)
247+
}
248+
249+
// ── helpers ───────────────────────────────────────────────────────────────────
250+
251+
func dedupeChunksToTypes(in []DedupeChunk) []types.Chunk {
252+
out := make([]types.Chunk, len(in))
253+
for i, c := range in {
254+
out[i] = types.Chunk{
255+
ID: c.ID,
256+
Text: c.Text,
257+
Embedding: c.Embedding,
258+
Score: c.Score,
259+
}
260+
}
261+
return out
262+
}
263+
264+
func typesToDedupeChunks(in []types.Chunk) []DedupeChunk {
265+
out := make([]DedupeChunk, len(in))
266+
for i, c := range in {
267+
out[i] = DedupeChunk{
268+
ID: c.ID,
269+
Text: c.Text,
270+
Embedding: c.Embedding,
271+
Score: c.Score,
272+
}
273+
}
274+
return out
275+
}
276+
277+
func pipelineOptsFromRequest(o PipelineOptions) pipeline.Options {
278+
return pipeline.Options{
279+
DedupEnabled: o.Dedup.Enabled,
280+
DedupThreshold: o.Dedup.Threshold,
281+
DedupLambda: o.Dedup.Lambda,
282+
DedupTargetK: o.Dedup.TargetK,
283+
CompressEnabled: o.Compress.Enabled,
284+
CompressTargetReduction: o.Compress.TargetReduction,
285+
SummarizeEnabled: o.Summarize.Enabled,
286+
SummarizeMaxTokens: o.Summarize.MaxTokens,
287+
SummarizeRecent: o.Summarize.KeepRecent,
288+
}
289+
}
290+
291+
func marshalStats(s pipeline.Stats) PipelineStatsPayload {
292+
stages := make(map[string]StageStatsPL, len(s.Stages))
293+
for k, v := range s.Stages {
294+
stages[k] = StageStatsPL{
295+
Enabled: v.Enabled,
296+
InputTokens: v.InputTokens,
297+
OutputTokens: v.OutputTokens,
298+
Reduction: v.Reduction,
299+
LatencyMs: float64(v.Latency.Microseconds()) / 1000.0,
300+
}
301+
}
302+
return PipelineStatsPayload{
303+
OriginalTokens: s.OriginalTokens,
304+
FinalTokens: s.FinalTokens,
305+
TotalReduction: s.TotalReduction,
306+
LatencyMs: float64(s.TotalLatency.Microseconds()) / 1000.0,
307+
Stages: stages,
308+
}
309+
}

0 commit comments

Comments
 (0)