Skip to content

Commit bfa9add

Browse files
committed
refactor: use logmsg instead of record only
1 parent 01614ec commit bfa9add

3 files changed

Lines changed: 70 additions & 58 deletions

File tree

src/elastic_push.rs

Lines changed: 60 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use tokio::sync::mpsc;
33

44
use std::{error::Error, iter::once};
55

6-
use crate::{config::ElasticConfig, redis_logs::LogRecord};
6+
use crate::{
7+
config::ElasticConfig,
8+
redis_logs::{LogMsg, LogRecord},
9+
};
710

811
fn elastic_client(config: &ElasticConfig) -> Result<Elasticsearch, Box<dyn Error>> {
912
let url = elasticsearch::http::Url::parse(&config.url.full_url())?;
@@ -17,25 +20,30 @@ fn elastic_client(config: &ElasticConfig) -> Result<Elasticsearch, Box<dyn Error
1720
}
1821

1922
/// Convert a LogRecord to the document we want Elastic to ingest
20-
fn json_from_logrecord(record: &LogRecord) -> Result<serde_json::Value, serde_json::Error> {
23+
fn json_from_logmsg(msg: &LogMsg) -> Result<serde_json::Value, serde_json::Error> {
2124
// dbg!(serde_json::to_value(record))
2225
dbg!(Ok(serde_json::json!({
23-
"@timestamp": record.time.as_rfc3339(),
24-
"file": record.file,
25-
"function": record.function,
26-
"message": record.message,
27-
"log_type": record.level.name
26+
"@timestamp": msg.record.time.as_rfc3339(),
27+
"file": msg.record.file,
28+
"function": msg.record.function,
29+
"message": msg.record.message,
30+
"log_type": msg.record.level.name,
31+
"line": msg.record.line,
32+
"module": msg.record.module,
33+
"service_name": msg.service_name,
34+
"proc_id": msg.record.process.id,
35+
"exception": msg.record.exception,
2836
})))
2937
}
3038

3139
fn make_json_body(
32-
records: &Vec<LogRecord>,
40+
msgs: &Vec<LogMsg>,
3341
) -> Result<Vec<JsonBody<serde_json::Value>>, serde_json::Error> {
3442
let action = serde_json::json!({ "create": {} });
3543

36-
let values = records
44+
let values = msgs
3745
.iter()
38-
.map(|e| json_from_logrecord(e))
46+
.map(|e| json_from_logmsg(e))
3947
.collect::<Result<Vec<serde_json::Value>, serde_json::Error>>()?;
4048

4149
Ok(values
@@ -46,10 +54,10 @@ fn make_json_body(
4654
.collect())
4755
}
4856

49-
pub async fn consumer_loop(rx: &mut mpsc::UnboundedReceiver<LogRecord>, config: ElasticConfig) {
57+
pub async fn consumer_loop(rx: &mut mpsc::UnboundedReceiver<LogMsg>, config: ElasticConfig) {
5058
let elastic_client = elastic_client(&config).expect("Failed to connect to Elastic!");
5159

52-
let mut buffer: Vec<LogRecord> = Vec::with_capacity(config.chunk_size.into());
60+
let mut buffer: Vec<LogMsg> = Vec::with_capacity(config.chunk_size.into());
5361

5462
loop {
5563
let open = rx.recv_many(&mut buffer, config.chunk_size.into()).await;
@@ -87,56 +95,60 @@ mod tests {
8795
}
8896

8997
// Implement conversion from DummyLog to LogMsg for tests if not already present
90-
impl From<DummyLog> for LogRecord {
98+
impl From<DummyLog> for LogMsg {
9199
fn from(d: DummyLog) -> Self {
92100
// Adjust this conversion as per your actual LogMsg/LogRecord structs
93-
LogRecord {
94-
elapsed: crate::redis_logs::Elapsed {
95-
repr: "".into(),
96-
seconds: 0.0,
97-
},
98-
exception: None,
99-
extra: {}.into(),
100-
file: crate::redis_logs::File {
101-
name: "".into(),
102-
path: "".into(),
103-
},
104-
function: "".into(),
105-
level: crate::redis_logs::LogLevel {
106-
icon: "".into(),
107-
name: d.level,
108-
no: 100,
109-
},
110-
line: 0,
111-
message: d.msg,
112-
module: "".into(),
113-
name: "".into(),
114-
process: crate::redis_logs::NameId {
101+
LogMsg {
102+
service_name: "test_service".into(),
103+
text: "...".into(),
104+
record: LogRecord {
105+
elapsed: crate::redis_logs::Elapsed {
106+
repr: "".into(),
107+
seconds: 0.0,
108+
},
109+
exception: None,
110+
extra: {}.into(),
111+
file: crate::redis_logs::File {
112+
name: "".into(),
113+
path: "".into(),
114+
},
115+
function: "".into(),
116+
level: crate::redis_logs::LogLevel {
117+
icon: "".into(),
118+
name: d.level,
119+
no: 100,
120+
},
121+
line: 0,
122+
message: d.msg,
123+
module: "".into(),
115124
name: "".into(),
116-
id: 0,
117-
},
118-
thread: crate::redis_logs::NameId {
119-
name: "".into(),
120-
id: 0,
121-
},
122-
time: crate::redis_logs::Timestamp {
123-
repr: "".into(),
124-
timestamp: 0.0,
125+
process: crate::redis_logs::NameId {
126+
name: "".into(),
127+
id: 0,
128+
},
129+
thread: crate::redis_logs::NameId {
130+
name: "".into(),
131+
id: 0,
132+
},
133+
time: crate::redis_logs::Timestamp {
134+
repr: "".into(),
135+
timestamp: 0.0,
136+
},
125137
},
126138
}
127139
}
128140
}
129141

130142
#[test]
131143
fn test_make_docs_values_empty() {
132-
let records: Vec<LogRecord> = vec![];
144+
let records: Vec<LogMsg> = vec![];
133145
let docs = make_json_body(&records).unwrap();
134146
assert!(docs.is_empty());
135147
}
136148

137149
#[test]
138150
fn test_make_docs_values_single() {
139-
let record: LogRecord = DummyLog {
151+
let record: LogMsg = DummyLog {
140152
msg: "hello".to_string(),
141153
level: "info".to_string(),
142154
}
@@ -148,12 +160,12 @@ mod tests {
148160

149161
#[test]
150162
fn test_make_docs_values_multiple() {
151-
let record1: LogRecord = DummyLog {
163+
let record1: LogMsg = DummyLog {
152164
msg: "a".to_string(),
153165
level: "info".to_string(),
154166
}
155167
.into();
156-
let record2: LogRecord = DummyLog {
168+
let record2: LogMsg = DummyLog {
157169
msg: "b".to_string(),
158170
level: "warn".to_string(),
159171
}

src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::process::exit;
33
use tokio::sync::mpsc;
44

55
mod redis_logs;
6-
use crate::redis_logs::{LogRecord, producer_loop};
6+
use crate::redis_logs::{LogMsg, producer_loop};
77

88
mod elastic_push;
99
use crate::elastic_push::consumer_loop;
@@ -42,7 +42,7 @@ async fn main() {
4242
let config = entry();
4343
println!("Starting log ingestor with config: \n {:?}", &config);
4444

45-
let (tx, mut rx) = mpsc::unbounded_channel::<LogRecord>();
45+
let (tx, mut rx) = mpsc::unbounded_channel::<LogMsg>();
4646
let producer = tokio::spawn(producer_loop(tx, config.redis.clone()));
4747
consumer_loop(&mut rx, config.elastic.clone()).await;
4848

src/redis_logs.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,10 @@ pub struct LogRecord {
5959
}
6060

6161
#[derive(Debug, PartialEq, Deserialize, Serialize, Clone)]
62-
struct LogMsg {
63-
record: LogRecord,
64-
service_name: String,
65-
text: String,
62+
pub struct LogMsg {
63+
pub record: LogRecord,
64+
pub service_name: String,
65+
pub text: String,
6666
}
6767

6868
#[derive(Debug, PartialEq, Deserialize, Serialize, Clone)]
@@ -196,10 +196,10 @@ fn process_data(values: Vec<redis::Value>) -> Result<Vec<LogMessagePack>, Box<dy
196196
.collect::<Result<Vec<LogMessagePack>, rmp_serde::decode::Error>>()?)
197197
}
198198

199-
fn extract_records(messages: Vec<LogMessagePack>) -> Vec<LogRecord> {
199+
fn extract_records(messages: Vec<LogMessagePack>) -> Vec<LogMsg> {
200200
messages
201201
.iter()
202-
.map(|e| e.bec_codec.data.log_msg.record.clone())
202+
.map(|e| e.bec_codec.data.log_msg.clone())
203203
.collect()
204204
}
205205

@@ -235,7 +235,7 @@ fn setup_consumer_group(conn: &mut redis::Connection, config: &RedisConfig) {
235235
));
236236
}
237237

238-
pub async fn producer_loop(tx: mpsc::UnboundedSender<LogRecord>, config: RedisConfig) {
238+
pub async fn producer_loop(tx: mpsc::UnboundedSender<LogMsg>, config: RedisConfig) {
239239
let mut redis_conn = redis_conn(&config.url.full_url()).expect("Could not connect to Redis!");
240240
let stream_read_id: String = ">".into();
241241
setup_consumer_group(&mut redis_conn, &config);
@@ -275,7 +275,7 @@ mod tests {
275275
pack.bec_codec.data.log_msg.record.message = "test".to_string();
276276
let records = extract_records(vec![pack.clone()]);
277277
assert_eq!(records.len(), 1);
278-
assert_eq!(records[0].message, "test");
278+
assert_eq!(records[0].record.message, "test");
279279
}
280280

281281
#[test]

0 commit comments

Comments
 (0)