@@ -18,6 +18,7 @@ package malachite
1818
1919import (
2020 "context"
21+ "fmt"
2122 "math"
2223 "strconv"
2324 "strings"
@@ -38,6 +39,7 @@ import (
3839 "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
3940 "github.com/kubewharf/katalyst-core/pkg/util/general"
4041 "github.com/kubewharf/katalyst-core/pkg/util/machine"
42+ "github.com/kubewharf/katalyst-core/pkg/util/metric"
4143 utilmetric "github.com/kubewharf/katalyst-core/pkg/util/metric"
4244)
4345
@@ -412,6 +414,7 @@ func (m *MalachiteMetricsProvisioner) processSystemNetData(systemNetData *malach
412414 if systemNetData == nil {
413415 return
414416 }
417+
415418 // todo, currently we only get a unified data for the whole system io data
416419 updateTime := time .Unix (systemNetData .UpdateTime , 0 )
417420
@@ -433,6 +436,8 @@ func (m *MalachiteMetricsProvisioner) processSystemNetData(systemNetData *malach
433436 utilmetric.MetricData {Value : float64 (systemNetData .TCP .TCPOutSegs ), Time : & updateTime })
434437 m .metricStore .SetNodeMetric (consts .MetricNetTcpCloseWait ,
435438 utilmetric.MetricData {Value : float64 (systemNetData .TCP .TCPCloseWait ), Time : & updateTime })
439+ m .metricStore .SetNodeMetric (consts .MetricNetUpdateTime ,
440+ utilmetric.MetricData {Value : float64 (systemNetData .UpdateTime ), Time : & updateTime })
436441
437442 for _ , device := range systemNetData .NetworkCard {
438443 // for now, we will only consider standard network interface
@@ -441,6 +446,14 @@ func (m *MalachiteMetricsProvisioner) processSystemNetData(systemNetData *malach
441446 continue
442447 }
443448
449+ errs := []error {}
450+ // setNetworkRateMetric will use metricStore.GetNetworkMetric to get previous round metric,
451+ // we should call setNetworkRateMetric before calling SetNetworkMetric
452+ errs = append (errs , m .setNetworkRateMetric (device .Name , consts .MetricNetTransmitBPS ,
453+ consts .MetricNetTransmitBytes , float64 (device .TransmitBytes ), & updateTime ))
454+ errs = append (errs , m .setNetworkRateMetric (device .Name , consts .MetricNetReceiveBPS ,
455+ consts .MetricNetReceiveBytes , float64 (device .ReceiveBytes ), & updateTime ))
456+
444457 m .metricStore .SetNetworkMetric (device .Name , consts .MetricNetReceiveBytes ,
445458 utilmetric.MetricData {Value : float64 (device .ReceiveBytes ), Time : & updateTime })
446459 m .metricStore .SetNetworkMetric (device .Name , consts .MetricNetReceivePackets ,
@@ -474,7 +487,52 @@ func (m *MalachiteMetricsProvisioner) processSystemNetData(systemNetData *malach
474487 m .metricStore .SetNetworkMetric (device .Name , consts .MetricNetTransmitCompressed ,
475488 utilmetric.MetricData {Value : float64 (device .TransmitCompressed ), Time : & updateTime })
476489
490+ if device .Speeds != nil {
491+ m .metricStore .SetNetworkMetric (device .Name , consts .MetricNetSpeed ,
492+ utilmetric.MetricData {Value : float64 (* device .Speeds ), Time : & updateTime })
493+ }
494+
495+ aggErrs := errors .NewAggregate (errs )
496+
497+ if aggErrs != nil {
498+ general .Warningf ("set network metrics for: %s got errors: %s" , device .Name , aggErrs .Error ())
499+ }
500+ }
501+ }
502+
503+ func (m * MalachiteMetricsProvisioner ) setNetworkRateMetric (deviceName ,
504+ rateMetricName , valueMetricName string ,
505+ curValue float64 ,
506+ curUpdateTime * time.Time ,
507+ ) error {
508+ lastMetric , err := m .metricStore .GetNetworkMetric (deviceName , valueMetricName )
509+ if err != nil {
510+ return fmt .Errorf ("get value metric: %s for %s failed with err: %v" ,
511+ valueMetricName , rateMetricName , err )
512+ }
513+
514+ lastValue := lastMetric .Value
515+ lastUpdateTime := lastMetric .Time
516+
517+ if curUpdateTime == nil || lastUpdateTime == nil || curUpdateTime .Unix () == 0 || lastUpdateTime .Unix () == 0 {
518+ return fmt .Errorf ("nil curUpdateTime or lastUpdateTime for rateMetricName: %s" , rateMetricName )
519+ }
520+
521+ timeDeltaInSec := curUpdateTime .Sub (* lastUpdateTime ).Seconds ()
522+
523+ if timeDeltaInSec <= 0 {
524+ return fmt .Errorf ("invalid timeDelta: %.2f" , timeDeltaInSec )
477525 }
526+
527+ if (curValue > lastValue ) && (curValue != 0 ) && (lastValue != 0 ) {
528+ m .metricStore .SetNetworkMetric (deviceName , rateMetricName ,
529+ metric.MetricData {Value : (curValue - lastValue ) / timeDeltaInSec , Time : curUpdateTime })
530+ } else {
531+ return fmt .Errorf ("invalid curValue: %.2f, lastValue: %.2f for rateMetricName: %s" ,
532+ curValue , lastValue , rateMetricName )
533+ }
534+
535+ return nil
478536}
479537
480538func (m * MalachiteMetricsProvisioner ) processSystemNumaData (systemMemoryData * malachitetypes.SystemMemoryData , systemComputeData * malachitetypes.SystemComputeData ) {
0 commit comments