1
1
use clap:: { App , Arg } ;
2
2
use log:: { debug, error, info} ;
3
- use rdkafka:: config:: ClientConfig ;
3
+ use rdkafka:: config:: { ClientConfig , RDKafkaLogLevel } ;
4
+ use rdkafka:: consumer:: stream_consumer:: StreamConsumer ;
5
+ use rdkafka:: consumer:: CommitMode ;
6
+ use rdkafka:: consumer:: Consumer ;
4
7
use rdkafka:: producer:: FutureProducer ;
5
8
use rdkafka:: util:: get_rdkafka_version;
6
- use rust_arroyo:: backends:: kafka:: config :: KafkaConfig ;
7
- use rust_arroyo:: backends:: kafka :: KafkaConsumer ;
8
- use rust_arroyo:: backends :: { AssignmentCallbacks , Consumer } ;
9
+ use rust_arroyo:: backends:: kafka:: create_kafka_message ;
10
+ use rust_arroyo:: backends:: AssignmentCallbacks ;
11
+ use rust_arroyo:: processing :: strategies :: async_noop :: build_topic_partitions ;
9
12
use rust_arroyo:: processing:: strategies:: async_noop:: AsyncNoopCommit ;
13
+ use rust_arroyo:: processing:: strategies:: async_noop:: CustomContext ;
10
14
use rust_arroyo:: types:: { Partition , Topic } ;
11
15
use std:: collections:: HashMap ;
12
16
use std:: time:: Duration ;
13
17
use std:: time:: SystemTime ;
14
18
use tokio:: time:: timeout;
15
19
20
+ // A type alias with your custom consumer can be created for convenience.
21
+ type LoggingConsumer = StreamConsumer < CustomContext > ;
22
+
16
23
struct EmptyCallbacks { }
17
24
impl AssignmentCallbacks for EmptyCallbacks {
18
25
fn on_assign ( & mut self , _: HashMap < Partition , u64 > ) {
@@ -30,19 +37,28 @@ async fn consume_and_produce(
30
37
dest_topic : & str ,
31
38
batch_size : usize ,
32
39
) {
33
- let config = KafkaConfig :: new_consumer_config (
34
- vec ! [ brokers. to_string( ) ] ,
35
- group_id. to_string ( ) ,
36
- "earliest" . to_string ( ) ,
37
- false ,
38
- None ,
39
- ) ;
40
- let mut consumer = KafkaConsumer :: new ( config) ;
40
+ let context = CustomContext { } ;
41
+
42
+ let consumer: LoggingConsumer = ClientConfig :: new ( )
43
+ . set ( "group.id" , group_id)
44
+ . set ( "bootstrap.servers" , brokers)
45
+ . set ( "enable.partition.eof" , "false" )
46
+ . set ( "session.timeout.ms" , "6000" )
47
+ . set ( "enable.auto.commit" , "false" )
48
+ //.set("statistics.interval.ms", "30000")
49
+ . set ( "auto.offset.reset" , "earliest" )
50
+ . set_log_level ( RDKafkaLogLevel :: Warning )
51
+ . create_with_context ( context)
52
+ . expect ( "Consumer creation failed" ) ;
53
+
54
+ consumer
55
+ . subscribe ( & [ source_topic] )
56
+ . expect ( "Can't subscribe to specified topics" ) ;
57
+
41
58
let topic = Topic {
42
59
name : source_topic. to_string ( ) ,
43
60
} ;
44
61
let topic_clone = topic. clone ( ) ;
45
- let _ = consumer. subscribe ( & [ topic] , Box :: new ( EmptyCallbacks { } ) ) ;
46
62
47
63
let producer: FutureProducer = ClientConfig :: new ( )
48
64
. set ( "bootstrap.servers" , brokers)
@@ -66,31 +82,28 @@ async fn consume_and_produce(
66
82
} ;
67
83
loop {
68
84
match timeout ( Duration :: from_secs ( 2 ) , consumer. recv ( ) ) . await {
69
- Ok ( result) => {
70
- match result {
71
- Ok ( message) => {
72
- match strategy. poll ( ) . await {
73
- Some ( request) => {
74
- consumer. stage_positions ( request. positions ) . await . unwrap ( ) ;
75
- consumer. commit_positions ( ) . await . unwrap ( ) ;
76
- //info!("Committed: {:?}", request);
77
- }
78
- None => { }
85
+ Ok ( result) => match result {
86
+ Err ( e) => panic ! ( "Kafka error: {}" , e) ,
87
+ Ok ( m) => {
88
+ match strategy. poll ( ) . await {
89
+ Some ( partition_list) => {
90
+ let part_list = build_topic_partitions ( partition_list) ;
91
+ consumer. commit ( & part_list, CommitMode :: Sync ) . unwrap ( ) ;
92
+ info ! ( "Committed: {:?}" , part_list) ;
79
93
}
80
- strategy. submit ( message) . await ;
81
- }
82
- Err ( e) => {
83
- panic ! ( "Kafka error: {}" , e)
94
+ None => { }
84
95
}
96
+
97
+ strategy. submit ( create_kafka_message ( m) ) . await ;
85
98
}
86
- }
99
+ } ,
87
100
Err ( _) => {
88
101
error ! ( "timeoout, flushing batch" ) ;
89
102
match strategy. poll ( ) . await {
90
- Some ( request ) => {
91
- consumer . stage_positions ( request . positions ) . await . unwrap ( ) ;
92
- consumer. commit_positions ( ) . await . unwrap ( ) ;
93
- // info!("Committed: {:?}", request );
103
+ Some ( partition_list ) => {
104
+ let part_list = build_topic_partitions ( partition_list ) ;
105
+ consumer. commit ( & part_list , CommitMode :: Sync ) . unwrap ( ) ;
106
+ info ! ( "Committed: {:?}" , part_list ) ;
94
107
}
95
108
None => { }
96
109
}
0 commit comments