Skip to content

Commit 1edbd43

Browse files
committed
feat: make python module
1 parent 9adfd98 commit 1edbd43

6 files changed

Lines changed: 44 additions & 34 deletions

File tree

src/main.rs renamed to src/bin/app.rs

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,6 @@
1-
use std::process::exit;
2-
3-
use tokio::sync::mpsc;
4-
5-
mod redis_logs;
6-
use crate::redis_logs::{LogMsg, producer_loop};
7-
8-
mod elastic_push;
9-
use crate::elastic_push::consumer_loop;
10-
11-
mod config;
12-
use crate::config::IngestorConfig;
13-
1+
use bec_log_ingestor::{config::IngestorConfig, main_loop};
142
use clap::Parser;
3+
use std::process::exit;
154

165
#[derive(clap::Parser, Debug)]
176
struct Args {
@@ -37,16 +26,6 @@ fn entry() -> IngestorConfig {
3726
IngestorConfig::from_file(path)
3827
}
3928

40-
async fn main_loop(config: IngestorConfig) {
41-
println!("Starting log ingestor with config: \n {:?}", &config);
42-
43-
let (tx, mut rx) = mpsc::unbounded_channel::<LogMsg>();
44-
let producer = tokio::spawn(producer_loop(tx, config.redis.clone()));
45-
consumer_loop(&mut rx, config.elastic.clone()).await;
46-
47-
let _ = tokio::join!(producer);
48-
}
49-
5029
#[tokio::main]
5130
async fn main() {
5231
let config = entry();

src/config.rs renamed to src/lib/config.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ pub struct IngestorConfig {
8282

8383
impl IngestorConfig {
8484
/// Parse a toml file for an IngestorConfig. Assumes the file exists and is readable.
85-
pub fn from_file(path: std::path::PathBuf) -> Self {
85+
pub fn from_file<P>(path: P) -> Self
86+
where
87+
P: AsRef<std::path::Path>,
88+
{
8689
let mut file = std::fs::File::open(path).expect("Cannot open supplied config file!");
8790
let mut contents = String::new();
8891
file.read_to_string(&mut contents)
Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
1+
use crate::{config::ElasticConfig, redis_logs::LogMsg};
12
use elasticsearch::{Elasticsearch, http::request::JsonBody};
2-
use tokio::sync::mpsc;
3-
43
use std::{error::Error, iter::once};
5-
6-
use crate::{
7-
config::ElasticConfig,
8-
redis_logs::{LogMsg, LogRecord},
9-
};
4+
use tokio::sync::mpsc;
105

116
fn elastic_client(config: &ElasticConfig) -> Result<Elasticsearch, Box<dyn Error>> {
127
let url = elasticsearch::http::Url::parse(&config.url.full_url())?;
@@ -83,7 +78,7 @@ pub async fn consumer_loop(rx: &mut mpsc::UnboundedReceiver<LogMsg>, config: Ela
8378

8479
#[cfg(test)]
8580
mod tests {
86-
use crate::config::UrlPort;
81+
use crate::{config::UrlPort, redis_logs::LogRecord};
8782

8883
use super::*;
8984
use serde::{Deserialize, Serialize};

src/lib/lib.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use tokio::sync::mpsc;
2+
pub mod config;
3+
pub mod elastic_push;
4+
pub mod redis_logs;
5+
use redis_logs::{LogMsg, producer_loop};
6+
7+
use elastic_push::consumer_loop;
8+
9+
use config::IngestorConfig;
10+
11+
pub async fn main_loop(config: IngestorConfig) {
12+
println!("Starting log ingestor with config: \n {:?}", &config);
13+
14+
let (tx, mut rx) = mpsc::unbounded_channel::<LogMsg>();
15+
let producer = tokio::spawn(producer_loop(tx, config.redis.clone()));
16+
consumer_loop(&mut rx, config.elastic.clone()).await;
17+
18+
let _ = tokio::join!(producer);
19+
}
20+
21+
#[pyo3::pymodule]
22+
mod bec_log_ingestor {
23+
use crate::config::IngestorConfig;
24+
use pyo3::prelude::*;
25+
use tokio::runtime::Runtime;
26+
27+
/// Run forever. Will block forever.
28+
#[pyfunction]
29+
fn run_with_config(filename: String) {
30+
let config = IngestorConfig::from_file(filename);
31+
let rt = Runtime::new().unwrap();
32+
rt.block_on(super::main_loop(config));
33+
}
34+
}
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1+
use crate::config::RedisConfig;
12
use chrono::TimeZone;
23
use redis::Commands;
34
use rmp_serde;
45
use serde::{Deserialize, Serialize};
56
use std::error::Error;
67
use tokio::sync::mpsc;
78

8-
use crate::config::RedisConfig;
9-
109
#[derive(Debug, PartialEq, Deserialize, Serialize, Clone)]
1110
pub struct Elapsed {
1211
pub repr: String,

0 commit comments

Comments
 (0)