Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit f969447

Browse files
committed
flush when response exceed limit size
1 parent e7b848a commit f969447

File tree

4 files changed

+49
-42
lines changed

4 files changed

+49
-42
lines changed

sqld/src/connection/write_proxy.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ impl MakeConnection for MakeWriteProxyConn {
108108
}
109109
}
110110

111-
#[derive(Debug)]
112111
pub struct WriteProxyConnection {
113112
/// Lazily initialized read connection
114113
read_conn: LibSqlConnection<TransparentMethods>,
@@ -196,8 +195,9 @@ impl WriteProxyConnection {
196195
let (builder, new_status, new_frame_no) = match res {
197196
Ok(res) => res,
198197
Err(e @ (Error::PrimaryStreamDisconnect | Error::PrimaryStreamMisuse)) => {
199-
// drop the connection
198+
// drop the connection, and reset the state.
200199
self.remote_conn.lock().await.take();
200+
*status = TxnStatus::Init;
201201
return Err(e);
202202
}
203203
Err(e) => return Err(e),

sqld/src/rpc/streaming_exec.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::Arc;
33
use std::task::{ready, Context, Poll};
44

55
use futures_core::Stream;
6+
use prost::Message;
67
use rusqlite::types::ValueRef;
78
use tokio::sync::mpsc;
89
use tonic::{Code, Status};
@@ -50,6 +51,7 @@ struct StreamResponseBuilder {
5051
request_id: u32,
5152
sender: mpsc::Sender<ExecResp>,
5253
current: Option<ProgramResp>,
54+
current_size: usize,
5355
}
5456

5557
impl StreamResponseBuilder {
@@ -59,12 +61,15 @@ impl StreamResponseBuilder {
5961
}
6062

6163
fn push(&mut self, step: Step) -> Result<(), QueryResultBuilderError> {
62-
const MAX_RESPONSE_STEPS: usize = 10;
64+
const MAX_RESPONSE_SIZE: usize = bytesize::ByteSize::mb(1).as_u64() as usize;
6365

6466
let current = self.current();
65-
current.steps.push(RespStep { step: Some(step) });
67+
let step = RespStep { step: Some(step) };
68+
let size = step.encoded_len();
69+
current.steps.push(step);
70+
self.current_size += size;
6671

67-
if current.steps.len() > MAX_RESPONSE_STEPS {
72+
if self.current_size >= MAX_RESPONSE_SIZE {
6873
self.flush()?;
6974
}
7075

@@ -77,6 +82,7 @@ impl StreamResponseBuilder {
7782
request_id: self.request_id,
7883
response: Some(exec_resp::Response::ProgramResp(current)),
7984
};
85+
self.current_size = 0;
8086
self.sender
8187
.blocking_send(resp)
8288
.map_err(|_| QueryResultBuilderError::Internal(anyhow::anyhow!("stream closed")))?;
@@ -235,6 +241,7 @@ where
235241
request_id,
236242
sender,
237243
current: None,
244+
current_size: 0,
238245
};
239246
let mut fut = conn.execute_program(pgm, authenticated, builder, None);
240247
loop {

sqld/tests/cluster/mod.rs

-37
Original file line numberDiff line numberDiff line change
@@ -205,40 +205,3 @@ fn sync_many_replica() {
205205

206206
sim.run().unwrap();
207207
}
208-
209-
#[test]
210-
fn create_namespace() {
211-
let mut sim = Builder::new().build();
212-
make_cluster(&mut sim, 0, false);
213-
214-
sim.client("client", async {
215-
let db =
216-
Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?;
217-
let conn = db.connect()?;
218-
219-
let Err(e) = conn.execute("create table test (x)", ()).await else {
220-
panic!()
221-
};
222-
assert_snapshot!(e.to_string());
223-
224-
let client = Client::new();
225-
let resp = client
226-
.post(
227-
"http://foo.primary:9090/v1/namespaces/foo/create",
228-
json!({}),
229-
)
230-
.await?;
231-
assert_eq!(resp.status(), 200);
232-
233-
conn.execute("create table test (x)", ()).await.unwrap();
234-
let mut rows = conn.query("select count(*) from test", ()).await.unwrap();
235-
assert!(matches!(
236-
rows.next().unwrap().unwrap().get_value(0).unwrap(),
237-
Value::Integer(0)
238-
));
239-
240-
Ok(())
241-
});
242-
243-
sim.run().unwrap();
244-
}

sqld/tests/namespaces/mod.rs

+37
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,43 @@ fn make_primary(sim: &mut Sim, path: PathBuf) {
4141
});
4242
}
4343

44+
#[test]
45+
fn create_namespace() {
46+
let mut sim = Builder::new().build();
47+
make_cluster(&mut sim, 0, false);
48+
49+
sim.client("client", async {
50+
let db =
51+
Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?;
52+
let conn = db.connect()?;
53+
54+
let Err(e) = conn.execute("create table test (x)", ()).await else {
55+
panic!()
56+
};
57+
assert_snapshot!(e.to_string());
58+
59+
let client = Client::new();
60+
let resp = client
61+
.post(
62+
"http://foo.primary:9090/v1/namespaces/foo/create",
63+
json!({}),
64+
)
65+
.await?;
66+
assert_eq!(resp.status(), 200);
67+
68+
conn.execute("create table test (x)", ()).await.unwrap();
69+
let mut rows = conn.query("select count(*) from test", ()).await.unwrap();
70+
assert!(matches!(
71+
rows.next().unwrap().unwrap().get_value(0).unwrap(),
72+
Value::Integer(0)
73+
));
74+
75+
Ok(())
76+
});
77+
78+
sim.run().unwrap();
79+
}
80+
4481
#[test]
4582
fn fork_namespace() {
4683
let mut sim = Builder::new().build();

0 commit comments

Comments
 (0)