Skip to content

Commit be0cb63

Browse files
committed
Add telemetry for Memgraph storage info, add k8s events, improve logs
1 parent 3b26d7b commit be0cb63

File tree

9 files changed

+473
-23
lines changed

9 files changed

+473
-23
lines changed

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,22 @@ The operator exposes Prometheus metrics for monitoring cluster health and operat
140140
| `memgraph_validation_last_run_timestamp_seconds` | Gauge | Unix timestamp of the last validation run |
141141
| `memgraph_validation_passed` | Gauge | Whether the last validation passed (1) or not (0) |
142142

143+
### Storage Metrics (from SHOW STORAGE INFO)
144+
145+
The operator collects storage statistics from each Memgraph instance:
146+
147+
| Metric | Type | Description |
148+
|--------|------|-------------|
149+
| `memgraph_storage_vertex_count` | Gauge | Number of vertices in the database |
150+
| `memgraph_storage_edge_count` | Gauge | Number of edges in the database |
151+
| `memgraph_storage_average_degree` | Gauge | Average degree of vertices |
152+
| `memgraph_storage_memory_resident_bytes` | Gauge | Current resident memory usage |
153+
| `memgraph_storage_memory_peak_bytes` | Gauge | Peak resident memory usage |
154+
| `memgraph_storage_disk_usage_bytes` | Gauge | Disk space consumed |
155+
| `memgraph_storage_memory_tracked_bytes` | Gauge | Actively tracked memory allocation |
156+
| `memgraph_storage_allocation_limit_bytes` | Gauge | Maximum memory allocation limit |
157+
| `memgraph_storage_unreleased_delta_objects` | Gauge | Delta objects awaiting cleanup |
158+
143159
All metrics include `cluster` and `namespace` labels. Instance-level metrics also include `instance` and `role` labels.
144160

145161
## Contributing

internal/controller/events.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright 2025 Base14. See LICENSE file for details.
2+
3+
package controller
4+
5+
// Event reason constants for Kubernetes events
6+
const (
7+
// Cluster lifecycle events
8+
EventReasonClusterCreated = "ClusterCreated"
9+
EventReasonClusterReady = "ClusterReady"
10+
EventReasonClusterDegraded = "ClusterDegraded"
11+
12+
// StatefulSet events
13+
EventReasonCreatingStatefulSet = "CreatingStatefulSet"
14+
EventReasonScalingStatefulSet = "ScalingStatefulSet"
15+
16+
// Replication events
17+
EventReasonMainInstanceConfigured = "MainInstanceConfigured"
18+
EventReasonReplicaRegistered = "ReplicaRegistered"
19+
EventReasonReplicaUnregistered = "ReplicaUnregistered"
20+
EventReasonReplicationHealthy = "ReplicationHealthy"
21+
EventReasonReplicationError = "ReplicationError"
22+
EventReasonReplicaUnhealthy = "ReplicaUnhealthy"
23+
EventReasonReplicationLagHigh = "ReplicationLagHigh"
24+
25+
// Failover events
26+
EventReasonMainInstanceFailed = "MainInstanceFailed"
27+
EventReasonFailoverStarted = "FailoverStarted"
28+
EventReasonFailoverCompleted = "FailoverCompleted"
29+
EventReasonFailoverFailed = "FailoverFailed"
30+
31+
// Snapshot events
32+
EventReasonSnapshotCronJobCreated = "SnapshotCronJobCreated"
33+
EventReasonSnapshotCronJobUpdated = "SnapshotCronJobUpdated"
34+
EventReasonSnapshotSucceeded = "SnapshotSucceeded"
35+
EventReasonSnapshotFailed = "SnapshotFailed"
36+
37+
// S3 backup events
38+
EventReasonS3BackupSucceeded = "S3BackupSucceeded"
39+
EventReasonS3BackupFailed = "S3BackupFailed"
40+
41+
// Health check events
42+
EventReasonHealthCheckPassed = "HealthCheckPassed"
43+
EventReasonHealthCheckFailed = "HealthCheckFailed"
44+
45+
// Write service events
46+
EventReasonUpdatedWriteService = "UpdatedWriteService"
47+
48+
// Reconcile events
49+
EventReasonReconcileError = "ReconcileError"
50+
)

internal/controller/memgraphcluster_controller.go

Lines changed: 74 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,17 @@ func (r *MemgraphClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
100100
// Initialize status if needed
101101
if cluster.Status.Phase == "" {
102102
cluster.Status.Phase = memgraphv1alpha1.ClusterPhasePending
103+
r.Recorder.Event(cluster, corev1.EventTypeNormal, EventReasonClusterCreated,
104+
"Cluster created, starting initialization")
103105
if err := r.Status().Update(ctx, cluster); err != nil {
104106
return ctrl.Result{}, err
105107
}
106108
return ctrl.Result{Requeue: true}, nil
107109
}
108110

111+
// Track previous phase for event emission
112+
previousPhase := cluster.Status.Phase
113+
109114
// Reconcile resources
110115
result, err := r.reconcileResources(ctx, cluster, log)
111116

@@ -114,7 +119,7 @@ func (r *MemgraphClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
114119
r.metrics.RecordReconcileDuration(cluster.Name, cluster.Namespace, duration)
115120

116121
if err != nil {
117-
r.Recorder.Event(cluster, corev1.EventTypeWarning, "ReconcileError", err.Error())
122+
r.Recorder.Event(cluster, corev1.EventTypeWarning, EventReasonReconcileError, err.Error())
118123
r.metrics.RecordReconcileOperation(cluster.Name, cluster.Namespace, "error")
119124
return result, err
120125
}
@@ -123,6 +128,9 @@ func (r *MemgraphClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
123128
r.metrics.RecordReconcileOperation(cluster.Name, cluster.Namespace, "success")
124129
r.metrics.RecordClusterPhase(cluster.Name, cluster.Namespace, string(cluster.Status.Phase))
125130

131+
// Emit phase transition events
132+
r.emitPhaseTransitionEvents(cluster, previousPhase)
133+
126134
replicas := cluster.Spec.Replicas
127135
if replicas == 0 {
128136
replicas = 3
@@ -210,7 +218,7 @@ func (r *MemgraphClusterReconciler) reconcileResources(ctx context.Context, clus
210218
} else {
211219
if err := r.replicationManager.ConfigureReplication(ctx, cluster, pods, writeInstance, log); err != nil {
212220
log.Error("failed to configure replication", zap.Error(err))
213-
r.Recorder.Event(cluster, corev1.EventTypeWarning, "ReplicationError",
221+
r.Recorder.Event(cluster, corev1.EventTypeWarning, EventReasonReplicationError,
214222
fmt.Sprintf("Failed to configure replication: %v", err))
215223
} else {
216224
health, err := r.replicationManager.CheckReplicationHealth(ctx, cluster, writeInstance, log)
@@ -240,7 +248,10 @@ func (r *MemgraphClusterReconciler) reconcileResources(ctx context.Context, clus
240248
}
241249
}
242250

243-
// 11. Update status
251+
// 11. Collect storage metrics from all running pods
252+
r.collectStorageMetrics(ctx, cluster, pods, writeInstance, log)
253+
254+
// 12. Update status
244255
if err := r.updateStatus(ctx, cluster, pods, writeInstance, registeredReplicas); err != nil {
245256
return ctrl.Result{}, err
246257
}
@@ -272,7 +283,7 @@ func (r *MemgraphClusterReconciler) ensureReplicationManager() error {
272283
return fmt.Errorf("failed to create memgraph client: %w", err)
273284
}
274285

275-
r.replicationManager = NewReplicationManager(mgClient)
286+
r.replicationManager = NewReplicationManager(mgClient, r.Recorder)
276287
return nil
277288
}
278289

@@ -311,7 +322,7 @@ func (r *MemgraphClusterReconciler) reconcileStatefulSet(ctx context.Context, cl
311322
log.Info("creating StatefulSet",
312323
zap.String("statefulset", desired.Name),
313324
zap.Int32("replicas", *desired.Spec.Replicas))
314-
r.Recorder.Event(cluster, corev1.EventTypeNormal, "CreatingStatefulSet",
325+
r.Recorder.Event(cluster, corev1.EventTypeNormal, EventReasonCreatingStatefulSet,
315326
fmt.Sprintf("Creating StatefulSet %s with %d replicas", desired.Name, *desired.Spec.Replicas))
316327
return r.Create(ctx, desired)
317328
}
@@ -325,7 +336,7 @@ func (r *MemgraphClusterReconciler) reconcileStatefulSet(ctx context.Context, cl
325336
zap.Int32("currentReplicas", *existing.Spec.Replicas),
326337
zap.Int32("desiredReplicas", *desired.Spec.Replicas))
327338
existing.Spec.Replicas = desired.Spec.Replicas
328-
r.Recorder.Event(cluster, corev1.EventTypeNormal, "ScalingStatefulSet",
339+
r.Recorder.Event(cluster, corev1.EventTypeNormal, EventReasonScalingStatefulSet,
329340
fmt.Sprintf("Scaling StatefulSet %s to %d replicas", existing.Name, *desired.Spec.Replicas))
330341
return r.Update(ctx, existing)
331342
}
@@ -360,7 +371,7 @@ func (r *MemgraphClusterReconciler) reconcileWriteService(ctx context.Context, c
360371
zap.String("previousInstance", currentWriteInstance),
361372
zap.String("newInstance", writeInstance))
362373
existing.Spec.Selector = desired.Spec.Selector
363-
r.Recorder.Event(cluster, corev1.EventTypeNormal, "UpdatedWriteService",
374+
r.Recorder.Event(cluster, corev1.EventTypeNormal, EventReasonUpdatedWriteService,
364375
fmt.Sprintf("Write service now pointing to %s", writeInstance))
365376
return r.Update(ctx, existing)
366377
}
@@ -558,6 +569,62 @@ func conditionMessage(ok bool, trueMsg, falseMsg string) string {
558569
return falseMsg
559570
}
560571

572+
// collectStorageMetrics collects storage metrics from all running pods
573+
func (r *MemgraphClusterReconciler) collectStorageMetrics(
574+
ctx context.Context,
575+
cluster *memgraphv1alpha1.MemgraphCluster,
576+
pods []corev1.Pod,
577+
writeInstance string,
578+
log *zap.Logger,
579+
) {
580+
if r.replicationManager == nil || r.replicationManager.Client() == nil {
581+
return
582+
}
583+
584+
mgClient := r.replicationManager.Client()
585+
for _, pod := range pods {
586+
if !isPodReady(&pod) {
587+
continue
588+
}
589+
590+
role := "replica"
591+
if pod.Name == writeInstance {
592+
role = "main"
593+
}
594+
595+
info, err := mgClient.GetStorageInfo(ctx, cluster.Namespace, pod.Name)
596+
if err != nil {
597+
log.Debug("failed to collect storage metrics",
598+
zap.String("pod", pod.Name),
599+
zap.Error(err))
600+
continue
601+
}
602+
603+
r.metrics.RecordStorageInfo(cluster.Name, cluster.Namespace, pod.Name, role, info)
604+
}
605+
}
606+
607+
// emitPhaseTransitionEvents emits events when cluster phase changes
608+
func (r *MemgraphClusterReconciler) emitPhaseTransitionEvents(
609+
cluster *memgraphv1alpha1.MemgraphCluster,
610+
previousPhase memgraphv1alpha1.ClusterPhase,
611+
) {
612+
currentPhase := cluster.Status.Phase
613+
if currentPhase == previousPhase {
614+
return
615+
}
616+
617+
switch {
618+
case currentPhase == memgraphv1alpha1.ClusterPhaseRunning:
619+
r.Recorder.Event(cluster, corev1.EventTypeNormal, EventReasonClusterReady,
620+
fmt.Sprintf("Cluster is ready with %d instances", cluster.Status.ReadyInstances))
621+
case previousPhase == memgraphv1alpha1.ClusterPhaseRunning &&
622+
currentPhase == memgraphv1alpha1.ClusterPhaseInitializing:
623+
r.Recorder.Event(cluster, corev1.EventTypeWarning, EventReasonClusterDegraded,
624+
fmt.Sprintf("Cluster degraded: %d instances ready", cluster.Status.ReadyInstances))
625+
}
626+
}
627+
561628
// SetupWithManager sets up the controller with the Manager.
562629
func (r *MemgraphClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
563630
return ctrl.NewControllerManagedBy(mgr).

internal/controller/metrics.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package controller
44

55
import (
6+
"github.com/base14/memgraph-operator/internal/memgraph"
67
"github.com/prometheus/client_golang/prometheus"
78
"sigs.k8s.io/controller-runtime/pkg/metrics"
89
)
@@ -127,6 +128,79 @@ var (
127128
},
128129
[]string{"cluster", "namespace"},
129130
)
131+
132+
// Memgraph storage metrics (from SHOW STORAGE INFO)
133+
storageVertexCountGauge = prometheus.NewGaugeVec(
134+
prometheus.GaugeOpts{
135+
Name: "memgraph_storage_vertex_count",
136+
Help: "Number of vertices in the database",
137+
},
138+
[]string{"cluster", "namespace", "instance", "role"},
139+
)
140+
141+
storageEdgeCountGauge = prometheus.NewGaugeVec(
142+
prometheus.GaugeOpts{
143+
Name: "memgraph_storage_edge_count",
144+
Help: "Number of edges in the database",
145+
},
146+
[]string{"cluster", "namespace", "instance", "role"},
147+
)
148+
149+
storageAverageDegreeGauge = prometheus.NewGaugeVec(
150+
prometheus.GaugeOpts{
151+
Name: "memgraph_storage_average_degree",
152+
Help: "Average degree of vertices in the database",
153+
},
154+
[]string{"cluster", "namespace", "instance", "role"},
155+
)
156+
157+
storageMemoryResGauge = prometheus.NewGaugeVec(
158+
prometheus.GaugeOpts{
159+
Name: "memgraph_storage_memory_resident_bytes",
160+
Help: "Current resident memory usage in bytes",
161+
},
162+
[]string{"cluster", "namespace", "instance", "role"},
163+
)
164+
165+
storagePeakMemoryResGauge = prometheus.NewGaugeVec(
166+
prometheus.GaugeOpts{
167+
Name: "memgraph_storage_memory_peak_bytes",
168+
Help: "Peak resident memory usage in bytes",
169+
},
170+
[]string{"cluster", "namespace", "instance", "role"},
171+
)
172+
173+
storageDiskUsageGauge = prometheus.NewGaugeVec(
174+
prometheus.GaugeOpts{
175+
Name: "memgraph_storage_disk_usage_bytes",
176+
Help: "Disk space consumed in bytes",
177+
},
178+
[]string{"cluster", "namespace", "instance", "role"},
179+
)
180+
181+
storageMemoryTrackedGauge = prometheus.NewGaugeVec(
182+
prometheus.GaugeOpts{
183+
Name: "memgraph_storage_memory_tracked_bytes",
184+
Help: "Actively tracked memory allocation in bytes",
185+
},
186+
[]string{"cluster", "namespace", "instance", "role"},
187+
)
188+
189+
storageAllocationLimitGauge = prometheus.NewGaugeVec(
190+
prometheus.GaugeOpts{
191+
Name: "memgraph_storage_allocation_limit_bytes",
192+
Help: "Maximum memory allocation limit in bytes",
193+
},
194+
[]string{"cluster", "namespace", "instance", "role"},
195+
)
196+
197+
storageUnreleasedDeltaObjectsGauge = prometheus.NewGaugeVec(
198+
prometheus.GaugeOpts{
199+
Name: "memgraph_storage_unreleased_delta_objects",
200+
Help: "Count of delta objects awaiting cleanup",
201+
},
202+
[]string{"cluster", "namespace", "instance", "role"},
203+
)
130204
)
131205

132206
func init() {
@@ -146,6 +220,16 @@ func init() {
146220
failoverEventsTotal,
147221
validationLastRunTimestamp,
148222
validationPassedGauge,
223+
// Storage metrics
224+
storageVertexCountGauge,
225+
storageEdgeCountGauge,
226+
storageAverageDegreeGauge,
227+
storageMemoryResGauge,
228+
storagePeakMemoryResGauge,
229+
storageDiskUsageGauge,
230+
storageMemoryTrackedGauge,
231+
storageAllocationLimitGauge,
232+
storageUnreleasedDeltaObjectsGauge,
149233
)
150234
}
151235

@@ -235,6 +319,36 @@ func (m *MetricsRecorder) RecordValidation(cluster, namespace string, timestamp
235319
validationPassedGauge.WithLabelValues(cluster, namespace).Set(passedValue)
236320
}
237321

322+
// RecordStorageInfo records storage metrics from SHOW STORAGE INFO
323+
func (m *MetricsRecorder) RecordStorageInfo(cluster, namespace, instance, role string, info *memgraph.StorageInfo) {
324+
if info == nil {
325+
return
326+
}
327+
storageVertexCountGauge.WithLabelValues(cluster, namespace, instance, role).Set(float64(info.VertexCount))
328+
storageEdgeCountGauge.WithLabelValues(cluster, namespace, instance, role).Set(float64(info.EdgeCount))
329+
storageAverageDegreeGauge.WithLabelValues(cluster, namespace, instance, role).Set(info.AverageDegree)
330+
storageMemoryResGauge.WithLabelValues(cluster, namespace, instance, role).Set(float64(info.MemoryRes))
331+
storagePeakMemoryResGauge.WithLabelValues(cluster, namespace, instance, role).Set(float64(info.PeakMemoryRes))
332+
storageDiskUsageGauge.WithLabelValues(cluster, namespace, instance, role).Set(float64(info.DiskUsage))
333+
storageMemoryTrackedGauge.WithLabelValues(cluster, namespace, instance, role).Set(float64(info.MemoryTracked))
334+
storageAllocationLimitGauge.WithLabelValues(cluster, namespace, instance, role).Set(float64(info.AllocationLimit))
335+
storageUnreleasedDeltaObjectsGauge.WithLabelValues(cluster, namespace, instance, role).
336+
Set(float64(info.UnreleasedDeltaObjects))
337+
}
338+
339+
// DeleteInstanceStorageMetrics removes storage metrics for a specific instance
340+
func (m *MetricsRecorder) DeleteInstanceStorageMetrics(cluster, namespace, instance, role string) {
341+
storageVertexCountGauge.DeleteLabelValues(cluster, namespace, instance, role)
342+
storageEdgeCountGauge.DeleteLabelValues(cluster, namespace, instance, role)
343+
storageAverageDegreeGauge.DeleteLabelValues(cluster, namespace, instance, role)
344+
storageMemoryResGauge.DeleteLabelValues(cluster, namespace, instance, role)
345+
storagePeakMemoryResGauge.DeleteLabelValues(cluster, namespace, instance, role)
346+
storageDiskUsageGauge.DeleteLabelValues(cluster, namespace, instance, role)
347+
storageMemoryTrackedGauge.DeleteLabelValues(cluster, namespace, instance, role)
348+
storageAllocationLimitGauge.DeleteLabelValues(cluster, namespace, instance, role)
349+
storageUnreleasedDeltaObjectsGauge.DeleteLabelValues(cluster, namespace, instance, role)
350+
}
351+
238352
// DeleteClusterMetrics removes metrics for a deleted cluster
239353
func (m *MetricsRecorder) DeleteClusterMetrics(cluster, namespace string) {
240354
clusterPhaseGauge.DeleteLabelValues(cluster, namespace)

0 commit comments

Comments
 (0)