-
Notifications
You must be signed in to change notification settings - Fork 4.1k
Expand file tree
/
Copy pathChunkedSegmentSealedImpl.h
More file actions
1604 lines (1378 loc) · 57.5 KB
/
Copy pathChunkedSegmentSealedImpl.h
File metadata and controls
1604 lines (1378 loc) · 57.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <folly/ExceptionWrapper.h>
#include <stdint.h>
#include <any>
#include <atomic>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <string_view>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "ConcurrentVector.h"
#include "DeletedRecord.h"
#include "NamedType/underlying_functionalities.hpp"
#include "SealedIndexingRecord.h"
#include "SegmentSealed.h"
#include "cachinglayer/CacheSlot.h"
#include "cachinglayer/Utils.h"
#include "common/Array.h"
#include "common/ArrayOffsets.h"
#include "common/BitsetView.h"
#include "common/Chunk.h"
#include "common/EasyAssert.h"
#include "common/FieldMeta.h"
#include "common/IndexMeta.h"
#include "common/Json.h"
#include "common/LoadInfo.h"
#include "common/OpContext.h"
#include "common/QueryInfo.h"
#include "common/QueryResult.h"
#include "common/Schema.h"
#include "common/Span.h"
#include "common/SystemProperty.h"
#include "common/Tracer.h"
#include "common/Types.h"
#include "common/VectorArray.h"
#include "common/protobuf_utils.h"
#include "fmt/core.h"
#include "folly/FBVector.h"
#include "folly/Synchronized.h"
#include "google/protobuf/message.h"
#include "index/Index.h"
#include "index/NgramInvertedIndex.h"
#include "index/json_stats/JsonKeyStats.h"
#include "milvus-storage/column_groups.h"
#include "milvus-storage/common/metadata.h"
#include "milvus-storage/properties.h"
#include "milvus-storage/reader.h"
#include "mmap/ChunkedColumnInterface.h"
#include "mmap/Types.h"
#include "parquet/statistics.h"
#include "pb/common.pb.h"
#include "pb/index_cgo_msg.pb.h"
#include "pb/plan.pb.h"
#include "pb/segcore.pb.h"
#include "query/PlanImpl.h"
#include "segcore/IndexConfigGenerator.h"
#include "segcore/InsertRecord.h"
#include "segcore/SegcoreConfig.h"
#include "segcore/SegmentInterface.h"
#include "segcore/SegmentLoadInfo.h"
#include "segcore/Types.h"
#include "storage/MmapChunkManager.h"
#include "segcore/TextColumnCache.h"
namespace milvus::segcore {
namespace storagev1translator {
class InsertRecordTranslator;
}
namespace storagev2translator {
class TimestampIndexCell;
class PkIndexCell;
} // namespace storagev2translator
using namespace milvus::cachinglayer;
// Test-only accessor that pokes private members to simulate v2/v3 segment
// state (raw timestamp column emplaced into fields_ alongside an overwritten
// timestamp index). Defined in internal/core/unittest/test_commit_timestamp.cpp.
class CommitTimestampV2TestAccess;
class ChunkedSegmentSealedImpl : public SegmentSealed {
friend class CommitTimestampV2TestAccess;
public:
using ParquetStatistics = std::vector<std::shared_ptr<parquet::Statistics>>;
explicit ChunkedSegmentSealedImpl(SchemaPtr schema,
IndexMetaPtr index_meta,
const SegcoreConfig& segcore_config,
int64_t segment_id,
bool is_sorted_by_pk = false);
~ChunkedSegmentSealedImpl() override;
void
LoadIndex(LoadIndexInfo& info) override;
void
LoadFieldData(const LoadFieldDataInfo& info,
milvus::OpContext* op_ctx = nullptr) override;
void
LoadDeletedRecord(const LoadDeletedRecordInfo& info) override;
void
LoadSegmentMeta(
const milvus::proto::segcore::LoadSegmentMeta& segment_meta) override;
void
DropIndex(const FieldId field_id) override;
void
DropJSONIndex(const FieldId field_id,
const std::string& nested_path) override;
void
DropFieldData(const FieldId field_id) override;
bool
HasIndex(FieldId field_id) const override;
bool
HasJsonIndex(FieldId field_id) const override;
bool
HasFieldData(FieldId field_id) const override;
std::pair<std::shared_ptr<ChunkedColumnInterface>, bool>
GetFieldDataIfExist(FieldId field_id) const;
// Pins and returns the scalar index registered for `field_id`.
// Returns an empty vector when the field has no entry in scalar_indexings_,
// or when the field is listed in ngram_fields_ and `include_ngram` is false.
std::vector<PinWrapper<const index::IndexBase*>>
PinIndex(milvus::OpContext* op_ctx,
         FieldId field_id,
         bool include_ngram = false) const override {
    // Acquire both read locks through a single lock() call.
    auto [indexings_guard, ngram_guard] =
        lock(folly::rlock(scalar_indexings_), folly::rlock(ngram_fields_));

    // Ngram-indexed fields are hidden unless the caller opts in.
    if (!include_ngram &&
        ngram_guard->find(field_id) != ngram_guard->end()) {
        return {};
    }

    auto entry = indexings_guard->find(field_id);
    if (entry == indexings_guard->end()) {
        return {};
    }

    // Pin cell 0 and read its pointer before the accessor is moved into
    // the wrapper.
    auto accessor = SemiInlineGet(entry->second->PinCells(op_ctx, {0}));
    auto index_ptr = accessor->get_cell_of(0);
    return {PinWrapper<const index::IndexBase*>(std::move(accessor),
                                                index_ptr)};
}
bool
Contain(const PkType& pk) const override;
void
AddFieldDataInfoForSealed(
const LoadFieldDataInfo& field_data_info) override;
// Returns the segment id stored in id_.
int64_t
get_segment_id() const override {
return id_;
}
bool
HasRawData(int64_t field_id) const override;
// Returns true only if the index itself contains raw data,
// without considering whether field data is loaded.
bool
IndexHasRawData(FieldId field_id) const;
// Convenience overload without an OpContext: forwards every argument to the
// OpContext-taking overload, passing a null context.
bool
CalcDistByIDs(FieldId field_id,
              const knowhere::DataSetPtr& query_dataset,
              const int64_t* seg_offsets,
              size_t count,
              bool is_cosine,
              float* distances) const override {
    milvus::OpContext* const no_op_ctx = nullptr;
    return CalcDistByIDs(no_op_ctx,
                         field_id,
                         query_dataset,
                         seg_offsets,
                         count,
                         is_cosine,
                         distances);
}
bool
CalcDistByIDs(milvus::OpContext* op_ctx,
FieldId field_id,
const knowhere::DataSetPtr& query_dataset,
const int64_t* seg_offsets,
size_t count,
bool is_cosine,
float* distances) const override;
// Convenience overload without an OpContext: forwards to the
// OpContext-taking overload with a null context.
bool
IsIndexRefineEnabled(FieldId field_id) const override {
    milvus::OpContext* const no_op_ctx = nullptr;
    return IsIndexRefineEnabled(no_op_ctx, field_id);
}
bool
IsIndexRefineEnabled(milvus::OpContext* op_ctx,
FieldId field_id) const override;
DataType
GetFieldDataType(FieldId fieldId) const override;
void
RemoveFieldFile(const FieldId field_id) override;
void
CreateTextIndex(FieldId field_id,
milvus::OpContext* op_ctx = nullptr) override;
void
LoadTextIndex(milvus::OpContext* op_ctx,
std::shared_ptr<milvus::proto::indexcgo::LoadTextIndexInfo>
info_proto) override;
void
LoadJsonKeyIndex(
milvus::OpContext* op_ctx,
std::shared_ptr<milvus::proto::indexcgo::LoadJsonKeyIndexInfo>
info_proto);
void
LoadBatchJsonKeyIndexes(
milvus::OpContext* op_ctx,
const std::unordered_map<
FieldId,
std::shared_ptr<milvus::proto::indexcgo::LoadJsonKeyIndexInfo>>&
infos);
// Erases the JSON key stats entry for `field_id` (no-op if absent) while
// holding mutex_ exclusively.
void
RemoveJsonStats(FieldId field_id) override {
    std::unique_lock write_guard(mutex_);
    json_stats_.erase(field_id);
}
// Stores `stats` as the JSON key stats for `field_id`, replacing any
// existing entry, while holding mutex_ exclusively.
// `stats` is a by-value sink parameter, so it is moved into the map rather
// than copied; this avoids an extra atomic refcount increment/decrement.
void
LoadJsonStats(FieldId field_id,
              std::shared_ptr<index::JsonKeyStats> stats) override {
    std::unique_lock lck(mutex_);
    json_stats_[field_id] = std::move(stats);
}
// Looks up the JSON key stats for `field_id` under a shared lock on mutex_.
// Returns nullptr when no stats are registered for the field.
// `op_ctx` is not used by this implementation.
std::shared_ptr<index::JsonKeyStats>
GetJsonStats(milvus::OpContext* op_ctx, FieldId field_id) const override {
    std::shared_lock read_guard(mutex_);
    if (auto found = json_stats_.find(field_id);
        found != json_stats_.end()) {
        return found->second;
    }
    return nullptr;
}
PinWrapper<index::NgramInvertedIndex*>
GetNgramIndex(milvus::OpContext* op_ctx, FieldId field_id) const override;
PinWrapper<index::NgramInvertedIndex*>
GetNgramIndexForJson(milvus::OpContext* op_ctx,
FieldId field_id,
const std::string& nested_path) const override;
// Returns the array offsets stored for `field_id` in array_offsets_map_,
// or nullptr when the field has no entry.
std::shared_ptr<const IArrayOffsets>
GetArrayOffsets(FieldId field_id) const override {
    const auto entry = array_offsets_map_.find(field_id);
    if (entry == array_offsets_map_.end()) {
        return nullptr;
    }
    return entry->second;
}
// Fetches the column for `field_id` from fields_ and forwards the request to
// its BulkRawJsonAt(), which receives `fn`, `offsets` and `count` unchanged.
// fields_.at() is used for the lookup, so a missing field is reported by
// at() itself rather than handled here.
void
BulkGetJsonData(milvus::OpContext* op_ctx,
                FieldId field_id,
                const std::function<void(milvus::Json, size_t, bool)>& fn,
                const int64_t* offsets,
                int64_t count) const override {
    // The read lock is a temporary: it is released at the end of this
    // statement, before the column is read.
    const auto json_column = fields_.rlock()->at(field_id);
    json_column->BulkRawJsonAt(op_ctx, fn, offsets, count);
}
void
Reopen(SchemaPtr sch) override;
void
Reopen(
milvus::OpContext* op_ctx,
const milvus::proto::segcore::SegmentLoadInfo& new_load_info) override;
void
Reopen(milvus::OpContext* op_ctx,
const milvus::proto::segcore::SegmentLoadInfo& new_load_info,
SchemaPtr new_schema) override;
void
LazyCheckSchema(SchemaPtr sch, milvus::OpContext* op_ctx) override;
void
SetLoadInfo(milvus::proto::segcore::SegmentLoadInfo load_info) override;
void
SetCommitTimestamp(uint64_t ts) override;
uint64_t
GetCommitTimestamp() const override;
// When non-zero commit_ts_ is active, every row in this segment carries
// commit_ts_ as its effective row timestamp (load-time overwrite). Returns
// nullopt otherwise. All timestamp consumers — read_ts (search_batch_pks),
// bulk_subscript(Timestamp), mask_with_timestamps — must route through
// this so the override applies uniformly on v1 AND v2/v3 storage paths.
std::optional<Timestamp>
EffectiveCommitTs() const {
return commit_ts_ != 0 ? std::optional<Timestamp>{commit_ts_}
: std::nullopt;
}
void
Load(milvus::tracer::TraceContext& trace_ctx,
milvus::OpContext* op_ctx) override;
void
LoadManifest(const std::string& manifest_path);
public:
size_t
GetMemoryUsageInBytes() const override {
return stats_.mem_size.load() + deleted_record_.mem_size();
}
// Returns a mutable reference to this segment's insert_record_.
InsertRecord<true>&
get_insert_record() override {
return insert_record_;
}
int64_t
get_row_count() const override;
int64_t
get_deleted_count() const override;
// Returns the maximum timestamp reported by the insert record's
// timestamp index.
Timestamp
get_max_timestamp() const override {
    const auto& ts_index = insert_record_.timestamp_index_;
    return ts_index.get_max_timestamp();
}
const Schema&
get_schema() const override;
void
pk_range(milvus::OpContext* op_ctx,
proto::plan::OpType op,
const PkType& pk,
BitsetTypeView& bitset) const override;
void
search_sorted_pk_range(milvus::OpContext* op_ctx,
proto::plan::OpType op,
const PkType& pk,
BitsetTypeView& bitset) const;
void
pk_binary_range(milvus::OpContext* op_ctx,
const PkType& lower_pk,
bool lower_inclusive,
const PkType& upper_pk,
bool upper_inclusive,
BitsetTypeView& bitset) const override;
std::unique_ptr<DataArray>
get_vector(milvus::OpContext* op_ctx,
FieldId field_id,
const int64_t* ids,
int64_t count) const override;
std::unique_ptr<DataArray>
get_emb_list(milvus::OpContext* op_ctx,
FieldId field_id,
const FieldMeta& field_meta,
const int64_t* seg_offsets,
int64_t count) const;
// Reports whether the schema marks `field_id` as nullable.
bool
is_nullable(FieldId field_id) const override {
    const auto& meta = (*schema_)[field_id];
    return meta.is_nullable();
}
// Always true for this segment implementation.
bool
is_chunked() const override {
return true;
}
void
search_pks(BitsetType& bitset, const std::vector<PkType>& pks) const;
void
search_batch_pks(
const std::vector<PkType>& pks,
const std::function<Timestamp(const size_t idx)>& get_timestamp,
bool include_same_ts,
const std::function<void(const SegOffset offset, const Timestamp ts)>&
callback) const;
public:
// Non-virtual helper called via dynamic_cast from SegmentInterface.
// Must be public for cross-class access.
bool
TryTakeForRetrieve(
const query::RetrievePlan* plan,
const std::unique_ptr<proto::segcore::RetrieveResults>& results,
const int64_t* offsets,
int64_t size,
bool ignore_non_pk,
bool fill_ids,
milvus::OpContext* op_ctx = nullptr) const;
// Number of chunks that have raw data.
int64_t
num_chunk_data(FieldId field_id) const override;
int64_t
num_chunk(FieldId field_id) const override;
// Returns size_per_chunk for each chunk (renamed to avoid confusion).
int64_t
size_per_chunk() const override;
int64_t
chunk_size(FieldId field_id, int64_t chunk_id) const override;
std::pair<int64_t, int64_t>
get_chunk_by_offset(FieldId field_id, int64_t offset) const override;
int64_t
num_rows_until_chunk(FieldId field_id, int64_t chunk_id) const override;
SegcoreError
Delete(int64_t size,
const IdArray* pks,
const Timestamp* timestamps) override;
std::pair<std::vector<OffsetMap::OffsetType>, bool>
find_first_n(int64_t limit, const BitsetTypeView& bitset) const override;
std::tuple<std::vector<int64_t>, std::vector<std::vector<int32_t>>, bool>
find_first_n_element(
int64_t limit,
const BitsetTypeView& element_bitset,
const IArrayOffsets* array_offsets,
const std::optional<QueryIteratorCursor>& cursor) const override;
// Calculate: output[i] = Vec[seg_offset[i]]
// where Vec is determined from field_offset
std::unique_ptr<DataArray>
bulk_subscript(milvus::OpContext* op_ctx,
FieldId field_id,
const int64_t* seg_offsets,
int64_t count) const override;
std::unique_ptr<DataArray>
bulk_subscript(
milvus::OpContext* op_ctx,
FieldId field_id,
const int64_t* seg_offsets,
int64_t count,
const std::vector<std::string>& dynamic_field_names) const override;
bool
is_mmap_field(FieldId id) const override;
void
ClearData() override;
// Reports whether `field_id` is present in the schema's field map.
bool
is_field_exist(FieldId field_id) const override {
    const auto& schema_fields = schema_->get_fields();
    return schema_fields.find(field_id) != schema_fields.end();
}
void
prefetch_chunks(milvus::OpContext* op_ctx,
FieldId field_id,
const std::vector<int64_t>& chunk_ids) const override;
protected:
// blob and row_count
PinWrapper<SpanBase>
chunk_data_impl(milvus::OpContext* op_ctx,
FieldId field_id,
int64_t chunk_id) const override;
PinWrapper<std::pair<std::vector<std::string_view>, FixedVector<bool>>>
chunk_string_view_impl(
milvus::OpContext* op_ctx,
FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len) const override;
PinWrapper<std::pair<std::vector<ArrayView>, FixedVector<bool>>>
chunk_array_view_impl(
milvus::OpContext* op_ctx,
FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len) const override;
PinWrapper<std::pair<std::vector<VectorArrayView>, FixedVector<bool>>>
chunk_vector_array_view_impl(
milvus::OpContext* op_ctx,
FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len) const override;
PinWrapper<std::pair<std::vector<std::string_view>, FixedVector<bool>>>
chunk_string_views_by_offsets(
milvus::OpContext* op_ctx,
FieldId field_id,
int64_t chunk_id,
const FixedVector<int32_t>& offsets) const override;
PinWrapper<std::pair<std::vector<ArrayView>, FixedVector<bool>>>
chunk_array_views_by_offsets(
milvus::OpContext* op_ctx,
FieldId field_id,
int64_t chunk_id,
const FixedVector<int32_t>& offsets) const override;
// Calculate: output[i] = Vec[seg_offset[i]],
// where Vec is determined from field_offset
void
bulk_subscript(milvus::OpContext* op_ctx,
SystemFieldType system_type,
const int64_t* seg_offsets,
int64_t count,
void* output) const override;
void
bulk_subscript(milvus::OpContext* op_ctx,
FieldId field_id,
DataType data_type,
const int64_t* seg_offsets,
int64_t count,
void* data,
TargetBitmap& valid_map,
bool small_int_raw_type = false) const override;
// Override to inject take() fast path for Search output fields.
// Uses existing vtable slot — no layout change.
void
FillTargetEntry(const query::Plan* plan,
SearchResult& results,
milvus::OpContext* op_ctx = nullptr) const override;
bool
TryTakeForSearch(const query::Plan* plan,
const int64_t* seg_offsets,
int64_t size,
SearchResult& results,
milvus::OpContext* op_ctx = nullptr) const;
// Shared helpers for TryTakeForRetrieve / TryTakeForSearch
// TakeContext is the value returned by BuildTakeContext(offsets, size).
struct TakeContext {
// NOTE(review): presumably the de-duplicated input offsets — confirm
// against the BuildTakeContext definition.
std::vector<int64_t> unique_offsets;
// For each original position, the index of its offset within
// unique_offsets.
std::vector<int64_t> result_mapping; // orig_pos → unique index
};
static TakeContext
BuildTakeContext(const int64_t* offsets, int64_t size);
// Converts a combined Arrow array into a proto DataArray using
// result_mapping for reorder. Returns nullptr on unsupported type.
static std::unique_ptr<DataArray>
ArrowToDataArray(
const std::shared_ptr<arrow::Array>& arr,
const FieldMeta& field_meta,
const std::vector<int64_t>& result_mapping,
int64_t size,
const std::vector<std::string>* dynamic_field_names = nullptr,
const std::string* text_lob_path = nullptr);
// Calls reader_->take() with timing. Returns the table on success,
// or nullptr on failure (logs a warning). Checks op_ctx for cancellation
// before invoking reader_->take(), which can do remote reads.
std::shared_ptr<arrow::Table>
ExecuteTake(const std::vector<int64_t>& unique_offsets,
const std::shared_ptr<std::vector<std::string>>& needed_columns,
const char* caller_tag,
double& elapsed_ms,
milvus::OpContext* op_ctx = nullptr) const;
void
check_search(const query::Plan* plan) const override;
int64_t
get_active_count(Timestamp ts) const override;
// Not supported on sealed segments: the body only raises NotImplemented
// through ThrowInfo and has no return statement.
const ConcurrentVector<Timestamp>&
get_timestamps() const override {
// Sealed segments no longer store timestamps in ConcurrentVector.
// Only growing segments use this method.
ThrowInfo(NotImplemented,
"sealed segment does not support get_timestamps()");
}
// Load Geometry cache for a field
void
LoadGeometryCache(FieldId field_id,
const std::shared_ptr<ChunkedColumnInterface>& column);
private:
void
load_system_field_internal(
FieldId field_id,
FieldDataInfo& data,
milvus::proto::common::LoadPriority load_priority);
// Initialize timestamp index with owned data (StorageV1 path)
void
init_storage_v1_timestamp_index(std::vector<Timestamp> timestamps,
size_t num_rows);
// Sets to true, in `bitset`, every row of the PK-sorted `pk_column` whose
// primary key satisfies `pk <op> target`.
//   - Supported ops: Equal, GreaterEqual, GreaterThan, LessEqual, LessThan.
//     Any other op raises ErrorCode::Unsupported.
//   - Row positions are global indices: rows before the chunk plus the
//     in-chunk offset.
//   - Returns without touching `bitset` when the column has no chunks.
template <typename PK>
void
search_sorted_pk_range_impl(
    proto::plan::OpType op,
    const PK& target,
    const std::shared_ptr<ChunkedColumnInterface>& pk_column,
    BitsetTypeView& bitset) const {
    if (pk_column->num_chunks() == 0) {
        return;
    }
    auto all_chunk_pins = pk_column->GetAllChunks(nullptr);

    const bool wants_equal = (op == proto::plan::OpType::Equal);
    const bool wants_greater = (op == proto::plan::OpType::GreaterEqual ||
                                op == proto::plan::OpType::GreaterThan);
    const bool wants_less = (op == proto::plan::OpType::LessEqual ||
                             op == proto::plan::OpType::LessThan);
    if (!(wants_equal || wants_greater || wants_less)) {
        ThrowInfo(ErrorCode::Unsupported,
                  fmt::format("unsupported op type {}", op));
    }

    // Global row index of (chunk, in-chunk offset).
    const auto to_row = [&pk_column](int chunk, int offset) {
        return pk_column->GetNumRowsUntilChunk(chunk) + offset;
    };
    // Global row index of the last row equal to `target`, given the
    // position of its first occurrence.
    const auto last_row_of_target = [&](int first_chunk, int first_offset) {
        auto [last_chunk, last_offset] =
            this->find_last_pk_position<PK>(target,
                                            pk_column.get(),
                                            all_chunk_pins,
                                            first_chunk,
                                            first_offset);
        return to_row(last_chunk, last_offset);
    };

    // Every supported op starts from the lower bound of `target`.
    auto [chunk_id, in_chunk_offset, exact_match] =
        this->pk_lower_bound<PK>(target, pk_column.get(), all_chunk_pins, 0);

    if (wants_equal) {
        if (exact_match) {
            // Mark the inclusive run [first occurrence, last occurrence].
            const auto last_row = last_row_of_target(chunk_id, in_chunk_offset);
            const auto first_row = to_row(chunk_id, in_chunk_offset);
            bitset.set(first_row, last_row - first_row + 1, true);
        }
        return;
    }

    if (wants_greater) {
        if (chunk_id == -1) {
            // Every stored pk is smaller than target: nothing matches.
            return;
        }
        int64_t start_idx = to_row(chunk_id, in_chunk_offset);
        if (exact_match && op == proto::plan::OpType::GreaterThan) {
            // Strictly greater: skip past all rows equal to target.
            start_idx = last_row_of_target(chunk_id, in_chunk_offset) + 1;
        }
        if (start_idx < bitset.size()) {
            bitset.set(start_idx, bitset.size() - start_idx, true);
        }
        return;
    }

    // LessEqual / LessThan: mark the prefix [0, end_idx).
    int64_t end_idx;
    if (chunk_id == -1) {
        // Every stored pk is smaller than target: all rows match.
        end_idx = bitset.size();
    } else if (op == proto::plan::OpType::LessEqual && exact_match) {
        // Inclusive: extend past the last row equal to target.
        end_idx = last_row_of_target(chunk_id, in_chunk_offset) + 1;
    } else {
        end_idx = to_row(chunk_id, in_chunk_offset);
    }
    if (end_idx > 0) {
        bitset.set(0, end_idx, true);
    }
}
template <typename PK>
void
search_sorted_pk_binary_range_impl(
const PK& lower_val,
bool lower_inclusive,
const PK& upper_val,
bool upper_inclusive,
const std::shared_ptr<ChunkedColumnInterface>& pk_column,
BitsetTypeView& bitset) const {
const auto num_chunk = pk_column->num_chunks();
if (num_chunk == 0) {
return;
}
auto all_chunk_pins = pk_column->GetAllChunks(nullptr);
// Find the lower bound position (first value >= lower_val or > lower_val)
auto [lower_chunk_id, lower_in_chunk_offset, lower_exact_match] =
this->pk_lower_bound<PK>(
lower_val, pk_column.get(), all_chunk_pins, 0);
int64_t start_idx = 0;
if (lower_chunk_id != -1) {
start_idx = pk_column->GetNumRowsUntilChunk(lower_chunk_id) +
lower_in_chunk_offset;
// If lower_inclusive is false and we found an exact match, skip all equal values
if (!lower_inclusive && lower_exact_match) {
auto [last_chunk_id, last_in_chunk_offset] =
this->find_last_pk_position<PK>(lower_val,
pk_column.get(),
all_chunk_pins,
lower_chunk_id,
lower_in_chunk_offset);
start_idx = pk_column->GetNumRowsUntilChunk(last_chunk_id) +
last_in_chunk_offset + 1;
}
} else {
// lower_val is greater than all values, no results
return;
}
// Find the upper bound position (first value >= upper_val or > upper_val)
auto [upper_chunk_id, upper_in_chunk_offset, upper_exact_match] =
this->pk_lower_bound<PK>(
upper_val, pk_column.get(), all_chunk_pins, 0);
int64_t end_idx = 0;
if (upper_chunk_id == -1) {
// upper_val is greater than all values, include all from start_idx to end
end_idx = bitset.size();
} else {
// If upper_inclusive is true and we found an exact match, include all equal values
if (upper_inclusive && upper_exact_match) {
auto [last_chunk_id, last_in_chunk_offset] =
this->find_last_pk_position<PK>(upper_val,
pk_column.get(),
all_chunk_pins,
upper_chunk_id,
upper_in_chunk_offset);
end_idx = pk_column->GetNumRowsUntilChunk(last_chunk_id) +
last_in_chunk_offset + 1;
} else {
// upper_inclusive is false or no exact match
// In both cases, end at the position of first value >= upper_val
end_idx = pk_column->GetNumRowsUntilChunk(upper_chunk_id) +
upper_in_chunk_offset;
}
}
// Set bits from start_idx to end_idx - 1
if (start_idx < end_idx) {
bitset.set(start_idx, end_idx - start_idx, true);
}
}
// Returns the global row offset of the position pk_lower_bound() reports
// for `pk` in the PK-sorted `pk_column`, or std::nullopt when `pk` has no
// exact match.
template <typename PK>
std::optional<int64_t>
find_sorted_pk_doc_offset(
    const PK& pk,
    const std::shared_ptr<ChunkedColumnInterface>& pk_column) const {
    auto all_chunk_pins = pk_column->GetAllChunks(nullptr);
    // bound = (chunk_id, in_chunk_offset, exact_match)
    const auto bound =
        this->pk_lower_bound<PK>(pk, pk_column.get(), all_chunk_pins, 0);
    const bool exact_match = std::get<2>(bound);
    if (!exact_match) {
        return std::nullopt;
    }
    return pk_column->GetNumRowsUntilChunk(std::get<0>(bound)) +
           std::get<1>(bound);
}
// Sets to true, in `bitset`, every row of the PK-sorted `pk_column` whose
// primary key appears in `pks`.
// The requested pks are sorted first; each lookup then starts from the
// chunk where the previous match ended instead of from chunk 0.
template <typename PK>
void
search_pks_with_two_pointers_impl(
    BitsetTypeView& bitset,
    const std::vector<PkType>& pks,
    const std::shared_ptr<ChunkedColumnInterface>& pk_column) const {
    // TODO: we should sort pks during plan generation
    std::vector<PK> wanted;
    wanted.reserve(pks.size());
    for (const auto& variant_pk : pks) {
        wanted.push_back(std::get<PK>(variant_pk));
    }
    std::sort(wanted.begin(), wanted.end());

    auto all_chunk_pins = pk_column->GetAllChunks(nullptr);

    const size_t total = wanted.size();
    size_t cursor = 0;
    int search_from_chunk = 0;
    while (cursor < total) {
        const auto& current_pk = wanted[cursor];

        // Position of the first stored value >= current_pk.
        auto [chunk_id, in_chunk_offset, exact_match] =
            this->pk_lower_bound<PK>(current_pk,
                                     pk_column.get(),
                                     all_chunk_pins,
                                     search_from_chunk);
        if (chunk_id == -1) {
            // current_pk, and therefore every remaining pk, is greater
            // than all stored values.
            break;
        }

        if (exact_match) {
            auto [end_chunk_id, end_in_chunk_offset] =
                this->find_last_pk_position<PK>(current_pk,
                                                pk_column.get(),
                                                all_chunk_pins,
                                                chunk_id,
                                                in_chunk_offset);
            // Mark the inclusive run [first, last] using global indices.
            auto first_row =
                pk_column->GetNumRowsUntilChunk(chunk_id) + in_chunk_offset;
            auto last_row = pk_column->GetNumRowsUntilChunk(end_chunk_id) +
                            end_in_chunk_offset;
            bitset.set(first_row, last_row - first_row + 1, true);
            search_from_chunk = end_chunk_id;
        }

        // Advance past current_pk and any duplicates of it in the request.
        do {
            ++cursor;
        } while (cursor < total && wanted[cursor] == current_pk);
    }
}
// Binary search to find lower_bound of pk in pk_column starting from from_chunk_id
// Returns: (chunk_id, in_chunk_offset, exists)
//   - chunk_id: the chunk containing the first value >= pk
//   - in_chunk_offset: offset of the first value >= pk in that chunk
//   - exists: true if found an exact match (value == pk), false otherwise
//   - If pk doesn't exist, returns the position of first value > pk with exists=false
//   - If pk is greater than all values, returns {-1, -1, false}
//   - If from_chunk_id is not a valid chunk index, returns {-1, -1, false}
// Parameters:
//   - pk: the primary key to search for
//   - pk_column: the primary key column (used for chunk and row counts)
//   - all_chunk_pins: one pinned chunk per chunk id; values are read from it
//   - from_chunk_id: first chunk considered by the search
// The search relies on values being sorted ascending across chunks.
// Duplicates of pk may span a chunk boundary; the chunk-level search
// therefore keeps looking left while pk <= the chunk's first value, so the
// returned position is always the FIRST occurrence.
template <typename PK>
std::tuple<int, int, bool>
pk_lower_bound(const PK& pk,
               const ChunkedColumnInterface* pk_column,
               const std::vector<PinWrapper<Chunk*>>& all_chunk_pins,
               int from_chunk_id = 0) const {
    const auto num_chunk = pk_column->num_chunks();
    if (from_chunk_id >= num_chunk) {
        return {-1, -1, false};  // Invalid starting chunk
    }

    // int64 pks are compared by value, string pks through a string_view.
    using PKViewType = std::conditional_t<std::is_same_v<PK, int64_t>,
                                          int64_t,
                                          std::string_view>;

    // Reads the pk stored at (chunk_id, in_chunk_offset).
    auto get_val_view = [&](int chunk_id,
                            int in_chunk_offset) -> PKViewType {
        auto& pw = all_chunk_pins[chunk_id];
        if constexpr (std::is_same_v<PK, int64_t>) {
            auto src =
                reinterpret_cast<const int64_t*>(pw.get()->RawData());
            return src[in_chunk_offset];
        } else {
            auto string_chunk = static_cast<StringChunk*>(pw.get());
            return string_chunk->operator[](in_chunk_offset);
        }
    };

    // Binary search at chunk level for the first chunk whose last value
    // is >= pk.
    int left_chunk_id = from_chunk_id;
    int right_chunk_id = num_chunk - 1;
    int target_chunk_id = -1;
    while (left_chunk_id <= right_chunk_id) {
        int mid_chunk_id =
            left_chunk_id + (right_chunk_id - left_chunk_id) / 2;
        auto chunk_row_num = pk_column->chunk_row_nums(mid_chunk_id);

        PKViewType min_val = get_val_view(mid_chunk_id, 0);
        PKViewType max_val = get_val_view(mid_chunk_id, chunk_row_num - 1);

        if (pk > max_val) {
            // pk is after this chunk, search in later chunks
            left_chunk_id = mid_chunk_id + 1;
            continue;
        }

        // This chunk holds at least one value >= pk: it is a candidate.
        target_chunk_id = mid_chunk_id;
        if (pk > min_val) {
            // Every value in earlier chunks is <= min_val < pk, so the
            // first value >= pk must be inside this chunk.
            break;
        }
        // pk <= min_val: an earlier chunk may still end with values that
        // are >= pk (including duplicates of pk that span the chunk
        // boundary), so keep searching to the left.
        right_chunk_id = mid_chunk_id - 1;
    }

    // If no suitable chunk found, check if we need the first position after all chunks
    if (target_chunk_id == -1) {
        if (left_chunk_id >= num_chunk) {
            // pk is greater than all values
            return {-1, -1, false};
        }
        target_chunk_id = left_chunk_id;
    }

    // Binary search within the target chunk to find lower_bound position
    auto chunk_row_num = pk_column->chunk_row_nums(target_chunk_id);
    int left_offset = 0;
    int right_offset = chunk_row_num;

    while (left_offset < right_offset) {
        int mid_offset = left_offset + (right_offset - left_offset) / 2;
        PKViewType mid_val = get_val_view(target_chunk_id, mid_offset);

        if (mid_val < pk) {
            left_offset = mid_offset + 1;
        } else {
            right_offset = mid_offset;
        }
    }

    // Check if we found a valid position
    if (left_offset < chunk_row_num) {
        // Found position within current chunk
        PKViewType found_val = get_val_view(target_chunk_id, left_offset);
        bool exact_match = (found_val == pk);
        return {target_chunk_id, left_offset, exact_match};
    } else {
        // Position is beyond current chunk, try next chunk
        if (target_chunk_id + 1 < num_chunk) {
            // Next chunk exists, return its first position
            // Check if the first value in next chunk equals pk
            PKViewType next_val = get_val_view(target_chunk_id + 1, 0);
            bool exact_match = (next_val == pk);
            return {target_chunk_id + 1, 0, exact_match};
        } else {
            // No more chunks, pk is greater than all values
            return {-1, -1, false};
        }
    }
}
// Find the last occurrence position of pk starting from a known first occurrence
// Parameters:
// - pk: the primary key to search for
// - pk_column: the primary key column
// - first_chunk_id: chunk id of the first occurrence (from pk_lower_bound)
// - first_in_chunk_offset: offset in chunk of the first occurrence (from pk_lower_bound)
// Returns: (last_chunk_id, last_in_chunk_offset)
// - The position of the last occurrence of pk
// Note: This function assumes pk exists and linearly scans forward.
// It's efficient when pk has few duplicates.
template <typename PK>
std::tuple<int, int>
find_last_pk_position(const PK& pk,
const ChunkedColumnInterface* pk_column,
const std::vector<PinWrapper<Chunk*>>& all_chunk_pins,
int first_chunk_id,
int first_in_chunk_offset) const {
const auto num_chunk = pk_column->num_chunks();