Skip to content

Commit a29f43f

Browse files
authored
Merge pull request #145347 from asg0451/backport24.3-144497
release-24.3: changefeedccl: fix mock kafka server shutdown
2 parents b75aa74 + 15016a3 commit a29f43f

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

pkg/ccl/changefeedccl/testfeed_test.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -1819,11 +1819,15 @@ func (s *fakeKafkaSinkV2) Dial() error {
18191819
if m.Key != nil {
18201820
key = sarama.ByteEncoder(m.Key)
18211821
}
1822-
s.feedCh <- &sarama.ProducerMessage{
1822+
select {
1823+
case <-ctx.Done():
1824+
return kgo.ProduceResults{kgo.ProduceResult{Err: ctx.Err()}}
1825+
case s.feedCh <- &sarama.ProducerMessage{
18231826
Topic: m.Topic,
18241827
Key: key,
18251828
Value: sarama.ByteEncoder(m.Value),
18261829
Partition: m.Partition,
1830+
}:
18271831
}
18281832
}
18291833
return nil

0 commit comments

Comments
 (0)