Skip to content

Commit a4d22a0

Browse files
authored
Fix executors heartbeat flaw on startup (#82)
* Fix executors heartbeat flaw by setting them before spawning executors * Fix worker startup command in README.md
1 parent 7b2002f commit a4d22a0

3 files changed

Lines changed: 22 additions & 15 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ It picks the latest released worker based on your python version and installs it
9898
Running the worker is straightforward:
9999

100100
```bash
101-
fluxqueue start --tasks-module-path tasks
101+
fluxqueue start --tasks-module-path myapp/tasks
102102
```
103103

104104
In order the worker to disover your tasks you need to pass `--tasks-module-path` argument with the path to the tasks module. For more information please view the [documentation](https://fluxqueue.ccxlv.dev/tutorial/defininig_and_exposing_tasks).

crates/fluxqueue-worker/src/redis_client.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,25 +33,30 @@ impl RedisClient {
3333
Ok(())
3434
}
3535

36-
pub async fn set_executor_heartbeat(&self, executor_ids: Arc<Vec<Arc<str>>>) -> Result<()> {
37-
let mut conn = self.redis_pool.get().await?;
38-
36+
pub async fn set_executors_heartbeat(&self, executor_ids: Arc<Vec<Arc<str>>>) -> Result<()> {
3937
for id in executor_ids.iter() {
40-
let heartbeat_key = keys::get_heartbeat_key(id);
41-
42-
let _: () = redis::cmd("SET")
43-
.arg(heartbeat_key)
44-
.arg(1)
45-
.arg("EX")
46-
.arg(15)
47-
.query_async(&mut conn)
48-
.await
49-
.map_err(|e| anyhow::anyhow!("Failed to set executor heartbeat: {}", e))?;
38+
self.set_executor_heartbeat(id).await?;
5039
}
5140

5241
Ok(())
5342
}
5443

44+
pub async fn set_executor_heartbeat(&self, executor_id: &str) -> Result<()> {
45+
let mut conn = self.redis_pool.get().await?;
46+
let heartbeat_key = keys::get_heartbeat_key(executor_id);
47+
48+
let _: () = redis::cmd("SET")
49+
.arg(heartbeat_key)
50+
.arg(1)
51+
.arg("EX")
52+
.arg(15)
53+
.query_async(&mut conn)
54+
.await
55+
.map_err(|e| anyhow::anyhow!("Failed to set executor heartbeat: {}", e))?;
56+
57+
Ok(())
58+
}
59+
5560
pub async fn cleanup_executors_registry(
5661
&self,
5762
queue_name: &str,

crates/fluxqueue-worker/src/worker.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ pub async fn run_worker(
6363
.register_executor(&queue_name, &executor_id)
6464
.await?;
6565

66+
redis_client.set_executor_heartbeat(&executor_id).await?;
67+
6668
executors.spawn(executor_loop(
6769
shutdown,
6870
queue_name,
@@ -249,7 +251,7 @@ async fn janitor_loop(
249251
hearbeat = async {
250252
let executor_ids = Arc::clone(&executor_ids);
251253
redis_client
252-
.set_executor_heartbeat(executor_ids)
254+
.set_executors_heartbeat(executor_ids)
253255
.await
254256
} => {
255257
match hearbeat {

0 commit comments

Comments
 (0)