Skip to content

Commit 79750f8

Browse files
committed
fix: roll up lifecycle and metrics backlog fixes (EN-1185-EN-1190)
1 parent edae674 commit 79750f8

14 files changed

Lines changed: 1226 additions & 197 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ require (
8181
golang.org/x/oauth2 v0.35.0
8282
golang.org/x/text v0.37.0
8383
google.golang.org/grpc v1.80.0
84+
google.golang.org/protobuf v1.36.11
8485
)
8586

8687
require (
@@ -226,6 +227,5 @@ require (
226227
golang.org/x/tools v0.44.0 // indirect
227228
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect
228229
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
229-
google.golang.org/protobuf v1.36.11 // indirect
230230
gopkg.in/yaml.v3 v3.0.1 // indirect
231231
)

pkg/fx/observefx/metrics.go

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -62,35 +62,38 @@ func MetricsModule(cfg metrics.ModuleConfig) fx.Option {
6262

6363
return ret
6464
}, fx.ParamTags(metricsProviderOptionKey))),
65-
fx.Invoke(func(lc fx.Lifecycle, metricProvider *sdkmetric.MeterProvider, options ...runtime.Option) {
66-
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
67-
b3.New(), propagation.TraceContext{}))
68-
lc.Append(fx.Hook{
69-
OnStart: func(ctx context.Context) error {
70-
if cfg.RuntimeMetrics {
71-
if err := runtime.Start(options...); err != nil {
72-
return err
65+
fx.Invoke(fx.Annotate(
66+
func(lc fx.Lifecycle, metricProvider *sdkmetric.MeterProvider, options ...runtime.Option) {
67+
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
68+
b3.New(), propagation.TraceContext{}))
69+
lc.Append(fx.Hook{
70+
OnStart: func(ctx context.Context) error {
71+
if cfg.RuntimeMetrics {
72+
if err := runtime.Start(options...); err != nil {
73+
return err
74+
}
75+
if err := host.Start(); err != nil {
76+
return err
77+
}
7378
}
74-
if err := host.Start(); err != nil {
75-
return err
79+
return nil
80+
},
81+
OnStop: func(ctx context.Context) error {
82+
logging.FromContext(ctx).Infof("Flush metrics")
83+
if err := metricProvider.ForceFlush(ctx); err != nil {
84+
logging.FromContext(ctx).Errorf("unable to flush metrics: %s", err)
7685
}
77-
}
78-
return nil
79-
},
80-
OnStop: func(ctx context.Context) error {
81-
logging.FromContext(ctx).Infof("Flush metrics")
82-
if err := metricProvider.ForceFlush(ctx); err != nil {
83-
logging.FromContext(ctx).Errorf("unable to flush metrics: %s", err)
84-
}
85-
logging.FromContext(ctx).Infof("Shutting down metrics provider")
86-
if err := metricProvider.Shutdown(ctx); err != nil {
87-
logging.FromContext(ctx).Errorf("unable to shutdown metrics provider: %s", err)
88-
}
89-
logging.FromContext(ctx).Infof("Metrics provider stopped")
90-
return nil
91-
},
92-
})
93-
}),
86+
logging.FromContext(ctx).Infof("Shutting down metrics provider")
87+
if err := metricProvider.Shutdown(ctx); err != nil {
88+
logging.FromContext(ctx).Errorf("unable to shutdown metrics provider: %s", err)
89+
}
90+
logging.FromContext(ctx).Infof("Metrics provider stopped")
91+
return nil
92+
},
93+
})
94+
},
95+
fx.ParamTags(``, ``, metricsRuntimeOptionKey),
96+
)),
9497
ProvideMetricsProviderOption(sdkmetric.WithResource),
9598
ProvideMetricsProviderOption(sdkmetric.WithReader),
9699
fx.Provide(
@@ -99,10 +102,12 @@ func MetricsModule(cfg metrics.ModuleConfig) fx.Option {
99102
ProvideOTLPMetricsPeriodicReaderOption(func() sdkmetric.PeriodicReaderOption {
100103
return sdkmetric.WithInterval(cfg.PushInterval)
101104
}),
102-
ProvideRuntimeMetricsOption(func() runtime.Option {
103-
return runtime.WithMinimumReadMemStatsInterval(cfg.MinimumReadMemStatsInterval)
104-
}),
105105
)
106+
if cfg.MinimumReadMemStatsInterval > 0 {
107+
options = append(options, ProvideRuntimeMetricsOption(func() runtime.Option {
108+
return runtime.WithMinimumReadMemStatsInterval(cfg.MinimumReadMemStatsInterval)
109+
}))
110+
}
106111

107112
switch cfg.Exporter {
108113
case metrics.StdoutExporter:

pkg/fx/observefx/metrics_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package observefx_test
2+
3+
import (
4+
"sync/atomic"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/require"
9+
"go.opentelemetry.io/contrib/instrumentation/runtime"
10+
"go.uber.org/fx"
11+
"go.uber.org/fx/fxtest"
12+
13+
"github.com/formancehq/go-libs/v5/pkg/fx/observefx"
14+
"github.com/formancehq/go-libs/v5/pkg/observe"
15+
"github.com/formancehq/go-libs/v5/pkg/observe/metrics"
16+
)
17+
18+
func TestMetricsModuleProvidesRuntimeOptionsToRuntimeMetricsInvoke(t *testing.T) {
19+
var runtimeOptionProvided atomic.Bool
20+
21+
app := fxtest.New(t,
22+
observefx.ResourceModule(observe.Config{
23+
ServiceName: "metrics-test",
24+
}),
25+
observefx.MetricsModule(metrics.ModuleConfig{
26+
MinimumReadMemStatsInterval: time.Second,
27+
}),
28+
observefx.ProvideRuntimeMetricsOption(func() runtime.Option {
29+
runtimeOptionProvided.Store(true)
30+
return runtime.WithMinimumReadMemStatsInterval(100 * time.Millisecond)
31+
}),
32+
fx.NopLogger,
33+
)
34+
app.RequireStart()
35+
defer app.RequireStop()
36+
37+
require.True(t, runtimeOptionProvided.Load())
38+
}
39+
40+
func TestMetricsModuleDoesNotProvideZeroRuntimeMetricsInterval(t *testing.T) {
41+
var runtimeOptionsCount int
42+
43+
app := fxtest.New(t,
44+
observefx.ResourceModule(observe.Config{
45+
ServiceName: "metrics-test",
46+
}),
47+
observefx.MetricsModule(metrics.ModuleConfig{}),
48+
fx.Invoke(fx.Annotate(
49+
func(options ...runtime.Option) {
50+
runtimeOptionsCount = len(options)
51+
},
52+
fx.ParamTags(`group:"_metricsRuntimeOption"`),
53+
)),
54+
fx.NopLogger,
55+
)
56+
app.RequireStart()
57+
defer app.RequireStop()
58+
59+
require.Zero(t, runtimeOptionsCount)
60+
}

pkg/fx/workflowfx/temporal.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package workflowfx
22

33
import (
44
"context"
5+
"sync/atomic"
56

67
"github.com/spf13/cobra"
78
"go.opentelemetry.io/otel/metric"
@@ -66,25 +67,34 @@ func TemporalWorkerModule(ctx context.Context, taskQueue string, options worker.
6667
}, fx.ParamTags(``, ``, `group:"workflows"`, `group:"activities"`)),
6768
),
6869
fx.Invoke(func(lc fx.Lifecycle, w worker.Worker) {
69-
willStop := false
70-
lc.Append(fx.Hook{
71-
OnStart: func(ctx context.Context) error {
72-
go func() {
73-
err := w.Run(worker.InterruptCh())
74-
if err != nil {
75-
if !willStop {
76-
panic(err)
77-
}
78-
}
79-
}()
80-
return nil
81-
},
82-
OnStop: func(ctx context.Context) error {
83-
willStop = true
84-
w.Stop()
85-
return nil
86-
},
87-
})
70+
registerTemporalWorkerLifecycle(lc, w)
8871
}),
8972
)
9073
}
74+
75+
type temporalWorkerRunner interface {
76+
Run(interruptCh <-chan interface{}) error
77+
Stop()
78+
}
79+
80+
func registerTemporalWorkerLifecycle(lc fx.Lifecycle, w temporalWorkerRunner) {
81+
var willStop atomic.Bool
82+
lc.Append(fx.Hook{
83+
OnStart: func(ctx context.Context) error {
84+
go func() {
85+
err := w.Run(worker.InterruptCh())
86+
if err != nil {
87+
if !willStop.Load() {
88+
panic(err)
89+
}
90+
}
91+
}()
92+
return nil
93+
},
94+
OnStop: func(ctx context.Context) error {
95+
willStop.Store(true)
96+
w.Stop()
97+
return nil
98+
},
99+
})
100+
}

pkg/fx/workflowfx/temporal_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package workflowfx
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"go.uber.org/fx"
10+
)
11+
12+
func TestRegisterTemporalWorkerLifecycleIsRaceSafeWhenRunReturnsDuringStop(t *testing.T) {
13+
lc := &lifecycleRecorder{}
14+
w := &shutdownErrorWorker{
15+
runErr: errors.New("worker stopped"),
16+
runReturned: make(chan struct{}),
17+
}
18+
19+
registerTemporalWorkerLifecycle(lc, w)
20+
21+
if len(lc.hooks) != 1 {
22+
t.Fatalf("expected 1 lifecycle hook, got %d", len(lc.hooks))
23+
}
24+
25+
hook := lc.hooks[0]
26+
if hook.OnStart == nil {
27+
t.Fatal("expected OnStart hook")
28+
}
29+
if hook.OnStop == nil {
30+
t.Fatal("expected OnStop hook")
31+
}
32+
33+
if err := hook.OnStart(context.Background()); err != nil {
34+
t.Fatalf("OnStart returned error: %v", err)
35+
}
36+
if err := hook.OnStop(context.Background()); err != nil {
37+
t.Fatalf("OnStop returned error: %v", err)
38+
}
39+
40+
time.Sleep(10 * time.Millisecond)
41+
}
42+
43+
type lifecycleRecorder struct {
44+
hooks []fx.Hook
45+
}
46+
47+
func (lc *lifecycleRecorder) Append(hook fx.Hook) {
48+
lc.hooks = append(lc.hooks, hook)
49+
}
50+
51+
type shutdownErrorWorker struct {
52+
runErr error
53+
runReturned chan struct{}
54+
}
55+
56+
func (w *shutdownErrorWorker) Run(<-chan interface{}) error {
57+
time.Sleep(10 * time.Millisecond)
58+
close(w.runReturned)
59+
return w.runErr
60+
}
61+
62+
func (w *shutdownErrorWorker) Stop() {
63+
select {
64+
case <-w.runReturned:
65+
case <-time.After(time.Second):
66+
panic("worker Run did not return")
67+
}
68+
}

0 commit comments

Comments
 (0)