Skip to content

Commit 3e13ef0

Browse files
committed
[Hermes] Reconnect through reconnect URL + Error event
1 parent a316419 commit 3e13ef0

21 files changed

Lines changed: 250 additions & 64 deletions

miner/src/main/java/fr/rakambda/channelpointsminer/miner/api/hermes/TwitchHermesWebSocketClient.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.WelcomeResponse;
1818
import fr.rakambda.channelpointsminer.miner.api.passport.TwitchLogin;
1919
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.Topic;
20+
import fr.rakambda.channelpointsminer.miner.event.impl.ErrorEvent;
21+
import fr.rakambda.channelpointsminer.miner.event.manager.IEventManager;
2022
import fr.rakambda.channelpointsminer.miner.factory.TimeFactory;
2123
import fr.rakambda.channelpointsminer.miner.log.LogContext;
2224
import fr.rakambda.channelpointsminer.miner.util.json.JacksonUtils;
@@ -37,6 +39,7 @@
3739
import java.util.UUID;
3840
import java.util.concurrent.ConcurrentLinkedQueue;
3941
import static org.java_websocket.framing.CloseFrame.GOING_AWAY;
42+
import static org.java_websocket.framing.CloseFrame.NORMAL;
4043

4144
@Log4j2
4245
public class TwitchHermesWebSocketClient extends WebSocketClient{
@@ -45,12 +48,15 @@ public class TwitchHermesWebSocketClient extends WebSocketClient{
4548
private final String uuid;
4649
@Getter
4750
private final Map<String, SubscribeRequest> subscribeRequests;
51+
@NotNull
52+
private final IEventManager eventManager;
4853

4954
@Getter
5055
private Instant lastPong;
5156

52-
public TwitchHermesWebSocketClient(@NotNull URI uri){
57+
public TwitchHermesWebSocketClient(@NotNull URI uri, @NotNull IEventManager eventManager){
5358
super(uri);
59+
this.eventManager = eventManager;
5460
uuid = UUID.randomUUID().toString();
5561
subscribeRequests = new HashMap<>();
5662

@@ -86,13 +92,18 @@ public void onMessage(String messageStr){
8692
case KeepAliveResponse ignored -> onPong();
8793
case SubscribeResponse subscribeResponse -> log.debug("Received Hermes subscribe response with status {}", subscribeResponse.getSubscribeResponse().getResult());
8894
case UnsubscribeResponse unsubscribeResponse -> {
89-
log.debug("Received Hermes subscribe response with status {}", unsubscribeResponse.getUnsubscribeResponse().getResult());
95+
log.debug("Received Hermes unsubscribe response with status {}", unsubscribeResponse.getUnsubscribeResponse().getResult());
9096
subscribeRequests.remove(unsubscribeResponse.getUnsubscribeResponse().getSubscription().getId());
9197
}
9298
case NotificationResponse notificationResponse -> log.debug("Received Hermes notification of type {}", notificationResponse.getNotification().getClass().getSimpleName());
93-
case ReconnectResponse ignored -> {
94-
log.warn("Received Hermes reconnect response, TODO: see if any field is useful : {}", messageStr);
95-
close(GOING_AWAY);
99+
case ReconnectResponse reconnectResponse -> {
100+
if(Objects.nonNull(reconnectResponse.getReconnect()) && Objects.nonNull(reconnectResponse.getReconnect().getUrl())){
101+
close(NORMAL);
102+
}
103+
else{
104+
log.warn("Received Hermes reconnect response without a reconnect URL");
105+
close(GOING_AWAY);
106+
}
96107
}
97108
default -> {
98109
}
@@ -101,6 +112,7 @@ public void onMessage(String messageStr){
101112
}
102113
catch(Exception e){
103114
log.error("Failed to handle Hermes WebSocket message {}", messageStr, e);
115+
eventManager.onEvent(new ErrorEvent("Hermes API", "Failed to handle Hermes WebSocket message %s".formatted(messageStr), e));
104116
}
105117
}
106118

miner/src/main/java/fr/rakambda/channelpointsminer/miner/api/hermes/TwitchHermesWebSocketPool.java

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22

33
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.ITwitchHermesWebSocketResponse;
44
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.NotificationResponse;
5+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.ReconnectResponse;
56
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.UnsubscribeResponse;
67
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.notification.PubSubNotificationType;
78
import fr.rakambda.channelpointsminer.miner.api.passport.TwitchLogin;
89
import fr.rakambda.channelpointsminer.miner.api.pubsub.ITwitchPubSubMessageListener;
910
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.Topic;
11+
import fr.rakambda.channelpointsminer.miner.event.impl.ErrorEvent;
12+
import fr.rakambda.channelpointsminer.miner.event.manager.IEventManager;
1013
import fr.rakambda.channelpointsminer.miner.factory.TimeFactory;
1114
import fr.rakambda.channelpointsminer.miner.factory.TwitchWebSocketClientFactory;
1215
import lombok.extern.log4j.Log4j2;
@@ -29,6 +32,7 @@ public class TwitchHermesWebSocketPool implements AutoCloseable, ITwitchHermesWe
2932

3033
private final int maxSubscriptionPerClient;
3134
private final TwitchLogin twitchLogin;
35+
private final IEventManager eventManager;
3236

3337
private final Collection<TwitchHermesWebSocketClient> clients;
3438
private final Collection<ITwitchHermesMessageListener> listeners;
@@ -37,9 +41,10 @@ public class TwitchHermesWebSocketPool implements AutoCloseable, ITwitchHermesWe
3741

3842
private final Map<String, fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.Topic> topics;
3943

40-
public TwitchHermesWebSocketPool(int maxSubscriptionPerClient, @NotNull TwitchLogin twitchLogin){
44+
public TwitchHermesWebSocketPool(int maxSubscriptionPerClient, @NotNull TwitchLogin twitchLogin, @NotNull IEventManager eventManager){
4145
this.maxSubscriptionPerClient = maxSubscriptionPerClient;
4246
this.twitchLogin = twitchLogin;
47+
this.eventManager = eventManager;
4348

4449
clients = new ConcurrentLinkedQueue<>();
4550
listeners = new ConcurrentLinkedQueue<>();
@@ -81,12 +86,28 @@ public void onWebSocketMessage(@NotNull ITwitchHermesWebSocketResponse response)
8186
if(response instanceof UnsubscribeResponse u){
8287
topics.remove(u.getUnsubscribeResponse().getSubscription().getId());
8388
}
89+
if(response instanceof ReconnectResponse r){
90+
if(Objects.nonNull(r.getReconnect()) && Objects.nonNull(r.getReconnect().getUrl())){
91+
try{
92+
createReconnectClient(r.getReconnect().getUrl()); // TODO do the subscription ids change ?
93+
}
94+
catch(Exception e){
95+
eventManager.onEvent(new ErrorEvent("Hermes API", "Failed to reconnect client with reconnect URL", e));
96+
}
97+
}
98+
}
8499
if(response instanceof NotificationResponse n){
85100
if(n.getNotification() instanceof PubSubNotificationType t){
86101
if(Objects.isNull(t.getPubsub())){
87102
return;
88103
}
89-
var topic = topics.get(n.getNotification().getSubscription().getId());
104+
var subscriptionId = n.getNotification().getSubscription().getId();
105+
var topic = topics.get(subscriptionId);
106+
if(Objects.isNull(topic)){
107+
log.error("Received Hermes PubSub message for unknown topic from subscription id {}", subscriptionId);
108+
eventManager.onEvent(new ErrorEvent("Hermes API", "Received PubSub message for unknown topic from subscription id %s".formatted(subscriptionId)));
109+
return;
110+
}
90111
pubSubListeners.forEach(l -> l.onTwitchMessage(topic, t.getPubsub()));
91112
}
92113
}
@@ -110,6 +131,7 @@ public void listenPendingPubSubTopics(){
110131
}
111132
catch(RuntimeException e){
112133
log.error("Failed to join pending subscriptions", e);
134+
eventManager.onEvent(new ErrorEvent("Hermes API", "Failed to join pending subscriptions"));
113135
}
114136
}
115137

@@ -124,6 +146,7 @@ public void listenPubSubTopic(@NotNull Topic topic){
124146
}
125147
catch(RuntimeException e){
126148
pendingTopics.add(topic);
149+
eventManager.onEvent(new ErrorEvent("Hermes API", "Failed to listen to PubSub topic", e));
127150
throw e;
128151
}
129152
}
@@ -148,8 +171,8 @@ private TwitchHermesWebSocketClient getAvailableClient(){
148171
@NotNull
149172
public TwitchHermesWebSocketClient createNewClient(){
150173
try{
151-
var client = TwitchWebSocketClientFactory.createHermesClient();
152-
log.debug("Created Hermes WebSocket client with uuid {}", client.getUuid());
174+
var client = TwitchWebSocketClientFactory.createHermesClient(eventManager);
175+
log.info("Created Hermes WebSocket client with uuid {}", client.getUuid());
153176
client.addListener(this);
154177
client.connectBlocking();
155178
clients.add(client);
@@ -161,6 +184,22 @@ public TwitchHermesWebSocketClient createNewClient(){
161184
}
162185
}
163186

187+
@NotNull
188+
private TwitchHermesWebSocketClient createReconnectClient(@NotNull String reconnectUrl){
189+
try{
190+
var client = TwitchWebSocketClientFactory.createHermesClient(reconnectUrl, eventManager);
191+
log.info("Created (reconnect) Hermes WebSocket client with uuid {}", client.getUuid());
192+
client.addListener(this);
193+
client.connectBlocking();
194+
clients.add(client);
195+
return client;
196+
}
197+
catch(Exception e){
198+
log.error("Failed to create new (reconnect) Hermes WebSocket");
199+
throw new RuntimeException(e);
200+
}
201+
}
202+
164203
@Override
165204
public void close(){
166205
clients.forEach(WebSocketClient::close);

miner/src/main/java/fr/rakambda/channelpointsminer/miner/api/hermes/data/response/ReconnectResponse.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,17 @@
1717
@AllArgsConstructor
1818
@ToString(callSuper = true)
1919
public class ReconnectResponse extends ITwitchHermesWebSocketResponse{
20-
// TODO what fields are present ?
20+
@JsonProperty("reconnect")
21+
private Reconnect reconnect;
22+
23+
@Getter
24+
@NoArgsConstructor
25+
@AllArgsConstructor
26+
@EqualsAndHashCode
27+
@ToString
28+
@Builder
29+
public static class Reconnect{
30+
@JsonProperty("url")
31+
private String url;
32+
}
2133
}

miner/src/main/java/fr/rakambda/channelpointsminer/miner/event/EventVariableKey.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,6 @@ public class EventVariableKey{
2424
public static final String STREAMER_URL = "streamer_url";
2525
public static final String STREAMER_PROFILE_PICTURE_URL = "streamer_profile_picture_url";
2626
public static final String VERSION = "version";
27+
public static final String CATEGORY = "category";
28+
public static final String THROWABLE = "throwable";
2729
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package fr.rakambda.channelpointsminer.miner.event.impl;
2+
3+
import fr.rakambda.channelpointsminer.miner.event.AbstractLoggableEvent;
4+
import fr.rakambda.channelpointsminer.miner.event.EventVariableKey;
5+
import fr.rakambda.channelpointsminer.miner.factory.TimeFactory;
6+
import lombok.EqualsAndHashCode;
7+
import lombok.ToString;
8+
import org.apache.commons.lang3.exception.ExceptionUtils;
9+
import org.jetbrains.annotations.NotNull;
10+
import org.jetbrains.annotations.Nullable;
11+
import java.io.PrintWriter;
12+
import java.io.StringWriter;
13+
import java.time.Instant;
14+
import java.util.Objects;
15+
16+
@EqualsAndHashCode(callSuper = true)
17+
@ToString
18+
public class ErrorEvent extends AbstractLoggableEvent{
19+
private final String category;
20+
private final String message;
21+
private final Throwable throwable;
22+
23+
public ErrorEvent(@NotNull String category, @NotNull String message, @Nullable Throwable throwable, @NotNull Instant instant){
24+
super(instant);
25+
this.category = category;
26+
this.message = message;
27+
this.throwable = throwable;
28+
}
29+
30+
public ErrorEvent(@NotNull String category, @NotNull String message){
31+
this(category, message, null, TimeFactory.now());
32+
}
33+
34+
public ErrorEvent(@NotNull String category, @NotNull String message, @Nullable Throwable throwable){
35+
this(category, message, throwable, TimeFactory.now());
36+
}
37+
38+
@Override
39+
@NotNull
40+
public String getConsoleLogFormat(){
41+
return "Error received : [{category}] {message} {throwable}";
42+
}
43+
44+
@Override
45+
public String lookup(String key){
46+
if(EventVariableKey.MESSAGE.equals(key)){
47+
return message;
48+
}
49+
if(EventVariableKey.CATEGORY.equals(key)){
50+
return category;
51+
}
52+
if(EventVariableKey.THROWABLE.equals(key)){
53+
return Objects.isNull(throwable) ? "" : ExceptionUtils.getStackTrace(throwable);
54+
}
55+
return super.lookup(key);
56+
}
57+
58+
@Override
59+
@NotNull
60+
public String getDefaultFormat(){
61+
return "[{username}] {emoji} : Error received : [{category}] {message} {throwable}";
62+
}
63+
64+
@Override
65+
@NotNull
66+
protected String getColor(){
67+
return COLOR_WARN;
68+
}
69+
70+
@Override
71+
@NotNull
72+
protected String getEmoji(){
73+
return "😵";
74+
}
75+
}

miner/src/main/java/fr/rakambda/channelpointsminer/miner/factory/ApiFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public static IVersionProvider createVersionProvider(@NotNull VersionProvider ve
199199
}
200200

201201
@NotNull
202-
public static TwitchHermesWebSocketPool createHermesWebSocketPool(@NotNull TwitchLogin twitchLogin){
203-
return new TwitchHermesWebSocketPool(50, twitchLogin);
202+
public static TwitchHermesWebSocketPool createHermesWebSocketPool(@NotNull TwitchLogin twitchLogin, @NotNull IEventManager eventManager){
203+
return new TwitchHermesWebSocketPool(50, twitchLogin, eventManager);
204204
}
205205
}

miner/src/main/java/fr/rakambda/channelpointsminer/miner/factory/TwitchWebSocketClientFactory.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import fr.rakambda.channelpointsminer.miner.api.passport.TwitchClient;
66
import fr.rakambda.channelpointsminer.miner.api.passport.TwitchLogin;
77
import fr.rakambda.channelpointsminer.miner.api.pubsub.TwitchPubSubWebSocketClient;
8+
import fr.rakambda.channelpointsminer.miner.event.manager.IEventManager;
89
import lombok.NoArgsConstructor;
910
import org.jetbrains.annotations.NotNull;
1011
import java.net.URI;
@@ -22,8 +23,13 @@ public static TwitchPubSubWebSocketClient createPubSubClient(){
2223
}
2324

2425
@NotNull
25-
public static TwitchHermesWebSocketClient createHermesClient(){
26-
return new TwitchHermesWebSocketClient(URI.create("%s?clientId=%s".formatted(HERMES_URI_BASE, TwitchClient.WEB.getClientId())));
26+
public static TwitchHermesWebSocketClient createHermesClient(@NotNull IEventManager eventManager){
27+
return new TwitchHermesWebSocketClient(URI.create("%s?clientId=%s".formatted(HERMES_URI_BASE, TwitchClient.WEB.getClientId())), eventManager);
28+
}
29+
30+
@NotNull
31+
public static TwitchHermesWebSocketClient createHermesClient(@NotNull String reconnectUrl, @NotNull IEventManager eventManager){
32+
return new TwitchHermesWebSocketClient(URI.create(reconnectUrl), eventManager);
2733
}
2834

2935
@NotNull

miner/src/main/java/fr/rakambda/channelpointsminer/miner/miner/Miner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ private void login(){
166166
var integrityProvider = ApiFactory.createIntegrityProvider(twitchLogin, versionProvider, accountConfiguration.getLoginMethod(), eventManager);
167167
gqlApi = ApiFactory.createGqlApi(twitchLogin, integrityProvider);
168168
twitchApi = ApiFactory.createTwitchApi(twitchLogin);
169-
hermesWebSocketPool = ApiFactory.createHermesWebSocketPool(twitchLogin);
169+
hermesWebSocketPool = ApiFactory.createHermesWebSocketPool(twitchLogin, eventManager);
170170
hermesWebSocketPool.addPubSubListener(this);
171171
chatClient = TwitchChatFactory.createChat(this, accountConfiguration.getChatMode(), listenMessages);
172172
chatClient.addChatMessageListener(new TwitchChatEventProducer(eventManager));

miner/src/test/java/fr/rakambda/channelpointsminer/miner/api/hermes/TwitchHermesWebSocketAuthenticateTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package fr.rakambda.channelpointsminer.miner.api.hermes;
22

33
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.AuthenticateResponse;
4+
import fr.rakambda.channelpointsminer.miner.event.manager.IEventManager;
45
import fr.rakambda.channelpointsminer.miner.tests.TestUtils;
56
import fr.rakambda.channelpointsminer.miner.tests.WebsocketMockServer;
67
import fr.rakambda.channelpointsminer.miner.tests.WebsocketMockServerExtension;
@@ -24,6 +25,8 @@ class TwitchHermesWebSocketAuthenticateTest {
2425

2526
@Mock
2627
private ITwitchHermesWebSocketListener listener;
28+
@Mock
29+
private IEventManager eventManager;
2730

2831
@AfterEach
2932
void tearDown(WebsocketMockServer server){
@@ -50,7 +53,7 @@ void onResponse(WebsocketMockServer server) throws InterruptedException{
5053
@BeforeEach
5154
void setUp(WebsocketMockServer server){
5255
var uri = URI.create("ws://127.0.0.1:" + server.getPort());
53-
tested = new TwitchHermesWebSocketClient(uri);
56+
tested = new TwitchHermesWebSocketClient(uri, eventManager);
5457
tested.setReuseAddr(true);
5558
tested.addListener(listener);
5659
}

miner/src/test/java/fr/rakambda/channelpointsminer/miner/api/hermes/TwitchHermesWebSocketClientTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import fr.rakambda.channelpointsminer.miner.api.passport.TwitchLogin;
44
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.Topic;
5+
import fr.rakambda.channelpointsminer.miner.event.manager.IEventManager;
56
import fr.rakambda.channelpointsminer.miner.factory.TimeFactory;
67
import fr.rakambda.channelpointsminer.miner.tests.WebsocketMockServer;
78
import fr.rakambda.channelpointsminer.miner.tests.WebsocketMockServerExtension;
@@ -39,6 +40,8 @@ class TwitchHermesWebSocketClientTest{
3940
private ITwitchHermesWebSocketListener listener;
4041
@Mock
4142
private TwitchLogin twitchLogin;
43+
@Mock
44+
private IEventManager eventManager;
4245

4346
@BeforeEach
4447
void setUp(){
@@ -150,7 +153,7 @@ void authenticate(WebsocketMockServer server) throws InterruptedException{
150153
@BeforeEach
151154
void setUp(WebsocketMockServer server){
152155
var uri = URI.create("ws://127.0.0.1:" + server.getPort());
153-
tested = new TwitchHermesWebSocketClient(uri);
156+
tested = new TwitchHermesWebSocketClient(uri, eventManager);
154157
tested.setReuseAddr(true);
155158
tested.addListener(listener);
156159
}

0 commit comments

Comments
 (0)