Skip to content

Commit 4e9a603

Browse files
committed
Simplify state management in durable object
1 parent d03ddd7 commit 4e9a603

File tree

1 file changed

+70
-152
lines changed

1 file changed

+70
-152
lines changed
Lines changed: 70 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import {
2+
Edge,
3+
Node,
24
WorkflowErrorMessage,
35
WorkflowInitMessage,
46
WorkflowMessage,
@@ -13,208 +15,135 @@ import { createDatabase } from "../db/index";
1315
import { getWorkflow, updateWorkflow } from "../db/queries";
1416

1517
export class DurableWorkflow extends DurableObject<Bindings> {
16-
private static readonly PERSIST_DELAY_MS = 60_000;
17-
private static readonly STORAGE_ID = "current";
18-
19-
private sql: SqlStorage;
20-
private workflowId: string = "";
21-
private organizationId: string = "";
22-
private loaded: boolean = false;
23-
private dirty: boolean = false;
18+
private state: WorkflowState | null = null;
19+
private workflowId: string | null = null;
20+
private organizationId: string | null = null;
2421

2522
constructor(ctx: DurableObjectState, env: Bindings) {
2623
super(ctx, env);
27-
this.sql = this.ctx.storage.sql;
28-
this.initDatabase();
29-
}
30-
31-
private initDatabase() {
32-
this.sql.exec(`
33-
CREATE TABLE IF NOT EXISTS workflow (
34-
id TEXT PRIMARY KEY DEFAULT 'current',
35-
workflow_id TEXT NOT NULL,
36-
organization_id TEXT NOT NULL,
37-
name TEXT NOT NULL,
38-
handle TEXT NOT NULL,
39-
type TEXT NOT NULL,
40-
nodes TEXT NOT NULL,
41-
edges TEXT NOT NULL,
42-
timestamp INTEGER NOT NULL
43-
)
44-
`);
4524
}
4625

4726
/**
48-
* Load workflow from database into durable storage if not already loaded
27+
* Load workflow from D1 database into memory if not already loaded
4928
*/
5029
private async ensureLoaded(
5130
workflowId: string,
5231
organizationId: string
5332
): Promise<void> {
54-
if (this.loaded) {
33+
if (this.state !== null) {
5534
return;
5635
}
5736

5837
this.workflowId = workflowId;
5938
this.organizationId = organizationId;
6039

61-
try {
62-
// First check if SQLite storage already has data (from previous session)
63-
// This is important because SQLite storage persists across cold starts
64-
const existing = this.sql
65-
.exec(
66-
"SELECT workflow_id FROM workflow WHERE id = ?",
67-
DurableWorkflow.STORAGE_ID
68-
)
69-
.toArray();
70-
71-
if (existing.length > 0) {
72-
console.log(`Using existing SQLite storage for workflow ${workflowId}`);
73-
this.loaded = true;
74-
return;
75-
}
76-
77-
// SQLite storage is empty, load from D1 database
78-
console.log(`Loading workflow ${workflowId} from D1 database`);
79-
const db = createDatabase(this.env.DB);
80-
const workflow = await getWorkflow(db, workflowId, organizationId);
81-
82-
const { name, handle, type, nodes, edges, timestamp } =
83-
this.extractWorkflowData(workflow);
84-
85-
this.sql.exec(
86-
`INSERT INTO workflow (id, workflow_id, organization_id, name, handle, type, nodes, edges, timestamp)
87-
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
88-
DurableWorkflow.STORAGE_ID,
89-
workflowId,
90-
organizationId,
91-
name,
92-
handle,
93-
type,
94-
nodes,
95-
edges,
96-
timestamp
97-
);
40+
console.log(`Loading workflow ${workflowId} from D1 database`);
41+
const db = createDatabase(this.env.DB);
42+
const workflow = await getWorkflow(db, workflowId, organizationId);
9843

99-
this.dirty = false;
100-
} catch (error) {
101-
console.error("Error loading workflow:", error);
102-
}
44+
const { name, handle, type, nodes, edges, timestamp } =
45+
this.extractWorkflowData(workflow);
10346

104-
this.loaded = true;
47+
this.state = {
48+
id: workflowId,
49+
name,
50+
handle,
51+
type,
52+
nodes,
53+
edges,
54+
timestamp,
55+
};
10556
}
10657

10758
private extractWorkflowData(workflow: any) {
10859
return {
10960
name: workflow?.name || "New Workflow",
11061
handle: workflow?.handle || this.workflowId,
11162
type: (workflow?.data?.type || "manual") as WorkflowType,
112-
nodes: JSON.stringify(workflow?.data?.nodes || []),
113-
edges: JSON.stringify(workflow?.data?.edges || []),
63+
nodes: workflow?.data?.nodes || [],
64+
edges: workflow?.data?.edges || [],
11465
timestamp: workflow?.updatedAt?.getTime() || Date.now(),
11566
};
11667
}
11768

11869
/**
119-
* Get state from durable storage
70+
* Get state from memory
12071
*/
12172
async getState(): Promise<WorkflowState> {
122-
const row = this.sql
123-
.exec(
124-
`SELECT workflow_id as id, name, handle, type, nodes, edges, timestamp
125-
FROM workflow WHERE id = ?`,
126-
DurableWorkflow.STORAGE_ID
127-
)
128-
.toArray()[0];
129-
130-
if (!row) {
73+
if (!this.state) {
13174
throw new Error("State missing; call ensureLoaded first");
13275
}
13376

134-
return {
135-
id: row.id as string,
136-
name: row.name as string,
137-
handle: row.handle as string,
138-
type: row.type as WorkflowType,
139-
nodes: JSON.parse(row.nodes as string),
140-
edges: JSON.parse(row.edges as string),
141-
timestamp: row.timestamp as number,
142-
};
77+
return this.state;
14378
}
14479

145-
async updateState(nodes: unknown[], edges: unknown[]): Promise<void> {
80+
async updateState(nodes: Node[], edges: Edge[]): Promise<void> {
81+
if (!this.state) {
82+
throw new Error("State missing; call ensureLoaded first");
83+
}
84+
14685
const timestamp = Date.now();
147-
this.sql.exec(
148-
`UPDATE workflow SET nodes = ?, edges = ?, timestamp = ? WHERE id = ?`,
149-
JSON.stringify(nodes),
150-
JSON.stringify(edges),
86+
this.state = {
87+
...this.state,
88+
nodes,
89+
edges,
15190
timestamp,
152-
DurableWorkflow.STORAGE_ID
153-
);
154-
155-
this.dirty = true;
91+
};
15692

157-
// Schedule an alarm to persist to database if not already scheduled
158-
const currentAlarm = await this.ctx.storage.getAlarm();
159-
if (currentAlarm === null) {
160-
await this.ctx.storage.setAlarm(
161-
Date.now() + DurableWorkflow.PERSIST_DELAY_MS
162-
);
163-
}
93+
// Persist immediately to D1
94+
await this.persistToDatabase();
16495
}
16596

16697
/**
167-
* Persist durable state back to database
98+
* Persist state back to D1 database
16899
*/
169100
private async persistToDatabase(): Promise<void> {
170-
if (!this.dirty || !this.workflowId || !this.organizationId) {
101+
if (!this.state || !this.workflowId || !this.organizationId) {
171102
return;
172103
}
173104

174105
try {
175-
const state = await this.getState();
176106
const db = createDatabase(this.env.DB);
177107
await updateWorkflow(db, this.workflowId, this.organizationId, {
178-
name: state.name,
108+
name: this.state.name,
179109
data: {
180-
id: state.id,
181-
name: state.name,
182-
handle: state.handle,
183-
type: state.type,
184-
nodes: state.nodes,
185-
edges: state.edges,
110+
id: this.state.id,
111+
name: this.state.name,
112+
handle: this.state.handle,
113+
type: this.state.type,
114+
nodes: this.state.nodes,
115+
edges: this.state.edges,
186116
},
187117
});
188118

189-
this.dirty = false;
190-
console.log(`Persisted workflow ${this.workflowId} to database`);
119+
console.log(`Persisted workflow ${this.workflowId} to D1 database`);
191120
} catch (error) {
192121
console.error("Error persisting workflow to database:", error);
193122
}
194123
}
195124

196-
/**
197-
* Alarm handler - called when alarm fires
198-
*/
199-
async alarm(): Promise<void> {
200-
console.log("Alarm fired for DurableWorkflow");
201-
await this.persistToDatabase();
202-
203-
// If still dirty (updates happened during persist), schedule another alarm
204-
if (this.dirty) {
205-
await this.ctx.storage.setAlarm(
206-
Date.now() + DurableWorkflow.PERSIST_DELAY_MS
207-
);
208-
}
209-
}
210-
211125
async fetch(request: Request): Promise<Response> {
212126
const url = new URL(request.url);
213127
const workflowId = url.searchParams.get("workflowId") || "";
214128
const organizationId = url.searchParams.get("organizationId") || "";
215129

216-
if (workflowId && organizationId) {
130+
if (!workflowId || !organizationId) {
131+
return new Response("Missing workflowId or organizationId", {
132+
status: 400,
133+
});
134+
}
135+
136+
try {
217137
await this.ensureLoaded(workflowId, organizationId);
138+
} catch (error) {
139+
console.error("Error loading workflow:", error);
140+
return Response.json(
141+
{
142+
error: "Failed to load workflow",
143+
details: error instanceof Error ? error.message : "Unknown error",
144+
},
145+
{ status: 404 }
146+
);
218147
}
219148

220149
if (url.pathname === "/state" && request.method === "GET") {
@@ -267,19 +196,7 @@ export class DurableWorkflow extends DurableObject<Bindings> {
267196
}
268197

269198
private async getInitialState(): Promise<WorkflowState> {
270-
try {
271-
return await this.getState();
272-
} catch {
273-
return {
274-
id: this.workflowId,
275-
name: "New Workflow",
276-
handle: this.workflowId,
277-
type: "manual",
278-
nodes: [],
279-
edges: [],
280-
timestamp: Date.now(),
281-
};
282-
}
199+
return await this.getState();
283200
}
284201

285202
async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer) {
@@ -311,11 +228,12 @@ export class DurableWorkflow extends DurableObject<Bindings> {
311228
}
312229

313230
async webSocketClose(
314-
ws: WebSocket,
315-
code: number,
316-
reason: string,
231+
_ws: WebSocket,
232+
_code: number,
233+
_reason: string,
317234
_wasClean: boolean
318235
) {
319-
ws.close(code, reason);
236+
// Persist any pending changes to D1 before closing
237+
await this.persistToDatabase();
320238
}
321239
}

0 commit comments

Comments
 (0)