Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package flagsmith

import (
"context"
"time"
)

const (
initialBackoff = 200 * time.Millisecond
maxBackoff = 30 * time.Second
)

// backoff handles exponential backoff with jitter.
type backoff struct {
current time.Duration
}

// newBackoff creates a new backoff instance.
func newBackoff() *backoff {
return &backoff{
current: initialBackoff,
}
}

// next returns the next backoff duration and updates the current backoff.
func (b *backoff) next() time.Duration {
// Add jitter between 0-1s
backoff := b.current + time.Duration(time.Now().UnixNano()%1e9)

// Double the backoff time, but cap it
if b.current < maxBackoff {
b.current *= 2
}

return backoff
}

// reset resets the backoff to initial value.
func (b *backoff) reset() {
b.current = initialBackoff
}

// wait waits for the current backoff time, or until ctx is done.
func (b *backoff) wait(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-time.After(b.next()):
}
}
31 changes: 31 additions & 0 deletions backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package flagsmith

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestBackoff(t *testing.T) {
// Given
b := newBackoff()

// When
first := b.next()
second := b.next()
third := b.next()

// Then
assert.LessOrEqual(t, third, maxBackoff, "Backoff should not exceed max")

// Backoff increases across attempts
assert.Greater(t, second, first, "Second backoff should be greater than the first")
assert.Greater(t, third, second, "Third backoff should be greater than the second")
}

func TestBackoffReset(t *testing.T) {
b := newBackoff()
assert.Greater(t, b.next(), initialBackoff)
b.reset()
assert.Equal(t, initialBackoff, b.current, "Reset should return to initial backoff")
}
78 changes: 70 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Client struct {
identitiesWithOverrides atomic.Value

analyticsProcessor *AnalyticsProcessor
realtime *realtime
defaultFlagHandler func(string) (Flag, error)

client *resty.Client
Expand Down Expand Up @@ -76,7 +77,7 @@ func NewClient(apiKey string, options ...Option) *Client {
OnBeforeRequest(newRestyLogRequestMiddleware(c.log)).
OnAfterResponse(newRestyLogResponseMiddleware(c.log))

c.log.Debug("initialising Flagsmith client",
c.log.Info("initialising Flagsmith client",
"base_url", c.config.baseURL,
"local_evaluation", c.config.localEvaluation,
"offline", c.config.offlineMode,
Expand Down Expand Up @@ -104,10 +105,13 @@ func NewClient(apiKey string, options ...Option) *Client {
if !strings.HasPrefix(apiKey, "ser.") {
panic("In order to use local evaluation, please generate a server key in the environment settings page.")
}
if c.config.polling || !c.config.useRealtime {
// Poll indefinitely
go c.pollEnvironment(c.ctxLocalEval, true)
}
if c.config.useRealtime {
go c.startRealtimeUpdates(c.ctxLocalEval)
} else {
go c.pollEnvironment(c.ctxLocalEval)
// Poll until we get the environment once
go c.pollThenStartRealtime(c.ctxLocalEval)
}
}
// Initialise analytics processor
Expand Down Expand Up @@ -336,26 +340,76 @@ func (c *Client) getEnvironmentFlagsFromEnvironment() (Flags, error) {
), nil
}

func (c *Client) pollEnvironment(ctx context.Context) {
func (c *Client) pollEnvironment(ctx context.Context, pollForever bool) {
log := c.log.With(slog.String("worker", "poll"))
update := func() {
ctx, cancel := context.WithTimeout(ctx, c.config.envRefreshInterval)
log.Debug("polling environment")
ctx, cancel := context.WithTimeout(ctx, c.config.timeout)
defer cancel()
err := c.UpdateEnvironment(ctx)
if err != nil {
c.log.Error("failed to update environment", "error", err)
log.Error("failed to update environment", "error", err)
}
}
update()
ticker := time.NewTicker(c.config.envRefreshInterval)
defer func() {
ticker.Stop()
log.Info("polling stopped")
}()
for {
select {
case <-ticker.C:
if !pollForever {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be moved to real-time module/package; pollEnvironment should not be responsible for this

// Check if environment was successfully fetched
if _, ok := c.environment.Load().(*environments.EnvironmentModel); ok {
if !pollForever {
c.log.Debug("environment initialised")
return
}
}
}
update()
case <-ctx.Done():
return
}
}
}

func (c *Client) pollThenStartRealtime(ctx context.Context) {
b := newBackoff()
update := func() {
c.log.Debug("polling environment")
ctx, cancel := context.WithTimeout(ctx, c.config.envRefreshInterval)
defer cancel()
err := c.UpdateEnvironment(ctx)
if err != nil {
c.log.Error("failed to update environment", "error", err)
b.wait(ctx)
}
}
update()
defer func() {
c.log.Info("initial polling stopped")
}()
for {
select {
case <-ctx.Done():
return
default:
// If environment was fetched, start realtime and finish
if env, ok := c.environment.Load().(*environments.EnvironmentModel); ok {
streamURL := c.config.realtimeBaseUrl + "sse/environments/" + env.APIKey + "/stream"
c.log.Debug("environment initialised, starting realtime updates")
c.realtime = newRealtime(c, ctx, streamURL, env.UpdatedAt)
go c.realtime.start()
return
}
update()
}
}
}

func (c *Client) UpdateEnvironment(ctx context.Context) error {
var env environments.EnvironmentModel
resp, err := c.client.NewRequest().
Expand All @@ -380,14 +434,22 @@ func (c *Client) UpdateEnvironment(ctx context.Context) error {
}
return f
}
isNew := false
previousEnv := c.environment.Load()
if previousEnv == nil || env.UpdatedAt.After(previousEnv.(*environments.EnvironmentModel).UpdatedAt) {
isNew = true
}
c.environment.Store(&env)
identitiesWithOverrides := make(map[string]identities.IdentityModel)
for _, id := range env.IdentityOverrides {
identitiesWithOverrides[id.Identifier] = *id
}
c.identitiesWithOverrides.Store(identitiesWithOverrides)

c.log.Info("environment updated", "environment", env.APIKey)
if isNew {
c.log.Info("environment updated", "environment", env.APIKey, "updated_at", env.UpdatedAt)
}

return nil
}

Expand Down
42 changes: 42 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,3 +977,45 @@ func TestWithSlogLogger(t *testing.T) {
t.Log(logStr)
assert.Contains(t, logStr, "initialising Flagsmith client")
}

func TestWithPollingWorksWithRealtime(t *testing.T) {
ctx := context.Background()
server := httptest.NewServer(http.HandlerFunc(fixtures.EnvironmentDocumentHandler))
defer server.Close()

// guard against data race from goroutines logging at the same time
var logOutput strings.Builder
var logMu sync.Mutex
slogLogger := slog.New(slog.NewTextHandler(writerFunc(func(p []byte) (n int, err error) {
logMu.Lock()
defer logMu.Unlock()
return logOutput.Write(p)
}), &slog.HandlerOptions{
Level: slog.LevelDebug,
}))

// Given
_ = flagsmith.NewClient(fixtures.EnvironmentAPIKey,
flagsmith.WithSlogLogger(slogLogger),
flagsmith.WithLocalEvaluation(ctx),
flagsmith.WithRealtime(),
flagsmith.WithPolling(),
flagsmith.WithBaseURL(server.URL+"/api/v1/"))

// When
time.Sleep(500 * time.Millisecond)

// Then
logMu.Lock()
logStr := logOutput.String()
logMu.Unlock()
assert.Contains(t, logStr, "worker=poll")
assert.Contains(t, logStr, "worker=realtime")
}

// writerFunc implements io.Writer.
type writerFunc func(p []byte) (n int, err error)

func (f writerFunc) Write(p []byte) (n int, err error) {
return f(p)
}
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type config struct {
offlineMode bool
realtimeBaseUrl string
useRealtime bool
polling bool
}

// defaultConfig returns default configuration.
Expand Down
8 changes: 8 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var _ = []Option{
WithCustomHeaders(nil),
WithDefaultHandler(nil),
WithProxy(""),
WithPolling(),
WithRealtime(),
WithRealtimeBaseURL(""),
WithLogger(nil),
Expand Down Expand Up @@ -157,3 +158,10 @@ func WithRealtimeBaseURL(url string) Option {
c.config.realtimeBaseUrl = url
}
}

// WithPolling makes it so that the client will poll for updates even when WithRealtime is used.
func WithPolling() Option {
return func(c *Client) {
c.config.polling = true
}
}
Loading
Loading