From a973d4ea42554e1495f3215e535520010adfc11f Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Mon, 28 Apr 2025 00:19:25 +0800 Subject: [PATCH] add basic implementation --- api/gen/proto/go/metastore/v1/compactor.pb.go | 298 ++++++++----- .../go/metastore/v1/compactor_vtproto.pb.go | 367 ++++++++++++++++ .../go/metastore/v1/raft_log/raft_log.pb.go | 158 +++++-- .../v1/raft_log/raft_log_vtproto.pb.go | 393 ++++++++++++++++++ api/metastore/v1/compactor.proto | 14 +- api/metastore/v1/raft_log/raft_log.proto | 8 + api/openapiv2/gen/phlare.swagger.json | 30 +- pkg/experiment/ingester/segment_test.go | 2 +- .../metastore/compaction_raft_handler.go | 9 +- .../metastore/compaction_service.go | 9 +- pkg/experiment/metastore/dlq/recovery.go | 18 +- pkg/experiment/metastore/dlq/recovery_test.go | 6 +- pkg/experiment/metastore/index/index.go | 39 +- pkg/experiment/metastore/index/index_cache.go | 9 + .../metastore/index/store/index_store.go | 36 ++ .../metastore/index/store/partition.go | 44 +- .../metastore/index_raft_handler.go | 51 ++- pkg/experiment/metastore/index_service.go | 74 +++- pkg/experiment/metastore/metastore.go | 47 ++- pkg/experiment/metastore/metastore_raft.go | 16 + pkg/experiment/metastore/query_service.go | 16 +- pkg/experiment/metastore/retention/cleaner.go | 97 +++++ pkg/experiment/metastore/tenant_service.go | 2 + .../metastore/tombstones/tombstone_queue.go | 8 +- .../metastore/tombstones/tombstones.go | 11 +- 25 files changed, 1562 insertions(+), 200 deletions(-) create mode 100644 pkg/experiment/metastore/retention/cleaner.go diff --git a/api/gen/proto/go/metastore/v1/compactor.pb.go b/api/gen/proto/go/metastore/v1/compactor.pb.go index b95ee2df4a..4bc26a9deb 100644 --- a/api/gen/proto/go/metastore/v1/compactor.pb.go +++ b/api/gen/proto/go/metastore/v1/compactor.pb.go @@ -263,6 +263,7 @@ func (x *CompactionJob) GetTombstones() []*Tombstones { type Tombstones struct { state protoimpl.MessageState `protogen:"open.v1"` Blocks *BlockTombstones `protobuf:"bytes,1,opt,name=blocks,proto3" json:"blocks,omitempty"` + Partition *PartitionTombstone `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -304,6 +305,13 @@ func (x *Tombstones) GetBlocks() *BlockTombstones { return nil } +func (x *Tombstones) GetPartition() *PartitionTombstone { + if x != nil { + return x.Partition + } + return nil +} + type BlockTombstones struct { state protoimpl.MessageState `protogen:"open.v1"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` @@ -380,6 +388,83 @@ func (x *BlockTombstones) GetBlocks() []string { return nil } +type PartitionTombstone struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // Lower time boundary. Unix epoch in nanoseconds. + Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Duration int64 `protobuf:"varint,3,opt,name=duration,proto3" json:"duration,omitempty"` + Shard uint32 `protobuf:"varint,4,opt,name=shard,proto3" json:"shard,omitempty"` + Tenant string `protobuf:"bytes,5,opt,name=tenant,proto3" json:"tenant,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PartitionTombstone) Reset() { + *x = PartitionTombstone{} + mi := &file_metastore_v1_compactor_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PartitionTombstone) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PartitionTombstone) ProtoMessage() {} + +func (x *PartitionTombstone) ProtoReflect() protoreflect.Message { + mi := &file_metastore_v1_compactor_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PartitionTombstone.ProtoReflect.Descriptor instead. +func (*PartitionTombstone) Descriptor() ([]byte, []int) { + return file_metastore_v1_compactor_proto_rawDescGZIP(), []int{5} +} + +func (x *PartitionTombstone) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *PartitionTombstone) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *PartitionTombstone) GetDuration() int64 { + if x != nil { + return x.Duration + } + return 0 +} + +func (x *PartitionTombstone) GetShard() uint32 { + if x != nil { + return x.Shard + } + return 0 +} + +func (x *PartitionTombstone) GetTenant() string { + if x != nil { + return x.Tenant + } + return "" +} + type CompactionJobAssignment struct { state protoimpl.MessageState `protogen:"open.v1"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` @@ -391,7 +476,7 @@ type CompactionJobAssignment struct { func (x *CompactionJobAssignment) Reset() { *x = CompactionJobAssignment{} - mi := &file_metastore_v1_compactor_proto_msgTypes[5] + mi := &file_metastore_v1_compactor_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -403,7 +488,7 @@ func (x *CompactionJobAssignment) String() string { func (*CompactionJobAssignment) ProtoMessage() {} func (x *CompactionJobAssignment) ProtoReflect() protoreflect.Message { - mi := &file_metastore_v1_compactor_proto_msgTypes[5] + mi := &file_metastore_v1_compactor_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -416,7 +501,7 @@ func (x *CompactionJobAssignment) ProtoReflect() protoreflect.Message { // Deprecated: Use CompactionJobAssignment.ProtoReflect.Descriptor instead. func (*CompactionJobAssignment) Descriptor() ([]byte, []int) { - return file_metastore_v1_compactor_proto_rawDescGZIP(), []int{5} + return file_metastore_v1_compactor_proto_rawDescGZIP(), []int{6} } func (x *CompactionJobAssignment) GetName() string { @@ -453,7 +538,7 @@ type CompactionJobStatusUpdate struct { func (x *CompactionJobStatusUpdate) Reset() { *x = CompactionJobStatusUpdate{} - mi := &file_metastore_v1_compactor_proto_msgTypes[6] + mi := &file_metastore_v1_compactor_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -465,7 +550,7 @@ func (x *CompactionJobStatusUpdate) String() string { func (*CompactionJobStatusUpdate) ProtoMessage() {} func (x *CompactionJobStatusUpdate) ProtoReflect() protoreflect.Message { - mi := &file_metastore_v1_compactor_proto_msgTypes[6] + mi := &file_metastore_v1_compactor_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -478,7 +563,7 @@ func (x *CompactionJobStatusUpdate) ProtoReflect() protoreflect.Message { // Deprecated: Use CompactionJobStatusUpdate.ProtoReflect.Descriptor instead. func (*CompactionJobStatusUpdate) Descriptor() ([]byte, []int) { - return file_metastore_v1_compactor_proto_rawDescGZIP(), []int{6} + return file_metastore_v1_compactor_proto_rawDescGZIP(), []int{7} } func (x *CompactionJobStatusUpdate) GetName() string { @@ -519,7 +604,7 @@ type CompactedBlocks struct { func (x *CompactedBlocks) Reset() { *x = CompactedBlocks{} - mi := &file_metastore_v1_compactor_proto_msgTypes[7] + mi := &file_metastore_v1_compactor_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -531,7 +616,7 @@ func (x *CompactedBlocks) String() string { func (*CompactedBlocks) ProtoMessage() {} func (x *CompactedBlocks) ProtoReflect() protoreflect.Message { - mi := &file_metastore_v1_compactor_proto_msgTypes[7] + mi := &file_metastore_v1_compactor_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -544,7 +629,7 @@ func (x *CompactedBlocks) ProtoReflect() protoreflect.Message { // Deprecated: Use CompactedBlocks.ProtoReflect.Descriptor instead. func (*CompactedBlocks) Descriptor() ([]byte, []int) { - return file_metastore_v1_compactor_proto_rawDescGZIP(), []int{7} + return file_metastore_v1_compactor_proto_rawDescGZIP(), []int{8} } func (x *CompactedBlocks) GetSourceBlocks() *BlockList { @@ -602,77 +687,90 @@ var file_metastore_v1_compactor_proto_rawDesc = string([]byte{ 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x73, 0x52, 0x0a, 0x74, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, - 0x6e, 0x65, 0x73, 0x22, 0x43, 0x0a, 0x0a, 0x54, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, - 0x73, 0x12, 0x35, 0x0a, 0x06, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x6e, 0x65, 0x73, 0x22, 0x83, 0x01, 0x0a, 0x0a, 0x54, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, + 0x65, 0x73, 0x12, 0x35, 0x0a, 0x06, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, + 0x31, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, + 0x73, 0x52, 0x06, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x12, 0x3e, 0x0a, 0x09, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x6d, + 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x52, 0x09, + 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x96, 0x01, 0x0a, 0x0f, 0x42, 0x6c, + 0x6f, 0x63, 0x6b, 0x54, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x73, 0x12, 0x12, 0x0a, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, + 0x65, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, + 0x52, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, + 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x12, + 0x29, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x65, + 0x76, 0x65, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x63, 0x6f, 0x6d, 0x70, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x6c, + 0x6f, 0x63, 0x6b, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x62, 0x6c, 0x6f, 0x63, + 0x6b, 0x73, 0x22, 0x90, 0x01, 0x0a, 0x12, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x54, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1c, 0x0a, + 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x1a, 0x0a, 0x08, 0x64, + 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, + 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x12, 0x16, 0x0a, + 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, + 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x22, 0x6d, 0x0a, 0x17, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, + 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x04, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x6c, 0x65, + 0x61, 0x73, 0x65, 0x5f, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x73, 0x5f, 0x61, 0x74, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x45, 0x78, 0x70, 0x69, 0x72, + 0x65, 0x73, 0x41, 0x74, 0x22, 0xca, 0x01, 0x0a, 0x19, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x55, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x39, 0x0a, 0x06, + 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x21, 0x2e, 0x6d, + 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6d, 0x70, + 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, + 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x48, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x70, 0x61, + 0x63, 0x74, 0x65, 0x64, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, - 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x73, - 0x52, 0x06, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x22, 0x96, 0x01, 0x0a, 0x0f, 0x42, 0x6c, 0x6f, - 0x63, 0x6b, 0x54, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x73, 0x12, 0x12, 0x0a, 0x04, - 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, - 0x12, 0x14, 0x0a, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, - 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x12, 0x29, - 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x65, 0x76, - 0x65, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x6c, 0x6f, - 0x63, 0x6b, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x62, 0x6c, 0x6f, 0x63, 0x6b, - 0x73, 0x22, 0x6d, 0x0a, 0x17, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, - 0x6f, 0x62, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x12, 0x0a, 0x04, - 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, - 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, - 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x5f, - 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x73, 0x5f, 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, - 0x52, 0x0e, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x45, 0x78, 0x70, 0x69, 0x72, 0x65, 0x73, 0x41, 0x74, - 0x22, 0xca, 0x01, 0x0a, 0x19, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, - 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x12, - 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, - 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x04, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x39, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x21, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, - 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x12, 0x48, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x65, 0x64, - 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, - 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6d, - 0x70, 0x61, 0x63, 0x74, 0x65, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x0f, 0x63, 0x6f, - 0x6d, 0x70, 0x61, 0x63, 0x74, 0x65, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x22, 0x87, 0x01, - 0x0a, 0x0f, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x65, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, - 0x73, 0x12, 0x3c, 0x0a, 0x0d, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x62, 0x6c, 0x6f, 0x63, - 0x6b, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, - 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4c, 0x69, 0x73, - 0x74, 0x52, 0x0c, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x12, - 0x36, 0x0a, 0x0a, 0x6e, 0x65, 0x77, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x18, 0x02, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, - 0x76, 0x31, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x09, 0x6e, 0x65, - 0x77, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x2a, 0x7a, 0x0a, 0x13, 0x43, 0x6f, 0x6d, 0x70, 0x61, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, - 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x53, 0x54, 0x41, - 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, - 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, - 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x49, 0x4e, 0x5f, 0x50, 0x52, 0x4f, 0x47, 0x52, 0x45, - 0x53, 0x53, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, - 0x4f, 0x4e, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, - 0x53, 0x10, 0x02, 0x32, 0x7e, 0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x69, 0x0a, 0x12, 0x50, 0x6f, 0x6c, 0x6c, - 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x73, 0x12, 0x27, - 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x6f, - 0x6c, 0x6c, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x73, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, - 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x6f, 0x6c, 0x6c, 0x43, 0x6f, 0x6d, 0x70, 0x61, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x42, 0xbb, 0x01, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x6d, 0x65, 0x74, 0x61, - 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x42, 0x0e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, - 0x74, 0x6f, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x46, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, - 0x79, 0x72, 0x6f, 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, - 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, - 0x6f, 0x72, 0x65, 0x2f, 0x76, 0x31, 0x3b, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, - 0x76, 0x31, 0xa2, 0x02, 0x03, 0x4d, 0x58, 0x58, 0xaa, 0x02, 0x0c, 0x4d, 0x65, 0x74, 0x61, 0x73, - 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0c, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x74, - 0x6f, 0x72, 0x65, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x18, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, - 0x72, 0x65, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, - 0x61, 0xea, 0x02, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x3a, 0x3a, 0x56, - 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x65, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, + 0x52, 0x0f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x65, 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, + 0x73, 0x22, 0x87, 0x01, 0x0a, 0x0f, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x65, 0x64, 0x42, + 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x12, 0x3c, 0x0a, 0x0d, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, + 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, + 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x6c, 0x6f, 0x63, + 0x6b, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x0c, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x42, 0x6c, 0x6f, + 0x63, 0x6b, 0x73, 0x12, 0x36, 0x0a, 0x0a, 0x6e, 0x65, 0x77, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, + 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4d, 0x65, 0x74, 0x61, + 0x52, 0x09, 0x6e, 0x65, 0x77, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x2a, 0x7a, 0x0a, 0x13, 0x43, + 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, + 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, + 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, + 0x49, 0x4f, 0x4e, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x49, 0x4e, 0x5f, 0x50, 0x52, + 0x4f, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x43, 0x4f, 0x4d, 0x50, + 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x53, 0x55, + 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x02, 0x32, 0x7e, 0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x70, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x69, 0x0a, 0x12, + 0x50, 0x6f, 0x6c, 0x6c, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, + 0x62, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, + 0x31, 0x2e, 0x50, 0x6f, 0x6c, 0x6c, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x4a, 0x6f, 0x62, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, + 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x6f, 0x6c, 0x6c, 0x43, + 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0xbb, 0x01, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, + 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x42, 0x0e, 0x43, 0x6f, + 0x6d, 0x70, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x46, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, + 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x6d, 0x65, + 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x76, 0x31, 0x3b, 0x6d, 0x65, 0x74, 0x61, 0x73, + 0x74, 0x6f, 0x72, 0x65, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x4d, 0x58, 0x58, 0xaa, 0x02, 0x0c, 0x4d, + 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0c, 0x4d, 0x65, + 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x18, 0x4d, 0x65, 0x74, + 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, + 0x65, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, }) var ( @@ -688,7 +786,7 @@ func file_metastore_v1_compactor_proto_rawDescGZIP() []byte { } var file_metastore_v1_compactor_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_metastore_v1_compactor_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_metastore_v1_compactor_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_metastore_v1_compactor_proto_goTypes = []any{ (CompactionJobStatus)(0), // 0: metastore.v1.CompactionJobStatus (*PollCompactionJobsRequest)(nil), // 1: metastore.v1.PollCompactionJobsRequest @@ -696,29 +794,31 @@ var file_metastore_v1_compactor_proto_goTypes = []any{ (*CompactionJob)(nil), // 3: metastore.v1.CompactionJob (*Tombstones)(nil), // 4: metastore.v1.Tombstones (*BlockTombstones)(nil), // 5: metastore.v1.BlockTombstones - (*CompactionJobAssignment)(nil), // 6: metastore.v1.CompactionJobAssignment - (*CompactionJobStatusUpdate)(nil), // 7: metastore.v1.CompactionJobStatusUpdate - (*CompactedBlocks)(nil), // 8: metastore.v1.CompactedBlocks - (*BlockList)(nil), // 9: metastore.v1.BlockList - (*BlockMeta)(nil), // 10: metastore.v1.BlockMeta + (*PartitionTombstone)(nil), // 6: metastore.v1.PartitionTombstone + (*CompactionJobAssignment)(nil), // 7: metastore.v1.CompactionJobAssignment + (*CompactionJobStatusUpdate)(nil), // 8: metastore.v1.CompactionJobStatusUpdate + (*CompactedBlocks)(nil), // 9: metastore.v1.CompactedBlocks + (*BlockList)(nil), // 10: metastore.v1.BlockList + (*BlockMeta)(nil), // 11: metastore.v1.BlockMeta } var file_metastore_v1_compactor_proto_depIdxs = []int32{ - 7, // 0: metastore.v1.PollCompactionJobsRequest.status_updates:type_name -> metastore.v1.CompactionJobStatusUpdate + 8, // 0: metastore.v1.PollCompactionJobsRequest.status_updates:type_name -> metastore.v1.CompactionJobStatusUpdate 3, // 1: metastore.v1.PollCompactionJobsResponse.compaction_jobs:type_name -> metastore.v1.CompactionJob - 6, // 2: metastore.v1.PollCompactionJobsResponse.assignments:type_name -> metastore.v1.CompactionJobAssignment + 7, // 2: metastore.v1.PollCompactionJobsResponse.assignments:type_name -> metastore.v1.CompactionJobAssignment 4, // 3: metastore.v1.CompactionJob.tombstones:type_name -> metastore.v1.Tombstones 5, // 4: metastore.v1.Tombstones.blocks:type_name -> metastore.v1.BlockTombstones - 0, // 5: metastore.v1.CompactionJobStatusUpdate.status:type_name -> metastore.v1.CompactionJobStatus - 8, // 6: metastore.v1.CompactionJobStatusUpdate.compacted_blocks:type_name -> metastore.v1.CompactedBlocks - 9, // 7: metastore.v1.CompactedBlocks.source_blocks:type_name -> metastore.v1.BlockList - 10, // 8: metastore.v1.CompactedBlocks.new_blocks:type_name -> metastore.v1.BlockMeta - 1, // 9: metastore.v1.CompactionService.PollCompactionJobs:input_type -> metastore.v1.PollCompactionJobsRequest - 2, // 10: metastore.v1.CompactionService.PollCompactionJobs:output_type -> metastore.v1.PollCompactionJobsResponse - 10, // [10:11] is the sub-list for method output_type - 9, // [9:10] is the sub-list for method input_type - 9, // [9:9] is the sub-list for extension type_name - 9, // [9:9] is the sub-list for extension extendee - 0, // [0:9] is the sub-list for field type_name + 6, // 5: metastore.v1.Tombstones.partition:type_name -> metastore.v1.PartitionTombstone + 0, // 6: metastore.v1.CompactionJobStatusUpdate.status:type_name -> metastore.v1.CompactionJobStatus + 9, // 7: metastore.v1.CompactionJobStatusUpdate.compacted_blocks:type_name -> metastore.v1.CompactedBlocks + 10, // 8: metastore.v1.CompactedBlocks.source_blocks:type_name -> metastore.v1.BlockList + 11, // 9: metastore.v1.CompactedBlocks.new_blocks:type_name -> metastore.v1.BlockMeta + 1, // 10: metastore.v1.CompactionService.PollCompactionJobs:input_type -> metastore.v1.PollCompactionJobsRequest + 2, // 11: metastore.v1.CompactionService.PollCompactionJobs:output_type -> metastore.v1.PollCompactionJobsResponse + 11, // [11:12] is the sub-list for method output_type + 10, // [10:11] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { file_metastore_v1_compactor_proto_init() } @@ -733,7 +833,7 @@ func file_metastore_v1_compactor_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_metastore_v1_compactor_proto_rawDesc), len(file_metastore_v1_compactor_proto_rawDesc)), NumEnums: 1, - NumMessages: 8, + NumMessages: 9, NumExtensions: 0, NumServices: 1, }, diff --git a/api/gen/proto/go/metastore/v1/compactor_vtproto.pb.go b/api/gen/proto/go/metastore/v1/compactor_vtproto.pb.go index 5987cc46aa..0b548e38c9 100644 --- a/api/gen/proto/go/metastore/v1/compactor_vtproto.pb.go +++ b/api/gen/proto/go/metastore/v1/compactor_vtproto.pb.go @@ -115,6 +115,7 @@ func (m *Tombstones) CloneVT() *Tombstones { } r := new(Tombstones) r.Blocks = m.Blocks.CloneVT() + r.Partition = m.Partition.CloneVT() if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -151,6 +152,27 @@ func (m *BlockTombstones) CloneMessageVT() proto.Message { return m.CloneVT() } +func (m *PartitionTombstone) CloneVT() *PartitionTombstone { + if m == nil { + return (*PartitionTombstone)(nil) + } + r := new(PartitionTombstone) + r.Name = m.Name + r.Timestamp = m.Timestamp + r.Duration = m.Duration + r.Shard = m.Shard + r.Tenant = m.Tenant + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *PartitionTombstone) CloneMessageVT() proto.Message { + return m.CloneVT() +} + func (m *CompactionJobAssignment) CloneVT() *CompactionJobAssignment { if m == nil { return (*CompactionJobAssignment)(nil) @@ -363,6 +385,9 @@ func (this *Tombstones) EqualVT(that *Tombstones) bool { if !this.Blocks.EqualVT(that.Blocks) { return false } + if !this.Partition.EqualVT(that.Partition) { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -410,6 +435,37 @@ func (this *BlockTombstones) EqualMessageVT(thatMsg proto.Message) bool { } return this.EqualVT(that) } +func (this *PartitionTombstone) EqualVT(that *PartitionTombstone) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if this.Name != that.Name { + return false + } + if this.Timestamp != that.Timestamp { + return false + } + if this.Duration != that.Duration { + return false + } + if this.Shard != that.Shard { + return false + } + if this.Tenant != that.Tenant { + return false + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *PartitionTombstone) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*PartitionTombstone) + if !ok { + return false + } + return this.EqualVT(that) +} func (this *CompactionJobAssignment) EqualVT(that *CompactionJobAssignment) bool { if this == that { return true @@ -808,6 +864,16 @@ func (m *Tombstones) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Partition != nil { + size, err := m.Partition.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x12 + } if m.Blocks != nil { size, err := m.Blocks.MarshalToSizedBufferVT(dAtA[:i]) if err != nil { @@ -887,6 +953,68 @@ func (m *BlockTombstones) MarshalToSizedBufferVT(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *PartitionTombstone) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PartitionTombstone) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *PartitionTombstone) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if len(m.Tenant) > 0 { + i -= len(m.Tenant) + copy(dAtA[i:], m.Tenant) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Tenant))) + i-- + dAtA[i] = 0x2a + } + if m.Shard != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Shard)) + i-- + dAtA[i] = 0x20 + } + if m.Duration != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Duration)) + i-- + dAtA[i] = 0x18 + } + if m.Timestamp != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Timestamp)) + i-- + dAtA[i] = 0x10 + } + if len(m.Name) > 0 { + i -= len(m.Name) + copy(dAtA[i:], m.Name) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.Name))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func (m *CompactionJobAssignment) MarshalVT() (dAtA []byte, err error) { if m == nil { return nil, nil @@ -1139,6 +1267,10 @@ func (m *Tombstones) SizeVT() (n int) { l = m.Blocks.SizeVT() n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.Partition != nil { + l = m.Partition.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -1173,6 +1305,33 @@ func (m *BlockTombstones) SizeVT() (n int) { return n } +func (m *PartitionTombstone) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + if m.Timestamp != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Timestamp)) + } + if m.Duration != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Duration)) + } + if m.Shard != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Shard)) + } + l = len(m.Tenant) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + n += len(m.unknownFields) + return n +} + func (m *CompactionJobAssignment) SizeVT() (n int) { if m == nil { return 0 @@ -1744,6 +1903,42 @@ func (m *Tombstones) UnmarshalVT(dAtA []byte) error { return err } iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Partition", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Partition == nil { + m.Partition = &PartitionTombstone{} + } + if err := m.Partition.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -1951,6 +2146,178 @@ func (m *BlockTombstones) UnmarshalVT(dAtA []byte) error { } return nil } +func (m *PartitionTombstone) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PartitionTombstone: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PartitionTombstone: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) + } + m.Timestamp = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Timestamp |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Duration", wireType) + } + m.Duration = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Duration |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Shard", wireType) + } + m.Shard = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Shard |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Tenant", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Tenant = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *CompactionJobAssignment) UnmarshalVT(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/api/gen/proto/go/metastore/v1/raft_log/raft_log.pb.go b/api/gen/proto/go/metastore/v1/raft_log/raft_log.pb.go index f98392e0a5..64e6bab84c 100644 --- a/api/gen/proto/go/metastore/v1/raft_log/raft_log.pb.go +++ b/api/gen/proto/go/metastore/v1/raft_log/raft_log.pb.go @@ -29,6 +29,7 @@ const ( RaftCommand_RAFT_COMMAND_ADD_BLOCK_METADATA RaftCommand = 1 RaftCommand_RAFT_COMMAND_GET_COMPACTION_PLAN_UPDATE RaftCommand = 2 RaftCommand_RAFT_COMMAND_UPDATE_COMPACTION_PLAN RaftCommand = 3 + RaftCommand_RAFT_COMMAND_TRUNCATE_INDEX RaftCommand = 4 ) // Enum value maps for RaftCommand. @@ -38,12 +39,14 @@ var ( 1: "RAFT_COMMAND_ADD_BLOCK_METADATA", 2: "RAFT_COMMAND_GET_COMPACTION_PLAN_UPDATE", 3: "RAFT_COMMAND_UPDATE_COMPACTION_PLAN", + 4: "RAFT_COMMAND_TRUNCATE_INDEX", } RaftCommand_value = map[string]int32{ "RAFT_COMMAND_UNKNOWN": 0, "RAFT_COMMAND_ADD_BLOCK_METADATA": 1, "RAFT_COMMAND_GET_COMPACTION_PLAN_UPDATE": 2, "RAFT_COMMAND_UPDATE_COMPACTION_PLAN": 3, + "RAFT_COMMAND_TRUNCATE_INDEX": 4, } ) @@ -926,6 +929,94 @@ func (x *UpdateCompactionPlanResponse) GetPlanUpdate() *CompactionPlanUpdate { return nil } +type TruncateIndexRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Term uint64 `protobuf:"varint,1,opt,name=term,proto3" json:"term,omitempty"` + Tombstones []*v1.Tombstones `protobuf:"bytes,2,rep,name=tombstones,proto3" json:"tombstones,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TruncateIndexRequest) Reset() { + *x = TruncateIndexRequest{} + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[15] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TruncateIndexRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TruncateIndexRequest) ProtoMessage() {} + +func (x *TruncateIndexRequest) ProtoReflect() protoreflect.Message { + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[15] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TruncateIndexRequest.ProtoReflect.Descriptor instead. +func (*TruncateIndexRequest) Descriptor() ([]byte, []int) { + return file_metastore_v1_raft_log_raft_log_proto_rawDescGZIP(), []int{15} +} + +func (x *TruncateIndexRequest) GetTerm() uint64 { + if x != nil { + return x.Term + } + return 0 +} + +func (x *TruncateIndexRequest) GetTombstones() []*v1.Tombstones { + if x != nil { + return x.Tombstones + } + return nil +} + +type TruncateIndexResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TruncateIndexResponse) Reset() { + *x = TruncateIndexResponse{} + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[16] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TruncateIndexResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TruncateIndexResponse) ProtoMessage() {} + +func (x *TruncateIndexResponse) ProtoReflect() protoreflect.Message { + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[16] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TruncateIndexResponse.ProtoReflect.Descriptor instead. +func (*TruncateIndexResponse) Descriptor() ([]byte, []int) { + return file_metastore_v1_raft_log_raft_log_proto_rawDescGZIP(), []int{16} +} + var File_metastore_v1_raft_log_raft_log_proto protoreflect.FileDescriptor var file_metastore_v1_raft_log_raft_log_proto_rawDesc = string([]byte{ @@ -1068,17 +1159,27 @@ var file_metastore_v1_raft_log_raft_log_proto_rawDesc = string([]byte{ 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6c, 0x61, 0x6e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x0a, 0x70, 0x6c, 0x61, 0x6e, 0x55, 0x70, 0x64, 0x61, - 0x74, 0x65, 0x2a, 0xa2, 0x01, 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6f, 0x6d, 0x6d, 0x61, - 0x6e, 0x64, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, - 0x4e, 0x44, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x23, 0x0a, 0x1f, - 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x41, 0x44, 0x44, - 0x5f, 0x42, 0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4d, 0x45, 0x54, 0x41, 0x44, 0x41, 0x54, 0x41, 0x10, - 0x01, 0x12, 0x2b, 0x0a, 0x27, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, - 0x44, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, - 0x5f, 0x50, 0x4c, 0x41, 0x4e, 0x5f, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x10, 0x02, 0x12, 0x27, - 0x0a, 0x23, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x55, - 0x50, 0x44, 0x41, 0x54, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, - 0x5f, 0x50, 0x4c, 0x41, 0x4e, 0x10, 0x03, 0x42, 0x9d, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, + 0x74, 0x65, 0x22, 0x64, 0x0a, 0x14, 0x54, 0x72, 0x75, 0x6e, 0x63, 0x61, 0x74, 0x65, 0x49, 0x6e, + 0x64, 0x65, 0x78, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, + 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x38, + 0x0a, 0x0a, 0x74, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, + 0x31, 0x2e, 0x54, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x73, 0x52, 0x0a, 0x74, 0x6f, + 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x73, 0x22, 0x17, 0x0a, 0x15, 0x54, 0x72, 0x75, 0x6e, + 0x63, 0x61, 0x74, 0x65, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x2a, 0xc3, 0x01, 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, + 0x64, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, + 0x44, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x23, 0x0a, 0x1f, 0x52, + 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x41, 0x44, 0x44, 0x5f, + 0x42, 0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4d, 0x45, 0x54, 0x41, 0x44, 0x41, 0x54, 0x41, 0x10, 0x01, + 0x12, 0x2b, 0x0a, 0x27, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, + 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, + 0x50, 0x4c, 0x41, 0x4e, 0x5f, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x10, 0x02, 0x12, 0x27, 0x0a, + 0x23, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x55, 0x50, + 0x44, 0x41, 0x54, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, + 0x50, 0x4c, 0x41, 0x4e, 0x10, 0x03, 0x12, 0x1f, 0x0a, 0x1b, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, + 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x52, 0x55, 0x4e, 0x43, 0x41, 0x54, 0x45, 0x5f, + 0x49, 0x4e, 0x44, 0x45, 0x58, 0x10, 0x04, 0x42, 0x9d, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x42, 0x0c, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, @@ -1104,7 +1205,7 @@ func file_metastore_v1_raft_log_raft_log_proto_rawDescGZIP() []byte { } var file_metastore_v1_raft_log_raft_log_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_metastore_v1_raft_log_raft_log_proto_msgTypes = make([]protoimpl.MessageInfo, 15) +var file_metastore_v1_raft_log_raft_log_proto_msgTypes = make([]protoimpl.MessageInfo, 17) var file_metastore_v1_raft_log_raft_log_proto_goTypes = []any{ (RaftCommand)(0), // 0: raft_log.RaftCommand (*AddBlockMetadataRequest)(nil), // 1: raft_log.AddBlockMetadataRequest @@ -1122,15 +1223,17 @@ var file_metastore_v1_raft_log_raft_log_proto_goTypes = []any{ (*CompactionJobPlan)(nil), // 13: raft_log.CompactionJobPlan (*UpdateCompactionPlanRequest)(nil), // 14: raft_log.UpdateCompactionPlanRequest (*UpdateCompactionPlanResponse)(nil), // 15: raft_log.UpdateCompactionPlanResponse - (*v1.BlockMeta)(nil), // 16: metastore.v1.BlockMeta - (v1.CompactionJobStatus)(0), // 17: metastore.v1.CompactionJobStatus - (*v1.CompactedBlocks)(nil), // 18: metastore.v1.CompactedBlocks - (*v1.Tombstones)(nil), // 19: metastore.v1.Tombstones + (*TruncateIndexRequest)(nil), // 16: raft_log.TruncateIndexRequest + (*TruncateIndexResponse)(nil), // 17: raft_log.TruncateIndexResponse + (*v1.BlockMeta)(nil), // 18: metastore.v1.BlockMeta + (v1.CompactionJobStatus)(0), // 19: metastore.v1.CompactionJobStatus + (*v1.CompactedBlocks)(nil), // 20: metastore.v1.CompactedBlocks + (*v1.Tombstones)(nil), // 21: metastore.v1.Tombstones } var file_metastore_v1_raft_log_raft_log_proto_depIdxs = []int32{ - 16, // 0: raft_log.AddBlockMetadataRequest.metadata:type_name -> metastore.v1.BlockMeta + 18, // 0: raft_log.AddBlockMetadataRequest.metadata:type_name -> metastore.v1.BlockMeta 4, // 1: raft_log.GetCompactionPlanUpdateRequest.status_updates:type_name -> raft_log.CompactionJobStatusUpdate - 17, // 2: raft_log.CompactionJobStatusUpdate.status:type_name -> metastore.v1.CompactionJobStatus + 19, // 2: raft_log.CompactionJobStatusUpdate.status:type_name -> metastore.v1.CompactionJobStatus 6, // 3: raft_log.GetCompactionPlanUpdateResponse.plan_update:type_name -> raft_log.CompactionPlanUpdate 7, // 4: raft_log.CompactionPlanUpdate.new_jobs:type_name -> raft_log.NewCompactionJob 8, // 5: raft_log.CompactionPlanUpdate.assigned_jobs:type_name -> raft_log.AssignedCompactionJob @@ -1143,17 +1246,18 @@ var file_metastore_v1_raft_log_raft_log_proto_depIdxs = []int32{ 13, // 12: raft_log.AssignedCompactionJob.plan:type_name -> raft_log.CompactionJobPlan 12, // 13: raft_log.UpdatedCompactionJob.state:type_name -> raft_log.CompactionJobState 12, // 14: raft_log.CompletedCompactionJob.state:type_name -> raft_log.CompactionJobState - 18, // 15: raft_log.CompletedCompactionJob.compacted_blocks:type_name -> metastore.v1.CompactedBlocks + 20, // 15: raft_log.CompletedCompactionJob.compacted_blocks:type_name -> metastore.v1.CompactedBlocks 12, // 16: raft_log.EvictedCompactionJob.state:type_name -> raft_log.CompactionJobState - 17, // 17: raft_log.CompactionJobState.status:type_name -> metastore.v1.CompactionJobStatus - 19, // 18: raft_log.CompactionJobPlan.tombstones:type_name -> metastore.v1.Tombstones + 19, // 17: raft_log.CompactionJobState.status:type_name -> metastore.v1.CompactionJobStatus + 21, // 18: raft_log.CompactionJobPlan.tombstones:type_name -> metastore.v1.Tombstones 6, // 19: raft_log.UpdateCompactionPlanRequest.plan_update:type_name -> raft_log.CompactionPlanUpdate 6, // 20: raft_log.UpdateCompactionPlanResponse.plan_update:type_name -> raft_log.CompactionPlanUpdate - 21, // [21:21] is the sub-list for method output_type - 21, // [21:21] is the sub-list for method input_type - 21, // [21:21] is the sub-list for extension type_name - 21, // [21:21] is the sub-list for extension extendee - 0, // [0:21] is the sub-list for field type_name + 21, // 21: raft_log.TruncateIndexRequest.tombstones:type_name -> metastore.v1.Tombstones + 22, // [22:22] is the sub-list for method output_type + 22, // [22:22] is the sub-list for method input_type + 22, // [22:22] is the sub-list for extension type_name + 22, // [22:22] is the sub-list for extension extendee + 0, // [0:22] is the sub-list for field type_name } func init() { file_metastore_v1_raft_log_raft_log_proto_init() } @@ -1167,7 +1271,7 @@ func file_metastore_v1_raft_log_raft_log_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_metastore_v1_raft_log_raft_log_proto_rawDesc), len(file_metastore_v1_raft_log_raft_log_proto_rawDesc)), NumEnums: 1, - NumMessages: 15, + NumMessages: 17, NumExtensions: 0, NumServices: 0, }, diff --git a/api/gen/proto/go/metastore/v1/raft_log/raft_log_vtproto.pb.go b/api/gen/proto/go/metastore/v1/raft_log/raft_log_vtproto.pb.go index 544877e0c1..c7780beaee 100644 --- a/api/gen/proto/go/metastore/v1/raft_log/raft_log_vtproto.pb.go +++ b/api/gen/proto/go/metastore/v1/raft_log/raft_log_vtproto.pb.go @@ -359,6 +359,50 @@ func (m *UpdateCompactionPlanResponse) CloneMessageVT() proto.Message { return m.CloneVT() } +func (m *TruncateIndexRequest) CloneVT() *TruncateIndexRequest { + if m == nil { + return (*TruncateIndexRequest)(nil) + } + r := new(TruncateIndexRequest) + r.Term = m.Term + if rhs := m.Tombstones; rhs != nil { + tmpContainer := make([]*v1.Tombstones, len(rhs)) + for k, v := range rhs { + if vtpb, ok := interface{}(v).(interface{ CloneVT() *v1.Tombstones }); ok { + tmpContainer[k] = vtpb.CloneVT() + } else { + tmpContainer[k] = proto.Clone(v).(*v1.Tombstones) + } + } + r.Tombstones = tmpContainer + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *TruncateIndexRequest) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *TruncateIndexResponse) CloneVT() *TruncateIndexResponse { + if m == nil { + return (*TruncateIndexResponse)(nil) + } + r := new(TruncateIndexResponse) + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *TruncateIndexResponse) CloneMessageVT() proto.Message { + return m.CloneVT() +} + func (this *AddBlockMetadataRequest) EqualVT(that *AddBlockMetadataRequest) bool { if this == that { return true @@ -828,6 +872,62 @@ func (this *UpdateCompactionPlanResponse) EqualMessageVT(thatMsg proto.Message) } return this.EqualVT(that) } +func (this *TruncateIndexRequest) EqualVT(that *TruncateIndexRequest) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if this.Term != that.Term { + return false + } + if len(this.Tombstones) != len(that.Tombstones) { + return false + } + for i, vx := range this.Tombstones { + vy := that.Tombstones[i] + if p, q := vx, vy; p != q { + if p == nil { + p = &v1.Tombstones{} + } + if q == nil { + q = &v1.Tombstones{} + } + if equal, ok := interface{}(p).(interface{ EqualVT(*v1.Tombstones) bool }); ok { + if !equal.EqualVT(q) { + return false + } + } else if !proto.Equal(p, q) { + return false + } + } + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *TruncateIndexRequest) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*TruncateIndexRequest) + if !ok { + return false + } + return this.EqualVT(that) +} +func (this *TruncateIndexResponse) EqualVT(that *TruncateIndexResponse) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *TruncateIndexResponse) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*TruncateIndexResponse) + if !ok { + return false + } + return this.EqualVT(that) +} func (m *AddBlockMetadataRequest) MarshalVT() (dAtA []byte, err error) { if m == nil { return nil, nil @@ -1665,6 +1765,101 @@ func (m *UpdateCompactionPlanResponse) MarshalToSizedBufferVT(dAtA []byte) (int, return len(dAtA) - i, nil } +func (m *TruncateIndexRequest) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TruncateIndexRequest) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *TruncateIndexRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if len(m.Tombstones) > 0 { + for iNdEx := len(m.Tombstones) - 1; iNdEx >= 0; iNdEx-- { + if vtmsg, ok := interface{}(m.Tombstones[iNdEx]).(interface { + MarshalToSizedBufferVT([]byte) (int, error) + }); ok { + size, err := vtmsg.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + } else { + encoded, err := proto.Marshal(m.Tombstones[iNdEx]) + if err != nil { + return 0, err + } + i -= len(encoded) + copy(dAtA[i:], encoded) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(encoded))) + } + i-- + dAtA[i] = 0x12 + } + } + if m.Term != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Term)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *TruncateIndexResponse) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TruncateIndexResponse) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *TruncateIndexResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + return len(dAtA) - i, nil +} + func (m *AddBlockMetadataRequest) SizeVT() (n int) { if m == nil { return 0 @@ -1984,6 +2179,41 @@ func (m *UpdateCompactionPlanResponse) SizeVT() (n int) { return n } +func (m *TruncateIndexRequest) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Term != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Term)) + } + if len(m.Tombstones) > 0 { + for _, e := range m.Tombstones { + if size, ok := interface{}(e).(interface { + SizeVT() int + }); ok { + l = size.SizeVT() + } else { + l = proto.Size(e) + } + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + } + n += len(m.unknownFields) + return n +} + +func (m *TruncateIndexResponse) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + n += len(m.unknownFields) + return n +} + func (m *AddBlockMetadataRequest) UnmarshalVT(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -3850,3 +4080,166 @@ func (m *UpdateCompactionPlanResponse) UnmarshalVT(dAtA []byte) error { } return nil } +func (m *TruncateIndexRequest) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TruncateIndexRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TruncateIndexRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Term", wireType) + } + m.Term = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Term |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Tombstones", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Tombstones = append(m.Tombstones, &v1.Tombstones{}) + if unmarshal, ok := interface{}(m.Tombstones[len(m.Tombstones)-1]).(interface { + UnmarshalVT([]byte) error + }); ok { + if err := unmarshal.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + } else { + if err := proto.Unmarshal(dAtA[iNdEx:postIndex], m.Tombstones[len(m.Tombstones)-1]); err != nil { + return err + } + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TruncateIndexResponse) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TruncateIndexResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TruncateIndexResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} diff --git a/api/metastore/v1/compactor.proto b/api/metastore/v1/compactor.proto index becade9c25..c59a9dbe46 100644 --- a/api/metastore/v1/compactor.proto +++ b/api/metastore/v1/compactor.proto @@ -32,10 +32,7 @@ message CompactionJob { // Tombstones represent objects removed from the index but still stored. message Tombstones { BlockTombstones blocks = 1; - // For now, we only have block tombstones created due to the - // compaction process. Later, we may add more types of tombstones, - // e.g, deleted tenant (shard), partition, dataset, series etc. - // Exactly one member of Tombstones should be present. + PartitionTombstone partition = 2; } message BlockTombstones { @@ -46,6 +43,15 @@ message BlockTombstones { repeated string blocks = 5; } +message PartitionTombstone { + string name = 1; + // Lower time boundary. Unix epoch in nanoseconds. + int64 timestamp = 2; + int64 duration = 3; + uint32 shard = 4; + string tenant = 5; +} + message CompactionJobAssignment { string name = 1; uint64 token = 2; diff --git a/api/metastore/v1/raft_log/raft_log.proto b/api/metastore/v1/raft_log/raft_log.proto index 67ac279b93..27afb60493 100644 --- a/api/metastore/v1/raft_log/raft_log.proto +++ b/api/metastore/v1/raft_log/raft_log.proto @@ -10,6 +10,7 @@ enum RaftCommand { RAFT_COMMAND_ADD_BLOCK_METADATA = 1; RAFT_COMMAND_GET_COMPACTION_PLAN_UPDATE = 2; RAFT_COMMAND_UPDATE_COMPACTION_PLAN = 3; + RAFT_COMMAND_TRUNCATE_INDEX = 4; } message AddBlockMetadataRequest { @@ -108,3 +109,10 @@ message UpdateCompactionPlanRequest { message UpdateCompactionPlanResponse { CompactionPlanUpdate plan_update = 1; } + +message TruncateIndexRequest { + uint64 term = 1; + repeated metastore.v1.Tombstones tombstones = 2; +} + +message TruncateIndexResponse {} diff --git a/api/openapiv2/gen/phlare.swagger.json b/api/openapiv2/gen/phlare.swagger.json index 743df295e6..49afa94837 100644 --- a/api/openapiv2/gen/phlare.swagger.json +++ b/api/openapiv2/gen/phlare.swagger.json @@ -1641,6 +1641,30 @@ } } }, + "v1PartitionTombstone": { + "type": "object", + "properties": { + "name": { + "type": "string" + }, + "timestamp": { + "type": "string", + "format": "int64", + "description": "Lower time boundary. Unix epoch in nanoseconds." + }, + "duration": { + "type": "string", + "format": "int64" + }, + "shard": { + "type": "integer", + "format": "int64" + }, + "tenant": { + "type": "string" + } + } + }, "v1Point": { "type": "object", "properties": { @@ -2391,8 +2415,10 @@ "type": "object", "properties": { "blocks": { - "$ref": "#/definitions/v1BlockTombstones", - "description": "For now, we only have block tombstones created due to the\n compaction process. Later, we may add more types of tombstones,\n e.g, deleted tenant (shard), partition, dataset, series etc.\n Exactly one member of Tombstones should be present." + "$ref": "#/definitions/v1BlockTombstones" + }, + "partition": { + "$ref": "#/definitions/v1PartitionTombstone" } }, "description": "Tombstones represent objects removed from the index but still stored." diff --git a/pkg/experiment/ingester/segment_test.go b/pkg/experiment/ingester/segment_test.go index 9e6f14f7fa..36a2325652 100644 --- a/pkg/experiment/ingester/segment_test.go +++ b/pkg/experiment/ingester/segment_test.go @@ -386,7 +386,7 @@ func TestDLQRecoveryMock(t *testing.T) { recoveredMetas <- meta }). Return(&metastorev1.AddBlockResponse{}, nil) - recovery := dlq.NewRecovery(test.NewTestingLogger(t), dlq.RecoveryConfig{ + recovery := dlq.NewRecovery(test.NewTestingLogger(t), dlq.Config{ Period: 100 * time.Millisecond, }, srv, sw.bucket) recovery.Start() diff --git a/pkg/experiment/metastore/compaction_raft_handler.go b/pkg/experiment/metastore/compaction_raft_handler.go index 5fb52ec0cd..5d6b11ebff 100644 --- a/pkg/experiment/metastore/compaction_raft_handler.go +++ b/pkg/experiment/metastore/compaction_raft_handler.go @@ -15,18 +15,13 @@ type IndexReplacer interface { ReplaceBlocks(*bbolt.Tx, *metastorev1.CompactedBlocks) error } -type TombstoneDeleter interface { - DeleteTombstones(*bbolt.Tx, *raft.Log, ...*metastorev1.Tombstones) error - AddTombstones(*bbolt.Tx, *raft.Log, *metastorev1.Tombstones) error -} - type CompactionCommandHandler struct { logger log.Logger index IndexReplacer compactor compaction.Compactor planner compaction.Planner scheduler compaction.Scheduler - tombstones TombstoneDeleter + tombstones Tombstones } func NewCompactionCommandHandler( @@ -35,7 +30,7 @@ func NewCompactionCommandHandler( compactor compaction.Compactor, planner compaction.Planner, scheduler compaction.Scheduler, - tombstones TombstoneDeleter, + tombstones Tombstones, ) *CompactionCommandHandler { return &CompactionCommandHandler{ logger: logger, diff --git a/pkg/experiment/metastore/compaction_service.go b/pkg/experiment/metastore/compaction_service.go index df15ad4f9b..4a5bfaf6f1 100644 --- a/pkg/experiment/metastore/compaction_service.go +++ b/pkg/experiment/metastore/compaction_service.go @@ -12,6 +12,7 @@ import ( metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1/raft_log" "github.com/grafana/pyroscope/pkg/experiment/metastore/fsm" + "github.com/grafana/pyroscope/pkg/experiment/metastore/raftnode" ) type CompactionService struct { @@ -78,7 +79,9 @@ func (svc *CompactionService) PollCompactionJobs( cmd := fsm.RaftLogEntryType(raft_log.RaftCommand_RAFT_COMMAND_GET_COMPACTION_PLAN_UPDATE) resp, err := svc.raft.Propose(cmd, req) if err != nil { - level.Error(svc.logger).Log("msg", "failed to prepare compaction plan", "err", err) + if !raftnode.IsRaftLeadershipError(err) { + level.Error(svc.logger).Log("msg", "failed to prepare compaction plan", "err", err) + } return nil, err } prepared := resp.(*raft_log.GetCompactionPlanUpdateResponse) @@ -141,7 +144,9 @@ func (svc *CompactionService) PollCompactionJobs( // empty response would indicate that the plan is rejected. proposal := &raft_log.UpdateCompactionPlanRequest{Term: prepared.Term, PlanUpdate: planUpdate} if resp, err = svc.raft.Propose(cmd, proposal); err != nil { - level.Error(svc.logger).Log("msg", "failed to update compaction plan", "err", err) + if !raftnode.IsRaftLeadershipError(err) { + level.Error(svc.logger).Log("msg", "failed to update compaction plan", "err", err) + } return nil, err } accepted := resp.(*raft_log.UpdateCompactionPlanResponse).GetPlanUpdate() diff --git a/pkg/experiment/metastore/dlq/recovery.go b/pkg/experiment/metastore/dlq/recovery.go index 39e660d4e2..25b26c859f 100644 --- a/pkg/experiment/metastore/dlq/recovery.go +++ b/pkg/experiment/metastore/dlq/recovery.go @@ -19,30 +19,30 @@ import ( "github.com/grafana/pyroscope/pkg/experiment/metastore/raftnode" ) -type RecoveryConfig struct { +type Config struct { Period time.Duration `yaml:"dlq_recovery_check_interval"` } -func (c *RecoveryConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { +func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&c.Period, prefix+"dlq-recovery-check-interval", 15*time.Second, "Dead Letter Queue check interval.") } -type LocalServer interface { +type Metastore interface { AddRecoveredBlock(context.Context, *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) } type Recovery struct { - config RecoveryConfig + config Config logger log.Logger - metastore LocalServer + metastore Metastore bucket objstore.Bucket - m sync.Mutex started bool cancel func() + m sync.Mutex } -func NewRecovery(logger log.Logger, config RecoveryConfig, metastore LocalServer, bucket objstore.Bucket) *Recovery { +func NewRecovery(logger log.Logger, config Config, metastore Metastore, bucket objstore.Bucket) *Recovery { return &Recovery{ config: config, logger: logger, @@ -72,7 +72,9 @@ func (r *Recovery) Stop() { r.logger.Log("msg", "recovery already stopped") return } - r.cancel() + if r.cancel != nil { + r.cancel() + } r.started = false r.logger.Log("msg", "recovery stopped") } diff --git a/pkg/experiment/metastore/dlq/recovery_test.go b/pkg/experiment/metastore/dlq/recovery_test.go index 7bad4f2c9c..3280e5e493 100644 --- a/pkg/experiment/metastore/dlq/recovery_test.go +++ b/pkg/experiment/metastore/dlq/recovery_test.go @@ -51,7 +51,7 @@ func TestRecoverTick(t *testing.T) { addMeta(bucket, meta) } - r := NewRecovery(test.NewTestingLogger(t), RecoveryConfig{}, srv, bucket) + r := NewRecovery(test.NewTestingLogger(t), Config{}, srv, bucket) r.recoverTick(context.Background()) expected := []*metastorev1.BlockMeta{ @@ -89,7 +89,7 @@ func TestNotRaftLeader(t *testing.T) { addMeta(bucket, meta) } - r := NewRecovery(test.NewTestingLogger(t), RecoveryConfig{}, srv, bucket) + r := NewRecovery(test.NewTestingLogger(t), Config{}, srv, bucket) r.recoverTick(context.Background()) assert.Equal(t, 1, len(bucket.Objects())) @@ -129,7 +129,7 @@ func TestStartStop(t *testing.T) { addMeta(bucket, meta) } - r := NewRecovery(test.NewTestingLogger(t), RecoveryConfig{Period: time.Millisecond * 10}, srv, bucket) + r := NewRecovery(test.NewTestingLogger(t), Config{Period: time.Millisecond * 10}, srv, bucket) r.Start() defer r.Stop() diff --git a/pkg/experiment/metastore/index/index.go b/pkg/experiment/metastore/index/index.go index d41dbb016a..934ff650fe 100644 --- a/pkg/experiment/metastore/index/index.go +++ b/pkg/experiment/metastore/index/index.go @@ -3,6 +3,7 @@ package index import ( "flag" "fmt" + goiter "iter" "math" "slices" "strings" @@ -66,7 +67,9 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { type Store interface { CreateBuckets(*bbolt.Tx) error ListPartitions(*bbolt.Tx) ([]*store.Partition, error) - LoadShard(*bbolt.Tx, store.PartitionKey, string, uint32) (*store.Shard, error) + DeletePartition(tx *bbolt.Tx, p store.PartitionKey) error + LoadShard(tx *bbolt.Tx, p store.PartitionKey, tenant string, shard uint32) (*store.Shard, error) + DeleteShard(tx *bbolt.Tx, p store.PartitionKey, tenant string, shard uint32) error } type Index struct { @@ -212,6 +215,34 @@ func (i *Index) GetBlocks(tx *bbolt.Tx, list *metastorev1.BlockList) ([]*metasto return metas, nil } +func (i *Index) Partitions() goiter.Seq[*store.Partition] { + return func(yield func(*store.Partition) bool) { + i.mu.RLock() + defer i.mu.RUnlock() + for _, p := range i.partitions { + if !yield(p) { + return + } + } + } +} + +func (i *Index) DeletePartition(tx *bbolt.Tx, key store.PartitionKey, tenant string, shard uint32) error { + i.mu.Lock() + defer i.mu.Unlock() + p := i.getPartition(key) + p.DeleteTenantShard(tenant, shard) + i.shards.delete(key, tenant, shard) + if err := i.store.DeleteShard(tx, key, tenant, shard); err != nil { + return err + } + if !p.IsEmpty() { + return nil + } + i.deletePartition(key) + return i.store.DeletePartition(tx, key) +} + func (i *Index) GetTenantStats(tenant string) *metastorev1.TenantStats { stats := &metastorev1.TenantStats{ DataIngested: false, @@ -295,6 +326,12 @@ func (i *Index) getPartition(key store.PartitionKey) *store.Partition { return nil } +func (i *Index) deletePartition(key store.PartitionKey) { + i.partitions = slices.DeleteFunc(i.partitions, func(p *store.Partition) bool { + return p.Key.Equal(key) + }) +} + func (i *Index) partitionedList(list *metastorev1.BlockList) map[store.PartitionKey]*metastorev1.BlockList { partitions := make(map[store.PartitionKey]*metastorev1.BlockList) for _, b := range list.Blocks { diff --git a/pkg/experiment/metastore/index/index_cache.go b/pkg/experiment/metastore/index/index_cache.go index 8199d26409..c931314c88 100644 --- a/pkg/experiment/metastore/index/index_cache.go +++ b/pkg/experiment/metastore/index/index_cache.go @@ -64,6 +64,15 @@ func (c *shardCache) put(s *indexShard) { c.shards.Add(k, s) } +func (c *shardCache) delete(p store.PartitionKey, tenant string, shard uint32) { + k := shardCacheKey{ + partition: p, + tenant: tenant, + shard: shard, + } + c.shards.Remove(k) +} + func newBlockCache(rcs, wcs int) *blockCache { reads, _ := lru.New2Q[blockCacheKey, *metastorev1.BlockMeta](rcs) write, _ := lru.New[blockCacheKey, *metastorev1.BlockMeta](wcs) diff --git a/pkg/experiment/metastore/index/store/index_store.go b/pkg/experiment/metastore/index/store/index_store.go index 7ce8381f06..734c03875a 100644 --- a/pkg/experiment/metastore/index/store/index_store.go +++ b/pkg/experiment/metastore/index/store/index_store.go @@ -5,6 +5,8 @@ import ( "encoding/binary" "errors" "fmt" + "strconv" + "strings" "time" "go.etcd.io/bbolt" @@ -137,6 +139,28 @@ func (m *IndexStore) LoadShard(tx *bbolt.Tx, p PartitionKey, tenant string, shar return s, nil } +func (m *IndexStore) DeleteShard(tx *bbolt.Tx, p PartitionKey, tenant string, shard uint32) error { + if partition := getPartitionsBucket(tx).Bucket(p.Bytes()); partition != nil { + if shards := partition.Bucket(tenantBucketName(tenant)); shards != nil { + if err := shards.DeleteBucket(binary.BigEndian.AppendUint32(nil, shard)); err != nil { + if !errors.Is(err, bbolt.ErrBucketNotFound) { + return err + } + } + } + } + return nil +} + +func (m *IndexStore) DeletePartition(tx *bbolt.Tx, p PartitionKey) error { + if err := getPartitionsBucket(tx).DeleteBucket(p.Bytes()); err != nil { + if !errors.Is(err, bbolt.ErrBucketNotFound) { + return err + } + } + return nil +} + func (m *IndexStore) loadTenantShard(tx *bbolt.Tx, p PartitionKey, tenant string, shard uint32) (*Shard, error) { tenantShard := getTenantShard(tx, p, tenant, shard) if tenantShard == nil { @@ -269,6 +293,18 @@ func (s *Shard) Delete(tx *bbolt.Tx, blocks ...string) error { return nil } +func (s *Shard) TombstoneName() string { + var b strings.Builder + b.WriteString(s.Partition.String()) + b.WriteByte('-') + b.WriteByte('T') + b.WriteString(s.Tenant) + b.WriteByte('-') + b.WriteByte('S') + b.WriteString(strconv.FormatUint(uint64(s.Shard), 10)) + return b.String() +} + // ShallowCopy creates a shallow copy: no deep copy of the string table. // The copy can be accessed safely by multiple readers, and it represents // a snapshot of the string table at the time of the copy. diff --git a/pkg/experiment/metastore/index/store/partition.go b/pkg/experiment/metastore/index/store/partition.go index ca7fec6bcc..6c1e2c6c30 100644 --- a/pkg/experiment/metastore/index/store/partition.go +++ b/pkg/experiment/metastore/index/store/partition.go @@ -1,8 +1,10 @@ package store import ( + "cmp" "encoding/binary" "errors" + "slices" "time" ) @@ -64,15 +66,49 @@ func (p *Partition) Compare(other *Partition) int { return p.Key.Timestamp.Compare(other.Key.Timestamp) } +func (p *Partition) Shards(dst []Shard) []Shard { + dst = dst[:0] + for tenant, shards := range p.TenantShards { + for shard := range shards { + dst = append(dst, Shard{ + Partition: p.Key, + Tenant: tenant, + Shard: shard, + }) + } + } + slices.SortFunc(dst, func(a, b Shard) int { + t := cmp.Compare(a.Tenant, b.Tenant) + if t != 0 { + return cmp.Compare(a.Shard, b.Shard) + } + return t + }) + return dst +} + +func (p *Partition) DeleteTenantShard(tenant string, shard uint32) { + if t := p.TenantShards[tenant]; t != nil { + delete(t, shard) + if len(t) == 0 { + delete(p.TenantShards, tenant) + } + } +} + +func (p *Partition) IsEmpty() bool { + return len(p.TenantShards) == 0 +} + func NewPartitionKey(timestamp time.Time, duration time.Duration) PartitionKey { return PartitionKey{Timestamp: timestamp.Truncate(duration), Duration: duration} } -func (k PartitionKey) Equal(x PartitionKey) bool { +func (k *PartitionKey) Equal(x PartitionKey) bool { return k.Timestamp.Equal(x.Timestamp) && k.Duration == x.Duration } -func (k PartitionKey) MarshalBinary() ([]byte, error) { +func (k *PartitionKey) MarshalBinary() ([]byte, error) { b := make([]byte, 12) binary.BigEndian.PutUint64(b[0:8], uint64(k.Timestamp.UnixNano())) binary.BigEndian.PutUint32(b[8:12], uint32(k.Duration/time.Second)) @@ -88,12 +124,12 @@ func (k *PartitionKey) UnmarshalBinary(b []byte) error { return nil } -func (k PartitionKey) Bytes() []byte { +func (k *PartitionKey) Bytes() []byte { b, _ := k.MarshalBinary() return b } -func (k PartitionKey) String() string { +func (k *PartitionKey) String() string { b := make([]byte, 0, 32) b = k.Timestamp.UTC().AppendFormat(b, time.DateTime) b = append(b, ' ') diff --git a/pkg/experiment/metastore/index_raft_handler.go b/pkg/experiment/metastore/index_raft_handler.go index e837dbb6d2..63e5779aa6 100644 --- a/pkg/experiment/metastore/index_raft_handler.go +++ b/pkg/experiment/metastore/index_raft_handler.go @@ -1,6 +1,8 @@ package metastore import ( + "time" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/hashicorp/raft" @@ -8,28 +10,41 @@ import ( "go.etcd.io/bbolt" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1/raft_log" "github.com/grafana/pyroscope/pkg/experiment/metastore/compaction" "github.com/grafana/pyroscope/pkg/experiment/metastore/index" + indexstore "github.com/grafana/pyroscope/pkg/experiment/metastore/index/store" ) -type Index interface { +type IndexInserter interface { InsertBlock(*bbolt.Tx, *metastorev1.BlockMeta) error } +type IndexDeleter interface { + DeletePartition(tx *bbolt.Tx, partition indexstore.PartitionKey, tenant string, shard uint32) error +} + +type IndexWriter interface { + IndexInserter + IndexDeleter +} + type Tombstones interface { + AddTombstones(*bbolt.Tx, *raft.Log, *metastorev1.Tombstones) error + DeleteTombstones(*bbolt.Tx, *raft.Log, ...*metastorev1.Tombstones) error Exists(tenant string, shard uint32, block string) bool } type IndexCommandHandler struct { logger log.Logger - index Index + index IndexWriter tombstones Tombstones compactor compaction.Compactor } func NewIndexCommandHandler( logger log.Logger, - index Index, + index IndexWriter, tombstones Tombstones, compactor compaction.Compactor, ) *IndexCommandHandler { @@ -61,3 +76,33 @@ func (m *IndexCommandHandler) AddBlock(tx *bbolt.Tx, cmd *raft.Log, req *metasto } return new(metastorev1.AddBlockResponse), nil } + +func (m *IndexCommandHandler) TruncateIndex(tx *bbolt.Tx, cmd *raft.Log, req *raft_log.TruncateIndexRequest) (*raft_log.TruncateIndexResponse, error) { + if req.Term != cmd.Term { + level.Warn(m.logger).Log( + "msg", "rejecting index truncation request", + "current_term", cmd.Term, + "request_term", req.Term, + ) + return new(raft_log.TruncateIndexResponse), nil + } + for _, tombstone := range req.Tombstones { + // Although it's not strictly necessary, we may pass any tombstones + // to TruncateIndex, and the Partition member may be missing. + if p := tombstone.Partition; p != nil { + pk := indexstore.PartitionKey{ + Timestamp: time.Unix(0, p.Timestamp), + Duration: time.Duration(p.Duration), + } + if err := m.index.DeletePartition(tx, pk, p.Tenant, p.Shard); err != nil { + level.Error(m.logger).Log("msg", "failed to delete partition", "err", err) + return nil, err + } + } + if err := m.tombstones.AddTombstones(tx, cmd, tombstone); err != nil { + level.Error(m.logger).Log("msg", "failed to add partition tombstone", "err", err) + return nil, err + } + } + return new(raft_log.TruncateIndexResponse), nil +} diff --git a/pkg/experiment/metastore/index_service.go b/pkg/experiment/metastore/index_service.go index f2f9848c23..3e98c831ce 100644 --- a/pkg/experiment/metastore/index_service.go +++ b/pkg/experiment/metastore/index_service.go @@ -2,6 +2,8 @@ package metastore import ( "context" + goiter "iter" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -14,6 +16,7 @@ import ( "github.com/grafana/pyroscope/pkg/experiment/block/metadata" placement "github.com/grafana/pyroscope/pkg/experiment/distributor/placement/adaptive_placement" "github.com/grafana/pyroscope/pkg/experiment/metastore/fsm" + indexstore "github.com/grafana/pyroscope/pkg/experiment/metastore/index/store" "github.com/grafana/pyroscope/pkg/experiment/metastore/raftnode" "github.com/grafana/pyroscope/pkg/iter" ) @@ -26,11 +29,20 @@ type IndexBlockFinder interface { GetBlocks(*bbolt.Tx, *metastorev1.BlockList) ([]*metastorev1.BlockMeta, error) } +type IndexPartitionLister interface { + Partitions() goiter.Seq[*indexstore.Partition] +} + +type IndexReader interface { + IndexBlockFinder + IndexPartitionLister +} + func NewIndexService( logger log.Logger, raft Raft, state State, - index IndexBlockFinder, + index IndexReader, stats PlacementStats, ) *IndexService { return &IndexService{ @@ -48,7 +60,7 @@ type IndexService struct { logger log.Logger raft Raft state State - index IndexBlockFinder + index IndexReader stats PlacementStats } @@ -84,7 +96,9 @@ func (svc *IndexService) addBlockMetadata( &raft_log.AddBlockMetadataRequest{Metadata: req.Block}, ) if err != nil { - level.Error(svc.logger).Log("msg", "failed to add block", "block", req.Block.Id, "err", err) + if !raftnode.IsRaftLeadershipError(err) { + level.Error(svc.logger).Log("msg", "failed to add block", "block", req.Block.Id, "err", err) + } return nil, err } return new(metastorev1.AddBlockResponse), nil @@ -111,6 +125,60 @@ func (svc *IndexService) getBlockMetadata(tx *bbolt.Tx, list *metastorev1.BlockL return &metastorev1.GetBlockMetadataResponse{Blocks: found}, nil } +func (svc *IndexService) TruncatePartitions(ctx context.Context, before time.Time, max int) error { + req := &raft_log.TruncateIndexRequest{Tombstones: make([]*metastorev1.Tombstones, 0, max)} + read := func(_ *bbolt.Tx, r raftnode.ReadIndex) { + req.Term = r.Term // See "ABA problem". + svc.createPartitionTombstones(before, req) + } + if readErr := svc.state.ConsistentRead(ctx, read); readErr != nil { + return status.Error(codes.Unavailable, readErr.Error()) + } + if len(req.Tombstones) == 0 { + return nil + } + if _, err := svc.raft.Propose(fsm.RaftLogEntryType(raft_log.RaftCommand_RAFT_COMMAND_TRUNCATE_INDEX), req); err != nil { + if !raftnode.IsRaftLeadershipError(err) { + level.Error(svc.logger).Log("msg", "failed to truncate index", "err", err) + } + return err + } + return nil +} + +func (svc *IndexService) createPartitionTombstones(before time.Time, req *raft_log.TruncateIndexRequest) { + m := cap(req.Tombstones) + var shards []indexstore.Shard + for p := range svc.index.Partitions() { + // We pick all partitions that ended before the given time; + // the partitions cannot contain blocks created after the EndTime. + // However, the blocks may contain data created after the EndTime: + // the boundary is determined by the max time delta allowed for + // compaction – retention policy should include the period. + // TODO(kolesnikovae): + // * We can rely on Max and Min time stored in shard index to + // delete based on the data time. + // * Tenant-level overrides. + if !p.EndTime().Before(before) { + break + } + shards = p.Shards(shards) + for _, shard := range shards { + if len(req.Tombstones) >= m { + break + } + req.Tombstones = append(req.Tombstones, &metastorev1.Tombstones{ + Partition: &metastorev1.PartitionTombstone{ + Name: shard.TombstoneName(), + Timestamp: shard.Partition.Timestamp.UnixNano(), + Shard: shard.Shard, + Tenant: shard.Tenant, + }, + }) + } + } +} + func statsFromMetadata(md *metastorev1.BlockMeta) iter.Iterator[placement.Sample] { return &sampleIterator{md: md} } diff --git a/pkg/experiment/metastore/metastore.go b/pkg/experiment/metastore/metastore.go index f2b193a3b0..7d8f75d147 100644 --- a/pkg/experiment/metastore/metastore.go +++ b/pkg/experiment/metastore/metastore.go @@ -26,20 +26,22 @@ import ( "github.com/grafana/pyroscope/pkg/experiment/metastore/index" raft "github.com/grafana/pyroscope/pkg/experiment/metastore/raftnode" "github.com/grafana/pyroscope/pkg/experiment/metastore/raftnode/raftnodepb" + "github.com/grafana/pyroscope/pkg/experiment/metastore/retention" "github.com/grafana/pyroscope/pkg/experiment/metastore/tombstones" "github.com/grafana/pyroscope/pkg/util/health" ) type Config struct { - Address string `yaml:"address"` - GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the metastore."` - MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"` - Raft raft.Config `yaml:"raft"` - FSM fsm.Config `yaml:",inline" category:"advanced"` - Index index.Config `yaml:"index" category:"advanced"` - DLQRecovery dlq.RecoveryConfig `yaml:",inline" category:"advanced"` - Compactor compactor.Config `yaml:",inline" category:"advanced"` - Scheduler scheduler.Config `yaml:",inline" category:"advanced"` + Address string `yaml:"address"` + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the metastore."` + MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"` + Raft raft.Config `yaml:"raft"` + FSM fsm.Config `yaml:",inline" category:"advanced"` + Index index.Config `yaml:"index" category:"advanced"` + DLQRecovery dlq.Config `yaml:",inline" category:"advanced"` + Compactor compactor.Config `yaml:",inline" category:"advanced"` + Scheduler scheduler.Config `yaml:",inline" category:"advanced"` + Cleaner retention.Config `yaml:",inline" category:"advanced"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -79,6 +81,7 @@ type Metastore struct { bucket objstore.Bucket placement *placement.Manager dlqRecovery *dlq.Recovery + cleaner *retention.Cleaner index *index.Index indexHandler *IndexCommandHandler @@ -90,9 +93,10 @@ type Metastore struct { compactionHandler *CompactionCommandHandler compactionService *CompactionService - followerRead *raft.StateReader[*bbolt.Tx] - tenantService *TenantService - metadataService *MetadataQueryService + leaderRead *raft.StateReader[*bbolt.Tx] + followerRead *raft.StateReader[*bbolt.Tx] + tenantService *TenantService + queryService *QueryService readyOnce sync.Once readySince time.Time @@ -132,6 +136,9 @@ func New( fsm.RegisterRaftCommandHandler(m.fsm, fsm.RaftLogEntryType(raft_log.RaftCommand_RAFT_COMMAND_ADD_BLOCK_METADATA), m.indexHandler.AddBlock) + fsm.RegisterRaftCommandHandler(m.fsm, + fsm.RaftLogEntryType(raft_log.RaftCommand_RAFT_COMMAND_TRUNCATE_INDEX), + m.indexHandler.TruncateIndex) m.compactionHandler = NewCompactionCommandHandler(m.logger, m.index, m.compactor, m.compactor, m.scheduler, m.tombstones) fsm.RegisterRaftCommandHandler(m.fsm, @@ -151,25 +158,25 @@ func New( return nil, err } - // Create the read-only interface to the state. - // We're currently only using the Follower Read pattern, assuming that - // leader reads are done through the raft log. However, this should be - // optimized in the future to use the Leader Read pattern. + // Create the read-only interfaces to the state. m.followerRead = m.newFollowerReader(client, m.raft, m.fsm) + m.leaderRead = m.newLeaderReader(m.raft, m.fsm) // Services should be registered after FSM and Raft have been initialized. - // Services provide an interface to interact with the metastore. + // Services provide an interface to interact with the metastore components. m.compactionService = NewCompactionService(m.logger, m.raft) - m.indexService = NewIndexService(m.logger, m.raft, m.followerRead, m.index, m.placement) + m.indexService = NewIndexService(m.logger, m.raft, m.leaderRead, m.index, m.placement) m.tenantService = NewTenantService(m.logger, m.followerRead, m.index) - m.metadataService = NewMetadataQueryService(m.logger, m.followerRead, m.index) + m.queryService = NewQueryService(m.logger, m.followerRead, m.index) m.dlqRecovery = dlq.NewRecovery(logger, config.DLQRecovery, m.indexService, bucket) + m.cleaner = retention.NewCleaner(m.logger, m.config.Cleaner, m.indexService) // These are the services that only run on the raft leader. // Keep in mind that the node may not be the leader at the moment the // service is starting, so it should be able to handle conflicts. m.raft.RunOnLeader(m.dlqRecovery) m.raft.RunOnLeader(m.placement) + m.raft.RunOnLeader(m.cleaner) m.service = services.NewBasicService(m.starting, m.running, m.stopping) return m, nil @@ -219,7 +226,7 @@ func (m *Metastore) buildRaftNode() (err error) { func (m *Metastore) Register(server *grpc.Server) { metastorev1.RegisterIndexServiceServer(server, m.indexService) metastorev1.RegisterCompactionServiceServer(server, m.compactionService) - metastorev1.RegisterMetadataQueryServiceServer(server, m.metadataService) + metastorev1.RegisterMetadataQueryServiceServer(server, m.queryService) metastorev1.RegisterTenantServiceServer(server, m.tenantService) m.raft.Register(server) } diff --git a/pkg/experiment/metastore/metastore_raft.go b/pkg/experiment/metastore/metastore_raft.go index cdb5a6c67b..91e9395591 100644 --- a/pkg/experiment/metastore/metastore_raft.go +++ b/pkg/experiment/metastore/metastore_raft.go @@ -42,6 +42,22 @@ func (m *Metastore) newFollowerReader( ) } +// newLeaderReader creates a new leader reader – implementation of the +// Leader Read pattern. See raftnode.StateReader for details. +// The provided node is treated as the leader, attempt to read from +// the local node in follower state will fail. +func (m *Metastore) newLeaderReader( + node *raftnode.Node, + fsm *fsm.FSM, +) *raftnode.StateReader[*bbolt.Tx] { + return raftnode.NewStateReader[*bbolt.Tx]( + node, + &localNode{node: node, fsm: fsm}, + m.config.Raft.LogIndexCheckInterval, + m.config.Raft.ReadIndexMaxDistance, + ) +} + // leaderNode is an implementation of raftnode.Leader interface that // communicates with the leader using the RaftNode service client to // acquire its commit index (ReadIndex). diff --git a/pkg/experiment/metastore/query_service.go b/pkg/experiment/metastore/query_service.go index 803f0fd809..1982899514 100644 --- a/pkg/experiment/metastore/query_service.go +++ b/pkg/experiment/metastore/query_service.go @@ -22,7 +22,7 @@ type IndexQuerier interface { QueryMetadataLabels(*bbolt.Tx, index.MetadataQuery) ([]*typesv1.Labels, error) } -type MetadataQueryService struct { +type QueryService struct { metastorev1.MetadataQueryServiceServer logger log.Logger @@ -30,19 +30,19 @@ type MetadataQueryService struct { index IndexQuerier } -func NewMetadataQueryService( +func NewQueryService( logger log.Logger, state State, index IndexQuerier, -) *MetadataQueryService { - return &MetadataQueryService{ +) *QueryService { + return &QueryService{ logger: logger, state: state, index: index, } } -func (svc *MetadataQueryService) QueryMetadata( +func (svc *QueryService) QueryMetadata( ctx context.Context, req *metastorev1.QueryMetadataRequest, ) (resp *metastorev1.QueryMetadataResponse, err error) { @@ -55,7 +55,7 @@ func (svc *MetadataQueryService) QueryMetadata( return resp, err } -func (svc *MetadataQueryService) queryMetadata( +func (svc *QueryService) queryMetadata( _ context.Context, tx *bbolt.Tx, req *metastorev1.QueryMetadataRequest, @@ -78,7 +78,7 @@ func (svc *MetadataQueryService) queryMetadata( return nil, status.Error(codes.Internal, err.Error()) } -func (svc *MetadataQueryService) QueryMetadataLabels( +func (svc *QueryService) QueryMetadataLabels( ctx context.Context, req *metastorev1.QueryMetadataLabelsRequest, ) (resp *metastorev1.QueryMetadataLabelsResponse, err error) { @@ -91,7 +91,7 @@ func (svc *MetadataQueryService) QueryMetadataLabels( return resp, err } -func (svc *MetadataQueryService) queryMetadataLabels( +func (svc *QueryService) queryMetadataLabels( _ context.Context, tx *bbolt.Tx, req *metastorev1.QueryMetadataLabelsRequest, diff --git a/pkg/experiment/metastore/retention/cleaner.go b/pkg/experiment/metastore/retention/cleaner.go new file mode 100644 index 0000000000..5e46e92d8a --- /dev/null +++ b/pkg/experiment/metastore/retention/cleaner.go @@ -0,0 +1,97 @@ +package retention + +import ( + "context" + "flag" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" +) + +// The maximum number of partitions to delete in one go. +// A partition is qualified by partition key, tenant, and shard ID. +const maxTruncatePartitions = 32 + +type Index interface { + TruncatePartitions(ctx context.Context, before time.Time, max int) error +} + +type Config struct { + RetentionPeriod time.Duration `yaml:"retention_period"` + RetentionCheckInterval time.Duration `yaml:"retention_check_interval"` +} + +func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.DurationVar(&c.RetentionPeriod, prefix+"retention-period", 0, "Data older than this period will be deleted from the storage. 0 to disable.") + f.DurationVar(&c.RetentionCheckInterval, prefix+"retention-check-interval", time.Minute, "Interval for retention check. 0 to disable.") +} + +type Cleaner struct { + logger log.Logger + config Config + index Index + + started bool + cancel context.CancelFunc + m sync.Mutex +} + +func NewCleaner(logger log.Logger, config Config, index Index) *Cleaner { + return &Cleaner{ + logger: logger, + config: config, + index: index, + started: false, + cancel: nil, + m: sync.Mutex{}, + } +} + +func (c *Cleaner) Start() { + c.m.Lock() + defer c.m.Unlock() + if c.started { + c.logger.Log("msg", "index cleaner already started") + return + } + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + c.started = true + go c.loop(ctx) + c.logger.Log("msg", "index cleaner started") +} + +func (c *Cleaner) Stop() { + c.m.Lock() + defer c.m.Unlock() + if !c.started { + c.logger.Log("msg", "index cleaner already stopped") + return + } + if c.cancel != nil { + c.cancel() + } + c.started = false + c.logger.Log("msg", "index cleaner stopped") +} + +func (c *Cleaner) loop(ctx context.Context) { + if c.config.RetentionCheckInterval == 0 || c.config.RetentionPeriod == 0 { + return + } + ticker := time.NewTicker(c.config.RetentionCheckInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + before := time.Now().Add(-c.config.RetentionPeriod) + if err := c.index.TruncatePartitions(ctx, before, maxTruncatePartitions); err != nil { + level.Error(c.logger).Log("msg", "failed to truncate partitions", "err", err) + } + } + } +} diff --git a/pkg/experiment/metastore/tenant_service.go b/pkg/experiment/metastore/tenant_service.go index b471a8c6f8..6ef07ac725 100644 --- a/pkg/experiment/metastore/tenant_service.go +++ b/pkg/experiment/metastore/tenant_service.go @@ -14,6 +14,8 @@ import ( type TenantIndex interface { GetTenantStats(tenant string) *metastorev1.TenantStats + // TODO(kolesnikovae): Refactor index.GetTenantStats to GetTenant handler. + // IndexPartitionLister } type TenantService struct { diff --git a/pkg/experiment/metastore/tombstones/tombstone_queue.go b/pkg/experiment/metastore/tombstones/tombstone_queue.go index 3a71ccc8cd..ed6f29edb1 100644 --- a/pkg/experiment/metastore/tombstones/tombstone_queue.go +++ b/pkg/experiment/metastore/tombstones/tombstone_queue.go @@ -12,8 +12,11 @@ type tombstoneQueue struct{ head, tail *tombstones } type tombstoneKey string func (k *tombstoneKey) set(t *metastorev1.Tombstones) bool { - if t.Blocks != nil { + switch { + case t.Blocks != nil: *k = tombstoneKey(t.Blocks.Name) + case t.Partition != nil: + *k = tombstoneKey(t.Partition.Name) } return len(*k) > 0 } @@ -25,7 +28,7 @@ type tombstones struct { func newTombstoneQueue() *tombstoneQueue { return &tombstoneQueue{} } -func (q *tombstoneQueue) push(e *tombstones) bool { +func (q *tombstoneQueue) push(e *tombstones) { if q.tail != nil { q.tail.next = e e.prev = q.tail @@ -33,7 +36,6 @@ func (q *tombstoneQueue) push(e *tombstones) bool { q.head = e } q.tail = e - return true } func (q *tombstoneQueue) delete(e *tombstones) *tombstones { diff --git a/pkg/experiment/metastore/tombstones/tombstones.go b/pkg/experiment/metastore/tombstones/tombstones.go index f260202c4a..19188885ab 100644 --- a/pkg/experiment/metastore/tombstones/tombstones.go +++ b/pkg/experiment/metastore/tombstones/tombstones.go @@ -105,13 +105,14 @@ func (x *Tombstones) put(k tombstoneKey, v store.TombstoneEntry) bool { } e := &tombstones{TombstoneEntry: v} x.tombstones[k] = e + x.queue.push(e) if v.Tombstones.Blocks != nil { - if x.queue.push(e) { - x.putBlockTombstones(v.Tombstones.Blocks) - return true - } + // Keep track of the blocks we enqueued. This is + // necessary to answer if the block has already + // been deleted (within a limited time window). + x.putBlockTombstones(v.Tombstones.Blocks) } - return false + return true } func (x *Tombstones) delete(k tombstoneKey) (t *tombstones) {