Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions charts/kubeflow-trainer/templates/manager/service-monitor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{{- /*
Copyright 2024 The Kubeflow authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/ -}}

{{- /*
ServiceMonitor for the controller-manager metrics endpoint. Rendered only when
manager.metrics.serviceMonitor.enabled is true, and only if the Prometheus
Operator CRD (monitoring.coreos.com/v1/ServiceMonitor) is registered.
NOTE(review): .Capabilities.APIVersions is populated from the live cluster;
during offline rendering (`helm template` without --api-versions) the Has check
returns false and the fail below aborts rendering — confirm this is intended.
*/ -}}
{{- if .Values.manager.metrics.serviceMonitor.enabled }}
{{- if not (.Capabilities.APIVersions.Has "monitoring.coreos.com/v1/ServiceMonitor") -}}
{{- fail "ServiceMonitor requires the monitoring.coreos.com/v1 CRD (Prometheus Operator). Install Prometheus Operator first or set manager.metrics.serviceMonitor.enabled=false." -}}
{{- end }}
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: {{ include "trainer.manager.service.name" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "trainer.manager.labels" . | nindent 4 }}
{{- /* Extra labels (e.g. release: prometheus) so a Prometheus instance's
serviceMonitorSelector picks this monitor up. */}}
{{- with .Values.manager.metrics.serviceMonitor.additionalLabels }}
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
{{- /* Select the manager Service by its selector labels, restricted to the
release namespace. */}}
selector:
matchLabels:
{{- include "trainer.manager.selectorLabels" . | nindent 6 }}
namespaceSelector:
matchNames:
- {{ .Release.Namespace }}
endpoints:
- port: monitoring-port
path: /metrics
scheme: https
{{- /* Interval/timeout fall back to chart defaults when unset in values. */}}
interval: {{ .Values.manager.metrics.serviceMonitor.interval | default "30s" }}
scrapeTimeout: {{ .Values.manager.metrics.serviceMonitor.scrapeTimeout | default "10s" }}
{{- /* Prometheus authenticates with its own ServiceAccount token.
NOTE(review): bearerTokenFile is deprecated in recent Prometheus Operator
releases in favor of `authorization` — verify against the operator version
targeted by this chart. */}}
bearerTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token
tlsConfig:
{{- if .Values.manager.metrics.serviceMonitor.tlsConfig }}
{{- toYaml .Values.manager.metrics.serviceMonitor.tlsConfig | nindent 8 }}
{{- else }}
{{- /* Default skips verification because the manager serves with a
self-signed certificate; override via values for production. */}}
insecureSkipVerify: true
{{- end }}
{{- end }}
18 changes: 18 additions & 0 deletions charts/kubeflow-trainer/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ manager:
seccompProfile:
type: RuntimeDefault

# -- Prometheus metrics configuration.
metrics:
serviceMonitor:
# -- Whether to create a Prometheus Operator ServiceMonitor for /metrics.
# Requires the monitoring.coreos.com/v1 CRDs (Prometheus Operator) installed.
enabled: false
# -- Scrape interval.
interval: 30s
# -- Scrape timeout.
scrapeTimeout: 10s
# -- Extra labels added to the ServiceMonitor metadata (e.g. release: prometheus)
# so that the Prometheus instance selects this monitor.
additionalLabels: {}
# -- TLS config for the metrics endpoint.
# Defaults to insecureSkipVerify: true so the controller-manager's self-signed CA
# does not block scrapes. Override with caFile/serverName for production.
tlsConfig: {}

# -- Controller manager configuration.
# This configuration is used to generate the ConfigMap for the controller manager.
config:
Expand Down
13 changes: 13 additions & 0 deletions cmd/trainer-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"flag"
"net/http"
"os"
goruntime "runtime"

zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -41,10 +42,12 @@ import (
"github.com/kubeflow/trainer/v2/pkg/config"
"github.com/kubeflow/trainer/v2/pkg/controller"
"github.com/kubeflow/trainer/v2/pkg/features"
"github.com/kubeflow/trainer/v2/pkg/metrics"
"github.com/kubeflow/trainer/v2/pkg/runtime"
runtimecore "github.com/kubeflow/trainer/v2/pkg/runtime/core"
"github.com/kubeflow/trainer/v2/pkg/statusserver"
"github.com/kubeflow/trainer/v2/pkg/util/cert"
"github.com/kubeflow/trainer/v2/pkg/version"
"github.com/kubeflow/trainer/v2/pkg/webhooks"
)

Expand Down Expand Up @@ -125,6 +128,16 @@ func main() {
os.Exit(1)
}

metrics.Register()
metrics.BuildInfo.WithLabelValues(
version.GitVersion,
version.GitCommit,
version.BuildDate,
goruntime.Version(),
goruntime.Compiler,
goruntime.GOOS+"/"+goruntime.GOARCH,
).Set(1)

certsReady := make(chan struct{})
if config.IsCertManagementEnabled(&cfg) {
setupLog.Info("Setting up certificate management")
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/onsi/ginkgo/v2 v2.28.1
github.com/onsi/gomega v1.39.1
github.com/open-policy-agent/cert-controller v0.16.0
github.com/prometheus/client_golang v1.23.2
go.uber.org/zap v1.27.1
golang.org/x/crypto v0.48.0
k8s.io/api v0.35.2
Expand Down Expand Up @@ -54,6 +55,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mailru/easyjson v0.9.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -62,7 +64,6 @@ require (
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.23.2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions manifests/base/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ apiVersion: v1
kind: Service
metadata:
name: kubeflow-trainer-controller-manager
labels:
app.kubernetes.io/name: trainer
app.kubernetes.io/component: manager
app.kubernetes.io/part-of: kubeflow
spec:
ports:
- name: monitoring-port
Expand Down
4 changes: 4 additions & 0 deletions manifests/overlays/monitoring/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Monitoring overlay — apply this only if Prometheus Operator (monitoring.coreos.com/v1) is installed.
resources:
# Base controller-manager manifests (Deployment, Service, RBAC, ...).
- ../../base/manager
# ServiceMonitor scraping the manager's /metrics endpoint.
- service_monitor.yaml
35 changes: 35 additions & 0 deletions manifests/overlays/monitoring/service_monitor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2024 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ServiceMonitor for the Kubeflow Trainer controller-manager metrics endpoint.
# Requires the Prometheus Operator (monitoring.coreos.com/v1) to be installed.
# Apply this overlay only if Prometheus Operator is available in the cluster.
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: kubeflow-trainer-controller-manager
  namespace: kubeflow
spec:
  selector:
    matchLabels:
      # Match both labels carried by the manager Service so this monitor does
      # not accidentally select any other Service that happens to use the
      # generic "manager" component label.
      app.kubernetes.io/name: trainer
      app.kubernetes.io/component: manager
  endpoints:
  - port: monitoring-port
    path: /metrics
    scheme: https
    interval: 30s
    scrapeTimeout: 10s
    # Prometheus authenticates with its own ServiceAccount token.
    bearerTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token
    tlsConfig:
      # The manager serves metrics with a self-signed certificate; skip
      # verification here. Tighten with caFile/serverName for production.
      insecureSkipVerify: true
60 changes: 58 additions & 2 deletions pkg/controller/trainjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (

trainer "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1"
"github.com/kubeflow/trainer/v2/pkg/constants"
"github.com/kubeflow/trainer/v2/pkg/metrics"
jobruntimes "github.com/kubeflow/trainer/v2/pkg/runtime"
"github.com/kubeflow/trainer/v2/pkg/util/trainjob"
)
Expand Down Expand Up @@ -98,6 +99,10 @@ func NewTrainJobReconciler(client client.Client, recorder events.EventRecorder,
// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=create;get;list;update

func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
start := time.Now()
reconcileResult := "success"
defer func() { metrics.ObserveReconcile("trainjob_controller", reconcileResult, time.Since(start)) }()

var trainJob trainer.TrainJob
if err := r.client.Get(ctx, req.NamespacedName, &trainJob); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
Expand Down Expand Up @@ -136,21 +141,70 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

setSuspendedCondition(&trainJob)

// Collect pending lifecycle metric callbacks; fired only after a successful status patch
// to prevent double-counting when the patch fails and the next reconcile re-detects the transition.
var pendingMetrics []func()

// Detect Suspended: False → True transitions.
prevSuspendedCond := meta.FindStatusCondition(prevTrainJob.Status.Conditions, trainer.TrainJobSuspended)
currSuspendedCond := meta.FindStatusCondition(trainJob.Status.Conditions, trainer.TrainJobSuspended)
if currSuspendedCond != nil && currSuspendedCond.Status == metav1.ConditionTrue &&
(prevSuspendedCond == nil || prevSuspendedCond.Status != metav1.ConditionTrue) {
ns, rk := trainJob.Namespace, metrics.RuntimeKind(&trainJob)
pendingMetrics = append(pendingMetrics, func() { metrics.RecordTrainJobSuspended(ns, rk) })
}

if statusErr := setTrainJobStatus(ctx, runtime, &trainJob); statusErr != nil {
err = errors.Join(err, statusErr)
}

// Detect terminal state transitions.
runtimeKind := metrics.RuntimeKind(&trainJob)
prevCompleteCond := meta.FindStatusCondition(prevTrainJob.Status.Conditions, trainer.TrainJobComplete)
currCompleteCond := meta.FindStatusCondition(trainJob.Status.Conditions, trainer.TrainJobComplete)
if currCompleteCond != nil && currCompleteCond.Status == metav1.ConditionTrue &&
(prevCompleteCond == nil || prevCompleteCond.Status != metav1.ConditionTrue) {
ns, rk := trainJob.Namespace, runtimeKind
dur := currCompleteCond.LastTransitionTime.Sub(trainJob.CreationTimestamp.Time)
pendingMetrics = append(pendingMetrics, func() { metrics.RecordTrainJobCompleted(ns, rk, dur) })
}

prevFailedCond := meta.FindStatusCondition(prevTrainJob.Status.Conditions, trainer.TrainJobFailed)
currFailedCond := meta.FindStatusCondition(trainJob.Status.Conditions, trainer.TrainJobFailed)
if currFailedCond != nil && currFailedCond.Status == metav1.ConditionTrue &&
(prevFailedCond == nil || prevFailedCond.Status != metav1.ConditionTrue) {
ns, rk, reason := trainJob.Namespace, runtimeKind, currFailedCond.Reason
dur := currFailedCond.LastTransitionTime.Sub(trainJob.CreationTimestamp.Time)
pendingMetrics = append(pendingMetrics, func() { metrics.RecordTrainJobFailed(ns, rk, reason, dur) })
}

if err != nil {
reconcileResult = "error"
}

if deadlineResult, deadlineErr := r.reconcileDeadline(ctx, &trainJob); deadlineErr != nil || deadlineResult.RequeueAfter > 0 {
if !equality.Semantic.DeepEqual(&trainJob.Status, &prevTrainJob.Status) {
return deadlineResult, errors.Join(err, r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob)))
patchErr := r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob))
if patchErr == nil {
for _, fn := range pendingMetrics {
fn()
}
}
return deadlineResult, errors.Join(err, patchErr)
}
return deadlineResult, errors.Join(err, deadlineErr)
}

if !equality.Semantic.DeepEqual(&trainJob.Status, prevTrainJob.Status) {
// TODO(astefanutti): Consider using SSA once controller-runtime client has SSA support
// for sub-resources. See: https://github.com/kubernetes-sigs/controller-runtime/issues/3183
return ctrl.Result{}, errors.Join(err, r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob)))
patchErr := r.client.Status().Patch(ctx, &trainJob, client.MergeFrom(prevTrainJob))
if patchErr == nil {
for _, fn := range pendingMetrics {
fn()
}
}
return ctrl.Result{}, errors.Join(err, patchErr)
}
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -208,12 +262,14 @@ func (r *TrainJobReconciler) reconcileDeadline(ctx context.Context, trainJob *tr

// Create handles TrainJob create events. It logs the event, records the
// "created" lifecycle metric, and notifies registered watchers (after the
// return, via defer). Returning true keeps the event (controller-runtime
// predicate convention), so the TrainJob is enqueued for reconciliation.
//
// NOTE(review): informer Create events replay for every existing TrainJob when
// the controller restarts, so the created counter may be inflated across
// restarts — confirm this is acceptable for the metric's semantics.
func (r *TrainJobReconciler) Create(e event.TypedCreateEvent[*trainer.TrainJob]) bool {
r.log.WithValues("trainJob", klog.KObj(e.Object)).Info("TrainJob create event")
metrics.RecordTrainJobCreated(e.Object.Namespace, metrics.RuntimeKind(e.Object))
defer r.notifyWatchers(nil, e.Object)
return true
}

// Delete handles TrainJob delete events. It logs the event, records the
// "deleted" lifecycle metric, and notifies registered watchers (after the
// return, via defer). Returning true keeps the event (controller-runtime
// predicate convention), so the deletion is processed by the reconciler.
func (r *TrainJobReconciler) Delete(e event.TypedDeleteEvent[*trainer.TrainJob]) bool {
r.log.WithValues("trainJob", klog.KObj(e.Object)).Info("TrainJob delete event")
metrics.RecordTrainJobDeleted(e.Object.Namespace, metrics.RuntimeKind(e.Object))
defer r.notifyWatchers(e.Object, nil)
return true
}
Expand Down
Loading