Skip to content

Commit e983df6

Browse files
committed
add pollThenStartRealtime
1 parent 0729ae5 commit e983df6

File tree

3 files changed

+50
-20
lines changed

3 files changed

+50
-20
lines changed

backoff.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package flagsmith
22

3-
import "time"
3+
import (
4+
"context"
5+
"time"
6+
)
47

58
const (
69
initialBackoff = 200 * time.Millisecond
@@ -36,3 +39,12 @@ func (b *backoff) next() time.Duration {
3639
func (b *backoff) reset() {
3740
b.current = initialBackoff
3841
}
42+
43+
// wait waits for the current backoff time, or until ctx is done
44+
func (b *backoff) wait(ctx context.Context) {
45+
select {
46+
case <-ctx.Done():
47+
return
48+
case <-time.After(b.next()):
49+
}
50+
}

client.go

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func NewClient(apiKey string, options ...Option) *Client {
114114
}
115115
if c.config.useRealtime {
116116
// Poll until we get the environment once
117-
go c.pollEnvironment(c.ctxLocalEval, false)
117+
go c.pollThenStartRealtime(c.ctxLocalEval)
118118
}
119119

120120
}
@@ -380,6 +380,40 @@ func (c *Client) pollEnvironment(ctx context.Context, pollForever bool) {
380380
}
381381
}
382382

383+
func (c *Client) pollThenStartRealtime(ctx context.Context) {
384+
b := newBackoff()
385+
update := func() {
386+
c.log.Debug("polling environment")
387+
ctx, cancel := context.WithTimeout(ctx, c.config.envRefreshInterval)
388+
defer cancel()
389+
err := c.UpdateEnvironment(ctx)
390+
if err != nil {
391+
c.log.Error("failed to update environment", "error", err)
392+
b.wait(ctx)
393+
}
394+
}
395+
update()
396+
defer func() {
397+
c.log.Info("initial polling stopped")
398+
}()
399+
for {
400+
select {
401+
case <-ctx.Done():
402+
return
403+
default:
404+
// If environment was fetched, start realtime and finish
405+
if env, ok := c.environment.Load().(*environments.EnvironmentModel); ok {
406+
streamURL := c.config.realtimeBaseUrl + "sse/environments/" + env.APIKey + "/stream"
407+
c.log.Debug("environment initialised, starting realtime updates")
408+
c.realtime = newRealtime(c, ctx, streamURL, env.UpdatedAt)
409+
go c.realtime.start()
410+
return
411+
}
412+
update()
413+
}
414+
}
415+
}
416+
383417
func (c *Client) UpdateEnvironment(ctx context.Context) error {
384418
var env environments.EnvironmentModel
385419
resp, err := c.client.NewRequest().
@@ -412,14 +446,7 @@ func (c *Client) UpdateEnvironment(ctx context.Context) error {
412446
c.identitiesWithOverrides.Store(identitiesWithOverrides)
413447

414448
c.log.Info("environment updated", "environment", env.APIKey)
415-
c.once.Do(func() {
416-
if c.config.useRealtime && c.realtime == nil {
417-
streamURL := c.config.realtimeBaseUrl + "sse/environments/" + env.APIKey + "/stream"
418-
c.realtime = newRealtime(c, c.ctxLocalEval, streamURL, env.UpdatedAt)
419-
c.log.Debug("environment initialised, starting realtime updates")
420-
go c.realtime.start()
421-
}
422-
})
449+
423450
return nil
424451
}
425452

realtime.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func (r *realtime) start() {
5353
default:
5454
if err := r.connect(); err != nil {
5555
r.log.Error("failed to connect", "error", err)
56-
r.wait()
56+
r.backoff.wait(r.ctx)
5757
}
5858
}
5959
}
@@ -114,15 +114,6 @@ func (r *realtime) handleEvent(line string) error {
114114
return nil
115115
}
116116

117-
// wait waits for the current backoff time
118-
func (r *realtime) wait() {
119-
select {
120-
case <-r.ctx.Done():
121-
return
122-
case <-time.After(r.backoff.next()):
123-
}
124-
}
125-
126117
func parseUpdatedAtFromSSE(line string) (time.Time, error) {
127118
var eventData struct {
128119
UpdatedAt float64 `json:"updated_at"`

0 commit comments

Comments
 (0)