Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ members = [
[workspace.package]
version = "0.1.0"
license = "GPL-3.0-only"
edition = "2021"
edition = "2024"

[workspace.dependencies]
anyhow = "1.0.98"
Expand Down
2 changes: 1 addition & 1 deletion common/src/tracer/otel_tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::trace::Tracer;
use tracing::{level_filters::LevelFilter, warn};
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{filter::Filtered, registry::LookupSpan, EnvFilter, Layer};
use tracing_subscriber::{EnvFilter, Layer, filter::Filtered, registry::LookupSpan};

pub(super) struct OtelOptions<'a> {
pub(super) endpoint: &'a str,
Expand Down
2 changes: 1 addition & 1 deletion common/src/tracer/propagator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use rdkafka::{
message::{BorrowedHeaders, Headers, OwnedHeaders},
producer::FutureRecord,
};
use tracing::{debug, Span};
use tracing::{Span, debug};
use tracing_opentelemetry::OpenTelemetrySpanExt;

struct HeaderInjector<'a>(pub &'a mut OwnedHeaders);
Expand Down
2 changes: 1 addition & 1 deletion common/src/tracer/tracer_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::otel_tracer::{OtelOptions, OtelTracer};
use opentelemetry::{global::Error, trace::TraceError};
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer};
use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt};

pub struct TracerOptions<'a> {
otel_options: Option<OtelOptions<'a>>,
Expand Down
2 changes: 1 addition & 1 deletion diagnostics/src/daq_trace/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{data::DigitiserData, DigitiserDataHashMap};
use super::{DigitiserDataHashMap, data::DigitiserData};
use chrono::{DateTime, Timelike, Utc};
use ratatui::widgets::TableState;

Expand Down
12 changes: 6 additions & 6 deletions diagnostics/src/daq_trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,25 @@ use super::DaqTraceOpts;
use crossterm::event::{self, DisableMouseCapture, EnableMouseCapture, Event as CEvent, KeyCode};
use crossterm::execute;
use crossterm::terminal::{
disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen,
EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode,
};
use data::{DigitiserData, DigitiserDataHashMap};
use ratatui::{prelude::CrosstermBackend, Terminal};
use ratatui::{Terminal, prelude::CrosstermBackend};
use rdkafka::{
consumer::{stream_consumer::StreamConsumer, CommitMode, Consumer},
consumer::{CommitMode, Consumer, stream_consumer::StreamConsumer},
message::Message,
};
use std::collections::HashMap;
use std::{
io,
sync::{mpsc, Arc, Mutex},
sync::{Arc, Mutex, mpsc},
thread,
time::{Duration, Instant},
};
use supermusr_common::Intensity;
use supermusr_streaming_types::dat2_digitizer_analog_trace_v2_generated::{
digitizer_analog_trace_message_buffer_has_identifier, root_as_digitizer_analog_trace_message,
DigitizerAnalogTraceMessage,
DigitizerAnalogTraceMessage, digitizer_analog_trace_message_buffer_has_identifier,
root_as_digitizer_analog_trace_message,
};
use tokio::task;
use tokio::time::sleep;
Expand Down
2 changes: 1 addition & 1 deletion diagnostics/src/daq_trace/ui.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use super::{app::App, data::DigitiserData};
use ratatui::{
Frame,
prelude::{Alignment, Backend, Constraint, Direction, Layout, Rect},
style::{Color, Modifier, Style},
text::Text,
widgets::{Block, Borders, Cell, Paragraph, Row, Table},
Frame,
};

/// Draws the ui based on the current app state.
Expand Down
2 changes: 1 addition & 1 deletion diagnostics/src/message_debug.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::CommonOpts;
use rdkafka::{
consumer::{CommitMode, Consumer, StreamConsumer},
Message,
consumer::{CommitMode, Consumer, StreamConsumer},
};
use supermusr_streaming_types::dat2_digitizer_analog_trace_v2_generated::{
digitizer_analog_trace_message_buffer_has_identifier, root_as_digitizer_analog_trace_message,
Expand Down
8 changes: 4 additions & 4 deletions digitiser-aggregator/src/data/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::frame::AggregatedFrame;
use supermusr_common::{Channel, DigitizerId, Intensity, Time};
use supermusr_streaming_types::{
aev2_frame_assembled_event_v2_generated::{
finish_frame_assembled_event_list_message_buffer, FrameAssembledEventListMessage,
FrameAssembledEventListMessageArgs,
FrameAssembledEventListMessage, FrameAssembledEventListMessageArgs,
finish_frame_assembled_event_list_message_buffer,
},
dev2_digitizer_event_v2_generated::DigitizerEventListMessage,
flatbuffers::FlatBufferBuilder,
Expand Down Expand Up @@ -34,10 +34,10 @@ impl EventData {
events_per_channel: usize,
channels: &[Channel],
) -> Self {
let time = std::iter::repeat(
let time = std::iter::repeat_n(
&(time_offset..(time_offset + events_per_channel as Time)).collect::<Vec<Time>>(),
channels.len(),
)
.take(channels.len())
.flatten()
.copied()
.collect();
Expand Down
2 changes: 1 addition & 1 deletion digitiser-aggregator/src/frame/aggregated.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::partial::PartialFrame;
use crate::data::{Accumulate, DigitiserData};
use supermusr_common::{
spanned::{SpanOnce, Spanned, SpannedMut},
DigitizerId,
spanned::{SpanOnce, Spanned, SpannedMut},
};
use supermusr_streaming_types::FrameMetadata;

Expand Down
113 changes: 71 additions & 42 deletions digitiser-aggregator/src/frame/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
//! and handles incoming digitiser event list messages, appending them to, or creating new partial frames
//! as they are received.

use super::{partial::PartialFrame, AggregatedFrame, RejectMessageError};
use super::{AggregatedFrame, RejectMessageError, partial::PartialFrame};
use crate::data::{Accumulate, DigitiserData};
use chrono::{DateTime, Utc};
use std::{collections::VecDeque, fmt::Debug, time::Duration};
use supermusr_common::{record_metadata_fields_to_span, spanned::SpannedAggregator, DigitizerId};
use supermusr_common::{DigitizerId, record_metadata_fields_to_span, spanned::SpannedAggregator};
use supermusr_streaming_types::FrameMetadata;
use tracing::{info_span, warn};

Expand Down Expand Up @@ -60,7 +60,10 @@ where
) -> Result<(), RejectMessageError> {
if let Some(latest_timestamp_dispatched) = self.latest_timestamp_dispatched {
if metadata.timestamp <= latest_timestamp_dispatched {
warn!("Frame's timestamp earlier than or equal to the latest frame dispatched: {0} <= {1}", metadata.timestamp, latest_timestamp_dispatched);
warn!(
"Frame's timestamp earlier than or equal to the latest frame dispatched: {0} <= {1}",
metadata.timestamp, latest_timestamp_dispatched
);
return Err(RejectMessageError::TimestampTooEarly);
}
}
Expand Down Expand Up @@ -172,28 +175,36 @@ mod test {
assert!(cache.poll().is_none());

assert_eq!(cache.get_num_partial_frames(), 0);
assert!(cache
.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]))
.is_ok());
assert!(
cache
.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]))
.is_ok()
);
assert_eq!(cache.get_num_partial_frames(), 1);

assert!(cache.poll().is_none());

assert!(cache
.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5]))
.is_ok());
assert!(
cache
.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5]))
.is_ok()
);

assert!(cache.poll().is_none());

assert!(cache
.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8]))
.is_ok());
assert!(
cache
.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8]))
.is_ok()
);

assert!(cache.poll().is_none());

assert!(cache
.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11]))
.is_ok());
assert!(
cache
.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11]))
.is_ok()
);

{
let frame = cache.poll().unwrap();
Expand Down Expand Up @@ -241,21 +252,27 @@ mod test {

assert!(cache.poll().is_none());

assert!(cache
.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]))
.is_ok());
assert!(
cache
.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]))
.is_ok()
);

assert!(cache.poll().is_none());

assert!(cache
.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5]))
.is_ok());
assert!(
cache
.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5]))
.is_ok()
);

assert!(cache.poll().is_none());

assert!(cache
.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11]))
.is_ok());
assert!(
cache
.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11]))
.is_ok()
);

assert!(cache.poll().is_none());

Expand Down Expand Up @@ -301,24 +318,32 @@ mod test {
frame_number: 1728,
veto_flags: 4,
};
assert!(cache
.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]))
.is_ok());
assert!(cache
.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5]))
.is_ok());
assert!(cache
.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11]))
.is_ok());
assert!(
cache
.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]))
.is_ok()
);
assert!(
cache
.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5]))
.is_ok()
);
assert!(
cache
.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11]))
.is_ok()
);

tokio::time::sleep(Duration::from_millis(105)).await;

let _ = cache.poll().unwrap();

// This call to push should return an error
assert!(cache
.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8]))
.is_err());
assert!(
cache
.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8]))
.is_err()
);
}

#[test]
Expand Down Expand Up @@ -349,15 +374,19 @@ mod test {
assert_eq!(cache.frames.len(), 0);
assert!(cache.poll().is_none());

assert!(cache
.push(1, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]))
.is_ok());
assert!(
cache
.push(1, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]))
.is_ok()
);
assert_eq!(cache.frames.len(), 1);
assert!(cache.poll().is_none());

assert!(cache
.push(2, &frame_2, EventData::dummy_data(0, 5, &[0, 1, 2]))
.is_ok());
assert!(
cache
.push(2, &frame_2, EventData::dummy_data(0, 5, &[0, 1, 2]))
.is_ok()
);
assert_eq!(cache.frames.len(), 1);
assert!(cache.poll().is_some());
}
Expand Down
4 changes: 2 additions & 2 deletions digitiser-aggregator/src/frame/partial.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::data::DigitiserData;
use std::time::Duration;
use supermusr_common::{
spanned::{SpanOnce, SpanOnceError, Spanned, SpannedAggregator, SpannedMut},
DigitizerId,
spanned::{SpanOnce, SpanOnceError, Spanned, SpannedAggregator, SpannedMut},
};
use supermusr_streaming_types::FrameMetadata;
use tokio::time::Instant;
use tracing::{info_span, Span};
use tracing::{Span, info_span};

/// Holds the data of a frame, whislt it is in cache being built from digitiser messages.
pub(crate) struct PartialFrame<D> {
Expand Down
11 changes: 5 additions & 6 deletions digitiser-aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use rdkafka::{
};
use std::{fmt::Debug, net::SocketAddr, time::Duration};
use supermusr_common::{
init_tracer,
CommonKafkaOpts, DigitizerId, init_tracer,
metrics::{
failures::{self, FailureKind},
messages_received::{self, MessageKind},
Expand All @@ -23,19 +23,18 @@ use supermusr_common::{
record_metadata_fields_to_span,
spanned::Spanned,
tracer::{FutureRecordTracerExt, OptionalHeaderTracerExt, TracerEngine, TracerOptions},
CommonKafkaOpts, DigitizerId,
};
use supermusr_streaming_types::{
dev2_digitizer_event_v2_generated::{
digitizer_event_list_message_buffer_has_identifier, root_as_digitizer_event_list_message,
DigitizerEventListMessage,
DigitizerEventListMessage, digitizer_event_list_message_buffer_has_identifier,
root_as_digitizer_event_list_message,
},
flatbuffers::InvalidFlatbuffer,
};
use tokio::{
select,
signal::unix::{signal, Signal, SignalKind},
sync::mpsc::{error::SendError, Receiver, Sender},
signal::unix::{Signal, SignalKind, signal},
sync::mpsc::{Receiver, Sender, error::SendError},
task::JoinHandle,
};
use tracing::{debug, error, info, info_span, instrument, warn};
Expand Down
6 changes: 3 additions & 3 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
};

toolchain = fenix.packages.${system}.toolchainOf {
channel = "1.84";
date = "2025-01-09";
sha256 = "lMLAupxng4Fd9F1oDw8gx+qA0RuF7ou7xhNU8wgs0PU=";
channel = "1.87";
date = "2025-05-15";
sha256 = "KUm16pHj+cRedf8vxs/Hd2YWxpOrWZ7UOrwhILdSJBU=";
};

naersk' = pkgs.callPackage naersk {
Expand Down
4 changes: 2 additions & 2 deletions nexus-writer/src/flush_to_archive.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! Defines async function which moves completed NeXus files to remote storage.
use crate::{
error::{ErrorCodeLocation, NexusWriterError, NexusWriterResult},
NexusSettings,
error::{ErrorCodeLocation, NexusWriterError, NexusWriterResult},
};
use std::path::{Path, PathBuf};
use tokio::{
signal::unix::{signal, SignalKind},
signal::unix::{SignalKind, signal},
task::JoinHandle,
time::Interval,
};
Expand Down
4 changes: 2 additions & 2 deletions nexus-writer/src/hdf5_handlers/attribute.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! This module implements the traits to extend the hdf5 [Attribute] type to provide robust, conventient helper methods.
use super::{
error::{ConvertResult, NexusHDF5Result},
AttributeExt,
error::{ConvertResult, NexusHDF5Result},
};
use crate::run_engine::NexusDateTime;
use hdf5::{types::VarLenUnicode, Attribute};
use hdf5::{Attribute, types::VarLenUnicode};

impl AttributeExt for Attribute {
/// Maybe this should be a provided method?
Expand Down
Loading