Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/ergot/src/interface_manager/interface_impls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#[cfg(feature = "tokio-std")]
pub mod tokio_tcp;

#[cfg(feature = "tokio-std")]
pub mod tokio_mpsc;

#[cfg(feature = "tokio-serial-v5")]
pub mod tokio_serial_cobs;

Expand Down
15 changes: 15 additions & 0 deletions crates/ergot/src/interface_manager/interface_impls/tokio_mpsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//! std tcp interface impl
//!
//! std tcp uses COBS for framing over a MPSC queues

use crate::interface_manager::{
Interface,
utils::{framed_stream, std::StdQueue},
};

/// An interface implementation for MPSC channel using tokio
pub struct TokioMpscInterface {}

impl Interface for TokioMpscInterface {
type Sink = framed_stream::Sink<StdQueue>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub mod eusb_0_5;
#[cfg(feature = "tokio-std")]
pub mod tokio_tcp;

#[cfg(feature = "tokio-std")]
pub mod tokio_mpsc;

use crate::{
Header, HeaderSeq, ProtocolError,
interface_manager::{
Expand Down
143 changes: 143 additions & 0 deletions crates/ergot/src/interface_manager/profiles/direct_edge/tokio_mpsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
//! A std+tcp edge device profile
//!
//! This is useful for std based devices/applications that can directly connect to a DirectRouter
//! using a tcp connection.

use std::sync::Arc;

use crate::{
interface_manager::{
InterfaceState, Profile,
interface_impls::tokio_mpsc::TokioMpscInterface,
profiles::direct_edge::{DirectEdge, process_frame},
utils::std::{
ReceiverError, StdQueue,
acc::{CobsAccumulator, FeedResult},
},
},
net_stack::NetStackHandle,
};

use bbq2::{prod_cons::stream::StreamConsumer, traits::bbqhdl::BbqHandle};
use log::{error, info, trace, warn};
use maitake_sync::WaitQueue;
use tokio::{
select,
sync::mpsc::{Receiver, Sender},
};

pub type StdTcpClientIm = DirectEdge<TokioMpscInterface>;

pub struct RxWorker<N: NetStackHandle> {
stack: N,
skt: Receiver<Vec<u8>>,
closer: Arc<WaitQueue>,
}

// ---- impls ----

impl<N> RxWorker<N>
where
N: NetStackHandle<Profile = DirectEdge<TokioMpscInterface>>,
{
pub async fn run(mut self) -> Result<(), ReceiverError> {
let res = self.run_inner().await;
// todo: this could live somewhere else?
self.stack.stack().manage_profile(|im| {
_ = im.set_interface_state((), InterfaceState::Down);
});
res
}

pub async fn run_inner(&mut self) -> Result<(), ReceiverError> {
let mut net_id = None;

loop {
let rd = self.skt.recv();
let close = self.closer.wait();

let ct = select! {
r = rd => {
match r {
None => {
warn!("recv run closed");
return Err(ReceiverError::SocketClosed)
},
Some(ct) => ct,
}
}
_c = close => {
return Err(ReceiverError::SocketClosed);
}
};
process_frame(&mut net_id, ct.as_slice(), &self.stack, ());
}
}
}

#[derive(Debug, PartialEq)]
pub struct SocketAlreadyActive;

// Helper functions

pub async fn register_target_interface<N>(
stack: N,
socket: (Sender<Vec<u8>>, Receiver<Vec<u8>>),
queue: StdQueue,
) -> Result<(), SocketAlreadyActive>
where
N: NetStackHandle<Profile = DirectEdge<TokioMpscInterface>>,
N: Send + 'static,
{
let (tx, rx) = socket;
let closer = Arc::new(WaitQueue::new());
stack.stack().manage_profile(|im| {
match im.interface_state(()) {
Some(InterfaceState::Down) => {}
Some(InterfaceState::Inactive) => return Err(SocketAlreadyActive),
Some(InterfaceState::ActiveLocal { .. }) => return Err(SocketAlreadyActive),
Some(InterfaceState::Active { .. }) => return Err(SocketAlreadyActive),
None => {}
}

im.set_interface_state((), InterfaceState::Inactive)
.map_err(|_| SocketAlreadyActive)?;

Ok(())
})?;
let rx_worker = RxWorker {
stack,
skt: rx,
closer: closer.clone(),
};
// TODO: spawning in a non-async context!
Comment thread
cramt marked this conversation as resolved.
tokio::task::spawn(tx_worker(tx, queue.stream_consumer(), closer.clone()));
tokio::task::spawn(rx_worker.run());
Ok(())
}

async fn tx_worker(tx: Sender<Vec<u8>>, rx: StreamConsumer<StdQueue>, closer: Arc<WaitQueue>) {
info!("Started tx_worker");
loop {
let rxf = rx.wait_read();
let clf = closer.wait();

let frame = select! {
r = rxf => r,
_c = clf => {
break;
}
};

let len = frame.len();
trace!("sending pkt len:{}", len);
let res = tx.send(frame.to_vec()).await;
frame.release(len);
if let Err(e) = res {
error!("Err: {e:?}");
break;
}
}
// TODO: GC waker?
warn!("Closing interface");
}
Loading
Loading