Skip to content

Commit b112f99

Browse files
committed
fix comments
1 parent 2c47060 commit b112f99

File tree

3 files changed

+205
-101
lines changed

3 files changed

+205
-101
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.sink.coordinator;
20+
21+
import org.apache.paimon.Snapshot;
22+
import org.apache.paimon.data.BinaryRow;
23+
import org.apache.paimon.index.IndexFileHandler;
24+
import org.apache.paimon.index.IndexFileMeta;
25+
import org.apache.paimon.io.DataFileMeta;
26+
import org.apache.paimon.manifest.ManifestEntry;
27+
import org.apache.paimon.operation.FileStoreScan;
28+
import org.apache.paimon.operation.write.WriteRestore;
29+
import org.apache.paimon.table.FileStoreTable;
30+
31+
import java.io.IOException;
32+
import java.util.ArrayList;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Optional;
36+
import java.util.concurrent.ConcurrentHashMap;
37+
38+
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
39+
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
40+
41+
/**
42+
* Coordinator for a table, to use a single point to obtain the list of initialization files
43+
* required for write operators.
44+
*/
45+
public class TableWriteCoordinator {
46+
47+
private final FileStoreTable table;
48+
private final Map<String, Long> latestCommittedIdentifiers;
49+
private final FileStoreScan scan;
50+
private final IndexFileHandler indexFileHandler;
51+
52+
private volatile Snapshot snapshot;
53+
54+
public TableWriteCoordinator(FileStoreTable table) {
55+
this.table = table;
56+
this.latestCommittedIdentifiers = new ConcurrentHashMap<>();
57+
this.scan = table.store().newScan();
58+
if (table.coreOptions().manifestDeleteFileDropStats()) {
59+
scan.dropStats();
60+
}
61+
this.indexFileHandler = table.store().newIndexFileHandler();
62+
refresh();
63+
}
64+
65+
private synchronized void refresh() {
66+
Optional<Snapshot> latestSnapshot = table.latestSnapshot();
67+
if (!latestSnapshot.isPresent()) {
68+
return;
69+
}
70+
this.snapshot = latestSnapshot.get();
71+
this.scan.withSnapshot(snapshot);
72+
}
73+
74+
public synchronized ScanCoordinationResponse scan(ScanCoordinationRequest request)
75+
throws IOException {
76+
if (snapshot == null) {
77+
return new ScanCoordinationResponse(null, null, null, null, null);
78+
}
79+
80+
BinaryRow partition = deserializeBinaryRow(request.partition());
81+
int bucket = request.bucket();
82+
83+
List<DataFileMeta> restoreFiles = new ArrayList<>();
84+
List<ManifestEntry> entries = scan.withPartitionBucket(partition, bucket).plan().files();
85+
Integer totalBuckets = WriteRestore.extractDataFiles(entries, restoreFiles);
86+
87+
IndexFileMeta dynamicBucketIndex = null;
88+
if (request.scanDynamicBucketIndex()) {
89+
dynamicBucketIndex =
90+
indexFileHandler.scanHashIndex(snapshot, partition, bucket).orElse(null);
91+
}
92+
93+
List<IndexFileMeta> deleteVectorsIndex = null;
94+
if (request.scanDeleteVectorsIndex()) {
95+
deleteVectorsIndex =
96+
indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX, partition, bucket);
97+
}
98+
99+
return new ScanCoordinationResponse(
100+
snapshot, totalBuckets, restoreFiles, dynamicBucketIndex, deleteVectorsIndex);
101+
}
102+
103+
public synchronized long latestCommittedIdentifier(String user) {
104+
return latestCommittedIdentifiers.computeIfAbsent(user, this::computeLatestIdentifier);
105+
}
106+
107+
private synchronized long computeLatestIdentifier(String user) {
108+
Optional<Snapshot> snapshotOptional = table.snapshotManager().latestSnapshotOfUser(user);
109+
if (!snapshotOptional.isPresent()) {
110+
return Long.MIN_VALUE;
111+
}
112+
113+
Snapshot latestSnapshotOfUser = snapshotOptional.get();
114+
if (snapshot == null || latestSnapshotOfUser.id() > snapshot.id()) {
115+
snapshot = latestSnapshotOfUser;
116+
}
117+
return latestSnapshotOfUser.commitIdentifier();
118+
}
119+
120+
public void checkpoint() {
121+
// refresh latest snapshot for data & index files scan
122+
refresh();
123+
// refresh latest committed identifiers for all users
124+
latestCommittedIdentifiers.clear();
125+
}
126+
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java

Lines changed: 7 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,8 @@
1818

1919
package org.apache.paimon.flink.sink.coordinator;
2020

21-
import org.apache.paimon.CoreOptions;
22-
import org.apache.paimon.Snapshot;
23-
import org.apache.paimon.data.BinaryRow;
2421
import org.apache.paimon.flink.sink.TableWriteOperator;
25-
import org.apache.paimon.fs.Path;
26-
import org.apache.paimon.index.IndexFileHandler;
27-
import org.apache.paimon.index.IndexFileMeta;
28-
import org.apache.paimon.io.DataFileMeta;
29-
import org.apache.paimon.manifest.ManifestEntry;
30-
import org.apache.paimon.operation.FileStoreScan;
31-
import org.apache.paimon.operation.write.WriteRestore;
32-
import org.apache.paimon.options.MemorySize;
3322
import org.apache.paimon.table.FileStoreTable;
34-
import org.apache.paimon.utils.SegmentsCache;
3523

3624
import org.apache.flink.runtime.jobgraph.OperatorID;
3725
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
@@ -40,18 +28,9 @@
4028
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
4129
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
4230

43-
import java.io.IOException;
44-
import java.util.ArrayList;
45-
import java.util.List;
46-
import java.util.Map;
47-
import java.util.Optional;
4831
import java.util.concurrent.CompletableFuture;
49-
import java.util.concurrent.ConcurrentHashMap;
5032
import java.util.concurrent.ThreadPoolExecutor;
5133

52-
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
53-
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_MEMORY;
54-
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
5534
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
5635

5736
/**
@@ -63,88 +42,16 @@ public class WriteOperatorCoordinator implements OperatorCoordinator, Coordinati
6342
private final FileStoreTable table;
6443

6544
private ThreadPoolExecutor executor;
66-
private Map<String, Long> latestCommittedIdentifiers;
67-
68-
private volatile Snapshot snapshot;
69-
private volatile FileStoreScan dataFileScan;
70-
private volatile IndexFileHandler indexFileHandler;
45+
private TableWriteCoordinator coordinator;
7146

7247
public WriteOperatorCoordinator(FileStoreTable table) {
7348
this.table = table;
7449
}
7550

76-
private synchronized void refreshOrCreateScan() {
77-
Optional<Snapshot> latestSnapshot = table.latestSnapshot();
78-
if (!latestSnapshot.isPresent()) {
79-
return;
80-
}
81-
if (dataFileScan == null) {
82-
dataFileScan = table.store().newScan();
83-
if (table.coreOptions().manifestDeleteFileDropStats()) {
84-
dataFileScan.dropStats();
85-
}
86-
}
87-
if (indexFileHandler == null) {
88-
indexFileHandler = table.store().newIndexFileHandler();
89-
}
90-
snapshot = latestSnapshot.get();
91-
dataFileScan.withSnapshot(snapshot);
92-
}
93-
94-
private synchronized ScanCoordinationResponse scanDataFiles(ScanCoordinationRequest request)
95-
throws IOException {
96-
if (snapshot == null) {
97-
return new ScanCoordinationResponse(null, null, null, null, null);
98-
}
99-
100-
BinaryRow partition = deserializeBinaryRow(request.partition());
101-
int bucket = request.bucket();
102-
103-
List<DataFileMeta> restoreFiles = new ArrayList<>();
104-
List<ManifestEntry> entries =
105-
dataFileScan.withPartitionBucket(partition, bucket).plan().files();
106-
Integer totalBuckets = WriteRestore.extractDataFiles(entries, restoreFiles);
107-
108-
IndexFileMeta dynamicBucketIndex = null;
109-
if (request.scanDynamicBucketIndex()) {
110-
dynamicBucketIndex =
111-
indexFileHandler.scanHashIndex(snapshot, partition, bucket).orElse(null);
112-
}
113-
114-
List<IndexFileMeta> deleteVectorsIndex = null;
115-
if (request.scanDeleteVectorsIndex()) {
116-
deleteVectorsIndex =
117-
indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX, partition, bucket);
118-
}
119-
120-
return new ScanCoordinationResponse(
121-
snapshot, totalBuckets, restoreFiles, dynamicBucketIndex, deleteVectorsIndex);
122-
}
123-
124-
private synchronized LatestIdentifierResponse latestCommittedIdentifier(
125-
LatestIdentifierRequest request) {
126-
String user = request.user();
127-
long identifier =
128-
latestCommittedIdentifiers.computeIfAbsent(
129-
user,
130-
k ->
131-
table.snapshotManager()
132-
.latestSnapshotOfUser(user)
133-
.map(Snapshot::commitIdentifier)
134-
.orElse(Long.MIN_VALUE));
135-
return new LatestIdentifierResponse(identifier);
136-
}
137-
13851
@Override
13952
public void start() throws Exception {
14053
executor = createCachedThreadPool(1, "WriteCoordinator");
141-
latestCommittedIdentifiers = new ConcurrentHashMap<>();
142-
CoreOptions options = table.coreOptions();
143-
MemorySize cacheMemory =
144-
options.toConfiguration().get(SINK_WRITER_COORDINATOR_CACHE_MEMORY);
145-
SegmentsCache<Path> manifestCache = SegmentsCache.create(cacheMemory, Long.MAX_VALUE);
146-
table.setManifestCache(manifestCache);
147-
refreshOrCreateScan();
54+
coordinator = new TableWriteCoordinator(table);
14855
}
14956

15057
@Override
@@ -163,10 +70,12 @@ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
16370
() -> {
16471
try {
16572
if (request instanceof ScanCoordinationRequest) {
166-
future.complete(scanDataFiles((ScanCoordinationRequest) request));
73+
future.complete(coordinator.scan((ScanCoordinationRequest) request));
16774
} else if (request instanceof LatestIdentifierRequest) {
16875
future.complete(
169-
latestCommittedIdentifier((LatestIdentifierRequest) request));
76+
new LatestIdentifierResponse(
77+
coordinator.latestCommittedIdentifier(
78+
((LatestIdentifierRequest) request).user())));
17079
} else {
17180
throw new UnsupportedOperationException(
17281
"Unsupported request type: " + request);
@@ -180,10 +89,7 @@ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
18089

18190
@Override
18291
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
183-
// refresh latest snapshot for data & index files scan
184-
refreshOrCreateScan();
185-
// refresh latest committed identifiers for all users
186-
latestCommittedIdentifiers.clear();
92+
coordinator.checkpoint();
18793
}
18894

18995
@Override
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.sink.coordinator;
20+
21+
import org.apache.paimon.Snapshot;
22+
import org.apache.paimon.catalog.Identifier;
23+
import org.apache.paimon.data.GenericRow;
24+
import org.apache.paimon.schema.Schema;
25+
import org.apache.paimon.table.FileStoreTable;
26+
import org.apache.paimon.table.TableTestBase;
27+
import org.apache.paimon.types.DataTypes;
28+
29+
import org.junit.jupiter.api.Test;
30+
31+
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
32+
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
33+
import static org.assertj.core.api.Assertions.assertThat;
34+
35+
class TableWriteCoordinatorTest extends TableTestBase {
36+
37+
@Test
38+
public void testLatestIdentifierAndScan1() throws Exception {
39+
innerTestLatestIdentifierAndScan(true);
40+
}
41+
42+
@Test
43+
public void testLatestIdentifierAndScan2() throws Exception {
44+
innerTestLatestIdentifierAndScan(false);
45+
}
46+
47+
private void innerTestLatestIdentifierAndScan(boolean initSnapshot) throws Exception {
48+
Identifier identifier = new Identifier("db", "table");
49+
Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build();
50+
catalog.createDatabase("db", false);
51+
catalog.createTable(identifier, schema, false);
52+
FileStoreTable table = getTable(identifier);
53+
54+
// initial with snapshot 1
55+
if (initSnapshot) {
56+
write(table, GenericRow.of(1));
57+
}
58+
TableWriteCoordinator coordinator = new TableWriteCoordinator(table);
59+
60+
// latest snapshot get snapshot 2
61+
write(table, GenericRow.of(1));
62+
Snapshot latest = table.latestSnapshot().get();
63+
String commitUser = latest.commitUser();
64+
coordinator.latestCommittedIdentifier(commitUser);
65+
66+
// scan
67+
ScanCoordinationRequest request =
68+
new ScanCoordinationRequest(serializeBinaryRow(EMPTY_ROW), 0, false, false);
69+
ScanCoordinationResponse scan = coordinator.scan(request);
70+
assertThat(scan.snapshot().id()).isEqualTo(latest.id());
71+
}
72+
}

0 commit comments

Comments
 (0)