Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repository = "https://github.com/lablup/bssh"
readme = "README.md"
keywords = ["cli", "rust"]
categories = ["command-line-utilities"]
edition = "2021"
edition = "2024"

[dependencies]
bytes = "1.11.1"
Expand Down
4 changes: 2 additions & 2 deletions benches/large_output_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use bssh::node::Node;
use bssh::ssh::tokio_client::CommandOutput;
use bssh::ui::tui::app::TuiApp;
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use ratatui::backend::TestBackend;
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use ratatui::Terminal;
use ratatui::backend::TestBackend;
use std::hint::black_box;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
Expand Down
2 changes: 1 addition & 1 deletion crates/bssh-russh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.60.1"
authors = ["Jeongkyu Shin <inureyes@gmail.com>"]
description = "Temporary fork of russh with high-frequency PTY output fix (Handle::data from spawned tasks)"
documentation = "https://docs.rs/bssh-russh"
edition = "2021"
edition = "2024"
homepage = "https://github.com/lablup/bssh"
keywords = ["ssh"]
license = "Apache-2.0"
Expand Down
15 changes: 6 additions & 9 deletions crates/bssh-russh/src/client/encrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,15 +437,14 @@ impl Session {
let channel_num = map_err!(ChannelId::decode(&mut r))?;
let data = map_err!(Bytes::decode(&mut r))?;
let target = self.common.config.window_size;
if let Some(ref mut enc) = self.common.encrypted {
if enc.adjust_window_size(channel_num, &data, target)? {
if let Some(ref mut enc) = self.common.encrypted
&& enc.adjust_window_size(channel_num, &data, target)? {
let next_window =
client.adjust_window(channel_num, self.target_window_size);
if next_window > 0 {
self.target_window_size = next_window
}
}
}

if let Some(chan) = self.channels.get(&channel_num) {
let _ = chan.send(ChannelMsg::Data { data: data.clone() }).await;
Expand All @@ -459,15 +458,14 @@ impl Session {
let extended_code = map_err!(u32::decode(&mut r))?;
let data = map_err!(Bytes::decode(&mut r))?;
let target = self.common.config.window_size;
if let Some(ref mut enc) = self.common.encrypted {
if enc.adjust_window_size(channel_num, &data, target)? {
if let Some(ref mut enc) = self.common.encrypted
&& enc.adjust_window_size(channel_num, &data, target)? {
let next_window =
client.adjust_window(channel_num, self.target_window_size);
if next_window > 0 {
self.target_window_size = next_window
}
}
}

if let Some(chan) = self.channels.get(&channel_num) {
let _ = chan
Expand Down Expand Up @@ -551,8 +549,8 @@ impl Session {
}
_ => {
let wants_reply = map_err!(u8::decode(&mut r))?;
if wants_reply == 1 {
if let Some(ref mut enc) = self.common.encrypted {
if wants_reply == 1
&& let Some(ref mut enc) = self.common.encrypted {
self.common.wants_reply = false;
if let Some(ch) = enc.channels.get(&channel_num) {
push_packet!(enc.write, {
Expand All @@ -561,7 +559,6 @@ impl Session {
})
}
}
}
info!("Unknown channel request {req:?} {wants_reply:?}",);
Ok(())
}
Expand Down
35 changes: 14 additions & 21 deletions crates/bssh-russh/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,11 +966,10 @@ pub async fn connect<H: Handler + Send + 'static, A: tokio::net::ToSocketAddrs>(
handler: H,
) -> Result<Handle<H>, H::Error> {
let socket = map_err!(tokio::net::TcpStream::connect(addrs).await)?;
if config.as_ref().nodelay {
if let Err(e) = socket.set_nodelay(true) {
if config.as_ref().nodelay
&& let Err(e) = socket.set_nodelay(true) {
warn!("set_nodelay() failed: {e:?}");
}
}

connect_stream(config, socket, handler).await
}
Expand Down Expand Up @@ -1211,8 +1210,8 @@ impl Session {
reading.set(start_reading(stream_read, buffer, opening_cipher));
}
() = &mut keepalive_timer => {
if let Some(ref mut enc) = self.common.encrypted {
if matches!(enc.state, EncryptedState::Authenticated) {
if let Some(ref mut enc) = self.common.encrypted
&& matches!(enc.state, EncryptedState::Authenticated) {
self.common.alive_timeouts = self.common.alive_timeouts.saturating_add(1);
if self.common.config.keepalive_max != 0 && self.common.alive_timeouts > self.common.config.keepalive_max {
debug!("Timeout, server not responding to keepalives");
Expand All @@ -1221,7 +1220,6 @@ impl Session {
sent_keepalive = true;
self.send_keepalive(true)?;
}
}
}
() = &mut inactivity_timer => {
debug!("timeout");
Expand Down Expand Up @@ -1263,15 +1261,14 @@ impl Session {
self.flush()?;
map_err!(self.common.packet_writer.flush_into(stream_write).await)?;

if let Some(ref mut enc) = self.common.encrypted {
if let EncryptedState::InitCompression = enc.state {
if let Some(ref mut enc) = self.common.encrypted
&& let EncryptedState::InitCompression = enc.state {
if enc.client_compression.is_deferred() {
enc.client_compression
.init_compress(self.common.packet_writer.compress());
}
enc.state = EncryptedState::Authenticated;
}
}

if self.common.received_data {
// Reset the number of failed keepalive attempts. We don't
Expand All @@ -1281,22 +1278,20 @@ impl Session {
// data from it.
self.common.alive_timeouts = 0;
}
if self.common.received_data || sent_keepalive {
if let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
if (self.common.received_data || sent_keepalive)
&& let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
keepalive_timer.as_mut().as_pin_mut(),
self.common.config.keepalive_interval,
) {
sleep.as_mut().reset(tokio::time::Instant::now() + d);
}
}
if !sent_keepalive {
if let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
if !sent_keepalive
&& let (futures::future::Either::Right(ref mut sleep), Some(d)) = (
inactivity_timer.as_mut().as_pin_mut(),
self.common.config.inactivity_timeout,
) {
sleep.as_mut().reset(tokio::time::Instant::now() + d);
}
}
}

result
Expand Down Expand Up @@ -1528,15 +1523,14 @@ impl Session {
/// Flush the temporary cleartext buffer into the encryption
/// buffer. This does *not* flush to the socket.
fn flush(&mut self) -> Result<(), crate::Error> {
if let Some(ref mut enc) = self.common.encrypted {
if enc.flush(
if let Some(ref mut enc) = self.common.encrypted
&& enc.flush(
&self.common.config.as_ref().limits,
&mut self.common.packet_writer,
)? && !self.kex.active()
{
self.begin_rekey()?;
}
}
Ok(())
}

Expand Down Expand Up @@ -1581,8 +1575,8 @@ async fn reply<H: Handler>(

let is_kex_msg = pkt.buffer.first().cloned().map(is_kex_msg).unwrap_or(false);

if is_kex_msg {
if let SessionKexState::InProgress(kex) = session.kex.take() {
if is_kex_msg
&& let SessionKexState::InProgress(kex) = session.kex.take() {
let progress = kex.step(Some(pkt), &mut session.common.packet_writer)?;

match progress {
Expand Down Expand Up @@ -1652,7 +1646,6 @@ async fn reply<H: Handler>(

return Ok(());
}
}

session.client_read_encrypted(handler, pkt).await
}
Expand Down
45 changes: 18 additions & 27 deletions crates/bssh-russh/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ impl Session {
pix_height: u32,
terminal_modes: &[(Pty, u32)],
) -> Result<(), crate::Error> {
if let Some(ref mut enc) = self.common.encrypted {
if let Some(channel) = enc.channels.get(&channel) {
if let Some(ref mut enc) = self.common.encrypted
&& let Some(channel) = enc.channels.get(&channel) {
push_packet!(enc.write, {
map_err!(msg::CHANNEL_REQUEST.encode(&mut enc.write))?;

Expand All @@ -137,7 +137,6 @@ impl Session {
(Pty::TTY_OP_END as u8).encode(&mut enc.write)?;
});
}
}
Ok(())
}

Expand All @@ -150,8 +149,8 @@ impl Session {
x11_authentication_cookie: &str,
x11_screen_number: u32,
) -> Result<(), crate::Error> {
if let Some(ref mut enc) = self.common.encrypted {
if let Some(channel) = enc.channels.get(&channel) {
if let Some(ref mut enc) = self.common.encrypted
&& let Some(channel) = enc.channels.get(&channel) {
push_packet!(enc.write, {
msg::CHANNEL_REQUEST.encode(&mut enc.write)?;

Expand All @@ -164,7 +163,6 @@ impl Session {
x11_screen_number.encode(&mut enc.write)?;
});
}
}
Ok(())
}

Expand All @@ -175,8 +173,8 @@ impl Session {
variable_name: &str,
variable_value: &str,
) -> Result<(), crate::Error> {
if let Some(ref mut enc) = self.common.encrypted {
if let Some(channel) = enc.channels.get(&channel) {
if let Some(ref mut enc) = self.common.encrypted
&& let Some(channel) = enc.channels.get(&channel) {
push_packet!(enc.write, {
msg::CHANNEL_REQUEST.encode(&mut enc.write)?;

Expand All @@ -187,7 +185,6 @@ impl Session {
variable_value.encode(&mut enc.write)?;
});
}
}
Ok(())
}

Expand All @@ -196,8 +193,8 @@ impl Session {
want_reply: bool,
channel: ChannelId,
) -> Result<(), crate::Error> {
if let Some(ref mut enc) = self.common.encrypted {
if let Some(channel) = enc.channels.get(&channel) {
if let Some(ref mut enc) = self.common.encrypted
&& let Some(channel) = enc.channels.get(&channel) {
push_packet!(enc.write, {
msg::CHANNEL_REQUEST.encode(&mut enc.write)?;

Expand All @@ -206,7 +203,6 @@ impl Session {
(want_reply as u8).encode(&mut enc.write)?;
});
}
}
Ok(())
}

Expand All @@ -216,8 +212,8 @@ impl Session {
want_reply: bool,
command: &[u8],
) -> Result<(), crate::Error> {
if let Some(ref mut enc) = self.common.encrypted {
if let Some(channel) = enc.channels.get(&channel) {
if let Some(ref mut enc) = self.common.encrypted
&& let Some(channel) = enc.channels.get(&channel) {
push_packet!(enc.write, {
msg::CHANNEL_REQUEST.encode(&mut enc.write)?;

Expand All @@ -228,14 +224,13 @@ impl Session {
});
return Ok(());
}
}
error!("exec");
Ok(())
}

pub fn signal(&mut self, channel: ChannelId, signal: Sig) -> Result<(), crate::Error> {
if let Some(ref mut enc) = self.common.encrypted {
if let Some(channel) = enc.channels.get(&channel) {
if let Some(ref mut enc) = self.common.encrypted
&& let Some(channel) = enc.channels.get(&channel) {
push_packet!(enc.write, {
msg::CHANNEL_REQUEST.encode(&mut enc.write)?;
channel.recipient_channel.encode(&mut enc.write)?;
Expand All @@ -244,7 +239,6 @@ impl Session {
signal.name().encode(&mut enc.write)?;
});
}
}
Ok(())
}

Expand All @@ -254,8 +248,8 @@ impl Session {
channel: ChannelId,
name: &str,
) -> Result<(), crate::Error> {
if let Some(ref mut enc) = self.common.encrypted {
if let Some(channel) = enc.channels.get(&channel) {
if let Some(ref mut enc) = self.common.encrypted
&& let Some(channel) = enc.channels.get(&channel) {
push_packet!(enc.write, {
msg::CHANNEL_REQUEST.encode(&mut enc.write)?;

Expand All @@ -265,7 +259,6 @@ impl Session {
name.encode(&mut enc.write)?;
});
}
}
Ok(())
}

Expand All @@ -277,8 +270,8 @@ impl Session {
pix_width: u32,
pix_height: u32,
) -> Result<(), crate::Error> {
if let Some(ref mut enc) = self.common.encrypted {
if let Some(channel) = enc.channels.get(&channel) {
if let Some(ref mut enc) = self.common.encrypted
&& let Some(channel) = enc.channels.get(&channel) {
push_packet!(enc.write, {
msg::CHANNEL_REQUEST.encode(&mut enc.write)?;

Expand All @@ -291,7 +284,6 @@ impl Session {
pix_height.encode(&mut enc.write)?;
});
}
}
Ok(())
}

Expand Down Expand Up @@ -484,16 +476,15 @@ impl Session {
channel: ChannelId,
want_reply: bool,
) -> Result<(), crate::Error> {
if let Some(ref mut enc) = self.common.encrypted {
if let Some(channel) = enc.channels.get(&channel) {
if let Some(ref mut enc) = self.common.encrypted
&& let Some(channel) = enc.channels.get(&channel) {
push_packet!(enc.write, {
msg::CHANNEL_REQUEST.encode(&mut enc.write)?;
channel.recipient_channel.encode(&mut enc.write)?;
"auth-agent-req@openssh.com".encode(&mut enc.write)?;
(want_reply as u8).encode(&mut enc.write)?;
});
}
}
Ok(())
}

Expand Down
5 changes: 2 additions & 3 deletions crates/bssh-russh/src/kex/dh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,10 @@ impl<D: Digest> std::fmt::Debug for DhGroupKex<D> {
pub(crate) fn biguint_to_mpint(biguint: &BigUint) -> Vec<u8> {
let mut mpint = Vec::new();
let bytes = biguint.to_bytes_be();
if let Some(b) = bytes.first() {
if b > &0x7f {
if let Some(b) = bytes.first()
&& b > &0x7f {
mpint.push(0);
}
}
mpint.extend(&bytes);
mpint
}
Expand Down
5 changes: 2 additions & 3 deletions crates/bssh-russh/src/keys/known_hosts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,10 @@ fn match_hostname(host: &str, pattern: &str) -> bool {
let Some(Ok(hash)) = parts.next().map(|p| BASE64_MIME.decode(p.as_bytes())) else {
continue;
};
if let Ok(hmac) = Hmac::<Sha1>::new_from_slice(&salt) {
if hmac.chain_update(host).verify_slice(&hash).is_ok() {
if let Ok(hmac) = Hmac::<Sha1>::new_from_slice(&salt)
&& hmac.chain_update(host).verify_slice(&hash).is_ok() {
return true;
}
}
} else if host == entry {
return true;
}
Expand Down
Loading
Loading