diff --git a/internal/impl/mysql/input_mysql_stream.go b/internal/impl/mysql/input_mysql_stream.go index 07fe87ca87..3fa456b001 100644 --- a/internal/impl/mysql/input_mysql_stream.go +++ b/internal/impl/mysql/input_mysql_stream.go @@ -952,11 +952,15 @@ func (i *mysqlStreamInput) onMessage(e *canal.RowsEvent, initValue, incrementVal } message[col.Name] = v } - i.rawMessageEvents <- MessageEvent{ + select { + case i.rawMessageEvents <- MessageEvent{ Row: message, Operation: MessageOperation(e.Action), Table: e.Table.Name, Position: &position{Name: i.currentBinlogName, Pos: e.Header.LogPos}, + }: + case <-i.shutSig.SoftStopChan(): + return context.Canceled } } return nil diff --git a/internal/impl/mysql/integration_test.go b/internal/impl/mysql/integration_test.go index 45b9ad3d92..926e18d13c 100644 --- a/internal/impl/mysql/integration_test.go +++ b/internal/impl/mysql/integration_test.go @@ -199,8 +199,9 @@ file: } go func() { - err = streamOut.Run(t.Context()) - require.NoError(t, err) + if err := streamOut.Run(t.Context()); err != nil && !errors.Is(err, context.Canceled) { + t.Error(err) + } }() assert.Eventually(t, func() bool { @@ -263,8 +264,7 @@ file: license.InjectTestService(streamOut.Resources()) go func() { - err = streamOut.Run(t.Context()) - require.NoError(t, err) + _ = streamOut.Run(t.Context()) }() time.Sleep(time.Second * 5) @@ -346,8 +346,9 @@ file: license.InjectTestService(streamOut.Resources()) go func() { - err = streamOut.Run(t.Context()) - require.NoError(t, err) + if err := streamOut.Run(t.Context()); err != nil && !errors.Is(err, context.Canceled) { + t.Error(err) + } }() time.Sleep(time.Second * 5) @@ -1064,8 +1065,7 @@ file: license.InjectTestService(streamOut.Resources()) go func() { - err = streamOut.Run(t.Context()) - require.NoError(t, err) + _ = streamOut.Run(t.Context()) }() // Wait for stream to start