Skip to content

Commit eee16f5

Browse files
committed
temp
1 parent d5824f0 commit eee16f5

65 files changed

Lines changed: 1809 additions & 0 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package fr.rakambda.channelpointsminer.miner.api.hermes;
2+
3+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.message.IHermesMessage;
4+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.topic.Topic;
5+
import org.jetbrains.annotations.NotNull;
6+
7+
public interface ITwitchHermesMessageListener {
8+
void onTwitchMessage(@NotNull Topic topic, @NotNull IHermesMessage message);
9+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package fr.rakambda.channelpointsminer.miner.api.hermes;
2+
3+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.ITwitchHermesWebSocketResponse;
4+
import org.jetbrains.annotations.NotNull;
5+
import org.jetbrains.annotations.Nullable;
6+
7+
public interface ITwitchHermesWebSocketListener {
8+
void onWebSocketMessage(@NotNull ITwitchHermesWebSocketResponse message);
9+
10+
void onWebSocketClosed(@NotNull TwitchHermesWebSocketClient client, int code, @Nullable String reason, boolean remote);
11+
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package fr.rakambda.channelpointsminer.miner.api.hermes;
2+
3+
import java.net.URI;
4+
import java.time.Instant;
5+
import java.util.Collection;
6+
import java.util.HashMap;
7+
import java.util.HashSet;
8+
import java.util.Map;
9+
import java.util.Objects;
10+
import java.util.Set;
11+
import java.util.UUID;
12+
import java.util.concurrent.ConcurrentLinkedQueue;
13+
import static org.java_websocket.framing.CloseFrame.GOING_AWAY;
14+
import com.fasterxml.jackson.core.JsonProcessingException;
15+
import com.fasterxml.jackson.core.type.TypeReference;
16+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.ITwitchHermesWebSocketRequest;
17+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.ListenTopicRequest;
18+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.KeepAliveRequest;
19+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.UnlistenTopicRequest;
20+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.topic.Topic;
21+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.topic.Topics;
22+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.AuthenticateResponse;
23+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.ITwitchHermesWebSocketResponse;
24+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.MessageResponseHermes;
25+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.PongResponseHermes;
26+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.ReconnectResponseHermes;
27+
import fr.rakambda.channelpointsminer.miner.factory.TimeFactory;
28+
import fr.rakambda.channelpointsminer.miner.log.LogContext;
29+
import fr.rakambda.channelpointsminer.miner.util.json.JacksonUtils;
30+
import lombok.Getter;
31+
import lombok.extern.log4j.Log4j2;
32+
import org.java_websocket.WebSocket;
33+
import org.java_websocket.client.WebSocketClient;
34+
import org.java_websocket.framing.Framedata;
35+
import org.java_websocket.handshake.ServerHandshake;
36+
import org.jetbrains.annotations.NotNull;
37+
38+
@Log4j2
39+
public class TwitchHermesWebSocketClient extends WebSocketClient{
40+
@Getter
41+
private final Set<Topics> topics;
42+
private final Collection<ITwitchHermesWebSocketListener> listeners;
43+
@Getter
44+
private final String uuid;
45+
private final Map<String, ListenTopicRequest> listenRequests;
46+
47+
@Getter
48+
private Instant lastPong;
49+
50+
public TwitchHermesWebSocketClient(@NotNull URI uri){
51+
super(uri);
52+
uuid = UUID.randomUUID().toString();
53+
listenRequests = new HashMap<>();
54+
55+
setConnectionLostTimeout(0);
56+
topics = new HashSet<>();
57+
listeners = new ConcurrentLinkedQueue<>();
58+
lastPong = Instant.EPOCH;
59+
60+
addHeader("Origin", "https://www.twitch.tv");
61+
addHeader("Sec-Websocket-Key", "g5vRgkpsUreEDo2HQn0RgQ==");
62+
addHeader("Sec-Websocket-Version", "13");
63+
}
64+
65+
@Override
66+
public void onOpen(ServerHandshake serverHandshake){
67+
try(var ignored = LogContext.empty().withSocketId(uuid)){
68+
log.info("Hermes WebSocket opened");
69+
}
70+
}
71+
72+
@Override
73+
public void onMessage(String messageStr){
74+
try(var logContext = LogContext.empty().withSocketId(uuid)){
75+
log.trace("Received Hermes WebSocket message: {}", messageStr.strip());
76+
var message = JacksonUtils.read(messageStr, new TypeReference<ITwitchHermesWebSocketResponse>(){});
77+
log.trace("Parsed Hermes message: {}", message);
78+
79+
switch(message){
80+
case AuthenticateResponse authenticateResponse -> {
81+
if(authenticateResponse.hasError()){
82+
log.error("Received Hermes error authentication {}", authenticateResponse);
83+
close(GOING_AWAY, "Invalid credentials");
84+
}
85+
}
86+
case PongResponseHermes ignored1 -> onPong();
87+
case MessageResponseHermes messageResponse -> {
88+
}
89+
case ReconnectResponseHermes ignored -> close(GOING_AWAY);
90+
default -> {
91+
}
92+
}
93+
listeners.forEach(listener -> listener.onWebSocketMessage(message));
94+
}
95+
catch(Exception e){
96+
log.error("Failed to handle Hermes WebSocket message {}", messageStr, e);
97+
}
98+
}
99+
100+
@Override
101+
public void onClose(int code, String reason, boolean remote){
102+
try(var ignored = LogContext.empty().withSocketId(uuid)){
103+
log.info("Hermes WebSocket closed with code {}, from host {}, reason {}", code, remote, reason);
104+
listeners.forEach(l -> l.onWebSocketClosed(this, code, reason, remote));
105+
}
106+
}
107+
108+
@Override
109+
public void onError(Exception e){
110+
log.error("Error from Hermes WebSocket", e);
111+
}
112+
113+
private void onPong(){
114+
lastPong = TimeFactory.now();
115+
}
116+
117+
public void ping(){
118+
send(new KeepAliveRequest());
119+
}
120+
121+
public void send(@NotNull ITwitchHermesWebSocketRequest request){
122+
try(var ignored = LogContext.empty().withSocketId(uuid)){
123+
var data = JacksonUtils.writeAsString(request);
124+
log.trace("Sending Hermes WebSocket message: {}", data);
125+
send(data);
126+
}
127+
catch(JsonProcessingException e){
128+
log.error("Failed to convert Hermes WebSocket message to json", e);
129+
}
130+
}
131+
132+
@Override
133+
public void onWebsocketPong(WebSocket conn, Framedata f){
134+
onPong();
135+
}
136+
137+
public void addListener(@NotNull ITwitchHermesWebSocketListener listener){
138+
listeners.add(listener);
139+
}
140+
141+
public boolean isTopicListened(@NotNull Topic topic){
142+
return topics.stream()
143+
.flatMap(t -> t.getTopics().stream())
144+
.anyMatch(t -> Objects.equals(t, topic));
145+
}
146+
147+
public void listenTopic(@NotNull Topics topics){
148+
try(var ignored = LogContext.empty().withSocketId(uuid)){
149+
if(this.topics.add(topics)){
150+
var request = new ListenTopicRequest(topics);
151+
listenRequests.put(request.getNonce(), request);
152+
send(request);
153+
}
154+
}
155+
}
156+
157+
public void removeTopic(@NotNull Topic topic){
158+
try(var ignored = LogContext.empty().withSocketId(uuid)){
159+
var topics = this.topics.stream()
160+
.filter(t -> t.getTopics().contains(topic))
161+
.toList();
162+
163+
topics.forEach(t -> {
164+
send(new UnlistenTopicRequest(t));
165+
this.topics.remove(t);
166+
});
167+
}
168+
}
169+
170+
public int getTopicCount(){
171+
return topics.stream().mapToInt(Topics::getTopicCount).sum();
172+
}
173+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package fr.rakambda.channelpointsminer.miner.api.hermes;
2+
3+
import java.util.Collection;
4+
import java.util.Objects;
5+
import java.util.Queue;
6+
import java.util.concurrent.ConcurrentLinkedQueue;
7+
import static java.time.temporal.ChronoUnit.MINUTES;
8+
import static org.java_websocket.framing.CloseFrame.ABNORMAL_CLOSE;
9+
import static org.java_websocket.framing.CloseFrame.NORMAL;
10+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.topic.Topic;
11+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.request.topic.Topics;
12+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.ITwitchHermesWebSocketResponse;
13+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.response.MessageResponseHermes;
14+
import fr.rakambda.channelpointsminer.miner.factory.TimeFactory;
15+
import fr.rakambda.channelpointsminer.miner.factory.TwitchWebSocketClientFactory;
16+
import lombok.extern.log4j.Log4j2;
17+
import org.java_websocket.client.WebSocketClient;
18+
import org.jetbrains.annotations.NotNull;
19+
import org.jetbrains.annotations.Nullable;
20+
21+
@Log4j2
22+
public class TwitchHermesWebSocketPool implements AutoCloseable, ITwitchHermesWebSocketListener {
23+
private static final int SOCKET_TIMEOUT_MINUTES = 5;
24+
25+
private final Collection<TwitchHermesWebSocketClient> clients;
26+
private final Collection<ITwitchHermesMessageListener> listeners;
27+
private final Queue<Topics> pendingTopics;
28+
private final int maxTopicPerClient;
29+
30+
public TwitchHermesWebSocketPool(int maxTopicPerClient){
31+
this.maxTopicPerClient = maxTopicPerClient;
32+
clients = new ConcurrentLinkedQueue<>();
33+
listeners = new ConcurrentLinkedQueue<>();
34+
pendingTopics = new ConcurrentLinkedQueue<>();
35+
}
36+
37+
public void ping(){
38+
checkStaleConnection();
39+
40+
clients.stream()
41+
.filter(WebSocketClient::isOpen)
42+
.filter(client -> !client.isClosing())
43+
.forEach(TwitchHermesWebSocketClient::ping);
44+
}
45+
46+
public void checkStaleConnection(){
47+
clients.stream()
48+
.filter(client -> TimeFactory.now().isAfter(client.getLastPong().plus(SOCKET_TIMEOUT_MINUTES, MINUTES)))
49+
.forEach(client -> client.close(ABNORMAL_CLOSE, "Timeout reached"));
50+
}
51+
52+
public void removeTopic(@NotNull Topic topic){
53+
clients.stream()
54+
.filter(client -> client.isTopicListened(topic))
55+
.forEach(client -> client.removeTopic(topic));
56+
}
57+
58+
public void addListener(@NotNull ITwitchHermesMessageListener listener){
59+
listeners.add(listener);
60+
}
61+
62+
@Override
63+
public void onWebSocketMessage(@NotNull ITwitchHermesWebSocketResponse response){
64+
if(response instanceof MessageResponseHermes m){
65+
var topic = m.getData().getTopic();
66+
var message = m.getData().getMessage();
67+
listeners.forEach(l -> l.onTwitchMessage(topic, message));
68+
}
69+
}
70+
71+
@Override
72+
public void onWebSocketClosed(@NotNull TwitchHermesWebSocketClient client, int code, @Nullable String reason, boolean remote){
73+
clients.remove(client);
74+
if(code != NORMAL){
75+
pendingTopics.addAll(client.getTopics());
76+
}
77+
}
78+
79+
public void listenPendingTopics(){
80+
try{
81+
Topics topic;
82+
while(Objects.nonNull(topic = pendingTopics.poll())){
83+
listenTopic(topic);
84+
}
85+
}
86+
catch(RuntimeException e){
87+
log.error("Failed to join pending chats", e);
88+
}
89+
}
90+
91+
public void listenTopic(@NotNull Topics topics){
92+
var isListened = topics.getTopics().stream().anyMatch(this::isTopicListened);
93+
if(isListened){
94+
log.debug("Topic {} is already being listened", topics);
95+
return;
96+
}
97+
98+
try{
99+
getAvailableClient().listenTopic(topics);
100+
}
101+
catch(RuntimeException e){
102+
pendingTopics.add(topics);
103+
throw e;
104+
}
105+
}
106+
107+
private boolean isTopicListened(@NotNull Topic topic){
108+
return clients.stream().anyMatch(client -> client.isTopicListened(topic));
109+
}
110+
111+
@NotNull
112+
private TwitchHermesWebSocketClient getAvailableClient(){
113+
return clients.stream()
114+
.filter(client -> !client.isClosing() && !client.isClosed())
115+
.filter(client -> client.getTopicCount() < maxTopicPerClient)
116+
.findAny()
117+
.orElseGet(this::createNewClient);
118+
}
119+
120+
@NotNull
121+
public TwitchHermesWebSocketClient createNewClient(){
122+
try{
123+
var client = TwitchWebSocketClientFactory.createHermesClient();
124+
log.debug("Created Hermes WebSocket client with uuid {}", client.getUuid());
125+
client.addListener(this);
126+
client.connectBlocking();
127+
clients.add(client);
128+
return client;
129+
}
130+
catch(Exception e){
131+
log.error("Failed to create new Hermes WebSocket");
132+
throw new RuntimeException(e);
133+
}
134+
}
135+
136+
@Override
137+
public void close(){
138+
clients.forEach(WebSocketClient::close);
139+
}
140+
141+
public int getClientCount(){
142+
return clients.size();
143+
}
144+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package fr.rakambda.channelpointsminer.miner.api.hermes.data.message;
2+
3+
import com.fasterxml.jackson.annotation.JsonProperty;
4+
import com.fasterxml.jackson.annotation.JsonTypeName;
5+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.message.claimavailable.ClaimAvailableData;
6+
import lombok.AllArgsConstructor;
7+
import lombok.Builder;
8+
import lombok.EqualsAndHashCode;
9+
import lombok.Getter;
10+
import lombok.NoArgsConstructor;
11+
import lombok.ToString;
12+
import org.jetbrains.annotations.NotNull;
13+
14+
@NoArgsConstructor
15+
@AllArgsConstructor
16+
@JsonTypeName("claim-available")
17+
@Getter
18+
@EqualsAndHashCode(callSuper = true)
19+
@ToString
20+
@Builder
21+
public class ClaimAvailable extends IHermesMessage {
22+
@JsonProperty("data")
23+
@NotNull
24+
private ClaimAvailableData data;
25+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package fr.rakambda.channelpointsminer.miner.api.hermes.data.message;
2+
3+
import com.fasterxml.jackson.annotation.JsonProperty;
4+
import com.fasterxml.jackson.annotation.JsonTypeName;
5+
import fr.rakambda.channelpointsminer.miner.api.hermes.data.message.communitymoment.CommunityMomentStartData;
6+
import lombok.AllArgsConstructor;
7+
import lombok.Builder;
8+
import lombok.EqualsAndHashCode;
9+
import lombok.Getter;
10+
import lombok.NoArgsConstructor;
11+
import lombok.ToString;
12+
import org.jetbrains.annotations.NotNull;
13+
14+
@JsonTypeName("active")
15+
@Getter
16+
@EqualsAndHashCode(callSuper = true)
17+
@ToString
18+
@NoArgsConstructor
19+
@AllArgsConstructor
20+
@Builder
21+
public class CommunityMomentStart extends IHermesMessage {
22+
@JsonProperty("data")
23+
@NotNull
24+
private CommunityMomentStartData data;
25+
}

0 commit comments

Comments
 (0)