From f279e75f5af17495dae4033f1be2d1b1f5fb6d29 Mon Sep 17 00:00:00 2001 From: Modularius Date: Sat, 1 Feb 2025 02:13:41 +0000 Subject: [PATCH 1/7] Added span --- digitiser-aggregator/src/frame/cache.rs | 6 ++++-- digitiser-aggregator/src/main.rs | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/digitiser-aggregator/src/frame/cache.rs b/digitiser-aggregator/src/frame/cache.rs index d169a4212..1f3d88dd2 100644 --- a/digitiser-aggregator/src/frame/cache.rs +++ b/digitiser-aggregator/src/frame/cache.rs @@ -33,11 +33,11 @@ where digitiser_id: DigitizerId, metadata: &FrameMetadata, data: D, - ) { + ) -> bool { if let Some(latest_timestamp_dispatched) = self.latest_timestamp_dispatched { if metadata.timestamp <= latest_timestamp_dispatched { warn!("Frame's timestamp earlier than or equal to the latest frame dispatched: {0} <= {1}", metadata.timestamp, latest_timestamp_dispatched); - return; + return false; } } let frame = { @@ -85,6 +85,8 @@ where }) { warn!("Frame span linking failed {e}") } + + true } pub(crate) fn poll(&mut self) -> Option> { diff --git a/digitiser-aggregator/src/main.rs b/digitiser-aggregator/src/main.rs index 0a8bf7eed..1662dc3f8 100644 --- a/digitiser-aggregator/src/main.rs +++ b/digitiser-aggregator/src/main.rs @@ -258,6 +258,7 @@ async fn process_kafka_message( metadata_protons_per_pulse = tracing::field::Empty, metadata_running = tracing::field::Empty, num_cached_frames = cache.get_num_partial_frames(), + added_to_frame, ))] async fn process_digitiser_event_list_message( channel_send: &AggregatedFrameToBufferSender, @@ -269,9 +270,10 @@ async fn process_digitiser_event_list_message( debug!("Event packet: metadata: {:?}", msg.metadata()); // Push the current digitiser message to the frame cache, possibly creating a new partial frame - cache.push(msg.digitizer_id(), &metadata, msg.into()); + let success = cache.push(msg.digitizer_id(), &metadata, msg.into()); record_metadata_fields_to_span!(&metadata, tracing::Span::current()); + tracing::Span::current().record("added_to_frame", success); cache_poll(channel_send, cache).await?; } From 729b729224a5d1d4a52c43f5619c7fd176252a5c Mon Sep 17 00:00:00 2001 From: Modularius Date: Sat, 1 Feb 2025 02:15:25 +0000 Subject: [PATCH 2/7] Changed name --- digitiser-aggregator/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digitiser-aggregator/src/main.rs b/digitiser-aggregator/src/main.rs index 1662dc3f8..28d047881 100644 --- a/digitiser-aggregator/src/main.rs +++ b/digitiser-aggregator/src/main.rs @@ -273,7 +273,7 @@ async fn process_digitiser_event_list_message( let success = cache.push(msg.digitizer_id(), &metadata, msg.into()); record_metadata_fields_to_span!(&metadata, tracing::Span::current()); - tracing::Span::current().record("added_to_frame", success); + tracing::Span::current().record("is_discarded", !success); cache_poll(channel_send, cache).await?; } From 690e8956bd9586c8b6786b967817d9c5c061acc6 Mon Sep 17 00:00:00 2001 From: Modularius Date: Sat, 1 Feb 2025 02:22:33 +0000 Subject: [PATCH 3/7] Typo --- digitiser-aggregator/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digitiser-aggregator/src/main.rs b/digitiser-aggregator/src/main.rs index 28d047881..09ad9cebe 100644 --- a/digitiser-aggregator/src/main.rs +++ b/digitiser-aggregator/src/main.rs @@ -258,7 +258,7 @@ async fn process_kafka_message( metadata_protons_per_pulse = tracing::field::Empty, metadata_running = tracing::field::Empty, num_cached_frames = cache.get_num_partial_frames(), - added_to_frame, + is_discarded, ))] async fn process_digitiser_event_list_message( channel_send: &AggregatedFrameToBufferSender, From 935e0c328db14e9d5cafe41d02ccddd5baa8e2a6 Mon Sep 17 00:00:00 2001 From: Modularius Date: Tue, 4 Feb 2025 10:56:46 +0000 Subject: [PATCH 4/7] Made requested changes --- digitiser-aggregator/src/frame/cache.rs | 6 +++--- digitiser-aggregator/src/main.rs | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/digitiser-aggregator/src/frame/cache.rs b/digitiser-aggregator/src/frame/cache.rs index 1f3d88dd2..72956db2c 100644 --- a/digitiser-aggregator/src/frame/cache.rs +++ b/digitiser-aggregator/src/frame/cache.rs @@ -33,11 +33,11 @@ where digitiser_id: DigitizerId, metadata: &FrameMetadata, data: D, - ) -> bool { + ) -> Result<(), ()> { if let Some(latest_timestamp_dispatched) = self.latest_timestamp_dispatched { if metadata.timestamp <= latest_timestamp_dispatched { warn!("Frame's timestamp earlier than or equal to the latest frame dispatched: {0} <= {1}", metadata.timestamp, latest_timestamp_dispatched); - return false; + return Err(()); } } let frame = { @@ -86,7 +86,7 @@ where warn!("Frame span linking failed {e}") } - true + Ok(()) } pub(crate) fn poll(&mut self) -> Option> { diff --git a/digitiser-aggregator/src/main.rs b/digitiser-aggregator/src/main.rs index 09ad9cebe..a0f984486 100644 --- a/digitiser-aggregator/src/main.rs +++ b/digitiser-aggregator/src/main.rs @@ -270,10 +270,12 @@ async fn process_digitiser_event_list_message( debug!("Event packet: metadata: {:?}", msg.metadata()); // Push the current digitiser message to the frame cache, possibly creating a new partial frame - let success = cache.push(msg.digitizer_id(), &metadata, msg.into()); + let is_discarded = cache + .push(msg.digitizer_id(), &metadata, msg.into()) + .is_err(); record_metadata_fields_to_span!(&metadata, tracing::Span::current()); - tracing::Span::current().record("is_discarded", !success); + tracing::Span::current().record("is_discarded", is_discarded); cache_poll(channel_send, cache).await?; } From 2b8902e53a49f7a106dae3111f3983787736ed77 Mon Sep 17 00:00:00 2001 From: Modularius Date: Tue, 4 Feb 2025 11:03:36 +0000 Subject: [PATCH 5/7] Updated tests --- digitiser-aggregator/src/frame/cache.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/digitiser-aggregator/src/frame/cache.rs b/digitiser-aggregator/src/frame/cache.rs index 72956db2c..a27d130e7 100644 --- a/digitiser-aggregator/src/frame/cache.rs +++ b/digitiser-aggregator/src/frame/cache.rs @@ -139,20 +139,20 @@ mod test { assert!(cache.poll().is_none()); assert_eq!(cache.get_num_partial_frames(), 0); - cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])); + let _ = cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])); assert_eq!(cache.get_num_partial_frames(), 1); assert!(cache.poll().is_none()); - cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])); + let _ = cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])); assert!(cache.poll().is_none()); - cache.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8])); + let _ = cache.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8])); assert!(cache.poll().is_none()); - cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])); + let _ = cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])); { let frame = cache.poll().unwrap(); @@ -200,15 +200,15 @@ mod test { assert!(cache.poll().is_none()); - cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])); + let _ = cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])); assert!(cache.poll().is_none()); - cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])); + let _ = cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])); assert!(cache.poll().is_none()); - cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])); + let _ = cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])); assert!(cache.poll().is_none()); @@ -270,11 +270,11 @@ mod test { assert_eq!(cache.frames.len(), 0); assert!(cache.poll().is_none()); - cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])); + let _ = cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])); assert_eq!(cache.frames.len(), 1); assert!(cache.poll().is_none()); - cache.push(2, &frame_2, EventData::dummy_data(0, 5, &[0, 1, 2])); + let _ = cache.push(2, &frame_2, EventData::dummy_data(0, 5, &[0, 1, 2])); assert_eq!(cache.frames.len(), 1); assert!(cache.poll().is_some()); } From 157eb6fde9fa83ef4f7569c55552538daa21a9c3 Mon Sep 17 00:00:00 2001 From: Modularius Date: Tue, 4 Feb 2025 11:35:35 +0000 Subject: [PATCH 6/7] Updated tests --- digitiser-aggregator/src/frame/cache.rs | 44 ++++++++++++++++++++----- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/digitiser-aggregator/src/frame/cache.rs b/digitiser-aggregator/src/frame/cache.rs index a27d130e7..9372e35e3 100644 --- a/digitiser-aggregator/src/frame/cache.rs +++ b/digitiser-aggregator/src/frame/cache.rs @@ -139,20 +139,20 @@ mod test { assert!(cache.poll().is_none()); assert_eq!(cache.get_num_partial_frames(), 0); - let _ = cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])); + assert!(cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])).is_ok()); assert_eq!(cache.get_num_partial_frames(), 1); assert!(cache.poll().is_none()); - let _ = cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])); + assert!(cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])).is_ok()); assert!(cache.poll().is_none()); - let _ = cache.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8])); + assert!(cache.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8])).is_ok()); assert!(cache.poll().is_none()); - let _ = cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])); + assert!(cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])).is_ok()); { let frame = cache.poll().unwrap(); @@ -200,15 +200,15 @@ mod test { assert!(cache.poll().is_none()); - let _ = cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])); + assert!(cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])).is_ok()); assert!(cache.poll().is_none()); - let _ = cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])); + assert!(cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])).is_ok()); assert!(cache.poll().is_none()); - let _ = cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])); + assert!(cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])).is_ok()); assert!(cache.poll().is_none()); @@ -242,6 +242,32 @@ mod test { assert!(cache.poll().is_none()); } + #[tokio::test] + async fn one_frame_in_one_frame_out_missing_digitiser_and_late_message_timeout() { + let mut cache = FrameCache::::new(Duration::from_millis(100), vec![0, 1, 4, 8]); + + let frame_1 = FrameMetadata { + timestamp: Utc::now(), + period_number: 1, + protons_per_pulse: 8, + running: true, + frame_number: 1728, + veto_flags: 4, + }; + assert!(cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])).is_ok()); + assert!(cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])).is_ok()); + assert!(cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])).is_ok()); + + tokio::time::sleep(Duration::from_millis(105)).await; + + let _ = cache.poll().unwrap(); + + // This call to push should return an error + assert!(cache.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8])).is_err()); + } + + + #[test] fn test_metadata_equality() { let mut cache = FrameCache::::new(Duration::from_millis(100), vec![1, 2]); @@ -270,11 +296,11 @@ mod test { assert_eq!(cache.frames.len(), 0); assert!(cache.poll().is_none()); - let _ = cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])); + assert!(cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])).is_ok()); assert_eq!(cache.frames.len(), 1); assert!(cache.poll().is_none()); - let _ = cache.push(2, &frame_2, EventData::dummy_data(0, 5, &[0, 1, 2])); + assert!(cache.push(2, &frame_2, EventData::dummy_data(0, 5, &[0, 1, 2])).is_ok()); assert_eq!(cache.frames.len(), 1); assert!(cache.poll().is_some()); } From 8c77af768f31ae30896ac2d34455445a8b1720d2 Mon Sep 17 00:00:00 2001 From: Modularius Date: Tue, 4 Feb 2025 11:36:24 +0000 Subject: [PATCH 7/7] Formatting --- digitiser-aggregator/src/frame/cache.rs | 56 ++++++++++++++++++------- 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/digitiser-aggregator/src/frame/cache.rs b/digitiser-aggregator/src/frame/cache.rs index 9372e35e3..944d633e3 100644 --- a/digitiser-aggregator/src/frame/cache.rs +++ b/digitiser-aggregator/src/frame/cache.rs @@ -139,20 +139,28 @@ mod test { assert!(cache.poll().is_none()); assert_eq!(cache.get_num_partial_frames(), 0); - assert!(cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])).is_ok()); + assert!(cache + .push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])) + .is_ok()); assert_eq!(cache.get_num_partial_frames(), 1); assert!(cache.poll().is_none()); - assert!(cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])).is_ok()); + assert!(cache + .push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])) + .is_ok()); assert!(cache.poll().is_none()); - assert!(cache.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8])).is_ok()); + assert!(cache + .push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8])) + .is_ok()); assert!(cache.poll().is_none()); - assert!(cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])).is_ok()); + assert!(cache + .push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])) + .is_ok()); { let frame = cache.poll().unwrap(); @@ -200,15 +208,21 @@ mod test { assert!(cache.poll().is_none()); - assert!(cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])).is_ok()); + assert!(cache + .push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])) + .is_ok()); assert!(cache.poll().is_none()); - assert!(cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])).is_ok()); + assert!(cache + .push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])) + .is_ok()); assert!(cache.poll().is_none()); - assert!(cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])).is_ok()); + assert!(cache + .push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])) + .is_ok()); assert!(cache.poll().is_none()); @@ -254,20 +268,26 @@ mod test { frame_number: 1728, veto_flags: 4, }; - assert!(cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])).is_ok()); - assert!(cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])).is_ok()); - assert!(cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])).is_ok()); + assert!(cache + .push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])) + .is_ok()); + assert!(cache + .push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5])) + .is_ok()); + assert!(cache + .push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11])) + .is_ok()); tokio::time::sleep(Duration::from_millis(105)).await; let _ = cache.poll().unwrap(); - + // This call to push should return an error - assert!(cache.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8])).is_err()); + assert!(cache + .push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8])) + .is_err()); } - - #[test] fn test_metadata_equality() { let mut cache = FrameCache::::new(Duration::from_millis(100), vec![1, 2]); @@ -296,11 +316,15 @@ mod test { assert_eq!(cache.frames.len(), 0); assert!(cache.poll().is_none()); - assert!(cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])).is_ok()); + assert!(cache + .push(1, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2])) + .is_ok()); assert_eq!(cache.frames.len(), 1); assert!(cache.poll().is_none()); - assert!(cache.push(2, &frame_2, EventData::dummy_data(0, 5, &[0, 1, 2])).is_ok()); + assert!(cache + .push(2, &frame_2, EventData::dummy_data(0, 5, &[0, 1, 2])) + .is_ok()); assert_eq!(cache.frames.len(), 1); assert!(cache.poll().is_some()); }