Skip to content

Commit 65d9c19

Browse files
authored
Improve stream up/down detection by requesting current status on WithIsStreamLive GQL operation (#1116)
## Pull Request Etiquette ### Checklist - [x] Tests have been added in relevant areas - [x] Corresponding changes made to the documentation (README.adoc) <!-- (if irrelevant check the box too) --> ### Type of change Internal change <!-- Choose one from "Bug fix" / "New Feature" / "Breaking change" / "Internal change" --> ## Description To avoid false start/stop events from being fired when broadcast settings change, request current live status and compare it to what we have in memory. <!-- Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change. -->
1 parent afb737d commit 65d9c19

11 files changed

Lines changed: 319 additions & 19 deletions

File tree

miner/src/main/java/fr/rakambda/channelpointsminer/miner/api/gql/gql/GQLApi.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.types.User;
3636
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.videoplayerstreaminfooverlaychannel.VideoPlayerStreamInfoOverlayChannelData;
3737
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.videoplayerstreaminfooverlaychannel.VideoPlayerStreamInfoOverlayChannelOperation;
38+
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.withislive.WithIsStreamLiveData;
39+
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.withislive.WithIsStreamLiveOperation;
3840
import fr.rakambda.channelpointsminer.miner.api.gql.integrity.IIntegrityProvider;
3941
import fr.rakambda.channelpointsminer.miner.api.gql.integrity.IntegrityException;
4042
import fr.rakambda.channelpointsminer.miner.api.passport.TwitchLogin;
@@ -199,7 +201,12 @@ public Optional<GQLResponse<MakePredictionData>> makePrediction(@NotNull String
199201
public Optional<GQLResponse<PlaybackAccessTokenData>> playbackAccessToken(@NotNull String login){
200202
return postGqlRequest(new PlaybackAccessTokenOperation(login));
201203
}
202-
204+
205+
@NotNull
206+
public Optional<GQLResponse<WithIsStreamLiveData>> withIsStreamLive(@NotNull String id){
207+
return postGqlRequest(new WithIsStreamLiveOperation(id));
208+
}
209+
203210
@NotNull
204211
public List<User> allChannelFollows(){
205212
var follows = new ArrayList<User>();
@@ -234,12 +241,12 @@ public List<User> allChannelFollows(){
234241

235242
return follows;
236243
}
237-
244+
238245
@NotNull
239246
public Optional<GQLResponse<ChannelFollowsData>> channelFollows(int limit, @NotNull String order, @Nullable String cursor){
240247
return postGqlRequest(new ChannelFollowsOperation(limit, order, cursor));
241248
}
242-
249+
243250
@NotNull
244251
public Optional<GQLResponse<ChatRoomBanStatusData>> chatRoomBanStatus(@NotNull String channelId, @NotNull String targetUserId){
245252
return postGqlRequest(new ChatRoomBanStatusOperation(channelId, targetUserId));
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package fr.rakambda.channelpointsminer.miner.api.gql.gql.data.withislive;
2+
3+
import com.fasterxml.jackson.annotation.JsonProperty;
4+
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.types.StreamPlaybackAccessToken;
5+
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.types.User;
6+
import lombok.AllArgsConstructor;
7+
import lombok.Builder;
8+
import lombok.EqualsAndHashCode;
9+
import lombok.Getter;
10+
import lombok.NoArgsConstructor;
11+
import lombok.ToString;
12+
import org.jetbrains.annotations.NotNull;
13+
14+
@Getter
15+
@NoArgsConstructor
16+
@AllArgsConstructor
17+
@Builder
18+
@EqualsAndHashCode
19+
@ToString
20+
public class WithIsStreamLiveData {
21+
@JsonProperty("user")
22+
@NotNull
23+
private User user;
24+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package fr.rakambda.channelpointsminer.miner.api.gql.gql.data.withislive;
2+
3+
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.GQLResponse;
4+
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.IGQLOperation;
5+
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.PersistedQueryExtension;
6+
import kong.unirest.core.GenericType;
7+
import lombok.EqualsAndHashCode;
8+
import lombok.Getter;
9+
import lombok.ToString;
10+
import org.jetbrains.annotations.NotNull;
11+
12+
@Getter
13+
@EqualsAndHashCode(callSuper = true)
14+
@ToString
15+
public class WithIsStreamLiveOperation extends IGQLOperation<WithIsStreamLiveData>{
16+
public WithIsStreamLiveOperation(@NotNull String id){
17+
super("WithIsStreamLiveQuery");
18+
addPersistedQueryExtension(new PersistedQueryExtension(1, "04e46329a6786ff3a81c01c50bfa5d725902507a0deb83b0edbf7abe7a3716ea"));
19+
addVariable("id", id);
20+
}
21+
22+
@Override
23+
@NotNull
24+
public GenericType<GQLResponse<WithIsStreamLiveData>> getResponseType(){
25+
return new GenericType<>(){};
26+
}
27+
}

miner/src/main/java/fr/rakambda/channelpointsminer/miner/factory/ApiFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import fr.rakambda.channelpointsminer.miner.event.manager.IEventManager;
3232
import fr.rakambda.channelpointsminer.miner.log.UnirestLogger;
3333
import fr.rakambda.channelpointsminer.miner.util.CommonUtils;
34+
import fr.rakambda.channelpointsminer.miner.util.DiscordRetryStrategy;
3435
import fr.rakambda.channelpointsminer.miner.util.json.JacksonUtils;
3536
import kong.unirest.core.HeaderNames;
3637
import kong.unirest.core.Unirest;
@@ -65,7 +66,7 @@ private static UnirestInstance createUnirestInstance(@Nullable TwitchClient twit
6566
.interceptor(new UnirestLogger());
6667

6768
if(Objects.isNull(twitchClient) || twitchClient == TwitchClient.WEB){
68-
unirest.config().setDefaultHeader(USER_AGENT, "Mozilla/5.0 (X11; Linux x86_64; rv:104.0) Gecko/20100101 Firefox/104.0");
69+
unirest.config().setDefaultHeader(USER_AGENT, "Mozilla/5.0 (X11; Linux x86_64; rv:142.0) Gecko/20100101 Firefox/142.0");
6970
}
7071
else if(twitchClient == TwitchClient.MOBILE){
7172
unirest.config().setDefaultHeader(USER_AGENT, "Dalvik/2.1.0 (Linux; U; Android 7.1.2; SM-G975N Build/N2G48C) tv.twitch.android.app/13.4.1/1304010");
@@ -107,7 +108,7 @@ public static TwitchApi createTwitchApi(@NotNull TwitchLogin twitchLogin){
107108
@NotNull
108109
public static DiscordApi createDiscordApi(@NotNull URL webhookUrl){
109110
var unirestInstance = createUnirestInstance(null);
110-
unirestInstance.config().retryAfter(true, 5);
111+
unirestInstance.config().retryAfter(new DiscordRetryStrategy(5));
111112

112113
return new DiscordApi(webhookUrl, unirestInstance);
113114
}

miner/src/main/java/fr/rakambda/channelpointsminer/miner/handler/StreamStartEndHandler.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package fr.rakambda.channelpointsminer.miner.handler;
22

3+
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.GQLResponse;
34
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.BroadcastSettingsUpdate;
45
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.StreamDown;
56
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.StreamUp;
@@ -47,31 +48,38 @@ public void onBroadcastSettingsUpdate(@NotNull Topic topic, @NotNull BroadcastSe
4748
var streamerId = message.getChannelId();
4849
var streamer = miner.getStreamerById(streamerId);
4950

50-
if(streamer.map(Streamer::isStreaming).orElse(true)){
51-
streamDown(streamerId, streamer.orElse(null), streamer.map(Streamer::getUsername).orElse(null), TimeFactory.now(), false);
51+
var status = miner.getGqlApi().withIsStreamLive(streamerId);
52+
var streaming = status.map(GQLResponse::getData).map(d -> Objects.nonNull(d.getUser().getStream()));
53+
var memoryStreaming = streamer.map(Streamer::isStreaming).orElse(false);
54+
55+
// Fire event only if we know the current streaming status, and it is different from what we currently have in memory (i.e. the stream status changed and not just some parameters)
56+
var fireEvent = streaming.isPresent() && streaming.get() != memoryStreaming;
57+
58+
if(streaming.orElseGet(() -> !memoryStreaming)){
59+
streamUp(streamerId, streamer.orElse(null), streamer.map(Streamer::getUsername).orElse(null), TimeFactory.now(), fireEvent);
5260
}
5361
else{
54-
streamUp(streamerId, streamer.orElse(null), streamer.map(Streamer::getUsername).orElse(null), TimeFactory.now(), false);
62+
streamDown(streamerId, streamer.orElse(null), streamer.map(Streamer::getUsername).orElse(null), TimeFactory.now(), fireEvent);
5563
}
5664
}
5765

5866
private void streamUp(@NotNull String streamerId, @Nullable Streamer streamer, @Nullable String username, @NotNull Instant serverTime, boolean fireEvent){
5967
updateStream(streamerId, streamer);
60-
Optional.ofNullable(streamer)
61-
.filter(s -> s.getSettings().isJoinIrc())
62-
.map(Streamer::getUsername)
63-
.ifPresent(miner.getChatClient()::join);
6468
if(fireEvent){
69+
Optional.ofNullable(streamer)
70+
.filter(s -> s.getSettings().isJoinIrc())
71+
.map(Streamer::getUsername)
72+
.ifPresent(miner.getChatClient()::join);
6573
eventManager.onEvent(new StreamUpEvent(streamerId, username, streamer, serverTime));
6674
}
6775
}
6876

6977
private void streamDown(@NotNull String streamerId, @Nullable Streamer streamer, @Nullable String username, @NotNull Instant serverTime, boolean fireEvent){
7078
updateStream(streamerId, streamer);
71-
Optional.ofNullable(streamer)
72-
.map(Streamer::getUsername)
73-
.ifPresent(miner.getChatClient()::leave);
7479
if(fireEvent){
80+
Optional.ofNullable(streamer)
81+
.map(Streamer::getUsername)
82+
.ifPresent(miner.getChatClient()::leave);
7583
eventManager.onEvent(new StreamDownEvent(streamerId, username, streamer, serverTime));
7684
}
7785
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package fr.rakambda.channelpointsminer.miner.util;
2+
3+
import kong.unirest.core.HttpResponse;
4+
import kong.unirest.core.RetryStrategy;
5+
import lombok.extern.log4j.Log4j2;
6+
import java.time.Duration;
7+
8+
@Log4j2
9+
public class DiscordRetryStrategy extends RetryStrategy.Standard{
10+
public DiscordRetryStrategy(int maxAttempts){
11+
super(maxAttempts);
12+
}
13+
14+
@Override
15+
public long getWaitTime(HttpResponse response){
16+
var delay = super.getWaitTime(response);
17+
log.info("Discord API call delayed for {}", Duration.ofMillis(delay));
18+
return delay;
19+
}
20+
}

miner/src/test/java/fr/rakambda/channelpointsminer/miner/api/gql/gql/GQLChannelFollowsTest.java renamed to miner/src/test/java/fr/rakambda/channelpointsminer/miner/api/gql/gql/GQLApiChannelFollowsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
@ExtendWith(MockitoExtension.class)
2222
@ExtendWith(UnirestMockExtension.class)
23-
class GQLChannelFollowsTest extends AbstractGQLTest{
23+
class GQLApiChannelFollowsTest extends AbstractGQLTest{
2424
public static final String VALID_QUERY = "{\"extensions\":{\"persistedQuery\":{\"sha256Hash\":\"eecf815273d3d949e5cf0085cc5084cd8a1b5b7b6f7990cf43cb0beadf546907\",\"version\":1}},\"operationName\":\"ChannelFollows\",\"variables\":{\"limit\":%d,\"order\":\"%s\"}}";
2525
public static final String VALID_QUERY_WITH_CURSOR = "{\"extensions\":{\"persistedQuery\":{\"sha256Hash\":\"eecf815273d3d949e5cf0085cc5084cd8a1b5b7b6f7990cf43cb0beadf546907\",\"version\":1}},\"operationName\":\"ChannelFollows\",\"variables\":{\"cursor\":\"%s\",\"limit\":%d,\"order\":\"%s\"}}";
2626
private static final int LIMIT = 15;
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package fr.rakambda.channelpointsminer.miner.api.gql.gql;
2+
3+
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.GQLResponse;
4+
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.types.Stream;
5+
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.types.User;
6+
import fr.rakambda.channelpointsminer.miner.api.gql.gql.data.withislive.WithIsStreamLiveData;
7+
import fr.rakambda.channelpointsminer.miner.tests.UnirestMockExtension;
8+
import org.mockito.junit.jupiter.MockitoExtension;
9+
import org.junit.jupiter.api.Test;
10+
import org.junit.jupiter.api.extension.ExtendWith;
11+
import java.util.Map;
12+
import static org.assertj.core.api.Assertions.assertThat;
13+
14+
@ExtendWith(MockitoExtension.class)
15+
@ExtendWith(UnirestMockExtension.class)
16+
class GQLApiWithIsStreamLiveTest extends AbstractGQLTest{
17+
private static final String CHANNEL_ID = "channel-id";
18+
19+
@Test
20+
void nominalLive(){
21+
var expected = GQLResponse.<WithIsStreamLiveData> builder()
22+
.extensions(Map.of(
23+
"durationMilliseconds", 58,
24+
"operationName", "WithIsStreamLiveQuery",
25+
"requestID", "request-id"
26+
))
27+
.data(WithIsStreamLiveData.builder()
28+
.user(User.builder()
29+
.id("987654321")
30+
.stream(Stream.builder()
31+
.id("12345")
32+
.build())
33+
.build())
34+
.build())
35+
.build();
36+
37+
expectValidRequestOkWithIntegrityOk("api/gql/gql/withisstreamlive_live.json");
38+
39+
assertThat(tested.withIsStreamLive(CHANNEL_ID)).contains(expected);
40+
41+
verifyAll();
42+
}
43+
44+
@Test
45+
void nominalNotLive(){
46+
var expected = GQLResponse.<WithIsStreamLiveData> builder()
47+
.extensions(Map.of(
48+
"durationMilliseconds", 58,
49+
"operationName", "WithIsStreamLiveQuery",
50+
"requestID", "request-id"
51+
))
52+
.data(WithIsStreamLiveData.builder()
53+
.user(User.builder()
54+
.id("987654321")
55+
.build())
56+
.build())
57+
.build();
58+
59+
expectValidRequestOkWithIntegrityOk("api/gql/gql/withisstreamlive_not_live.json");
60+
61+
assertThat(tested.withIsStreamLive(CHANNEL_ID)).contains(expected);
62+
63+
verifyAll();
64+
}
65+
66+
@Override
67+
protected String getValidRequest(){
68+
return "{\"extensions\":{\"persistedQuery\":{\"sha256Hash\":\"04e46329a6786ff3a81c01c50bfa5d725902507a0deb83b0edbf7abe7a3716ea\",\"version\":1}},\"operationName\":\"WithIsStreamLiveQuery\",\"variables\":{\"id\":\"%s\"}}".formatted(CHANNEL_ID);
69+
}
70+
}

0 commit comments

Comments
 (0)