Skip to content

Commit 142bc67

Browse files
committed
chore: code review
Signed-off-by: Vigith Maurice <vigith@gmail.com>
1 parent 652f5d9 commit 142bc67

7 files changed

Lines changed: 40 additions & 22 deletions

File tree

rust/Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ all-tests:
1818
cargo test --features nats-tests,pulsar-tests,sqs-tests,kafka-tests,redis-tests --workspace --all
1919
cargo test --features global-state-tests test_map_with_panic
2020
cargo test --features global-state-tests test_batch_map_with_panic
21-
RUST_LOG=debug cargo test --features global-state-tests test_map_stream_with_panic -- --nocapture
21+
cargo test --features global-state-tests test_map_stream_with_panic
2222
cargo test --features global-state-tests test_transform_stream_with_panic
2323

2424
.PHONY: clean

rust/numaflow-core/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,5 @@ k8s-openapi = { version = "0.24.0", features = ["v1_32"] }
7575
numaflow-kafka = { workspace = true, features = ["kafka-tests-utils"] }
7676
numaflow-pulsar = { workspace = true, features = ["pulsar-tests-utils"] }
7777
serial_test = "3.3.1"
78-
tracing-subscriber = "0.3.20"
7978

8079
[build-dependencies]

rust/numaflow-core/src/mapper/map.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ enum ConcurrentMapper {
4444
}
4545

4646
/// MapperType is an enum to store the different types of mappers.
47-
/// Concurrent mappers (Unary/Stream) process messages individually with concurrency control.
48-
/// Batch mapper processes messages in batches sequentially.
4947
#[derive(Clone)]
5048
enum MapperType {
49+
/// Concurrent mappers (Unary/Stream) process messages individually with concurrency control.
5150
Concurrent(ConcurrentMapper),
51+
/// Batch mapper processes messages in batches sequentially.
5252
Batch(UserDefinedBatchMap),
5353
}
5454

@@ -205,7 +205,7 @@ impl MapHandle {
205205
.acquire_many_owned(self.concurrency as u32)
206206
.await
207207
.map_err(|e| Error::Mapper(format!("failed to acquire semaphore: {e}")))?;
208-
info!(status=?self.final_result, "Map component is completed with status");
208+
info!(status=?self.final_result, "Map component is completed");
209209

210210
// abort the shutdown handle since we are done processing, no need to wait for the
211211
// hard shutdown
@@ -409,13 +409,6 @@ fn update_udf_process_time_metric(is_mono_vertex: bool) {
409409
}
410410
}
411411

412-
/*
413-
t1 -> receiver dropped
414-
t2 -> map drained
415-
t3 -> written to the tx
416-
t4 -> hung
417-
*/
418-
419412
fn update_udf_write_only_metric(is_mono_vertex: bool) {
420413
if !is_mono_vertex {
421414
pipeline_metrics()
@@ -1055,10 +1048,9 @@ mod tests {
10551048
}
10561049
}
10571050

1058-
// #[cfg(feature = "global-state-tests")]
1051+
#[cfg(feature = "global-state-tests")]
10591052
#[tokio::test]
10601053
async fn test_batch_map_with_panic() -> Result<()> {
1061-
tracing_subscriber::fmt::init();
10621054
let cln_token = CancellationToken::new();
10631055
let (_shutdown_tx, shutdown_rx) = oneshot::channel();
10641056
let tmp_dir = TempDir::new().unwrap();

rust/numaflow-core/src/mapper/map/batch.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,19 @@ use tracing::error;
2323
/// Type alias for the batch response - raw results from the UDF
2424
pub(in crate::mapper) type BatchMapResponse = Vec<map::map_response::Result>;
2525

26-
/// Type aliases
26+
/// Type alias for the HashMap used to track the oneshot response sender for each request keyed by
27+
/// message id.
2728
type ResponseSenderMap = HashMap<String, oneshot::Sender<Result<BatchMapResponse>>>;
2829

30+
/// Shared state for tracking batch map senders between the sender and the receiver tasks.
31+
/// We have a BiDi gRPC stream, so we have 2 different sets of tasks for sending and receiving.
2932
#[derive(Default)]
3033
pub(in crate::mapper) struct BatchSenderMapState {
34+
/// Map of oneshot response senders keyed by message id.
3135
map: ResponseSenderMap,
36+
/// Flag to indicate whether the rx task has closed the stream and cleared the `map`.
37+
/// This is because `tx.send()` could return `Ok()` even after the receiver task has closed the
38+
/// stream.
3239
closed: bool,
3340
}
3441

rust/numaflow-core/src/mapper/map/stream.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use tokio_util::sync::CancellationToken;
1313
use tokio_util::task::AbortOnDropHandle;
1414
use tonic::Streaming;
1515
use tonic::transport::Channel;
16-
use tracing::error;
16+
use tracing::{error, warn};
1717

1818
use super::{
1919
ParentMessageInfo, STREAMING_MAP_RESP_CHANNEL_SIZE, SharedMapTaskContext, UserDefinedMessage,
@@ -24,11 +24,19 @@ use super::{
2424
/// Type alias for the stream response - raw results from the UDF
2525
pub(in crate::mapper) type StreamMapResponse = Vec<map::map_response::Result>;
2626

27+
/// Type alias for the HashMap used to track the mpsc response sender for each request keyed by
28+
/// message id.
2729
type StreamResponseSenderMap = HashMap<String, mpsc::Sender<Result<StreamMapResponse>>>;
2830

31+
/// Shared state for tracking stream map senders between the sender and the receiver tasks.
32+
/// We have a BiDi gRPC stream, so we have 2 different sets of tasks for sending and receiving.
2933
#[derive(Default)]
3034
pub(in crate::mapper) struct StreamSenderMapState {
35+
/// Map of mpsc response senders keyed by message id.
3136
map: StreamResponseSenderMap,
37+
/// Flag to indicate whether the rx task has closed the stream and cleared the `map`.
38+
/// This is because `tx.send()` could return `Ok()` even after the receiver task has closed the
39+
/// stream.
3240
closed: bool,
3341
}
3442

@@ -261,7 +269,8 @@ impl UserDefinedStreamMap {
261269
.send(Err(Error::Mapper(format!(
262270
"failed to send message to stream map server: {e}"
263271
))))
264-
.await;
272+
.await
273+
.inspect(|_| warn!("failed to send error to oneshot receiver"));
265274
return rx;
266275
}
267276

rust/numaflow-core/src/mapper/map/unary.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use tokio_util::sync::CancellationToken;
1313
use tokio_util::task::AbortOnDropHandle;
1414
use tonic::Streaming;
1515
use tonic::transport::Channel;
16-
use tracing::error;
16+
use tracing::{error, warn};
1717

1818
use super::{
1919
ParentMessageInfo, SharedMapTaskContext, UserDefinedMessage, create_response_stream,
@@ -23,24 +23,34 @@ use super::{
2323
/// Type alias for the response - raw results from the UDF
2424
pub(in crate::mapper) type UnaryMapResponse = Vec<map::map_response::Result>;
2525

26+
/// Type alias for the HashMap used to track the oneshot response sender for each request keyed by
27+
/// message id.
2628
type ResponseSenderMap = HashMap<String, oneshot::Sender<Result<UnaryMapResponse>>>;
2729

30+
/// Shared state for tracking unary map senders between the sender and the receiver tasks.
31+
/// We have a BiDi gRPC stream, so we have 2 different sets of tasks for sending and receiving.
2832
#[derive(Default)]
2933
pub(in crate::mapper) struct UnarySenderMapState {
34+
/// Map of oneshot response senders keyed by message id.
3035
map: ResponseSenderMap,
36+
/// Flag to indicate whether the rx task has closed the stream and cleared the `map`.
37+
/// This is because `tx.send()` could return `Ok()` even after the receiver task has closed the
38+
/// stream.
3139
closed: bool,
3240
}
3341

34-
/// MapUnaryTask encapsulates all the context needed to execute a unary map operation.
42+
/// MapUnaryTask encapsulates all the context needed to execute a unary map operation per message.
3543
pub(in crate::mapper) struct MapUnaryTask {
3644
pub mapper: UserDefinedUnaryMap,
45+
/// Permit to achieve structured concurrency by ensuring we do not exceed the concurrency limit
46+
/// and all the tasks are cleaned up when the component is shutting down.
3747
pub permit: OwnedSemaphorePermit,
3848
pub message: Message,
3949
pub shared_ctx: Arc<SharedMapTaskContext>,
4050
}
4151

4252
impl MapUnaryTask {
43-
/// Spawns the unary map task as a tokio task.
53+
/// Spawns the unary map task as a tokio task per [MapUnaryTask].
4454
/// The task will process the message through the UDF and send results downstream.
4555
pub fn spawn(self) {
4656
tokio::spawn(async move {
@@ -229,7 +239,9 @@ impl UserDefinedUnaryMap {
229239
if !senders_guard.closed {
230240
senders_guard.map.insert(key.clone(), tx);
231241
} else {
232-
let _ = tx.send(Err(Error::Mapper("mapper closed".to_string())));
242+
let _ = tx
243+
.send(Err(Error::Mapper("mapper closed".to_string())))
244+
.inspect(|_| warn!("failed to send error to oneshot receiver"));
233245
}
234246
};
235247

0 commit comments

Comments
 (0)