Skip to content

Commit 503a15f

Browse files
committed
[core] Introduce Flink 'CreateGlobalIndexProcedure'
1 parent 135360d commit 503a15f

File tree

15 files changed

+560
-67
lines changed

15 files changed

+560
-67
lines changed

paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,16 @@
1818

1919
package org.apache.paimon.utils;
2020

21+
import org.apache.paimon.predicate.Predicate;
22+
import org.apache.paimon.predicate.PredicateBuilder;
2123
import org.apache.paimon.types.DataField;
2224
import org.apache.paimon.types.DataTypeJsonParser;
25+
import org.apache.paimon.types.RowType;
2326

2427
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
2528

29+
import javax.annotation.Nullable;
30+
2631
import java.util.ArrayList;
2732
import java.util.HashMap;
2833
import java.util.List;
@@ -39,6 +44,23 @@ public static List<Map<String, String>> getPartitions(String... partitionStrings
3944
return partitions;
4045
}
4146

47+
@Nullable
48+
public static Predicate toPartitionPredicate(
49+
List<Map<String, String>> partitionList,
50+
RowType partitionType,
51+
String partitionDefaultName) {
52+
if (partitionList == null || partitionList.isEmpty()) {
53+
return null;
54+
}
55+
return PredicateBuilder.or(
56+
partitionList.stream()
57+
.map(
58+
p ->
59+
PredicateBuilder.partition(
60+
p, partitionType, partitionDefaultName))
61+
.toArray(Predicate[]::new));
62+
}
63+
4264
public static Map<String, String> parseCommaSeparatedKeyValues(String keyValues) {
4365
Map<String, String> kvs = new HashMap<>();
4466
if (!StringUtils.isNullOrWhitespaceOnly(keyValues)) {

paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,14 @@ public List<CommitMessage> build(List<DataSplit> splits, IOManager ioManager)
178178
return result;
179179
}
180180

181+
/**
 * Returns the record-count threshold per index range; callers use it as the flush trigger
 * when writing index files.
 */
public long recordsPerRange() {
    return recordsPerRange;
}
184+
185+
/**
 * Returns the extractor that reads the partition, row id and indexed field values out of an
 * input row.
 */
public RowIdIndexFieldsExtractor extractor() {
    return extractor;
}
188+
181189
public List<CommitMessage> build(Range rowRange, Iterator<InternalRow> data)
182190
throws IOException {
183191
long counter = 0;
@@ -218,7 +226,7 @@ public List<CommitMessage> build(Range rowRange, Iterator<InternalRow> data)
218226
return commitMessages;
219227
}
220228

221-
private GlobalIndexParallelWriter createWriter() throws IOException {
229+
public GlobalIndexParallelWriter createWriter() throws IOException {
222230
GlobalIndexParallelWriter currentWriter;
223231
GlobalIndexWriter indexWriter = createIndexWriter(table, indexType, indexField, options);
224232
if (!(indexWriter instanceof GlobalIndexParallelWriter)) {
@@ -230,7 +238,7 @@ private GlobalIndexParallelWriter createWriter() throws IOException {
230238
return currentWriter;
231239
}
232240

233-
private CommitMessage flushIndex(
241+
public CommitMessage flushIndex(
234242
Range rowRange, List<ResultEntry> resultEntries, BinaryRow partition)
235243
throws IOException {
236244
List<IndexFileMeta> indexFileMetas =

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,13 @@ public void build() throws Exception {
127127
.setRangeNumber(sinkParallelism * 10)
128128
.build();
129129

130-
TableSorter sorter = TableSorter.getSorter(env, source, fileStoreTable, sortInfo);
130+
TableSorter sorter =
131+
TableSorter.getSorter(
132+
env,
133+
source,
134+
fileStoreTable.coreOptions(),
135+
fileStoreTable.rowType(),
136+
sortInfo);
131137

132138
new SortCompactSinkBuilder(fileStoreTable)
133139
.forCompact(true)
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.btree;
20+
21+
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.data.BinaryRow;
23+
import org.apache.paimon.data.InternalRow;
24+
import org.apache.paimon.flink.FlinkRowData;
25+
import org.apache.paimon.flink.FlinkRowWrapper;
26+
import org.apache.paimon.flink.LogicalTypeConversion;
27+
import org.apache.paimon.flink.sink.Committable;
28+
import org.apache.paimon.flink.sink.CommittableTypeInfo;
29+
import org.apache.paimon.flink.sink.CommitterOperatorFactory;
30+
import org.apache.paimon.flink.sink.NoopCommittableStateManager;
31+
import org.apache.paimon.flink.sink.StoreCommitter;
32+
import org.apache.paimon.flink.sorter.TableSortInfo;
33+
import org.apache.paimon.flink.sorter.TableSorter;
34+
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
35+
import org.apache.paimon.flink.utils.JavaTypeInfo;
36+
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
37+
import org.apache.paimon.globalindex.RowIdIndexFieldsExtractor;
38+
import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
39+
import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
40+
import org.apache.paimon.options.Options;
41+
import org.apache.paimon.partition.PartitionPredicate;
42+
import org.apache.paimon.reader.RecordReader;
43+
import org.apache.paimon.table.FileStoreTable;
44+
import org.apache.paimon.table.SpecialFields;
45+
import org.apache.paimon.table.sink.BatchWriteBuilder;
46+
import org.apache.paimon.table.sink.CommitMessage;
47+
import org.apache.paimon.table.source.DataSplit;
48+
import org.apache.paimon.table.source.ReadBuilder;
49+
import org.apache.paimon.table.source.TableRead;
50+
import org.apache.paimon.types.RowType;
51+
import org.apache.paimon.utils.Range;
52+
53+
import org.apache.flink.streaming.api.datastream.DataStream;
54+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
55+
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
56+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
57+
import org.apache.flink.table.data.RowData;
58+
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
59+
60+
import java.io.IOException;
61+
import java.util.ArrayList;
62+
import java.util.List;
63+
import java.util.Objects;
64+
65+
import static org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.calcRowRange;
66+
67+
/**
 * The {@link BTreeIndexTopoBuilder} for BTree index in Flink.
 *
 * <p>Builds a batch Flink topology that scans a table, sorts rows by (partition, index field),
 * writes BTree global index files and commits the produced index file metas back to the table.
 */
public class BTreeIndexTopoBuilder {

    /**
     * Builds and executes the index-creation job.
     *
     * @param env the Flink execution environment the job is submitted to
     * @param table the table to build the index for
     * @param indexColumn the column to index
     * @param partitionPredicate optional partition filter; {@code null} means all partitions
     * @param userOptions user options carrying BTree index build settings
     * @throws Exception if the Flink job submission or execution fails
     */
    public static void buildIndex(
            StreamExecutionEnvironment env,
            FileStoreTable table,
            String indexColumn,
            PartitionPredicate partitionPredicate,
            Options userOptions)
            throws Exception {
        // 1. Create BTree index builder and scan splits
        BTreeGlobalIndexBuilder indexBuilder =
                new BTreeGlobalIndexBuilder(table)
                        .withIndexType("btree")
                        .withIndexField(indexColumn);
        if (partitionPredicate != null) {
            indexBuilder = indexBuilder.withPartitionPredicate(partitionPredicate);
        }

        List<DataSplit> splits = indexBuilder.scan();
        Range range = calcRowRange(splits);
        // Nothing to index: no splits, or no row-id range could be derived from them.
        if (splits.isEmpty() || range == null) {
            return;
        }

        // 2. Select necessary columns (partition keys + index field + ROW_ID)
        List<String> selectedColumns = new ArrayList<>(table.partitionKeys());
        selectedColumns.add(indexColumn);

        RowType readType = SpecialFields.rowTypeWithRowId(table.rowType().project(selectedColumns));

        // 3. Calculate parallelism and sort
        // Parallelism scales with the number of row ranges, capped by the configured maximum.
        long recordsPerRange = userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
        int parallelism = Math.max((int) (range.count() / recordsPerRange), 1);
        int maxParallelism = userOptions.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
        parallelism = Math.min(parallelism, maxParallelism);

        // 4. Create source from splits and select columns
        DataStream<DataSplit> sourceStream =
                env.fromData(new JavaTypeInfo<>(DataSplit.class), splits.toArray(new DataSplit[0]))
                        .name("Global Index Source")
                        .setParallelism(1);

        ReadBuilder readBuilder = table.newReadBuilder().withReadType(readType);
        DataStream<RowData> rowDataStream =
                sourceStream
                        .transform(
                                "Read Data",
                                InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(readType)),
                                new ReadDataOperator(readBuilder))
                        .setParallelism(parallelism);

        // 5. Sort data using TableSorter style
        // Configure sort info similar to SortCompactAction
        CoreOptions coreOptions = table.coreOptions();
        TableSortInfo sortInfo =
                new TableSortInfo.Builder()
                        .setSortColumns(selectedColumns)
                        .setSortStrategy(CoreOptions.OrderType.ORDER)
                        .setSinkParallelism(parallelism)
                        .setLocalSampleSize(parallelism * coreOptions.getLocalSampleMagnification())
                        .setGlobalSampleSize(parallelism * 1000)
                        .setRangeNumber(parallelism * 10)
                        .build();

        // Use TableSorter for sorting
        TableSorter sorter =
                TableSorter.getSorter(env, rowDataStream, coreOptions, readType, sortInfo);
        DataStream<RowData> sortedStream = sorter.sort();

        // 6. Build index for each partition
        DataStream<Committable> commitMessages =
                sortedStream
                        .transform(
                                "write-btree-index",
                                new CommittableTypeInfo(),
                                new WriteIndexOperator(range, indexBuilder))
                        .setParallelism(parallelism);

        // 7. Commit all commit messages
        commit(table, commitMessages);

        env.execute("Create btree global index for table: " + table.name());
    }

    /** Appends a single-parallelism committer operator that commits all index files atomically. */
    private static void commit(FileStoreTable table, DataStream<Committable> written) {
        OneInputStreamOperatorFactory<Committable, Committable> committerOperator =
                new CommitterOperatorFactory<>(
                        false,
                        true,
                        "BTreeIndexCommitter",
                        context ->
                                new StoreCommitter(
                                        table, table.newCommit(context.commitUser()), context),
                        new NoopCommittableStateManager());

        // Committing must be serial: one committer task produces one table snapshot.
        written.transform("COMMIT OPERATOR", new CommittableTypeInfo(), committerOperator)
                .setParallelism(1)
                .setMaxParallelism(1);
    }

    /** Operator to read data from splits. */
    private static class ReadDataOperator
            extends org.apache.flink.table.runtime.operators.TableStreamOperator<RowData>
            implements org.apache.flink.streaming.api.operators.OneInputStreamOperator<
                    DataSplit, RowData> {

        private static final long serialVersionUID = 1L;

        private final ReadBuilder readBuilder;

        // Created lazily in open(); ReadBuilder is the serializable part shipped to tasks.
        private transient TableRead tableRead;

        public ReadDataOperator(ReadBuilder readBuilder) {
            this.readBuilder = readBuilder;
        }

        @Override
        public void open() throws Exception {
            super.open();
            this.tableRead = readBuilder.newRead();
        }

        @Override
        public void processElement(StreamRecord<DataSplit> element) throws Exception {
            DataSplit split = element.getValue();
            // Fully drain each split and forward its rows downstream; the reader is closed
            // before the next split is processed.
            try (RecordReader<InternalRow> reader = tableRead.createReader(split)) {
                reader.forEachRemaining(
                        row -> output.collect(new StreamRecord<>(new FlinkRowData(row))));
            }
        }
    }

    /**
     * Operator that consumes the sorted stream and writes BTree index files, starting a new
     * index file whenever the partition changes or the per-range record limit is reached. All
     * commit messages are buffered and emitted once at end of input.
     */
    private static class WriteIndexOperator extends BoundedOneInputOperator<RowData, Committable> {

        // Global row-id range of the scanned splits; used to localize row ids.
        private final Range rowRange;
        private final BTreeGlobalIndexBuilder builder;

        // Records written into the current writer since the last flush.
        private transient long counter;
        // Partition of the rows currently being written.
        private transient BinaryRow currentPart;
        // Open index writer; null between flushes until the next record arrives.
        private transient GlobalIndexParallelWriter currentWriter;
        // Flushed index files, emitted as Committables in endInput().
        private transient List<CommitMessage> commitMessages;

        public WriteIndexOperator(Range rowRange, BTreeGlobalIndexBuilder builder) {
            this.rowRange = rowRange;
            this.builder = builder;
        }

        @Override
        public void open() throws Exception {
            super.open();
            commitMessages = new ArrayList<>();
        }

        @Override
        public void processElement(StreamRecord<RowData> element) throws IOException {
            InternalRow row = new FlinkRowWrapper(element.getValue());

            RowIdIndexFieldsExtractor extractor = builder.extractor();
            BinaryRow partRow = extractor.extractPartition(row);

            // the input is sorted by <partition, indexedField>
            // Flush the open writer when the partition changes or the range is full.
            if (currentWriter != null) {
                if (!Objects.equals(partRow, currentPart) || counter >= builder.recordsPerRange()) {
                    commitMessages.add(
                            builder.flushIndex(rowRange, currentWriter.finish(), currentPart));
                    currentWriter = null;
                    counter = 0;
                }
            }

            // write <value, rowId> pair to index file
            currentPart = partRow;
            counter++;

            if (currentWriter == null) {
                currentWriter = builder.createWriter();
            }

            // convert the original rowId to local rowId
            long localRowId = extractor.extractRowId(row) - rowRange.from;
            currentWriter.write(extractor.extractIndexField(row), localRowId);
        }

        @Override
        public void endInput() throws IOException {
            // Flush the trailing, not-yet-full range (counter > 0 implies an open writer).
            if (counter > 0) {
                commitMessages.add(
                        builder.flushIndex(rowRange, currentWriter.finish(), currentPart));
            }
            for (CommitMessage message : commitMessages) {
                output.collect(
                        new StreamRecord<>(
                                new Committable(BatchWriteBuilder.COMMIT_IDENTIFIER, message)));
            }
            commitMessages.clear();
        }
    }
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,13 @@ protected List<DataStream<Committable>> buildCompactOperator(
146146
.setRangeNumber(sinkParallelism * 10)
147147
.build();
148148
DataStream<RowData> sorted =
149-
TableSorter.getSorter(env, sourcePair.getLeft(), table, sortInfo).sort();
149+
TableSorter.getSorter(
150+
env,
151+
sourcePair.getLeft(),
152+
table.coreOptions(),
153+
table.rowType(),
154+
sortInfo)
155+
.sort();
150156

151157
// 2.3 write and then reorganize the committable
152158
// set parallelism to null, and it'll forward parallelism when doWrite()

0 commit comments

Comments
 (0)