Skip to content

Commit e9a0f95

Browse files
committed
feat: ✨ improve exporters
1 parent de41cd5 commit e9a0f95

File tree

8 files changed

+964
-320
lines changed

8 files changed

+964
-320
lines changed

src/exporters/arrow/mod.rs

Lines changed: 189 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl ArrowConverter {
2828
}
2929

3030
/// Convert multiple SensorData to Arrow IPC file format (bytes)
31-
/// Uses a "long" format similar to CSV (timestamp, sensor_name, value, type)
31+
/// Uses a "long" format similar to CSV (timestamp, sensor_id, sensor_name, value, type, labels)
3232
/// which can accommodate multiple sensors in a single file.
3333
pub fn to_arrow_file_multi(sensor_data_list: &[SensorData]) -> Result<Vec<u8>> {
3434
if sensor_data_list.is_empty() {
@@ -39,9 +39,11 @@ impl ArrowConverter {
3939
DataType::Timestamp(TimeUnit::Microsecond, None),
4040
false,
4141
),
42+
Field::new("sensor_id", DataType::Utf8, false),
4243
Field::new("sensor_name", DataType::Utf8, false),
4344
Field::new("value", DataType::Utf8, false),
4445
Field::new("type", DataType::Utf8, false),
46+
Field::new("labels", DataType::Utf8, false),
4547
]));
4648
let batch = RecordBatch::new_empty(schema);
4749
return Self::record_batch_to_arrow_file(&batch);
@@ -52,19 +54,23 @@ impl ArrowConverter {
5254

5355
// Create builders for the long format
5456
let mut timestamp_builder = TimestampMicrosecondBuilder::with_capacity(total_samples);
57+
let mut sensor_id_builder = StringBuilder::with_capacity(total_samples, total_samples * 36); // UUID is 36 chars
5558
let mut sensor_name_builder =
5659
StringBuilder::with_capacity(total_samples, total_samples * 32);
5760
let mut value_builder = StringBuilder::with_capacity(total_samples, total_samples * 16);
5861
let mut type_builder = StringBuilder::with_capacity(total_samples, total_samples * 8);
62+
let mut labels_builder = StringBuilder::with_capacity(total_samples, total_samples * 64);
5963

6064
// Process each sensor's data
6165
for sensor_data in sensor_data_list {
6266
Self::append_samples_to_builders(
6367
sensor_data,
6468
&mut timestamp_builder,
69+
&mut sensor_id_builder,
6570
&mut sensor_name_builder,
6671
&mut value_builder,
6772
&mut type_builder,
73+
&mut labels_builder,
6874
)?;
6975
}
7076

@@ -75,16 +81,20 @@ impl ArrowConverter {
7581
DataType::Timestamp(TimeUnit::Microsecond, None),
7682
false,
7783
),
84+
Field::new("sensor_id", DataType::Utf8, false),
7885
Field::new("sensor_name", DataType::Utf8, false),
7986
Field::new("value", DataType::Utf8, false),
8087
Field::new("type", DataType::Utf8, false),
88+
Field::new("labels", DataType::Utf8, false),
8189
]));
8290

8391
let columns: Vec<ArrayRef> = vec![
8492
Arc::new(timestamp_builder.finish()),
93+
Arc::new(sensor_id_builder.finish()),
8594
Arc::new(sensor_name_builder.finish()),
8695
Arc::new(value_builder.finish()),
8796
Arc::new(type_builder.finish()),
97+
Arc::new(labels_builder.finish()),
8898
];
8999

90100
let batch = RecordBatch::try_new(schema, columns)
@@ -93,86 +103,116 @@ impl ArrowConverter {
93103
Self::record_batch_to_arrow_file(&batch)
94104
}
95105

106+
/// Helper to convert labels to JSON string
107+
fn labels_to_json_string(sensor_data: &SensorData) -> String {
108+
use serde_json::{Map, Value};
109+
let mut map = Map::new();
110+
for (key, value) in sensor_data.sensor.labels.iter() {
111+
map.insert(key.clone(), Value::String(value.clone()));
112+
}
113+
Value::Object(map).to_string()
114+
}
115+
96116
/// Internal helper to append samples from a sensor to the builders
97117
fn append_samples_to_builders(
98118
sensor_data: &SensorData,
99119
timestamp_builder: &mut TimestampMicrosecondBuilder,
120+
sensor_id_builder: &mut StringBuilder,
100121
sensor_name_builder: &mut StringBuilder,
101122
value_builder: &mut StringBuilder,
102123
type_builder: &mut StringBuilder,
124+
labels_builder: &mut StringBuilder,
103125
) -> Result<()> {
126+
let sensor_id = sensor_data.sensor.uuid.to_string();
104127
let sensor_name = &sensor_data.sensor.name;
128+
let labels_json = Self::labels_to_json_string(sensor_data);
105129

106130
match &sensor_data.samples {
107131
TypedSamples::Integer(samples) => {
108132
for sample in samples.iter() {
109133
timestamp_builder.append_value(sample.datetime.to_microseconds_since_epoch());
134+
sensor_id_builder.append_value(&sensor_id);
110135
sensor_name_builder.append_value(sensor_name);
111136
value_builder.append_value(sample.value.to_string());
112137
type_builder.append_value("integer");
138+
labels_builder.append_value(&labels_json);
113139
}
114140
}
115141
TypedSamples::Numeric(samples) => {
116142
for sample in samples.iter() {
117143
timestamp_builder.append_value(sample.datetime.to_microseconds_since_epoch());
144+
sensor_id_builder.append_value(&sensor_id);
118145
sensor_name_builder.append_value(sensor_name);
119146
value_builder.append_value(sample.value.to_string());
120147
type_builder.append_value("numeric");
148+
labels_builder.append_value(&labels_json);
121149
}
122150
}
123151
TypedSamples::Float(samples) => {
124152
for sample in samples.iter() {
125153
timestamp_builder.append_value(sample.datetime.to_microseconds_since_epoch());
154+
sensor_id_builder.append_value(&sensor_id);
126155
sensor_name_builder.append_value(sensor_name);
127156
value_builder.append_value(sample.value.to_string());
128157
type_builder.append_value("float");
158+
labels_builder.append_value(&labels_json);
129159
}
130160
}
131161
TypedSamples::String(samples) => {
132162
for sample in samples.iter() {
133163
timestamp_builder.append_value(sample.datetime.to_microseconds_since_epoch());
164+
sensor_id_builder.append_value(&sensor_id);
134165
sensor_name_builder.append_value(sensor_name);
135166
value_builder.append_value(&sample.value);
136167
type_builder.append_value("string");
168+
labels_builder.append_value(&labels_json);
137169
}
138170
}
139171
TypedSamples::Boolean(samples) => {
140172
for sample in samples.iter() {
141173
timestamp_builder.append_value(sample.datetime.to_microseconds_since_epoch());
174+
sensor_id_builder.append_value(&sensor_id);
142175
sensor_name_builder.append_value(sensor_name);
143176
value_builder.append_value(sample.value.to_string());
144177
type_builder.append_value("boolean");
178+
labels_builder.append_value(&labels_json);
145179
}
146180
}
147181
TypedSamples::Location(samples) => {
148182
for sample in samples.iter() {
149183
timestamp_builder.append_value(sample.datetime.to_microseconds_since_epoch());
184+
sensor_id_builder.append_value(&sensor_id);
150185
sensor_name_builder.append_value(sensor_name);
151186
value_builder.append_value(format!(
152187
"{},{}",
153188
sample.value.y(),
154189
sample.value.x()
155190
));
156191
type_builder.append_value("location");
192+
labels_builder.append_value(&labels_json);
157193
}
158194
}
159195
TypedSamples::Json(samples) => {
160196
for sample in samples.iter() {
161197
timestamp_builder.append_value(sample.datetime.to_microseconds_since_epoch());
198+
sensor_id_builder.append_value(&sensor_id);
162199
sensor_name_builder.append_value(sensor_name);
163200
value_builder.append_value(sample.value.to_string());
164201
type_builder.append_value("json");
202+
labels_builder.append_value(&labels_json);
165203
}
166204
}
167205
TypedSamples::Blob(samples) => {
168206
for sample in samples.iter() {
169207
timestamp_builder.append_value(sample.datetime.to_microseconds_since_epoch());
208+
sensor_id_builder.append_value(&sensor_id);
170209
sensor_name_builder.append_value(sensor_name);
171210
value_builder.append_value(base64::Engine::encode(
172211
&base64::engine::general_purpose::STANDARD,
173212
&sample.value,
174213
));
175214
type_builder.append_value("blob");
215+
labels_builder.append_value(&labels_json);
176216
}
177217
}
178218
}
@@ -431,6 +471,8 @@ pub mod test_data_helpers {
431471
#[cfg(test)]
432472
mod tests {
433473
use super::*;
474+
use arrow_ipc::reader::FileReader;
475+
use std::io::Cursor;
434476

435477
#[test]
436478
fn test_arrow_conversion_integer() {
@@ -477,4 +519,150 @@ mod tests {
477519
let schema = batch.schema();
478520
assert!(matches!(schema.field(1).data_type(), DataType::Struct(_)));
479521
}
522+
523+
#[test]
fn test_arrow_multi_with_sensor_id_and_labels() {
    use crate::datamodel::*;
    use smallvec::smallvec;
    use uuid::Uuid;

    // Build a float sensor carrying two labels and a single sample.
    let sensor_uuid = Uuid::new_v4();
    let sensor = Sensor {
        uuid: sensor_uuid,
        name: "test_sensor".to_string(),
        sensor_type: SensorType::Float,
        unit: None,
        labels: smallvec![
            ("env".to_string(), "production".to_string()),
            ("region".to_string(), "us-east".to_string()),
        ],
    };

    let datetime = SensAppDateTime::now().unwrap();
    let samples = TypedSamples::Float(smallvec![Sample {
        datetime,
        value: 42.5
    }]);

    let sensor_data_list = vec![SensorData::new(sensor, samples)];
    let arrow_bytes = ArrowConverter::to_arrow_file_multi(&sensor_data_list).unwrap();

    // Read the IPC file back from memory.
    let reader = FileReader::try_new(Cursor::new(arrow_bytes), None).unwrap();
    let schema = reader.schema();

    // The long-format schema must expose the identifying and label columns.
    let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    for expected in ["sensor_id", "sensor_name", "labels"] {
        assert!(
            field_names.contains(&expected),
            "Schema should have {} field, got: {:?}",
            expected,
            field_names
        );
    }

    // Propagate decode failures instead of silently dropping broken batches,
    // so a reader error is reported as such rather than as a length mismatch.
    let batches = reader
        .collect::<std::result::Result<Vec<_>, _>>()
        .expect("every record batch should decode");
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_rows(), 1);

    // sensor_id must round-trip the sensor's UUID.
    let sensor_id_col = batch
        .column_by_name("sensor_id")
        .expect("sensor_id column should exist")
        .as_any()
        .downcast_ref::<arrow::array::StringArray>()
        .expect("sensor_id should be a Utf8 column");
    assert_eq!(sensor_id_col.value(0), sensor_uuid.to_string());

    // labels must be valid JSON mapping each label name to its value; a
    // substring check would also accept malformed or swapped content.
    let labels_col = batch
        .column_by_name("labels")
        .expect("labels column should exist")
        .as_any()
        .downcast_ref::<arrow::array::StringArray>()
        .expect("labels should be a Utf8 column");
    let labels_json = labels_col.value(0);
    let labels: serde_json::Value =
        serde_json::from_str(labels_json).expect("labels column should hold valid JSON");
    assert_eq!(
        labels["env"], "production",
        "Labels should map env to production, got: {}",
        labels_json
    );
    assert_eq!(
        labels["region"], "us-east",
        "Labels should map region to us-east, got: {}",
        labels_json
    );
}
617+
618+
#[test]
fn test_arrow_multi_without_labels() {
    use crate::datamodel::*;
    use smallvec::{SmallVec, smallvec};
    use uuid::Uuid;

    // Build a float sensor with an empty label set and a single sample.
    let sensor = Sensor {
        uuid: Uuid::new_v4(),
        name: "test_sensor".to_string(),
        sensor_type: SensorType::Float,
        unit: None,
        labels: SmallVec::new(),
    };

    let datetime = SensAppDateTime::now().unwrap();
    let samples = TypedSamples::Float(smallvec![Sample {
        datetime,
        value: 42.5
    }]);

    let sensor_data_list = vec![SensorData::new(sensor, samples)];
    let arrow_bytes = ArrowConverter::to_arrow_file_multi(&sensor_data_list).unwrap();

    // Read the IPC file back from memory.
    let reader = FileReader::try_new(Cursor::new(arrow_bytes), None).unwrap();
    let schema = reader.schema();

    // The labels field must be present even when the sensor has no labels.
    let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    assert!(
        field_names.contains(&"labels"),
        "Schema should have labels field, got: {:?}",
        field_names
    );

    // Propagate decode failures and check the batch count before indexing, so
    // a reader error is not reported as an out-of-bounds panic.
    let batches = reader
        .collect::<std::result::Result<Vec<_>, _>>()
        .expect("every record batch should decode");
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_rows(), 1);

    // An empty label set is exported as an empty JSON object.
    let labels_col = batch
        .column_by_name("labels")
        .expect("labels column should exist")
        .as_any()
        .downcast_ref::<arrow::array::StringArray>()
        .expect("labels should be a Utf8 column");
    assert_eq!(labels_col.value(0), "{}");
}
480668
}

0 commit comments

Comments
 (0)