@@ -2,15 +2,16 @@ package mercure
22
33import (
44 "bytes"
5+ "context"
56 "encoding/binary"
67 "encoding/json"
78 "fmt"
9+ "log/slog"
810 "math/rand"
911 "sync"
1012 "time"
1113
1214 bolt "go.etcd.io/bbolt"
13- "go.uber.org/zap"
1415)
1516
1617const BoltDefaultCleanupFrequency = 0.3
@@ -22,7 +23,7 @@ type BoltTransport struct {
2223 sync.RWMutex
2324
2425 subscribers * SubscriberList
25- logger Logger
26+ logger * slog. Logger
2627 db * bolt.DB
2728 bucketName string
2829 size uint64
@@ -36,7 +37,7 @@ type BoltTransport struct {
3637// NewBoltTransport creates a new BoltTransport.
3738func NewBoltTransport (
3839 subscriberList * SubscriberList ,
39- logger Logger ,
40+ logger * slog. Logger ,
4041 path string ,
4142 bucketName string ,
4243 size uint64 ,
@@ -95,7 +96,7 @@ func getDBLastEventID(db *bolt.DB, bucketName string) (string, error) {
9596}
9697
9798// Dispatch dispatches an update to all subscribers and persists it in Bolt DB.
98- func (t * BoltTransport ) Dispatch (update * Update ) error {
99+ func (t * BoltTransport ) Dispatch (ctx context. Context , update * Update ) error {
99100 select {
100101 case <- t .closed :
101102 return ErrClosedTransport
@@ -118,14 +119,14 @@ func (t *BoltTransport) Dispatch(update *Update) error {
118119 }
119120
120121 for _ , s := range t .subscribers .MatchAny (update ) {
121- s .Dispatch (update , false )
122+ s .Dispatch (ctx , update , false )
122123 }
123124
124125 return nil
125126}
126127
127128// AddSubscriber adds a new subscriber to the transport.
128- func (t * BoltTransport ) AddSubscriber (s * LocalSubscriber ) error {
129+ func (t * BoltTransport ) AddSubscriber (ctx context. Context , s * LocalSubscriber ) error {
129130 select {
130131 case <- t .closed :
131132 return ErrClosedTransport
@@ -138,18 +139,18 @@ func (t *BoltTransport) AddSubscriber(s *LocalSubscriber) error {
138139 t .Unlock ()
139140
140141 if s .RequestLastEventID != "" {
141- if err := t .dispatchHistory (s , toSeq ); err != nil {
142+ if err := t .dispatchHistory (ctx , s , toSeq ); err != nil {
142143 return err
143144 }
144145 }
145146
146- s .Ready ()
147+ s .Ready (ctx )
147148
148149 return nil
149150}
150151
151152// RemoveSubscriber removes a new subscriber from the transport.
152- func (t * BoltTransport ) RemoveSubscriber (s * LocalSubscriber ) error {
153+ func (t * BoltTransport ) RemoveSubscriber (_ context. Context , s * LocalSubscriber ) error {
153154 select {
154155 case <- t .closed :
155156 return ErrClosedTransport
@@ -165,15 +166,15 @@ func (t *BoltTransport) RemoveSubscriber(s *LocalSubscriber) error {
165166}
166167
167168// GetSubscribers get the list of active subscribers.
168- func (t * BoltTransport ) GetSubscribers () (string , []* Subscriber , error ) {
169+ func (t * BoltTransport ) GetSubscribers (_ context. Context ) (string , []* Subscriber , error ) {
169170 t .RLock ()
170171 defer t .RUnlock ()
171172
172173 return t .lastEventID , getSubscribers (t .subscribers ), nil
173174}
174175
175176// Close closes the Transport.
176- func (t * BoltTransport ) Close () (err error ) {
177+ func (t * BoltTransport ) Close (_ context. Context ) (err error ) {
177178 t .closedOnce .Do (func () {
178179 close (t .closed )
179180
@@ -196,7 +197,7 @@ func (t *BoltTransport) Close() (err error) {
196197}
197198
198199//nolint:gocognit
199- func (t * BoltTransport ) dispatchHistory (s * LocalSubscriber , toSeq uint64 ) error {
200+ func (t * BoltTransport ) dispatchHistory (ctx context. Context , s * LocalSubscriber , toSeq uint64 ) error {
200201 err := t .db .View (func (tx * bolt.Tx ) error {
201202 b := tx .Bucket ([]byte (t .bucketName ))
202203 if b == nil {
@@ -223,14 +224,13 @@ func (t *BoltTransport) dispatchHistory(s *LocalSubscriber, toSeq uint64) error
223224 if err := json .Unmarshal (v , & update ); err != nil {
224225 s .HistoryDispatched (responseLastEventID )
225226
226- if c := t .logger .Check (zap .ErrorLevel , "Unable to unmarshal update coming from the Bolt DB" ); c != nil {
227- c .Write (zap .Error (err ))
228- }
227+ err := fmt .Errorf ("unable to unmarshal update: %w" , err )
228+ t .logger .ErrorContext (ctx , "Unable to unmarshal update coming from the Bolt DB" , slog .Any ("update" , update ), slog .Any ("error" , err ))
229229
230- return fmt . Errorf ( "unable to unmarshal update: %w" , err )
230+ return err
231231 }
232232
233- if (s .Match (update ) && ! s .Dispatch (update , true )) || (toSeq > 0 && binary .BigEndian .Uint64 (k [:8 ]) >= toSeq ) {
233+ if (s .Match (update ) && ! s .Dispatch (ctx , update , true )) || (toSeq > 0 && binary .BigEndian .Uint64 (k [:8 ]) >= toSeq ) {
234234 s .HistoryDispatched (responseLastEventID )
235235
236236 return nil
@@ -240,9 +240,7 @@ func (t *BoltTransport) dispatchHistory(s *LocalSubscriber, toSeq uint64) error
240240 s .HistoryDispatched (responseLastEventID )
241241
242242 if ! afterFromID {
243- if c := t .logger .Check (zap .InfoLevel , "Can't find requested LastEventID" ); c != nil {
244- c .Write (zap .String ("LastEventID" , s .RequestLastEventID ))
245- }
243+ t .logger .InfoContext (ctx , "Can't find requested LastEventID" )
246244 }
247245
248246 return nil
0 commit comments