Skip to content

Commit 5dfd11f

Browse files
authored
Minor improvement in duckdb conversion (#3096)
* Minor improvement. * Optimized `SELECT COUNT(*)` path * Fixed add column and rename column.
1 parent a1907e0 commit 5dfd11f

File tree

7 files changed

+1138
-7
lines changed

7 files changed

+1138
-7
lines changed

cpp/deeplake_pg/deeplake_executor.cpp

Lines changed: 151 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,17 +129,37 @@ struct DeeplakeExecutorState
129129
}
130130
};
131131

132+
// Simple executor state for COUNT(*) fast path
133+
struct CountExecutorState
134+
{
135+
CustomScanState css;
136+
int64_t count_value;
137+
bool returned = false;
138+
};
139+
140+
// DeepLake executor forward declarations
132141
Node* deeplake_executor_create_scan_state(CustomScan* cscan);
133142
void deeplake_executor_begin_scan(CustomScanState* node, EState* estate, int32_t eflags);
134143
TupleTableSlot* deeplake_executor_exec_scan(CustomScanState* node);
135144
void deeplake_executor_end_scan(CustomScanState* node);
136145
void deeplake_executor_rescan(CustomScanState* node);
137146
void deeplake_executor_explain(CustomScanState* node, List* ancestors, ExplainState* es);
138147

148+
// COUNT(*) executor forward declarations
149+
Node* count_executor_create_scan_state(CustomScan* cscan);
150+
void count_executor_begin_scan(CustomScanState* node, EState* estate, int32_t eflags);
151+
TupleTableSlot* count_executor_exec_scan(CustomScanState* node);
152+
void count_executor_end_scan(CustomScanState* node);
153+
void count_executor_rescan(CustomScanState* node);
154+
void count_executor_explain(CustomScanState* node, List* ancestors, ExplainState* es);
155+
139156
// Custom scan methods
140157
static CustomScanMethods deeplake_executor_scan_methods;
141158
static CustomExecMethods deeplake_executor_exec_methods;
142159

160+
static CustomScanMethods count_executor_scan_methods;
161+
static CustomExecMethods count_executor_exec_methods;
162+
143163
static void init_executor_methods()
144164
{
145165
static bool inited = false;
@@ -148,19 +168,33 @@ static void init_executor_methods()
148168
}
149169
inited = true;
150170

151-
// Initialize scan methods
171+
// Initialize deeplake executor scan methods
152172
memset(&deeplake_executor_scan_methods, 0, sizeof(deeplake_executor_scan_methods));
153173
deeplake_executor_scan_methods.CustomName = "DeeplakeExecutor";
154174
deeplake_executor_scan_methods.CreateCustomScanState = deeplake_executor_create_scan_state;
155175

156-
// Initialize exec methods
176+
// Initialize deeplake executor exec methods
157177
memset(&deeplake_executor_exec_methods, 0, sizeof(deeplake_executor_exec_methods));
158178
deeplake_executor_exec_methods.CustomName = "DeeplakeExecutor";
159179
deeplake_executor_exec_methods.BeginCustomScan = deeplake_executor_begin_scan;
160180
deeplake_executor_exec_methods.ExecCustomScan = deeplake_executor_exec_scan;
161181
deeplake_executor_exec_methods.EndCustomScan = deeplake_executor_end_scan;
162182
deeplake_executor_exec_methods.ReScanCustomScan = deeplake_executor_rescan;
163183
deeplake_executor_exec_methods.ExplainCustomScan = deeplake_executor_explain;
184+
185+
// Initialize COUNT(*) executor scan methods
186+
memset(&count_executor_scan_methods, 0, sizeof(count_executor_scan_methods));
187+
count_executor_scan_methods.CustomName = "CountExecutor";
188+
count_executor_scan_methods.CreateCustomScanState = count_executor_create_scan_state;
189+
190+
// Initialize COUNT(*) executor exec methods
191+
memset(&count_executor_exec_methods, 0, sizeof(count_executor_exec_methods));
192+
count_executor_exec_methods.CustomName = "CountExecutor";
193+
count_executor_exec_methods.BeginCustomScan = count_executor_begin_scan;
194+
count_executor_exec_methods.ExecCustomScan = count_executor_exec_scan;
195+
count_executor_exec_methods.EndCustomScan = count_executor_end_scan;
196+
count_executor_exec_methods.ReScanCustomScan = count_executor_rescan;
197+
count_executor_exec_methods.ExplainCustomScan = count_executor_explain;
164198
}
165199

166200
// Helper: Convert deeplake sample to PostgreSQL Datum
@@ -310,6 +344,85 @@ void deeplake_executor_explain(CustomScanState* node, List* ancestors, ExplainSt
310344
}
311345
}
312346

347+
// ============================================================================
348+
// COUNT(*) Fast Path Executor Implementation
349+
// ============================================================================
350+
351+
// Create scan state for COUNT(*) executor
352+
Node* count_executor_create_scan_state(CustomScan* cscan)
353+
{
354+
CountExecutorState* state = (CountExecutorState*)palloc0(sizeof(CountExecutorState));
355+
NodeSetTag(state, T_CustomScanState);
356+
init_executor_methods();
357+
state->css.methods = &count_executor_exec_methods;
358+
state->css.ss.ps.type = T_CustomScanState;
359+
return (Node*)state;
360+
}
361+
362+
// Begin scan for COUNT(*) executor
363+
void count_executor_begin_scan(CustomScanState* node, EState* estate, int32_t eflags)
364+
{
365+
CountExecutorState* state = (CountExecutorState*)node;
366+
CustomScan* cscan = (CustomScan*)node->ss.ps.plan;
367+
368+
// Initialize scan state
369+
state->css.ss.ps.state = estate;
370+
ExecInitScanTupleSlot(estate, &state->css.ss, ExecTypeFromTL(cscan->scan.plan.targetlist), &TTSOpsVirtual);
371+
372+
// Extract count value from custom_private
373+
ASSERT(list_length(cscan->custom_private) == 1);
374+
Const* count_const = (Const*)linitial(cscan->custom_private);
375+
ASSERT(count_const && IsA(count_const, Const) && !count_const->constisnull);
376+
state->count_value = DatumGetInt64(count_const->constvalue);
377+
state->returned = false;
378+
379+
elog(DEBUG1, "DeepLake COUNT(*) Fast Path: returning %ld", state->count_value);
380+
}
381+
382+
// Execute scan for COUNT(*) executor - return single row with count
383+
TupleTableSlot* count_executor_exec_scan(CustomScanState* node)
384+
{
385+
CountExecutorState* state = (CountExecutorState*)node;
386+
TupleTableSlot* slot = node->ss.ss_ScanTupleSlot;
387+
388+
ExecClearTuple(slot);
389+
390+
// Return count value as single row, only once
391+
if (!state->returned) {
392+
slot->tts_values[0] = Int64GetDatum(state->count_value);
393+
slot->tts_isnull[0] = false;
394+
ExecStoreVirtualTuple(slot);
395+
state->returned = true;
396+
return slot;
397+
}
398+
399+
// Already returned the count
400+
return slot;
401+
}
402+
403+
// End scan for COUNT(*) executor
404+
void count_executor_end_scan(CustomScanState* node)
405+
{
406+
// Nothing to clean up
407+
}
408+
409+
// Rescan for COUNT(*) executor
410+
void count_executor_rescan(CustomScanState* node)
411+
{
412+
CountExecutorState* state = (CountExecutorState*)node;
413+
state->returned = false;
414+
}
415+
416+
// Explain for COUNT(*) executor
417+
void count_executor_explain(CustomScanState* node, List* ancestors, ExplainState* es)
418+
{
419+
CountExecutorState* state = (CountExecutorState*)node;
420+
ExplainPropertyText("DeepLake Optimization", "COUNT(*) Fast Path", es);
421+
ExplainPropertyInteger("Count", nullptr, state->count_value, es);
422+
}
423+
424+
// ============================================================================
425+
313426
// Create a simple targetlist with Var nodes for the output columns
314427
// This converts expressions to simple column references
315428
List* create_simple_targetlist(List* original_targetlist)
@@ -372,6 +485,42 @@ extern "C" PlannedStmt* deeplake_create_direct_execution_plan(Query* parse,
372485
return std_plan;
373486
}
374487

488+
// Fast path for COUNT(*) without WHERE
489+
if (is_pure_count_star_query(parse)) {
490+
// Extract table from rtable
491+
RangeTblEntry* rte = (RangeTblEntry*)linitial(parse->rtable);
492+
Oid table_id = rte->relid;
493+
494+
// Get row count from table_data
495+
if (pg::table_storage::instance().table_exists(table_id)) {
496+
auto& table_data = pg::table_storage::instance().get_table_data(table_id);
497+
int64_t row_count = table_data.num_rows();
498+
499+
// Create a CustomScan node with COUNT(*) executor
500+
CustomScan* cscan = makeNode(CustomScan);
501+
cscan->scan.plan.targetlist = create_simple_targetlist(std_plan->planTree->targetlist);
502+
cscan->scan.plan.qual = NIL;
503+
cscan->scan.scanrelid = 0;
504+
cscan->flags = 0;
505+
cscan->methods = &count_executor_scan_methods;
506+
507+
// Cost estimates (very cheap!)
508+
cscan->scan.plan.startup_cost = 0;
509+
cscan->scan.plan.total_cost = 0.01;
510+
cscan->scan.plan.plan_rows = 1;
511+
cscan->scan.plan.plan_width = sizeof(int64_t);
512+
513+
// Store count value in custom_private
514+
Const* count_const = makeConst(INT8OID, -1, InvalidOid, sizeof(int64_t),
515+
Int64GetDatum(row_count), false, true);
516+
cscan->custom_private = list_make1(count_const);
517+
518+
// Replace plan tree with COUNT(*) executor
519+
std_plan->planTree = reinterpret_cast<Plan*>(cscan);
520+
return std_plan;
521+
}
522+
}
523+
375524
// Create a CustomScan node that will execute the entire query in DeepLake
376525
CustomScan* cscan = makeNode(CustomScan);
377526

cpp/deeplake_pg/deeplake_executor.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ PGDLLEXPORT PlannedStmt* deeplake_create_direct_execution_plan(
3838
}
3939
#endif
4040

41+
// Helper function for COUNT(*) detection
42+
bool is_pure_count_star_query(Query* parse);
43+
4144
namespace pg {
4245
void analyze_plan(PlannedStmt* plan);
4346
} // namespace pg

cpp/deeplake_pg/duckdb_deeplake_scan.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -814,17 +814,15 @@ class deeplake_scan_function_helper
814814
nd::switch_dtype(col_view->dtype(), [&]<typename T>() {
815815
if constexpr (std::is_arithmetic_v<T>) {
816816
auto att_type = td.get_atttypid(col_idx);
817+
auto* value_ptr = td.get_streamers().value_ptr<T>(col_idx, current_row);
817818
if (att_type == VARCHAROID || att_type == CHAROID || att_type == BPCHAROID) {
818819
auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::string_t>(output_vector);
819820
for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
820-
const int64_t row_idx = current_row + row_in_batch;
821-
auto value = td.get_streamers().value<T>(col_idx, row_idx);
822-
duckdb_data[row_in_batch] = add_string(
823-
output_vector, reinterpret_cast<const char*>(&value), 1);
821+
duckdb_data[row_in_batch] =
822+
add_string(output_vector, reinterpret_cast<const char*>(value_ptr + row_in_batch), 1);
824823
}
825824
return;
826825
}
827-
auto* value_ptr = td.get_streamers().value_ptr<T>(col_idx, current_row);
828826
std::memcpy(duckdb::FlatVector::GetData<T>(output_vector), value_ptr, batch_size * sizeof(T));
829827
} else if constexpr (std::is_same_v<T, nd::dict>) {
830828
auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::string_t>(output_vector);

0 commit comments

Comments
 (0)