Skip to content

Commit 66a6353

Browse files
committed
[kick] Switch to chatwebhook for chat
1 parent 373a69f commit 66a6353

8 files changed

Lines changed: 98 additions & 18 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,5 +376,5 @@ require (
376376
github.com/pion/turn/v2 v2.1.6 // indirect
377377
github.com/pion/webrtc/v3 v3.3.0 // indirect
378378
github.com/tiendc/go-deepcopy v1.5.2
379-
github.com/xaionaro-go/chatwebhook v0.0.0-20251102204738-0b8b2966ba1d
379+
github.com/xaionaro-go/chatwebhook v0.0.0-20251102210754-fc155de0d2a9
380380
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1121,6 +1121,8 @@ github.com/xaionaro-go/chatwebhook v0.0.0-20251102191426-33a5759f705c h1:fVqdR1E
11211121
github.com/xaionaro-go/chatwebhook v0.0.0-20251102191426-33a5759f705c/go.mod h1:y6fX0jgyyfrjyj4QMRLAF/ieNHdWsccd2dcGlXNUPfI=
11221122
github.com/xaionaro-go/chatwebhook v0.0.0-20251102204738-0b8b2966ba1d h1:Q8i1p+zyF+K5pF4uz0qI8BJCjcSmQ6HVozCAS5nhv9w=
11231123
github.com/xaionaro-go/chatwebhook v0.0.0-20251102204738-0b8b2966ba1d/go.mod h1:y6fX0jgyyfrjyj4QMRLAF/ieNHdWsccd2dcGlXNUPfI=
1124+
github.com/xaionaro-go/chatwebhook v0.0.0-20251102210754-fc155de0d2a9 h1:OjxCra9GSMYajfb+brJS4oFiPUTckxipJZeFBNlCKAQ=
1125+
github.com/xaionaro-go/chatwebhook v0.0.0-20251102210754-fc155de0d2a9/go.mod h1:y6fX0jgyyfrjyj4QMRLAF/ieNHdWsccd2dcGlXNUPfI=
11241126
github.com/xaionaro-go/cloudscraper v0.0.0-20251019213127-d3687042cb55 h1:rHvnuVmqa9M+id2Q8e0V/csp0jDhXIBAAuCC76mBW8k=
11251127
github.com/xaionaro-go/cloudscraper v0.0.0-20251019213127-d3687042cb55/go.mod h1:mFODbc87KEjFfERrFjmAGajGZOBQq+kzMd+2wsAZSHo=
11261128
github.com/xaionaro-go/datacounter v1.0.4 h1:+QMZLmu73R5WGkQfUPwlXF/JFN+Weo4iuDZkiL2wVm8=

pkg/streamcontrol/kick/chat_handler.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,26 +39,41 @@ func NewChatHandler(
3939
}, nil
4040
}
4141

42+
func (k *Kick) newChatHandler(
43+
ctx context.Context,
44+
) (*ChatHandler, error) {
45+
c, err := chatwebhookclient.New(ctx, chatwebhookclient.DefaultServerAddress)
46+
if err != nil {
47+
return nil, fmt.Errorf("kick: failed to create chat webhook client: %w", err)
48+
}
49+
return NewChatHandler(ctx, c)
50+
}
51+
4252
func (h *ChatHandler) GetMessagesChan(
4353
ctx context.Context,
4454
) (<-chan streamcontrol.Event, error) {
4555
ctx, cancelFn := context.WithCancel(ctx)
46-
defer cancelFn()
4756

4857
inCh, err := h.Client.GetMessagesChan(ctx, kickcom.ID, "")
4958
if err != nil {
59+
cancelFn()
5060
return nil, fmt.Errorf("kick: failed to get messages chan: %w", err)
5161
}
5262

5363
outCh := make(chan streamcontrol.Event, 1)
5464
observability.Go(ctx, func(ctx context.Context) {
5565
defer close(outCh)
66+
defer cancelFn()
67+
logger.Debugf(ctx, "kick: started forwarding chat messages")
68+
defer logger.Debugf(ctx, "kick: stopped forwarding chat messages")
5669
for {
5770
select {
5871
case <-ctx.Done():
72+
logger.Debugf(ctx, "kick: forwarding chat messages: context is closed; %v", ctx.Err())
5973
return
6074
case ev, ok := <-inCh:
6175
if !ok {
76+
logger.Debugf(ctx, "kick: forwarding chat messages: input channel is closed")
6277
return
6378
}
6479
msg, err := convertKickEventToChatMessage(ev)

pkg/streamcontrol/kick/cmd/chatlistener/main.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import (
66
"log"
77
"os"
88

9+
"github.com/davecgh/go-spew/spew"
910
"github.com/facebookincubator/go-belt"
1011
"github.com/facebookincubator/go-belt/tool/logger"
1112
xlogrus "github.com/facebookincubator/go-belt/tool/logger/implementation/logrus"
1213
"github.com/spf13/pflag"
14+
chatwebhookclient "github.com/xaionaro-go/chatwebhook/pkg/grpc/client"
1315
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/kick"
1416
)
1517

@@ -20,32 +22,36 @@ func assertNoError(err error) {
2022
log.Panic(err)
2123
}
2224

25+
func must[T any](v T, err error) T {
26+
assertNoError(err)
27+
return v
28+
}
29+
2330
func main() {
2431
logLevel := logger.LevelInfo
2532
pflag.Var(&logLevel, "log-level", "")
33+
chatwebhookAddr := pflag.String("chatwebhook-server", chatwebhookclient.DefaultServerAddress, "chat webhook gRPC server address")
2634
pflag.Parse()
2735

28-
if pflag.NArg() != 1 {
29-
fmt.Fprintf(os.Stderr, "expected 1 argument\n")
36+
if pflag.NArg() != 0 {
37+
fmt.Fprintf(os.Stderr, "expected 0 argument\n")
3038
os.Exit(1)
3139
}
3240

33-
channelSlug := pflag.Arg(0)
34-
3541
ctx := logger.CtxWithLogger(context.Background(), xlogrus.Default().WithLevel(logLevel))
3642
logger.Default = func() logger.Logger {
3743
return logger.FromCtx(ctx)
3844
}
3945
defer belt.Flush(ctx)
4046

41-
h, err := kick.NewChatHandlerOBSOLETE(ctx, channelSlug)
42-
assertNoError(err)
43-
44-
msgCh, err := h.GetMessagesChan(ctx)
45-
assertNoError(err)
47+
logger.Debugf(ctx, "connecting to chat webhook server at %q", *chatwebhookAddr)
48+
c := must(chatwebhookclient.New(ctx, *chatwebhookAddr))
49+
h := must(kick.NewChatHandler(ctx, c))
50+
msgCh := must(h.GetMessagesChan(ctx))
4651

4752
fmt.Println("started")
4853
for ev := range msgCh {
49-
fmt.Printf("%#+v\n", ev)
54+
spew.Dump(ev)
5055
}
56+
fmt.Println("ended")
5157
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"os"
8+
9+
"github.com/davecgh/go-spew/spew"
10+
"github.com/facebookincubator/go-belt"
11+
"github.com/facebookincubator/go-belt/tool/logger"
12+
xlogrus "github.com/facebookincubator/go-belt/tool/logger/implementation/logrus"
13+
"github.com/spf13/pflag"
14+
"github.com/xaionaro-go/streamctl/pkg/streamcontrol/kick"
15+
)
16+
17+
func assertNoError(err error) {
18+
if err == nil {
19+
return
20+
}
21+
log.Panic(err)
22+
}
23+
24+
func must[T any](v T, err error) T {
25+
assertNoError(err)
26+
return v
27+
}
28+
29+
func main() {
30+
logLevel := logger.LevelInfo
31+
pflag.Var(&logLevel, "log-level", "")
32+
pflag.Parse()
33+
34+
if pflag.NArg() != 1 {
35+
fmt.Fprintf(os.Stderr, "expected 1 argument\n")
36+
os.Exit(1)
37+
}
38+
39+
channelSlug := pflag.Arg(0)
40+
41+
ctx := logger.CtxWithLogger(context.Background(), xlogrus.Default().WithLevel(logLevel))
42+
logger.Default = func() logger.Logger {
43+
return logger.FromCtx(ctx)
44+
}
45+
defer belt.Flush(ctx)
46+
47+
h := must(kick.NewChatHandlerOBSOLETE(ctx, channelSlug))
48+
msgCh := must(h.GetMessagesChan(ctx))
49+
50+
fmt.Println("started")
51+
for ev := range msgCh {
52+
spew.Dump(ev)
53+
}
54+
}

pkg/streamcontrol/kick/config.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55

66
"github.com/xaionaro-go/streamctl/pkg/streamcontrol"
7-
streamctl "github.com/xaionaro-go/streamctl/pkg/streamcontrol"
87
kick "github.com/xaionaro-go/streamctl/pkg/streamcontrol/kick/types"
98
)
109

@@ -17,10 +16,10 @@ type PlatformSpecificConfig = kick.PlatformSpecificConfig
1716
type OAuthHandler = kick.OAuthHandler
1817

1918
func init() {
20-
streamctl.RegisterPlatform[PlatformSpecificConfig, StreamProfile](ID)
19+
streamcontrol.RegisterPlatform[PlatformSpecificConfig, StreamProfile](ID)
2120
}
2221

23-
func InitConfig(cfg streamctl.Config) {
22+
func InitConfig(cfg streamcontrol.Config) {
2423
kick.InitConfig(cfg)
2524
}
2625

pkg/streamcontrol/kick/kick.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (k *Kick) keepAliveLoop(
158158
func (k *Kick) initChatHandlerNoLock(
159159
ctx context.Context,
160160
) error {
161-
chatHandler, err := k.newChatHandlerOBSOLETE(ctx, k.CurrentConfig.Config.Channel)
161+
chatHandler, err := k.newChatHandler(ctx)
162162
if err == nil {
163163
k.ChatHandler = chatHandler
164164
return nil
@@ -176,7 +176,7 @@ func (k *Kick) initChatHandlerNoLock(
176176
return fmt.Errorf("ctx is closed: %w", ctx.Err())
177177
default:
178178
}
179-
chatHandler, err = k.newChatHandlerOBSOLETE(ctx, k.CurrentConfig.Config.Channel)
179+
chatHandler, err = k.newChatHandler(ctx)
180180
if err != nil {
181181
logger.Debugf(ctx, "initChatHandler: unable to create a new chat handler: %v", err)
182182
continue
@@ -602,6 +602,10 @@ func (k *Kick) GetChatMessagesChan(
602602
k.resetChatHandler(ctx)
603603
continue
604604
}
605+
if ev.TargetChannel.Slug != k.Channel.Slug {
606+
logger.Warnf(ctx, "skipping a message for another channel: %q != %q", ev.TargetChannel.Slug, k.Channel.Slug)
607+
continue
608+
}
605609
logger.Tracef(ctx, "GetChatMessagesChan: received a message")
606610
outCh <- ev
607611
}

0 commit comments

Comments
 (0)