11using System ;
2- using System . Collections . Generic ;
3- using System . Text ;
42using System . Threading ;
53using System . Threading . Tasks ;
64using Confluent . Kafka ;
75
86namespace Take . Elephant . Kafka
97{
10- public class KafkaPartitionQueue < T > : IReceiverQueue < T > , IPartitionSenderQueue < T > , ICloseable , IDisposable
8+ public class KafkaPartitionQueue < T > : IKafkaReceiverQueue < T > , IPartitionSenderQueue < T > , ICloseable , IDisposable
119 {
1210 private readonly KafkaPartitionSenderQueue < T > _senderQueue ;
1311 private readonly KafkaReceiverQueue < T > _receiverQueue ;
@@ -16,12 +14,43 @@ public KafkaPartitionQueue(
1614 ProducerConfig producerConfig ,
1715 ConsumerConfig consumerConfig ,
1816 string topic ,
19- Take . Elephant . ISerializer < T > serializer ,
20- Confluent . Kafka . ISerializer < string > kafkaSerializer = null ,
21- IDeserializer < string > kafkaDeserializer = null )
17+ ISerializer < T > serializer )
18+ : this ( producerConfig , consumerConfig , topic , serializer , null , null , null )
2219 {
23- _senderQueue = new KafkaPartitionSenderQueue < T > ( producerConfig , topic , serializer , kafkaSerializer ) ;
24- _receiverQueue = new KafkaReceiverQueue < T > ( consumerConfig , topic , serializer , kafkaDeserializer ) ;
20+ }
21+
22+ public KafkaPartitionQueue (
23+ ProducerConfig producerConfig ,
24+ ConsumerConfig consumerConfig ,
25+ string topic ,
26+ ISerializer < T > serializer ,
27+ Confluent . Kafka . ISerializer < string > kafkaSerializer )
28+ : this ( producerConfig , consumerConfig , topic , serializer , kafkaSerializer , null , null )
29+ {
30+ }
31+
32+ public KafkaPartitionQueue (
33+ ProducerConfig producerConfig ,
34+ ConsumerConfig consumerConfig ,
35+ string topic ,
36+ ISerializer < T > serializer ,
37+ Confluent . Kafka . ISerializer < string > kafkaSerializer ,
38+ IDeserializer < string > kafkaDeserializer )
39+ : this ( producerConfig , consumerConfig , topic , serializer , kafkaSerializer , kafkaDeserializer , null )
40+ {
41+ }
42+
43+ public KafkaPartitionQueue (
44+ ProducerConfig producerConfig ,
45+ ConsumerConfig consumerConfig ,
46+ string topic ,
47+ ISerializer < T > serializer ,
48+ Confluent . Kafka . ISerializer < string > kafkaSerializer ,
49+ IDeserializer < string > kafkaDeserializer ,
50+ IKafkaHeaderProvider headerProvider )
51+ {
52+ _senderQueue = new ( producerConfig , topic , serializer , kafkaSerializer , headerProvider ) ;
53+ _receiverQueue = new ( consumerConfig , topic , serializer , kafkaDeserializer ) ;
2554 }
2655
2756 public Task EnqueueAsync ( T item , string key , CancellationToken cancellationToken = default )
@@ -34,11 +63,25 @@ public Task<T> DequeueOrDefaultAsync(CancellationToken cancellationToken = defau
3463 return _receiverQueue . DequeueOrDefaultAsync ( cancellationToken ) ;
3564 }
3665
66+ public Task < KafkaConsumedMessage < T > > DequeueWithHeadersOrDefaultAsync (
67+ CancellationToken cancellationToken = default
68+ )
69+ {
70+ return _receiverQueue . DequeueWithHeadersOrDefaultAsync ( cancellationToken ) ;
71+ }
72+
3773 public Task < T > DequeueAsync ( CancellationToken cancellationToken = default )
3874 {
3975 return _receiverQueue . DequeueAsync ( cancellationToken ) ;
4076 }
4177
78+ public Task < KafkaConsumedMessage < T > > DequeueWithHeadersAsync (
79+ CancellationToken cancellationToken
80+ )
81+ {
82+ return _receiverQueue . DequeueWithHeadersAsync ( cancellationToken ) ;
83+ }
84+
4285 public Task CloseAsync ( CancellationToken cancellationToken ) => _receiverQueue . CloseAsync ( cancellationToken ) ;
4386
4487 public void Dispose ( )
@@ -49,11 +92,9 @@ public void Dispose()
4992
5093 protected void Dispose ( bool disposing )
5194 {
52- if ( disposing )
53- {
54- _senderQueue ? . Dispose ( ) ;
55- _receiverQueue ? . Dispose ( ) ;
56- }
95+ if ( ! disposing ) return ;
96+ _senderQueue ? . Dispose ( ) ;
97+ _receiverQueue ? . Dispose ( ) ;
5798 }
5899 }
59100}
0 commit comments