Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -486,14 +486,6 @@ impl Exporter<OtapPdata> for AzureMonitorExporter {
}
})?;

// Start periodic telemetry collection and retain the cancel handle for graceful shutdown
let telemetry_timer_cancel_handle = effect_handler
.start_periodic_telemetry(std::time::Duration::from_secs(1))
.await
.map_err(|e| EngineError::InternalError {
message: format!("Failed to start telemetry timer: {e}"),
})?;

let mut next_periodic_export = tokio::time::Instant::now()
+ tokio::time::Duration::from_secs(PERIODIC_EXPORT_INTERVAL);
let mut next_heartbeat_send = tokio::time::Instant::now();
Expand Down Expand Up @@ -639,7 +631,6 @@ impl Exporter<OtapPdata> for AzureMonitorExporter {
let _ = self.metrics.borrow_mut().report(&mut metrics_reporter);
}
Ok(Message::Control(NodeControlMsg::Shutdown { deadline, .. })) => {
let _ = telemetry_timer_cancel_handle.cancel().await;
self.handle_shutdown(&effect_handler).await?;
let snapshot = self.metrics.borrow().metrics().snapshot();
return Ok(TerminalState::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use otap_df_telemetry_macros::metric_set;
use serde::Deserialize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;

// Geneva uploader dependencies
use futures::StreamExt;
Expand Down Expand Up @@ -724,12 +724,6 @@ impl Exporter<OtapPdata> for GenevaExporter {
message = "Geneva exporter starting"
);

// Start periodic telemetry collection so CollectTelemetry messages
// are delivered to this exporter's message channel.
let timer_cancel_handle = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

// Message loop
loop {
match msg_chan.recv().await? {
Expand All @@ -739,7 +733,6 @@ impl Exporter<OtapPdata> for GenevaExporter {
message = "Geneva exporter shutting down"
);

_ = timer_cancel_handle.cancel().await;
return Ok(TerminalState::new(
deadline,
[self.pdata_metrics.snapshot(), self.metrics.snapshot()],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,6 @@ impl local::Exporter<OtapPdata> for OTAPExporter {
}
})?;

let timer_cancel_handle = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

// start a grpc client and connect to the server
let mut arrow_metrics_client = ArrowMetricsServiceClient::new(channel.clone());
let mut arrow_logs_client = ArrowLogsServiceClient::new(channel.clone());
Expand Down Expand Up @@ -451,7 +447,6 @@ impl local::Exporter<OtapPdata> for OTAPExporter {
await_stream_handles(logs_handles).await;
await_stream_handles(metrics_handles).await;
await_stream_handles(traces_handles).await;
_ = timer_cancel_handle.cancel().await;
self.export_latency_window
.report_into(&mut self.async_metrics);
return Ok(TerminalState::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ use otap_df_telemetry::{otel_debug, otel_info, otel_warn};
use serde::Deserialize;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tonic::codec::CompressionEncoding;
use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
use tonic::transport::Channel;
Expand Down Expand Up @@ -132,9 +131,6 @@ impl Exporter<OtapPdata> for OTLPExporter {
self.config.grpc.log_proxy_info();

let exporter_id = effect_handler.exporter_id();
let timer_cancel_handle = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

let channel = self
.config
Expand Down Expand Up @@ -246,7 +242,6 @@ impl Exporter<OtapPdata> for OTLPExporter {
grpc_clients.release(client);
}
}
_ = timer_cancel_handle.cancel().await;
return Ok(TerminalState::new(deadline, [self.pdata_metrics]));
}
Message::Control(NodeControlMsg::CollectTelemetry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
Expand Down Expand Up @@ -225,10 +224,6 @@ impl Exporter<OtapPdata> for OtlpHttpExporter {
traces_endpoint = traces_endpoint.as_str(),
);

let telemetry_timer_cancel = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

let max_in_flight = self.config.max_in_flight.max(1);
let mut client_pool =
HttpClientPool::try_new(&self.config.http, self.config.client_pool_size)
Expand Down Expand Up @@ -307,7 +302,6 @@ impl Exporter<OtapPdata> for OtlpHttpExporter {
.await;
}
}
_ = telemetry_timer_cancel.cancel().await;
return Ok(TerminalState::new(deadline, [self.pdata_metrics]));
}
Message::Control(NodeControlMsg::CollectTelemetry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,6 @@ impl Exporter<OtapPdata> for ParquetExporter {
.await?;
}

// Start periodic telemetry collection (internal metrics)
let telemetry_cancel_handle = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

let mut writer = writer::WriterManager::new(object_store, writer_options);
let mut batch_id = 0;
let mut id_generator = PartitionSequenceIdGenerator::new();
Expand Down Expand Up @@ -256,7 +251,6 @@ impl Exporter<OtapPdata> for ParquetExporter {
// granularity is ~15 ms), so an explicit check avoids a
// race between the timeout and the flush future.
if deadline.checked_duration_since(Instant::now()).is_none() {
let _ = telemetry_cancel_handle.cancel().await;
return Err(Error::IoError {
node: exporter_id.clone(),
error: std::io::Error::from(ErrorKind::TimedOut),
Expand All @@ -267,15 +261,7 @@ impl Exporter<OtapPdata> for ParquetExporter {
let flush_all = writer.flush_all().fuse();
pin_mut!(flush_all);
// Stop telemetry loop concurrently with flushing; do not block shutdown on cancel
let cancel_fut = async {
let _ = telemetry_cancel_handle.cancel().await;
futures::future::pending::<()>().await
}
.fuse();
pin_mut!(cancel_fut);

return futures::select_biased! {
_ = cancel_fut => unreachable!(),
_timeout = timeout => Err(Error::IoError {
node: exporter_id.clone(),
error: std::io::Error::from(ErrorKind::TimedOut)
Expand Down Expand Up @@ -1224,92 +1210,6 @@ mod test {
exporter_result.unwrap();
}

/// Checks that a started `ParquetExporter` requests a telemetry timer.
///
/// The test runs the exporter and a checking task concurrently. The checking
/// task reads the first message from the runtime-control channel and asserts
/// that it is `RuntimeControlMsg::StartTelemetryTimer` with a one-second
/// duration, then sends a `Shutdown` control message so the exporter exits.
#[test]
fn test_starts_telemetry_timer() {
use otap_df_engine::control::runtime_ctrl_msg_channel;
use otap_df_engine::testing::test_node;

// Build an exporter configured for file storage rooted in a temporary directory.
let test_runtime = TestRuntime::<OtapPdata>::new();
let temp_dir = tempfile::tempdir().unwrap();
let base_dir: String = temp_dir.path().to_str().unwrap().into();
let exporter = ParquetExporter::new(config::Config {
storage: object_store::StorageType::File {
base_uri: base_dir.clone(),
},
partitioning_strategies: None,
writer_options: None,
});
// Wrap the exporter as a local exporter node; `exporter` is shadowed by the wrapper.
let node_config = Arc::new(NodeUserConfig::new_exporter_config(PARQUET_EXPORTER_URN));
let mut exporter = ExporterWrapper::<OtapPdata>::local::<ParquetExporter>(
exporter,
test_node(test_runtime.config().name.clone()),
node_config,
test_runtime.config(),
);

let (rt, _) = setup_test_runtime();
// Grab the control sender before the wrapper is moved into the running task;
// it is used at the end to send the Shutdown message.
let control_sender = exporter.control_sender();
// No pdata is sent in this test. The sender half is bound to a named variable,
// so it is not dropped until this function returns.
// NOTE(review): the `1` argument is presumably the channel capacity — confirm.
let (pdata_tx, pdata_rx) = create_not_send_channel::<OtapPdata>(1);
let _pdata_tx = Sender::Local(LocalSender::mpsc(pdata_tx));
let pdata_rx = Receiver::Local(LocalReceiver::mpsc(pdata_rx));

// The receiving half of the runtime-control channel is what the test inspects below.
let (runtime_ctrl_msg_tx, mut runtime_ctrl_msg_rx) = runtime_ctrl_msg_channel(10);
let (pipeline_completion_msg_tx, _pipeline_completion_msg_rx) =
pipeline_completion_msg_channel::<OtapPdata>(10);

exporter
.set_pdata_receiver(test_node("exp"), pdata_rx)
.expect("Failed to set PData Receiver");

// Helper that starts the exporter and maps a successful result to `()`,
// so only the error (if any) is returned.
async fn start_exporter(
exporter: ExporterWrapper<OtapPdata>,
runtime_ctrl_msg_tx: RuntimeCtrlMsgSender<OtapPdata>,
pipeline_completion_msg_tx: PipelineCompletionMsgSender<OtapPdata>,
) -> Result<(), Error> {
// `_metrics_rx` is bound to a name so the receiving side stays in scope
// for as long as the exporter is running.
let (_metrics_rx, metrics_reporter) =
otap_df_telemetry::reporter::MetricsReporter::create_new_and_receiver(1);
exporter
.start(
runtime_ctrl_msg_tx,
pipeline_completion_msg_tx,
metrics_reporter,
Interests::empty(),
)
.await
.map(|_| ())
}

// Neither join result is checked: the exporter's own result is discarded, and
// only the assertions/panics inside the second future can fail the test.
let (_exporter_result, _ignored) = rt.block_on(async move {
tokio::join!(
start_exporter(exporter, runtime_ctrl_msg_tx, pipeline_completion_msg_tx),
async move {
// Expect StartTelemetryTimer quickly after startup
let msg = tokio::time::timeout(Duration::from_millis(1500), async {
runtime_ctrl_msg_rx.recv().await
})
.await
.expect("timed out waiting for StartTelemetryTimer")
.expect("runtime-control channel closed");

// The first runtime-control message must be the timer request with a 1 s period.
match msg {
RuntimeControlMsg::StartTelemetryTimer { duration, .. } => {
assert_eq!(duration, Duration::from_secs(1));
}
other => panic!("Expected StartTelemetryTimer, got {other:?}"),
}

// Shutdown exporter to end the test
// The deadline is the current instant, and the send result is deliberately ignored.
let _ = control_sender
.send(NodeControlMsg::Shutdown {
deadline: Instant::now(),
reason: "done".into(),
})
.await;
}
)
});
}

#[test]
fn test_traces() {
let test_runtime = TestRuntime::<OtapPdata>::new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use otap_df_telemetry::otel_info;
use serde_json::Value;
use std::sync::Arc;
use std::time::Instant;
use tokio::time::Duration;

/// The URN for the OTAP Perf exporter
pub const OTAP_PERF_EXPORTER_URN: &str = "urn:otel:exporter:perf";
Expand Down Expand Up @@ -142,11 +141,6 @@ impl local::Exporter<OtapPdata> for PerfExporter {
message = "Starting Perf Exporter"
);

// Start telemetry collection tick as a dedicated control message.
let timer_cancel_handle = effect_handler
.start_periodic_telemetry(Duration::from_millis(self.config.frequency()))
.await?;

// Loop until a Shutdown event is received.
loop {
let msg = msg_chan.recv().await?;
Expand All @@ -160,7 +154,6 @@ impl local::Exporter<OtapPdata> for PerfExporter {
// ToDo: Handle configuration changes
Message::Control(NodeControlMsg::Config { .. }) => {}
Message::Control(NodeControlMsg::Shutdown { deadline, .. }) => {
_ = timer_cancel_handle.cancel().await;
return Ok(self.terminal_state(deadline));
}
Message::PData(mut pdata) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

/// URN for the topic exporter.
pub const TOPIC_EXPORTER_URN: &str = "urn:otel:exporter:topic";
Expand Down Expand Up @@ -404,9 +403,6 @@ impl Exporter<OtapPdata> for TopicExporter {
ack_propagation = format!("{ack_propagation_mode:?}"),
message = "Topic exporter started"
);
let telemetry_cancel_handle = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

let run_result: Result<(), Error> = async {
loop {
Expand Down Expand Up @@ -583,7 +579,6 @@ impl Exporter<OtapPdata> for TopicExporter {
}
.await;

_ = telemetry_cancel_handle.cancel().await;
run_result?;
Ok(TerminalState::default())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,10 +522,6 @@ impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {

let transport_headers = build_transport_headers(self.config.transport_headers());

let _ = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

let run_len = producer.run_len();

// We consume one tick here because it's always immediately ready and would
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,6 @@ impl local::Receiver<OtapPdata> for InternalTelemetryReceiver {
let log_tap = internal.log_tap;
let mut scope_cache = ScopeToBytesMap::new(internal.registry);

// Start periodic telemetry collection
let _ = effect_handler
.start_periodic_telemetry(std::time::Duration::from_secs(1))
.await?;

loop {
tokio::select! {
biased;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,6 @@ impl shared::Receiver<OtapPdata> for OTAPReceiver {
.add_service(metrics_server)
.add_service(traces_server);

// Start periodic telemetry collection
let mut telemetry_cancel_handle = Some(
effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?,
);

let grpc_shutdown = CancellationToken::new();
let server_task = {
let grpc_shutdown = grpc_shutdown.clone();
Expand Down Expand Up @@ -440,9 +433,6 @@ impl shared::Receiver<OtapPdata> for OTAPReceiver {
}

if server_task_done && states.is_empty() {
if let Some(handle) = telemetry_cancel_handle.take() {
_ = handle.cancel().await;
}
effect_handler.notify_receiver_drained().await?;
self.flush_memory_pressure_metrics();
terminal_state = TerminalState::new(deadline, [self.metrics.snapshot()]);
Expand Down Expand Up @@ -480,9 +470,6 @@ impl shared::Receiver<OtapPdata> for OTAPReceiver {
otap_df_telemetry::otel_info!("otap_receiver.shutdown");
grpc_shutdown.cancel();
states.force_shutdown(&reason);
if let Some(handle) = telemetry_cancel_handle.take() {
_ = handle.cancel().await;
}
self.flush_memory_pressure_metrics();
terminal_state = TerminalState::new(deadline, [self.metrics.snapshot()]);
break;
Expand All @@ -501,9 +488,6 @@ impl shared::Receiver<OtapPdata> for OTAPReceiver {
self.handle_nack_response(self.route_nack_response(&states, nack));
}
Err(e) => {
if let Some(handle) = telemetry_cancel_handle.take() {
_ = handle.cancel().await;
}
return Err(Error::ChannelRecvError(e));
}
_ => {}
Expand All @@ -523,9 +507,6 @@ impl shared::Receiver<OtapPdata> for OTAPReceiver {
}

if draining_deadline.is_none() {
if let Some(handle) = telemetry_cancel_handle.take() {
_ = handle.cancel().await;
}
self.flush_memory_pressure_metrics();
terminal_state = TerminalState::new(
clock::now().add(Duration::from_secs(1)),
Expand Down
Loading
Loading