Skip to content

Commit e33dbe2

Browse files
committed
Revert "fix: temporarily removes beforeSync callback (#919, partially reverts a6a7bcd). Making applySync async causes timing issues that leads to issues when using PermanentUserData. This can be reproduced by using the playground server with sqlite and a sleep(1000). Additionally, use Y.PermanentUserData, or try to read data from the ydoc in the onSynced event."
This reverts commit 7192664.
1 parent a46d9dd commit e33dbe2

File tree

2 files changed

+22
-29
lines changed

2 files changed

+22
-29
lines changed

packages/server/src/Connection.ts

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,7 @@ import type Document from "./Document.ts";
99
import { IncomingMessage } from "./IncomingMessage.ts";
1010
import { MessageReceiver } from "./MessageReceiver.ts";
1111
import { OutgoingMessage } from "./OutgoingMessage.ts";
12-
import type {
13-
beforeSyncPayload,
14-
onStatelessPayload,
15-
onTokenSyncPayload,
16-
} from "./types.ts";
12+
import type { beforeSyncPayload, onTokenSyncPayload, onStatelessPayload } from "./types.ts";
1713

1814
export class Connection {
1915
webSocket: WebSocket;
@@ -33,8 +29,7 @@ export class Connection {
3329
payload: Pick<beforeSyncPayload, "type" | "payload">,
3430
) => Promise.resolve(),
3531
statelessCallback: (payload: onStatelessPayload) => Promise.resolve(),
36-
onTokenSyncCallback: (payload: Partial<onTokenSyncPayload>) =>
37-
Promise.resolve(),
32+
onTokenSyncCallback: (payload: Partial<onTokenSyncPayload>) => Promise.resolve(),
3833
};
3934

4035
socketId: string;
@@ -112,7 +107,7 @@ export class Connection {
112107
return this;
113108
}
114109

115-
/**
110+
/**
116111
* Set a callback that will be triggered when on token sync message is received
117112
*/
118113
onTokenSyncCallback(
@@ -159,9 +154,7 @@ export class Connection {
159154
* Request current token from the client
160155
*/
161156
public requestToken(): void {
162-
const message = new OutgoingMessage(
163-
this.document.name,
164-
).writeTokenSyncRequest();
157+
const message = new OutgoingMessage(this.document.name).writeTokenSyncRequest();
165158

166159
this.send(message.toUint8Array());
167160
}
@@ -216,18 +209,18 @@ export class Connection {
216209
this.callbacks
217210
.beforeHandleMessage(this, data)
218211
.then(() => {
219-
try {
220-
new MessageReceiver(message).apply(this.document, this);
221-
} catch (e) {
222-
console.error(
223-
`closing connection ${this.socketId} (while handling ${documentName}) because of exception`,
224-
e,
225-
);
226-
this.close({
227-
code: "code" in e ? e.code : ResetConnection.code,
228-
reason: "reason" in e ? e.reason : ResetConnection.reason,
212+
new MessageReceiver(message)
213+
.apply(this.document, this)
214+
.catch((e: any) => {
215+
console.error(
216+
`closing connection ${this.socketId} (while handling ${documentName}) because of exception`,
217+
e,
218+
);
219+
this.close({
220+
code: "code" in e ? e.code : ResetConnection.code,
221+
reason: "reason" in e ? e.reason : ResetConnection.reason,
222+
});
229223
});
230-
}
231224
})
232225
.catch((e: any) => {
233226
console.error(

packages/server/src/MessageReceiver.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { AuthMessageType } from "@hocuspocus/common";
21
import * as decoding from "lib0/decoding";
32
import { readVarString } from "lib0/decoding";
43
import { applyAwarenessUpdate } from "y-protocols/awareness";
@@ -16,6 +15,7 @@ import type Document from "./Document.ts";
1615
import type { IncomingMessage } from "./IncomingMessage.ts";
1716
import { OutgoingMessage } from "./OutgoingMessage.ts";
1817
import { MessageType } from "./types.ts";
18+
import { AuthMessageType } from "@hocuspocus/common";
1919

2020
export class MessageReceiver {
2121
message: IncomingMessage;
@@ -27,7 +27,7 @@ export class MessageReceiver {
2727
this.defaultTransactionOrigin = defaultTransactionOrigin;
2828
}
2929

30-
public apply(
30+
public async apply(
3131
document: Document,
3232
connection?: Connection,
3333
reply?: (message: Uint8Array) => void,
@@ -40,7 +40,7 @@ export class MessageReceiver {
4040
case MessageType.Sync:
4141
case MessageType.SyncReply: {
4242
message.writeVarUint(MessageType.Sync);
43-
this.readSyncMessage(
43+
await this.readSyncMessage(
4444
message,
4545
document,
4646
connection,
@@ -65,7 +65,7 @@ export class MessageReceiver {
6565
break;
6666
}
6767
case MessageType.Awareness: {
68-
applyAwarenessUpdate(
68+
await applyAwarenessUpdate(
6969
document.awareness,
7070
message.readVarUint8Array(),
7171
connection?.webSocket,
@@ -111,7 +111,7 @@ export class MessageReceiver {
111111
token: message.readVarString(),
112112
});
113113
break;
114-
}
114+
}
115115
console.error(
116116
"Received an authentication message on a connection that is already fully authenticated. Probably your provider has been destroyed + recreated really fast.",
117117
);
@@ -126,7 +126,7 @@ export class MessageReceiver {
126126
}
127127
}
128128

129-
readSyncMessage(
129+
async readSyncMessage(
130130
message: IncomingMessage,
131131
document: Document,
132132
connection?: Connection,
@@ -136,7 +136,7 @@ export class MessageReceiver {
136136
const type = message.readVarUint();
137137

138138
if (connection) {
139-
connection.callbacks.beforeSync(connection, {
139+
await connection.callbacks.beforeSync(connection, {
140140
type,
141141
payload: message.peekVarUint8Array(),
142142
});

0 commit comments

Comments
 (0)