@@ -15,6 +15,7 @@ import (
1515 "testing"
1616 "time"
1717
18+ "github.com/stretchr/testify/assert"
1819 "github.com/stretchr/testify/require"
1920)
2021
@@ -890,6 +891,49 @@ func TestReaderConsumerGroup(t *testing.T) {
890891 }
891892}
892893
894+ func TestAssignmentListener (t * testing.T ) {
895+ // It appears that some of the tests depend on all these tests being
896+ // run concurrently to pass... this is brittle and should be fixed
897+ // at some point.
898+ t .Parallel ()
899+
900+ topic := makeTopic ()
901+ createTopic (t , topic , 10 )
902+ defer deleteTopic (t , topic )
903+
904+ var lock sync.Mutex
905+ assignments := make ([][]GroupMemberTopic , 0 )
906+ groupID := makeGroupID ()
907+ r := NewReader (ReaderConfig {
908+ Brokers : []string {"localhost:9092" },
909+ Topic : topic ,
910+ GroupID : groupID ,
911+ HeartbeatInterval : 2 * time .Second ,
912+ CommitInterval : 1 * time .Second ,
913+ RebalanceTimeout : 2 * time .Second ,
914+ RetentionTime : time .Hour ,
915+ MinBytes : 1 ,
916+ MaxBytes : 1e6 ,
917+ AssignmentListener : func (partitions []GroupMemberTopic ) {
918+ lock .Lock ()
919+ defer lock .Unlock ()
920+ assignments = append (assignments , partitions )
921+ },
922+ })
923+ defer r .Close ()
924+
925+ assert .Eventually (t , func () bool {
926+ return reflect .DeepEqual (assignments , [][]GroupMemberTopic {
927+ {
928+ GroupMemberTopic {
929+ Topic : topic ,
930+ Partitions : []int {0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 },
931+ },
932+ },
933+ })
934+ }, 10 * time .Second , 100 * time .Millisecond )
935+ }
936+
893937func testReaderConsumerGroupHandshake (t * testing.T , ctx context.Context , r * Reader ) {
894938 prepareReader (t , context .Background (), r , makeTestSequence (5 )... )
895939
0 commit comments