Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions docs/server/hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ By way of illustration, if a user isn’t allowed to connect: Just throw an erro
| `beforeBroadcastStateless` | Before broadcast a stateless message | [Read more](/server/hooks#before-broadcast-stateless) |
| `afterUnloadDocument` | When a document is closed | [Read more](/server/hooks#after-unload-document) |


## Usage

```js
Expand Down Expand Up @@ -759,7 +758,6 @@ const server = new Server({
server.listen();
```


### onStateless

The `onStateless` hooks are called after the server has received a stateless message. It should return a Promise.
Expand Down Expand Up @@ -796,6 +794,37 @@ const server = new Server({
server.listen()
```

### beforeSync

The `beforeSync` hooks are called before a sync message is handled. This is useful if you want to inspect the sync message that will be applied to the document.

**Hook payload**

```js
const data = {
documentName: string,
document: Document,
// The y-protocols/sync message type
type: number,
// The payload of the y-protocols/sync message
payload: Uint8Array,
}
```

**Example**

```js
import { Server } from '@hocuspocus/server'

const server = new Server({
async beforeSync({ payload, document, documentName, type }) {
console.log(`Server will handle a sync message: "${payload}"!`)
},
})

server.listen()
```

### beforeBroadcastStateless

The `beforeBroadcastStateless` hooks are called before the server broadcast a stateless message.
Expand Down
15 changes: 15 additions & 0 deletions packages/server/src/ClientConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { OutgoingMessage } from './OutgoingMessage.ts'
import type {
ConnectionConfiguration,
beforeHandleMessagePayload,
beforeSyncPayload,
onDisconnectPayload,
} from './types.ts'
import {
Expand Down Expand Up @@ -199,6 +200,20 @@ export class ClientConnection {
return this.hooks('beforeHandleMessage', beforeHandleMessagePayload)
})

instance.beforeSync((connection, payload) => {
const beforeSyncPayload: beforeSyncPayload = {
clientsCount: document.getConnectionsCount(),
context: hookPayload.context,
document,
documentName: document.name,
connection,
type: payload.type,
payload: payload.payload,
}

return this.hooks('beforeSync', beforeSyncPayload)
})

return instance
}

Expand Down
12 changes: 11 additions & 1 deletion packages/server/src/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type Document from './Document.ts'
import { IncomingMessage } from './IncomingMessage.ts'
import { MessageReceiver } from './MessageReceiver.ts'
import { OutgoingMessage } from './OutgoingMessage.ts'
import type { onStatelessPayload } from './types.ts'
import type { beforeSyncPayload, onStatelessPayload } from './types.ts'

export class Connection {

Expand All @@ -23,6 +23,7 @@ export class Connection {
callbacks = {
onClose: [(document: Document, event?: CloseEvent) => {}],
beforeHandleMessage: (connection: Connection, update: Uint8Array) => Promise.resolve(),
beforeSync: (connection: Connection, payload: Pick<beforeSyncPayload, 'type' | 'payload'>) => Promise.resolve(),
statelessCallback: (payload: onStatelessPayload) => Promise.resolve(),
}

Expand Down Expand Up @@ -81,6 +82,15 @@ export class Connection {
return this
}

/**
* Set a callback that will be triggered before a sync message is handled
*/
beforeSync(callback: (connection: Connection, payload: Pick<beforeSyncPayload, 'type' | 'payload'>) => Promise<any>): Connection {
this.callbacks.beforeSync = callback

return this
}

/**
* Send the given message
*/
Expand Down
2 changes: 2 additions & 0 deletions packages/server/src/Hocuspocus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export class Hocuspocus {
onConnect: () => new Promise(r => r(null)),
connected: () => new Promise(r => r(null)),
beforeHandleMessage: () => new Promise(r => r(null)),
beforeSync: () => new Promise(r => r(null)),
beforeBroadcastStateless: () => new Promise(r => r(null)),
onStateless: () => new Promise(r => r(null)),
onChange: () => new Promise(r => r(null)),
Expand Down Expand Up @@ -111,6 +112,7 @@ export class Hocuspocus {
afterLoadDocument: this.configuration.afterLoadDocument,
beforeHandleMessage: this.configuration.beforeHandleMessage,
beforeBroadcastStateless: this.configuration.beforeBroadcastStateless,
beforeSync: this.configuration.beforeSync,
onStateless: this.configuration.onStateless,
onChange: this.configuration.onChange,
onStoreDocument: this.configuration.onStoreDocument,
Expand Down
7 changes: 7 additions & 0 deletions packages/server/src/IncomingMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ export class IncomingMessage {
return readVarUint8Array(this.decoder)
}

peekVarUint8Array() {
const { pos } = this.decoder
const result = readVarUint8Array(this.decoder)
this.decoder.pos = pos
return result
}

readVarUint() {
return readVarUint(this.decoder)
}
Expand Down
15 changes: 11 additions & 4 deletions packages/server/src/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class MessageReceiver {
this.defaultTransactionOrigin = defaultTransactionOrigin
}

public apply(document: Document, connection?: Connection, reply?: (message: Uint8Array) => void) {
public async apply(document: Document, connection?: Connection, reply?: (message: Uint8Array) => void) {
const { message } = this
const type = message.readVarUint()
const emptyMessageLength = message.length
Expand All @@ -36,7 +36,7 @@ export class MessageReceiver {
case MessageType.Sync:
case MessageType.SyncReply: {
message.writeVarUint(MessageType.Sync)
this.readSyncMessage(message, document, connection, reply, type !== MessageType.SyncReply)
await this.readSyncMessage(message, document, connection, reply, type !== MessageType.SyncReply)

if (message.length > emptyMessageLength + 1) {
if (reply) {
Expand All @@ -55,7 +55,7 @@ export class MessageReceiver {
break
}
case MessageType.Awareness: {
applyAwarenessUpdate(document.awareness, message.readVarUint8Array(), connection?.webSocket)
await applyAwarenessUpdate(document.awareness, message.readVarUint8Array(), connection?.webSocket)

break
}
Expand Down Expand Up @@ -101,9 +101,16 @@ export class MessageReceiver {
}
}

readSyncMessage(message: IncomingMessage, document: Document, connection?: Connection, reply?: (message: Uint8Array) => void, requestFirstSync = true) {
async readSyncMessage(message: IncomingMessage, document: Document, connection?: Connection, reply?: (message: Uint8Array) => void, requestFirstSync = true) {
const type = message.readVarUint()

if (connection) {
await connection.callbacks.beforeSync(connection, {
type,
payload: message.peekVarUint8Array(),
})
}

switch (type) {
case messageYjsSyncStep1: {
readSyncStep1(message.decoder, message.encoder, document)
Expand Down
25 changes: 25 additions & 0 deletions packages/server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export interface Extension {
onLoadDocument?(data: onLoadDocumentPayload): Promise<any>;
afterLoadDocument?(data: afterLoadDocumentPayload): Promise<any>;
beforeHandleMessage?(data: beforeHandleMessagePayload): Promise<any>;
beforeSync?(data: beforeSyncPayload): Promise<any>;
beforeBroadcastStateless?(data: beforeBroadcastStatelessPayload): Promise<any>;
onStateless?(payload: onStatelessPayload): Promise<any>;
onChange?(data: onChangePayload): Promise<any>;
Expand All @@ -69,6 +70,7 @@ export type HookName =
'afterLoadDocument' |
'beforeHandleMessage' |
'beforeBroadcastStateless' |
'beforeSync' |
'onStateless' |
'onChange' |
'onStoreDocument' |
Expand All @@ -92,6 +94,7 @@ export type HookPayloadByName = {
afterLoadDocument: afterLoadDocumentPayload,
beforeHandleMessage: beforeHandleMessagePayload,
beforeBroadcastStateless: beforeBroadcastStatelessPayload,
beforeSync: beforeSyncPayload,
onStateless: onStatelessPayload,
onChange: onChangePayload,
onStoreDocument: onStoreDocumentPayload,
Expand Down Expand Up @@ -250,6 +253,28 @@ export interface beforeHandleMessagePayload {
connection: Connection
}

export interface beforeSyncPayload {
clientsCount: number,
context: any,
document: Document,
documentName: string,
connection: Connection,
/**
* The y-protocols/sync message type
* @example
* 0: SyncStep1
* 1: SyncStep2
* 2: YjsUpdate
*
* @see https://github.com/yjs/y-protocols/blob/master/sync.js#L13-L40
*/
type: number,
/**
* The payload of the y-sync message.
*/
payload: Uint8Array,
}

export interface beforeBroadcastStatelessPayload {
document: Document,
documentName: string,
Expand Down
Loading