Skip to content

Commit 75920dc

Browse files
committed
feat: 🌈 WIP
1 parent d31818a commit 75920dc

File tree

17 files changed

+1087
-264
lines changed

17 files changed

+1087
-264
lines changed

‎CLAUDE.md‎

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ SensApp is a **sensor data platform** built with Rust that scales from edge depl
3030

3131
- **HTTP REST API** with Axum web framework
3232
- **MQTT client** for IoT device integration
33-
- **Multi-format support**: JSON, CSV, SenML, InfluxDB line protocol, Prometheus remote write
33+
- **Multi-format support**: JSON, CSV, SenML, InfluxDB line protocol, Prometheus remote write and remote read
34+
- **Compatibility gateways**: InfluxDB and Prometheus.
3435

3536
#### Storage Abstraction (`src/storage/`)
3637

@@ -78,8 +79,11 @@ SensApp is a **sensor data platform** built with Rust that scales from edge depl
7879
- DATABASE_URL="postgres://postgres:postgres@localhost:5432/sensapp" cargo sqlx prepare
7980
- do focus on postgresql, AND NOT OTHER STORAGE BACKENDS FOR NOW.
8081
- You are an excellent and experienced software engineer.
81-
- When you filter the tests, you may get a success code even when all the tests were filtered out, which can mislead you into thinking the test you wanted to run passed. But a line such as "test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 78 filtered out; finished in 0.00s" means that your filter matched nothing, not that the test passed.
8282
- code used for unit tests and integration tests should be marked with #[cfg(any(test, feature = "test-utils"))]
83-
- unit tests and integration tests are very helpful and appreciated. Consider doing them even when not actively requested.
84-
- Backward compatibility is not a concern. The project is new.
8583
- I truly hate #[allow(dead_code)], so avoid it as much as possible. If the code is unused, delete it. If the code is used only conditionally, mark the conditions correctly. For example, in integration tests it should be marked with #[cfg(feature = "test-utils")]
84+
- unit tests and integration tests are very helpful and appreciated. Consider doing them even when not actively requested.
85+
- Rust best practices should be followed.
86+
- KISS: Keep It Simple Stupid. Avoid over-engineering.
87+
- Good Enough is the target, not perfection.
88+
- Professionalism and pragmatism are expected. This is a professional project, not a hobby project.
89+
- Backward compatibility is not a concern. The project is not deployed in production yet.

‎Cargo.lock‎

Lines changed: 42 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ byte-unit = "5.1"
8080
#prost = "0.12"
8181
prost = "0.14"
8282
snap = "1.1"
83+
rusty-chunkenc = "0.1"
8384
hex = "0.4"
8485
blake3 = "1.8"
8586
regex = "1.11"

‎src/importers/arrow.rs‎

Lines changed: 59 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{
44
},
55
storage::StorageInstance,
66
};
7-
use anyhow::{Result, anyhow};
7+
use anyhow::Result;
88
use arrow::array::{
99
Array, ArrayRef, BinaryArray, BooleanArray, Decimal128Array, Float64Array, Int64Array,
1010
StringArray, StructArray, TimestampMicrosecondArray,
@@ -17,6 +17,30 @@ use geo::Point;
1717
use rust_decimal::Decimal;
1818
use smallvec::SmallVec;
1919
use std::{collections::HashMap, io::Cursor, sync::Arc};
20+
use thiserror::Error;
21+
22+
/// Arrow import error types
23+
#[derive(Error, Debug)]
24+
pub enum ArrowError {
25+
#[error("Invalid Arrow format: {0}")]
26+
InvalidFormat(String),
27+
28+
#[error("Arrow parsing error: {0}")]
29+
ParseError(String),
30+
31+
#[error("IO error: {0}")]
32+
IoError(#[from] std::io::Error),
33+
34+
#[error("Storage error: {0}")]
35+
StorageError(#[from] anyhow::Error),
36+
}
37+
38+
impl ArrowError {
39+
/// Check if this is a client error (bad request)
40+
pub fn is_client_error(&self) -> bool {
41+
matches!(self, ArrowError::InvalidFormat(_) | ArrowError::ParseError(_))
42+
}
43+
}
2044

2145
/// Type alias for complex sensor data map
2246
type SensorDataMap = HashMap<String, (Arc<Sensor>, Vec<(SensAppDateTime, TypedSamples)>)>;
@@ -26,60 +50,60 @@ use uuid::Uuid;
2650
pub async fn publish_arrow_async<R: AsyncRead + Unpin + Send>(
2751
mut arrow_reader: R,
2852
storage: Arc<dyn StorageInstance>,
29-
) -> Result<()> {
53+
) -> Result<(), ArrowError> {
3054
// Read all data into a buffer first
3155
let mut buffer = Vec::new();
32-
arrow_reader.read_to_end(&mut buffer).await?;
56+
arrow_reader.read_to_end(&mut buffer).await.map_err(ArrowError::IoError)?;
3357

3458
// Parse the Arrow data
3559
let record_batches = parse_arrow_file(&buffer)?;
3660

3761
// Convert Arrow data to SensApp format and publish
38-
let mut batch_builder = BatchBuilder::new()?;
62+
let mut batch_builder = BatchBuilder::new().map_err(ArrowError::StorageError)?;
3963

4064
for record_batch in record_batches {
4165
let sensor_data_map = convert_record_batch_to_sensors(&record_batch)?;
4266

4367
for (_sensor_key, (sensor, sample_entries)) in sensor_data_map {
4468
for (_datetime, typed_samples) in sample_entries {
45-
batch_builder.add(sensor.clone(), typed_samples).await?;
69+
batch_builder.add(sensor.clone(), typed_samples).await.map_err(ArrowError::StorageError)?;
4670
}
4771
}
4872
}
4973

50-
batch_builder.send_what_is_left(storage).await?;
74+
batch_builder.send_what_is_left(storage).await.map_err(ArrowError::StorageError)?;
5175
Ok(())
5276
}
5377

5478
/// Parse Arrow IPC file format from bytes
55-
fn parse_arrow_file(buffer: &[u8]) -> Result<Vec<RecordBatch>> {
79+
fn parse_arrow_file(buffer: &[u8]) -> Result<Vec<RecordBatch>, ArrowError> {
5680
let cursor = Cursor::new(buffer);
5781
let reader = FileReader::try_new(cursor, None)
58-
.map_err(|e| anyhow!("Failed to create Arrow file reader: {}", e))?;
82+
.map_err(|e| ArrowError::InvalidFormat(format!("Not a valid Arrow file: {}", e)))?;
5983

6084
let mut batches = Vec::new();
6185
for batch_result in reader {
62-
let batch =
63-
batch_result.map_err(|e| anyhow!("Failed to read Arrow batch from file: {}", e))?;
86+
let batch = batch_result
87+
.map_err(|e| ArrowError::ParseError(format!("Failed to read batch: {}", e)))?;
6488
batches.push(batch);
6589
}
6690

6791
if batches.is_empty() {
68-
return Err(anyhow!("Arrow file contains no data batches"));
92+
return Err(ArrowError::InvalidFormat("Arrow file contains no data batches".to_string()));
6993
}
7094

7195
Ok(batches)
7296
}
7397

7498
/// Convert Arrow RecordBatch to SensApp sensors and samples
75-
fn convert_record_batch_to_sensors(batch: &RecordBatch) -> Result<SensorDataMap> {
99+
fn convert_record_batch_to_sensors(batch: &RecordBatch) -> Result<SensorDataMap, ArrowError> {
76100
let schema = batch.schema();
77101

78102
// Find required columns
79103
let timestamp_idx = find_column_index(&schema, "timestamp")
80-
.ok_or_else(|| anyhow!("Arrow data must contain 'timestamp' column"))?;
104+
.ok_or_else(|| ArrowError::InvalidFormat("Arrow data must contain 'timestamp' column".to_string()))?;
81105
let value_idx = find_column_index(&schema, "value")
82-
.ok_or_else(|| anyhow!("Arrow data must contain 'value' column"))?;
106+
.ok_or_else(|| ArrowError::InvalidFormat("Arrow data must contain 'value' column".to_string()))?;
83107

84108
// Optional metadata columns
85109
let sensor_id_idx = find_column_index(&schema, "sensor_id");
@@ -94,20 +118,20 @@ fn convert_record_batch_to_sensors(batch: &RecordBatch) -> Result<SensorDataMap>
94118
DataType::Timestamp(TimeUnit::Microsecond, _) => timestamp_array
95119
.as_any()
96120
.downcast_ref::<TimestampMicrosecondArray>()
97-
.ok_or_else(|| anyhow!("Failed to downcast timestamp array"))?,
121+
.ok_or_else(|| ArrowError::ParseError("Failed to downcast timestamp array".to_string()))?,
98122
_ => {
99-
return Err(anyhow!(
100-
"Timestamp column must be Timestamp(Microsecond, _) type"
123+
return Err(ArrowError::InvalidFormat(
124+
"Timestamp column must be Timestamp(Microsecond, _) type".to_string()
101125
));
102126
}
103127
};
104128

105129
// Convert timestamps
106-
let timestamps: Result<Vec<SensAppDateTime>> = (0..timestamp_data.len())
130+
let timestamps: Result<Vec<SensAppDateTime>, ArrowError> = (0..timestamp_data.len())
107131
.map(|i| {
108132
let ts_micros = timestamp_data.value(i);
109133
microseconds_to_sensapp_datetime(ts_micros)
110-
.ok_or_else(|| anyhow!("Invalid timestamp value: {}", ts_micros))
134+
.ok_or_else(|| ArrowError::ParseError(format!("Invalid timestamp value: {}", ts_micros)))
111135
})
112136
.collect();
113137
let timestamps = timestamps?;
@@ -143,13 +167,13 @@ fn convert_record_batch_to_sensors(batch: &RecordBatch) -> Result<SensorDataMap>
143167
fn convert_arrow_array_to_typed_samples(
144168
array: &ArrayRef,
145169
timestamps: &[SensAppDateTime],
146-
) -> Result<(SensorType, TypedSamples)> {
170+
) -> Result<(SensorType, TypedSamples), ArrowError> {
147171
match array.data_type() {
148172
DataType::Int64 => {
149173
let int_array = array
150174
.as_any()
151175
.downcast_ref::<Int64Array>()
152-
.ok_or_else(|| anyhow!("Failed to downcast Int64 array"))?;
176+
.ok_or_else(|| ArrowError::ParseError("Failed to downcast Int64 array".to_string()))?;
153177

154178
let samples: SmallVec<[Sample<i64>; 4]> = (0..int_array.len())
155179
.map(|i| Sample {
@@ -165,7 +189,7 @@ fn convert_arrow_array_to_typed_samples(
165189
let float_array = array
166190
.as_any()
167191
.downcast_ref::<Float64Array>()
168-
.ok_or_else(|| anyhow!("Failed to downcast Float64 array"))?;
192+
.ok_or_else(|| ArrowError::ParseError("Failed to downcast Float64 array".to_string()))?;
169193

170194
let samples: SmallVec<[Sample<f64>; 4]> = (0..float_array.len())
171195
.map(|i| Sample {
@@ -181,7 +205,7 @@ fn convert_arrow_array_to_typed_samples(
181205
let decimal_array = array
182206
.as_any()
183207
.downcast_ref::<Decimal128Array>()
184-
.ok_or_else(|| anyhow!("Failed to downcast Decimal128 array"))?;
208+
.ok_or_else(|| ArrowError::ParseError("Failed to downcast Decimal128 array".to_string()))?;
185209

186210
let samples: Result<SmallVec<[Sample<Decimal>; 4]>> = (0..decimal_array.len())
187211
.map(|i| {
@@ -202,7 +226,7 @@ fn convert_arrow_array_to_typed_samples(
202226
let string_array = array
203227
.as_any()
204228
.downcast_ref::<StringArray>()
205-
.ok_or_else(|| anyhow!("Failed to downcast String array"))?;
229+
.ok_or_else(|| ArrowError::ParseError("Failed to downcast String array".to_string()))?;
206230

207231
let samples: SmallVec<[Sample<String>; 4]> = (0..string_array.len())
208232
.map(|i| Sample {
@@ -218,7 +242,7 @@ fn convert_arrow_array_to_typed_samples(
218242
let bool_array = array
219243
.as_any()
220244
.downcast_ref::<BooleanArray>()
221-
.ok_or_else(|| anyhow!("Failed to downcast Boolean array"))?;
245+
.ok_or_else(|| ArrowError::ParseError("Failed to downcast Boolean array".to_string()))?;
222246

223247
let samples: SmallVec<[Sample<bool>; 4]> = (0..bool_array.len())
224248
.map(|i| Sample {
@@ -239,19 +263,19 @@ fn convert_arrow_array_to_typed_samples(
239263
let struct_array = array
240264
.as_any()
241265
.downcast_ref::<StructArray>()
242-
.ok_or_else(|| anyhow!("Failed to downcast Struct array"))?;
266+
.ok_or_else(|| ArrowError::ParseError("Failed to downcast Struct array".to_string()))?;
243267

244268
let lat_array = struct_array
245269
.column(0)
246270
.as_any()
247271
.downcast_ref::<Float64Array>()
248-
.ok_or_else(|| anyhow!("Latitude must be Float64"))?;
272+
.ok_or_else(|| ArrowError::InvalidFormat("Latitude must be Float64".to_string()))?;
249273

250274
let lon_array = struct_array
251275
.column(1)
252276
.as_any()
253277
.downcast_ref::<Float64Array>()
254-
.ok_or_else(|| anyhow!("Longitude must be Float64"))?;
278+
.ok_or_else(|| ArrowError::InvalidFormat("Longitude must be Float64".to_string()))?;
255279

256280
let samples: SmallVec<[Sample<Point>; 4]> = (0..struct_array.len())
257281
.map(|i| {
@@ -266,15 +290,15 @@ fn convert_arrow_array_to_typed_samples(
266290

267291
Ok((SensorType::Location, TypedSamples::Location(samples)))
268292
} else {
269-
Err(anyhow!("Unsupported struct format for location data"))
293+
Err(ArrowError::InvalidFormat("Unsupported struct format for location data".to_string()))
270294
}
271295
}
272296

273297
DataType::Binary => {
274298
let binary_array = array
275299
.as_any()
276300
.downcast_ref::<BinaryArray>()
277-
.ok_or_else(|| anyhow!("Failed to downcast Binary array"))?;
301+
.ok_or_else(|| ArrowError::ParseError("Failed to downcast Binary array".to_string()))?;
278302

279303
let samples: SmallVec<[Sample<Vec<u8>>; 4]> = (0..binary_array.len())
280304
.map(|i| Sample {
@@ -286,10 +310,10 @@ fn convert_arrow_array_to_typed_samples(
286310
Ok((SensorType::Blob, TypedSamples::Blob(samples)))
287311
}
288312

289-
_ => Err(anyhow!(
313+
_ => Err(ArrowError::InvalidFormat(format!(
290314
"Unsupported Arrow data type: {:?}",
291315
array.data_type()
292-
)),
316+
))),
293317
}
294318
}
295319

@@ -301,18 +325,18 @@ fn find_column_index(schema: &arrow::datatypes::Schema, column_name: &str) -> Op
301325
.position(|field| field.name() == column_name)
302326
}
303327

304-
fn extract_sensor_id(batch: &RecordBatch, sensor_id_idx: Option<usize>) -> Result<Uuid> {
328+
fn extract_sensor_id(batch: &RecordBatch, sensor_id_idx: Option<usize>) -> Result<Uuid, ArrowError> {
305329
if let Some(idx) = sensor_id_idx {
306330
let array = batch
307331
.column(idx)
308332
.as_any()
309333
.downcast_ref::<StringArray>()
310-
.ok_or_else(|| anyhow!("sensor_id column must be string type"))?;
334+
.ok_or_else(|| ArrowError::InvalidFormat("sensor_id column must be string type".to_string()))?;
311335

312336
if array.len() > 0 {
313337
let uuid_str = array.value(0);
314338
return Uuid::parse_str(uuid_str)
315-
.map_err(|e| anyhow!("Invalid UUID in sensor_id: {}", e));
339+
.map_err(|e| ArrowError::ParseError(format!("Invalid UUID in sensor_id: {}", e)));
316340
}
317341
}
318342

0 commit comments

Comments
 (0)