Skip to content

Commit d6bbb0a

Browse files
committed
fix: plugins own their error channel
Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
1 parent 19415be commit d6bbb0a

File tree

13 files changed

+644
-340
lines changed

13 files changed

+644
-340
lines changed

filter/chainsync/chainsync.go

Lines changed: 244 additions & 223 deletions
Large diffs are not rendered by default.

filter/chainsync/chainsync_test.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -158,30 +158,36 @@ func TestChainSync_Stop(t *testing.T) {
158158
if err != nil {
159159
t.Fatalf("expected no error, got %v", err)
160160
}
161-
// Check if channels are closed
162-
select {
163-
case <-c.inputChan:
164-
default:
165-
t.Fatalf("expected inputChan to be closed")
161+
// Check if channels are nil after stop
162+
if c.inputChan != nil {
163+
t.Fatalf("expected inputChan to be nil after stop")
166164
}
167-
select {
168-
case <-c.outputChan:
169-
default:
170-
t.Fatalf("expected outputChan to be closed")
165+
if c.outputChan != nil {
166+
t.Fatalf("expected outputChan to be nil after stop")
171167
}
172168
}
173169

174170
func TestChainSync_InputChan(t *testing.T) {
175171
c := New()
172+
err := c.Start()
173+
if err != nil {
174+
t.Fatalf("expected no error, got %v", err)
175+
}
176+
defer c.Stop()
176177
if c.InputChan() == nil {
177-
t.Fatalf("expected non-nil inputChan")
178+
t.Fatalf("expected non-nil inputChan after Start()")
178179
}
179180
}
180181

181182
func TestChainSync_OutputChan(t *testing.T) {
182183
c := New()
184+
err := c.Start()
185+
if err != nil {
186+
t.Fatalf("expected no error, got %v", err)
187+
}
188+
defer c.Stop()
183189
if c.OutputChan() == nil {
184-
t.Fatalf("expected non-nil outputChan")
190+
t.Fatalf("expected non-nil outputChan after Start()")
185191
}
186192
}
187193

filter/chainsync/plugin_test.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,24 @@ func TestPluginChannels(t *testing.T) {
5050
// Create a new plugin instance
5151
p := NewFromCmdlineOptions()
5252

53+
// Start the plugin (channels are created in Start())
54+
err := p.Start()
55+
assert.NoError(t, err, "Plugin should start without errors")
56+
defer p.Stop()
57+
5358
// Verify that the input channel is not nil
54-
assert.NotNil(t, p.InputChan(), "Input channel should not be nil")
59+
assert.NotNil(
60+
t,
61+
p.InputChan(),
62+
"Input channel should not be nil after Start()",
63+
)
5564

5665
// Verify that the output channel is not nil
57-
assert.NotNil(t, p.OutputChan(), "Output channel should not be nil")
66+
assert.NotNil(
67+
t,
68+
p.OutputChan(),
69+
"Output channel should not be nil after Start()",
70+
)
5871
}
5972

6073
func TestPluginEventProcessing(t *testing.T) {

filter/event/event.go

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,25 @@ package event
1616

1717
import (
1818
"slices"
19+
"sync"
1920

2021
"github.com/blinklabs-io/adder/event"
2122
"github.com/blinklabs-io/adder/plugin"
2223
)
2324

2425
type Event struct {
25-
errorChan chan<- error
26+
errorChan chan error
2627
inputChan chan event.Event
2728
outputChan chan event.Event
29+
doneChan chan struct{}
30+
stopOnce sync.Once
2831
logger plugin.Logger
2932
filterTypes []string
3033
}
3134

3235
// New returns a new Event object with the specified options applied
3336
func New(options ...EventOptionFunc) *Event {
34-
e := &Event{
35-
inputChan: make(chan event.Event, 10),
36-
outputChan: make(chan event.Event, 10),
37-
}
37+
e := &Event{}
3838
for _, option := range options {
3939
option(e)
4040
}
@@ -43,37 +43,58 @@ func New(options ...EventOptionFunc) *Event {
4343

4444
// Start the event filter
4545
func (e *Event) Start() error {
46+
e.errorChan = make(chan error)
47+
e.inputChan = make(chan event.Event, 10)
48+
e.outputChan = make(chan event.Event, 10)
49+
e.doneChan = make(chan struct{})
50+
e.stopOnce = sync.Once{}
4651
go func() {
4752
for {
48-
evt, ok := <-e.inputChan
49-
// Channel has been closed, which means we're shutting down
50-
if !ok {
53+
select {
54+
case <-e.doneChan:
5155
return
52-
}
53-
// Drop events if we have a type filter configured and the event doesn't match
54-
if len(e.filterTypes) > 0 {
55-
matched := slices.Contains(e.filterTypes, evt.Type)
56-
if !matched {
57-
continue
56+
case evt, ok := <-e.inputChan:
57+
// Channel has been closed, which means we're shutting down
58+
if !ok {
59+
return
5860
}
61+
// Drop events if we have a type filter configured and the event doesn't match
62+
if len(e.filterTypes) > 0 {
63+
matched := slices.Contains(e.filterTypes, evt.Type)
64+
if !matched {
65+
continue
66+
}
67+
}
68+
// Send event along
69+
e.outputChan <- evt
5970
}
60-
// Send event along
61-
e.outputChan <- evt
6271
}
6372
}()
6473
return nil
6574
}
6675

6776
// Stop the event filter
6877
func (e *Event) Stop() error {
69-
close(e.inputChan)
70-
close(e.outputChan)
78+
e.stopOnce.Do(func() {
79+
if e.doneChan != nil {
80+
close(e.doneChan)
81+
}
82+
if e.inputChan != nil {
83+
close(e.inputChan)
84+
}
85+
if e.outputChan != nil {
86+
close(e.outputChan)
87+
}
88+
if e.errorChan != nil {
89+
close(e.errorChan)
90+
}
91+
})
7192
return nil
7293
}
7394

74-
// SetErrorChan sets the error channel
75-
func (e *Event) SetErrorChan(ch chan<- error) {
76-
e.errorChan = ch
95+
// ErrorChan returns the filter error channel
96+
func (e *Event) ErrorChan() chan error {
97+
return e.errorChan
7798
}
7899

79100
// InputChan returns the input event channel

input/chainsync/chainsync.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ type ChainSync struct {
9191
kupoClient *kugo.Client
9292
oConn *ouroboros.Connection
9393
eventChan chan event.Event
94-
errorChan chan<- error
94+
errorChan chan error
9595
status *ChainSyncStatus
9696
dialFamily string
9797
kupoUrl string
@@ -128,7 +128,6 @@ type StatusUpdateFunc func(ChainSyncStatus)
128128
// New returns a new ChainSync object with the specified options applied
129129
func New(options ...ChainSyncOptionFunc) *ChainSync {
130130
c := &ChainSync{
131-
eventChan: make(chan event.Event, 10),
132131
intersectPoints: []ocommon.Point{},
133132
status: &ChainSyncStatus{},
134133
}
@@ -142,6 +141,8 @@ func New(options ...ChainSyncOptionFunc) *ChainSync {
142141

143142
// Start the chain sync input
144143
func (c *ChainSync) Start() error {
144+
c.eventChan = make(chan event.Event, 10)
145+
c.errorChan = make(chan error)
145146
if err := c.setupConnection(); err != nil {
146147
return err
147148
}
@@ -166,14 +167,25 @@ func (c *ChainSync) Start() error {
166167

167168
// Stop the chain sync input
168169
func (c *ChainSync) Stop() error {
169-
err := c.oConn.Close()
170-
close(c.eventChan)
170+
var err error
171+
if c.oConn != nil {
172+
err = c.oConn.Close()
173+
c.oConn = nil
174+
}
175+
if c.eventChan != nil {
176+
close(c.eventChan)
177+
c.eventChan = nil
178+
}
179+
if c.errorChan != nil {
180+
close(c.errorChan)
181+
c.errorChan = nil
182+
}
171183
return err
172184
}
173185

174-
// SetErrorChan sets the error channel
175-
func (c *ChainSync) SetErrorChan(ch chan<- error) {
176-
c.errorChan = ch
186+
// ErrorChan returns the input error channel
187+
func (c *ChainSync) ErrorChan() chan error {
188+
return c.errorChan
177189
}
178190

179191
// InputChan always returns nil

output/embedded/embedded.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,14 @@ import (
2323
type CallbackFunc func(event.Event) error
2424

2525
type EmbeddedOutput struct {
26-
errorChan chan<- error
26+
errorChan chan error
2727
eventChan chan event.Event
2828
callbackFunc CallbackFunc
2929
outputChan chan event.Event
3030
}
3131

3232
func New(options ...EmbeddedOptionFunc) *EmbeddedOutput {
33-
e := &EmbeddedOutput{
34-
eventChan: make(chan event.Event, 10),
35-
}
33+
e := &EmbeddedOutput{}
3634
for _, option := range options {
3735
option(e)
3836
}
@@ -41,6 +39,8 @@ func New(options ...EmbeddedOptionFunc) *EmbeddedOutput {
4139

4240
// Start the embedded output
4341
func (e *EmbeddedOutput) Start() error {
42+
e.eventChan = make(chan event.Event, 10)
43+
e.errorChan = make(chan error)
4444
go func() {
4545
for {
4646
evt, ok := <-e.eventChan
@@ -66,16 +66,24 @@ func (e *EmbeddedOutput) Start() error {
6666

6767
// Stop the embedded output
6868
func (e *EmbeddedOutput) Stop() error {
69-
close(e.eventChan)
69+
if e.eventChan != nil {
70+
close(e.eventChan)
71+
e.eventChan = nil
72+
}
73+
if e.errorChan != nil {
74+
close(e.errorChan)
75+
e.errorChan = nil
76+
}
7077
if e.outputChan != nil {
7178
close(e.outputChan)
79+
e.outputChan = nil
7280
}
7381
return nil
7482
}
7583

76-
// SetErrorChan sets the error channel
77-
func (e *EmbeddedOutput) SetErrorChan(ch chan<- error) {
78-
e.errorChan = ch
84+
// ErrorChan returns the input error channel
85+
func (e *EmbeddedOutput) ErrorChan() chan error {
86+
return e.errorChan
7987
}
8088

8189
// InputChan returns the input event channel

output/log/log.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
)
2525

2626
type LogOutput struct {
27-
errorChan chan<- error
27+
errorChan chan error
2828
eventChan chan event.Event
2929
logger plugin.Logger
3030
outputLogger *slog.Logger
@@ -33,8 +33,7 @@ type LogOutput struct {
3333

3434
func New(options ...LogOptionFunc) *LogOutput {
3535
l := &LogOutput{
36-
eventChan: make(chan event.Event, 10),
37-
level: "info",
36+
level: "info",
3837
}
3938
for _, option := range options {
4039
option(l)
@@ -54,6 +53,8 @@ func New(options ...LogOptionFunc) *LogOutput {
5453

5554
// Start the log output
5655
func (l *LogOutput) Start() error {
56+
l.eventChan = make(chan event.Event, 10)
57+
l.errorChan = make(chan error)
5758
go func() {
5859
for {
5960
evt, ok := <-l.eventChan
@@ -79,13 +80,20 @@ func (l *LogOutput) Start() error {
7980

8081
// Stop the log output
8182
func (l *LogOutput) Stop() error {
82-
close(l.eventChan)
83+
if l.eventChan != nil {
84+
close(l.eventChan)
85+
l.eventChan = nil
86+
}
87+
if l.errorChan != nil {
88+
close(l.errorChan)
89+
l.errorChan = nil
90+
}
8391
return nil
8492
}
8593

86-
// SetErrorChan sets the error channel
87-
func (l *LogOutput) SetErrorChan(ch chan<- error) {
88-
l.errorChan = ch
94+
// ErrorChan returns the input error channel
95+
func (l *LogOutput) ErrorChan() chan error {
96+
return l.errorChan
8997
}
9098

9199
// InputChan returns the input event channel

0 commit comments

Comments
 (0)