Skip to content

Commit 97c4921

Browse files
committed
feat: replace elastic with loki
- add skeleton for metrics in config
1 parent bf0a4fd commit 97c4921

12 files changed

Lines changed: 643 additions & 755 deletions

Cargo.lock

Lines changed: 413 additions & 558 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "bec_log_ingestor"
3-
description = "Tiny service to pull BEC logs from Redis and push them to Elastic"
3+
description = "Tiny service to pull BEC logs from Redis and push them to Loki"
44
readme = "README.md"
55
repository = "https://github.com/bec-project/bec_log_ingestor"
66
keywords = ["logging", "BEC"]
@@ -19,13 +19,13 @@ pre_install_script_prog = ["/bin/bash"]
1919
pre_install_script_flags = 0b100
2020

2121
[dependencies]
22-
chrono = "0.4.41"
23-
clap = { version = "4.5.42", features = ["derive"] }
24-
elasticsearch = "9.0.0-alpha.1"
25-
redis = "0.32.4"
22+
chrono = "0.4.42"
23+
clap = { version = "4.5.52", features = ["derive"] }
24+
redis = "0.32.7"
25+
reqwest = "0.12.24"
2626
rmp-serde = "1.3.0"
27-
serde = "1.0.219"
28-
serde_derive = "1.0.219"
29-
serde_json = "1.0.142"
30-
tokio = "1.47.0"
31-
toml = "0.9.5"
27+
serde = "1.0.228"
28+
serde_derive = "1.0.228"
29+
serde_json = "1.0.145"
30+
tokio = { version = "1.48.0", features = ["full", "rt-multi-thread", "sync", "tokio-macros"] }
31+
toml = "0.9.8"

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
[![Linting](https://github.com/bec-project/bec_log_ingestor/actions/workflows/check-and-lint.yaml/badge.svg)](https://github.com/bec-project/bec_log_ingestor/actions/workflows/check-and-lint.yaml) [![codecov](https://codecov.io/gh/bec-project/bec_log_ingestor/graph/badge.svg?token=B7Mzj4EhzH)](https://codecov.io/gh/bec-project/bec_log_ingestor)
44

5-
Tiny service to pull BEC logs from Redis and push them to Elastic.
5+
Tiny service to pull BEC logs from Redis and push them to Loki.
66

77
To use the systemd service as-is, a config should be created at `/etc/bec_log_ingestor.toml`, following the example in `install/example_config.toml`.
88

install/example_config.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ consumer_id = "log-ingestor"
88
url = "redis://127.0.0.1"
99
port = 6379
1010

11-
[elastic]
12-
api_key = "RjhrMWY1Z0J4ZjV0T0NJQmIzdjU6ZjVURGdmWmVCM3I3ckd2ZmFLUXl6UQ=="
11+
[loki]
12+
url = "http://localhost"
13+
auth = { username = "test-loki", password = "test-loki-password" }
1314
chunk_size = 100
14-
index = "logstash-bec_test123"
1515
beamline_name = "x99xa"
1616

17-
[elastic.url]
17+
[metrics]
1818
url = "http://localhost"
19-
port = 9200
19+
auth = { username = "test-mimir", password = "test-mimir-password" }
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[intervals]
2+
repo_git_status = { Minutely = 30 }
3+
cpu_usage = { Secondly = 30 }
4+
ram_usage = { Secondly = 30 }

install/setup_user

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
if id "bec_log_ingestor" &>/dev/null; then
33
echo "User 'bec_log_ingestor' already exists."
44
else
5-
useradd -r -c 'BEC Logs Elasticsearch Ingestor' -d /var/lib/bec_log_ingestor -s /sbin/nologin bec_log_ingestor
5+
useradd -r -c 'BEC Logs Loki Ingestor' -d /var/lib/bec_log_ingestor -s /sbin/nologin bec_log_ingestor
66
echo "User 'bec_log_ingestor' created."
77
fi

src/config.rs

Lines changed: 111 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,22 @@
1-
use std::{fmt, io::Read};
1+
use std::{collections::HashMap, fmt, io::Read};
22

3-
use serde::Deserialize;
3+
use serde_derive::{Deserialize, Serialize};
44

5-
#[derive(Clone, Debug, Deserialize)]
5+
pub trait FromTomlFile {
6+
/// Parse a toml file for an IngestorConfig. Assumes the file exists and is readable.
7+
fn from_file(path: std::path::PathBuf) -> Self
8+
where
9+
Self: for<'de> serde::Deserialize<'de>,
10+
{
11+
let mut file = std::fs::File::open(path).expect("Cannot open supplied config file!");
12+
let mut contents = String::new();
13+
file.read_to_string(&mut contents)
14+
.expect("Cannot read supplied config file!");
15+
let self_name = std::any::type_name::<Self>();
16+
toml::from_str(&contents).expect(format!("Invalid TOML for {self_name} struct").as_str())
17+
}
18+
}
19+
#[derive(Clone, Debug, Serialize, Deserialize)]
620
pub struct UrlPort {
721
pub url: String,
822
pub port: u16,
@@ -14,7 +28,20 @@ impl UrlPort {
1428
}
1529
}
1630

17-
/// Default number of records to read from Redis or push to Elastic at once
31+
#[derive(Clone, Deserialize)]
32+
33+
pub struct BasicAuth {
34+
pub username: String,
35+
pub password: String,
36+
}
37+
38+
impl fmt::Debug for BasicAuth {
39+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
40+
write!(f, "Basic auth: provided",)
41+
}
42+
}
43+
44+
/// Default number of records to read from Redis or push to Loki at once
1845
fn default_chunk_size() -> u16 {
1946
100
2047
}
@@ -26,10 +53,6 @@ fn default_blocktime_millis() -> usize {
2653
fn default_consumer() -> String {
2754
"log-ingestor".into()
2855
}
29-
/// Default value for the elastic index
30-
fn default_index() -> String {
31-
"logstash-bec_test123".into()
32-
}
3356
/// Default value for the beamline name
3457
fn default_beamline_name() -> String {
3558
"x99xa".into()
@@ -48,77 +71,54 @@ pub struct RedisConfig {
4871
pub consumer_id: String,
4972
}
5073

51-
#[derive(Clone, Deserialize)]
52-
pub struct ElasticConfig {
53-
pub url: UrlPort,
54-
pub api_key: Option<String>,
55-
pub username: Option<String>,
56-
pub password: Option<String>,
74+
#[derive(Clone, Debug, Deserialize)]
75+
pub struct LokiConfig {
76+
pub url: String,
77+
pub auth: BasicAuth,
5778
#[serde(default = "default_chunk_size")]
5879
pub chunk_size: u16,
59-
#[serde(default = "default_index")]
60-
pub index: String,
6180
#[serde(default = "default_beamline_name")]
6281
pub beamline_name: String,
6382
}
6483

65-
impl fmt::Debug for ElasticConfig {
66-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
67-
let auth_mode = {
68-
if self.api_key.is_some() {
69-
"api key"
70-
} else {
71-
"basic auth"
72-
}
73-
};
74-
write!(
75-
f,
76-
"{{
77-
url: {:?}
78-
auth mode: {:?}
79-
chunk size: {:?}
80-
index: {:?}
81-
beamline: {:?}
82-
}}",
83-
self.url, auth_mode, self.chunk_size, self.index, self.beamline_name
84-
)
85-
}
84+
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
85+
pub enum MetricInterval {
86+
Secondly(i32),
87+
Minutely(i32),
88+
Hourly(i32),
89+
Daily(i32),
90+
Weekly(i32),
8691
}
92+
#[derive(Clone, Serialize, Deserialize, Debug)]
93+
pub struct MetricIntervalsConfig {
94+
#[serde(default = "HashMap::new")]
95+
pub intervals: HashMap<String, MetricInterval>,
96+
}
97+
impl FromTomlFile for MetricIntervalsConfig {}
8798

88-
impl ElasticConfig {
89-
pub fn credentials(&self) -> Result<elasticsearch::auth::Credentials, &str> {
90-
if let Some(api_key) = &self.api_key {
91-
Ok(elasticsearch::auth::Credentials::EncodedApiKey(
92-
api_key.into(),
93-
))
94-
} else if let Some(username) = &self.username
95-
&& let Some(password) = &self.password
96-
{
97-
Ok(elasticsearch::auth::Credentials::Basic(
98-
username.to_owned(),
99-
password.to_owned(),
100-
))
101-
} else {
102-
Err("No credentials in config!")
103-
}
104-
}
99+
#[derive(Clone, Debug, Deserialize)]
100+
pub struct MetricsConfig {
101+
pub auth: BasicAuth,
102+
pub url: String,
103+
#[serde(default = "HashMap::new")]
104+
pub intervals: HashMap<String, MetricInterval>,
105105
}
106106

107107
#[derive(Clone, Debug, Deserialize)]
108108
pub struct IngestorConfig {
109109
pub redis: RedisConfig,
110-
pub elastic: ElasticConfig,
110+
pub loki: LokiConfig,
111+
pub metrics: MetricsConfig,
111112
}
113+
impl FromTomlFile for IngestorConfig {}
112114

113-
impl IngestorConfig {
114-
/// Parse a toml file for an IngestorConfig. Assumes the file exists and is readable.
115-
pub fn from_file(path: std::path::PathBuf) -> Self {
116-
let mut file = std::fs::File::open(path).expect("Cannot open supplied config file!");
117-
let mut contents = String::new();
118-
file.read_to_string(&mut contents)
119-
.expect("Cannot read supplied config file!");
120-
toml::from_str(&contents).unwrap()
115+
pub fn assemble_config(paths: (std::path::PathBuf, Option<std::path::PathBuf>)) -> IngestorConfig {
116+
let (path, metrics_path) = paths;
117+
let mut config = IngestorConfig::from_file(path);
118+
if let Some(intervals) = metrics_path.map(MetricIntervalsConfig::from_file) {
119+
config.metrics.intervals.extend(intervals.intervals);
121120
}
121+
config
122122
}
123123

124124
#[cfg(test)]
@@ -153,17 +153,19 @@ port = 12345
153153
url = \"http://127.0.0.1\"
154154
port = 12345
155155
156-
[elastic]
157-
api_key = \"abcdefgh==\"
156+
[loki]
157+
auth = { username = \"test_user\", password = \"test_password\" }
158+
url = \"http://127.0.0.1/api/v1/push\"
158159
159-
[elastic.url]
160+
[metrics]
161+
auth = { username = \"test_user\", password = \"test_password\" }
160162
url = \"http://127.0.0.1\"
161-
port = 9876
162163
";
163164
let config: IngestorConfig = toml::from_str(&test_str).unwrap();
164165
assert_eq!(config.redis.url.full_url(), "http://127.0.0.1:12345");
165-
assert_eq!(config.elastic.url.full_url(), "http://127.0.0.1:9876");
166-
assert_eq!(config.elastic.api_key, Some("abcdefgh==".into()));
166+
assert_eq!(config.loki.url, "http://127.0.0.1/api/v1/push");
167+
assert_eq!(config.loki.auth.username, "test_user");
168+
assert_eq!(config.loki.auth.password, "test_password");
167169
}
168170

169171
#[test]
@@ -180,18 +182,6 @@ port = 6379
180182
assert_eq!(redis.consumer_id, "log-ingestor");
181183
}
182184

183-
#[test]
184-
fn test_elastic_defaults() {
185-
let test_str = "
186-
url = { url = \"http://localhost\", port = 9200 }
187-
api_key = \"testkey\"
188-
";
189-
let elastic: ElasticConfig = toml::from_str(&test_str).unwrap();
190-
assert_eq!(elastic.chunk_size, 100);
191-
assert_eq!(elastic.api_key, Some("testkey".into()));
192-
assert_eq!(elastic.url.full_url(), "http://localhost:9200");
193-
}
194-
195185
#[test]
196186
fn test_invalid_urlport_missing_field() {
197187
let test_str = "
@@ -219,4 +209,44 @@ this is not toml
219209
});
220210
assert!(result.is_err());
221211
}
212+
213+
#[test]
214+
fn test_from_file_success() {
215+
use std::path::PathBuf;
216+
let path = PathBuf::from("./install/example_config.toml");
217+
let config = IngestorConfig::from_file(path);
218+
assert_eq!(config.loki.auth.password, "test-loki-password");
219+
assert_eq!(config.loki.auth.username, "test-loki");
220+
}
221+
222+
#[test]
223+
fn test_assembled_from_file_success() {
224+
use std::path::PathBuf;
225+
let path = PathBuf::from("./install/example_config.toml");
226+
let metrics_path = PathBuf::from("./install/example_metrics_config.toml");
227+
let config = assemble_config((path, Some(metrics_path)));
228+
assert_eq!(config.loki.auth.password, "test-loki-password");
229+
assert_eq!(config.loki.auth.username, "test-loki");
230+
assert_eq!(config.metrics.intervals.len(), 3)
231+
}
232+
233+
#[test]
234+
fn test_interval_parse() {
235+
let test_str = "
236+
url = \"http://localhost\"
237+
238+
[intervals]
239+
metric_1 = { Weekly = 1 }
240+
metric_2 = { Secondly = 10 }
241+
";
242+
let intervals: MetricIntervalsConfig = toml::from_str(&test_str).unwrap();
243+
println!("{intervals:?}");
244+
assert_eq!(
245+
intervals.intervals,
246+
HashMap::from([
247+
("metric_1".into(), MetricInterval::Weekly(1)),
248+
("metric_2".into(), MetricInterval::Secondly(10)),
249+
])
250+
);
251+
}
222252
}

0 commit comments

Comments
 (0)