Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 128 additions & 15 deletions contrib/pax_storage/src/cpp/access/pax_access_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "storage/paxc_smgr.h"
#include "storage/wal/pax_wal.h"
#include "storage/wal/paxc_wal.h"
#include "storage/pax_itemptr.h"

#define NOT_IMPLEMENTED_YET \
ereport(ERROR, \
Expand All @@ -64,6 +65,116 @@

// access methods that are implemented in C++
namespace pax {
struct AnalyzeBlockItem {
int block;
int row_count;
int64 start_sample_block;
int64 end_sample_block;
};

static std::vector<AnalyzeBlockItem> extract_micro_partitions(Relation rel, Snapshot snapshot, int64 *totalrows) {
auto iter = pax::MicroPartitionIterator::New(rel, snapshot);
std::vector<AnalyzeBlockItem> analyze_items;
int64 ntuples = 0;
while (iter->HasNext()) {
auto mp = iter->Next();
AnalyzeBlockItem item;
item.block = mp.GetMicroPartitionId();
item.row_count = mp.GetTupleCount();
Assert(item.row_count > 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when delete all tuples , row_count maybe 0 ?


ntuples += item.row_count;
analyze_items.emplace_back(item);
}
iter->Release();
*totalrows = ntuples;

std::sort(analyze_items.begin(), analyze_items.end(),
[](const AnalyzeBlockItem &a, const AnalyzeBlockItem &b) {
return a.block < b.block;
});
return analyze_items;
}

static std::vector<AnalyzeBlockItem>
extract_sample_items(Relation rel, Snapshot snapshot, int64 *totalrows) {
std::vector<AnalyzeBlockItem> analyze_items;
analyze_items = extract_micro_partitions(rel, snapshot, totalrows);

int64 row_index = 0;
for (size_t i = 0; i < analyze_items.size(); i++) {
auto &item = analyze_items[i];
item.start_sample_block = row_index;
item.end_sample_block = row_index + item.row_count;
row_index = item.end_sample_block;
}

return analyze_items;
}

static int pax_acquire_sample_rows(Relation onerel, Snapshot snapshot,
HeapTuple *rows, int targrows,
double *totalrows, double *totaldeadrows) {
std::vector<AnalyzeBlockItem> analyze_items;
int64 ntuples = 0;
analyze_items = extract_sample_items(onerel, snapshot, &ntuples);

TupleTableSlot *slot = cbdb::MakeSingleTupleTableSlot(
RelationGetDescr(onerel), table_slot_callbacks(onerel));

// start sample rows
RowSamplerData rs;
size_t analyze_item_index = 0;
int numrows = 0;
double liverows = 0;
double deadrows = 0;

PaxIndexScanDesc desc(onerel);
RowSampler_Init(&rs, ntuples, targrows, random());
while (RowSampler_HasMore(&rs)) {
int64 sample_row = RowSampler_Next(&rs);
cbdb::VacuumDelayPoint();

// seek to the corresponding analyze item
while (analyze_item_index < analyze_items.size() &&
sample_row >= analyze_items[analyze_item_index].end_sample_block) {
analyze_item_index++;
}
if (analyze_item_index == analyze_items.size()) {
break;
}

const auto &item = analyze_items[analyze_item_index];
Assert(sample_row >= item.start_sample_block &&
sample_row < item.end_sample_block);
Assert(sample_row - item.start_sample_block < item.row_count);

int offset = static_cast<int>(sample_row - item.start_sample_block);

ItemPointerData ctid = pax::MakeCTID(item.block, offset);

bool ok = desc.FetchTuple(&ctid, snapshot, slot, nullptr, nullptr);
if (ok) {
liverows += 1;
rows[numrows++] = cbdb::ExecCopyHeapTuple(slot);
} else {
// dead rows
deadrows += 1;
}
cbdb::ExecClearTuple(slot);
}
if (rs.m > 0) {
*totaldeadrows = deadrows / rs.m * (double) ntuples;
*totalrows = ntuples - *totaldeadrows;
} else {
*totalrows = 0.0;
*totaldeadrows = 0.0;
}
desc.Release();
cbdb::ExecDropSingleTupleTableSlot(slot);

return numrows;
}

TableScanDesc CCPaxAccessMethod::ScanBegin(Relation relation, Snapshot snapshot,
int nkeys, struct ScanKeyData *key,
Expand Down Expand Up @@ -285,33 +396,34 @@ TM_Result CCPaxAccessMethod::TupleUpdate(Relation relation, ItemPointer otid,
pg_unreachable();
}

bool CCPaxAccessMethod::ScanAnalyzeNextBlock(TableScanDesc scan,
BlockNumber blockno,
BufferAccessStrategy bstrategy) {
int CCPaxAccessMethod::AcquireSampleRows(Relation onerel, int elevel, HeapTuple *rows,
int targrows, double *totalrows, double *totaldeadrows) {
auto snapshot = GetCatalogSnapshot(InvalidOid);
CBDB_TRY();
{
auto desc = PaxScanDesc::ToDesc(scan);
return desc->ScanAnalyzeNextBlock(blockno, bstrategy);
return pax_acquire_sample_rows(onerel, snapshot, rows, targrows,
totalrows, totaldeadrows);
}
CBDB_CATCH_DEFAULT();
CBDB_FINALLY({});
CBDB_END_TRY();
pg_unreachable();
}

bool CCPaxAccessMethod::ScanAnalyzeNextBlock(TableScanDesc scan,
BlockNumber blockno,
BufferAccessStrategy bstrategy) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("analyze next block is not supported on pax relations")));
}

bool CCPaxAccessMethod::ScanAnalyzeNextTuple(TableScanDesc scan,
TransactionId oldest_xmin,
double *liverows, double *deadrows,
TupleTableSlot *slot) {
CBDB_TRY();
{
auto desc = PaxScanDesc::ToDesc(scan);
return desc->ScanAnalyzeNextTuple(oldest_xmin, liverows, deadrows, slot);
}
CBDB_CATCH_DEFAULT();
CBDB_FINALLY({});
CBDB_END_TRY();
pg_unreachable();
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("analyze next tuple is not supported on pax relations")));
}

bool CCPaxAccessMethod::ScanBitmapNextBlock(TableScanDesc scan,
Expand Down Expand Up @@ -770,6 +882,7 @@ static const TableAmRoutine kPaxColumnMethods = {
.relation_vacuum = paxc::PaxAccessMethod::RelationVacuum,
.scan_analyze_next_block = pax::CCPaxAccessMethod::ScanAnalyzeNextBlock,
.scan_analyze_next_tuple = pax::CCPaxAccessMethod::ScanAnalyzeNextTuple,
.relation_acquire_sample_rows = pax::CCPaxAccessMethod::AcquireSampleRows,
.index_build_range_scan = paxc::PaxAccessMethod::IndexBuildRangeScan,
.index_validate_scan = paxc::PaxAccessMethod::IndexValidateScan,

Expand Down
2 changes: 2 additions & 0 deletions contrib/pax_storage/src/cpp/access/pax_access_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class CCPaxAccessMethod final {

static void RelationNontransactionalTruncate(Relation rel);

static int AcquireSampleRows(Relation onerel, int elevel, HeapTuple *rows,
int targrows, double *totalrows, double *totaldeadrows);
static bool ScanAnalyzeNextBlock(TableScanDesc scan, BlockNumber blockno,
BufferAccessStrategy bstrategy);
static bool ScanAnalyzeNextTuple(TableScanDesc scan,
Expand Down
2 changes: 2 additions & 0 deletions contrib/pax_storage/src/cpp/comm/cbdb_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ extern "C" {
#include "commands/defrem.h"
#include "commands/progress.h"
#include "commands/tablecmds.h"
#include "commands/vacuum.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/bitmapset.h"
Expand Down Expand Up @@ -135,6 +136,7 @@ extern "C" {
#include "utils/memutils.h"
#include "utils/numeric.h"
#include "utils/relcache.h"
#include "utils/sampling.h"
#include "utils/snapshot.h"
#include "utils/spccache.h"
#include "utils/syscache.h"
Expand Down
12 changes: 12 additions & 0 deletions contrib/pax_storage/src/cpp/comm/cbdb_wrappers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,12 @@ TupleTableSlot *cbdb::MakeSingleTupleTableSlot(
CBDB_WRAP_END;
}

HeapTuple cbdb::ExecCopyHeapTuple(TupleTableSlot *slot) {
CBDB_WRAP_START;
{ return ::ExecCopySlotHeapTuple(slot); }
CBDB_WRAP_END;
}

void cbdb::SlotGetAllAttrs(TupleTableSlot *slot) {
CBDB_WRAP_START;
{ ::slot_getallattrs(slot); }
Expand All @@ -628,3 +634,9 @@ void cbdb::ExecStoreVirtualTuple(TupleTableSlot *slot) {
{ ::ExecStoreVirtualTuple(slot); }
CBDB_WRAP_END;
}

void cbdb::VacuumDelayPoint() {
CBDB_WRAP_START;
{ vacuum_delay_point(); }
CBDB_WRAP_END;
}
3 changes: 3 additions & 0 deletions contrib/pax_storage/src/cpp/comm/cbdb_wrappers.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,16 @@ bool NeedWAL(Relation rel);
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot);
TupleTableSlot *MakeSingleTupleTableSlot(TupleDesc tupdesc,
const TupleTableSlotOps *tts_ops);
HeapTuple ExecCopyHeapTuple(TupleTableSlot *slot);


void SlotGetAllAttrs(TupleTableSlot *slot);

void ExecClearTuple(TupleTableSlot *slot);

void ExecStoreVirtualTuple(TupleTableSlot *slot);

void VacuumDelayPoint();
} // namespace cbdb

// clang-format off
Expand Down
22 changes: 5 additions & 17 deletions src/backend/commands/analyze.c
Original file line number Diff line number Diff line change
Expand Up @@ -1728,24 +1728,12 @@ acquire_sample_rows(Relation onerel, int elevel,
* the relation should not be an AO/CO table.
*/
Assert(!RelationIsAppendOptimized(onerel));
if (RelationIsPax(onerel))
{
/* PAX use non-fixed block layout */
BlockNumber pages;
double tuples;
double allvisfrac;
int32 attr_widths;

table_relation_estimate_size(onerel, &attr_widths, &pages,
&tuples, &allvisfrac);

if (tuples > UINT_MAX)
tuples = UINT_MAX;
/*
* PAX uses table_relation_acquire_sample_rows() as well.
*/
Assert(!RelationIsPax(onerel));

totalblocks = (BlockNumber)tuples;
}
else
totalblocks = RelationGetNumberOfBlocks(onerel);
totalblocks = RelationGetNumberOfBlocks(onerel);

/* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
OldestXmin = GetOldestNonRemovableTransactionId(onerel);
Expand Down
Loading