Skip to content

Commit 6ad7020

Browse files
committed
realtime and poll
1 parent 588a5fb commit 6ad7020

File tree

3 files changed

+56
-40
lines changed

3 files changed

+56
-40
lines changed

client.go

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"log/slog"
8+
"net/url"
89
"strings"
910
"time"
1011

@@ -77,6 +78,17 @@ func NewClient(apiKey string, options ...Option) (*Client, error) {
7778
}
7879
}
7980

81+
c.log.Debug("initialising Flagsmith client",
82+
slog.String("api_url", c.baseURL),
83+
slog.Bool("local_eval", c.localEvaluation),
84+
slog.Duration("environment_refresh_interval", c.envRefreshInterval),
85+
slog.Bool("realtime", c.useRealtime),
86+
slog.String("realtime_url", c.realtimeBaseUrl),
87+
slog.Bool("offline", c.state.IsOffline()),
88+
slog.Duration("timeout", c.timeout),
89+
slog.String("proxy", c.proxy),
90+
)
91+
8092
if c.state.IsOffline() {
8193
return c, nil
8294
}
@@ -99,15 +111,32 @@ func NewClient(apiKey string, options ...Option) (*Client, error) {
99111

100112
if c.localEvaluation {
101113
if !strings.HasPrefix(apiKey, "ser.") {
102-
return nil, errors.New("using local flag evaluation requires a server-side SDK key; got " + apiKey)
114+
return nil, errors.New("local flag evaluation requires a server-side SDK key; got " + apiKey)
115+
}
116+
if c.envRefreshInterval == 0 && !c.useRealtime {
117+
return nil, errors.New("local flag evaluation requires a non-zero refresh interval or enabling real-time updates")
103118
}
119+
120+
// Fail fast if we can't fetch the initial environment within the timeout
121+
ctxWithTimeout, cancel := context.WithTimeout(c.ctxLocalEval, c.timeout)
122+
defer cancel()
123+
c.log.Debug("fetching initial environment")
124+
env, err := c.updateAndReturnEnvironment(ctxWithTimeout)
125+
if err != nil {
126+
return nil, fmt.Errorf("failed to fetch initial environment: %w", err)
127+
}
128+
104129
if c.useRealtime {
105-
go c.startRealtimeUpdates(c.ctxLocalEval)
106-
} else {
130+
streamPath, err := url.JoinPath(c.realtimeBaseUrl, "sse/environments", env.APIKey, "stream")
131+
if err != nil {
132+
return nil, fmt.Errorf("failed to build stream URL: %w", err)
133+
}
134+
go c.startRealtimeUpdates(c.ctxLocalEval, streamPath)
135+
}
136+
if c.envRefreshInterval > 0 {
107137
go c.pollEnvironment(c.ctxLocalEval)
108138
}
109139
}
110-
111140
return c, nil
112141
}
113142

@@ -158,6 +187,11 @@ func (c *Client) GetFlags(ctx context.Context, ec EvaluationContext) (f Flags, e
158187
// UpdateEnvironment fetches the current environment state from the Flagsmith API. It is called periodically when using
159188
// [WithLocalEvaluation], or when [WithRealtime] is enabled and an update event was received.
160189
func (c *Client) UpdateEnvironment(ctx context.Context) error {
190+
_, err := c.updateAndReturnEnvironment(ctx)
191+
return err
192+
}
193+
194+
func (c *Client) updateAndReturnEnvironment(ctx context.Context) (*environments.EnvironmentModel, error) {
161195
var env environments.EnvironmentModel
162196
resp, err := c.client.
163197
NewRequest().
@@ -167,16 +201,16 @@ func (c *Client) UpdateEnvironment(ctx context.Context) error {
167201
Get(c.baseURL + "environment-document/")
168202

169203
if err != nil {
170-
return c.handleError(&APIError{Err: err})
204+
return nil, c.handleError(&APIError{Err: err})
171205
}
172206
if resp.IsError() {
173207
e := &APIError{response: resp.RawResponse}
174-
return c.handleError(e)
208+
return nil, c.handleError(e)
175209
}
176210
c.state.SetEnvironment(&env)
177211

178212
c.log.Info("environment updated", "environment", env.APIKey)
179-
return nil
213+
return &env, nil
180214
}
181215

182216
// GetIdentitySegments returns the segments that this evaluation context is a part of. It requires a local environment
@@ -309,21 +343,20 @@ func (c *Client) getIdentityFlagsFromEnvironment(identifier string, traits map[s
309343
}
310344

311345
func (c *Client) pollEnvironment(ctx context.Context) {
312-
update := func() {
313-
ctx, cancel := context.WithTimeout(ctx, c.envRefreshInterval)
314-
defer cancel()
315-
err := c.UpdateEnvironment(ctx)
316-
if err != nil {
317-
c.log.Error("pollEnvironment failed", "error", err)
318-
}
319-
}
320-
update()
321346
ticker := time.NewTicker(c.envRefreshInterval)
322347
for {
323348
select {
324349
case <-ticker.C:
350+
env, ok := c.state.GetEnvironment()
351+
if ok && time.Since(env.UpdatedAt) < c.envRefreshInterval {
352+
c.log.Debug("environment is already fresh, skipping poll")
353+
continue
354+
}
325355
c.log.Debug("polling environment")
326-
update()
356+
err := c.UpdateEnvironment(ctx)
357+
if err != nil {
358+
c.log.Error("pollEnvironment failed", "error", err)
359+
}
327360
case <-ctx.Done():
328361
return
329362
}

client_test.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net/http"
99
"net/http/httptest"
1010
"sync"
11+
"sync/atomic"
1112
"testing"
1213
"time"
1314

@@ -100,15 +101,10 @@ func TestClientUpdatesEnvironmentOnStartForLocalEvaluation(t *testing.T) {
100101
func TestClientUpdatesEnvironmentOnEachRefresh(t *testing.T) {
101102
// Given
102103
ctx := context.Background()
103-
actualEnvironmentRefreshCounter := struct {
104-
mu sync.Mutex
105-
count int
106-
}{}
104+
var actualEnvironmentRefreshCounter atomic.Uint64
107105
expectedEnvironmentRefreshCount := 3
108106
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
109-
actualEnvironmentRefreshCounter.mu.Lock()
110-
actualEnvironmentRefreshCounter.count++
111-
actualEnvironmentRefreshCounter.mu.Unlock()
107+
actualEnvironmentRefreshCounter.Add(1)
112108
assert.Equal(t, req.URL.Path, "/api/v1/environment-document/")
113109
assert.Equal(t, fixtures.EnvironmentAPIKey, req.Header.Get("X-Environment-Key"))
114110

@@ -133,8 +129,7 @@ func TestClientUpdatesEnvironmentOnEachRefresh(t *testing.T) {
133129
// one when the client starts and 2
134130
// for each time the refresh interval expires
135131

136-
actualEnvironmentRefreshCounter.mu.Lock()
137-
assert.Equal(t, expectedEnvironmentRefreshCount, actualEnvironmentRefreshCounter.count)
132+
assert.Equal(t, expectedEnvironmentRefreshCount, int(actualEnvironmentRefreshCounter.Load()))
138133
}
139134

140135
func TestGetFlags(t *testing.T) {

realtime.go

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,21 @@ import (
77
"errors"
88
"log/slog"
99
"net/http"
10-
"net/url"
1110
"strings"
1211
"time"
1312
)
1413

15-
func (c *Client) startRealtimeUpdates(ctx context.Context) {
16-
err := c.UpdateEnvironment(ctx)
17-
if err != nil {
18-
panic("Failed to fetch the environment while configuring real-time updates")
19-
}
20-
14+
func (c *Client) startRealtimeUpdates(ctx context.Context, stream string) {
2115
env, _ := c.state.GetEnvironment()
2216
envUpdatedAt := env.UpdatedAt
2317
log := c.log.With("environment", env.APIKey, "current_updated_at", &envUpdatedAt)
2418

25-
streamPath, err := url.JoinPath(c.realtimeBaseUrl, "sse/environments", env.APIKey, "stream")
26-
if err != nil {
27-
log.Error("failed to build stream URL", "error", err)
28-
panic(err)
29-
}
30-
3119
for {
3220
select {
3321
case <-ctx.Done():
3422
return
3523
default:
36-
resp, err := http.Get(streamPath)
24+
resp, err := http.Get(stream)
3725
if err != nil {
3826
log.Error("failed to connect to realtime service", "error", err)
3927
continue

0 commit comments

Comments
 (0)