@@ -1516,8 +1516,26 @@ pub struct TelemetryCore;
15161516impl TelemetryCore {
15171517 /// Build a telemetry session. Recording starts disabled; call
15181518 /// [`TelemetryGuard::enable`] to begin recording.
1519- #[ builder]
1519+ #[ builder( state_mod = telemetry_core_builder ) ]
15201520 pub fn new (
1521+ /// The pipeline of [`SegmentProcessor`](crate::background_task::SegmentProcessor)s
1522+ /// to run on each sealed segment. When empty the background worker
1523+ /// is not spawned.
1524+ #[ builder( field) ]
1525+ processors : Vec < Box < dyn crate :: background_task:: SegmentProcessor > > ,
1526+ /// Static segment metadata injected into every rotated segment's
1527+ /// header. Empty by default; the S3 preset populates it from the
1528+ /// configured `S3Config` so traces stay self-describing.
1529+ #[ builder( field) ]
1530+ segment_metadata : Vec < ( String , String ) > ,
1531+ /// S3 upload configuration.
1532+ #[ cfg( feature = "worker-s3" ) ]
1533+ #[ builder( field) ]
1534+ s3_config : Option < crate :: background_task:: s3:: S3Config > ,
1535+ /// Pre-built S3 client.
1536+ #[ cfg( feature = "worker-s3" ) ]
1537+ #[ builder( field) ]
1538+ s3_client : Option < aws_sdk_s3:: Client > ,
15211539 /// The trace writer (e.g. [`RotatingWriter`], [`NullWriter`]).
15221540 writer : impl TraceWriter + ' static ,
15231541 /// Path for trace output. Enables the background worker when any
@@ -1533,16 +1551,6 @@ impl TelemetryCore {
15331551 /// Enable scheduler event capture (Linux only).
15341552 #[ cfg( feature = "cpu-profiling" ) ]
15351553 sched_events : Option < crate :: telemetry:: cpu_profile:: SchedEventConfig > ,
1536- /// The pipeline of [`SegmentProcessor`](crate::background_task::SegmentProcessor)s
1537- /// to run on each sealed segment. When empty the background worker
1538- /// is not spawned.
1539- #[ builder( default ) ]
1540- processors : Vec < Box < dyn crate :: background_task:: SegmentProcessor > > ,
1541- /// Static segment metadata injected into every rotated segment's
1542- /// header. Empty by default; the S3 preset populates it from the
1543- /// configured `S3Config` so traces stay self-describing.
1544- #[ builder( default ) ]
1545- segment_metadata : Vec < ( String , String ) > ,
15461554 /// How often the background worker polls for sealed segments.
15471555 worker_poll_interval : Option < Duration > ,
15481556 /// Metrics sink for the flush/worker threads.
@@ -1557,6 +1565,33 @@ impl TelemetryCore {
15571565 . task_dump_idle_threshold_ns
15581566 . store ( cfg. idle_threshold ( ) . as_nanos ( ) as u64 , Ordering :: Relaxed ) ;
15591567 }
1568+
1569+ // Assemble processors from s3_config when provided and no explicit
1570+ // processors were set.
1571+ #[ allow( unused_mut) ]
1572+ let mut processors = processors;
1573+ #[ allow( unused_mut) ]
1574+ let mut segment_metadata = segment_metadata;
1575+ #[ cfg( feature = "worker-s3" ) ]
1576+ if let Some ( config) = s3_config {
1577+ if segment_metadata. is_empty ( ) {
1578+ segment_metadata = config
1579+ . as_metadata ( )
1580+ . map ( |( k, v) | ( k. to_string ( ) , v. to_string ( ) ) )
1581+ . collect ( ) ;
1582+ }
1583+ if processors. is_empty ( ) {
1584+ #[ cfg( feature = "cpu-profiling" ) ]
1585+ if cpu_profiling. is_some ( ) {
1586+ processors. push ( Box :: new ( crate :: background_task:: SymbolizeProcessor ) ) ;
1587+ }
1588+ processors. push ( Box :: new ( crate :: background_task:: GzipCompressor ) ) ;
1589+ processors. push ( Box :: new ( crate :: background_task:: S3PipelineUploader :: new (
1590+ config, s3_client,
1591+ ) ) ) ;
1592+ }
1593+ }
1594+
15601595 #[ allow( unused_mut) ]
15611596 let mut event_writer = EventWriter :: new ( Box :: new ( writer) ) ;
15621597
@@ -1659,6 +1694,38 @@ impl TelemetryCore {
16591694 }
16601695}
16611696
1697+ // Custom methods on the generated builder.
1698+ impl < W : TraceWriter , S : telemetry_core_builder:: State > TelemetryCoreBuilder < W , S > {
1699+ /// Configure S3 upload for sealed trace segments.
1700+ #[ cfg( feature = "worker-s3" ) ]
1701+ pub fn s3_config ( mut self , config : crate :: background_task:: s3:: S3Config ) -> Self {
1702+ self . s3_config = Some ( config) ;
1703+ self
1704+ }
1705+
1706+ /// Provide a pre-built S3 client (for custom credentials or endpoints).
1707+ #[ cfg( feature = "worker-s3" ) ]
1708+ pub fn s3_client ( mut self , client : aws_sdk_s3:: Client ) -> Self {
1709+ self . s3_client = Some ( client) ;
1710+ self
1711+ }
1712+
1713+ /// Set the processor pipeline directly.
1714+ pub fn processors (
1715+ mut self ,
1716+ processors : Vec < Box < dyn crate :: background_task:: SegmentProcessor > > ,
1717+ ) -> Self {
1718+ self . processors = processors;
1719+ self
1720+ }
1721+
1722+ /// Set static segment metadata.
1723+ pub fn segment_metadata ( mut self , entries : Vec < ( String , String ) > ) -> Self {
1724+ self . segment_metadata = entries;
1725+ self
1726+ }
1727+ }
1728+
16621729/// The flush thread main loop. Extracted so `TelemetryCore::builder` stays readable.
16631730fn run_flush_loop (
16641731 control_rx : crate :: primitives:: sync:: mpsc:: Receiver < ControlCommand > ,
0 commit comments