Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nexus-writer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ ndarray.workspace = true
rdkafka.workspace = true
supermusr-common.workspace = true
supermusr-streaming-types.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true

Expand Down
75 changes: 47 additions & 28 deletions nexus-writer/src/nexus/engine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use super::{Run, RunParameters};
use chrono::{DateTime, Duration, Utc};
use super::{
error::{ErrorCodeLocation, FlatBufferMissingError, NexusWriterError, NexusWriterResult},
NexusDateTime, Run, RunParameters,
};
use chrono::Duration;
use glob::glob;
#[cfg(test)]
use std::collections::vec_deque;
Expand Down Expand Up @@ -43,10 +46,13 @@ impl NexusEngine {
}
}

pub(crate) fn resume_partial_runs(&mut self) -> anyhow::Result<()> {
pub(crate) fn resume_partial_runs(&mut self) -> NexusWriterResult<()> {
if let Some(local_path) = &self.local_path {
let local_path_str = local_path.as_os_str().to_str().ok_or_else(|| {
anyhow::anyhow!("Cannot convert local path to string: {0:?}", local_path)
NexusWriterError::CannotConvertPath {
path: local_path.clone(),
location: ErrorCodeLocation::ResumePartialRunsLocalDirectoryPath,
}
})?;

for filename in glob(&format!("{local_path_str}/*.nxs"))? {
Expand All @@ -55,15 +61,18 @@ impl NexusEngine {
filename
.file_stem()
.and_then(OsStr::to_str)
.ok_or_else(|| {
anyhow::anyhow!("Cannot convert filename to string: {0:?}", filename)
.ok_or_else(|| NexusWriterError::CannotConvertPath {
path: filename.clone(),
location: ErrorCodeLocation::ResumePartialRunsFilePath,
})?;
let mut run = info_span!(
"Partial Run Found",
path = local_path_str,
file_name = filename_str
)
.in_scope(|| Run::resume_partial_run(local_path, filename_str))?;
.in_scope(|| {
Run::resume_partial_run(local_path, filename_str, &self.nexus_settings)
})?;
if let Err(e) = run.span_init() {
warn!("Run span initiation failed {e}")
}
Expand All @@ -86,8 +95,8 @@ impl NexusEngine {
pub(crate) fn sample_envionment(
&mut self,
data: se00_SampleEnvironmentData<'_>,
) -> anyhow::Result<Option<&Run>> {
let timestamp = DateTime::<Utc>::from_timestamp_nanos(data.packet_timestamp());
) -> NexusWriterResult<Option<&Run>> {
let timestamp = NexusDateTime::from_timestamp_nanos(data.packet_timestamp());
if let Some(run) = self
.run_cache
.iter_mut()
Expand All @@ -102,8 +111,8 @@ impl NexusEngine {
}

#[tracing::instrument(skip_all)]
pub(crate) fn logdata(&mut self, data: &f144_LogData<'_>) -> anyhow::Result<Option<&Run>> {
let timestamp = DateTime::<Utc>::from_timestamp_nanos(data.timestamp());
pub(crate) fn logdata(&mut self, data: &f144_LogData<'_>) -> NexusWriterResult<Option<&Run>> {
let timestamp = NexusDateTime::from_timestamp_nanos(data.timestamp());
if let Some(run) = self
.run_cache
.iter_mut()
Expand All @@ -118,14 +127,14 @@ impl NexusEngine {
}

#[tracing::instrument(skip_all)]
pub(crate) fn alarm(&mut self, data: Alarm<'_>) -> anyhow::Result<Option<&Run>> {
let timestamp = DateTime::<Utc>::from_timestamp_nanos(data.timestamp());
pub(crate) fn alarm(&mut self, data: Alarm<'_>) -> NexusWriterResult<Option<&Run>> {
let timestamp = NexusDateTime::from_timestamp_nanos(data.timestamp());
if let Some(run) = self
.run_cache
.iter_mut()
.find(|run| run.is_message_timestamp_valid(&timestamp))
{
run.push_alarm_to_run(self.local_path.as_deref(), data)?;
run.push_alarm_to_run(self.local_path.as_deref(), data, &self.nexus_settings)?;
Ok(Some(run))
} else {
warn!("No run found for alarm message with timestamp: {timestamp}");
Expand All @@ -134,7 +143,7 @@ impl NexusEngine {
}

#[tracing::instrument(skip_all)]
pub(crate) fn start_command(&mut self, data: RunStart<'_>) -> anyhow::Result<&mut Run> {
pub(crate) fn start_command(&mut self, data: RunStart<'_>) -> NexusWriterResult<&mut Run> {
// If a run is already in progress, and is missing a run-stop
// then call an abort run on the current run.
if self.run_cache.back().is_some_and(|run| !run.has_run_stop()) {
Expand All @@ -161,7 +170,7 @@ impl NexusEngine {
start_time = data.start_time(),
)
)]
fn abort_back_run(&mut self, data: &RunStart<'_>) -> anyhow::Result<()> {
fn abort_back_run(&mut self, data: &RunStart<'_>) -> NexusWriterResult<()> {
self.run_cache
.back_mut()
.expect("run_cache::back_mut should exist")
Expand All @@ -174,13 +183,15 @@ impl NexusEngine {
}

#[tracing::instrument(skip_all)]
pub(crate) fn stop_command(&mut self, data: RunStop<'_>) -> anyhow::Result<&Run> {
pub(crate) fn stop_command(&mut self, data: RunStop<'_>) -> NexusWriterResult<&Run> {
if let Some(last_run) = self.run_cache.back_mut() {
last_run.set_stop_if_valid(self.local_path.as_deref(), data)?;

Ok(last_run)
} else {
Err(anyhow::anyhow!("Unexpected RunStop Command"))
Err(NexusWriterError::UnexpectedRunStop(
ErrorCodeLocation::StopCommand,
))
}
}

Expand All @@ -198,19 +209,27 @@ impl NexusEngine {
pub(crate) fn process_event_list(
&mut self,
message: &FrameAssembledEventListMessage<'_>,
) -> anyhow::Result<Option<&Run>> {
let timestamp: DateTime<Utc> = (*message
.metadata()
.timestamp()
.ok_or(anyhow::anyhow!("Message timestamp missing."))?)
.try_into()?;
) -> NexusWriterResult<Option<&Run>> {
let timestamp: NexusDateTime =
(*message
.metadata()
.timestamp()
.ok_or(NexusWriterError::FlatBufferMissing(
FlatBufferMissingError::Timestamp,
ErrorCodeLocation::ProcessEventList,
))?)
.try_into()?;

let run: Option<&Run> = if let Some(run) = self
.run_cache
.iter_mut()
.find(|run| run.is_message_timestamp_valid(&timestamp))
{
run.push_message(self.local_path.as_deref(), message, &self.nexus_settings)?;
run.push_frame_eventlist_message(
self.local_path.as_deref(),
message,
&self.nexus_settings,
)?;
Some(run)
} else {
warn!("No run found for message with timestamp: {timestamp}");
Expand Down Expand Up @@ -289,7 +308,7 @@ mod test {
fbb: &'b mut FlatBufferBuilder,
name: &str,
start_time: u64,
) -> anyhow::Result<RunStart<'a>, InvalidFlatbuffer> {
) -> Result<RunStart<'a>, InvalidFlatbuffer> {
let args = RunStartArgs {
start_time,
run_name: Some(fbb.create_string(name)),
Expand All @@ -305,7 +324,7 @@ mod test {
fbb: &'b mut FlatBufferBuilder,
name: &str,
stop_time: u64,
) -> anyhow::Result<RunStop<'a>, InvalidFlatbuffer> {
) -> Result<RunStop<'a>, InvalidFlatbuffer> {
let args = RunStopArgs {
stop_time,
run_name: Some(fbb.create_string(name)),
Expand All @@ -330,7 +349,7 @@ mod test {
fn create_frame_assembled_message<'a, 'b: 'a>(
fbb: &'b mut FlatBufferBuilder,
timestamp: &GpsTime,
) -> anyhow::Result<FrameAssembledEventListMessage<'a>, InvalidFlatbuffer> {
) -> Result<FrameAssembledEventListMessage<'a>, InvalidFlatbuffer> {
let metadata = FrameMetadataV2::create(fbb, &create_metadata(timestamp));
let args = FrameAssembledEventListMessageArgs {
metadata: Some(metadata),
Expand Down
79 changes: 79 additions & 0 deletions nexus-writer/src/nexus/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use super::{hdf5_file::NexusHDF5Error, NexusDateTime};
use glob::{GlobError, PatternError};
use std::{num::TryFromIntError, path::PathBuf};
use supermusr_streaming_types::time_conversions::GpsTimeConversionError;
use thiserror::Error;

pub(crate) type NexusWriterResult<T> = Result<T, NexusWriterError>;

#[derive(Debug, Error)]
pub(crate) enum ErrorCodeLocation {
#[error("set_stop_if_valid")]
SetStopIfValid,
#[error("set_aborted_run")]
SetAbortedRun,
#[error("RunParameters::new")]
NewRunParamemters,
#[error("stop_command")]
StopCommand,
#[error("process_event_list")]
ProcessEventList,
#[error("resume_partial_runs local directory path")]
ResumePartialRunsLocalDirectoryPath,
#[error("resume_partial_runs file path")]
ResumePartialRunsFilePath,
}

#[derive(Debug, Error)]
pub(crate) enum NexusWriterError {
#[error("{0}")]
HDF5(#[from] NexusHDF5Error),
#[error("Flatbuffer Timestamp Conversion Error {0}")]
FlatBufferTimestampConversion(#[from] GpsTimeConversionError),
#[error("{0}")]
FlatBufferMissing(FlatBufferMissingError, ErrorCodeLocation),
#[error("Unexpected RunStop Command")]
UnexpectedRunStop(ErrorCodeLocation),
#[error("Cannot convert local path to string: {path}")]
CannotConvertPath {
path: PathBuf,
location: ErrorCodeLocation,
},
#[error("Glob Pattern Error: {0}")]
GlobPattern(#[from] PatternError),
#[error("Glob Error: {0}")]
Glob(#[from] GlobError),
#[error("Integer Conversion Error")]
TryFromInt(#[from] TryFromIntError),
#[error("Start Time {int} Out of Range for DateTime at {location}")]
IntOutOfRangeForDateTime {
int: u64,
location: ErrorCodeLocation,
},
#[error("Stop Command before Start Command at {0}")]
StopCommandBeforeStartCommand(ErrorCodeLocation),
#[error("Stop Time {stop} earlier than current Start Time {start} at {location}")]
StopTimeEarlierThanStartTime {
start: NexusDateTime,
stop: NexusDateTime,
location: ErrorCodeLocation,
},
#[error("RunStop already set at {0}")]
RunStopAlreadySet(ErrorCodeLocation),
}

#[derive(Debug, Error)]
pub(crate) enum FlatBufferMissingError {
#[error("Flatbuffer Timestamp Missing")]
Timestamp,
#[error("Flatbuffer Channels Missing")]
Channels,
#[error("Flatbuffer Intensities Missing")]
Intensities,
#[error("Flatbuffer Times Missing")]
Times,
#[error("Flatbuffer Run Name Missing")]
RunName,
#[error("Flatbuffer Instrument Name Missing")]
InstrumentName,
}
85 changes: 85 additions & 0 deletions nexus-writer/src/nexus/hdf5_file/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use crate::nexus::error::FlatBufferMissingError;
use hdf5::{types::TypeDescriptor, Attribute, Dataset, Group};
use std::error::Error;
use supermusr_streaming_types::time_conversions::GpsTimeConversionError;
use thiserror::Error;

pub(crate) type NexusHDF5Result<T> = Result<T, NexusHDF5Error>;

pub(crate) trait ConvertResult<T, E>
where
E: Error + Into<NexusHDF5ErrorType>,
{
fn err_group(self, group: &Group) -> NexusHDF5Result<T>;
fn err_dataset(self, dataset: &Dataset) -> NexusHDF5Result<T>;
fn err_attribute(self, attribute: &Attribute) -> NexusHDF5Result<T>;
fn err_file(self) -> NexusHDF5Result<T>;
}

#[derive(Debug, Error)]
#[error("{error_type} at {context}")]
pub(crate) struct NexusHDF5Error {
error_type: NexusHDF5ErrorType,
context: String,
}

impl<T, E> ConvertResult<T, E> for Result<T, E>
where
E: Error + Into<NexusHDF5ErrorType>,
{
fn err_group(self, group: &Group) -> NexusHDF5Result<T> {
self.map_err(|e| NexusHDF5Error {
error_type: e.into(),
context: group.name(),
})
}

fn err_dataset(self, dataset: &Dataset) -> NexusHDF5Result<T> {
self.map_err(|e| NexusHDF5Error {
error_type: e.into(),
context: dataset.name(),
})
}

fn err_attribute(self, attribute: &Attribute) -> NexusHDF5Result<T> {
self.map_err(|e| NexusHDF5Error {
error_type: e.into(),
context: attribute.name(),
})
}

fn err_file(self) -> NexusHDF5Result<T> {
self.map_err(|e| NexusHDF5Error {
error_type: e.into(),
context: "File Level".to_owned(),
})
}
}

#[derive(Debug, Error)]
pub(crate) enum NexusHDF5ErrorType {
#[error("{0}")]
HDF5(#[from] hdf5::Error),
#[error("{0}")]
DateTimeConversion(#[from] chrono::ParseError),
#[error("{0}")]
HDF5String(#[from] hdf5::types::StringError),
#[error("Flatbuffer Timestamp Conversion Error {0}")]
FlatBufferTimestampConversion(#[from] GpsTimeConversionError),
#[error("Flatbuffer Timestamp Error Converting to Nanoseconds")]
FlatBufferTimestampConvertToNanoseconds,
#[error("{0}")]
FlatBufferMissing(FlatBufferMissingError),
#[error("Invalid FlatBuffer RunLog Data Type {0}")]
FlatBufferInvalidRunLogDataType(String),
#[error("Invalid FlatBuffer Sample Environment Log Data Type {0}")]
FlatBufferInvalidSELogDataType(String),
#[error("Inconsistent Numbers of SELog Times and Values {0} != {1}")]
FlatBufferInconsistentSELogTimeValueSizes(usize, usize),
#[error("Invalid HDF5 Type {0}")]
InvalidHDF5Type(TypeDescriptor),
#[error("Invalid HDF5 Conversion {0}")]
InvalidHDF5TypeConversion(TypeDescriptor),
#[error("IO Error {0}")]
IO(#[from] std::io::Error),
}
Loading