7
7
using System . Collections . Concurrent ;
8
8
using System . Collections . Generic ;
9
9
using System . Diagnostics ;
10
+ using System . Linq ;
10
11
using System . Text ;
11
12
using Microsoft . Extensions . Logging ;
12
13
using Streamiz . Kafka . Net . Metrics ;
@@ -81,24 +82,28 @@ public void Clear()
81
82
private readonly IStreamConfig configuration ;
82
83
private readonly TaskId id ;
83
84
private readonly Sensor droppedRecordsSensor ;
85
+ private readonly IAdminClient _adminClient ;
84
86
private Exception exception = null ;
85
87
86
88
private readonly string logPrefix ;
87
89
private readonly ILogger log = Logger . GetLogger ( typeof ( RecordCollector ) ) ;
88
90
89
- private readonly ConcurrentDictionary < TopicPartition , long > collectorsOffsets =
90
- new ConcurrentDictionary < TopicPartition , long > ( ) ;
91
+ private readonly ConcurrentDictionary < TopicPartition , long > collectorsOffsets = new ( ) ;
91
92
92
- private readonly RetryRecordContext retryRecordContext = new RetryRecordContext ( ) ;
93
+ private readonly RetryRecordContext retryRecordContext = new ( ) ;
93
94
94
95
public IDictionary < TopicPartition , long > CollectorOffsets => collectorsOffsets . ToDictionary ( ) ;
95
96
96
- public RecordCollector ( string logPrefix , IStreamConfig configuration , TaskId id , Sensor droppedRecordsSensor )
97
+ public IDictionary < string , ( int , DateTime ) > cachePartitionsForTopics =
98
+ new Dictionary < string , ( int , DateTime ) > ( ) ;
99
+
100
+ public RecordCollector ( string logPrefix , IStreamConfig configuration , TaskId id , Sensor droppedRecordsSensor , IAdminClient adminClient )
97
101
{
98
102
this . logPrefix = $ "{ logPrefix } ";
99
103
this . configuration = configuration ;
100
104
this . id = id ;
101
105
this . droppedRecordsSensor = droppedRecordsSensor ;
106
+ _adminClient = adminClient ;
102
107
}
103
108
104
109
public void Init ( ref IProducer < byte [ ] , byte [ ] > producer )
@@ -136,6 +141,8 @@ public void Close()
136
141
}
137
142
}
138
143
}
144
+
145
+ _adminClient ? . Dispose ( ) ;
139
146
}
140
147
141
148
public void Flush ( )
@@ -416,5 +423,20 @@ private void CheckForException()
416
423
throw e ;
417
424
}
418
425
}
426
+
427
+ public int PartitionsFor ( string topic )
428
+ {
429
+ var adminConfig = configuration . ToAdminConfig ( "" ) ;
430
+ var refreshInterval = adminConfig . TopicMetadataRefreshIntervalMs ?? 5 * 60 * 1000 ;
431
+
432
+ if ( cachePartitionsForTopics . ContainsKey ( topic ) &&
433
+ cachePartitionsForTopics [ topic ] . Item2 + TimeSpan . FromMilliseconds ( refreshInterval ) > DateTime . Now )
434
+ return cachePartitionsForTopics [ topic ] . Item1 ;
435
+
436
+ var metadata = _adminClient . GetMetadata ( topic , TimeSpan . FromSeconds ( 5 ) ) ;
437
+ var partitionCount = metadata . Topics . FirstOrDefault ( t => t . Topic . Equals ( topic ) ) ! . Partitions . Count ;
438
+ cachePartitionsForTopics . Add ( topic , ( partitionCount , DateTime . Now ) ) ;
439
+ return partitionCount ;
440
+ }
419
441
}
420
442
}
0 commit comments