@@ -64,49 +64,19 @@ func (s *Service) listOffsetsCached(ctx context.Context, offsetType string) (kad
6464
6565// ListEndOffsets fetches the high water mark for all topic partitions.
6666func (s * Service ) ListEndOffsets (ctx context.Context ) (kadm.ListedOffsets , error ) {
67- return s .listOffsetsInternal (ctx , func (ctx context.Context , topics ... string ) (kadm.ListedOffsets , error ) {
68- if len (topics ) == 0 {
69- // Get all topics if none specified
70- allTopics , err := s .getAllowedTopics (ctx )
71- if err != nil {
72- return nil , fmt .Errorf ("failed to get topics for listing end offsets: %w" , err )
73- }
74- topics = allTopics
75- }
76- return s .admClient .ListEndOffsets (ctx , topics ... )
77- }, "end" )
67+ return s .listOffsetsInternal (ctx , s .admClient .ListEndOffsets , "end" )
7868}
7969
8070// ListStartOffsets fetches the low water mark for all topic partitions.
8171func (s * Service ) ListStartOffsets (ctx context.Context ) (kadm.ListedOffsets , error ) {
82- return s .listOffsetsInternal (ctx , func (ctx context.Context , topics ... string ) (kadm.ListedOffsets , error ) {
83- if len (topics ) == 0 {
84- // Get all topics if none specified
85- allTopics , err := s .getAllowedTopics (ctx )
86- if err != nil {
87- return nil , fmt .Errorf ("failed to get topics for listing start offsets: %w" , err )
88- }
89- topics = allTopics
90- }
91- return s .admClient .ListStartOffsets (ctx , topics ... )
92- }, "start" )
72+ return s .listOffsetsInternal (ctx , s .admClient .ListStartOffsets , "start" )
9373}
9474
9575// ListMaxTimestampOffsets fetches the offsets for the maximum timestamp for all topic partitions.
9676// This requires Kafka 3.0+ (see https://issues.apache.org/jira/browse/KAFKA-12541)
9777// Uses the dedicated ListMaxTimestampOffsets method from franz-go.
9878func (s * Service ) ListMaxTimestampOffsets (ctx context.Context ) (kadm.ListedOffsets , error ) {
99- return s .listOffsetsInternal (ctx , func (ctx context.Context , topics ... string ) (kadm.ListedOffsets , error ) {
100- if len (topics ) == 0 {
101- // Get all topics if none specified
102- allTopics , err := s .getAllowedTopics (ctx )
103- if err != nil {
104- return nil , fmt .Errorf ("failed to get topics for listing max timestamp offsets: %w" , err )
105- }
106- topics = allTopics
107- }
108- return s .admClient .ListMaxTimestampOffsets (ctx , topics ... )
109- }, "maxTimestamp" )
79+ return s .listOffsetsInternal (ctx , s .admClient .ListMaxTimestampOffsets , "maxTimestamp" )
11080}
11181
11282type listOffsetsFunc func (context.Context , ... string ) (kadm.ListedOffsets , error )
0 commit comments