Skip to content
19 changes: 10 additions & 9 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use clap::Args;
use rdkafka::{
config::ClientConfig,
consumer::{Consumer, StreamConsumer},
error::KafkaError,
};

pub type DigitizerId = u8;
Expand Down Expand Up @@ -71,21 +72,21 @@ pub fn create_default_consumer(
username: &Option<String>,
password: &Option<String>,
consumer_group: &String,
topics_to_subscribe: &[&str],
) -> StreamConsumer {
topics_to_subscribe: Option<&[&str]>,
) -> Result<StreamConsumer, KafkaError> {
// Setup consumer with arguments and default parameters.
let consumer: StreamConsumer = generate_kafka_client_config(broker_address, username, password)
.set("group.id", consumer_group)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.create()
.expect("kafka consumer should be created");
.create()?;

// Subscribe to topics.
consumer
.subscribe(topics_to_subscribe)
.expect("kafka topic should be subscribed");
// Subscribe to if topics are provided.
if let Some(topics_to_subscribe) = topics_to_subscribe {
// Note this fails if the topics list is empty
consumer.subscribe(topics_to_subscribe)?;
}

consumer
Ok(consumer)
}
4 changes: 2 additions & 2 deletions digitiser-aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ async fn main() -> anyhow::Result<()> {
&kafka_opts.username,
&kafka_opts.password,
&args.consumer_group,
&[args.input_topic.as_str()],
);
Some(&[args.input_topic.as_str()]),
)?;

let producer: FutureProducer = supermusr_common::generate_kafka_client_config(
&kafka_opts.broker,
Expand Down
3 changes: 3 additions & 0 deletions nexus-writer/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{hdf5_handlers::NexusHDF5Error, run_engine::NexusDateTime};
use glob::{GlobError, PatternError};
use rdkafka::error::KafkaError;
use std::{num::TryFromIntError, path::PathBuf};
use supermusr_streaming_types::time_conversions::GpsTimeConversionError;
use thiserror::Error;
Expand Down Expand Up @@ -42,6 +43,8 @@ pub(crate) enum NexusWriterError {
FlatBufferTimestampConversion(#[from] GpsTimeConversionError),
#[error("{0} at {1}")]
FlatBufferMissing(FlatBufferMissingError, ErrorCodeLocation),
#[error("Kafka Error: {0}")]
KafkaError(#[from] KafkaError),
#[error("Cannot convert local path to string: {path} at {location}")]
CannotConvertPath {
path: PathBuf,
Expand Down
90 changes: 90 additions & 0 deletions nexus-writer/src/kafka_topic_interface.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use rdkafka::{
consumer::{Consumer, StreamConsumer},
error::KafkaResult,
};

#[derive(PartialEq)]
pub(crate) enum TopicMode {
Full,
ConitinousOnly,
}

pub(crate) trait KafkaTopicInterface {
fn ensure_subscription_mode_is(&mut self, mode: TopicMode) -> KafkaResult<()>;
}

pub(super) struct Topics {
pub(super) control: String,
pub(super) log: String,
pub(super) frame_event: String,
pub(super) sample_env: String,
pub(super) alarm: String,
}

impl Topics {
fn topics_for_mode(&self, mode: TopicMode) -> Vec<&str> {
let mut list: Vec<&str> = match mode {
TopicMode::Full => vec![
&self.control,
&self.log,
&self.frame_event,
&self.sample_env,
&self.alarm,
],
TopicMode::ConitinousOnly => {
vec![&self.control, &self.log, &self.frame_event]
}
};
list.sort();
list.dedup();
list
}
}

pub(crate) struct TopicSubscriber<'a> {
mode: Option<TopicMode>,
consumer: &'a StreamConsumer,
full_list: Vec<&'a str>,
continous_only_list: Vec<&'a str>,
}

impl<'a> TopicSubscriber<'a> {
pub(crate) fn new(consumer: &'a StreamConsumer, topics: &'a Topics) -> Self {
Self {
mode: None,
consumer,
full_list: topics.topics_for_mode(TopicMode::Full),
continous_only_list: topics.topics_for_mode(TopicMode::ConitinousOnly),
}
}
}

impl KafkaTopicInterface for TopicSubscriber<'_> {
fn ensure_subscription_mode_is(&mut self, mode: TopicMode) -> KafkaResult<()> {
if self
.mode
.as_ref()
.is_none_or(|this_mode| this_mode.eq(&mode))
{
if self.mode.is_some() {
self.consumer.unsubscribe();
}
match mode {
TopicMode::Full => self.consumer.subscribe(&self.full_list)?,
TopicMode::ConitinousOnly => self.consumer.subscribe(&self.continous_only_list)?,
};
self.mode = Some(mode);
}
Ok(())
}
}

#[cfg(test)]
pub(crate) struct NoKafka;

#[cfg(test)]
impl KafkaTopicInterface for NoKafka {
fn ensure_subscription_mode_is(&mut self, _mode: TopicMode) -> KafkaResult<()> {
Ok(())
}
}
58 changes: 25 additions & 33 deletions nexus-writer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod error;
mod flush_to_archive;
mod hdf5_handlers;
mod kafka_topic_interface;
mod message_handlers;
mod nexus;
mod nexus_structure;
Expand All @@ -9,6 +10,7 @@ mod run_engine;
use chrono::Duration;
use clap::Parser;
use flush_to_archive::create_archive_flush_task;
use kafka_topic_interface::{KafkaTopicInterface, TopicMode, TopicSubscriber, Topics};
use message_handlers::{
process_payload_on_alarm_topic, process_payload_on_control_topic,
process_payload_on_frame_event_list_topic, process_payload_on_runlog_topic,
Expand All @@ -21,8 +23,8 @@ use rdkafka::{
consumer::{CommitMode, Consumer},
message::{BorrowedMessage, Message},
};
use run_engine::{NexusConfiguration, NexusEngine, NexusSettings};
use std::{fs::create_dir_all, net::SocketAddr, path::PathBuf};
use run_engine::{NexusConfiguration, NexusEngine, NexusEngineDependencies, NexusSettings};
use std::{fs::create_dir_all, marker::PhantomData, net::SocketAddr, path::PathBuf};
use supermusr_common::{
init_tracer,
metrics::{
Expand Down Expand Up @@ -113,30 +115,13 @@ struct Cli {
frame_list_chunk_size: usize,
}

struct Topics<'a> {
control: &'a str,
log: &'a str,
frame_event: &'a str,
sample_env: &'a str,
alarm: &'a str,
struct EngineDependencies<'a> {
phantom: PhantomData<&'a ()>,
}

impl Topics<'_> {
fn get_nonrepeating_list(&self) -> Vec<&str> {
let mut topics_to_subscribe = [
self.control,
self.log,
self.frame_event,
self.sample_env,
self.alarm,
]
.into_iter()
.collect::<Vec<&str>>();
debug!("{topics_to_subscribe:?}");
topics_to_subscribe.sort();
topics_to_subscribe.dedup();
topics_to_subscribe
}
impl<'a> NexusEngineDependencies for EngineDependencies<'a> {
type FileInterface = NexusFile;
type TopicInterface = TopicSubscriber<'a>;
}

#[tokio::main]
Expand All @@ -152,11 +137,11 @@ async fn main() -> anyhow::Result<()> {

// Get topics to subscribe to from command line arguments.
let topics = Topics {
control: args.control_topic.as_str(),
log: args.log_topic.as_str(),
frame_event: args.frame_event_topic.as_str(),
sample_env: args.sample_env_topic.as_str(),
alarm: args.alarm_topic.as_str(),
control: args.control_topic.clone(),
log: args.log_topic.clone(),
frame_event: args.frame_event_topic.clone(),
sample_env: args.sample_env_topic.clone(),
alarm: args.alarm_topic.clone(),
};

let kafka_opts = args.common_kafka_options;
Expand All @@ -166,8 +151,10 @@ async fn main() -> anyhow::Result<()> {
&kafka_opts.username,
&kafka_opts.password,
&args.consumer_group,
&topics.get_nonrepeating_list(),
);
None,
)?;
let mut topics_subscriber = TopicSubscriber::new(&consumer, &topics);
topics_subscriber.ensure_subscription_mode_is(TopicMode::Full)?;

let nexus_settings = NexusSettings::new(
args.local_path.as_path(),
Expand All @@ -191,7 +178,11 @@ async fn main() -> anyhow::Result<()> {

let nexus_configuration = NexusConfiguration::new(args.configuration_options);

let mut nexus_engine = NexusEngine::<NexusFile>::new(nexus_settings, nexus_configuration);
let mut nexus_engine = NexusEngine::<EngineDependencies>::new(
nexus_settings,
nexus_configuration,
topics_subscriber,
);
nexus_engine.resume_partial_runs()?;

// Install exporter and register metrics
Expand Down Expand Up @@ -235,6 +226,7 @@ async fn main() -> anyhow::Result<()> {
},
Ok(msg) => {
process_kafka_message(&topics, &mut nexus_engine, tracer.use_otel(), &msg);

if let Err(e) = consumer.commit_message(&msg, CommitMode::Async){
error!("Failed to commit Kafka message consumption: {e}");
}
Expand All @@ -259,7 +251,7 @@ async fn main() -> anyhow::Result<()> {
))]
fn process_kafka_message(
topics: &Topics,
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
use_otel: bool,
msg: &BorrowedMessage,
) {
Expand Down
26 changes: 13 additions & 13 deletions nexus-writer/src/message_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
run_engine::{run_messages::SampleEnvironmentLog, NexusEngine},
NexusFile,
EngineDependencies,
};
use metrics::counter;
use supermusr_common::{
Expand Down Expand Up @@ -30,7 +30,7 @@ use tracing::{instrument, warn, warn_span};

/// Processes the message payload for a message on the frame_event_list topic
pub(crate) fn process_payload_on_frame_event_list_topic(
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
message_kafka_timestamp_ms: i64,
payload: &[u8],
) {
Expand All @@ -43,7 +43,7 @@ pub(crate) fn process_payload_on_frame_event_list_topic(

/// Processes the message payload for a message on the sample_environment topic
pub(crate) fn process_payload_on_sample_env_topic(
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
message_kafka_timestamp_ms: i64,
payload: &[u8],
) {
Expand All @@ -58,7 +58,7 @@ pub(crate) fn process_payload_on_sample_env_topic(

/// Processes the message payload for a message on the run_log topic
pub(crate) fn process_payload_on_runlog_topic(
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
message_kafka_timestamp_ms: i64,
payload: &[u8],
) {
Expand All @@ -71,7 +71,7 @@ pub(crate) fn process_payload_on_runlog_topic(

/// Processes the message payload for a message on the alarm topic
pub(crate) fn process_payload_on_alarm_topic(
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
message_kafka_timestamp_ms: i64,
payload: &[u8],
) {
Expand All @@ -84,7 +84,7 @@ pub(crate) fn process_payload_on_alarm_topic(

/// Processes the message payload for a message on the control topic
pub(crate) fn process_payload_on_control_topic(
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
message_kafka_timestamp_ms: i64,
payload: &[u8],
) {
Expand Down Expand Up @@ -125,7 +125,7 @@ fn increment_message_received_counter(kind: MessageKind) {
/// Decode, validate and process a flatbuffer RunStart message
#[tracing::instrument(skip_all, fields(kafka_message_timestamp_ms=kafka_message_timestamp_ms))]
fn push_run_start(
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
kafka_message_timestamp_ms: i64,
payload: &[u8],
) {
Expand Down Expand Up @@ -157,7 +157,7 @@ fn push_run_start(
)
)]
fn push_frame_event_list(
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
kafka_message_timestamp_ms: i64,
payload: &[u8],
) {
Expand All @@ -182,7 +182,7 @@ fn push_frame_event_list(
/// Decode, validate and process a flatbuffer RunLog message
#[tracing::instrument(skip_all, fields(kafka_message_timestamp_ms=kafka_message_timestamp_ms, has_run))]
pub(crate) fn push_run_log(
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
kafka_message_timestamp_ms: i64,
payload: &[u8],
) {
Expand All @@ -201,7 +201,7 @@ pub(crate) fn push_run_log(
/// Decode, validate and process flatbuffer SampleEnvironmentLog messages
#[tracing::instrument(skip_all, fields(kafka_message_timestamp_ms=kafka_message_timestamp_ms, has_run))]
fn push_f144_sample_environment_log(
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
kafka_message_timestamp_ms: i64,
payload: &[u8],
) {
Expand All @@ -221,7 +221,7 @@ fn push_f144_sample_environment_log(
/// Decode, validate and process flatbuffer SampleEnvironmentLog messages
#[tracing::instrument(skip_all, fields(kafka_message_timestamp_ms=kafka_message_timestamp_ms, has_run))]
fn push_se00_sample_environment_log(
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
kafka_message_timestamp_ms: i64,
payload: &[u8],
) {
Expand All @@ -241,7 +241,7 @@ fn push_se00_sample_environment_log(
/// Decode, validate and process a flatbuffer Alarm message
#[tracing::instrument(skip_all, fields(kafka_message_timestamp_ms=kafka_message_timestamp_ms, has_run))]
fn push_alarm(
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
kafka_message_timestamp_ms: i64,
payload: &[u8],
) {
Expand All @@ -259,7 +259,7 @@ fn push_alarm(
/// Decode, validate and process a flatbuffer RunStop message
#[tracing::instrument(skip_all, fields(kafka_message_timestamp_ms=kafka_message_timestamp_ms, has_run))]
fn push_run_stop(
nexus_engine: &mut NexusEngine<NexusFile>,
nexus_engine: &mut NexusEngine<EngineDependencies>,
kafka_message_timestamp_ms: i64,
payload: &[u8],
) {
Expand Down
Loading