Skip to content

Commit 373a69f

Browse files
committed
refactoring: Use chatwebhook's models as the SOT for platform Event types
1 parent 792d823 commit 373a69f

58 files changed

Lines changed: 2730 additions & 3193 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitmodules

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[submodule "pkg/streamcontrol/protobuf/imports/chatwebhook"]
2+
path = pkg/streamcontrol/protobuf/imports/chatwebhook
3+
url = https://github.com/xaionaro-go/chatwebhook

go.mod

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ require (
333333
github.com/xaionaro-go/mediamtx v0.0.0-20250406132618-79ecbc3e138f
334334
github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c
335335
github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a
336-
github.com/xaionaro-go/observability v0.0.0-20250622130956-24b7017284e4
336+
github.com/xaionaro-go/observability v0.0.0-20251102143534-3aeb2a25e57d
337337
github.com/xaionaro-go/player v0.0.0-20251020004405-460c9f1a4b11
338338
github.com/xaionaro-go/recoder v0.0.0-20250929011527-29b198af8c77
339339
github.com/xaionaro-go/secret v0.0.0-20250111141743-ced12e1082c2
@@ -343,6 +343,7 @@ require (
343343
github.com/xaionaro-go/unsafetools v0.0.0-20241024014258-a46e1ce3763e
344344
github.com/xaionaro-go/xcontext v0.0.0-20250111150717-e70e1f5b299c
345345
github.com/xaionaro-go/xfyne v0.0.0-20250615190411-4c96281f6e25
346+
github.com/xaionaro-go/xgrpc v0.0.0-20251102160837-04b13583739a
346347
github.com/xaionaro-go/xlogrus v0.0.0-20250111150201-60557109545a
347348
github.com/xaionaro-go/xpath v0.0.0-20250111145115-55f5728f643f
348349
github.com/xaionaro-go/xsync v0.0.0-20250928140805-f801683b71ba
@@ -375,5 +376,5 @@ require (
375376
github.com/pion/turn/v2 v2.1.6 // indirect
376377
github.com/pion/webrtc/v3 v3.3.0 // indirect
377378
github.com/tiendc/go-deepcopy v1.5.2
378-
github.com/xaionaro-go/chatwebhook v0.0.0-20251102013345-80b205a8c53d
379+
github.com/xaionaro-go/chatwebhook v0.0.0-20251102204738-0b8b2966ba1d
379380
)

go.sum

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,8 +1117,10 @@ github.com/xaionaro-go/avmediacodec v0.0.0-20250505012527-c819676502d8 h1:FZn9+T
11171117
github.com/xaionaro-go/avmediacodec v0.0.0-20250505012527-c819676502d8/go.mod h1:2W2Kp/HJFXcFBppQ4YytgDy/ydFL3hGc23xSB1U/Luc=
11181118
github.com/xaionaro-go/avpipeline v0.0.0-20250929013757-2eb9ecc88185 h1:eZyEeFRd9Dk7gDfgkQ6ORHawh9RjUR+nBDayATHwmQ8=
11191119
github.com/xaionaro-go/avpipeline v0.0.0-20250929013757-2eb9ecc88185/go.mod h1:TiLj0o11F3HnNXC8MraetW1/MIM3x0ROUrruxBpeSuw=
1120-
github.com/xaionaro-go/chatwebhook v0.0.0-20251102013345-80b205a8c53d h1:YkfE1CQ4MCxSfJjOzIgrXYvsBg4WUQxzPI8aVYLj+WE=
1121-
github.com/xaionaro-go/chatwebhook v0.0.0-20251102013345-80b205a8c53d/go.mod h1:aQT4myeHPflWauMBxfXCUj0XnhBsA1OhJSAH2G08ap0=
1120+
github.com/xaionaro-go/chatwebhook v0.0.0-20251102191426-33a5759f705c h1:fVqdR1EDNO8K0l9ZxhN94X7rKAoDjUhy7OSUskOFOpQ=
1121+
github.com/xaionaro-go/chatwebhook v0.0.0-20251102191426-33a5759f705c/go.mod h1:y6fX0jgyyfrjyj4QMRLAF/ieNHdWsccd2dcGlXNUPfI=
1122+
github.com/xaionaro-go/chatwebhook v0.0.0-20251102204738-0b8b2966ba1d h1:Q8i1p+zyF+K5pF4uz0qI8BJCjcSmQ6HVozCAS5nhv9w=
1123+
github.com/xaionaro-go/chatwebhook v0.0.0-20251102204738-0b8b2966ba1d/go.mod h1:y6fX0jgyyfrjyj4QMRLAF/ieNHdWsccd2dcGlXNUPfI=
11221124
github.com/xaionaro-go/cloudscraper v0.0.0-20251019213127-d3687042cb55 h1:rHvnuVmqa9M+id2Q8e0V/csp0jDhXIBAAuCC76mBW8k=
11231125
github.com/xaionaro-go/cloudscraper v0.0.0-20251019213127-d3687042cb55/go.mod h1:mFODbc87KEjFfERrFjmAGajGZOBQq+kzMd+2wsAZSHo=
11241126
github.com/xaionaro-go/datacounter v1.0.4 h1:+QMZLmu73R5WGkQfUPwlXF/JFN+Weo4iuDZkiL2wVm8=
@@ -1165,8 +1167,8 @@ github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c h1:2CIIxTRox9au
11651167
github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c/go.mod h1:vRcA12NWsR0IrS75eqnPBs5aVfCYmJy4bR+6DbJpBCg=
11661168
github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a h1:PyX7XpLkj+eAwrPMFMGpvZIG4zBfzAfwNhwTtbORqN0=
11671169
github.com/xaionaro-go/obs-grpc-proxy v0.0.0-20241018162120-5faf4e7a684a/go.mod h1:exSKIlCibB0ww+ABDwH+YG/iNdqVfdzXBBg5LYxkxGw=
1168-
github.com/xaionaro-go/observability v0.0.0-20250622130956-24b7017284e4 h1:dlsJ1l9WI+LTpB5z4f7FCLLrxJ7uV4tj9jWNqpibhA0=
1169-
github.com/xaionaro-go/observability v0.0.0-20250622130956-24b7017284e4/go.mod h1:GkNC0+nPJMOzotTzJVlH4+DvDg9cKrV9qqOe+8/kVw4=
1170+
github.com/xaionaro-go/observability v0.0.0-20251102143534-3aeb2a25e57d h1:3oelsD973xao6Y1ucukBXElEfoSoEJ6naArGf7OQKTU=
1171+
github.com/xaionaro-go/observability v0.0.0-20251102143534-3aeb2a25e57d/go.mod h1:83Ywq4Ukg4EtS3+1HvaboKpt6sqZzwv7oZ5+jLkzSNc=
11701172
github.com/xaionaro-go/player v0.0.0-20251020004405-460c9f1a4b11 h1:F6tbYuPZRSaNWklbZkPpUZJ29rqB50EHr8RfhvnwZ7o=
11711173
github.com/xaionaro-go/player v0.0.0-20251020004405-460c9f1a4b11/go.mod h1:FolxF7IWLEE5aTHOJiZ+6lAk9sGCvOkeqcbw1dYeu2w=
11721174
github.com/xaionaro-go/proxy v0.0.0-20250525144747-579f5a891c15 h1:Qqoy9MDWq2Yh6uazAqQDzqU0doalTL3tRjNCo7X7GXA=
@@ -1194,6 +1196,8 @@ github.com/xaionaro-go/xcontext v0.0.0-20250111150717-e70e1f5b299c h1://sE/WLpO7
11941196
github.com/xaionaro-go/xcontext v0.0.0-20250111150717-e70e1f5b299c/go.mod h1:MGRT1+2m2adVRc4aAn0RVGysjsSRKN9VCyBJLHvQd4k=
11951197
github.com/xaionaro-go/xfyne v0.0.0-20250615190411-4c96281f6e25 h1:zGpt+U8zOCB/gMOpJZ1IzqjfU85fvbL9LUvSL/R/xEk=
11961198
github.com/xaionaro-go/xfyne v0.0.0-20250615190411-4c96281f6e25/go.mod h1:52pwXTJFBlreHnA0sf8EoJvw5MZZNPSRm9kpdSqHOuk=
1199+
github.com/xaionaro-go/xgrpc v0.0.0-20251102160837-04b13583739a h1:rjI5y2zUNQ4Cpxg+DcxCa/RCQgIIw5cHjJGKMtsmH8U=
1200+
github.com/xaionaro-go/xgrpc v0.0.0-20251102160837-04b13583739a/go.mod h1:9mSKZHC8O/CxLVq8bjrUz1ZavPFhn5bGxr60do2hL6g=
11971201
github.com/xaionaro-go/xlogrus v0.0.0-20250111150201-60557109545a h1:EoNRdOtBMnZedKfW7/4xtGmGHC+L+jfmLe5UOnF1tqc=
11981202
github.com/xaionaro-go/xlogrus v0.0.0-20250111150201-60557109545a/go.mod h1:RPfWNuwqJykkA2TEvisXqHgy1ypA/1H2HBdIRSeVJ9o=
11991203
github.com/xaionaro-go/xpath v0.0.0-20250111145115-55f5728f643f h1:ofxY1akRlVdJ/AEDj0EakK4Aez8fzeWTTe2mCAZiJ0A=

pkg/chatmessagesstorage/remove_message.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,20 @@ import (
1111

1212
func (s *ChatMessagesStorage) RemoveMessage(
1313
ctx context.Context,
14-
msgID streamcontrol.ChatMessageID,
14+
msgID streamcontrol.EventID,
1515
) error {
1616
return xsync.DoA2R1(ctx, &s.Mutex, s.removeMessageLocked, ctx, msgID)
1717
}
1818

1919
func (s *ChatMessagesStorage) removeMessageLocked(
2020
ctx context.Context,
21-
msgID streamcontrol.ChatMessageID,
21+
msgID streamcontrol.EventID,
2222
) (_err error) {
2323
logger.Tracef(ctx, "removeMessageLocked(ctx, '%v')", msgID)
2424
defer func() { logger.Tracef(ctx, "/removeMessageLocked(ctx, '%v'): %v", msgID, _err) }()
2525

2626
for idx := range s.Messages {
27-
if s.Messages[idx].MessageID != msgID {
27+
if s.Messages[idx].ID != msgID {
2828
continue
2929
}
3030
s.Messages[idx] = s.Messages[len(s.Messages)-1]

pkg/chatmessagesstorage/sort_and_deduplicate.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,26 @@ func msgLess(ctx context.Context, a *api.ChatMessage, b *api.ChatMessage) bool {
1212
if a.CreatedAt != b.CreatedAt {
1313
return a.CreatedAt.Before(b.CreatedAt)
1414
}
15-
if a.EventType != b.EventType {
16-
return a.EventType < b.EventType
15+
if a.Type != b.Type {
16+
return a.Type < b.Type
1717
}
1818
if a.Platform != b.Platform {
1919
return a.Platform < b.Platform
2020
}
21-
if a.Username != b.Username {
22-
return a.Username < b.Username
21+
if a.User.ID != b.User.ID {
22+
return a.User.ID < b.User.ID
2323
}
24-
if a.MessageID != b.MessageID {
25-
return a.MessageID < b.MessageID
24+
if a.User.Slug != b.User.Slug {
25+
return a.User.Slug < b.User.Slug
26+
}
27+
if a.User.Name != b.User.Name {
28+
return a.User.Name < b.User.Name
2629
}
2730
if a.Message != b.Message {
28-
return a.Message < b.Message
31+
return a.Message.Content < b.Message.Content
32+
}
33+
if a.ID != b.ID {
34+
return a.ID < b.ID
2935
}
3036
if a != b {
3137
logger.Errorf(ctx, "msgs A and B look equal, but are not: A:%#+v B:%#+v", a, b)

pkg/streamcontrol/event.go

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,33 @@ import (
55
"time"
66
)
77

8-
type ChatUserID string
9-
type ChatMessageID string
8+
type EventID string
9+
type UserID string
10+
type Tier string
1011

11-
type ChatMessage struct {
12-
CreatedAt time.Time
13-
EventType EventType
14-
UserID ChatUserID
15-
Username string
16-
MessageID ChatMessageID
17-
Message string
18-
MessageFormatType TextFormatType
19-
Paid Money
12+
type User struct {
13+
ID UserID
14+
Slug string
15+
Name string
16+
}
17+
18+
type Message struct {
19+
Content string
20+
Format TextFormatType
21+
InReplyTo *EventID
22+
}
23+
24+
type Event struct {
25+
ID EventID
26+
CreatedAt time.Time
27+
ExpiresAt *time.Time
28+
Type EventType
29+
User User
30+
TargetUser *User
31+
TargetChannel *User
32+
Message *Message
33+
Paid *Money
34+
Tier *Tier
2035
}
2136

2237
type EventType int
@@ -31,9 +46,12 @@ const (
3146
EventTypeFollow
3247
EventTypeRaid
3348
EventTypeChannelShoutoutReceive
34-
EventTypeSubscribe
49+
EventTypeSubscriptionNew
50+
EventTypeSubscriptionRenewed
51+
EventTypeGiftedSubscription
3552
EventTypeStreamOnline
3653
EventTypeStreamOffline
54+
EventTypeStreamInfoUpdate
3755
EventTypeOther
3856
)
3957

@@ -57,12 +75,16 @@ func (t EventType) String() string {
5775
return "raid"
5876
case EventTypeChannelShoutoutReceive:
5977
return "channel_shoutout_receive"
60-
case EventTypeSubscribe:
61-
return "subscribe"
78+
case EventTypeSubscriptionNew:
79+
return "subscription_new"
80+
case EventTypeSubscriptionRenewed:
81+
return "subscription_renewed"
6282
case EventTypeStreamOnline:
6383
return "stream_online"
6484
case EventTypeStreamOffline:
6585
return "stream_offline"
86+
case EventTypeStreamInfoUpdate:
87+
return "stream_info_update"
6688
case EventTypeOther:
6789
return "other"
6890
}

pkg/streamcontrol/kick/chat_handler.go

Lines changed: 60 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -5,71 +5,81 @@ import (
55
"fmt"
66

77
"github.com/facebookincubator/go-belt/tool/logger"
8-
"github.com/scorfly/gokick"
8+
"github.com/xaionaro-go/chatwebhook/pkg/chatwebhook/kickcom"
9+
chatwebhookclient "github.com/xaionaro-go/chatwebhook/pkg/grpc/client"
10+
"github.com/xaionaro-go/chatwebhook/pkg/grpc/protobuf/go/chatwebhook_grpc"
11+
"github.com/xaionaro-go/observability"
12+
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
13+
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/protobuf/goconv"
914
)
1015

16+
type ChatHandlerAbstract interface {
17+
GetMessagesChan(
18+
ctx context.Context,
19+
) (<-chan streamcontrol.Event, error)
20+
}
21+
1122
type ChatHandler struct {
12-
onClose func(context.Context)
23+
Client *chatwebhookclient.Client
1324
}
1425

26+
var _ ChatHandlerAbstract = (*ChatHandler)(nil)
27+
1528
func NewChatHandler(
1629
ctx context.Context,
17-
client *gokick.Client,
18-
broadcasterUserID *int,
19-
onClose func(context.Context),
30+
client *chatwebhookclient.Client,
2031
) (_ret *ChatHandler, _err error) {
21-
logger.Debugf(ctx, "NewChatHandler(ctx, client, %p)", onClose)
32+
logger.Debugf(ctx, "NewChatHandler")
2233
defer func() {
23-
logger.Debugf(ctx, "/NewChatHandler(ctx, client, %p): %#+v %v", onClose, _ret, _err)
34+
logger.Debugf(ctx, "/NewChatHandler: %#+v %v", _ret, _err)
2435
}()
2536

37+
return &ChatHandler{
38+
Client: client,
39+
}, nil
40+
}
41+
42+
func (h *ChatHandler) GetMessagesChan(
43+
ctx context.Context,
44+
) (<-chan streamcontrol.Event, error) {
2645
ctx, cancelFn := context.WithCancel(ctx)
2746
defer cancelFn()
2847

29-
subscriptions := []gokick.SubscriptionRequest{
30-
{
31-
Name: gokick.SubscriptionNameChatMessage,
32-
Version: 1,
33-
},
34-
{
35-
Name: gokick.SubscriptionNameChannelFollow,
36-
Version: 1,
37-
},
38-
{
39-
Name: gokick.SubscriptionNameChannelSubscriptionRenewal,
40-
Version: 1,
41-
},
42-
{
43-
Name: gokick.SubscriptionNameChannelSubscriptionGifts,
44-
Version: 1,
45-
},
46-
{
47-
Name: gokick.SubscriptionNameChannelSubscriptionCreated,
48-
Version: 1,
49-
},
50-
{
51-
Name: gokick.SubscriptionNameLivestreamStatusUpdated,
52-
Version: 1,
53-
},
54-
{
55-
Name: gokick.SubscriptionNameLivestreamMetadataUpdated,
56-
Version: 1,
57-
},
58-
{
59-
Name: gokick.SubscriptionNameModerationBanned,
60-
Version: 1,
61-
},
62-
}
63-
response, err := client.CreateSubscriptions(
64-
ctx,
65-
gokick.SubscriptionMethodWebhook, // PANIC: we need websockets, we cannot use webhook
66-
subscriptions,
67-
broadcasterUserID,
68-
)
48+
inCh, err := h.Client.GetMessagesChan(ctx, kickcom.ID, "")
6949
if err != nil {
70-
return nil, fmt.Errorf("unable to create the event subscriptions: %w", err)
50+
return nil, fmt.Errorf("kick: failed to get messages chan: %w", err)
51+
}
52+
53+
outCh := make(chan streamcontrol.Event, 1)
54+
observability.Go(ctx, func(ctx context.Context) {
55+
defer close(outCh)
56+
for {
57+
select {
58+
case <-ctx.Done():
59+
return
60+
case ev, ok := <-inCh:
61+
if !ok {
62+
return
63+
}
64+
msg, err := convertKickEventToChatMessage(ev)
65+
if err != nil {
66+
logger.Errorf(ctx, "failed to convert kick event to chat message: %v", err)
67+
continue
68+
}
69+
outCh <- msg
70+
}
71+
}
72+
})
73+
74+
return outCh, nil
75+
}
76+
77+
func convertKickEventToChatMessage(
78+
ev *chatwebhook_grpc.Event,
79+
) (streamcontrol.Event, error) {
80+
if ev == nil {
81+
return streamcontrol.Event{}, fmt.Errorf("event is nil")
7182
}
72-
logger.Debugf(ctx, "subscriptions: %#+v", response)
7383

74-
panic("not implemented, yet")
84+
return goconv.EventGRPC2Go(ev), nil
7585
}

pkg/streamcontrol/kick/chat_handler_obsolete.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88

99
type ChatHandlerOBSOLETE = chathandlerobsolete.ChatHandlerOBSOLETE
1010

11+
var _ ChatHandlerAbstract = (*ChatHandlerOBSOLETE)(nil)
12+
1113
func (k *Kick) newChatHandlerOBSOLETE(
1214
ctx context.Context,
1315
channelSlug string,

pkg/streamcontrol/kick/chathandlerobsolete/client/client.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/kick/chathandlerobsolete/protobuf/go/chathandlerobsolete_grpc"
1111
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/protobuf/go/streamcontrol_grpc"
1212
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/protobuf/goconv"
13-
"github.com/xaionaro-go/streamctl/pkg/xgrpc"
13+
"github.com/xaionaro-go/xgrpc"
1414
"google.golang.org/grpc"
1515
"google.golang.org/grpc/credentials/insecure"
1616
)
@@ -109,7 +109,7 @@ func (c *GRPCClient) GetCallWrapper() xgrpc.CallWrapperFunc {
109109

110110
func (c *GRPCClient) GetMessagesChan(
111111
ctx context.Context,
112-
) (<-chan streamcontrol.ChatMessage, error) {
112+
) (<-chan streamcontrol.Event, error) {
113113
return xgrpc.UnwrapChan(
114114
ctx,
115115
c,
@@ -128,9 +128,9 @@ func (c *GRPCClient) GetMessagesChan(
128128
},
129129
func(
130130
ctx context.Context,
131-
event *streamcontrol_grpc.ChatMessage,
132-
) streamcontrol.ChatMessage {
133-
return goconv.ChatMessageGRPC2Go(event)
131+
event *streamcontrol_grpc.Event,
132+
) streamcontrol.Event {
133+
return goconv.EventGRPC2Go(event)
134134
},
135135
)
136136
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../../protobuf/chat

0 commit comments

Comments
 (0)