Skip to content

Commit a4e6057

Browse files
authored
Merge branch 'main' into cijothomas/tuneguidance
2 parents 79b0e7a + 0d0af9a commit a4e6057

7 files changed

Lines changed: 3947 additions & 803 deletions

File tree

rust/experimental/query_engine/engine-recordset-otlp-bridge/src/bridge.rs

Lines changed: 193 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,50 @@ const OTLP_BUFFER_INITIAL_CAPACITY: usize = 1024 * 64;
1515
static EXPRESSIONS: LazyLock<RwLock<Vec<BridgePipeline>>> =
1616
LazyLock::new(|| RwLock::new(Vec::new()));
1717

18+
static LOG_RECORD_SCHEMA: LazyLock<ParserMapSchema> = LazyLock::new(|| {
19+
// Canonical schema definition comes from LogRecord proto definition
20+
// https://github.com/open-telemetry/otel-arrow/blob/main/rust/otap-dataflow/crates/pdata/src/views/otlp/proto/logs.rs
21+
ParserMapSchema::new()
22+
.set_default_map_key("attributes")
23+
.with_key_definition("time_unix_nano", ParserMapKeySchema::DateTime)
24+
.with_key_definition("observed_time_unix_nano", ParserMapKeySchema::DateTime)
25+
.with_key_definition("severity_number", ParserMapKeySchema::Integer)
26+
.with_key_definition("severity_text", ParserMapKeySchema::String)
27+
.with_key_definition("body", ParserMapKeySchema::Any)
28+
.with_key_definition("trace_id", ParserMapKeySchema::Array)
29+
.with_key_definition("span_id", ParserMapKeySchema::Array)
30+
.with_key_definition("flags", ParserMapKeySchema::Integer)
31+
.with_key_definition("event_name", ParserMapKeySchema::String)
32+
.with_key_aliases([
33+
// Support aliases to the Log and Event definition naming
34+
// https://opentelemetry.io/docs/specs/otel/logs/data-model/
35+
("Attributes", "attributes"),
36+
("Timestamp", "time_unix_nano"),
37+
("ObservedTimestamp", "observed_time_unix_nano"),
38+
("SeverityNumber", "severity_number"),
39+
("SeverityText", "severity_text"),
40+
("Body", "body"),
41+
("TraceId", "trace_id"),
42+
("SpanId", "span_id"),
43+
("TraceFlags", "flags"),
44+
("EventName", "event_name"),
45+
// Support aliases from OTLP JSON encoding
46+
// https://opentelemetry.io/docs/specs/otlp/#json-protobuf-encoding
47+
("timeUnixNano", "time_unix_nano"),
48+
("observedTimeUnixNano", "observed_time_unix_nano"),
49+
("severityNumber", "severity_number"),
50+
("severityText", "severity_text"),
51+
("traceId", "trace_id"),
52+
("spanId", "span_id"),
53+
("eventName", "event_name"),
54+
])
55+
});
56+
57+
/// Get the static schema for LogRecord fields and their aliases
58+
pub fn get_log_record_schema() -> &'static ParserMapSchema {
59+
&LOG_RECORD_SCHEMA
60+
}
61+
1862
#[derive(Debug)]
1963
pub struct BridgePipeline {
2064
attributes_schema: Option<ParserMapSchema>,
@@ -359,17 +403,7 @@ fn build_parser_options(options: Option<BridgeOptions>) -> Result<ParserOptions,
359403
fn build_log_record_schema(
360404
attributes_schema: Option<ParserMapSchema>,
361405
) -> Result<(ParserMapSchema, Option<ParserMapSchema>), ParserError> {
362-
let mut log_record_schema = ParserMapSchema::new()
363-
.set_default_map_key("Attributes")
364-
.with_key_definition("Timestamp", ParserMapKeySchema::DateTime)
365-
.with_key_definition("ObservedTimestamp", ParserMapKeySchema::DateTime)
366-
.with_key_definition("SeverityNumber", ParserMapKeySchema::Integer)
367-
.with_key_definition("SeverityText", ParserMapKeySchema::String)
368-
.with_key_definition("Body", ParserMapKeySchema::Any)
369-
.with_key_definition("TraceId", ParserMapKeySchema::Array)
370-
.with_key_definition("SpanId", ParserMapKeySchema::Array)
371-
.with_key_definition("TraceFlags", ParserMapKeySchema::Integer)
372-
.with_key_definition("EventName", ParserMapKeySchema::String);
406+
let mut log_record_schema = LOG_RECORD_SCHEMA.clone();
373407

374408
if let Some(mut attributes_schema) = attributes_schema {
375409
let schema = attributes_schema.get_schema_mut();
@@ -385,20 +419,24 @@ fn build_log_record_schema(
385419
// present in Attributes users might query with ambiguous naming.
386420
// For example: source | extend Body = 'something' will write to the
387421
// top-level field and not Attributes.
388-
if let Some(removed) = schema.remove(top_level_key)
389-
&& &removed != top_level_key_schema
390-
{
391-
return Err(ParserError::SchemaError(format!(
392-
"'{top_level_key}' key cannot be declared as '{}' type",
393-
&removed
394-
)));
422+
423+
// Check both the canonical key and all its aliases
424+
for key_name in log_record_schema.get_all_key_names_for_canonical_key(top_level_key) {
425+
if let Some(removed) = schema.remove(key_name.as_ref())
426+
&& &removed != top_level_key_schema
427+
{
428+
return Err(ParserError::SchemaError(format!(
429+
"'{key_name}' key cannot be declared as '{}' type",
430+
&removed
431+
)));
432+
}
395433
}
396434
}
397435

398436
let allow_undefined_keys = attributes_schema.get_allow_undefined_keys();
399437

400438
log_record_schema = log_record_schema.with_key_definition(
401-
"Attributes",
439+
"attributes",
402440
ParserMapKeySchema::Map(Some(attributes_schema)),
403441
);
404442

@@ -409,7 +447,7 @@ fn build_log_record_schema(
409447
}
410448

411449
for (top_level_key, top_level_key_schema) in log_record_schema.get_schema() {
412-
if top_level_key.as_ref() == "Attributes" {
450+
if top_level_key.as_ref() == "attributes" {
413451
if let ParserMapKeySchema::Map(Some(attributes_schema)) = top_level_key_schema {
414452
for (top_level_key, top_level_key_schema) in attributes_schema.get_schema() {
415453
summary_schema = summary_schema
@@ -672,7 +710,7 @@ mod tests {
672710
Some(
673711
BridgeOptions::new().with_attributes_schema(
674712
ParserMapSchema::new()
675-
.with_key_definition("Body", ParserMapKeySchema::Any)
713+
.with_key_definition("body", ParserMapKeySchema::Any)
676714
.with_key_definition("int_value", ParserMapKeySchema::Integer),
677715
),
678716
),
@@ -688,7 +726,7 @@ mod tests {
688726
Some(
689727
BridgeOptions::new().with_attributes_schema(
690728
ParserMapSchema::new()
691-
.with_key_definition("Body", ParserMapKeySchema::Any)
729+
.with_key_definition("body", ParserMapKeySchema::Any)
692730
.with_key_definition("int_value", ParserMapKeySchema::Integer),
693731
),
694732
),
@@ -722,9 +760,9 @@ mod tests {
722760
"source | summarize by int_value | extend int_value = 1 | summarize int_value = count() | extend Custom = 1234",
723761
);
724762

725-
run_test_success("source | extend Body = 'hello world'");
726-
// Note: Body gets removed from Attributes schema because it is defined at the root
727-
run_test_failure("source | extend Attributes.Body = 'hello world'");
763+
run_test_success("source | extend body = 'hello world'");
764+
// Note: body gets removed from attributes schema because it is defined at the root
765+
run_test_failure("source | extend attributes.body = 'hello world'");
728766
}
729767

730768
#[test]
@@ -771,7 +809,7 @@ mod tests {
771809
Some(
772810
BridgeOptions::new().with_attributes_schema(
773811
ParserMapSchema::new()
774-
.with_key_definition("Body", ParserMapKeySchema::Any)
812+
.with_key_definition("body", ParserMapKeySchema::Any)
775813
.with_key_definition("int_value", ParserMapKeySchema::Integer)
776814
.set_allow_undefined_keys(),
777815
),
@@ -792,12 +830,140 @@ mod tests {
792830
let e = parse_kql_query_into_pipeline(
793831
"",
794832
Some(BridgeOptions::new().with_attributes_schema(
795-
ParserMapSchema::new().with_key_definition("Body", ParserMapKeySchema::Map(None)),
833+
ParserMapSchema::new().with_key_definition("body", ParserMapKeySchema::Map(None)),
796834
)),
797835
)
798836
.unwrap_err();
799837

800838
assert_eq!(1, e.len());
801839
assert!(matches!(e[0], ParserError::SchemaError(_)));
802840
}
841+
842+
#[test]
843+
fn test_parse_kql_query_with_field_aliases() {
844+
let run_test = |query: &str| {
845+
parse_kql_query_into_pipeline(query, None).unwrap();
846+
};
847+
848+
// Test canonical names
849+
run_test("source | where severity_text == 'Info'");
850+
run_test("source | extend x = severity_number");
851+
run_test("source | project time_unix_nano, body");
852+
853+
// Test aliases
854+
run_test("source | where SeverityText == 'Info'");
855+
run_test("source | extend x = SeverityNumber");
856+
run_test("source | project Timestamp, Body");
857+
run_test("source | where severityText == 'Info'");
858+
run_test("source | extend x = severityNumber");
859+
run_test("source | project timeUnixNano, body");
860+
861+
// Test mixing aliases and canonical names
862+
run_test("source | where SeverityText == 'Info' and severity_number > 0");
863+
run_test("source | extend ts = Timestamp, sev = severityText");
864+
865+
// Test default map key alias
866+
run_test("source | where attributes.custom_field == 'value'");
867+
run_test("source | where Attributes.custom_field == 'value'");
868+
}
869+
870+
#[test]
871+
fn test_attributes_schema_removes_top_level_aliases() {
872+
// Test that when user provides an attributes schema with aliases
873+
// of top-level fields, they get removed properly
874+
let result = parse_kql_query_into_pipeline(
875+
"source | extend SeverityText = 'Debug'",
876+
Some(
877+
BridgeOptions::new().with_attributes_schema(
878+
ParserMapSchema::new()
879+
// This should be removed because SeverityText is an alias for severity_text
880+
.with_key_definition("SeverityText", ParserMapKeySchema::String)
881+
.with_key_definition("custom_field", ParserMapKeySchema::Integer),
882+
),
883+
),
884+
);
885+
886+
// Should succeed - SeverityText should write to top-level field
887+
assert!(result.is_ok());
888+
889+
// Test with canonical name (snake_case) too
890+
let result = parse_kql_query_into_pipeline(
891+
"source | extend severity_text = 'Debug'",
892+
Some(
893+
BridgeOptions::new().with_attributes_schema(
894+
ParserMapSchema::new()
895+
.with_key_definition("severity_text", ParserMapKeySchema::String)
896+
.with_key_definition("custom_field", ParserMapKeySchema::Integer),
897+
),
898+
),
899+
);
900+
901+
assert!(result.is_ok());
902+
}
903+
904+
#[test]
905+
fn test_field_aliases_in_log_record_processing() {
906+
let create_request = || {
907+
ExportLogsServiceRequest::new().with_resource_logs(ResourceLogs::new().with_scope_logs(
908+
ScopeLogs::new().with_log_records(vec![
909+
LogRecord::new()
910+
.with_severity_text("Info".into())
911+
.with_severity_number(9),
912+
LogRecord::new()
913+
.with_severity_text("Warning".into())
914+
.with_severity_number(13),
915+
]),
916+
))
917+
};
918+
919+
let pipeline_canonical = parse_kql_query_into_pipeline(
920+
"source | where severity_text == 'Info' and severity_number == 9",
921+
None,
922+
)
923+
.unwrap();
924+
let (result_canonical, _) = process_export_logs_service_request_using_pipeline(
925+
&pipeline_canonical,
926+
RecordSetEngineDiagnosticLevel::Error,
927+
create_request(),
928+
)
929+
.unwrap();
930+
931+
let pipeline_with_aliases = parse_kql_query_into_pipeline(
932+
"source | where SeverityText == 'Info' and SeverityNumber == 9",
933+
None,
934+
)
935+
.unwrap();
936+
let (result_with_aliases, _) = process_export_logs_service_request_using_pipeline(
937+
&pipeline_with_aliases,
938+
RecordSetEngineDiagnosticLevel::Error,
939+
create_request(),
940+
)
941+
.unwrap();
942+
943+
let canonical_logs = &result_canonical.unwrap().resource_logs[0].scope_logs[0].log_records;
944+
let aliased_logs = &result_with_aliases.unwrap().resource_logs[0].scope_logs[0].log_records;
945+
946+
assert_eq!(canonical_logs.len(), 1);
947+
assert_eq!(aliased_logs.len(), 1);
948+
assert_eq!(
949+
canonical_logs[0]
950+
.severity_text
951+
.as_ref()
952+
.unwrap()
953+
.get_value(),
954+
aliased_logs[0].severity_text.as_ref().unwrap().get_value()
955+
);
956+
assert_eq!(
957+
canonical_logs[0]
958+
.severity_number
959+
.as_ref()
960+
.unwrap()
961+
.get_value(),
962+
aliased_logs[0]
963+
.severity_number
964+
.as_ref()
965+
.unwrap()
966+
.get_value()
967+
);
968+
}
803969
}

0 commit comments

Comments
 (0)