Skip to content

Commit 81f0464

Browse files
Add metrics
1 parent a0051e7 commit 81f0464

3 files changed

Lines changed: 210 additions & 8 deletions

File tree

pkg/workflows/wasm/host/execution.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ type execution[T any] struct {
4040
donLogCount uint32
4141
nodeLogCount uint32
4242
awaiting []int32
43+
// peakMemoryBytes is the largest linear memory observed across (re)starts.
44+
// It is populated by callWasm and read by Execute to emit the memory metric.
45+
peakMemoryBytes int64
4346
// suspendOnAwait gates the suspend/resume behaviour. When false, the
4447
// execution behaves as it did before suspension was introduced:
4548
// awaitCapabilities blocks until each response is available and callCapAsync

pkg/workflows/wasm/host/metrics.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package host
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"go.opentelemetry.io/otel/attribute"
9+
"go.opentelemetry.io/otel/metric"
10+
11+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
12+
)
13+
14+
// Execution phase labels for the execution duration histogram. An execution's
15+
// wall-clock time is attributed to exactly one of these at any instant:
16+
// - phaseWasm: executing guest wasm code inside callWasm.
17+
// - phaseWaiting: suspended, waiting for pending capability responses.
18+
// - phaseParked: everything else between (re)starts - store setup/teardown,
19+
// (de)serialisation and scheduling overhead.
20+
const (
21+
phaseWasm = "wasm"
22+
phaseWaiting = "waiting"
23+
phaseParked = "parked"
24+
)
25+
26+
// moduleMetrics holds the beholder instruments used to observe wasm module
27+
// executions. Instrument names are shared process-wide, so multiple modules can
28+
// safely construct their own moduleMetrics; the meter returns the same
29+
// underlying instrument for a given name.
30+
type moduleMetrics struct {
31+
activeExecutions metric.Int64UpDownCounter
32+
suspendedExecutions metric.Int64UpDownCounter
33+
suspensionsPerExec metric.Int64Histogram
34+
executionDurationMs metric.Int64Histogram
35+
memoryBytes metric.Int64Histogram
36+
}
37+
38+
func newModuleMetrics() (*moduleMetrics, error) {
39+
meter := beholder.GetMeter()
40+
41+
activeExecutions, err := meter.Int64UpDownCounter("platform_wasm_host_active_executions",
42+
metric.WithDescription("Number of wasm module executions currently running"),
43+
metric.WithUnit("{execution}"),
44+
)
45+
if err != nil {
46+
return nil, fmt.Errorf("failed to create active_executions counter: %w", err)
47+
}
48+
49+
suspendedExecutions, err := meter.Int64UpDownCounter("platform_wasm_host_suspended_executions",
50+
metric.WithDescription("Number of wasm module executions currently suspended waiting for capability responses"),
51+
metric.WithUnit("{execution}"),
52+
)
53+
if err != nil {
54+
return nil, fmt.Errorf("failed to create suspended_executions counter: %w", err)
55+
}
56+
57+
suspensionsPerExec, err := meter.Int64Histogram("platform_wasm_host_suspensions_per_execution",
58+
metric.WithDescription("Number of times an execution suspended to await capability responses before completing"),
59+
metric.WithUnit("{suspension}"),
60+
metric.WithExplicitBucketBoundaries(0, 1, 2, 3, 5, 10, 20, 50, 100),
61+
)
62+
if err != nil {
63+
return nil, fmt.Errorf("failed to create suspensions_per_execution histogram: %w", err)
64+
}
65+
66+
executionDurationMs, err := meter.Int64Histogram("platform_wasm_host_execution_duration_ms",
67+
metric.WithDescription("Wall-clock time spent in an execution, broken down by phase (wasm, waiting, parked)"),
68+
metric.WithUnit("ms"),
69+
metric.WithExplicitBucketBoundaries(1, 5, 10, 50, 100, 250, 500, 1_000, 2_000, 5_000, 10_000, 30_000, 60_000, 120_000, 300_000, 600_000),
70+
)
71+
if err != nil {
72+
return nil, fmt.Errorf("failed to create execution_duration_ms histogram: %w", err)
73+
}
74+
75+
memoryBytes, err := meter.Int64Histogram("platform_wasm_host_memory_bytes",
76+
metric.WithDescription("Peak linear memory in bytes used by the wasm module across an execution"),
77+
metric.WithUnit("By"),
78+
metric.WithExplicitBucketBoundaries(1<<20, 4<<20, 16<<20, 32<<20, 64<<20, 128<<20, 256<<20, 512<<20, 1<<30),
79+
)
80+
if err != nil {
81+
return nil, fmt.Errorf("failed to create memory_bytes histogram: %w", err)
82+
}
83+
84+
return &moduleMetrics{
85+
activeExecutions: activeExecutions,
86+
suspendedExecutions: suspendedExecutions,
87+
suspensionsPerExec: suspensionsPerExec,
88+
executionDurationMs: executionDurationMs,
89+
memoryBytes: memoryBytes,
90+
}, nil
91+
}
92+
93+
// suspensionEnabledAttr tags a metric with whether the execution has
94+
// suspend/resume-on-await enabled, so the two populations can be distinguished.
95+
func suspensionEnabledAttr(suspensionEnabled bool) attribute.KeyValue {
96+
return attribute.Bool("suspension_enabled", suspensionEnabled)
97+
}
98+
99+
// IncActiveExecutions marks an execution as started (a).
100+
func (m *moduleMetrics) IncActiveExecutions(ctx context.Context, suspensionEnabled bool) {
101+
m.activeExecutions.Add(ctx, 1, metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled)))
102+
}
103+
104+
// DecActiveExecutions marks an execution as finished (a).
105+
func (m *moduleMetrics) DecActiveExecutions(ctx context.Context, suspensionEnabled bool) {
106+
m.activeExecutions.Add(ctx, -1, metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled)))
107+
}
108+
109+
// IncSuspendedExecutions marks an execution as suspended, waiting for capability
110+
// responses (b).
111+
func (m *moduleMetrics) IncSuspendedExecutions(ctx context.Context, suspensionEnabled bool) {
112+
m.suspendedExecutions.Add(ctx, 1, metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled)))
113+
}
114+
115+
// DecSuspendedExecutions marks a suspended execution as resumed (b).
116+
func (m *moduleMetrics) DecSuspendedExecutions(ctx context.Context, suspensionEnabled bool) {
117+
m.suspendedExecutions.Add(ctx, -1, metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled)))
118+
}
119+
120+
// RecordSuspensions records how many times an execution suspended before
121+
// completing (c).
122+
func (m *moduleMetrics) RecordSuspensions(ctx context.Context, suspensionEnabled bool, suspensions int64) {
123+
m.suspensionsPerExec.Record(ctx, suspensions, metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled)))
124+
}
125+
126+
// RecordExecutionPhase records the wall-clock time spent in a single phase of an
127+
// execution (d). phase is one of phaseWasm, phaseWaiting or phaseParked.
128+
func (m *moduleMetrics) RecordExecutionPhase(ctx context.Context, suspensionEnabled bool, phase string, d time.Duration) {
129+
m.executionDurationMs.Record(ctx, d.Milliseconds(),
130+
metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled), attribute.String("phase", phase)),
131+
)
132+
}
133+
134+
// RecordMemory records the peak linear memory used by an execution (e). Note
135+
// that the CPU-time counterpart of (e) is measured as wall-clock time spent in
136+
// wasm - the phaseWasm bucket of the execution duration histogram (d).
137+
func (m *moduleMetrics) RecordMemory(ctx context.Context, suspensionEnabled bool, memoryBytes int64) {
138+
m.memoryBytes.Record(ctx, memoryBytes, metric.WithAttributes(suspensionEnabledAttr(suspensionEnabled)))
139+
}

pkg/workflows/wasm/host/module.go

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ type module struct {
132132

133133
cfg *ModuleConfig
134134

135+
metrics *moduleMetrics
136+
135137
wg sync.WaitGroup
136138
stopCh chan struct{}
137139

@@ -394,11 +396,17 @@ func newModule(modCfg *ModuleConfig, binary []byte) (*module, error) {
394396

395397
modCfg.SdkLabeler(v2ImportName)
396398

399+
metrics, err := newModuleMetrics()
400+
if err != nil {
401+
return nil, fmt.Errorf("error creating module metrics: %w", err)
402+
}
403+
397404
m := &module{
398405
engine: engine,
399406
module: mod,
400407
wconfig: cfg,
401408
cfg: modCfg,
409+
metrics: metrics,
402410
stopCh: make(chan struct{}),
403411
v2ImportName: v2ImportName,
404412
}
@@ -626,6 +634,33 @@ func (m *module) Execute(ctx context.Context, req *sdkpb.ExecuteRequest, executo
626634
defer cancel()
627635
overallStart := time.Now()
628636

637+
m.metrics.IncActiveExecutions(ctx, req.SuspendOnAwait)
638+
defer m.metrics.DecActiveExecutions(ctx, req.SuspendOnAwait)
639+
640+
// Accumulated across (re)starts and emitted once the execution completes,
641+
// regardless of outcome. wasmDuration is the wall-clock time spent executing
642+
// guest code (which is also the CPU-time signal), waitDuration the time spent
643+
// suspended waiting for capability responses, and suspensions the number of
644+
// times the execution suspended.
645+
var (
646+
wasmDuration time.Duration
647+
waitDuration time.Duration
648+
suspensions int64
649+
)
650+
defer func() {
651+
// The remaining wall-clock time - store setup/teardown, (de)serialisation
652+
// and scheduling - is attributed to the "parked" phase.
653+
parked := time.Since(overallStart) - wasmDuration - waitDuration
654+
if parked < 0 {
655+
parked = 0
656+
}
657+
m.metrics.RecordExecutionPhase(ctx, req.SuspendOnAwait, phaseWasm, wasmDuration)
658+
m.metrics.RecordExecutionPhase(ctx, req.SuspendOnAwait, phaseWaiting, waitDuration)
659+
m.metrics.RecordExecutionPhase(ctx, req.SuspendOnAwait, phaseParked, parked)
660+
m.metrics.RecordSuspensions(ctx, req.SuspendOnAwait, suspensions)
661+
m.metrics.RecordMemory(ctx, req.SuspendOnAwait, exec.peakMemoryBytes)
662+
}()
663+
629664
for {
630665
// Each (re)start is bounded by the time left in the overall timeout, so a
631666
// resumed run - and the wait for its capability responses - cannot extend
@@ -636,6 +671,7 @@ func (m *module) Execute(ctx context.Context, req *sdkpb.ExecuteRequest, executo
636671
}
637672

638673
executionDuration, err := m.callWasm(remaining, req, linkNoDAG, exec)
674+
wasmDuration += executionDuration
639675

640676
switch {
641677
case containsCode(err, wasm.CodeSuccess):
@@ -645,17 +681,30 @@ func (m *module) Execute(ctx context.Context, req *sdkpb.ExecuteRequest, executo
645681
return exec.response, nil
646682
case isSuspendTrap(err):
647683
m.cfg.Logger.Debugw("received suspension, awaiting responses", "executionID", executionId)
684+
suspensions++
685+
// The execution is suspended for as long as we wait for its pending
686+
// capability responses; track it as such for the duration of the wait.
687+
m.metrics.IncSuspendedExecutions(ctx, req.SuspendOnAwait)
688+
waitStart := time.Now()
648689
// Wait for the pending capability responses, then resume on the next
649690
// loop iteration with a fresh store.
650-
for _, id := range exec.awaiting {
651-
ar, ok := exec.capabilityResponses[id]
652-
if !ok {
653-
return nil, fmt.Errorf("missing capability response for awaited callback %d", id)
654-
}
655-
656-
if _, werr := ar.wait(ctxWithTimeout); werr != nil {
657-
return nil, werr
691+
werr := func() error {
692+
for _, id := range exec.awaiting {
693+
ar, ok := exec.capabilityResponses[id]
694+
if !ok {
695+
return fmt.Errorf("missing capability response for awaited callback %d", id)
696+
}
697+
698+
if _, werr := ar.wait(ctxWithTimeout); werr != nil {
699+
return werr
700+
}
658701
}
702+
return nil
703+
}()
704+
waitDuration += time.Since(waitStart)
705+
m.metrics.DecSuspendedExecutions(ctx, req.SuspendOnAwait)
706+
if werr != nil {
707+
return nil, werr
659708
}
660709

661710
continue
@@ -813,6 +862,17 @@ func callWasm[I, O proto.Message](
813862
startTime := time.Now()
814863
_, err = start.Call(store)
815864
executionDuration := time.Since(startTime)
865+
866+
// Capture the linear memory the guest grew to before the store is closed, so
867+
// Execute can emit it as the peak memory metric.
868+
if mem := instance.GetExport(store, "memory"); mem != nil {
869+
if memory := mem.Memory(); memory != nil {
870+
if used := int64(memory.DataSize(store)); used > exec.peakMemoryBytes {
871+
exec.peakMemoryBytes = used
872+
}
873+
}
874+
}
875+
816876
return executionDuration, err
817877
}
818878

0 commit comments

Comments
 (0)