Custom background tasks: core defineTask / enqueueTask API #797

@2chanhaeng

First sub-issue of #206.

Background

#206 generalizes the fire-and-then-process pattern that Context.sendActivity already uses — enqueue work, return the HTTP response immediately, let a separate worker process the payload later — to arbitrary application-defined long-running jobs (digest mailers, image transcoding, periodic cleanup, …). Tasks run on a separate background worker, not in-request; in-process execution via setTimeout/queueMicrotask is explicitly out of scope.

This sub-issue delivers the end-to-end core: define a task, enqueue it from a Context, run it on a worker, retry on failure. It deliberately excludes deduplication (#798 sub-issue) and task-specific telemetry attributes (#799 sub-issue) so the first PR stays small and reviewable.

Public API

Registration

defineTask is declared on a new TaskRegistry<TContextData> interface; Federatable<TContextData> extends it, so both Federation and FederationBuilder inherit the method (they already extend Federatable).

import type { StandardSchemaV1 } from "@standard-schema/spec";

interface TaskRegistry<TContextData> {
  defineTask<TSchema extends StandardSchemaV1>(
    name: string,
    options: TaskDefinitionOptions<TContextData, TSchema>,
  ): TaskDefinition<TContextData, StandardSchemaV1.InferOutput<TSchema>>;
}

interface Federatable<TContextData> extends TaskRegistry<TContextData> { /* … */ }

The returned TaskDefinition<TContextData, TData> carries name and schema plus a phantom TContextData marker — no runtime methods. Dispatch lives on Context (below).
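
A minimal in-memory sketch of the registration contract, for discussion only. `SketchSchema`, `SketchTaskDefinition`, and `SketchRegistry` are hypothetical stand-ins, not Fedify or `@standard-schema/spec` types; the handler and the phantom TContextData marker are omitted. It only illustrates that a definition is plain data and that a duplicate name throws, as the acceptance criteria require.

```typescript
// Hypothetical stand-in for StandardSchemaV1: only the output type matters here.
interface SketchSchema<TOutput> {
  readonly parse: (value: unknown) => TOutput;
}

// Plain data: a name and a schema, no runtime methods.
interface SketchTaskDefinition<TData> {
  readonly name: string;
  readonly schema: SketchSchema<TData>;
}

// Hypothetical registry; the real seam is TaskRegistry<TContextData>.
class SketchRegistry {
  private readonly tasks = new Map<string, SketchTaskDefinition<unknown>>();

  defineTask<TData>(
    name: string,
    options: { readonly schema: SketchSchema<TData> },
  ): SketchTaskDefinition<TData> {
    if (this.tasks.has(name)) {
      throw new TypeError(`Task already defined: ${name}`);
    }
    const definition: SketchTaskDefinition<TData> = {
      name,
      schema: options.schema,
    };
    this.tasks.set(name, definition);
    return definition;
  }

  // Name-based lookup, as a worker would do with TaskMessage.taskName.
  lookup(name: string): SketchTaskDefinition<unknown> | undefined {
    return this.tasks.get(name);
  }
}

const registry = new SketchRegistry();
// TData is inferred as string from the schema; no explicit type argument.
const digest = registry.defineTask("digest", {
  schema: { parse: (value) => String(value) },
});
```

Because the definition carries no methods, call sites depend only on the object returned by defineTask, which is what lets dispatch live on Context.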

Payload handling

schema is required. Every task declares a Standard Schema object (Zod, Valibot, ArkType, …) and TData is inferred from it via StandardSchemaV1.InferOutput; there is no explicit-TData escape hatch and no unknown fallback. The schema validates the envelope of the payload, and an Activity Vocabulary object carried inside it (Note, Create, …) is described as an opaque instanceof leaf (z.instanceof(Create) / v.instance(Create)), so the schema stays small even when the payload carries ActivityPub objects.

Serialization is owned by Fedify, not the caller. The payload is encoded to a wire string with devalue, whose native custom-type hook bridges each vocabulary object through its JSON-LD document: vocabulary objects keep their state in private fields, so they cannot be serialized structurally and must travel as JSON-LD via the asynchronous toJsonLd/fromJsonLd pair. Encoding and decoding are therefore asynchronous, and there is no caller-facing stringify/parse hook.

interface TaskDefinitionOptions<TContextData, TSchema extends StandardSchemaV1> {
  readonly schema: TSchema;
  readonly handler: (
    ctx: Context<TContextData>,
    data: StandardSchemaV1.InferOutput<TSchema>,
  ) => Promise<void> | void;
  readonly retryPolicy?: RetryPolicy;
  readonly onError?: (
    ctx: Context<TContextData>,
    error: unknown,
    data: StandardSchemaV1.InferOutput<TSchema>,
  ) => Promise<void> | void;
  readonly queue?: MessageQueue;
}
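
To make the required-schema rule concrete, here is a hedged sketch of a hand-written schema and a fail-fast validation step. The `MiniSchema` type is a trimmed local mirror of the Standard Schema v1 `~standard` shape (normally imported from `@standard-schema/spec`); `DigestPayload`, `digestSchema`, and `validateOrThrow` are hypothetical names used only for illustration. A real task would pass a Zod, Valibot, or ArkType schema instead.

```typescript
// Trimmed local mirror of the Standard Schema v1 shape, declared inline
// so the sketch is self-contained.
interface Issue {
  readonly message: string;
}
type Result<T> =
  | { readonly value: T; readonly issues?: undefined }
  | { readonly issues: readonly Issue[] };
interface MiniSchema<T> {
  readonly "~standard": {
    readonly version: 1;
    readonly vendor: string;
    readonly validate: (value: unknown) => Result<T> | Promise<Result<T>>;
  };
}

// Hypothetical payload for a digest mailer task.
interface DigestPayload {
  readonly userId: string;
  readonly since: Date;
}

// Hand-written schema: validates the envelope, with Date as an instanceof leaf.
const digestSchema: MiniSchema<DigestPayload> = {
  "~standard": {
    version: 1,
    vendor: "sketch",
    validate(value) {
      if (typeof value !== "object" || value === null) {
        return { issues: [{ message: "expected an object" }] };
      }
      const v = value as Record<string, unknown>;
      if (typeof v.userId !== "string") {
        return { issues: [{ message: "userId must be a string" }] };
      }
      if (!(v.since instanceof Date)) {
        return { issues: [{ message: "since must be a Date" }] };
      }
      return { value: { userId: v.userId, since: v.since } };
    },
  },
};

// Fail fast: reject before anything is encoded or handed to a queue.
async function validateOrThrow<T>(
  schema: MiniSchema<T>,
  value: unknown,
): Promise<T> {
  const result = await schema["~standard"].validate(value);
  if (result.issues) {
    throw new TypeError(result.issues.map((i) => i.message).join("; "));
  }
  return result.value;
}
```

The same check would run twice in the proposed design: once at enqueue time, and once on the worker after decoding.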

The serializer (devalue) and the required-schema decision were settled on the umbrella issue; raise any reconsideration here rather than re-opening it on #206.

Dispatch (enqueue)

Two new methods on Context<TContextData>, placed alongside sendActivity / routeActivity / forwardActivity:

enqueueTask<TData>(task: TaskDefinition<TContextData, TData>, data: TData, options?: TaskEnqueueOptions): Promise<void>;
enqueueTaskMany<TData>(task: TaskDefinition<TContextData, TData>, payloads: readonly TData[], options?: TaskEnqueueOptions): Promise<void>;

interface TaskEnqueueOptions {
  readonly delay?: Temporal.DurationLike;
  readonly orderingKey?: string;
  // deduplicationKey is added by the deduplication sub-issue (non-breaking).
}

enqueueTask validates data against the task's schema (fail fast), encodes it with devalue, and stores the resulting string in the message before handing it to the queue adapter. enqueueTaskMany resolves the queue once and uses MessageQueue.enqueueMany when available, falling back to parallel single enqueues otherwise. Both go through the private #enqueueTasks helper.
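
The batching fallback described above could look roughly like this. `SketchQueue` is a hypothetical, trimmed stand-in for MessageQueue (options, listen, and delay handling are left out), and `enqueueAll` is an illustrative name, not the private #enqueueTasks helper itself.

```typescript
// Hypothetical, trimmed stand-in for MessageQueue.
interface SketchQueue {
  enqueue(message: string): Promise<void>;
  // Optional: not every backend supports batch enqueue.
  enqueueMany?(messages: readonly string[]): Promise<void>;
}

// Use the batch method when the backend offers one; otherwise fall back
// to parallel single enqueues.
async function enqueueAll(
  queue: SketchQueue,
  messages: readonly string[],
): Promise<void> {
  if (queue.enqueueMany !== undefined) {
    await queue.enqueueMany(messages);
    return;
  }
  await Promise.all(messages.map((message) => queue.enqueue(message)));
}
```

One consequence of the parallel fallback worth noting in review: if a single enqueue rejects, the others may already have been accepted by the backend, so a failed enqueueTaskMany call is not necessarily all-or-nothing.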

Wire format

export interface TaskMessage {
  readonly type: "task";
  readonly id: ReturnType<typeof crypto.randomUUID>;
  readonly baseUrl: string;
  readonly taskName: string;
  readonly data: string;            // devalue-encoded payload, see payload handling
  readonly started: string;
  readonly attempt: number;
  readonly orderingKey?: string;
  readonly traceContext: Readonly<Record<string, string>>;
}
export type Message = FanoutMessage | OutboxMessage | InboxMessage | TaskMessage;

Message is documented as opaque, so the new variant is non-breaking. data is a string (not unknown) because every queue backend serializes the message in its own way (JSON, structured clone, …); a pre-encoded string is the only form that survives all of them unchanged, and it is also what lets a vocabulary object travel as its JSON-LD document.
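
A small illustration of why data is a string. `throughJsonBackend` is a hypothetical stand-in for a queue backend that serializes messages as JSON, and `wire` is a placeholder rather than real devalue output.

```typescript
// A payload as the handler wants to receive it: a real Date, not a string.
const payload = {
  userId: "alice",
  since: new Date("2024-01-01T00:00:00.000Z"),
};

// Stand-in for a JSON-based queue backend: the message is stringified on
// enqueue and parsed again on delivery.
function throughJsonBackend<T>(message: T): T {
  return JSON.parse(JSON.stringify(message)) as T;
}

// Structured payload inside the message: the Date degrades to a string,
// even though the static type still claims Date.
const structured = throughJsonBackend({ type: "task", data: payload });
const degraded = typeof (structured.data.since as unknown) === "string";

// Pre-encoded payload: the backend only ever sees an opaque string.
const wire = "<pre-encoded payload>"; // placeholder for the encoder's output
const preEncoded = throughJsonBackend({ type: "task", data: wire });
const unchanged = preEncoded.data === wire;
```

Here `degraded` and `unchanged` are both true: the backend's own serialization alters the structured payload but leaves the pre-encoded string alone.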

Queue routing

  • New task?: MessageQueue slot in FederationQueueOptions.
  • A task's queue is resolved as: per-task queue (a MessageQueue passed to defineTask) → taskQueue → outboxQueue fallback → throw. There is no per-call queue override on TaskEnqueueOptions; a task's destination is declared once at defineTask.
  • FederationOptions.taskQueueResolution?: "fallback" | "strict" (default "fallback") opts out of the outboxQueue fallback step for deployments that isolate task work.
  • Federation.startQueue(ctxData, { queue: "task" }) starts a dedicated task worker (extend the existing "inbox" | "outbox" | "fanout" selector).
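
The routing precedence in the list above can be sketched as a pure function. `QueueSlots` and `resolveTaskQueue` are hypothetical names, and the reading assumed here is: per-task queue first, then the shared task queue slot, then outboxQueue unless resolution is "strict", else throw.

```typescript
type TaskQueueResolution = "fallback" | "strict";

// Hypothetical view of the three places a task's queue can come from.
interface QueueSlots<Q> {
  readonly perTask?: Q; // queue passed to defineTask
  readonly task?: Q; // shared task slot in FederationQueueOptions
  readonly outbox?: Q; // outboxQueue, used only as a fallback
}

function resolveTaskQueue<Q>(
  slots: QueueSlots<Q>,
  resolution: TaskQueueResolution = "fallback",
): Q {
  if (slots.perTask !== undefined) return slots.perTask;
  if (slots.task !== undefined) return slots.task;
  // "strict" skips this step so task work never lands on the outbox queue.
  if (resolution === "fallback" && slots.outbox !== undefined) {
    return slots.outbox;
  }
  throw new TypeError("No queue is configured for this task");
}
```

Keeping the resolution in one place makes the "strict" option a single skipped branch rather than a separate code path.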

Worker behavior

processQueuedTask gains a "task" branch dispatching to #listenTaskMessage, which:

  • looks up the handler by taskName (missing → log + drop);
  • decodes data with devalue (failure → log + drop, no retry);
  • validates the decoded value against schema (invalid → log + drop, no retry);
  • builds a Context and invokes the handler;
  • on throw, calls onError, then either rethrows (nativeRetrial) or applies retryPolicy (per-task, else FederationOptions.taskRetryPolicy, else createExponentialBackoffPolicy()) and re-enqueues with attempt + 1.
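
The failure branch of the worker can be sketched as a pure decision function. Everything here is hypothetical: `SketchRetryPolicy` uses millisecond numbers instead of the real RetryPolicy type, `sketchBackoff` is a simplified stand-in for createExponentialBackoffPolicy() without jitter, and the "give-up" outcome when the policy declines to retry is an assumption, since the issue text does not spell that case out.

```typescript
// Hypothetical policy shape: delay in ms before the next attempt,
// or null to stop retrying (assumed behavior).
type SketchRetryPolicy = (attempt: number) => number | null;

// Simplified stand-in for an exponential backoff policy, no jitter.
function sketchBackoff(
  initialMs = 1000,
  factor = 2,
  maxAttempts = 5,
): SketchRetryPolicy {
  return (attempt) =>
    attempt >= maxAttempts ? null : initialMs * Math.pow(factor, attempt);
}

type FailureDecision =
  | { readonly kind: "rethrow" } // the queue retries natively
  | { readonly kind: "retry"; readonly attempt: number; readonly delayMs: number }
  | { readonly kind: "give-up" };

// Called after the handler threw and onError has run.
function decideAfterFailure(
  attempt: number,
  nativeRetrial: boolean,
  perTask?: SketchRetryPolicy,
  federationDefault?: SketchRetryPolicy,
): FailureDecision {
  if (nativeRetrial) return { kind: "rethrow" };
  // Precedence: per-task policy, else federation-wide, else built-in backoff.
  const policy = perTask ?? federationDefault ?? sketchBackoff();
  const delayMs = policy(attempt);
  if (delayMs === null) return { kind: "give-up" };
  return { kind: "retry", attempt: attempt + 1, delayMs };
}
```

Separating the decision from the re-enqueue side effect keeps the precedence rules easy to unit test without a queue.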

Forward compatibility (Approach 2)

The TaskRegistry<TContextData> seam exists so Approach 2 (a separated Worker class) lands without call-site churn: Federatable stops extending TaskRegistry, and a Worker-like class implements TaskRegistry instead. TaskMessage.taskName stays a string identifier (location-agnostic lookup), and the task queue slot becomes WorkerOptions.queue verbatim.

Out of scope (separate sub-issues)

  • Deduplication: MessageQueue.nativeDeduplication, TaskEnqueueOptions.deduplicationKey, KV fallback, taskDeduplicationTtl, taskDeduplicationFallback → deduplication sub-issue (#798).
  • Task-specific telemetry: QueueTaskRole "task", fedify.task span, fedify.task.name/failure_reason attributes → telemetry sub-issue (#799). This PR implements the dispatch behavior and logging only; the telemetry sub-issue layers OTel attributes onto the decision points.
  • Deferred entirely: cron/periodic scheduling, Celery canvas primitives, AsyncResult/result backend, bind=True/self.retry, per-task priority.

Acceptance criteria

  • ctx.enqueueTask(task, data) round-trips to a typed handler; a wrong-shaped payload is a compile error.
  • schema (required) infers TData; an invalid decoded payload is dropped without retry.
  • A payload carrying a vocabulary object (Note/Create) alongside Date/Map round-trips faithfully through the devalue codec — each vocab object comes back as a real instance — and the schema validates the envelope with the vocab object as an instanceof leaf.
  • A decode failure on a malformed wire string is dropped without retry.
  • Handler throw → retry with backoff; a nativeRetrial queue rethrows without re-enqueue; per-task retryPolicy overrides the federation default.
  • Unknown taskName → drop + warning.
  • Queue routing precedence and taskQueueResolution: "strict" behave as specified; startQueue({ queue: "task" }) starts only the task worker.
  • enqueueTaskMany uses enqueueMany when available, else parallel enqueue.
  • defineTask with a duplicate name throws; builder clone isolation holds across two build() calls.
  • Type-level forward-compat guard: Federatable is assignable to TaskRegistry<TContextData>.
  • Tests use @fedify/fixture test() (Cloudflare Workers harness compatibility) and pass under Deno, Node.js, and Bun.
  • docs/manual/tasks.md covers defining, payload handling, dispatching, retry/error, queue isolation; CHANGES.md updated; new public types carry @since JSDoc; AI usage disclosed per AI_POLICY.md.
