Skip to content

Conversation

@albertlockett
Copy link
Member

Opening this as draft for now. There are some addition test cases and cleanup I want to do, but I wanted to first ensure my handling of the Pdata Contexts was correct.


Closes: #1784

Now that we have route_to in OPL, in combination with if/else, this can create a scenario where we split the batch.

logs |
if (severity_text == "ERROR") {
  route_to "out_port1"
}
// implicit collect everything that didn't go in "if" branch

A pipeline like this would emit two batches:

  1. "ERROR" logs on the processor's "out_port1"
  2. all other logs on the default out port in two

If the batch had subscribers, when we process a pdata we must keep the context for the inbound batch, and create new contexts for the outbound batches. When all the outbound batches Ack/Nack'd, we must then Ack/Nack the inbound context.

This PR adds a Contexts type for juggling the inbound/outbound contexts and updates the transform processor to manage contexts + Ack/NAck correctly.

@albertlockett albertlockett requested a review from a team as a code owner January 15, 2026 21:27
@github-actions github-actions bot added the rust Pull requests that update Rust code label Jan 15, 2026
@codecov
Copy link

codecov bot commented Jan 15, 2026

Codecov Report

❌ Patch coverage is 90.46322% with 35 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.56%. Comparing base (4b64646) to head (0bfc412).
⚠️ Report is 15 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1798      +/-   ##
==========================================
- Coverage   84.22%   83.56%   -0.67%     
==========================================
  Files         486      506      +20     
  Lines      140885   147598    +6713     
==========================================
+ Hits       118657   123336    +4679     
- Misses      21694    23728    +2034     
  Partials      534      534              
Components Coverage Δ
otap-dataflow 84.36% <90.46%> (-1.10%) ⬇️
query_abstraction 80.61% <ø> (ø)
query_engine 90.52% <ø> (-0.01%) ⬇️
syslog_cef_receivers ∅ <ø> (∅)
otel-arrow-go 53.50% <ø> (ø)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@albertlockett
Copy link
Member Author

@jmacd could I ask you for a review on this :)? I was using the batch_processor Ack/Nack implementation as inspiration.

One thing in particular I'd like to ensure is correct is the behaviour of juggling the contexts.

Basically when we find we've split the batch

  1. For the inbound batch, put it in the inbounds slot map in Contexts which contains number of outbound batches + the original inbound context
  2. For each outbound batch, put it in the outbounds slot map in Contexts with a value pointing back at the inbounds key.
  3. Subscribe the outbound context using call data derived from the the outbounds slotmap key

Then when we receive an Ack/Nack:

  1. Lookup the inbound key from outbounds slot map using the key from the calldata. Clear the outbounds slot map
  2. decrement the count of outbounds in the inbounds slot map. If the count is zero, then we Ack/Nack the original inbound context.

I'm also curious if, when testing, is this the correct way to setup test Ack messages using the outbound contexts to simulate a downstream component having Ack/Nacked the batch:

// now we'll Ack the outbound messages and ensure that we eventually emit an ack
// for the inbound message
let call_data = outbound_context1.current_calldata().unwrap();
let mut ack1 = AckMsg::new(OtapPdata::new(
outbound_context1,
OtapPayload::empty(SignalType::Logs),
));
ack1.calldata = call_data;
let call_data = outbound_context2.current_calldata().unwrap();
let mut ack2 = AckMsg::new(OtapPdata::new(
outbound_context2,
OtapPayload::empty(SignalType::Logs),
));
ack2.calldata = call_data;
let call_data = outbound_context3.current_calldata().unwrap();
let mut ack3 = AckMsg::new(OtapPdata::new(
outbound_context3,
OtapPayload::empty(SignalType::Logs),
));
ack3.calldata = call_data;

I realize I'm asking you to reverse engineer a lot of code here, so happy to walk through on teams if it's easier :)

Copy link
Contributor

@jmacd jmacd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. Looks like maybe the new code could be applied to the batch_processor in a future PR, maybe.

@jmacd
Copy link
Contributor

jmacd commented Jan 16, 2026

@albertlockett I think you want to use a call to Context::next_ack, for the test section you quoted. This does what the effect handler would have done when the recipient responded with an Ack. See how batch_processor.rs tests use Context::next_ack, basically.

@albertlockett
Copy link
Member Author

@albertlockett I think you want to use a call to Context::next_ack, for the test section you quoted. This does what the effect handler would have done when the recipient responded with an Ack. See how batch_processor.rs tests use Context::next_ack, basically.

thanks @jmacd, that worked! made this change in c031e62

@albertlockett
Copy link
Member Author

Looks good. Looks like maybe the new code could be applied to the batch_processor in a future PR, maybe.

Yeah I think we could reuse the Contexts with a little bit of refactoring. The gap with the current implementation is that it doesn't expect an outbound batch to be associated with more than one inbound batch (because currently we only split, we don't combine batches). I imagine eventually this change will need to be made b/c we'd want OPL to support batching, so once that is in place we could reuse this in batch_processor.

}

pub fn set_failed(&mut self, outbound_key: Key, error_reason: String) {
if let Some(inbound) = self.inbound.get_mut(outbound_key) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we first lookup the outbound to get the inbound_key, as done in clear_outbound (line 115) ?

/// Ack/NAck'd
pub fn clear_outbound(&mut self, outbound_key: Key) -> Option<(Context, Option<String>)> {
let inbound_key = {
let outbound = self.outbound.get(outbound_key)?;
Copy link
Member

@lalitb lalitb Jan 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I am missing something, but seems we get the slot, but never remove it from self.outbound ?

// insert outbound
let outbound = Outbound { inbound_key };
self.outbound
.allocate(|| (outbound, ()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If allocate fails, we return early but already incremented num_outbound at line 88 - this would leave the inbound context stuck.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

rust Pull requests that update Rust code

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

Columnar Query Engine route to operator

3 participants