diff --git a/internal/channelmanager/channel_manager.go b/internal/channelmanager/channel_manager.go index 67faf0a..18f811b 100644 --- a/internal/channelmanager/channel_manager.go +++ b/internal/channelmanager/channel_manager.go @@ -38,7 +38,7 @@ func NewChannelManager(connManager *connectionmanager.ConnectionManager, log log reconnectInterval: reconnectInterval, reconnectionCount: 0, reconnectionCountMux: &sync.Mutex{}, - dispatcher: dispatcher.NewDispatcher(), + dispatcher: dispatcher.NewDispatcher(log), } go chanManager.startNotifyCancelOrClosed() return &chanManager, nil diff --git a/internal/connectionmanager/connection_manager.go b/internal/connectionmanager/connection_manager.go index fce1f2b..eb916d3 100644 --- a/internal/connectionmanager/connection_manager.go +++ b/internal/connectionmanager/connection_manager.go @@ -37,7 +37,7 @@ func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, recon ReconnectInterval: reconnectInterval, reconnectionCount: 0, reconnectionCountMux: &sync.Mutex{}, - dispatcher: dispatcher.NewDispatcher(), + dispatcher: dispatcher.NewDispatcher(log), } go connManager.startNotifyClose() return &connManager, nil diff --git a/internal/dispatcher/dispatcher.go b/internal/dispatcher/dispatcher.go index 52385c6..a4863bf 100644 --- a/internal/dispatcher/dispatcher.go +++ b/internal/dispatcher/dispatcher.go @@ -1,7 +1,7 @@ package dispatcher import ( - "log" + "github.com/wagslane/go-rabbitmq/internal/logger" "math" "math/rand" "sync" @@ -12,6 +12,7 @@ import ( type Dispatcher struct { subscribers map[int]dispatchSubscriber subscribersMux *sync.Mutex + logger logger.Logger } type dispatchSubscriber struct { @@ -20,10 +21,11 @@ type dispatchSubscriber struct { } // NewDispatcher - -func NewDispatcher() *Dispatcher { +func NewDispatcher(logger logger.Logger) *Dispatcher { return &Dispatcher{ subscribers: make(map[int]dispatchSubscriber), subscribersMux: &sync.Mutex{}, + logger: logger, } } @@ -34,7 +36,7 @@ func (d *Dispatcher) Dispatch(err error) error { for _, subscriber := range d.subscribers { select { case <-time.After(time.Second * 5): - log.Println("Unexpected rabbitmq error: timeout in dispatch") + d.logger.Errorf("Unexpected rabbitmq error: timeout in dispatch") case subscriber.notifyCancelOrCloseChan <- err: } } diff --git a/internal/dispatcher/dispatcher_test.go b/internal/dispatcher/dispatcher_test.go index afc5509..dc7889d 100644 --- a/internal/dispatcher/dispatcher_test.go +++ b/internal/dispatcher/dispatcher_test.go @@ -5,8 +5,16 @@ import ( "time" ) +type lgr struct{} + +func (l *lgr) Fatalf(string, ...interface{}) {} +func (l *lgr) Errorf(string, ...interface{}) {} +func (l *lgr) Warnf(string, ...interface{}) {} +func (l *lgr) Infof(string, ...interface{}) {} +func (l *lgr) Debugf(string, ...interface{}) {} + func TestNewDispatcher(t *testing.T) { - d := NewDispatcher() + d := NewDispatcher(&lgr{}) if d.subscribers == nil { t.Error("Dispatcher subscribers is nil") } @@ -16,7 +24,7 @@ func TestNewDispatcher(t *testing.T) { } func TestAddSubscriber(t *testing.T) { - d := NewDispatcher() + d := NewDispatcher(&lgr{}) d.AddSubscriber() if len(d.subscribers) != 1 { t.Error("Dispatcher subscribers length is not 1") @@ -24,7 +32,7 @@ func TestAddSubscriber(t *testing.T) { } func TestCloseSubscriber(t *testing.T) { - d := NewDispatcher() + d := NewDispatcher(&lgr{}) _, closeCh := d.AddSubscriber() close(closeCh) time.Sleep(time.Millisecond)