@@ -191,6 +191,10 @@ func TestWriter(t *testing.T) {
191191 scenario : "test write message with writer data" ,
192192 function : testWriteMessageWithWriterData ,
193193 },
194+ {
195+ scenario : "test no new partition writers after close" ,
196+ function : testWriterNoNewPartitionWritersAfterClose ,
197+ },
194198 }
195199
196200 for _ , test := range tests {
@@ -1030,6 +1034,43 @@ func testWriterOverrideConfigStats(t *testing.T) {
10301034 }
10311035}
10321036
1037+ func testWriterNoNewPartitionWritersAfterClose (t * testing.T ) {
1038+ ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
1039+ defer cancel ()
1040+ topic1 := makeTopic ()
1041+ createTopic (t , topic1 , 1 )
1042+ defer deleteTopic (t , topic1 )
1043+
1044+ w := newTestWriter (WriterConfig {
1045+ Topic : topic1 ,
1046+ })
1047+ defer w .Close () // try and close anyway after test finished
1048+
1049+ // using balancer to close writer right between first mutex is released and second mutex is taken to make map of partition writers
1050+ w .Balancer = mockBalancerFunc (func (m Message , i ... int ) int {
1051+ go w .Close () // close is blocking so run in goroutine
1052+ for ! w .closed { // wait until writer is marked as closed
1053+ time .Sleep (time .Millisecond )
1054+ }
1055+ w .mutex .Lock () // ability to get mutex means w.Close is waiting to finish pending tasks
1056+ defer w .mutex .Unlock ()
1057+ return 0
1058+ })
1059+
1060+ msg := Message {Value : []byte ("Hello World" )} // no topic
1061+
1062+ if err := w .WriteMessages (ctx , msg ); ! errors .Is (err , io .ErrClosedPipe ) {
1063+ t .Errorf ("expected error: %v got: %v" , io .ErrClosedPipe , err )
1064+ return
1065+ }
1066+ }
1067+
1068+ type mockBalancerFunc func (msg Message , partitions ... int ) (partition int )
1069+
1070+ func (b mockBalancerFunc ) Balance (msg Message , partitions ... int ) int {
1071+ return b (msg , partitions ... )
1072+ }
1073+
10331074type staticBalancer struct {
10341075 partition int
10351076}
0 commit comments