Skip to content

Commit c45a112

Browse files
committed
feat: implement get logs route + log body payload to clickhouse
1 parent 4cb0be1 commit c45a112

17 files changed

+172
-44
lines changed

Diff for: Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ thiserror = "2.0.11"
3535
confik = "0.12.0"
3636

3737
## json
38+
serde = { version = "1.0.217", features = ["derive"] }
39+
serde_json = "1.0.137"
3840
sonic-rs = "0.3.17"
3941

4042
## another

Diff for: src/clickhouse/logger.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ impl ClickhouseLogger {
1717
}
1818

1919
pub async fn log(&self, log: WebServerLog) -> Result<()> {
20-
println!("Logging to Clickhouse: {:?}", log);
20+
tracing::debug!("Logging to Clickhouse: {:?}", log);
2121

2222
let connection = self
2323
.clickhouse_pool
@@ -31,7 +31,7 @@ impl ClickhouseLogger {
3131
.map_err(|e| eyre!("Failed to write log to ClickHouse: {}", e));
3232

3333
if let Err(e) = result {
34-
println!("Error: {:?}", e);
34+
tracing::error!("Error: {:?}", e);
3535
}
3636

3737
Ok(())

Diff for: src/clickhouse/migrations/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl MigrationManager {
9797
let migration = self.migrations.get(&version).unwrap();
9898

9999
// Log migration start
100-
println!("Running migration v{}: {}", version, migration.name());
100+
tracing::info!("Running migration v{}: {}", version, migration.name());
101101

102102
// Run migration
103103
migration.apply(pool).await?;
@@ -116,7 +116,7 @@ impl MigrationManager {
116116
}
117117
}
118118

119-
println!("All migrations applied successfully!");
119+
tracing::info!("All migrations applied successfully!");
120120
Ok(())
121121
}
122122

Diff for: src/clickhouse/models/web_server_log.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use chrono::Utc;
22
use klickhouse::Row;
3+
use serde::{Deserialize, Serialize};
34

4-
#[derive(Row, Debug)]
5+
#[derive(Row, Debug, Serialize, Deserialize)]
56
pub struct WebServerLog {
67
pub timestamp: chrono::DateTime<Utc>,
78
pub level: String,

Diff for: src/clickhouse/pool.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use klickhouse::*;
33

44
use super::config::ClickhouseConfig;
55
use super::migrations::MigrationManager;
6+
use super::queries::LogsQueries;
67

78
// Queries
89
// use super::queries::trades_queries::TradesQueries;
@@ -51,7 +52,7 @@ impl ClickhousePool {
5152
.map_err(|e| eyre!("Failed to check Clickhouse connection: {}", e))?;
5253

5354
if result.0 == 1 {
54-
println!("Clickhouse connection is getting ok");
55+
tracing::info!("Clickhouse connection is getting ok");
5556
} else {
5657
return Err(eyre!("Failed to check Clickhouse connection"));
5758
}
@@ -67,10 +68,11 @@ impl ClickhousePool {
6768
Ok(())
6869
}
6970

70-
// pub async fn get_trades_queries(&self) -> Result<TradesQueries> {
71-
// let connection = self.get_connection().await?;
72-
// Ok(TradesQueries::new(connection))
73-
// }
71+
// Queries
72+
pub async fn get_logs_queries(&self) -> Result<LogsQueries> {
73+
let connection = self.get_connection().await?;
74+
Ok(LogsQueries::new(connection))
75+
}
7476
}
7577

7678
#[cfg(test)]
@@ -88,7 +90,6 @@ mod tests {
8890
pool.check_pool().await.unwrap();
8991

9092
let version = MigrationManager::get_current_version(&pool).await.unwrap();
91-
println!("Current version: {}", version);
9293
assert!(version > 0);
9394
}
9495
}

Diff for: src/clickhouse/queries/logs_query.rs

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use eyre::Result;
2+
3+
use crate::clickhouse::models::WebServerLog;
4+
use crate::clickhouse::pool::ClickhouseConnection;
5+
6+
pub struct LogsQueries<'a> {
7+
conn: ClickhouseConnection<'a>,
8+
}
9+
10+
impl<'a> LogsQueries<'a> {
11+
pub fn new(conn: ClickhouseConnection<'a>) -> Self {
12+
Self { conn }
13+
}
14+
15+
pub async fn get_logs(&self, limit: usize, offset: usize) -> Result<Vec<WebServerLog>> {
16+
let query = format!(
17+
"SELECT * FROM web_server_logs ORDER BY timestamp DESC LIMIT {} OFFSET {}",
18+
limit, offset
19+
);
20+
let logs = match self.conn.query_collect::<WebServerLog>(query).await {
21+
Ok(logs) => {
22+
tracing::debug!("Fetched {} logs", logs.len());
23+
logs
24+
}
25+
Err(e) => {
26+
tracing::error!("Error fetching logs: {}", e);
27+
return Err(e.into());
28+
}
29+
};
30+
31+
Ok(logs)
32+
}
33+
}

Diff for: src/clickhouse/queries/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1+
pub mod logs_query;
12

3+
pub use logs_query::LogsQueries;

Diff for: src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod cli;
22
pub mod clickhouse;
33
pub mod config;
4+
pub mod services;
45
pub mod web;

Diff for: src/services/get_web_logs_service.rs

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use crate::clickhouse::{models::WebServerLog, ClickhousePool};
2+
use eyre::{eyre, Result};
3+
4+
pub struct GetWebLogsService {}
5+
6+
impl GetWebLogsService {
7+
pub async fn get_logs(
8+
clickhouse_pool: &ClickhousePool,
9+
limit: usize,
10+
offset: usize,
11+
) -> Result<Vec<WebServerLog>> {
12+
// setup default values
13+
let logs_queries = clickhouse_pool.get_logs_queries().await?;
14+
15+
match logs_queries.get_logs(limit, offset).await {
16+
Ok(logs) => Ok(logs),
17+
Err(e) => Err(eyre!("Error getting logs: {}", e)),
18+
}
19+
}
20+
}

Diff for: src/services/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub mod get_web_logs_service;
2+
3+
pub use get_web_logs_service::GetWebLogsService;

Diff for: src/web/app_state.rs

+4
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ impl AppState {
4141
self.clickhouse_pool.check_pool().await
4242
}
4343

44+
pub fn clickhouse_pool(&self) -> &ClickhousePool {
45+
&self.clickhouse_pool
46+
}
47+
4448
pub fn config(&self) -> &AppConfig {
4549
&self.config
4650
}

Diff for: src/web/global_panic_handler.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::panic;
44

55
pub fn setup_global_panic_handler(app_state: Data<AppState>) {
66
panic::set_hook(Box::new(move |info| {
7-
println!("Global panic handler called");
7+
tracing::error!("Global panic handler called");
88
let app_state = app_state.clone();
99

1010
let location = info
@@ -32,7 +32,7 @@ pub fn setup_global_panic_handler(app_state: Data<AppState>) {
3232

3333
// Write log to Clickhouse
3434
if let Err(e) = app_state.ch_logger().log(log).await {
35-
eprintln!("Failed to log panic to ClickHouse: {:?}", e);
35+
tracing::error!("Failed to log panic to ClickHouse: {:?}", e);
3636
}
3737
});
3838
}));

Diff for: src/web/handlers/get_logs_handler.rs

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use crate::web::app_state::AppState;
2+
use actix_web::{get, web, HttpResponse, Responder};
3+
use serde::Deserialize;
4+
5+
use crate::services::GetWebLogsService;
6+
7+
#[derive(Deserialize)]
8+
pub struct WebLogsQuery {
9+
pub limit: Option<usize>, // Limit of logs to return, default 10
10+
pub offset: Option<usize>, // Offset of logs to return, default 0
11+
}
12+
13+
#[get("/logs")]
14+
pub async fn get_logs(
15+
app_state: web::Data<AppState>,
16+
query: web::Query<WebLogsQuery>,
17+
) -> impl Responder {
18+
let clickhouse_pool = app_state.clickhouse_pool();
19+
20+
// setup default values
21+
let limit = query.limit.unwrap_or(10);
22+
let offset = query.offset.unwrap_or(0);
23+
24+
match GetWebLogsService::get_logs(clickhouse_pool, limit, offset).await {
25+
Ok(logs) => HttpResponse::Ok().json(logs),
26+
Err(e) => HttpResponse::InternalServerError().json(e.to_string()),
27+
}
28+
}

Diff for: src/web/handlers/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
pub mod get_logs_handler;
2+
3+
pub use get_logs_handler::get_logs;
4+
15
use crate::web::app_state::AppState;
26
use actix_web::{get, web, HttpResponse, Responder};
37
use tracing_actix_web::RequestId;

Diff for: src/web/middleware/clickhouse_logger.rs

+36-15
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use actix_web::{
2+
body::{BoxBody, MessageBody},
23
dev::{Service, ServiceRequest, ServiceResponse, Transform},
3-
Error, HttpMessage,
4+
web::Bytes,
5+
Error, HttpMessage, HttpResponse,
46
};
57
use futures::future::{ok, Ready};
68
use std::{
@@ -20,10 +22,9 @@ pub struct ClickhouseLogger;
2022
impl<S, B> Transform<S, ServiceRequest> for ClickhouseLogger
2123
where
2224
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
23-
S::Future: 'static,
24-
B: 'static,
25+
B: MessageBody + 'static,
2526
{
26-
type Response = ServiceResponse<B>;
27+
type Response = ServiceResponse<BoxBody>;
2728
type Error = Error;
2829
type Transform = ClickhouseLoggerMiddleware<S>;
2930
type InitError = ();
@@ -43,10 +44,9 @@ pub struct ClickhouseLoggerMiddleware<S> {
4344
impl<S, B> Service<ServiceRequest> for ClickhouseLoggerMiddleware<S>
4445
where
4546
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
46-
S::Future: 'static,
47-
B: 'static,
47+
B: MessageBody + 'static,
4848
{
49-
type Response = ServiceResponse<B>;
49+
type Response = ServiceResponse<BoxBody>;
5050
type Error = Error;
5151
type Future = Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>>>>;
5252

@@ -75,33 +75,54 @@ where
7575
match result {
7676
Ok(res) => {
7777
// Успешный запрос
78+
let (req_head, http_resp) = res.into_parts();
79+
7880
let duration = start_time.elapsed();
79-
let request_id = res
80-
.request()
81+
let request_id = req_head
8182
.extensions()
8283
.get::<RequestId>()
8384
.map(|id| id.to_string())
8485
.unwrap_or_default();
8586

86-
let uri = res.request().uri().to_string();
87-
let method = res.request().method().to_string();
88-
let status_code = res.status().as_u16() as i32;
87+
let uri = req_head.uri().to_string();
88+
let method = req_head.method().to_string();
89+
let status_code = http_resp.status();
90+
91+
let body_bytes_result =
92+
actix_web::body::to_bytes(http_resp.into_body()).await;
93+
let message = match body_bytes_result {
94+
Ok(ref bytes) => format!(
95+
"Request processed. Body: {}",
96+
String::from_utf8_lossy(bytes)
97+
),
98+
Err(_) => "Request processed. Failed to read body.".to_string(),
99+
};
89100

90101
let log = WebServerLog {
91102
timestamp: chrono::Utc::now(),
92103
level: "INFO".to_string(),
93-
message: "Request processed".to_string(),
104+
message,
94105
module: "web_server".to_string(),
95106
request_id,
96107
uri,
97108
method,
98-
status_code,
109+
status_code: status_code.as_u16() as i32,
99110
response_time: duration.as_secs_f64(),
100111
};
101112

102113
let _ = app_state.ch_logger().log(log).await;
103114

104-
Ok(res)
115+
let new_body = match body_bytes_result {
116+
Ok(ref b) => b.clone(),
117+
Err(_) => Bytes::new(),
118+
};
119+
120+
let new_http_response = HttpResponse::build(status_code).body(new_body);
121+
let new_srv_response =
122+
ServiceResponse::new(req_head, new_http_response)
123+
.map_into_boxed_body();
124+
125+
Ok(new_srv_response)
105126
}
106127
Err(e) => {
107128
// Ошибка от следующего middleware/handler

Diff for: src/web/startup.rs

+22-16
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use actix_web::middleware::NormalizePath;
2-
use actix_web::{web, App, HttpServer};
2+
use actix_web::{web, App, HttpResponse, HttpServer};
33
use eyre::Result;
44
use tracing_actix_web::TracingLogger;
55

@@ -10,7 +10,8 @@ use crate::web::middleware::ClickhouseLogger;
1010

1111
use crate::web::app_state::AppState;
1212
use crate::web::global_panic_handler::setup_global_panic_handler;
13-
use crate::web::handlers::{fail_endpoint, health, index};
13+
// handlers
14+
use crate::web::handlers::{fail_endpoint, get_logs, health, index};
1415

1516
pub async fn run_serve(config: AppConfig) -> Result<actix_web::dev::Server> {
1617
let app_state = AppState::build(config).await?;
@@ -20,20 +21,25 @@ pub async fn run_serve(config: AppConfig) -> Result<actix_web::dev::Server> {
2021
let port = app_state.config().http_port;
2122
let addr = format!("0.0.0.0:{}", port);
2223

23-
let server = HttpServer::new(move || {
24-
App::new()
25-
.app_data(data_app_state.clone())
26-
.wrap(TracingLogger::default())
27-
.wrap(NormalizePath::new(
28-
actix_web::middleware::TrailingSlash::Trim,
29-
))
30-
.wrap(ClickhouseLogger)
31-
.service(index)
32-
.service(health)
33-
.service(fail_endpoint)
34-
})
35-
.bind(addr)?
36-
.run();
24+
let server =
25+
HttpServer::new(move || {
26+
App::new()
27+
.app_data(data_app_state.clone())
28+
.wrap(TracingLogger::default())
29+
.wrap(NormalizePath::new(
30+
actix_web::middleware::TrailingSlash::Trim,
31+
))
32+
.wrap(ClickhouseLogger)
33+
.service(index)
34+
.service(health)
35+
.service(fail_endpoint)
36+
.service(get_logs)
37+
.default_service(web::route().to(|| async {
38+
HttpResponse::NotFound().body("Oops! This route does not exist.")
39+
}))
40+
})
41+
.bind(addr)?
42+
.run();
3743

3844
Ok(server)
3945
}

0 commit comments

Comments
 (0)