7
7
"errors"
8
8
"log/slog"
9
9
"net/http"
10
+ "net/url"
10
11
"strings"
11
12
"time"
12
13
)
@@ -16,17 +17,25 @@ func (c *Client) startRealtimeUpdates(ctx context.Context) {
16
17
if err != nil {
17
18
panic ("Failed to fetch the environment while configuring real-time updates" )
18
19
}
20
+
19
21
env := c .environment .GetEnvironment ()
20
- stream_url := c .config .realtimeBaseUrl + "sse/environments/" + env .APIKey + "/stream"
21
22
envUpdatedAt := env .UpdatedAt
23
+ log := c .log .With ("environment" , env .APIKey , "current_updated_at" , & envUpdatedAt )
24
+
25
+ streamPath , err := url .JoinPath (c .config .realtimeBaseUrl , "sse/environments" , env .APIKey , "stream" )
26
+ if err != nil {
27
+ log .Error ("failed to build stream URL" , "error" , err )
28
+ panic (err )
29
+ }
30
+
22
31
for {
23
32
select {
24
33
case <- ctx .Done ():
25
34
return
26
35
default :
27
- resp , err := http .Get (stream_url )
36
+ resp , err := http .Get (streamPath )
28
37
if err != nil {
29
- c . log .Error ("failed to connect to realtime service" , "error" , err )
38
+ log .Error ("failed to connect to realtime service" , "error" , err )
30
39
continue
31
40
}
32
41
defer resp .Body .Close ()
@@ -37,11 +46,12 @@ func (c *Client) startRealtimeUpdates(ctx context.Context) {
37
46
if strings .HasPrefix (line , "data: " ) {
38
47
parsedTime , err := parseUpdatedAtFromSSE (line )
39
48
if err != nil {
40
- c . log .Error ("failed to parse realtime update event" , "error" , err , "raw_event" , line )
49
+ log .Error ("failed to parse realtime update event" , "error" , err , "raw_event" , line )
41
50
continue
42
51
}
43
52
if parsedTime .After (envUpdatedAt ) {
44
- c .log .Info ("received update event" ,
53
+ log .WithGroup ("event" ).Info ("received update event" ,
54
+ slog .Duration ("update_delay" , parsedTime .Sub (envUpdatedAt )),
45
55
slog .Time ("updated_at" , parsedTime ),
46
56
slog .String ("environment" , env .APIKey ),
47
57
)
@@ -59,6 +69,7 @@ func (c *Client) startRealtimeUpdates(ctx context.Context) {
59
69
}
60
70
}
61
71
}
72
+
62
73
func parseUpdatedAtFromSSE (line string ) (time.Time , error ) {
63
74
var eventData struct {
64
75
UpdatedAt float64 `json:"updated_at"`
0 commit comments