Skip to content

NETOBSERV-2186: configure sampling field name in FLP, refined sampling metrics #1482

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apis/flowcollector/v1beta1/flowcollector_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func TestBeta1ConversionRoundtrip_Metrics(t *testing.T) {

expectedDefaultMetrics := []v1beta2.FLPMetric{
"namespace_egress_packets_total",
"namespace_sampling",
"namespace_flows_total",
"namespace_rtt_seconds",
"namespace_drop_packets_total",
Expand Down
28 changes: 14 additions & 14 deletions apis/flowmetrics/v1alpha1/flowmetric_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,18 @@ type FilterMatchType string
type FlowDirection string

const (
CounterMetric MetricType = "Counter"
HistogramMetric MetricType = "Histogram"
// Note: we don't expose gauge on purpose to avoid configuration mistake related to gauge limitation.
// 99% of times, "counter" or "histogram" should be the ones to use. We can eventually revisit later.
MatchEqual FilterMatchType = "Equal"
MatchNotEqual FilterMatchType = "NotEqual"
MatchPresence FilterMatchType = "Presence"
MatchAbsence FilterMatchType = "Absence"
MatchRegex FilterMatchType = "MatchRegex"
MatchNotRegex FilterMatchType = "NotMatchRegex"
Egress FlowDirection = "Egress"
Ingress FlowDirection = "Ingress"
AnyDirection FlowDirection = "Any"
CounterMetric MetricType = "Counter"
GaugeMetric MetricType = "Gauge"
HistogramMetric MetricType = "Histogram"
MatchEqual FilterMatchType = "Equal"
MatchNotEqual FilterMatchType = "NotEqual"
MatchPresence FilterMatchType = "Presence"
MatchAbsence FilterMatchType = "Absence"
MatchRegex FilterMatchType = "MatchRegex"
MatchNotRegex FilterMatchType = "NotMatchRegex"
Egress FlowDirection = "Egress"
Ingress FlowDirection = "Ingress"
AnyDirection FlowDirection = "Any"
)

type MetricFilter struct {
Expand Down Expand Up @@ -65,9 +64,10 @@ type FlowMetricSpec struct {
// +required
MetricName string `json:"metricName"`

// Metric type: "Counter" or "Histogram".
// Metric type: "Counter", "Histogram" or "Gauge".
// Use "Counter" for any value that increases over time and on which you can compute a rate, such as Bytes or Packets.
// Use "Histogram" for any value that must be sampled independently, such as latencies.
// Use "Gauge" for other values that don't necessitate accuracy over time (gauges are sampled only every N seconds when Prometheus fetches the metric).
// +kubebuilder:validation:Enum:="Counter";"Histogram"
// +required
Type MetricType `json:"type"`
Expand Down
3 changes: 2 additions & 1 deletion bundle/manifests/flows.netobserv.io_flowmetrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,10 @@ spec:
type: object
type:
description: |-
Metric type: "Counter" or "Histogram".
Metric type: "Counter", "Histogram" or "Gauge".
Use "Counter" for any value that increases over time and on which you can compute a rate, such as Bytes or Packets.
Use "Histogram" for any value that must be sampled independently, such as latencies.
Use "Gauge" for other values that don't necessitate accuracy over time (gauges are sampled only every N seconds when Prometheus fetches the metric).
enum:
- Counter
- Histogram
Expand Down
3 changes: 2 additions & 1 deletion config/crd/bases/flows.netobserv.io_flowmetrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,10 @@ spec:
type: object
type:
description: |-
Metric type: "Counter" or "Histogram".
Metric type: "Counter", "Histogram" or "Gauge".
Use "Counter" for any value that increases over time and on which you can compute a rate, such as Bytes or Packets.
Use "Histogram" for any value that must be sampled independently, such as latencies.
Use "Gauge" for other values that don't necessitate accuracy over time (gauges are sampled only every N seconds when Prometheus fetches the metric).
enum:
- Counter
- Histogram
Expand Down
14 changes: 9 additions & 5 deletions controllers/flp/flp_pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (b *PipelineBuilder) AddProcessorStages() error {
// Custom filters
filters := filtersToFLP(b.desired.Processor.Filters, flowslatest.FLPFilterTargetAll)
if len(filters) > 0 {
nextStage = nextStage.TransformFilter("filters", api.TransformFilter{Rules: filters})
nextStage = nextStage.TransformFilter("filters", newTransformFilter(filters))
}

// Dedup stage
Expand Down Expand Up @@ -260,7 +260,7 @@ func (b *PipelineBuilder) AddProcessorStages() error {
// Custom filters: Loki only
filters := filtersToFLP(b.desired.Processor.Filters, flowslatest.FLPFilterTargetLoki)
if len(filters) > 0 {
lokiStage = lokiStage.TransformFilter("filters-loki", api.TransformFilter{Rules: filters})
lokiStage = lokiStage.TransformFilter("filters-loki", newTransformFilter(filters))
}

lokiWrite := api.WriteLoki{
Expand Down Expand Up @@ -348,7 +348,7 @@ func (b *PipelineBuilder) AddProcessorStages() error {
// Custom filters: Metrics only
filters := filtersToFLP(b.desired.Processor.Filters, flowslatest.FLPFilterTargetMetrics)
if len(filters) > 0 {
promStage = promStage.TransformFilter("filters-prom", api.TransformFilter{Rules: filters})
promStage = promStage.TransformFilter("filters-prom", newTransformFilter(filters))
}

// prometheus stage (encode) configuration
Expand All @@ -363,12 +363,16 @@ func (b *PipelineBuilder) AddProcessorStages() error {
// Custom filters: Exporters only
filters = filtersToFLP(b.desired.Processor.Filters, flowslatest.FLPFilterTargetExporters)
if len(filters) > 0 {
expStage = expStage.TransformFilter("filters-exp", api.TransformFilter{Rules: filters})
expStage = expStage.TransformFilter("filters-exp", newTransformFilter(filters))
}
err := b.addCustomExportStages(&expStage, flpMetrics)
return err
}

// newTransformFilter wraps the given rules in an api.TransformFilter whose
// SamplingField is set to "Sampling", so that every filter stage created
// through this helper shares the same sampling field name.
func newTransformFilter(rules []api.TransformFilterRule) api.TransformFilter {
	filter := api.TransformFilter{Rules: rules}
	filter.SamplingField = "Sampling"
	return filter
}

func filtersToFLP(in []flowslatest.FLPFilterSet, target flowslatest.FLPFilterTarget) []api.TransformFilterRule {
var rules []api.TransformFilterRule
for _, f := range in {
Expand Down Expand Up @@ -576,7 +580,7 @@ func (b *PipelineBuilder) addTransformFilter(lastStage config.PipelineBuilderSta
clusterName = b.desired.Processor.ClusterName
} else {
// Take clustername from openshift
clusterName = string(b.clusterID)
clusterName = b.clusterID
}
if clusterName != "" {
transformFilterRules = []api.TransformFilterRule{
Expand Down
4 changes: 2 additions & 2 deletions controllers/flp/flp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,8 @@ func TestDeploymentChanged(t *testing.T) {
second := b.deployment(annotate(digest))

report := helper.NewChangeReport("")
checkChanged := func(old, new *appsv1.Deployment, spec flowslatest.FlowCollectorSpec) bool {
return helper.DeploymentChanged(old, new, constants.FLPName, !helper.HPAEnabled(&spec.Processor.KafkaConsumerAutoscaler), *spec.Processor.KafkaConsumerReplicas, &report)
checkChanged := func(old, newd *appsv1.Deployment, spec flowslatest.FlowCollectorSpec) bool {
return helper.DeploymentChanged(old, newd, constants.FLPName, !helper.HPAEnabled(&spec.Processor.KafkaConsumerAutoscaler), *spec.Processor.KafkaConsumerReplicas, &report)
}

assert.True(checkChanged(first, second, cfg))
Expand Down
5 changes: 3 additions & 2 deletions docs/FlowMetric.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ To check the cardinality of all NetObserv metrics, run as `promql`: `count({__na
<td><b>type</b></td>
<td>enum</td>
<td>
Metric type: "Counter" or "Histogram".
Metric type: "Counter", "Histogram" or "Gauge".
Use "Counter" for any value that increases over time and on which you can compute a rate, such as Bytes or Packets.
Use "Histogram" for any value that must be sampled independently, such as latencies.<br/>
Use "Histogram" for any value that must be sampled independently, such as latencies.
Use "Gauge" for other values that don't necessitate accuracy over time (gauges are sampled only every N seconds when Prometheus fetches the metric).<br/>
<br/>
<i>Enum</i>: Counter, Histogram<br/>
</td>
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,5 @@ require (
sigs.k8s.io/randfill v1.0.0 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.7.0 // indirect
)

replace github.com/netobserv/flowlogs-pipeline => github.com/jotak/flowlogs-pipeline v0.0.0-20250505084625-9f7b3100d4e0
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jotak/flowlogs-pipeline v0.0.0-20250505084625-9f7b3100d4e0 h1:ez3LOe9eOCzKbVH1XVTREydOaCmZSZYD6302gyeFJs0=
github.com/jotak/flowlogs-pipeline v0.0.0-20250505084625-9f7b3100d4e0/go.mod h1:w43fHzPa+/Q6zP5elc6XuqgkWvzZzoduGf8DGb/GbO8=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
Expand All @@ -103,8 +105,6 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/netobserv/flowlogs-pipeline v1.9.0-crc0.0.20250502080315-db59695ae6de h1:Tsjw77SH/RGvho8QL4A4yTuZyidrDBRS2jFL8iBItxc=
github.com/netobserv/flowlogs-pipeline v1.9.0-crc0.0.20250502080315-db59695ae6de/go.mod h1:w43fHzPa+/Q6zP5elc6XuqgkWvzZzoduGf8DGb/GbO8=
github.com/netobserv/netobserv-ebpf-agent v1.9.0-crc0.0.20250425123944-0cd2534ae7b0 h1:b+bhl7w1okr0CBFkSHDKrFjqyCdEza5dUGGaCIwI1PY=
github.com/netobserv/netobserv-ebpf-agent v1.9.0-crc0.0.20250425123944-0cd2534ae7b0/go.mod h1:yPu9nOJ4iDXPOt4df4mZSfrxpGeeKTirUwMDXfwXu/Q=
github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus=
Expand Down
3 changes: 2 additions & 1 deletion helm/templates/flows.netobserv.io_flowmetrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,10 @@ spec:
type: object
type:
description: |-
Metric type: "Counter" or "Histogram".
Metric type: "Counter", "Histogram" or "Gauge".
Use "Counter" for any value that increases over time and on which you can compute a rate, such as Bytes or Packets.
Use "Histogram" for any value that must be sampled independently, such as latencies.
Use "Gauge" for other values that don't necessitate accuracy over time (gauges are sampled only every N seconds when Prometheus fetches the metric).
enum:
- Counter
- Histogram
Expand Down
2 changes: 1 addition & 1 deletion pkg/dashboards/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func CreateHealthDashboard(netobsNs, nsFlowsMetric string) (string, error) {
d.Rows = append(d.Rows, NewRow("", false, "100px", []Panel{
NewPanel("Flows per second", metricslatest.ChartTypeSingleStat, "", 3, NewTarget(
`sum(rate(netobserv_ingest_flows_processed[1m]))`, "")),
NewPanel("Sampling", metricslatest.ChartTypeSingleStat, "", 3, NewTarget(
NewPanel("Global sampling", metricslatest.ChartTypeSingleStat, "", 3, NewTarget(
"avg(netobserv_agent_sampling_rate)", "")),
NewPanel("Errors last minute", metricslatest.ChartTypeSingleStat, "", 3, NewTarget(
`(sum(increase(netobserv_agent_errors_total{severity!="low"}[1m])) OR on() vector(0))
Expand Down
31 changes: 31 additions & 0 deletions pkg/metrics/predefined_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
DefaultIncludeList = []string{
"node_ingress_bytes_total",
"node_egress_bytes_total",
"workload_sampling",
"workload_ingress_bytes_total",
"workload_egress_bytes_total",
"namespace_flows_total",
Expand All @@ -51,6 +52,7 @@ var (
"node_egress_bytes_total",
"workload_ingress_bytes_total",
"workload_egress_bytes_total",
"workload_sampling",
"workload_ingress_packets_total",
"workload_egress_packets_total",
"workload_flows_total",
Expand Down Expand Up @@ -93,6 +95,16 @@ func init() {
})
}
}
// Sampling
predefinedMetrics = append(predefinedMetrics, taggedMetricDefinition{
FlowMetricSpec: metricslatest.FlowMetricSpec{
MetricName: fmt.Sprintf("%s_sampling", groupTrimmed),
Type: metricslatest.GaugeMetric,
ValueField: "Sampling",
Labels: labels,
},
tags: []string{group, "sampling"},
})
// Flows metrics
predefinedMetrics = append(predefinedMetrics, taggedMetricDefinition{
FlowMetricSpec: metricslatest.FlowMetricSpec{
Expand Down Expand Up @@ -283,6 +295,9 @@ func GetIncludeList(spec *flowslatest.FlowCollectorSpec) []string {
if !helper.IsNetworkEventsEnabled(&spec.Agent.EBPF) {
list = removeMetricsByPattern(list, "_network_policy_")
}
if !hasFiltersSampling(spec) {
list = removeMetricsByPattern(list, "_sampling")
}
return list
}

Expand Down Expand Up @@ -339,3 +354,19 @@ func MergePredefined(fm []metricslatest.FlowMetric, fc *flowslatest.FlowCollecto
predefined := GetDefinitions(fc, false)
return append(predefined, fm...)
}

// hasFiltersSampling reports whether at least one filter in the given spec
// declares a sampling value greater than 1. It looks first at the eBPF agent
// flow filter rules (when a flow filter is set), then at the processor filters.
func hasFiltersSampling(fc *flowslatest.FlowCollectorSpec) bool {
	if flowFilter := fc.Agent.EBPF.FlowFilter; flowFilter != nil {
		for i := range flowFilter.Rules {
			sampling := flowFilter.Rules[i].Sampling
			// Rules without an explicit sampling value are ignored.
			if sampling == nil {
				continue
			}
			if *sampling > 1 {
				return true
			}
		}
	}
	for i := range fc.Processor.Filters {
		if fc.Processor.Filters[i].Sampling > 1 {
			return true
		}
	}
	return false
}
3 changes: 3 additions & 0 deletions pkg/metrics/predefined_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ func TestIncludeExclude(t *testing.T) {
res := GetAsIncludeList([]string{"egress", "packets", "flows"}, nil)
assert.Equal([]flowslatest.FLPMetric{
"node_ingress_bytes_total",
"node_sampling",
"node_rtt_seconds",
"node_drop_bytes_total",
"node_dns_latency_seconds",
"node_network_policy_events_total",
"namespace_ingress_bytes_total",
"namespace_sampling",
"namespace_rtt_seconds",
"namespace_drop_bytes_total",
"namespace_dns_latency_seconds",
"namespace_network_policy_events_total",
"workload_ingress_bytes_total",
"workload_sampling",
"workload_rtt_seconds",
"workload_drop_bytes_total",
"workload_dns_latency_seconds",
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ github.com/munnerz/goautoneg
# github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
## explicit
github.com/mwitkow/go-conntrack
# github.com/netobserv/flowlogs-pipeline v1.9.0-crc0.0.20250502080315-db59695ae6de
# github.com/netobserv/flowlogs-pipeline v1.9.0-crc0.0.20250502080315-db59695ae6de => github.com/jotak/flowlogs-pipeline v0.0.0-20250505084625-9f7b3100d4e0
## explicit; go 1.23.0
github.com/netobserv/flowlogs-pipeline/pkg/api
github.com/netobserv/flowlogs-pipeline/pkg/config
Expand Down Expand Up @@ -1215,3 +1215,4 @@ sigs.k8s.io/structured-merge-diff/v4/value
## explicit; go 1.12
sigs.k8s.io/yaml
sigs.k8s.io/yaml/goyaml.v2
# github.com/netobserv/flowlogs-pipeline => github.com/jotak/flowlogs-pipeline v0.0.0-20250505084625-9f7b3100d4e0