1
1
package coordinator_test
2
2
3
3
import (
4
- << << << < HEAD :v1 / coordinator / points_writer_test.go
5
4
"context"
6
- == == == =
7
5
"errors"
8
- >> >> >> > 62e803 e673 (feat : improve dropped point logging (#26257 )):coordinator / points_writer_test .go
9
6
"fmt"
7
+ "github.com/stretchr/testify/require"
10
8
"sync"
11
9
"sync/atomic"
12
10
"testing"
@@ -55,85 +53,6 @@ func TestPointsWriter_MapShards_One(t *testing.T) {
55
53
}
56
54
}
57
55
58
- << << << < HEAD:v1 / coordinator / points_writer_test .go
59
- == == == =
60
- func TestPointsWriter_MapShards_WriteLimits (t * testing.T ) {
61
- ms := PointsWriterMetaClient {}
62
- c := coordinator .NewPointsWriter ()
63
-
64
- MustParseDuration := func (s string ) time.Duration {
65
- d , err := time .ParseDuration (s )
66
- require .NoError (t , err , "failed to parse duration: %q" , s )
67
- return d
68
- }
69
-
70
- pastWriteLimit := MustParseDuration ("10m" )
71
- futureWriteLimit := MustParseDuration ("15m" )
72
- rp := NewRetentionPolicy ("myp" , time .Now ().Add (- time .Minute * 45 ), 3 * time .Hour , 3 , futureWriteLimit , pastWriteLimit )
73
-
74
- ms .NodeIDFn = func () uint64 { return 1 }
75
- ms .RetentionPolicyFn = func (db , retentionPolicy string ) (* meta.RetentionPolicyInfo , error ) {
76
- return rp , nil
77
- }
78
-
79
- ms .CreateShardGroupIfNotExistsFn = func (database , policy string , timestamp time.Time ) (* meta.ShardGroupInfo , error ) {
80
- return & rp .ShardGroups [0 ], nil
81
- }
82
-
83
- c .MetaClient = ms
84
-
85
- pr := & coordinator.WritePointsRequest {
86
- Database : "mydb" ,
87
- RetentionPolicy : "myrp" ,
88
- }
89
-
90
- pr .AddPoint ("cpu" , 0.0 , time .Now (), nil )
91
- pr .AddPoint ("cpu" , 1.0 , time .Now ().Add (time .Second ), nil )
92
- pr .AddPoint ("cpu" , 2.0 , time .Now ().Add (time .Minute * 30 ), nil )
93
- pr .AddPoint ("cpu" , - 1.0 , time .Now ().Add (- time .Minute * 5 ), nil )
94
- pr .AddPoint ("cpu" , - 2.0 , time .Now ().Add (- time .Minute * 20 ), nil )
95
-
96
- values := []float64 {0.0 , 1.0 , - 1.0 }
97
-
98
- MapPoints (t , c , pr , values , 2 ,
99
- & coordinator.DroppedPoint {Point : pr .Points [4 ], Reason : coordinator .WriteWindowLowerBound },
100
- & coordinator.DroppedPoint {Point : pr .Points [2 ], Reason : coordinator .WriteWindowUpperBound },
101
- "dropped 0 points outside retention policy of duration 3h0m0s and 2 points outside write window (-10m0s to 15m0s) -" )
102
-
103
- // Clear the write limits by setting them to zero
104
- // No points should be dropped
105
- zeroDuration := time .Duration (0 )
106
- rpu := & meta.RetentionPolicyUpdate {
107
- Name : nil ,
108
- Duration : nil ,
109
- ReplicaN : nil ,
110
- ShardGroupDuration : nil ,
111
- FutureWriteLimit : & zeroDuration ,
112
- PastWriteLimit : & zeroDuration ,
113
- }
114
- require .NoError (t , meta .ApplyRetentionUpdate (rpu , rp ), "ApplyRetentionUpdate failed" )
115
- values = []float64 {0.0 , 1.0 , 2.0 , - 1.0 , - 2.0 }
116
- MapPoints (t , c , pr , values , 0 , nil , nil , "dropped 0 points outside retention policy of duration 3h0m0s -" )
117
-
118
- rpu .SetFutureWriteLimit (futureWriteLimit )
119
- require .NoError (t , meta .ApplyRetentionUpdate (rpu , rp ), "ApplyRetentionUpdate failed" )
120
- values = []float64 {0.0 , 1.0 , - 1.0 , - 2.0 }
121
- MapPoints (t , c , pr , values , 1 ,
122
- & coordinator.DroppedPoint {Point : pr .Points [2 ], Reason : coordinator .WriteWindowUpperBound },
123
- & coordinator.DroppedPoint {Point : pr .Points [2 ], Reason : coordinator .WriteWindowUpperBound },
124
- "dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (15m0s) -" )
125
-
126
- rpu .SetFutureWriteLimit (zeroDuration )
127
- rpu .SetPastWriteLimit (pastWriteLimit )
128
- require .NoError (t , meta .ApplyRetentionUpdate (rpu , rp ), "ApplyRetentionUpdate failed" )
129
- values = []float64 {0.0 , 1.0 , 2.0 , - 1.0 }
130
- MapPoints (t , c , pr , values , 1 ,
131
- & coordinator.DroppedPoint {Point : pr .Points [4 ], Reason : coordinator .WriteWindowLowerBound },
132
- & coordinator.DroppedPoint {Point : pr .Points [4 ], Reason : coordinator .WriteWindowLowerBound },
133
- "dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (-10m0s) -" )
134
-
135
- }
136
-
137
56
func MapPoints (t * testing.T , c * coordinator.PointsWriter , pr * coordinator.WritePointsRequest , values []float64 , droppedCount int , minDropped * coordinator.DroppedPoint , maxDropped * coordinator.DroppedPoint , summary string ) {
138
57
var (
139
58
shardMappings * coordinator.ShardMapping
@@ -175,7 +94,6 @@ func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WriteP
175
94
}
176
95
}
177
96
178
- >> >> >> > 62e803 e673 (feat : improve dropped point logging (#26257 )):coordinator / points_writer_test .go
179
97
// Ensures the points writer maps to a new shard group when the shard duration
180
98
// is changed.
181
99
func TestPointsWriter_MapShards_AlterShardDuration (t * testing.T ) {
@@ -415,8 +333,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
415
333
pr .AddPoint ("cpu" , 3.0 , time .Now ().Add (time .Hour + time .Second ), nil )
416
334
417
335
// copy to prevent data race
418
- << << << < HEAD:v1 / coordinator / points_writer_test .go
419
- sm := coordinator .NewShardMapping (16 )
336
+ sm := coordinator .NewShardMapping (nil , 16 )
420
337
sm .MapPoint (
421
338
& meta.ShardInfo {ID : uint64 (1 ), Owners : []meta.ShardOwner {
422
339
{NodeID : 1 },
@@ -438,25 +355,6 @@ func TestPointsWriter_WritePoints(t *testing.T) {
438
355
{NodeID : 3 },
439
356
}},
440
357
pr .Points [2 ])
441
- == == == =
442
- theTest := test
443
- sm := coordinator .NewShardMapping (nil , 16 )
444
- sm .MapPoint (& meta.ShardInfo {ID : uint64 (1 ), Owners : []meta.ShardOwner {
445
- {NodeID : 1 },
446
- {NodeID : 2 },
447
- {NodeID : 3 },
448
- }}, pr .Points [0 ])
449
- sm .MapPoint (& meta.ShardInfo {ID : uint64 (2 ), Owners : []meta.ShardOwner {
450
- {NodeID : 1 },
451
- {NodeID : 2 },
452
- {NodeID : 3 },
453
- }}, pr .Points [1 ])
454
- sm .MapPoint (& meta.ShardInfo {ID : uint64 (2 ), Owners : []meta.ShardOwner {
455
- {NodeID : 1 },
456
- {NodeID : 2 },
457
- {NodeID : 3 },
458
- }}, pr .Points [2 ])
459
- >> >> >> > 62e803 e673 (feat : improve dropped point logging (#26257 )):coordinator / points_writer_test .go
460
358
461
359
// Local coordinator.Node ShardWriter
462
360
// lock on the write increment since these functions get called in parallel
@@ -537,15 +435,10 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
537
435
require .NoError (t , pw .Close (), "failure closing PointsWriter" )
538
436
}(c )
539
437
540
- << << << < HEAD:v1 / coordinator / points_writer_test .go
541
438
err := c .WritePointsPrivileged (context .Background (), pr .Database , pr .RetentionPolicy , models .ConsistencyLevelOne , pr .Points )
542
- if _ , ok := err .(tsdb.PartialWriteError ); ! ok {
543
- == == == =
544
- err := c .WritePointsPrivileged (tsdb.WriteContext {}, pr .Database , pr .RetentionPolicy , models .ConsistencyLevelOne , pr .Points )
545
439
require .Error (t , err , "unexpected success writing points" )
546
440
var pwErr tsdb.PartialWriteError
547
441
if ! errors .As (err , & pwErr ) {
548
- >> >> >> > 62e803 e673 (feat : improve dropped point logging (#26257 )):coordinator / points_writer_test .go
549
442
t .Errorf ("PointsWriter.WritePoints(): got %v, exp %v" , err , tsdb.PartialWriteError {})
550
443
}
551
444
require .Equal (t , 1 , pwErr .Dropped , "wrong number of points dropped" )
0 commit comments