Skip to content

Commit afe83c2

Browse files
committed
Add new methods to Schedule API
1 parent d286628 commit afe83c2

File tree

2 files changed

+223
-11
lines changed

2 files changed

+223
-11
lines changed

docs/concepts/schedule.mdx

+122-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ Parameters:
2121
- `fn` (string): The name of the action to be executed.
2222
- `...args` (unknown[]): Additional arguments to pass to the function.
2323

24+
Returns:
25+
- `Promise<string>`: A unique identifier for the scheduled event.
26+
2427
### `c.schedule.at(timestamp, fn, ...args)`
2528

2629
Schedules a function to be executed at a specific timestamp. This function persists across actor restarts, upgrades, or crashes.
@@ -31,6 +34,41 @@ Parameters:
3134
- `fn` (string): The name of the action to be executed.
3235
- `...args` (unknown[]): Additional arguments to pass to the function.
3336

37+
Returns:
38+
- `Promise<string>`: A unique identifier for the scheduled event.
39+
40+
### `c.schedule.list()`
41+
42+
Lists all scheduled events for the actor.
43+
44+
Returns:
45+
- `Promise<Alarm[]>`: An array of scheduled alarms, where each alarm has the following properties:
46+
- `id` (string): The unique identifier of the alarm
47+
- `createdAt` (number): The timestamp when the alarm was created
48+
- `triggersAt` (number): The timestamp when the alarm will trigger
49+
- `fn` (string): The name of the action to be executed
50+
- `args` (unknown[]): The arguments to pass to the function
51+
52+
### `c.schedule.get(alarmId)`
53+
54+
Gets details about a specific scheduled event.
55+
56+
Parameters:
57+
- `alarmId` (string): The unique identifier of the alarm to retrieve.
58+
59+
Returns:
60+
- `Promise<Alarm | undefined>`: The alarm details if found, undefined otherwise.
61+
62+
### `c.schedule.cancel(alarmId)`
63+
64+
Cancels a scheduled event.
65+
66+
Parameters:
67+
- `alarmId` (string): The unique identifier of the alarm to cancel.
68+
69+
Returns:
70+
- `Promise<void>`
71+
3472
## Scheduling Private Actions
3573

3674
Currently, scheduling can only trigger public actions. If the scheduled action is private, it needs to be secured with something like a token.
@@ -46,7 +84,7 @@ const reminderService = actor({
4684
},
4785

4886
actions: {
49-
setReminder: (c, userId, message, delayMs) => {
87+
setReminder: async (c, userId, message, delayMs) => {
5088
const reminderId = crypto.randomUUID();
5189

5290
// Store the reminder in state
@@ -57,7 +95,89 @@ const reminderService = actor({
5795
};
5896

5997
// Schedule the sendReminder action to run after the delay
60-
c.after(delayMs, "sendReminder", reminderId);
98+
// Store the alarmId for potential cancellation
99+
const alarmId = await c.schedule.after(delayMs, "sendReminder", reminderId);
100+
101+
return { reminderId, alarmId };
102+
},
103+
104+
cancelReminder: async (c, reminderId) => {
105+
const reminder = c.state.reminders[reminderId];
106+
if (!reminder) return { success: false };
107+
108+
// Cancel the scheduled reminder
109+
await c.schedule.cancel(reminder.alarmId);
110+
111+
// Clean up the reminder
112+
delete c.state.reminders[reminderId];
113+
114+
return { success: true };
115+
},
116+
117+
sendReminder: (c, reminderId) => {
118+
const reminder = c.state.reminders[reminderId];
119+
if (!reminder) return;
120+
121+
// Find the user's connection if they're online
122+
const userConn = c.conns.find(
123+
conn => conn.state.userId === reminder.userId
124+
);
125+
126+
if (userConn) {
127+
// Send the reminder to the user
128+
userConn.send("reminder", {
129+
message: reminder.message,
130+
scheduledAt: reminder.scheduledFor
131+
});
132+
} else {
133+
// If user is offline, store reminder for later delivery
134+
// ...
135+
}
136+
137+
// Clean up the processed reminder
138+
delete c.state.reminders[reminderId];
139+
}
140+
}
141+
});
142+
```
143+
144+
## Testing Schedules
145+
146+
```typescript
147+
import { actor } from "actor-core";
148+
149+
const reminderService = actor({
150+
state: {
151+
reminders: {}
152+
},
153+
154+
actions: {
155+
setReminder: async (c, userId, message, delayMs) => {
156+
const reminderId = crypto.randomUUID();
157+
158+
// Store the reminder in state
159+
c.state.reminders[reminderId] = {
160+
userId,
161+
message,
162+
scheduledFor: Date.now() + delayMs
163+
};
164+
165+
// Schedule the sendReminder action to run after the delay
166+
// Store the alarmId for potential cancellation
167+
const alarmId = await c.schedule.after(delayMs, "sendReminder", reminderId);
168+
169+
return { reminderId, alarmId };
170+
},
171+
172+
cancelReminder: async (c, reminderId) => {
173+
const reminder = c.state.reminders[reminderId];
174+
if (!reminder) return { success: false };
175+
176+
// Cancel the scheduled reminder
177+
await c.schedule.cancel(reminder.alarmId);
178+
179+
// Clean up the reminder
180+
delete c.state.reminders[reminderId];
61181

62182
return { reminderId };
63183
},

packages/actor-core/src/actor/schedule.ts

+101-9
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@ interface ScheduleIndexEvent {
1515

1616
interface ScheduleEvent {
1717
timestamp: number;
18+
createdAt: number;
19+
fn: string;
20+
args: unknown[];
21+
}
22+
23+
export interface Alarm {
24+
id: string;
25+
createdAt: number;
26+
triggersAt: number;
1827
fn: string;
1928
args: unknown[];
2029
}
@@ -28,27 +37,109 @@ export class Schedule {
2837
this.#driver = driver;
2938
}
3039

31-
async after(duration: number, fn: string, ...args: unknown[]) {
32-
await this.#scheduleEvent(Date.now() + duration, fn, args);
40+
async after(duration: number, fn: string, ...args: unknown[]): Promise<string> {
41+
return this.#scheduleEvent(Date.now() + duration, fn, args);
42+
}
43+
44+
async at(timestamp: number, fn: string, ...args: unknown[]): Promise<string> {
45+
return this.#scheduleEvent(timestamp, fn, args);
46+
}
47+
48+
async get(alarmId: string): Promise<Alarm | null> {
49+
const event = await this.#driver.kvGet(
50+
this.#actor.id,
51+
KEYS.SCHEDULE.event(alarmId)
52+
) as ScheduleEvent | undefined;
53+
54+
if (!event) return null;
55+
56+
return {
57+
id: alarmId,
58+
createdAt: event.createdAt,
59+
triggersAt: event.timestamp,
60+
fn: event.fn,
61+
args: event.args
62+
};
63+
}
64+
65+
async list(): Promise<readonly Alarm[]> {
66+
const schedule: ScheduleState = ((await this.#driver.kvGet(
67+
this.#actor.id,
68+
KEYS.SCHEDULE.SCHEDULE
69+
)) as ScheduleState) ?? { events: [] };
70+
71+
const alarms: Alarm[] = [];
72+
for (const event of schedule.events) {
73+
const scheduleEvent = await this.#driver.kvGet(
74+
this.#actor.id,
75+
KEYS.SCHEDULE.event(event.eventId)
76+
) as ScheduleEvent;
77+
78+
if (scheduleEvent) {
79+
alarms.push(Object.freeze({
80+
id: event.eventId,
81+
createdAt: scheduleEvent.createdAt,
82+
triggersAt: scheduleEvent.timestamp,
83+
fn: scheduleEvent.fn,
84+
args: scheduleEvent.args
85+
}));
86+
}
87+
}
88+
89+
return Object.freeze(alarms);
3390
}
3491

35-
async at(timestamp: number, fn: string, ...args: unknown[]) {
36-
await this.#scheduleEvent(timestamp, fn, args);
92+
async cancel(alarmId: string): Promise<void> {
93+
// Get the schedule index
94+
const schedule: ScheduleState = ((await this.#driver.kvGet(
95+
this.#actor.id,
96+
KEYS.SCHEDULE.SCHEDULE
97+
)) as ScheduleState) ?? { events: [] };
98+
99+
// Find and remove the event from the index
100+
const eventIndex = schedule.events.findIndex(x => x.eventId === alarmId);
101+
if (eventIndex === -1) return;
102+
103+
const [removedEvent] = schedule.events.splice(eventIndex, 1);
104+
105+
// Delete the event data
106+
await this.#driver.kvDelete(
107+
this.#actor.id,
108+
KEYS.SCHEDULE.event(alarmId)
109+
);
110+
111+
// Update the schedule index
112+
await this.#driver.kvPut(
113+
this.#actor.id,
114+
KEYS.SCHEDULE.SCHEDULE,
115+
schedule
116+
);
117+
118+
// If we removed the first event (next to execute), update the alarm
119+
if (eventIndex === 0) {
120+
if (schedule.events.length > 0) {
121+
// Set alarm to next event
122+
await this.#driver.setAlarm(this.#actor, schedule.events[0].timestamp);
123+
} else {
124+
// No more events, delete the alarm
125+
await this.#driver.deleteAlarm(this.#actor);
126+
}
127+
}
37128
}
38129

39130
async #scheduleEvent(
40131
timestamp: number,
41132
fn: string,
42133
args: unknown[],
43-
): Promise<void> {
134+
): Promise<string> {
44135
// Save event
45136
const eventId = crypto.randomUUID();
46137
await this.#driver.kvPut(
47138
this.#actor.id,
48-
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
49-
KEYS.SCHEDULE.event(eventId) as any,
139+
KEYS.SCHEDULE.event(eventId),
50140
{
51141
timestamp,
142+
createdAt: Date.now(),
52143
fn,
53144
args,
54145
},
@@ -58,8 +149,7 @@ export class Schedule {
58149
// Read index
59150
const schedule: ScheduleState = ((await this.#driver.kvGet(
60151
this.#actor.id,
61-
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
62-
KEYS.SCHEDULE.SCHEDULE as any,
152+
KEYS.SCHEDULE.SCHEDULE,
63153
)) as ScheduleState) ?? {
64154
events: [],
65155
};
@@ -84,6 +174,8 @@ export class Schedule {
84174
if (insertIndex === 0 || schedule.events.length === 1) {
85175
await this.#driver.setAlarm(this.#actor, newEvent.timestamp);
86176
}
177+
178+
return eventId;
87179
}
88180

89181
async __onAlarm() {

0 commit comments

Comments
 (0)