-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmessage_debug.rs
More file actions
68 lines (61 loc) · 2.36 KB
/
message_debug.rs
File metadata and controls
68 lines (61 loc) · 2.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use super::CommonOpts;
use rdkafka::{
Message,
consumer::{CommitMode, Consumer, StreamConsumer},
};
use supermusr_streaming_types::dat2_digitizer_analog_trace_v2_generated::{
digitizer_analog_trace_message_buffer_has_identifier, root_as_digitizer_analog_trace_message,
};
use tracing::{debug, error, info, warn};
// Message dumping tool
pub(crate) async fn run(args: CommonOpts) -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let kafka_opts = args.common_kafka_options;
let consumer: StreamConsumer = supermusr_common::generate_kafka_client_config(
&kafka_opts.broker,
&kafka_opts.username,
&kafka_opts.password,
)
.set("group.id", &args.consumer_group)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.create()?;
consumer.subscribe(&[&args.topic])?;
loop {
match consumer.recv().await {
Err(e) => warn!("Kafka error: {}", e),
Ok(msg) => {
debug!(
"key: '{:?}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
msg.key(),
msg.topic(),
msg.partition(),
msg.offset(),
msg.timestamp()
);
if let Some(payload) = msg.payload() {
if digitizer_analog_trace_message_buffer_has_identifier(payload) {
match root_as_digitizer_analog_trace_message(payload) {
Ok(data) => {
info!(
"Trace packet: dig. ID: {}, metadata: {:?}",
data.digitizer_id(),
data.metadata()
);
}
Err(e) => {
warn!("Failed to parse message: {}", e);
}
}
} else {
warn!("Unexpected message type on topic \"{}\"", msg.topic());
}
}
if let Err(e) = consumer.commit_message(&msg, CommitMode::Async) {
error!("Failed to commit message: {e}");
}
}
};
}
}