Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
shell: bash
run: |
cargo fmt --all --check
cargo clippy -- --no-deps -Dclippy::pedantic -Dwarnings
cargo clippy -- --no-deps -Dclippy::pedantic -Dclippy::nursery -Dwarnings

- uses: pnpm/action-setup@v4
with:
Expand Down
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ uniffi_bindgen = "=0.29.4"

[workspace.lints.clippy]
pedantic = "warn"
nursery = "warn"

[workspace.lints.rust]
unsafe_code = "forbid"
Expand Down
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ check:
@echo '{{ style("command") }}check:{{ NORMAL }}'
cargo fmt --all --check
cargo check --all-features
cargo clippy --all-targets -- --no-deps -Dclippy::pedantic -Dwarnings
cargo clippy --all-targets -- --no-deps -Dclippy::pedantic -Dclippy::nursery -Dwarnings

# Clean build artefacts in the root workspace and all examples
clean:
Expand Down
5 changes: 3 additions & 2 deletions crux_core/src/bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ pub trait FfiFormat: Debug + 'static {
fn deserialize<'de, T: Deserialize<'de>>(bytes: &'de [u8]) -> Result<T, Self::Error>;
}

/// Request for a side-effect passed from the Core to the Shell. The `EffectId` links
/// the `Request` with the corresponding call to [`Core::resolve`] to pass the data back
/// Request for a side-effect passed from the Core to the Shell.
///
/// The `EffectId` links the `Request` with the corresponding call to [`Core::resolve`] to pass the data back
/// to the [`App::update`] function (wrapped in the event provided to the capability originating the effect).
// used in docs/internals/bridge.md
// ANCHOR: request
Expand Down
2 changes: 1 addition & 1 deletion crux_core/src/bridge/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<T: FfiFormat> ResolveRegistry<T> {

let resolved = entry.resolve(response);

if let ResolveSerialized::Never = entry {
if matches!(entry, ResolveSerialized::Never) {
registry_lock.remove(id.0 as usize);
}

Expand Down
15 changes: 7 additions & 8 deletions crux_core/src/bridge/request_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ pub enum ResolveSerialized<T: FfiFormat> {
impl<T: FfiFormat> ResolveSerialized<T> {
pub(crate) fn resolve(&mut self, response: &[u8]) -> Result<(), BridgeError<T>> {
match self {
ResolveSerialized::Never => Err(BridgeError::ProcessResponse(ResolveError::Never)),
ResolveSerialized::Many(f) => f(response),
ResolveSerialized::Once(_) => {
Self::Never => Err(BridgeError::ProcessResponse(ResolveError::Never)),
Self::Many(f) => f(response),
Self::Once(_) => {
// The resolve has been used, turn it into a Never
let ResolveSerialized::Once(f) = std::mem::replace(self, ResolveSerialized::Never)
else {
let Self::Once(f) = std::mem::replace(self, Self::Never) else {
unreachable!("already resolved");
};

Expand Down Expand Up @@ -77,13 +76,13 @@ impl<Out> RequestHandle<Out> {
Out: 'static,
{
match self {
RequestHandle::Never => ResolveSerialized::Never,
RequestHandle::Once(resolve) => ResolveSerialized::Once(Box::new(move |response| {
Self::Never => ResolveSerialized::Never,
Self::Once(resolve) => ResolveSerialized::Once(Box::new(move |response| {
let out = func(response)?;
resolve(out);
Ok(())
})),
RequestHandle::Many(resolve) => ResolveSerialized::Many(Box::new(move |response| {
Self::Many(resolve) => ResolveSerialized::Many(Box::new(move |response| {
let out = func(response)?;
resolve(out).map_err(|()| BridgeError::ProcessResponse(ResolveError::FinishedMany))
})),
Expand Down
12 changes: 6 additions & 6 deletions crux_core/src/command/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where
{
let make_task = Box::new(make_task);

NotificationBuilder { make_task }
Self { make_task }
}

/// Convert the [`NotificationBuilder`] into a future to use in an async context
Expand All @@ -53,7 +53,7 @@ where
Task: Future<Output = ()> + Send + 'static,
{
fn from(value: NotificationBuilder<Effect, Event, Task>) -> Self {
Command::new(|ctx| value.into_future(ctx))
Self::new(|ctx| value.into_future(ctx))
}
}

Expand All @@ -75,14 +75,14 @@ where
{
let make_task = Box::new(make_task);

RequestBuilder { make_task }
Self { make_task }
}

pub fn map<F, U>(self, map: F) -> RequestBuilder<Effect, Event, impl Future<Output = U>>
where
F: FnOnce(T) -> U + Send + 'static,
{
RequestBuilder::new(|ctx| self.into_future(ctx.clone()).map(map))
RequestBuilder::new(|ctx| self.into_future(ctx).map(map))
}

/// Chain a [`NotificationBuilder`] to run after completion of this one,
Expand Down Expand Up @@ -409,7 +409,7 @@ where
{
let make_task = Box::new(make_task);

StreamBuilder {
Self {
make_stream: make_task,
}
}
Expand All @@ -418,7 +418,7 @@ where
where
F: FnMut(T) -> U + Send + 'static,
{
StreamBuilder::new(|ctx| self.into_stream(ctx.clone()).map(map))
StreamBuilder::new(|ctx| self.into_stream(ctx).map(map))
}

/// Chain a [`RequestBuilder`] to run after completion of this [`StreamBuilder`],
Expand Down
15 changes: 7 additions & 8 deletions crux_core/src/command/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl<Effect, Event> CommandContext<Effect, Event> {
// ANCHOR: spawn
pub fn spawn<F, Fut>(&self, make_future: F) -> JoinHandle
where
F: FnOnce(CommandContext<Effect, Event>) -> Fut,
F: FnOnce(Self) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
let (sender, receiver) = crossbeam_channel::unbounded();
Expand Down Expand Up @@ -185,22 +185,21 @@ impl<T: Unpin + Send> ShellStream<T> {
send_request: impl FnOnce() + Send + 'static,
output_receiver: mpsc::UnboundedReceiver<T>,
) -> Self {
ShellStream::ReadyToSend(Box::new(send_request), output_receiver)
Self::ReadyToSend(Box::new(send_request), output_receiver)
}

fn send(&mut self) {
// Since neither part is Clone, we'll need to do an Indiana Jones

// 1. take items out of self
let dummy = ShellStream::Sent(mpsc::unbounded().1);
let ShellStream::ReadyToSend(send_request, output_receiver) =
std::mem::replace(self, dummy)
let dummy = Self::Sent(mpsc::unbounded().1);
let Self::ReadyToSend(send_request, output_receiver) = std::mem::replace(self, dummy)
else {
unreachable!("cannot send");
};

// 2. replace self with with a Sent using the original receiver
*self = ShellStream::Sent(output_receiver);
*self = Self::Sent(output_receiver);

send_request();
}
Expand All @@ -211,15 +210,15 @@ impl<T: Unpin + Send> Stream for ShellStream<T> {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match *self {
ShellStream::ReadyToSend(_, ref mut output_receiver) => {
Self::ReadyToSend(_, ref mut output_receiver) => {
let poll = pin!(output_receiver).poll_next(cx);
assert!(matches!(poll, Poll::Pending)); // we have not sent the request yet

self.send();

Poll::Pending
}
ShellStream::Sent(ref mut output_receiver) => pin!(output_receiver).poll_next(cx),
Self::Sent(ref mut output_receiver) => pin!(output_receiver).poll_next(cx),
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions crux_core/src/command/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ use futures::task::AtomicWaker;

use std::sync::Arc;

#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) struct TaskId(pub(crate) usize);
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct TaskId(pub(crate) usize);

// ANCHOR: task
pub(crate) struct Task {
pub struct Task {
// Used to wake the join handle when the task concludes
pub(crate) join_handle_wakers: Receiver<Waker>,
// Set to true when the task finishes, used by the join handle
Expand Down Expand Up @@ -52,7 +52,7 @@ impl Task {
// Waking a task also wakes the command itself, if it is being used as a Stream
// inside another Command (or hosted with a CommandSink)
// ANCHOR: command_waker
pub(crate) struct CommandWaker {
pub struct CommandWaker {
pub(crate) task_id: TaskId,
pub(crate) ready_queue: Sender<TaskId>,
// Waker for the executor running this command as a Stream.
Expand Down Expand Up @@ -139,8 +139,8 @@ impl Future for JoinHandle {
}
}

#[derive(Debug, PartialEq)]
pub(crate) enum TaskState {
#[derive(Debug, PartialEq, Eq)]
pub enum TaskState {
Missing,
Suspended,
Completed,
Expand Down
16 changes: 8 additions & 8 deletions crux_core/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ where
.send(task_id)
.expect("Could not make task ready, ready channel disconnected");

Command {
Self {
effects: effect_receiver,
events: event_receiver,
context,
Expand All @@ -353,7 +353,7 @@ where
/// Create an empty, completed Command. This is useful as a return value from `update` if
/// there are no side-effects to perform.
pub fn done() -> Self {
Command::new(|_ctx| futures::future::ready(()))
Self::new(|_ctx| futures::future::ready(()))
}

/// Create a command from another command with compatible `Effect` and `Event` types
Expand Down Expand Up @@ -384,7 +384,7 @@ where
/// the event is not guaranteed to dispatch instantly - another `update` call which is
/// already scheduled may happen first.
pub fn event(event: Event) -> Self {
Command::new(|ctx| futures::future::lazy(move |_| ctx.send_event(event)))
Self::new(|ctx| futures::future::lazy(move |_| ctx.send_event(event)))
}

/// Start a creation of a Command which sends a notification to the shell with a provided
Expand Down Expand Up @@ -490,7 +490,7 @@ where
Effect: Unpin,
Event: Unpin,
{
Command::new(|ctx| {
Self::new(|ctx| {
// first run self until done
self.into_future(ctx.clone())
// then run other until done
Expand All @@ -516,7 +516,7 @@ where
Effect: Unpin,
Event: Unpin,
{
let mut command = Command::done();
let mut command = Self::done();

for c in commands {
command.spawn(|ctx| c.into_future(ctx));
Expand Down Expand Up @@ -595,13 +595,13 @@ where
}
}

impl<Effect, Event> FromIterator<Command<Effect, Event>> for Command<Effect, Event>
impl<Effect, Event> FromIterator<Self> for Command<Effect, Event>
where
Effect: Send + Unpin + 'static,
Event: Send + Unpin + 'static,
{
fn from_iter<I: IntoIterator<Item = Command<Effect, Event>>>(iter: I) -> Self {
Command::all(iter)
fn from_iter<I: IntoIterator<Item = Self>>(iter: I) -> Self {
Self::all(iter)
}
}

Expand Down
10 changes: 4 additions & 6 deletions crux_core/src/command/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ where
}

/// A sink for a Command stream, sending all emitted effects and events into a pair of channels
pub(crate) struct CommandSink<Effect, Event> {
pub struct CommandSink<Effect, Event> {
pub(crate) effects: Sender<Effect>,
pub(crate) events: Sender<Event>,
}

impl<Effect, Event> CommandSink<Effect, Event> {
pub(crate) fn new(effects: Sender<Effect>, events: Sender<Event>) -> Self {
pub const fn new(effects: Sender<Effect>, events: Sender<Event>) -> Self {
Self { effects, events }
}
}

#[derive(Debug, Error)]
pub(crate) enum HostedCommandError {
pub enum HostedCommandError {
#[error("Cannot send effect to host")]
CannotSendEffect,
#[error("Cannot send event to host")]
Expand Down Expand Up @@ -104,9 +104,7 @@ impl<Effect, Event> Sink<CommandOutput<Effect, Event>> for CommandSink<Effect, E
}
}

pub(crate) trait CommandStreamExt<Effect, Event>:
Stream<Item = CommandOutput<Effect, Event>>
{
pub trait CommandStreamExt<Effect, Event>: Stream<Item = CommandOutput<Effect, Event>> {
/// Connect this command to a pair of effect and event channels
///
/// This is useful if you need to multiplex several commands into the same stream of
Expand Down
4 changes: 2 additions & 2 deletions crux_core/src/command/tests/cancellation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ enum Effect {

impl From<Request<Op>> for Effect {
fn from(value: Request<Op>) -> Self {
Effect::Op(value)
Self::Op(value)
}
}

Expand Down Expand Up @@ -74,7 +74,7 @@ fn all_join_handles_get_notified() {
});

ctx.spawn({
let task_join = task_join.clone();
let task_join = task_join;

|ctx| async move {
task_join.await;
Expand Down
4 changes: 2 additions & 2 deletions crux_core/src/command/tests/combinators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,11 +695,11 @@ fn chaining_with_mapping() {
fn stream_mapping_and_chaining() {
let mut cmd = Command::stream_from_shell(AnOperation::One)
.map(|out| {
let AnOperationOutput::Other([a, b]) = out else {
let AnOperationOutput::Other(x) = out else {
panic!("Bad output");
};

(a, b)
x.into()
})
.then_request(|(a, b)| Command::request_from_shell(AnOperation::More([a + 1, b + 1])))
.then_send(Event::Completed);
Expand Down
4 changes: 2 additions & 2 deletions crux_core/src/command/tests/composition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ enum Effect {

impl From<Request<ToString>> for Effect {
fn from(value: Request<ToString>) -> Self {
Effect::Convert(value)
Self::Convert(value)
}
}

Expand All @@ -34,7 +34,7 @@ enum ParentEffect {

impl From<Request<ToString>> for ParentEffect {
fn from(value: Request<ToString>) -> Self {
ParentEffect::Convert(value)
Self::Convert(value)
}
}

Expand Down
Loading
Loading