Skip to content

Commit 31c9c89

Browse files
refactor(prisma-adapter): separate event/snapshot/view tables (#81)
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent dbf8856 commit 31c9c89

3 files changed

Lines changed: 312 additions & 78 deletions

File tree

.changeset/dark-experts-allow.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"ventyd": patch
3+
---
4+
5+
fix: add snapshot feature in prisma adapter

src/adapter/prisma.ts

Lines changed: 88 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import type {
55
InferStateFromSchema,
66
} from "../types";
77

8-
type BaseEventTypeRow = Omit<
8+
type BaseEventRow = Omit<
99
BaseEventType,
1010
"eventCreatedAt" | "body" | "version"
1111
> & {
@@ -14,15 +14,30 @@ type BaseEventTypeRow = Omit<
1414
version?: number;
1515
};
1616

17+
type BaseSnapshotRow = {
18+
entityId: string;
19+
entityName: string;
20+
state: unknown;
21+
version: number;
22+
};
23+
1724
export type VentydAdapterOptions<
1825
$$Schema,
19-
$$PrismaEventRow extends BaseEventTypeRow,
26+
$$PrismaEventRow extends BaseEventRow,
2027
$$PrismaEventRowInput,
28+
$$PrismaSnapshotRow extends BaseSnapshotRow,
2129
$$PrismaSnapshotRowInput,
30+
$$PrismaViewRowInput,
2231
> = {
2332
prisma: {
2433
$transaction: (commands: unknown[]) => Promise<unknown[]>;
2534
};
35+
/**
36+
* Save a snapshot every N events. When the latest event version is a
37+
* multiple of this value, a snapshot is written inside the same transaction
38+
* as the events and view update.
39+
*/
40+
snapshotEvery?: number;
2641
tables: {
2742
event: {
2843
findMany(args?: {
@@ -35,30 +50,46 @@ export type VentydAdapterOptions<
3550
createMany(args?: { data: $$PrismaEventRowInput[] }): Promise<unknown>;
3651
};
3752
snapshot: {
53+
findFirst(args: {
54+
where: { entityId: string; entityName: string };
55+
}): Promise<$$PrismaSnapshotRow | null>;
3856
upsert(args: {
39-
where: { id?: string };
57+
where: { entityId: string };
4058
update: $$PrismaSnapshotRowInput;
4159
create: $$PrismaSnapshotRowInput;
4260
}): Promise<unknown>;
4361
};
62+
view: {
63+
upsert(args: {
64+
where: { entityId: string };
65+
update: $$PrismaViewRowInput;
66+
create: $$PrismaViewRowInput;
67+
}): Promise<unknown>;
68+
};
4469
};
45-
entityToRow(args: {
70+
entityToViewRow(args: {
4671
entityId: string;
72+
entityName: string;
4773
state: InferStateFromSchema<$$Schema>;
48-
}): $$PrismaSnapshotRowInput;
74+
version: number;
75+
}): $$PrismaViewRowInput;
4976
};
5077

5178
export function prismaAdapter<
5279
$$Schema,
53-
$$PrismaEventRow extends BaseEventTypeRow,
80+
$$PrismaEventRow extends BaseEventRow,
5481
$$PrismaEventRowInput,
82+
$$PrismaSnapshotRow extends BaseSnapshotRow,
5583
$$PrismaSnapshotRowInput,
84+
$$PrismaViewRowInput,
5685
>(
5786
options: VentydAdapterOptions<
5887
$$Schema,
5988
$$PrismaEventRow,
6089
$$PrismaEventRowInput,
61-
$$PrismaSnapshotRowInput
90+
$$PrismaSnapshotRow,
91+
$$PrismaSnapshotRowInput,
92+
$$PrismaViewRowInput
6293
>,
6394
): Adapter<$$Schema> {
6495
function eventToRow(_event: InferEventFromSchema<$$Schema>) {
@@ -105,22 +136,64 @@ export function prismaAdapter<
105136

106137
return rows.map(rowToEvent);
107138
},
108-
async commitEvents({ events, entityId, state }) {
109-
const snapshotRow = options.entityToRow({
139+
140+
async getSnapshot({ entityName, entityId }) {
141+
const row = await options.tables.snapshot.findFirst({
142+
where: { entityId, entityName },
143+
});
144+
145+
if (!row) return null;
146+
147+
return {
148+
state: row.state as InferStateFromSchema<$$Schema>,
149+
version: row.version,
150+
};
151+
},
152+
153+
async commitEvents({ events, entityId, entityName, state }) {
154+
const lastEvent = events[events.length - 1] as BaseEventType | undefined;
155+
const lastVersion = lastEvent?.version ?? 0;
156+
157+
const viewRow = options.entityToViewRow({
110158
entityId,
159+
entityName,
111160
state,
161+
version: lastVersion,
112162
});
113163

114-
await options.prisma.$transaction([
164+
const transactionCommands: unknown[] = [
115165
options.tables.event.createMany({
116166
data: events.map(eventToRow),
117167
}),
118-
options.tables.snapshot.upsert({
119-
where: { id: entityId },
120-
update: snapshotRow,
121-
create: snapshotRow,
168+
options.tables.view.upsert({
169+
where: { entityId },
170+
update: viewRow,
171+
create: viewRow,
122172
}),
123-
]);
173+
];
174+
175+
const shouldSnapshot =
176+
options.snapshotEvery != null &&
177+
lastVersion > 0 &&
178+
lastVersion % options.snapshotEvery === 0;
179+
180+
if (shouldSnapshot) {
181+
const snapshotRow = {
182+
entityId,
183+
entityName,
184+
state,
185+
version: lastVersion,
186+
} as $$PrismaSnapshotRowInput;
187+
transactionCommands.push(
188+
options.tables.snapshot.upsert({
189+
where: { entityId },
190+
update: snapshotRow,
191+
create: snapshotRow,
192+
}),
193+
);
194+
}
195+
196+
await options.prisma.$transaction(transactionCommands);
124197
},
125198
};
126199
}

0 commit comments

Comments
 (0)