diff --git a/Cargo.lock b/Cargo.lock index 1017aada7..5f318db8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1984,6 +1984,7 @@ dependencies = [ "rdkafka", "supermusr-common", "supermusr-streaming-types", + "thiserror 2.0.11", "tokio", "tracing", ] diff --git a/nexus-writer/Cargo.toml b/nexus-writer/Cargo.toml index 775e99618..6431a57df 100644 --- a/nexus-writer/Cargo.toml +++ b/nexus-writer/Cargo.toml @@ -16,6 +16,7 @@ ndarray.workspace = true rdkafka.workspace = true supermusr-common.workspace = true supermusr-streaming-types.workspace = true +thiserror.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/nexus-writer/src/nexus/engine.rs b/nexus-writer/src/nexus/engine.rs index 2bdde3c81..640931793 100644 --- a/nexus-writer/src/nexus/engine.rs +++ b/nexus-writer/src/nexus/engine.rs @@ -1,5 +1,8 @@ -use super::{Run, RunParameters}; -use chrono::{DateTime, Duration, Utc}; +use super::{ + error::{ErrorCodeLocation, FlatBufferMissingError, NexusWriterError, NexusWriterResult}, + NexusDateTime, Run, RunParameters, +}; +use chrono::Duration; use glob::glob; #[cfg(test)] use std::collections::vec_deque; @@ -43,10 +46,13 @@ impl NexusEngine { } } - pub(crate) fn resume_partial_runs(&mut self) -> anyhow::Result<()> { + pub(crate) fn resume_partial_runs(&mut self) -> NexusWriterResult<()> { if let Some(local_path) = &self.local_path { let local_path_str = local_path.as_os_str().to_str().ok_or_else(|| { - anyhow::anyhow!("Cannot convert local path to string: {0:?}", local_path) + NexusWriterError::CannotConvertPath { + path: local_path.clone(), + location: ErrorCodeLocation::ResumePartialRunsLocalDirectoryPath, + } })?; for filename in glob(&format!("{local_path_str}/*.nxs"))? { @@ -55,15 +61,18 @@ impl NexusEngine { filename .file_stem() .and_then(OsStr::to_str) - .ok_or_else(|| { - anyhow::anyhow!("Cannot convert filename to string: {0:?}", filename) + .ok_or_else(|| NexusWriterError::CannotConvertPath { + path: filename.clone(), + location: ErrorCodeLocation::ResumePartialRunsFilePath, })?; let mut run = info_span!( "Partial Run Found", path = local_path_str, file_name = filename_str ) - .in_scope(|| Run::resume_partial_run(local_path, filename_str))?; + .in_scope(|| { + Run::resume_partial_run(local_path, filename_str, &self.nexus_settings) + })?; if let Err(e) = run.span_init() { warn!("Run span initiation failed {e}") } @@ -86,8 +95,8 @@ impl NexusEngine { pub(crate) fn sample_envionment( &mut self, data: se00_SampleEnvironmentData<'_>, - ) -> anyhow::Result> { - let timestamp = DateTime::::from_timestamp_nanos(data.packet_timestamp()); + ) -> NexusWriterResult> { + let timestamp = NexusDateTime::from_timestamp_nanos(data.packet_timestamp()); if let Some(run) = self .run_cache .iter_mut() @@ -102,8 +111,8 @@ impl NexusEngine { } #[tracing::instrument(skip_all)] - pub(crate) fn logdata(&mut self, data: &f144_LogData<'_>) -> anyhow::Result> { - let timestamp = DateTime::::from_timestamp_nanos(data.timestamp()); + pub(crate) fn logdata(&mut self, data: &f144_LogData<'_>) -> NexusWriterResult> { + let timestamp = NexusDateTime::from_timestamp_nanos(data.timestamp()); if let Some(run) = self .run_cache .iter_mut() @@ -118,14 +127,14 @@ impl NexusEngine { } #[tracing::instrument(skip_all)] - pub(crate) fn alarm(&mut self, data: Alarm<'_>) -> anyhow::Result> { - let timestamp = DateTime::::from_timestamp_nanos(data.timestamp()); + pub(crate) fn alarm(&mut self, data: Alarm<'_>) -> NexusWriterResult> { + let timestamp = NexusDateTime::from_timestamp_nanos(data.timestamp()); if let Some(run) = self .run_cache .iter_mut() .find(|run| run.is_message_timestamp_valid(×tamp)) { - run.push_alarm_to_run(self.local_path.as_deref(), data)?; + run.push_alarm_to_run(self.local_path.as_deref(), data, &self.nexus_settings)?; Ok(Some(run)) } else { warn!("No run found for alarm message with timestamp: {timestamp}"); @@ -134,7 +143,7 @@ impl NexusEngine { } #[tracing::instrument(skip_all)] - pub(crate) fn start_command(&mut self, data: RunStart<'_>) -> anyhow::Result<&mut Run> { + pub(crate) fn start_command(&mut self, data: RunStart<'_>) -> NexusWriterResult<&mut Run> { // If a run is already in progress, and is missing a run-stop // then call an abort run on the current run. if self.run_cache.back().is_some_and(|run| !run.has_run_stop()) { @@ -161,7 +170,7 @@ impl NexusEngine { start_time = data.start_time(), ) )] - fn abort_back_run(&mut self, data: &RunStart<'_>) -> anyhow::Result<()> { + fn abort_back_run(&mut self, data: &RunStart<'_>) -> NexusWriterResult<()> { self.run_cache .back_mut() .expect("run_cache::back_mut should exist") @@ -174,13 +183,15 @@ impl NexusEngine { } #[tracing::instrument(skip_all)] - pub(crate) fn stop_command(&mut self, data: RunStop<'_>) -> anyhow::Result<&Run> { + pub(crate) fn stop_command(&mut self, data: RunStop<'_>) -> NexusWriterResult<&Run> { if let Some(last_run) = self.run_cache.back_mut() { last_run.set_stop_if_valid(self.local_path.as_deref(), data)?; Ok(last_run) } else { - Err(anyhow::anyhow!("Unexpected RunStop Command")) + Err(NexusWriterError::UnexpectedRunStop( + ErrorCodeLocation::StopCommand, + )) } } @@ -197,19 +208,27 @@ impl NexusEngine { pub(crate) fn process_event_list( &mut self, message: &FrameAssembledEventListMessage<'_>, - ) -> anyhow::Result> { - let timestamp: DateTime = (*message - .metadata() - .timestamp() - .ok_or(anyhow::anyhow!("Message timestamp missing."))?) - .try_into()?; + ) -> NexusWriterResult> { + let timestamp: NexusDateTime = + (*message + .metadata() + .timestamp() + .ok_or(NexusWriterError::FlatBufferMissing( + FlatBufferMissingError::Timestamp, + ErrorCodeLocation::ProcessEventList, + ))?) + .try_into()?; let run: Option<&Run> = if let Some(run) = self .run_cache .iter_mut() .find(|run| run.is_message_timestamp_valid(×tamp)) { - run.push_message(self.local_path.as_deref(), message, &self.nexus_settings)?; + run.push_frame_eventlist_message( + self.local_path.as_deref(), + message, + &self.nexus_settings, + )?; Some(run) } else { warn!("No run found for message with timestamp: {timestamp}"); @@ -288,7 +307,7 @@ mod test { fbb: &'b mut FlatBufferBuilder, name: &str, start_time: u64, - ) -> anyhow::Result, InvalidFlatbuffer> { + ) -> Result, InvalidFlatbuffer> { let args = RunStartArgs { start_time, run_name: Some(fbb.create_string(name)), @@ -304,7 +323,7 @@ mod test { fbb: &'b mut FlatBufferBuilder, name: &str, stop_time: u64, - ) -> anyhow::Result, InvalidFlatbuffer> { + ) -> Result, InvalidFlatbuffer> { let args = RunStopArgs { stop_time, run_name: Some(fbb.create_string(name)), @@ -329,7 +348,7 @@ mod test { fn create_frame_assembled_message<'a, 'b: 'a>( fbb: &'b mut FlatBufferBuilder, timestamp: &GpsTime, - ) -> anyhow::Result, InvalidFlatbuffer> { + ) -> Result, InvalidFlatbuffer> { let metadata = FrameMetadataV2::create(fbb, &create_metadata(timestamp)); let args = FrameAssembledEventListMessageArgs { metadata: Some(metadata), diff --git a/nexus-writer/src/nexus/error.rs b/nexus-writer/src/nexus/error.rs new file mode 100644 index 000000000..c0e427d09 --- /dev/null +++ b/nexus-writer/src/nexus/error.rs @@ -0,0 +1,87 @@ +use super::{hdf5_file::NexusHDF5Error, NexusDateTime}; +use glob::{GlobError, PatternError}; +use std::{num::TryFromIntError, path::PathBuf}; +use supermusr_streaming_types::time_conversions::GpsTimeConversionError; +use thiserror::Error; + +pub(crate) type NexusWriterResult = Result; + +#[derive(Debug, Error)] +pub(crate) enum ErrorCodeLocation { + #[error("set_stop_if_valid")] + SetStopIfValid, + #[error("set_aborted_run")] + SetAbortedRun, + #[error("RunParameters::new")] + NewRunParamemters, + #[error("stop_command")] + StopCommand, + #[error("process_event_list")] + ProcessEventList, + #[error("resume_partial_runs local directory path")] + ResumePartialRunsLocalDirectoryPath, + #[error("resume_partial_runs file path")] + ResumePartialRunsFilePath, +} + +#[derive(Debug, Error)] +pub(crate) enum NexusWriterError { + #[error("{0}")] + HDF5(#[from] NexusHDF5Error), + #[error("Flatbuffer Timestamp Conversion Error {0}")] + FlatBufferTimestampConversion(#[from] GpsTimeConversionError), + #[error("{0}")] + FlatBufferMissing(FlatBufferMissingError, ErrorCodeLocation), + #[error("Unexpected RunStop Command")] + UnexpectedRunStop(ErrorCodeLocation), + #[error("Cannot convert local path to string: {path}")] + CannotConvertPath { + path: PathBuf, + location: ErrorCodeLocation, + }, + #[error("Glob Pattern Error: {0}")] + GlobPattern(#[from] PatternError), + #[error("Glob Error: {0}")] + Glob(#[from] GlobError), + #[error("Integer Conversion Error")] + TryFromInt(#[from] TryFromIntError), + #[error("Start Time {int} Out of Range for DateTime at {location}")] + IntOutOfRangeForDateTime { + int: u64, + location: ErrorCodeLocation, + }, + #[error("Stop Command before Start Command at {0}")] + StopCommandBeforeStartCommand(ErrorCodeLocation), + #[error("Stop Time {stop} earlier than current Start Time {start} at {location}")] + StopTimeEarlierThanStartTime { + start: NexusDateTime, + stop: NexusDateTime, + location: ErrorCodeLocation, + }, + #[error("RunStop already set at {0}")] + RunStopAlreadySet(ErrorCodeLocation), +} + +#[derive(Debug, Error)] +pub(crate) enum FlatBufferInvalidDataTypeContext { + #[error("Run Log")] + RunLog, + #[error("Sample Environment Log")] + SELog, +} + +#[derive(Debug, Error)] +pub(crate) enum FlatBufferMissingError { + #[error("Flatbuffer Timestamp Missing")] + Timestamp, + #[error("Flatbuffer Channels Missing")] + Channels, + #[error("Flatbuffer Intensities Missing")] + Intensities, + #[error("Flatbuffer Times Missing")] + Times, + #[error("Flatbuffer Run Name Missing")] + RunName, + #[error("Flatbuffer Instrument Name Missing")] + InstrumentName, +} diff --git a/nexus-writer/src/nexus/hdf5_file/error.rs b/nexus-writer/src/nexus/hdf5_file/error.rs new file mode 100644 index 000000000..d02716a4a --- /dev/null +++ b/nexus-writer/src/nexus/hdf5_file/error.rs @@ -0,0 +1,266 @@ +use crate::nexus::error::{FlatBufferInvalidDataTypeContext, FlatBufferMissingError}; +use hdf5::{types::TypeDescriptor, Attribute, Dataset, Group}; +use std::error::Error; +use supermusr_streaming_types::time_conversions::GpsTimeConversionError; +use thiserror::Error; + +pub(crate) type NexusHDF5Result = Result; + +const NO_HDF5_PATH_SET: &str = "[No HDF5 Path Set]"; + +#[derive(Debug, Error)] +pub(crate) enum NexusHDF5Error { + #[error("{error} at {0}", hdf5_path.as_deref().unwrap_or(NO_HDF5_PATH_SET))] + HDF5 { + error: hdf5::Error, + hdf5_path: Option, + }, + #[error("{error} at {0}", hdf5_path.as_deref().unwrap_or(NO_HDF5_PATH_SET))] + DateTimeConversion { + error: chrono::ParseError, + hdf5_path: Option, + }, + #[error("{error} at {0}", hdf5_path.as_deref().unwrap_or(NO_HDF5_PATH_SET))] + HDF5String { + error: hdf5::types::StringError, + hdf5_path: Option, + }, + #[error("Flatbuffer Timestamp Conversion Error {error} at {0}", hdf5_path.as_deref().unwrap_or(NO_HDF5_PATH_SET))] + FlatBufferTimestampConversion { + error: GpsTimeConversionError, + hdf5_path: Option, + }, + #[error("Flatbuffer Timestamp Error Converting to Nanoseconds at {0}", hdf5_path.as_deref().unwrap_or(NO_HDF5_PATH_SET))] + FlatBufferTimestampConvertToNanoseconds { hdf5_path: Option }, + #[error("{error} at {0}", hdf5_path.as_deref().unwrap_or(NO_HDF5_PATH_SET))] + FlatBufferMissing { + error: FlatBufferMissingError, + hdf5_path: Option, + }, + #[error("Invalid FlatBuffer {context} Data Type {error} at {0}", hdf5_path.as_deref().unwrap_or(NO_HDF5_PATH_SET))] + FlatBufferInvalidDataType { + context: FlatBufferInvalidDataTypeContext, + error: String, + hdf5_path: Option, + }, + #[error("Inconsistent Numbers of Sample Environment Log Times and Values {0} != {1} at {2}", sizes.0, sizes.1, hdf5_path.as_deref().unwrap_or(NO_HDF5_PATH_SET))] + FlatBufferInconsistentSELogTimeValueSizes { + sizes: (usize, usize), + hdf5_path: Option, + }, + #[error("Invalid HDF5 Type {error} at {0}", hdf5_path.as_deref().unwrap_or(NO_HDF5_PATH_SET))] + InvalidHDF5Type { + error: TypeDescriptor, + hdf5_path: Option, + }, + #[error("Invalid HDF5 Conversion {error} at {0}", hdf5_path.as_deref().unwrap_or(NO_HDF5_PATH_SET))] + InvalidHDF5TypeConversion { + error: TypeDescriptor, + hdf5_path: Option, + }, + #[error("IO Error {error} at {0}", hdf5_path.as_deref().unwrap_or(NO_HDF5_PATH_SET))] + IO { + error: std::io::Error, + hdf5_path: Option, + }, +} + +impl NexusHDF5Error { + fn with_hdf5_path(self, path: String) -> Self { + match self { + Self::HDF5 { + error, + hdf5_path: _, + } => Self::HDF5 { + error, + hdf5_path: Some(path), + }, + Self::DateTimeConversion { + error, + hdf5_path: _, + } => Self::DateTimeConversion { + error, + hdf5_path: Some(path), + }, + Self::HDF5String { + error, + hdf5_path: _, + } => Self::HDF5String { + error, + hdf5_path: Some(path), + }, + Self::FlatBufferTimestampConversion { + error, + hdf5_path: _, + } => Self::FlatBufferTimestampConversion { + error, + hdf5_path: Some(path), + }, + Self::FlatBufferTimestampConvertToNanoseconds { hdf5_path: _ } => { + Self::FlatBufferTimestampConvertToNanoseconds { + hdf5_path: Some(path), + } + } + Self::FlatBufferMissing { + error, + hdf5_path: _, + } => Self::FlatBufferMissing { + error, + hdf5_path: Some(path), + }, + Self::FlatBufferInvalidDataType { + context, + error, + hdf5_path: _, + } => Self::FlatBufferInvalidDataType { + context, + error, + hdf5_path: Some(path), + }, + Self::FlatBufferInconsistentSELogTimeValueSizes { + sizes, + hdf5_path: _, + } => Self::FlatBufferInconsistentSELogTimeValueSizes { + sizes, + hdf5_path: Some(path), + }, + Self::InvalidHDF5Type { + error, + hdf5_path: _, + } => Self::InvalidHDF5Type { + error, + hdf5_path: Some(path), + }, + Self::InvalidHDF5TypeConversion { + error, + hdf5_path: _, + } => Self::InvalidHDF5TypeConversion { + error, + hdf5_path: Some(path), + }, + Self::IO { + error, + hdf5_path: _, + } => Self::IO { + error, + hdf5_path: Some(path), + }, + } + } + + pub(crate) fn new_flatbuffer_missing(error: FlatBufferMissingError) -> Self { + Self::FlatBufferMissing { + error, + hdf5_path: None, + } + } + + pub(crate) fn new_flatbuffer_timestamp_convert_to_nanoseconds() -> Self { + Self::FlatBufferTimestampConvertToNanoseconds { hdf5_path: None } + } + + pub(crate) fn new_flatbuffer_invalid_data_type( + context: FlatBufferInvalidDataTypeContext, + error: String, + ) -> Self { + Self::FlatBufferInvalidDataType { + context, + error, + hdf5_path: None, + } + } + + pub(crate) fn new_invalid_hdf5_type_conversion(error: TypeDescriptor) -> Self { + Self::InvalidHDF5TypeConversion { + error, + hdf5_path: None, + } + } +} + +impl From for NexusHDF5Error { + fn from(error: hdf5::Error) -> Self { + NexusHDF5Error::HDF5 { + error, + hdf5_path: None, + } + } +} + +impl From for NexusHDF5Error { + fn from(error: hdf5::types::StringError) -> Self { + NexusHDF5Error::HDF5String { + error, + hdf5_path: None, + } + } +} + +impl From for NexusHDF5Error { + fn from(error: chrono::ParseError) -> Self { + NexusHDF5Error::DateTimeConversion { + error, + hdf5_path: None, + } + } +} + +impl From for NexusHDF5Error { + fn from(error: GpsTimeConversionError) -> Self { + NexusHDF5Error::FlatBufferTimestampConversion { + error, + hdf5_path: None, + } + } +} + +impl From for NexusHDF5Error { + fn from(error: FlatBufferMissingError) -> Self { + NexusHDF5Error::FlatBufferMissing { + error, + hdf5_path: None, + } + } +} + +impl From for NexusHDF5Error { + fn from(error: std::io::Error) -> Self { + NexusHDF5Error::IO { + error, + hdf5_path: None, + } + } +} + +/// Used to allow errors which can be convertex to NexusHDF5Errors to be +/// appended with hdf5 paths +pub(crate) trait ConvertResult +where + E: Error + Into, +{ + fn err_group(self, group: &Group) -> NexusHDF5Result; + fn err_dataset(self, dataset: &Dataset) -> NexusHDF5Result; + fn err_attribute(self, attribute: &Attribute) -> NexusHDF5Result; + fn err_file(self) -> NexusHDF5Result; +} + +impl ConvertResult for Result +where + E: Error + Into, +{ + fn err_group(self, group: &Group) -> NexusHDF5Result { + self.map_err(|e| e.into().with_hdf5_path(group.name())) + } + + fn err_dataset(self, dataset: &Dataset) -> NexusHDF5Result { + self.map_err(|e| e.into().with_hdf5_path(dataset.name())) + } + + fn err_attribute(self, attribute: &Attribute) -> NexusHDF5Result { + self.map_err(|e| e.into().with_hdf5_path(attribute.name())) + } + + fn err_file(self) -> NexusHDF5Result { + self.map_err(|e| e.into().with_hdf5_path("File Level".to_owned())) + } +} diff --git a/nexus-writer/src/nexus/hdf5_file/hdf5_writer.rs b/nexus-writer/src/nexus/hdf5_file/hdf5_writer.rs index 9df09a8f1..d69af01b1 100644 --- a/nexus-writer/src/nexus/hdf5_file/hdf5_writer.rs +++ b/nexus-writer/src/nexus/hdf5_file/hdf5_writer.rs @@ -1,80 +1,79 @@ +use super::{ + error::{ConvertResult, NexusHDF5Result}, + NexusHDF5Error, +}; +use crate::nexus::NexusDateTime; use hdf5::{ - types::{IntSize, TypeDescriptor, VarLenUnicode}, - Dataset, Group, H5Type, Location, SimpleExtents, + types::{FloatSize, IntSize, TypeDescriptor, VarLenUnicode}, + Attribute, Dataset, DatasetBuilderEmpty, Group, H5Type, SimpleExtents, }; +use ndarray::s; -pub(super) fn add_new_group_to(parent: &Group, name: &str, class: &str) -> anyhow::Result { - let group = parent.create_group(name)?; - set_group_nx_class(&group, class)?; - Ok(group) -} - -pub(super) fn add_attribute_to(parent: &Location, attr: &str, value: &str) -> anyhow::Result<()> { - parent - .new_attr::() - .create(attr)? - .write_scalar(&value.parse::()?)?; - Ok(()) +pub(crate) trait HasAttributesExt { + fn add_attribute_to(&self, attr: &str, value: &str) -> NexusHDF5Result<()>; + fn get_attribute(&self, attr: &str) -> NexusHDF5Result; } -pub(super) fn set_group_nx_class(parent: &Group, class: &str) -> anyhow::Result<()> { - add_attribute_to(parent, "NX_class", class) +pub(crate) trait GroupExt { + fn add_new_group_to(&self, name: &str, class: &str) -> NexusHDF5Result; + fn set_nx_class(&self, class: &str) -> NexusHDF5Result<()>; + fn create_resizable_empty_dataset( + &self, + name: &str, + chunk_size: usize, + ) -> NexusHDF5Result; + fn create_dynamic_resizable_empty_dataset( + &self, + name: &str, + type_descriptor: &TypeDescriptor, + chunk_size: usize, + ) -> NexusHDF5Result; + fn create_scalar_dataset(&self, name: &str) -> NexusHDF5Result; + fn get_dataset(&self, name: &str) -> NexusHDF5Result; + fn get_dataset_or_create_dynamic_resizable_empty_dataset( + &self, + name: &str, + type_descriptor: &TypeDescriptor, + chunk_size: usize, + ) -> NexusHDF5Result; + fn get_dataset_or_else(&self, name: &str, f: F) -> NexusHDF5Result + where + F: Fn(&Group) -> NexusHDF5Result; + fn get_group(&self, name: &str) -> NexusHDF5Result; + fn get_group_or_create_new(&self, name: &str, class: &str) -> NexusHDF5Result; } -pub(super) fn set_string_to(target: &Dataset, value: &str) -> anyhow::Result<()> { - Ok(target.write_scalar(&value.parse::()?)?) +pub(crate) trait AttributeExt { + fn get_datetime_from(&self) -> NexusHDF5Result; } -pub(super) fn set_slice_to(target: &Dataset, value: &[T]) -> anyhow::Result<()> { - target.resize(value.len())?; - Ok(target.write_raw(value)?) +impl AttributeExt for Attribute { + fn get_datetime_from(&self) -> NexusHDF5Result { + let string: VarLenUnicode = self.read_scalar().err_attribute(self)?; + string.parse().err_attribute(self) + } } -pub(super) fn create_resizable_dataset( - parent: &Group, - name: &str, - initial_size: usize, - chunk_size: usize, -) -> anyhow::Result { - Ok(parent - .new_dataset::() - .shape(SimpleExtents::resizable(vec![initial_size])) - .chunk(vec![chunk_size]) - .create(name)?) -} +impl HasAttributesExt for Group { + fn add_attribute_to(&self, attr: &str, value: &str) -> NexusHDF5Result<()> { + self.new_attr::() + .create(attr) + .err_group(self)? + .write_scalar(&value.parse::().err_group(self)?) + .err_group(self)?; + Ok(()) + } -pub(super) fn _create_resizable_2d_dataset( - parent: &Group, - name: &str, - initial_size: (usize, usize), - chunk_size: (usize, usize), -) -> anyhow::Result { - Ok(parent - .new_dataset::() - .shape(SimpleExtents::resizable(vec![ - initial_size.0, - initial_size.1, - ])) - .chunk(vec![chunk_size.0, chunk_size.1]) - .create(name)?) + fn get_attribute(&self, attr: &str) -> NexusHDF5Result { + self.attr(attr).err_group(self) + } } -pub(super) fn _create_resizable_2d_dataset_dyn_type( +fn get_dataset_builder( + type_descriptor: &TypeDescriptor, parent: &Group, - name: &str, - hdf5_type: &TypeDescriptor, - initial_size: (usize, usize), - chunk_size: (usize, usize), -) -> anyhow::Result { - let hdf5_type = { - if let TypeDescriptor::VarLenArray(t) = hdf5_type { - t - } else { - hdf5_type - } - }; - - let dataset = match hdf5_type { +) -> Result { + Ok(match type_descriptor { TypeDescriptor::Integer(sz) => match sz { IntSize::U1 => parent.new_dataset::(), IntSize::U2 => parent.new_dataset::(), @@ -88,16 +87,272 @@ pub(super) fn _create_resizable_2d_dataset_dyn_type( IntSize::U8 => parent.new_dataset::(), }, TypeDescriptor::Float(sz) => match sz { - hdf5::types::FloatSize::U4 => parent.new_dataset::(), - hdf5::types::FloatSize::U8 => parent.new_dataset::(), + FloatSize::U4 => parent.new_dataset::(), + FloatSize::U8 => parent.new_dataset::(), }, - _ => unreachable!(), - }; - Ok(dataset - .shape(SimpleExtents::resizable(vec![ - initial_size.0, - initial_size.1, - ])) - .chunk(vec![chunk_size.0, chunk_size.1]) - .create(name)?) + TypeDescriptor::VarLenUnicode => parent.new_dataset::(), + _ => { + return Err(NexusHDF5Error::InvalidHDF5Type { + error: type_descriptor.clone(), + hdf5_path: None, + }) + } + }) +} + +impl GroupExt for Group { + fn add_new_group_to(&self, name: &str, class: &str) -> NexusHDF5Result { + let group = self.create_group(name).err_group(self)?; + group.set_nx_class(class)?; + Ok(group) + } + + fn set_nx_class(&self, class: &str) -> NexusHDF5Result<()> { + self.add_attribute_to("NX_class", class) + } + + fn create_scalar_dataset(&self, name: &str) -> NexusHDF5Result { + self.new_dataset::().create(name).err_group(self) + } + + fn create_resizable_empty_dataset( + &self, + name: &str, + chunk_size: usize, + ) -> NexusHDF5Result { + self.new_dataset::() + .shape(SimpleExtents::resizable(vec![0])) + .chunk(vec![chunk_size]) + .create(name) + .err_group(self) + } + + fn create_dynamic_resizable_empty_dataset( + &self, + name: &str, + type_descriptor: &TypeDescriptor, + chunk_size: usize, + ) -> NexusHDF5Result { + get_dataset_builder(type_descriptor, self) + .err_group(self)? + .shape(SimpleExtents::resizable(vec![0])) + .chunk(chunk_size) + .create(name) + .err_group(self) + } + + fn get_dataset(&self, name: &str) -> NexusHDF5Result { + self.dataset(name).err_group(self) + } + + fn get_dataset_or_else(&self, name: &str, f: F) -> NexusHDF5Result + where + F: Fn(&Group) -> NexusHDF5Result, + { + self.dataset(name).or_else(|_| f(self)) + } + + fn get_dataset_or_create_dynamic_resizable_empty_dataset( + &self, + name: &str, + type_descriptor: &TypeDescriptor, + chunk_size: usize, + ) -> NexusHDF5Result { + self.dataset(name).or_else(|_| { + self.create_dynamic_resizable_empty_dataset(name, type_descriptor, chunk_size) + }) + } + + fn get_group(&self, name: &str) -> NexusHDF5Result { + self.group(name).err_group(self) + } + + fn get_group_or_create_new(&self, name: &str, class: &str) -> NexusHDF5Result { + self.group(name) + .or_else(|_| self.add_new_group_to(name, class)) + } +} + +impl HasAttributesExt for Dataset { + fn add_attribute_to(&self, attr: &str, value: &str) -> NexusHDF5Result<()> { + self.new_attr::() + .create(attr) + .err_dataset(self)? + .write_scalar(&value.parse::().err_dataset(self)?) + .err_dataset(self)?; + Ok(()) + } + + fn get_attribute(&self, attr: &str) -> NexusHDF5Result { + self.attr(attr).err_dataset(self) + } +} + +pub(crate) trait DatasetExt { + fn set_scalar_to(&self, value: &T) -> NexusHDF5Result<()>; + fn get_scalar_from(&self) -> NexusHDF5Result; + fn set_string_to(&self, value: &str) -> NexusHDF5Result<()>; + fn get_string_from(&self) -> NexusHDF5Result; + fn get_datetime_from(&self) -> NexusHDF5Result; + fn set_slice_to(&self, value: &[T]) -> NexusHDF5Result<()>; + fn append_slice(&self, value: &[T]) -> NexusHDF5Result<()>; +} + +impl DatasetExt for Dataset { + fn set_scalar_to(&self, value: &T) -> NexusHDF5Result<()> { + self.write_scalar(value).err_dataset(self) + } + + fn get_scalar_from(&self) -> NexusHDF5Result { + self.read_scalar().err_dataset(self) + } + + fn set_string_to(&self, value: &str) -> NexusHDF5Result<()> { + self.write_scalar(&value.parse::().err_dataset(self)?) + .err_dataset(self) + } + + fn get_string_from(&self) -> NexusHDF5Result { + let string: VarLenUnicode = self.read_scalar().err_dataset(self)?; + Ok(string.into()) + } + + fn get_datetime_from(&self) -> NexusHDF5Result { + let string: VarLenUnicode = self.read_scalar().err_dataset(self)?; + string.parse().err_dataset(self) + } + + fn set_slice_to(&self, value: &[T]) -> NexusHDF5Result<()> { + self.resize(value.len()).err_dataset(self)?; + self.write_raw(value).err_dataset(self) + } + + fn append_slice(&self, value: &[T]) -> NexusHDF5Result<()> { + let cur_size = self.size(); + let new_size = cur_size + value.len(); + self.resize(new_size).err_dataset(self)?; + self.write_slice(value, s![cur_size..new_size]) + .err_dataset(self) + } +} + +#[cfg(test)] +mod tests { + use std::{env::temp_dir, ops::Deref, path::PathBuf}; + + use super::*; + + // Helper struct to create and tidy-up a temp hdf5 file + struct OneTempFile(Option, PathBuf); + // Suitably long temp file name, unlikely to clash with anything else + const TEMP_FILE_PREFIX: &str = "temp_supermusr_pipeline_nexus_writer_file"; + + impl OneTempFile { + // We need a different file for each test, so they can run in parallel + fn new(test_name: &str) -> Self { + let mut path = temp_dir(); + path.push(format!("{TEMP_FILE_PREFIX}_{test_name}.nxs")); + Self(Some(hdf5::File::create(&path).unwrap()), path) + } + } + + // Cleans up the temp directory after our test + impl Drop for OneTempFile { + fn drop(&mut self) { + let file = self.0.take().unwrap(); + file.close().unwrap(); + std::fs::remove_file(&self.1).unwrap(); + } + } + + // So we can use our OneTempFile as an hdf5 file + impl Deref for OneTempFile { + type Target = hdf5::File; + + fn deref(&self) -> &Self::Target { + self.0.as_ref().unwrap() + } + } + + #[test] + fn create_group() { + let file = OneTempFile::new("create_group"); + let maybe_group = file.get_group_or_create_new("my_group", "my_class"); + + assert!(maybe_group.is_ok()); + assert_eq!(maybe_group.unwrap().name().as_str(), "/my_group"); + } + + #[test] + fn create_nested_group() { + let file = OneTempFile::new("create_nested_group"); + let group = file + .get_group_or_create_new("my_group", "my_class") + .unwrap(); + let maybe_subgroup = group.get_group_or_create_new("my_subgroup", "my_subclass"); + + assert!(maybe_subgroup.is_ok()); + assert_eq!( + maybe_subgroup.unwrap().name().as_str(), + "/my_group/my_subgroup" + ); + } + + #[test] + fn create_dataset() { + let file = OneTempFile::new("create_dataset"); + let maybe_dataset = file.get_dataset_or_else("my_dataset", |group| { + group.create_scalar_dataset::("my_dataset") + }); + + assert!(maybe_dataset.is_ok()); + assert_eq!(maybe_dataset.unwrap().name().as_str(), "/my_dataset"); + } + + #[test] + fn open_nonexistant_group() { + let file = OneTempFile::new("open_nonexistant_group"); + let maybe_group = file.get_group("non_existant_group"); + + assert!(maybe_group.is_err()); + + const EXPECTED_ERR_MSG : &str = "H5Gopen2(): unable to synchronously open group: object 'non_existant_group' doesn't exist at /"; + assert_eq!(maybe_group.err().unwrap().to_string(), EXPECTED_ERR_MSG); + } + + #[test] + fn open_nonexistant_dataset() { + let file = OneTempFile::new("open_nonexistant_dataset"); + let maybe_dataset = file.get_dataset("non_existant_dataset"); + + assert!(maybe_dataset.is_err()); + + const EXPECTED_ERR_MSG : &str = "H5Dopen2(): unable to synchronously open dataset: object 'non_existant_dataset' doesn't exist at /"; + assert_eq!(maybe_dataset.err().unwrap().to_string(), EXPECTED_ERR_MSG); + } + + #[test] + fn open_nonexistant_nested_dataset() { + let file = OneTempFile::new("open_nonexistant_nested_dataset"); + let group = file + .get_group_or_create_new("my_group", "my_class") + .unwrap(); + let maybe_subgroup = group.get_dataset("my_subgroup"); + + assert!(maybe_subgroup.is_err()); + + const EXPECTED_ERR_MSG : &str = "H5Dopen2(): unable to synchronously open dataset: object 'my_subgroup' doesn't exist at /my_group"; + assert_eq!(maybe_subgroup.err().unwrap().to_string(), EXPECTED_ERR_MSG); + } + + #[test] + fn open_nonexistant_attribute() { + let file = OneTempFile::new("open_nonexistant_attribute"); + let maybe_dataset = file.get_attribute("non_existant_attribute"); + + assert!(maybe_dataset.is_err()); + + const EXPECTED_ERR_MSG : &str = "H5Aopen(): unable to synchronously open attribute: can't locate attribute: 'non_existant_attribute' at /"; + assert_eq!(maybe_dataset.err().unwrap().to_string(), EXPECTED_ERR_MSG); + } } diff --git a/nexus-writer/src/nexus/hdf5_file/mod.rs b/nexus-writer/src/nexus/hdf5_file/mod.rs index a00165ca3..35321af61 100644 --- a/nexus-writer/src/nexus/hdf5_file/mod.rs +++ b/nexus-writer/src/nexus/hdf5_file/mod.rs @@ -1,10 +1,8 @@ +mod error; mod hdf5_writer; mod run_file; mod run_file_components; -use hdf5_writer::{ - add_attribute_to, add_new_group_to, create_resizable_dataset, set_group_nx_class, set_slice_to, - set_string_to, -}; +pub(crate) use error::NexusHDF5Error; pub(crate) use run_file::RunFile; use run_file_components::EventRun; diff --git a/nexus-writer/src/nexus/hdf5_file/run_file.rs b/nexus-writer/src/nexus/hdf5_file/run_file.rs index 2991ff2d6..9bffe9fb5 100644 --- a/nexus-writer/src/nexus/hdf5_file/run_file.rs +++ b/nexus-writer/src/nexus/hdf5_file/run_file.rs @@ -1,15 +1,16 @@ use super::{ - add_attribute_to, add_new_group_to, create_resizable_dataset, set_group_nx_class, set_slice_to, - set_string_to, EventRun, + error::{ConvertResult, NexusHDF5Result}, + hdf5_writer::{DatasetExt, GroupExt, HasAttributesExt}, + EventRun, }; use crate::nexus::{ hdf5_file::run_file_components::{RunLog, SeLog}, nexus_class as NX, run_parameters::RunStopParameters, - NexusConfiguration, NexusSettings, RunParameters, DATETIME_FORMAT, + NexusConfiguration, NexusDateTime, NexusSettings, RunParameters, DATETIME_FORMAT, }; -use chrono::{DateTime, Utc}; -use hdf5::{types::VarLenUnicode, Dataset, File, H5Type}; +use chrono::Utc; +use hdf5::{types::VarLenUnicode, Dataset, File}; use std::{fs::create_dir_all, path::Path}; use supermusr_streaming_types::{ aev2_frame_assembled_event_v2_generated::FrameAssembledEventListMessage, @@ -58,55 +59,48 @@ impl RunFileContents { pub(crate) fn populate_new_runfile( file: &File, nexus_settings: &NexusSettings, - ) -> anyhow::Result { - set_group_nx_class(file, NX::ROOT)?; + ) -> NexusHDF5Result { + file.set_nx_class(NX::ROOT)?; - add_attribute_to(file, "HDF5_version", "1.14.3")?; // Can this be taken directly from the nix package; - add_attribute_to(file, "NeXus_version", "")?; // Where does this come from? - add_attribute_to(file, "file_name", &file.filename())?; // This should be absolutized at some point - add_attribute_to(file, "file_time", Utc::now().to_string().as_str())?; // This should be formatted, the nanoseconds are overkill. + file.add_attribute_to("HDF5_version", "1.14.3")?; // Can this be taken directly from the nix package; + file.add_attribute_to("NeXus_version", "")?; // Where does this come from? + file.add_attribute_to("file_name", &file.filename())?; // This should be absolutized at some point + file.add_attribute_to("file_time", Utc::now().to_string().as_str())?; // This should be formatted, the nanoseconds are overkill. - let entry = add_new_group_to(file, "raw_data_1", NX::ENTRY)?; + let entry = file.add_new_group_to("raw_data_1", NX::ENTRY)?; - let idf_version = entry.new_dataset::().create("IDF_version")?; - let definition = entry.new_dataset::().create("definition")?; - let program_name = entry - .new_dataset::() - .create("program_name")?; + let idf_version = entry.create_scalar_dataset::("IDF_version")?; + let definition = entry.create_scalar_dataset::("definition")?; + let program_name = entry.create_scalar_dataset::("program_name")?; - let run_number = entry.new_dataset::().create("run_number")?; - let experiment_identifier = entry - .new_dataset::() - .create("experiment_identifier")?; + let run_number = entry.create_scalar_dataset::("run_number")?; + let experiment_identifier = + entry.create_scalar_dataset::("experiment_identifier")?; - let start_time = entry.new_dataset::().create("start_time")?; - let end_time = entry.new_dataset::().create("end_time")?; + let start_time = entry.create_scalar_dataset::("start_time")?; + let end_time = entry.create_scalar_dataset::("end_time")?; - let name = entry.new_dataset::().create("name")?; - let title = entry.new_dataset::().create("title")?; + let name = entry.create_scalar_dataset::("name")?; + let title = entry.create_scalar_dataset::("title")?; - let instrument = add_new_group_to(&entry, "instrument", NX::INSTRUMENT)?; - let instrument_name = instrument.new_dataset::().create("name")?; + let instrument = entry.add_new_group_to("instrument", NX::INSTRUMENT)?; + let instrument_name = instrument.create_scalar_dataset::("name")?; let logs = RunLog::new_runlog(&entry)?; - let periods = add_new_group_to(&entry, "periods", NX::PERIOD)?; - let period_number = periods.new_dataset::().create("number")?; - let period_type = create_resizable_dataset::( - &periods, - "type", - 0, - nexus_settings.periodlist_chunk_size, - )?; + let periods = entry.add_new_group_to("periods", NX::PERIOD)?; + let period_number = periods.create_scalar_dataset::("number")?; + let period_type = periods + .create_resizable_empty_dataset::("type", nexus_settings.periodlist_chunk_size)?; let selogs = SeLog::new_selog(&entry)?; - let source = add_new_group_to(&instrument, "source", NX::SOURCE)?; - let source_name = source.new_dataset::().create("name")?; - let source_type = source.new_dataset::().create("type")?; - let source_probe = source.new_dataset::().create("probe")?; + let source = instrument.add_new_group_to("source", NX::SOURCE)?; + let source_name = source.create_scalar_dataset::("name")?; + let source_type = source.create_scalar_dataset::("type")?; + let source_probe = source.create_scalar_dataset::("probe")?; - let _detector = add_new_group_to(&instrument, "detector", NX::DETECTOR)?; + let _detector = instrument.add_new_group_to("detector", NX::DETECTOR)?; let lists = EventRun::new_event_runfile(&entry, nexus_settings)?; @@ -132,38 +126,38 @@ impl RunFileContents { }) } - fn populate_open_runfile(file: &File) -> anyhow::Result { - let entry = file.group("raw_data_1")?; + fn populate_open_runfile(file: &File) -> NexusHDF5Result { + let entry = file.get_group("raw_data_1")?; - let idf_version = entry.dataset("IDF_version")?; - let definition = entry.dataset("definition")?; - let run_number = entry.dataset("run_number")?; - let program_name = entry.dataset("program_name")?; - let experiment_identifier = entry.dataset("experiment_identifier")?; + let idf_version = entry.get_dataset("IDF_version")?; + let definition = entry.get_dataset("definition")?; + let run_number = entry.get_dataset("run_number")?; + let program_name = entry.get_dataset("program_name")?; + let experiment_identifier = entry.get_dataset("experiment_identifier")?; - let start_time = entry.dataset("start_time")?; - let end_time = entry.dataset("end_time")?; + let start_time = entry.get_dataset("start_time")?; + let end_time = entry.get_dataset("end_time")?; - let name = entry.dataset("name")?; - let title = entry.dataset("title")?; + let name = entry.get_dataset("name")?; + let title = entry.get_dataset("title")?; - let periods = entry.group("periods")?; - let period_number = periods.dataset("number")?; - let period_type = periods.dataset("type")?; + let periods = entry.get_group("periods")?; + let period_number = periods.get_dataset("number")?; + let period_type = periods.get_dataset("type")?; let selogs = SeLog::open_selog(&entry)?; - let instrument = entry.group("instrument")?; - let instrument_name = instrument.dataset("name")?; + let instrument = entry.get_group("instrument")?; + let instrument_name = instrument.get_dataset("name")?; let logs = RunLog::open_runlog(&entry)?; - let source = instrument.group("source")?; - let source_name = source.dataset("name")?; - let source_type = source.dataset("type")?; - let source_probe = source.dataset("probe")?; + let source = instrument.get_group("source")?; + let source_name = source.get_dataset("name")?; + let source_type = source.get_dataset("type")?; + let source_probe = source.get_dataset("probe")?; - let _detector = instrument.group("detector")?; + let _detector = instrument.get_group("detector")?; let lists = EventRun::open_event_runfile(&entry)?; @@ -196,31 +190,31 @@ impl RunFile { path: &Path, run_name: &str, nexus_settings: &NexusSettings, - ) -> anyhow::Result { - create_dir_all(path)?; + ) -> NexusHDF5Result { + create_dir_all(path).err_file()?; let filename = RunParameters::get_hdf5_filename(path, run_name); debug!("File save begin. File: {0}.", filename.display()); - let file = File::create(filename)?; + let file = File::create(filename).err_file()?; match RunFileContents::populate_new_runfile(&file, nexus_settings) { Ok(contents) => Ok(Self { file, contents }), Err(e) => { - file.close()?; + file.close().err_file()?; Err(e) } } } #[tracing::instrument(skip_all, err(level = "warn"))] - pub(crate) fn open_runfile(local_path: &Path, run_name: &str) -> anyhow::Result { + pub(crate) fn open_runfile(local_path: &Path, run_name: &str) -> NexusHDF5Result { let filename = RunParameters::get_hdf5_filename(local_path, run_name); debug!("File open begin. File: {0}.", filename.display()); - let file = File::open_rw(filename)?; + let file = File::open_rw(filename).err_file()?; match RunFileContents::populate_open_runfile(&file) { Ok(contents) => Ok(Self { file, contents }), Err(e) => { - file.close()?; + file.close().err_file()?; Err(e) } } @@ -231,57 +225,57 @@ impl RunFile { &mut self, parameters: &RunParameters, nexus_configuration: &NexusConfiguration, - ) -> anyhow::Result<()> { - self.contents.idf_version.write_scalar(&2)?; + ) -> NexusHDF5Result<()> { + self.contents.idf_version.set_scalar_to(&2)?; self.contents .run_number - .write_scalar(¶meters.run_number)?; - - set_string_to(&self.contents.definition, "muonTD")?; - set_string_to(&self.contents.experiment_identifier, "")?; - - set_string_to( - &self.contents.program_name, - "SuperMuSR Data Pipeline Nexus Writer", - )?; - add_attribute_to(&self.contents.program_name, "version", "1.0")?; - add_attribute_to( - &self.contents.program_name, - "configuration", - &nexus_configuration.configuration, - )?; + .set_scalar_to(¶meters.run_number)?; + + self.contents.definition.set_string_to("muonTD")?; + self.contents.experiment_identifier.set_string_to("")?; + + self.contents + .program_name + .set_string_to("SuperMuSR Data Pipeline Nexus Writer")?; + self.contents + .program_name + .add_attribute_to("version", "1.0")?; + self.contents + .program_name + .add_attribute_to("configuration", &nexus_configuration.configuration)?; let start_time = parameters.collect_from.format(DATETIME_FORMAT).to_string(); - set_string_to(&self.contents.start_time, &start_time)?; - set_string_to(&self.contents.end_time, "")?; + self.contents.start_time.set_string_to(&start_time)?; + self.contents.end_time.set_string_to("")?; - set_string_to(&self.contents.name, ¶meters.run_name)?; - set_string_to(&self.contents.title, "")?; + self.contents.name.set_string_to(¶meters.run_name)?; + self.contents.title.set_string_to("")?; - set_string_to(&self.contents.instrument_name, ¶meters.instrument_name)?; + self.contents + .instrument_name + .set_string_to(¶meters.instrument_name)?; self.contents .period_number - .write_scalar(¶meters.num_periods)?; - set_slice_to( - &self.contents.period_type, - &vec![1; parameters.num_periods as usize], - )?; + .set_scalar_to(¶meters.num_periods)?; + self.contents + .period_type + .set_slice_to(&vec![1; parameters.num_periods as usize])?; - set_string_to(&self.contents.source_name, "MuSR")?; - set_string_to(&self.contents.source_type, "")?; - set_string_to(&self.contents.source_probe, "")?; + self.contents.source_name.set_string_to("MuSR")?; + self.contents.source_type.set_string_to("")?; + self.contents.source_probe.set_string_to("")?; self.contents.lists.init(¶meters.collect_from)?; Ok(()) } #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] - pub(crate) fn set_end_time(&mut self, end_time: &DateTime) -> anyhow::Result<()> { + pub(crate) fn set_end_time(&mut self, end_time: &NexusDateTime) -> NexusHDF5Result<()> { let end_time = end_time.format(DATETIME_FORMAT).to_string(); - set_string_to(&self.contents.end_time, &end_time)?; + self.contents.end_time.set_string_to(&end_time)?; Ok(()) } @@ -289,81 +283,65 @@ impl RunFile { pub(crate) fn push_logdata_to_runfile( &mut self, logdata: &f144_LogData, + origin_time: &NexusDateTime, nexus_settings: &NexusSettings, - ) -> anyhow::Result<()> { + ) -> NexusHDF5Result<()> { self.contents .logs - .push_logdata_to_runlog(logdata, nexus_settings) + .push_logdata_to_runlog(logdata, origin_time, nexus_settings) } #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] - pub(crate) fn push_alarm_to_runfile(&mut self, alarm: Alarm) -> anyhow::Result<()> { - self.contents.selogs.push_alarm_to_selog(alarm) + pub(crate) fn push_alarm_to_runfile( + &mut self, + alarm: Alarm, + origin_time: &NexusDateTime, + nexus_settings: &NexusSettings, + ) -> NexusHDF5Result<()> { + self.contents + .selogs + .push_alarm_to_selog(alarm, origin_time, nexus_settings) } #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn push_selogdata( &mut self, selogdata: se00_SampleEnvironmentData, + origin_time: &NexusDateTime, nexus_settings: &NexusSettings, - ) -> anyhow::Result<()> { + ) -> NexusHDF5Result<()> { self.contents .selogs - .push_selogdata_to_selog(&selogdata, nexus_settings) + .push_selogdata_to_selog(&selogdata, origin_time, nexus_settings) } #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] - pub(crate) fn push_message_to_runfile( + pub(crate) fn push_frame_eventlist_message_to_runfile( &mut self, message: &FrameAssembledEventListMessage, - nexus_settings: &NexusSettings, - ) -> anyhow::Result<()> { - self.contents.lists.push_message_to_event_runfile(message)?; - - if !message.complete() { - let time_zero = self.contents.lists.get_time_zero(message)?; - - self.contents.logs.push_incomplete_frame_log( - time_zero, - message - .digitizers_present() - .unwrap_or_default() - .iter() - .collect(), - nexus_settings, - )?; - } - Ok(()) - } - - fn try_read_scalar(dataset: &Dataset) -> anyhow::Result { - if dataset.storage_size() != 0 { - if dataset.is_scalar() { - Ok(dataset.read_scalar::()?) - } else { - anyhow::bail!("{} is not a scalar", dataset.name()) - } - } else { - anyhow::bail!("{} is not allocated", dataset.name()) - } + ) -> NexusHDF5Result<()> { + self.contents + .lists + .push_frame_eventlist_message_to_runfile(message) } #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] - pub(crate) fn extract_run_parameters(&self) -> anyhow::Result { - let collect_from: DateTime = - Self::try_read_scalar::(&self.contents.start_time)?.parse()?; - let run_name = Self::try_read_scalar::(&self.contents.name)?.into(); - let run_number = Self::try_read_scalar::(&self.contents.run_number)?; - let num_periods = Self::try_read_scalar::(&self.contents.period_number)?; - let instrument_name = - Self::try_read_scalar::(&self.contents.instrument_name)?.into(); - let run_stop_parameters = Self::try_read_scalar::(&self.contents.end_time)? - .parse() + pub(crate) fn extract_run_parameters(&self) -> NexusHDF5Result { + let collect_from = self.contents.start_time.get_datetime_from()?; + let run_name = self.contents.name.get_string_from()?; + let run_number = self.contents.run_number.get_scalar_from()?; + let num_periods = self.contents.period_number.get_scalar_from()?; + let instrument_name = self.contents.instrument_name.get_string_from()?; + let run_stop_parameters = self + .contents + .end_time + .get_datetime_from() .map(|collect_until| RunStopParameters { collect_until, last_modified: Utc::now(), }) .ok(); + Ok(RunParameters { collect_from, run_stop_parameters, @@ -375,20 +353,59 @@ impl RunFile { } #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] - pub(crate) fn set_aborted_run_warning( + pub(crate) fn push_incomplete_frame_warning( + &mut self, + message: &FrameAssembledEventListMessage, + nexus_settings: &NexusSettings, + ) -> NexusHDF5Result<()> { + let time_zero = self.contents.lists.get_time_zero(message).err_file()?; + let origin = self + .contents + .lists + .get_offset() + .expect("This should never fail."); + + self.contents.logs.push_incomplete_frame_log( + time_zero, + message + .digitizers_present() + .unwrap_or_default() + .iter() + .collect(), + origin, + nexus_settings, + ) + } + + #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] + pub(crate) fn push_run_resumed_warning( + &mut self, + current_time: &NexusDateTime, + origin_time: &NexusDateTime, + nexus_settings: &NexusSettings, + ) -> NexusHDF5Result<()> { + self.contents + .logs + .push_run_resumed_warning(current_time, origin_time, nexus_settings)?; + Ok(()) + } + + #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] + pub(crate) fn push_aborted_run_warning( &mut self, - stop_time: i32, + stop_time_ms: i64, + origin_time: &NexusDateTime, nexus_settings: &NexusSettings, - ) -> anyhow::Result<()> { + ) -> NexusHDF5Result<()> { self.contents .logs - .set_aborted_run_warning(stop_time, nexus_settings)?; + .push_aborted_run_warning(stop_time_ms, origin_time, nexus_settings)?; Ok(()) } #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] - pub(crate) fn close(self) -> anyhow::Result<()> { - self.file.close()?; + pub(crate) fn close(self) -> NexusHDF5Result<()> { + self.file.close().err_file()?; Ok(()) } } diff --git a/nexus-writer/src/nexus/hdf5_file/run_file_components/event_run_file.rs b/nexus-writer/src/nexus/hdf5_file/run_file_components/event_run_file.rs index dedf4e8e3..3633bf77e 100644 --- a/nexus-writer/src/nexus/hdf5_file/run_file_components/event_run_file.rs +++ b/nexus-writer/src/nexus/hdf5_file/run_file_components/event_run_file.rs @@ -1,16 +1,19 @@ use crate::nexus::{ - hdf5_file::{add_attribute_to, add_new_group_to, create_resizable_dataset}, - nexus_class as NX, NexusSettings, + error::FlatBufferMissingError, + hdf5_file::{ + error::{ConvertResult, NexusHDF5Error, NexusHDF5Result}, + hdf5_writer::{AttributeExt, DatasetExt, GroupExt, HasAttributesExt}, + }, + nexus_class as NX, NexusDateTime, NexusSettings, }; -use chrono::{DateTime, Utc}; -use hdf5::{types::VarLenUnicode, Dataset, Group}; -use ndarray::s; +use hdf5::{Dataset, Group}; use supermusr_common::{Channel, Time}; use supermusr_streaming_types::aev2_frame_assembled_event_v2_generated::FrameAssembledEventListMessage; #[derive(Debug)] pub(crate) struct EventRun { - offset: Option>, + parent: Group, + offset: Option, num_messages: usize, num_events: usize, @@ -34,78 +37,59 @@ impl EventRun { pub(crate) fn new_event_runfile( parent: &Group, nexus_settings: &NexusSettings, - ) -> anyhow::Result { - let detector = add_new_group_to(parent, "detector_1", NX::EVENT_DATA)?; + ) -> NexusHDF5Result { + let detector = parent.add_new_group_to("detector_1", NX::EVENT_DATA)?; - let pulse_height = create_resizable_dataset::( - &detector, + let pulse_height = detector.create_resizable_empty_dataset::( "pulse_height", - 0, nexus_settings.eventlist_chunk_size, )?; - let event_id = create_resizable_dataset::( - &detector, + let event_id = detector.create_resizable_empty_dataset::( "event_id", - 0, nexus_settings.eventlist_chunk_size, )?; - let event_time_offset = create_resizable_dataset::