Skip to content

Commit 6cd1c62

Browse files
committed
refactor: more explicit error confirmation for http query route error.
1 parent 8b615a1 commit 6cd1c62

File tree

7 files changed

+131
-55
lines changed

7 files changed

+131
-55
lines changed

src/query/service/src/servers/http/v1/http_query_handlers.rs

+42-36
Original file line numberDiff line numberDiff line change
@@ -53,20 +53,20 @@ const HEADER_QUERY_ID: &str = "X-DATABEND-QUERY-ID";
5353
const HEADER_QUERY_STATE: &str = "X-DATABEND-QUERY-STATE";
5454
const HEADER_QUERY_PAGE_ROWS: &str = "X-DATABEND-QUERY-PAGE-ROWS";
5555

56-
pub fn make_page_uri(query_id: &str, page_no: usize) -> String {
57-
format!("/v1/query/{}/page/{}", query_id, page_no)
56+
pub fn make_page_uri(query_id: &str, node_id: &str, page_no: usize) -> String {
57+
format!("/v1/query/{}/{}/page/{}", query_id, node_id, page_no)
5858
}
5959

60-
pub fn make_state_uri(query_id: &str) -> String {
61-
format!("/v1/query/{}", query_id)
60+
pub fn make_state_uri(query_id: &str, node_id: &str) -> String {
61+
format!("/v1/query/{}/{}", query_id, node_id)
6262
}
6363

64-
pub fn make_final_uri(query_id: &str) -> String {
65-
format!("/v1/query/{}/final", query_id)
64+
pub fn make_final_uri(query_id: &str, node_id: &str) -> String {
65+
format!("/v1/query/{}/{}/final", query_id, node_id)
6666
}
6767

68-
pub fn make_kill_uri(query_id: &str) -> String {
69-
format!("/v1/query/{}/kill", query_id)
68+
pub fn make_kill_uri(query_id: &str, node_id: &str) -> String {
69+
format!("/v1/query/{}/{}/kill", query_id, node_id)
7070
}
7171

7272
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -146,28 +146,31 @@ impl QueryResponse {
146146
r: HttpQueryResponseInternal,
147147
is_final: bool,
148148
) -> impl IntoResponse {
149+
let node_id = r.node_id.clone();
149150
let state = r.state.clone();
150151
let (data, next_uri) = if is_final {
151152
(JsonBlock::empty(), None)
152153
} else {
153154
match state.state {
154155
ExecuteStateKind::Running | ExecuteStateKind::Starting => match r.data {
155-
None => (JsonBlock::empty(), Some(make_state_uri(&id))),
156+
None => (JsonBlock::empty(), Some(make_state_uri(&id, &r.node_id))),
156157
Some(d) => {
157158
let uri = match d.next_page_no {
158-
Some(n) => Some(make_page_uri(&id, n)),
159-
None => Some(make_state_uri(&id)),
159+
Some(n) => Some(make_page_uri(&id, &r.node_id, n)),
160+
None => Some(make_state_uri(&id, &r.node_id)),
160161
};
161162
(d.page.data, uri)
162163
}
163164
},
164-
ExecuteStateKind::Failed => (JsonBlock::empty(), Some(make_final_uri(&id))),
165+
ExecuteStateKind::Failed => {
166+
(JsonBlock::empty(), Some(make_final_uri(&id, &r.node_id)))
167+
}
165168
ExecuteStateKind::Succeeded => match r.data {
166-
None => (JsonBlock::empty(), Some(make_final_uri(&id))),
169+
None => (JsonBlock::empty(), Some(make_final_uri(&id, &r.node_id))),
167170
Some(d) => {
168171
let uri = match d.next_page_no {
169-
Some(n) => Some(make_page_uri(&id, n)),
170-
None => Some(make_final_uri(&id)),
172+
Some(n) => Some(make_page_uri(&id, &r.node_id, n)),
173+
None => Some(make_final_uri(&id, &r.node_id)),
171174
};
172175
(d.page.data, uri)
173176
}
@@ -198,9 +201,9 @@ impl QueryResponse {
198201
warnings: r.state.warnings,
199202
id: id.clone(),
200203
next_uri,
201-
stats_uri: Some(make_state_uri(&id)),
202-
final_uri: Some(make_final_uri(&id)),
203-
kill_uri: Some(make_kill_uri(&id)),
204+
stats_uri: Some(make_state_uri(&id, &node_id)),
205+
final_uri: Some(make_final_uri(&id, &node_id)),
206+
kill_uri: Some(make_kill_uri(&id, &node_id)),
204207
error: r.state.error.map(QueryError::from_error_code),
205208
has_result_set: r.state.has_result_set,
206209
})
@@ -223,15 +226,16 @@ impl QueryResponse {
223226
#[poem::handler]
224227
async fn query_final_handler(
225228
ctx: &HttpQueryContext,
226-
Path(query_id): Path<String>,
229+
Path((query_id, node_id)): Path<(String, String)>,
227230
) -> PoemResult<impl IntoResponse> {
231+
ctx.check_node_id(&node_id, &query_id)?;
228232
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
229233
let _t = SlowRequestLogTracker::new(ctx);
230-
231234
async {
232235
info!(
233-
"{}: got /v1/query/{}/final request, this query is going to be finally completed.",
234-
query_id, query_id
236+
"{}: got {} request, this query is going to be finally completed.",
237+
query_id,
238+
make_final_uri(&query_id, &node_id)
235239
);
236240
let http_query_manager = HttpQueryManager::instance();
237241
match http_query_manager
@@ -260,15 +264,16 @@ async fn query_final_handler(
260264
#[poem::handler]
261265
async fn query_cancel_handler(
262266
ctx: &HttpQueryContext,
263-
Path(query_id): Path<String>,
267+
Path((query_id, node_id)): Path<(String, String)>,
264268
) -> PoemResult<impl IntoResponse> {
269+
ctx.check_node_id(&node_id, &query_id)?;
265270
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
266271
let _t = SlowRequestLogTracker::new(ctx);
267-
268272
async {
269273
info!(
270-
"{}: got /v1/query/{}/kill request, cancel the query",
271-
query_id, query_id
274+
"{}: got {} request, cancel the query",
275+
query_id,
276+
make_kill_uri(&query_id, &node_id)
272277
);
273278
let http_query_manager = HttpQueryManager::instance();
274279
match http_query_manager
@@ -290,10 +295,10 @@ async fn query_cancel_handler(
290295
#[poem::handler]
291296
async fn query_state_handler(
292297
ctx: &HttpQueryContext,
293-
Path(query_id): Path<String>,
298+
Path((query_id, node_id)): Path<(String, String)>,
294299
) -> PoemResult<impl IntoResponse> {
300+
ctx.check_node_id(&node_id, &query_id)?;
295301
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
296-
297302
async {
298303
let http_query_manager = HttpQueryManager::instance();
299304
match http_query_manager.get_query(&query_id) {
@@ -315,11 +320,11 @@ async fn query_state_handler(
315320
#[poem::handler]
316321
async fn query_page_handler(
317322
ctx: &HttpQueryContext,
318-
Path((query_id, page_no)): Path<(String, usize)>,
323+
Path((query_id, node_id, page_no)): Path<(String, String, usize)>,
319324
) -> PoemResult<impl IntoResponse> {
325+
ctx.check_node_id(&node_id, &query_id)?;
320326
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
321327
let _t = SlowRequestLogTracker::new(ctx);
322-
323328
async {
324329
let http_query_manager = HttpQueryManager::instance();
325330
match http_query_manager.get_query(&query_id) {
@@ -352,7 +357,8 @@ pub(crate) async fn query_handler(
352357
let _t = SlowRequestLogTracker::new(ctx);
353358

354359
async {
355-
info!("http query new request: {:}", mask_connection_info(&format!("{:?}", req)));
360+
let agent = ctx.user_agent.as_ref().map(|s|(format!("(from {s})"))).unwrap_or("".to_string());
361+
info!("http query new request{}: {:}", agent, mask_connection_info(&format!("{:?}", req)));
356362
let http_query_manager = HttpQueryManager::instance();
357363
let sql = req.sql.clone();
358364

@@ -397,14 +403,14 @@ pub fn query_route() -> Route {
397403
// Note: endpoints except /v1/query may change without notice, use uris in response instead
398404
let rules = [
399405
("/", post(query_handler)),
400-
("/:id", get(query_state_handler)),
401-
("/:id/page/:page_no", get(query_page_handler)),
406+
("/:query_id/:node_id", get(query_state_handler)),
407+
("/:query_id/:node_id/page/:page_no", get(query_page_handler)),
402408
(
403-
"/:id/kill",
409+
"/:query_id/:node_id/kill",
404410
get(query_cancel_handler).post(query_cancel_handler),
405411
),
406412
(
407-
"/:id/final",
413+
"/:query_id/:node_id/final",
408414
get(query_final_handler).post(query_final_handler),
409415
),
410416
];
@@ -436,7 +442,7 @@ fn query_id_to_trace_id(query_id: &str) -> TraceId {
436442
}
437443

438444
/// The HTTP query endpoints are expected to respond within 60 seconds.
439-
/// If it exceeds far of 60 seconds, there might be something wrong, we should
445+
/// If a request takes far longer than 60 seconds, something might be wrong and we should
440446
/// log it.
441447
struct SlowRequestLogTracker {
442448
started_at: std::time::Instant,

src/query/service/src/servers/http/v1/query/http_query_context.rs

+20-2
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
use std::collections::BTreeMap;
1616
use std::sync::Arc;
1717

18+
use http::StatusCode;
19+
use log::warn;
1820
use poem::FromRequest;
1921
use poem::Request;
2022
use poem::RequestBody;
21-
use poem::Result as PoemResult;
23+
use time::Instant;
2224

25+
use crate::servers::http::v1::HttpQueryManager;
2326
use crate::sessions::Session;
2427
use crate::sessions::SessionManager;
2528
use crate::sessions::SessionType;
@@ -101,11 +104,26 @@ impl HttpQueryContext {
101104
pub fn set_fail(&self) {
102105
self.session.txn_mgr().lock().set_fail();
103106
}
107+
108+
pub fn check_node_id(&self, node_id: &str, query_id: &str) -> poem::Result<()> {
109+
if node_id != self.node_id {
110+
let manager = HttpQueryManager::instance();
111+
let start_time = manager.server_info.start_time.clone();
112+
let uptime = (Instant::now() - manager.start_instant).as_seconds_f32();
113+
let msg = format!(
114+
"route error: query {query_id} SHOULD be on server {node_id}, but current server is {}, which started at {start_time}({uptime} secs ago)",
115+
self.node_id
116+
);
117+
warn!("{msg}");
118+
return Err(poem::Error::from_string(msg, StatusCode::NOT_FOUND));
119+
}
120+
Ok(())
121+
}
104122
}
105123

106124
impl<'a> FromRequest<'a> for &'a HttpQueryContext {
107125
#[async_backtrace::framed]
108-
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> PoemResult<Self> {
126+
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> poem::Result<Self> {
109127
Ok(req.extensions().get::<HttpQueryContext>().expect(
110128
"To use the `HttpQueryContext` extractor, the `HTTPSessionMiddleware` is required",
111129
))

src/query/service/src/servers/http/v1/query/http_query_manager.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use databend_common_exception::ErrorCode;
3030
use databend_common_exception::Result;
3131
use databend_storages_common_txn::TxnManagerRef;
3232
use parking_lot::Mutex;
33+
use time::Instant;
3334
use tokio::task;
3435

3536
use super::expiring_map::ExpiringMap;
@@ -77,6 +78,7 @@ impl<T> LimitedQueue<T> {
7778
}
7879

7980
pub struct HttpQueryManager {
81+
pub(crate) start_instant: Instant,
8082
pub(crate) server_info: ServerInfo,
8183
#[allow(clippy::type_complexity)]
8284
pub(crate) queries: Arc<DashMap<String, Arc<HttpQuery>>>,
@@ -90,9 +92,10 @@ impl HttpQueryManager {
9092
#[async_backtrace::framed]
9193
pub async fn init(cfg: &InnerConfig) -> Result<()> {
9294
GlobalInstance::set(Arc::new(HttpQueryManager {
95+
start_instant: Instant::now(),
9396
server_info: ServerInfo {
9497
id: cfg.query.node_id.clone(),
95-
start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Nanos, false),
98+
start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Millis, false),
9699
},
97100
queries: Arc::new(DashMap::new()),
98101
sessions: Mutex::new(ExpiringMap::default()),

src/query/service/tests/it/servers/http/http_query_handlers.rs

+41-12
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@ use databend_query::auth::AuthMgr;
3131
use databend_query::servers::http::middleware::get_client_ip;
3232
use databend_query::servers::http::middleware::HTTPSessionEndpoint;
3333
use databend_query::servers::http::middleware::HTTPSessionMiddleware;
34-
use databend_query::servers::http::v1::make_final_uri;
3534
use databend_query::servers::http::v1::make_page_uri;
36-
use databend_query::servers::http::v1::make_state_uri;
3735
use databend_query::servers::http::v1::query_route;
3836
use databend_query::servers::http::v1::ExecuteStateKind;
3937
use databend_query::servers::http::v1::HttpSessionConf;
@@ -276,15 +274,15 @@ async fn test_simple_sql() -> Result<()> {
276274
assert!(result.error.is_none(), "{:?}", result.error);
277275

278276
let query_id = &result.id;
279-
let final_uri = make_final_uri(query_id);
277+
let final_uri = result.final_uri.clone().unwrap();
280278

281279
assert_eq!(result.state, ExecuteStateKind::Succeeded, "{:?}", result);
282280
assert_eq!(result.next_uri, Some(final_uri.clone()), "{:?}", result);
283281
assert_eq!(result.data.len(), 10, "{:?}", result);
284282
assert_eq!(result.schema.len(), 19, "{:?}", result);
285283

286284
// get state
287-
let uri = make_state_uri(query_id);
285+
let uri = result.stats_uri.unwrap();
288286
let (status, result) = get_uri_checked(&ep, &uri).await?;
289287
assert_eq!(status, StatusCode::OK, "{:?}", result);
290288
assert!(result.error.is_none(), "{:?}", result);
@@ -293,8 +291,18 @@ async fn test_simple_sql() -> Result<()> {
293291
// assert!(result.schema.is_empty(), "{:?}", result);
294292
assert_eq!(result.state, ExecuteStateKind::Succeeded, "{:?}", result);
295293

294+
let node_id = result
295+
.session
296+
.as_ref()
297+
.unwrap()
298+
.last_server_info
299+
.as_ref()
300+
.unwrap()
301+
.id
302+
.clone();
303+
296304
// get page, support retry
297-
let page_0_uri = make_page_uri(query_id, 0);
305+
let page_0_uri = make_page_uri(query_id, &node_id, 0);
298306
for _ in 1..3 {
299307
let (status, result) = get_uri_checked(&ep, &page_0_uri).await?;
300308
assert_eq!(status, StatusCode::OK, "{:?}", result);
@@ -306,15 +314,15 @@ async fn test_simple_sql() -> Result<()> {
306314
}
307315

308316
// client retry
309-
let page_1_uri = make_page_uri(query_id, 1);
317+
let page_1_uri = make_page_uri(query_id, &node_id, 1);
310318
let (_, result) = get_uri_checked(&ep, &page_1_uri).await?;
311319
assert_eq!(status, StatusCode::OK, "{:?}", result);
312320
assert!(result.error.is_none(), "{:?}", result);
313321
assert_eq!(result.data.len(), 0, "{:?}", result);
314322
assert_eq!(result.next_uri, Some(final_uri.clone()), "{:?}", result);
315323

316324
// get page not expected
317-
let page_2_uri = make_page_uri(query_id, 2);
325+
let page_2_uri = make_page_uri(query_id, &node_id, 2);
318326
let response = get_uri(&ep, &page_2_uri).await;
319327
assert_eq!(response.status(), StatusCode::NOT_FOUND, "{:?}", result);
320328
let body = response.into_body().into_string().await.unwrap();
@@ -497,13 +505,23 @@ async fn test_wait_time_secs() -> Result<()> {
497505
let (status, result) = post_json_to_endpoint(&ep, &json, HeaderMap::default()).await?;
498506
assert_eq!(result.state, ExecuteStateKind::Starting, "{:?}", result);
499507
assert_eq!(status, StatusCode::OK, "{:?}", result);
508+
let node_id = result
509+
.session
510+
.as_ref()
511+
.unwrap()
512+
.last_server_info
513+
.as_ref()
514+
.unwrap()
515+
.id
516+
.clone();
517+
500518
let query_id = &result.id;
501-
let next_uri = make_page_uri(query_id, 0);
519+
let next_uri = make_page_uri(query_id, &node_id, 0);
502520
assert!(result.error.is_none(), "{:?}", result);
503521
assert_eq!(result.data.len(), 0, "{:?}", result);
504522
assert_eq!(result.next_uri, Some(next_uri.clone()), "{:?}", result);
505523

506-
let mut uri = make_page_uri(query_id, 0);
524+
let mut uri = make_page_uri(query_id, &node_id, 0);
507525
let mut num_row = 0;
508526
for _ in 1..300 {
509527
sleep(Duration::from_millis(10)).await;
@@ -569,16 +587,27 @@ async fn test_pagination() -> Result<()> {
569587
let json = serde_json::json!({"sql": sql.to_string(), "pagination": {"wait_time_secs": 1, "max_rows_per_page": 2}, "session": { "settings": {}}});
570588

571589
let (status, result) = post_json_to_endpoint(&ep, &json, HeaderMap::default()).await?;
590+
let node_id = result
591+
.session
592+
.as_ref()
593+
.unwrap()
594+
.last_server_info
595+
.as_ref()
596+
.unwrap()
597+
.id
598+
.clone();
599+
572600
assert_eq!(status, StatusCode::OK, "{:?}", result);
573601
let query_id = &result.id;
574-
let next_uri = make_page_uri(query_id, 1);
602+
603+
let next_uri = make_page_uri(query_id, &node_id, 1);
575604
assert!(result.error.is_none(), "{:?}", result);
576605
assert_eq!(result.data.len(), 2, "{:?}", result);
577606
assert_eq!(result.next_uri, Some(next_uri), "{:?}", result);
578607
assert!(!result.schema.is_empty(), "{:?}", result);
579608

580609
// get page not expected
581-
let uri = make_page_uri(query_id, 6);
610+
let uri = make_page_uri(query_id, &node_id, 6);
582611
let response = get_uri(&ep, &uri).await;
583612
assert_eq!(response.status(), StatusCode::NOT_FOUND, "{:?}", result);
584613
let body = response.into_body().into_string().await.unwrap();
@@ -597,7 +626,7 @@ async fn test_pagination() -> Result<()> {
597626
assert!(!result.schema.is_empty(), "{:?}", result);
598627
if page == 5 {
599628
// get state
600-
let uri = make_state_uri(query_id);
629+
let uri = result.stats_uri.clone().unwrap();
601630
let (status, _state_result) = get_uri_checked(&ep, &uri).await?;
602631
assert_eq!(status, StatusCode::OK);
603632

0 commit comments

Comments
 (0)