Skip to content

Commit 63460fc

Browse files
authored
fix(enricher): clean up streaming output and add --no-stream to generate (#67)
* fix(enricher): clean up streaming output and add --no-stream to generate

  Remove per-chunk [api]/[schema] prefixes from streaming output — they were
  injected on every flush (each newline), making the LLM response unreadable.
  Stream raw chunks instead via new StreamWriter.WriteRaw().

  Redirect streaming output to stderr (alongside the progress bar) so the two
  no longer interleave unpredictably on the terminal.

  Add --no-stream and --concurrency flags to the generate command, bringing
  feature parity with the enrich command.

  Signed-off-by: spencercjh <spencercjh@gmail.com>

* test(enricher): strengthen streaming assertions and update docs

  - Assert deterministic chunk content/order in streaming tests instead of
    just checking "non-empty and no prefix" (addresses Copilot C1, C2)
  - Remove stale [api]/[schema]/[param] prefix references from CLAUDE.md
    and docs/ai-enrichment.md (addresses Qodo Q2)

  Signed-off-by: spencercjh <spencercjh@gmail.com>

---------

Signed-off-by: spencercjh <spencercjh@gmail.com>
1 parent b900064 commit 63460fc

8 files changed

Lines changed: 74 additions & 17 deletions

File tree

CLAUDE.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,8 @@ LLM_API_KEY="your-deepseek-api-key" ./build/spec-forge enrich \
263263
--no-stream
264264
```
265265

266-
> **Note:** Streaming is enabled by default, showing real-time LLM output with batch-type prefixes
267-
> (`[api]`, `[schema]`, `[param]`). With streaming on, batches are processed sequentially for readable output.
266+
> **Note:** Streaming is enabled by default, showing real-time LLM output to stderr.
267+
> With streaming on, batches are processed sequentially for readable output.
268268
> Use `--no-stream` to enable concurrent processing across batches for faster enrichment.
269269
270270
Expected output:

cmd/enrich.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ func runEnrich(cmd *cobra.Command, args []string) error {
161161
result, err := e.Enrich(ctx, spec, &enricher.EnrichOptions{
162162
Language: lang,
163163
Stream: &streamEnabled,
164+
Writer: os.Stderr,
164165
Force: forceFlag,
165166
})
166167
if err != nil {

cmd/generate.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ func runGenerate(cmd *cobra.Command, args []string) error { //nolint:gocyclo //
8181
overwriteOutput, _ := cmd.Flags().GetBool("overwrite-output")
8282
//nolint:errcheck
8383
protoImportPaths, _ := cmd.Flags().GetStringSlice("proto-import-path")
84+
//nolint:errcheck
85+
noStream, _ := cmd.Flags().GetBool("no-stream")
86+
//nolint:errcheck
87+
concurrency, _ := cmd.Flags().GetInt("concurrency")
8488

8589
// Step 1: Detect framework - try all registered extractors
8690
extractorImpl, info, err := builtin.DetectFramework(path)
@@ -187,7 +191,7 @@ func runGenerate(cmd *cobra.Command, args []string) error { //nolint:gocyclo //
187191
// Step 6: Enrich with AI (optional)
188192
cfg := config.Get()
189193
if !skipEnrich && cfg.Enrich.Enabled && cfg.Enrich.Provider != "" && cfg.Enrich.Model != "" {
190-
if enrichErr := enrichGeneratedSpec(ctx, genResult.SpecFilePath, cfg, language); enrichErr != nil {
194+
if enrichErr := enrichGeneratedSpec(ctx, genResult.SpecFilePath, cfg, language, noStream, concurrency); enrichErr != nil {
191195
// Log warning but don't fail - enrichment is optional
192196
slog.WarnContext(ctx, "Enrichment failed (non-fatal)", "error", enrichErr)
193197
}
@@ -313,6 +317,10 @@ to preserve your project's formatting. Use --keep-patched to keep the changes.`,
313317
"overwrite existing local spec file if it already exists")
314318
c.Flags().StringSlice("proto-import-path", nil,
315319
"additional import paths for protoc (-I flags), can be specified multiple times")
320+
c.Flags().Bool("no-stream", false,
321+
"disable streaming to enable concurrent LLM calls (faster, but no real-time output)")
322+
c.Flags().Int("concurrency", 3,
323+
"max concurrent LLM calls (only effective with --no-stream)")
316324

317325
registerCompletion(c, "output", []string{"yaml", "json"})
318326
registerCompletion(c, "language", []string{"en", "zh"})
@@ -335,6 +343,8 @@ var (
335343
generatePublishOverwrite bool
336344
generateOverwriteOutput bool
337345
generateProtoImportPaths []string
346+
generateNoStream bool
347+
generateConcurrency int
338348
)
339349

340350
func init() {
@@ -364,14 +374,18 @@ func init() {
364374
"overwrite existing local spec file if it already exists")
365375
generateCmd.Flags().StringSliceVar(&generateProtoImportPaths, "proto-import-path", nil,
366376
"additional import paths for protoc (-I flags), can be specified multiple times")
377+
generateCmd.Flags().BoolVar(&generateNoStream, "no-stream", false,
378+
"disable streaming to enable concurrent LLM calls (faster, but no real-time output)")
379+
generateCmd.Flags().IntVar(&generateConcurrency, "concurrency", 3,
380+
"max concurrent LLM calls (only effective with --no-stream)")
367381

368382
registerCompletion(generateCmd, "output", []string{"yaml", "json"})
369383
registerCompletion(generateCmd, "language", []string{"en", "zh"})
370384
registerCompletion(generateCmd, "publish-target", []string{"readme"})
371385
}
372386

373387
// enrichGeneratedSpec enriches the generated spec with AI-generated descriptions
374-
func enrichGeneratedSpec(ctx context.Context, specFilePath string, cfg *config.Config, language string) error {
388+
func enrichGeneratedSpec(ctx context.Context, specFilePath string, cfg *config.Config, language string, noStream bool, concurrency int) error {
375389
cli.Statusf(os.Stderr, "Enriching OpenAPI spec with AI descriptions...")
376390

377391
// Determine language
@@ -417,6 +431,7 @@ func enrichGeneratedSpec(ctx context.Context, specFilePath string, cfg *config.C
417431
Timeout: timeout,
418432
CustomBaseURL: cfg.Enrich.BaseURL,
419433
CustomPrompts: customPrompts,
434+
Concurrency: concurrency,
420435
}
421436
enricherCfg = enricherCfg.MergeWithDefaults()
422437

@@ -427,7 +442,12 @@ func enrichGeneratedSpec(ctx context.Context, specFilePath string, cfg *config.C
427442
}
428443

429444
// Enrich
430-
result, err := e.Enrich(ctx, spec, &enricher.EnrichOptions{Language: lang})
445+
streamEnabled := !noStream
446+
result, err := e.Enrich(ctx, spec, &enricher.EnrichOptions{
447+
Language: lang,
448+
Stream: &streamEnabled,
449+
Writer: os.Stderr,
450+
})
431451
if err != nil {
432452
// Check if partial enrichment
433453
if partialErr, ok := errors.AsType[*processor.PartialEnrichmentError](err); ok {

docs/ai-enrichment.md

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,10 @@ LLM_API_KEY="sk-xxx" spec-forge enrich ./openapi.json \
110110

111111
### Streaming (Default)
112112

113-
Shows real-time progress with batch type prefixes:
113+
Shows real-time LLM output as it is generated:
114114

115115
```
116-
[api] Processing batch 1/3...
117-
[schema] Processing batch 2/3...
118-
[param] Processing batch 3/3...
116+
{"summary": "获取用户列表", "description": "检索系统中所有可用的用户信息..."}
119117
```
120118

121119
Best for: Interactive use, seeing progress

internal/enricher/enricher_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -309,10 +309,11 @@ func TestEnricher_WithStreaming(t *testing.T) {
309309
})
310310
require.NoError(t, err)
311311

312-
// Verify streaming output was written
313-
// Note: TemplateType is lowercase, so prefix is "[api]" not "[API]"
312+
// Verify streaming output was written (raw, no prefix)
313+
expected := strings.Join(chunks, "")
314314
output := buf.String()
315-
assert.Contains(t, output, "[api]", "Expected [api] prefix in streaming output")
315+
assert.Contains(t, output, expected, "Expected streaming output to contain concatenated chunks")
316+
assert.NotContains(t, output, "[api]", "Streaming output should not contain prefix markers")
316317
}
317318

318319
// mockStreamingProvider simulates streaming behavior

internal/enricher/processor/batch.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,8 @@ func (p *BatchProcessor) ProcessBatch(ctx context.Context, batch *Batch) (*provi
6868
// Prepare options for provider
6969
var genOpts []provider.Option
7070
if p.streamWriter != nil {
71-
prefix := string(batch.Type) // e.g., "api", "schema", "param" (lowercase from TemplateType)
7271
genOpts = append(genOpts, provider.WithStreamingFunc(func(_ context.Context, chunk []byte) error {
73-
return p.streamWriter.WriteWithPrefix(prefix, chunk)
72+
return p.streamWriter.WriteRaw(chunk)
7473
}))
7574
}
7675

internal/enricher/processor/batch_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -408,10 +408,16 @@ func TestBatchProcessor_ProcessBatch_WithStreaming(t *testing.T) {
408408
t.Fatalf("Flush() error = %v", err)
409409
}
410410

411-
// Verify output contains the prefix
411+
// Verify output was streamed (raw, no prefix)
412412
output := buf.String()
413-
if !strings.Contains(output, "[api]") {
414-
t.Errorf("Expected output to contain '[api]' prefix, got: %s", output)
413+
if !strings.Contains(output, "chunk1") || !strings.Contains(output, "chunk2") {
414+
t.Errorf("Streaming output should contain 'chunk1' and 'chunk2', got: %s", output)
415+
}
416+
if idx1, idx2 := strings.Index(output, "chunk1"), strings.Index(output, "chunk2"); idx1 > idx2 {
417+
t.Errorf("Streaming chunks out of order: expected 'chunk1' before 'chunk2', got: %s", output)
418+
}
419+
if strings.Contains(output, "[api]") {
420+
t.Errorf("Streaming output should not contain prefix, got: %s", output)
415421
}
416422
}
417423

internal/enricher/processor/stream_writer.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,38 @@ func (sw *StreamWriter) flushLocked() error {
171171
return nil
172172
}
173173

174+
// WriteRaw writes a chunk directly to the underlying writer without any prefix.
175+
// It is thread-safe and can be used for streaming LLM output where per-chunk
176+
// prefixes would create visual noise.
177+
func (sw *StreamWriter) WriteRaw(chunk []byte) error {
178+
sw.mu.Lock()
179+
defer sw.mu.Unlock()
180+
181+
sw.metrics.TotalChunks++
182+
sw.metrics.TotalBytes += int64(len(chunk))
183+
184+
if sw.debug {
185+
slog.Debug("stream raw chunk received",
186+
"chunk_size", len(chunk),
187+
"total_chunks", sw.metrics.TotalChunks,
188+
"total_bytes", sw.metrics.TotalBytes,
189+
)
190+
}
191+
192+
// Write directly, handling potential short writes
193+
for len(chunk) > 0 {
194+
n, err := sw.writer.Write(chunk)
195+
if err != nil {
196+
return err
197+
}
198+
if n == 0 {
199+
return io.ErrShortWrite
200+
}
201+
chunk = chunk[n:]
202+
}
203+
return nil
204+
}
205+
174206
// GetMetrics returns current streaming metrics
175207
func (sw *StreamWriter) GetMetrics() StreamWriterMetrics {
176208
sw.mu.Lock()

0 commit comments

Comments (0)