Skip to content

Commit 439d3ba

Browse files
committed
add more log of utf8 failure
1 parent 471c398 commit 439d3ba

File tree

6 files changed

+80
-66
lines changed

6 files changed

+80
-66
lines changed

apps/indexer-proxy/proxy/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "subql-indexer-proxy"
3-
version = "2.7.1"
3+
version = "2.8.2-beta.1"
44
edition = "2021"
55

66
[dependencies]

apps/indexer-proxy/proxy/src/p2p.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,13 @@ async fn handle_group(
719719
)
720720
.await
721721
{
722-
Ok((res_query, res_signature, res_state, _limit)) => {
722+
Ok((
723+
res_query,
724+
res_signature,
725+
res_state,
726+
_limit,
727+
_inactive,
728+
)) => {
723729
json!({
724730
"result": general_purpose::STANDARD.encode(&res_query),
725731
"signature": res_signature,

apps/indexer-proxy/proxy/src/payg.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,7 @@ pub async fn query_single_state(
480480
state: QueryState,
481481
network_type: MetricsNetwork,
482482
no_sig: bool,
483-
) -> Result<(Vec<u8>, String, String, Option<(i64, i64)>)> {
483+
) -> Result<(Vec<u8>, String, String, Option<(i64, i64)>, bool)> {
484484
let project: Project = get_project(project_id).await?;
485485

486486
// compute unit count times
@@ -529,7 +529,7 @@ pub async fn query_single_state(
529529
})?;
530530

531531
debug!("Handle query channel success");
532-
Ok((data, signature, post_state.to_bs64_old2(), limit))
532+
Ok((data, signature, post_state.to_bs64_old2(), limit, true))
533533
}
534534

535535
// query with multiple state mode
@@ -658,7 +658,7 @@ pub async fn query_multiple_state(
658658
state: MultipleQueryState,
659659
network_type: MetricsNetwork,
660660
no_sig: bool,
661-
) -> Result<(Vec<u8>, String, String, Option<(i64, i64)>)> {
661+
) -> Result<(Vec<u8>, String, String, Option<(i64, i64)>, bool)> {
662662
let project = get_project(project_id).await?;
663663

664664
// compute unit count times
@@ -675,7 +675,7 @@ pub async fn query_multiple_state(
675675
}
676676
})?;
677677
if inactive {
678-
return Ok((vec![], "".to_owned(), state.to_bs64(), None));
678+
return Ok((vec![], "".to_owned(), state.to_bs64(), None, inactive));
679679
}
680680

681681
// query the data.
@@ -701,7 +701,7 @@ pub async fn query_multiple_state(
701701
post_query_multiple_state(keyname, state_cache).await;
702702

703703
debug!("Handle query channel success");
704-
Ok((data, signature, state.to_bs64(), limit))
704+
Ok((data, signature, state.to_bs64(), limit, inactive))
705705
}
706706

707707
pub async fn extend_channel(

apps/indexer-proxy/proxy/src/server.rs

Lines changed: 46 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,23 @@
1717
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1818

1919
#![deny(warnings)]
20+
use crate::account::ACCOUNT;
21+
use crate::ai::api_stream;
22+
use crate::auth::{create_jwt, AuthQuery, AuthQueryLimit, Payload};
23+
use crate::cli::COMMAND;
24+
use crate::contracts::check_agreement_and_consumer;
25+
use crate::metrics::{get_owner_metrics, MetricsNetwork, MetricsQuery};
26+
use crate::payg::{
27+
extend_channel, fetch_channel_cache, merket_price, open_state, pay_channel,
28+
query_multiple_state, query_single_state, AuthPayg,
29+
};
30+
use crate::project::get_project;
31+
use crate::sentry_log::make_sentry_message;
32+
use crate::websocket::{connect_to_project_ws, handle_websocket, validate_project, QueryType};
33+
use crate::{
34+
account::{get_indexer, indexer_healthy},
35+
auth::AuthWhitelistQuery,
36+
};
2037
use axum::extract::ws::WebSocket;
2138
use axum::{
2239
extract::{ConnectInfo, Path, WebSocketUpgrade},
@@ -43,23 +60,6 @@ use subql_indexer_utils::{
4360
};
4461
use tower_http::cors::{Any, CorsLayer};
4562

46-
use crate::ai::api_stream;
47-
use crate::auth::{create_jwt, AuthQuery, AuthQueryLimit, Payload};
48-
use crate::cli::COMMAND;
49-
use crate::contracts::check_agreement_and_consumer;
50-
use crate::metrics::{get_owner_metrics, MetricsNetwork, MetricsQuery};
51-
use crate::payg::{
52-
extend_channel, fetch_channel_cache, merket_price, open_state, pay_channel,
53-
query_multiple_state, query_single_state, AuthPayg,
54-
};
55-
use crate::project::get_project;
56-
use crate::sentry_log::make_sentry_message;
57-
use crate::websocket::{connect_to_project_ws, handle_websocket, validate_project, QueryType};
58-
use crate::{
59-
account::{get_indexer, indexer_healthy},
60-
auth::AuthWhitelistQuery,
61-
};
62-
6363
#[derive(Serialize)]
6464
pub struct QueryUri {
6565
/// the url refer to specific project
@@ -484,7 +484,7 @@ async fn ep_payg_handler(
484484
return payg_stream(endpoint.endpoint.clone(), v, state, false).await;
485485
}
486486

487-
let (data, signature, state_data, limit) = match block.to_str() {
487+
let (data, signature, state_data, limit, inactive) = match block.to_str() {
488488
Ok("multiple") => {
489489
let state = match MultipleQueryState::from_bs64(auth) {
490490
Ok(p) => p,
@@ -527,30 +527,41 @@ async fn ep_payg_handler(
527527

528528
let (body, mut headers) = match res_fmt.to_str() {
529529
Ok("inline") => {
530-
let return_body = if let Ok(return_data) = String::from_utf8(data.clone()) {
531-
if return_data.is_empty() {
530+
let return_body = match String::from_utf8(data.clone()) {
531+
Ok(return_data) => {
532+
if data.is_empty() {
533+
let account = ACCOUNT.read().await;
534+
let indexer = account.indexer;
535+
drop(account);
536+
let indexer_string = format!("{:?}", indexer);
537+
let unique_title = format!(
538+
"payg ep_query_handler, proxy get empty and lead to inline returns empty, deployment_id: {}, ep_name: {}",
539+
deployment, ep_name
540+
);
541+
let msg = format!(
542+
"res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:#?}, data length is {}, base64 data is {:#?}, account address is {:#?}, inactive is {}",
543+
res_fmt, headers, body, data, data.len(), general_purpose::STANDARD.encode(&data), indexer_string, inactive
544+
);
545+
make_sentry_message(&unique_title, &msg);
546+
}
547+
return_data
548+
}
549+
Err(err) => {
550+
let account = ACCOUNT.read().await;
551+
let indexer = account.indexer;
552+
drop(account);
553+
let indexer_string = format!("{:?}", indexer);
532554
let unique_title = format!(
533555
"payg ep_query_handler, inline returns empty, because endpoint returns empty, deployment_id: {}, ep_name: {}",
534556
deployment, ep_name
535-
);
557+
);
536558
let msg = format!(
537-
"res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:?}",
538-
res_fmt, headers, body, data
559+
"res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:#?}, data length is {}, err is {:#?}, base64 data is {:#?}, account address is {:#?}, inactive is {}",
560+
res_fmt, headers, body, data, data.len(), err, general_purpose::STANDARD.encode(&data), indexer_string,inactive
539561
);
540562
make_sentry_message(&unique_title, &msg);
563+
"".to_owned()
541564
}
542-
return_data
543-
} else {
544-
let unique_title = format!(
545-
"payg ep_query_handler, inline returns empty, deployment_id: {}, ep_name: {}",
546-
deployment, ep_name
547-
);
548-
let msg = format!(
549-
"res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:?}",
550-
res_fmt, headers, body, data
551-
);
552-
make_sentry_message(&unique_title, &msg);
553-
"".to_owned()
554565
};
555566
(
556567
return_body,

apps/indexer-proxy/utils/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ http = "1.1.0"
1616
native-tls = "0.2.12"
1717
once_cell = "1.12"
1818
rand_chacha = "0.3"
19-
reqwest = { version = "0.12", features = ["json", "native-tls"] }
19+
reqwest = { version = "0.12", features = ["json", "native-tls", "stream"] }
2020
rustc-hex = "2.1"
2121
serde = { version = "1.0", features = ["derive"] }
2222
serde_json = "1.0"
2323
serde_with ={ version = "3.0", features = ["json"] }
2424
subql-contracts = { git = "https://github.com/subquery/network-contracts", tag = "v1.5.0" }
25+
tokio-stream = "0.1.16"
2526
uint = "0.10"

apps/indexer-proxy/utils/src/request.rs

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use serde_json::{json, Value};
3030
use serde_with::skip_serializing_none;
3131
use std::error::Error as StdError;
3232
use std::time::Duration;
33+
use tokio_stream::StreamExt;
3334

3435
pub static REQUEST_CLIENT: Lazy<Client> = Lazy::new(reqwest::Client::new);
3536

@@ -134,38 +135,33 @@ pub async fn post_request_raw(uri: &str, query: String) -> Result<Vec<u8>, Error
134135
// handle request
135136
#[inline]
136137
async fn handle_request_raw(request: RequestBuilder, query: String) -> Result<Vec<u8>, Error> {
137-
let response_result = request
138+
let res = request
138139
.timeout(Duration::from_secs(REQUEST_TIMEOUT))
139-
.header(CONTENT_TYPE, APPLICATION_JSON)
140-
.header(CONNECTION, KEEP_ALIVE)
141-
.body(query.to_owned())
140+
.header(reqwest::header::CONTENT_TYPE, "application/json")
141+
.header(reqwest::header::CONNECTION, "keep-alive")
142+
.body(query)
142143
.send()
143-
.await;
144-
145-
let res = match response_result {
146-
Ok(res) => res,
147-
Err(_e) => {
148-
return Err(Error::GraphQLInternal(
144+
.await
145+
.or_else(|_e| {
146+
Err(Error::GraphQLInternal(
149147
1010,
150148
"Service exception or timeout".to_owned(),
151149
))
152-
}
153-
};
150+
})?;
154151

155152
let status = res.status();
156-
let body = res
157-
.bytes()
158-
.await
159-
.map(|bytes| bytes.to_vec())
160-
.map_err(|e| Error::GraphQLQuery(1011, e.to_string()))?;
153+
let mut body_stream = res.bytes_stream();
154+
let mut body = Vec::new();
161155

162-
// 200~299
163-
if status.is_success() {
164-
Ok(body)
165-
} else {
166-
let err = String::from_utf8(body).unwrap_or("Internal request error".to_owned());
167-
Err(Error::GraphQLInternal(1011, err))
156+
while let Some(chunk) = body_stream.next().await {
157+
let chunk = chunk.map_err(|e| Error::GraphQLQuery(1011, e.to_string()))?;
158+
body.extend_from_slice(&chunk);
168159
}
160+
161+
status.is_success().then_some(body.clone()).ok_or_else(|| {
162+
let err = String::from_utf8_lossy(&body).to_string();
163+
Error::GraphQLInternal(1011, err)
164+
})
169165
}
170166

171167
// Request to indexer/consumer proxy

0 commit comments

Comments
 (0)