Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,16 @@ func NewAnalyticsProcessor(ctx context.Context, client *resty.Client, baseURL st
endpoint: baseURL + AnalyticsEndpoint,
log: log,
}
log.Debugf("analytics processor starting")
go processor.start(ctx, tickerInterval)
return &processor
}

func (a *AnalyticsProcessor) start(ctx context.Context, tickerInterval int) {
ticker := time.NewTicker(time.Duration(tickerInterval) * time.Millisecond)
defer func() {
a.log.Debugf("analytics processor stopped")
}()
for {
select {
case <-ticker.C:
Expand Down
2 changes: 1 addition & 1 deletion analytics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestAnalytics(t *testing.T) {
client.SetHeader("X-Environment-Key", EnvironmentAPIKey)

// Now let's create the processor
processor := NewAnalyticsProcessor(context.Background(), client, server.URL+"/api/v1/", &analyticsTimer, createLogger())
processor := NewAnalyticsProcessor(context.Background(), client, server.URL+"/api/v1/", &analyticsTimer, newSlogToLoggerAdapter(createLogger()))

// and, track some features
processor.TrackFeature("feature_1")
Expand Down
32 changes: 28 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package flagsmith
import (
"context"
"fmt"
"log/slog"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -34,7 +35,7 @@ type Client struct {
client *resty.Client
ctxLocalEval context.Context
ctxAnalytics context.Context
log Logger
log *slog.Logger
offlineHandler OfflineHandler
errorHandler func(handler *FlagsmithAPIError)
}
Expand Down Expand Up @@ -70,7 +71,21 @@ func NewClient(apiKey string, options ...Option) *Client {
opt(c)
}
}
c.client.SetLogger(c.log)
c.client = c.client.
SetLogger(newSlogToRestyAdapter(c.log)).
OnBeforeRequest(newRestyLogRequestMiddleware(c.log)).
OnAfterResponse(newRestyLogResponseMiddleware(c.log))

c.log.Debug("initialising Flagsmith client",
"base_url", c.config.baseURL,
"local_evaluation", c.config.localEvaluation,
"offline", c.config.offlineMode,
"analytics", c.config.enableAnalytics,
"realtime", c.config.useRealtime,
"realtime_url", c.config.realtimeBaseUrl,
"env_refresh_interval", c.config.envRefreshInterval,
"timeout", c.config.timeout,
)

if c.config.offlineMode && c.offlineHandler == nil {
panic("offline handler must be provided to use offline mode.")
Expand All @@ -97,7 +112,15 @@ func NewClient(apiKey string, options ...Option) *Client {
}
// Initialise analytics processor
if c.config.enableAnalytics {
c.analyticsProcessor = NewAnalyticsProcessor(c.ctxAnalytics, c.client, c.config.baseURL, nil, c.log)
c.analyticsProcessor = NewAnalyticsProcessor(
c.ctxAnalytics,
c.client,
c.config.baseURL,
nil,
newSlogToLoggerAdapter(
c.log.With(slog.String("worker", "analytics")),
),
)
}
return c
}
Expand Down Expand Up @@ -319,7 +342,7 @@ func (c *Client) pollEnvironment(ctx context.Context) {
defer cancel()
err := c.UpdateEnvironment(ctx)
if err != nil {
c.log.Errorf("Failed to update environment: %v", err)
c.log.Error("failed to update environment", "error", err)
}
}
update()
Expand Down Expand Up @@ -364,6 +387,7 @@ func (c *Client) UpdateEnvironment(ctx context.Context) error {
}
c.identitiesWithOverrides.Store(identitiesWithOverrides)

c.log.Info("environment updated", "environment", env.APIKey)
return nil
}

Expand Down
18 changes: 18 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -959,3 +961,19 @@ data: {"updated_at": %f}
// Flush the event to the client
flusher.Flush()
}

func TestWithSlogLogger(t *testing.T) {
// Given
var logOutput strings.Builder
slogLogger := slog.New(slog.NewTextHandler(&logOutput, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))

// When
_ = flagsmith.NewClient(fixtures.EnvironmentAPIKey, flagsmith.WithSlogLogger(slogLogger))

// Then
logStr := logOutput.String()
t.Log(logStr)
assert.Contains(t, logStr, "initialising Flagsmith client")
}
149 changes: 131 additions & 18 deletions logger.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package flagsmith

import (
"log"
"context"
"fmt"
"log/slog"
"os"
"strings"
"time"

"github.com/go-resty/resty/v2"
)

// Logger is the interface used for logging by flagsmith client. This interface defines the methods
Expand All @@ -19,33 +25,140 @@ type Logger interface {
Debugf(format string, v ...interface{})
}

func createLogger() *logger {
l := &logger{l: log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds)}
return l
// slogToRestyAdapter adapts a slog.Logger to resty.Logger.
type slogToRestyAdapter struct {
logger *slog.Logger
}

func newSlogToRestyAdapter(logger *slog.Logger) *slogToRestyAdapter {
return &slogToRestyAdapter{logger: logger}
}

func (l *slogToRestyAdapter) Errorf(format string, v ...interface{}) {
l.logger.Error(format, v...)
}

func (l *slogToRestyAdapter) Warnf(format string, v ...interface{}) {
l.logger.Warn(format, v...)
}

func (l *slogToRestyAdapter) Debugf(format string, v ...interface{}) {
l.logger.Debug(format, v...)
}

var _ Logger = (*logger)(nil)
// slogToLoggerAdapter adapts a slog.Logger to our Logger interface.
type slogToLoggerAdapter struct {
logger *slog.Logger
}

func newSlogToLoggerAdapter(logger *slog.Logger) *slogToLoggerAdapter {
return &slogToLoggerAdapter{logger: logger}
}

func (l *slogToLoggerAdapter) Errorf(format string, v ...interface{}) {
l.logger.Error(fmt.Sprintf(format, v...))
}

type logger struct {
l *log.Logger
func (l *slogToLoggerAdapter) Warnf(format string, v ...interface{}) {
l.logger.Warn(fmt.Sprintf(format, v...))
}

func (l *logger) Errorf(format string, v ...interface{}) {
l.output("ERROR FLAGSMITH: "+format, v...)
func (l *slogToLoggerAdapter) Debugf(format string, v ...interface{}) {
l.logger.Debug(fmt.Sprintf(format, v...))
}

func (l *logger) Warnf(format string, v ...interface{}) {
l.output("WARN FLAGSMITH: "+format, v...)
// loggerToSlogAdapter adapts our Logger interface to a slog.Logger.
type loggerToSlogAdapter struct {
logger Logger
}

func (l *logger) Debugf(format string, v ...interface{}) {
l.output("DEBUG FLAGSMITH: "+format, v...)
func newLoggerToSlogAdapter(logger Logger) *slog.Logger {
return slog.New(&loggerToSlogAdapter{logger: logger})
}

func (l *logger) output(format string, v ...interface{}) {
if len(v) == 0 {
l.l.Print(format)
return
// implement slog.Handler interface to adapt our Logger interface to a slog.Logger

func (l *loggerToSlogAdapter) Enabled(ctx context.Context, level slog.Level) bool {
return true
}

func (l *loggerToSlogAdapter) Handle(ctx context.Context, r slog.Record) error {
msg := r.Message
var attrs strings.Builder
r.Attrs(func(a slog.Attr) bool {
attrs.WriteString(a.String() + " ")
return true
})
msg += attrs.String()

switch r.Level {
case slog.LevelError:
l.logger.Errorf(msg)
case slog.LevelWarn:
l.logger.Warnf(msg)
case slog.LevelDebug:
l.logger.Debugf(msg)
}
return nil
}

func (l *loggerToSlogAdapter) WithAttrs(_ []slog.Attr) slog.Handler {
return l
}

func (l *loggerToSlogAdapter) WithGroup(_ string) slog.Handler {
return l
}

func createLogger() *slog.Logger {
return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))
}

const (
contextLoggerKey contextKey = contextKey("logger")
contextStartTimeKey contextKey = contextKey("startTime")
)

func newRestyLogRequestMiddleware(logger *slog.Logger) resty.RequestMiddleware {
return func(c *resty.Client, req *resty.Request) error {
// Create a child logger with request metadata
reqLogger := logger.WithGroup("http").With(
"method", req.Method,
"url", req.URL,
)
reqLogger.Debug("request")

// Store the logger in this request's context, and use it in the response
req.SetContext(context.WithValue(req.Context(), contextLoggerKey, reqLogger))

// Time the current request
req.SetContext(context.WithValue(req.Context(), contextStartTimeKey, time.Now()))

return nil
}
}

func newRestyLogResponseMiddleware(logger *slog.Logger) resty.ResponseMiddleware {
return func(client *resty.Client, resp *resty.Response) error {
// Retrieve the logger and start time from context
reqLogger, _ := resp.Request.Context().Value(contextLoggerKey).(*slog.Logger)
startTime, _ := resp.Request.Context().Value(contextStartTimeKey).(time.Time)

if reqLogger == nil {
reqLogger = logger
}
reqLogger = reqLogger.With(
slog.Int("status", resp.StatusCode()),
slog.Duration("duration", time.Since(startTime)),
slog.Int64("content_length", resp.Size()),
)
if resp.IsError() {
reqLogger.Error("error response")
} else {
reqLogger.Debug("response")
}
return nil
}
l.l.Printf(format, v...)
}
11 changes: 11 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"strings"
"time"

"log/slog"
)

type Option func(c *Client)
Expand All @@ -22,6 +24,8 @@ var _ = []Option{
WithProxy(""),
WithRealtime(),
WithRealtimeBaseURL(""),
WithLogger(nil),
WithSlogLogger(nil),
}

func WithBaseURL(url string) Option {
Expand Down Expand Up @@ -93,6 +97,13 @@ func WithDefaultHandler(handler func(string) (Flag, error)) Option {

// Allows the client to use any logger that implements the `Logger` interface.
func WithLogger(logger Logger) Option {
return func(c *Client) {
c.log = newLoggerToSlogAdapter(logger)
}
}

// WithSlogLogger allows the client to use a slog.Logger for logging.
func WithSlogLogger(logger *slog.Logger) Option {
return func(c *Client) {
c.log = logger
}
Expand Down
18 changes: 13 additions & 5 deletions realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"errors"
"log/slog"
"net/http"
"strings"
"time"
Expand All @@ -20,41 +21,48 @@ func (c *Client) startRealtimeUpdates(ctx context.Context) {
env, _ := c.environment.Load().(*environments.EnvironmentModel)
stream_url := c.config.realtimeBaseUrl + "sse/environments/" + env.APIKey + "/stream"
envUpdatedAt := env.UpdatedAt
log := c.log.With(
slog.String("worker", "realtime"),
slog.String("stream", stream_url),
)
defer func() {
log.Info("realtime stopped")
}()
for {
select {
case <-ctx.Done():
return
default:
resp, err := http.Get(stream_url)
if err != nil {
c.log.Errorf("Error connecting to realtime server: %v", err)
log.Error("failed to connect to realtime stream", "error", err)
continue
}
defer resp.Body.Close()
log.Info("connected")

scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "data: ") {
parsedTime, err := parseUpdatedAtFromSSE(line)
if err != nil {
c.log.Errorf("Error reading realtime stream: %v", err)
log.Error("failed to parse event message", "error", err, "message", line)
continue
}
if parsedTime.After(envUpdatedAt) {
err = c.UpdateEnvironment(ctx)
if err != nil {
c.log.Errorf("Failed to update the environment: %v", err)
log.Error("failed to update environment after receiving event", "error", err)
continue
}
env, _ := c.environment.Load().(*environments.EnvironmentModel)

envUpdatedAt = env.UpdatedAt
}
}
}
if err := scanner.Err(); err != nil {
c.log.Errorf("Error reading realtime stream: %v", err)
log.Error("error reading from realtime stream", "error", err)
}
}
}
Expand Down
Loading