Skip to content

Commit 9a6442e

Browse files
Add metrics
1 parent a0051e7 commit 9a6442e

2 files changed

Lines changed: 71 additions & 8 deletions

File tree

pkg/workflows/wasm/host/execution.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ type execution[T any] struct {
4040
donLogCount uint32
4141
nodeLogCount uint32
4242
awaiting []int32
43+
// peakMemoryBytes is the largest linear memory observed across (re)starts.
44+
// It is populated by callWasm and read by Execute to emit the memory metric.
45+
peakMemoryBytes int64
4346
// suspendOnAwait gates the suspend/resume behaviour. When false, the
4447
// execution behaves as it did before suspension was introduced:
4548
// awaitCapabilities blocks until each response is available and callCapAsync

pkg/workflows/wasm/host/module.go

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ type module struct {
132132

133133
cfg *ModuleConfig
134134

135+
metrics *moduleMetrics
136+
135137
wg sync.WaitGroup
136138
stopCh chan struct{}
137139

@@ -394,11 +396,17 @@ func newModule(modCfg *ModuleConfig, binary []byte) (*module, error) {
394396

395397
modCfg.SdkLabeler(v2ImportName)
396398

399+
metrics, err := newModuleMetrics()
400+
if err != nil {
401+
return nil, fmt.Errorf("error creating module metrics: %w", err)
402+
}
403+
397404
m := &module{
398405
engine: engine,
399406
module: mod,
400407
wconfig: cfg,
401408
cfg: modCfg,
409+
metrics: metrics,
402410
stopCh: make(chan struct{}),
403411
v2ImportName: v2ImportName,
404412
}
@@ -626,6 +634,33 @@ func (m *module) Execute(ctx context.Context, req *sdkpb.ExecuteRequest, executo
626634
defer cancel()
627635
overallStart := time.Now()
628636

637+
m.metrics.IncActiveExecutions(ctx, req.SuspendOnAwait)
638+
defer m.metrics.DecActiveExecutions(ctx, req.SuspendOnAwait)
639+
640+
// Accumulated across (re)starts and emitted once the execution completes,
641+
// regardless of outcome. wasmDuration is the wall-clock time spent executing
642+
// guest code (which is also the CPU-time signal), waitDuration the time spent
643+
// suspended waiting for capability responses, and suspensions the number of
644+
// times the execution suspended.
645+
var (
646+
wasmDuration time.Duration
647+
waitDuration time.Duration
648+
suspensions int64
649+
)
650+
defer func() {
651+
// The remaining wall-clock time - store setup/teardown, (de)serialisation
652+
// and scheduling - is attributed to the "parked" phase.
653+
parked := time.Since(overallStart) - wasmDuration - waitDuration
654+
if parked < 0 {
655+
parked = 0
656+
}
657+
m.metrics.RecordExecutionPhase(ctx, req.SuspendOnAwait, phaseWasm, wasmDuration)
658+
m.metrics.RecordExecutionPhase(ctx, req.SuspendOnAwait, phaseWaiting, waitDuration)
659+
m.metrics.RecordExecutionPhase(ctx, req.SuspendOnAwait, phaseParked, parked)
660+
m.metrics.RecordSuspensions(ctx, req.SuspendOnAwait, suspensions)
661+
m.metrics.RecordMemory(ctx, req.SuspendOnAwait, exec.peakMemoryBytes)
662+
}()
663+
629664
for {
630665
// Each (re)start is bounded by the time left in the overall timeout, so a
631666
// resumed run - and the wait for its capability responses - cannot extend
@@ -636,6 +671,7 @@ func (m *module) Execute(ctx context.Context, req *sdkpb.ExecuteRequest, executo
636671
}
637672

638673
executionDuration, err := m.callWasm(remaining, req, linkNoDAG, exec)
674+
wasmDuration += executionDuration
639675

640676
switch {
641677
case containsCode(err, wasm.CodeSuccess):
@@ -645,17 +681,30 @@ func (m *module) Execute(ctx context.Context, req *sdkpb.ExecuteRequest, executo
645681
return exec.response, nil
646682
case isSuspendTrap(err):
647683
m.cfg.Logger.Debugw("received suspension, awaiting responses", "executionID", executionId)
684+
suspensions++
685+
// The execution is suspended for as long as we wait for its pending
686+
// capability responses; track it as such for the duration of the wait.
687+
m.metrics.IncSuspendedExecutions(ctx, req.SuspendOnAwait)
688+
waitStart := time.Now()
648689
// Wait for the pending capability responses, then resume on the next
649690
// loop iteration with a fresh store.
650-
for _, id := range exec.awaiting {
651-
ar, ok := exec.capabilityResponses[id]
652-
if !ok {
653-
return nil, fmt.Errorf("missing capability response for awaited callback %d", id)
654-
}
655-
656-
if _, werr := ar.wait(ctxWithTimeout); werr != nil {
657-
return nil, werr
691+
werr := func() error {
692+
for _, id := range exec.awaiting {
693+
ar, ok := exec.capabilityResponses[id]
694+
if !ok {
695+
return fmt.Errorf("missing capability response for awaited callback %d", id)
696+
}
697+
698+
if _, werr := ar.wait(ctxWithTimeout); werr != nil {
699+
return werr
700+
}
658701
}
702+
return nil
703+
}()
704+
waitDuration += time.Since(waitStart)
705+
m.metrics.DecSuspendedExecutions(ctx, req.SuspendOnAwait)
706+
if werr != nil {
707+
return nil, werr
659708
}
660709

661710
continue
@@ -813,6 +862,17 @@ func callWasm[I, O proto.Message](
813862
startTime := time.Now()
814863
_, err = start.Call(store)
815864
executionDuration := time.Since(startTime)
865+
866+
// Capture the linear memory the guest grew to before the store is closed, so
867+
// Execute can emit it as the peak memory metric.
868+
if mem := instance.GetExport(store, "memory"); mem != nil {
869+
if memory := mem.Memory(); memory != nil {
870+
if used := int64(memory.DataSize(store)); used > exec.peakMemoryBytes {
871+
exec.peakMemoryBytes = used
872+
}
873+
}
874+
}
875+
816876
return executionDuration, err
817877
}
818878

0 commit comments

Comments
 (0)