Skip to content

Commit 08ca16f

Browse files
committed
refactor: more explicit error confirmation for http query route error.
1 parent 8b615a1 commit 08ca16f

File tree

9 files changed

+132
-55
lines changed

9 files changed

+132
-55
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

scripts/ci/deploy/config/databend-query-node-1.toml

+5-5
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@ auth_type = "no_password"
5454
# name = "admin"
5555
# auth_type = "no_password"
5656

57-
# [[query.users]]
58-
# name = "databend"
59-
# auth_type = "double_sha1_password"
60-
# # echo -n "databend" | sha1sum | cut -d' ' -f1 | xxd -r -p | sha1sum
61-
# auth_string = "3081f32caef285c232d066033c89a78d88a6d8a5"
57+
[[query.users]]
58+
name = "databend"
59+
auth_type = "double_sha1_password"
60+
# echo -n "databend" | sha1sum | cut -d' ' -f1 | xxd -r -p | sha1sum
61+
auth_string = "3081f32caef285c232d066033c89a78d88a6d8a5"
6262

6363
# [[query.users]]
6464
# name = "datafuselabs"

src/query/service/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ rustls = "0.22"
161161
rustls-pemfile = "2"
162162
rustls-pki-types = "1"
163163
rustyline = "14"
164+
semver = { workspace = true }
164165
serde = { workspace = true }
165166
serde_json = { workspace = true }
166167
serde_urlencoded = "0.7.1"

src/query/service/src/servers/http/v1/http_query_handlers.rs

+42-36
Original file line numberDiff line numberDiff line change
@@ -53,20 +53,20 @@ const HEADER_QUERY_ID: &str = "X-DATABEND-QUERY-ID";
5353
const HEADER_QUERY_STATE: &str = "X-DATABEND-QUERY-STATE";
5454
const HEADER_QUERY_PAGE_ROWS: &str = "X-DATABEND-QUERY-PAGE-ROWS";
5555

56-
pub fn make_page_uri(query_id: &str, page_no: usize) -> String {
57-
format!("/v1/query/{}/page/{}", query_id, page_no)
56+
pub fn make_page_uri(query_id: &str, node_id: &str, page_no: usize) -> String {
57+
format!("/v1/query/{}/{}/page/{}", query_id, node_id, page_no)
5858
}
5959

60-
pub fn make_state_uri(query_id: &str) -> String {
61-
format!("/v1/query/{}", query_id)
60+
pub fn make_state_uri(query_id: &str, node_id: &str) -> String {
61+
format!("/v1/query/{}/{}", query_id, node_id)
6262
}
6363

64-
pub fn make_final_uri(query_id: &str) -> String {
65-
format!("/v1/query/{}/final", query_id)
64+
pub fn make_final_uri(query_id: &str, node_id: &str) -> String {
65+
format!("/v1/query/{}/{}/final", query_id, node_id)
6666
}
6767

68-
pub fn make_kill_uri(query_id: &str) -> String {
69-
format!("/v1/query/{}/kill", query_id)
68+
pub fn make_kill_uri(query_id: &str, node_id: &str) -> String {
69+
format!("/v1/query/{}/{}/kill", query_id, node_id)
7070
}
7171

7272
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -146,28 +146,31 @@ impl QueryResponse {
146146
r: HttpQueryResponseInternal,
147147
is_final: bool,
148148
) -> impl IntoResponse {
149+
let node_id = r.node_id.clone();
149150
let state = r.state.clone();
150151
let (data, next_uri) = if is_final {
151152
(JsonBlock::empty(), None)
152153
} else {
153154
match state.state {
154155
ExecuteStateKind::Running | ExecuteStateKind::Starting => match r.data {
155-
None => (JsonBlock::empty(), Some(make_state_uri(&id))),
156+
None => (JsonBlock::empty(), Some(make_state_uri(&id, &r.node_id))),
156157
Some(d) => {
157158
let uri = match d.next_page_no {
158-
Some(n) => Some(make_page_uri(&id, n)),
159-
None => Some(make_state_uri(&id)),
159+
Some(n) => Some(make_page_uri(&id, &r.node_id, n)),
160+
None => Some(make_state_uri(&id, &r.node_id)),
160161
};
161162
(d.page.data, uri)
162163
}
163164
},
164-
ExecuteStateKind::Failed => (JsonBlock::empty(), Some(make_final_uri(&id))),
165+
ExecuteStateKind::Failed => {
166+
(JsonBlock::empty(), Some(make_final_uri(&id, &r.node_id)))
167+
}
165168
ExecuteStateKind::Succeeded => match r.data {
166-
None => (JsonBlock::empty(), Some(make_final_uri(&id))),
169+
None => (JsonBlock::empty(), Some(make_final_uri(&id, &r.node_id))),
167170
Some(d) => {
168171
let uri = match d.next_page_no {
169-
Some(n) => Some(make_page_uri(&id, n)),
170-
None => Some(make_final_uri(&id)),
172+
Some(n) => Some(make_page_uri(&id, &r.node_id, n)),
173+
None => Some(make_final_uri(&id, &r.node_id)),
171174
};
172175
(d.page.data, uri)
173176
}
@@ -198,9 +201,9 @@ impl QueryResponse {
198201
warnings: r.state.warnings,
199202
id: id.clone(),
200203
next_uri,
201-
stats_uri: Some(make_state_uri(&id)),
202-
final_uri: Some(make_final_uri(&id)),
203-
kill_uri: Some(make_kill_uri(&id)),
204+
stats_uri: Some(make_state_uri(&id, &node_id)),
205+
final_uri: Some(make_final_uri(&id, &node_id)),
206+
kill_uri: Some(make_kill_uri(&id, &node_id)),
204207
error: r.state.error.map(QueryError::from_error_code),
205208
has_result_set: r.state.has_result_set,
206209
})
@@ -223,15 +226,16 @@ impl QueryResponse {
223226
#[poem::handler]
224227
async fn query_final_handler(
225228
ctx: &HttpQueryContext,
226-
Path(query_id): Path<String>,
229+
Path((query_id, node_id)): Path<(String, String)>,
227230
) -> PoemResult<impl IntoResponse> {
231+
ctx.check_node_id(&node_id, &query_id)?;
228232
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
229233
let _t = SlowRequestLogTracker::new(ctx);
230-
231234
async {
232235
info!(
233-
"{}: got /v1/query/{}/final request, this query is going to be finally completed.",
234-
query_id, query_id
236+
"{}: got {} request, this query is going to be finally completed.",
237+
query_id,
238+
make_final_uri(&query_id, &node_id)
235239
);
236240
let http_query_manager = HttpQueryManager::instance();
237241
match http_query_manager
@@ -260,15 +264,16 @@ async fn query_final_handler(
260264
#[poem::handler]
261265
async fn query_cancel_handler(
262266
ctx: &HttpQueryContext,
263-
Path(query_id): Path<String>,
267+
Path((query_id, node_id)): Path<(String, String)>,
264268
) -> PoemResult<impl IntoResponse> {
269+
ctx.check_node_id(&node_id, &query_id)?;
265270
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
266271
let _t = SlowRequestLogTracker::new(ctx);
267-
268272
async {
269273
info!(
270-
"{}: got /v1/query/{}/kill request, cancel the query",
271-
query_id, query_id
274+
"{}: got {} request, cancel the query",
275+
query_id,
276+
make_kill_uri(&query_id, &node_id)
272277
);
273278
let http_query_manager = HttpQueryManager::instance();
274279
match http_query_manager
@@ -290,10 +295,10 @@ async fn query_cancel_handler(
290295
#[poem::handler]
291296
async fn query_state_handler(
292297
ctx: &HttpQueryContext,
293-
Path(query_id): Path<String>,
298+
Path((query_id, node_id)): Path<(String, String)>,
294299
) -> PoemResult<impl IntoResponse> {
300+
ctx.check_node_id(&node_id, &query_id)?;
295301
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
296-
297302
async {
298303
let http_query_manager = HttpQueryManager::instance();
299304
match http_query_manager.get_query(&query_id) {
@@ -315,11 +320,11 @@ async fn query_state_handler(
315320
#[poem::handler]
316321
async fn query_page_handler(
317322
ctx: &HttpQueryContext,
318-
Path((query_id, page_no)): Path<(String, usize)>,
323+
Path((query_id, node_id, page_no)): Path<(String, String, usize)>,
319324
) -> PoemResult<impl IntoResponse> {
325+
ctx.check_node_id(&node_id, &query_id)?;
320326
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
321327
let _t = SlowRequestLogTracker::new(ctx);
322-
323328
async {
324329
let http_query_manager = HttpQueryManager::instance();
325330
match http_query_manager.get_query(&query_id) {
@@ -352,7 +357,8 @@ pub(crate) async fn query_handler(
352357
let _t = SlowRequestLogTracker::new(ctx);
353358

354359
async {
355-
info!("http query new request: {:}", mask_connection_info(&format!("{:?}", req)));
360+
let agent = ctx.user_agent.as_ref().map(|s|(format!("(from {s})"))).unwrap_or("".to_string());
361+
info!("http query new request{}: {:}", agent, mask_connection_info(&format!("{:?}", req)));
356362
let http_query_manager = HttpQueryManager::instance();
357363
let sql = req.sql.clone();
358364

@@ -397,14 +403,14 @@ pub fn query_route() -> Route {
397403
// Note: endpoints except /v1/query may change without notice, use uris in response instead
398404
let rules = [
399405
("/", post(query_handler)),
400-
("/:id", get(query_state_handler)),
401-
("/:id/page/:page_no", get(query_page_handler)),
406+
("/:query_id/:node_id", get(query_state_handler)),
407+
("/:query_id/:node_id/page/:page_no", get(query_page_handler)),
402408
(
403-
"/:id/kill",
409+
"/:query_id/:node_id/kill",
404410
get(query_cancel_handler).post(query_cancel_handler),
405411
),
406412
(
407-
"/:id/final",
413+
"/:query_id/:node_id/final",
408414
get(query_final_handler).post(query_final_handler),
409415
),
410416
];
@@ -436,7 +442,7 @@ fn query_id_to_trace_id(query_id: &str) -> TraceId {
436442
}
437443

438444
/// The HTTP query endpoints are expected to be responses within 60 seconds.
439-
/// If it exceeds far of 60 seconds, there might be something wrong, we should
445+
/// If it exceeds far from 60 seconds, there might be something wrong, we should
440446
/// log it.
441447
struct SlowRequestLogTracker {
442448
started_at: std::time::Instant,

src/query/service/src/servers/http/v1/query/http_query_context.rs

+20-2
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
use std::collections::BTreeMap;
1616
use std::sync::Arc;
1717

18+
use http::StatusCode;
19+
use log::warn;
1820
use poem::FromRequest;
1921
use poem::Request;
2022
use poem::RequestBody;
21-
use poem::Result as PoemResult;
23+
use time::Instant;
2224

25+
use crate::servers::http::v1::HttpQueryManager;
2326
use crate::sessions::Session;
2427
use crate::sessions::SessionManager;
2528
use crate::sessions::SessionType;
@@ -101,11 +104,26 @@ impl HttpQueryContext {
101104
pub fn set_fail(&self) {
102105
self.session.txn_mgr().lock().set_fail();
103106
}
107+
108+
pub fn check_node_id(&self, node_id: &str, query_id: &str) -> poem::Result<()> {
109+
if node_id != self.node_id {
110+
let manager = HttpQueryManager::instance();
111+
let start_time = manager.server_info.start_time.clone();
112+
let uptime = (Instant::now() - manager.start_instant).as_seconds_f32();
113+
let msg = format!(
114+
"route error: query {query_id} SHOULD be on server {node_id}, but current server is {}, which started at {start_time}({uptime} secs ago)",
115+
self.node_id
116+
);
117+
warn!("{msg}");
118+
return Err(poem::Error::from_string(msg, StatusCode::NOT_FOUND));
119+
}
120+
Ok(())
121+
}
104122
}
105123

106124
impl<'a> FromRequest<'a> for &'a HttpQueryContext {
107125
#[async_backtrace::framed]
108-
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> PoemResult<Self> {
126+
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> poem::Result<Self> {
109127
Ok(req.extensions().get::<HttpQueryContext>().expect(
110128
"To use the `HttpQueryContext` extractor, the `HTTPSessionMiddleware` is required",
111129
))

src/query/service/src/servers/http/v1/query/http_query_manager.rs

+3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use databend_common_exception::ErrorCode;
3030
use databend_common_exception::Result;
3131
use databend_storages_common_txn::TxnManagerRef;
3232
use parking_lot::Mutex;
33+
use time::Instant;
3334
use tokio::task;
3435

3536
use super::expiring_map::ExpiringMap;
@@ -77,6 +78,7 @@ impl<T> LimitedQueue<T> {
7778
}
7879

7980
pub struct HttpQueryManager {
81+
pub(crate) start_instant: Instant,
8082
pub(crate) server_info: ServerInfo,
8183
#[allow(clippy::type_complexity)]
8284
pub(crate) queries: Arc<DashMap<String, Arc<HttpQuery>>>,
@@ -90,6 +92,7 @@ impl HttpQueryManager {
9092
#[async_backtrace::framed]
9193
pub async fn init(cfg: &InnerConfig) -> Result<()> {
9294
GlobalInstance::set(Arc::new(HttpQueryManager {
95+
start_instant: Instant::now(),
9396
server_info: ServerInfo {
9497
id: cfg.query.node_id.clone(),
9598
start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Nanos, false),

0 commit comments

Comments
 (0)