Skip to content

Commit e05092b

Browse files
authored
expose output channel and guard against user closing it (#103)
1 parent 46f551f commit e05092b

3 files changed

Lines changed: 29 additions & 48 deletions

File tree

websocket/polygon.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -154,17 +154,10 @@ func (c *Client) Unsubscribe(topic Topic, tickers ...string) error {
154154
return nil
155155
}
156156

157-
// Output returns the next message in the output queue. If no messages are available, it returns nil.
158-
func (c *Client) Output() any {
159-
select {
160-
case out, ok := <-c.output:
161-
if ok {
162-
return out
163-
}
164-
return nil
165-
default:
166-
return nil
167-
}
157+
// Output returns the output queue. If the channel is closed by the user (not
158+
// recommended), the client connection will close as well.
159+
func (c *Client) Output() chan any {
160+
return c.output
168161
}
169162

170163
// Close attempt to gracefully close the connection to the server.
@@ -254,6 +247,17 @@ func (c *Client) reconnect() {
254247
}
255248
}
256249

250+
func (c *Client) closeOutput() {
251+
defer func() {
252+
if r := recover(); r != nil {
253+
c.log.Debugf("output channel was closed by user")
254+
} else {
255+
c.log.Debugf("output channel closed")
256+
}
257+
}()
258+
close(c.output)
259+
}
260+
257261
func (c *Client) close(reconnect bool) {
258262
if c.conn == nil {
259263
return
@@ -270,7 +274,7 @@ func (c *Client) close(reconnect bool) {
270274
c.log.Errorf("process thread closed: %v", err)
271275
}
272276
c.shouldClose = true
273-
close(c.output)
277+
c.closeOutput()
274278
}
275279

276280
if c.conn != nil {
@@ -329,16 +333,15 @@ func (c *Client) write() error {
329333
if err := c.conn.WriteMessage(websocket.TextMessage, msg); err != nil {
330334
return fmt.Errorf("failed to send message: %w", err)
331335
}
332-
default:
333-
continue
334336
}
335337
}
336338
}
337339

338340
func (c *Client) process() (err error) {
339341
defer func() {
340342
c.log.Debugf("process thread closed")
341-
if err != nil {
343+
r := recover()
344+
if r != nil || err != nil {
342345
go c.Close() // this client should close if it hits a fatal error (e.g. auth failed)
343346
}
344347
}()
@@ -356,8 +359,6 @@ func (c *Client) process() (err error) {
356359
if err := c.route(msgs); err != nil {
357360
return err
358361
}
359-
default:
360-
continue
361362
}
362363
}
363364
}

websocket/polygon_live_test.go

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,16 @@ func TestMain(t *testing.T) {
5151
log.Fatal(err)
5252
}
5353

54-
go printOutput(ctx, c) // comment for raw data handling
55-
// go printRawOutput(ctx, c) // uncomment for raw data handling
54+
go printOutput(c) // comment for raw data handling
55+
// go printRawOutput(c) // uncomment for raw data handling
5656

5757
time.Sleep(10 * time.Second)
5858
if err := c.Subscribe(polygonws.StocksTrades, "*"); err != nil {
5959
log.Error(err)
6060
}
6161

62+
// close(c.Output()) // uncomment to test behavior when output channel is closed
63+
6264
time.Sleep(250 * time.Millisecond)
6365
if err := c.Unsubscribe(polygonws.StocksTrades); err != nil {
6466
log.Error(err)
@@ -91,35 +93,17 @@ func TestMain(t *testing.T) {
9193
}
9294
}
9395

94-
func printOutput(ctx context.Context, client *polygonws.Client) {
95-
for {
96-
select {
97-
case <-ctx.Done():
98-
return
99-
default:
100-
out := client.Output()
101-
if out == nil {
102-
continue
103-
}
104-
fmt.Println(out)
105-
}
96+
func printOutput(client *polygonws.Client) {
97+
for out := range client.Output() {
98+
fmt.Println(out)
10699
}
107100
}
108101

109102
//nolint:deadcode
110-
func printRawOutput(ctx context.Context, client *polygonws.Client) {
111-
for {
112-
select {
113-
case <-ctx.Done():
114-
return
115-
default:
116-
out := client.Output()
117-
if out == nil {
118-
continue
119-
}
120-
if b, ok := out.(json.RawMessage); ok {
121-
fmt.Println(string(b))
122-
}
103+
func printRawOutput(client *polygonws.Client) {
104+
for out := range client.Output() {
105+
if b, ok := out.(json.RawMessage); ok {
106+
fmt.Println(string(b))
123107
}
124108
}
125109
}

websocket/polygon_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,6 @@ func TestConnectAuthSuccess(t *testing.T) {
8989
// closing before connecting shouldn't do anthing
9090
c.Close()
9191

92-
// accessing output early shouldn't do anything
93-
out := c.Output()
94-
assert.Nil(t, out)
95-
9692
// connect successfully
9793
err = c.Connect()
9894
assert.Nil(t, err)

0 commit comments

Comments
 (0)