
Commit f7d366e

refactor: Event per log, streamline data handling (#1209)
Signed-off-by: Devdutt Shenoi <[email protected]>
1 parent 1b4ea73 commit f7d366e

File tree

7 files changed: +244 -279 lines changed


src/connectors/kafka/processor.rs (+16 -34)

@@ -16,10 +16,9 @@
  *
  */
 
-use std::{collections::HashMap, sync::Arc};
+use std::sync::Arc;
 
 use async_trait::async_trait;
-use chrono::Utc;
 use futures_util::StreamExt;
 use rdkafka::consumer::{CommitMode, Consumer};
 use serde_json::Value;
@@ -58,38 +57,10 @@ impl ParseableSinkProcessor {
         let stream = PARSEABLE.get_stream(stream_name)?;
         let schema = stream.get_schema_raw();
         let time_partition = stream.get_time_partition();
+        let custom_partition = stream.get_custom_partition();
         let static_schema_flag = stream.get_static_schema_flag();
         let schema_version = stream.get_schema_version();
 
-        let (json_vec, total_payload_size) = Self::json_vec(records);
-        let batch_json_event = json::Event {
-            data: Value::Array(json_vec),
-        };
-
-        let (rb, is_first) = batch_json_event.into_recordbatch(
-            &schema,
-            Utc::now(),
-            static_schema_flag,
-            time_partition.as_ref(),
-            schema_version,
-        )?;
-
-        let p_event = ParseableEvent {
-            rb,
-            stream_name: stream_name.to_string(),
-            origin_format: "json",
-            origin_size: total_payload_size,
-            is_first_event: is_first,
-            parsed_timestamp: Utc::now().naive_utc(),
-            time_partition: None,
-            custom_partition_values: HashMap::new(),
-            stream_type: StreamType::UserDefined,
-        };
-
-        Ok(p_event)
-    }
-
-    fn json_vec(records: &[ConsumerRecord]) -> (Vec<Value>, u64) {
         let mut json_vec = Vec::with_capacity(records.len());
         let mut total_payload_size = 0u64;
 
@@ -100,19 +71,30 @@ impl ParseableSinkProcessor {
             }
         }
 
-        (json_vec, total_payload_size)
+        let p_event = json::Event::new(Value::Array(json_vec)).into_event(
+            stream_name.to_string(),
+            total_payload_size,
+            &schema,
+            static_schema_flag,
+            custom_partition.as_ref(),
+            time_partition.as_ref(),
+            schema_version,
+            StreamType::UserDefined,
+        )?;
+
+        Ok(p_event)
     }
 }
 
 #[async_trait]
 impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
     async fn process(&self, records: Vec<ConsumerRecord>) -> anyhow::Result<()> {
         let len = records.len();
-        debug!("Processing {} records", len);
+        debug!("Processing {len} records");
 
         self.build_event_from_chunk(&records).await?.process()?;
 
-        debug!("Processed {} records", len);
+        debug!("Processed {len} records");
         Ok(())
     }
 }
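
The net effect for the Kafka sink: instead of building a record batch and a ParseableEvent by hand, with two separate Utc::now() calls that could disagree, the processor constructs a single json::Event that captures the ingestion timestamp at creation and carries it through into_event. A minimal sketch of that pattern, using a hypothetical stand-in type rather than the crate's actual internals:

    use chrono::{DateTime, Utc};
    use serde_json::{json, Value};

    // Hypothetical stand-in for json::Event: the ingestion timestamp is
    // captured exactly once, when the event is constructed.
    struct JsonEvent {
        json: Value,
        p_timestamp: DateTime<Utc>,
    }

    impl JsonEvent {
        fn new(json: Value) -> Self {
            Self {
                json,
                p_timestamp: Utc::now(),
            }
        }
    }

    fn main() {
        let event = JsonEvent::new(json!({"level": "info"}));
        // Downstream code reads the timestamp off the event itself rather than
        // being handed a separate `Utc::now()` argument, so the ingestion time
        // can no longer drift within a single event's processing.
        println!("{} ingested at {}", event.json, event.p_timestamp);
    }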

src/event/format/json.rs (+134 -3)

@@ -23,22 +23,38 @@ use anyhow::anyhow;
 use arrow_array::RecordBatch;
 use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
 use arrow_schema::{DataType, Field, Fields, Schema};
+use chrono::{DateTime, NaiveDateTime, Utc};
 use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
 use itertools::Itertools;
 use serde_json::Value;
 use std::{collections::HashMap, sync::Arc};
 use tracing::error;
 
 use super::EventFormat;
-use crate::{metadata::SchemaVersion, utils::arrow::get_field};
+use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
 
 pub struct Event {
-    pub data: Value,
+    pub json: Value,
+    pub p_timestamp: DateTime<Utc>,
+}
+
+impl Event {
+    pub fn new(json: Value) -> Self {
+        Self {
+            json,
+            p_timestamp: Utc::now(),
+        }
+    }
 }
 
 impl EventFormat for Event {
     type Data = Vec<Value>;
 
+    /// Returns the time at ingestion, i.e. the `p_timestamp` value
+    fn get_p_timestamp(&self) -> DateTime<Utc> {
+        self.p_timestamp
+    }
+
     // convert the incoming json to a vector of json values
     // also extract the arrow schema, tags and metadata from the incoming json
     fn to_data(
@@ -52,7 +68,7 @@ impl EventFormat for Event {
         // incoming event may be a single json or a json array
         // but Data (type defined above) is a vector of json values
         // hence we need to convert the incoming event to a vector of json values
-        let value_arr = match self.data {
+        let value_arr = match self.json {
             Value::Array(arr) => arr,
             value @ Value::Object(_) => vec![value],
             _ => unreachable!("flatten would have failed beforehand"),
@@ -120,6 +136,87 @@ impl EventFormat for Event {
             Ok(None) => unreachable!("all records are added to one rb"),
         }
     }
+
+    /// Converts a JSON event into a Parseable Event
+    fn into_event(
+        self,
+        stream_name: String,
+        origin_size: u64,
+        storage_schema: &HashMap<String, Arc<Field>>,
+        static_schema_flag: bool,
+        custom_partitions: Option<&String>,
+        time_partition: Option<&String>,
+        schema_version: SchemaVersion,
+        stream_type: StreamType,
+    ) -> Result<super::Event, anyhow::Error> {
+        let custom_partition_values = match custom_partitions.as_ref() {
+            Some(custom_partition) => {
+                let custom_partitions = custom_partition.split(',').collect_vec();
+                extract_custom_partition_values(&self.json, &custom_partitions)
+            }
+            None => HashMap::new(),
+        };
+
+        let parsed_timestamp = match time_partition {
+            Some(time_partition) => extract_and_parse_time(&self.json, time_partition)?,
+            _ => self.p_timestamp.naive_utc(),
+        };
+
+        let (rb, is_first_event) = self.into_recordbatch(
+            storage_schema,
+            static_schema_flag,
+            time_partition,
+            schema_version,
+        )?;
+
+        Ok(super::Event {
+            rb,
+            stream_name,
+            origin_format: "json",
+            origin_size,
+            is_first_event,
+            parsed_timestamp,
+            time_partition: None,
+            custom_partition_values,
+            stream_type,
+        })
+    }
+}
+
+/// Extracts custom partition values from provided JSON object
+/// e.g. `json: {"status": 400, "msg": "Hello, World!"}, custom_partition_list: ["status"]` returns `{"status" => 400}`
+pub fn extract_custom_partition_values(
+    json: &Value,
+    custom_partition_list: &[&str],
+) -> HashMap<String, String> {
+    let mut custom_partition_values: HashMap<String, String> = HashMap::new();
+    for custom_partition_field in custom_partition_list {
+        let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+        let custom_partition_value = match custom_partition_value {
+            e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
+            Value::String(s) => s,
+            _ => "".to_string(),
+        };
+        custom_partition_values.insert(
+            custom_partition_field.trim().to_string(),
+            custom_partition_value,
+        );
+    }
+    custom_partition_values
+}
+
+/// Returns the parsed timestamp of designated time partition from json object
+/// e.g. `json: {"timestamp": "2025-05-15T15:30:00Z"}` returns `2025-05-15T15:30:00`
+fn extract_and_parse_time(
+    json: &Value,
+    time_partition: &str,
+) -> Result<NaiveDateTime, anyhow::Error> {
+    let current_time = json
+        .get(time_partition)
+        .ok_or_else(|| anyhow!("Missing field for time partition in json: {time_partition}"))?;
+    let parsed_time: DateTime<Utc> = serde_json::from_value(current_time.clone())?;
+
+    Ok(parsed_time.naive_utc())
 }
 
 // Returns arrow schema with the fields that are present in the request body
@@ -225,3 +322,37 @@ fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::str::FromStr;
+
+    use serde_json::json;
+
+    use super::*;
+
+    #[test]
+    fn parse_time_parition_from_value() {
+        let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
+        let parsed = extract_and_parse_time(&json, "timestamp");
+
+        let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap();
+        assert_eq!(parsed.unwrap(), expected);
+    }
+
+    #[test]
+    fn time_parition_not_in_json() {
+        let json = json!({"hello": "world!"});
+        let parsed = extract_and_parse_time(&json, "timestamp");
+
+        assert!(parsed.is_err());
+    }
+
+    #[test]
+    fn time_parition_not_parseable_as_datetime() {
+        let json = json!({"timestamp": "not time"});
+        let parsed = extract_and_parse_time(&json, "timestamp");
+
+        assert!(parsed.is_err());
+    }
+}
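
For a sense of what the new partition helper does at ingest time, here is a standalone harness around it; the function body is copied verbatim from the hunk above, and only the main is added:

    use std::collections::HashMap;

    use serde_json::{json, Value};

    // Copied from the diff above: pulls the listed fields out of a JSON object
    // and stringifies them for use as custom partition values.
    fn extract_custom_partition_values(
        json: &Value,
        custom_partition_list: &[&str],
    ) -> HashMap<String, String> {
        let mut custom_partition_values: HashMap<String, String> = HashMap::new();
        for custom_partition_field in custom_partition_list {
            let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
            let custom_partition_value = match custom_partition_value {
                e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
                Value::String(s) => s,
                _ => "".to_string(),
            };
            custom_partition_values.insert(
                custom_partition_field.trim().to_string(),
                custom_partition_value,
            );
        }
        custom_partition_values
    }

    fn main() {
        let log = json!({"status": 400, "msg": "Hello, World!"});
        let values = extract_custom_partition_values(&log, &["status"]);
        // Numbers and bools are stringified; strings are taken as-is.
        assert_eq!(values.get("status"), Some(&"400".to_string()));
    }

Note the unwrap: a log line missing one of the configured partition fields would panic here, so callers are expected to have validated the presence of those fields beforehand.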

src/event/format/mod.rs (+19 -2)

@@ -32,10 +32,11 @@ use serde_json::Value;
 
 use crate::{
     metadata::SchemaVersion,
+    storage::StreamType,
     utils::arrow::{get_field, get_timestamp_array, replace_columns},
 };
 
-use super::DEFAULT_TIMESTAMP_KEY;
+use super::{Event, DEFAULT_TIMESTAMP_KEY};
 
 pub mod json;
 
@@ -105,14 +106,17 @@ pub trait EventFormat: Sized {
 
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
 
+    /// Returns the UTC time at ingestion
+    fn get_p_timestamp(&self) -> DateTime<Utc>;
+
     fn into_recordbatch(
         self,
         storage_schema: &HashMap<String, Arc<Field>>,
-        p_timestamp: DateTime<Utc>,
         static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
     ) -> Result<(RecordBatch, bool), AnyError> {
+        let p_timestamp = self.get_p_timestamp();
         let (data, mut schema, is_first) =
             self.to_data(storage_schema, time_partition, schema_version)?;
 
@@ -173,6 +177,19 @@ pub trait EventFormat: Sized {
         }
         true
     }
+
+    #[allow(clippy::too_many_arguments)]
+    fn into_event(
+        self,
+        stream_name: String,
+        origin_size: u64,
+        storage_schema: &HashMap<String, Arc<Field>>,
+        static_schema_flag: bool,
+        custom_partitions: Option<&String>,
+        time_partition: Option<&String>,
+        schema_version: SchemaVersion,
+        stream_type: StreamType,
+    ) -> Result<Event, AnyError>;
 }
 
 pub fn get_existing_field_names(
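
The shape of the trait change, reduced to a compilable sketch: get_p_timestamp becomes a required method, and the provided method reads the ingestion time from self instead of taking it as a parameter. Names mirror the diff, but the bodies here are illustrative, not the crate's:

    use chrono::{DateTime, Utc};

    trait EventFormat: Sized {
        /// Returns the UTC time at ingestion.
        fn get_p_timestamp(&self) -> DateTime<Utc>;

        // Stand-in for `into_recordbatch`: before this commit callers passed
        // `p_timestamp: DateTime<Utc>` in as an argument; now the provided
        // method asks the implementor for it.
        fn describe(self) -> String {
            let p_timestamp = self.get_p_timestamp();
            format!("ingested at {p_timestamp}")
        }
    }

    struct JsonEvent {
        p_timestamp: DateTime<Utc>,
    }

    impl EventFormat for JsonEvent {
        fn get_p_timestamp(&self) -> DateTime<Utc> {
            self.p_timestamp
        }
    }

    fn main() {
        let event = JsonEvent { p_timestamp: Utc::now() };
        println!("{}", event.describe());
    }

This removes one argument from every into_recordbatch call site and guarantees the timestamp used for the batch is the one captured when the event was built.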

src/event/mod.rs (+1 -1)

@@ -81,7 +81,7 @@ impl Event {
             self.origin_format,
             self.origin_size,
             self.rb.num_rows(),
-            self.parsed_timestamp,
+            self.parsed_timestamp.date(),
         );
 
         crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
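
The only change here is at the logging call site: the event's parsed NaiveDateTime is trimmed to its date component. A one-liner showing the effect (chrono only):

    use std::str::FromStr;

    use chrono::NaiveDateTime;

    fn main() {
        let parsed_timestamp = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap();
        // `.date()` drops the time of day, so the ingestion log line reads
        // `2025-05-15` rather than `2025-05-15T15:30:00`.
        println!("{}", parsed_timestamp.date());
    }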

0 commit comments
