diff --git a/README.md b/README.md index 5b89a240..c59e18d9 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,5 @@ # River -River allows multiple clients to connect to and make remote procedure calls to a remote server as if they were local procedures. - ## Long-lived streaming remote procedure calls River provides a framework for long-lived streaming Remote Procedure Calls (RPCs) in modern web applications, featuring advanced error handling and customizable retry policies to ensure seamless communication between clients and servers. @@ -14,7 +12,8 @@ River provides a framework similar to [tRPC](https://trpc.io/) and [gRPC](https: - result types and error handling - snappy DX (no code generation) - transparent reconnect support for long-lived sessions -- over any transport (WebSockets and Unix Domain Socket out of the box) +- over any transport (WebSockets out of the box) +- full OpenTelemetry integration (distributed tracing for connections, sessions, procedure calls) See [PROTOCOL.md](./PROTOCOL.md) for more information on the protocol. @@ -27,13 +26,7 @@ Before proceeding, ensure you have TypeScript 5 installed and configured appropr You must verify that: - `compilerOptions.moduleResolution` is set to `"bundler"` - - `compilerOptions.strictFunctionTypes` is set to `true` - - `compilerOptions.strictNullChecks` is set to `true` - - or, preferably, that: - - - `compilerOptions.moduleResolution` is set to `"bundler"` - - `compilerOptions.strict` is set to `true` + - `compilerOptions.strict` is set to true (or at least `compilerOptions.strictFunctionTypes` and `compilerOptions.strictNullChecks`) Like so: @@ -47,7 +40,7 @@ Before proceeding, ensure you have TypeScript 5 installed and configured appropr } ``` - If these options already exist in your `tsconfig.json` and don't match what is shown above, modify them. River is designed for `"strict": true`, but technically only `strictFunctionTypes` and `strictNullChecks` being set to `true` is required. Failing to set these will cause unresolvable type errors when defining services. + If these options already exist in your `tsconfig.json` and don't match what is shown above, modify them. Failing to set these will cause unresolvable type errors when defining services. 2. Install River and Dependencies: @@ -75,14 +68,15 @@ Before proceeding, ensure you have TypeScript 5 installed and configured appropr ### A basic router -First, we create a service using `ServiceSchema`: +First, we create a service: ```ts -import { ServiceSchema, Procedure, Ok } from '@replit/river'; +import { createServiceSchema, Procedure, Ok } from '@replit/river'; import { Type } from '@sinclair/typebox'; +const ServiceSchema = createServiceSchema(); export const ExampleService = ServiceSchema.define( - // configuration + // optional configuration parameter { // initializer for shared state initializeState: () => ({ count: 0 }), @@ -90,10 +84,13 @@ export const ExampleService = ServiceSchema.define( // procedures { add: Procedure.rpc({ + // input type requestInit: Type.Object({ n: Type.Number() }), + // response data type responseData: Type.Object({ result: Type.Number() }), - requestErrors: Type.Never(), - // note that a handler is unique per user RPC + // any error results (other than the uncaught) that this procedure can return + responseError: Type.Never(), + // note that a handler is unique per user async handler({ ctx, reqInit: { n } }) { // access and mutate shared state ctx.state.count += n; @@ -118,11 +115,13 @@ const port = 3000; const wss = new WebSocketServer({ server: httpServer }); const transport = new WebSocketServerTransport(wss, 'SERVER'); -export const server = createServer(transport, { +const services = { example: ExampleService, -}); +}; -export type ServiceSurface = typeof server; +export type ServiceSurface = typeof services; + +const server = createServer(transport, services); httpServer.listen(port); ``` @@ -133,13 +132,15 @@ In another file for the client (to create a separate entrypoint), import { WebSocketClientTransport } from '@replit/river/transport/ws/client'; import { createClient } from '@replit/river'; import { WebSocket } from 'ws'; +import type { ServiceSurface } from './server'; +// ^ type only import to avoid bundling the server! const transport = new WebSocketClientTransport( async () => new WebSocket('ws://localhost:3000'), 'my-client-id', ); -const client = createClient( +const client = createClient( transport, 'SERVER', // transport id of the server in the previous step { eagerlyConnect: true }, // whether to eagerly connect to the server on creation (optional argument) @@ -155,6 +156,88 @@ if (result.ok) { } ``` +### Error Handling + +River uses a Result pattern for error handling. All procedure responses are wrapped in `Ok()` for success or `Err()` for errors: + +```ts +import { Ok, Err } from '@replit/river'; + +// success +return Ok({ result: 42 }); + +// error +return Err({ code: 'INVALID_INPUT', message: 'Value must be positive' }); +``` + +#### Custom Error Types + +You can define custom error schemas for your procedures: + +```ts +const MathService = ServiceSchema.define({ + divide: Procedure.rpc({ + requestInit: Type.Object({ a: Type.Number(), b: Type.Number() }), + responseData: Type.Object({ result: Type.Number() }), + responseError: Type.Union([ + Type.Object({ + code: Type.Literal('DIVISION_BY_ZERO'), + message: Type.String(), + extras: Type.Object({ dividend: Type.Number() }), + }), + Type.Object({ + code: Type.Literal('INVALID_INPUT'), + message: Type.String(), + }), + ]), + async handler({ reqInit: { a, b } }) { + if (b === 0) { + return Err({ + code: 'DIVISION_BY_ZERO', + message: 'Cannot divide by zero', + extras: { dividend: a }, + }); + } + + if (!Number.isFinite(a) || !Number.isFinite(b)) { + return Err({ + code: 'INVALID_INPUT', + message: 'Inputs must be finite numbers', + }); + } + + return Ok({ result: a / b }); + }, + }), +}); +``` + +#### Uncaught Errors + +When a procedure handler throws an uncaught error, River automatically handles it: + +```ts +const ExampleService = ServiceSchema.define({ + maybeThrow: Procedure.rpc({ + requestInit: Type.Object({ shouldThrow: Type.Boolean() }), + responseData: Type.Object({ result: Type.String() }), + async handler({ reqInit: { shouldThrow } }) { + if (shouldThrow) { + throw new Error('Something went wrong!'); + } + + return Ok({ result: 'success' }); + }, + }), +}); + +// client will receive an error with code 'UNCAUGHT_ERROR' +const result = await client.example.maybeThrow.rpc({ shouldThrow: true }); +if (!result.ok && result.payload.code === 'UNCAUGHT_ERROR') { + console.log('Handler threw an error:', result.payload.message); +} +``` + ### Logging To add logging, you can bind a logging function to a transport. @@ -208,7 +291,380 @@ transport.addEventListener('sessionTransition', (evt) => { }); ``` -### Custom Handshake +### Advanced Patterns + +#### All Procedure Types + +River supports four types of procedures, each with different message patterns: + +##### Unary RPC Procedures (1:1) + +Single request, single response: + +```ts +const ExampleService = ServiceSchema.define({ + add: Procedure.rpc({ + requestInit: Type.Object({ a: Type.Number(), b: Type.Number() }), + responseData: Type.Object({ result: Type.Number() }), + async handler({ reqInit: { a, b } }) { + return Ok({ result: a + b }); + }, + }), +}); + +// client usage +const result = await client.example.add.rpc({ a: 1, b: 2 }); +if (result.ok) { + console.log(result.payload.result); // 3 +} +``` + +##### Upload Procedures (n:1) + +Multiple requests, single response: + +```ts +const ExampleService = ServiceSchema.define({ + sum: Procedure.upload({ + requestInit: Type.Object({ multiplier: Type.Number() }), + requestData: Type.Object({ value: Type.Number() }), + responseData: Type.Object({ total: Type.Number() }), + responseError: Type.Object({ + code: Type.Literal('INVALID_INPUT'), + message: Type.String(), + }), + async handler({ ctx, reqInit, reqReadable }) { + let sum = 0; + for await (const msg of reqReadable) { + if (!msg.ok) { + return ctx.cancel('client disconnected'); + } + + sum += msg.payload.value; + } + return Ok({ total: sum * reqInit.multiplier }); + }, + }), +}); + +// client usage +const { reqWritable, finalize } = client.example.sum.upload({ multiplier: 2 }); +reqWritable.write({ value: 1 }); +reqWritable.write({ value: 2 }); +reqWritable.write({ value: 3 }); + +const result = await finalize(); +if (result.ok) { + console.log(result.payload.total); // 12 (6 * 2) +} else { + console.error('Upload failed:', result.payload.message); +} +``` + +##### Subscription Procedures (1:n) + +Single request, multiple responses: + +```ts +const ExampleService = ServiceSchema.define( + { initializeState: () => ({ count: 0 }) }, + { + counter: Procedure.subscription({ + requestInit: Type.Object({ interval: Type.Number() }), + responseData: Type.Object({ count: Type.Number() }), + async handler({ ctx, reqInit, resWritable }) { + const intervalId = setInterval(() => { + ctx.state.count++; + resWritable.write(Ok({ count: ctx.state.count })); + }, reqInit.interval); + + ctx.signal.addEventListener('abort', () => { + clearInterval(intervalId); + }); + }, + }), + }, +); + +// client usage +const { resReadable } = client.example.counter.subscribe({ interval: 1000 }); +for await (const msg of resReadable) { + if (msg.ok) { + console.log('Count:', msg.payload.count); + } else { + console.error('Subscription error:', msg.payload.message); + break; // exit on error for subscriptions + } +} +``` + +##### Stream Procedures (n:n) + +Multiple requests, multiple responses: + +```ts +const ExampleService = ServiceSchema.define({ + echo: Procedure.stream({ + requestInit: Type.Object({ prefix: Type.String() }), + requestData: Type.Object({ message: Type.String() }), + responseData: Type.Object({ echo: Type.String() }), + async handler({ reqInit, reqReadable, resWritable, ctx }) { + for await (const msg of reqReadable) { + if (!msg.ok) { + return; + } + + const { message } = msg.payload; + resWritable.write( + Ok({ + echo: `${reqInit.prefix}: ${message}`, + }), + ); + } + + // client ended their side, we can close ours + resWritable.close(); + }, + }), +}); + +// client usage +const { reqWritable, resReadable } = client.example.echo.stream({ + prefix: 'Server', +}); + +// send messages +reqWritable.write({ message: 'Hello' }); +reqWritable.write({ message: 'World' }); +reqWritable.close(); + +// read responses +for await (const msg of resReadable) { + if (msg.ok) { + console.log(msg.payload.echo); // "Server: Hello", "Server: World" + } else { + console.error('Stream error:', msg.payload.message); + } +} +``` + +#### Client Cancellation + +River supports client-side cancellation using AbortController. All procedure calls accept an optional `signal` parameter: + +```ts +const controller = new AbortController(); +const rpcResult = client.example.longRunning.rpc( + { data: 'hello world' }, + { signal: controller.signal }, +); + +// cancel the operation +controller.abort(); + +// all cancelled operations will receive an error with CANCEL_CODE +const result = await rpcResult; +if (!result.ok && result.payload.code === 'CANCEL_CODE') { + console.log('Operation was cancelled'); +} +``` + +When a client cancels an operation, the server handler receives the cancellation via the `ctx.signal`: + +```ts +const ExampleService = ServiceSchema.define({ + longRunning: Procedure.rpc({ + requestInit: Type.Object({}), + responseData: Type.Object({ result: Type.String() }), + async handler({ ctx }) { + ctx.signal.addEventListener('abort', () => { + // do something + }); + + // long running operation + await new Promise((resolve) => setTimeout(resolve, 10000)); + return Ok({ result: 'completed' }); + }, + }), + + streamingExample: Procedure.stream({ + requestInit: Type.Object({}), + requestData: Type.Object({ message: Type.String() }), + responseData: Type.Object({ echo: Type.String() }), + async handler({ ctx, reqReadable, resWritable }) { + // for streams, cancellation closes both readable and writable + // in addition to triggering the abort signal. + for await (const msg of reqReadable) { + if (!msg.ok) { + // msg.payload.code === CANCEL_CODE error if client cancelled + break; + } + + resWritable.write(Ok({ echo: msg.payload.message })); + } + + resWritable.close(); + }, + }), +}); +``` + +Worth noting that the `ctx.signal` is triggered regardless of the reason the procedure has ended. + +#### Codecs + +River provides two built-in codecs: + +- `NaiveJsonCodec`: Simple JSON serialization +- `BinaryCodec`: Efficient msgpack serialization (recommended for production) + +```ts +import { BinaryCodec, NaiveJsonCodec } from '@replit/river/codec'; + +// use binary codec for better performance +const transport = new WebSocketClientTransport( + async () => new WebSocket('ws://localhost:3000'), + 'my-client-id', + { codec: BinaryCodec }, +); +``` + +You can also create custom codecs for message serialization: + +```ts +import { Codec } from '@replit/river/codec'; + +class CustomCodec implements Codec { + toBuffer(obj: object): Uint8Array { + // custom serialization logic + } + + fromBuffer(buf: Uint8Array): object { + // custom deserialization logic + } +} + +// use with transports +const transport = new WebSocketClientTransport( + async () => new WebSocket('ws://localhost:3000'), + 'my-client-id', + { codec: new CustomCodec() }, +); +``` + +#### Custom Transports + +You can implement custom transports by extending the base Transport classes: + +```ts +import { ClientTransport, ServerTransport } from '@replit/river/transport'; +import { Connection } from '@replit/river/transport'; + +// custom connection implementation +class MyCustomConnection extends Connection { + private socket: MyCustomSocket; + + constructor(socket: MyCustomSocket) { + super(); + this.socket = socket; + + this.socket.onMessage = (data: Uint8Array) => { + this.dataListener?.(data); + }; + + this.socket.onClose = () => { + this.closeListener?.(); + }; + + this.socket.onError = (err: Error) => { + this.errorListener?.(err); + }; + } + + send(msg: Uint8Array): boolean { + return this.socket.send(msg); + } + + close(): void { + this.socket.close(); + } +} + +// custom client transport +class MyCustomClientTransport extends ClientTransport { + constructor( + private connectFn: () => Promise, + clientId: string, + ) { + super(clientId); + } + + async createNewOutgoingConnection(): Promise { + const socket = await this.connectFn(); + return new MyCustomConnection(socket); + } +} + +// custom server transport +class MyCustomServerTransport extends ServerTransport { + constructor( + private server: MyCustomServer, + clientId: string, + ) { + super(clientId); + + server.onConnection = (socket: MyCustomSocket) => { + const connection = new MyCustomConnection(socket); + this.handleConnection(connection); + }; + } +} + +// usage +const clientTransport = new MyCustomClientTransport( + () => connectToMyCustomServer(), + 'client-id', +); + +const client = createClient(clientTransport, 'SERVER'); +``` + +#### Testing + +River provides utilities for testing your services: + +```ts +import { createMockTransportNetwork } from '@replit/river/testUtil'; + +describe('My Service', () => { + // create mock transport network + const { getClientTransport, getServerTransport, cleanup } = + createMockTransportNetwork(); + afterEach(cleanup); + + test('should add numbers correctly', async () => { + // setup server + const serverTransport = getServerTransport('SERVER'); + const services = { + math: MathService, + }; + const server = createServer(serverTransport, services); + + // setup client + const clientTransport = getClientTransport('client'); + const client = createClient(clientTransport, 'SERVER'); + + // test the service + const result = await client.math.add.rpc({ a: 1, b: 2 }); + expect(result.ok).toBe(true); + if (result.ok) { + expect(result.payload.result).toBe(3); + } + }); +}); +``` + +#### Custom Handshake River allows you to extend the protocol-level handshake so you can add additional logic to validate incoming connections. @@ -216,32 +672,33 @@ validate incoming connections. You can do this by passing extra options to `createClient` and `createServer` and extending the `ParsedMetadata` interface: ```ts -declare module '@replit/river' { - interface ParsedMetadata { - userId: number; - } -} +type ContextType = { ... }; // has to extend object +type ParsedMetadata = { parsedToken: string }; +const ServiceSchema = createServiceSchema(); + +const services = { ... }; // use custom ServiceSchema builder here -const schema = Type.Object({ token: Type.String() }); +const handshakeSchema = Type.Object({ token: Type.String() }); createClient(new MockClientTransport('client'), 'SERVER', { eagerlyConnect: false, - handshakeOptions: createClientHandshakeOptions(schema, async () => ({ + handshakeOptions: createClientHandshakeOptions(handshakeSchema, async () => ({ // the type of this function is - // () => Static | Promise> + // () => Static | Promise> token: '123', })), }); createServer(new MockServerTransport('SERVER'), services, { handshakeOptions: createServerHandshakeOptions( - schema, + handshakeSchema, (metadata, previousMetadata) => { // the type of this function is - // (metadata: Static, previousMetadata?: ParsedMetadata) => + // (metadata: Static, previousMetadata?: ParsedMetadata) => // | false | Promise (if you reject it) // | ParsedMetadata | Promise (if you allow it) // next time a connection happens on the same session, previousMetadata will // be populated with the last returned value + return { parsedToken: metadata.token }; }, ), }); diff --git a/package-lock.json b/package-lock.json index d2f75b6e..498a409e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/river", - "version": "0.209.7", + "version": "0.209.8", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@replit/river", - "version": "0.209.7", + "version": "0.209.8", "license": "MIT", "dependencies": { "@msgpack/msgpack": "^3.0.0-beta2", diff --git a/package.json b/package.json index 19d62ff8..7394f97f 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@replit/river", "description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", - "version": "0.209.7", + "version": "0.209.8", "type": "module", "exports": { ".": {