Skip to content

Commit 61ca12d

Browse files
committed
[core] Avoid getting file size for manifest list and writing file
1 parent 0d88499 commit 61ca12d

File tree

17 files changed

+165
-46
lines changed

17 files changed

+165
-46
lines changed

paimon-core/src/main/java/org/apache/paimon/Changelog.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,11 @@ public Changelog(Snapshot snapshot) {
4747
snapshot.id(),
4848
snapshot.schemaId(),
4949
snapshot.baseManifestList(),
50+
snapshot.baseManifestListSize(),
5051
snapshot.deltaManifestList(),
52+
snapshot.deltaManifestListSize(),
5153
snapshot.changelogManifestList(),
54+
snapshot.changelogManifestListSize(),
5255
snapshot.indexManifest(),
5356
snapshot.commitUser(),
5457
snapshot.commitIdentifier(),
@@ -68,8 +71,12 @@ public Changelog(
6871
@JsonProperty(FIELD_ID) long id,
6972
@JsonProperty(FIELD_SCHEMA_ID) long schemaId,
7073
@JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
74+
@JsonProperty(FIELD_BASE_MANIFEST_LIST_SIZE) @Nullable Long baseManifestListSize,
7175
@JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
76+
@JsonProperty(FIELD_DELTA_MANIFEST_LIST_SIZE) @Nullable Long deltaManifestListSize,
7277
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String changelogManifestList,
78+
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST_SIZE) @Nullable
79+
Long changelogManifestListSize,
7380
@JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest,
7481
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
7582
@JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
@@ -86,8 +93,11 @@ public Changelog(
8693
id,
8794
schemaId,
8895
baseManifestList,
96+
baseManifestListSize,
8997
deltaManifestList,
98+
deltaManifestListSize,
9099
changelogManifestList,
100+
changelogManifestListSize,
91101
indexManifest,
92102
commitUser,
93103
commitIdentifier,

paimon-core/src/main/java/org/apache/paimon/Snapshot.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,11 @@ public class Snapshot implements Serializable {
7575
protected static final String FIELD_ID = "id";
7676
protected static final String FIELD_SCHEMA_ID = "schemaId";
7777
protected static final String FIELD_BASE_MANIFEST_LIST = "baseManifestList";
78+
protected static final String FIELD_BASE_MANIFEST_LIST_SIZE = "baseManifestListSize";
7879
protected static final String FIELD_DELTA_MANIFEST_LIST = "deltaManifestList";
80+
protected static final String FIELD_DELTA_MANIFEST_LIST_SIZE = "deltaManifestListSize";
7981
protected static final String FIELD_CHANGELOG_MANIFEST_LIST = "changelogManifestList";
82+
protected static final String FIELD_CHANGELOG_MANIFEST_LIST_SIZE = "changelogManifestListSize";
8083
protected static final String FIELD_INDEX_MANIFEST = "indexManifest";
8184
protected static final String FIELD_COMMIT_USER = "commitUser";
8285
protected static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
@@ -105,17 +108,32 @@ public class Snapshot implements Serializable {
105108
@JsonProperty(FIELD_BASE_MANIFEST_LIST)
106109
protected final String baseManifestList;
107110

111+
@JsonProperty(FIELD_BASE_MANIFEST_LIST_SIZE)
112+
@JsonInclude(JsonInclude.Include.NON_NULL)
113+
@Nullable
114+
protected final Long baseManifestListSize;
115+
108116
// a manifest list recording all new changes occurred in this snapshot
109117
// for faster expire and streaming reads
110118
@JsonProperty(FIELD_DELTA_MANIFEST_LIST)
111119
protected final String deltaManifestList;
112120

121+
@JsonProperty(FIELD_DELTA_MANIFEST_LIST_SIZE)
122+
@JsonInclude(JsonInclude.Include.NON_NULL)
123+
@Nullable
124+
protected final Long deltaManifestListSize;
125+
113126
// a manifest list recording all changelog produced in this snapshot
114127
// null if no changelog is produced, or for paimon <= 0.2
115128
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST)
116129
@Nullable
117130
protected final String changelogManifestList;
118131

132+
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST_SIZE)
133+
@JsonInclude(JsonInclude.Include.NON_NULL)
134+
@Nullable
135+
protected final Long changelogManifestListSize;
136+
119137
// a manifest recording all index files of this table
120138
// null if no index file
121139
@JsonProperty(FIELD_INDEX_MANIFEST)
@@ -185,8 +203,11 @@ public Snapshot(
185203
long id,
186204
long schemaId,
187205
String baseManifestList,
206+
@Nullable Long baseManifestListSize,
188207
String deltaManifestList,
208+
@Nullable Long deltaManifestListSize,
189209
@Nullable String changelogManifestList,
210+
@Nullable Long changelogManifestListSize,
190211
@Nullable String indexManifest,
191212
String commitUser,
192213
long commitIdentifier,
@@ -203,8 +224,11 @@ public Snapshot(
203224
id,
204225
schemaId,
205226
baseManifestList,
227+
baseManifestListSize,
206228
deltaManifestList,
229+
deltaManifestListSize,
207230
changelogManifestList,
231+
changelogManifestListSize,
208232
indexManifest,
209233
commitUser,
210234
commitIdentifier,
@@ -224,8 +248,12 @@ public Snapshot(
224248
@JsonProperty(FIELD_ID) long id,
225249
@JsonProperty(FIELD_SCHEMA_ID) long schemaId,
226250
@JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
251+
@JsonProperty(FIELD_BASE_MANIFEST_LIST_SIZE) @Nullable Long baseManifestListSize,
227252
@JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
253+
@JsonProperty(FIELD_DELTA_MANIFEST_LIST_SIZE) @Nullable Long deltaManifestListSize,
228254
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String changelogManifestList,
255+
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST_SIZE) @Nullable
256+
Long changelogManifestListSize,
229257
@JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest,
230258
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
231259
@JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
@@ -241,8 +269,11 @@ public Snapshot(
241269
this.id = id;
242270
this.schemaId = schemaId;
243271
this.baseManifestList = baseManifestList;
272+
this.baseManifestListSize = baseManifestListSize;
244273
this.deltaManifestList = deltaManifestList;
274+
this.deltaManifestListSize = deltaManifestListSize;
245275
this.changelogManifestList = changelogManifestList;
276+
this.changelogManifestListSize = changelogManifestListSize;
246277
this.indexManifest = indexManifest;
247278
this.commitUser = commitUser;
248279
this.commitIdentifier = commitIdentifier;
@@ -277,17 +308,35 @@ public String baseManifestList() {
277308
return baseManifestList;
278309
}
279310

311+
@JsonGetter(FIELD_BASE_MANIFEST_LIST_SIZE)
312+
@Nullable
313+
public Long baseManifestListSize() {
314+
return baseManifestListSize;
315+
}
316+
280317
@JsonGetter(FIELD_DELTA_MANIFEST_LIST)
281318
public String deltaManifestList() {
282319
return deltaManifestList;
283320
}
284321

322+
@JsonGetter(FIELD_DELTA_MANIFEST_LIST_SIZE)
323+
@Nullable
324+
public Long deltaManifestListSize() {
325+
return deltaManifestListSize;
326+
}
327+
285328
@JsonGetter(FIELD_CHANGELOG_MANIFEST_LIST)
286329
@Nullable
287330
public String changelogManifestList() {
288331
return changelogManifestList;
289332
}
290333

334+
@JsonGetter(FIELD_CHANGELOG_MANIFEST_LIST_SIZE)
335+
@Nullable
336+
public Long changelogManifestListSize() {
337+
return changelogManifestListSize;
338+
}
339+
291340
@JsonGetter(FIELD_INDEX_MANIFEST)
292341
@Nullable
293342
public String indexManifest() {
@@ -361,8 +410,11 @@ public int hashCode() {
361410
id,
362411
schemaId,
363412
baseManifestList,
413+
baseManifestListSize,
364414
deltaManifestList,
415+
deltaManifestListSize,
365416
changelogManifestList,
417+
changelogManifestListSize,
366418
indexManifest,
367419
commitUser,
368420
commitIdentifier,
@@ -385,8 +437,11 @@ public boolean equals(Object o) {
385437
&& id == that.id
386438
&& schemaId == that.schemaId
387439
&& Objects.equals(baseManifestList, that.baseManifestList)
440+
&& Objects.equals(baseManifestListSize, that.baseManifestListSize)
388441
&& Objects.equals(deltaManifestList, that.deltaManifestList)
442+
&& Objects.equals(deltaManifestListSize, that.deltaManifestListSize)
389443
&& Objects.equals(changelogManifestList, that.changelogManifestList)
444+
&& Objects.equals(changelogManifestListSize, that.changelogManifestListSize)
390445
&& Objects.equals(indexManifest, that.indexManifest)
391446
&& Objects.equals(commitUser, that.commitUser)
392447
&& commitIdentifier == that.commitIdentifier

paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public void write(IcebergManifestEntry entry) throws IOException {
236236
}
237237

238238
@Override
239-
public IcebergManifestFileMeta result() throws IOException {
239+
public IcebergManifestFileMeta result() {
240240
SimpleColStats[] stats = partitionStatsCollector.extract();
241241
List<IcebergPartitionSummary> partitionSummaries = new ArrayList<>();
242242
for (int i = 0; i < stats.length; i++) {
@@ -251,7 +251,7 @@ public IcebergManifestFileMeta result() throws IOException {
251251
}
252252
return new IcebergManifestFileMeta(
253253
path.toString(),
254-
fileIO.getFileSize(path),
254+
outputBytes,
255255
IcebergPartitionSpec.SPEC_ID,
256256
Content.DATA,
257257
sequenceNumber,

paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public DataFileMeta result() throws IOException {
169169
return null;
170170
}
171171

172-
long fileSize = fileIO.getFileSize(path);
172+
long fileSize = outputBytes;
173173
Pair<SimpleColStats[], SimpleColStats[]> keyValueStats =
174174
fetchKeyValueStats(fieldStats(fileSize));
175175

paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void close() throws IOException {
109109

110110
@Override
111111
public DataFileMeta result() throws IOException {
112-
long fileSize = fileIO.getFileSize(path);
112+
long fileSize = outputBytes;
113113
Pair<List<String>, SimpleStats> statsPair =
114114
statsArraySerializer.toBinary(fieldStats(fileSize));
115115
DataFileIndexWriter.FileIndexResult indexResult =

paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
5252
private FormatWriter writer;
5353
private PositionOutputStream out;
5454

55+
protected long outputBytes;
5556
private long recordCount;
5657
protected boolean closed;
5758

@@ -170,7 +171,7 @@ public void close() throws IOException {
170171
}
171172

172173
if (LOG.isDebugEnabled()) {
173-
LOG.debug("Closing file " + path);
174+
LOG.debug("Closing file {}", path);
174175
}
175176

176177
try {
@@ -180,6 +181,7 @@ public void close() throws IOException {
180181
}
181182
if (out != null) {
182183
out.flush();
184+
outputBytes = out.getPos();
183185
out.close();
184186
out = null;
185187
}

paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public void write(ManifestEntry entry) throws IOException {
165165
public ManifestFileMeta result() throws IOException {
166166
return new ManifestFileMeta(
167167
path.getName(),
168-
fileIO.getFileSize(path),
168+
outputBytes,
169169
numAddedFiles,
170170
numDeletedFiles,
171171
partitionStatsSerializer.toBinaryAllMode(partitionStatsCollector.extract()),

paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.types.RowType;
2828
import org.apache.paimon.utils.FileStorePathFactory;
2929
import org.apache.paimon.utils.ObjectsFile;
30+
import org.apache.paimon.utils.Pair;
3031
import org.apache.paimon.utils.PathFactory;
3132
import org.apache.paimon.utils.SegmentsCache;
3233
import org.apache.paimon.utils.VersionedObjectSerializer;
@@ -83,7 +84,7 @@ public List<ManifestFileMeta> readAllManifests(Snapshot snapshot) {
8384
*/
8485
public List<ManifestFileMeta> readDataManifests(Snapshot snapshot) {
8586
List<ManifestFileMeta> result = new ArrayList<>();
86-
result.addAll(read(snapshot.baseManifestList()));
87+
result.addAll(read(snapshot.baseManifestList(), snapshot.baseManifestListSize()));
8788
result.addAll(readDeltaManifests(snapshot));
8889
return result;
8990
}
@@ -94,7 +95,7 @@ public List<ManifestFileMeta> readDataManifests(Snapshot snapshot) {
9495
* @return a list of ManifestFileMeta.
9596
*/
9697
public List<ManifestFileMeta> readDeltaManifests(Snapshot snapshot) {
97-
return read(snapshot.deltaManifestList());
98+
return read(snapshot.deltaManifestList(), snapshot.deltaManifestListSize());
9899
}
99100

100101
/**
@@ -105,16 +106,16 @@ public List<ManifestFileMeta> readDeltaManifests(Snapshot snapshot) {
105106
public List<ManifestFileMeta> readChangelogManifests(Snapshot snapshot) {
106107
return snapshot.changelogManifestList() == null
107108
? Collections.emptyList()
108-
: read(snapshot.changelogManifestList());
109+
: read(snapshot.changelogManifestList(), snapshot.changelogManifestListSize());
109110
}
110111

111112
/**
112113
* Write several {@link ManifestFileMeta}s into a manifest list.
113114
*
114115
* <p>NOTE: This method is atomic.
115116
*/
116-
public String write(List<ManifestFileMeta> metas) {
117-
return super.writeWithoutRolling(metas);
117+
public Pair<String, Long> write(List<ManifestFileMeta> metas) {
118+
return super.writeWithoutRolling(metas.iterator());
118119
}
119120

120121
/** Creator of {@link ManifestList}. */

0 commit comments

Comments
 (0)