Skip to content

Commit b8063fb

Browse files
authored
feat: add udf shared helpers and resilient error persistence (#3459)
1 parent 9bd8b83 commit b8063fb

5 files changed

Lines changed: 779 additions & 167 deletions

File tree

rust/numaflow-core/src/error.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1+
use std::error::Error as StdError;
2+
use std::io::ErrorKind;
3+
14
use thiserror::Error;
5+
use tonic::Code;
26

37
pub type Result<T> = std::result::Result<T, Error>;
48

@@ -37,6 +41,11 @@ pub enum Error {
3741
#[error("gRPC Error - {0}")]
3842
Grpc(Box<tonic::Status>),
3943

44+
/// A UDF stream broke and the in-flight call should be redriven after the reconnect owner
45+
/// publishes its readiness signal.
46+
#[error("UDF redrive - {0}")]
47+
UdfRedrive(Box<tonic::Status>),
48+
4049
#[error("Config Error - {0}")]
4150
Config(String),
4251

@@ -93,3 +102,103 @@ impl From<numaflow_shared::error::Error> for Error {
93102
Error::Shared(value)
94103
}
95104
}
105+
106+
const UDF_TRANSPORT_IO_ERROR_KINDS: &[ErrorKind] = &[
107+
ErrorKind::BrokenPipe,
108+
ErrorKind::ConnectionReset,
109+
ErrorKind::NotConnected,
110+
];
111+
112+
/// Classifies whether a `tonic::Status` represents a UDF transport break.
113+
///
114+
/// A status is considered transport-level when:
115+
/// - its `Code` is `Unavailable`, `Cancelled`, `Aborted`, or `DeadlineExceeded`; or
116+
/// - its underlying I/O error chain contains a `BrokenPipe`, `ConnectionReset`, or `NotConnected`
117+
/// kind. This last case captures `Code::Unknown` / `Code::Internal` wrappers around hyper/h2
118+
/// transport errors that don't classify cleanly by status code alone.
119+
///
120+
/// Callers can use this to decide when to convert a UDF stream error into [`Error::UdfRedrive`].
121+
/// It does not classify all UDF application errors that should be handled without exiting `numa`.
122+
#[allow(dead_code)]
123+
pub(crate) fn is_udf_transport_failure(status: &tonic::Status) -> bool {
124+
matches!(
125+
status.code(),
126+
Code::Unavailable | Code::Cancelled | Code::Aborted | Code::DeadlineExceeded
127+
) || has_io_kind_in_chain(status, UDF_TRANSPORT_IO_ERROR_KINDS)
128+
}
129+
130+
/// Searches `err` and its causes for a `std::io::Error` whose `kind()` is one of `kinds`.
///
/// The walk follows `Error::source`, with one addition: when a link is itself a
/// `std::io::Error` that carries a custom payload, the payload is visited as well (through
/// `std::io::Error::get_ref`). `std::io::Error::source` forwards to the payload's *source* and
/// so skips the payload itself; without this step an I/O error wrapped directly inside another
/// I/O error would never be inspected.
///
/// Returns `true` on the first matching I/O error, and `false` when the chain ends without a
/// match (which is always the case for an empty `kinds`).
#[allow(dead_code)]
pub(crate) fn has_io_kind_in_chain(err: &(dyn StdError + 'static), kinds: &[ErrorKind]) -> bool {
    let mut current: Option<&(dyn StdError + 'static)> = Some(err);
    while let Some(e) = current {
        current = match e.downcast_ref::<std::io::Error>() {
            Some(ioe) if kinds.contains(&ioe.kind()) => return true,
            // Step into the wrapped payload rather than calling `source()`, which would jump
            // past it. The payload's own causes are still reached on later iterations.
            Some(ioe) => ioe
                .get_ref()
                .map(|payload| payload as &(dyn StdError + 'static)),
            None => e.source(),
        };
    }
    false
}
145+
146+
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;
    use tonic::Code;

    /// Test-only error whose `source()` is the wrapped `io::Error`, used to exercise the chain
    /// walk one level below the top error.
    #[derive(Debug, thiserror::Error)]
    #[error("wrapped I/O error")]
    struct WrappedIoError(#[source] io::Error);

    /// Every status code listed as transport-level must be classified as such on its own.
    #[test]
    fn transport_codes_classify_as_transport() {
        let transport_codes = [
            Code::Unavailable,
            Code::Cancelled,
            Code::Aborted,
            Code::DeadlineExceeded,
        ];
        for code in transport_codes {
            let candidate = tonic::Status::new(code, "x");
            let classified = is_udf_transport_failure(&candidate);
            assert!(classified, "expected {code:?} to be transport-classified");
        }
    }

    /// Codes outside the transport set, with no I/O error behind them, must not be classified.
    #[test]
    fn non_transport_codes_do_not_classify_as_transport() {
        let application_codes = [
            Code::InvalidArgument,
            Code::FailedPrecondition,
            Code::Unimplemented,
            Code::DataLoss,
        ];
        for code in application_codes {
            let candidate = tonic::Status::new(code, "x");
            let classified = is_udf_transport_failure(&candidate);
            assert!(
                !classified,
                "expected {code:?} not to be transport-classified"
            );
        }
    }

    /// An `io::Error` passed directly is matched only against the requested kinds.
    #[test]
    fn io_kind_in_chain_finds_direct_match() {
        let broken = io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe");
        let hit = has_io_kind_in_chain(&broken, &[ErrorKind::BrokenPipe]);
        let miss = has_io_kind_in_chain(&broken, &[ErrorKind::TimedOut]);
        assert!(hit);
        assert!(!miss);
    }

    /// An `io::Error` reachable through `source()` of a wrapper is found as well.
    #[test]
    fn io_kind_in_chain_finds_nested_match() {
        let reset = io::Error::new(io::ErrorKind::ConnectionReset, "connection reset");
        let outer = WrappedIoError(reset);
        assert!(has_io_kind_in_chain(&outer, &[ErrorKind::ConnectionReset]));
    }
}

rust/numaflow-core/src/shared/create_components.rs

Lines changed: 63 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::path::PathBuf;
12
use std::time::Duration;
23

34
use crate::config::components::reduce::UnalignedWindowType;
@@ -19,7 +20,6 @@ use crate::reduce::reducer::unaligned::user_defined::UserDefinedUnalignedReduce;
1920
use crate::reduce::reducer::unaligned::user_defined::accumulator::UserDefinedAccumulator;
2021
use crate::reduce::reducer::unaligned::user_defined::session::UserDefinedSessionReduce;
2122
use crate::shared::grpc;
22-
use crate::shared::grpc::{create_rpc_channel, wait_until_source_ready};
2323
use crate::sinker::sink::serve::ServingStore;
2424
use crate::sinker::sink::{SinkClientType, SinkWriter, SinkWriterBuilder};
2525
use crate::source::Source;
@@ -44,16 +44,9 @@ use numaflow_pb::clients::accumulator::accumulator_client::AccumulatorClient;
4444
use numaflow_pb::clients::map::map_client::MapClient;
4545
use numaflow_pb::clients::reduce::reduce_client::ReduceClient;
4646
use numaflow_pb::clients::sessionreduce::session_reduce_client::SessionReduceClient;
47-
use numaflow_pb::clients::sink::sink_client::SinkClient;
48-
use numaflow_pb::clients::source::source_client::SourceClient;
49-
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;
50-
use numaflow_shared::server_info::{
51-
ContainerType, Protocol, ServerInfo, sdk_server_info, supports_nack,
52-
};
47+
use numaflow_shared::server_info::{ContainerType, Protocol, sdk_server_info, supports_nack};
5348
use numaflow_sqs::sink::SqsSinkBuilder;
54-
use std::path::PathBuf;
5549
use tokio_util::sync::CancellationToken;
56-
use tonic::transport::Channel;
5750

5851
/// Creates a sink writer based on the configuration
5952
#[allow(clippy::too_many_arguments)]
@@ -106,8 +99,14 @@ async fn append_primary_sink_client(
10699
}
107100
SinkType::Serve => SinkWriterBuilder::new(batch_size, read_timeout, SinkClientType::Serve),
108101
SinkType::UserDefined(ud_config) => {
109-
let sink_server_info =
110-
sdk_server_info(ud_config.server_info_path.clone(), cln_token.clone()).await?;
102+
let (sink_grpc_client, sink_server_info) = grpc::create_sink_client(
103+
PathBuf::from(ud_config.socket_path.clone()),
104+
PathBuf::from(ud_config.server_info_path.clone()),
105+
cln_token.clone(),
106+
ud_config.grpc_max_message_size,
107+
grpc::DEFAULT_RECONNECT_INTERVAL,
108+
)
109+
.await?;
111110

112111
let metric_labels = metrics::sdk_info_labels(
113112
config::get_component_type().to_string(),
@@ -122,12 +121,6 @@ async fn append_primary_sink_client(
122121
.get_or_create(&metric_labels)
123122
.set(1);
124123

125-
let mut sink_grpc_client = SinkClient::new(
126-
grpc::create_rpc_channel(ud_config.socket_path.clone().into()).await?,
127-
)
128-
.max_encoding_message_size(ud_config.grpc_max_message_size)
129-
.max_decoding_message_size(ud_config.grpc_max_message_size);
130-
grpc::wait_until_sink_ready(cln_token, &mut sink_grpc_client).await?;
131124
SinkWriterBuilder::new(
132125
batch_size,
133126
read_timeout,
@@ -169,8 +162,14 @@ async fn append_fallback_sink_client(
169162
SinkType::Serve => sink_writer_builder.fb_sink_client(SinkClientType::Serve),
170163
SinkType::Blackhole(_) => sink_writer_builder.fb_sink_client(SinkClientType::Blackhole),
171164
SinkType::UserDefined(ud_config) => {
172-
let fb_server_info =
173-
sdk_server_info(ud_config.server_info_path.clone(), cln_token.clone()).await?;
165+
let (sink_grpc_client, fb_server_info) = grpc::create_sink_client(
166+
PathBuf::from(ud_config.socket_path.clone()),
167+
PathBuf::from(ud_config.server_info_path.clone()),
168+
cln_token.clone(),
169+
ud_config.grpc_max_message_size,
170+
grpc::DEFAULT_RECONNECT_INTERVAL,
171+
)
172+
.await?;
174173

175174
let metric_labels = metrics::sdk_info_labels(
176175
config::get_component_type().to_string(),
@@ -185,13 +184,6 @@ async fn append_fallback_sink_client(
185184
.get_or_create(&metric_labels)
186185
.set(1);
187186

188-
let mut sink_grpc_client = SinkClient::new(
189-
grpc::create_rpc_channel(ud_config.socket_path.clone().into()).await?,
190-
)
191-
.max_encoding_message_size(ud_config.grpc_max_message_size)
192-
.max_decoding_message_size(ud_config.grpc_max_message_size);
193-
grpc::wait_until_sink_ready(cln_token, &mut sink_grpc_client).await?;
194-
195187
sink_writer_builder
196188
.fb_sink_client(SinkClientType::UserDefined(sink_grpc_client.clone()))
197189
}
@@ -223,8 +215,14 @@ async fn append_ons_sink_client(
223215
sink_writer_builder.on_success_sink_client(SinkClientType::Blackhole)
224216
}
225217
SinkType::UserDefined(ud_config) => {
226-
let os_server_info =
227-
sdk_server_info(ud_config.server_info_path.clone(), cln_token.clone()).await?;
218+
let (sink_grpc_client, os_server_info) = grpc::create_sink_client(
219+
PathBuf::from(ud_config.socket_path.clone()),
220+
PathBuf::from(ud_config.server_info_path.clone()),
221+
cln_token.clone(),
222+
ud_config.grpc_max_message_size,
223+
grpc::DEFAULT_RECONNECT_INTERVAL,
224+
)
225+
.await?;
228226

229227
let metric_labels = metrics::sdk_info_labels(
230228
config::get_component_type().to_string(),
@@ -239,13 +237,6 @@ async fn append_ons_sink_client(
239237
.get_or_create(&metric_labels)
240238
.set(1);
241239

242-
let mut sink_grpc_client = SinkClient::new(
243-
grpc::create_rpc_channel(ud_config.socket_path.clone().into()).await?,
244-
)
245-
.max_encoding_message_size(ud_config.grpc_max_message_size)
246-
.max_decoding_message_size(ud_config.grpc_max_message_size);
247-
grpc::wait_until_sink_ready(cln_token, &mut sink_grpc_client).await?;
248-
249240
sink_writer_builder
250241
.on_success_sink_client(SinkClientType::UserDefined(sink_grpc_client.clone()))
251242
}
@@ -278,8 +269,14 @@ pub(crate) async fn create_transformer(
278269
&& let config::components::transformer::TransformerType::UserDefined(ud_transformer) =
279270
&transformer_config.transformer_type
280271
{
281-
let server_info =
282-
sdk_server_info(ud_transformer.server_info_path.clone(), cln_token.clone()).await?;
272+
let (transformer_grpc_client, server_info) = grpc::create_transformer_client(
273+
PathBuf::from(ud_transformer.socket_path.clone()),
274+
PathBuf::from(ud_transformer.server_info_path.clone()),
275+
cln_token.clone(),
276+
ud_transformer.grpc_max_message_size,
277+
grpc::DEFAULT_RECONNECT_INTERVAL,
278+
)
279+
.await?;
283280
let metric_labels = metrics::sdk_info_labels(
284281
config::get_component_type().to_string(),
285282
config::get_vertex_name().to_string(),
@@ -292,12 +289,6 @@ pub(crate) async fn create_transformer(
292289
.get_or_create(&metric_labels)
293290
.set(1);
294291

295-
let mut transformer_grpc_client = SourceTransformClient::new(
296-
grpc::create_rpc_channel(ud_transformer.socket_path.clone().into()).await?,
297-
)
298-
.max_encoding_message_size(ud_transformer.grpc_max_message_size)
299-
.max_decoding_message_size(ud_transformer.grpc_max_message_size);
300-
grpc::wait_until_transformer_ready(&cln_token, &mut transformer_grpc_client).await?;
301292
return Ok(Some(
302293
Transformer::new(
303294
batch_size,
@@ -383,13 +374,14 @@ pub(crate) async fn create_mapper(
383374
}
384375
};
385376

386-
let mut map_grpc_client = MapClient::new(
387-
grpc::create_rpc_channel(config.socket_path.clone().into()).await?,
377+
let (map_grpc_client, _) = grpc::create_mapper_client(
378+
PathBuf::from(config.socket_path.clone()),
379+
PathBuf::from(config.server_info_path.clone()),
380+
cln_token.clone(),
381+
config.grpc_max_message_size,
382+
grpc::DEFAULT_RECONNECT_INTERVAL,
388383
)
389-
.max_encoding_message_size(config.grpc_max_message_size)
390-
.max_decoding_message_size(config.grpc_max_message_size);
391-
392-
grpc::wait_until_mapper_ready(&cln_token, &mut map_grpc_client).await?;
384+
.await?;
393385
Ok(MapHandle::new(
394386
server_info.get_map_mode().unwrap_or(MapMode::Unary),
395387
batch_size,
@@ -560,8 +552,26 @@ pub async fn create_source<C: NumaflowTypeConfig>(
560552
}
561553

562554
SourceType::UserDefined(user_defined_config) => {
563-
let (source_client, server_info) =
564-
create_source_client(user_defined_config, cln_token.clone()).await?;
555+
let (source_client, server_info) = grpc::create_source_client(
556+
PathBuf::from(user_defined_config.socket_path.clone()),
557+
PathBuf::from(user_defined_config.server_info_path.clone()),
558+
cln_token.clone(),
559+
user_defined_config.grpc_max_message_size,
560+
grpc::DEFAULT_RECONNECT_INTERVAL,
561+
)
562+
.await?;
563+
let metric_labels = metrics::sdk_info_labels(
564+
config::get_component_type().to_string(),
565+
config::get_vertex_name().to_string(),
566+
server_info.language.to_string(),
567+
server_info.version.clone(),
568+
ContainerType::Sourcer.to_string(),
569+
);
570+
metrics::global_metrics()
571+
.sdk_info
572+
.get_or_create(&metric_labels)
573+
.set(1);
574+
565575
// Check if the SDK version supports nack functionality
566576
let supports_nack = supports_nack(&server_info.version, server_info.language);
567577
let (ud_read, ud_ack, ud_lag) = new_source(
@@ -588,37 +598,6 @@ pub async fn create_source<C: NumaflowTypeConfig>(
588598
}
589599
}
590600

591-
/// Creates a source client from user-defined config
592-
/// Returns both the client and server info for version-aware functionality
593-
async fn create_source_client(
594-
user_defined_config: &config::components::source::UserDefinedConfig,
595-
cln_token: CancellationToken,
596-
) -> error::Result<(SourceClient<Channel>, ServerInfo)> {
597-
let server_info = sdk_server_info(
598-
user_defined_config.server_info_path.clone(),
599-
cln_token.clone(),
600-
)
601-
.await?;
602-
603-
let metric_labels = metrics::sdk_info_labels(
604-
config::get_component_type().to_string(),
605-
config::get_vertex_name().to_string(),
606-
server_info.language.to_string(),
607-
server_info.version.clone(),
608-
ContainerType::Sourcer.to_string(),
609-
);
610-
metrics::global_metrics()
611-
.sdk_info
612-
.get_or_create(&metric_labels)
613-
.set(1);
614-
615-
let channel =
616-
create_rpc_channel(PathBuf::from(user_defined_config.socket_path.clone())).await?;
617-
let mut client = SourceClient::new(channel);
618-
wait_until_source_ready(&cln_token, &mut client).await?;
619-
Ok((client, server_info))
620-
}
621-
622601
/// Creates a user-defined aligned reducer client
623602
pub(crate) async fn create_aligned_reducer(
624603
reducer_config: config::components::reduce::AlignedReducerConfig,
@@ -643,7 +622,8 @@ pub(crate) async fn create_aligned_reducer(
643622
.set(1);
644623

645624
// Create gRPC channel
646-
let channel = create_rpc_channel(reducer_config.user_defined_config.socket_path.into()).await?;
625+
let channel =
626+
grpc::create_rpc_channel(reducer_config.user_defined_config.socket_path.into()).await?;
647627

648628
// Create client
649629
let client = UserDefinedAlignedReduce::new(

0 commit comments

Comments
 (0)