agent: Add per-VM metric for desired CU(s)

This commit adds a new per-VM metric: autoscaling_vm_desired_cu.

It's based on the same "desired CU" information exposed by the scaling
event reporting, but updated continuously instead of being rate-limited
the way that reporting is to avoid spamming it.

The metric has the same base labels as the other per-VM metrics, with
the addition of the "reason" label, which is one of:

* "total" - the goal CU, after taking the maximum of the individual
  parts and rounding up to the next unit.
* "cpu" - goal CU size in order to fit the current CPU usage
* "mem" - goal CU size in order to fit the current memory usage, which
  includes some assesssment
* "lfc" - goal CU size in order to fit the estimated working set size

All of the reported values ("total", "cpu", "mem", and "lfc") are also
multiplied by the same Compute Unit factor as with the normal scaling
event reporting, so that Neon's fractional compute units are exposed as
such in the metrics, even as we use integer compute units in the
autoscaler-agent.

Also note that all values except "total" are NOT rounded, and instead
show the fractional amounts to allow better comparison.
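
As a concrete, hypothetical illustration of that conversion (assuming
the cuMultiplier of 0.25 set in config_map.yaml below): an internal
total of 3 integer CU is exposed as 0.75, and an unrounded CPU
component of 2.4 is exposed as 0.6.

package main

import "fmt"

func main() {
	// The agent works with integer compute units internally; gauge values
	// are multiplied by the compute-unit factor so that Neon's fractional
	// CUs show up as such in the metrics.
	const cuMultiplier = 0.25

	goalTotal := uint32(3) // "total": integer CU, already rounded up
	cpuPart := 2.4         // "cpu": component goals stay fractional

	fmt.Println(float64(goalTotal) * cuMultiplier) // 0.75
	fmt.Println(cpuPart * cuMultiplier)            // 0.6
}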

KNOWN LIMITATION: If ReportDesiredScaling is disabled at runtime for a
particular VM, the metrics will not be cleared, and instead will just
cease to be updated. I figured this is a reasonable trade-off for
simplicity.

sharnoff committed Oct 12, 2024
1 parent 693b601 commit c9f1d75
Showing 7 changed files with 134 additions and 13 deletions.
2 changes: 1 addition & 1 deletion autoscaler-agent/config_map.yaml
@@ -29,7 +29,7 @@ data:
"scalingEvents": {
"cuMultiplier": 0.25,
"clients": {}
}
},
"monitor": {
"serverPort": 10301,
"responseTimeoutSeconds": 5,
13 changes: 10 additions & 3 deletions pkg/agent/core/state.go
@@ -735,7 +735,14 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) (
// 3. that's it!

reportGoals := func(goalCU uint32, parts scalingevents.GoalCUComponents) {
panic("todo")
currentCU, ok := s.VM.Using().DivResources(s.Config.ComputeUnit)
if !ok {
return // skip reporting if the current resources don't divide evenly into compute units.
}

if report := s.Config.ObservabilityCallbacks.DesiredScaling; report != nil {
report(now, uint32(currentCU), goalCU, parts)
}
}

sg, goalCULogFields := calculateGoalCU(
@@ -1232,12 +1239,12 @@ func (s *State) NeonVM() NeonVMHandle {
}

func (h NeonVMHandle) StartingRequest(now time.Time, resources api.Resources) {
if f := h.s.Config.ObservabilityCallbacks.ScalingEvent; f != nil {
if report := h.s.Config.ObservabilityCallbacks.ScalingEvent; report != nil {
currentCU, currentOk := h.s.VM.Using().DivResources(h.s.Config.ComputeUnit)
targetCU, targetOk := resources.DivResources(h.s.Config.ComputeUnit)

if currentOk && targetOk {
f(now, uint32(currentCU), uint32(targetCU))
report(now, uint32(currentCU), uint32(targetCU))
}
}

8 changes: 7 additions & 1 deletion pkg/agent/entrypoint.go
@@ -58,7 +58,13 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
return fmt.Errorf("Error creating scaling events reporter: %w", err)
}

globalState, globalPromReg := r.newAgentState(logger, r.EnvArgs.K8sPodIP, schedTracker, scalingReporter)
globalState, globalPromReg := r.newAgentState(
logger,
r.EnvArgs.K8sPodIP,
schedTracker,
perVMMetrics,
scalingReporter,
)
watchMetrics.MustRegister(globalPromReg)

logger.Info("Starting billing metrics collector")
3 changes: 3 additions & 0 deletions pkg/agent/globalstate.go
@@ -41,6 +41,7 @@ type agentState struct {
vmClient *vmclient.Clientset
schedTracker *schedwatch.SchedulerTracker
metrics GlobalMetrics
vmMetrics *PerVMMetrics

scalingReporter *scalingevents.Reporter
}
@@ -49,6 +50,7 @@ func (r MainRunner) newAgentState(
baseLogger *zap.Logger,
podIP string,
schedTracker *schedwatch.SchedulerTracker,
perVMMetrics *PerVMMetrics,
scalingReporter *scalingevents.Reporter,
) (*agentState, *prometheus.Registry) {
metrics, promReg := makeGlobalMetrics()
@@ -63,6 +65,7 @@
podIP: podIP,
schedTracker: schedTracker,
metrics: metrics,
vmMetrics: perVMMetrics,

scalingReporter: scalingReporter,
}
79 changes: 77 additions & 2 deletions pkg/agent/prommetrics.go
@@ -1,13 +1,16 @@
package agent

import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/samber/lo"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/agent/core/revsource"
"github.com/neondatabase/autoscaling/pkg/agent/scalingevents"
"github.com/neondatabase/autoscaling/pkg/util"
)

@@ -342,9 +345,22 @@ func WrapHistogramVec(hist *prometheus.HistogramVec) revsource.ObserveCallback {
}

type PerVMMetrics struct {
// activeMu and activeVMs exist to track the set of VMs currently represented in the metrics, so
// that when we set the desired CU from internal information, we can check whether the VM still
// exists.
// Otherwise it's not possible to prevent data races that would result in leaking metric labels.
activeMu sync.Mutex
activeVMs map[util.NamespacedName]vmMetadata

cpu *prometheus.GaugeVec
memory *prometheus.GaugeVec
restartCount *prometheus.GaugeVec
desiredCU *prometheus.GaugeVec
}

type vmMetadata struct {
endpointID string
projectID string
}

type vmResourceValueType string
@@ -358,10 +374,13 @@ const (
vmResourceValueAutoscalingMax vmResourceValueType = "autoscaling_max"
)

func makePerVMMetrics() (PerVMMetrics, *prometheus.Registry) {
func makePerVMMetrics() (*PerVMMetrics, *prometheus.Registry) {
reg := prometheus.NewRegistry()

metrics := PerVMMetrics{
metrics := &PerVMMetrics{
activeMu: sync.Mutex{},
activeVMs: make(map[util.NamespacedName]vmMetadata),

cpu: util.RegisterMetric(reg, prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "autoscaling_vm_cpu_cores",
@@ -400,6 +419,19 @@ func makePerVMMetrics() (PerVMMetrics, *prometheus.Registry) {
"project_id", // .metadata.labels["neon/project-id"]
},
)),
desiredCU: util.RegisterMetric(reg, prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "autoscaling_vm_desired_cu",
Help: "Amount of Compute Units desired for a VM: the total, and the components for cpu, memory, and LFC",
},
[]string{
"vm_namespace", // .metadata.namespace
"vm_name", // .metadata.name
"endpoint_id", // .metadata.labels["neon/endpoint-id"]
"project_id", // .metadata.labels["neon/project-id"]
"reason", // desiredCUReason: total, cpu, mem, lfc
},
)),
}

return metrics, reg
@@ -424,3 +456,46 @@ type vmMetric struct {
labels prometheus.Labels
value float64
}

func (m *PerVMMetrics) updateDesiredCU(
vm util.NamespacedName,
conversionFactor float64,
total uint32,
parts scalingevents.GoalCUComponents,
) {
m.activeMu.Lock()
defer m.activeMu.Unlock()

// Don't do anything if this VM is not known. Either the relevant watch event hasn't been
// processed yet (unlikely, maybe impossible?) or it has since been deleted (in which case we
// don't want to leak metrics that won't get cleaned up)
info, ok := m.activeVMs[vm]
if !ok {
return
}

pairs := []struct {
key string
value *float64
}{
{"total", lo.ToPtr(float64(total))},
{"cpu", parts.CPU},
{"mem", parts.Mem},
{"lfc", parts.LFC},
}

for _, p := range pairs {
labels := prometheus.Labels{
"vm_namespace": vm.Namespace,
"vm_name": vm.Name,
"endpoint_id": info.endpointID,
"project_id": info.projectID,
"reason": p.key,
}
if p.value == nil {
m.desiredCU.Delete(labels)
} else {
m.desiredCU.With(labels).Set(*p.value * conversionFactor /* multiply to allow fractional CU in metrics */)
}
}
}
8 changes: 6 additions & 2 deletions pkg/agent/runner.go
@@ -376,8 +376,12 @@ func (r *Runner) reportDesiredScaling(
return
}

// TODO: Use this opportunity to report the desired scaling in the per-VM
// metrics.
r.global.vmMetrics.updateDesiredCU(
r.vmName,
r.global.config.ScalingEvents.CUMultiplier, // have to multiply before exposing as metrics here.
targetCU,
parts,
)

rl.report(r.global.scalingReporter, timestamp, endpointID, currentCU, targetCU, parts)
}
34 changes: 30 additions & 4 deletions pkg/agent/watch.go
@@ -64,7 +64,7 @@ func startVMWatcher(
config *Config,
vmClient *vmclient.Clientset,
metrics watch.Metrics,
perVMMetrics PerVMMetrics,
perVMMetrics *PerVMMetrics,
nodeName string,
submitEvent func(vmEvent),
) (*watch.Store[vmapi.VirtualMachine], error) {
@@ -91,7 +91,7 @@
metav1.ListOptions{},
watch.HandlerFuncs[*vmapi.VirtualMachine]{
AddFunc: func(vm *vmapi.VirtualMachine, preexisting bool) {
setVMMetrics(&perVMMetrics, vm, nodeName)
setVMMetrics(perVMMetrics, vm, nodeName)

if vmIsOurResponsibility(vm, config, nodeName) {
event, err := makeVMEvent(logger, vm, vmEventAdded)
@@ -106,7 +106,7 @@
}
},
UpdateFunc: func(oldVM, newVM *vmapi.VirtualMachine) {
updateVMMetrics(&perVMMetrics, oldVM, newVM, nodeName)
updateVMMetrics(perVMMetrics, oldVM, newVM, nodeName)

oldIsOurs := vmIsOurResponsibility(oldVM, config, nodeName)
newIsOurs := vmIsOurResponsibility(newVM, config, nodeName)
@@ -140,7 +140,7 @@
submitEvent(event)
},
DeleteFunc: func(vm *vmapi.VirtualMachine, maybeStale bool) {
deleteVMMetrics(&perVMMetrics, vm, nodeName)
deleteVMMetrics(perVMMetrics, vm, nodeName)

if vmIsOurResponsibility(vm, config, nodeName) {
event, err := makeVMEvent(logger, vm, vmEventDeleted)
@@ -319,6 +319,14 @@ func setVMMetrics(perVMMetrics *PerVMMetrics, vm *vmapi.VirtualMachine, nodeName
for _, m := range restartCountMetrics {
perVMMetrics.restartCount.With(m.labels).Set(m.value)
}

// Add the VM to the internal tracker:
perVMMetrics.activeMu.Lock()
defer perVMMetrics.activeMu.Unlock()
perVMMetrics.activeVMs[util.GetNamespacedName(vm)] = vmMetadata{
endpointID: vm.Labels[endpointLabel],
projectID: vm.Labels[projectLabel],
}
}

func updateVMMetrics(perVMMetrics *PerVMMetrics, oldVM, newVM *vmapi.VirtualMachine, nodeName string) {
@@ -357,6 +365,14 @@ func updateVMMetrics(perVMMetrics *PerVMMetrics, oldVM, newVM *vmapi.VirtualMach
oldRestartCountMetrics := makeVMRestartMetrics(oldVM)
newRestartCountMetrics := makeVMRestartMetrics(newVM)
updateMetrics(perVMMetrics.restartCount, oldRestartCountMetrics, newRestartCountMetrics)

// Update the VM in the internal tracker:
perVMMetrics.activeMu.Lock()
defer perVMMetrics.activeMu.Unlock()
perVMMetrics.activeVMs[util.GetNamespacedName(newVM /* name can't change */)] = vmMetadata{
endpointID: newVM.Labels[endpointLabel],
projectID: newVM.Labels[projectLabel],
}
}

func deleteVMMetrics(perVMMetrics *PerVMMetrics, vm *vmapi.VirtualMachine, nodeName string) {
@@ -378,4 +394,14 @@ func deleteVMMetrics(perVMMetrics *PerVMMetrics, vm *vmapi.VirtualMachine, nodeN
for _, m := range restartCountMetrics {
perVMMetrics.restartCount.Delete(m.labels)
}

// Remove the VM from the internal tracker:
perVMMetrics.activeMu.Lock()
defer perVMMetrics.activeMu.Unlock()
delete(perVMMetrics.activeVMs, util.GetNamespacedName(vm))
// ... and any metrics that were associated with it:
perVMMetrics.desiredCU.DeletePartialMatch(prometheus.Labels{
"vm_namespace": vm.Namespace,
"vm_name": vm.Name,
})
}
