9 changes: 4 additions & 5 deletions .github/workflows/node-e2e.yml
@@ -19,7 +19,8 @@ jobs:
- name: Checkout Kubernetes
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
- repository: kubernetes/kubernetes
+ repository: dims/kubernetes
+ ref: add-logs-to-kubelet-metrics
path: src/k8s.io/kubernetes

- name: Install Go
@@ -88,12 +89,10 @@ jobs:
- name: Run Node E2E Tests
working-directory: ./src/k8s.io/kubernetes
run: |
- # TODO: Remove "ResourceMetrics" feature skip once containerd supports ListMetricDescriptors:
- # https://github.com/kubernetes/kubernetes/blob/v1.33.2/test/e2e_node/resource_metrics_test.go#L67-L71
sudo make test-e2e-node \
FOCUS='\[NodeConformance\]|\[Feature:.+\]|\[Feature\]' \
- SKIP='\[Flaky\]|\[Slow\]|\[Serial\]|\[Feature:UserNamespacesSupport\]|\[Feature:PodLifecycleSleepActionAllowZero\]|\[Feature:UserNamespacesPodSecurityStandards\]|\[Feature:KubeletCredentialProviders\]|\[Feature:LockContention\]|\[Feature:SCTPConnectivity\]|\[Feature:ResourceMetrics\]|\[Alpha\]' \
- TEST_ARGS='--kubelet-flags="--cgroup-driver=systemd --cgroups-per-qos=true --cgroup-root=/ --runtime-cgroups=/system.slice/containerd.service"'
+ SKIP='\[Flaky\]|\[Slow\]|\[Serial\]|\[Feature:UserNamespacesSupport\]|\[Feature:PodLifecycleSleepActionAllowZero\]|\[Feature:UserNamespacesPodSecurityStandards\]|\[Feature:KubeletCredentialProviders\]|\[Feature:LockContention\]|\[Feature:SCTPConnectivity\]|\[Alpha\]' \
+ TEST_ARGS='--feature-gates=PodAndContainerStatsFromCRI=true --kubelet-flags="--cgroup-driver=systemd --cgroups-per-qos=true --cgroup-root=/ --runtime-cgroups=/system.slice/containerd.service --feature-gates=PodAndContainerStatsFromCRI=true"'

- name: Collect Logs on Failure
if: failure()
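Note on the feature gate enabled above: PodAndContainerStatsFromCRI switches the kubelet to sourcing container and pod stats from the CRI ListContainerStats/ListPodSandboxStats calls instead of cAdvisor, which is why the CRI-side changes below have to populate fields like UsageNanoCores and PSI themselves. Outside this CI workflow, the gate can also be turned on through the kubelet configuration file; a minimal sketch, not part of this PR:

```yaml
# Sketch of a KubeletConfiguration enabling the gate.
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
featureGates:
  PodAndContainerStatsFromCRI: true
```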
71 changes: 65 additions & 6 deletions internal/cri/server/container_stats_list.go
@@ -156,11 +156,11 @@ func (c *criService) toContainerStats(
}

if cs.stats.Cpu != nil && cs.stats.Cpu.UsageCoreNanoSeconds != nil {
- // this is a calculated value and should be computed for all OSes
+ // UsageNanoCores is a calculated value and should be computed for all OSes
nanoUsage, err := c.getUsageNanoCores(cntr.Metadata.ID, false, cs.stats.Cpu.UsageCoreNanoSeconds.Value, time.Unix(0, cs.stats.Cpu.Timestamp))
if err != nil {
// If an error occurred when getting nano cores usage, skip the container
- log.G(ctx).Warnf("skipping container %q, failed to get metrics handler: %v", cntr.ID, err.Error())
+ log.G(ctx).WithError(err).Warnf("failed to get usage nano cores for container %q", cntr.ID)
continue
}
cs.stats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage}
@@ -179,6 +179,17 @@ func (c *criService) toCRIContainerStats(css []containerStats) *runtime.ListCont
}

func (c *criService) getUsageNanoCores(containerID string, isSandbox bool, currentUsageCoreNanoSeconds uint64, currentTimestamp time.Time) (uint64, error) {
+ // First, try to get pre-calculated UsageNanoCores from the background stats collector.
+ // This ensures we have valid data even on the first query (as the collector runs
+ // continuously in the background, similar to cAdvisor's housekeeping).
+ if c.statsCollector != nil {
+ if usageNanoCores, ok := c.statsCollector.GetUsageNanoCores(containerID); ok {
+ return usageNanoCores, nil
+ }
+ }
+
+ // Fall back to the original implementation if the collector doesn't have data.
+ // This can happen for newly created containers that haven't been collected yet.
var oldStats *stats.ContainerStats

if isSandbox {
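For readers following the fallback path: UsageNanoCores is defined as CPU nanoseconds consumed per second of wall-clock time, derived from two cumulative UsageCoreNanoSeconds samples. A minimal sketch of that delta computation (names are illustrative, not containerd's exact code):

```go
// usageNanoCoresFromSamples derives nanocores (CPU nanoseconds consumed per
// second of wall time) from two cumulative UsageCoreNanoSeconds samples.
func usageNanoCoresFromSamples(oldUsage, curUsage uint64, oldTS, curTS time.Time) (uint64, error) {
	elapsed := curTS.Sub(oldTS).Nanoseconds()
	if elapsed <= 0 {
		return 0, errors.New("non-positive sampling interval")
	}
	// The counters are cumulative, so the delta is the CPU time burned in the window.
	delta := curUsage - oldUsage
	return uint64(float64(delta) / float64(elapsed) * float64(time.Second.Nanoseconds())), nil
}
```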
@@ -424,9 +435,9 @@ func (c *criService) linuxContainerMetrics(
return containerStats{}, fmt.Errorf("failed to obtain memory stats: %w", err)
}
cs.Memory = memoryStats
- if err != nil {
- return containerStats{}, fmt.Errorf("failed to obtain pid count: %w", err)
- }
+
+ // IO stats (only has PSI for cgroupv2)
+ cs.Io = c.ioContainerStats(metrics, protobuf.FromTimestamp(stats.Timestamp))

return containerStats{&cs, pids}, nil
}
@@ -482,6 +493,36 @@ func getAvailableBytesV2(memory *cg2.MemoryStat, workingSetBytes uint64) uint64
return 0
}

+ // convertCg2PSIToCRI converts cgroupv2 PSIStats to CRI PsiStats.
+ // cgroupv2 PSI Total is in microseconds, CRI expects nanoseconds.
+ func convertCg2PSIToCRI(psi *cg2.PSIStats) *runtime.PsiStats {
+ if psi == nil {
+ return nil
+ }
+
+ result := &runtime.PsiStats{}
+
+ if psi.Full != nil {
+ result.Full = &runtime.PsiData{
+ Total: psi.Full.Total * 1000, // convert microseconds to nanoseconds
+ Avg10: psi.Full.Avg10,
+ Avg60: psi.Full.Avg60,
+ Avg300: psi.Full.Avg300,
+ }
+ }
+
+ if psi.Some != nil {
+ result.Some = &runtime.PsiData{
+ Total: psi.Some.Total * 1000, // convert microseconds to nanoseconds
+ Avg10: psi.Some.Avg10,
+ Avg60: psi.Some.Avg60,
+ Avg300: psi.Some.Avg300,
+ }
+ }
+
+ return result
+ }

func (c *criService) cpuContainerStats(ID string, isSandbox bool, stats cgroupMetrics, timestamp time.Time) (*runtime.CpuUsage, error) {
switch {
case stats.v1 != nil:
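A worked example of the unit conversion above, using hypothetical values: a cgroupv2 pressure line such as `some avg10=1.25 avg60=0.80 avg300=0.30 total=123456` reports `total` in microseconds, while the CRI PsiData.Total field carries nanoseconds. A sketch, assuming the cgroups v2 stats types expose Some/Full with Avg10/Avg60/Avg300 and a Total as shown in the diff:

```go
// Hypothetical input values, shaped the way the cgroups v2 stats types expose them.
psi := &cg2.PSIStats{
	Some: &cg2.PSIData{Avg10: 1.25, Avg60: 0.80, Avg300: 0.30, Total: 123456},
}
out := convertCg2PSIToCRI(psi)
fmt.Println(out.Some.Total) // 123456000 — µs scaled to ns
fmt.Println(out.Full)       // <nil> — no "full" data supplied
```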
@@ -498,10 +539,10 @@ func (c *criService) cpuContainerStats(ID string, isSandbox bool, stats cgroupMe
if metrics.CPU != nil {
// convert to nano seconds
usageCoreNanoSeconds := metrics.CPU.UsageUsec * 1000

return &runtime.CpuUsage{
Timestamp: timestamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: usageCoreNanoSeconds},
+ Psi: convertCg2PSIToCRI(metrics.CPU.PSI),
}, nil
}
}
@@ -544,8 +585,26 @@ func (c *criService) memoryContainerStats(ID string, stats cgroupMetrics, timest
RssBytes: &runtime.UInt64Value{Value: metrics.Memory.Anon},
PageFaults: &runtime.UInt64Value{Value: metrics.Memory.Pgfault},
MajorPageFaults: &runtime.UInt64Value{Value: metrics.Memory.Pgmajfault},
+ Psi: convertCg2PSIToCRI(metrics.Memory.PSI),
}, nil
}
}
return nil, nil
}

+ func (c *criService) ioContainerStats(stats cgroupMetrics, timestamp time.Time) *runtime.IoUsage {
+ switch {
+ case stats.v1 != nil:
+ // cgroupv1 doesn't have IO PSI stats
+ return nil
+ case stats.v2 != nil:
+ metrics := stats.v2
+ if metrics.Io != nil && metrics.Io.PSI != nil {
+ return &runtime.IoUsage{
+ Timestamp: timestamp.UnixNano(),
+ Psi: convertCg2PSIToCRI(metrics.Io.PSI),
+ }
+ }
+ }
+ return nil
+ }
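For context on the data source (not part of the diff): on a cgroupv2 host the kernel exposes IO pressure under the container's cgroup as io.pressure, in roughly the following format, with totals in microseconds; memory.pressure looks the same, and cpu.pressure may omit the full line on older kernels. Values here are hypothetical:

```
some avg10=0.12 avg60=0.05 avg300=0.01 total=123456
full avg10=0.00 avg60=0.00 avg300=0.00 total=45678
```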
24 changes: 17 additions & 7 deletions internal/cri/server/sandbox_stats_linux.go
@@ -63,6 +63,13 @@ func (c *criService) podSandboxStats(
if err != nil {
return nil, fmt.Errorf("failed to obtain cpu stats: %w", err)
}
+ if cpuStats != nil && cpuStats.UsageCoreNanoSeconds != nil {
+ nanoUsage, err := c.getUsageNanoCores(meta.ID, true /* isSandbox */, cpuStats.UsageCoreNanoSeconds.Value, timestamp)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get usage nano cores: %w", err)
+ }
+ cpuStats.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage}
+ }
podSandboxStats.Linux.Cpu = cpuStats

memoryStats, err := c.memoryContainerStats(meta.ID, *stats, timestamp)
@@ -76,14 +83,17 @@ func (c *criService) podSandboxStats(
if err != nil {
return nil, fmt.Errorf("failed to obtain network stats: %w", err)
}
+ defaultInterface := &runtime.NetworkInterfaceUsage{
+ Name: defaultIfName,
+ RxBytes: &runtime.UInt64Value{Value: linkStats.RxBytes},
+ RxErrors: &runtime.UInt64Value{Value: linkStats.RxErrors},
+ TxBytes: &runtime.UInt64Value{Value: linkStats.TxBytes},
+ TxErrors: &runtime.UInt64Value{Value: linkStats.TxErrors},
+ }
podSandboxStats.Linux.Network = &runtime.NetworkUsage{
- DefaultInterface: &runtime.NetworkInterfaceUsage{
- Name: defaultIfName,
- RxBytes: &runtime.UInt64Value{Value: linkStats.RxBytes},
- RxErrors: &runtime.UInt64Value{Value: linkStats.RxErrors},
- TxBytes: &runtime.UInt64Value{Value: linkStats.TxBytes},
- TxErrors: &runtime.UInt64Value{Value: linkStats.TxErrors},
- },
+ Timestamp: timestamp.UnixNano(),
+ DefaultInterface: defaultInterface,
+ Interfaces: []*runtime.NetworkInterfaceUsage{defaultInterface},
}
}

10 changes: 10 additions & 0 deletions internal/cri/server/service.go
@@ -160,6 +160,8 @@ type criService struct {
runtimeHandlers map[string]*runtime.RuntimeHandler
// runtimeFeatures container runtime features info
runtimeFeatures *runtime.RuntimeFeatures
+ // statsCollector collects CPU stats in background for UsageNanoCores calculation
+ statsCollector *StatsCollector
}

type CRIServiceOptions struct {
@@ -265,6 +267,11 @@ func (c *criService) Run(ready func()) error {
// then you have to manually filter namespace foo
c.eventMonitor.Subscribe(c.client, []string{`topic=="/tasks/oom"`, `topic~="/images/"`})

+ // Start the background stats collector for UsageNanoCores calculation
+ log.L.Info("Start stats collector")
+ c.statsCollector = NewStatsCollector(c)
+ c.statsCollector.Start()

log.L.Infof("Start recovering state")
if err := c.recover(ctrdutil.NamespacedContext()); err != nil {
return fmt.Errorf("failed to recover state: %w", err)
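The StatsCollector implementation itself is added elsewhere in this PR and isn't shown in this view; as used here, its surface is NewStatsCollector(c), Start(), Stop(), and GetUsageNanoCores(id) (uint64, bool). A minimal sketch of that shape, assuming a periodic sampling loop (the period and internals are assumptions, not the PR's actual code):

```go
package server

import (
	"sync"
	"time"
)

// StatsCollector periodically samples cumulative CPU usage for all containers
// and caches a computed UsageNanoCores value per container ID. Sketch only.
type StatsCollector struct {
	cri    *criService
	mu     sync.RWMutex
	usage  map[string]uint64 // container ID -> last computed UsageNanoCores
	stopCh chan struct{}
}

func NewStatsCollector(c *criService) *StatsCollector {
	return &StatsCollector{cri: c, usage: make(map[string]uint64), stopCh: make(chan struct{})}
}

// GetUsageNanoCores returns the most recent cached value, if any.
func (s *StatsCollector) GetUsageNanoCores(id string) (uint64, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.usage[id]
	return v, ok
}

func (s *StatsCollector) Start() {
	go func() {
		ticker := time.NewTicker(10 * time.Second) // sampling period is an assumption
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// Sample each container's cumulative CPU counter, compute the
				// delta against the previous sample, and update s.usage.
			case <-s.stopCh:
				return
			}
		}
	}()
}

func (s *StatsCollector) Stop() { close(s.stopCh) }
```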
@@ -359,6 +366,9 @@ func (c *criService) Close() error {
}
}
c.eventMonitor.Stop()
+ if c.statsCollector != nil {
+ c.statsCollector.Stop()
+ }
if err := c.streamServer.Stop(); err != nil {
return fmt.Errorf("failed to stop stream server: %w", err)
}