Skip to content

feat: add udf shared helpers and resilient error persistence#3459

Merged
adarsh0728 merged 7 commits into
mainfrom
reconnect-helper
Jun 11, 2026
Merged

feat: add udf shared helpers and resilient error persistence#3459
adarsh0728 merged 7 commits into
mainfrom
reconnect-helper

Conversation

@adarsh0728

@adarsh0728 adarsh0728 commented Jun 8, 2026

Copy link
Copy Markdown
Member

Summary

Introduces shared helpers, UDF-specific error classification, and safer runtime error persistence. The actual source/transformer/map/sink reconnect behaviour will be wired in follow-up PRs.

Related Issues

#3368

What Changed

  • Added Error::UdfRedrive for UDF stream failures that should be redriven after reconnect instead of treated as fatal.
  • Added shared create_*_client helpers for source, sink, source transformer, and mapper UDF clients.
    • Create a fresh UDS-backed gRPC channel.
    • Apply the configured gRPC max message size.
    • Wait for the typed client to become ready.
    • Re-read SDK server info from the running sidecar.
  • Updated UDF startup paths to use the same shared client creation helpers that future reconnect paths will use.
  • Updated runtime error persistence to support repeated and concurrent UDF errors.
    • Removed single-error persistence behaviour.
    • Added collision-safe filenames: <unix_nanos>--numa.{tmp,json}.

Behaviour Change

  • This PR does not yet change source/transformer/map/sink runtime behaviour. UDF failures are not fully reconnected in place until follow-up PRs wire these helpers into the UDF clients.
  • The behaviour that does change in this PR is runtime error persistence

Signed-off-by: adarsh0728 <gooneriitk@gmail.com>
Signed-off-by: adarsh0728 <gooneriitk@gmail.com>
@codecov

codecov Bot commented Jun 8, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 87.19008% with 62 lines in your changes missing coverage. Please review.
✅ Project coverage is 82.66%. Comparing base (9bd8b83) to head (e0f47d7).

Files with missing lines Patch % Lines
rust/numaflow-core/src/shared/create_components.rs 55.93% 26 Missing ⚠️
rust/numaflow-core/src/shared/grpc.rs 88.26% 25 Missing ⚠️
rust/numaflow-monitor/src/runtime.rs 94.58% 11 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3459      +/-   ##
==========================================
- Coverage   82.68%   82.66%   -0.02%     
==========================================
  Files         307      307              
  Lines       77569    77912     +343     
==========================================
+ Hits        64135    64407     +272     
- Misses      12876    12948      +72     
+ Partials      558      557       -1     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Comment thread rust/numaflow-core/src/shared/grpc.rs Outdated
Comment on lines +219 to +235
/// Reconnects to a user-defined sink sidecar. See [`reconnect_source`] for the reconnect sequence.
#[allow(dead_code)]
pub(crate) async fn reconnect_sink(
socket_path: PathBuf,
server_info_path: PathBuf,
cln_token: CancellationToken,
grpc_max_message_size: usize,
retry_interval: Duration,
) -> error::Result<SinkClient<Channel>> {
let channel = create_rpc_channel_with_interval(socket_path, retry_interval).await?;
let mut client = SinkClient::new(channel)
.max_encoding_message_size(grpc_max_message_size)
.max_decoding_message_size(grpc_max_message_size);
wait_until_sink_ready(&cln_token, &mut client).await?;
let _server_info = sdk_server_info(server_info_path, cln_token.clone()).await?;
Ok(client)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why a separate method for reconnect? can we just use the existing connect method or call it create_sink_client? similarly for other clients as well.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will use create_sink_client and similar naming for other components

@adarsh0728 adarsh0728 Jun 9, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added different clients (will be used at startup and reconnect) , may be can add a common helper like create_udf_client to avoid duplication of logic inside those helpers.. but current helpers look simple and readabale.

Signed-off-by: adarsh0728 <gooneriitk@gmail.com>
@adarsh0728 adarsh0728 changed the title feat: add udf reconnect helpers and resilient error persistence feat: add udf shared helpers and resilient error persistence Jun 9, 2026
@adarsh0728 adarsh0728 self-assigned this Jun 9, 2026
@adarsh0728 adarsh0728 added this to the 1.9 milestone Jun 9, 2026
@adarsh0728 adarsh0728 marked this pull request as ready for review June 10, 2026 08:59
@adarsh0728 adarsh0728 merged commit b8063fb into main Jun 11, 2026
27 checks passed
@adarsh0728 adarsh0728 deleted the reconnect-helper branch June 11, 2026 14:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants