Skip to content

Commit 5192116

Browse files
committed
conat: got all tests passing again
1 parent 948483b commit 5192116

File tree

7 files changed

+86
-32
lines changed

7 files changed

+86
-32
lines changed

src/packages/backend/conat/test/core/core-stream-break.test.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
wait,
1818
delay,
1919
persistServer as setupPersistServer,
20+
setDefaultTimeouts,
2021
} from "@cocalc/backend/conat/test/setup";
2122

2223
beforeAll(before);
@@ -27,6 +28,9 @@ describe("stop persist server, create a client, create an ephemeral core-stream,
2728
let pclient;
2829
let persistServer;
2930

31+
// use much shorter timeout for this test of restarting persist server, which isn't a NORMAL thing.
32+
setDefaultTimeouts({ request: 750, publish: 750 });
33+
3034
it("close the persist server that was setup as part of before above", async () => {
3135
await setupPersistServer.end();
3236
});
@@ -59,7 +63,7 @@ describe("stop persist server, create a client, create an ephemeral core-stream,
5963

6064
await expect(async () => {
6165
await stream.publish("y", { timeout: 100 });
62-
}).rejects.toThrowError("no subscribers");
66+
}).rejects.toThrowError();
6367

6468
try {
6569
await stream.publish("y", { timeout: 100 });
@@ -68,9 +72,18 @@ describe("stop persist server, create a client, create an ephemeral core-stream,
6872
}
6973
});
7074

71-
it("starts persist server and can publish again", async () => {
75+
it("starts persist server and can eventually publish again", async () => {
7276
persistServer = initPersistServer({ client: pclient });
73-
await stream.publish("y");
77+
await wait({
78+
until: async () => {
79+
try {
80+
await stream.publish("y");
81+
return true;
82+
} catch {
83+
return false;
84+
}
85+
},
86+
});
7487
});
7588

7689
it("creates a dstream, publishes, sees it hasn't saved, starts persist server and sees save works again", async () => {

src/packages/backend/conat/test/core/core-stream.test.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,8 @@ describe("test key:value delete", () => {
270270
});
271271

272272
it("also delete the empty key one", async () => {
273-
const n = stream.length;
274273
await stream2.deleteKv("");
275-
await wait({ until: () => stream.length > n });
274+
await wait({ until: async () => (await stream2.getKv("")) == undefined });
276275
expect(await stream2.getKv("")).toEqual(undefined);
277276
await wait({ until: () => stream.getKv("") === undefined });
278277
});

src/packages/backend/conat/test/setup.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { tmpdir } from "node:os";
1515
import { join } from "path";
1616
export { wait } from "@cocalc/backend/conat/test/util";
1717
export { delay } from "awaiting";
18+
export { setDefaultTimeouts } from "@cocalc/conat/core/client";
1819

1920
const logger = getLogger("conat:test:setup");
2021

src/packages/conat/core/client.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,10 +263,22 @@ export type ClientOptions = Options & {
263263

264264
const INBOX_PREFIX = "_INBOX";
265265
const REPLY_HEADER = "CN-Reply";
266-
export const DEFAULT_REQUEST_TIMEOUT = 10000;
267-
export const DEFAULT_PUBLISH_TIMEOUT = 7500;
268266
const MAX_HEADER_SIZE = 100000;
269267

268+
export let DEFAULT_REQUEST_TIMEOUT = 7500;
269+
export let DEFAULT_PUBLISH_TIMEOUT = 7500;
270+
271+
export function setDefaultTimeouts({
272+
request = DEFAULT_REQUEST_TIMEOUT,
273+
publish = DEFAULT_PUBLISH_TIMEOUT,
274+
}: {
275+
request?: number;
276+
publish?: number;
277+
}) {
278+
DEFAULT_REQUEST_TIMEOUT = request;
279+
DEFAULT_PUBLISH_TIMEOUT = publish;
280+
}
281+
270282
export enum DataEncoding {
271283
MsgPack = 0,
272284
JsonCodec = 1,

src/packages/conat/persist/client.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,10 @@ export class PersistStreamClient extends EventEmitter {
229229
timeout,
230230
maxWait,
231231
});
232-
for await (const { data } of sub) {
232+
for await (const { data, headers } of sub) {
233+
if (headers?.error) {
234+
throw new ConatError(`${headers.error}`, { code: headers.code });
235+
}
233236
if (data == null || this.socket.state == "closed") {
234237
// done
235238
return;

src/packages/conat/persist/server.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ client = await require('@cocalc/backend/conat').conat(); s = await require('@coc
4040
4141
*/
4242

43-
import { type Client } from "@cocalc/conat/core/client";
43+
import { type Client, ConatError } from "@cocalc/conat/core/client";
4444
import {
4545
type ConatSocketServer,
4646
type ServerSocket,
@@ -94,6 +94,7 @@ export function server({
9494
subject: socket.subject,
9595
});
9696
let error = "";
97+
let errorCode: any = undefined;
9798
let changefeed = false;
9899
let storage: undefined | StorageOptions = undefined;
99100
let stream: undefined | PersistentStream = undefined;
@@ -111,7 +112,8 @@ export function server({
111112
startChangefeed({ socket, stream, messagesThresh });
112113
}
113114
} catch (err) {
114-
error = `${err};`;
115+
error = `${err}`;
116+
errorCode = err.code;
115117
}
116118
}
117119
});
@@ -127,7 +129,7 @@ export function server({
127129

128130
try {
129131
if (error) {
130-
throw Error(error);
132+
throw new ConatError(error, { code: errorCode });
131133
}
132134
if (storage === undefined || stream === undefined) {
133135
// this happens, e.g., when you restart both the persist server and the conat
@@ -239,7 +241,7 @@ export function server({
239241
async function getAll({ stream, mesg, request, messagesThresh }) {
240242
let seq = 0;
241243
const respond = (error?, messages?: StoredMessage[]) => {
242-
mesg.respondSync(messages, { headers: { error, seq } });
244+
mesg.respondSync(messages, { headers: { error, seq, code: error?.code } });
243245
seq += 1;
244246
};
245247

src/packages/conat/sync/core-stream.ts

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,20 @@ export class CoreStream<T = any> extends EventEmitter {
201201
start_seq: this._start_seq,
202202
noEmit: true,
203203
});
204-
this.configOptions = await this.config(this.configOptions);
204+
let d = 750;
205+
while (this.client != null) {
206+
try {
207+
this.configOptions = await this.config(this.configOptions);
208+
break;
209+
} catch (err) {
210+
if (err.code == 403) {
211+
// fatal permission error
212+
throw err;
213+
}
214+
d = Math.min(15000, d * 1.3);
215+
await delay(d);
216+
}
217+
}
205218
// NOTE: if we miss a message between getAllFromLeader and when we start listening,
206219
// then the sequence number will have a gap, and we'll immediately reconnect, starting
207220
// at the right point. So no data can possibly be lost.
@@ -245,27 +258,38 @@ export class CoreStream<T = any> extends EventEmitter {
245258
if (this.storage == null) {
246259
throw Error("bug -- storage must be set");
247260
}
248-
if (this.client == null) {
249-
return;
250-
}
251-
// console.log("get persistent stream", { start_seq });
252-
const sub = await this.persistClient.getAll({
253-
start_seq,
254-
});
255-
while (true) {
256-
const { value, done } = await sub.next();
257-
// console.log("got ", { value, done });
258-
if (done) {
259-
return;
260-
}
261-
const messages = value as StoredMessage[];
262-
const seq = this.processPersistentMessages(messages, noEmit);
263-
if (seq != null) {
264-
// we update start_seq in case we need to try again
265-
start_seq = seq! + 1;
261+
let d = 750;
262+
while (this.client != null) {
263+
try {
264+
// console.log("get persistent stream", { start_seq });
265+
const sub = await this.persistClient.getAll({
266+
start_seq,
267+
});
268+
// console.log("got sub");
269+
while (true) {
270+
const { value, done } = await sub.next();
271+
if (done) {
272+
return;
273+
}
274+
const messages = value as StoredMessage[];
275+
const seq = this.processPersistentMessages(messages, noEmit);
276+
if (seq != null) {
277+
// we update start_seq in case we need to try again
278+
start_seq = seq! + 1;
279+
}
280+
}
281+
} catch (err) {
282+
// console.log(
283+
// `WARNING: getAllFromPersist - failed -- ${err}, code=${err.code}`,
284+
// );
285+
if (err.code == 403) {
286+
// fatal permission error
287+
throw err;
288+
}
289+
d = Math.min(15000, d * 1.3);
290+
await delay(d);
266291
}
267292
}
268-
// console.log("finished getAll");
269293
};
270294

271295
private processPersistentMessages = (

0 commit comments

Comments
 (0)