Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit e7b848a

Browse files
committed
preemtible request stream
1 parent 6d3eaee commit e7b848a

File tree

2 files changed

+65
-63
lines changed

2 files changed

+65
-63
lines changed

sqld/src/error.rs

+2
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ impl IntoResponse for Error {
137137
ConflictingRestoreParameters => self.format_err(StatusCode::BAD_REQUEST),
138138
Fork(e) => e.into_response(),
139139
PrimaryStreamDisconnect => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
140+
PrimaryStreamMisuse => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
141+
PrimaryStreamInterupted => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
140142
}
141143
}
142144
}

sqld/src/rpc/streaming_exec.rs

+63-63
Original file line numberDiff line numberDiff line change
@@ -209,79 +209,79 @@ where
209209
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
210210
let this = self.project();
211211

212-
match this.state {
213-
State::Idle => {
214-
match ready!(this.request_stream.poll_next(cx)) {
215-
Some(Err(e)) => {
216-
*this.state = State::Fused;
217-
Poll::Ready(Some(Err(e)))
218-
}
219-
Some(Ok(req)) => {
220-
let request_id = req.request_id;
221-
match req.request {
222-
Some(Request::Execute(pgm)) => {
223-
let Ok(pgm) =
224-
crate::connection::program::Program::try_from(pgm.pgm.unwrap()) else {
225-
*this.state = State::Fused;
226-
return Poll::Ready(Some(Err(Status::new(Code::InvalidArgument, "invalid program"))));
227-
};
228-
let conn = this.connection.clone();
229-
let authenticated = this.authenticated.clone();
230-
231-
let s = async_stream::stream! {
232-
let (sender, mut receiver) = mpsc::channel(1);
233-
let builder = StreamResponseBuilder {
234-
request_id,
235-
sender,
236-
current: None,
237-
};
238-
let mut fut = conn.execute_program(pgm, authenticated, builder, None);
239-
loop {
240-
tokio::select! {
241-
res = &mut fut => {
242-
// drain the receiver
243-
while let Ok(msg) = receiver.try_recv() {
244-
yield msg;
245-
}
212+
// we always poll from the request stream. If a new request arrive, we interupt the current
213+
// one, and move to the next.
214+
if let Poll::Ready(maybe_req) = this.request_stream.poll_next(cx) {
215+
match maybe_req {
216+
Some(Err(e)) => {
217+
*this.state = State::Fused;
218+
return Poll::Ready(Some(Err(e)))
219+
}
220+
Some(Ok(req)) => {
221+
let request_id = req.request_id;
222+
match req.request {
223+
Some(Request::Execute(pgm)) => {
224+
let Ok(pgm) =
225+
crate::connection::program::Program::try_from(pgm.pgm.unwrap()) else {
226+
*this.state = State::Fused;
227+
return Poll::Ready(Some(Err(Status::new(Code::InvalidArgument, "invalid program"))));
228+
};
229+
let conn = this.connection.clone();
230+
let authenticated = this.authenticated.clone();
231+
232+
let s = async_stream::stream! {
233+
let (sender, mut receiver) = mpsc::channel(1);
234+
let builder = StreamResponseBuilder {
235+
request_id,
236+
sender,
237+
current: None,
238+
};
239+
let mut fut = conn.execute_program(pgm, authenticated, builder, None);
240+
loop {
241+
tokio::select! {
242+
res = &mut fut => {
243+
// drain the receiver
244+
while let Ok(msg) = receiver.try_recv() {
245+
yield msg;
246+
}
246247

247-
if let Err(e) = res {
248-
yield ExecResp {
249-
request_id,
250-
response: Some(exec_resp::Response::Error(e.into()))
251-
}
248+
if let Err(e) = res {
249+
yield ExecResp {
250+
request_id,
251+
response: Some(exec_resp::Response::Error(e.into()))
252252
}
253-
break
254253
}
255-
msg = receiver.recv() => {
256-
if let Some(msg) = msg {
257-
yield msg;
258-
}
254+
break
255+
}
256+
msg = receiver.recv() => {
257+
if let Some(msg) = msg {
258+
yield msg;
259259
}
260260
}
261261
}
262-
};
263-
*this.state = State::Execute(Box::pin(s));
264-
}
265-
Some(Request::Describe(_)) => todo!(),
266-
None => {
267-
*this.state = State::Fused;
268-
return Poll::Ready(Some(Err(Status::new(
269-
Code::InvalidArgument,
270-
"invalid ExecReq: missing request",
271-
))));
272-
}
262+
}
263+
};
264+
*this.state = State::Execute(Box::pin(s));
265+
}
266+
Some(Request::Describe(_)) => todo!(),
267+
None => {
268+
*this.state = State::Fused;
269+
return Poll::Ready(Some(Err(Status::new(
270+
Code::InvalidArgument,
271+
"invalid ExecReq: missing request",
272+
))));
273273
}
274-
// we have placed the request, poll immediately
275-
cx.waker().wake_by_ref();
276-
Poll::Pending
277-
}
278-
None => {
279-
// this would easier if tokio_stream re-exported combinators
280-
*this.state = State::Fused;
281-
Poll::Ready(None)
282274
}
283275
}
276+
None => {
277+
*this.state = State::Fused;
278+
return Poll::Ready(None)
279+
}
284280
}
281+
}
282+
283+
match this.state {
284+
State::Idle => Poll::Pending,
285285
State::Fused => Poll::Ready(None),
286286
State::Execute(stream) => {
287287
let resp = ready!(stream.as_mut().poll_next(cx));

0 commit comments

Comments
 (0)