Skip to content

Latest commit

 

History

History

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 
 
 
 
 
 
 

README.md

@atcute/xrpc-server

web framework for XRPC servers.

npm install @atcute/xrpc-server

prerequisites

this framework relies on schemas generated by @atcute/lex-cli, you'd need to follow its quick start guide on how to set it up.

for these examples, we'll use a simple query operation that greets a name:

// file: lexicons/com/example/greet.json
{
	"lexicon": 1,
	"id": "com.example.greet",
	"defs": {
		"main": {
			"type": "query",
			"parameters": {
				"type": "params",
				"required": ["name"],
				"properties": {
					"name": { "type": "string" }
				}
			},
			"output": {
				"encoding": "application/json",
				"schema": {
					"type": "object",
					"required": ["message"],
					"properties": {
						"message": { "type": "string" }
					}
				}
			}
		}
	}
}

usage

handling requests

use addQuery() for queries (GET) and addProcedure() for procedures (POST). handlers receive typed params and input, and return responses using the json() helper:

import { XRPCRouter, json } from '@atcute/xrpc-server';
import { cors } from '@atcute/xrpc-server/middlewares/cors';

import { ComExampleGreet, ComExampleCreatePost } from './lexicons/index.js';

const router = new XRPCRouter({ middlewares: [cors()] });

router.addQuery(ComExampleGreet, {
	async handler({ params }) {
		return json({ message: `hello ${params.name}!` });
	},
});

router.addProcedure(ComExampleCreatePost, {
	async handler({ input }) {
		const post = await db.createPost(input);
		return json(post);
	},
});

export default router;

serving the router

on Deno, Bun or Cloudflare Workers, you can export the router directly and expect it to work out of the box.

for Node.js, you'll need a fetch-to-Node adapter like @remix-run/node-fetch-server since the router works with standard Web Request/Response:

import * as http from 'node:http';
import { createRequestListener } from '@remix-run/node-fetch-server';
import { XRPCRouter } from '@atcute/xrpc-server';

const router = new XRPCRouter();

// ... add handlers ...

const server = http.createServer(createRequestListener(router.fetch));
server.listen(3000, () => {
	console.log('listening on port 3000');
});

standalone handlers

if you only need a single XRPC operation, you can skip creating a router and export a handler directly:

import { createXrpcHandler, json } from '@atcute/xrpc-server';
import { AppBskyFeedGetFeedSkeleton } from '@atcute/bluesky';

export default createXrpcHandler({
	lxm: AppBskyFeedGetFeedSkeleton,
	async handler({ params }) {
		return json({ feed: [] });
	},
});

requests should be routed to /xrpc/<nsid>.

error handling

throw XRPCError in handlers to return error responses:

import { XRPCError } from '@atcute/xrpc-server';

router.addQuery(ComExampleGetPost, {
	async handler({ params, request }) {
		const session = await getSession(request);
		if (!session) {
			throw new XRPCError({ status: 401, error: 'AuthenticationRequired' });
		}

		const post = await db.getPost(params.uri);
		if (!post) {
			throw new XRPCError({ status: 400, error: 'InvalidRequest', message: `post not found` });
		}

		return json(post);
	},
});

convenience subclasses are also available: InvalidRequestError, AuthRequiredError, ForbiddenError, RateLimitExceededError, InternalServerError, UpstreamFailureError, NotEnoughResourcesError, UpstreamTimeoutError.

AuthRequiredError accepts a wwwAuthenticate option that auto-formats an RFC 7235 WWW-Authenticate header on the response (and appends access-control-expose-headers so browsers can read it from CORS responses):

import { AuthRequiredError } from '@atcute/xrpc-server';

throw new AuthRequiredError({
	message: 'invalid token',
	wwwAuthenticate: { scheme: 'Bearer', params: { error: 'BadJwtSignature' } },
});

observing errors

for logs or metrics, use the onError / onSocketError router options. these are fire-and-forget hooks invoked alongside response generation, and they are NOT called for client-induced errors (aborted requests, XRPCError, XRPCSubscriptionError) — only for bugs worth reporting:

const router = new XRPCRouter({
	onError({ error, request }) {
		reportToSentry(error, { url: request.url });
	},
	onSocketError({ error, request }) {
		reportToSentry(error, { url: request.url, subscription: true });
	},
});

health check

the router can optionally answer /xrpc/_health if you pass handleHealthCheck. this endpoint is non-standard — consumers decide the response shape and status.

const router = new XRPCRouter({
	async handleHealthCheck() {
		const healthy = await pingDatabase();
		return Response.json({ status: healthy ? 'ok' : 'degraded' }, { status: healthy ? 200 : 503 });
	},
});

subscriptions

subscriptions provide real-time streaming over WebSocket. they require a runtime-specific adapter:

runtime adapter package
Bun @atcute/xrpc-server-bun
Node.js @atcute/xrpc-server-node
Deno @atcute/xrpc-server-deno
Cloudflare Workers @atcute/xrpc-server-cloudflare

here's an example using Bun:

import { XRPCRouter } from '@atcute/xrpc-server';
import { createBunWebSocket } from '@atcute/xrpc-server-bun';

import { ComExampleSubscribe } from './lexicons/index.js';

const ws = createBunWebSocket();

const router = new XRPCRouter({ websocket: ws.adapter });

router.addSubscription(ComExampleSubscribe, {
	async *handler({ params, signal }) {
		// yield messages until the client disconnects
		while (!signal.aborted) {
			const events = await getNewEvents(params.cursor);

			for (const event of events) {
				yield event;
			}

			await sleep(1000);
		}
	},
});

export default ws.wrap(router);

the handler is an async generator that yields messages. each yielded value is encoded as a CBOR frame and sent to the client. the signal is aborted when the client disconnects.

for subscription errors, use XRPCSubscriptionError:

import { XRPCSubscriptionError } from '@atcute/xrpc-server';

router.addSubscription(ComExampleSubscribe, {
	async *handler({ params }) {
		if (params.cursor && isCursorTooOld(params.cursor)) {
			throw new XRPCSubscriptionError({
				error: 'FutureCursor',
				message: `cursor is too old`,
			});
		}

		// ...
	},
});

backpressure is handled per adapter: each adapter's create*WebSocket() factory accepts highWaterMark / lowWaterMark options (default 250 KB / 50 KB) that throttle the send loop when the outgoing buffer grows. the Cloudflare Workers adapter does not apply backpressure — the runtime does not expose the outgoing WebSocket buffer.

service authentication

the @atcute/xrpc-server/auth subpackage provides utilities for service-to-service authentication using JWTs.

verifying incoming JWTs:

import { ServiceJwtVerifier } from '@atcute/xrpc-server/auth';
import {
	CompositeDidDocumentResolver,
	PlcDidDocumentResolver,
	WebDidDocumentResolver,
} from '@atcute/identity-resolver';

const jwtVerifier = new ServiceJwtVerifier({
	// list of audience values this service accepts. during the transition to proposal 0014
	// (atproto service auth audience), configure both the bare DID and the DID-with-service-ref
	// form so that tokens from older issuers keep working alongside new ones.
	acceptAudiences: ['did:web:my-service.example.com', 'did:web:my-service.example.com#atproto_pds'],
	resolver: new CompositeDidDocumentResolver({
		methods: {
			plc: new PlcDidDocumentResolver(),
			web: new WebDidDocumentResolver(),
		},
	}),
});

router.addQuery(ComExampleProtectedEndpoint, {
	async handler({ request }) {
		const auth = await jwtVerifier.verifyRequest(request, { lxm: 'com.example.protectedEndpoint' });
		return json({ caller: auth.issuer });
	},
});

verifyRequest parses the Authorization: Bearer header, verifies the token, and throws an AuthRequiredError with a populated WWW-Authenticate: Bearer error="…" challenge on every failure path. it forwards request.signal into DID resolution so aborted requests don't keep network calls alive.

additional options tune verification:

const jwtVerifier = new ServiceJwtVerifier({
	acceptAudiences: [...],
	resolver: ...,
	maxAge: 300,       // max token lifetime window in seconds (default 300)
	clockLeeway: 5,    // leeway applied to nbf/exp comparisons (default 5)
	replayStore: {     // optional replay protection; requires jti in tokens
		async check({ iss, jti }, ttlSeconds) {
			const key = `${iss}:${jti}`;
			const isNew = await redis.setnx(key, '1');
			if (isNew === 1) await redis.expire(key, ttlSeconds);
			return isNew === 1;
		},
	},
});

creating outgoing JWTs:

import { createServiceJwt } from '@atcute/xrpc-server/auth';

const jwt = await createServiceJwt({
	keypair: myServiceKeypair,
	issuer: 'did:web:my-service.example.com',
	audience: 'did:plc:targetservice',
	lxm: 'com.example.someEndpoint',
});

// use jwt in Authorization header when calling other services

internal calls

you can make typed calls to your own endpoints using @atcute/client:

import { Client, ok } from '@atcute/client';

const client = new Client({
	handler(pathname, init) {
		return router.fetch(new Request(new URL(pathname, 'http://localhost'), init));
	},
});

const data = await ok(
	client.get('com.example.greet', {
		params: { name: 'world' },
	}),
);

console.log(data.message); // fully typed!