Skip to content

Commit 23b5dec

Browse files
committed
refactor: separate json tranform to new function
1 parent 2383ff6 commit 23b5dec

1 file changed

Lines changed: 13 additions & 6 deletions

File tree

src/elastic_push.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,21 @@ fn elastic_client(url: &str, api_key: &str) -> Result<Elasticsearch, elasticsear
1515
Ok(Elasticsearch::new(transport))
1616
}
1717

18-
fn make_docs_values(
18+
/// Convert a LogRecord to the document we want Elastic to ingest
19+
fn json_from_logrecord(record: &LogRecord) -> Result<serde_json::Value, serde_json::Error> {
20+
// Currently a naive implementation but could be:
21+
// serde_json::json!({"field_a": record.a, "field_b": record.b, etc...})
22+
serde_json::to_value(record)
23+
}
24+
25+
fn make_json_body(
1926
records: &Vec<LogRecord>,
2027
) -> Result<Vec<JsonBody<serde_json::Value>>, serde_json::Error> {
2128
let action = serde_json::json!({ "create": {} });
2229

2330
let values = records
2431
.iter()
25-
.map(|e| serde_json::to_value(e))
32+
.map(|e| json_from_logrecord(e))
2633
.collect::<Result<Vec<serde_json::Value>, serde_json::Error>>()?;
2734

2835
Ok(values
@@ -44,7 +51,7 @@ pub async fn consumer_loop(rx: &mut mpsc::UnboundedReceiver<LogRecord>, config:
4451
if open == 0 {
4552
break;
4653
}
47-
let body = make_docs_values(&buffer).unwrap_or(vec![]);
54+
let body = make_json_body(&buffer).unwrap_or(vec![]);
4855
let response = elastic_client
4956
.bulk(elasticsearch::BulkParts::Index("test-index"))
5057
.body(body)
@@ -115,7 +122,7 @@ mod tests {
115122
#[test]
116123
fn test_make_docs_values_empty() {
117124
let records: Vec<LogRecord> = vec![];
118-
let docs = make_docs_values(&records).unwrap();
125+
let docs = make_json_body(&records).unwrap();
119126
assert!(docs.is_empty());
120127
}
121128

@@ -126,7 +133,7 @@ mod tests {
126133
level: "info".to_string(),
127134
}
128135
.into();
129-
let docs = make_docs_values(&vec![record.clone()]).unwrap();
136+
let docs = make_json_body(&vec![record.clone()]).unwrap();
130137
// Each record should produce two JSON bodies (action + doc)
131138
assert_eq!(docs.len(), 2);
132139
}
@@ -143,7 +150,7 @@ mod tests {
143150
level: "warn".to_string(),
144151
}
145152
.into();
146-
let docs = make_docs_values(&vec![record1, record2]).unwrap();
153+
let docs = make_json_body(&vec![record1, record2]).unwrap();
147154
assert_eq!(docs.len(), 4);
148155
}
149156

0 commit comments

Comments
 (0)