Skip to content

Commit c5df6dd

Browse files
Do get statement
1 parent 35e5259 commit c5df6dd

1 file changed

Lines changed: 88 additions & 39 deletions

File tree

src/server/flightsql/service.rs

Lines changed: 88 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,53 @@ impl FlightSqlServiceImpl {
5353
}
5454
}
5555

56-
/// Return an [`FlightServiceServer`] that can be used with a
56+
/// Return a [`FlightServiceServer`] that can be used with a
5757
/// [`Server`](tonic::transport::Server)
5858
pub fn service(&self) -> FlightServiceServer<Self> {
5959
// wrap up tonic goop
6060
FlightServiceServer::new(self.clone())
6161
}
6262

63+
async fn do_get_common_handler(
64+
&self,
65+
request_id: String,
66+
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
67+
match Uuid::from_str(&request_id) {
68+
Ok(id) => {
69+
info!("getting plan for id: {:?}", id);
70+
// Limit the scope of the lock
71+
let maybe_plan = {
72+
let guard = self
73+
.requests
74+
.lock()
75+
.map_err(|_| Status::internal("Failed to acquire lock on requests"))?;
76+
guard.get(&id).cloned()
77+
};
78+
if let Some(plan) = maybe_plan {
79+
let stream = self
80+
.execution
81+
.execute_logical_plan(plan)
82+
.await
83+
.map_err(|e| Status::internal(e.to_string()))?;
84+
let builder = FlightDataEncoderBuilder::new();
85+
let flight_data_stream = builder
86+
.build(stream.map_err(|e| FlightError::ExternalError(Box::new(e))))
87+
.map_err(|e| Status::internal(e.to_string()))
88+
.boxed();
89+
Ok(Response::new(flight_data_stream))
90+
} else {
91+
Err(Status::internal("Plan not found for id"))
92+
}
93+
}
94+
Err(e) => {
95+
error!("error decoding handle to uuid for {request_id}: {:?}", e);
96+
Err(Status::internal(
97+
"Error decoding handle to uuid for {request_id}",
98+
))
99+
}
100+
}
101+
}
102+
63103
async fn get_flight_info_statement_handler(
64104
&self,
65105
query: String,
@@ -120,47 +160,22 @@ impl FlightSqlServiceImpl {
120160
}
121161
}
122162

163+
async fn do_get_statement_handler(
164+
&self,
165+
request_id: String,
166+
ticket: TicketStatementQuery,
167+
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
168+
debug!("do_get_statement ticket: {:?}", ticket);
169+
self.do_get_common_handler(request_id).await
170+
}
171+
123172
async fn do_get_fallback_handler(
124173
&self,
125174
request_id: String,
126175
message: Any,
127176
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
128177
debug!("do_get_fallback message: {:?}", message);
129-
130-
match Uuid::from_str(&request_id) {
131-
Ok(id) => {
132-
info!("getting plan for id: {:?}", id);
133-
// Limit the scope of the lock
134-
let maybe_plan = {
135-
let guard = self
136-
.requests
137-
.lock()
138-
.map_err(|_| Status::internal("Failed to acquire lock on requests"))?;
139-
guard.get(&id).cloned()
140-
};
141-
if let Some(plan) = maybe_plan {
142-
let stream = self
143-
.execution
144-
.execute_logical_plan(plan)
145-
.await
146-
.map_err(|e| Status::internal(e.to_string()))?;
147-
let builder = FlightDataEncoderBuilder::new();
148-
let flight_data_stream = builder
149-
.build(stream.map_err(|e| FlightError::ExternalError(Box::new(e))))
150-
.map_err(|e| Status::internal(e.to_string()))
151-
.boxed();
152-
Ok(Response::new(flight_data_stream))
153-
} else {
154-
Err(Status::internal("Plan not found for id"))
155-
}
156-
}
157-
Err(e) => {
158-
error!("error decoding handle to uuid for {request_id}: {:?}", e);
159-
Err(Status::internal(
160-
"Error decoding handle to uuid for {request_id}",
161-
))
162-
}
163-
}
178+
self.do_get_common_handler(request_id).await
164179
}
165180
}
166181

@@ -211,10 +226,44 @@ impl FlightSqlService for FlightSqlServiceImpl {
211226

212227
async fn do_get_statement(
213228
&self,
214-
_ticket: TicketStatementQuery,
215-
_request: Request<Ticket>,
229+
ticket: TicketStatementQuery,
230+
request: Request<Ticket>,
216231
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
217-
Err(Status::unimplemented("Not implemented"))
232+
counter!("requests", "endpoint" => "do_get_statement").increment(1);
233+
let start = Timestamp::now();
234+
let request_id =
235+
try_request_id_from_request(request).map_err(|e| Status::internal(e.to_string()))?;
236+
debug!("do_get_statement for request_id: {}", &request_id);
237+
let res = self
238+
.do_get_statement_handler(request_id.clone(), ticket)
239+
.await;
240+
241+
let duration = Timestamp::now() - start;
242+
let grpc_code = match &res {
243+
Ok(_) => Code::Ok,
244+
Err(status) => status.code(),
245+
};
246+
let ctx = self.execution.session_ctx();
247+
let req = ObservabilityRequestDetails {
248+
request_id: Some(request_id),
249+
path: "DoGetStatement".to_string(),
250+
sql: None,
251+
rows: None,
252+
start_ms: start.as_millisecond(),
253+
duration_ms: duration.get_milliseconds(),
254+
status: grpc_code as u16,
255+
};
256+
if let Err(e) = self
257+
.execution
258+
.observability()
259+
.try_record_request(ctx, req)
260+
.await
261+
{
262+
error!("Error recording request: {}", e.to_string())
263+
}
264+
265+
histogram!("do_get_statement_latency_ms").record(duration.get_milliseconds() as f64);
266+
res
218267
}
219268

220269
async fn do_get_fallback(

0 commit comments

Comments
 (0)