Skip to content

Commit 36d98e8

Browse files
authored
Merge branch 'main' into dev/sapatr/codesimplification1
2 parents beea881 + 9c54c8e commit 36d98e8

11 files changed

Lines changed: 66 additions & 106 deletions

File tree

rust/otap-dataflow/.config/nextest.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ retries = 2
44

55
[profile.ci]
66
slow-timeout = { period = "60s", terminate-after = 2 }
7-
max-fail = 10
7+
fail-fast = { max-fail = 10 }
88
retries = 2
99

1010
# JUnit output for flaky test tracking.

rust/otap-dataflow/crates/core-nodes/src/exporters/otap_exporter/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -852,9 +852,7 @@ mod tests {
852852
);
853853
}
854854

855-
// Skipping on Windows due to flakiness: https://github.com/open-telemetry/otel-arrow/issues/1614
856855
#[test]
857-
#[cfg_attr(windows, ignore = "Skipping on Windows due to flakiness")]
858856
fn test_receiver_not_ready_on_start() {
859857
let grpc_addr = "127.0.0.1";
860858
let grpc_port = portpicker::pick_unused_port().expect("No free ports");

rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_grpc_exporter/mod.rs

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -862,13 +862,25 @@ mod tests {
862862

863863
use otap_df_config::node::NodeUserConfig;
864864

865-
#[cfg(not(windows))]
865+
use otap_df_config::transport_headers::{TransportHeader, TransportHeaders};
866866
use otap_df_config::transport_headers_policy::PropagationSelectorType;
867+
use otap_df_config::transport_headers_policy::{
868+
HeaderPropagationPolicy, PropagationAction, PropagationDefault, PropagationMatch,
869+
PropagationOverride, PropagationSelector,
870+
};
867871
use otap_df_engine::Interests;
868872
use otap_df_engine::context::ControllerContext;
869873
use otap_df_engine::control::PipelineCompletionMsg;
874+
use otap_df_engine::control::{
875+
Controllable, PipelineCompletionMsgSender, RuntimeCtrlMsgSender,
876+
pipeline_completion_msg_channel, runtime_ctrl_msg_channel,
877+
};
870878
use otap_df_engine::error::Error;
871879
use otap_df_engine::exporter::ExporterWrapper;
880+
use otap_df_engine::local::message::{LocalReceiver, LocalSender};
881+
use otap_df_engine::message::{Receiver, Sender};
882+
use otap_df_engine::node::NodeWithPDataReceiver;
883+
use otap_df_engine::testing::create_not_send_channel;
872884
use otap_df_engine::testing::{
873885
exporter::{TestContext, TestRuntime},
874886
test_node,
@@ -883,7 +895,9 @@ mod tests {
883895
use otap_df_pdata::proto::opentelemetry::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
884896
use otap_df_pdata::proto::opentelemetry::collector::trace::v1::ExportTraceServiceRequest;
885897
use otap_df_pdata::proto::opentelemetry::collector::trace::v1::trace_service_server::TraceServiceServer;
898+
use otap_df_telemetry::metrics::MetricSetSnapshot;
886899
use otap_df_telemetry::registry::TelemetryRegistryHandle;
900+
use otap_df_telemetry::reporter::MetricsReporter;
887901
use prost::Message;
888902
use std::net::SocketAddr;
889903
use std::pin::Pin;
@@ -893,25 +907,6 @@ mod tests {
893907
use tokio::time::{Duration, timeout};
894908
use tonic::codegen::tokio_stream::wrappers::TcpListenerStream;
895909
use tonic::transport::Server;
896-
// Imports only used by tests that are skipped on Windows
897-
#[cfg(not(windows))]
898-
use {
899-
otap_df_config::transport_headers::{TransportHeader, TransportHeaders},
900-
otap_df_config::transport_headers_policy::{
901-
HeaderPropagationPolicy, PropagationAction, PropagationDefault, PropagationMatch,
902-
PropagationOverride, PropagationSelector,
903-
},
904-
otap_df_engine::control::{
905-
Controllable, PipelineCompletionMsgSender, RuntimeCtrlMsgSender,
906-
pipeline_completion_msg_channel, runtime_ctrl_msg_channel,
907-
},
908-
otap_df_engine::local::message::{LocalReceiver, LocalSender},
909-
otap_df_engine::message::{Receiver, Sender},
910-
otap_df_engine::node::NodeWithPDataReceiver,
911-
otap_df_engine::testing::create_not_send_channel,
912-
otap_df_telemetry::metrics::MetricSetSnapshot,
913-
otap_df_telemetry::reporter::MetricsReporter,
914-
};
915910

916911
/// Helper function to wait for and validate an Ack or Nack message with the expected node_id
917912
async fn wait_for_ack_or_nack(
@@ -1153,8 +1148,6 @@ mod tests {
11531148
_ = shutdown_sender.send("Shutdown");
11541149
}
11551150

1156-
// Skipping on Windows due to flakiness: https://github.com/open-telemetry/otel-arrow/issues/1611
1157-
#[cfg(not(windows))]
11581151
#[test]
11591152
fn test_receiver_not_ready_on_start_and_reconnect() {
11601153
// the purpose of this test is to that the exporter behaves as expected in the face of
@@ -1181,6 +1174,7 @@ mod tests {
11811174
config: Config {
11821175
grpc: GrpcClientSettings {
11831176
grpc_endpoint: grpc_endpoint.clone(),
1177+
connect_timeout: Duration::from_millis(500),
11841178
..Default::default()
11851179
},
11861180
max_in_flight: 32,
@@ -1458,7 +1452,6 @@ mod tests {
14581452
// ---- build_grpc_metadata unit tests ----------------------------------------
14591453

14601454
/// Helper: Creates an [`EffectHandler`] with an optional propagation policy set.
1461-
#[cfg(not(windows))]
14621455
fn make_effect_handler_with_policy(
14631456
policy: Option<HeaderPropagationPolicy>,
14641457
) -> EffectHandler<OtapPdata> {
@@ -1470,7 +1463,6 @@ mod tests {
14701463
}
14711464

14721465
/// Helper: Creates a [`Context`] that carries the given transport headers.
1473-
#[cfg(not(windows))]
14741466
fn context_with_headers(headers: TransportHeaders) -> Context {
14751467
let pdata = OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(Bytes::new()).into())
14761468
.with_transport_headers(headers);
@@ -1479,15 +1471,13 @@ mod tests {
14791471
}
14801472

14811473
/// Helper: Creates a [`Context`] without any transport headers.
1482-
#[cfg(not(windows))]
14831474
fn context_without_headers() -> Context {
14841475
let pdata = OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(Bytes::new()).into());
14851476
let (context, _) = pdata.into_parts();
14861477
context
14871478
}
14881479

14891480
/// Helper: Propagation policy that propagates all captured headers.
1490-
#[cfg(not(windows))]
14911481
fn propagate_all_policy() -> HeaderPropagationPolicy {
14921482
HeaderPropagationPolicy::new(
14931483
PropagationDefault {
@@ -1501,7 +1491,6 @@ mod tests {
15011491
)
15021492
}
15031493

1504-
#[cfg(not(windows))]
15051494
#[test]
15061495
fn test_build_grpc_metadata_returns_none_without_policy() {
15071496
let handler = make_effect_handler_with_policy(None);
@@ -1513,7 +1502,6 @@ mod tests {
15131502
assert!(result.is_none(), "should return None when no policy is set");
15141503
}
15151504

1516-
#[cfg(not(windows))]
15171505
#[test]
15181506
fn test_build_grpc_metadata_returns_none_without_headers() {
15191507
let handler = make_effect_handler_with_policy(Some(propagate_all_policy()));
@@ -1526,7 +1514,6 @@ mod tests {
15261514
);
15271515
}
15281516

1529-
#[cfg(not(windows))]
15301517
#[test]
15311518
fn test_build_grpc_metadata_propagates_text_headers() {
15321519
let handler = make_effect_handler_with_policy(Some(propagate_all_policy()));
@@ -1558,7 +1545,6 @@ mod tests {
15581545
assert_eq!(request_id.to_str().unwrap(), "req-xyz-789");
15591546
}
15601547

1561-
#[cfg(not(windows))]
15621548
#[test]
15631549
fn test_build_grpc_metadata_drops_filtered_headers() {
15641550
let policy = HeaderPropagationPolicy::new(
@@ -1602,7 +1588,6 @@ mod tests {
16021588
);
16031589
}
16041590

1605-
#[cfg(not(windows))]
16061591
#[test]
16071592
fn test_build_grpc_metadata_propagates_binary_headers() {
16081593
let handler = make_effect_handler_with_policy(Some(propagate_all_policy()));
@@ -1629,7 +1614,6 @@ mod tests {
16291614
);
16301615
}
16311616

1632-
#[cfg(not(windows))]
16331617
#[test]
16341618
fn test_build_grpc_metadata_appends_bin_suffix_for_binary_headers() {
16351619
let handler = make_effect_handler_with_policy(Some(propagate_all_policy()));
@@ -1653,7 +1637,6 @@ mod tests {
16531637
assert_eq!(bin_val.to_bytes().unwrap(), binary_value.as_slice());
16541638
}
16551639

1656-
#[cfg(not(windows))]
16571640
#[test]
16581641
fn test_build_grpc_metadata_preserves_duplicate_headers() {
16591642
let handler = make_effect_handler_with_policy(Some(propagate_all_policy()));
@@ -1691,7 +1674,6 @@ mod tests {
16911674
);
16921675
}
16931676

1694-
#[cfg(not(windows))]
16951677
#[test]
16961678
fn test_build_grpc_metadata_returns_none_when_all_dropped() {
16971679
// Policy that drops everything (selector = None means no headers are selected).

rust/otap-dataflow/crates/core-nodes/src/exporters/parquet_exporter/mod.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,19 @@ impl Exporter<OtapPdata> for ParquetExporter {
250250
deadline,
251251
reason: _,
252252
}) => {
253+
// If the deadline has already passed, return immediately.
254+
// `Delay::new(Duration::ZERO)` is not guaranteed to resolve
255+
// on the first poll on all platforms (Windows timer
256+
// granularity is ~15 ms), so an explicit check avoids a
257+
// race between the timeout and the flush future.
258+
if deadline.checked_duration_since(Instant::now()).is_none() {
259+
let _ = telemetry_cancel_handle.cancel().await;
260+
return Err(Error::IoError {
261+
node: exporter_id.clone(),
262+
error: std::io::Error::from(ErrorKind::TimedOut),
263+
});
264+
}
265+
253266
let mut timeout = Delay::new(deadline.duration_since(Instant::now())).fuse();
254267
let flush_all = writer.flush_all().fuse();
255268
pin_mut!(flush_all);
@@ -503,10 +516,6 @@ mod test {
503516
}
504517

505518
#[test]
506-
#[cfg_attr(
507-
target_os = "windows",
508-
ignore = "Skipping on Windows due to timing flakiness"
509-
)]
510519
fn test_adaptive_schema_dict_upgrade_write() {
511520
let test_runtime = TestRuntime::<OtapPdata>::new();
512521
let temp_dir = tempfile::tempdir().unwrap();
@@ -895,19 +904,15 @@ mod test {
895904
});
896905
}
897906

898-
// Skipping on Windows and macOS due to flakiness: https://github.com/open-telemetry/otel-arrow/issues/1614
899907
#[test]
900-
#[cfg_attr(
901-
any(target_os = "windows", target_os = "macos"),
902-
ignore = "Skipping on Windows and macOS due to flakiness"
903-
)]
904908
fn test_shutdown_timeout() {
905909
let test_runtime = TestRuntime::<OtapPdata>::new();
906910
let temp_dir = tempfile::tempdir().unwrap();
907911
let base_dir: String = temp_dir.path().to_str().unwrap().into();
912+
let base_dir_url = base_dir.replace('\\', "/");
908913
let exporter = ParquetExporter::new(config::Config {
909914
storage: object_store::StorageType::File {
910-
base_uri: format!("testdelayed://{base_dir}?delay=500ms"),
915+
base_uri: format!("testdelayed:///{base_dir_url}?delay=500ms"),
911916
},
912917
partitioning_strategies: None,
913918
writer_options: Some(WriterOptions {
@@ -1030,10 +1035,10 @@ mod test {
10301035
);
10311036
}
10321037

1033-
// shutdown faster than it could possibly flush
1038+
// Make timeout deterministic: deadline is already due when shutdown is handled.
10341039
_ = ctrl_sender
10351040
.send(NodeControlMsg::Shutdown {
1036-
deadline: Instant::now().add(Duration::from_secs(1)),
1041+
deadline: Instant::now(),
10371042
reason: "shutting down".into(),
10381043
})
10391044
.await;

rust/otap-dataflow/crates/core-nodes/src/receivers/otlp_receiver/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1758,7 +1758,7 @@ mod tests {
17581758
.await
17591759
.expect("Failed to send Shutdown");
17601760

1761-
timeout(Duration::from_secs(1), async {
1761+
timeout(Duration::from_secs(5), async {
17621762
loop {
17631763
if LogsServiceClient::connect(grpc_endpoint.clone())
17641764
.await
@@ -1866,7 +1866,6 @@ mod tests {
18661866
}
18671867
}
18681868

1869-
#[cfg_attr(any(windows), ignore = "Skipping on Windows due to flakiness")]
18701869
#[test]
18711870
fn test_otlp_receiver_ack() {
18721871
let test_runtime = TestRuntime::new();

rust/otap-dataflow/crates/otap/src/object_store.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,14 @@ mod test {
253253

254254
let path = url.path().to_string();
255255

256+
// On Windows, url.path() returns "/C:/..." for file paths; strip the leading slash
257+
// so that LocalFileSystem receives a valid Windows path.
258+
#[cfg(windows)]
259+
let path = path
260+
.strip_prefix('/')
261+
.map(|s| s.to_string())
262+
.unwrap_or(path);
263+
256264
let delay = url
257265
.query_pairs()
258266
.find(|(k, _)| k == "delay")
@@ -342,23 +350,20 @@ mod test {
342350
}
343351
}
344352

345-
// Skipping on Windows: https://github.com/open-telemetry/otel-arrow/issues/1614
346353
#[test]
347-
#[cfg(not(windows))]
348354
fn test_get_testdelayed_file_storage() {
349-
let storage = StorageType::File {
350-
base_uri: "testdelayed:///tmp".to_string(),
351-
};
355+
let tmp = tempfile::tempdir().unwrap();
356+
let path = tmp.path().to_str().unwrap().replace('\\', "/");
357+
let base_uri = format!("testdelayed:///{path}");
358+
let storage = StorageType::File { base_uri };
352359
assert!(from_storage_type(&storage).is_ok());
353360
}
354361

355-
// Skipping on Windows: https://github.com/open-telemetry/otel-arrow/issues/1614
356362
#[test]
357-
#[cfg(not(windows))]
358363
fn test_get_file_storage() {
359-
let storage = StorageType::File {
360-
base_uri: "/tmp".to_string(),
361-
};
364+
let tmp = tempfile::tempdir().unwrap();
365+
let base_uri = tmp.path().to_str().unwrap().to_string();
366+
let storage = StorageType::File { base_uri };
362367
assert!(from_storage_type(&storage).is_ok());
363368
}
364369

rust/otap-dataflow/crates/otap/src/otap_grpc/client_settings.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -602,10 +602,6 @@ mod tests {
602602
}
603603

604604
#[tokio::test]
605-
#[cfg_attr(
606-
any(target_os = "windows", target_os = "macos"),
607-
ignore = "Skipping on Windows and macOS due to flakiness"
608-
)]
609605
async fn build_endpoint_with_tls_errors_when_ca_file_missing() {
610606
crate::crypto::ensure_crypto_provider();
611607
let settings = GrpcClientSettings {
@@ -619,9 +615,12 @@ mod tests {
619615
};
620616

621617
let err = settings.build_endpoint_with_tls().await.unwrap_err();
618+
let err_msg = err.to_string().to_lowercase();
622619
assert!(
623-
err.to_string().to_lowercase().contains("no such")
624-
|| err.to_string().to_lowercase().contains("not found")
620+
err_msg.contains("no such")
621+
|| err_msg.contains("not found")
622+
|| err_msg.contains("cannot find"),
623+
"unexpected error message: {err_msg}"
625624
);
626625
}
627626

0 commit comments

Comments
 (0)