@@ -30,6 +30,7 @@ type Client struct {
3030 identitiesWithOverrides atomic.Value
3131
3232 analyticsProcessor * AnalyticsProcessor
33+ realtime * realtime
3334 defaultFlagHandler func (string ) (Flag , error )
3435
3536 client * resty.Client
@@ -76,7 +77,7 @@ func NewClient(apiKey string, options ...Option) *Client {
7677 OnBeforeRequest (newRestyLogRequestMiddleware (c .log )).
7778 OnAfterResponse (newRestyLogResponseMiddleware (c .log ))
7879
79- c .log .Debug ("initialising Flagsmith client" ,
80+ c .log .Info ("initialising Flagsmith client" ,
8081 "base_url" , c .config .baseURL ,
8182 "local_evaluation" , c .config .localEvaluation ,
8283 "offline" , c .config .offlineMode ,
@@ -104,10 +105,13 @@ func NewClient(apiKey string, options ...Option) *Client {
104105 if ! strings .HasPrefix (apiKey , "ser." ) {
105106 panic ("In order to use local evaluation, please generate a server key in the environment settings page." )
106107 }
108+ if c .config .polling || ! c .config .useRealtime {
109+ // Poll indefinitely
110+ go c .pollEnvironment (c .ctxLocalEval , true )
111+ }
107112 if c .config .useRealtime {
108- go c .startRealtimeUpdates (c .ctxLocalEval )
109- } else {
110- go c .pollEnvironment (c .ctxLocalEval )
113+ // Poll until we get the environment once
114+ go c .pollThenStartRealtime (c .ctxLocalEval )
111115 }
112116 }
113117 // Initialise analytics processor
@@ -336,26 +340,76 @@ func (c *Client) getEnvironmentFlagsFromEnvironment() (Flags, error) {
336340 ), nil
337341}
338342
339- func (c * Client ) pollEnvironment (ctx context.Context ) {
343+ func (c * Client ) pollEnvironment (ctx context.Context , pollForever bool ) {
344+ log := c .log .With (slog .String ("worker" , "poll" ))
340345 update := func () {
341- ctx , cancel := context .WithTimeout (ctx , c .config .envRefreshInterval )
346+ log .Debug ("polling environment" )
347+ ctx , cancel := context .WithTimeout (ctx , c .config .timeout )
342348 defer cancel ()
343349 err := c .UpdateEnvironment (ctx )
344350 if err != nil {
345- c . log .Error ("failed to update environment" , "error" , err )
351+ log .Error ("failed to update environment" , "error" , err )
346352 }
347353 }
348354 update ()
349355 ticker := time .NewTicker (c .config .envRefreshInterval )
356+ defer func () {
357+ ticker .Stop ()
358+ log .Info ("polling stopped" )
359+ }()
350360 for {
351361 select {
352362 case <- ticker .C :
363+ if ! pollForever {
364+ // Check if environment was successfully fetched
365+ if _ , ok := c .environment .Load ().(* environments.EnvironmentModel ); ok {
366+ if ! pollForever {
367+ c .log .Debug ("environment initialised" )
368+ return
369+ }
370+ }
371+ }
353372 update ()
354373 case <- ctx .Done ():
355374 return
356375 }
357376 }
358377}
378+
379+ func (c * Client ) pollThenStartRealtime (ctx context.Context ) {
380+ b := newBackoff ()
381+ update := func () {
382+ c .log .Debug ("polling environment" )
383+ ctx , cancel := context .WithTimeout (ctx , c .config .envRefreshInterval )
384+ defer cancel ()
385+ err := c .UpdateEnvironment (ctx )
386+ if err != nil {
387+ c .log .Error ("failed to update environment" , "error" , err )
388+ b .wait (ctx )
389+ }
390+ }
391+ update ()
392+ defer func () {
393+ c .log .Info ("initial polling stopped" )
394+ }()
395+ for {
396+ select {
397+ case <- ctx .Done ():
398+ return
399+ default :
400+ // If environment was fetched, start realtime and finish
401+ if env , ok := c .environment .Load ().(* environments.EnvironmentModel ); ok {
402+ streamURL := c .config .realtimeBaseUrl + "sse/environments/" + env .APIKey + "/stream"
403+ c .log .Debug ("environment initialised, starting realtime updates" )
404+ c .realtime = newRealtime (c , ctx , streamURL , env .UpdatedAt )
405+ go c .realtime .start ()
406+ return
407+ }
408+ update ()
409+ }
410+ }
411+ }
412+
359413func (c * Client ) UpdateEnvironment (ctx context.Context ) error {
360414 var env environments.EnvironmentModel
361415 resp , err := c .client .NewRequest ().
@@ -380,14 +434,22 @@ func (c *Client) UpdateEnvironment(ctx context.Context) error {
380434 }
381435 return f
382436 }
437+ isNew := false
438+ previousEnv := c .environment .Load ()
439+ if previousEnv == nil || env .UpdatedAt .After (previousEnv .(* environments.EnvironmentModel ).UpdatedAt ) {
440+ isNew = true
441+ }
383442 c .environment .Store (& env )
384443 identitiesWithOverrides := make (map [string ]identities.IdentityModel )
385444 for _ , id := range env .IdentityOverrides {
386445 identitiesWithOverrides [id .Identifier ] = * id
387446 }
388447 c .identitiesWithOverrides .Store (identitiesWithOverrides )
389448
390- c .log .Info ("environment updated" , "environment" , env .APIKey )
449+ if isNew {
450+ c .log .Info ("environment updated" , "environment" , env .APIKey , "updated_at" , env .UpdatedAt )
451+ }
452+
391453 return nil
392454}
393455
0 commit comments