Skip to content

Commit 269bd6f

Browse files
authored
perf: Speed up executor startup (#117)
1 parent 9a4d4b4 commit 269bd6f

3 files changed

Lines changed: 50 additions & 36 deletions

File tree

Cargo.lock

Lines changed: 19 additions & 25 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/fluxqueue-worker/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,6 @@ anyhow = "1.0.100"
3333
pythonize = "0.27.0"
3434
chrono = "0.4.43"
3535
num_cpus = "1.17.0"
36+
futures = "0.3.32"
3637

3738
fluxqueue-common.workspace = true

crates/fluxqueue-worker/src/worker.rs

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,39 @@ pub async fn run_worker(
5050
let executor_ids = generate_executor_ids(concurrency);
5151
let mut executors = JoinSet::new();
5252

53-
for i in 0..concurrency {
54-
let redis_client = Arc::clone(&redis_client);
55-
let queue_name = Arc::clone(&queue_name);
56-
let executor_id = Arc::clone(&executor_ids[i]);
57-
let shutdown = shutdown.clone();
58-
let task_registry = Arc::clone(&task_registry);
59-
let python_dispatcher = Arc::new(PythonDispatcher::new()?);
53+
let executor_futures: Vec<_> = (0..concurrency)
54+
.map(|i| {
55+
let redis_client = Arc::clone(&redis_client);
56+
let queue_name = Arc::clone(&queue_name);
57+
let executor_id = Arc::clone(&executor_ids[i]);
58+
let shutdown = shutdown.clone();
59+
let task_registry = Arc::clone(&task_registry);
6060

61-
redis_client
62-
.register_executor(&queue_name, &executor_id)
63-
.await?;
61+
async move {
62+
let python_dispatcher = Arc::new(PythonDispatcher::new()?);
6463

65-
redis_client.set_executor_heartbeat(&executor_id).await?;
64+
redis_client
65+
.register_executor(&queue_name, &executor_id)
66+
.await?;
67+
68+
redis_client.set_executor_heartbeat(&executor_id).await?;
69+
70+
Ok::<_, anyhow::Error>((
71+
shutdown,
72+
queue_name,
73+
executor_id,
74+
redis_client,
75+
task_registry,
76+
python_dispatcher,
77+
))
78+
}
79+
})
80+
.collect();
6681

82+
let results = futures::future::join_all(executor_futures).await;
83+
for result in results {
84+
let (shutdown, queue_name, executor_id, redis_client, task_registry, python_dispatcher) =
85+
result?;
6786
executors.spawn(executor_loop(
6887
shutdown,
6988
queue_name,

0 commit comments

Comments
 (0)