feat: Use query id as trace id #17947

Merged · 19 commits · Jun 16, 2025

7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
@@ -328,7 +328,6 @@ hashlink = "0.8"
headers = "0.4.0"
hex = "0.4.3"
hickory-resolver = "0.25"
highway = "1.1"
hive_metastore = "0.1.0"
hostname = "0.3.1"
http = "1"
1 change: 0 additions & 1 deletion src/query/service/Cargo.toml
@@ -126,7 +126,6 @@ futures-util = { workspace = true }
geozero = { workspace = true }
headers = { workspace = true }
hex = { workspace = true }
highway = { workspace = true }
http = { workspace = true }
humantime = { workspace = true }
indicatif = { workspace = true }
@@ -28,7 +28,10 @@ use crate::servers::flight::v1::packets::QueryEnv;

pub static INIT_QUERY_ENV: &str = "/actions/init_query_env";

pub async fn init_query_env(env: QueryEnv) -> Result<()> {
pub async fn init_query_env(mut env: QueryEnv) -> Result<()> {
// Update query id to make sure they are compatible.
env.query_id = env.query_id.replace('-', "");

let mut tracking_workload_group = None;
let mut parent_mem_stat = ParentMemStat::StaticRef(&GLOBAL_MEM_STAT);

@@ -21,6 +21,7 @@ use crate::servers::flight::v1::exchange::DataExchangeManager;
pub static START_PREPARED_QUERY: &str = "/actions/start_prepared_query";

pub async fn start_prepared_query(id: String) -> Result<()> {
let id = id.replace('-', "");
let ctx = DataExchangeManager::instance().get_query_ctx(&id)?;

let mut tracking_payload = ThreadTracker::new_tracking_payload();
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/servers/http/middleware/session.rs
@@ -681,8 +681,8 @@ impl<E: Endpoint> Endpoint for HTTPSessionEndpoint<E> {
let query_id = req
.headers()
.get(HEADER_QUERY_ID)
.map(|id| id.to_str().unwrap().to_string())
.unwrap_or_else(|| Uuid::new_v4().to_string());
.map(|id| id.to_str().unwrap().replace('-', ""))
.unwrap_or_else(|| Uuid::now_v7().simple().to_string());

let mut login_history = LoginHistory::new();
login_history.handler = LoginHandler::HTTP;
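
Aside: a minimal sketch of the id handling introduced above, assuming only the `uuid` crate (the function name and `main` harness are illustrative, not part of the PR):

// Incoming x-databend-query-id values are normalized to the simple (dash-free)
// form; when the header is absent a UUID v7 is generated directly in that form.
// Requires the uuid crate with the "v7" feature enabled.
use uuid::Uuid;

fn normalize_query_id(header: Option<&str>) -> String {
    match header {
        Some(id) => id.replace('-', ""),
        None => Uuid::now_v7().simple().to_string(),
    }
}

fn main() {
    assert_eq!(normalize_query_id(Some("test-query-id")), "testqueryid");
    // Generated ids are 32 lowercase hex characters, matching the RM_UUID
    // pattern used by the updated shell tests further down.
    assert_eq!(normalize_query_id(None).len(), 32);
}
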
6 changes: 3 additions & 3 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
@@ -34,7 +34,6 @@ use databend_common_metrics::http::metrics_incr_http_response_errors_count;
use databend_common_version::DATABEND_SEMVER;
use fastrace::func_path;
use fastrace::prelude::*;
use highway::HighwayHash;
use http::HeaderMap;
use http::HeaderValue;
use http::StatusCode;
@@ -56,6 +55,7 @@ use poem::Request;
use poem::Route;
use serde::Deserialize;
use serde::Serialize;
use uuid::Uuid;

use super::query::ExecuteStateKind;
use super::query::HttpQuery;
@@ -788,8 +788,8 @@ fn query_id_not_found(query_id: &str, node_id: &str) -> PoemError {
}

fn query_id_to_trace_id(query_id: &str) -> TraceId {
let [hash_high, hash_low] = highway::PortableHash::default().hash128(query_id.as_bytes());
TraceId(((hash_high as u128) << 64) + (hash_low as u128))
let uuid = Uuid::parse_str(query_id).unwrap_or_else(|_| Uuid::now_v7());
TraceId(uuid.as_u128())
}

/// The HTTP query endpoints are expected to be responses within 60 seconds.
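
Aside: the new query-id-to-trace-id mapping boils down to parsing the id as a UUID and taking its 128-bit value; a standalone sketch (the helper name and `main` harness are illustrative, not part of the PR):

use uuid::Uuid;

// A simple-form (32 hex chars) or hyphenated query id maps directly to a u128
// trace id; anything unparsable falls back to a fresh UUID v7, as in the handler.
fn query_id_to_trace_id_u128(query_id: &str) -> u128 {
    Uuid::parse_str(query_id)
        .unwrap_or_else(|_| Uuid::now_v7())
        .as_u128()
}

fn main() {
    let id = Uuid::now_v7();
    // The simple form round-trips to the same 128-bit value as the UUID itself.
    assert_eq!(query_id_to_trace_id_u128(&id.simple().to_string()), id.as_u128());
    println!("trace id: {:032x}", id.as_u128());
}
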
15 changes: 10 additions & 5 deletions src/query/service/src/servers/mysql/mysql_interactive_worker.rs
@@ -214,15 +214,20 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for InteractiveWorke
query: &'a str,
writer: QueryResultWriter<'a, W>,
) -> Result<()> {
let query_id = Uuid::new_v4().to_string();
let query_id = Uuid::now_v7();
// Ensure the query id shares the same representation as trace_id.
let query_id_str = query_id.simple().to_string();

let sampled =
thread_rng().gen_range(0..100) <= self.base.session.get_trace_sample_rate()?;
let root = Span::root(func_path!(), SpanContext::random().sampled(sampled))
let span_context =
SpanContext::new(TraceId(query_id.as_u128()), SpanId::default()).sampled(sampled);
let root = Span::root(func_path!(), span_context)
.with_properties(|| self.base.session.to_fastrace_properties());

let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.query_id = Some(query_id.clone());
tracking_payload.mem_stat = Some(MemStat::create(query_id.clone()));
tracking_payload.query_id = Some(query_id_str.clone());
tracking_payload.mem_stat = Some(MemStat::create(query_id_str.to_string()));
let _guard = ThreadTracker::tracking(tracking_payload);

ThreadTracker::tracking_future(async {
@@ -247,7 +252,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for InteractiveWorke
let instant = Instant::now();
let query_result = self
.base
.do_query(query_id, query)
.do_query(query_id_str, query)
.await
.map_err(|err| err.display_with_sql(query));

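
Aside: a condensed sketch of how the MySQL handler now seeds the root span from the query id, using only the fastrace calls visible in this hunk (the sampling-rate lookup and session properties are elided; the function and span names are illustrative):

use fastrace::prelude::*;
use uuid::Uuid;

fn root_span_for(query_id: Uuid, sampled: bool) -> Span {
    // The trace id is the query id itself, so a query's logs and traces share one id.
    let ctx = SpanContext::new(TraceId(query_id.as_u128()), SpanId::default()).sampled(sampled);
    Span::root("mysql_query", ctx)
}

fn main() {
    let _root = root_span_for(Uuid::now_v7(), true);
}
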
2 changes: 1 addition & 1 deletion src/query/service/src/sessions/query_ctx.rs
@@ -847,7 +847,7 @@ impl TableContext for QueryContext {
}

fn get_id(&self) -> String {
self.shared.init_query_id.as_ref().read().clone()
self.shared.init_query_id.as_ref().read().replace('-', "")
}

fn get_current_catalog(&self) -> String {
20 changes: 19 additions & 1 deletion src/query/service/tests/it/servers/http/http_query_handlers.rs
@@ -447,6 +447,24 @@ async fn test_return_when_finish() -> Result<()> {
async fn test_client_query_id() -> Result<()> {
let _fixture = TestFixture::setup().await?;

let wait_time_secs = 5;
let sql = "select * from numbers(1)";
let ep = create_endpoint()?;
let mut headers = HeaderMap::new();
headers.insert("x-databend-query-id", "testqueryid".parse().unwrap());
let (status, result) =
post_sql_to_endpoint_new_session(&ep, sql, wait_time_secs, headers).await?;
assert_eq!(status, StatusCode::OK);
assert_eq!(result.id, "testqueryid");

Ok(())
}

// `-` in query id will be trimmed.
#[tokio::test(flavor = "current_thread")]
async fn test_client_compatible_query_id() -> Result<()> {
let _fixture = TestFixture::setup().await?;

let wait_time_secs = 5;
let sql = "select * from numbers(1)";
let ep = create_endpoint()?;
@@ -455,7 +473,7 @@ async fn test_client_query_id() -> Result<()> {
let (status, result) =
post_sql_to_endpoint_new_session(&ep, sql, wait_time_secs, headers).await?;
assert_eq!(status, StatusCode::OK);
assert_eq!(result.id, "test-query-id");
assert_eq!(result.id, "testqueryid");

Ok(())
}
4 changes: 2 additions & 2 deletions tests/sqllogictests/suites/stage/ordered_unload.test
@@ -23,9 +23,9 @@ SELECT COUNT(*) FROM (SELECT $1 AS a, rank() OVER (ORDER BY metadata$filename, m
----
10000

# data_af2ab6dc-8725-46e5-a601-3dad9c512769_0000_00000770.csv
# data_af2ab6dc872546e5a6013dad9c512769_0000_00000770.csv
query
SELECT * from list_stage(location => '@s1') where substr(name, 43, 4) != '0000'
SELECT * from list_stage(location => '@s1') where substr(name, 39, 4) != '0000'
----

statement ok
@@ -5,7 +5,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)

export TEST_USER_PASSWORD="password"
export TEST_USER_CONNECT="bendsql --user=test-user --password=password --host=${QUERY_MYSQL_HANDLER_HOST} --port ${QUERY_HTTP_HANDLER_PORT}"
export RM_UUID="sed -E ""s/[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}/UUID/g"""
export RM_UUID="sed -E ""s/[a-z0-9]{32}/UUID/g"""

stmt "drop database if exists db01;"
stmt "create database db01;"
2 changes: 1 addition & 1 deletion tests/suites/1_stateful/00_stage/00_0012_stage_priv.sh
@@ -7,7 +7,7 @@ export TEST_USER_NAME="u1"
export TEST_USER_PASSWORD="password"
export TEST_USER_CONNECT="bendsql --user=u1 --password=password --host=${QUERY_MYSQL_HANDLER_HOST} --port ${QUERY_HTTP_HANDLER_PORT}"
export USER_B_CONNECT="bendsql --user=b --password=password --host=${QUERY_MYSQL_HANDLER_HOST} --port ${QUERY_HTTP_HANDLER_PORT}"
export RM_UUID="sed -E ""s/[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}/UUID/g"""
export RM_UUID="sed -E ""s/[a-z0-9]{32}/UUID/g"""

echo "drop table if exists test_table;" | $BENDSQL_CLIENT_CONNECT
echo "drop user if exists u1;" | $BENDSQL_CLIENT_CONNECT
2 changes: 1 addition & 1 deletion tests/suites/1_stateful/00_stage/00_0015_unload_output.sh
@@ -5,7 +5,7 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../../../shell_env.sh

export RM_UUID="sed -E ""s/[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}/UUID/g"""
export RM_UUID="sed -E ""s/[a-z0-9]{32}/UUID/g"""

stmt "drop table if exists t1"
stmt "create table t1 (a int)"
@@ -5,7 +5,7 @@
q1.parquet 624 1
>>>> streaming load: q1.parquet error :
+ curl -sS -H x-databend-query-id:load-q1 -H 'sql:insert into streaming_load_parquet(c2,c3) values file_format = (type='\''parquet'\'', missing_field_as=error, null_if=())' -F upload=@/tmp/streaming_load_parquet/q1.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
{"id":"load-q1","stats":{"rows":1,"bytes":25}}
{"id":"loadq1","stats":{"rows":1,"bytes":25}}
<<<<
>>>> select * from streaming_load_parquet;
ok 1 2021-01-01
@@ -26,7 +26,7 @@ q2.parquet 426 1
q3.parquet 426 1
>>>> streaming load: q3.parquet field_default :
+ curl -sS -H x-databend-query-id:load-q3 -H 'sql:insert into streaming_load_parquet(c2,c3) values file_format = (type='\''parquet'\'', missing_field_as=field_default, null_if=())' -F upload=@/tmp/streaming_load_parquet/q3.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
{"id":"load-q3","stats":{"rows":1,"bytes":21}}
{"id":"loadq3","stats":{"rows":1,"bytes":21}}
<<<<
>>>> select * from streaming_load_parquet;
ok NULL 2021-01-01
@@ -37,7 +37,7 @@ ok NULL 2021-01-01
q4.parquet 643 1
>>>> streaming load: q4.parquet error :
+ curl -sS -H x-databend-query-id:load-q4 -H 'sql:insert into streaming_load_parquet(c1,c3) values file_format = (type='\''parquet'\'', missing_field_as=error, null_if=())' -F upload=@/tmp/streaming_load_parquet/q4.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
{"id":"load-q4","stats":{"rows":1,"bytes":26}}
{"id":"loadq4","stats":{"rows":1,"bytes":26}}
<<<<
>>>> select * from streaming_load_parquet;
my_null NULL 2021-01-01
@@ -48,7 +48,7 @@ my_null NULL 2021-01-01
q5.parquet 643 1
>>>> streaming load: q5.parquet error 'my_null':
+ curl -sS -H x-databend-query-id:load-q5 -H 'sql:insert into streaming_load_parquet(c1,c3) values file_format = (type='\''parquet'\'', missing_field_as=error, null_if=('\''my_null'\''))' -F upload=@/tmp/streaming_load_parquet/q5.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
{"id":"load-q5","stats":{"rows":1,"bytes":7}}
{"id":"loadq5","stats":{"rows":1,"bytes":7}}
<<<<
>>>> select * from streaming_load_parquet;
NULL NULL 2021-01-01
@@ -4,7 +4,7 @@
>>>> copy into @streaming_load_07/data.csv from (select '2020-01-02' as c4, 110 as c2) file_format=(type='csv') single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
data.csv 17 1
+ curl -sS -H x-databend-query-id:load-csv -H 'sql:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) file_format = (type=csv)' -F upload=@/tmp/streaming_load_07/data.csv -u root: -XPUT http://localhost:8000/v1/streaming_load
{"id":"load-csv","stats":{"rows":1,"bytes":39}}
{"id":"loadcsv","stats":{"rows":1,"bytes":39}}
<<<<
>>>> select * from streaming_load_07;
ok 110 a 2020-01-02
@@ -14,7 +14,7 @@ ok 110 a 2020-01-02
>>>> copy into @streaming_load_07/data.tsv from (select '2020-01-02' as c4, 110 as c2) file_format=(type='tsv') single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
data.tsv 15 1
+ curl -sS -H x-databend-query-id:load-tsv -H 'sql:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) file_format = (type=tsv)' -F upload=@/tmp/streaming_load_07/data.tsv -u root: -XPUT http://localhost:8000/v1/streaming_load
{"id":"load-tsv","stats":{"rows":1,"bytes":39}}
{"id":"loadtsv","stats":{"rows":1,"bytes":39}}
<<<<
>>>> select * from streaming_load_07;
ok 110 a 2020-01-02
@@ -24,7 +24,7 @@ ok 110 a 2020-01-02
>>>> copy into @streaming_load_07/data.ndjson from (select '2020-01-02' as c4, 110 as c2) file_format=(type='ndjson') single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
data.ndjson 29 1
+ curl -sS -H x-databend-query-id:load-ndjson -H 'sql:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) file_format = (type=ndjson)' -F upload=@/tmp/streaming_load_07/data.ndjson -u root: -XPUT http://localhost:8000/v1/streaming_load
{"id":"load-ndjson","stats":{"rows":1,"bytes":39}}
{"id":"loadndjson","stats":{"rows":1,"bytes":39}}
<<<<
>>>> select * from streaming_load_07;
ok 110 a 2020-01-02
@@ -34,7 +34,7 @@ ok 110 a 2020-01-02
>>>> copy into @streaming_load_07/data.parquet from (select '2020-01-02' as c4, 110 as c2) file_format=(type='parquet') single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
data.parquet 665 1
+ curl -sS -H x-databend-query-id:load-parquet -H 'sql:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) file_format = (type=parquet)' -F upload=@/tmp/streaming_load_07/data.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
{"id":"load-parquet","stats":{"rows":1,"bytes":39}}
{"id":"loadparquet","stats":{"rows":1,"bytes":39}}
<<<<
>>>> select * from streaming_load_07;
ok 110 a 2020-01-02