Skip to content

fix(TargetDevice): support immediate event writes #276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
312 changes: 213 additions & 99 deletions src/input/target/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
error::Error,
io,
sync::{Arc, Mutex, MutexGuard},
sync::{Arc, Mutex},
thread,
time::Duration,
};
Expand Down Expand Up @@ -360,7 +360,7 @@ pub struct TargetDriver<T: TargetInputDevice + TargetOutputDevice> {
composite_device: Option<CompositeDeviceClient>,
scheduled_events: Vec<ScheduledNativeEvent>,
tx: mpsc::Sender<TargetCommand>,
rx: mpsc::Receiver<TargetCommand>,
rx: Option<mpsc::Receiver<TargetCommand>>,
}

impl<T: TargetInputDevice + TargetOutputDevice + Send + 'static> TargetDriver<T> {
Expand All @@ -385,7 +385,7 @@ impl<T: TargetInputDevice + TargetOutputDevice + Send + 'static> TargetDriver<T>
implementation: Arc::new(Mutex::new(device)),
composite_device: None,
scheduled_events: Vec::new(),
rx,
rx: Some(rx),
tx,
}
}
Expand All @@ -399,72 +399,117 @@ impl<T: TargetInputDevice + TargetOutputDevice + Send + 'static> TargetDriver<T>
pub async fn run(mut self, dbus_path: String) -> Result<(), Box<dyn Error>> {
log::debug!("Started running target device: {dbus_path}");

// Spawn a task to wait for commands/input events
let implementation = self.implementation.clone();
let type_id = self.type_id;
let rx = self.rx.take();
let (writer_tx, mut writer_rx) = mpsc::channel(16);
let writer_task = tokio::task::spawn(async move {
let Some(rx) = rx else {
log::error!("No target command receiver was found on the target device!");
return;
};

let type_id = type_id.as_str();
if let Err(e) = Self::receive_commands(implementation, type_id, rx, writer_tx).await {
log::debug!("Failed to receive target commands: {e:?}");
}
});

// Spawn a blocking task to run the target device. The '?' operator should
// be avoided in this task so cleanup tasks can run to remove the DBus
// interface and stop the device if an error occurs.
let client = self.client();
let task =
let reader_task =
tokio::task::spawn_blocking(move || -> Result<(), Box<dyn Error + Send + Sync>> {
let mut composite_device = self.composite_device;
let mut rx = self.rx;
let mut implementation = self.implementation.lock().unwrap();

// Start the DBus interface for the device
implementation.start_dbus_interface(self.dbus.clone(), dbus_path.clone(), client);
{
let mut implementation = self.implementation.lock().unwrap();

// Start the DBus interface for the device
implementation.start_dbus_interface(
self.dbus.clone(),
dbus_path.clone(),
client,
);
}

log::debug!("Target device running: {dbus_path}");
loop {
// Find any scheduled events that are ready to be sent
let mut ready_events = vec![];
let mut i = 0;
while i < self.scheduled_events.len() {
if self.scheduled_events[i].is_ready() {
let event = self.scheduled_events.remove(i);
ready_events.push(event);
continue;
{
// Find any scheduled events that are ready to be sent
let mut ready_events = vec![];
let mut i = 0;
while i < self.scheduled_events.len() {
if self.scheduled_events[i].is_ready() {
let event = self.scheduled_events.remove(i);
ready_events.push(event);
continue;
}
i += 1;
}
i += 1;
}
for event in ready_events.drain(..) {
if let Err(e) = implementation.write_event(event.into()) {
log::error!("Error writing event: {e:?}");
break;
for event in ready_events.drain(..) {
let mut implementation = self.implementation.lock().unwrap();
if let Err(e) = implementation.write_event(event.into()) {
log::error!("Error writing event: {e:?}");
break;
}
}
}

// Receive commands/input events
if let Err(e) = TargetDriver::receive_commands(
self.type_id.as_str(),
&mut composite_device,
&mut rx,
&mut implementation,
) {
log::debug!("Error receiving commands: {e:?}");
break;
}

// Poll the implementation for scheduled input events
if let Some(mut scheduled_events) = implementation.scheduled_events() {
self.scheduled_events.append(&mut scheduled_events);
}
// Receive commands/input events
//if let Err(e) = TargetDriver::receive_commands(
// self.type_id.as_str(),
// &mut composite_device,
// &mut rx,
// &mut implementation,
//) {
// log::debug!("Error receiving commands: {e:?}");
// break;
//}

// Receive from the writer thread
match writer_rx.try_recv() {
Ok(device) => {
composite_device = Some(device);
}
Err(err) => match err {
TryRecvError::Empty => (),
TryRecvError::Disconnected => {
log::debug!("Writer thread disconnected");
break;
}
},
}

// Poll the implementation for output events
let events = match implementation.poll(&composite_device) {
Ok(events) => events,
Err(e) => {
log::error!("Error polling target device: {e:?}");
break;
// Poll the implementation for scheduled input events
{
let mut implementation = self.implementation.lock().unwrap();
if let Some(mut scheduled_events) = implementation.scheduled_events() {
self.scheduled_events.append(&mut scheduled_events);
}
}
};
for event in events.into_iter() {
let Some(ref client) = composite_device else {
break;
};

// Send the output event to source devices
let result = client.blocking_process_output_event(event);
if let Err(e) = result {
return Err(e.to_string().into());
// Poll the implementation for output events
let events = {
let mut implementation = self.implementation.lock().unwrap();
match implementation.poll(&composite_device) {
Ok(events) => events,
Err(e) => {
log::error!("Error polling target device: {e:?}");
break;
}
}
};
for event in events.into_iter() {
let Some(ref client) = composite_device else {
break;
};

// Send the output event to source devices
let result = client.blocking_process_output_event(event);
if let Err(e) = result {
return Err(e.to_string().into());
}
}
}

Expand All @@ -474,6 +519,7 @@ impl<T: TargetInputDevice + TargetOutputDevice + Send + 'static> TargetDriver<T>

// Stop the device
log::debug!("Target device stopping: {dbus_path}");
let mut implementation = self.implementation.lock().unwrap();
implementation.stop_dbus_interface(self.dbus, dbus_path.clone());
implementation.stop()?;
log::debug!("Target device stopped: {dbus_path}");
Expand All @@ -482,64 +528,132 @@ impl<T: TargetInputDevice + TargetOutputDevice + Send + 'static> TargetDriver<T>
});

// Wait for the device to finish running.
if let Err(e) = task.await? {
let (reader_result, writer_result) = tokio::join!(reader_task, writer_task);
if let Err(e) = reader_result {
return Err(e.to_string().into());
}
if let Err(e) = writer_result {
return Err(e.to_string().into());
}

Ok(())
}

/// Read commands sent to this device from the channel until it is
/// empty.
fn receive_commands(
/// Read and process commands sent to this device from the channel in a loop
async fn receive_commands(
implementation: Arc<Mutex<T>>,
type_id: &str,
composite_device: &mut Option<CompositeDeviceClient>,
rx: &mut mpsc::Receiver<TargetCommand>,
implementation: &mut MutexGuard<'_, T>,
mut rx: mpsc::Receiver<TargetCommand>,
writer_tx: mpsc::Sender<CompositeDeviceClient>,
) -> Result<(), Box<dyn Error>> {
const MAX_COMMANDS: u8 = 64;
let mut commands_processed = 0;
const BUFFER_SIZE: usize = 1024;
let mut buffer = Vec::with_capacity(BUFFER_SIZE);
loop {
match rx.try_recv() {
Ok(cmd) => match cmd {
TargetCommand::WriteEvent(event) => {
implementation.write_event(event)?;
}
TargetCommand::SetCompositeDevice(device) => {
*composite_device = Some(device.clone());
implementation.on_composite_device_attached(device)?;
}
TargetCommand::GetCapabilities(sender) => {
let capabilities = implementation.get_capabilities().unwrap_or_default();
sender.blocking_send(capabilities)?;
}
TargetCommand::GetType(sender) => {
sender.blocking_send(type_id.to_string())?;
}
TargetCommand::ClearState => {
implementation.clear_state();
}
TargetCommand::Stop => {
implementation.stop()?;
return Err("Target device stopped".into());
}
},
Err(e) => match e {
TryRecvError::Empty => return Ok(()),
TryRecvError::Disconnected => {
log::debug!("Receive channel disconnected");
return Err("Receive channel disconnected".into());
}
},
};
let num = rx.recv_many(&mut buffer, BUFFER_SIZE).await;
if num == 0 {
log::warn!("Unable to receive more commands. Channel closed.");
break;
}
for cmd in buffer.drain(..) {
Self::process_command(&implementation, type_id, cmd, &writer_tx).await?;
}
}

// Only process MAX_COMMANDS messages at a time
commands_processed += 1;
if commands_processed >= MAX_COMMANDS {
return Ok(());
Ok(())
}

/// Process the given target command
async fn process_command(
implementation: &Arc<Mutex<T>>,
type_id: &str,
cmd: TargetCommand,
writer_tx: &mpsc::Sender<CompositeDeviceClient>,
) -> Result<(), Box<dyn Error>> {
match cmd {
TargetCommand::WriteEvent(event) => {
let mut implementation = implementation.lock().unwrap();
implementation.write_event(event)?;
}
TargetCommand::SetCompositeDevice(device) => {
writer_tx.send(device.clone()).await?;
let mut implementation = implementation.lock().unwrap();
implementation.on_composite_device_attached(device)?;
}
TargetCommand::GetCapabilities(sender) => {
let capabilities = {
let implementation = implementation.lock().unwrap();
implementation.get_capabilities().unwrap_or_default()
};
sender.send(capabilities).await?;
}
TargetCommand::GetType(sender) => {
sender.send(type_id.to_string()).await?;
}
TargetCommand::ClearState => {
let mut implementation = implementation.lock().unwrap();
implementation.clear_state();
}
TargetCommand::Stop => {
let mut implementation = implementation.lock().unwrap();
implementation.stop()?;
return Err("Target device stopped".into());
}
}

Ok(())
}

//// Read commands sent to this device from the channel until it is
//// empty.
//fn receive_commands(
// type_id: &str,
// composite_device: &mut Option<CompositeDeviceClient>,
// rx: &mut mpsc::Receiver<TargetCommand>,
// implementation: &mut MutexGuard<'_, T>,
//) -> Result<(), Box<dyn Error>> {
// const MAX_COMMANDS: u8 = 64;
// let mut commands_processed = 0;
// loop {
// match rx.try_recv() {
// Ok(cmd) => match cmd {
// TargetCommand::WriteEvent(event) => {
// implementation.write_event(event)?;
// }
// TargetCommand::SetCompositeDevice(device) => {
// *composite_device = Some(device.clone());
// implementation.on_composite_device_attached(device)?;
// }
// TargetCommand::GetCapabilities(sender) => {
// let capabilities = implementation.get_capabilities().unwrap_or_default();
// sender.blocking_send(capabilities)?;
// }
// TargetCommand::GetType(sender) => {
// sender.blocking_send(type_id.to_string())?;
// }
// TargetCommand::ClearState => {
// implementation.clear_state();
// }
// TargetCommand::Stop => {
// implementation.stop()?;
// return Err("Target device stopped".into());
// }
// },
// Err(e) => match e {
// TryRecvError::Empty => return Ok(()),
// TryRecvError::Disconnected => {
// log::debug!("Receive channel disconnected");
// return Err("Receive channel disconnected".into());
// }
// },
// };

// // Only process MAX_COMMANDS messages at a time
// commands_processed += 1;
// if commands_processed >= MAX_COMMANDS {
// return Ok(());
// }
// }
//}
}

/// A [TargetDevice] is any virtual input device that emits input events
Expand Down