-
Notifications
You must be signed in to change notification settings - Fork 36
Expand file tree
/
Copy pathchatroom.service.ts
More file actions
101 lines (89 loc) · 2.46 KB
/
chatroom.service.ts
File metadata and controls
101 lines (89 loc) · 2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import {
type Context,
type EagerCollection,
type Json,
type Mapper,
type Resource,
type AnySkipService,
type Values,
} from "@skipruntime/core";
import { KafkaExternalService } from "@skip-adapter/kafka";
type Message = {
id: number;
author: string;
body: string;
timestamp: number;
};
type Like = {
id: number;
message_id: number;
author: string;
};
type LikedMessage = Message & { likedBy: string[] };
type ResourceInputs = {
messages: EagerCollection<number, Message>;
likersByMessage: EagerCollection<number, string>;
};
const kafka = new KafkaExternalService(
{
clientId: "skip-chatroom",
brokers: [{ host: "kafka", port: 19092 }],
},
(msg) => {
const value = JSON.parse(msg.value);
return [[-Number(value.id), value]];
},
);
class GroupByMessage implements Mapper<number, Like, number, string> {
mapEntry(_id: number, likes: Values<Like>): Iterable<[number, string]> {
return likes.toArray().map((like) => [-like.message_id, like.author]);
}
}
class JoinUniqueLikers
implements Mapper<number, Message, number, LikedMessage>
{
constructor(private likersByMessage: EagerCollection<number, string>) {}
mapEntry(
id: number,
messages: Values<Message>,
): Iterable<[number, LikedMessage]> {
const msg: Message = messages.getUnique();
const uniqueLikers = Array.from(new Set(this.likersByMessage.getArray(id)));
return [[id, { ...msg, likedBy: uniqueLikers }]];
}
}
class MessagesResource implements Resource<ResourceInputs> {
private limit: number;
constructor(param: Json) {
if (typeof param == "number") this.limit = param;
else this.limit = 25;
}
instantiate(
collections: ResourceInputs,
): EagerCollection<number, LikedMessage> {
return collections.messages
.take(this.limit)
.map(JoinUniqueLikers, collections.likersByMessage);
}
}
export const service: AnySkipService = {
inputs: {},
resources: { messages: MessagesResource },
externalServices: { kafka },
createGraph(_: {}, context: Context): ResourceInputs {
const messages = context.useExternalResource<number, Message>({
service: "kafka",
identifier: "skip-chatroom-messages",
});
const likes = context.useExternalResource<number, Like>({
service: "kafka",
identifier: "skip-chatroom-likes",
});
const likersByMessage: EagerCollection<number, string> =
likes.map(GroupByMessage);
return {
likersByMessage,
messages,
};
},
};