feat: add outputBuffering option for step log streaming #2311
four-bytes-robby wants to merge 6 commits into
📝 Walkthrough
Introduces a three-mode output buffering option for step log streaming.
Changes
Output Buffering Feature
Sequence Diagram(s)
sequenceDiagram
participant StepExec as Step Executor
participant EffOB as EffectiveOutputBuffering
participant Ctx as context.Context
participant newWFM as newWriterForMode
participant StepWriter as stepLogWriter (gRPC)
participant LocalWriter as local file writer
StepExec->>EffOB: resolve(dag, step)
EffOB-->>StepExec: mode (buffer|line|none)
StepExec->>Ctx: WithOutputBuffering(ctx, mode)
alt Remote (gRPC) path
StepExec->>StepWriter: NewStepWriter(ctx, ...)
StepWriter->>Ctx: GetOutputBuffering(ctx)
Ctx-->>StepWriter: mode
StepWriter->>StepWriter: set lineBuffered / unbuffered flags
loop Write(p)
StepWriter->>StepWriter: none → sendChunk immediately
StepWriter->>StepWriter: line → scan newlines, sendChunk per line
StepWriter->>StepWriter: buffer → accumulate, sendChunk at 32KB
end
StepWriter->>StepWriter: Close() → sendChunk remaining bytes
end
alt Local file path
StepExec->>newWFM: newWriterForMode(fileWriter, mode)
newWFM-->>LocalWriter: directWriter | lineBufferedWriter | safeBufferedWriter
end
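The first step of the diagram, resolving the effective mode, can be sketched as follows. This is a minimal illustration that assumes the step value overrides the DAG value and that an unset value falls back to `buffer`; the type, constants, and `effectiveOutputBuffering` function are hypothetical stand-ins, not the PR's actual `EffectiveOutputBuffering()` code.

```go
package main

import "fmt"

// OutputBuffering is a hypothetical stand-in for the enum type added by the PR.
type OutputBuffering string

const (
	BufferMode OutputBuffering = "buffer"
	LineMode   OutputBuffering = "line"
	NoneMode   OutputBuffering = "none"
)

// effectiveOutputBuffering picks the step-level mode when set, then the
// DAG-level mode, and finally the assumed default "buffer".
func effectiveOutputBuffering(dagMode, stepMode OutputBuffering) OutputBuffering {
	if stepMode != "" {
		return stepMode
	}
	if dagMode != "" {
		return dagMode
	}
	return BufferMode
}

func main() {
	fmt.Println(effectiveOutputBuffering("", ""))             // nothing set
	fmt.Println(effectiveOutputBuffering(LineMode, ""))       // DAG only
	fmt.Println(effectiveOutputBuffering(LineMode, NoneMode)) // step overrides DAG
}
```

Run as written, the three calls resolve to `buffer`, `line`, and `none` respectively.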
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
@coderabbitai review
✅ Action performed: Review finished.
Actionable comments posted: 2
🧹 Nitpick comments (2)
internal/runtime/output.go (1)
382-383: ⚡ Quick win: Remove the no-op context write in local writer setup.
Line 383 discards the returned context, so this call has no effect and suggests propagation that never happens.
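A small sketch of why the discarded result matters: `context.WithValue` returns a new context and leaves the one passed in untouched. The helper names below only mirror the shape of `WithOutputBuffering` and `GetOutputBuffering`; their bodies, the key type, and the `"buffer"` fallback are assumptions for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// bufferingKey is a hypothetical private context key.
type bufferingKey struct{}

// withOutputBuffering returns a derived context carrying the mode.
func withOutputBuffering(ctx context.Context, mode string) context.Context {
	return context.WithValue(ctx, bufferingKey{}, mode)
}

// getOutputBuffering reads the mode, falling back to "buffer" (assumed default).
func getOutputBuffering(ctx context.Context) string {
	if mode, ok := ctx.Value(bufferingKey{}).(string); ok {
		return mode
	}
	return "buffer"
}

// demo returns the mode seen after discarding the derived context,
// and the mode seen after keeping it.
func demo() (discarded, kept string) {
	ctx := context.Background()

	_ = withOutputBuffering(ctx, "line") // result thrown away: ctx is unchanged
	discarded = getOutputBuffering(ctx)

	ctx = withOutputBuffering(ctx, "line") // result kept: the mode is now visible
	kept = getOutputBuffering(ctx)
	return discarded, kept
}

func main() {
	discarded, kept := demo()
	fmt.Println(discarded, kept)
}
```

In this sketch the first read still sees the fallback and only the second read sees `line`, which is the behavior the comment describes.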
Suggested fix
-	// Store mode in context for consistency with setupRemoteWriters
-	_ = WithOutputBuffering(ctx, mode)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/runtime/output.go` around lines 382 - 383, The call to WithOutputBuffering(ctx, mode) on line 383 discards the returned context by assigning it to the blank identifier, making this a no-op that has no actual effect. Remove this entire line since the context propagation it appears to perform never actually happens - either the operation is unnecessary for local writer setup or the returned context should be captured and used in the function's subsequent logic.
internal/runtime/writer.go (1)
127-135: ⚡ Quick win: Consider using chunked newline detection instead of byte-by-byte iteration.
The byte-by-byte loop with individual appends can trigger many reallocations for large writes. The `stepLogWriter.Write` implementation (log_streamer.go:232-244) demonstrates a more efficient pattern using `bytes.IndexByte` to find newlines in chunks, then slicing the buffer in bulk.
For typical line-buffered use cases (CLI output, logs), this may not be a bottleneck, but the chunked approach would scale better for high-throughput scenarios.
⚡ Example refactor using chunked detection
 func (lw *lineBufferedWriter) Write(p []byte) (int, error) {
 	lw.mu.Lock()
 	defer lw.mu.Unlock()
-	for _, b := range p {
-		lw.buf = append(lw.buf, b)
-		if b == '\n' {
+	lw.buf = append(lw.buf, p...)
+	for {
+		idx := bytes.IndexByte(lw.buf, '\n')
+		if idx < 0 {
+			break
+		}
+		line := lw.buf[:idx+1]
+		lw.buf = lw.buf[idx+1:]
+		if _, err := lw.w.Write(line); err != nil {
+			return len(p), err
-			if _, err := lw.w.Write(lw.buf); err != nil {
-				return 0, err
-			}
-			lw.buf = lw.buf[:0]
 		}
 	}
 	return len(p), nil
 }
This also fixes the error-return issue mentioned in the previous comment.
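For readers who want to try the chunked approach in isolation, here is a self-contained sketch. It is not the PR's `lineBufferedWriter`: the `lineWriter` and `recorder` types and the `Flush` method are hypothetical, and the choice to drop a line whose downstream write failed is an assumption of this sketch.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// lineWriter forwards complete lines to w and holds back any partial line.
type lineWriter struct {
	mu  sync.Mutex
	w   io.Writer
	buf []byte
}

func (lw *lineWriter) Write(p []byte) (int, error) {
	lw.mu.Lock()
	defer lw.mu.Unlock()

	lw.buf = append(lw.buf, p...)
	rest := lw.buf
	var err error
	for err == nil {
		idx := bytes.IndexByte(rest, '\n')
		if idx < 0 {
			break
		}
		// Forward one complete line, newline included. The line is dropped
		// from the buffer even if the downstream write fails.
		_, err = lw.w.Write(rest[:idx+1])
		rest = rest[idx+1:]
	}
	// Move the unfinished tail to the front so the backing array is reused.
	lw.buf = append(lw.buf[:0], rest...)
	// All of p was accepted into the buffer, so report len(p) even on error.
	return len(p), err
}

// Flush forwards a trailing partial line, as a Close would.
func (lw *lineWriter) Flush() error {
	lw.mu.Lock()
	defer lw.mu.Unlock()
	if len(lw.buf) == 0 {
		return nil
	}
	_, err := lw.w.Write(lw.buf)
	lw.buf = lw.buf[:0]
	return err
}

// recorder captures each downstream Write as a separate string.
type recorder struct{ chunks []string }

func (r *recorder) Write(p []byte) (int, error) {
	r.chunks = append(r.chunks, string(p))
	return len(p), nil
}

func main() {
	rec := &recorder{}
	lw := &lineWriter{w: rec}
	lw.Write([]byte("a\nb"))
	lw.Write([]byte("c\nd"))
	lw.Flush()
	fmt.Printf("%q\n", rec.chunks) // ["a\n" "bc\n" "d"]
}
```

The partial line `b` from the first write is joined with `c\n` from the second, and the trailing `d` only appears after `Flush`.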
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/runtime/writer.go` around lines 127 - 135, The byte-by-byte iteration in the loop over p with individual appends to lw.buf causes many buffer reallocations for large writes. Refactor the loop to use bytes.IndexByte to find newlines in chunks rather than checking each byte individually, then slice and append the buffer content in bulk segments between newlines. Follow the pattern demonstrated in the stepLogWriter.Write implementation (referenced at log_streamer.go lines 232-244) which shows how to efficiently handle chunked newline detection and bulk buffer management instead of per-byte operations.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/cmn/schema/dag.schema.json`:
- Line 1768: The description text on line 1768 references "outputBuffering" in
camelCase, but the actual schema field name uses snake_case as
"output_buffering". Update the description string to use the correct field name
"output_buffering" instead of "outputBuffering" to match the actual schema field
and prevent confusion for config authors reading the schema documentation.
In `@internal/runtime/writer.go`:
- Around line 130-131: The error handling in the Write method violates the
io.Writer contract by returning (0, err) when the underlying write operation
fails. Since all input bytes have already been consumed into the buffer before
the flush attempt, the function should return (len(p), err) instead of (0, err)
when lw.w.Write(lw.buf) fails at line 130. This indicates to the caller that all
input bytes were accepted into the buffer, even though the flush operation
failed, consistent with the pattern used in stepLogWriter.Write from
log_streamer.go.
---
Nitpick comments:
In `@internal/runtime/output.go`:
- Around line 382-383: The call to WithOutputBuffering(ctx, mode) on line 383
discards the returned context by assigning it to the blank identifier, making
this a no-op that has no actual effect. Remove this entire line since the
context propagation it appears to perform never actually happens - either the
operation is unnecessary for local writer setup or the returned context should
be captured and used in the function's subsequent logic.
In `@internal/runtime/writer.go`:
- Around line 127-135: The byte-by-byte iteration in the loop over p with
individual appends to lw.buf causes many buffer reallocations for large writes.
Refactor the loop to use bytes.IndexByte to find newlines in chunks rather than
checking each byte individually, then slice and append the buffer content in
bulk segments between newlines. Follow the pattern demonstrated in the
stepLogWriter.Write implementation (referenced at log_streamer.go lines 232-244)
which shows how to efficiently handle chunked newline detection and bulk buffer
management instead of per-byte operations.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c398dfa3-425d-4d4f-8ee1-80390a88d8d8
📒 Files selected for processing (11)
- internal/cmn/schema/dag.schema.json
- internal/core/dag.go
- internal/core/output_buffering.go
- internal/core/step.go
- internal/runtime/context.go
- internal/runtime/output.go
- internal/runtime/output_test.go
- internal/runtime/writer.go
- internal/service/worker/coordreport/export_test.go
- internal/service/worker/coordreport/log_streamer.go
- internal/service/worker/coordreport/log_streamer_test.go
…ract, no-op context dagucloud#2310
@coderabbitai Fixed in ddca497.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check what commit ddca497 contains
gh pr view 2311 --repo dagucloud/dagu --json commits | jq '.commits[] | select(.oid | startswith("ddca497"))'
Length of output: 653
🏁 Script executed:
#!/bin/bash
# Get recent review comments from CodeRabbit on this PR
gh api repos/dagucloud/dagu/pulls/2311/comments --jq '.[] | select(.user.login == "coderabbitai[bot]") | {id: .id, path: .path, body: .body[:200], line: .line}'
Length of output: 844
@coderabbitai Fixed in ddca497. (reply to comment on internal/cmn/schema/dag.schema.json:3447615306)
@coderabbitai Fixed in ddca497. (reply to comment on internal/runtime/writer.go:3447615308)
@coderabbitai All nitpicks from review #4538924054 fixed:
@yohamta0 Let's first keep this running in production for a while to confirm that there are no further regressions. I added a fix for the finish/teardown of a DAG run (e.g. when an error occurs) so the buffer is flushed even within the 32KB window.
Closes #2310
Summary
Adds an `output_buffering` option to both DAG and Step configurations that controls how step output is buffered before being flushed to the log stream.
Three modes:
- `buffer` (default)
- `line`
- `none`
Backward compatible
Default `"buffer"` preserves existing behavior for all existing DAGs; no migration needed.
Changes (9 files, +485/-35)
- `internal/core/output_buffering.go`: new enum type + constants
- `internal/core/step.go`: added `OutputBuffering` field to Step
- `internal/core/dag.go`: added `OutputBuffering` to DAG + `EffectiveOutputBuffering()`
- `internal/runtime/context.go`: context helpers for threading buffering mode
- `internal/runtime/output.go`: wire mode into remote writer setup
- `internal/service/worker/coordreport/log_streamer.go`: critical fix; Write() supports all three modes, new sendChunk() helper
- `internal/cmn/schema/dag.schema.json`: added output_buffering property
Tests
Root cause fix
The `stepLogWriter` previously only flushed at 32KB, causing:
With `output_buffering: "line"`, output streams in real time. With `output_buffering: "none"`, every Write() is immediately sent.
Summary by cubic
Adds `output_buffering` to DAGs and Steps to control how step logs flush in remote gRPC streams and local file writers, and preserves buffered output if the gRPC stream fails (implements #2310).
New Features
- `output_buffering` at DAG and Step (step overrides). Modes: `buffer` (default), `line`, `none`.
Bug Fixes
Written for commit a548189. Summary will update on new commits.
Summary by CodeRabbit
`buffer` (default), `line` (flush on newlines), and `none` (immediate flush). Configurable at DAG level with per-step overrides.
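The difference between the three modes can be seen by feeding identical writes through each one. The sketch below is an illustration only: `modeWriter`, `run`, and the `send` callback are hypothetical and merely stand in for the gRPC `stepLogWriter` and its `sendChunk()` helper; the 32KB threshold is taken from the PR description.

```go
package main

import (
	"bytes"
	"fmt"
)

const flushThreshold = 32 * 1024 // 32KB, as named in the PR description

// modeWriter is a hypothetical writer. send stands in for sendChunk and must
// consume the chunk before returning, because the buffer is reused.
type modeWriter struct {
	mode string // "buffer", "line", or "none"
	buf  []byte
	send func(chunk []byte)
}

func (w *modeWriter) Write(p []byte) (int, error) {
	switch w.mode {
	case "none":
		// Forward every write immediately; nothing is buffered.
		w.send(p)
	case "line":
		// Forward each complete line; keep a trailing partial line.
		w.buf = append(w.buf, p...)
		rest := w.buf
		for {
			idx := bytes.IndexByte(rest, '\n')
			if idx < 0 {
				break
			}
			w.send(rest[:idx+1])
			rest = rest[idx+1:]
		}
		w.buf = append(w.buf[:0], rest...)
	default: // "buffer"
		// Accumulate until the threshold is reached.
		w.buf = append(w.buf, p...)
		if len(w.buf) >= flushThreshold {
			w.send(w.buf)
			w.buf = w.buf[:0]
		}
	}
	return len(p), nil
}

// Close sends whatever is still buffered.
func (w *modeWriter) Close() error {
	if len(w.buf) > 0 {
		w.send(w.buf)
		w.buf = w.buf[:0]
	}
	return nil
}

// run feeds the writes through one mode and returns the chunks sent.
func run(mode string, writes ...string) []string {
	var sent []string
	w := &modeWriter{mode: mode, send: func(c []byte) { sent = append(sent, string(c)) }}
	for _, s := range writes {
		w.Write([]byte(s))
	}
	w.Close()
	return sent
}

func main() {
	for _, mode := range []string{"buffer", "line", "none"} {
		fmt.Printf("%-6s %q\n", mode, run(mode, "step 1\nst", "ep 2\n", "done"))
	}
}
```

With these small inputs, `buffer` sends a single chunk at `Close`, `line` sends one chunk per completed line plus the trailing `done`, and `none` sends one chunk per write.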