Skip to content

Commit 41cbec2

Browse files
committed
Handle termination reason
1 parent 5820db3 commit 41cbec2

7 files changed

Lines changed: 251 additions & 72 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "openworkers-runner"
3-
version = "0.1.10"
3+
version = "0.1.11"
44
edition = "2021"
55
default-run = "openworkers-runner"
66

@@ -16,8 +16,8 @@ once_cell = "1.19"
1616
env_logger = "0.11.6"
1717
http_v02 = { package = "http", version = "0.2.12" }
1818
sqlx = { version = "0.8.3", features = [ "runtime-tokio", "postgres", "uuid", "bigdecimal", "rust_decimal" ] }
19-
openworkers-runtime ={ git = "https://github.com/openworkers/openworkers-runtime", tag = "v0.1.10"}
20-
# openworkers-runtime = { path = "../openworkers-runtime" }
19+
# openworkers-runtime ={ git = "https://github.com/openworkers/openworkers-runtime", tag = "v0.1.10"}
20+
openworkers-runtime = { path = "../openworkers-runtime" }
2121
async-nats = "0.37"
2222
futures = "0.3"
2323
serde_json = "1.0.137"

bin/main.rs

Lines changed: 157 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,9 @@ use bytes::Bytes;
22

33
use log::debug;
44
use log::error;
5+
use log::warn;
6+
7+
use std::time::Duration;
58

69
use tokio::sync::oneshot::channel;
710

@@ -41,7 +44,14 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
4144

4245
// Expect x-request-id header
4346
let request_id = match req.headers().get("x-request-id") {
44-
Some(value) => value.to_str().unwrap(),
47+
Some(value) => match value.to_str() {
48+
Ok(s) => s,
49+
Err(_) => {
50+
return HttpResponse::BadRequest()
51+
.content_type("text/plain")
52+
.body("Invalid x-request-id header encoding");
53+
}
54+
},
4555
None => {
4656
return HttpResponse::BadRequest()
4757
.content_type("text/plain")
@@ -50,17 +60,35 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
5060
};
5161

5262
let host = match req.headers().get("host") {
53-
Some(value) => Some(value.to_str().unwrap().to_string()),
63+
Some(value) => match value.to_str() {
64+
Ok(s) => Some(s.to_string()),
65+
Err(_) => {
66+
error!("Invalid host header encoding");
67+
None
68+
}
69+
},
5470
None => None,
5571
};
5672

5773
let mut worker_id = match req.headers().get("x-worker-id") {
58-
Some(value) => Some(value.to_str().unwrap().to_string()),
74+
Some(value) => match value.to_str() {
75+
Ok(s) => Some(s.to_string()),
76+
Err(_) => {
77+
error!("Invalid x-worker-id header encoding");
78+
None
79+
}
80+
},
5981
None => None,
6082
};
6183

6284
let mut worker_name = match req.headers().get("x-worker-name") {
63-
Some(value) => Some(value.to_str().unwrap().to_string()),
85+
Some(value) => match value.to_str() {
86+
Ok(s) => Some(s.to_string()),
87+
Err(_) => {
88+
error!("Invalid x-worker-name header encoding");
89+
None
90+
}
91+
},
6492
None => None,
6593
};
6694

@@ -117,7 +145,7 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
117145

118146
// Create a new request to forward to the worker.
119147
let request = {
120-
let mut request: http_v02::Request<Bytes> = http_v02::Request::builder()
148+
let mut request: http_v02::Request<Bytes> = match http_v02::Request::builder()
121149
.uri(format!(
122150
"{}://{}{}",
123151
req.connection_info().scheme(),
@@ -126,7 +154,15 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
126154
))
127155
.method(req.method())
128156
.body(body)
129-
.unwrap();
157+
{
158+
Ok(r) => r,
159+
Err(e) => {
160+
error!("Failed to build request: {}", e);
161+
return HttpResponse::InternalServerError()
162+
.content_type("text/plain")
163+
.body("Failed to build forwarded request");
164+
}
165+
};
130166

131167
// Copy headers from the incoming request to the forwarded request.
132168
let headers = request.headers_mut();
@@ -136,18 +172,26 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
136172

137173
// If the worker id is not provided, we add it to the headers.
138174
if req.headers().get("x-worker-id").is_none() {
139-
headers.insert(
140-
"x-worker-id",
141-
http_v02::HeaderValue::from_str(&worker.id).unwrap(),
142-
);
175+
match http_v02::HeaderValue::from_str(&worker.id) {
176+
Ok(header_value) => {
177+
headers.insert("x-worker-id", header_value);
178+
}
179+
Err(e) => {
180+
error!("Invalid worker id for header: {}", e);
181+
}
182+
}
143183
}
144184

145185
// If the worker name is not provided, we add it to the headers.
146186
if req.headers().get("x-worker-name").is_none() {
147-
headers.insert(
148-
"x-worker-name",
149-
http_v02::HeaderValue::from_str(&worker.name).unwrap(),
150-
);
187+
match http_v02::HeaderValue::from_str(&worker.name) {
188+
Ok(header_value) => {
189+
headers.insert("x-worker-name", header_value);
190+
}
191+
Err(e) => {
192+
error!("Invalid worker name for header: {}", e);
193+
}
194+
}
151195
}
152196

153197
request
@@ -179,8 +223,9 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
179223
};
180224

181225
let (res_tx, res_rx) = channel::<http_v02::Response<Bytes>>();
226+
let (termination_tx, termination_rx) = channel::<openworkers_runner::TerminationReason>();
182227

183-
openworkers_runner::event_fetch::run_fetch(worker, request, res_tx, data.log_tx.clone(), permit);
228+
openworkers_runner::event_fetch::run_fetch(worker, request, res_tx, termination_tx, data.log_tx.clone(), permit);
184229

185230
let response = match res_rx.await {
186231
Ok(res) => {
@@ -192,9 +237,44 @@ async fn handle_request(data: Data<AppState>, req: HttpRequest, body: Bytes) ->
192237

193238
rb.body(res.body().clone())
194239
}
195-
Err(err) => {
196-
error!("worker fetch error: {}, ensure the worker registered a listener for the 'fetch' event", err);
197-
HttpResponse::InternalServerError().body(err.to_string())
240+
Err(_) => {
241+
// Worker didn't send a response, check termination reason
242+
use openworkers_runner::TerminationReason;
243+
244+
let reason = termination_rx.await.unwrap_or(TerminationReason::Exception);
245+
246+
error!("worker terminated without sending response: {:?}", reason);
247+
248+
let status = reason.http_status();
249+
let body = match reason {
250+
TerminationReason::Success => {
251+
// This shouldn't happen - worker completed but didn't send response
252+
"Worker completed but did not send a response (missing fetch event listener?)"
253+
}
254+
TerminationReason::CpuTimeLimit => {
255+
"Worker exceeded CPU time limit (100ms)"
256+
}
257+
TerminationReason::WallClockTimeout => {
258+
"Worker exceeded wall-clock time limit (60s)"
259+
}
260+
TerminationReason::MemoryLimit => {
261+
"Worker exceeded memory limit (128MB)"
262+
}
263+
TerminationReason::Exception => {
264+
"Worker threw an uncaught exception"
265+
}
266+
TerminationReason::InitializationError => {
267+
"Worker failed to initialize"
268+
}
269+
TerminationReason::Terminated => {
270+
"Worker was terminated"
271+
}
272+
};
273+
274+
HttpResponse::build(actix_web::http::StatusCode::from_u16(status).unwrap())
275+
.content_type("text/plain")
276+
.insert_header(("X-Termination-Reason", format!("{:?}", reason)))
277+
.body(body)
198278
}
199279
};
200280

@@ -231,26 +311,65 @@ async fn main() -> std::io::Result<()> {
231311
}
232312

233313
let db_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
234-
let pool = PgPoolOptions::new()
235-
.max_connections(4)
236-
.connect(&db_url)
237-
.await
238-
.expect("Failed to connect to Postgres");
239-
240-
// Check postgres connection
241-
sqlx::query("SELECT 1")
242-
.fetch_one(&pool)
243-
.await
244-
.expect("Failed to query Postgres");
245-
debug!("connected to Postgres");
246-
247-
// Check NATS connection
248-
let nats_client = openworkers_runner::nats::nats_connect().await;
249-
nats_client
250-
.publish("boot", "0".into())
251-
.await
252-
.expect("Failed to connect to NATS");
253-
debug!("connected to NATS");
314+
315+
// Retry database connection with exponential backoff
316+
let mut retry_count = 0;
317+
let max_retries = 5;
318+
let pool = loop {
319+
match PgPoolOptions::new()
320+
.max_connections(20) // Increased from 4
321+
.acquire_timeout(Duration::from_secs(5))
322+
.connect(&db_url)
323+
.await
324+
{
325+
Ok(pool) => {
326+
// Test the connection
327+
match sqlx::query("SELECT 1").fetch_one(&pool).await {
328+
Ok(_) => {
329+
debug!("connected to Postgres");
330+
break pool;
331+
}
332+
Err(e) => {
333+
error!("Database connection test failed: {}", e);
334+
if retry_count >= max_retries {
335+
panic!("Failed to connect to database after {} retries", max_retries);
336+
}
337+
}
338+
}
339+
}
340+
Err(e) => {
341+
retry_count += 1;
342+
if retry_count > max_retries {
343+
panic!("Failed to connect to database after {} retries: {}", max_retries, e);
344+
}
345+
let wait_time = Duration::from_secs(2u64.pow(retry_count.min(5)));
346+
warn!("Database connection attempt {} failed: {}. Retrying in {:?}...",
347+
retry_count, e, wait_time);
348+
tokio::time::sleep(wait_time).await;
349+
}
350+
}
351+
};
352+
353+
// Connect to NATS with retries
354+
let mut retry_count = 0;
355+
loop {
356+
match openworkers_runner::nats::nats_connect().await.publish("boot", "0".into()).await {
357+
Ok(_) => {
358+
debug!("connected to NATS");
359+
break;
360+
}
361+
Err(e) => {
362+
retry_count += 1;
363+
if retry_count > max_retries {
364+
panic!("Failed to connect to NATS after {} retries: {}", max_retries, e);
365+
}
366+
let wait_time = Duration::from_secs(2u64.pow(retry_count.min(5)));
367+
warn!("NATS connection attempt {} failed: {}. Retrying in {:?}...",
368+
retry_count, e, wait_time);
369+
tokio::time::sleep(wait_time).await;
370+
}
371+
}
372+
}
254373

255374
// Start global log publisher
256375
let log_tx = openworkers_runner::log::start_log_publisher();

src/event_fetch.rs

Lines changed: 36 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -6,13 +6,15 @@ use openworkers_runtime::FetchInit;
66
use openworkers_runtime::RuntimeLimits;
77
use openworkers_runtime::Script;
88
use openworkers_runtime::Task;
9+
use openworkers_runtime::TerminationReason;
910
use openworkers_runtime::Worker;
1011
use tokio::sync::OwnedSemaphorePermit;
1112

1213
use crate::store::WorkerData;
1314
use crate::worker_pool::WORKER_POOL;
1415

1516
type ResTx = tokio::sync::oneshot::Sender<http_v02::Response<Bytes>>;
17+
type TerminationTx = tokio::sync::oneshot::Sender<TerminationReason>;
1618

1719
// Default timeout for fetch events
1820
const FETCH_TIMEOUT_MS: u64 = 64_000; // 64 seconds
@@ -21,13 +23,31 @@ pub fn run_fetch(
2123
worker: WorkerData,
2224
req: http_v02::Request<Bytes>,
2325
res_tx: ResTx,
26+
termination_tx: TerminationTx,
2427
global_log_tx: std::sync::mpsc::Sender<crate::log::LogMessage>,
2528
permit: OwnedSemaphorePermit,
2629
) {
2730
let (log_tx, log_handler) = crate::log::create_log_handler(worker.id.clone(), global_log_tx);
2831

32+
let code = match crate::transform::parse_worker_code(&worker) {
33+
Ok(code) => code,
34+
Err(e) => {
35+
log::error!("Failed to parse worker code: {}", e);
36+
res_tx
37+
.send(
38+
http_v02::Response::builder()
39+
.status(500)
40+
.body(format!("Failed to parse worker code: {}", e).into())
41+
.unwrap(),
42+
)
43+
.ok(); // Ignore send error
44+
termination_tx.send(TerminationReason::InitializationError).ok();
45+
return;
46+
}
47+
};
48+
2949
let script = Script {
30-
code: crate::transform::parse_worker_code(&worker),
50+
code,
3151
env: match worker.env {
3252
Some(env) => Some(env.deref().to_owned()),
3353
None => None,
@@ -71,15 +91,23 @@ pub fn run_fetch(
7191

7292
// Wrap execution with timeout
7393
let timeout_duration = Duration::from_millis(FETCH_TIMEOUT_MS);
74-
match tokio::time::timeout(timeout_duration, worker.exec(task)).await {
75-
Ok(Ok(())) => log::debug!("exec completed"),
76-
Ok(Err(err)) => log::error!("exec did not complete: {err}"),
94+
let termination_reason = match tokio::time::timeout(timeout_duration, worker.exec(task)).await {
95+
Ok(Ok(reason)) => {
96+
log::debug!("worker exec completed: reason={:?}", reason);
97+
reason
98+
}
99+
Ok(Err(err)) => {
100+
log::error!("worker exec error: {err}");
101+
TerminationReason::Exception
102+
}
77103
Err(_) => {
78-
log::error!("exec timeout after {}ms", FETCH_TIMEOUT_MS);
79-
// Note: Worker may have already sent a response via FetchInit
80-
// If no response was sent, res_tx will be dropped and client gets an error
104+
log::error!("worker exec timeout after {}ms (outer timeout)", FETCH_TIMEOUT_MS);
105+
TerminationReason::WallClockTimeout
81106
}
82-
}
107+
};
108+
109+
// Send termination reason back to the main thread
110+
let _ = termination_tx.send(termination_reason);
83111

84112
// CRITICAL: Flush logs before worker is dropped to prevent log loss
85113
log_handler.flush();

0 commit comments

Comments
 (0)