Skip to content

Commit eb070c8

Browse files
authored
[Hermes] Handle reconnect message (#1114)
## Pull Request Etiquette ### Checklist - [x] Tests have been added in relevant areas - [x] Corresponding changes made to the documentation (README.adoc) <!-- (if irrelevant check the box too) --> ### Type of change Bug fix <!-- Choose one from "Bug fix" / "New Feature" / "Breaking change" / "Internal change" --> ## Description Handle the reconnect message from the Hermes endpoint, effectively disconnecting and reconnecting the previous subscriptions elsewhere. <!-- Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change. -->
1 parent 2199ac3 commit eb070c8

6 files changed

Lines changed: 93 additions & 0 deletions

File tree

miner/src/main/java/fr/rakambda/channelpointsminer/miner/api/hermes/TwitchHermesWebSocketClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.ITwitchHermesWebSocketResponse;
1212
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.KeepAliveResponse;
1313
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.NotificationResponse;
14+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.ReconnectResponse;
1415
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.SubscribeResponse;
1516
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.UnsubscribeResponse;
1617
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.WelcomeResponse;
@@ -89,6 +90,10 @@ public void onMessage(String messageStr){
8990
subscribeRequests.remove(unsubscribeResponse.getUnsubscribeResponse().getSubscription().getId());
9091
}
9192
case NotificationResponse notificationResponse -> log.debug("Received Hermes notification of type {}", notificationResponse.getNotification().getClass().getSimpleName());
93+
case ReconnectResponse ignored -> {
94+
log.warn("Received Hermes reconnect response, TODO: see if any field is useful : {}", messageStr);
95+
close(GOING_AWAY);
96+
}
9297
default -> {
9398
}
9499
}

miner/src/main/java/fr/rakambda/channelpointsminer/miner/api/hermes/TwitchHermesWebSocketPool.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public void onWebSocketClosed(@NotNull TwitchHermesWebSocketClient client, int c
9797
clients.remove(client);
9898
if(code != NORMAL){
9999
pendingTopics.addAll(client.getSubscribeRequests().keySet().stream().map(topics::get).filter(Objects::nonNull).toList());
100+
client.getSubscribeRequests().keySet().forEach(topics::remove);
100101
}
101102
}
102103

miner/src/main/java/fr/rakambda/channelpointsminer/miner/api/hermes/data/response/ITwitchHermesWebSocketResponse.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
@JsonSubTypes.Type(value = SubscribeResponse.class, name = "subscribeResponse"),
2424
@JsonSubTypes.Type(value = UnsubscribeResponse.class, name = "unsubscribeResponse"),
2525
@JsonSubTypes.Type(value = NotificationResponse.class, name = "notification"),
26+
@JsonSubTypes.Type(value = ReconnectResponse.class, name = "reconnect"),
2627
})
2728
@ToString
2829
@EqualsAndHashCode
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package fr.rakambda.channelpointsminer.miner.api.hermes.data.response;
2+
3+
import com.fasterxml.jackson.annotation.JsonProperty;
4+
import com.fasterxml.jackson.annotation.JsonTypeName;
5+
import lombok.AllArgsConstructor;
6+
import lombok.Builder;
7+
import lombok.EqualsAndHashCode;
8+
import lombok.Getter;
9+
import lombok.NoArgsConstructor;
10+
import lombok.ToString;
11+
import lombok.experimental.SuperBuilder;
12+
13+
@JsonTypeName("reconnect")
14+
@Getter
15+
@SuperBuilder
16+
@EqualsAndHashCode(callSuper = true)
17+
@AllArgsConstructor
18+
@ToString(callSuper = true)
19+
public class ReconnectResponse extends ITwitchHermesWebSocketResponse{
20+
// TODO what fields are present ?
21+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package fr.rakambda.channelpointsminer.miner.api.hermes;
2+
3+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.ReconnectResponse;
4+
import fr.rakambda.channelpointsminer.miner.tests.TestUtils;
5+
import fr.rakambda.channelpointsminer.miner.tests.WebsocketMockServer;
6+
import fr.rakambda.channelpointsminer.miner.tests.WebsocketMockServerExtension;
7+
import io.github.artsok.RepeatedIfExceptionsTest;
8+
import org.awaitility.core.ConditionTimeoutException;
9+
import org.mockito.Mock;
10+
import org.mockito.junit.jupiter.MockitoExtension;
11+
import org.junit.jupiter.api.AfterEach;
12+
import org.junit.jupiter.api.BeforeEach;
13+
import org.junit.jupiter.api.extension.ExtendWith;
14+
import java.net.URI;
15+
import java.time.ZonedDateTime;
16+
import static org.java_websocket.framing.CloseFrame.GOING_AWAY;
17+
import static org.mockito.ArgumentMatchers.any;
18+
import static org.mockito.ArgumentMatchers.anyBoolean;
19+
import static org.mockito.ArgumentMatchers.anyString;
20+
import static org.mockito.ArgumentMatchers.eq;
21+
import static org.mockito.Mockito.timeout;
22+
import static org.mockito.Mockito.verify;
23+
24+
@ExtendWith(MockitoExtension.class)
25+
@ExtendWith(WebsocketMockServerExtension.class)
26+
class TwitchHermesWebSocketReconnectTest{
27+
private static final int MESSAGE_TIMEOUT = 15000;
28+
private TwitchHermesWebSocketClient tested;
29+
30+
@Mock
31+
private ITwitchHermesWebSocketListener listener;
32+
33+
@AfterEach
34+
void tearDown(WebsocketMockServer server){
35+
tested.close();
36+
server.removeClients();
37+
}
38+
39+
@RepeatedIfExceptionsTest(repeats = 5, exceptions = ConditionTimeoutException.class)
40+
void onResponse(WebsocketMockServer server) throws InterruptedException{
41+
tested.connectBlocking();
42+
43+
server.send(TestUtils.getAllResourceContent("api/hermes/reconnect_ok.json"));
44+
45+
var expected = ReconnectResponse.builder()
46+
.id("4ae4865b-cedc-4755-8335-0560b7d05341")
47+
.timestamp(ZonedDateTime.parse("2025-01-02T03:04:05.123456789Z"))
48+
.build();
49+
verify(listener, timeout(MESSAGE_TIMEOUT)).onWebSocketMessage(expected);
50+
verify(listener).onWebSocketClosed(eq(tested), eq(GOING_AWAY), anyString(), anyBoolean());
51+
}
52+
53+
@BeforeEach
54+
void setUp(WebsocketMockServer server){
55+
var uri = URI.create("ws://127.0.0.1:" + server.getPort());
56+
tested = new TwitchHermesWebSocketClient(uri);
57+
tested.setReuseAddr(true);
58+
tested.addListener(listener);
59+
}
60+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"id" : "4ae4865b-cedc-4755-8335-0560b7d05341",
3+
"type" : "reconnect",
4+
"timestamp" : "2025-01-02T03:04:05.123456789Z"
5+
}

0 commit comments

Comments
 (0)