Skip to content

Commit 4d02534

Browse files
kitsonkvwkd
andcommitted
fix: batched_atomic: properly handle multiple mutations
Fixes: #17 Co-Authored-By: vwkd <[email protected]>
1 parent 35e6810 commit 4d02534

File tree

3 files changed

+124
-71
lines changed

3 files changed

+124
-71
lines changed

batched_atomic.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,3 +155,22 @@ Deno.test({
155155
return teardown();
156156
},
157157
});
158+
159+
Deno.test({
160+
name: "batched atomic mutate handles many items",
161+
async fn() {
162+
const kv = await setup();
163+
const items = Array
164+
.from({ length: 1000 }, (_, index) => index)
165+
.map((i) => ({
166+
key: [i],
167+
value: "x".repeat(2000),
168+
type: "set" as const,
169+
}));
170+
const op = batchedAtomic(kv);
171+
op.mutate(...items);
172+
const actual = await op.commit();
173+
assert(actual.every(({ ok }) => ok));
174+
return teardown();
175+
},
176+
});

batched_atomic.ts

Lines changed: 102 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -256,35 +256,41 @@ export class BatchedAtomicOperation {
256256
if (!this.#queue.length) {
257257
return Promise.resolve([]);
258258
}
259+
259260
const results: Promise<Deno.KvCommitResult | Deno.KvCommitError>[] = [];
260261
let checks = 0;
261262
let mutations = 0;
262263
let payloadBytes = 0;
263264
let keyBytes = 0;
264265
let operation = this.#kv.atomic();
265266
let hasCheck = false;
267+
266268
while (this.#queue.length) {
267269
const [method, args] = this.#queue.shift()!;
268-
if (method === "setBlob") {
269-
const queue = this.#queue;
270-
this.#queue = [];
271-
const [key, value, options] = args as [
272-
Deno.KvKey,
273-
ArrayBufferLike | ReadableStream<Uint8Array> | Blob,
274-
{ expireIn?: number } | undefined,
275-
];
276-
const items = await keys(this.#kv, { prefix: [...key, BLOB_KEY] });
277-
await setBlob(this, key, value, items.length, options);
278-
this.#queue.push(...queue);
279-
} else if (method === "deleteBlob") {
280-
const [key] = args as [Deno.KvKey];
281-
const items = await keys(this.#kv, { prefix: [...key, BLOB_KEY] });
282-
for (const item of items) {
283-
this.#queue.unshift(["delete", [item]]);
270+
switch (method) {
271+
case "setBlob": {
272+
const queue = this.#queue;
273+
this.#queue = [];
274+
const [key, value, options] = args as [
275+
Deno.KvKey,
276+
ArrayBufferLike | ReadableStream<Uint8Array> | Blob,
277+
{ expireIn?: number } | undefined,
278+
];
279+
const items = await keys(this.#kv, { prefix: [...key, BLOB_KEY] });
280+
await setBlob(this, key, value, items.length, options);
281+
this.#queue.push(...queue);
282+
continue;
283+
}
284+
case "deleteBlob": {
285+
const [key] = args as [Deno.KvKey];
286+
const items = await keys(this.#kv, { prefix: [...key, BLOB_KEY] });
287+
for (const item of items) {
288+
this.#queue.unshift(["delete", [item]]);
289+
}
290+
this.#queue.unshift(["delete", [[...key, BLOB_META_KEY]]]);
291+
continue;
284292
}
285-
this.#queue.unshift(["delete", [[...key, BLOB_META_KEY]]]);
286-
} else {
287-
if (method === "check") {
293+
case "check": {
288294
checks++;
289295
for (const { key } of args as Deno.AtomicCheck[]) {
290296
const len = key.reduce(
@@ -295,65 +301,90 @@ export class BatchedAtomicOperation {
295301
keyBytes += len;
296302
}
297303
hasCheck = true;
298-
} else {
299-
mutations++;
300-
if (method === "mutate") {
301-
for (const mutation of args as Deno.KvMutation[]) {
302-
const keyLen = estimateSize(mutation.key);
303-
payloadBytes += keyLen;
304-
keyBytes += keyLen;
305-
if (mutation.type === "set") {
306-
payloadBytes += estimateSize(mutation.value);
307-
} else if (mutation.type !== "delete") {
308-
payloadBytes += 8;
309-
}
310-
}
311-
} else if (method === "max" || method === "min" || method === "sum") {
312-
const [key] = args as [Deno.KvKey];
313-
const keyLen = estimateSize(key);
314-
keyBytes += keyLen;
315-
payloadBytes += keyLen + 8;
316-
} else if (method === "set") {
317-
const [key, value] = args as [Deno.KvKey, unknown];
318-
const keyLen = estimateSize(key);
319-
keyBytes += keyLen;
320-
payloadBytes += keyLen + estimateSize(value);
321-
} else if (method === "delete") {
322-
const [key] = args as [Deno.KvKey];
323-
const keyLen = estimateSize(key);
324-
keyBytes += keyLen;
325-
payloadBytes += keyLen;
326-
} else if (method === "enqueue") {
327-
const [value] = args as [unknown];
328-
payloadBytes += estimateSize(value);
329-
}
304+
break;
330305
}
331-
if (
332-
checks > this.#maxChecks || mutations > this.#maxMutations ||
333-
payloadBytes > this.#maxBytes || keyBytes > this.#maxKeyBytes
334-
) {
335-
const rp = operation.commit();
336-
results.push(rp);
337-
if (hasCheck) {
338-
const result = await rp;
339-
if (!result.ok) {
340-
break;
306+
case "mutate": {
307+
// if there are multiple mutations, we need to batch them
308+
if (args.length > 1) {
309+
this.#queue.unshift(
310+
...args.map((mutation) =>
311+
[method, [mutation]] as [AtomicOperationKeys, unknown[]]
312+
),
313+
);
314+
continue;
315+
} else {
316+
mutations++;
317+
const [mutation] = args;
318+
const keyLen = estimateSize(mutation.key);
319+
payloadBytes += keyLen;
320+
keyBytes += keyLen;
321+
if (mutation.type === "set") {
322+
payloadBytes += estimateSize(mutation.value);
323+
} else if (mutation.type !== "delete") {
324+
payloadBytes += 8;
341325
}
342326
}
343-
checks = 0;
344-
mutations = 0;
345-
payloadBytes = 0;
346-
keyBytes = 0;
347-
operation = this.#kv.atomic();
327+
break;
328+
}
329+
case "max":
330+
case "min":
331+
case "sum": {
332+
mutations++;
333+
const [key] = args as [Deno.KvKey];
334+
const keyLen = estimateSize(key);
335+
keyBytes += keyLen;
336+
payloadBytes += keyLen + 8;
337+
break;
338+
}
339+
case "set": {
340+
mutations++;
341+
const [key, value] = args as [Deno.KvKey, unknown];
342+
const keyLen = estimateSize(key);
343+
keyBytes += keyLen;
344+
payloadBytes += keyLen + estimateSize(value);
345+
break;
348346
}
349-
// deno-lint-ignore no-explicit-any
350-
(operation[method] as any).apply(operation, args);
351-
if (!this.#queue.length) {
352-
const rp = operation.commit();
353-
results.push(rp);
347+
case "delete": {
348+
mutations++;
349+
const [key] = args as [Deno.KvKey];
350+
const keyLen = estimateSize(key);
351+
keyBytes += keyLen;
352+
payloadBytes += keyLen;
353+
break;
354+
}
355+
case "enqueue": {
356+
mutations++;
357+
const [value] = args as [unknown];
358+
payloadBytes += estimateSize(value);
359+
break;
360+
}
361+
}
362+
// conditionally commit a batch of operations if we have reached
363+
// the maximum number of checks, mutations, or payload size
364+
// or key size
365+
if (
366+
checks > this.#maxChecks || mutations > this.#maxMutations ||
367+
payloadBytes > this.#maxBytes || keyBytes > this.#maxKeyBytes
368+
) {
369+
const rp = operation.commit();
370+
results.push(rp);
371+
if (hasCheck) {
372+
const result = await rp;
373+
if (!result.ok) {
374+
continue;
375+
}
354376
}
377+
checks = 0;
378+
mutations = 0;
379+
payloadBytes = 0;
380+
keyBytes = 0;
381+
operation = this.#kv.atomic();
355382
}
383+
// deno-lint-ignore no-explicit-any
384+
(operation[method] as any).apply(operation, args);
356385
}
386+
const rp = operation.commit();
387+
results.push(rp);
357388
return Promise.all(results);
358389
}
359390
}

deno.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
"test:ci": "deno test --allow-read --allow-write --unstable-kv --junit-path=junit.xml --coverage=./cov --parallel"
2828
},
2929
"lock": false,
30+
"format": {
31+
"lineWidth": 120
32+
},
3033
"imports": {
3134
"@deno/kv-utils": "jsr:@deno/kv-utils@^0.1.3",
3235
"@std/assert": "jsr:@std/assert@~1",

0 commit comments

Comments
 (0)