Skip to content
16 changes: 9 additions & 7 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use clap::Args;
use rdkafka::{
config::ClientConfig,
consumer::{Consumer, StreamConsumer},
error::KafkaError,
};

pub type DigitizerId = u8;
Expand Down Expand Up @@ -71,8 +72,8 @@ pub fn create_default_consumer(
username: &Option<String>,
password: &Option<String>,
consumer_group: &String,
topics_to_subscribe: &[&str],
) -> StreamConsumer {
topics_to_subscribe: Option<&[&str]>,
) -> Result<StreamConsumer, KafkaError> {
// Setup consumer with arguments and default parameters.
let consumer: StreamConsumer = generate_kafka_client_config(broker_address, username, password)
.set("group.id", consumer_group)
Expand All @@ -82,10 +83,11 @@ pub fn create_default_consumer(
.create()
.expect("kafka consumer should be created");

// Subscribe to topics.
consumer
.subscribe(topics_to_subscribe)
.expect("kafka topic should be subscribed");
// Subscribe to if topics are provided.
if let Some(topics_to_subscribe) = topics_to_subscribe {
// Note this fails if the topics list is empty
consumer.subscribe(topics_to_subscribe)?;
}

consumer
Ok(consumer)
}
5 changes: 3 additions & 2 deletions digitiser-aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ async fn main() -> anyhow::Result<()> {
&kafka_opts.username,
&kafka_opts.password,
&args.consumer_group,
&[args.input_topic.as_str()],
);
Some(&[args.input_topic.as_str()]),
)
.expect("Topic list should be non-empty, this should never fail.");

let producer: FutureProducer = supermusr_common::generate_kafka_client_config(
&kafka_opts.broker,
Expand Down
3 changes: 3 additions & 0 deletions nexus-writer/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{hdf5_handlers::NexusHDF5Error, run_engine::NexusDateTime};
use glob::{GlobError, PatternError};
use rdkafka::error::KafkaError;
use std::{num::TryFromIntError, path::PathBuf};
use supermusr_streaming_types::time_conversions::GpsTimeConversionError;
use thiserror::Error;
Expand Down Expand Up @@ -42,6 +43,8 @@ pub(crate) enum NexusWriterError {
FlatBufferTimestampConversion(#[from] GpsTimeConversionError),
#[error("{0} at {1}")]
FlatBufferMissing(FlatBufferMissingError, ErrorCodeLocation),
#[error("Kafka Error: {0}")]
KafkaError(#[from] KafkaError),
#[error("Cannot convert local path to string: {path} at {location}")]
CannotConvertPath {
path: PathBuf,
Expand Down
97 changes: 97 additions & 0 deletions nexus-writer/src/kafka_topic_interface.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use rdkafka::{
consumer::{Consumer, StreamConsumer},
error::KafkaResult,
};
use tracing::debug;

pub(crate) trait KafkaTopicInterface {
fn ensure_subscription_mode_is(&mut self, mode: TopicMode) -> KafkaResult<()>;
}

pub(super) struct Topics {
pub(super) control: String,
pub(super) log: String,
pub(super) frame_event: String,
pub(super) sample_env: String,
pub(super) alarm: String,
}

fn get_nonrepeating_list(list: Vec<&str>) -> Vec<&str> {
let mut topics_to_subscribe = list.into_iter().collect::<Vec<&str>>();
debug!("{topics_to_subscribe:?}");
topics_to_subscribe.sort();
topics_to_subscribe.dedup();
topics_to_subscribe
}

impl Topics {
fn get_full_nonrepeating_list(&self) -> Vec<&str> {
get_nonrepeating_list(vec![
&self.control,
&self.log,
&self.frame_event,
&self.sample_env,
&self.alarm,
])
}

fn get_continuous_only_nonrepeating_list(&self) -> Vec<&str> {
get_nonrepeating_list(vec![&self.control, &self.log, &self.frame_event])
}
}

#[derive(PartialEq)]
pub(crate) enum TopicMode {
Full,
ConitinousOnly,
}

pub(crate) struct TopicSubscriber<'a> {
mode: Option<TopicMode>,
consumer: &'a StreamConsumer,
full_list: Vec<&'a str>,
continous_only_list: Vec<&'a str>,
}

impl<'a> TopicSubscriber<'a> {
pub(crate) fn new(consumer: &'a StreamConsumer, topics: &'a Topics) -> Self {
let full_list = topics.get_full_nonrepeating_list();
let continous_only_list = topics.get_continuous_only_nonrepeating_list();
Self {
mode: None,
consumer,
full_list,
continous_only_list,
}
}
}

impl KafkaTopicInterface for TopicSubscriber<'_> {
fn ensure_subscription_mode_is(&mut self, mode: TopicMode) -> KafkaResult<()> {
if self
.mode
.as_ref()
.is_none_or(|this_mode| this_mode.eq(&mode))
{
if self.mode.is_some() {
self.consumer.unsubscribe();
}
match mode {
TopicMode::Full => self.consumer.subscribe(&self.full_list)?,
TopicMode::ConitinousOnly => self.consumer.subscribe(&self.continous_only_list)?,
};
self.mode = Some(mode);
}
Ok(())
}
}

#[cfg(test)]
pub(crate) struct NoKafka;

#[cfg(test)]
impl KafkaTopicInterface for NoKafka {
fn ensure_subscription_mode_is(&mut self, _mode: TopicMode) -> KafkaResult<()> {
Ok(())
}
}
59 changes: 26 additions & 33 deletions nexus-writer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod error;
mod flush_to_archive;
mod hdf5_handlers;
mod kafka_topic_interface;
mod message_handlers;
mod nexus;
mod nexus_structure;
Expand All @@ -9,6 +10,7 @@ mod run_engine;
use chrono::Duration;
use clap::Parser;
use flush_to_archive::create_archive_flush_task;
use kafka_topic_interface::{KafkaTopicInterface, TopicMode, TopicSubscriber, Topics};
use message_handlers::{
process_payload_on_alarm_topic, process_payload_on_control_topic,
process_payload_on_frame_event_list_topic, process_payload_on_runlog_topic,
Expand All @@ -21,8 +23,8 @@ use rdkafka::{
consumer::{CommitMode, Consumer},
message::{BorrowedMessage, Message},
};
use run_engine::{NexusConfiguration, NexusEngine, NexusSettings};
use std::{fs::create_dir_all, net::SocketAddr, path::PathBuf};
use run_engine::{NexusConfiguration, NexusEngine, NexusEngineDependencies, NexusSettings};
use std::{fs::create_dir_all, marker::PhantomData, net::SocketAddr, path::PathBuf};
use supermusr_common::{
init_tracer,
metrics::{
Expand Down Expand Up @@ -113,30 +115,13 @@ struct Cli {
frame_list_chunk_size: usize,
}

struct Topics<'a> {
control: &'a str,
log: &'a str,
frame_event: &'a str,
sample_env: &'a str,
alarm: &'a str,
struct EngineDependencies<'a> {
phantom: PhantomData<&'a ()>,
}

impl Topics<'_> {
fn get_nonrepeating_list(&self) -> Vec<&str> {
let mut topics_to_subscribe = [
self.control,
self.log,
self.frame_event,
self.sample_env,
self.alarm,
]
.into_iter()
.collect::<Vec<&str>>();
debug!("{topics_to_subscribe:?}");
topics_to_subscribe.sort();
topics_to_subscribe.dedup();
topics_to_subscribe
}
impl<'a> NexusEngineDependencies for EngineDependencies<'a> {
type FileInterface = NexusFile;
type TopicInterface = TopicSubscriber<'a>;
}

#[tokio::main]
Expand All @@ -152,11 +137,11 @@ async fn main() -> anyhow::Result<()> {

// Get topics to subscribe to from command line arguments.
let topics = Topics {
control: args.control_topic.as_str(),
log: args.log_topic.as_str(),
frame_event: args.frame_event_topic.as_str(),
sample_env: args.sample_env_topic.as_str(),
alarm: args.alarm_topic.as_str(),
control: args.control_topic.clone(),
log: args.log_topic.clone(),
frame_event: args.frame_event_topic.clone(),
sample_env: args.sample_env_topic.clone(),
alarm: args.alarm_topic.clone(),
};

let kafka_opts = args.common_kafka_options;
Expand All @@ -166,8 +151,11 @@ async fn main() -> anyhow::Result<()> {
&kafka_opts.username,
&kafka_opts.password,
&args.consumer_group,
&topics.get_nonrepeating_list(),
);
None,
)
.expect("Error not possible, this should never fail.");
let mut topics_subscriber = TopicSubscriber::new(&consumer, &topics);
topics_subscriber.ensure_subscription_mode_is(TopicMode::Full)?;

let nexus_settings = NexusSettings::new(
args.local_path.as_path(),
Expand All @@ -191,7 +179,11 @@ async fn main() -> anyhow::Result<()> {

let nexus_configuration = NexusConfiguration::new(args.configuration_options);

let mut nexus_engine = NexusEngine::<NexusFile>::new(nexus_settings, nexus_configuration);
let mut nexus_engine = NexusEngine::<EngineDependencies>::new(
nexus_settings,
nexus_configuration,
topics_subscriber,
);
nexus_engine.resume_partial_runs()?;

// Install exporter and register metrics
Expand Down Expand Up @@ -235,6 +227,7 @@ async fn main() -> anyhow::Result<()> {
},
Ok(msg) => {
process_kafka_message(&topics, &mut nexus_engine, tracer.use_otel(), &msg);

if let Err(e) = consumer.commit_message(&msg, CommitMode::Async){
error!("Failed to commit Kafka message consumption: {e}");
}
Expand All @@ -259,7 +252,7 @@ async fn main() -> anyhow::Result<()> {
))]
fn process_kafka_message(
topics: &Topics,
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
use_otel: bool,
msg: &BorrowedMessage,
) {
Expand Down
Loading
Loading