Skip to content

Commit 60be15c

Browse files
committed
Add resource limits
1 parent a2f0200 commit 60be15c

7 files changed

Lines changed: 99 additions & 21 deletions

File tree

.github/workflows/docker.yml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
name: Build docker image
21

32
on:
43
push:
@@ -8,12 +7,14 @@ on:
87
tags:
98
- 'v*.*.*'
109
pull_request:
11-
branches: [ master ]
10+
branches:
11+
- master
12+
- dev
1213

1314
jobs:
14-
build_types:
15+
build:
1516
name: Build
16-
runs-on: ubuntu-latest
17+
runs-on: ubuntu-24.04
1718
steps:
1819
- name: Check Out Repo
1920
uses: actions/checkout@v4

.vscode/settings.json

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
11
{
2-
"cSpell.words": ["actix", "dotenv", "nats", "oneshot", "sqlx"]
2+
"cSpell.words": [
3+
"actix",
4+
"codegen",
5+
"dotenv",
6+
"nats",
7+
"oneshot",
8+
"openworkers",
9+
"serde",
10+
"sqlx"
11+
]
312
}

Cargo.lock

Lines changed: 30 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "openworkers-runner"
3-
version = "0.1.9"
3+
version = "0.1.10"
44
edition = "2021"
55
default-run = "openworkers-runner"
66

@@ -14,7 +14,7 @@ tokio = "1.43.0"
1414
env_logger = "0.11.6"
1515
http_v02 = { package = "http", version = "0.2.12" }
1616
sqlx = { version = "0.8.3", features = [ "runtime-tokio", "postgres", "uuid", "bigdecimal", "rust_decimal" ] }
17-
openworkers-runtime ={ git = "https://github.com/openworkers/openworkers-runtime", tag = "v0.1.9"}
17+
openworkers-runtime ={ git = "https://github.com/openworkers/openworkers-runtime", tag = "v0.1.10"}
1818
# openworkers-runtime = { path = "../openworkers-runtime" }
1919
nats = "0.25.0"
2020
serde_json = "1.0.137"

src/event_fetch.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use std::ops::Deref;
22
use std::thread::JoinHandle;
3+
use std::time::Duration;
34

45
use bytes::Bytes;
56
use openworkers_runtime::FetchInit;
7+
use openworkers_runtime::RuntimeLimits;
68
use openworkers_runtime::Script;
79
use openworkers_runtime::Task;
810
use openworkers_runtime::Worker;
@@ -11,6 +13,9 @@ use crate::store::WorkerData;
1113

1214
type ResTx = tokio::sync::oneshot::Sender<http_v02::Response<Bytes>>;
1315

16+
// Default timeout for fetch events
17+
const FETCH_TIMEOUT_MS: u64 = 64_000; // 64 seconds
18+
1419
pub fn run_fetch(
1520
worker: WorkerData,
1621
req: http_v02::Request<Bytes>,
@@ -31,7 +36,14 @@ pub fn run_fetch(
3136

3237
let tasks = local.spawn_local(async move {
3338
log::debug!("create worker");
34-
let mut worker = match Worker::new(script, Some(log_tx)).await {
39+
40+
let limits = RuntimeLimits {
41+
max_cpu_time_ms: 100, // 100ms CPU time for fetch tasks
42+
max_wall_clock_time_ms: 60_000, // 60s total time for fetch tasks
43+
..Default::default()
44+
};
45+
46+
let mut worker = match Worker::new(script, Some(log_tx), Some(limits)).await {
3547
Ok(worker) => worker,
3648
Err(err) => {
3749
log::error!("failed to create worker: {err}");
@@ -50,10 +62,18 @@ pub fn run_fetch(
5062

5163
let task = Task::Fetch(Some(FetchInit::new(req, res_tx)));
5264

53-
log::debug!("exec fetch task");
54-
match worker.exec(task).await {
55-
Ok(()) => log::debug!("exec completed"),
56-
Err(err) => log::error!("exec did not complete: {err}"),
65+
log::debug!("exec fetch task with {}ms timeout", FETCH_TIMEOUT_MS);
66+
67+
// Wrap execution with timeout
68+
let timeout_duration = Duration::from_millis(FETCH_TIMEOUT_MS);
69+
match tokio::time::timeout(timeout_duration, worker.exec(task)).await {
70+
Ok(Ok(())) => log::debug!("exec completed"),
71+
Ok(Err(err)) => log::error!("exec did not complete: {err}"),
72+
Err(_) => {
73+
log::error!("exec timeout after {}ms", FETCH_TIMEOUT_MS);
74+
// Note: Worker may have already sent a response via FetchInit
75+
// If no response was sent, res_tx will be dropped and client gets an error
76+
}
5777
}
5878
});
5979

src/event_scheduled.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::ops::Deref;
22

3+
use openworkers_runtime::RuntimeLimits;
34
use openworkers_runtime::ScheduledInit;
45
use openworkers_runtime::Script;
56
use openworkers_runtime::Task;
@@ -36,7 +37,16 @@ fn run_scheduled(data: ScheduledData, script: Script) {
3637

3738
local.spawn_local(async move {
3839
log::debug!("create worker");
39-
let mut worker = Worker::new(script, Some(log_tx)).await.unwrap();
40+
41+
let limits = RuntimeLimits {
42+
max_cpu_time_ms: 100, // 100ms CPU time for scheduled tasks
43+
max_wall_clock_time_ms: 60_000, // 60s total time for scheduled tasks
44+
..Default::default()
45+
};
46+
47+
let mut worker = Worker::new(script, Some(log_tx), Some(limits))
48+
.await
49+
.unwrap();
4050

4151
log::debug!("exec scheduled task");
4252
match worker.exec(task).await {
@@ -47,7 +57,7 @@ fn run_scheduled(data: ScheduledData, script: Script) {
4757

4858
log::debug!("scheduled task listener started");
4959

50-
match local.block_on(&rt, async { res_rx.await } ) {
60+
match local.block_on(&rt, async { res_rx.await }) {
5161
Ok(()) => {}
5262
Err(err) => log::error!("failed to wait for end: {err}"),
5363
}
@@ -108,7 +118,7 @@ pub fn handle_scheduled(db: sqlx::Pool<sqlx::Postgres>) {
108118
code: crate::transform::parse_worker_code(&worker),
109119
env: match worker.env {
110120
Some(env) => Some(env.deref().to_owned()),
111-
None => None
121+
None => None,
112122
},
113123
};
114124

src/store.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ pub struct WorkerData {
2323
pub language: WorkerLanguage,
2424
}
2525

26-
pub async fn get_worker(conn: &mut sqlx::PgConnection, identifier: WorkerIdentifier) -> Option<WorkerData> {
26+
pub async fn get_worker(
27+
conn: &mut sqlx::PgConnection,
28+
identifier: WorkerIdentifier,
29+
) -> Option<WorkerData> {
2730
log::debug!("get_worker: {:?}", identifier);
2831

2932
let query = format!(
@@ -58,7 +61,12 @@ pub async fn get_worker(conn: &mut sqlx::PgConnection, identifier: WorkerIdentif
5861
.await
5962
{
6063
Ok(worker) => {
61-
log::debug!("worker found: id: {}, checksum: {}, language: {:?}", worker.id, worker.checksum, worker.language);
64+
log::debug!(
65+
"worker found: id: {}, checksum: {}, language: {:?}",
66+
worker.id,
67+
worker.checksum,
68+
worker.language
69+
);
6270
Some(worker)
6371
}
6472
Err(err) => {
@@ -68,7 +76,10 @@ pub async fn get_worker(conn: &mut sqlx::PgConnection, identifier: WorkerIdentif
6876
}
6977
}
7078

71-
pub async fn get_worker_id_from_domain(conn: &mut sqlx::PgConnection, domain: String) -> Option<String> {
79+
pub async fn get_worker_id_from_domain(
80+
conn: &mut sqlx::PgConnection,
81+
domain: String,
82+
) -> Option<String> {
7283
let query = sqlx::query_scalar!(
7384
"SELECT worker_id::text FROM domains WHERE name = $1 LIMIT 1",
7485
domain

0 commit comments

Comments
 (0)