Skip to content

Commit e363fda

Browse files
committed
vendor: Update vendored sources to duckdb/duckdb@7a9e96d
Merge V1.2 histrionicus into main (duckdb/duckdb#16324); Bump Julia FixedPointDecimals dependency version (duckdb/duckdb#16323)
1 parent 22cc7f0 commit e363fda

File tree

23 files changed

+179
-82
lines changed

23 files changed

+179
-82
lines changed

src/duckdb/extension/core_functions/aggregate/distributive/string_agg.cpp

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,41 +44,33 @@ struct StringAggFunction {
4444
if (!state.dataptr) {
4545
finalize_data.ReturnNull();
4646
} else {
47-
target = StringVector::AddString(finalize_data.result, state.dataptr, state.size);
48-
}
49-
}
50-
51-
template <class STATE>
52-
static void Destroy(STATE &state, AggregateInputData &aggr_input_data) {
53-
if (state.dataptr) {
54-
delete[] state.dataptr;
47+
target = string_t(state.dataptr, state.size);
5548
}
5649
}
5750

5851
static bool IgnoreNull() {
5952
return true;
6053
}
6154

62-
static inline void PerformOperation(StringAggState &state, const char *str, const char *sep, idx_t str_size,
63-
idx_t sep_size) {
55+
static inline void PerformOperation(StringAggState &state, ArenaAllocator &allocator, const char *str,
56+
const char *sep, idx_t str_size, idx_t sep_size) {
6457
if (!state.dataptr) {
6558
// first iteration: allocate space for the string and copy it into the state
6659
state.alloc_size = MaxValue<idx_t>(8, NextPowerOfTwo(str_size));
67-
state.dataptr = new char[state.alloc_size];
60+
state.dataptr = char_ptr_cast(allocator.Allocate(state.alloc_size));
6861
state.size = str_size;
6962
memcpy(state.dataptr, str, str_size);
7063
} else {
7164
// subsequent iteration: first check if we have space to place the string and separator
7265
idx_t required_size = state.size + str_size + sep_size;
7366
if (required_size > state.alloc_size) {
7467
// no space! allocate extra space
68+
const auto old_size = state.alloc_size;
7569
while (state.alloc_size < required_size) {
7670
state.alloc_size *= 2;
7771
}
78-
auto new_data = new char[state.alloc_size];
79-
memcpy(new_data, state.dataptr, state.size);
80-
delete[] state.dataptr;
81-
state.dataptr = new_data;
72+
state.dataptr =
73+
char_ptr_cast(allocator.Reallocate(data_ptr_cast(state.dataptr), old_size, state.alloc_size));
8274
}
8375
// copy the separator
8476
memcpy(state.dataptr + state.size, sep, sep_size);
@@ -89,14 +81,15 @@ struct StringAggFunction {
8981
}
9082
}
9183

92-
static inline void PerformOperation(StringAggState &state, string_t str, optional_ptr<FunctionData> data_p) {
84+
static inline void PerformOperation(StringAggState &state, ArenaAllocator &allocator, string_t str,
85+
optional_ptr<FunctionData> data_p) {
9386
auto &data = data_p->Cast<StringAggBindData>();
94-
PerformOperation(state, str.GetData(), data.sep.c_str(), str.GetSize(), data.sep.size());
87+
PerformOperation(state, allocator, str.GetData(), data.sep.c_str(), str.GetSize(), data.sep.size());
9588
}
9689

9790
template <class INPUT_TYPE, class STATE, class OP>
9891
static void Operation(STATE &state, const INPUT_TYPE &input, AggregateUnaryInput &unary_input) {
99-
PerformOperation(state, input, unary_input.input.bind_data);
92+
PerformOperation(state, unary_input.input.allocator, input, unary_input.input.bind_data);
10093
}
10194

10295
template <class INPUT_TYPE, class STATE, class OP>
@@ -113,8 +106,8 @@ struct StringAggFunction {
113106
// source is not set: skip combining
114107
return;
115108
}
116-
PerformOperation(target, string_t(source.dataptr, UnsafeNumericCast<uint32_t>(source.size)),
117-
aggr_input_data.bind_data);
109+
PerformOperation(target, aggr_input_data.allocator,
110+
string_t(source.dataptr, UnsafeNumericCast<uint32_t>(source.size)), aggr_input_data.bind_data);
118111
}
119112
};
120113

@@ -162,8 +155,7 @@ AggregateFunctionSet StringAggFun::GetFunctions() {
162155
AggregateFunction::UnaryScatterUpdate<StringAggState, string_t, StringAggFunction>,
163156
AggregateFunction::StateCombine<StringAggState, StringAggFunction>,
164157
AggregateFunction::StateFinalize<StringAggState, string_t, StringAggFunction>,
165-
AggregateFunction::UnaryUpdate<StringAggState, string_t, StringAggFunction>, StringAggBind,
166-
AggregateFunction::StateDestroy<StringAggState, StringAggFunction>);
158+
AggregateFunction::UnaryUpdate<StringAggState, string_t, StringAggFunction>, StringAggBind);
167159
string_agg_param.serialize = StringAggSerialize;
168160
string_agg_param.deserialize = StringAggDeserialize;
169161
string_agg.AddFunction(string_agg_param);

src/duckdb/extension/core_functions/aggregate/nested/list.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ static void ListFinalize(Vector &states_vector, AggregateInputData &aggr_input_d
116116

117117
// first iterate over all entries and set up the list entries, and get the newly required total length
118118
for (idx_t i = 0; i < count; i++) {
119-
120119
auto &state = *states[states_data.sel->get_index(i)];
121120
const auto rid = i + offset;
122121
result_data[rid].offset = total_len;

src/duckdb/extension/core_functions/scalar/list/list_aggregates.cpp

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,17 @@
1515

1616
namespace duckdb {
1717

18-
// FIXME: use a local state for each thread to increase performance?
18+
struct ListAggregatesLocalState : public FunctionLocalState {
19+
explicit ListAggregatesLocalState(Allocator &allocator) : arena_allocator(allocator) {
20+
}
21+
22+
ArenaAllocator arena_allocator;
23+
};
24+
25+
unique_ptr<FunctionLocalState> ListAggregatesInitLocalState(ExpressionState &state, const BoundFunctionExpression &expr,
26+
FunctionData *bind_data) {
27+
return make_uniq<ListAggregatesLocalState>(BufferAllocator::Get(state.GetContext()));
28+
}
1929
// FIXME: benchmark the use of simple_update against using update (if applicable)
2030

2131
static unique_ptr<FunctionData> ListAggregatesBindFailure(ScalarFunction &bound_function) {
@@ -207,7 +217,8 @@ static void ListAggregatesFunction(DataChunk &args, ExpressionState &state, Vect
207217
auto &func_expr = state.expr.Cast<BoundFunctionExpression>();
208218
auto &info = func_expr.bind_info->Cast<ListAggregatesBindData>();
209219
auto &aggr = info.aggr_expr->Cast<BoundAggregateExpression>();
210-
ArenaAllocator allocator(Allocator::DefaultAllocator());
220+
auto &allocator = ExecuteFunctionState::GetFunctionState(state)->Cast<ListAggregatesLocalState>().arena_allocator;
221+
allocator.Reset();
211222
AggregateInputData aggr_input_data(aggr.bind_info.get(), allocator);
212223

213224
D_ASSERT(aggr.function.update);
@@ -511,8 +522,9 @@ static unique_ptr<FunctionData> ListUniqueBind(ClientContext &context, ScalarFun
511522
}
512523

513524
ScalarFunction ListAggregateFun::GetFunction() {
514-
auto result = ScalarFunction({LogicalType::LIST(LogicalType::ANY), LogicalType::VARCHAR}, LogicalType::ANY,
515-
ListAggregateFunction, ListAggregateBind);
525+
auto result =
526+
ScalarFunction({LogicalType::LIST(LogicalType::ANY), LogicalType::VARCHAR}, LogicalType::ANY,
527+
ListAggregateFunction, ListAggregateBind, nullptr, nullptr, ListAggregatesInitLocalState);
516528
BaseScalarFunction::SetReturnsError(result);
517529
result.null_handling = FunctionNullHandling::SPECIAL_HANDLING;
518530
result.varargs = LogicalType::ANY;
@@ -523,12 +535,12 @@ ScalarFunction ListAggregateFun::GetFunction() {
523535

524536
ScalarFunction ListDistinctFun::GetFunction() {
525537
return ScalarFunction({LogicalType::LIST(LogicalType::ANY)}, LogicalType::LIST(LogicalType::ANY),
526-
ListDistinctFunction, ListDistinctBind);
538+
ListDistinctFunction, ListDistinctBind, nullptr, nullptr, ListAggregatesInitLocalState);
527539
}
528540

529541
ScalarFunction ListUniqueFun::GetFunction() {
530542
return ScalarFunction({LogicalType::LIST(LogicalType::ANY)}, LogicalType::UBIGINT, ListUniqueFunction,
531-
ListUniqueBind);
543+
ListUniqueBind, nullptr, nullptr, ListAggregatesInitLocalState);
532544
}
533545

534546
} // namespace duckdb

src/duckdb/src/common/compressed_file_system.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ void CompressedFile::Initialize(bool write) {
3131
stream_data.out_buff_start = stream_data.out_buff.get();
3232
stream_data.out_buff_end = stream_data.out_buff.get();
3333

34+
current_position = 0;
35+
3436
stream_wrapper = compressed_fs.CreateStream();
3537
stream_wrapper->Initialize(*this, write);
3638
}

src/duckdb/src/execution/operator/aggregate/physical_window.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,11 @@ class WindowLocalSourceState : public LocalSourceState {
575575

576576
explicit WindowLocalSourceState(WindowGlobalSourceState &gsource);
577577

578+
void ReleaseLocalStates() {
579+
auto &local_states = window_hash_group->thread_states.at(task->thread_idx);
580+
local_states.clear();
581+
}
582+
578583
//! Does the task have more work to do?
579584
bool TaskFinished() const {
580585
return !task || task->begin_idx == task->end_idx;
@@ -792,6 +797,12 @@ void WindowGlobalSourceState::FinishTask(TaskPtr task) {
792797
}
793798

794799
bool WindowLocalSourceState::TryAssignTask() {
800+
D_ASSERT(TaskFinished());
801+
if (task && task->stage == WindowGroupStage::GETDATA) {
802+
// If this state completed the last block in the previous iteration,
803+
// release our local state memory.
804+
ReleaseLocalStates();
805+
}
795806
// Because downstream operators may be using our internal buffers,
796807
// we can't "finish" a task until we are about to get the next one.
797808

@@ -888,10 +899,6 @@ void WindowLocalSourceState::GetData(DataChunk &result) {
888899
++task->begin_idx;
889900
}
890901

891-
// If that was the last block, release out local state memory.
892-
if (TaskFinished()) {
893-
local_states.clear();
894-
}
895902
result.Verify();
896903
}
897904

src/duckdb/src/execution/operator/csv_scanner/sniffer/dialect_detection.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ void CSVSniffer::RefineCandidates() {
491491

492492
for (idx_t i = 1; i <= options.sample_size_chunks; i++) {
493493
vector<unique_ptr<ColumnCountScanner>> successful_candidates;
494-
bool done = false;
494+
bool done = candidates.empty();
495495
for (auto &cur_candidate : candidates) {
496496
const bool finished_file = cur_candidate->FinishedFile();
497497
if (successful_candidates.empty()) {

src/duckdb/src/execution/operator/helper/physical_reservoir_sample.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ SourceResultType PhysicalReservoirSample::GetData(ExecutionContext &context, Dat
8686
return SourceResultType::FINISHED;
8787
}
8888
auto sample_chunk = sink.sample->GetChunk();
89+
8990
if (!sample_chunk) {
9091
return SourceResultType::FINISHED;
9192
}

src/duckdb/src/execution/operator/schema/physical_create_art_index.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ PhysicalCreateARTIndex::PhysicalCreateARTIndex(LogicalOperator &op, TableCatalog
3434

3535
class CreateARTIndexGlobalSinkState : public GlobalSinkState {
3636
public:
37+
//! We merge the local indexes into one global index.
3738
unique_ptr<BoundIndex> global_index;
3839
};
3940

@@ -53,8 +54,10 @@ class CreateARTIndexLocalSinkState : public LocalSinkState {
5354
};
5455

5556
unique_ptr<GlobalSinkState> PhysicalCreateARTIndex::GetGlobalSinkState(ClientContext &context) const {
56-
// Create the global sink state and add the global index.
57+
// Create the global sink state.
5758
auto state = make_uniq<CreateARTIndexGlobalSinkState>();
59+
60+
// Create the global index.
5861
auto &storage = table.GetStorage();
5962
state->global_index = make_uniq<ART>(info->index_name, info->constraint_type, storage_ids,
6063
TableIOManager::Get(storage), unbound_expressions, storage.db);
@@ -123,7 +126,6 @@ SinkResultType PhysicalCreateARTIndex::SinkSorted(OperatorSinkInput &input) cons
123126

124127
SinkResultType PhysicalCreateARTIndex::Sink(ExecutionContext &context, DataChunk &chunk,
125128
OperatorSinkInput &input) const {
126-
127129
D_ASSERT(chunk.ColumnCount() >= 2);
128130
auto &l_state = input.local_state.Cast<CreateARTIndexLocalSinkState>();
129131
l_state.arena_allocator.Reset();
@@ -151,11 +153,10 @@ SinkResultType PhysicalCreateARTIndex::Sink(ExecutionContext &context, DataChunk
151153

152154
SinkCombineResultType PhysicalCreateARTIndex::Combine(ExecutionContext &context,
153155
OperatorSinkCombineInput &input) const {
154-
155156
auto &g_state = input.global_state.Cast<CreateARTIndexGlobalSinkState>();
156-
auto &l_state = input.local_state.Cast<CreateARTIndexLocalSinkState>();
157157

158158
// Merge the local index into the global index.
159+
auto &l_state = input.local_state.Cast<CreateARTIndexLocalSinkState>();
159160
if (!g_state.global_index->MergeIndexes(*l_state.local_index)) {
160161
throw ConstraintException("Data contains duplicates on indexed column(s)");
161162
}
@@ -165,8 +166,6 @@ SinkCombineResultType PhysicalCreateARTIndex::Combine(ExecutionContext &context,
165166

166167
SinkFinalizeType PhysicalCreateARTIndex::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
167168
OperatorSinkFinalizeInput &input) const {
168-
169-
// Here, we set the resulting global index as the newly created index of the table.
170169
auto &state = input.global_state.Cast<CreateARTIndexGlobalSinkState>();
171170

172171
// Vacuum excess memory and verify.
@@ -182,7 +181,6 @@ SinkFinalizeType PhysicalCreateARTIndex::Finalize(Pipeline &pipeline, Event &eve
182181
auto &schema = table.schema;
183182
info->column_ids = storage_ids;
184183

185-
// FIXME: We should check for catalog exceptions prior to index creation, and later double-check.
186184
if (!alter_table_info) {
187185
// Ensure that the index does not yet exist in the catalog.
188186
auto entry = schema.GetEntry(schema.GetCatalogTransaction(context), CatalogType::INDEX_ENTRY, info->index_name);

src/duckdb/src/execution/physical_plan/plan_create_index.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,21 @@
66
#include "duckdb/planner/expression/bound_reference_expression.hpp"
77
#include "duckdb/planner/operator/logical_create_index.hpp"
88
#include "duckdb/planner/operator/logical_get.hpp"
9+
#include "duckdb/execution/operator/scan/physical_dummy_scan.hpp"
910

1011
namespace duckdb {
1112

1213
unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalCreateIndex &op) {
14+
// Early-out, if the index already exists.
15+
auto &schema = op.table.schema;
16+
auto entry = schema.GetEntry(schema.GetCatalogTransaction(context), CatalogType::INDEX_ENTRY, op.info->index_name);
17+
if (entry) {
18+
if (op.info->on_conflict != OnCreateConflict::IGNORE_ON_CONFLICT) {
19+
throw CatalogException("Index with name \"%s\" already exists!", op.info->index_name);
20+
}
21+
return make_uniq<PhysicalDummyScan>(op.types, op.estimated_cardinality);
22+
}
23+
1324
// Ensure that all expressions contain valid scalar functions.
1425
// E.g., get_current_timestamp(), random(), and sequence values cannot be index keys.
1526
for (idx_t i = 0; i < op.unbound_expressions.size(); i++) {

0 commit comments

Comments
 (0)