Skip to content

Commit abbfdfb

Browse files
committed
Implement Barrier
1 parent d2cba5a commit abbfdfb

File tree

10 files changed

+525
-3
lines changed

10 files changed

+525
-3
lines changed

README.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,39 @@ spawn(move(mutex, cv), async (mutex, cv) => {
282282
});
283283
```
284284

285+
### 5\. Barrier
286+
287+
A `Barrier` acts as a synchronization checkpoint. It blocks all participating threads until a specific number (`N`) of threads have reached the barrier. Once the last thread arrives, all threads are released simultaneously.
288+
289+
Barriers are **cyclic**, meaning they automatically reset and can be reused immediately for subsequent phases of work (e.g., in iterative algorithms like simulations).
290+
291+
**Leader Election:** The `wait()` method returns an object `{ isLeader: boolean }`. Exactly **one** thread is guaranteed to be the leader per cycle. This is useful for performing single-threaded setup or cleanup tasks between parallel phases.
292+
293+
```typescript
294+
import { spawn, move, Barrier } from "multithreading";
295+
296+
const N = 4;
297+
const barrier = new Barrier(N);
298+
299+
for (let i = 0; i < N; i++) {
300+
spawn(move(barrier), async (b) => {
301+
// Phase 1
302+
console.log("Working on Phase 1...");
303+
304+
// Wait for all 4 threads to reach this point
305+
const res = await b.wait();
306+
307+
// Only one thread runs this code
308+
if (res.isLeader) {
309+
console.log("All threads finished Phase 1. preparing Phase 2...");
310+
}
311+
312+
// Phase 2 (All threads start this simultaneously)
313+
console.log("Working on Phase 2...");
314+
});
315+
}
316+
```
317+
285318
-----
286319

287320
## Channels (MPMC)
@@ -440,6 +473,9 @@ Content-Security-Policy: default-src 'self'; worker-src 'self' blob:; script-src
440473
* `notifyOne()`: Wake one waiting thread.
441474
* `notifyAll()`: Wake all waiting threads.
442475
* `blockingWait(guard)`: Blocking wait (Halts Worker).
476+
* **`Barrier`**:
477+
* `wait()`: Async wait (Recommended). Returns `Promise<{ isLeader: boolean }>`.
478+
* `blockingWait()`: Blocking wait (Halts Worker). Returns `{ isLeader: boolean }`.
443479

444480
-----
445481

src/lib/check_move_args.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
import { Condvar, Mutex, Receiver, RwLock, Semaphore, Sender } from "./lib.ts";
1+
import {
2+
Barrier,
3+
Condvar,
4+
Mutex,
5+
Receiver,
6+
RwLock,
7+
Semaphore,
8+
Sender,
9+
} from "./lib.ts";
210
import { toSerialized } from "./shared.ts";
311

412
export function checkMoveArgs(args: any[]) {
@@ -9,7 +17,8 @@ export function checkMoveArgs(args: any[]) {
917
arg.buffer instanceof SharedArrayBuffer;
1018
const isThreadSafe = arg instanceof Mutex || arg instanceof Condvar ||
1119
arg instanceof RwLock || arg instanceof Sender ||
12-
arg instanceof Receiver || arg instanceof Semaphore;
20+
arg instanceof Receiver || arg instanceof Semaphore ||
21+
arg instanceof Barrier;
1322
const isLibrarySAB = !isThreadSafe &&
1423
typeof arg[toSerialized] !== "undefined";
1524

src/lib/sync/barrier.ts

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
import {
2+
register,
3+
Serializable,
4+
toDeserialized,
5+
toSerialized,
6+
} from "../shared.ts";
7+
8+
const IDX_LOCK = 0;
9+
const IDX_CAP = 1; // Capacity (N)
10+
const IDX_COUNT = 2; // Current count (Starts at 0, goes to N, or N down to 0)
11+
const IDX_GEN = 3; // Generation ID
12+
13+
const META_SIZE = 4;
14+
15+
const LOCK_UNLOCKED = 0;
16+
const LOCK_LOCKED = 1;
17+
18+
export interface BarrierWaitResult {
19+
isLeader: boolean;
20+
}
21+
22+
export class Barrier extends Serializable {
23+
static {
24+
register(8, this);
25+
}
26+
27+
readonly #state: Int32Array<SharedArrayBuffer>;
28+
29+
constructor(n?: number, _buffer?: SharedArrayBuffer) {
30+
super();
31+
if (_buffer) {
32+
this.#state = new Int32Array(_buffer);
33+
} else {
34+
if (n === undefined) {
35+
throw new Error("Barrier capacity must be provided");
36+
}
37+
this.#state = new Int32Array(
38+
new SharedArrayBuffer(META_SIZE * Int32Array.BYTES_PER_ELEMENT),
39+
);
40+
this.#state[IDX_LOCK] = LOCK_UNLOCKED;
41+
this.#state[IDX_CAP] = n;
42+
this.#state[IDX_COUNT] = n; // We count down from N to 0
43+
this.#state[IDX_GEN] = 0;
44+
}
45+
}
46+
47+
/**
48+
* Internal Spin/Wait Lock for protecting state updates.
49+
* Updates are very fast, so we use a simple atomic lock mechanism.
50+
*/
51+
#lock() {
52+
while (
53+
Atomics.compareExchange(
54+
this.#state,
55+
IDX_LOCK,
56+
LOCK_UNLOCKED,
57+
LOCK_LOCKED,
58+
) !== LOCK_UNLOCKED
59+
) {
60+
Atomics.wait(this.#state, IDX_LOCK, LOCK_LOCKED);
61+
}
62+
}
63+
64+
#unlock() {
65+
if (
66+
Atomics.compareExchange(
67+
this.#state,
68+
IDX_LOCK,
69+
LOCK_LOCKED,
70+
LOCK_UNLOCKED,
71+
) !== LOCK_LOCKED
72+
) {
73+
throw new Error("Barrier lock state corrupted");
74+
}
75+
Atomics.notify(this.#state, IDX_LOCK, 1);
76+
}
77+
78+
/**
79+
* Async version of the internal lock for Main Thread compatibility.
80+
*/
81+
async #lockAsync() {
82+
while (
83+
Atomics.compareExchange(
84+
this.#state,
85+
IDX_LOCK,
86+
LOCK_UNLOCKED,
87+
LOCK_LOCKED,
88+
) !== LOCK_UNLOCKED
89+
) {
90+
const res = Atomics.waitAsync(this.#state, IDX_LOCK, LOCK_LOCKED);
91+
if (res.async) {
92+
await res.value;
93+
}
94+
}
95+
}
96+
97+
/**
98+
* Blocks the current thread until all participating threads have reached the barrier.
99+
*/
100+
blockingWait(): BarrierWaitResult {
101+
this.#lock();
102+
const localGen = Atomics.load(this.#state, IDX_GEN);
103+
const count = Atomics.load(this.#state, IDX_COUNT) - 1;
104+
105+
if (count === 0) {
106+
// We are the leader (the last one to arrive)
107+
Atomics.store(this.#state, IDX_COUNT, Atomics.load(this.#state, IDX_CAP));
108+
Atomics.add(this.#state, IDX_GEN, 1);
109+
this.#unlock();
110+
111+
// Wake everyone up. They are waiting on IDX_GEN changing.
112+
Atomics.notify(this.#state, IDX_GEN, Infinity);
113+
return { isLeader: true };
114+
} else {
115+
// We are a follower
116+
Atomics.store(this.#state, IDX_COUNT, count);
117+
this.#unlock();
118+
119+
// Wait until the generation changes
120+
while (Atomics.load(this.#state, IDX_GEN) === localGen) {
121+
Atomics.wait(this.#state, IDX_GEN, localGen);
122+
}
123+
return { isLeader: false };
124+
}
125+
}
126+
127+
/**
128+
* Asynchronously waits until all participating threads have reached the barrier.
129+
*/
130+
async wait(): Promise<BarrierWaitResult> {
131+
await this.#lockAsync();
132+
const localGen = Atomics.load(this.#state, IDX_GEN);
133+
const count = Atomics.load(this.#state, IDX_COUNT) - 1;
134+
135+
if (count === 0) {
136+
// We are the leader
137+
Atomics.store(this.#state, IDX_COUNT, Atomics.load(this.#state, IDX_CAP));
138+
Atomics.add(this.#state, IDX_GEN, 1);
139+
this.#unlock();
140+
141+
Atomics.notify(this.#state, IDX_GEN, Infinity);
142+
return { isLeader: true };
143+
} else {
144+
// We are a follower
145+
Atomics.store(this.#state, IDX_COUNT, count);
146+
this.#unlock();
147+
148+
// Wait until the generation changes
149+
while (Atomics.load(this.#state, IDX_GEN) === localGen) {
150+
const res = Atomics.waitAsync(this.#state, IDX_GEN, localGen);
151+
if (res.async) {
152+
await res.value;
153+
}
154+
}
155+
return { isLeader: false };
156+
}
157+
}
158+
159+
[toSerialized]() {
160+
return {
161+
value: this.#state.buffer,
162+
transfer: [],
163+
};
164+
}
165+
166+
static override [toDeserialized](
167+
buffer: ReturnType<Barrier[typeof toSerialized]>["value"],
168+
) {
169+
return new Barrier(undefined, buffer);
170+
}
171+
}

src/lib/sync/condvar.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
import type { SharedMemoryView } from "../types.ts";
99

1010
const IDX_NOTIFY_SEQ = 0;
11+
1112
const SEQ_INCREMENT = 1;
1213
const NOTIFY_ONE = 1;
1314
const NOTIFY_ALL = Infinity;

src/lib/sync/mod.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ export * from "./mutex.ts";
33
export * from "./rwlock.ts";
44
export * from "./semaphore.ts";
55
export * from "./mpmc.ts";
6+
export * from "./barrier.ts";

src/lib/sync/mpmc.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const IDX_CLOSED = 2;
1616
const IDX_CAP = 3;
1717
const IDX_TX_COUNT = 4;
1818
const IDX_RX_COUNT = 5;
19+
1920
const META_SIZE = 6;
2021

2122
const OPEN = 0;

src/lib/sync/mutex.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export const INTERNAL_MUTEX_CONTROLLER = Symbol(
1313
);
1414

1515
const IDX_LOCK_STATE = 0;
16+
1617
const LOCKED = 1;
1718
const UNLOCKED = 0;
1819

src/lib/sync/rwlock.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ export interface RwLockController {
1717
}
1818

1919
const IDX_LOCK_STATE = 0;
20+
2021
const UNLOCKED = 0;
2122
const WRITE_LOCKED = -1;
2223
const READ_ONE = 1;

src/lib/sync/semaphore.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ export interface SemaphoreController {
1616
const IDX_PERMITS = 0;
1717
const IDX_WAITERS = 1;
1818

19+
const META_SIZE = 2;
20+
1921
export class SemaphoreGuard implements Disposable {
2022
readonly #amount: number;
2123
readonly #controller: SemaphoreController;
@@ -60,7 +62,7 @@ export class Semaphore extends Serializable {
6062
this.#state = new Int32Array(_buffer);
6163
} else {
6264
this.#state = new Int32Array(
63-
new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * 2),
65+
new SharedArrayBuffer(META_SIZE * Int32Array.BYTES_PER_ELEMENT),
6466
);
6567
this.#state[IDX_PERMITS] = initialCount;
6668
this.#state[IDX_WAITERS] = 0;

0 commit comments

Comments
 (0)