diff --git a/digitiser-aggregator/src/frame/cache.rs b/digitiser-aggregator/src/frame/cache.rs index d169a4212..944d633e3 100644 --- a/digitiser-aggregator/src/frame/cache.rs +++ b/digitiser-aggregator/src/frame/cache.rs @@ -33,11 +33,11 @@ where digitiser_id: DigitizerId, metadata: &FrameMetadata, data: D, - ) { + ) -> Result<(), ()> { if let Some(latest_timestamp_dispatched) = self.latest_timestamp_dispatched { if metadata.timestamp <= latest_timestamp_dispatched { warn!("Frame's timestamp earlier than or equal to the latest frame dispatched: {0} <= {1}", metadata.timestamp, latest_timestamp_dispatched); - return; + return Err(()); } } let frame = { @@ -85,6 +85,8 @@ where }) { warn!("Frame span linking failed {e}") } + + Ok(()) } pub(crate) fn poll(&mut self) -> Option> { @@ -137,20 +139,28 @@ mod test { assert!(cache.poll().is_none()); assert_eq!(cache.get_num_partial_frames(), 0); - cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])); + assert!(cache + .push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])) + .is_ok()); assert_eq!(cache.get_num_partial_frames(), 1); assert!(cache.poll().is_none()); - cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])); + assert!(cache + .push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])) + .is_ok()); assert!(cache.poll().is_none()); - cache.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8])); + assert!(cache + .push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8])) + .is_ok()); assert!(cache.poll().is_none()); - cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])); + assert!(cache + .push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])) + .is_ok()); { let frame = cache.poll().unwrap(); @@ -198,15 +208,21 @@ mod test { assert!(cache.poll().is_none()); - cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])); + assert!(cache + .push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])) + .is_ok()); assert!(cache.poll().is_none()); - cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])); + assert!(cache + .push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])) + .is_ok()); assert!(cache.poll().is_none()); - cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])); + assert!(cache + .push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])) + .is_ok()); assert!(cache.poll().is_none()); @@ -240,6 +256,38 @@ mod test { assert!(cache.poll().is_none()); } + #[tokio::test] + async fn one_frame_in_one_frame_out_missing_digitiser_and_late_message_timeout() { + let mut cache = FrameCache::::new(Duration::from_millis(100), vec![0, 1, 4, 8]); + + let frame_1 = FrameMetadata { + timestamp: Utc::now(), + period_number: 1, + protons_per_pulse: 8, + running: true, + frame_number: 1728, + veto_flags: 4, + }; + assert!(cache + .push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])) + .is_ok()); + assert!(cache + .push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])) + .is_ok()); + assert!(cache + .push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])) + .is_ok()); + + tokio::time::sleep(Duration::from_millis(105)).await; + + let _ = cache.poll().unwrap(); + + // This call to push should return an error + assert!(cache + .push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8])) + .is_err()); + } + #[test] fn test_metadata_equality() { let mut cache = FrameCache::::new(Duration::from_millis(100), vec![1, 2]); @@ -268,11 +316,15 @@ mod test { assert_eq!(cache.frames.len(), 0); assert!(cache.poll().is_none()); - cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])); + assert!(cache + .push(1, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])) + .is_ok()); assert_eq!(cache.frames.len(), 1); assert!(cache.poll().is_none()); - cache.push(2, &frame_2, EventData::dummy_data(0, 5, &[0, 1, 2])); + assert!(cache + .push(2, &frame_2, EventData::dummy_data(0, 5, &[0, 1, 2])) + .is_ok()); assert_eq!(cache.frames.len(), 1); assert!(cache.poll().is_some()); } diff --git a/digitiser-aggregator/src/main.rs b/digitiser-aggregator/src/main.rs index 0a8bf7eed..a0f984486 100644 --- a/digitiser-aggregator/src/main.rs +++ b/digitiser-aggregator/src/main.rs @@ -258,6 +258,7 @@ async fn process_kafka_message( metadata_protons_per_pulse = tracing::field::Empty, metadata_running = tracing::field::Empty, num_cached_frames = cache.get_num_partial_frames(), + is_discarded, ))] async fn process_digitiser_event_list_message( channel_send: &AggregatedFrameToBufferSender, @@ -269,9 +270,12 @@ async fn process_digitiser_event_list_message( debug!("Event packet: metadata: {:?}", msg.metadata()); // Push the current digitiser message to the frame cache, possibly creating a new partial frame - cache.push(msg.digitizer_id(), &metadata, msg.into()); + let is_discarded = cache + .push(msg.digitizer_id(), &metadata, msg.into()) + .is_err(); record_metadata_fields_to_span!(&metadata, tracing::Span::current()); + tracing::Span::current().record("is_discarded", is_discarded); cache_poll(channel_send, cache).await?; }