Skip to content

Commit 5ef1f45

Browse files
Add metrics
1 parent e84cab0 commit 5ef1f45

3 files changed

Lines changed: 203 additions & 8 deletions

File tree

pkg/workflows/wasm/host/execution.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ type execution[T any] struct {
4040
donLogCount uint32
4141
nodeLogCount uint32
4242
awaiting []int32
43+
// peakMemoryBytes is the largest linear memory observed across (re)starts.
44+
// It is populated by callWasm and read by Execute to emit the memory metric.
45+
peakMemoryBytes int64
4346
// suspendOnAwait gates the suspend/resume behaviour. When false, the
4447
// execution behaves as it did before suspension was introduced:
4548
// awaitCapabilities blocks until each response is available and callCapAsync

pkg/workflows/wasm/host/metrics.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package host
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"go.opentelemetry.io/otel/attribute"
9+
"go.opentelemetry.io/otel/metric"
10+
11+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
12+
)
13+
14+
// Execution phase labels for the execution duration histogram:
15+
// - phaseWasm: wall-clock time executing guest wasm code inside callWasm.
16+
// - phaseWaiting: wall-clock time suspended, waiting for pending capability
17+
// responses.
18+
// - phaseTotal: total wall-clock time for the execution, end to end.
19+
const (
20+
phaseWasm = "wasm"
21+
phaseWaiting = "waiting"
22+
phaseTotal = "total"
23+
)
24+
25+
// moduleMetrics holds the beholder instruments used to observe wasm module
26+
// executions. Instrument names are shared process-wide, so multiple modules can
27+
// safely construct their own moduleMetrics; the meter returns the same
28+
// underlying instrument for a given name.
29+
type moduleMetrics struct {
30+
activeExecutions metric.Int64UpDownCounter
31+
suspendedExecutions metric.Int64UpDownCounter
32+
suspensionsPerExec metric.Int64Histogram
33+
executionDurationMs metric.Int64Histogram
34+
memoryBytes metric.Int64Histogram
35+
}
36+
37+
func newModuleMetrics() (*moduleMetrics, error) {
38+
meter := beholder.GetMeter()
39+
40+
activeExecutions, err := meter.Int64UpDownCounter("platform_wasm_host_active_executions",
41+
metric.WithDescription("Number of wasm module executions currently running"),
42+
metric.WithUnit("{execution}"),
43+
)
44+
if err != nil {
45+
return nil, fmt.Errorf("failed to create active_executions counter: %w", err)
46+
}
47+
48+
suspendedExecutions, err := meter.Int64UpDownCounter("platform_wasm_host_suspended_executions",
49+
metric.WithDescription("Number of wasm module executions currently suspended waiting for capability responses"),
50+
metric.WithUnit("{execution}"),
51+
)
52+
if err != nil {
53+
return nil, fmt.Errorf("failed to create suspended_executions counter: %w", err)
54+
}
55+
56+
suspensionsPerExec, err := meter.Int64Histogram("platform_wasm_host_suspensions_per_execution",
57+
metric.WithDescription("Number of times an execution suspended to await capability responses before completing"),
58+
metric.WithUnit("{suspension}"),
59+
metric.WithExplicitBucketBoundaries(0, 1, 2, 3, 5, 10, 20, 50, 100),
60+
)
61+
if err != nil {
62+
return nil, fmt.Errorf("failed to create suspensions_per_execution histogram: %w", err)
63+
}
64+
65+
executionDurationMs, err := meter.Int64Histogram("platform_wasm_host_execution_duration_ms",
66+
metric.WithDescription("Wall-clock time spent in an execution, by phase (wasm, waiting) plus the end-to-end total"),
67+
metric.WithUnit("ms"),
68+
metric.WithExplicitBucketBoundaries(1, 5, 10, 50, 100, 250, 500, 1_000, 2_000, 5_000, 10_000, 30_000, 60_000, 120_000, 300_000, 600_000),
69+
)
70+
if err != nil {
71+
return nil, fmt.Errorf("failed to create execution_duration_ms histogram: %w", err)
72+
}
73+
74+
memoryBytes, err := meter.Int64Histogram("platform_wasm_host_memory_bytes",
75+
metric.WithDescription("Peak linear memory in bytes used by the wasm module across an execution"),
76+
metric.WithUnit("By"),
77+
metric.WithExplicitBucketBoundaries(1<<20, 4<<20, 16<<20, 32<<20, 64<<20, 128<<20, 256<<20, 512<<20, 1<<30),
78+
)
79+
if err != nil {
80+
return nil, fmt.Errorf("failed to create memory_bytes histogram: %w", err)
81+
}
82+
83+
return &moduleMetrics{
84+
activeExecutions: activeExecutions,
85+
suspendedExecutions: suspendedExecutions,
86+
suspensionsPerExec: suspensionsPerExec,
87+
executionDurationMs: executionDurationMs,
88+
memoryBytes: memoryBytes,
89+
}, nil
90+
}
91+
92+
// suspensionEnabledAttr tags a metric with whether the execution has
93+
// suspend/resume-on-await enabled, so the two populations can be distinguished.
94+
func suspensionEnabledAttr(suspensionEnabled bool) attribute.KeyValue {
95+
return attribute.Bool("suspension_enabled", suspensionEnabled)
96+
}
97+
98+
// IncActiveExecutions marks an execution as started (a).
99+
func (m *moduleMetrics) IncActiveExecutions(ctx context.Context, suspensionEnabled bool) {
100+
m.activeExecutions.Add(ctx, 1, metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled)))
101+
}
102+
103+
// DecActiveExecutions marks an execution as finished (a).
104+
func (m *moduleMetrics) DecActiveExecutions(ctx context.Context, suspensionEnabled bool) {
105+
m.activeExecutions.Add(ctx, -1, metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled)))
106+
}
107+
108+
// IncSuspendedExecutions marks an execution as suspended, waiting for capability
109+
// responses (b).
110+
func (m *moduleMetrics) IncSuspendedExecutions(ctx context.Context, suspensionEnabled bool) {
111+
m.suspendedExecutions.Add(ctx, 1, metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled)))
112+
}
113+
114+
// DecSuspendedExecutions marks a suspended execution as resumed (b).
115+
func (m *moduleMetrics) DecSuspendedExecutions(ctx context.Context, suspensionEnabled bool) {
116+
m.suspendedExecutions.Add(ctx, -1, metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled)))
117+
}
118+
119+
// RecordSuspensions records how many times an execution suspended before
120+
// completing (c).
121+
func (m *moduleMetrics) RecordSuspensions(ctx context.Context, suspensionEnabled bool, suspensions int64) {
122+
m.suspensionsPerExec.Record(ctx, suspensions, metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled)))
123+
}
124+
125+
// RecordExecutionPhase records the wall-clock time spent in a single phase of an
126+
// execution (d). phase is one of phaseWasm, phaseWaiting or phaseTotal.
127+
func (m *moduleMetrics) RecordExecutionPhase(ctx context.Context, suspensionEnabled bool, phase string, d time.Duration) {
128+
m.executionDurationMs.Record(ctx, d.Milliseconds(),
129+
metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled), attribute.String("phase", phase)),
130+
)
131+
}
132+
133+
// RecordMemory records the peak linear memory used by an execution (e). Note
134+
// that the CPU-time counterpart of (e) is measured as wall-clock time spent in
135+
// wasm - the phaseWasm bucket of the execution duration histogram (d).
136+
func (m *moduleMetrics) RecordMemory(ctx context.Context, suspensionEnabled bool, memoryBytes int64) {
137+
m.memoryBytes.Record(ctx, memoryBytes, metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled)))
138+
}

pkg/workflows/wasm/host/module.go

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ type module struct {
132132

133133
cfg *ModuleConfig
134134

135+
metrics *moduleMetrics
136+
135137
wg sync.WaitGroup
136138
stopCh chan struct{}
137139

@@ -394,11 +396,17 @@ func newModule(modCfg *ModuleConfig, binary []byte) (*module, error) {
394396

395397
modCfg.SdkLabeler(v2ImportName)
396398

399+
metrics, err := newModuleMetrics()
400+
if err != nil {
401+
return nil, fmt.Errorf("error creating module metrics: %w", err)
402+
}
403+
397404
m := &module{
398405
engine: engine,
399406
module: mod,
400407
wconfig: cfg,
401408
cfg: modCfg,
409+
metrics: metrics,
402410
stopCh: make(chan struct{}),
403411
v2ImportName: v2ImportName,
404412
}
@@ -626,6 +634,27 @@ func (m *module) Execute(ctx context.Context, req *sdkpb.ExecuteRequest, executo
626634
defer cancel()
627635
overallStart := time.Now()
628636

637+
m.metrics.IncActiveExecutions(ctx, req.SuspendOnAwait)
638+
defer m.metrics.DecActiveExecutions(ctx, req.SuspendOnAwait)
639+
640+
// Accumulated across (re)starts and emitted once the execution completes,
641+
// regardless of outcome. wasmDuration is the wall-clock time spent executing
642+
// guest code (which is also the CPU-time signal), waitDuration the time spent
643+
// suspended waiting for capability responses, and suspensions the number of
644+
// times the execution suspended.
645+
var (
646+
wasmDuration time.Duration
647+
waitDuration time.Duration
648+
suspensions int64
649+
)
650+
defer func() {
651+
m.metrics.RecordExecutionPhase(ctx, req.SuspendOnAwait, phaseWasm, wasmDuration)
652+
m.metrics.RecordExecutionPhase(ctx, req.SuspendOnAwait, phaseWaiting, waitDuration)
653+
m.metrics.RecordExecutionPhase(ctx, req.SuspendOnAwait, phaseTotal, time.Since(overallStart))
654+
m.metrics.RecordSuspensions(ctx, req.SuspendOnAwait, suspensions)
655+
m.metrics.RecordMemory(ctx, req.SuspendOnAwait, exec.peakMemoryBytes)
656+
}()
657+
629658
for {
630659
// Each (re)start is bounded by the time left in the overall timeout, so a
631660
// resumed run - and the wait for its capability responses - cannot extend
@@ -636,6 +665,7 @@ func (m *module) Execute(ctx context.Context, req *sdkpb.ExecuteRequest, executo
636665
}
637666

638667
executionDuration, err := m.callWasm(remaining, req, linkNoDAG, exec)
668+
wasmDuration += executionDuration
639669

640670
switch {
641671
case containsCode(err, wasm.CodeSuccess):
@@ -645,17 +675,30 @@ func (m *module) Execute(ctx context.Context, req *sdkpb.ExecuteRequest, executo
645675
return exec.response, nil
646676
case isSuspendTrap(err):
647677
m.cfg.Logger.Debugw("received suspension, awaiting responses", "executionID", executionId)
678+
suspensions++
679+
// The execution is suspended for as long as we wait for its pending
680+
// capability responses; track it as such for the duration of the wait.
681+
m.metrics.IncSuspendedExecutions(ctx, req.SuspendOnAwait)
682+
waitStart := time.Now()
648683
// Wait for the pending capability responses, then resume on the next
649684
// loop iteration with a fresh store.
650-
for _, id := range exec.awaiting {
651-
ar, ok := exec.capabilityResponses[id]
652-
if !ok {
653-
return nil, fmt.Errorf("missing capability response for awaited callback %d", id)
654-
}
655-
656-
if _, werr := ar.wait(ctxWithTimeout); werr != nil {
657-
return nil, werr
685+
werr := func() error {
686+
for _, id := range exec.awaiting {
687+
ar, ok := exec.capabilityResponses[id]
688+
if !ok {
689+
return fmt.Errorf("missing capability response for awaited callback %d", id)
690+
}
691+
692+
if _, werr := ar.wait(ctxWithTimeout); werr != nil {
693+
return werr
694+
}
658695
}
696+
return nil
697+
}()
698+
waitDuration += time.Since(waitStart)
699+
m.metrics.DecSuspendedExecutions(ctx, req.SuspendOnAwait)
700+
if werr != nil {
701+
return nil, werr
659702
}
660703

661704
continue
@@ -813,6 +856,17 @@ func callWasm[I, O proto.Message](
813856
startTime := time.Now()
814857
_, err = start.Call(store)
815858
executionDuration := time.Since(startTime)
859+
860+
// Capture the linear memory the guest grew to before the store is closed, so
861+
// Execute can emit it as the peak memory metric.
862+
if mem := instance.GetExport(store, "memory"); mem != nil {
863+
if memory := mem.Memory(); memory != nil {
864+
if used := int64(memory.DataSize(store)); used > exec.peakMemoryBytes {
865+
exec.peakMemoryBytes = used
866+
}
867+
}
868+
}
869+
816870
return executionDuration, err
817871
}
818872

0 commit comments

Comments
 (0)