Skip to content

Commit 3101cab

Browse files
DRY
1 parent c5df6dd commit 3101cab

1 file changed

Lines changed: 63 additions & 76 deletions

File tree

src/server/flightsql/service.rs

Lines changed: 63 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use datafusion::logical_expr::LogicalPlan;
2727
use datafusion::sql::parser::DFParser;
2828
use datafusion_app::local::ExecutionContext;
2929
use datafusion_app::observability::ObservabilityRequestDetails;
30-
use futures::{StreamExt, TryStreamExt};
30+
use futures::{Stream, StreamExt, TryStreamExt};
3131
use jiff::Timestamp;
3232
use log::{debug, error, info};
3333
use metrics::{counter, histogram};
@@ -100,6 +100,41 @@ impl FlightSqlServiceImpl {
100100
}
101101
}
102102

103+
async fn record_request(
104+
&self,
105+
start: Timestamp,
106+
request_id: Option<String>,
107+
response_err: Option<&Status>,
108+
path: String,
109+
latency_metric: &'static str,
110+
) {
111+
let duration = Timestamp::now() - start;
112+
let grpc_code = match &response_err {
113+
None => Code::Ok,
114+
Some(status) => status.code(),
115+
};
116+
let ctx = self.execution.session_ctx();
117+
let req = ObservabilityRequestDetails {
118+
request_id,
119+
path,
120+
sql: None,
121+
rows: None,
122+
start_ms: start.as_millisecond(),
123+
duration_ms: duration.get_milliseconds(),
124+
status: grpc_code as u16,
125+
};
126+
if let Err(e) = self
127+
.execution
128+
.observability()
129+
.try_record_request(ctx, req)
130+
.await
131+
{
132+
error!("Error recording request: {}", e.to_string())
133+
}
134+
135+
histogram!(latency_metric).record(duration.get_milliseconds() as f64);
136+
}
137+
103138
async fn get_flight_info_statement_handler(
104139
&self,
105140
query: String,
@@ -195,32 +230,16 @@ impl FlightSqlService for FlightSqlServiceImpl {
195230
let res = self
196231
.get_flight_info_statement_handler(query.clone(), request_id, request)
197232
.await;
198-
let duration = Timestamp::now() - start;
199-
200-
let grpc_code = match &res {
201-
Ok(_) => Code::Ok,
202-
Err(status) => status.code(),
203-
};
204233

205-
let ctx = self.execution.session_ctx();
206-
let req = ObservabilityRequestDetails {
207-
request_id: Some(request_id.to_string()),
208-
path: "GetFlightInfo".to_string(),
209-
sql: Some(query),
210-
rows: None,
211-
start_ms: start.as_millisecond(),
212-
duration_ms: duration.get_milliseconds(),
213-
status: grpc_code as u16,
214-
};
215-
if let Err(e) = self
216-
.execution
217-
.observability()
218-
.try_record_request(ctx, req)
219-
.await
220-
{
221-
error!("Error recording request: {}", e.to_string())
222-
}
223-
histogram!("get_flight_info_latency_ms").record(duration.get_milliseconds() as f64);
234+
// TODO: Move recording to after response is sent to not impact response latency
235+
self.record_request(
236+
start,
237+
Some(request_id.to_string()),
238+
res.as_ref().err(),
239+
"get_flight_info_statement".to_string(),
240+
"get_flight_info_statement_latency_ms",
241+
)
242+
.await;
224243
res
225244
}
226245

@@ -238,31 +257,15 @@ impl FlightSqlService for FlightSqlServiceImpl {
238257
.do_get_statement_handler(request_id.clone(), ticket)
239258
.await;
240259

241-
let duration = Timestamp::now() - start;
242-
let grpc_code = match &res {
243-
Ok(_) => Code::Ok,
244-
Err(status) => status.code(),
245-
};
246-
let ctx = self.execution.session_ctx();
247-
let req = ObservabilityRequestDetails {
248-
request_id: Some(request_id),
249-
path: "DoGetStatement".to_string(),
250-
sql: None,
251-
rows: None,
252-
start_ms: start.as_millisecond(),
253-
duration_ms: duration.get_milliseconds(),
254-
status: grpc_code as u16,
255-
};
256-
if let Err(e) = self
257-
.execution
258-
.observability()
259-
.try_record_request(ctx, req)
260-
.await
261-
{
262-
error!("Error recording request: {}", e.to_string())
263-
}
264-
265-
histogram!("do_get_statement_latency_ms").record(duration.get_milliseconds() as f64);
260+
// TODO: Move recording to after response is sent to not impact response latency
261+
self.record_request(
262+
start,
263+
Some(request_id),
264+
res.as_ref().err(),
265+
"do_get_statement".to_string(),
266+
"do_get_statement_latency_ms",
267+
)
268+
.await;
266269
res
267270
}
268271

@@ -280,31 +283,15 @@ impl FlightSqlService for FlightSqlServiceImpl {
280283
.do_get_fallback_handler(request_id.clone(), message)
281284
.await;
282285

283-
let duration = Timestamp::now() - start;
284-
let grpc_code = match &res {
285-
Ok(_) => Code::Ok,
286-
Err(status) => status.code(),
287-
};
288-
let ctx = self.execution.session_ctx();
289-
let req = ObservabilityRequestDetails {
290-
request_id: Some(request_id),
291-
path: "DoGetFallback".to_string(),
292-
sql: None,
293-
rows: None,
294-
start_ms: start.as_millisecond(),
295-
duration_ms: duration.get_milliseconds(),
296-
status: grpc_code as u16,
297-
};
298-
if let Err(e) = self
299-
.execution
300-
.observability()
301-
.try_record_request(ctx, req)
302-
.await
303-
{
304-
error!("Error recording request: {}", e.to_string())
305-
}
306-
307-
histogram!("do_get_fallback_latency_ms").record(duration.get_milliseconds() as f64);
286+
// TODO: Move recording to after response is sent to not impact response latency
287+
self.record_request(
288+
start,
289+
Some(request_id),
290+
res.as_ref().err(),
291+
"do_get_fallback".to_string(),
292+
"do_get_fallback_latency_ms",
293+
)
294+
.await;
308295
res
309296
}
310297

0 commit comments

Comments
 (0)