Skip to content

Commit b1cc37a

Browse files
committed
refactor: replace global error handlers with atomic storage for concurrency safety
1 parent 12e3c79 commit b1cc37a

File tree

2 files changed

+58
-32
lines changed

2 files changed

+58
-32
lines changed

observer_capture_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,11 @@ func TestObserverImpl_tryErrorWithCapture_withCapture(t *testing.T) {
7878
is := assert.New(t)
7979

8080
var unhandledError error
81-
prev := OnUnhandledError
82-
OnUnhandledError = func(ctx context.Context, err error) {
81+
prev := GetOnUnhandledError()
82+
SetOnUnhandledError(func(ctx context.Context, err error) {
8383
unhandledError = err
84-
}
85-
defer func() { OnUnhandledError = prev }()
84+
})
85+
defer SetOnUnhandledError(prev)
8686

8787
observer := &observerImpl[int]{
8888
status: 0,
@@ -134,11 +134,11 @@ func TestObserverImpl_tryCompleteWithCapture_withCapture(t *testing.T) {
134134
is := assert.New(t)
135135

136136
var unhandledError error
137-
prev := OnUnhandledError
138-
OnUnhandledError = func(ctx context.Context, err error) {
137+
prev := GetOnUnhandledError()
138+
SetOnUnhandledError(func(ctx context.Context, err error) {
139139
unhandledError = err
140-
}
141-
defer func() { OnUnhandledError = prev }()
140+
})
141+
defer SetOnUnhandledError(prev)
142142

143143
observer := &observerImpl[int]{
144144
status: 0,

ro.go

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,35 +18,61 @@ import (
1818
"context"
1919
"fmt"
2020
"log"
21+
"sync/atomic"
2122
)
2223

2324
var (
24-
// By default, the library will ignore unhandled errors and dropped notifications.
25-
// You can change this behavior by setting the following variables to your own
26-
// error handling functions.
27-
//
28-
// Example:
29-
//
30-
// ro.OnUnhandledError = func(ctx context.Context, err error) {
31-
// slog.Error(fmt.Sprintf("unhandled error: %s\n", err.Error()))
32-
// }
33-
//
34-
// ro.OnDroppedNotification = func(ctx context.Context, notification fmt.Stringer) {
35-
// slog.Warn(fmt.Sprintf("dropped notification: %s\n", notification.String()))
36-
// }
37-
//
38-
// Note: `OnUnhandledError` and `OnDroppedNotification` are called synchronously from
39-
// the goroutine that emits the error or the notification. A slow callback will slow
40-
// down the whole pipeline.
41-
42-
// OnUnhandledError is called when an error is emitted by an Observable and
43-
// no error handler is registered.
44-
OnUnhandledError = IgnoreOnUnhandledError
45-
// OnDroppedNotification is called when a notification is emitted by an Observable and
46-
// no notification handler is registered.
47-
OnDroppedNotification = IgnoreOnDroppedNotification
25+
// onUnhandledError stores the current handler for unhandled errors. It is accessed
26+
// via atomic.Value to allow concurrent readers and writers without data races.
27+
onUnhandledError atomic.Value // func(context.Context, error)
28+
29+
// onDroppedNotification stores the current handler for dropped notifications.
30+
onDroppedNotification atomic.Value // func(context.Context, fmt.Stringer)
4831
)
4932

33+
func init() {
34+
onUnhandledError.Store(IgnoreOnUnhandledError)
35+
onDroppedNotification.Store(IgnoreOnDroppedNotification)
36+
}
37+
38+
// SetOnUnhandledError sets the handler that will be invoked when an error is
39+
// emitted and not otherwise handled. Passing nil restores the default.
40+
func SetOnUnhandledError(fn func(ctx context.Context, err error)) {
41+
if fn == nil {
42+
fn = IgnoreOnUnhandledError
43+
}
44+
onUnhandledError.Store(fn)
45+
}
46+
47+
// GetOnUnhandledError returns the currently configured unhandled-error handler.
48+
func GetOnUnhandledError() func(ctx context.Context, err error) {
49+
return onUnhandledError.Load().(func(context.Context, error))
50+
}
51+
52+
// OnUnhandledError calls the currently configured unhandled-error handler.
53+
func OnUnhandledError(ctx context.Context, err error) {
54+
GetOnUnhandledError()(ctx, err)
55+
}
56+
57+
// SetOnDroppedNotification sets the handler invoked when a notification is
58+
// dropped. Passing nil restores the default.
59+
func SetOnDroppedNotification(fn func(ctx context.Context, notification fmt.Stringer)) {
60+
if fn == nil {
61+
fn = IgnoreOnDroppedNotification
62+
}
63+
onDroppedNotification.Store(fn)
64+
}
65+
66+
// GetOnDroppedNotification returns the currently configured dropped-notification handler.
67+
func GetOnDroppedNotification() func(ctx context.Context, notification fmt.Stringer) {
68+
return onDroppedNotification.Load().(func(context.Context, fmt.Stringer))
69+
}
70+
71+
// OnDroppedNotification calls the currently configured dropped-notification handler.
72+
func OnDroppedNotification(ctx context.Context, notification fmt.Stringer) {
73+
GetOnDroppedNotification()(ctx, notification)
74+
}
75+
5076
// IgnoreOnUnhandledError is the default implementation of `OnUnhandledError`.
5177
func IgnoreOnUnhandledError(ctx context.Context, err error) {}
5278

0 commit comments

Comments
 (0)