Skip to content

Commit 2fb64a3

Browse files
committed
feedback
1 parent 26d4459 commit 2fb64a3

File tree

1 file changed

+8
-26
lines changed

1 file changed

+8
-26
lines changed

realtime.go

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/json"
77
"errors"
8+
"fmt"
89
"log/slog"
910
"net/http"
1011
"strings"
@@ -33,10 +34,9 @@ func newRealtime(client *Client, ctx context.Context, streamURL string, envUpdat
3334
slog.String("worker", "realtime"),
3435
slog.String("stream", streamURL),
3536
),
36-
streamURL: streamURL,
37-
envUpdatedAt: envUpdatedAt,
38-
backoff: newBackoff(),
39-
reconnectChan: make(chan struct{}, 1),
37+
streamURL: streamURL,
38+
envUpdatedAt: envUpdatedAt,
39+
backoff: newBackoff(),
4040
}
4141
}
4242

@@ -50,9 +50,6 @@ func (r *realtime) start() {
5050
select {
5151
case <-r.ctx.Done():
5252
return
53-
case <-r.reconnectChan:
54-
// Reset backoff on successful reconnect
55-
r.backoff.reset()
5653
default:
5754
if err := r.connect(); err != nil {
5855
r.log.Error("failed to connect", "error", err)
@@ -69,9 +66,12 @@ func (r *realtime) connect() error {
6966
return err
7067
}
7168
defer resp.Body.Close()
69+
if resp.StatusCode != http.StatusOK {
70+
return fmt.Errorf("error response connecting to stream: %d", resp.StatusCode)
71+
}
7272

7373
r.log.Info("connected")
74-
r.reconnectChan <- struct{}{}
74+
r.backoff.reset()
7575

7676
scanner := bufio.NewScanner(resp.Body)
7777
for scanner.Scan() {
@@ -123,24 +123,6 @@ func (r *realtime) wait() {
123123
}
124124
}
125125

126-
func (c *Client) startRealtimeUpdates(ctx context.Context) {
127-
// Initial environment fetch
128-
if err := c.UpdateEnvironment(ctx); err != nil {
129-
c.log.Error("failed to fetch initial environment", "error", err)
130-
return
131-
}
132-
133-
env, ok := c.environment.Load().(*environments.EnvironmentModel)
134-
if !ok {
135-
c.log.Error("failed to load environment")
136-
return
137-
}
138-
139-
streamURL := c.config.realtimeBaseUrl + "sse/environments/" + env.APIKey + "/stream"
140-
conn := newRealtime(c, ctx, streamURL, env.UpdatedAt)
141-
conn.start()
142-
}
143-
144126
func parseUpdatedAtFromSSE(line string) (time.Time, error) {
145127
var eventData struct {
146128
UpdatedAt float64 `json:"updated_at"`

0 commit comments

Comments
 (0)