fix: ensure plugins cannot close channels#554
Conversation
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. 📝 WalkthroughWalkthroughThis PR changes error-channel handling across the codebase: fields previously typed as bidirectional Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches✅ Passed checks (2 passed)
✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
input/chainsync/chainsync.go (1)
314-318: Critical: Add nil check before sending to error channel.At line 316, the code sends to
c.errorChanwithout verifying it's non-nil. SinceNew()no longer initializes the error channel and it's only set viaSetErrorChan(), ifStart()is called beforeSetErrorChan(), sending to a nil channel will block forever, causing a goroutine leak.Apply this diff to add a nil check:
} else { // Pass error through our own error channel - c.errorChan <- err + if c.errorChan != nil { + c.errorChan <- err + } }Alternatively, consider documenting that
SetErrorChanmust be called beforeStart(), or validate inStart()that the error channel has been set.output/embedded/embedded.go (1)
51-56: Critical: Add nil check before sending to error channel.At line 53, the code sends to
e.errorChanwithout checking if it's non-nil. SinceNew()no longer initializes this channel and it's only set viaSetErrorChan(), callingStart()beforeSetErrorChan()will cause sends to a nil channel, which blocks forever and creates a goroutine leak.Apply this diff to add a nil check:
if e.callbackFunc != nil { if err := e.callbackFunc(evt); err != nil { - e.errorChan <- fmt.Errorf("callback function error: %w", err) - return + if e.errorChan != nil { + e.errorChan <- fmt.Errorf("callback function error: %w", err) + return + } } }Alternatively, validate in
Start()that the error channel has been set, or document the requirement thatSetErrorChanmust be called beforeStart().
🧹 Nitpick comments (1)
pipeline/pipeline.go (1)
83-96: Potential race:SetErrorChanis called afterStart().The plugin's
Start()is invoked beforeSetErrorChan(). IfStart()spawns a goroutine that immediately attempts to send an error, theerrorChanfield in the plugin will still benil, causing the send to block indefinitely.Consider setting the error channel before calling
Start():// Start inputs for _, input := range p.inputs { + // Create error channel for this plugin + errorChan := make(chan error) + input.SetErrorChan(errorChan) + p.inputErrorChans = append(p.inputErrorChans, errorChan) if err := input.Start(); err != nil { return fmt.Errorf("failed to start input: %w", err) } - // Create error channel for this plugin - errorChan := make(chan error) - input.SetErrorChan(errorChan) - p.inputErrorChans = append(p.inputErrorChans, errorChan) // Start background process to send input events to combined filter channelThe same pattern applies to filters (lines 99-123) and outputs (lines 131-142).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
filter/chainsync/chainsync.go(2 hunks)filter/chainsync/chainsync_test.go(0 hunks)filter/chainsync/plugin_test.go(0 hunks)filter/event/event.go(2 hunks)input/chainsync/chainsync.go(2 hunks)output/embedded/embedded.go(2 hunks)output/log/log.go(2 hunks)output/notify/notify.go(2 hunks)output/push/push.go(2 hunks)output/webhook/webhook.go(2 hunks)pipeline/pipeline.go(5 hunks)plugin/plugin.go(1 hunks)
💤 Files with no reviewable changes (2)
- filter/chainsync/chainsync_test.go
- filter/chainsync/plugin_test.go
🧰 Additional context used
🧬 Code graph analysis (3)
pipeline/pipeline.go (3)
filter/event/event.go (2)
Event(24-30)New(33-42)event/event.go (2)
Event(21-26)New(28-39)plugin/plugin.go (1)
Plugin(21-27)
input/chainsync/chainsync.go (1)
filter/chainsync/chainsync.go (1)
ChainSync(28-37)
filter/chainsync/chainsync.go (1)
input/chainsync/chainsync.go (1)
ChainSync(87-114)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: cubic · AI code reviewer
- GitHub Check: Analyze (go)
🔇 Additional comments (13)
filter/event/event.go (1)
25-25: LGTM! Error channel correctly converted to send-only.The change from
chan errortochan<- errorsuccessfully prevents this plugin from closing the error channel. The newSetErrorChansetter enables external injection, aligning with the PR's objective to centralize channel lifecycle management.Also applies to: 74-77
plugin/plugin.go (1)
21-27: LGTM! Interface correctly updated to enforce send-only error channels.Replacing
ErrorChan()getter withSetErrorChan(chan<- error)setter is the correct approach to prevent plugins from accessing and closing channels they don't own. This enforces the architectural change across all plugin implementations.output/push/push.go (1)
35-35: LGTM! Consistent implementation of send-only error channel.The conversion to
chan<- errorand addition ofSetErrorChanfollows the established pattern. The error channel is stored but not used in this implementation, maintaining forward compatibility.Also applies to: 291-294
output/webhook/webhook.go (1)
42-42: LGTM! Error channel properly converted to send-only.The implementation correctly follows the PR pattern: send-only channel type, external injection via
SetErrorChan, and removal of internal channel management fromNew()andStop().Also applies to: 308-311
output/notify/notify.go (1)
31-31: LGTM! Send-only error channel correctly implemented.The changes align with the codebase-wide refactor to prevent plugins from managing error channel lifecycle. Implementation is consistent and correct.
Also applies to: 171-174
input/chainsync/chainsync.go (1)
94-94: LGTM! Error channel type correctly updated.The conversion to
chan<- errorand addition ofSetErrorChanaligns with the PR's objective to prevent plugins from closing error channels.Also applies to: 174-177
filter/chainsync/chainsync.go (1)
29-29: LGTM! Send-only error channel correctly implemented.The changes are consistent with the broader refactoring pattern. The error channel is properly typed as send-only and configured via external injection through
SetErrorChan.Also applies to: 337-340
output/embedded/embedded.go (1)
26-26: LGTM! Error channel correctly converted to send-only.The change to
chan<- errorwithSetErrorChansetter follows the established pattern and prevents the plugin from closing the error channel.Also applies to: 74-77
output/log/log.go (2)
86-89: Send-only channel pattern correctly implements the PR objective.The
SetErrorChanmethod accepts a send-only channel (chan<- error), preventing the plugin from closing the channel. This aligns with the PR goal of ensuring plugins cannot close channels.
26-32: TheerrorChanfield is never used inLogOutput.Start().The
SetErrorChan()method stores the channel, but the event processing loop never sends to it. This appears intentional—unlike other plugins (e.g.,embedded.goreports callback errors),LogOutputonly processes events with no error conditions to report. If this is by design, it's acceptable to leave as-is for architectural consistency. However, if error handling should be added (e.g., for logging failures), implement error sends in theStart()goroutine or remove the field to clarify the absence of error reporting.pipeline/pipeline.go (3)
34-38: Good design: Centralized per-plugin error channel management.Storing error channels in the Pipeline ensures proper lifecycle control and prevents plugins from closing channels they don't own.
185-198: Correct ordering: Error channels closed after goroutines exit.The per-plugin error channels are closed after
p.wg.Wait()completes, ensuring thaterrorChanWaitgoroutines have already exited via thedoneChansignal. This prevents sending to a closed channel.
251-270: Error forwarding logic is sound.The
errorChanWaitfunction correctly handles both the done signal and channel closure, forwarding errors to the pipeline's error channel with proper select-based cancellation.
There was a problem hiding this comment.
2 issues found across 12 files
Prompt for AI agents (all 2 issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="output/embedded/embedded.go">
<violation number="1" location="output/embedded/embedded.go:75">
P1: The `errorChan` is no longer initialized in `New()`, but `Start()` sends to it without a nil check. If `SetErrorChan()` is never called and a callback error occurs, sending to a nil channel will block the goroutine forever. Consider adding a nil check before sending, or documenting that `SetErrorChan()` must be called before `Start()`.</violation>
</file>
<file name="pipeline/pipeline.go">
<violation number="1" location="pipeline/pipeline.go:89">
P1: Race condition: `SetErrorChan` is called after `input.Start()`. If the plugin's `Start()` spawns goroutines that immediately report errors, the error channel won't be set yet. Move the error channel creation and `SetErrorChan` call before `input.Start()`.</violation>
</file>
Reply to cubic to teach it or ask questions. Re-run a review with @cubic-dev-ai review this PR
| func (e *EmbeddedOutput) ErrorChan() chan error { | ||
| return e.errorChan | ||
| // SetErrorChan sets the error channel | ||
| func (e *EmbeddedOutput) SetErrorChan(ch chan<- error) { |
There was a problem hiding this comment.
P1: The errorChan is no longer initialized in New(), but Start() sends to it without a nil check. If SetErrorChan() is never called and a callback error occurs, sending to a nil channel will block the goroutine forever. Consider adding a nil check before sending, or documenting that SetErrorChan() must be called before Start().
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At output/embedded/embedded.go, line 75:
<comment>The `errorChan` is no longer initialized in `New()`, but `Start()` sends to it without a nil check. If `SetErrorChan()` is never called and a callback error occurs, sending to a nil channel will block the goroutine forever. Consider adding a nil check before sending, or documenting that `SetErrorChan()` must be called before `Start()`.</comment>
<file context>
@@ -66,16 +65,15 @@ func (e *EmbeddedOutput) Start() error {
-func (e *EmbeddedOutput) ErrorChan() chan error {
- return e.errorChan
+// SetErrorChan sets the error channel
+func (e *EmbeddedOutput) SetErrorChan(ch chan<- error) {
+ e.errorChan = ch
}
</file context>
| } | ||
| // Create error channel for this plugin | ||
| errorChan := make(chan error) | ||
| input.SetErrorChan(errorChan) |
There was a problem hiding this comment.
P1: Race condition: SetErrorChan is called after input.Start(). If the plugin's Start() spawns goroutines that immediately report errors, the error channel won't be set yet. Move the error channel creation and SetErrorChan call before input.Start().
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At pipeline/pipeline.go, line 89:
<comment>Race condition: `SetErrorChan` is called after `input.Start()`. If the plugin's `Start()` spawns goroutines that immediately report errors, the error channel won't be set yet. Move the error channel creation and `SetErrorChan` call before `input.Start()`.</comment>
<file context>
@@ -78,18 +84,26 @@ func (p *Pipeline) Start() error {
}
+ // Create error channel for this plugin
+ errorChan := make(chan error)
+ input.SetErrorChan(errorChan)
+ p.inputErrorChans = append(p.inputErrorChans, errorChan)
// Start background process to send input events to combined filter channel
</file context>
943c402 to
6aaf13f
Compare
There was a problem hiding this comment.
Actionable comments posted: 0
♻️ Duplicate comments (1)
output/embedded/embedded.go (1)
25-30: Nil‑guarded error reporting resolves previous blocking riskAdding
if e.errorChan != nilbefore sending the wrapped callback error ensures the goroutine won’t block forever when no error channel was injected, while still respecting that the pipeline—not the plugin—owns channel creation/closure. This addresses the earlier concern about sends on an uninitializederrorChan.Also applies to: 51-56, 76-79
🧹 Nitpick comments (4)
plugin/plugin.go (1)
21-27: Plugin interface change aligns with pipeline‑owned error channelsSwitching to
SetErrorChan(chan<- error)on the interface cleanly enforces that plugins only send on error channels and never own/close them. Consider adding a brief comment on the expected contract (pipeline injects, plugins never close, nil may mean “no error reporting”) to make this pattern explicit to external plugin authors.pipeline/pipeline_test.go (1)
42-53: Clarify Stop lifecycle expectations in idempotence test
TestStopIdempotentasserts that callingStoptwice without ever callingStartreturnsnilboth times. If the intended contract is thatStopis always safe (even pre‑Start), this is great—consider adding a short comment to that effect so future readers know this is intentional, not a missingStart()call in the test.pipeline/pipeline.go (2)
26-39: Struct additions and constructor wiring for error handling look solidThe new fields and the
New()initialization correctly make the pipeline the owner of the shared error channel and the per-plugin error channels; this aligns with the PR goal of centralizing channel ownership. ThedoneChan,wg, andstopOnceusage is coherent with the shutdown flow.Optional: initializing the
[]chan errorslices explicitly to empty is not required—nilslices work fine withappendand slightly reduce boilerplate.Also applies to: 41-50
185-194: Remove per-plugin error channel closing inStop()and tightenerrorChanWaitsignature for defensive programmingThe review correctly identifies that plugins receive
chan<- error(send-only), which still allows them to callclose()on the channel. While the current code's error channels are already properly drained by the timeStop()closes them (sincep.wg.Wait()at line 157 ensures allerrorChanWaitgoroutines have exited), a misbehaving plugin that closes its error channel afterStop()returns would cause a "close of closed channel" panic.Refactor defensively by not closing per-plugin error channels in
Stop():- // Close error channels - for _, ch := range p.inputErrorChans { - close(ch) - } - for _, ch := range p.filterErrorChans { - close(ch) - } - for _, ch := range p.outputErrorChans { - close(ch) - }The
errorChanWaitgoroutines already exit promptly whendoneChanis closed (line 156), making these close calls unnecessary for correctness. Removing them reduces the attack surface by preventing double-close panics if a plugin misbehaves.Also improve the
errorChanWaitsignature (line 252) to clarify that it only reads from the error channel:-func (p *Pipeline) errorChanWait(errorChan chan error) { +func (p *Pipeline) errorChanWait(errorChan <-chan error) {And update the comment to match:
-// errorChanWait reads from an error channel. If an error is received, it's copied to the plugin error channel +// errorChanWait reads from a plugin error channel and forwards errors to the pipeline error channel
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
filter/chainsync/chainsync.go(2 hunks)filter/chainsync/chainsync_test.go(0 hunks)filter/chainsync/plugin_test.go(0 hunks)filter/event/event.go(2 hunks)input/chainsync/chainsync.go(3 hunks)output/embedded/embedded.go(3 hunks)output/log/log.go(2 hunks)output/notify/notify.go(2 hunks)output/push/push.go(2 hunks)output/webhook/webhook.go(2 hunks)pipeline/pipeline.go(6 hunks)pipeline/pipeline_test.go(1 hunks)plugin/plugin.go(1 hunks)
💤 Files with no reviewable changes (2)
- filter/chainsync/plugin_test.go
- filter/chainsync/chainsync_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- input/chainsync/chainsync.go
- filter/chainsync/chainsync.go
🧰 Additional context used
🧬 Code graph analysis (3)
pipeline/pipeline.go (4)
filter/event/event.go (2)
Event(24-30)New(33-42)event/event.go (2)
Event(21-26)New(28-39)plugin/plugin.go (1)
Plugin(21-27)output/log/log.go (1)
New(34-53)
filter/event/event.go (1)
event/event.go (1)
Event(21-26)
pipeline/pipeline_test.go (1)
pipeline/pipeline.go (1)
New(41-52)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze (go)
🔇 Additional comments (7)
output/notify/notify.go (1)
31-35: NotifyOutput now correctly treats error channel as externally owned
NotifyOutputstores a send‑onlyerrorChanand only exposesSetErrorChan, without creating or closing the channel itself. That matches the new plugin contract and keeps channel ownership with the pipeline; leaving it unused here is fine until you decide to surface notification errors.Also applies to: 171-174
pipeline/pipeline_test.go (1)
9-18: Good coverage for plugin Stop panicsThe
panicPluginhelper andTestStopWithPluginPaniccleanly verify that a panic inplugin.Stopis not swallowed by the pipeline. This is valuable regression coverage around shutdown behavior after the channel‑ownership changes.Also applies to: 28-40
filter/event/event.go (1)
24-30: Event filter correctly adopts injected send‑only error channelMoving to a stored
chan<- errorwith aSetErrorChansetter and no internal creation/close keeps error channel ownership in the pipeline while preserving existing Start/Stop semantics. Even thougherrorChanisn’t used yet, this is consistent with the new plugin pattern and safe.Also applies to: 74-77
output/push/push.go (1)
34-43: PushOutput error channel wiring matches new plugin contract
PushOutputnow accepts a send‑onlyerrorChanviaSetErrorChanand does not create or close it internally, which is exactly what’s needed to keep the pipeline in control of channel lifecycle. Using it for push‑send failures can be added later without changing this API.Also applies to: 291-294
output/webhook/webhook.go (1)
41-50: WebhookOutput now treats error channel as injected, not ownedStoring a send‑only
errorChanwith aSetErrorChansetter and never creating/closing it internally keeps ownership squarely in the pipeline, aligned with the PR’s goal of preventing plugins from closing shared channels.Also applies to: 308-311
output/log/log.go (1)
26-32: LogOutput updated to accept injected send‑only error channelThe new
errorChan chan<- errorfield plusSetErrorChanmethod cleanly align the log output with the shared plugin interface while leaving its existing logging behavior untouched. No issues from the channel‑ownership perspective.Also applies to: 86-89
pipeline/pipeline.go (1)
82-97: Per-plugin error channels are now created and injected beforeStart()(avoids prior race)Creating each plugin’s
errorChan, callingSetErrorChan, and startingerrorChanWaitbefore invoking the plugin’sStart()ensures any goroutines spawned duringStart()can safely send errors immediately. The pattern is applied consistently across inputs, filters, and outputs and integrates correctly with theWaitGroupanddoneChan-based shutdown.Also applies to: 100-104, 116-123, 132-143
6aaf13f to
498575c
Compare
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pipeline/pipeline.go (1)
149-188: Missing cleanup: Per-plugin error channels are not closed in Stop().The pipeline creates per-plugin error channels in
Start()(lines 82, 98, 131) and stores them in slices, butStop()never closes these channels. This could cause:
- Resource leaks (open channels)
- Blocked goroutines if plugins attempt to send errors after shutdown
- Unclear ownership semantics
Add cleanup after stopping plugins:
for _, output := range p.outputs { if err := output.Stop(); err != nil { stopErrors = append( stopErrors, fmt.Errorf("failed to stop output: %w", err), ) } } + + // Close per-plugin error channels + for _, ch := range p.inputErrorChans { + close(ch) + } + for _, ch := range p.filterErrorChans { + close(ch) + } + for _, ch := range p.outputErrorChans { + close(ch) + } close(p.errorChan)
🧹 Nitpick comments (3)
filter/event/event.go (1)
25-25:errorChanfield is assigned but never used.The
errorChanfield is set viaSetErrorChanbut never referenced elsewhere in this file. TheStart()method doesn't send any errors to this channel. If this filter doesn't need error reporting, consider either:
- Documenting that this is intentional (interface compliance only)
- Or adding a no-op implementation note in
SetErrorChanThis isn't blocking, but dead fields can confuse future maintainers.
Also applies to: 74-77
output/notify/notify.go (1)
31-31:errorChanis set but never used; errors usepanic()instead.The
SetErrorChanimplementation stores the channel, butStart()usespanic()for all error conditions (lines 81, 85, 102, 107, 120, 125, 129, 148, 157). This defeats the purpose of the error channel pattern introduced by this PR.Consider replacing panics with error channel sends (with nil check) for graceful degradation:
if err != nil { - panic(err) + if n.errorChan != nil { + n.errorChan <- fmt.Errorf("beeep notify error: %w", err) + } + return }Also applies to: 171-174
output/webhook/webhook.go (1)
42-42:errorChanis set but never used; errors are logged or panic instead.Similar to
notify.go, theerrorChanfield is assigned viaSetErrorChanbut never utilized. Errors inStart()either panic (lines 78, 84) or are logged (line 103).For consistency with
output/embedded/embedded.gowhich properly uses the error channel pattern, consider sending errors toerrorChan(with nil check) instead of just logging or panicking.Also applies to: 308-311
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
filter/chainsync/chainsync.go(2 hunks)filter/chainsync/chainsync_test.go(3 hunks)filter/chainsync/plugin_test.go(1 hunks)filter/event/event.go(2 hunks)input/chainsync/chainsync.go(3 hunks)output/embedded/embedded.go(3 hunks)output/log/log.go(2 hunks)output/notify/notify.go(2 hunks)output/push/push.go(2 hunks)output/webhook/webhook.go(2 hunks)pipeline/pipeline.go(6 hunks)pipeline/pipeline_test.go(1 hunks)plugin/plugin.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- filter/chainsync/plugin_test.go
- pipeline/pipeline_test.go
- output/log/log.go
🧰 Additional context used
🧬 Code graph analysis (3)
input/chainsync/chainsync.go (1)
filter/chainsync/chainsync.go (1)
ChainSync(28-37)
filter/chainsync/chainsync.go (1)
input/chainsync/chainsync.go (1)
ChainSync(87-114)
pipeline/pipeline.go (3)
filter/event/event.go (1)
Event(24-30)event/event.go (1)
Event(21-26)plugin/plugin.go (1)
Plugin(24-30)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze (go)
🔇 Additional comments (4)
plugin/plugin.go (1)
21-29: Interface redesign aligns well with the PR objectives.The updated
Plugininterface properly enforces the constraint that plugins cannot close channels by:
- Using send-only
chan<- errortype- Documenting that nil channels mean "no error reporting"
- Moving channel ownership to the pipeline
This is a clean API change that prevents the panic scenarios mentioned in the PR objectives.
output/embedded/embedded.go (2)
53-55: Proper nil-guarded error channel usage.This correctly implements the pattern with a nil check before sending to
errorChan, addressing the previous review concern about goroutine blocking. This serves as a good reference implementation for other outputs in this PR.
76-79: LGTM!The
SetErrorChanimplementation is straightforward and correctly stores the send-only channel.pipeline/pipeline.go (1)
81-85: Good fix: Race condition from previous review has been addressed.The past review comment flagged that
SetErrorChanwas called afterinput.Start(), causing a potential race condition. This has been correctly fixed - the error channel is now created and set (lines 81-83) before callinginput.Start()at line 85.
498575c to
45f7d5d
Compare
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (6)
filter/chainsync/chainsync_test.go (1)
215-217: Verify thatmockAddressstill exercises the basic address-match path
mockAddressnow ignores the input string and always returns a zero-valuecommon.Address. That might mean the"Basic address match"case inTestFilterByAddressno longer verifies theoutput.Address().String() == filterAddresspath, depending on howcommon.Address.String()behaves for a zero value.If you still intend to cover that direct address comparison logic, consider either:
- Constructing a
common.AddresswhoseString()matchesfilterAddress, or- Adjusting/removing the basic-match test case so it doesn’t give a false sense of coverage.
plugin/plugin.go (1)
21-29: Plugin interface update aligns with pipeline-managed error channelsThe addition of
SetErrorChan(chan<- error)and the clarifying comments about pipeline-injected, send-only error channels are consistent with the rest of the PR and the new ownership model.You might further clarify in the comment that plugins must not close or read from the provided channel and that the pipeline will close it during shutdown; misbehaving plugins could otherwise still cause panics.
output/notify/notify.go (1)
30-35: NotifyOutput error channel is wired correctly but currently unusedThe
chan<- errorfield andSetErrorChanimplementation are consistent with the new pipeline ownership model and do not alter current behavior. As a future enhancement, you could consider sending non-fatal errors toerrorChan(or logging) instead of panicking, now that structured error reporting is available.Also applies to: 171-174
output/push/push.go (1)
34-43: PushOutput error channel integration is correctIntroducing
errorChan chan<- errorandSetErrorChanaligns this output with the pipeline-managed error model without changing its current notification flow. Longer term, you may want to route recoverable failures througherrorChanor logging rather thanpanic/os.Exit, but that can be a separate refactor.Also applies to: 291-294
pipeline/pipeline.go (1)
182-192: Reconsider closing per-plugin error channels in Stop()These per-plugin error channels are only read by
errorChanWait, which has already exited by the time you reach this block (doneChanis closed andwg.Wait()has returned). Explicitly closing them here is therefore not required for correctness and slightly increases coupling to plugin behavior:
- A well-behaved plugin won’t close or send on the channel after
Stop(), but if an external plugin does, a concurrent send on a closing/closed channel will panic.- Since the pipeline cannot be restarted, simply letting these channels go out of use (and be GC’d with the pipeline) is usually sufficient.
You could either:
- Drop the explicit
close(...)loops and optionally nil out the slices (p.inputErrorChans = nil, etc.), or- Keep them but document very clearly in the plugin contract that sending on the error channel after
Stop()returns is undefined and may panic.input/chainsync/chainsync.go (1)
316-318: Consider logging when error channel is nil for better observability.The nil check prevents a panic, which is good defensive programming. However, if the error channel is not set and an error occurs, the error is silently dropped. Consider logging a warning when
c.errorChanis nil to aid debugging.} else { // Pass error through our own error channel if c.errorChan != nil { c.errorChan <- err + } else if c.logger != nil { + c.logger.Warn(fmt.Sprintf( + "error occurred but no error channel set: %s", + err, + )) } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
filter/chainsync/chainsync.go(2 hunks)filter/chainsync/chainsync_test.go(3 hunks)filter/chainsync/plugin_test.go(1 hunks)filter/event/event.go(2 hunks)input/chainsync/chainsync.go(3 hunks)output/embedded/embedded.go(3 hunks)output/log/log.go(2 hunks)output/notify/notify.go(2 hunks)output/push/push.go(2 hunks)output/webhook/webhook.go(2 hunks)pipeline/pipeline.go(7 hunks)pipeline/pipeline_test.go(1 hunks)plugin/plugin.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- output/webhook/webhook.go
- filter/chainsync/plugin_test.go
- output/embedded/embedded.go
- pipeline/pipeline_test.go
🧰 Additional context used
🧬 Code graph analysis (3)
pipeline/pipeline.go (3)
filter/event/event.go (1)
Event(24-30)event/event.go (1)
Event(21-26)plugin/plugin.go (1)
Plugin(24-30)
filter/chainsync/chainsync.go (1)
input/chainsync/chainsync.go (1)
ChainSync(87-114)
input/chainsync/chainsync.go (1)
filter/chainsync/chainsync.go (1)
ChainSync(28-37)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze (go)
🔇 Additional comments (8)
filter/event/event.go (1)
24-30: Event error channel wiring matches new Plugin contractChanging
errorChantochan<- errorand addingSetErrorChanis consistent with the updatedplugin.Plugininterface; since the channel is not yet used in this filter, there’s no behavioral regression, and ownership stays with the pipeline.Also applies to: 74-77
filter/chainsync/chainsync_test.go (2)
86-88: UnmarshalCBOR stub is fine for testsMaking the CBOR input parameter unnamed and returning
nilkeeps the mock minimal and avoids unused-parameter warnings without changing behavior.
135-137: More descriptive MockOutput.String improves test diagnosticsReturning
"mockOutput"here is harmless and makes assertion failures easier to read when values are formatted with%v/%+v.filter/chainsync/chainsync.go (1)
28-37: ChainSync now cleanly implements the new error-channel contractAdding
errorChan chan<- errorandSetErrorChanbrings this filter in line with the updatedplugin.PluginAPI without changing its runtime behavior. Once you start emitting errors from this filter, you’ll already have the plumbing in place.Also applies to: 337-340
output/log/log.go (1)
26-32: LogOutput error-channel wiring is consistent and non-invasiveSwitching
errorChantochan<- errorand addingSetErrorChanintegrates this output with the pipeline’s error-channel management without affecting its existing logging behavior.Also applies to: 86-89
pipeline/pipeline.go (2)
26-38: Per-plugin error channels and Start() ordering look solidCreating a dedicated
chan errorper plugin, callingSetErrorChanbeforeStart(), and then wiring each througherrorChanWaitgives you clear ownership and avoids the earlier race where plugins could start emitting errors before their channels were set. The use ofwg.Add/wg.Donearound bothchanCopyLoopanderrorChanWaitalso makes the shutdown path inStop()predictable.Also applies to: 79-85, 95-101, 129-134
248-268: errorChanWait correctly shields the pipeline error channel during shutdown
errorChanWait’s select ondoneChanensures that all forwarding goroutines exit beforep.errorChanis closed inStop(), so you avoid sends to a closed pipeline error channel. The function signature using<-chan erroralso enforces read-only usage on the caller side, matching the ownership model.input/chainsync/chainsync.go (1)
94-94: Field type change to send-only channel correctly prevents unintended channel closure.The change from
chan errortochan<- erroron line 94 and theSetErrorChanmethod addition (lines 174-177) are well-designed. The pipeline always callsSetErrorChanbeforeStart, eliminating any race condition concerns. The nil check at lines 316-318 aligns with the documented plugin interface behavior: when the pipeline doesn't inject an error channel, plugins gracefully handle it by not sending errors. This is intentional design, not an oversight.
Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
45f7d5d to
b0ad37e
Compare
|
@cubic-dev-ai review this PR |
@wolf31o2 I've started the AI code review. It'll take a few minutes to complete. |
Summary by cubic
Prevent plugins from closing channels by making error channels send-only and moving channel ownership to the pipeline. This stabilizes shutdown and avoids panics from premature closes.
Refactors
Migration
Written for commit b0ad37e. Summary will update automatically on new commits.
Summary by CodeRabbit
Refactor
Tests
✏️ Tip: You can customize this high-level summary in your review settings.