|
49 | 49 | #include "storage/paxc_smgr.h" |
50 | 50 | #include "storage/wal/pax_wal.h" |
51 | 51 | #include "storage/wal/paxc_wal.h" |
| 52 | +#include "storage/pax_itemptr.h" |
52 | 53 |
|
53 | 54 | #define NOT_IMPLEMENTED_YET \ |
54 | 55 | ereport(ERROR, \ |
|
64 | 65 |
|
65 | 66 | // access methods that are implemented in C++ |
66 | 67 | namespace pax { |
| 68 | +struct AnalyzeBlockItem { |
| 69 | + int block; |
| 70 | + int row_count; |
| 71 | + int64 start_sample_block; |
| 72 | + int64 end_sample_block; |
| 73 | +}; |
| 74 | + |
| 75 | +static std::vector<AnalyzeBlockItem> extract_micro_partitions(Relation rel, Snapshot snapshot, int64 *totalrows) { |
| 76 | + auto iter = pax::MicroPartitionIterator::New(rel, snapshot); |
| 77 | + std::vector<AnalyzeBlockItem> analyze_items; |
| 78 | + int64 ntuples = 0; |
| 79 | + while (iter->HasNext()) { |
| 80 | + auto mp = iter->Next(); |
| 81 | + AnalyzeBlockItem item; |
| 82 | + item.block = mp.GetMicroPartitionId(); |
| 83 | + item.row_count = mp.GetTupleCount(); |
| 84 | + Assert(item.row_count > 0); |
| 85 | + |
| 86 | + ntuples += item.row_count; |
| 87 | + analyze_items.emplace_back(item); |
| 88 | + } |
| 89 | + iter->Release(); |
| 90 | + *totalrows = ntuples; |
| 91 | + |
| 92 | + std::sort(analyze_items.begin(), analyze_items.end(), |
| 93 | + [](const AnalyzeBlockItem &a, const AnalyzeBlockItem &b) { |
| 94 | + return a.block < b.block; |
| 95 | + }); |
| 96 | + return analyze_items; |
| 97 | +} |
| 98 | + |
| 99 | +static std::vector<AnalyzeBlockItem> |
| 100 | +extract_sample_items(Relation rel, Snapshot snapshot, int64 *totalrows) { |
| 101 | + std::vector<AnalyzeBlockItem> analyze_items; |
| 102 | + analyze_items = extract_micro_partitions(rel, snapshot, totalrows); |
| 103 | + |
| 104 | + int64 row_index = 0; |
| 105 | + for (size_t i = 0; i < analyze_items.size(); i++) { |
| 106 | + auto &item = analyze_items[i]; |
| 107 | + item.start_sample_block = row_index; |
| 108 | + item.end_sample_block = row_index + item.row_count; |
| 109 | + row_index = item.end_sample_block; |
| 110 | + } |
| 111 | + |
| 112 | + return analyze_items; |
| 113 | +} |
| 114 | + |
| 115 | +static int pax_acquire_sample_rows(Relation onerel, Snapshot snapshot, |
| 116 | + HeapTuple *rows, int targrows, |
| 117 | + double *totalrows, double *totaldeadrows) { |
| 118 | + std::vector<AnalyzeBlockItem> analyze_items; |
| 119 | + int64 ntuples = 0; |
| 120 | + analyze_items = extract_sample_items(onerel, snapshot, &ntuples); |
| 121 | + |
| 122 | + TupleTableSlot *slot = cbdb::MakeSingleTupleTableSlot( |
| 123 | + RelationGetDescr(onerel), table_slot_callbacks(onerel)); |
| 124 | + |
| 125 | + // start sample rows |
| 126 | + RowSamplerData rs; |
| 127 | + size_t analyze_item_index = 0; |
| 128 | + int numrows = 0; |
| 129 | + double liverows = 0; |
| 130 | + double deadrows = 0; |
| 131 | + |
| 132 | + PaxIndexScanDesc desc(onerel); |
| 133 | + RowSampler_Init(&rs, ntuples, targrows, random()); |
| 134 | + while (RowSampler_HasMore(&rs)) { |
| 135 | + int64 sample_row = RowSampler_Next(&rs); |
| 136 | + cbdb::VacuumDelayPoint(); |
| 137 | + |
| 138 | + // seek to the corresponding analyze item |
| 139 | + while (analyze_item_index < analyze_items.size() && |
| 140 | + sample_row >= analyze_items[analyze_item_index].end_sample_block) { |
| 141 | + analyze_item_index++; |
| 142 | + } |
| 143 | + if (analyze_item_index == analyze_items.size()) { |
| 144 | + break; |
| 145 | + } |
| 146 | + |
| 147 | + const auto &item = analyze_items[analyze_item_index]; |
| 148 | + Assert(sample_row >= item.start_sample_block && |
| 149 | + sample_row < item.end_sample_block); |
| 150 | + Assert(sample_row - item.start_sample_block < item.row_count); |
| 151 | + |
| 152 | + int offset = static_cast<int>(sample_row - item.start_sample_block); |
| 153 | + |
| 154 | + ItemPointerData ctid = pax::MakeCTID(item.block, offset); |
| 155 | + |
| 156 | + bool ok = desc.FetchTuple(&ctid, snapshot, slot, nullptr, nullptr); |
| 157 | + if (ok) { |
| 158 | + liverows += 1; |
| 159 | + rows[numrows++] = cbdb::ExecCopyHeapTuple(slot); |
| 160 | + } else { |
| 161 | + // dead rows |
| 162 | + deadrows += 1; |
| 163 | + } |
| 164 | + cbdb::ExecClearTuple(slot); |
| 165 | + } |
| 166 | + *totaldeadrows = deadrows / rs.m * (double) ntuples; |
| 167 | + *totalrows = ntuples - *totaldeadrows; |
| 168 | + desc.Release(); |
| 169 | + cbdb::ExecDropSingleTupleTableSlot(slot); |
| 170 | + |
| 171 | + return numrows; |
| 172 | +} |
67 | 173 |
|
68 | 174 | TableScanDesc CCPaxAccessMethod::ScanBegin(Relation relation, Snapshot snapshot, |
69 | 175 | int nkeys, struct ScanKeyData *key, |
@@ -285,33 +391,34 @@ TM_Result CCPaxAccessMethod::TupleUpdate(Relation relation, ItemPointer otid, |
285 | 391 | pg_unreachable(); |
286 | 392 | } |
287 | 393 |
|
288 | | -bool CCPaxAccessMethod::ScanAnalyzeNextBlock(TableScanDesc scan, |
289 | | - BlockNumber blockno, |
290 | | - BufferAccessStrategy bstrategy) { |
| 394 | +int CCPaxAccessMethod::AcquireSampleRows(Relation onerel, int elevel, HeapTuple *rows, |
| 395 | + int targrows, double *totalrows, double *totaldeadrows) { |
| 396 | + auto snapshot = GetCatalogSnapshot(InvalidOid); |
291 | 397 | CBDB_TRY(); |
292 | 398 | { |
293 | | - auto desc = PaxScanDesc::ToDesc(scan); |
294 | | - return desc->ScanAnalyzeNextBlock(blockno, bstrategy); |
| 399 | + return pax_acquire_sample_rows(onerel, snapshot, rows, targrows, |
| 400 | + totalrows, totaldeadrows); |
295 | 401 | } |
296 | 402 | CBDB_CATCH_DEFAULT(); |
297 | | - CBDB_FINALLY({}); |
298 | 403 | CBDB_END_TRY(); |
299 | 404 | pg_unreachable(); |
300 | 405 | } |
301 | 406 |
|
| 407 | +bool CCPaxAccessMethod::ScanAnalyzeNextBlock(TableScanDesc scan, |
| 408 | + BlockNumber blockno, |
| 409 | + BufferAccessStrategy bstrategy) { |
| 410 | + ereport(ERROR, |
| 411 | + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| 412 | + errmsg("analyze next block is not supported on pax relations"))); |
| 413 | +} |
| 414 | + |
302 | 415 | bool CCPaxAccessMethod::ScanAnalyzeNextTuple(TableScanDesc scan, |
303 | 416 | TransactionId oldest_xmin, |
304 | 417 | double *liverows, double *deadrows, |
305 | 418 | TupleTableSlot *slot) { |
306 | | - CBDB_TRY(); |
307 | | - { |
308 | | - auto desc = PaxScanDesc::ToDesc(scan); |
309 | | - return desc->ScanAnalyzeNextTuple(oldest_xmin, liverows, deadrows, slot); |
310 | | - } |
311 | | - CBDB_CATCH_DEFAULT(); |
312 | | - CBDB_FINALLY({}); |
313 | | - CBDB_END_TRY(); |
314 | | - pg_unreachable(); |
| 419 | + ereport(ERROR, |
| 420 | + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| 421 | + errmsg("analyze next tuple is not supported on pax relations"))); |
315 | 422 | } |
316 | 423 |
|
317 | 424 | bool CCPaxAccessMethod::ScanBitmapNextBlock(TableScanDesc scan, |
@@ -770,6 +877,7 @@ static const TableAmRoutine kPaxColumnMethods = { |
770 | 877 | .relation_vacuum = paxc::PaxAccessMethod::RelationVacuum, |
771 | 878 | .scan_analyze_next_block = pax::CCPaxAccessMethod::ScanAnalyzeNextBlock, |
772 | 879 | .scan_analyze_next_tuple = pax::CCPaxAccessMethod::ScanAnalyzeNextTuple, |
| 880 | + .relation_acquire_sample_rows = pax::CCPaxAccessMethod::AcquireSampleRows, |
773 | 881 | .index_build_range_scan = paxc::PaxAccessMethod::IndexBuildRangeScan, |
774 | 882 | .index_validate_scan = paxc::PaxAccessMethod::IndexValidateScan, |
775 | 883 |
|
|
0 commit comments