Replies: 2 comments 1 reply
-
|
Tokio would usually be the approach I go with. To ensure the interval doesn't run after the actor dies, I exit the loop in the task when the message being sent to the actor fails due to it not running: async fn on_start(&mut self, actor_ref: ActorRef<Self>) -> Result<(), BoxError> {
tokio::spawn(async move {
loop {
if matches!(
actor_ref.tell(Alive).await,
Err(SendError::ActorNotRunning | SendError::ActorStopped(_))
) {
break;
}
tokio::time::sleep(duration).await;
}
});
Ok(())
}This works fine in most cases. If you needed the task to be forcefully shutdown immediately when the actor dies, then the alternatives would be either:
async fn on_start(&mut self, actor_ref: ActorRef<Self>) -> Result<(), BoxError> {
self.interval_handle = tokio::spawn(async move { ... });
Ok(())
}
async fn on_stop(&mut self, _: WeakActorRef<Self>, _: ActorStopReason) -> Result<(), BoxError> {
self.interval_handle.abort();
Ok(())
}
struct IntervalActor<A> { notify_actor: ActorRef<A> };
impl Actor for IntervalActor {
type Mailbox = UnboundedMailbox<Self>;
async fn on_start(&mut self, actor_ref: ActorRef<Self>) -> Result<(), BoxError> {
actor_ref.tell(Tick).try_send_sync()?; // Kick start the interval ticking
Ok(())
}
}
struct Tick;
impl Message<Tick> for IntervalActor {
type Reply = Result<(), SendError<...>>;
fn handle(&mut self, ctx: Context<'_, Self, Self::Reply>) -> Self::Reply {
self.notify_actor.tell(Alive).await?;
tokio::time::sleep(duration).await;
ctx.actor_ref().tell(Tick).try_send_sync()?;
Ok(())
}
} |
Beta Was this translation helpful? Give feedback.
-
|
@tqwewe We added this to our code. This is kinda similar to the attach_stream function, maybe worth having in the lib? With a use kameo::error::SendError;
use kameo::prelude::Message;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use kameo::actor::ActorRef;
use kameo::Actor;
pub trait ActorRefExt<A> {
fn run_interval<M>(&self, duration: Duration) -> JoinHandle<()>
where
A: Message<M>,
M: Default + Send + 'static;
}
impl<A> ActorRefExt<A> for ActorRef<A>
where
A: Actor,
{
fn run_interval<M>(&self, duration: Duration) -> JoinHandle<()>
where
A: Message<M>,
M: Default + Send + 'static,
{
let actor_ref = self.clone();
tokio::spawn(async move {
loop {
log::info!("Starting interval {duration:?}");
sleep(duration).await;
match actor_ref.tell(M::default()).await {
Err(SendError::ActorNotRunning(_)) => {
break;
}
Err(SendError::ActorStopped) => break,
_ => (),
}
}
})
}
} |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
Hi!
Our system is currently on Actix. Slowly looking to move to a better system.
How would you suggest to handle recurring events like:
Spawning a task on tokio is always possible but feels very dirty and it is not structured (meaning if the actor dies, it doesn't necessarily get cleaned).
Thoughts?
Beta Was this translation helpful? Give feedback.
All reactions