Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 6d3eaee

Browse files
committed
more error handling
1 parent 9445167 commit 6d3eaee

File tree

2 files changed

+33
-23
lines changed

2 files changed

+33
-23
lines changed

sqld/src/connection/write_proxy.rs

+26-21
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ impl WriteProxyConnection {
195195

196196
let (builder, new_status, new_frame_no) = match res {
197197
Ok(res) => res,
198-
Err(e @ Error::StreamDisconnect) => {
198+
Err(e @ (Error::PrimaryStreamDisconnect | Error::PrimaryStreamMisuse)) => {
199199
// drop the connection
200200
self.remote_conn.lock().await.take();
201201
return Err(e);
@@ -290,23 +290,28 @@ impl RemoteConnection {
290290
self.request_sender
291291
.send(req)
292292
.await
293-
.map_err(|_| Error::StreamDisconnect)?;
293+
.map_err(|_| Error::PrimaryStreamDisconnect)?;
294294

295-
'outer: while let Some(resp) = self.response_stream.next().await {
295+
while let Some(resp) = self.response_stream.next().await {
296296
match resp {
297297
Ok(resp) => {
298-
// todo: handle interuption
299-
if resp.request_id != request_id {
300-
todo!("stream misuse: connection should be serialized");
298+
// there was an interuption, and we moved to the next query
299+
if resp.request_id > request_id {
300+
return Err(Error::PrimaryStreamInterupted)
301301
}
302302

303-
if !response_cb(resp.response.unwrap())? {
304-
break 'outer;
303+
// we can ignore response for previously interupted requests
304+
if resp.request_id < request_id {
305+
continue;
306+
}
307+
308+
if !response_cb(resp.response.ok_or(Error::PrimaryStreamMisuse)?)? {
309+
break;
305310
}
306311
}
307312
Err(e) => {
308-
tracing::error!("received error from connection stream: {e}");
309-
return Err(Error::StreamDisconnect);
313+
tracing::error!("received an error from connection stream: {e}");
314+
return Err(Error::PrimaryStreamDisconnect);
310315
}
311316
}
312317
}
@@ -326,16 +331,16 @@ impl RemoteConnection {
326331
match response {
327332
exec_resp::Response::ProgramResp(resp) => {
328333
for step in resp.steps {
329-
let Some(step) = step.step else {panic!("invalid pgm")};
334+
let Some(step) = step.step else { return Err(Error::PrimaryStreamMisuse) };
330335
match step {
331336
Step::Init(_) => builder.init(&builder_config)?,
332337
Step::BeginStep(_) => builder.begin_step()?,
333338
Step::FinishStep(FinishStep {
334339
affected_row_count,
335340
last_insert_rowid,
336341
}) => builder.finish_step(affected_row_count, last_insert_rowid)?,
337-
Step::StepError(StepError { error }) => builder
338-
.step_error(crate::error::Error::RpcQueryError(error.unwrap()))?,
342+
Step::StepError(StepError { error: Some(err) }) => builder
343+
.step_error(crate::error::Error::RpcQueryError(err))?,
339344
Step::ColsDescription(ColsDescription { columns }) => {
340345
let cols = columns.iter().map(|c| Column {
341346
name: &c.name,
@@ -365,12 +370,12 @@ impl RemoteConnection {
365370
builder.finish(last_frame_no, txn_status)?;
366371
return Ok(false);
367372
}
368-
_ => todo!("invalid request"),
373+
_ => return Err(Error::PrimaryStreamMisuse),
369374
}
370375
}
371376
}
372-
exec_resp::Response::DescribeResp(_) => todo!("invalid resp"),
373-
exec_resp::Response::Error(_) => todo!(),
377+
exec_resp::Response::DescribeResp(_) => return Err(Error::PrimaryStreamMisuse),
378+
exec_resp::Response::Error(e) => return Err(Error::RpcQueryError(e)),
374379
}
375380

376381
Ok(true)
@@ -410,12 +415,12 @@ impl RemoteConnection {
410415
is_explain: resp.is_explain,
411416
is_readonly: resp.is_readonly,
412417
});
418+
419+
Ok(false)
413420
}
414-
exec_resp::Response::Error(_) => todo!(),
415-
exec_resp::Response::ProgramResp(_) => todo!(),
421+
exec_resp::Response::Error(e) => Err(Error::RpcQueryError(e)),
422+
exec_resp::Response::ProgramResp(_) => Err(Error::PrimaryStreamMisuse),
416423
}
417-
418-
Ok(false)
419424
};
420425

421426
self.make_request(
@@ -424,7 +429,7 @@ impl RemoteConnection {
424429
)
425430
.await?;
426431

427-
Ok(out.unwrap())
432+
out.ok_or(Error::PrimaryStreamMisuse)
428433
}
429434
}
430435

sqld/src/error.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,13 @@ pub enum Error {
7979
ConflictingRestoreParameters,
8080
#[error("failed to fork database: {0}")]
8181
Fork(#[from] ForkError),
82+
8283
#[error("Connection with primary broken")]
83-
StreamDisconnect,
84+
PrimaryStreamDisconnect,
85+
#[error("Proxy protocal misuse")]
86+
PrimaryStreamMisuse,
87+
#[error("Proxy request interupted")]
88+
PrimaryStreamInterupted,
8489
}
8590

8691
trait ResponseError: std::error::Error {
@@ -131,7 +136,7 @@ impl IntoResponse for Error {
131136
LoadDumpExistingDb => self.format_err(StatusCode::BAD_REQUEST),
132137
ConflictingRestoreParameters => self.format_err(StatusCode::BAD_REQUEST),
133138
Fork(e) => e.into_response(),
134-
StreamDisconnect => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
139+
PrimaryStreamDisconnect => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
135140
}
136141
}
137142
}

0 commit comments

Comments
 (0)