Skip to content

handle conflits in prw v2 #39570

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/handle-conflits.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: 'pkg/translator/prometheusremotewrite'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: 'Handle conflicts in PRW v2'

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33661]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
52 changes: 47 additions & 5 deletions pkg/translator/prometheusremotewrite/metrics_to_prw_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@ func FromMetricsV2(md pmetric.Metrics, settings Settings) (map[string]*writev2.T

// prometheusConverterV2 converts from OTLP to Prometheus write 2.0 format.
type prometheusConverterV2 struct {
// TODO handle conflicts
unique map[uint64]*writev2.TimeSeries
unique map[uint64]*writev2.TimeSeries
// conflicts is a map of time series signatures(an unique identifier for TS labels) to a list of TSs with the same signature
// this is used to handle conflicts, that occur when multiple TSs have the same labels
conflicts map[uint64][]*writev2.TimeSeries
symbolTable writev2.SymbolsTable
}

func newPrometheusConverterV2() *prometheusConverterV2 {
return &prometheusConverterV2{
unique: map[uint64]*writev2.TimeSeries{},
conflicts: map[uint64][]*writev2.TimeSeries{},
symbolTable: writev2.NewSymbolTable(),
}
}
Expand Down Expand Up @@ -109,10 +112,19 @@ func (c *prometheusConverterV2) fromMetrics(md pmetric.Metrics, settings Setting

// timeSeries returns a slice of the writev2.TimeSeries that were converted from OTel format.
func (c *prometheusConverterV2) timeSeries() []writev2.TimeSeries {
allTS := make([]writev2.TimeSeries, 0, len(c.unique))
conflicts := 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would be a bit more efficient to keep the number of conflicts as a running tally in prometheusConverterV2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3135279

I'm also updated the comment over the conflict map, because I hadn't thought that we can handle with hash conflits.. Like you share with my on slack

for _, ts := range c.conflicts {
conflicts += len(ts)
}
allTS := make([]writev2.TimeSeries, 0, len(c.unique)+conflicts)
for _, ts := range c.unique {
allTS = append(allTS, *ts)
}
for _, cTS := range c.conflicts {
for _, ts := range cTS {
allTS = append(allTS, *ts)
}
}
return allTS
}

Expand All @@ -131,9 +143,39 @@ func (c *prometheusConverterV2) addSample(sample *writev2.Sample, lbls []prompb.
off = c.symbolTable.Symbolize(l.Value)
buf = append(buf, off)
}
ts := writev2.TimeSeries{

sig := timeSeriesSignature(lbls)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a future PR: timeSeriesSignature also does a sort by label name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I added it here because we are triggering a flaky test. I added more details here. I was trying to debug it with @dashpole before his leaving. This is a TODO task to me figure out how to fix🙃

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should here follow the same logic as in prometheusConverter.addSample, and use a method getOrCreateTimeSeries to obtain the time series to add the sample to. Creating a new time series for every sample even if the time series already exists is bad for performance.

ts := &writev2.TimeSeries{
LabelsRefs: buf,
Samples: []writev2.Sample{*sample},
}
c.unique[timeSeriesSignature(lbls)] = &ts

// check if the time series is already in the unique map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is superfluous since it only replicates what the code does:

Suggested change
// check if the time series is already in the unique map

if existingTS, ok := c.unique[sig]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Nit] I think it's better to follow the more robust approach of the PRW v1 code, and check whether existingTS is nil because you then kill two birds with one stone (you check both whether existingTS is nil and whether it exists in the map):

Suggested change
if existingTS, ok := c.unique[sig]; ok {
if existingTS := c.unique[sig]; existingTS != nil {

// if the time series is already in the unique map, check if it is the same metric
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is also superfluous.

Suggested change
// if the time series is already in the unique map, check if it is the same metric

if !isSameMetricV2(existingTS, ts) {
// if the time series is not the same metric, add it to the conflicts map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment also replicates what the code does, it will just make maintenance more difficult.

Suggested change
// if the time series is not the same metric, add it to the conflicts map

c.conflicts[sig] = append(c.conflicts[sig], ts)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bug since you're not checking whether c.conflicts[sig] already has a time series with the same labels.

} else {
// if the time series is the same metric, add the sample to the existing time series
existingTS.Samples = append(existingTS.Samples, *sample)
}
} else {
// if the time series is not in the unique map, add it to the unique map
c.unique[sig] = ts
}
}

// isSameMetricV2 checks if two time series are the same metric
func isSameMetricV2(ts1, ts2 *writev2.TimeSeries) bool {
if len(ts1.LabelsRefs) != len(ts2.LabelsRefs) {
return false
}
// As the labels are sorted as name, value, name, value, ... we can compare the labels by index jumping 2 steps at a time
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz update the comment on (new) line 134:

// TODO: Read the PRW spec to see if labels need to be sorted. If it is, then we need to sort in export code. If not, we can sort in the test. (@dashpole have more context on this)

since we're going to depend on this, it should say something like "We need to sort labels for..."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't know if I got it. Here we are comparing the LabelsRefs from different TSs, that are a list of integer arranged to represent: name, value, name, value....

The sort that we are doing on L132-L135 is related to sort the []prompb.Label. The confusion here was made because I used the sort word?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the sort word is a bit ambiguous here, let's say

Suggested change
// As the labels are sorted as name, value, name, value, ... we can compare the labels by index jumping 2 steps at a time
// As the labels are ordered as name, value, name, value, ... we can compare the labels by index jumping 2 steps at a time

But anyway, isSameMetricV2 will only work correctly if the order of labels is consistent, since otherwise it will say "not the same" for {a="1", b="2"} vs {b="2", a="1"} label sets.

On lines 132-135 we're sorting the labels []prompb.Label , but then we're converting these labels into the references on lines 137-143. So the order of the references is the same as the order of the sorted labels. Which means that we ensure consistent order for isSameMetricV2 with the sort on 132-135.

I did take a look at where the labels come from and it's createAttributes which will return consistent ordering probably - config changes not withstanding. So we could probably get away without making the sort, but I feel like that's brittle.

for i := 0; i < len(ts1.LabelsRefs); i += 2 {
if ts1.LabelsRefs[i] != ts2.LabelsRefs[i] || ts1.LabelsRefs[i+1] != ts2.LabelsRefs[i+1] {
return false
}
}
return true
}
33 changes: 33 additions & 0 deletions pkg/translator/prometheusremotewrite/metrics_to_prw_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,36 @@ func TestFromMetricsV2(t *testing.T) {
require.ElementsMatch(t, want, slices.Collect(maps.Values(tsMap)))
require.ElementsMatch(t, wantedSymbols, symbolsTable.Symbols())
}

func TestIsSameMetricV2(t *testing.T) {
tests := []struct {
name string
ts1 *writev2.TimeSeries
ts2 *writev2.TimeSeries
same bool
}{
{
name: "same",
same: true,
ts1: &writev2.TimeSeries{
LabelsRefs: []uint32{1, 2, 3, 4},
},
ts2: &writev2.TimeSeries{
LabelsRefs: []uint32{1, 2, 3, 4},
},
},
{
name: "different",
same: false,
ts1: &writev2.TimeSeries{
LabelsRefs: []uint32{1, 2, 3, 4},
},
ts2: &writev2.TimeSeries{
LabelsRefs: []uint32{1, 2, 3, 5},
},
},
}
for _, test := range tests {
require.Equal(t, test.same, isSameMetricV2(test.ts1, test.ts2))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ func TestPrometheusConverterV2_addGaugeNumberDataPoints(t *testing.T) {
}
}

// Right now we are not handling duplicates, the second one will just overwrite the first one as this test case shows
// In follow-up PRs we plan to start handling conflicts and this test will be updated to reflect the new behavior.
func TestPrometheusConverterV2_addGaugeNumberDataPointsDuplicate(t *testing.T) {
ts := uint64(time.Now().UnixNano())
metric1 := getIntGaugeMetric(
Expand All @@ -138,21 +136,23 @@ func TestPrometheusConverterV2_addGaugeNumberDataPointsDuplicate(t *testing.T) {
pcommon.NewMap(),
2, ts,
)
want := func() map[uint64]*writev2.TimeSeries {
labels := labels.Labels{

want := map[uint64]*writev2.TimeSeries{
labels.Labels{
labels.Label{
Name: labels.MetricName,
Value: "test",
},
}
return map[uint64]*writev2.TimeSeries{
labels.Hash(): {
LabelsRefs: []uint32{1, 2},
Samples: []writev2.Sample{
{Timestamp: convertTimeStamp(pcommon.Timestamp(ts)), Value: 2},
},
},
}
}.Hash(): {
LabelsRefs: []uint32{1, 2},
Samples: []writev2.Sample{{
Timestamp: convertTimeStamp(pcommon.Timestamp(ts)),
Value: 1,
}, {
Timestamp: convertTimeStamp(pcommon.Timestamp(ts)),
Value: 2,
}},
},
}

settings := Settings{
Expand All @@ -166,5 +166,5 @@ func TestPrometheusConverterV2_addGaugeNumberDataPointsDuplicate(t *testing.T) {
converter.addGaugeNumberDataPoints(metric1.Gauge().DataPoints(), pcommon.NewResource(), settings, metric1.Name())
converter.addGaugeNumberDataPoints(metric2.Gauge().DataPoints(), pcommon.NewResource(), settings, metric2.Name())

assert.Equal(t, want(), converter.unique)
assert.Equal(t, want, converter.unique)
}
Loading