Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions digitiser-aggregator/src/frame/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ where
digitiser_id: DigitizerId,
metadata: &FrameMetadata,
data: D,
) {
) -> bool {
if let Some(latest_timestamp_dispatched) = self.latest_timestamp_dispatched {
if metadata.timestamp <= latest_timestamp_dispatched {
warn!("Frame's timestamp earlier than or equal to the latest frame dispatched: {0} <= {1}", metadata.timestamp, latest_timestamp_dispatched);
return;
return false;
}
}
let frame = {
Expand Down Expand Up @@ -85,6 +85,8 @@ where
}) {
warn!("Frame span linking failed {e}")
}

true
}

pub(crate) fn poll(&mut self) -> Option<AggregatedFrame<D>> {
Expand Down
4 changes: 3 additions & 1 deletion digitiser-aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ async fn process_kafka_message(
metadata_protons_per_pulse = tracing::field::Empty,
metadata_running = tracing::field::Empty,
num_cached_frames = cache.get_num_partial_frames(),
is_discarded,
))]
async fn process_digitiser_event_list_message(
channel_send: &AggregatedFrameToBufferSender,
Expand All @@ -269,9 +270,10 @@ async fn process_digitiser_event_list_message(
debug!("Event packet: metadata: {:?}", msg.metadata());

// Push the current digitiser message to the frame cache, possibly creating a new partial frame
cache.push(msg.digitizer_id(), &metadata, msg.into());
let success = cache.push(msg.digitizer_id(), &metadata, msg.into());

record_metadata_fields_to_span!(&metadata, tracing::Span::current());
tracing::Span::current().record("is_discarded", !success);

cache_poll(channel_send, cache).await?;
}
Expand Down