-
Notifications
You must be signed in to change notification settings - Fork 89
/
Copy pathAbstractChangesStream.ts
40 lines (34 loc) · 1.16 KB
/
AbstractChangesStream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import {
ImplDecorator,
Inject,
QualifierImplDecoratorUtil,
} from '@eggjs/tegg';
import { RegistryType } from '../../../common/enum/Registry.js';
import { Registry } from '../../../core/entity/Registry.js';
import {
EggHttpClient,
EggLogger,
} from 'egg';
export const CHANGE_STREAM_ATTRIBUTE = 'CHANGE_STREAM_ATTRIBUTE';
export type ChangesStreamChange = {
seq: string;
fullname: string;
};
export abstract class AbstractChangeStream {
@Inject()
protected logger: EggLogger;
@Inject()
protected httpclient: EggHttpClient;
abstract getInitialSince(registry: Registry): Promise<string>;
abstract fetchChanges(registry: Registry, since: string): AsyncGenerator<ChangesStreamChange>;
getChangesStreamUrl(registry: Registry, since: string, limit?: number): string {
const url = new URL(registry.changeStream);
url.searchParams.set('since', since);
if (limit) {
url.searchParams.set('limit', String(limit));
}
return url.toString();
}
}
export const RegistryChangesStream: ImplDecorator<AbstractChangeStream, typeof RegistryType> =
QualifierImplDecoratorUtil.generatorDecorator(AbstractChangeStream, CHANGE_STREAM_ATTRIBUTE);