Skip to content

Commit a0c043a

Browse files
authored
Exec can return stdout data even after stdin is closed. (#1693)
* Exec can return stdout data even after stdin is closed. #1689 Signed-off-by: Eric Webster <[email protected]> * Formatting fixes. Signed-off-by: Eric Webster <[email protected]> --------- Signed-off-by: Eric Webster <[email protected]>
1 parent 8876639 commit a0c043a

File tree

6 files changed

+194
-59
lines changed

6 files changed

+194
-59
lines changed

kube-client/src/api/remote_command.rs

+35-20
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@ use tokio::{
1212
io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream},
1313
select,
1414
};
15-
use tokio_tungstenite::{
16-
tungstenite::{self as ws},
17-
WebSocketStream,
18-
};
15+
use tokio_tungstenite::tungstenite as ws;
16+
17+
use crate::client::Connection;
1918

2019
use super::AttachParams;
2120

@@ -112,10 +111,7 @@ pub struct AttachedProcess {
112111
}
113112

114113
impl AttachedProcess {
115-
pub(crate) fn new<S>(stream: WebSocketStream<S>, ap: &AttachParams) -> Self
116-
where
117-
S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static,
118-
{
114+
pub(crate) fn new(connection: Connection, ap: &AttachParams) -> Self {
119115
// To simplify the implementation, always create a pipe for stdin.
120116
// The caller does not have access to it unless they had requested.
121117
let (stdin_writer, stdin_reader) = tokio::io::duplex(ap.max_stdin_buf_size.unwrap_or(MAX_BUF_SIZE));
@@ -140,7 +136,7 @@ impl AttachedProcess {
140136
};
141137

142138
let task = tokio::spawn(start_message_loop(
143-
stream,
139+
connection,
144140
stdin_reader,
145141
stdout_writer,
146142
stderr_writer,
@@ -259,32 +255,36 @@ impl AttachedProcess {
259255
}
260256
}
261257

262-
// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cri/streaming/remotecommand/websocket.go#L34
258+
// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L57
263259
const STDIN_CHANNEL: u8 = 0;
264260
const STDOUT_CHANNEL: u8 = 1;
265261
const STDERR_CHANNEL: u8 = 2;
266262
// status channel receives `Status` object on exit.
267263
const STATUS_CHANNEL: u8 = 3;
268264
// resize channel is use to send TerminalSize object to change the size of the terminal
269265
const RESIZE_CHANNEL: u8 = 4;
266+
/// Used to signal that a channel has reached EOF. Only works on V5 of the protocol.
267+
const CLOSE_CHANNEL: u8 = 255;
270268

271-
async fn start_message_loop<S>(
272-
stream: WebSocketStream<S>,
269+
async fn start_message_loop(
270+
connection: Connection,
273271
stdin: impl AsyncRead + Unpin,
274272
mut stdout: Option<impl AsyncWrite + Unpin>,
275273
mut stderr: Option<impl AsyncWrite + Unpin>,
276274
status_tx: StatusSender,
277275
mut terminal_size_rx: Option<TerminalSizeReceiver>,
278-
) -> Result<(), Error>
279-
where
280-
S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static,
281-
{
276+
) -> Result<(), Error> {
277+
let supports_stream_close = connection.supports_stream_close();
278+
let stream = connection.into_stream();
282279
let mut stdin_stream = tokio_util::io::ReaderStream::new(stdin);
283280
let (mut server_send, raw_server_recv) = stream.split();
284281
// Work with filtered messages to reduce noise.
285282
let mut server_recv = raw_server_recv.filter_map(filter_message).boxed();
286283
let mut have_terminal_size_rx = terminal_size_rx.is_some();
287284

285+
// True until we reach EOF for stdin.
286+
let mut stdin_is_open = true;
287+
288288
loop {
289289
let terminal_size_next = async {
290290
match terminal_size_rx.as_mut() {
@@ -319,7 +319,7 @@ where
319319
},
320320
}
321321
},
322-
stdin_message = stdin_stream.next() => {
322+
stdin_message = stdin_stream.next(), if stdin_is_open => {
323323
match stdin_message {
324324
Some(Ok(bytes)) => {
325325
if !bytes.is_empty() {
@@ -337,9 +337,24 @@ where
337337
}
338338
None => {
339339
// Stdin closed (writer half dropped).
340-
// Let the server know and disconnect.
341-
server_send.close().await.map_err(Error::SendClose)?;
342-
break;
340+
// Let the server know we reached the end of stdin.
341+
if supports_stream_close {
342+
// Signal stdin has reached EOF.
343+
// See: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go#L346
344+
let vec = vec![CLOSE_CHANNEL, STDIN_CHANNEL];
345+
server_send
346+
.send(ws::Message::binary(vec))
347+
.await
348+
.map_err(Error::SendStdin)?;
349+
} else {
350+
// Best we can do is trigger the whole websocket to close.
351+
// We may miss out on any remaining stdout data that has not
352+
// been sent yet.
353+
server_send.close().await.map_err(Error::SendClose)?;
354+
}
355+
356+
// Do not check stdin_stream for data in future loops.
357+
stdin_is_open = false;
343358
}
344359
}
345360
},

kube-client/src/api/subresource.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ where
606606
.request
607607
.portforward(name, ports)
608608
.map_err(Error::BuildRequest)?;
609-
let stream = self.client.connect(req).await?;
610-
Ok(Portforwarder::new(stream, ports))
609+
let connection = self.client.connect(req).await?;
610+
Ok(Portforwarder::new(connection.into_stream(), ports))
611611
}
612612
}

kube-client/src/client/kubelet_debug.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ impl Client {
6969
let mut req =
7070
Request::kubelet_node_portforward(kubelet_params, ports).map_err(Error::BuildRequest)?;
7171
req.extensions_mut().insert("kubelet_node_portforward");
72-
let stream = self.connect(req).await?;
73-
Ok(Portforwarder::new(stream, ports))
72+
let connection = self.connect(req).await?;
73+
Ok(Portforwarder::new(connection.into_stream(), ports))
7474
}
7575

7676
/// Stream logs directly from node

kube-client/src/client/mod.rs

+35-20
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,29 @@ pub struct Client {
8080
default_ns: String,
8181
}
8282

83+
/// Represents a WebSocket connection.
84+
/// Value returned by [`Client::connect`].
85+
#[cfg(feature = "ws")]
86+
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
87+
pub struct Connection {
88+
stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
89+
protocol: upgrade::StreamProtocol,
90+
}
91+
92+
#[cfg(feature = "ws")]
93+
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
94+
impl Connection {
95+
/// Return true if the stream supports graceful close signaling.
96+
pub fn supports_stream_close(&self) -> bool {
97+
self.protocol.supports_stream_close()
98+
}
99+
100+
/// Transform into the raw WebSocketStream.
101+
pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
102+
self.stream
103+
}
104+
}
105+
83106
/// Constructors and low-level api interfaces.
84107
///
85108
/// Most users only need [`Client::try_default`] or [`Client::new`] from this block.
@@ -190,10 +213,7 @@ impl Client {
190213
/// Make WebSocket connection.
191214
#[cfg(feature = "ws")]
192215
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
193-
pub async fn connect(
194-
&self,
195-
request: Request<Vec<u8>>,
196-
) -> Result<WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>> {
216+
pub async fn connect(&self, request: Request<Vec<u8>>) -> Result<Connection> {
197217
use http::header::HeaderValue;
198218
let (mut parts, body) = request.into_parts();
199219
parts
@@ -211,25 +231,20 @@ impl Client {
211231
http::header::SEC_WEBSOCKET_KEY,
212232
key.parse().expect("valid header value"),
213233
);
214-
// Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3).
215-
// There's no official documentation about this protocol, but it's described in
216-
// [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED).
217-
// There's a comment about v4 and `Status` object in
218-
// [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh).
219-
parts.headers.insert(
220-
http::header::SEC_WEBSOCKET_PROTOCOL,
221-
HeaderValue::from_static(upgrade::WS_PROTOCOL),
222-
);
234+
upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?;
223235

224236
let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
225-
upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
237+
let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
226238
match hyper::upgrade::on(res).await {
227-
Ok(upgraded) => Ok(WebSocketStream::from_raw_socket(
228-
TokioIo::new(upgraded),
229-
ws::protocol::Role::Client,
230-
None,
231-
)
232-
.await),
239+
Ok(upgraded) => Ok(Connection {
240+
stream: WebSocketStream::from_raw_socket(
241+
TokioIo::new(upgraded),
242+
ws::protocol::Role::Client,
243+
None,
244+
)
245+
.await,
246+
protocol,
247+
}),
233248

234249
Err(e) => Err(Error::UpgradeConnection(
235250
UpgradeConnectionError::GetPendingUpgrade(e),

kube-client/src/client/upgrade.rs

+85-14
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,85 @@
1-
use http::{self, Response, StatusCode};
1+
use http::{self, HeaderValue, Response, StatusCode};
22
use thiserror::Error;
33
use tokio_tungstenite::tungstenite as ws;
44

5-
use crate::client::Body;
5+
use crate::{client::Body, Error, Result};
66

7-
// Binary subprotocol v4. See `Client::connect`.
8-
pub const WS_PROTOCOL: &str = "v4.channel.k8s.io";
7+
#[derive(Debug)]
8+
pub enum StreamProtocol {
9+
/// Binary subprotocol v4. See `Client::connect`.
10+
V4,
11+
12+
/// Binary subprotocol v5. See `Client::connect`.
13+
/// v5 supports CLOSE signals.
14+
/// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L52C26-L52C43
15+
V5,
16+
}
17+
18+
impl StreamProtocol {
19+
pub fn as_str(&self) -> &'static str {
20+
match self {
21+
Self::V4 => "v4.channel.k8s.io",
22+
Self::V5 => "v5.channel.k8s.io",
23+
}
24+
}
25+
26+
fn as_bytes(&self) -> &'static [u8] {
27+
self.as_str().as_bytes()
28+
}
29+
30+
pub fn supports_stream_close(&self) -> bool {
31+
match self {
32+
Self::V4 => false,
33+
Self::V5 => true,
34+
}
35+
}
36+
37+
/// Add HTTP header SEC_WEBSOCKET_PROTOCOL with a list of supported protocol.
38+
pub fn add_to_headers(headers: &mut http::HeaderMap) -> Result<()> {
39+
// Protocols we support in our preferred order.
40+
let supported_protocols = [
41+
// v5 supports CLOSE signals.
42+
Self::V5.as_str(),
43+
// Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3).
44+
// There's no official documentation about this protocol, but it's described in
45+
// [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED).
46+
// There's a comment about v4 and `Status` object in
47+
// [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh).
48+
Self::V4.as_str(),
49+
];
50+
51+
let header_value_string = supported_protocols.join(", ");
52+
53+
// Note: Multiple headers does not work. Only a single CSV works.
54+
headers.insert(
55+
http::header::SEC_WEBSOCKET_PROTOCOL,
56+
HeaderValue::from_str(&header_value_string).map_err(|e| Error::HttpError(e.into()))?,
57+
);
58+
59+
Ok(())
60+
}
61+
62+
/// Return the subprotocol of an HTTP response.
63+
fn get_from_response<B>(res: &Response<B>) -> Option<Self> {
64+
let headers = res.headers();
65+
66+
match headers
67+
.get(http::header::SEC_WEBSOCKET_PROTOCOL)
68+
.map(|h| h.as_bytes())
69+
{
70+
Some(protocol) => {
71+
if protocol == Self::V4.as_bytes() {
72+
Some(Self::V4)
73+
} else if protocol == Self::V5.as_bytes() {
74+
Some(Self::V5)
75+
} else {
76+
None
77+
}
78+
}
79+
_ => None,
80+
}
81+
}
82+
}
983

1084
/// Possible errors from upgrading to a WebSocket connection
1185
#[cfg(feature = "ws")]
@@ -42,7 +116,7 @@ pub enum UpgradeConnectionError {
42116

43117
// Verify upgrade response according to RFC6455.
44118
// Based on `tungstenite` and added subprotocol verification.
45-
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeConnectionError> {
119+
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<StreamProtocol, UpgradeConnectionError> {
46120
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
47121
return Err(UpgradeConnectionError::ProtocolSwitch(res.status()));
48122
}
@@ -75,14 +149,11 @@ pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeCon
75149
return Err(UpgradeConnectionError::SecWebSocketAcceptKeyMismatch);
76150
}
77151

78-
// Make sure that the server returned the correct subprotocol.
79-
if !headers
80-
.get(http::header::SEC_WEBSOCKET_PROTOCOL)
81-
.map(|h| h == WS_PROTOCOL)
82-
.unwrap_or(false)
83-
{
84-
return Err(UpgradeConnectionError::SecWebSocketProtocolMismatch);
85-
}
152+
// Make sure that the server returned an expected subprotocol.
153+
let protocol = match StreamProtocol::get_from_response(res) {
154+
Some(p) => p,
155+
None => return Err(UpgradeConnectionError::SecWebSocketProtocolMismatch),
156+
};
86157

87-
Ok(())
158+
Ok(protocol)
88159
}

kube-client/src/lib.rs

+35-1
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ mod test {
279279
#[cfg(feature = "ws")]
280280
async fn pod_can_exec_and_write_to_stdin() -> Result<(), Box<dyn std::error::Error>> {
281281
use crate::api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent};
282+
use tokio::io::AsyncWriteExt;
282283

283284
let client = Client::try_default().await?;
284285
let pods: Api<Pod> = Api::default_namespaced(client);
@@ -348,9 +349,42 @@ mod test {
348349
assert_eq!(out, "1\n2\n3\n");
349350
}
350351

352+
// Verify we read from stdout after stdin is closed.
353+
{
354+
let name = "busybox-kube2";
355+
let command = vec!["sh", "-c", "sleep 2; echo test string 2"];
356+
let ap = AttachParams::default().stdin(true).stderr(false);
357+
358+
// Make a connection so we can determine if the K8s cluster supports stream closing.
359+
let mut req = pods.request.exec(name, command.clone(), &ap)?;
360+
req.extensions_mut().insert("exec");
361+
let stream = pods.client.connect(req).await?;
362+
363+
// This only works is the cluster supports protocol version v5.channel.k8s.io
364+
// Skip for older protocols.
365+
if stream.supports_stream_close() {
366+
let mut attached = pods.exec(name, command, &ap).await?;
367+
let mut stdin_writer = attached.stdin().unwrap();
368+
let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
369+
370+
stdin_writer.write_all(b"this will be ignored\n").await?;
371+
_ = stdin_writer.shutdown().await;
372+
373+
let next_stdout = stdout_stream.next();
374+
let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
375+
assert_eq!(stdout, "test string 2\n");
376+
377+
// AttachedProcess resolves with status object.
378+
let status = attached.take_status().unwrap();
379+
if let Some(status) = status.await {
380+
assert_eq!(status.status, Some("Success".to_owned()));
381+
assert_eq!(status.reason, None);
382+
}
383+
}
384+
}
385+
351386
// Verify we can write to Stdin
352387
{
353-
use tokio::io::AsyncWriteExt;
354388
let mut attached = pods
355389
.exec(
356390
"busybox-kube2",

0 commit comments

Comments
 (0)