Skip to content

Commit 1e93c27

Browse files
committed
fix
1 parent a208ffa commit 1e93c27

File tree

9 files changed

+245
-21
lines changed

9 files changed

+245
-21
lines changed

paimon-core/src/main/java/org/apache/paimon/catalog/SupportsSnapshots.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.paimon.Snapshot;
2222
import org.apache.paimon.partition.Partition;
23+
import org.apache.paimon.table.TableSnapshot;
2324

2425
import java.util.List;
2526
import java.util.Optional;
@@ -46,5 +47,6 @@ boolean commitSnapshot(Identifier identifier, Snapshot snapshot, List<Partition>
4647
* @return The requested snapshot of the table
4748
* @throws Catalog.TableNotExistException if the target does not exist
4849
*/
49-
Optional<Snapshot> loadSnapshot(Identifier identifier) throws Catalog.TableNotExistException;
50+
Optional<TableSnapshot> loadSnapshot(Identifier identifier)
51+
throws Catalog.TableNotExistException;
5052
}

paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import org.apache.paimon.schema.SchemaChange;
7979
import org.apache.paimon.schema.TableSchema;
8080
import org.apache.paimon.table.Table;
81+
import org.apache.paimon.table.TableSnapshot;
8182
import org.apache.paimon.table.sink.BatchTableCommit;
8283
import org.apache.paimon.utils.Pair;
8384
import org.apache.paimon.view.View;
@@ -349,7 +350,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
349350
}
350351

351352
@Override
352-
public Optional<Snapshot> loadSnapshot(Identifier identifier) throws TableNotExistException {
353+
public Optional<TableSnapshot> loadSnapshot(Identifier identifier)
354+
throws TableNotExistException {
353355
GetTableSnapshotResponse response;
354356
try {
355357
response =

paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableSnapshotResponse.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
package org.apache.paimon.rest.responses;
2020

21-
import org.apache.paimon.Snapshot;
2221
import org.apache.paimon.rest.RESTResponse;
22+
import org.apache.paimon.table.TableSnapshot;
2323

2424
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
2525
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
@@ -33,15 +33,15 @@ public class GetTableSnapshotResponse implements RESTResponse {
3333
private static final String FIELD_SNAPSHOT = "snapshot";
3434

3535
@JsonProperty(FIELD_SNAPSHOT)
36-
private final Snapshot snapshot;
36+
private final TableSnapshot snapshot;
3737

3838
@JsonCreator
39-
public GetTableSnapshotResponse(@JsonProperty(FIELD_SNAPSHOT) Snapshot snapshot) {
39+
public GetTableSnapshotResponse(@JsonProperty(FIELD_SNAPSHOT) TableSnapshot snapshot) {
4040
this.snapshot = snapshot;
4141
}
4242

4343
@JsonGetter(FIELD_SNAPSHOT)
44-
public Snapshot getSnapshot() {
44+
public TableSnapshot getSnapshot() {
4545
return snapshot;
4646
}
4747
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.table;
20+
21+
import org.apache.paimon.Snapshot;
22+
import org.apache.paimon.annotation.Public;
23+
24+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
25+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
26+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
27+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
28+
29+
import java.io.Serializable;
30+
import java.util.Objects;
31+
32+
/** Snapshot of a table, including basic statistics of this table. */
33+
@JsonIgnoreProperties(ignoreUnknown = true)
34+
@Public
35+
public class TableSnapshot implements Serializable {
36+
37+
private static final long serialVersionUID = 1L;
38+
39+
public static final String FIELD_SNAPSHOT = "snapshot";
40+
public static final String FIELD_RECORD_COUNT = "recordCount";
41+
public static final String FIELD_FILE_SIZE_IN_BYTES = "fileSizeInBytes";
42+
public static final String FIELD_FILE_COUNT = "fileCount";
43+
public static final String FIELD_LAST_FILE_CREATION_TIME = "lastFileCreationTime";
44+
45+
@JsonProperty(FIELD_SNAPSHOT)
46+
private final Snapshot snapshot;
47+
48+
@JsonProperty(FIELD_RECORD_COUNT)
49+
private final long recordCount;
50+
51+
@JsonProperty(FIELD_FILE_SIZE_IN_BYTES)
52+
private final long fileSizeInBytes;
53+
54+
@JsonProperty(FIELD_FILE_COUNT)
55+
private final long fileCount;
56+
57+
@JsonProperty(FIELD_LAST_FILE_CREATION_TIME)
58+
private final long lastFileCreationTime;
59+
60+
@JsonCreator
61+
public TableSnapshot(
62+
@JsonProperty(FIELD_SNAPSHOT) Snapshot snapshot,
63+
@JsonProperty(FIELD_RECORD_COUNT) long recordCount,
64+
@JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes,
65+
@JsonProperty(FIELD_FILE_COUNT) long fileCount,
66+
@JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long lastFileCreationTime) {
67+
this.snapshot = snapshot;
68+
this.recordCount = recordCount;
69+
this.fileSizeInBytes = fileSizeInBytes;
70+
this.fileCount = fileCount;
71+
this.lastFileCreationTime = lastFileCreationTime;
72+
}
73+
74+
@JsonGetter(FIELD_SNAPSHOT)
75+
public Snapshot snapshot() {
76+
return snapshot;
77+
}
78+
79+
@JsonGetter(FIELD_RECORD_COUNT)
80+
public long recordCount() {
81+
return recordCount;
82+
}
83+
84+
@JsonGetter(FIELD_FILE_SIZE_IN_BYTES)
85+
public long fileSizeInBytes() {
86+
return fileSizeInBytes;
87+
}
88+
89+
@JsonGetter(FIELD_FILE_COUNT)
90+
public long fileCount() {
91+
return fileCount;
92+
}
93+
94+
@JsonGetter(FIELD_LAST_FILE_CREATION_TIME)
95+
public long lastFileCreationTime() {
96+
return lastFileCreationTime;
97+
}
98+
99+
@Override
100+
public boolean equals(Object o) {
101+
if (this == o) {
102+
return true;
103+
}
104+
if (o == null || getClass() != o.getClass()) {
105+
return false;
106+
}
107+
TableSnapshot that = (TableSnapshot) o;
108+
return recordCount == that.recordCount
109+
&& fileSizeInBytes == that.fileSizeInBytes
110+
&& fileCount == that.fileCount
111+
&& lastFileCreationTime == that.lastFileCreationTime
112+
&& Objects.equals(snapshot, that.snapshot);
113+
}
114+
115+
@Override
116+
public int hashCode() {
117+
return Objects.hash(
118+
snapshot, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime);
119+
}
120+
121+
@Override
122+
public String toString() {
123+
return "{"
124+
+ "snapshot="
125+
+ snapshot
126+
+ ", recordCount="
127+
+ recordCount
128+
+ ", fileSizeInBytes="
129+
+ fileSizeInBytes
130+
+ ", fileCount="
131+
+ fileCount
132+
+ ", lastFileCreationTime="
133+
+ lastFileCreationTime
134+
+ '}';
135+
}
136+
}

paimon-core/src/main/java/org/apache/paimon/tag/SnapshotLoaderImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.paimon.catalog.CatalogLoader;
2424
import org.apache.paimon.catalog.Identifier;
2525
import org.apache.paimon.catalog.SupportsSnapshots;
26+
import org.apache.paimon.table.TableSnapshot;
2627
import org.apache.paimon.utils.SnapshotLoader;
2728

2829
import java.io.IOException;
@@ -42,7 +43,9 @@ public SnapshotLoaderImpl(CatalogLoader catalogLoader, Identifier identifier) {
4243
@Override
4344
public Optional<Snapshot> load() throws IOException {
4445
try (Catalog catalog = catalogLoader.load()) {
45-
return ((SupportsSnapshots) catalog).loadSnapshot(identifier);
46+
return ((SupportsSnapshots) catalog)
47+
.loadSnapshot(identifier)
48+
.map(TableSnapshot::snapshot);
4649
} catch (Exception e) {
4750
throw new RuntimeException(e);
4851
}

paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,19 @@ protected void resetDataTokenOnRestServer(Identifier identifier) {
211211
}
212212

213213
@Override
214-
protected void updateSnapshotOnRestServer(Identifier identifier, Snapshot snapshot) {
215-
restCatalogServer.setTableSnapshot(identifier, snapshot);
214+
protected void updateSnapshotOnRestServer(
215+
Identifier identifier,
216+
Snapshot snapshot,
217+
long recordCount,
218+
long fileSizeInBytes,
219+
long fileCount,
220+
long lastFileCreationTime) {
221+
restCatalogServer.setTableSnapshot(
222+
identifier,
223+
snapshot,
224+
recordCount,
225+
fileSizeInBytes,
226+
fileCount,
227+
lastFileCreationTime);
216228
}
217229
}

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.apache.paimon.table.CatalogEnvironment;
7878
import org.apache.paimon.table.FileStoreTable;
7979
import org.apache.paimon.table.FileStoreTableFactory;
80+
import org.apache.paimon.table.TableSnapshot;
8081
import org.apache.paimon.utils.BranchManager;
8182
import org.apache.paimon.utils.Pair;
8283
import org.apache.paimon.view.View;
@@ -133,7 +134,7 @@ public class RESTCatalogServer {
133134
private final Map<String, TableMetadata> tableMetadataStore = new HashMap<>();
134135
private final Map<String, List<Partition>> tablePartitionsStore = new HashMap<>();
135136
private final Map<String, View> viewStore = new HashMap<>();
136-
private final Map<String, Snapshot> tableSnapshotStore = new HashMap<>();
137+
private final Map<String, TableSnapshot> tableSnapshotStore = new HashMap<>();
137138
private final List<String> noPermissionDatabases = new ArrayList<>();
138139
private final List<String> noPermissionTables = new ArrayList<>();
139140
public final ConfigResponse configResponse;
@@ -150,7 +151,7 @@ public RESTCatalogServer(
150151
ResourcePaths resourcePaths = new ResourcePaths(prefix);
151152
this.databaseUri = resourcePaths.databases();
152153
Options conf = new Options();
153-
this.configResponse.getDefaults().forEach((k, v) -> conf.setString(k, v));
154+
this.configResponse.getDefaults().forEach(conf::setString);
154155
conf.setString(CatalogOptions.WAREHOUSE.key(), dataPath);
155156
CatalogContext context = CatalogContext.create(conf);
156157
Path warehousePath = new Path(dataPath);
@@ -180,8 +181,17 @@ public void shutdown() throws IOException {
180181
server.shutdown();
181182
}
182183

183-
public void setTableSnapshot(Identifier identifier, Snapshot snapshot) {
184-
tableSnapshotStore.put(identifier.getFullName(), snapshot);
184+
public void setTableSnapshot(
185+
Identifier identifier,
186+
Snapshot snapshot,
187+
long recordCount,
188+
long fileSizeInBytes,
189+
long fileCount,
190+
long lastFileCreationTime) {
191+
tableSnapshotStore.put(
192+
identifier.getFullName(),
193+
new TableSnapshot(
194+
snapshot, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime));
185195
}
186196

187197
public void setDataToken(Identifier identifier, RESTToken token) {
@@ -539,7 +549,7 @@ private MockResponse getDataTokenHandle(Identifier tableIdentifier) throws Excep
539549

540550
private MockResponse snapshotHandle(Identifier identifier) throws Exception {
541551
RESTResponse response;
542-
Optional<Snapshot> snapshotOptional =
552+
Optional<TableSnapshot> snapshotOptional =
543553
Optional.ofNullable(tableSnapshotStore.get(identifier.getFullName()));
544554
if (!snapshotOptional.isPresent()) {
545555
response =
@@ -1354,7 +1364,38 @@ private boolean commitSnapshot(
13541364
}
13551365
try {
13561366
boolean success = commit.commit(snapshot, branchName, Collections.emptyList());
1357-
tableSnapshotStore.put(identifier.getFullName(), snapshot);
1367+
tableSnapshotStore.compute(
1368+
identifier.getFullName(),
1369+
(k, old) -> {
1370+
long recordCount = 0;
1371+
long fileSizeInBytes = 0;
1372+
long fileCount = 0;
1373+
long lastFileCreationTime = 0;
1374+
if (statistics != null) {
1375+
for (Partition partition : statistics) {
1376+
recordCount += partition.recordCount();
1377+
fileSizeInBytes += partition.fileSizeInBytes();
1378+
fileCount += partition.fileCount();
1379+
if (partition.lastFileCreationTime() > lastFileCreationTime) {
1380+
lastFileCreationTime = partition.lastFileCreationTime();
1381+
}
1382+
}
1383+
}
1384+
if (old != null) {
1385+
recordCount += old.recordCount();
1386+
fileSizeInBytes += old.fileSizeInBytes();
1387+
fileCount += old.fileCount();
1388+
if (old.lastFileCreationTime() > lastFileCreationTime) {
1389+
lastFileCreationTime = old.lastFileCreationTime();
1390+
}
1391+
}
1392+
return new TableSnapshot(
1393+
snapshot,
1394+
recordCount,
1395+
fileCount,
1396+
lastFileCreationTime,
1397+
fileSizeInBytes);
1398+
});
13581399
return success;
13591400
} catch (Exception e) {
13601401
throw new RuntimeException(e);

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.paimon.schema.SchemaChange;
3737
import org.apache.paimon.table.FileStoreTable;
3838
import org.apache.paimon.table.Table;
39+
import org.apache.paimon.table.TableSnapshot;
3940
import org.apache.paimon.table.sink.BatchTableCommit;
4041
import org.apache.paimon.table.sink.BatchTableWrite;
4142
import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -728,11 +729,15 @@ void testSnapshotFromREST() throws Exception {
728729
long id = 10086;
729730
long millis = System.currentTimeMillis();
730731
updateSnapshotOnRestServer(
731-
hasSnapshotTableIdentifier, createSnapshotWithMillis(id, millis));
732-
Optional<Snapshot> snapshot = catalog.loadSnapshot(hasSnapshotTableIdentifier);
732+
hasSnapshotTableIdentifier, createSnapshotWithMillis(id, millis), 1, 2, 3, 4);
733+
Optional<TableSnapshot> snapshot = catalog.loadSnapshot(hasSnapshotTableIdentifier);
733734
assertThat(snapshot).isPresent();
734-
assertThat(snapshot.get().id()).isEqualTo(id);
735-
assertThat(snapshot.get().timeMillis()).isEqualTo(millis);
735+
assertThat(snapshot.get().snapshot().id()).isEqualTo(id);
736+
assertThat(snapshot.get().snapshot().timeMillis()).isEqualTo(millis);
737+
assertThat(snapshot.get().recordCount()).isEqualTo(1);
738+
assertThat(snapshot.get().fileSizeInBytes()).isEqualTo(2);
739+
assertThat(snapshot.get().fileCount()).isEqualTo(3);
740+
assertThat(snapshot.get().lastFileCreationTime()).isEqualTo(4);
736741
Identifier noSnapshotTableIdentifier = Identifier.create("test_db_a_1", "unknown");
737742
createTable(noSnapshotTableIdentifier, Maps.newHashMap(), Lists.newArrayList("col1"));
738743
snapshot = catalog.loadSnapshot(noSnapshotTableIdentifier);
@@ -947,7 +952,13 @@ protected abstract void setDataTokenToRestServerForMock(
947952

948953
protected abstract void resetDataTokenOnRestServer(Identifier identifier);
949954

950-
protected abstract void updateSnapshotOnRestServer(Identifier identifier, Snapshot snapshot);
955+
protected abstract void updateSnapshotOnRestServer(
956+
Identifier identifier,
957+
Snapshot snapshot,
958+
long recordCount,
959+
long fileSizeInBytes,
960+
long fileCount,
961+
long lastFileCreationTime);
951962

952963
protected void batchWrite(FileStoreTable tableTestWrite, List<Integer> data) throws Exception {
953964
BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder();

0 commit comments

Comments
 (0)