Skip to content

Commit 2199ac3

Browse files
authored
Use Hermes WebSocket instead of PubSub (#1112)
## Pull Request Etiquette ### Checklist - [x] Tests have been added in relevant areas - [x] Corresponding changes made to the documentation (README.adoc) <!-- (if irrelevant check the box too) --> ### Type of change Internal change ## Description Following PubSub deprecation, switch to new WebSocket. Hermes seems to be a wrapper of PubSub for now. Closes #960
1 parent 93a635c commit 2199ac3

202 files changed

Lines changed: 2612 additions & 580 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
/authentication
55
/build
66
/out
7+
/logs
78
/streamers
89
/target
910
bin/
@@ -12,6 +13,7 @@ bin/
1213
/.github/antora-docs
1314

1415
*.iml
16+
.env
1517
/analytics.db
1618
/config*.json
1719
/log4j2.xml

miner/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
/authentication
12
/build

miner/build.gradle.kts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,5 +164,4 @@ jib {
164164

165165
gitProperties {
166166
failOnNoGitDirectory = false
167-
dotGitDirectory.set(layout.settingsDirectory.dir(".git"))
168167
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package fr.rakambda.channelpointsminer.miner.api.hermes;
2+
3+
public interface ITwitchHermesMessageListener{
4+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package fr.rakambda.channelpointsminer.miner.api.hermes;
2+
3+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.ITwitchHermesWebSocketResponse;
4+
import org.jetbrains.annotations.NotNull;
5+
import org.jetbrains.annotations.Nullable;
6+
7+
public interface ITwitchHermesWebSocketListener {
8+
void onWebSocketMessage(@NotNull ITwitchHermesWebSocketResponse message);
9+
10+
void onWebSocketClosed(@NotNull TwitchHermesWebSocketClient client, int code, @Nullable String reason, boolean remote);
11+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package fr.rakambda.channelpointsminer.miner.api.hermes;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.core.type.TypeReference;
5+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.AuthenticateRequest;
6+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.ITwitchHermesWebSocketRequest;
7+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.SubscribeRequest;
8+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.UnsubscribeRequest;
9+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.subscribe.PubSubSubscribeType;
10+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.AuthenticateResponse;
11+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.ITwitchHermesWebSocketResponse;
12+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.KeepAliveResponse;
13+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.NotificationResponse;
14+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.SubscribeResponse;
15+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.UnsubscribeResponse;
16+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.WelcomeResponse;
17+
import fr.rakambda.channelpointsminer.miner.api.passport.TwitchLogin;
18+
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.Topic;
19+
import fr.rakambda.channelpointsminer.miner.factory.TimeFactory;
20+
import fr.rakambda.channelpointsminer.miner.log.LogContext;
21+
import fr.rakambda.channelpointsminer.miner.util.json.JacksonUtils;
22+
import lombok.Getter;
23+
import lombok.extern.log4j.Log4j2;
24+
import org.java_websocket.WebSocket;
25+
import org.java_websocket.client.WebSocketClient;
26+
import org.java_websocket.framing.Framedata;
27+
import org.java_websocket.handshake.ServerHandshake;
28+
import org.jetbrains.annotations.NotNull;
29+
import java.net.URI;
30+
import java.time.Instant;
31+
import java.util.Collection;
32+
import java.util.HashMap;
33+
import java.util.Map;
34+
import java.util.Objects;
35+
import java.util.Optional;
36+
import java.util.UUID;
37+
import java.util.concurrent.ConcurrentLinkedQueue;
38+
import static org.java_websocket.framing.CloseFrame.GOING_AWAY;
39+
40+
@Log4j2
41+
public class TwitchHermesWebSocketClient extends WebSocketClient{
42+
private final Collection<ITwitchHermesWebSocketListener> listeners;
43+
@Getter
44+
private final String uuid;
45+
@Getter
46+
private final Map<String, SubscribeRequest> subscribeRequests;
47+
48+
@Getter
49+
private Instant lastPong;
50+
51+
public TwitchHermesWebSocketClient(@NotNull URI uri){
52+
super(uri);
53+
uuid = UUID.randomUUID().toString();
54+
subscribeRequests = new HashMap<>();
55+
56+
setConnectionLostTimeout(0);
57+
listeners = new ConcurrentLinkedQueue<>();
58+
lastPong = Instant.EPOCH;
59+
60+
addHeader("Origin", "https://www.twitch.tv");
61+
}
62+
63+
@Override
64+
public void onOpen(ServerHandshake serverHandshake){
65+
try(var ignored = LogContext.empty().withSocketId(uuid)){
66+
log.info("Hermes WebSocket opened");
67+
}
68+
}
69+
70+
@Override
71+
public void onMessage(String messageStr){
72+
try(var logContext = LogContext.empty().withSocketId(uuid)){
73+
log.trace("Received Hermes WebSocket message: {}", messageStr.strip());
74+
var message = JacksonUtils.read(messageStr, new TypeReference<ITwitchHermesWebSocketResponse>(){});
75+
log.trace("Parsed Hermes message: {}", message);
76+
77+
switch(message){
78+
case WelcomeResponse welcomeResponse -> log.info("Received Hermes welcome with keep alive of {} seconds", welcomeResponse.getWelcome().getKeepaliveSec());
79+
case AuthenticateResponse authenticateResponse -> {
80+
if(authenticateResponse.hasError()){
81+
log.error("Received Hermes error authentication {}", authenticateResponse);
82+
close(GOING_AWAY, "Invalid credentials");
83+
}
84+
}
85+
case KeepAliveResponse ignored -> onPong();
86+
case SubscribeResponse subscribeResponse -> log.debug("Received Hermes subscribe response with status {}", subscribeResponse.getSubscribeResponse().getResult());
87+
case UnsubscribeResponse unsubscribeResponse -> {
88+
log.debug("Received Hermes subscribe response with status {}", unsubscribeResponse.getUnsubscribeResponse().getResult());
89+
subscribeRequests.remove(unsubscribeResponse.getUnsubscribeResponse().getSubscription().getId());
90+
}
91+
case NotificationResponse notificationResponse -> log.debug("Received Hermes notification of type {}", notificationResponse.getNotification().getClass().getSimpleName());
92+
default -> {
93+
}
94+
}
95+
listeners.forEach(listener -> listener.onWebSocketMessage(message));
96+
}
97+
catch(Exception e){
98+
log.error("Failed to handle Hermes WebSocket message {}", messageStr, e);
99+
}
100+
}
101+
102+
@Override
103+
public void onClose(int code, String reason, boolean remote){
104+
try(var ignored = LogContext.empty().withSocketId(uuid)){
105+
log.info("Hermes WebSocket closed with code {}, from host {}, reason {}", code, remote, reason);
106+
listeners.forEach(l -> l.onWebSocketClosed(this, code, reason, remote));
107+
}
108+
}
109+
110+
@Override
111+
public void onError(Exception e){
112+
log.error("Error from Hermes WebSocket", e);
113+
}
114+
115+
private void onPong(){
116+
lastPong = TimeFactory.now();
117+
}
118+
119+
public void authenticate(@NotNull TwitchLogin twitchLogin){
120+
send(new AuthenticateRequest(twitchLogin.getAccessToken()));
121+
}
122+
123+
public void send(@NotNull ITwitchHermesWebSocketRequest request){
124+
try(var ignored = LogContext.empty().withSocketId(uuid)){
125+
var data = JacksonUtils.writeAsString(request);
126+
log.trace("Sending Hermes WebSocket message: {}", data);
127+
send(data);
128+
}
129+
catch(JsonProcessingException e){
130+
log.error("Failed to convert Hermes WebSocket message to json", e);
131+
}
132+
}
133+
134+
@Override
135+
public void onWebsocketPong(WebSocket conn, Framedata f){
136+
onPong();
137+
}
138+
139+
public void addListener(@NotNull ITwitchHermesWebSocketListener listener){
140+
listeners.add(listener);
141+
}
142+
143+
public boolean isPubSubTopicListened(@NotNull Topic topic){
144+
return subscribeRequests.values().stream()
145+
.map(SubscribeRequest::getSubscribe)
146+
.filter(PubSubSubscribeType.class::isInstance)
147+
.map(PubSubSubscribeType.class::cast)
148+
.anyMatch(t -> Objects.equals(t.getPubsub().getTopic(), topic.getValue()));
149+
}
150+
151+
public Optional<String> listenPubSubTopic(@NotNull Topic topic){
152+
try(var ignored = LogContext.empty().withSocketId(uuid)){
153+
var request = SubscribeRequest.pubsub(topic.getValue());
154+
subscribeRequests.put(request.getSubscribe().getId(), request);
155+
send(request);
156+
return Optional.of(request.getSubscribe().getId());
157+
}
158+
}
159+
160+
public void removeSubscription(@NotNull String id){
161+
try(var ignored = LogContext.empty().withSocketId(uuid)){
162+
if(subscribeRequests.containsKey(id)){
163+
send(new UnsubscribeRequest(id));
164+
}
165+
}
166+
}
167+
168+
public int getSubscriptionCount(){
169+
return subscribeRequests.size();
170+
}
171+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package fr.rakambda.channelpointsminer.miner.api.hermes;
2+
3+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.ITwitchHermesWebSocketResponse;
4+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.NotificationResponse;
5+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.UnsubscribeResponse;
6+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.notification.PubSubNotificationType;
7+
import fr.rakambda.channelpointsminer.miner.api.passport.TwitchLogin;
8+
import fr.rakambda.channelpointsminer.miner.api.pubsub.ITwitchPubSubMessageListener;
9+
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.Topic;
10+
import fr.rakambda.channelpointsminer.miner.factory.TimeFactory;
11+
import fr.rakambda.channelpointsminer.miner.factory.TwitchWebSocketClientFactory;
12+
import lombok.extern.log4j.Log4j2;
13+
import org.java_websocket.client.WebSocketClient;
14+
import org.jetbrains.annotations.NotNull;
15+
import org.jetbrains.annotations.Nullable;
16+
import java.util.Collection;
17+
import java.util.Map;
18+
import java.util.Objects;
19+
import java.util.Queue;
20+
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.concurrent.ConcurrentLinkedQueue;
22+
import static java.time.temporal.ChronoUnit.MINUTES;
23+
import static org.java_websocket.framing.CloseFrame.ABNORMAL_CLOSE;
24+
import static org.java_websocket.framing.CloseFrame.NORMAL;
25+
26+
@Log4j2
27+
public class TwitchHermesWebSocketPool implements AutoCloseable, ITwitchHermesWebSocketListener{
28+
private static final int SOCKET_TIMEOUT_MINUTES = 5;
29+
30+
private final int maxSubscriptionPerClient;
31+
private final TwitchLogin twitchLogin;
32+
33+
private final Collection<TwitchHermesWebSocketClient> clients;
34+
private final Collection<ITwitchHermesMessageListener> listeners;
35+
private final Collection<ITwitchPubSubMessageListener> pubSubListeners;
36+
private final Queue<Topic> pendingTopics;
37+
38+
private final Map<String, fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.Topic> topics;
39+
40+
public TwitchHermesWebSocketPool(int maxSubscriptionPerClient, @NotNull TwitchLogin twitchLogin){
41+
this.maxSubscriptionPerClient = maxSubscriptionPerClient;
42+
this.twitchLogin = twitchLogin;
43+
44+
clients = new ConcurrentLinkedQueue<>();
45+
listeners = new ConcurrentLinkedQueue<>();
46+
pubSubListeners = new ConcurrentLinkedQueue<>();
47+
pendingTopics = new ConcurrentLinkedQueue<>();
48+
topics = new ConcurrentHashMap<>();
49+
}
50+
51+
public void ping(){
52+
checkStaleConnection();
53+
}
54+
55+
public void checkStaleConnection(){
56+
clients.stream()
57+
.filter(client -> TimeFactory.now().isAfter(client.getLastPong().plus(SOCKET_TIMEOUT_MINUTES, MINUTES)))
58+
.forEach(client -> client.close(ABNORMAL_CLOSE, "Timeout reached"));
59+
}
60+
61+
public void removePubSubTopic(@NotNull Topic topic){
62+
var subscriptionId = topics.entrySet().stream().filter(e -> Objects.equals(e.getValue(), topic)).findFirst();
63+
if(subscriptionId.isEmpty()){
64+
return;
65+
}
66+
clients.stream()
67+
.filter(client -> client.isPubSubTopicListened(topic))
68+
.forEach(client -> client.removeSubscription(subscriptionId.get().getKey()));
69+
}
70+
71+
public void addListener(@NotNull ITwitchHermesMessageListener listener){
72+
listeners.add(listener);
73+
}
74+
75+
public void addPubSubListener(@NotNull ITwitchPubSubMessageListener listener){
76+
pubSubListeners.add(listener);
77+
}
78+
79+
@Override
80+
public void onWebSocketMessage(@NotNull ITwitchHermesWebSocketResponse response){
81+
if(response instanceof UnsubscribeResponse u){
82+
topics.remove(u.getUnsubscribeResponse().getSubscription().getId());
83+
}
84+
if(response instanceof NotificationResponse n){
85+
if(n.getNotification() instanceof PubSubNotificationType t){
86+
if(Objects.isNull(t.getPubsub())){
87+
return;
88+
}
89+
var topic = topics.get(n.getNotification().getSubscription().getId());
90+
pubSubListeners.forEach(l -> l.onTwitchMessage(topic, t.getPubsub()));
91+
}
92+
}
93+
}
94+
95+
@Override
96+
public void onWebSocketClosed(@NotNull TwitchHermesWebSocketClient client, int code, @Nullable String reason, boolean remote){
97+
clients.remove(client);
98+
if(code != NORMAL){
99+
pendingTopics.addAll(client.getSubscribeRequests().keySet().stream().map(topics::get).filter(Objects::nonNull).toList());
100+
}
101+
}
102+
103+
public void listenPendingPubSubTopics(){
104+
try{
105+
Topic topic;
106+
while(Objects.nonNull(topic = pendingTopics.poll())){
107+
listenPubSubTopic(topic);
108+
}
109+
}
110+
catch(RuntimeException e){
111+
log.error("Failed to join pending subscriptions", e);
112+
}
113+
}
114+
115+
public void listenPubSubTopic(@NotNull Topic topic){
116+
if(isTopicListened(topic)){
117+
log.debug("Topic {} is already being listened", topics);
118+
return;
119+
}
120+
121+
try{
122+
getAvailableClient().listenPubSubTopic(topic).ifPresent(subscriptionId -> topics.put(subscriptionId, topic));
123+
}
124+
catch(RuntimeException e){
125+
pendingTopics.add(topic);
126+
throw e;
127+
}
128+
}
129+
130+
private boolean isTopicListened(@NotNull Topic topic){
131+
return clients.stream().anyMatch(client -> client.isPubSubTopicListened(topic));
132+
}
133+
134+
@NotNull
135+
private TwitchHermesWebSocketClient getAvailableClient(){
136+
return clients.stream()
137+
.filter(client -> !client.isClosing() && !client.isClosed())
138+
.filter(client -> client.getSubscriptionCount() < maxSubscriptionPerClient)
139+
.findAny()
140+
.orElseGet(() -> {
141+
var client = createNewClient();
142+
client.authenticate(twitchLogin);
143+
return client;
144+
});
145+
}
146+
147+
@NotNull
148+
public TwitchHermesWebSocketClient createNewClient(){
149+
try{
150+
var client = TwitchWebSocketClientFactory.createHermesClient();
151+
log.debug("Created Hermes WebSocket client with uuid {}", client.getUuid());
152+
client.addListener(this);
153+
client.connectBlocking();
154+
clients.add(client);
155+
return client;
156+
}
157+
catch(Exception e){
158+
log.error("Failed to create new Hermes WebSocket");
159+
throw new RuntimeException(e);
160+
}
161+
}
162+
163+
@Override
164+
public void close(){
165+
clients.forEach(WebSocketClient::close);
166+
}
167+
168+
public int getClientCount(){
169+
return clients.size();
170+
}
171+
}

0 commit comments

Comments
 (0)