Skip to content

Commit 6e46f33

Browse files
committed
fix(actix-ws): wake task when multiple continuation frames exist in WebSocket stream
1 parent e3e5659 commit 6e46f33

File tree

1 file changed

+73
-67
lines changed

1 file changed

+73
-67
lines changed

actix-ws/src/aggregated.rs

Lines changed: 73 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -110,94 +110,100 @@ impl Stream for AggregatedMessageStream {
110110
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
111111
let this = self.get_mut();
112112

113-
let Some(msg) = ready!(Pin::new(&mut this.stream).poll_next(cx)?) else {
114-
return Poll::Ready(None);
115-
};
116-
117-
match msg {
118-
Message::Continuation(item) => match item {
119-
Item::FirstText(bytes) => {
120-
this.continuation_kind = ContinuationKind::Text;
121-
this.current_size += bytes.len();
122-
123-
if this.current_size > this.max_size {
124-
this.continuations.clear();
125-
return size_error();
126-
}
127-
128-
this.continuations.push(bytes);
129-
130-
Poll::Pending
131-
}
113+
loop {
114+
let Some(msg) = ready!(Pin::new(&mut this.stream).poll_next(cx)?) else {
115+
return Poll::Ready(None);
116+
};
117+
118+
match msg {
119+
Message::Continuation(item) => match item {
120+
Item::FirstText(bytes) => {
121+
this.continuation_kind = ContinuationKind::Text;
122+
this.current_size += bytes.len();
123+
124+
if this.current_size > this.max_size {
125+
this.continuations.clear();
126+
return size_error();
127+
}
132128

133-
Item::FirstBinary(bytes) => {
134-
this.continuation_kind = ContinuationKind::Binary;
135-
this.current_size += bytes.len();
129+
this.continuations.push(bytes);
136130

137-
if this.current_size > this.max_size {
138-
this.continuations.clear();
139-
return size_error();
131+
continue;
140132
}
141133

142-
this.continuations.push(bytes);
134+
Item::FirstBinary(bytes) => {
135+
this.continuation_kind = ContinuationKind::Binary;
136+
this.current_size += bytes.len();
143137

144-
Poll::Pending
145-
}
138+
if this.current_size > this.max_size {
139+
this.continuations.clear();
140+
return size_error();
141+
}
146142

147-
Item::Continue(bytes) => {
148-
this.current_size += bytes.len();
143+
this.continuations.push(bytes);
149144

150-
if this.current_size > this.max_size {
151-
this.continuations.clear();
152-
return size_error();
145+
continue;
153146
}
154147

155-
this.continuations.push(bytes);
156-
157-
Poll::Pending
158-
}
148+
Item::Continue(bytes) => {
149+
this.current_size += bytes.len();
159150

160-
Item::Last(bytes) => {
161-
this.current_size += bytes.len();
151+
if this.current_size > this.max_size {
152+
this.continuations.clear();
153+
return size_error();
154+
}
162155

163-
if this.current_size > this.max_size {
164-
// reset current_size, as this is the last message for
165-
// the current continuation
166-
this.current_size = 0;
167-
this.continuations.clear();
156+
this.continuations.push(bytes);
168157

169-
return size_error();
158+
continue;
170159
}
171160

172-
this.continuations.push(bytes);
173-
let bytes = collect(&mut this.continuations);
161+
Item::Last(bytes) => {
162+
this.current_size += bytes.len();
174163

175-
this.current_size = 0;
164+
if this.current_size > this.max_size {
165+
// reset current_size, as this is the last message for
166+
// the current continuation
167+
this.current_size = 0;
168+
this.continuations.clear();
176169

177-
match this.continuation_kind {
178-
ContinuationKind::Text => {
179-
Poll::Ready(Some(match ByteString::try_from(bytes) {
180-
Ok(bytestring) => Ok(AggregatedMessage::Text(bytestring)),
181-
Err(err) => Err(ProtocolError::Io(io::Error::new(
182-
io::ErrorKind::InvalidData,
183-
err.to_string(),
184-
))),
185-
}))
170+
return size_error();
186171
}
187-
ContinuationKind::Binary => {
188-
Poll::Ready(Some(Ok(AggregatedMessage::Binary(bytes))))
172+
173+
this.continuations.push(bytes);
174+
let bytes = collect(&mut this.continuations);
175+
176+
this.current_size = 0;
177+
178+
match this.continuation_kind {
179+
ContinuationKind::Text => {
180+
return Poll::Ready(Some(match ByteString::try_from(bytes) {
181+
Ok(bytestring) => Ok(AggregatedMessage::Text(bytestring)),
182+
Err(err) => Err(ProtocolError::Io(io::Error::new(
183+
io::ErrorKind::InvalidData,
184+
err.to_string(),
185+
))),
186+
}))
187+
}
188+
ContinuationKind::Binary => {
189+
return Poll::Ready(Some(Ok(AggregatedMessage::Binary(bytes))))
190+
}
189191
}
190192
}
191-
}
192-
},
193+
},
193194

194-
Message::Text(text) => Poll::Ready(Some(Ok(AggregatedMessage::Text(text)))),
195-
Message::Binary(binary) => Poll::Ready(Some(Ok(AggregatedMessage::Binary(binary)))),
196-
Message::Ping(ping) => Poll::Ready(Some(Ok(AggregatedMessage::Ping(ping)))),
197-
Message::Pong(pong) => Poll::Ready(Some(Ok(AggregatedMessage::Pong(pong)))),
198-
Message::Close(close) => Poll::Ready(Some(Ok(AggregatedMessage::Close(close)))),
195+
Message::Text(text) => return Poll::Ready(Some(Ok(AggregatedMessage::Text(text)))),
196+
Message::Binary(binary) => {
197+
return Poll::Ready(Some(Ok(AggregatedMessage::Binary(binary))))
198+
}
199+
Message::Ping(ping) => return Poll::Ready(Some(Ok(AggregatedMessage::Ping(ping)))),
200+
Message::Pong(pong) => return Poll::Ready(Some(Ok(AggregatedMessage::Pong(pong)))),
201+
Message::Close(close) => {
202+
return Poll::Ready(Some(Ok(AggregatedMessage::Close(close))))
203+
}
199204

200-
Message::Nop => unreachable!("MessageStream should not produce no-ops"),
205+
Message::Nop => unreachable!("MessageStream should not produce no-ops"),
206+
}
201207
}
202208
}
203209
}

0 commit comments

Comments
 (0)