Skip to content

Commit afd17ff

Browse files
SkipScan over compressed chunks
1 parent cf3e182 commit afd17ff

28 files changed

+12238
-164
lines changed

.unreleased/pr_7983

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Implements: #7983 Support for SkipScan over compressed data

src/guc.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ TSDLLEXPORT bool ts_guc_enable_skip_scan = true;
168168
#if PG16_GE
169169
TSDLLEXPORT bool ts_guc_enable_skip_scan_for_distinct_aggregates = true;
170170
#endif
171+
TSDLLEXPORT bool ts_guc_enable_compressed_skip_scan = true;
171172
static char *ts_guc_default_segmentby_fn = NULL;
172173
static char *ts_guc_default_orderby_fn = NULL;
173174
TSDLLEXPORT bool ts_guc_enable_job_execution_logging = false;
@@ -668,6 +669,18 @@ _guc_init(void)
668669
NULL,
669670
NULL);
670671
#endif
672+
673+
DefineCustomBoolVariable(MAKE_EXTOPTION("enable_compressed_skipscan"),
674+
"Enable SkipScan for compressed chunks",
675+
"Enable SkipScan for distinct inputs over compressed chunks",
676+
&ts_guc_enable_compressed_skip_scan,
677+
true,
678+
PGC_USERSET,
679+
0,
680+
NULL,
681+
NULL,
682+
NULL);
683+
671684
DefineCustomBoolVariable(MAKE_EXTOPTION("enable_compression_wal_markers"),
672685
"Enable WAL markers for compression ops",
673686
"Enable the generation of markers in the WAL stream which mark the "

src/guc.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ extern TSDLLEXPORT int ts_guc_compression_batch_size_limit;
7676
#if PG16_GE
7777
extern TSDLLEXPORT bool ts_guc_enable_skip_scan_for_distinct_aggregates;
7878
#endif
79+
extern TSDLLEXPORT bool ts_guc_enable_compressed_skip_scan;
7980

8081
/* Only settable in debug mode for testing */
8182
extern TSDLLEXPORT bool ts_guc_enable_null_compression;

tsl/src/nodes/decompress_chunk/compressed_batch.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1244,6 +1244,10 @@ compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batc
12441244
}
12451245

12461246
/* The tuple passed the qual. */
1247+
if (dcontext->unique_segmentby)
1248+
{
1249+
batch_state->next_batch_row = batch_state->total_batch_rows - 1;
1250+
}
12471251
batch_state->next_batch_row++;
12481252
return;
12491253
}

tsl/src/nodes/decompress_chunk/decompress_chunk.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,6 @@ void ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *rel, cons
6161
const Chunk *chunk);
6262

6363
extern bool ts_is_decompress_chunk_path(Path *path);
64+
extern bool ts_is_decompress_chunk_plan(Plan *plan);
6465

6566
DecompressChunkPath *copy_decompress_chunk_path(DecompressChunkPath *src);

tsl/src/nodes/decompress_chunk/decompress_context.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ typedef struct DecompressContext
7575
bool reverse;
7676
bool batch_sorted_merge; /* Batch sorted merge optimization enabled. */
7777
bool enable_bulk_decompression;
78+
bool unique_segmentby;
7879

7980
/*
8081
* Scratch space for bulk decompression which might need a lot of temporary

tsl/src/nodes/decompress_chunk/exec.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ decompress_chunk_state_create(CustomScan *cscan)
8181
list_nth_int(settings, DCS_BatchSortedMerge);
8282
chunk_state->decompress_context.enable_bulk_decompression =
8383
list_nth_int(settings, DCS_EnableBulkDecompression);
84+
chunk_state->decompress_context.unique_segmentby = list_nth_int(settings, DCS_UniqueSegmentBy);
8485
chunk_state->has_row_marks = list_nth_int(settings, DCS_HasRowMarks);
8586

8687
Assert(IsA(cscan->custom_exprs, List));

tsl/src/nodes/decompress_chunk/planner.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ static CustomScanMethods decompress_chunk_plan_methods = {
4444
.CreateCustomScanState = decompress_chunk_state_create,
4545
};
4646

47+
/* Check if the provided plan is a DecompressChunkPlan */
48+
bool
49+
ts_is_decompress_chunk_plan(Plan *plan)
50+
{
51+
return IsA(plan, CustomScan) &&
52+
castNode(CustomScan, plan)->methods == &decompress_chunk_plan_methods;
53+
}
54+
4755
void
4856
_decompress_chunk_init(void)
4957
{
@@ -1344,6 +1352,7 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
13441352
lfirst_int(list_nth_cell(settings, DCS_Reverse)) = dcpath->reverse;
13451353
lfirst_int(list_nth_cell(settings, DCS_BatchSortedMerge)) = dcpath->batch_sorted_merge;
13461354
lfirst_int(list_nth_cell(settings, DCS_EnableBulkDecompression)) = enable_bulk_decompression;
1355+
lfirst_int(list_nth_cell(settings, DCS_UniqueSegmentBy)) = false;
13471356
lfirst_int(list_nth_cell(settings, DCS_HasRowMarks)) = root->parse->rowMarks != NIL;
13481357

13491358
/*

tsl/src/nodes/decompress_chunk/planner.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ typedef enum
1515
DCS_BatchSortedMerge = 3,
1616
DCS_EnableBulkDecompression = 4,
1717
DCS_HasRowMarks = 5,
18+
DCS_UniqueSegmentBy = 6,
1819
DCS_Count
1920
} DecompressChunkSettingsIndex;
2021

tsl/src/nodes/skip_scan/exec.c

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include <utils/datum.h>
4848

4949
#include "guc.h"
50+
#include "nodes/decompress_chunk/decompress_chunk.h"
5051
#include "nodes/skip_scan/skip_scan.h"
5152

5253
typedef enum SkipScanStage
@@ -88,6 +89,12 @@ typedef struct SkipScanState
8889
/* rescan required before getting next tuple */
8990
bool needs_rescan;
9091

92+
/* child_plan node is the input for skip scan plan node.
93+
* if skipscan is directly over index, child_plan = idx_scan
94+
* if skip scan is over compressed index, idx_scan = compressed_index,
95+
* child_plan = decompressed input into skip scan
96+
*/
97+
Plan *child_plan;
9198
void *idx_scan;
9299
} SkipScanState;
93100

@@ -102,8 +109,19 @@ skip_scan_begin(CustomScanState *node, EState *estate, int eflags)
102109
SkipScanState *state = (SkipScanState *) node;
103110
state->ctx = AllocSetContextCreate(estate->es_query_cxt, "skipscan", ALLOCSET_DEFAULT_SIZES);
104111

105-
state->idx = (ScanState *) ExecInitNode(state->idx_scan, estate, eflags);
106-
node->custom_ps = list_make1(state->idx);
112+
node->custom_ps = list_make1((ScanState *) ExecInitNode(state->child_plan, estate, eflags));
113+
ScanState *child_state = linitial(node->custom_ps);
114+
if (state->child_plan == state->idx_scan)
115+
{
116+
state->idx = child_state;
117+
}
118+
else if (IsA(child_state, CustomScanState))
119+
{
120+
Assert(ts_is_decompress_chunk_plan(state->child_plan));
121+
state->idx = linitial(castNode(CustomScanState, child_state)->custom_ps);
122+
}
123+
else
124+
elog(ERROR, "unknown subscan type in SkipScan");
107125

108126
if (IsA(state->idx_scan, IndexScan))
109127
{
@@ -242,6 +260,7 @@ skip_scan_exec(CustomScanState *node)
242260
{
243261
SkipScanState *state = (SkipScanState *) node;
244262
TupleTableSlot *result;
263+
ScanState *child_state;
245264

246265
/*
247266
* We are not supporting projection here since no plan
@@ -267,7 +286,8 @@ skip_scan_exec(CustomScanState *node)
267286
break;
268287

269288
case SS_NULLS_FIRST:
270-
result = state->idx->ps.ExecProcNode(&state->idx->ps);
289+
child_state = linitial(state->cscan_state.custom_ps);
290+
result = child_state->ps.ExecProcNode(&child_state->ps);
271291

272292
/*
273293
* if we found a NULL value we return it, otherwise
@@ -281,7 +301,8 @@ skip_scan_exec(CustomScanState *node)
281301

282302
case SS_NOT_NULL:
283303
case SS_VALUES:
284-
result = state->idx->ps.ExecProcNode(&state->idx->ps);
304+
child_state = linitial(state->cscan_state.custom_ps);
305+
result = child_state->ps.ExecProcNode(&child_state->ps);
285306

286307
if (!TupIsNull(result))
287308
{
@@ -314,7 +335,8 @@ skip_scan_exec(CustomScanState *node)
314335
break;
315336

316337
case SS_NULLS_LAST:
317-
result = state->idx->ps.ExecProcNode(&state->idx->ps);
338+
child_state = linitial(state->cscan_state.custom_ps);
339+
result = child_state->ps.ExecProcNode(&child_state->ps);
318340
skip_scan_switch_stage(state, SS_END);
319341
return result;
320342
break;
@@ -330,7 +352,8 @@ static void
330352
skip_scan_end(CustomScanState *node)
331353
{
332354
SkipScanState *state = (SkipScanState *) node;
333-
ExecEndNode(&state->idx->ps);
355+
ScanState *child_state = linitial(state->cscan_state.custom_ps);
356+
ExecEndNode(&child_state->ps);
334357
}
335358

336359
static void
@@ -352,7 +375,8 @@ skip_scan_rescan(CustomScanState *node)
352375
state->prev_datum = 0;
353376

354377
state->needs_rescan = false;
355-
ExecReScan(&state->idx->ps);
378+
ScanState *child_state = linitial(state->cscan_state.custom_ps);
379+
ExecReScan(&child_state->ps);
356380
MemoryContextReset(state->ctx);
357381
}
358382

@@ -369,7 +393,16 @@ tsl_skip_scan_state_create(CustomScan *cscan)
369393
{
370394
SkipScanState *state = (SkipScanState *) newNode(sizeof(SkipScanState), T_CustomScanState);
371395

372-
state->idx_scan = linitial(cscan->custom_plans);
396+
state->child_plan = linitial(cscan->custom_plans);
397+
if (ts_is_decompress_chunk_plan(state->child_plan))
398+
{
399+
CustomScan *csplan = castNode(CustomScan, state->child_plan);
400+
state->idx_scan = linitial(csplan->custom_plans);
401+
}
402+
else
403+
{
404+
state->idx_scan = state->child_plan;
405+
}
373406
state->stage = SS_BEGIN;
374407

375408
state->distinct_col_attnum = linitial_int(cscan->custom_private);

0 commit comments

Comments
 (0)