Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test-and-coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:

- name: Generate code coverage
run: |
cargo +nightly tarpaulin --verbose --all-features --workspace --timeout 120 --out xml
cargo +nightly tarpaulin --verbose --all-features --engine llvm --no-dead-code --timeout 120 --out xml

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
Expand Down
89 changes: 89 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,18 @@ name = "bec_log_ingestor"
version = "0.1.0"
edition = "2024"

[lib]
name = "bec_log_ingestor"
path = "src/lib/lib.rs"
crate-type = ["cdylib", "rlib"]

[[bin]]
name = "bec_log_ingestor"
path = "src/bin/app.rs"


[dependencies]
pyo3 = { git = "https://github.com/pyo3/pyo3", features = ["extension-module"] }
chrono = "0.4.41"
clap = { version = "4.5.42", features = ["derive"] }
elasticsearch = "9.0.0-alpha.1"
Expand Down
14 changes: 14 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[build-system]
requires = ["maturin>=1,<2"]
build-backend = "maturin"

[project]
name = "bec_log_ingestor"
version = "0.1.0"
authors = [{ name = "David Perl", email = "david.perl@psi.ch" }]
requires-python = ">=3.10"
classifiers = [
"Programming Language :: Rust",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
25 changes: 2 additions & 23 deletions src/main.rs → src/bin/app.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
use std::process::exit;

use tokio::sync::mpsc;

mod redis_logs;
use crate::redis_logs::{LogMsg, producer_loop};

mod elastic_push;
use crate::elastic_push::consumer_loop;

mod config;
use crate::config::IngestorConfig;

use bec_log_ingestor::{config::IngestorConfig, main_loop};
use clap::Parser;
use std::process::exit;

#[derive(clap::Parser, Debug)]
struct Args {
Expand All @@ -37,16 +26,6 @@ fn entry() -> IngestorConfig {
IngestorConfig::from_file(path)
}

async fn main_loop(config: IngestorConfig) {
println!("Starting log ingestor with config: \n {:?}", &config);

let (tx, mut rx) = mpsc::unbounded_channel::<LogMsg>();
let producer = tokio::spawn(producer_loop(tx, config.redis.clone()));
consumer_loop(&mut rx, config.elastic.clone()).await;

let _ = tokio::join!(producer);
}

#[tokio::main]
async fn main() {
let config = entry();
Expand Down
5 changes: 4 additions & 1 deletion src/config.rs → src/lib/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ pub struct IngestorConfig {

impl IngestorConfig {
/// Parse a toml file for an IngestorConfig. Assumes the file exists and is readable.
pub fn from_file(path: std::path::PathBuf) -> Self {
pub fn from_file<P>(path: P) -> Self
where
P: AsRef<std::path::Path>,
{
let mut file = std::fs::File::open(path).expect("Cannot open supplied config file!");
let mut contents = String::new();
file.read_to_string(&mut contents)
Expand Down
11 changes: 3 additions & 8 deletions src/elastic_push.rs → src/lib/elastic_push.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use crate::{config::ElasticConfig, redis_logs::LogMsg};
use elasticsearch::{Elasticsearch, http::request::JsonBody};
use tokio::sync::mpsc;

use std::{error::Error, iter::once};

use crate::{
config::ElasticConfig,
redis_logs::{LogMsg, LogRecord},
};
use tokio::sync::mpsc;

fn elastic_client(config: &ElasticConfig) -> Result<Elasticsearch, Box<dyn Error>> {
let url = elasticsearch::http::Url::parse(&config.url.full_url())?;
Expand Down Expand Up @@ -83,7 +78,7 @@ pub async fn consumer_loop(rx: &mut mpsc::UnboundedReceiver<LogMsg>, config: Ela

#[cfg(test)]
mod tests {
use crate::config::UrlPort;
use crate::{config::UrlPort, redis_logs::LogRecord};

use super::*;
use serde::{Deserialize, Serialize};
Expand Down
File renamed without changes.
34 changes: 34 additions & 0 deletions src/lib/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use tokio::sync::mpsc;
pub mod config;
pub mod elastic_push;
pub mod redis_logs;
use redis_logs::{LogMsg, producer_loop};

use elastic_push::consumer_loop;

use config::IngestorConfig;

pub async fn main_loop(config: IngestorConfig) {
println!("Starting log ingestor with config: \n {:?}", &config);

let (tx, mut rx) = mpsc::unbounded_channel::<LogMsg>();
let producer = tokio::spawn(producer_loop(tx, config.redis.clone()));
consumer_loop(&mut rx, config.elastic.clone()).await;

let _ = tokio::join!(producer);
}

#[pyo3::pymodule]
mod bec_log_ingestor {
use crate::config::IngestorConfig;
use pyo3::prelude::*;
use tokio::runtime::Runtime;

/// Run forever. Will block forever.
#[pyfunction]
fn run_with_config(filename: String) {
let config = IngestorConfig::from_file(filename);
let rt = Runtime::new().unwrap();
rt.block_on(super::main_loop(config));
}
}
3 changes: 1 addition & 2 deletions src/redis_logs.rs → src/lib/redis_logs.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use crate::config::RedisConfig;
use chrono::TimeZone;
use redis::Commands;
use rmp_serde;
use serde::{Deserialize, Serialize};
use std::error::Error;
use tokio::sync::mpsc;

use crate::config::RedisConfig;

#[derive(Debug, PartialEq, Deserialize, Serialize, Clone)]
pub struct Elapsed {
pub repr: String,
Expand Down
Loading