Skip to content

Commit 6b33900

Browse files
committed
feat: option to use basic auth for elastic
1 parent b9041d2 commit 6b33900

2 files changed

Lines changed: 41 additions & 10 deletions

File tree

src/config.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,33 @@ pub struct RedisConfig {
4747
#[derive(Clone, Debug, Deserialize)]
4848
pub struct ElasticConfig {
4949
pub url: UrlPort,
50-
pub api_key: String,
50+
pub api_key: Option<String>,
51+
pub username: Option<String>,
52+
pub password: Option<String>,
5153
#[serde(default = "default_chunk_size")]
5254
pub chunk_size: u16,
5355
#[serde(default = "default_index")]
5456
pub index: String,
5557
}
5658

59+
impl ElasticConfig {
60+
pub fn credentials(&self) -> Result<elasticsearch::auth::Credentials, &str> {
61+
if let Some(api_key) = &self.api_key {
62+
Ok(elasticsearch::auth::Credentials::EncodedApiKey(
63+
api_key.into(),
64+
))
65+
} else if let Some(username) = &self.username
66+
&& let Some(password) = &self.password
67+
{
68+
Ok(elasticsearch::auth::Credentials::Basic(
69+
username.to_owned(),
70+
password.to_owned(),
71+
))
72+
} else {
73+
Err("No credentials in config!")
74+
}
75+
}
76+
}
5777
#[derive(Clone, Debug, Deserialize)]
5878
pub struct IngestorConfig {
5979
pub redis: RedisConfig,
@@ -113,7 +133,7 @@ port = 9876
113133
let config: IngestorConfig = toml::from_str(&test_str).unwrap();
114134
assert_eq!(config.redis.url.full_url(), "http://127.0.0.1:12345");
115135
assert_eq!(config.elastic.url.full_url(), "http://127.0.0.1:9876");
116-
assert_eq!(config.elastic.api_key, "abcdefgh==");
136+
assert_eq!(config.elastic.api_key, Some("abcdefgh==".into()));
117137
}
118138

119139
#[test]
@@ -138,7 +158,7 @@ api_key = \"testkey\"
138158
";
139159
let elastic: ElasticConfig = toml::from_str(&test_str).unwrap();
140160
assert_eq!(elastic.chunk_size, 100);
141-
assert_eq!(elastic.api_key, "testkey");
161+
assert_eq!(elastic.api_key, Some("testkey".into()));
142162
assert_eq!(elastic.url.full_url(), "http://localhost:9200");
143163
}
144164

src/elastic_push.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use elasticsearch::{Elasticsearch, http::request::JsonBody};
22
use tokio::sync::mpsc;
33

4-
use std::iter::once;
4+
use std::{error::Error, iter::once};
55

66
use crate::{config::ElasticConfig, redis_logs::LogRecord};
77

8-
fn elastic_client(url: &str, api_key: &str) -> Result<Elasticsearch, elasticsearch::Error> {
9-
let url = elasticsearch::http::Url::parse(url)?;
8+
fn elastic_client(config: &ElasticConfig) -> Result<Elasticsearch, Box<dyn Error>> {
9+
let url = elasticsearch::http::Url::parse(&config.url.full_url())?;
1010
let conn_pool = elasticsearch::http::transport::SingleNodeConnectionPool::new(url);
11-
let credentials = elasticsearch::auth::Credentials::EncodedApiKey(api_key.into());
11+
let credentials = config.credentials()?;
1212
let transport = elasticsearch::http::transport::TransportBuilder::new(conn_pool)
1313
.auth(credentials)
1414
.build()?;
@@ -41,8 +41,7 @@ fn make_json_body(
4141
}
4242

4343
pub async fn consumer_loop(rx: &mut mpsc::UnboundedReceiver<LogRecord>, config: ElasticConfig) {
44-
let elastic_client = elastic_client(&config.url.full_url(), &config.api_key)
45-
.expect("Failed to connect to Elastic!");
44+
let elastic_client = elastic_client(&config).expect("Failed to connect to Elastic!");
4645

4746
let mut buffer: Vec<LogRecord> = Vec::with_capacity(config.chunk_size.into());
4847

@@ -69,6 +68,8 @@ pub async fn consumer_loop(rx: &mut mpsc::UnboundedReceiver<LogRecord>, config:
6968

7069
#[cfg(test)]
7170
mod tests {
71+
use crate::config::UrlPort;
72+
7273
use super::*;
7374
use serde::{Deserialize, Serialize};
7475

@@ -156,7 +157,17 @@ mod tests {
156157

157158
#[test]
158159
fn test_elastic_client_invalid_url() {
159-
let result = elastic_client("not a url", "apikey");
160+
let result = elastic_client(&ElasticConfig {
161+
url: UrlPort {
162+
url: "not an url".into(),
163+
port: 9876,
164+
},
165+
api_key: Some("key".into()),
166+
username: None,
167+
password: None,
168+
chunk_size: 8,
169+
index: "".into(),
170+
});
160171
assert!(result.is_err());
161172
}
162173
}

0 commit comments

Comments
 (0)