Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 9445167

Browse files
committed
implement streaming describe rpc
1 parent a4eaec3 commit 9445167

File tree

6 files changed

+264
-131
lines changed

6 files changed

+264
-131
lines changed

sqld/proto/proxy.proto

+71-28
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ package proxy;
44
message Queries {
55
repeated Query queries = 1;
66
// Uuid
7-
string clientId = 2;
7+
string client_id = 2;
88
}
99

1010
message Query {
@@ -34,10 +34,10 @@ message QueryResult {
3434

3535
message Error {
3636
enum ErrorCode {
37-
SQLError = 0;
38-
TxBusy = 1;
39-
TxTimeout = 2;
40-
Internal = 3;
37+
SQL_ERROR = 0;
38+
TX_BUSY = 1;
39+
TX_TIMEOUT = 2;
40+
INTERNAL = 3;
4141
}
4242

4343
ErrorCode code = 1;
@@ -71,7 +71,7 @@ message Description {
7171

7272
message Value {
7373
/// bincode encoded Value
74-
bytes data = 1;
74+
bytes data = 1;
7575
}
7676

7777
message Row {
@@ -84,7 +84,7 @@ message Column {
8484
}
8585

8686
message DisconnectMessage {
87-
string clientId = 1;
87+
string client_id = 1;
8888
}
8989

9090
message Ack { }
@@ -150,17 +150,29 @@ message ProgramReq {
150150
Program pgm = 2;
151151
}
152152

153+
/// Streaming exec request
153154
message ExecReq {
154-
/// id of the request. The response will contain this id
155+
/// id of the request. The response will contain this id.
155156
uint32 request_id = 1;
156157
oneof request {
157-
Program execute = 2;
158-
DescribeRequest describe = 3;
158+
StreamProgramReq execute = 2;
159+
StreamDescribeReq describe = 3;
159160
}
160161
}
161162

162-
/// streaming exec proto
163+
/// Describe request for the streaming protocol
164+
message StreamProgramReq {
165+
Program pgm = 1;
166+
}
163167

168+
/// descibre request for the streaming protocol
169+
message StreamDescribeReq {
170+
string stmt = 1;
171+
}
172+
173+
/// Response message for the streaming proto
174+
175+
/// Request response types
164176
message Init { }
165177
message BeginStep { }
166178
message FinishStep {
@@ -173,10 +185,20 @@ message StepError {
173185
message ColsDescription {
174186
repeated Column columns = 1;
175187
}
188+
message RowValue {
189+
oneof value {
190+
string text = 1;
191+
int64 integer = 2;
192+
double real = 3;
193+
bytes blob = 4;
194+
// null if present
195+
bool null = 5;
196+
}
197+
}
176198
message BeginRows { }
177199
message BeginRow { }
178200
message AddRowValue {
179-
Value val = 1;
201+
RowValue val = 1;
180202
}
181203
message FinishRow { }
182204
message FinishRows { }
@@ -185,35 +207,56 @@ message Finish {
185207
State state = 2;
186208
}
187209

210+
/// Stream execx dexcribe response messages
211+
message DescribeParam {
212+
optional string name = 1;
213+
}
188214

189-
message Message {
190-
oneof payload {
191-
Description describe_result = 1;
215+
message DescribeCol {
216+
string name = 1;
217+
optional string decltype = 2;
218+
}
192219

193-
Init init = 2;
194-
BeginStep begin_step = 3;
195-
FinishStep finish_step = 4;
196-
StepError step_error = 5;
197-
ColsDescription cols_description = 6;
198-
BeginRows begin_rows = 7;
199-
BeginRow begin_row = 8;
200-
AddRowValue add_row_value = 9;
201-
FinishRow finish_row = 10;
202-
FinishRows finish_rows = 11;
203-
Finish finish = 12;
220+
message DescribeResp {
221+
repeated DescribeParam params = 1;
222+
repeated DescribeCol cols = 2;
223+
bool is_explain = 3;
224+
bool is_readonly = 4;
225+
}
204226

205-
Error error = 13;
227+
message RespStep {
228+
oneof step {
229+
Init init = 1;
230+
BeginStep begin_step = 2;
231+
FinishStep finish_step = 3;
232+
StepError step_error = 4;
233+
ColsDescription cols_description = 5;
234+
BeginRows begin_rows = 6;
235+
BeginRow begin_row = 7;
236+
AddRowValue add_row_value = 8;
237+
FinishRow finish_row = 9;
238+
FinishRows finish_rows = 10;
239+
Finish finish = 11;
206240
}
207241
}
208242

243+
message ProgramResp {
244+
repeated RespStep steps = 1;
245+
}
246+
209247
message ExecResp {
210248
uint32 request_id = 1;
211-
repeated Message messages = 2;
249+
oneof response {
250+
ProgramResp program_resp = 2;
251+
DescribeResp describe_resp = 3;
252+
Error error = 4;
253+
}
212254
}
213255

214256
service Proxy {
215257
rpc StreamExec(stream ExecReq) returns (stream ExecResp) {}
216258

259+
// Deprecated:
217260
rpc Execute(ProgramReq) returns (ExecuteResults) {}
218261
rpc Describe(DescribeRequest) returns (DescribeResult) {}
219262
rpc Disconnect(DisconnectMessage) returns (Ack) {}

sqld/src/connection/libsql.rs

+12-7
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::stats::Stats;
2020
use crate::Result;
2121

2222
use super::config::DatabaseConfigStore;
23-
use super::program::{Cond, DescribeCol, DescribeParam, DescribeResponse, DescribeResult};
23+
use super::program::{Cond, DescribeCol, DescribeParam, DescribeResponse};
2424
use super::{MakeConnection, Program, Step, TXN_TIMEOUT};
2525

2626
pub struct MakeLibSqlConn<W: WalHook + 'static> {
@@ -161,7 +161,9 @@ impl<W: WalHook> std::fmt::Debug for LibSqlConnection<W> {
161161

162162
impl<W: WalHook> Clone for LibSqlConnection<W> {
163163
fn clone(&self) -> Self {
164-
Self { inner: self.inner.clone() }
164+
Self {
165+
inner: self.inner.clone(),
166+
}
165167
}
166168
}
167169

@@ -650,7 +652,7 @@ impl<W: WalHook> Connection<W> {
650652
self.stats.inc_rows_written(rows_written as u64);
651653
}
652654

653-
fn describe(&self, sql: &str) -> DescribeResult {
655+
fn describe(&self, sql: &str) -> crate::Result<DescribeResponse> {
654656
let stmt = self.conn.prepare(sql)?;
655657

656658
let params = (1..=stmt.parameter_count())
@@ -768,7 +770,7 @@ where
768770
sql: String,
769771
auth: Authenticated,
770772
_replication_index: Option<FrameNo>,
771-
) -> Result<DescribeResult> {
773+
) -> Result<crate::Result<DescribeResponse>> {
772774
check_describe_auth(auth)?;
773775
let conn = self.inner.clone();
774776
let res = tokio::task::spawn_blocking(move || conn.lock().describe(&sql))
@@ -994,9 +996,12 @@ mod test {
994996
tokio::task::spawn_blocking({
995997
let conn = conn1.clone();
996998
move || {
997-
let builder =
998-
Connection::run(conn.inner.clone(), Program::seq(&["COMMIT"]), TestBuilder::default())
999-
.unwrap();
999+
let builder = Connection::run(
1000+
conn.inner.clone(),
1001+
Program::seq(&["COMMIT"]),
1002+
TestBuilder::default(),
1003+
)
1004+
.unwrap();
10001005
assert_eq!(conn.txn_status().unwrap(), TxnStatus::Init);
10011006
assert!(builder.into_ret()[0].is_ok());
10021007
}

sqld/src/connection/mod.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::query_result_builder::{IgnoreResult, QueryResultBuilder};
1313
use crate::replication::FrameNo;
1414
use crate::Result;
1515

16-
use self::program::{Cond, DescribeResult, Program, Step};
16+
use self::program::{Cond, DescribeResponse, Program, Step};
1717

1818
pub mod config;
1919
pub mod dump;
@@ -111,7 +111,7 @@ pub trait Connection: Send + Sync + 'static {
111111
sql: String,
112112
auth: Authenticated,
113113
replication_index: Option<FrameNo>,
114-
) -> Result<DescribeResult>;
114+
) -> Result<Result<DescribeResponse>>;
115115

116116
/// Check whether the connection is in autocommit mode.
117117
async fn is_autocommit(&self) -> Result<bool>;
@@ -325,7 +325,7 @@ impl<DB: Connection> Connection for TrackedConnection<DB> {
325325
sql: String,
326326
auth: Authenticated,
327327
replication_index: Option<FrameNo>,
328-
) -> crate::Result<DescribeResult> {
328+
) -> crate::Result<crate::Result<DescribeResponse>> {
329329
self.atime.store(now_millis(), Ordering::Relaxed);
330330
self.inner.describe(sql, auth, replication_index).await
331331
}
@@ -376,7 +376,7 @@ mod test {
376376
_sql: String,
377377
_auth: Authenticated,
378378
_replication_index: Option<FrameNo>,
379-
) -> crate::Result<DescribeResult> {
379+
) -> crate::Result<crate::Result<DescribeResponse>> {
380380
unreachable!()
381381
}
382382

sqld/src/connection/program.rs

-2
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ pub enum Cond {
6060
IsAutocommit,
6161
}
6262

63-
pub type DescribeResult = crate::Result<DescribeResponse>;
64-
6563
#[derive(Debug, Clone)]
6664
pub struct DescribeResponse {
6765
pub params: Vec<DescribeParam>,

0 commit comments

Comments
 (0)