Skip to content

Commit 0b5bf95

Browse files
authored
Listen to BroadcastSettingsUpdate message to circumvent missing streamUp/streamDown events (#1115)
## Pull Request Etiquette ### Checklist - [x] Tests have been added in relevant areas - [x] Corresponding changes made to the documentation (README.adoc) <!-- (if irrelevant check the box too) --> ### Type of change New Feature ## Description This event is sent when stream start/stops. However it doesn't send enough info to know if the stream started or ended so we'll have to rely on our internal state which may be unreliable sometimes.
1 parent a8396a2 commit 0b5bf95

10 files changed

Lines changed: 169 additions & 16 deletions

File tree

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package fr.rakambda.channelpointsminer.miner.api.pubsub.data.message;
2+
3+
import com.fasterxml.jackson.annotation.JsonProperty;
4+
import com.fasterxml.jackson.annotation.JsonTypeName;
5+
import lombok.AllArgsConstructor;
6+
import lombok.Builder;
7+
import lombok.EqualsAndHashCode;
8+
import lombok.Getter;
9+
import lombok.NoArgsConstructor;
10+
import lombok.ToString;
11+
12+
@JsonTypeName("broadcast_settings_update")
13+
@Getter
14+
@EqualsAndHashCode(callSuper = true)
15+
@ToString
16+
@NoArgsConstructor
17+
@AllArgsConstructor
18+
@Builder
19+
public class BroadcastSettingsUpdate extends IPubSubMessage{
20+
@JsonProperty("channel_id")
21+
private String channelId;
22+
}

miner/src/main/java/fr/rakambda/channelpointsminer/miner/api/pubsub/data/message/IPubSubMessage.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
@JsonSubTypes.Type(value = DropProgress.class, name = "drop-progress"),
2828
@JsonSubTypes.Type(value = DropClaim.class, name = "drop-claim"),
2929
@JsonSubTypes.Type(value = ViewCount.class, name = "viewcount"),
30+
@JsonSubTypes.Type(value = BroadcastSettingsUpdate.class, name = "broadcast_settings_update"),
3031
})
3132
@EqualsAndHashCode
3233
@ToString

miner/src/main/java/fr/rakambda/channelpointsminer/miner/api/pubsub/data/request/topic/TopicName.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ public enum TopicName{
1616
PREDICTIONS_CHANNEL_V1("predictions-channel-v1", false),
1717
ONSITE_NOTIFICATIONS("onsite-notifications", true),
1818
COMMUNITY_MOMENTS_CHANNEL_V1("community-moments-channel-v1", false),
19-
USER_DROP_EVENTS("user-drop-events", false);
19+
USER_DROP_EVENTS("user-drop-events", false),
20+
BROADCAST_SETTINGS_UPDATE("broadcast-settings-update", false);
2021

2122
@Getter(onMethod_ = @JsonValue)
2223
private final String value;

miner/src/main/java/fr/rakambda/channelpointsminer/miner/handler/PubSubMessageHandlerAdapter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package fr.rakambda.channelpointsminer.miner.handler;
22

3+
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.BroadcastSettingsUpdate;
34
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.ClaimAvailable;
45
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.CommunityMomentStart;
56
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.CreateNotification;
@@ -67,6 +68,8 @@ public void onDropClaim(@NotNull Topic topic, @NotNull DropClaim message){}
6768

6869
public void onViewCount(@NotNull Topic topic, @NotNull ViewCount message){}
6970

71+
public void onBroadcastSettingsUpdate(@NotNull Topic topic, @NotNull BroadcastSettingsUpdate message){}
72+
7073
@Override
7174
public void handle(@NotNull Topic topic, @NotNull IPubSubMessage message){
7275
for(var clazz : ClassWalker.range(message.getClass(), IPubSubMessage.class)){

miner/src/main/java/fr/rakambda/channelpointsminer/miner/handler/StreamStartEndHandler.java

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
package fr.rakambda.channelpointsminer.miner.handler;
22

3+
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.BroadcastSettingsUpdate;
34
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.StreamDown;
45
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.StreamUp;
56
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.Topic;
67
import fr.rakambda.channelpointsminer.miner.event.impl.StreamDownEvent;
78
import fr.rakambda.channelpointsminer.miner.event.impl.StreamUpEvent;
89
import fr.rakambda.channelpointsminer.miner.event.manager.IEventManager;
10+
import fr.rakambda.channelpointsminer.miner.factory.TimeFactory;
911
import fr.rakambda.channelpointsminer.miner.log.LogContext;
1012
import fr.rakambda.channelpointsminer.miner.miner.IMiner;
1113
import fr.rakambda.channelpointsminer.miner.streamer.Streamer;
1214
import lombok.RequiredArgsConstructor;
1315
import lombok.extern.log4j.Log4j2;
1416
import org.jetbrains.annotations.NotNull;
1517
import org.jetbrains.annotations.Nullable;
18+
import java.time.Instant;
1619
import java.util.Objects;
1720
import java.util.Optional;
1821
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -26,34 +29,53 @@ public class StreamStartEndHandler extends PubSubMessageHandlerAdapter{
2629
private final IEventManager eventManager;
2730

2831
@Override
29-
public void onStreamDown(@NotNull Topic topic, @NotNull StreamDown message){
32+
public void onStreamUp(@NotNull Topic topic, @NotNull StreamUp message){
3033
var streamerId = topic.getTarget();
31-
var streamer = miner.getStreamerById(streamerId).orElse(null);
32-
var username = Objects.isNull(streamer) ? null : streamer.getUsername();
33-
updateStream(topic, streamer);
34-
Optional.ofNullable(streamer)
35-
.map(Streamer::getUsername)
36-
.ifPresent(miner.getChatClient()::leave);
37-
eventManager.onEvent(new StreamDownEvent(streamerId, username, streamer, message.getServerTime()));
34+
var streamer = miner.getStreamerById(streamerId);
35+
streamUp(streamerId, streamer.orElse(null), streamer.map(Streamer::getUsername).orElse(null), message.getServerTime());
3836
}
3937

4038
@Override
41-
public void onStreamUp(@NotNull Topic topic, @NotNull StreamUp message){
39+
public void onStreamDown(@NotNull Topic topic, @NotNull StreamDown message){
4240
var streamerId = topic.getTarget();
43-
var streamer = miner.getStreamerById(streamerId).orElse(null);
44-
var username = Objects.isNull(streamer) ? null : streamer.getUsername();
45-
updateStream(topic, streamer);
41+
var streamer = miner.getStreamerById(streamerId);
42+
streamDown(streamerId, streamer.orElse(null), streamer.map(Streamer::getUsername).orElse(null), message.getServerTime());
43+
}
44+
45+
@Override
46+
public void onBroadcastSettingsUpdate(@NotNull Topic topic, @NotNull BroadcastSettingsUpdate message){
47+
var streamerId = message.getChannelId();
48+
var streamer = miner.getStreamerById(streamerId);
49+
50+
if(streamer.map(Streamer::isStreaming).orElse(true)){
51+
streamDown(streamerId, streamer.orElse(null), streamer.map(Streamer::getUsername).orElse(null), TimeFactory.now());
52+
}
53+
else{
54+
streamUp(streamerId, streamer.orElse(null), streamer.map(Streamer::getUsername).orElse(null), TimeFactory.now());
55+
}
56+
}
57+
58+
private void streamUp(@NotNull String streamerId, @Nullable Streamer streamer, @Nullable String username, @NotNull Instant serverTime){
59+
updateStream(streamerId, streamer);
4660
Optional.ofNullable(streamer)
4761
.filter(s -> s.getSettings().isJoinIrc())
4862
.map(Streamer::getUsername)
4963
.ifPresent(miner.getChatClient()::join);
50-
eventManager.onEvent(new StreamUpEvent(streamerId, username, streamer, message.getServerTime()));
64+
eventManager.onEvent(new StreamUpEvent(streamerId, username, streamer, serverTime));
65+
}
66+
67+
private void streamDown(@NotNull String streamerId, @Nullable Streamer streamer, @Nullable String username, @NotNull Instant serverTime){
68+
updateStream(streamerId, streamer);
69+
Optional.ofNullable(streamer)
70+
.map(Streamer::getUsername)
71+
.ifPresent(miner.getChatClient()::leave);
72+
eventManager.onEvent(new StreamDownEvent(streamerId, username, streamer, serverTime));
5173
}
5274

53-
private void updateStream(@NotNull Topic topic, @Nullable Streamer streamer){
75+
private void updateStream(@Nullable String streamerId, @Nullable Streamer streamer){
5476
try(var ignored = LogContext.with(miner).withStreamer(streamer)){
5577
if(Objects.isNull(streamer)){
56-
log.warn("Couldn't find associated streamer with target {}", topic.getTarget());
78+
log.warn("Couldn't find associated streamer with id {}, not updating its info", streamerId);
5779
return;
5880
}
5981

miner/src/main/java/fr/rakambda/channelpointsminer/miner/miner/Miner.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.concurrent.ScheduledExecutorService;
4646
import java.util.concurrent.ScheduledFuture;
4747
import java.util.concurrent.TimeUnit;
48+
import static fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.TopicName.BROADCAST_SETTINGS_UPDATE;
4849
import static fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.TopicName.COMMUNITY_MOMENTS_CHANNEL_V1;
4950
import static fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.TopicName.COMMUNITY_POINTS_USER_V1;
5051
import static fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.TopicName.ONSITE_NOTIFICATIONS;
@@ -221,6 +222,7 @@ public void updateStreamer(@NotNull Streamer streamer){
221222
}
222223

223224
listenTopic(VIDEO_PLAYBACK_BY_ID, streamer.getId());
225+
listenTopic(BROADCAST_SETTINGS_UPDATE, streamer.getId());
224226

225227
if(streamer.getSettings().isMakePredictions()){
226228
listenTopic(PREDICTIONS_USER_V1, getTwitchLogin().fetchUserId(gqlApi));
@@ -262,6 +264,7 @@ public boolean removeStreamer(@NotNull Streamer streamer){
262264
}
263265
log.info("Removing streamer from the mining list");
264266
removeTopic(VIDEO_PLAYBACK_BY_ID, streamer.getId());
267+
removeTopic(BROADCAST_SETTINGS_UPDATE, streamer.getId());
265268
removeTopic(PREDICTIONS_CHANNEL_V1, streamer.getId());
266269
removeTopic(COMMUNITY_MOMENTS_CHANNEL_V1, streamer.getId());
267270
removeTopic(RAID, streamer.getId());

miner/src/test/java/fr/rakambda/channelpointsminer/miner/api/pubsub/TwitchPubSubWebSocketClientMessageTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import fr.rakambda.channelpointsminer.miner.api.pubsub.ITwitchPubSubWebSocketListener;
44
import fr.rakambda.channelpointsminer.miner.api.pubsub.TwitchPubSubWebSocketClient;
5+
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.BroadcastSettingsUpdate;
56
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.ClaimAvailable;
67
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.CommunityMomentStart;
78
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.CreateNotification;
@@ -47,6 +48,7 @@
4748
import java.net.MalformedURLException;
4849
import java.net.URI;
4950
import java.time.ZonedDateTime;
51+
import static fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.TopicName.BROADCAST_SETTINGS_UPDATE;
5052
import static fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.TopicName.COMMUNITY_MOMENTS_CHANNEL_V1;
5153
import static fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.TopicName.COMMUNITY_POINTS_USER_V1;
5254
import static fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.TopicName.ONSITE_NOTIFICATIONS;
@@ -380,6 +382,24 @@ void onViewCount(WebsocketMockServer server){
380382
verify(listener, timeout(MESSAGE_TIMEOUT)).onWebSocketMessage(expected);
381383
}
382384

385+
@Test
386+
void onBroadcastSettingsUpdate(WebsocketMockServer server){
387+
server.send(TestUtils.getAllResourceContent("api/ws/broadcastSettingsUpdate.json"));
388+
389+
var expected = MessageResponse.builder()
390+
.data(MessageData.builder()
391+
.topic(Topic.builder()
392+
.name(BROADCAST_SETTINGS_UPDATE)
393+
.target("987654321")
394+
.build())
395+
.message(BroadcastSettingsUpdate.builder()
396+
.channelId("987654321")
397+
.build())
398+
.build())
399+
.build();
400+
verify(listener, timeout(MESSAGE_TIMEOUT)).onWebSocketMessage(expected);
401+
}
402+
383403
@Test
384404
void onUnknownEvent(WebsocketMockServer server){
385405
server.send(TestUtils.getAllResourceContent("api/ws/unknown.json"));

miner/src/test/java/fr/rakambda/channelpointsminer/miner/handler/StreamStartEndHandlerTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package fr.rakambda.channelpointsminer.miner.handler;
22

33
import fr.rakambda.channelpointsminer.miner.api.chat.ITwitchChatClient;
4+
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.BroadcastSettingsUpdate;
45
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.StreamDown;
56
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.StreamUp;
67
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.Topic;
78
import fr.rakambda.channelpointsminer.miner.event.impl.StreamDownEvent;
89
import fr.rakambda.channelpointsminer.miner.event.impl.StreamUpEvent;
910
import fr.rakambda.channelpointsminer.miner.event.manager.IEventManager;
11+
import fr.rakambda.channelpointsminer.miner.factory.TimeFactory;
1012
import fr.rakambda.channelpointsminer.miner.miner.IMiner;
1113
import fr.rakambda.channelpointsminer.miner.streamer.Streamer;
1214
import fr.rakambda.channelpointsminer.miner.streamer.StreamerSettings;
@@ -23,6 +25,7 @@
2325
import static org.mockito.ArgumentMatchers.any;
2426
import static org.mockito.ArgumentMatchers.anyLong;
2527
import static org.mockito.Mockito.lenient;
28+
import static org.mockito.Mockito.mockStatic;
2629
import static org.mockito.Mockito.never;
2730
import static org.mockito.Mockito.verify;
2831
import static org.mockito.Mockito.when;
@@ -52,17 +55,21 @@ class StreamStartEndHandlerTest{
5255
@Mock
5356
private StreamDown streamDownMessage;
5457
@Mock
58+
private BroadcastSettingsUpdate broadcastSettingsUpdateMessage;
59+
@Mock
5560
private ITwitchChatClient chatClient;
5661

5762
@BeforeEach
5863
void setUp(){
5964
lenient().when(topic.getTarget()).thenReturn(STREAMER_ID);
6065
lenient().when(streamer.getUsername()).thenReturn(STREAMER_NAME);
6166
lenient().when(streamer.getSettings()).thenReturn(streamerSettings);
67+
lenient().when(streamer.isStreaming()).thenReturn(false);
6268
lenient().when(streamerSettings.isJoinIrc()).thenReturn(false);
6369
lenient().when(miner.getChatClient()).thenReturn(chatClient);
6470
lenient().when(streamUpMessage.getServerTime()).thenReturn(NOW);
6571
lenient().when(streamDownMessage.getServerTime()).thenReturn(NOW);
72+
lenient().when(broadcastSettingsUpdateMessage.getChannelId()).thenReturn(STREAMER_ID);
6673
}
6774

6875
@Test
@@ -138,4 +145,62 @@ void streamDownUnknown(){
138145
verify(eventManager).onEvent(new StreamDownEvent(STREAMER_ID, null, null, NOW));
139146
verify(chatClient, never()).leave(any());
140147
}
148+
149+
@Test
150+
void broadcastSettingsUpdateAndNotStreaming(){
151+
try(var timeFactory = mockStatic(TimeFactory.class)){
152+
timeFactory.when(TimeFactory::now).thenReturn(NOW);
153+
154+
when(miner.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(invocation -> {
155+
var runnable = invocation.getArgument(0, Runnable.class);
156+
runnable.run();
157+
return null;
158+
});
159+
160+
when(miner.getStreamerById(STREAMER_ID)).thenReturn(Optional.of(streamer));
161+
162+
assertDoesNotThrow(() -> tested.handle(topic, broadcastSettingsUpdateMessage));
163+
164+
verify(miner).updateStreamerInfos(streamer);
165+
verify(eventManager).onEvent(new StreamUpEvent(STREAMER_ID, STREAMER_NAME, streamer, NOW));
166+
verify(chatClient, never()).join(any());
167+
}
168+
}
169+
170+
@Test
171+
void broadcastSettingsUpdateAndStreaming(){
172+
try(var timeFactory = mockStatic(TimeFactory.class)){
173+
timeFactory.when(TimeFactory::now).thenReturn(NOW);
174+
175+
when(miner.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(invocation -> {
176+
var runnable = invocation.getArgument(0, Runnable.class);
177+
runnable.run();
178+
return null;
179+
});
180+
181+
when(miner.getStreamerById(STREAMER_ID)).thenReturn(Optional.of(streamer));
182+
when(streamer.isStreaming()).thenReturn(true);
183+
184+
assertDoesNotThrow(() -> tested.handle(topic, broadcastSettingsUpdateMessage));
185+
186+
verify(miner).updateStreamerInfos(streamer);
187+
verify(eventManager).onEvent(new StreamDownEvent(STREAMER_ID, STREAMER_NAME, streamer, NOW));
188+
verify(chatClient).leave(STREAMER_NAME);
189+
}
190+
}
191+
192+
@Test
193+
void broadcastSettingsUpdateAndUnknown(){
194+
try(var timeFactory = mockStatic(TimeFactory.class)){
195+
timeFactory.when(TimeFactory::now).thenReturn(NOW);
196+
197+
when(miner.getStreamerById(STREAMER_ID)).thenReturn(Optional.empty());
198+
199+
assertDoesNotThrow(() -> tested.handle(topic, broadcastSettingsUpdateMessage));
200+
201+
verify(miner, never()).schedule(any(Runnable.class), anyLong(), any());
202+
verify(eventManager).onEvent(new StreamDownEvent(STREAMER_ID, null, null, NOW));
203+
verify(chatClient, never()).leave(STREAMER_NAME);
204+
}
205+
}
141206
}

miner/src/test/java/fr/rakambda/channelpointsminer/miner/miner/MinerTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.concurrent.ScheduledExecutorService;
4646
import java.util.concurrent.ScheduledFuture;
4747
import java.util.concurrent.atomic.AtomicBoolean;
48+
import static fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.TopicName.BROADCAST_SETTINGS_UPDATE;
4849
import static fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.TopicName.COMMUNITY_MOMENTS_CHANNEL_V1;
4950
import static fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.TopicName.COMMUNITY_POINTS_USER_V1;
5051
import static fr.rakambda.channelpointsminer.miner.api.pubsub.data.request.topic.TopicName.PREDICTIONS_CHANNEL_V1;
@@ -414,6 +415,7 @@ void addStreamerWithPredictions(){
414415
verify(updateStreamInfo).run(streamer);
415416
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(PREDICTIONS_USER_V1).target(USER_ID).build());
416417
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(VIDEO_PLAYBACK_BY_ID).target(STREAMER_ID).build());
418+
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(BROADCAST_SETTINGS_UPDATE).target(STREAMER_ID).build());
417419
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(USER_DROP_EVENTS).target(USER_ID).build());
418420
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(PREDICTIONS_CHANNEL_V1).target(STREAMER_ID).build());
419421
verify(eventManager).onEvent(new StreamerAddedEvent(streamer, NOW));
@@ -452,6 +454,7 @@ void addStreamerWithMoments(){
452454

453455
verify(updateStreamInfo).run(streamer);
454456
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(VIDEO_PLAYBACK_BY_ID).target(STREAMER_ID).build());
457+
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(BROADCAST_SETTINGS_UPDATE).target(STREAMER_ID).build());
455458
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(COMMUNITY_MOMENTS_CHANNEL_V1).target(STREAMER_ID).build());
456459
verify(eventManager).onEvent(new StreamerAddedEvent(streamer, NOW));
457460
}
@@ -489,6 +492,7 @@ void addStreamerWithRaid(){
489492

490493
verify(updateStreamInfo).run(streamer);
491494
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(VIDEO_PLAYBACK_BY_ID).target(STREAMER_ID).build());
495+
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(BROADCAST_SETTINGS_UPDATE).target(STREAMER_ID).build());
492496
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(RAID).target(STREAMER_ID).build());
493497
verify(eventManager).onEvent(new StreamerAddedEvent(streamer, NOW));
494498
}
@@ -527,6 +531,7 @@ void addStreamerWithIrcAndStreamerOffline(){
527531

528532
verify(updateStreamInfo).run(streamer);
529533
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(VIDEO_PLAYBACK_BY_ID).target(STREAMER_ID).build());
534+
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(BROADCAST_SETTINGS_UPDATE).target(STREAMER_ID).build());
530535
verify(eventManager).onEvent(new StreamerAddedEvent(streamer, NOW));
531536
verify(twitchChatClient, never()).join(any());
532537
}
@@ -566,6 +571,7 @@ void addStreamerWithIrcAndStreamerOnline(){
566571

567572
verify(updateStreamInfo).run(streamer);
568573
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(VIDEO_PLAYBACK_BY_ID).target(STREAMER_ID).build());
574+
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(BROADCAST_SETTINGS_UPDATE).target(STREAMER_ID).build());
569575
verify(eventManager).onEvent(new StreamerAddedEvent(streamer, NOW));
570576
verify(twitchChatClient).join(STREAMER_USERNAME);
571577
}
@@ -602,6 +608,7 @@ void addDuplicateStreamer(){
602608

603609
verify(updateStreamInfo).run(streamer);
604610
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(VIDEO_PLAYBACK_BY_ID).target(STREAMER_ID).build());
611+
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(BROADCAST_SETTINGS_UPDATE).target(STREAMER_ID).build());
605612
verify(eventManager).onEvent(new StreamerAddedEvent(streamer, NOW));
606613
}
607614
}
@@ -686,6 +693,7 @@ void removeStreamer(){
686693
tested.removeStreamer(streamer);
687694

688695
verify(hermesWebSocketPool).removePubSubTopic(Topic.builder().name(VIDEO_PLAYBACK_BY_ID).target(STREAMER_ID).build());
696+
verify(hermesWebSocketPool).removePubSubTopic(Topic.builder().name(BROADCAST_SETTINGS_UPDATE).target(STREAMER_ID).build());
689697
verify(hermesWebSocketPool).removePubSubTopic(Topic.builder().name(PREDICTIONS_CHANNEL_V1).target(STREAMER_ID).build());
690698
verify(hermesWebSocketPool).removePubSubTopic(Topic.builder().name(COMMUNITY_MOMENTS_CHANNEL_V1).target(STREAMER_ID).build());
691699
verify(hermesWebSocketPool).removePubSubTopic(Topic.builder().name(RAID).target(STREAMER_ID).build());
@@ -764,6 +772,7 @@ void updateStreamerAllActivatedOnline(){
764772
assertDoesNotThrow(() -> tested.updateStreamer(streamer));
765773

766774
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(VIDEO_PLAYBACK_BY_ID).target(STREAMER_ID).build());
775+
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(BROADCAST_SETTINGS_UPDATE).target(STREAMER_ID).build());
767776
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(USER_DROP_EVENTS).target(USER_ID).build());
768777
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(PREDICTIONS_USER_V1).target(USER_ID).build());
769778
verify(hermesWebSocketPool).listenPubSubTopic(Topic.builder().name(PREDICTIONS_CHANNEL_V1).target(STREAMER_ID).build());

0 commit comments

Comments
 (0)