Skip to content

Commit 1e90d71

Browse files
authored
feat: tray event client (#639)
Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
1 parent c93e37e commit 1e90d71

File tree

4 files changed

+868
-0
lines changed

4 files changed

+868
-0
lines changed

tray/events.go

Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
// Copyright 2026 Blink Labs Software
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package tray
16+
17+
import (
18+
"encoding/json"
19+
"errors"
20+
"fmt"
21+
"log/slog"
22+
"net/url"
23+
"strings"
24+
"sync"
25+
"time"
26+
27+
"github.com/blinklabs-io/adder/event"
28+
"github.com/gorilla/websocket"
29+
)
30+
31+
const (
32+
eventChanBuffer = 64
33+
maxReconnectDelay = 30 * time.Second
34+
baseReconnectDelay = 500 * time.Millisecond
35+
)
36+
37+
// EventClient connects to adder's /events WebSocket endpoint and
38+
// delivers parsed events on a channel. It reconnects automatically
39+
// with exponential backoff when the connection drops.
40+
type EventClient struct {
41+
address string
42+
port uint
43+
events chan event.Event
44+
stopCh chan struct{}
45+
status *StatusTracker
46+
typeFilter []string
47+
mu sync.Mutex
48+
conn *websocket.Conn // guarded by mu
49+
started bool
50+
closeOnce sync.Once
51+
wg sync.WaitGroup
52+
}
53+
54+
// EventClientOption is a functional option for EventClient.
55+
type EventClientOption func(*EventClient)
56+
57+
// WithEventTypes sets the event types to subscribe to. The types are
58+
// sent as a query parameter on the WS URL for server-side filtering.
59+
func WithEventTypes(types []string) EventClientOption {
60+
return func(c *EventClient) {
61+
c.typeFilter = append([]string(nil), types...)
62+
}
63+
}
64+
65+
// WithEventStatusTracker sets a StatusTracker that the client updates
66+
// as its connection state changes. A nil tracker is ignored.
67+
func WithEventStatusTracker(t *StatusTracker) EventClientOption {
68+
return func(c *EventClient) {
69+
if t != nil {
70+
c.status = t
71+
}
72+
}
73+
}
74+
75+
// NewEventClient creates an EventClient that will connect to the
76+
// given address and port.
77+
func NewEventClient(
78+
address string,
79+
port uint,
80+
opts ...EventClientOption,
81+
) *EventClient {
82+
c := &EventClient{
83+
address: address,
84+
port: port,
85+
events: make(chan event.Event, eventChanBuffer),
86+
stopCh: make(chan struct{}),
87+
status: NewStatusTracker(),
88+
}
89+
for _, opt := range opts {
90+
opt(c)
91+
}
92+
return c
93+
}
94+
95+
// Events returns a read-only channel of events received from the
96+
// server.
97+
func (c *EventClient) Events() <-chan event.Event {
98+
return c.events
99+
}
100+
101+
// Start begins the background connection loop. Returns an error if
102+
// already started.
103+
func (c *EventClient) Start() error {
104+
c.mu.Lock()
105+
defer c.mu.Unlock()
106+
107+
if c.started {
108+
return errors.New("event client already started")
109+
}
110+
c.started = true
111+
c.status.Set(StatusStarting)
112+
113+
c.wg.Add(1)
114+
go func() {
115+
defer c.wg.Done()
116+
c.connectLoop()
117+
}()
118+
return nil
119+
}
120+
121+
// Stop closes the connection and stops reconnection. It is safe to
122+
// call multiple times or without a prior Start.
123+
func (c *EventClient) Stop() {
124+
c.mu.Lock()
125+
wasStarted := c.started
126+
c.mu.Unlock()
127+
128+
if !wasStarted {
129+
return
130+
}
131+
132+
c.closeOnce.Do(func() {
133+
close(c.stopCh)
134+
})
135+
136+
// Close the active connection to unblock ReadMessage
137+
c.mu.Lock()
138+
if c.conn != nil {
139+
c.conn.Close()
140+
}
141+
c.mu.Unlock()
142+
143+
c.wg.Wait()
144+
}
145+
146+
// wsURL builds the WebSocket URL with optional type filter query
147+
// parameter.
148+
func (c *EventClient) wsURL() string {
149+
u := url.URL{
150+
Scheme: "ws",
151+
Host: fmt.Sprintf("%s:%d", c.address, c.port),
152+
Path: "/events",
153+
}
154+
if len(c.typeFilter) > 0 {
155+
q := u.Query()
156+
q.Set("types", strings.Join(c.typeFilter, ","))
157+
u.RawQuery = q.Encode()
158+
}
159+
return u.String()
160+
}
161+
162+
// connectLoop is the main reconnection loop. It runs in its own
163+
// goroutine and exits when stopCh is closed.
164+
func (c *EventClient) connectLoop() {
165+
defer func() {
166+
c.status.Set(StatusStopped)
167+
close(c.events)
168+
}()
169+
170+
attempt := 0
171+
for {
172+
select {
173+
case <-c.stopCh:
174+
return
175+
default:
176+
}
177+
178+
conn, err := c.dial()
179+
if err != nil {
180+
slog.Debug("ws dial failed", "error", err, "attempt", attempt)
181+
attempt++
182+
if !c.backoff(attempt) {
183+
return
184+
}
185+
continue
186+
}
187+
188+
// Connected — store conn so Stop() can close it.
189+
// Re-check stopCh under the same lock to handle the race
190+
// where Stop() already checked c.conn (was nil during dial).
191+
c.mu.Lock()
192+
c.conn = conn
193+
select {
194+
case <-c.stopCh:
195+
c.conn = nil
196+
c.mu.Unlock()
197+
conn.Close()
198+
return
199+
default:
200+
}
201+
c.mu.Unlock()
202+
203+
attempt = 0
204+
c.status.Set(StatusConnected)
205+
slog.Info("connected to adder events endpoint", "url", c.wsURL())
206+
207+
// Read events until error or stop
208+
reconnect := c.readLoop(conn)
209+
210+
c.mu.Lock()
211+
c.conn = nil
212+
c.mu.Unlock()
213+
conn.Close()
214+
215+
if !reconnect {
216+
return
217+
}
218+
219+
// Connection lost, try to reconnect
220+
c.status.Set(StatusReconnecting)
221+
attempt++
222+
if !c.backoff(attempt) {
223+
return
224+
}
225+
}
226+
}
227+
228+
// dial connects to the WS endpoint.
229+
func (c *EventClient) dial() (*websocket.Conn, error) {
230+
conn, _, err := websocket.DefaultDialer.Dial(c.wsURL(), nil)
231+
if err != nil {
232+
return nil, fmt.Errorf("dialing ws: %w", err)
233+
}
234+
return conn, nil
235+
}
236+
237+
// readLoop reads messages from the WS connection and delivers them
238+
// to the events channel. Returns true if it should reconnect, false
239+
// if stop was requested.
240+
func (c *EventClient) readLoop(conn *websocket.Conn) bool {
241+
for {
242+
_, msg, err := conn.ReadMessage()
243+
if err != nil {
244+
// Check if we were asked to stop
245+
select {
246+
case <-c.stopCh:
247+
return false
248+
default:
249+
slog.Debug("ws read error, will reconnect", "error", err)
250+
return true
251+
}
252+
}
253+
254+
var evt event.Event
255+
if err := json.Unmarshal(msg, &evt); err != nil {
256+
slog.Debug("failed to unmarshal event", "error", err)
257+
continue
258+
}
259+
260+
// Non-blocking send
261+
select {
262+
case c.events <- evt:
263+
default:
264+
slog.Debug("event dropped, channel full", "type", evt.Type)
265+
}
266+
}
267+
}
268+
269+
// backoff waits for an exponential delay before the next reconnect
270+
// attempt. Returns false if stop was requested during the wait.
271+
func (c *EventClient) backoff(attempt int) bool {
272+
delay := backoffDelay(attempt)
273+
slog.Debug("reconnect backoff", "delay", delay, "attempt", attempt)
274+
275+
select {
276+
case <-time.After(delay):
277+
return true
278+
case <-c.stopCh:
279+
return false
280+
}
281+
}
282+
283+
// backoffDelay returns an exponential delay capped at
284+
// maxReconnectDelay.
285+
func backoffDelay(attempt int) time.Duration {
286+
delay := baseReconnectDelay
287+
for i := 1; i < attempt; i++ {
288+
delay *= 2
289+
if delay > maxReconnectDelay {
290+
return maxReconnectDelay
291+
}
292+
}
293+
return delay
294+
}

0 commit comments

Comments
 (0)