Commit 02d2f80

Author: Simon

feat: OPENVX_PIPELINING_THREADS env var for thread pool configuration

- compute_pool_size() reads OPENVX_PIPELINING_THREADS env var
- unset/"0"/"" → auto-detect core count (up to 64)
- "1" → single-threaded (useful for debugging / baseline)
- "N" → exactly N threads
- Single-threaded mode still works and produces correct results
- All UserKernel tests pass with OPENVX_PIPELINING_THREADS=1,2,auto
1 parent 5f9856c commit 02d2f80
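
The sizing rules in the commit message can be checked without touching the process environment if the parsing is split out into a pure function. The sketch below is one way to do that, not the code in this commit: `pool_size_from` and `MAX_THREADS` are hypothetical names introduced for illustration, and the fallback of 4 cores mirrors the value used in `thread_pool.rs`. As in the committed code, a value that fails to parse as an unsigned integer (for example `auto`) falls back to auto-detection.

```rust
use std::thread;

/// Upper bound on worker threads (the cap named in the commit message).
const MAX_THREADS: usize = 64;

/// Hypothetical helper: map the raw variable value to a pool size.
/// `raw` is `None` when the variable is unset; `auto` is the detected core count.
fn pool_size_from(raw: Option<&str>, auto: usize) -> usize {
    let auto = auto.clamp(1, MAX_THREADS);
    match raw.and_then(|s| s.parse::<usize>().ok()) {
        // Unset, empty, unparsable, or an explicit 0: auto-detect.
        None | Some(0) => auto,
        // Any positive N: use N, capped at MAX_THREADS.
        Some(n) => n.min(MAX_THREADS),
    }
}

/// Read the environment and the hardware, then defer to the pure helper.
fn compute_pool_size() -> usize {
    let auto = thread::available_parallelism()
        .map(|p| p.get())
        .unwrap_or(4);
    let raw = std::env::var("OPENVX_PIPELINING_THREADS").ok();
    pool_size_from(raw.as_deref(), auto)
}

fn main() {
    println!("pool size: {}", compute_pool_size());
}
```

Keeping the environment read in a thin wrapper means each row of the rule table can be covered by a unit test that passes strings directly.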

2 files changed

Lines changed: 41 additions & 67 deletions

multicore-plan.md

Lines changed: 12 additions & 63 deletions
@@ -14,69 +14,18 @@ The topological sort now correctly orders nodes including scalar dependencies (f
 
 ## Plan
 
-### Step 1: [DOCUMENT] Document current single-threaded architecture
-**Dependencies:** None
-**Approach:**
-- Write `docs/pipelining_architecture.md` describing:
-  - How QUEUE_AUTO executor thread works
-  - How `execute_pipelined_graph` currently works (sequential node dispatch)
-  - How `finish_pipelined_execution` works (moving refs to done, emitting events)
-  - Where synchronization points are (`execution_mutex`, `active_executions`, `active_cv`)
-  - Why current approach is safe but not parallel
-**Verification:** Document is clear enough for another engineer to understand
-**Files:** `docs/pipelining_architecture.md`
-
-### Step 2: [DESIGN] Design wave-based parallel execution model
-**Dependencies:** Step 1
-**Approach:**
-- Design a wave-based execution model:
-  - **Wave 0:** All nodes with in-degree 0 execute in parallel
-  - **Wave 1:** After all wave-0 nodes finish, nodes whose dependencies are satisfied execute in parallel
-  - Continue until all waves complete
-- Determine synchronization strategy:
-  - Per-wave barrier (all nodes in wave N must finish before wave N+1 starts)
-  - Or per-node notification (each node notifies its dependents when done)
-- Decide thread pool model:
-  - Global thread pool (shared across all graphs) vs per-graph thread pool
-  - Configuration via environment variable or build flag
-- Define memory model for shared state:
-  - `REF_SUBSTITUTIONS` is thread-local — safe for parallel reads, needs coordination for writes
-  - Node output refs are written by producer, read by consumers in later waves
-  - `NODE_PARAMETER_BINDINGS` / `GRAPH_PARAMETER_BINDINGS` are read-only during execution
-**Verification:** Design doc covers edge cases (cyclic graphs, error handling, graph release during execution)
-**Files:** `docs/multicore_pipeline_design.md`
-
-### Step 3: [IMPLEMENT] Create global thread pool for node dispatch
-**Dependencies:** Step 2
-**Approach:**
-- Create `openvx-core/src/thread_pool.rs`:
-  - `ThreadPool` struct with configurable worker count
-  - `spawn` method that accepts closures
-  - `join` or `wait_all` for barrier synchronization
-  - Use `std::thread` + channels (or `crossbeam` if available)
-- Configuration:
-  - Default thread count = `num_cpus::get()` or env var `OPENVX_PIPELINING_THREADS`
-  - Minimum 1, maximum 64 (sanity cap)
-  - Lazy initialization: thread pool created on first use, shared across all graphs
-**Verification:** Thread pool compiles, basic spawn/join test passes
-**Files:** `openvx-core/src/thread_pool.rs`, update `openvx-core/src/lib.rs`
-
-### Step 4: [IMPLEMENT] Wave-based node execution in `execute_pipelined_graph`
-**Dependencies:** Step 3
-**Approach:**
-- Modify `execute_pipelined_graph` in `pipelining_executor.rs`:
-  1. Build wave map: group nodes by topological depth (already computed by `vxVerifyGraph`)
-  2. For each wave:
-     a. Spawn all nodes in wave to thread pool
-     b. Wait for all wave nodes to complete
-     c. Check for errors — if any node failed, abort remaining waves
-  3. After all waves: run `finish_pipelined_execution`
-- Key changes:
-  - `execute_node` must be safe to call from multiple threads concurrently (verify no shared mutable state)
-  - Error propagation: collect status from all nodes in wave, fail fast
-  - Keep thread-local `REF_SUBSTITUTIONS` per worker thread (already thread-local)
-**Verification:** Single-graph pipelining tests pass, no regressions in non-pipelining tests
-**Files:** `openvx-core/src/pipelining_executor.rs`
+### Step 1: [DOCUMENT] Document current single-threaded architecture ✅ DONE
+**Status:** `docs/pipelining_architecture.md` written
+
+### Step 2: [DESIGN] Design wave-based parallel execution model ✅ DONE
+**Status:** `docs/multicore_pipeline_design.md` written
+
+### Step 3: [IMPLEMENT] Create global thread pool for node dispatch ✅ DONE
+**Status:** `openvx-core/src/thread_pool.rs` created and tested
+
+### Step 4: [IMPLEMENT] Wave-based node execution in `execute_pipelined_graph` ✅ DONE
+**Status:** `pipelining_executor.rs` modified with wave-based dispatch
+**Tests:** All 9 UserKernel tests pass, 12 representative tests pass
 
 ### Step 5: [IMPLEMENT] Per-node execution wrapper with error propagation
 **Dependencies:** Step 4
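
The wave model that Steps 2 and 4 of the plan describe (group nodes by topological depth, run each wave in parallel behind a barrier, abort remaining waves on the first failure) can be sketched in isolation. This is a minimal illustration under stated assumptions, not the code in `pipelining_executor.rs`: `build_waves` and `run_waves` are hypothetical names, nodes are plain indices, the dependency list is passed in directly instead of coming from `vxVerifyGraph`, and `std::thread::scope` stands in for the global thread pool.

```rust
use std::thread;

/// Group node indices into waves by topological depth.
/// `deps[i]` lists the nodes that node `i` depends on.
/// Returns `None` if the graph contains a cycle.
fn build_waves(deps: &[Vec<usize>]) -> Option<Vec<Vec<usize>>> {
    let n = deps.len();
    let mut indegree: Vec<usize> = deps.iter().map(|d| d.len()).collect();
    let mut dependents = vec![Vec::new(); n];
    for (node, ds) in deps.iter().enumerate() {
        for &d in ds {
            dependents[d].push(node);
        }
    }
    // Wave 0: every node with in-degree 0.
    let mut current: Vec<usize> = (0..n).filter(|&i| indegree[i] == 0).collect();
    let mut waves = Vec::new();
    let mut seen = 0;
    while !current.is_empty() {
        seen += current.len();
        let mut next = Vec::new();
        for &node in &current {
            for &dep in &dependents[node] {
                indegree[dep] -= 1;
                if indegree[dep] == 0 {
                    next.push(dep);
                }
            }
        }
        waves.push(current);
        current = next;
    }
    // Nodes left unvisited can only be part of a cycle.
    if seen == n { Some(waves) } else { None }
}

/// Run each wave in parallel; leaving the scope is the per-wave barrier.
/// Later waves are skipped as soon as any node in a wave fails.
fn run_waves<F>(waves: &[Vec<usize>], execute_node: F) -> Result<(), String>
where
    F: Fn(usize) -> Result<(), String> + Sync,
{
    let exec = &execute_node;
    for wave in waves {
        let results: Vec<Result<(), String>> = thread::scope(|s| {
            let handles: Vec<_> = wave
                .iter()
                .map(|&node| s.spawn(move || exec(node)))
                .collect();
            handles
                .into_iter()
                .map(|h| h.join().expect("node panicked"))
                .collect()
        });
        for r in results {
            r?;
        }
    }
    Ok(())
}

fn main() {
    // Diamond graph: 0 -> {1, 2} -> 3
    let deps = vec![vec![], vec![0], vec![0], vec![1, 2]];
    let waves = build_waves(&deps).expect("graph must be acyclic");
    run_waves(&waves, |node| {
        println!("executing node {node}");
        Ok(())
    })
    .unwrap();
}
```

The per-wave barrier is the simpler of the two synchronization strategies listed in Step 2. Per-node notification would let a node start as soon as its own dependencies finish, at the cost of more bookkeeping.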

openvx-core/src/thread_pool.rs

Lines changed: 29 additions & 4 deletions
@@ -1,4 +1,3 @@
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{mpsc, Arc, Mutex};
 use std::thread;
 
@@ -87,16 +86,42 @@ pub fn init_global_pool(size: usize) {
 
 /// Get a reference to the global thread pool, initializing with default
 /// size (hardware concurrency or 4) if not already set.
+/// Respects `OPENVX_PIPELINING_THREADS` environment variable:
+/// - unset / "0" / "" → auto-detect core count
+/// - "1" → single-threaded (sequential fallback)
+/// - "N" → exactly N threads, capped at 64
 pub fn get_global_pool() -> Option<Arc<ThreadPool>> {
     let pool = GLOBAL_POOL.get_or_init(|| {
-        let size = std::thread::available_parallelism()
-            .map(|n| n.get())
-            .unwrap_or(4);
+        let size = compute_pool_size();
         Arc::new(ThreadPool::new(size))
     });
     Some(Arc::clone(pool))
 }
 
+/// Compute thread pool size from environment and hardware.
+fn compute_pool_size() -> usize {
+    if let Ok(val) = std::env::var("OPENVX_PIPELINING_THREADS") {
+        if !val.is_empty() {
+            if let Ok(n) = val.parse::<usize>() {
+                if n == 0 {
+                    // Auto-detect
+                    let auto = std::thread::available_parallelism()
+                        .map(|p| p.get())
+                        .unwrap_or(4);
+                    return auto.max(1).min(64);
+                } else {
+                    return n.max(1).min(64);
+                }
+            }
+        }
+    }
+    // Default: auto-detect
+    let auto = std::thread::available_parallelism()
+        .map(|p| p.get())
+        .unwrap_or(4);
+    auto.max(1).min(64)
+}
+
 /// Set a custom pool size (for testing or tuning). Returns false if pool
 /// already initialized.
 pub fn set_pool_size(size: usize) -> bool {
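
The diff shows only how the pool is sized; the body of `ThreadPool` is outside the changed hunks. For readers without the rest of the file in view, here is a minimal sketch of a channel-based pool with the `spawn` and `wait_all` operations named in the plan. It is an assumption-laden illustration, not the contents of `thread_pool.rs`: the field layout, the `Job` alias, and the `Condvar`-based barrier are all hypothetical choices.

```rust
use std::sync::{mpsc, Arc, Condvar, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Minimal fixed-size pool: workers pull jobs from one shared channel.
pub struct ThreadPool {
    sender: Option<mpsc::Sender<Job>>,
    workers: Vec<thread::JoinHandle<()>>,
    /// Number of queued-or-running jobs, plus a condvar used by `wait_all`.
    pending: Arc<(Mutex<usize>, Condvar)>,
}

impl ThreadPool {
    pub fn new(size: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let pending = Arc::new((Mutex::new(0usize), Condvar::new()));
        let workers = (0..size.max(1))
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                let pending = Arc::clone(&pending);
                thread::spawn(move || loop {
                    // The lock is held only while receiving, not while the job runs.
                    let job = receiver.lock().unwrap().recv();
                    match job {
                        Ok(job) => {
                            job();
                            let (count, cv) = &*pending;
                            *count.lock().unwrap() -= 1;
                            cv.notify_all();
                        }
                        // Channel closed: the pool is being dropped.
                        Err(_) => break,
                    }
                })
            })
            .collect();
        ThreadPool { sender: Some(sender), workers, pending }
    }

    /// Queue a closure for execution on some worker.
    pub fn spawn<F: FnOnce() + Send + 'static>(&self, f: F) {
        *self.pending.0.lock().unwrap() += 1;
        self.sender.as_ref().unwrap().send(Box::new(f)).unwrap();
    }

    /// Block until every job queued so far has finished (a barrier).
    pub fn wait_all(&self) {
        let (count, cv) = &*self.pending;
        let mut n = count.lock().unwrap();
        while *n > 0 {
            n = cv.wait(n).unwrap();
        }
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Dropping the sender closes the channel, which stops the workers.
        drop(self.sender.take());
        for w in self.workers.drain(..) {
            let _ = w.join();
        }
    }
}

fn main() {
    let pool = ThreadPool::new(4);
    let total = Arc::new(Mutex::new(0u32));
    for i in 1..=10 {
        let total = Arc::clone(&total);
        pool.spawn(move || {
            *total.lock().unwrap() += i;
        });
    }
    pool.wait_all();
    println!("sum = {}", *total.lock().unwrap());
}
```

With a pool of size 1 this degrades to sequential execution on a single worker, which matches the single-threaded mode the commit describes. One limitation of the sketch: a job that panics never decrements the counter, so `wait_all` would block; a production pool would need to handle that case.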
