@@ -2,170 +2,22 @@ package kick
22
33import (
44 "context"
5- "fmt"
6- "slices"
7- "strconv"
8- "time"
95
10- "github.com/facebookincubator/go-belt/tool/logger"
11- "github.com/xaionaro-go/kickcom"
12- "github.com/xaionaro-go/observability"
13- "github.com/xaionaro-go/streamctl/pkg/streamcontrol"
6+ "github.com/xaionaro-go/streamctl/pkg/streamcontrol/kick/chathandlerobsolete"
147)
158
16- type ChatClientOBSOLETE interface {
17- GetChatMessagesV2 (
18- ctx context.Context ,
19- channelID uint64 ,
20- cursor uint64 ,
21- ) (* kickcom.ChatMessagesV2Reply , error )
22- }
23-
24- type ChatHandlerOBSOLETE struct {
25- currentCursor uint64
26- channelID uint64
27- lastMessageID string
28- client ChatClientOBSOLETE
29- cancelFunc context.CancelFunc
30- messagesOutChan chan streamcontrol.ChatMessage
31- onClose func (context.Context , * ChatHandlerOBSOLETE )
32- }
9+ type ChatHandlerOBSOLETE = chathandlerobsolete.ChatHandlerOBSOLETE
3310
3411func (k * Kick ) newChatHandlerOBSOLETE (
3512 ctx context.Context ,
3613 channelSlug string ,
37- onClose func (context.Context , * ChatHandlerOBSOLETE ),
3814) (* ChatHandlerOBSOLETE , error ) {
39- reverseEngClient , err := kickcom .New ()
40- if err != nil {
41- return nil , fmt .Errorf ("unable to initialize a client to Kick: %w" , err )
42- }
43-
44- var resp * kickcom.ChannelV1
45- {
46- ctx , cancelFn := context .WithTimeout (ctx , time .Minute )
47- resp , err = reverseEngClient .GetChannelV1 (ctx , channelSlug )
48- cancelFn ()
49- if err != nil {
50- return nil , fmt .Errorf ("unable to get channel '%s' info: %w" , channelSlug , err )
51- }
52- }
53- return NewChatHandlerOBSOLETE (ctx , reverseEngClient , resp .ID , onClose )
15+ return NewChatHandlerOBSOLETE (ctx , channelSlug )
5416}
5517
5618func NewChatHandlerOBSOLETE (
5719 ctx context.Context ,
58- chatClient ChatClientOBSOLETE ,
59- channelID uint64 ,
60- onClose func (context.Context , * ChatHandlerOBSOLETE ),
61- ) (_ret * ChatHandlerOBSOLETE , _err error ) {
62- logger .Debugf (ctx , "NewChatHandlerOBSOLETE(ctx, client, %d, %p)" , channelID , onClose )
63- defer func () {
64- logger .Debugf (ctx , "/NewChatHandlerOBSOLETE(ctx, client, %d, %p): %#+v %v" , channelID , onClose , _ret , _err )
65- }()
66-
67- ctx , cancelFn := context .WithCancel (ctx )
68- h := & ChatHandlerOBSOLETE {
69- currentCursor : 0 ,
70- client : chatClient ,
71- channelID : channelID ,
72- cancelFunc : cancelFn ,
73- onClose : onClose ,
74- }
75- h .init (ctx )
76- return h , nil
77- }
78-
79- func (h * ChatHandlerOBSOLETE ) init (ctx context.Context ) {
80- h .messagesOutChan = make (chan streamcontrol.ChatMessage , 100 )
81- observability .Go (ctx , func (ctx context.Context ) {
82- if h .onClose != nil {
83- defer h .onClose (ctx , h )
84- }
85- defer close (h .messagesOutChan )
86- h .loop (ctx )
87- })
88- }
89-
90- func (h * ChatHandlerOBSOLETE ) loop (ctx context.Context ) {
91- err := h .iterate (ctx )
92- if err != nil {
93- logger .Errorf (ctx , "unable to perform an iteration: %v" , err )
94- return
95- }
96-
97- t := time .NewTicker (time .Second )
98- defer t .Stop ()
99- for {
100- select {
101- case <- ctx .Done ():
102- return
103- case <- t .C :
104- }
105- err := h .iterate (ctx )
106- if err != nil {
107- logger .Errorf (ctx , "unable to perform an iteration: %v" , err )
108- return
109- }
110- }
111- }
112-
113- func (h * ChatHandlerOBSOLETE ) iterate (ctx context.Context ) error {
114- startTS := time .Now ()
115- reply , err := h .client .GetChatMessagesV2 (ctx , uint64 (h .channelID ), 0 )
116- if err != nil {
117- return fmt .Errorf ("unable to get the chat messages of channel with ID %d: %w" , h .channelID , err )
118- }
119- rtt := time .Since (startTS )
120- logger .Tracef (ctx , "round trip time == %v (messages count: %d)" , rtt , len (reply .Data .Messages ))
121-
122- // TODO: use the cursor instead of message ID to avoid duplicates
123- if reply .Data .Cursor != "" {
124- cursor , err := strconv .ParseUint (reply .Data .Cursor , 10 , 64 )
125- if err != nil {
126- return fmt .Errorf ("unable to parse the cursor value '%s': %w" , reply .Data .Cursor , err )
127- }
128- h .currentCursor = cursor
129- } else {
130- h .currentCursor = 0
131- }
132-
133- // assuming reply.Data is already sorted by CreatedAt DESC
134-
135- slices .Reverse (reply .Data .Messages )
136-
137- // skipping everything we already notified about
138- var firstNewIdx int
139- for idx , msg := range reply .Data .Messages {
140- if msg .ID == h .lastMessageID {
141- firstNewIdx = idx + 1
142- break
143- }
144- }
145-
146- for _ , msg := range reply .Data .Messages [firstNewIdx :] {
147- h .sendMessage (msg )
148- }
149- return nil
150- }
151-
152- func (h * ChatHandlerOBSOLETE ) sendMessage (
153- msg kickcom.ChatMessageV2 ,
154- ) {
155- h .lastMessageID = msg .ID
156- select {
157- case h .messagesOutChan <- streamcontrol.ChatMessage {
158- CreatedAt : msg .CreatedAt ,
159- EventType : streamcontrol .EventTypeChatMessage ,
160- UserID : streamcontrol .ChatUserID (fmt .Sprintf ("%d" , msg .UserID )),
161- Username : msg .Sender .Slug ,
162- MessageID : streamcontrol .ChatMessageID (msg .ID ),
163- Message : msg .Content ,
164- MessageFormatType : streamcontrol .TextFormatTypePlain ,
165- }:
166- default :
167- }
168- }
169- func (h * ChatHandlerOBSOLETE ) MessagesChan () <- chan streamcontrol.ChatMessage {
170- return h .messagesOutChan
20+ channelSlug string ,
21+ ) (* ChatHandlerOBSOLETE , error ) {
22+ return chathandlerobsolete .New (ctx , channelSlug )
17123}
0 commit comments