Skip to content

Commit d2cba5a

Browse files
committed
Refactor sync primitives
1 parent 1d346bb commit d2cba5a

19 files changed

+422
-408
lines changed

src/lib/json_buffer.ts

Lines changed: 215 additions & 215 deletions
Large diffs are not rendered by default.

src/lib/shared.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,10 @@ export function serialize(arg: any): {
5353
}
5454

5555
// Library Object (Instance of Serializable)
56-
if (typeof arg === "object" && typeof arg[toSerialized] === "function") {
56+
if (
57+
typeof arg === "object" && arg !== null &&
58+
typeof arg[toSerialized] === "function"
59+
) {
5760
const { value, transfer, typeId } = arg[toSerialized]();
5861
const Ctor = arg.constructor as SerializableConstructor;
5962

src/lib/sync/condvar.ts

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ import {
77
} from "../shared.ts";
88
import type { SharedMemoryView } from "../types.ts";
99

10+
const IDX_NOTIFY_SEQ = 0;
11+
const SEQ_INCREMENT = 1;
12+
const NOTIFY_ONE = 1;
13+
const NOTIFY_ALL = Infinity;
14+
1015
export class Condvar extends Serializable {
1116
static {
1217
register(1, this);
@@ -16,37 +21,35 @@ export class Condvar extends Serializable {
1621

1722
constructor(_buffer?: SharedArrayBuffer) {
1823
super();
19-
if (_buffer) {
20-
this.#atomic = new Int32Array(_buffer);
21-
} else {
22-
this.#atomic = new Int32Array(new SharedArrayBuffer(4));
23-
}
24+
this.#atomic = new Int32Array(
25+
_buffer ?? new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT),
26+
);
2427
}
2528

2629
blockingWait<T extends SharedMemoryView | void>(guard: MutexGuard<T>): void {
2730
const controller = guard[INTERNAL_MUTEX_CONTROLLER];
28-
const seq = Atomics.load(this.#atomic, 0);
31+
const seq = Atomics.load(this.#atomic, IDX_NOTIFY_SEQ);
2932

3033
controller.unlock();
3134

32-
Atomics.wait(this.#atomic, 0, seq);
35+
Atomics.wait(this.#atomic, IDX_NOTIFY_SEQ, seq);
3336

3437
controller.blockingLock();
3538
}
3639

3740
/**
3841
* Asynchronously waits for a notification. Safe to use on the Main Thread.
39-
* * @param guard The MutexGuard protecting the shared state.
42+
* @param guard The MutexGuard protecting the shared state.
4043
*/
4144
async wait<T extends SharedMemoryView | void>(
4245
guard: MutexGuard<T>,
4346
): Promise<void> {
4447
const controller = guard[INTERNAL_MUTEX_CONTROLLER];
45-
const seq = Atomics.load(this.#atomic, 0);
48+
const seq = Atomics.load(this.#atomic, IDX_NOTIFY_SEQ);
4649

4750
controller.unlock();
4851

49-
const result = Atomics.waitAsync(this.#atomic, 0, seq);
52+
const result = Atomics.waitAsync(this.#atomic, IDX_NOTIFY_SEQ, seq);
5053
if (result.async) {
5154
await result.value;
5255
}
@@ -58,16 +61,16 @@ export class Condvar extends Serializable {
5861
* Wakes up one blocked thread waiting on this Condvar.
5962
*/
6063
notifyOne() {
61-
Atomics.add(this.#atomic, 0, 1);
62-
Atomics.notify(this.#atomic, 0, 1);
64+
Atomics.add(this.#atomic, IDX_NOTIFY_SEQ, SEQ_INCREMENT);
65+
Atomics.notify(this.#atomic, IDX_NOTIFY_SEQ, NOTIFY_ONE);
6366
}
6467

6568
/**
6669
* Wakes up all blocked threads waiting on this Condvar.
6770
*/
6871
notifyAll() {
69-
Atomics.add(this.#atomic, 0, 1);
70-
Atomics.notify(this.#atomic, 0, Infinity);
72+
Atomics.add(this.#atomic, IDX_NOTIFY_SEQ, SEQ_INCREMENT);
73+
Atomics.notify(this.#atomic, IDX_NOTIFY_SEQ, NOTIFY_ALL);
7174
}
7275

7376
[toSerialized]() {

src/lib/sync/mpmc.ts

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,17 @@ import type { Result } from "../types.ts";
1010
import { INTERNAL_SEMAPHORE_CONTROLLER, Semaphore } from "./semaphore.ts";
1111
import { SharedJsonBuffer } from "../json_buffer.ts";
1212

13-
const HEAD_IDX = 0;
14-
const TAIL_IDX = 1;
15-
const CLOSED_IDX = 2;
16-
const CAP_IDX = 3;
17-
const TX_COUNT_IDX = 4;
18-
const RX_COUNT_IDX = 5;
13+
const IDX_HEAD = 0;
14+
const IDX_TAIL = 1;
15+
const IDX_CLOSED = 2;
16+
const IDX_CAP = 3;
17+
const IDX_TX_COUNT = 4;
18+
const IDX_RX_COUNT = 5;
1919
const META_SIZE = 6;
2020

21+
const OPEN = 0;
22+
const CLOSED = 1;
23+
2124
const ERR_DISPOSED_SENDER = new Error("Sender is disposed");
2225
const ERR_DISPOSED_RECEIVER = new Error("Receiver disposed");
2326
const ERR_CLOSED = new Error("Channel closed");
@@ -41,29 +44,29 @@ class ChannelInternals<T> extends Serializable {
4144
}
4245

4346
write(value: T): void {
44-
const tail = this.state[TAIL_IDX]!;
47+
const tail = this.state[IDX_TAIL]!;
4548
this.items[tail] = value;
46-
this.state[TAIL_IDX] = (tail + 1) % this.state[CAP_IDX]!;
49+
this.state[IDX_TAIL] = (tail + 1) % this.state[IDX_CAP]!;
4750
}
4851

4952
read(): T | null {
50-
const head = this.state[HEAD_IDX]!;
53+
const head = this.state[IDX_HEAD]!;
5154
const val = this.items[head] as T;
5255

5356
// Optimistic read check
5457
if (val === null) return null;
5558

5659
this.items[head] = null;
57-
this.state[HEAD_IDX] = (head + 1) % this.state[CAP_IDX]!;
60+
this.state[IDX_HEAD] = (head + 1) % this.state[IDX_CAP]!;
5861
return val;
5962
}
6063

6164
isClosed(): boolean {
62-
return Atomics.load(this.state, CLOSED_IDX) === 1;
65+
return Atomics.load(this.state, IDX_CLOSED) === CLOSED;
6366
}
6467

6568
hasReceivers(): boolean {
66-
return Atomics.load(this.state, RX_COUNT_IDX) > 0;
69+
return Atomics.load(this.state, IDX_RX_COUNT) > 0;
6770
}
6871

6972
[toSerialized]() {
@@ -140,7 +143,7 @@ export class Sender<T> extends ChannelHandle<T> {
140143

141144
clone(): Sender<T> {
142145
if (this.disposed) throw new Error("Cannot clone disposed Sender");
143-
Atomics.add(this.internals.state, TX_COUNT_IDX, 1);
146+
Atomics.add(this.internals.state, IDX_TX_COUNT, 1);
144147
return new Sender(this.internals);
145148
}
146149

@@ -224,7 +227,7 @@ export class Sender<T> extends ChannelHandle<T> {
224227

225228
try {
226229
if (this.internals.isClosed()) return;
227-
Atomics.store(state, CLOSED_IDX, 1);
230+
Atomics.store(state, IDX_CLOSED, CLOSED);
228231
// Wake up everyone
229232
slotsAvailable[INTERNAL_SEMAPHORE_CONTROLLER].release(1_073_741_823);
230233
itemsAvailable[INTERNAL_SEMAPHORE_CONTROLLER].release(1_073_741_823);
@@ -236,7 +239,7 @@ export class Sender<T> extends ChannelHandle<T> {
236239

237240
[Symbol.dispose]() {
238241
if (this.disposed) return;
239-
const prevCount = Atomics.sub(this.internals.state, TX_COUNT_IDX, 1);
242+
const prevCount = Atomics.sub(this.internals.state, IDX_TX_COUNT, 1);
240243
if (prevCount === 1) this.close();
241244
this.disposed = true;
242245
}
@@ -259,7 +262,7 @@ export class Receiver<T> extends ChannelHandle<T> {
259262

260263
clone(): Receiver<T> {
261264
if (this.disposed) throw new Error("Cannot clone disposed Receiver");
262-
Atomics.add(this.internals.state, RX_COUNT_IDX, 1);
265+
Atomics.add(this.internals.state, IDX_RX_COUNT, 1);
263266
return new Receiver(this.internals);
264267
}
265268

@@ -348,7 +351,7 @@ export class Receiver<T> extends ChannelHandle<T> {
348351
[Symbol.dispose]() {
349352
if (this.disposed) return;
350353
this.disposed = true;
351-
const prevCount = Atomics.sub(this.internals.state, RX_COUNT_IDX, 1);
354+
const prevCount = Atomics.sub(this.internals.state, IDX_RX_COUNT, 1);
352355
if (prevCount === 1) this.close();
353356
}
354357

@@ -360,15 +363,16 @@ export class Receiver<T> extends ChannelHandle<T> {
360363
}
361364

362365
export function channel<T>(capacity: number = 32): [Sender<T>, Receiver<T>] {
363-
const stateSab = new SharedArrayBuffer(META_SIZE * 4);
364-
const state = new Int32Array(stateSab);
365-
366-
state[CAP_IDX] = capacity;
367-
state[HEAD_IDX] = 0;
368-
state[TAIL_IDX] = 0;
369-
state[CLOSED_IDX] = 0;
370-
state[TX_COUNT_IDX] = 1;
371-
state[RX_COUNT_IDX] = 1;
366+
const state = new Int32Array(
367+
new SharedArrayBuffer(META_SIZE * Int32Array.BYTES_PER_ELEMENT),
368+
);
369+
370+
state[IDX_CAP] = capacity;
371+
state[IDX_HEAD] = 0;
372+
state[IDX_TAIL] = 0;
373+
state[IDX_CLOSED] = OPEN;
374+
state[IDX_TX_COUNT] = 1;
375+
state[IDX_RX_COUNT] = 1;
372376

373377
const initialData = new Array<T | null>(capacity).fill(null);
374378
const items = new SharedJsonBuffer(initialData);

src/lib/sync/mutex.ts

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ export const INTERNAL_MUTEX_CONTROLLER = Symbol(
1212
"Thread.InternalMutexController",
1313
);
1414

15-
const INDEX = 0;
15+
const IDX_LOCK_STATE = 0;
1616
const LOCKED = 1;
1717
const UNLOCKED = 0;
1818

@@ -25,7 +25,7 @@ export interface MutexController {
2525
export class MutexGuard<T extends SharedMemoryView | void>
2626
implements Disposable {
2727
#data: T;
28-
#mutex: MutexController;
28+
readonly #mutex: MutexController;
2929
#released = false;
3030

3131
constructor(data: T, mutex: MutexController) {
@@ -65,14 +65,14 @@ export class Mutex<T extends SharedMemoryView | void = void>
6565

6666
#data: T;
6767
#lockState: Int32Array<SharedArrayBuffer>;
68-
#controller: MutexController;
68+
readonly #controller: MutexController;
6969

70-
constructor(data?: T, _existingLockBuffer?: SharedArrayBuffer) {
70+
constructor(data?: T, _lockBuffer?: SharedArrayBuffer) {
7171
super();
7272
this.#data = data as T;
73-
this.#lockState = _existingLockBuffer
74-
? new Int32Array(_existingLockBuffer)
75-
: new Int32Array(new SharedArrayBuffer(4));
73+
this.#lockState = _lockBuffer
74+
? new Int32Array(_lockBuffer)
75+
: new Int32Array(new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT));
7676
this.#controller = {
7777
unlock: () => this.#unlock(),
7878
blockingLock: () => this.#performBlockingLock(),
@@ -82,19 +82,29 @@ export class Mutex<T extends SharedMemoryView | void = void>
8282

8383
#tryLock(): boolean {
8484
return (
85-
Atomics.compareExchange(this.#lockState, INDEX, UNLOCKED, LOCKED) ===
85+
Atomics.compareExchange(
86+
this.#lockState,
87+
IDX_LOCK_STATE,
88+
UNLOCKED,
89+
LOCKED,
90+
) ===
8691
UNLOCKED
8792
);
8893
}
8994

9095
#unlock(): void {
9196
if (
92-
Atomics.compareExchange(this.#lockState, INDEX, LOCKED, UNLOCKED) !==
97+
Atomics.compareExchange(
98+
this.#lockState,
99+
IDX_LOCK_STATE,
100+
LOCKED,
101+
UNLOCKED,
102+
) !==
93103
LOCKED
94104
) {
95105
throw new Error("Mutex was not locked or locked by another thread");
96106
}
97-
Atomics.notify(this.#lockState, INDEX, 1);
107+
Atomics.notify(this.#lockState, IDX_LOCK_STATE, 1);
98108
}
99109

100110
/**
@@ -104,7 +114,7 @@ export class Mutex<T extends SharedMemoryView | void = void>
104114
#performBlockingLock(): void {
105115
while (true) {
106116
if (this.#tryLock()) return;
107-
Atomics.wait(this.#lockState, INDEX, LOCKED);
117+
Atomics.wait(this.#lockState, IDX_LOCK_STATE, LOCKED);
108118
}
109119
}
110120

@@ -115,7 +125,7 @@ export class Mutex<T extends SharedMemoryView | void = void>
115125
async #performAsyncLock(): Promise<void> {
116126
while (true) {
117127
if (this.#tryLock()) return;
118-
const result = Atomics.waitAsync(this.#lockState, INDEX, LOCKED);
128+
const result = Atomics.waitAsync(this.#lockState, IDX_LOCK_STATE, LOCKED);
119129
if (result.async) {
120130
await result.value;
121131
}

0 commit comments

Comments
 (0)