Skip to content

Commit 7f2f1ef

Browse files
committed
feat: tray event client
Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
1 parent d6d7158 commit 7f2f1ef

File tree

4 files changed

+767
-0
lines changed

4 files changed

+767
-0
lines changed

tray/events.go

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
// Copyright 2026 Blink Labs Software
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package tray
16+
17+
import (
18+
"encoding/json"
19+
"errors"
20+
"fmt"
21+
"log/slog"
22+
"net/url"
23+
"strings"
24+
"sync"
25+
"time"
26+
27+
"github.com/blinklabs-io/adder/event"
28+
"github.com/gorilla/websocket"
29+
)
30+
31+
const (
32+
eventChanBuffer = 64
33+
maxReconnectDelay = 30 * time.Second
34+
baseReconnectDelay = 500 * time.Millisecond
35+
)
36+
37+
// EventClient connects to adder's /events WebSocket endpoint and
38+
// delivers parsed events on a channel. It reconnects automatically
39+
// with exponential backoff when the connection drops.
40+
type EventClient struct {
41+
address string
42+
port uint
43+
events chan event.Event
44+
stopCh chan struct{}
45+
status *StatusTracker
46+
typeFilter []string
47+
mu sync.Mutex
48+
conn *websocket.Conn // guarded by mu
49+
started bool
50+
closeOnce sync.Once
51+
wg sync.WaitGroup
52+
}
53+
54+
// EventClientOption is a functional option for EventClient.
55+
type EventClientOption func(*EventClient)
56+
57+
// WithEventTypes sets the event types to subscribe to. The types are
58+
// sent as a query parameter on the WS URL for server-side filtering.
59+
func WithEventTypes(types []string) EventClientOption {
60+
return func(c *EventClient) {
61+
c.typeFilter = types
62+
}
63+
}
64+
65+
// WithEventStatusTracker sets a StatusTracker that the client updates
66+
// as its connection state changes.
67+
func WithEventStatusTracker(t *StatusTracker) EventClientOption {
68+
return func(c *EventClient) {
69+
c.status = t
70+
}
71+
}
72+
73+
// NewEventClient creates an EventClient that will connect to the
74+
// given address and port.
75+
func NewEventClient(
76+
address string,
77+
port uint,
78+
opts ...EventClientOption,
79+
) *EventClient {
80+
c := &EventClient{
81+
address: address,
82+
port: port,
83+
events: make(chan event.Event, eventChanBuffer),
84+
stopCh: make(chan struct{}),
85+
status: NewStatusTracker(),
86+
}
87+
for _, opt := range opts {
88+
opt(c)
89+
}
90+
return c
91+
}
92+
93+
// Events returns a read-only channel of events received from the
94+
// server.
95+
func (c *EventClient) Events() <-chan event.Event {
96+
return c.events
97+
}
98+
99+
// Start begins the background connection loop. Returns an error if
100+
// already started.
101+
func (c *EventClient) Start() error {
102+
c.mu.Lock()
103+
defer c.mu.Unlock()
104+
105+
if c.started {
106+
return errors.New("event client already started")
107+
}
108+
c.started = true
109+
c.status.Set(StatusStarting)
110+
111+
c.wg.Add(1)
112+
go func() {
113+
defer c.wg.Done()
114+
c.connectLoop()
115+
}()
116+
return nil
117+
}
118+
119+
// Stop closes the connection and stops reconnection. It is safe to
120+
// call multiple times or without a prior Start.
121+
func (c *EventClient) Stop() {
122+
c.mu.Lock()
123+
wasStarted := c.started
124+
c.mu.Unlock()
125+
126+
if !wasStarted {
127+
return
128+
}
129+
130+
c.closeOnce.Do(func() {
131+
close(c.stopCh)
132+
})
133+
134+
// Close the active connection to unblock ReadMessage
135+
c.mu.Lock()
136+
if c.conn != nil {
137+
c.conn.Close()
138+
}
139+
c.mu.Unlock()
140+
141+
c.wg.Wait()
142+
}
143+
144+
// wsURL builds the WebSocket URL with optional type filter query
145+
// parameter.
146+
func (c *EventClient) wsURL() string {
147+
u := url.URL{
148+
Scheme: "ws",
149+
Host: fmt.Sprintf("%s:%d", c.address, c.port),
150+
Path: "/events",
151+
}
152+
if len(c.typeFilter) > 0 {
153+
q := u.Query()
154+
q.Set("types", strings.Join(c.typeFilter, ","))
155+
u.RawQuery = q.Encode()
156+
}
157+
return u.String()
158+
}
159+
160+
// connectLoop is the main reconnection loop. It runs in its own
161+
// goroutine and exits when stopCh is closed.
162+
func (c *EventClient) connectLoop() {
163+
defer func() {
164+
c.status.Set(StatusStopped)
165+
close(c.events)
166+
}()
167+
168+
attempt := 0
169+
for {
170+
select {
171+
case <-c.stopCh:
172+
return
173+
default:
174+
}
175+
176+
conn, err := c.dial()
177+
if err != nil {
178+
slog.Debug("ws dial failed", "error", err, "attempt", attempt)
179+
attempt++
180+
if !c.backoff(attempt) {
181+
return
182+
}
183+
continue
184+
}
185+
186+
// Connected — store conn so Stop() can close it
187+
c.mu.Lock()
188+
c.conn = conn
189+
c.mu.Unlock()
190+
191+
attempt = 0
192+
c.status.Set(StatusConnected)
193+
slog.Info("connected to adder events endpoint", "url", c.wsURL())
194+
195+
// Read events until error or stop
196+
reconnect := c.readLoop(conn)
197+
198+
c.mu.Lock()
199+
c.conn = nil
200+
c.mu.Unlock()
201+
conn.Close()
202+
203+
if !reconnect {
204+
return
205+
}
206+
207+
// Connection lost, try to reconnect
208+
c.status.Set(StatusReconnecting)
209+
attempt++
210+
if !c.backoff(attempt) {
211+
return
212+
}
213+
}
214+
}
215+
216+
// dial connects to the WS endpoint.
217+
func (c *EventClient) dial() (*websocket.Conn, error) {
218+
conn, _, err := websocket.DefaultDialer.Dial(c.wsURL(), nil)
219+
if err != nil {
220+
return nil, fmt.Errorf("dialing ws: %w", err)
221+
}
222+
return conn, nil
223+
}
224+
225+
// readLoop reads messages from the WS connection and delivers them
226+
// to the events channel. Returns true if it should reconnect, false
227+
// if stop was requested.
228+
func (c *EventClient) readLoop(conn *websocket.Conn) bool {
229+
for {
230+
_, msg, err := conn.ReadMessage()
231+
if err != nil {
232+
// Check if we were asked to stop
233+
select {
234+
case <-c.stopCh:
235+
return false
236+
default:
237+
slog.Debug("ws read error, will reconnect", "error", err)
238+
return true
239+
}
240+
}
241+
242+
var evt event.Event
243+
if err := json.Unmarshal(msg, &evt); err != nil {
244+
slog.Debug("failed to unmarshal event", "error", err)
245+
continue
246+
}
247+
248+
// Non-blocking send
249+
select {
250+
case c.events <- evt:
251+
default:
252+
// channel full, drop event
253+
}
254+
}
255+
}
256+
257+
// backoff waits for an exponential delay before the next reconnect
258+
// attempt. Returns false if stop was requested during the wait.
259+
func (c *EventClient) backoff(attempt int) bool {
260+
delay := backoffDelay(attempt)
261+
slog.Debug("reconnect backoff", "delay", delay, "attempt", attempt)
262+
263+
select {
264+
case <-time.After(delay):
265+
return true
266+
case <-c.stopCh:
267+
return false
268+
}
269+
}
270+
271+
// backoffDelay returns an exponential delay capped at
272+
// maxReconnectDelay.
273+
func backoffDelay(attempt int) time.Duration {
274+
delay := baseReconnectDelay
275+
for i := 1; i < attempt; i++ {
276+
delay *= 2
277+
if delay > maxReconnectDelay {
278+
return maxReconnectDelay
279+
}
280+
}
281+
return delay
282+
}

0 commit comments

Comments
 (0)