Skip to content

Commit 5820db3

Browse files
committed
Optimize worker execution with thread pool
1 parent 5de6a6e commit 5820db3

7 files changed

Lines changed: 213 additions & 96 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ actix-web = "4.9.0"
1111
bytes = "1.6.0"
1212
log = "0.4.21"
1313
tokio = "1.43.0"
14+
tokio-util = { version = "0.7", features = ["rt"] }
15+
once_cell = "1.19"
1416
env_logger = "0.11.6"
1517
http_v02 = { package = "http", version = "0.2.12" }
1618
sqlx = { version = "0.8.3", features = [ "runtime-tokio", "postgres", "uuid", "bigdecimal", "rust_decimal" ] }

bin/main.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,11 +153,35 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
153153
request
154154
};
155155

156+
// Try to acquire a worker slot from the semaphore with timeout
157+
let timeout = openworkers_runner::worker_pool::get_worker_wait_timeout();
158+
let permit = match tokio::time::timeout(
159+
timeout,
160+
openworkers_runner::worker_pool::WORKER_SEMAPHORE
161+
.clone()
162+
.acquire_owned(),
163+
)
164+
.await
165+
{
166+
Ok(Ok(permit)) => permit,
167+
Ok(Err(_)) => {
168+
error!("semaphore closed unexpectedly");
169+
return HttpResponse::InternalServerError()
170+
.content_type("text/plain")
171+
.body("Internal server error");
172+
}
173+
Err(_) => {
174+
debug!("worker pool saturated after {}ms timeout, returning 503", timeout.as_millis());
175+
return HttpResponse::ServiceUnavailable()
176+
.content_type("text/plain")
177+
.body("Server is overloaded, please try again later");
178+
}
179+
};
180+
156181
let (res_tx, res_rx) = channel::<http_v02::Response<Bytes>>();
157182

158-
let handle = openworkers_runner::event_fetch::run_fetch(worker, request, res_tx, data.log_tx.clone());
183+
openworkers_runner::event_fetch::run_fetch(worker, request, res_tx, data.log_tx.clone(), permit);
159184

160-
// TODO: select! on res_rx, timeout and handle.join()
161185
let response = match res_rx.await {
162186
Ok(res) => {
163187
let mut rb = HttpResponse::build(res.status());
@@ -176,8 +200,6 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
176200

177201
debug!("handle_request done in {}ms", start.elapsed().as_millis());
178202

179-
handle.join().unwrap();
180-
181203
response
182204
}
183205

src/event_fetch.rs

Lines changed: 51 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::ops::Deref;
2-
use std::thread::JoinHandle;
32
use std::time::Duration;
43

54
use bytes::Bytes;
@@ -8,8 +7,10 @@ use openworkers_runtime::RuntimeLimits;
87
use openworkers_runtime::Script;
98
use openworkers_runtime::Task;
109
use openworkers_runtime::Worker;
10+
use tokio::sync::OwnedSemaphorePermit;
1111

1212
use crate::store::WorkerData;
13+
use crate::worker_pool::WORKER_POOL;
1314

1415
type ResTx = tokio::sync::oneshot::Sender<http_v02::Response<Bytes>>;
1516

@@ -21,7 +22,8 @@ pub fn run_fetch(
2122
req: http_v02::Request<Bytes>,
2223
res_tx: ResTx,
2324
global_log_tx: std::sync::mpsc::Sender<crate::log::LogMessage>,
24-
) -> JoinHandle<()> {
25+
permit: OwnedSemaphorePermit,
26+
) {
2527
let (log_tx, log_handler) = crate::log::create_log_handler(worker.id.clone(), global_log_tx);
2628

2729
let script = Script {
@@ -32,63 +34,56 @@ pub fn run_fetch(
3234
},
3335
};
3436

35-
std::thread::spawn(move || {
36-
let local = tokio::task::LocalSet::new();
37-
38-
let tasks = local.spawn_local(async move {
39-
log::debug!("create worker");
40-
41-
let limits = RuntimeLimits {
42-
max_cpu_time_ms: 100, // 100ms CPU time for fetch tasks
43-
max_wall_clock_time_ms: 60_000, // 60s total time for fetch tasks
44-
..Default::default()
45-
};
46-
47-
let mut worker = match Worker::new(script, Some(log_tx), Some(limits)).await {
48-
Ok(worker) => worker,
49-
Err(err) => {
50-
log::error!("failed to create worker: {err}");
51-
res_tx
52-
.send(
53-
http_v02::Response::builder()
54-
.status(500)
55-
.body(format!("failed to create worker: {err}").into())
56-
.unwrap(),
57-
)
58-
.unwrap();
59-
60-
return;
61-
}
62-
};
63-
64-
let task = Task::Fetch(Some(FetchInit::new(req, res_tx)));
65-
66-
log::debug!("exec fetch task with {}ms timeout", FETCH_TIMEOUT_MS);
67-
68-
// Wrap execution with timeout
69-
let timeout_duration = Duration::from_millis(FETCH_TIMEOUT_MS);
70-
match tokio::time::timeout(timeout_duration, worker.exec(task)).await {
71-
Ok(Ok(())) => log::debug!("exec completed"),
72-
Ok(Err(err)) => log::error!("exec did not complete: {err}"),
73-
Err(_) => {
74-
log::error!("exec timeout after {}ms", FETCH_TIMEOUT_MS);
75-
// Note: Worker may have already sent a response via FetchInit
76-
// If no response was sent, res_tx will be dropped and client gets an error
77-
}
37+
// Use the global worker pool instead of spawning a new thread
38+
WORKER_POOL.spawn_pinned(move || async move {
39+
// Keep the permit alive for the entire worker execution
40+
// It will be automatically released when this async block completes
41+
let _permit = permit;
42+
43+
log::debug!("create worker");
44+
45+
let limits = RuntimeLimits {
46+
max_cpu_time_ms: 100, // 100ms CPU time for fetch tasks
47+
max_wall_clock_time_ms: 60_000, // 60s total time for fetch tasks
48+
..Default::default()
49+
};
50+
51+
let mut worker = match Worker::new(script, Some(log_tx), Some(limits)).await {
52+
Ok(worker) => worker,
53+
Err(err) => {
54+
log::error!("failed to create worker: {err}");
55+
res_tx
56+
.send(
57+
http_v02::Response::builder()
58+
.status(500)
59+
.body(format!("failed to create worker: {err}").into())
60+
.unwrap(),
61+
)
62+
.unwrap();
63+
64+
return;
7865
}
66+
};
7967

80-
// CRITICAL: Flush logs before worker is dropped to prevent log loss
81-
log_handler.flush();
82-
});
68+
let task = Task::Fetch(Some(FetchInit::new(req, res_tx)));
8369

84-
let rt = tokio::runtime::Builder::new_current_thread()
85-
.enable_all()
86-
.build()
87-
.unwrap();
70+
log::debug!("exec fetch task with {}ms timeout", FETCH_TIMEOUT_MS);
8871

89-
match local.block_on(&rt, tasks) {
90-
Ok(()) => {}
91-
Err(err) => log::error!("failed to wait for end: {err}"),
72+
// Wrap execution with timeout
73+
let timeout_duration = Duration::from_millis(FETCH_TIMEOUT_MS);
74+
match tokio::time::timeout(timeout_duration, worker.exec(task)).await {
75+
Ok(Ok(())) => log::debug!("exec completed"),
76+
Ok(Err(err)) => log::error!("exec did not complete: {err}"),
77+
Err(_) => {
78+
log::error!("exec timeout after {}ms", FETCH_TIMEOUT_MS);
79+
// Note: Worker may have already sent a response via FetchInit
80+
// If no response was sent, res_tx will be dropped and client gets an error
81+
}
9282
}
93-
})
83+
84+
// CRITICAL: Flush logs before worker is dropped to prevent log loss
85+
log_handler.flush();
86+
87+
// Permit is automatically released here when _permit goes out of scope
88+
});
9489
}

src/event_scheduled.rs

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use serde::Deserialize;
1010
use serde::Serialize;
1111

1212
use crate::store;
13+
use crate::worker_pool::WORKER_POOL;
1314

1415
#[derive(Debug, Serialize, Deserialize)]
1516
#[serde(rename_all = "camelCase")]
@@ -25,51 +26,64 @@ fn run_scheduled(
2526
script: Script,
2627
global_log_tx: std::sync::mpsc::Sender<crate::log::LogMessage>,
2728
) {
28-
let (res_tx, res_rx) = tokio::sync::oneshot::channel::<()>();
29-
30-
let task = Task::Scheduled(Some(ScheduledInit::new(res_tx, data.scheduled_time)));
29+
// Try to acquire a worker slot
30+
let permit = match crate::worker_pool::WORKER_SEMAPHORE
31+
.clone()
32+
.try_acquire_owned()
33+
{
34+
Ok(permit) => permit,
35+
Err(_) => {
36+
log::warn!(
37+
"worker pool saturated, skipping scheduled task for worker: {}",
38+
data.worker_id
39+
);
40+
return;
41+
}
42+
};
3143

3244
let (log_tx, log_handler) = crate::log::create_log_handler(data.worker_id, global_log_tx);
3345

34-
std::thread::spawn(move || {
35-
let rt = tokio::runtime::Builder::new_current_thread()
36-
.enable_all()
37-
.build()
38-
.unwrap();
39-
40-
let local = tokio::task::LocalSet::new();
41-
42-
local.spawn_local(async move {
43-
log::debug!("create worker");
44-
45-
let limits = RuntimeLimits {
46-
max_cpu_time_ms: 100, // 100ms CPU time for scheduled tasks
47-
max_wall_clock_time_ms: 60_000, // 60s total time for scheduled tasks
48-
..Default::default()
49-
};
50-
51-
let mut worker = Worker::new(script, Some(log_tx), Some(limits))
52-
.await
53-
.unwrap();
54-
55-
log::debug!("exec scheduled task");
56-
match worker.exec(task).await {
57-
Ok(()) => log::debug!("exec completed"),
58-
Err(err) => log::error!("exec did not complete: {err}"),
46+
// Use the global worker pool instead of spawning a new thread
47+
WORKER_POOL.spawn_pinned(move || async move {
48+
// Keep the permit alive for the entire worker execution
49+
let _permit = permit;
50+
log::debug!("create worker");
51+
52+
let limits = RuntimeLimits {
53+
max_cpu_time_ms: 100, // 100ms CPU time for scheduled tasks
54+
max_wall_clock_time_ms: 60_000, // 60s total time for scheduled tasks
55+
..Default::default()
56+
};
57+
58+
let mut worker = match Worker::new(script, Some(log_tx), Some(limits)).await {
59+
Ok(worker) => worker,
60+
Err(err) => {
61+
log::error!("failed to create scheduled worker: {err}");
62+
log_handler.flush();
63+
return;
5964
}
65+
};
6066

61-
// CRITICAL: Flush logs before worker is dropped to prevent log loss
62-
log_handler.flush();
63-
});
67+
// Create the oneshot channel INSIDE the async block so the receiver stays alive
68+
let (res_tx, res_rx) = tokio::sync::oneshot::channel::<()>();
69+
let task = Task::Scheduled(Some(ScheduledInit::new(res_tx, data.scheduled_time)));
6470

65-
log::debug!("scheduled task listener started");
71+
log::debug!("exec scheduled task");
72+
match worker.exec(task).await {
73+
Ok(()) => log::debug!("exec completed"),
74+
Err(err) => log::error!("exec did not complete: {err}"),
75+
}
6676

67-
match local.block_on(&rt, async { res_rx.await }) {
68-
Ok(()) => {}
69-
Err(err) => log::error!("failed to wait for end: {err}"),
77+
// Wait for the scheduled event to complete
78+
match res_rx.await {
79+
Ok(()) => log::debug!("scheduled task responded"),
80+
Err(err) => log::error!("scheduled task response error: {err}"),
7081
}
7182

72-
log::debug!("scheduled task listener stopped");
83+
// CRITICAL: Flush logs before worker is dropped to prevent log loss
84+
log_handler.flush();
85+
86+
// Permit is automatically released here when _permit goes out of scope
7387
});
7488
}
7589

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ pub mod event_scheduled;
33
pub mod log;
44
pub mod nats;
55
pub mod store;
6+
pub mod worker_pool;
67
mod transform;

0 commit comments

Comments
 (0)