Skip to content

Implement schedule.get, schedule.list, and schedule.cancel #823

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 122 additions & 2 deletions docs/concepts/schedule.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ Parameters:
- `fn` (string): The name of the action to be executed.
- `...args` (unknown[]): Additional arguments to pass to the function.

Returns:
- `Promise<string>`: A unique identifier for the scheduled event.

### `c.schedule.at(timestamp, fn, ...args)`

Schedules a function to be executed at a specific timestamp. This function persists across actor restarts, upgrades, or crashes.
Expand All @@ -31,6 +34,41 @@ Parameters:
- `fn` (string): The name of the action to be executed.
- `...args` (unknown[]): Additional arguments to pass to the function.

Returns:
- `Promise<string>`: A unique identifier for the scheduled event.

### `c.schedule.list()`

Lists all scheduled events for the actor.

Returns:
- `Promise<Alarm[]>`: An array of scheduled alarms, where each alarm has the following properties:
- `id` (string): The unique identifier of the alarm
- `createdAt` (number): The timestamp when the alarm was created
- `triggersAt` (number): The timestamp when the alarm will trigger
- `fn` (string): The name of the action to be executed
- `args` (unknown[]): The arguments to pass to the function

### `c.schedule.get(alarmId)`

Gets details about a specific scheduled event.

Parameters:
- `alarmId` (string): The unique identifier of the alarm to retrieve.

Returns:
- `Promise<Alarm | undefined>`: The alarm details if found, undefined otherwise.

### `c.schedule.cancel(alarmId)`

Cancels a scheduled event.

Parameters:
- `alarmId` (string): The unique identifier of the alarm to cancel.

Returns:
- `Promise<void>`

## Scheduling Private Actions

Currently, scheduling can only trigger public actions. If the scheduled action is private, it needs to be secured with something like a token.
Expand All @@ -46,7 +84,7 @@ const reminderService = actor({
},

actions: {
setReminder: (c, userId, message, delayMs) => {
setReminder: async (c, userId, message, delayMs) => {
const reminderId = crypto.randomUUID();

// Store the reminder in state
Expand All @@ -57,7 +95,89 @@ const reminderService = actor({
};

// Schedule the sendReminder action to run after the delay
c.after(delayMs, "sendReminder", reminderId);
// Store the alarmId for potential cancellation
const alarmId = await c.schedule.after(delayMs, "sendReminder", reminderId);

return { reminderId, alarmId };
},

cancelReminder: async (c, reminderId) => {
const reminder = c.state.reminders[reminderId];
if (!reminder) return { success: false };

// Cancel the scheduled reminder
await c.schedule.cancel(reminder.alarmId);

// Clean up the reminder
delete c.state.reminders[reminderId];

return { success: true };
},

sendReminder: (c, reminderId) => {
const reminder = c.state.reminders[reminderId];
if (!reminder) return;

// Find the user's connection if they're online
const userConn = c.conns.find(
conn => conn.state.userId === reminder.userId
);

if (userConn) {
// Send the reminder to the user
userConn.send("reminder", {
message: reminder.message,
scheduledAt: reminder.scheduledFor
});
} else {
// If user is offline, store reminder for later delivery
// ...
}

// Clean up the processed reminder
delete c.state.reminders[reminderId];
}
}
});
```

## Testing Schedules

```typescript
import { actor } from "actor-core";

const reminderService = actor({
state: {
reminders: {}
},

actions: {
setReminder: async (c, userId, message, delayMs) => {
const reminderId = crypto.randomUUID();

// Store the reminder in state
c.state.reminders[reminderId] = {
userId,
message,
scheduledFor: Date.now() + delayMs
};

// Schedule the sendReminder action to run after the delay
// Store the alarmId for potential cancellation
const alarmId = await c.schedule.after(delayMs, "sendReminder", reminderId);

return { reminderId, alarmId };
},

cancelReminder: async (c, reminderId) => {
const reminder = c.state.reminders[reminderId];
if (!reminder) return { success: false };

// Cancel the scheduled reminder
await c.schedule.cancel(reminder.alarmId);

// Clean up the reminder
delete c.state.reminders[reminderId];

return { reminderId };
},
Expand Down
2 changes: 2 additions & 0 deletions packages/actor-core/src/actor/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export interface ActorDriver {

// Schedule
setAlarm(actor: AnyActorInstance, timestamp: number): Promise<void>;
getAlarm(actor: AnyActorInstance): Promise<number | null>;
deleteAlarm(actor: AnyActorInstance): Promise<void>;

// TODO:
//destroy(): Promise<void>;
Expand Down
26 changes: 25 additions & 1 deletion packages/actor-core/src/actor/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ export class ActorInstance<S, CP, CS, V> {
timestamp: number,
fn: string,
args: unknown[],
): Promise<void> {
): Promise<string> {
// Build event
const eventId = crypto.randomUUID();
const newEvent: PersistedScheduleEvents = {
Expand Down Expand Up @@ -244,6 +244,30 @@ export class ActorInstance<S, CP, CS, V> {
this.actorContext.log.info("setting alarm", { timestamp });
await this.#actorDriver.setAlarm(this, newEvent.t);
}
return eventId;
}

async getEvent(eventId: string) {
return this.#persist.e.find((x) => x.e === eventId);
}

async cancelEvent(eventId: string) {
const index = this.#persist.e.findIndex((x) => x.e === eventId);
if (index !== -1) {
if (index === 0 && this.#persist.e.length === 1) {
this.actorContext.log.info("clearing alarm");
await this.#actorDriver.deleteAlarm(this);
} else if (index === 0) {
this.actorContext.log.info("setting next alarm", { timestamp: this.#persist.e[1].t });
await this.#actorDriver.setAlarm(this, this.#persist.e[1].t);
}
this.#persist.e.splice(index, 1);
this.actorContext.log.info("cancelled event", { eventId });
}
}

async listEvents() {
return this.#persist.e;
}

async onAlarm() {
Expand Down
26 changes: 23 additions & 3 deletions packages/actor-core/src/actor/schedule.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,37 @@
import type { AnyActorInstance } from "./instance";

export interface ScheduledEvent {
id: string;
createdAt: number;
triggersAt: number;
fn: string;
args: unknown[];
}

export class Schedule {
#actor: AnyActorInstance;

constructor(actor: AnyActorInstance) {
this.#actor = actor;
}

async after(duration: number, fn: string, ...args: unknown[]) {
await this.#actor.scheduleEvent(Date.now() + duration, fn, args);
async after(duration: number, fn: string, ...args: unknown[]): Promise<string> {
return await this.#actor.scheduleEvent(Date.now() + duration, fn, args);
}

async at(timestamp: number, fn: string, ...args: unknown[]) {
await this.#actor.scheduleEvent(timestamp, fn, args);
return await this.#actor.scheduleEvent(timestamp, fn, args);
}

async get(alarmId: string) {
return this.#actor.getEvent(alarmId);
}

async cancel(eventId: string) {
await this.#actor.cancelEvent(eventId);
}

async list() {
return await this.#actor.listEvents();
}
}
19 changes: 18 additions & 1 deletion packages/actor-core/src/test/driver/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ export interface ActorDriverContext {

export class TestActorDriver implements ActorDriver {
#state: TestGlobalState;
#alarms: Map<string, { timeout: NodeJS.Timeout, timestamp: number }>;

constructor(state: TestGlobalState) {
this.#state = state;
this.#alarms = new Map();
}

getContext(_actorId: string): ActorDriverContext {
Expand All @@ -29,8 +31,23 @@ export class TestActorDriver implements ActorDriver {

async setAlarm(actor: AnyActorInstance, timestamp: number): Promise<void> {
const delay = Math.max(timestamp - Date.now(), 0);
setTimeout(() => {
const timeout = setTimeout(() => {
this.#alarms.delete(actor.id);
actor.onAlarm();
}, delay);
this.#alarms.set(actor.id, { timeout, timestamp });
}

async getAlarm(actor: AnyActorInstance): Promise<number | null> {
const alarm = this.#alarms.get(actor.id);
return alarm ? alarm.timestamp : null;
}

async deleteAlarm(actor: AnyActorInstance): Promise<void> {
const alarm = this.#alarms.get(actor.id);
if (alarm) {
clearTimeout(alarm.timeout);
this.#alarms.delete(actor.id);
}
}
}
23 changes: 20 additions & 3 deletions packages/drivers/file-system/src/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ export type ActorDriverContext = Record<never, never>;
*/
export class FileSystemActorDriver implements ActorDriver {
#state: FileSystemGlobalState;
#alarms: Map<string, { timeout: NodeJS.Timeout, timestamp: number }>;

constructor(state: FileSystemGlobalState) {
this.#state = state;
this.#alarms = new Map();
}

/**
Expand All @@ -36,9 +38,24 @@ export class FileSystemActorDriver implements ActorDriver {
}

async setAlarm(actor: AnyActorInstance, timestamp: number): Promise<void> {
const delay = Math.max(0, timestamp - Date.now());
setTimeout(() => {
const delay = Math.max(timestamp - Date.now(), 0);
const timeout = setTimeout(() => {
this.#alarms.delete(actor.id);
actor.onAlarm();
}, delay);
this.#alarms.set(actor.id, { timeout, timestamp });
}
}

async getAlarm(actor: AnyActorInstance): Promise<number | null> {
const alarm = this.#alarms.get(actor.id);
return alarm ? alarm.timestamp : null;
}

async deleteAlarm(actor: AnyActorInstance): Promise<void> {
const alarm = this.#alarms.get(actor.id);
if (alarm) {
clearTimeout(alarm.timeout);
this.#alarms.delete(actor.id);
}
}
}
19 changes: 18 additions & 1 deletion packages/drivers/memory/src/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ export type ActorDriverContext = Record<never, never>;

export class MemoryActorDriver implements ActorDriver {
#state: MemoryGlobalState;
#alarms: Map<string, { timeout: NodeJS.Timeout, timestamp: number }>;

constructor(state: MemoryGlobalState) {
this.#state = state;
this.#alarms = new Map();
}

getContext(_actorId: string): ActorDriverContext {
Expand All @@ -24,8 +26,23 @@ export class MemoryActorDriver implements ActorDriver {

async setAlarm(actor: AnyActorInstance, timestamp: number): Promise<void> {
const delay = Math.max(timestamp - Date.now(), 0);
setTimeout(() => {
const timeout = setTimeout(() => {
this.#alarms.delete(actor.id);
actor.onAlarm();
}, delay);
this.#alarms.set(actor.id, { timeout, timestamp });
}

async getAlarm(actor: AnyActorInstance): Promise<number | null> {
const alarm = this.#alarms.get(actor.id);
return alarm ? alarm.timestamp : null;
}

async deleteAlarm(actor: AnyActorInstance): Promise<void> {
const alarm = this.#alarms.get(actor.id);
if (alarm) {
clearTimeout(alarm.timeout);
this.#alarms.delete(actor.id);
}
}
}
19 changes: 18 additions & 1 deletion packages/drivers/redis/src/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ export interface ActorDriverContext {

export class RedisActorDriver implements ActorDriver {
#redis: Redis;
#alarms: Map<string, { timeout: NodeJS.Timeout, timestamp: number }>;

constructor(redis: Redis) {
this.#redis = redis;
this.#alarms = new Map();
}

getContext(_actorId: string): ActorDriverContext {
Expand All @@ -32,8 +34,23 @@ export class RedisActorDriver implements ActorDriver {

async setAlarm(actor: AnyActorInstance, timestamp: number): Promise<void> {
const delay = Math.max(timestamp - Date.now(), 0);
setTimeout(() => {
const timeout = setTimeout(() => {
this.#alarms.delete(actor.id);
actor.onAlarm();
}, delay);
this.#alarms.set(actor.id, { timeout, timestamp });
}

async getAlarm(actor: AnyActorInstance): Promise<number | null> {
const alarm = this.#alarms.get(actor.id);
return alarm ? alarm.timestamp : null;
}

async deleteAlarm(actor: AnyActorInstance): Promise<void> {
const alarm = this.#alarms.get(actor.id);
if (alarm) {
clearTimeout(alarm.timeout);
this.#alarms.delete(actor.id);
}
}
}
Loading
Loading