-
Notifications
You must be signed in to change notification settings - Fork 2.7k
handle conflits in prw v2 #39570
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
handle conflits in prw v2 #39570
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: 'enhancement' | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: 'pkg/translator/prometheusremotewrite' | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: 'Handle conflicts in PRW v2' | ||
|
||
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
issues: [33661] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: | ||
|
||
# If your change doesn't affect end users or the exported elements of any package, | ||
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [user] |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -33,14 +33,19 @@ func FromMetricsV2(md pmetric.Metrics, settings Settings) (map[string]*writev2.T | |||||
|
||||||
// prometheusConverterV2 converts from OTLP to Prometheus write 2.0 format. | ||||||
type prometheusConverterV2 struct { | ||||||
// TODO handle conflicts | ||||||
unique map[uint64]*writev2.TimeSeries | ||||||
symbolTable writev2.SymbolsTable | ||||||
unique map[uint64]*writev2.TimeSeries | ||||||
// conflicts is a map of time series signatures(an unique identifier for TS labels) to a list of TSs with the same signature. | ||||||
// this is used to handle conflicts that occur when multiple TSs have the same labels or when different labels generate the same signature. | ||||||
conflicts map[uint64][]*writev2.TimeSeries | ||||||
// conflictCount is used to track the number of conflicts that were encountered. | ||||||
conflictCount int | ||||||
symbolTable writev2.SymbolsTable | ||||||
} | ||||||
|
||||||
func newPrometheusConverterV2() *prometheusConverterV2 { | ||||||
return &prometheusConverterV2{ | ||||||
unique: map[uint64]*writev2.TimeSeries{}, | ||||||
conflicts: map[uint64][]*writev2.TimeSeries{}, | ||||||
symbolTable: writev2.NewSymbolTable(), | ||||||
} | ||||||
} | ||||||
|
@@ -109,10 +114,15 @@ func (c *prometheusConverterV2) fromMetrics(md pmetric.Metrics, settings Setting | |||||
|
||||||
// timeSeries returns a slice of the writev2.TimeSeries that were converted from OTel format. | ||||||
func (c *prometheusConverterV2) timeSeries() []writev2.TimeSeries { | ||||||
allTS := make([]writev2.TimeSeries, 0, len(c.unique)) | ||||||
allTS := make([]writev2.TimeSeries, 0, len(c.unique)+c.conflictCount) | ||||||
for _, ts := range c.unique { | ||||||
allTS = append(allTS, *ts) | ||||||
} | ||||||
for _, cTS := range c.conflicts { | ||||||
for _, ts := range cTS { | ||||||
allTS = append(allTS, *ts) | ||||||
} | ||||||
} | ||||||
return allTS | ||||||
} | ||||||
|
||||||
|
@@ -131,9 +141,40 @@ func (c *prometheusConverterV2) addSample(sample *writev2.Sample, lbls []prompb. | |||||
off = c.symbolTable.Symbolize(l.Value) | ||||||
buf = append(buf, off) | ||||||
} | ||||||
ts := writev2.TimeSeries{ | ||||||
|
||||||
sig := timeSeriesSignature(lbls) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For a future PR: timeSeriesSignature also does a sort by label name. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, I added it here because we are triggering a flaky test. I added more details here. I was trying to debug it with @dashpole before his leaving. This is a TODO task to me figure out how to fix🙃 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you should here follow the same logic as in |
||||||
ts := &writev2.TimeSeries{ | ||||||
LabelsRefs: buf, | ||||||
Samples: []writev2.Sample{*sample}, | ||||||
} | ||||||
c.unique[timeSeriesSignature(lbls)] = &ts | ||||||
|
||||||
// check if the time series is already in the unique map | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment is superfluous since it only replicates what the code does:
Suggested change
|
||||||
if existingTS, ok := c.unique[sig]; ok { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Nit] I think it's better to follow the more robust approach of the PRW v1 code, and check whether
Suggested change
|
||||||
// if the time series is already in the unique map, check if it is the same metric | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment is also superfluous.
Suggested change
|
||||||
if !isSameMetricV2(existingTS, ts) { | ||||||
// if the time series is not the same metric, add it to the conflicts map | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment also replicates what the code does, it will just make maintenance more difficult.
Suggested change
|
||||||
c.conflicts[sig] = append(c.conflicts[sig], ts) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a bug since you're not checking whether |
||||||
c.conflictCount++ | ||||||
} else { | ||||||
// if the time series is the same metric, add the sample to the existing time series | ||||||
existingTS.Samples = append(existingTS.Samples, *sample) | ||||||
} | ||||||
} else { | ||||||
// if the time series is not in the unique map, add it to the unique map | ||||||
c.unique[sig] = ts | ||||||
} | ||||||
} | ||||||
|
||||||
// isSameMetricV2 checks if two time series are the same metric | ||||||
func isSameMetricV2(ts1, ts2 *writev2.TimeSeries) bool { | ||||||
if len(ts1.LabelsRefs) != len(ts2.LabelsRefs) { | ||||||
return false | ||||||
} | ||||||
// As the labels are sorted as name, value, name, value, ... we can compare the labels by index jumping 2 steps at a time | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. plz update the comment on (new) line 134:
since we're going to depend on this, it should say something like "We need to sort labels for..." There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, I don't know if I got it. Here we are comparing the LabelsRefs from different TSs, that are a list of integer arranged to represent: name, value, name, value.... The sort that we are doing on L132-L135 is related to sort the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the
Suggested change
But anyway, On lines 132-135 we're sorting the labels I did take a look at where the labels come from and it's createAttributes which will return consistent ordering probably - config changes not withstanding. So we could probably get away without making the sort, but I feel like that's brittle. |
||||||
for i := 0; i < len(ts1.LabelsRefs); i += 2 { | ||||||
if ts1.LabelsRefs[i] != ts2.LabelsRefs[i] || ts1.LabelsRefs[i+1] != ts2.LabelsRefs[i+1] { | ||||||
return false | ||||||
} | ||||||
} | ||||||
return true | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ import ( | |
"testing" | ||
"time" | ||
|
||
"github.com/prometheus/prometheus/prompb" | ||
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/pdata/pcommon" | ||
|
@@ -45,3 +46,95 @@ func TestFromMetricsV2(t *testing.T) { | |
require.ElementsMatch(t, want, slices.Collect(maps.Values(tsMap))) | ||
require.ElementsMatch(t, wantedSymbols, symbolsTable.Symbols()) | ||
} | ||
|
||
func TestIsSameMetricV2(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
ts1 *writev2.TimeSeries | ||
ts2 *writev2.TimeSeries | ||
same bool | ||
}{ | ||
{ | ||
name: "same", | ||
same: true, | ||
ts1: &writev2.TimeSeries{ | ||
LabelsRefs: []uint32{1, 2, 3, 4}, | ||
}, | ||
ts2: &writev2.TimeSeries{ | ||
LabelsRefs: []uint32{1, 2, 3, 4}, | ||
}, | ||
}, | ||
{ | ||
name: "different", | ||
same: false, | ||
ts1: &writev2.TimeSeries{ | ||
LabelsRefs: []uint32{1, 2, 3, 4}, | ||
}, | ||
ts2: &writev2.TimeSeries{ | ||
LabelsRefs: []uint32{1, 2, 3, 5}, | ||
}, | ||
}, | ||
} | ||
for _, test := range tests { | ||
require.Equal(t, test.same, isSameMetricV2(test.ts1, test.ts2)) | ||
} | ||
} | ||
|
||
func TestConflictHandling(t *testing.T) { | ||
// Test 1: No conflicts - different metrics should have different hashes | ||
t.Run("different metrics should not conflict", func(t *testing.T) { | ||
converter := newPrometheusConverterV2() | ||
|
||
metric1 := createSample(1.0, []prompb.Label{ | ||
{Name: "name1", Value: "value1"}, | ||
{Name: "name2", Value: "value2"}, | ||
}) | ||
|
||
metric2 := createSample(2.0, []prompb.Label{ | ||
{Name: "name3", Value: "value3"}, | ||
{Name: "name4", Value: "value4"}, | ||
}) | ||
|
||
converter.addSample(metric1.sample, metric1.labels) | ||
converter.addSample(metric2.sample, metric2.labels) | ||
|
||
require.Equal(t, 0, converter.conflictCount) | ||
require.Len(t, converter.unique, 2) | ||
}) | ||
|
||
// Test 2: Same metric - should be merged | ||
t.Run("same metric should be merged", func(t *testing.T) { | ||
converter := newPrometheusConverterV2() | ||
|
||
labels := []prompb.Label{ | ||
{Name: "name", Value: "value"}, | ||
} | ||
|
||
sample1 := &writev2.Sample{Value: 1.0, Timestamp: 1000} | ||
sample2 := &writev2.Sample{Value: 2.0, Timestamp: 2000} | ||
|
||
converter.addSample(sample1, labels) | ||
converter.addSample(sample2, labels) | ||
|
||
require.Equal(t, 0, converter.conflictCount) | ||
require.Len(t, converter.unique, 1) | ||
require.Len(t, converter.unique[timeSeriesSignature(labels)].Samples, 2) | ||
}) | ||
// TODO: Test 3 Conflict - different metrics with same hash | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that the current set of tests were a little bit poor. Struggling to implement this one... Maybe these that I added could be a good start point. Let me know what you think. |
||
} | ||
|
||
// Helper function to create a sample with labels | ||
type metricSample struct { | ||
sample *writev2.Sample | ||
labels []prompb.Label | ||
} | ||
|
||
func createSample(value float64, labels []prompb.Label) metricSample { | ||
return metricSample{ | ||
sample: &writev2.Sample{ | ||
Value: value, | ||
Timestamp: 1000, | ||
}, | ||
labels: labels, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo in filename, should be "conflicts"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this file is deleted at release time and its contents added to the changelog, it is ok to merge with a typo.