Skip to content

Commit eb49fba

Browse files
committed
feat: replace elastic with loki
- add skeleton for metrics in config
1 parent bf0a4fd commit eb49fba

14 files changed

Lines changed: 825 additions & 877 deletions

Cargo.lock

Lines changed: 413 additions & 558 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "bec_log_ingestor"
3-
description = "Tiny service to pull BEC logs from Redis and push them to Elastic"
3+
description = "Tiny service to pull BEC logs from Redis and push them to Loki"
44
readme = "README.md"
55
repository = "https://github.com/bec-project/bec_log_ingestor"
66
keywords = ["logging", "BEC"]
@@ -19,13 +19,13 @@ pre_install_script_prog = ["/bin/bash"]
1919
pre_install_script_flags = 0b100
2020

2121
[dependencies]
22-
chrono = "0.4.41"
23-
clap = { version = "4.5.42", features = ["derive"] }
24-
elasticsearch = "9.0.0-alpha.1"
25-
redis = "0.32.4"
22+
chrono = "0.4.42"
23+
clap = { version = "4.5.52", features = ["derive"] }
24+
redis = "0.32.7"
25+
reqwest = "0.12.24"
2626
rmp-serde = "1.3.0"
27-
serde = "1.0.219"
28-
serde_derive = "1.0.219"
29-
serde_json = "1.0.142"
30-
tokio = "1.47.0"
31-
toml = "0.9.5"
27+
serde = "1.0.228"
28+
serde_derive = "1.0.228"
29+
serde_json = "1.0.145"
30+
tokio = { version = "1.48.0", features = ["full", "rt-multi-thread", "sync", "tokio-macros"] }
31+
toml = "0.9.8"

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
[![Linting](https://github.com/bec-project/bec_log_ingestor/actions/workflows/check-and-lint.yaml/badge.svg)](https://github.com/bec-project/bec_log_ingestor/actions/workflows/check-and-lint.yaml) [![codecov](https://codecov.io/gh/bec-project/bec_log_ingestor/graph/badge.svg?token=B7Mzj4EhzH)](https://codecov.io/gh/bec-project/bec_log_ingestor)
44

5-
Tiny service to pull BEC logs from Redis and push them to Elastic.
5+
Tiny service to pull BEC logs from Redis and push them to Loki.
66

77
To use the systemd service as-is, a config should be created at `/etc/bec_log_ingestor.toml`, following the example in `install/example_config.toml`.
88

install/example_config.toml

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,18 @@ consumer_id = "log-ingestor"
88
url = "redis://127.0.0.1"
99
port = 6379
1010

11-
[elastic]
12-
api_key = "RjhrMWY1Z0J4ZjV0T0NJQmIzdjU6ZjVURGdmWmVCM3I3ckd2ZmFLUXl6UQ=="
11+
[loki]
12+
url = "http://localhost"
13+
auth = { username = "test-loki", password = "test-loki-password" }
1314
chunk_size = 100
14-
index = "logstash-bec_test123"
1515
beamline_name = "x99xa"
1616

17-
[elastic.url]
17+
[metrics]
18+
user_config_path = "./test/test_metrics_config.toml"
1819
url = "http://localhost"
19-
port = 9200
20+
auth = { username = "test-mimir", password = "test-mimir-password" }
21+
22+
[metrics.intervals]
23+
new_metric = { Secondly = 53 }
24+
cpu_usage = { Secondly = 15 }
25+
ram_usage = { Secondly = 30 }
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[intervals]
2+
repo_git_status = { Minutely = 30 }
3+
cpu_usage = { Secondly = 30 }

install/setup_user

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
if id "bec_log_ingestor" &>/dev/null; then
33
echo "User 'bec_log_ingestor' already exists."
44
else
5-
useradd -r -c 'BEC Logs Elasticsearch Ingestor' -d /var/lib/bec_log_ingestor -s /sbin/nologin bec_log_ingestor
5+
useradd -r -c 'BEC Logs Loki Ingestor' -d /var/lib/bec_log_ingestor -s /sbin/nologin bec_log_ingestor
66
echo "User 'bec_log_ingestor' created."
77
fi

src/config.rs

Lines changed: 154 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,22 @@
1-
use std::{fmt, io::Read};
1+
use std::{collections::HashMap, fmt, io::Read};
22

3-
use serde::Deserialize;
3+
use serde_derive::{Deserialize, Serialize};
44

5-
#[derive(Clone, Debug, Deserialize)]
5+
pub trait FromTomlFile {
6+
/// Parse a toml file for an IngestorConfig. Assumes the file exists and is readable.
7+
fn from_file(path: std::path::PathBuf) -> Self
8+
where
9+
Self: for<'de> serde::Deserialize<'de>,
10+
{
11+
let mut file = std::fs::File::open(path).expect("Cannot open supplied config file!");
12+
let mut contents = String::new();
13+
file.read_to_string(&mut contents)
14+
.expect("Cannot read supplied config file!");
15+
let self_name = std::any::type_name::<Self>();
16+
toml::from_str(&contents).expect(format!("Invalid TOML for {self_name} struct").as_str())
17+
}
18+
}
19+
#[derive(Clone, Debug, Serialize, Deserialize)]
620
pub struct UrlPort {
721
pub url: String,
822
pub port: u16,
@@ -14,7 +28,20 @@ impl UrlPort {
1428
}
1529
}
1630

17-
/// Default number of records to read from Redis or push to Elastic at once
31+
#[derive(Clone, Deserialize)]
32+
33+
pub struct BasicAuth {
34+
pub username: String,
35+
pub password: String,
36+
}
37+
38+
impl fmt::Debug for BasicAuth {
39+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
40+
write!(f, "Basic auth: provided",)
41+
}
42+
}
43+
44+
/// Default number of records to read from Redis or push to Loki at once
1845
fn default_chunk_size() -> u16 {
1946
100
2047
}
@@ -26,10 +53,6 @@ fn default_blocktime_millis() -> usize {
2653
fn default_consumer() -> String {
2754
"log-ingestor".into()
2855
}
29-
/// Default value for the elastic index
30-
fn default_index() -> String {
31-
"logstash-bec_test123".into()
32-
}
3356
/// Default value for the beamline name
3457
fn default_beamline_name() -> String {
3558
"x99xa".into()
@@ -48,77 +71,72 @@ pub struct RedisConfig {
4871
pub consumer_id: String,
4972
}
5073

51-
#[derive(Clone, Deserialize)]
52-
pub struct ElasticConfig {
53-
pub url: UrlPort,
54-
pub api_key: Option<String>,
55-
pub username: Option<String>,
56-
pub password: Option<String>,
74+
#[derive(Clone, Debug, Deserialize)]
75+
pub struct LokiConfig {
76+
pub url: String,
77+
pub auth: BasicAuth,
5778
#[serde(default = "default_chunk_size")]
5879
pub chunk_size: u16,
59-
#[serde(default = "default_index")]
60-
pub index: String,
6180
#[serde(default = "default_beamline_name")]
6281
pub beamline_name: String,
6382
}
6483

65-
impl fmt::Debug for ElasticConfig {
66-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
67-
let auth_mode = {
68-
if self.api_key.is_some() {
69-
"api key"
70-
} else {
71-
"basic auth"
72-
}
73-
};
74-
write!(
75-
f,
76-
"{{
77-
url: {:?}
78-
auth mode: {:?}
79-
chunk size: {:?}
80-
index: {:?}
81-
beamline: {:?}
82-
}}",
83-
self.url, auth_mode, self.chunk_size, self.index, self.beamline_name
84-
)
85-
}
86-
}
87-
88-
impl ElasticConfig {
89-
pub fn credentials(&self) -> Result<elasticsearch::auth::Credentials, &str> {
90-
if let Some(api_key) = &self.api_key {
91-
Ok(elasticsearch::auth::Credentials::EncodedApiKey(
92-
api_key.into(),
93-
))
94-
} else if let Some(username) = &self.username
95-
&& let Some(password) = &self.password
96-
{
97-
Ok(elasticsearch::auth::Credentials::Basic(
98-
username.to_owned(),
99-
password.to_owned(),
100-
))
101-
} else {
102-
Err("No credentials in config!")
103-
}
104-
}
84+
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
85+
pub enum MetricInterval {
86+
Secondly(i32),
87+
Minutely(i32),
88+
Hourly(i32),
89+
Daily(i32),
90+
Weekly(i32),
91+
}
92+
#[derive(Clone, Serialize, Deserialize, Debug)]
93+
pub struct MetricIntervalsConfig {
94+
#[serde(default = "HashMap::new")]
95+
pub intervals: HashMap<String, MetricInterval>,
96+
}
97+
impl FromTomlFile for MetricIntervalsConfig {}
98+
99+
#[derive(Clone, Debug, Deserialize)]
100+
pub struct MetricsConfig {
101+
pub user_config_path: Option<std::path::PathBuf>,
102+
pub auth: BasicAuth,
103+
pub url: String,
104+
#[serde(default = "HashMap::new")]
105+
pub intervals: HashMap<String, MetricInterval>,
105106
}
106107

107108
#[derive(Clone, Debug, Deserialize)]
108109
pub struct IngestorConfig {
109110
pub redis: RedisConfig,
110-
pub elastic: ElasticConfig,
111+
pub loki: LokiConfig,
112+
pub metrics: MetricsConfig,
111113
}
114+
impl FromTomlFile for IngestorConfig {}
112115

113-
impl IngestorConfig {
114-
/// Parse a toml file for an IngestorConfig. Assumes the file exists and is readable.
115-
pub fn from_file(path: std::path::PathBuf) -> Self {
116-
let mut file = std::fs::File::open(path).expect("Cannot open supplied config file!");
117-
let mut contents = String::new();
118-
file.read_to_string(&mut contents)
119-
.expect("Cannot read supplied config file!");
120-
toml::from_str(&contents).unwrap()
116+
/// Assemble a full IngestorConfig from the paths from commandline arguments
117+
/// The path for the main config must be supplied.
118+
///
119+
/// Metric interval configuration can be supplied in three places, in decreasing order of preference:
120+
/// - in a file with the commandline -m flag
121+
/// - in a file referenced as `metrics.user_config_path` in the main config file
122+
/// - directly under [metrics.intervals] in the main config file
123+
///
124+
/// The file referenced in the main config doesn't have to exist, if not, it will be ignored
125+
pub fn assemble_config(paths: (std::path::PathBuf, Option<std::path::PathBuf>)) -> IngestorConfig {
126+
let (path, metrics_path) = paths;
127+
let mut config = IngestorConfig::from_file(path);
128+
if let Some(ref intervals_reference_file) = config.metrics.user_config_path {
129+
if intervals_reference_file.exists() {
130+
let intervals = MetricIntervalsConfig::from_file(intervals_reference_file.clone());
131+
config.metrics.intervals.extend(intervals.intervals);
132+
} else {
133+
println!("Metric intervals config file not found at {intervals_reference_file:?}")
134+
}
121135
}
136+
if let Some(intervals) = metrics_path.map(MetricIntervalsConfig::from_file) {
137+
config.metrics.intervals.extend(intervals.intervals);
138+
}
139+
config
122140
}
123141

124142
#[cfg(test)]
@@ -153,17 +171,19 @@ port = 12345
153171
url = \"http://127.0.0.1\"
154172
port = 12345
155173
156-
[elastic]
157-
api_key = \"abcdefgh==\"
174+
[loki]
175+
auth = { username = \"test_user\", password = \"test_password\" }
176+
url = \"http://127.0.0.1/api/v1/push\"
158177
159-
[elastic.url]
178+
[metrics]
179+
auth = { username = \"test_user\", password = \"test_password\" }
160180
url = \"http://127.0.0.1\"
161-
port = 9876
162181
";
163182
let config: IngestorConfig = toml::from_str(&test_str).unwrap();
164183
assert_eq!(config.redis.url.full_url(), "http://127.0.0.1:12345");
165-
assert_eq!(config.elastic.url.full_url(), "http://127.0.0.1:9876");
166-
assert_eq!(config.elastic.api_key, Some("abcdefgh==".into()));
184+
assert_eq!(config.loki.url, "http://127.0.0.1/api/v1/push");
185+
assert_eq!(config.loki.auth.username, "test_user");
186+
assert_eq!(config.loki.auth.password, "test_password");
167187
}
168188

169189
#[test]
@@ -180,18 +200,6 @@ port = 6379
180200
assert_eq!(redis.consumer_id, "log-ingestor");
181201
}
182202

183-
#[test]
184-
fn test_elastic_defaults() {
185-
let test_str = "
186-
url = { url = \"http://localhost\", port = 9200 }
187-
api_key = \"testkey\"
188-
";
189-
let elastic: ElasticConfig = toml::from_str(&test_str).unwrap();
190-
assert_eq!(elastic.chunk_size, 100);
191-
assert_eq!(elastic.api_key, Some("testkey".into()));
192-
assert_eq!(elastic.url.full_url(), "http://localhost:9200");
193-
}
194-
195203
#[test]
196204
fn test_invalid_urlport_missing_field() {
197205
let test_str = "
@@ -219,4 +227,67 @@ this is not toml
219227
});
220228
assert!(result.is_err());
221229
}
230+
231+
#[test]
232+
fn test_from_file_success() {
233+
use std::path::PathBuf;
234+
let path = PathBuf::from("./install/example_config.toml");
235+
let config = IngestorConfig::from_file(path);
236+
assert_eq!(config.loki.auth.password, "test-loki-password");
237+
assert_eq!(config.loki.auth.username, "test-loki");
238+
}
239+
240+
#[test]
241+
fn test_assembled_from_file_success() {
242+
use std::path::PathBuf;
243+
let path = PathBuf::from("./install/example_config.toml");
244+
let metrics_path = PathBuf::from("./install/example_metrics_config.toml");
245+
let config = assemble_config((path, Some(metrics_path)));
246+
assert_eq!(config.loki.auth.password, "test-loki-password");
247+
assert_eq!(config.loki.auth.username, "test-loki");
248+
assert_eq!(config.metrics.intervals.len(), 4);
249+
}
250+
251+
#[test]
252+
fn test_assembled_from_file_preference_order() {
253+
use std::path::PathBuf;
254+
let path = PathBuf::from("./install/example_config.toml");
255+
let metrics_path = PathBuf::from("./install/example_metrics_config.toml");
256+
let config = assemble_config((path, Some(metrics_path)));
257+
assert_eq!(config.loki.auth.password, "test-loki-password");
258+
assert_eq!(config.loki.auth.username, "test-loki");
259+
assert_eq!(config.metrics.intervals.len(), 4);
260+
assert_eq!(
261+
config.metrics.intervals.get("cpu_usage"),
262+
Some(&MetricInterval::Secondly(30)) // from commandline arg
263+
);
264+
assert_eq!(
265+
config.metrics.intervals.get("ram_usage"),
266+
Some(&MetricInterval::Secondly(75)) // from referenced file
267+
);
268+
assert_eq!(
269+
config.metrics.intervals.get("new_metric"),
270+
Some(&MetricInterval::Secondly(53)) // from direct config file
271+
);
272+
}
273+
274+
#[test]
275+
fn test_interval_parse() {
276+
let test_str = "
277+
url = \"http://localhost\"
278+
279+
[intervals]
280+
metric_1 = { Weekly = 1 }
281+
metric_2 = { Secondly = 10 }
282+
";
283+
let intervals: MetricIntervalsConfig = toml::from_str(&test_str).unwrap();
284+
println!("{intervals:?}");
285+
assert_eq!(
286+
intervals.intervals,
287+
HashMap::from([
288+
("metric_1".into(), MetricInterval::Weekly(1)),
289+
("metric_2".into(), MetricInterval::Secondly(10)),
290+
])
291+
);
292+
}
222293
}

0 commit comments

Comments
 (0)