Skip to content

Commit a66e783

Browse files
authored
Propagate inherited fields properly (#54)
2 parents 9e0941b + 2ec1de9 commit a66e783

8 files changed

Lines changed: 463 additions & 103 deletions

File tree

src/dataset.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,7 @@ impl<S: SpanSubmitter + DatasetRegistrar + DatasetFetcher + DatasetSummarizer +
741741
} else {
742742
None
743743
},
744+
extra: std::collections::HashMap::new(),
744745
};
745746

746747
let parent_info = ParentSpanInfo::Dataset {

src/experiments/experiment.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ impl<
198198
tags: feedback.tags,
199199
context: None,
200200
span_attributes: None,
201+
extra: HashMap::new(),
201202
};
202203

203204
let parent_info = ParentSpanInfo::Experiment {
@@ -384,6 +385,7 @@ impl<
384385
purpose: None,
385386
extra: HashMap::new(),
386387
}),
388+
extra: HashMap::new(),
387389
};
388390

389391
let parent_info = ParentSpanInfo::Experiment {

src/log_queue/merge.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,15 @@ pub(crate) fn merge_row_into(target: &mut Logs3Row, source: Logs3Row) {
185185
}
186186
}
187187

188+
for (key, source_value) in source.extra {
189+
match target.extra.get_mut(&key) {
190+
Some(target_value) => deep_merge(target_value, &source_value),
191+
None => {
192+
target.extra.insert(key, source_value);
193+
}
194+
}
195+
}
196+
188197
// Combine merge_paths from both entries.
189198
// Only retain merge paths if both entries are merge-type entries.
190199
let final_is_merge = target.is_merge.unwrap_or(false) && source.is_merge.unwrap_or(false);

src/log_queue/queue.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use anyhow::Context;
22
use chrono::Utc;
33
use serde::{Deserialize, Serialize};
44
use serde_json::Value;
5-
use std::collections::HashMap;
65

76
use super::batching::batch_and_serialize_rows;
87
use super::config::LogQueueConfig;
@@ -379,6 +378,7 @@ impl LogQueueCore {
379378
tags,
380379
context,
381380
span_attributes,
381+
extra,
382382
} = payload;
383383

384384
if let Some(span_components) = span_components {
@@ -415,7 +415,7 @@ impl LogQueueCore {
415415
tags,
416416
context,
417417
span_attributes,
418-
extra: HashMap::new(),
418+
extra,
419419
created: Utc::now(),
420420
xact_id: None,
421421
object_delete: None,
@@ -539,7 +539,7 @@ impl LogQueueCore {
539539
tags,
540540
context,
541541
span_attributes,
542-
extra: HashMap::new(),
542+
extra,
543543
created: Utc::now(),
544544
xact_id: None,
545545
object_delete: None,
@@ -821,7 +821,7 @@ impl LogQueueCore {
821821
tags: payload.tags,
822822
context: payload.context,
823823
span_attributes: payload.span_attributes,
824-
extra: HashMap::new(),
824+
extra: payload.extra,
825825
created: Utc::now(),
826826
xact_id: None,
827827
object_delete: None,
@@ -1005,6 +1005,7 @@ mod tests {
10051005
tags: None,
10061006
context: None,
10071007
span_attributes: None,
1008+
extra: std::collections::HashMap::new(),
10081009
},
10091010
parent_info: Some(ParentSpanInfo::Experiment {
10101011
object_id: "exp-test".to_string(),

src/logger.rs

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::http::build_http_client;
2323
use crate::log_queue::{LogQueue, LogQueueConfig};
2424
use crate::span::SpanSubmitter;
2525
use crate::span_components::SpanComponents;
26-
use crate::types::{ParentSpanInfo, SpanAttributes, SpanObjectType, SpanPayload};
26+
use crate::types::{ParentSpanInfo, SpanAttributes, SpanEventData, SpanObjectType, SpanPayload};
2727

2828
// 30s covers both quick API calls (login, project registration) and slower batch log uploads.
2929
// The TypeScript SDK applies no explicit timeout.
@@ -632,15 +632,7 @@ impl BraintrustClient {
632632
}
633633

634634
let span_id = components.span_id.clone().unwrap_or_else(|| row_id.clone());
635-
636-
let payload = SpanPayload {
637-
row_id,
638-
span_id,
639-
is_merge: true,
640-
span_components: Some(components),
641-
org_id,
642-
org_name,
643-
project_name: None,
635+
let mut event_data = SpanEventData {
644636
input: event.input,
645637
output: event.output,
646638
expected: event.expected,
@@ -656,6 +648,31 @@ impl BraintrustClient {
656648
purpose: None,
657649
extra: HashMap::new(),
658650
}),
651+
extra: HashMap::new(),
652+
};
653+
if let Some(propagated_event) = components.propagated_event.as_ref() {
654+
event_data.apply_propagated_event(propagated_event);
655+
}
656+
657+
let payload = SpanPayload {
658+
row_id,
659+
span_id,
660+
is_merge: true,
661+
span_components: Some(components),
662+
org_id,
663+
org_name,
664+
project_name: None,
665+
input: event_data.input,
666+
output: event_data.output,
667+
expected: event_data.expected,
668+
error: event_data.error,
669+
scores: event_data.scores,
670+
metadata: event_data.metadata,
671+
metrics: event_data.metrics,
672+
tags: event_data.tags,
673+
context: event_data.context,
674+
span_attributes: event_data.span_attributes,
675+
extra: event_data.extra,
659676
};
660677

661678
self.submit_payload(token, payload, None);

src/span.rs

Lines changed: 45 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ use uuid::Uuid;
99

1010
use crate::error::Result;
1111
use crate::span_components::SpanComponents;
12-
use crate::types::{ParentSpanInfo, SpanAttributes, SpanObjectType, SpanPayload, SpanType};
12+
use crate::types::{
13+
ParentSpanInfo, SpanAttributes, SpanEventData, SpanObjectType, SpanPayload, SpanType,
14+
};
1315

1416
/// Error type for SpanLog builder validation.
1517
#[derive(Debug, Clone, PartialEq)]
@@ -578,14 +580,7 @@ struct SpanData {
578580
}
579581

580582
impl From<SpanData> for SpanPayload {
581-
fn from(mut data: SpanData) -> Self {
582-
// Apply propagated_event if present - merge its fields into the span data
583-
if let Some(propagated_event) = data.propagated_event.take() {
584-
apply_propagated_event_to_span_data(&mut data, &propagated_event);
585-
// Restore the propagated_event after applying it
586-
data.propagated_event = Some(propagated_event);
587-
}
588-
583+
fn from(data: SpanData) -> Self {
589584
let span_attributes = SpanAttributes {
590585
name: data.name,
591586
span_type: Some(data.span_type),
@@ -598,14 +593,7 @@ impl From<SpanData> for SpanPayload {
598593
|| span_attributes.span_type.is_some()
599594
|| span_attributes.purpose.is_some();
600595

601-
Self {
602-
row_id: data.row_id,
603-
span_id: data.span_id,
604-
is_merge: data.has_flushed, // First flush = false (replace), subsequent = true (merge)
605-
span_components: None,
606-
org_id: data.org_id,
607-
org_name: data.org_name,
608-
project_name: data.project_name,
596+
let mut event_data = SpanEventData {
609597
input: data.input,
610598
output: data.output,
611599
expected: data.expected,
@@ -616,76 +604,32 @@ impl From<SpanData> for SpanPayload {
616604
tags: (!data.tags.is_empty()).then_some(data.tags),
617605
context: data.context,
618606
span_attributes: has_attributes.then_some(span_attributes),
607+
extra: HashMap::new(),
608+
};
609+
610+
if let Some(propagated_event) = data.propagated_event {
611+
event_data.apply_propagated_event(&propagated_event);
619612
}
620-
}
621-
}
622613

623-
/// Apply propagated_event data to SpanData by merging it into the span's fields.
624-
/// This allows parent span metadata to flow down to child spans.
625-
fn apply_propagated_event_to_span_data(data: &mut SpanData, propagated_event: &Map<String, Value>) {
626-
for (key, value) in propagated_event {
627-
match key.as_str() {
628-
"input" if data.input.is_none() => {
629-
data.input = Some(value.clone());
630-
}
631-
"output" if data.output.is_none() => {
632-
data.output = Some(value.clone());
633-
}
634-
"expected" if data.expected.is_none() => {
635-
data.expected = Some(value.clone());
636-
}
637-
"error" if data.error.is_none() => {
638-
if let Some(s) = value.as_str() {
639-
data.error = Some(Value::String(s.to_string()));
640-
}
641-
}
642-
"scores" => {
643-
if let Some(obj) = value.as_object() {
644-
for (score_key, score_val) in obj {
645-
if let Some(score) = score_val.as_f64() {
646-
data.scores.entry(score_key.clone()).or_insert(score);
647-
}
648-
}
649-
}
650-
}
651-
"metadata" => {
652-
if let Some(obj) = value.as_object() {
653-
for (meta_key, meta_val) in obj {
654-
data.metadata
655-
.entry(meta_key.clone())
656-
.or_insert_with(|| meta_val.clone());
657-
}
658-
}
659-
}
660-
"metrics" => {
661-
if let Some(obj) = value.as_object() {
662-
for (metric_key, metric_val) in obj {
663-
if let Some(metric) = metric_val.as_f64() {
664-
data.metrics.entry(metric_key.clone()).or_insert(metric);
665-
}
666-
}
667-
}
668-
}
669-
"tags" => {
670-
if let Some(arr) = value.as_array() {
671-
for tag_val in arr {
672-
if let Some(tag) = tag_val.as_str() {
673-
if !data.tags.contains(&tag.to_string()) {
674-
data.tags.push(tag.to_string());
675-
}
676-
}
677-
}
678-
}
679-
}
680-
"context" if data.context.is_none() => {
681-
data.context = Some(value.clone());
682-
}
683-
_ => {
684-
// Unknown fields go into metadata
685-
data.metadata
686-
.entry(key.clone())
687-
.or_insert_with(|| value.clone());
688-
}
614+
Self {
615+
row_id: data.row_id,
616+
span_id: data.span_id,
617+
is_merge: data.has_flushed, // First flush = false (replace), subsequent = true (merge)
618+
span_components: None,
619+
org_id: data.org_id,
620+
org_name: data.org_name,
621+
project_name: data.project_name,
622+
input: event_data.input,
623+
output: event_data.output,
624+
expected: event_data.expected,
625+
error: event_data.error,
626+
scores: event_data.scores,
627+
metadata: event_data.metadata,
628+
metrics: event_data.metrics,
629+
tags: event_data.tags,
630+
context: event_data.context,
631+
span_attributes: event_data.span_attributes,
632+
extra: event_data.extra,
689633
}
690634
}
691635
}
@@ -874,6 +818,10 @@ mod tests {
874818
let mut parent_propagated = Map::new();
875819
parent_propagated.insert("parent_key".to_string(), json!("parent_value"));
876820
parent_propagated.insert("metadata".to_string(), json!({"inherited": "data"}));
821+
parent_propagated.insert(
822+
"span_attributes".to_string(),
823+
json!({"purpose": "scorer", "skip_realtime": true}),
824+
);
877825

878826
let parent_info = ParentSpanInfo::FullSpan {
879827
object_type: SpanObjectType::ProjectLogs,
@@ -910,9 +858,19 @@ mod tests {
910858
"Inherited metadata should be present"
911859
);
912860
assert_eq!(
913-
metadata.get("parent_key").and_then(|v| v.as_str()),
861+
captured
862+
.payload
863+
.extra
864+
.get("parent_key")
865+
.and_then(|v| v.as_str()),
914866
Some("parent_value"),
915-
"Parent key should be in metadata"
867+
"Unknown propagated fields should remain top-level extras"
868+
);
869+
let span_attributes = captured.payload.span_attributes.as_ref().unwrap();
870+
assert_eq!(span_attributes.purpose.as_deref(), Some("scorer"));
871+
assert_eq!(
872+
span_attributes.extra.get("skip_realtime"),
873+
Some(&json!(true))
916874
);
917875
}
918876

0 commit comments

Comments
 (0)