Skip to content

Commit 9b38040

Browse files
authored
Delete by range (#426)
* support delete by range * update partitions max time when needed + tests + debug logs and prints * only delete item if needed + remove logs and unneeded tests * fix possible panic * go fmt * fix race * go fmt * fix hang + debug prints * add ctl validation * recalculate server-side aggregations + make deleting of whole partitions parallel * add maxTime optimization to set as the latest datapoint in the last chunk * delete entire chunk if no samples are left in the chunk after deleting * break update expressions to smaller chunks due to engine limitation * change max time logic in delete * update v3io-go * linter stuff
1 parent 761e919 commit 9b38040

File tree

11 files changed

+1767
-250
lines changed

11 files changed

+1767
-250
lines changed

Diff for: go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ require (
1919
github.com/stretchr/testify v1.4.0
2020
github.com/tinylib/msgp v1.1.1 // indirect
2121
github.com/v3io/frames v0.6.8-v0.9.11
22-
github.com/v3io/v3io-go v0.0.5-0.20191205125653-9003ae83f0b6
22+
github.com/v3io/v3io-go v0.0.7-0.20200216132233-3b52a325296d
2323
github.com/v3io/v3io-go-http v0.0.0-20190415143924-cc2fbcde6663 // indirect
2424
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
2525
google.golang.org/genproto v0.0.0-20181026194446-8b5d7a19e2d9 // indirect

Diff for: go.sum

+3-4
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ github.com/prometheus/prometheus v2.5.0+incompatible h1:7QPitgO2kOFG8ecuRn9O/4L9
7676
github.com/prometheus/prometheus v2.5.0+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
7777
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
7878
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
79+
github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo=
7980
github.com/russross/blackfriday v1.5.2+incompatible h1:/YIL6L1Deczl4O/cQ7ZVdrdKwuB6y7EWpw9LkD8xofE=
8081
github.com/russross/blackfriday v1.5.2+incompatible/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
81-
github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo=
8282
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
8383
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
8484
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
@@ -104,10 +104,9 @@ github.com/v3io/frames v0.6.8-v0.9.11/go.mod h1:V3j8yjzhNNGXjosCBn7Qf8C8jo25Y+7G
104104
github.com/v3io/sqlparser v0.0.0-20190306105200-4d7273501871 h1:myF4tU/HdFWU1UzMdf16cHRbownzsyvL7VKIHqkrSvo=
105105
github.com/v3io/sqlparser v0.0.0-20190306105200-4d7273501871/go.mod h1:QD2Bo64oyTWzeV8RFehXS0hZEDFgOK99/h2a6ErRu6E=
106106
github.com/v3io/v3io-go v0.0.0-20191024084247-042df6b5ee40eb60996ab7f4e74ec9aa07d996c4/go.mod h1:IFb6dJiyvJnOjXUoCoPJ5UViaYjgVYmqJb4fD1qDeLk=
107-
github.com/v3io/v3io-go v0.0.0-20191120130819-9003ae83f0b673afb88b862d8f46dcc818684450 h1:3JMzABqziU+dBO4NCoIGRhI/NGYPd6d6Zug68nTXQkU=
108107
github.com/v3io/v3io-go v0.0.0-20191120130819-9003ae83f0b673afb88b862d8f46dcc818684450/go.mod h1:IFb6dJiyvJnOjXUoCoPJ5UViaYjgVYmqJb4fD1qDeLk=
109-
github.com/v3io/v3io-go v0.0.5-0.20191205125653-9003ae83f0b6 h1:+52DyMCjcWg6uXAlTe0KgbOsiQqUKrtL9tBPSERhyFg=
110-
github.com/v3io/v3io-go v0.0.5-0.20191205125653-9003ae83f0b6/go.mod h1:IFb6dJiyvJnOjXUoCoPJ5UViaYjgVYmqJb4fD1qDeLk=
108+
github.com/v3io/v3io-go v0.0.7-0.20200216132233-3b52a325296d h1:OotbIx7+QYju2DlAAVxWz0QFzBicHLc47u9DJGpVUL4=
109+
github.com/v3io/v3io-go v0.0.7-0.20200216132233-3b52a325296d/go.mod h1:IFb6dJiyvJnOjXUoCoPJ5UViaYjgVYmqJb4fD1qDeLk=
111110
github.com/v3io/v3io-go-http v0.0.0-20190221115935-53e2b487c9a2 h1:NJc63wM25iS+ci5z7LVwjWD4QM0QpTQw/fovKzatss0=
112111
github.com/v3io/v3io-go-http v0.0.0-20190221115935-53e2b487c9a2/go.mod h1:GXYcR9MxgfbE3BJdkXki5EclvtS8Nxu2RQNLA8hMMog=
113112
github.com/v3io/v3io-go-http v0.0.0-20190415143924-cc2fbcde6663 h1:WZcM/GRBAastacksmv5pODbtr8fJ/0/9EsPDpPfXkRk=

Diff for: pkg/config/config.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,16 @@ const (
6262
DefaultUseServerAggregateCoefficient = 3
6363

6464
// KV attribute names
65-
MaxTimeAttrName = "_maxtime"
66-
LabelSetAttrName = "_lset"
67-
EncodingAttrName = "_enc"
68-
OutOfOrderAttrName = "_ooo"
69-
MetricNameAttrName = "_name"
70-
ObjectNameAttrName = "__name"
71-
ChunkAttrPrefix = "_v"
72-
AggregateAttrPrefix = "_v_"
65+
MaxTimeAttrName = "_maxtime"
66+
LabelSetAttrName = "_lset"
67+
EncodingAttrName = "_enc"
68+
OutOfOrderAttrName = "_ooo"
69+
MetricNameAttrName = "_name"
70+
ObjectNameAttrName = "__name"
71+
ChunkAttrPrefix = "_v"
72+
AggregateAttrPrefix = "_v_"
73+
MtimeSecsAttributeName = "__mtime_secs"
74+
MtimeNSecsAttributeName = "__mtime_nsecs"
7375

7476
PrometheusMetricNameAttribute = "__name__"
7577

Diff for: pkg/partmgr/partmgr.go

+60-2
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func (p *PartitionManager) updateSchema() error {
198198
}
199199

200200
input := &v3io.PutItemInput{Path: schemaFilePath, Attributes: attributes}
201-
err := p.container.PutItemSync(input)
201+
_, err := p.container.PutItemSync(input)
202202

203203
if err != nil {
204204
outerError = errors.Wrap(err, "failed to update partitions table.")
@@ -238,7 +238,7 @@ func (p *PartitionManager) DeletePartitionsFromSchema(partitionsToDelete []*DBPa
238238
deletePartitionExpression.WriteString(");")
239239
}
240240
expression := deletePartitionExpression.String()
241-
err := p.container.UpdateItemSync(&v3io.UpdateItemInput{Path: p.GetSchemaFilePath(), Expression: &expression})
241+
_, err := p.container.UpdateItemSync(&v3io.UpdateItemInput{Path: p.GetSchemaFilePath(), Expression: &expression})
242242
if err != nil {
243243
return err
244244
}
@@ -592,6 +592,33 @@ func (p *DBPartition) Time2Bucket(t int64) int {
592592
return int((t - p.startTime) / p.rollupTime)
593593
}
594594

595+
// Return the start time of an aggregation bucket by id
596+
func (p *DBPartition) GetAggregationBucketStartTime(id int) int64 {
597+
return p.startTime + int64(id)*p.rollupTime
598+
}
599+
600+
// Return the end time of an aggregation bucket by id
601+
func (p *DBPartition) GetAggregationBucketEndTime(id int) int64 {
602+
return p.startTime + int64(id+1)*p.rollupTime - 1
603+
}
604+
605+
func (p *DBPartition) Times2BucketRange(start, end int64) []int {
606+
var buckets []int
607+
608+
if start > p.GetEndTime() || end < p.startTime {
609+
return buckets
610+
}
611+
612+
startingAggrBucket := p.Time2Bucket(start)
613+
endAggrBucket := p.Time2Bucket(end)
614+
615+
for bucketID := startingAggrBucket; bucketID <= endAggrBucket; bucketID++ {
616+
buckets = append(buckets, bucketID)
617+
}
618+
619+
return buckets
620+
}
621+
595622
// Return the nearest chunk start time for the specified time
596623
func (p *DBPartition) GetChunkMint(t int64) int64 {
597624
if t > p.GetEndTime() {
@@ -622,6 +649,37 @@ func (p *DBPartition) TimeToChunkId(tmilli int64) (int, error) {
622649
}
623650
}
624651

652+
// Check if a chunk (by attribute name) is in the given time range.
653+
func (p *DBPartition) IsChunkInRangeByAttr(attr string, mint, maxt int64) bool {
654+
655+
// Discard '_v' prefix
656+
chunkIDStr := attr[2:]
657+
chunkID, err := strconv.ParseInt(chunkIDStr, 10, 64)
658+
if err != nil {
659+
return false
660+
}
661+
662+
chunkStartTime := p.startTime + (chunkID-1)*p.chunkInterval
663+
chunkEndTime := chunkStartTime + p.chunkInterval - 1
664+
665+
return mint <= chunkStartTime && maxt >= chunkEndTime
666+
}
667+
668+
// Get a chunk's start time by it's attribute name
669+
func (p *DBPartition) GetChunkStartTimeByAttr(attr string) (int64, error) {
670+
671+
// Discard '_v' prefix
672+
chunkIDStr := attr[2:]
673+
chunkID, err := strconv.ParseInt(chunkIDStr, 10, 64)
674+
if err != nil {
675+
return 0, err
676+
}
677+
678+
chunkStartTime := p.startTime + (chunkID-1)*p.chunkInterval
679+
680+
return chunkStartTime, nil
681+
}
682+
625683
// Check whether the specified time is within the range of this partition
626684
func (p *DBPartition) InRange(t int64) bool {
627685
if p.manager.cyclic {

0 commit comments

Comments
 (0)