Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -486,14 +486,6 @@ impl Exporter<OtapPdata> for AzureMonitorExporter {
}
})?;

// Start periodic telemetry collection and retain the cancel handle for graceful shutdown
let telemetry_timer_cancel_handle = effect_handler
.start_periodic_telemetry(std::time::Duration::from_secs(1))
.await
.map_err(|e| EngineError::InternalError {
message: format!("Failed to start telemetry timer: {e}"),
})?;

let mut next_periodic_export = tokio::time::Instant::now()
+ tokio::time::Duration::from_secs(PERIODIC_EXPORT_INTERVAL);
let mut next_heartbeat_send = tokio::time::Instant::now();
Expand Down Expand Up @@ -639,7 +631,6 @@ impl Exporter<OtapPdata> for AzureMonitorExporter {
let _ = self.metrics.borrow_mut().report(&mut metrics_reporter);
}
Ok(Message::Control(NodeControlMsg::Shutdown { deadline, .. })) => {
let _ = telemetry_timer_cancel_handle.cancel().await;
self.handle_shutdown(&effect_handler).await?;
let snapshot = self.metrics.borrow().metrics().snapshot();
return Ok(TerminalState::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use otap_df_telemetry_macros::metric_set;
use serde::Deserialize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;

// Geneva uploader dependencies
use futures::StreamExt;
Expand Down Expand Up @@ -724,12 +724,6 @@ impl Exporter<OtapPdata> for GenevaExporter {
message = "Geneva exporter starting"
);

// Start periodic telemetry collection so CollectTelemetry messages
// are delivered to this exporter's message channel.
let timer_cancel_handle = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

// Message loop
loop {
match msg_chan.recv().await? {
Expand All @@ -739,7 +733,6 @@ impl Exporter<OtapPdata> for GenevaExporter {
message = "Geneva exporter shutting down"
);

_ = timer_cancel_handle.cancel().await;
return Ok(TerminalState::new(
deadline,
[self.pdata_metrics.snapshot(), self.metrics.snapshot()],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,6 @@ impl local::Exporter<OtapPdata> for OTAPExporter {
}
})?;

let timer_cancel_handle = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

// start a grpc client and connect to the server
let mut arrow_metrics_client = ArrowMetricsServiceClient::new(channel.clone());
let mut arrow_logs_client = ArrowLogsServiceClient::new(channel.clone());
Expand Down Expand Up @@ -451,7 +447,6 @@ impl local::Exporter<OtapPdata> for OTAPExporter {
await_stream_handles(logs_handles).await;
await_stream_handles(metrics_handles).await;
await_stream_handles(traces_handles).await;
_ = timer_cancel_handle.cancel().await;
self.export_latency_window
.report_into(&mut self.async_metrics);
return Ok(TerminalState::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ use otap_df_telemetry::{otel_debug, otel_info, otel_warn};
use serde::Deserialize;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tonic::codec::CompressionEncoding;
use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
use tonic::transport::Channel;
Expand Down Expand Up @@ -132,9 +131,6 @@ impl Exporter<OtapPdata> for OTLPExporter {
self.config.grpc.log_proxy_info();

let exporter_id = effect_handler.exporter_id();
let timer_cancel_handle = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

let channel = self
.config
Expand Down Expand Up @@ -246,7 +242,6 @@ impl Exporter<OtapPdata> for OTLPExporter {
grpc_clients.release(client);
}
}
_ = timer_cancel_handle.cancel().await;
return Ok(TerminalState::new(deadline, [self.pdata_metrics]));
}
Message::Control(NodeControlMsg::CollectTelemetry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
Expand Down Expand Up @@ -225,10 +224,6 @@ impl Exporter<OtapPdata> for OtlpHttpExporter {
traces_endpoint = traces_endpoint.as_str(),
);

let telemetry_timer_cancel = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

let max_in_flight = self.config.max_in_flight.max(1);
let mut client_pool =
HttpClientPool::try_new(&self.config.http, self.config.client_pool_size)
Expand Down Expand Up @@ -307,7 +302,6 @@ impl Exporter<OtapPdata> for OtlpHttpExporter {
.await;
}
}
_ = telemetry_timer_cancel.cancel().await;
return Ok(TerminalState::new(deadline, [self.pdata_metrics]));
}
Message::Control(NodeControlMsg::CollectTelemetry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,6 @@ impl Exporter<OtapPdata> for ParquetExporter {
.await?;
}

// Start periodic telemetry collection (internal metrics)
let telemetry_cancel_handle = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

let mut writer = writer::WriterManager::new(object_store, writer_options);
let mut batch_id = 0;
let mut id_generator = PartitionSequenceIdGenerator::new();
Expand Down Expand Up @@ -256,7 +251,6 @@ impl Exporter<OtapPdata> for ParquetExporter {
// granularity is ~15 ms), so an explicit check avoids a
// race between the timeout and the flush future.
if deadline.checked_duration_since(Instant::now()).is_none() {
let _ = telemetry_cancel_handle.cancel().await;
return Err(Error::IoError {
node: exporter_id.clone(),
error: std::io::Error::from(ErrorKind::TimedOut),
Expand All @@ -267,15 +261,7 @@ impl Exporter<OtapPdata> for ParquetExporter {
let flush_all = writer.flush_all().fuse();
pin_mut!(flush_all);
// Stop telemetry loop concurrently with flushing; do not block shutdown on cancel
let cancel_fut = async {
let _ = telemetry_cancel_handle.cancel().await;
futures::future::pending::<()>().await
}
.fuse();
pin_mut!(cancel_fut);

return futures::select_biased! {
_ = cancel_fut => unreachable!(),
_timeout = timeout => Err(Error::IoError {
node: exporter_id.clone(),
error: std::io::Error::from(ErrorKind::TimedOut)
Expand Down Expand Up @@ -1224,92 +1210,6 @@ mod test {
exporter_result.unwrap();
}

// Verifies that the Parquet exporter, immediately after startup, asks the
// runtime to start the periodic telemetry timer: the first message observed on
// the runtime-control channel must be `StartTelemetryTimer` with a 1-second
// period. The exporter is then shut down via a `Shutdown` control message so
// the test terminates.
#[test]
fn test_starts_telemetry_timer() {
use otap_df_engine::control::runtime_ctrl_msg_channel;
use otap_df_engine::testing::test_node;

// Exporter under test writes to a throwaway temp directory; `base_dir`
// is cloned into the config while `temp_dir` keeps the directory alive
// (it is removed when `temp_dir` drops at end of test).
let test_runtime = TestRuntime::<OtapPdata>::new();
let temp_dir = tempfile::tempdir().unwrap();
let base_dir: String = temp_dir.path().to_str().unwrap().into();
let exporter = ParquetExporter::new(config::Config {
storage: object_store::StorageType::File {
base_uri: base_dir.clone(),
},
partitioning_strategies: None,
writer_options: None,
});
// Wrap the exporter as a local (non-Send) engine node.
let node_config = Arc::new(NodeUserConfig::new_exporter_config(PARQUET_EXPORTER_URN));
let mut exporter = ExporterWrapper::<OtapPdata>::local::<ParquetExporter>(
exporter,
test_node(test_runtime.config().name.clone()),
node_config,
test_runtime.config(),
);

// Wire up the channels the exporter needs: a pdata channel (sender kept
// alive but unused so the receiver stays open) and the runtime-control /
// pipeline-completion channels. `runtime_ctrl_msg_rx` is what the test
// inspects for the StartTelemetryTimer request.
let (rt, _) = setup_test_runtime();
let control_sender = exporter.control_sender();
let (pdata_tx, pdata_rx) = create_not_send_channel::<OtapPdata>(1);
let _pdata_tx = Sender::Local(LocalSender::mpsc(pdata_tx));
let pdata_rx = Receiver::Local(LocalReceiver::mpsc(pdata_rx));

let (runtime_ctrl_msg_tx, mut runtime_ctrl_msg_rx) = runtime_ctrl_msg_channel(10);
let (pipeline_completion_msg_tx, _pipeline_completion_msg_rx) =
pipeline_completion_msg_channel::<OtapPdata>(10);

exporter
.set_pdata_receiver(test_node("exp"), pdata_rx)
.expect("Failed to set PData Receiver");

// Helper: run the exporter to completion, discarding its terminal state.
// The metrics receiver is intentionally dropped (`_metrics_rx`) — this
// test does not assert on reported metrics.
async fn start_exporter(
exporter: ExporterWrapper<OtapPdata>,
runtime_ctrl_msg_tx: RuntimeCtrlMsgSender<OtapPdata>,
pipeline_completion_msg_tx: PipelineCompletionMsgSender<OtapPdata>,
) -> Result<(), Error> {
let (_metrics_rx, metrics_reporter) =
otap_df_telemetry::reporter::MetricsReporter::create_new_and_receiver(1);
exporter
.start(
runtime_ctrl_msg_tx,
pipeline_completion_msg_tx,
metrics_reporter,
Interests::empty(),
)
.await
.map(|_| ())
}

// Run the exporter and the assertion concurrently: the second branch of
// the join waits (bounded by a 1.5 s timeout) for the first runtime
// control message, checks it, then tells the exporter to shut down so
// `start_exporter` — and therefore the join — can finish.
let (_exporter_result, _ignored) = rt.block_on(async move {
tokio::join!(
start_exporter(exporter, runtime_ctrl_msg_tx, pipeline_completion_msg_tx),
async move {
// Expect StartTelemetryTimer quickly after startup
let msg = tokio::time::timeout(Duration::from_millis(1500), async {
runtime_ctrl_msg_rx.recv().await
})
.await
.expect("timed out waiting for StartTelemetryTimer")
.expect("runtime-control channel closed");

match msg {
RuntimeControlMsg::StartTelemetryTimer { duration, .. } => {
assert_eq!(duration, Duration::from_secs(1));
}
other => panic!("Expected StartTelemetryTimer, got {other:?}"),
}

// Shutdown exporter to end the test
let _ = control_sender
.send(NodeControlMsg::Shutdown {
deadline: Instant::now(),
reason: "done".into(),
})
.await;
}
)
});
}

#[test]
fn test_traces() {
let test_runtime = TestRuntime::<OtapPdata>::new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use otap_df_telemetry::otel_info;
use serde_json::Value;
use std::sync::Arc;
use std::time::Instant;
use tokio::time::Duration;

/// The URN for the OTAP Perf exporter
pub const OTAP_PERF_EXPORTER_URN: &str = "urn:otel:exporter:perf";
Expand Down Expand Up @@ -142,11 +141,6 @@ impl local::Exporter<OtapPdata> for PerfExporter {
message = "Starting Perf Exporter"
);

// Start telemetry collection tick as a dedicated control message.
let timer_cancel_handle = effect_handler
.start_periodic_telemetry(Duration::from_millis(self.config.frequency()))
.await?;

// Loop until a Shutdown event is received.
loop {
let msg = msg_chan.recv().await?;
Expand All @@ -160,7 +154,6 @@ impl local::Exporter<OtapPdata> for PerfExporter {
// ToDo: Handle configuration changes
Message::Control(NodeControlMsg::Config { .. }) => {}
Message::Control(NodeControlMsg::Shutdown { deadline, .. }) => {
_ = timer_cancel_handle.cancel().await;
return Ok(self.terminal_state(deadline));
}
Message::PData(mut pdata) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

/// URN for the topic exporter.
pub const TOPIC_EXPORTER_URN: &str = "urn:otel:exporter:topic";
Expand Down Expand Up @@ -404,9 +403,6 @@ impl Exporter<OtapPdata> for TopicExporter {
ack_propagation = format!("{ack_propagation_mode:?}"),
message = "Topic exporter started"
);
let telemetry_cancel_handle = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

let run_result: Result<(), Error> = async {
loop {
Expand Down Expand Up @@ -583,7 +579,6 @@ impl Exporter<OtapPdata> for TopicExporter {
}
.await;

_ = telemetry_cancel_handle.cancel().await;
run_result?;
Ok(TerminalState::default())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,10 +522,6 @@ impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {

let transport_headers = build_transport_headers(self.config.transport_headers());

let _ = effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?;

let run_len = producer.run_len();

// We consume one tick here because it's always immediately ready and would
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,6 @@ impl local::Receiver<OtapPdata> for InternalTelemetryReceiver {
let log_tap = internal.log_tap;
let mut scope_cache = ScopeToBytesMap::new(internal.registry);

// Start periodic telemetry collection
let _ = effect_handler
.start_periodic_telemetry(std::time::Duration::from_secs(1))
.await?;

loop {
tokio::select! {
biased;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,6 @@ impl shared::Receiver<OtapPdata> for OTAPReceiver {
.add_service(metrics_server)
.add_service(traces_server);

// Start periodic telemetry collection
let mut telemetry_cancel_handle = Some(
effect_handler
.start_periodic_telemetry(Duration::from_secs(1))
.await?,
);

let grpc_shutdown = CancellationToken::new();
let server_task = {
let grpc_shutdown = grpc_shutdown.clone();
Expand Down Expand Up @@ -440,9 +433,6 @@ impl shared::Receiver<OtapPdata> for OTAPReceiver {
}

if server_task_done && states.is_empty() {
if let Some(handle) = telemetry_cancel_handle.take() {
_ = handle.cancel().await;
}
effect_handler.notify_receiver_drained().await?;
self.flush_memory_pressure_metrics();
terminal_state = TerminalState::new(deadline, [self.metrics.snapshot()]);
Expand Down Expand Up @@ -480,9 +470,6 @@ impl shared::Receiver<OtapPdata> for OTAPReceiver {
otap_df_telemetry::otel_info!("otap_receiver.shutdown");
grpc_shutdown.cancel();
states.force_shutdown(&reason);
if let Some(handle) = telemetry_cancel_handle.take() {
_ = handle.cancel().await;
}
self.flush_memory_pressure_metrics();
terminal_state = TerminalState::new(deadline, [self.metrics.snapshot()]);
break;
Expand All @@ -501,9 +488,6 @@ impl shared::Receiver<OtapPdata> for OTAPReceiver {
self.handle_nack_response(self.route_nack_response(&states, nack));
}
Err(e) => {
if let Some(handle) = telemetry_cancel_handle.take() {
_ = handle.cancel().await;
}
return Err(Error::ChannelRecvError(e));
}
_ => {}
Expand All @@ -523,9 +507,6 @@ impl shared::Receiver<OtapPdata> for OTAPReceiver {
}

if draining_deadline.is_none() {
if let Some(handle) = telemetry_cancel_handle.take() {
_ = handle.cancel().await;
}
self.flush_memory_pressure_metrics();
terminal_state = TerminalState::new(
clock::now().add(Duration::from_secs(1)),
Expand Down
Loading
Loading