@@ -93,11 +93,17 @@ SourceResultType DuckLakeCompaction::GetDataInternal(ExecutionContext &context,
9393 }
9494 source_state.returned_result = true ;
9595
96+ if (!this ->sink_state ) {
97+ throw InternalException (" DuckLakeCompaction - missing sink state while producing result" );
98+ }
99+ auto &gstate = this ->sink_state ->Cast <DuckLakeInsertGlobalState>();
100+ auto files_created = gstate.written_files .size ();
101+
96102 chunk.SetCardinality (1 );
97103 chunk.SetValue (0 , 0 , Value (table.schema .name ));
98104 chunk.SetValue (1 , 0 , Value (table.name ));
99105 chunk.SetValue (2 , 0 , Value::BIGINT (static_cast <int64_t >(source_files.size ())));
100- chunk.SetValue (3 , 0 , Value::BIGINT (1 )); // Each compaction creates 1 output file
106+ chunk.SetValue (3 , 0 , Value::BIGINT (static_cast < int64_t >(files_created)));
101107 return SourceResultType::FINISHED ;
102108}
103109
@@ -121,7 +127,10 @@ SinkFinalizeType DuckLakeCompaction::Finalize(Pipeline &pipeline, Event &event,
121127 OperatorSinkFinalizeInput &input) const {
122128 auto &global_state = input.global_state .Cast <DuckLakeInsertGlobalState>();
123129
124- if (global_state.written_files .size () != 1 ) {
130+ if (global_state.written_files .size () > 1 ) {
131+ throw InternalException (" DuckLakeCompaction - expected at most a single output file" );
132+ }
133+ if (global_state.written_files .empty () && type != CompactionType::REWRITE_DELETES ) {
125134 throw InternalException (" DuckLakeCompaction - expected a single output file" );
126135 }
127136 // set the partition values correctly
@@ -137,7 +146,9 @@ SinkFinalizeType DuckLakeCompaction::Finalize(Pipeline &pipeline, Event &event,
137146 DuckLakeCompactionEntry compaction_entry;
138147 compaction_entry.row_id_start = row_id_start;
139148 compaction_entry.source_files = source_files;
140- compaction_entry.written_file = global_state.written_files [0 ];
149+ if (!global_state.written_files .empty ()) {
150+ compaction_entry.written_file = global_state.written_files [0 ];
151+ }
141152 compaction_entry.type = type;
142153
143154 auto &transaction = DuckLakeTransaction::Get (context, global_state.table .catalog );
@@ -305,6 +316,9 @@ void DuckLakeCompactor::GenerateCompactions(DuckLakeTableEntry &table,
305316 break ;
306317 }
307318 }
319+ if (compacted_files >= options.max_files ) {
320+ break ;
321+ }
308322 }
309323}
310324
@@ -560,7 +574,7 @@ DuckLakeCompactor::GenerateCompactionCommand(vector<DuckLakeCompactionFileEntry>
560574 copy->filename_pattern = std::move (copy_options.filename_pattern );
561575 copy->file_extension = std::move (copy_options.file_extension );
562576 copy->overwrite_mode = copy_options.overwrite_mode ;
563- copy->per_thread_output = copy_options. per_thread_output ;
577+ copy->per_thread_output = false ;
564578 copy->file_size_bytes = copy_options.file_size_bytes ;
565579 copy->rotate = copy_options.rotate ;
566580 copy->return_type = copy_options.return_type ;
0 commit comments