Skip to content

Commit 01f3656

Browse files
committed
fix(enricher,stream_writer): improve error handling and metrics updates
Signed-off-by: spencercjh <spencercjh@gmail.com>
1 parent 1408e28 commit 01f3656

2 files changed

Lines changed: 27 additions & 21 deletions

File tree

internal/enricher/enricher.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package enricher
22

33
import (
44
"context"
5-
"fmt"
65
"io"
76
"log/slog"
87
"net/http"
@@ -114,10 +113,11 @@ func (e *Enricher) Enrich(ctx context.Context, spec *openapi3.T, opts *EnrichOpt
114113
return spec, err
115114
}
116115

117-
// Flush any remaining buffered content in StreamWriter
116+
// Flush any remaining buffered content in StreamWriter.
117+
// Streaming output is ancillary; a flush failure should not cause enrichment to fail.
118118
if streamWriter != nil {
119119
if flushErr := streamWriter.Flush(); flushErr != nil {
120-
return spec, fmt.Errorf("failed to flush stream writer: %w", flushErr)
120+
slog.Warn("failed to flush stream writer after successful enrichment", "error", flushErr)
121121
}
122122
}
123123

internal/enricher/processor/stream_writer.go

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"io"
88
"log/slog"
99
"sync"
10-
"sync/atomic"
1110
)
1211

1312
const (
@@ -83,14 +82,14 @@ func (sw *StreamWriter) WriteWithPrefix(prefix string, chunk []byte) error {
8382
sw.mu.Lock()
8483
defer sw.mu.Unlock()
8584

86-
// Update metrics
87-
atomic.AddInt64(&sw.metrics.TotalChunks, 1)
88-
atomic.AddInt64(&sw.metrics.TotalBytes, int64(len(chunk)))
85+
// Update metrics (protected by mutex)
86+
sw.metrics.TotalChunks++
87+
sw.metrics.TotalBytes += int64(len(chunk))
8988

9089
// Track unique prefixes
9190
if !sw.prefixSet[prefix] {
9291
sw.prefixSet[prefix] = true
93-
atomic.AddInt64(&sw.metrics.PrefixCount, 1)
92+
sw.metrics.PrefixCount++
9493
}
9594

9695
// Debug logging
@@ -99,8 +98,8 @@ func (sw *StreamWriter) WriteWithPrefix(prefix string, chunk []byte) error {
9998
"prefix", prefix,
10099
"chunk_size", len(chunk),
101100
"buffer_size", sw.buffer.Len(),
102-
"total_chunks", atomic.LoadInt64(&sw.metrics.TotalChunks),
103-
"total_bytes", atomic.LoadInt64(&sw.metrics.TotalBytes),
101+
"total_chunks", sw.metrics.TotalChunks,
102+
"total_bytes", sw.metrics.TotalBytes,
104103
)
105104
}
106105

@@ -138,23 +137,33 @@ func (sw *StreamWriter) flushLocked() error {
138137
return nil
139138
}
140139

141-
atomic.AddInt64(&sw.metrics.FlushCount, 1)
140+
sw.metrics.FlushCount++
142141

143142
// Debug logging
144143
if sw.debug {
145144
slog.Debug("stream buffer flushed",
146145
"prefix", sw.currentPrefix,
147146
"buffer_size", sw.buffer.Len(),
148-
"flush_count", atomic.LoadInt64(&sw.metrics.FlushCount),
147+
"flush_count", sw.metrics.FlushCount,
149148
)
150149
}
151150

152-
// Write prefix and buffered content
151+
// Write prefix
153152
if _, err := fmt.Fprintf(sw.writer, "[%s] ", sw.currentPrefix); err != nil {
154153
return err
155154
}
156-
if _, err := sw.writer.Write(sw.buffer.Bytes()); err != nil {
157-
return err
155+
156+
// Write buffered content, handling potential short writes
157+
data := sw.buffer.Bytes()
158+
for len(data) > 0 {
159+
n, err := sw.writer.Write(data)
160+
if err != nil {
161+
return err
162+
}
163+
if n == 0 {
164+
return io.ErrShortWrite
165+
}
166+
data = data[n:]
158167
}
159168

160169
// Reset buffer
@@ -164,10 +173,7 @@ func (sw *StreamWriter) flushLocked() error {
164173

165174
// GetMetrics returns current streaming metrics
166175
func (sw *StreamWriter) GetMetrics() StreamWriterMetrics {
167-
return StreamWriterMetrics{
168-
TotalChunks: atomic.LoadInt64(&sw.metrics.TotalChunks),
169-
TotalBytes: atomic.LoadInt64(&sw.metrics.TotalBytes),
170-
FlushCount: atomic.LoadInt64(&sw.metrics.FlushCount),
171-
PrefixCount: atomic.LoadInt64(&sw.metrics.PrefixCount),
172-
}
176+
sw.mu.Lock()
177+
defer sw.mu.Unlock()
178+
return sw.metrics
173179
}

0 commit comments

Comments
 (0)