Skip to content

Commit 6a28e6e

Browse files
authored
[spark] support to report partition statistics for spark batch job (apache#5280)
1 parent deeed7e commit 6a28e6e

File tree

8 files changed

+68
-24
lines changed

8 files changed

+68
-24
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -671,6 +671,12 @@
671671
<td>Duration</td>
672672
<td>The expiration interval of a partition. A partition will be expired if its lifetime is over this value. Partition time is extracted from the partition value.</td>
673673
</tr>
674+
<tr>
675+
<td><h5>partition.idle-time-to-report-statistic</h5></td>
676+
<td style="word-wrap: break-word;">1 h</td>
677+
<td>Duration</td>
678+
<td>Set a time duration when a partition has no new data after this time duration, start to report the partition statistics to hms.</td>
679+
</tr>
674680
<tr>
675681
<td><h5>partition.legacy-name</h5></td>
676682
<td style="word-wrap: break-word;">true</td>

docs/layouts/shortcodes/generated/flink_connector_configuration.html

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,6 @@
8686
<td>Duration</td>
8787
<td>Set a time duration when a partition has no new data after this time duration, mark the done status to indicate that the data is ready.</td>
8888
</tr>
89-
<tr>
90-
<td><h5>partition.idle-time-to-report-statistic</h5></td>
91-
<td style="word-wrap: break-word;">1 h</td>
92-
<td>Duration</td>
93-
<td>Set a time duration when a partition has no new data after this time duration, start to report the partition statistics to hms.</td>
94-
</tr>
9589
<tr>
9690
<td><h5>partition.time-interval</h5></td>
9791
<td style="word-wrap: break-word;">(none)</td>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1600,6 +1600,14 @@ public class CoreOptions implements Serializable {
16001600
.withDescription(
16011601
"Enable data file thin mode to avoid duplicate columns storage.");
16021602

1603+
public static final ConfigOption<Duration> PARTITION_IDLE_TIME_TO_REPORT_STATISTIC =
1604+
key("partition.idle-time-to-report-statistic")
1605+
.durationType()
1606+
.defaultValue(Duration.ofHours(1))
1607+
.withDescription(
1608+
"Set a time duration when a partition has no new data after this time duration, "
1609+
+ "start to report the partition statistics to hms.");
1610+
16031611
@ExcludeFromDocumentation("Only used internally to support materialized table")
16041612
public static final ConfigOption<String> MATERIALIZED_TABLE_DEFINITION_QUERY =
16051613
key("materialized-table.definition-query")

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java renamed to paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.sink.partition;
19+
package org.apache.paimon.utils;
2020

2121
import org.apache.paimon.Snapshot;
2222
import org.apache.paimon.fs.Path;
@@ -27,8 +27,6 @@
2727
import org.apache.paimon.table.source.DataSplit;
2828
import org.apache.paimon.table.source.ScanMode;
2929
import org.apache.paimon.table.source.snapshot.SnapshotReader;
30-
import org.apache.paimon.utils.Preconditions;
31-
import org.apache.paimon.utils.SnapshotManager;
3230

3331
import org.slf4j.Logger;
3432
import org.slf4j.LoggerFactory;

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java renamed to paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon.flink.sink.partition;
19+
package org.apache.paimon.utils;
2020

2121
import org.apache.paimon.data.BinaryString;
2222
import org.apache.paimon.data.GenericRow;
@@ -33,7 +33,6 @@
3333
import org.apache.paimon.table.sink.CommitMessage;
3434
import org.apache.paimon.types.DataField;
3535
import org.apache.paimon.types.DataTypes;
36-
import org.apache.paimon.utils.PartitionPathUtils;
3736

3837
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
3938
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -372,14 +372,6 @@ public class FlinkConnectorOptions {
372372
"You can specify time interval for partition, for example, "
373373
+ "daily partition is '1 d', hourly partition is '1 h'.");
374374

375-
public static final ConfigOption<Duration> PARTITION_IDLE_TIME_TO_REPORT_STATISTIC =
376-
key("partition.idle-time-to-report-statistic")
377-
.durationType()
378-
.defaultValue(Duration.ofHours(1))
379-
.withDescription(
380-
"Set a time duration when a partition has no new data after this time duration, "
381-
+ "start to report the partition statistics to hms.");
382-
383375
public static final ConfigOption<String> CLUSTERING_COLUMNS =
384376
key("sink.clustering.by-columns")
385377
.stringType()

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.paimon.flink.sink.partition;
2020

2121
import org.apache.paimon.CoreOptions;
22-
import org.apache.paimon.flink.FlinkConnectorOptions;
2322
import org.apache.paimon.manifest.ManifestCommittable;
2423
import org.apache.paimon.options.Options;
2524
import org.apache.paimon.table.FileStoreTable;
@@ -28,6 +27,7 @@
2827
import org.apache.paimon.table.sink.CommitMessageImpl;
2928
import org.apache.paimon.utils.InternalRowPartitionComputer;
3029
import org.apache.paimon.utils.PartitionPathUtils;
30+
import org.apache.paimon.utils.PartitionStatisticsReporter;
3131

3232
import org.apache.flink.api.common.state.ListState;
3333
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -145,8 +145,7 @@ public static Optional<ReportPartStatsListener> create(
145145

146146
CoreOptions coreOptions = table.coreOptions();
147147
Options options = coreOptions.toConfiguration();
148-
if (options.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis()
149-
<= 0) {
148+
if (options.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis() <= 0) {
150149
return Optional.empty();
151150
}
152151

@@ -177,7 +176,7 @@ public static Optional<ReportPartStatsListener> create(
177176
new PartitionStatisticsReporter(table, partitionHandler),
178177
stateStore,
179178
isRestored,
180-
options.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC)
179+
options.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC)
181180
.toMillis()));
182181
}
183182

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.spark.commands
2020

21+
import org.apache.paimon.CoreOptions
2122
import org.apache.paimon.CoreOptions.WRITE_ONLY
2223
import org.apache.paimon.codegen.CodeGenUtils
2324
import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
@@ -34,12 +35,13 @@ import org.apache.paimon.table.BucketMode._
3435
import org.apache.paimon.table.FileStoreTable
3536
import org.apache.paimon.table.sink._
3637
import org.apache.paimon.types.{RowKind, RowType}
37-
import org.apache.paimon.utils.SerializationUtils
38+
import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, PartitionStatisticsReporter, SerializationUtils}
3839

3940
import org.apache.spark.{Partitioner, TaskContext}
4041
import org.apache.spark.rdd.RDD
4142
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
4243
import org.apache.spark.sql.functions._
44+
import org.slf4j.LoggerFactory
4345

4446
import java.io.IOException
4547
import java.util.Collections.singletonMap
@@ -54,6 +56,8 @@ case class PaimonSparkWriter(table: FileStoreTable) {
5456

5557
private lazy val bucketMode = table.bucketMode
5658

59+
private lazy val log = LoggerFactory.getLogger(classOf[PaimonSparkWriter])
60+
5761
@transient private lazy val serializer = new CommitMessageSerializer
5862

5963
val writeBuilder: BatchWriteBuilder = table.newBatchWriteBuilder()
@@ -316,6 +320,48 @@ case class PaimonSparkWriter(table: FileStoreTable) {
316320
.toSeq
317321
}
318322

323+
private def reportToHms(messages: Seq[CommitMessage]): Unit = {
324+
val options = table.coreOptions()
325+
val config = options.toConfiguration
326+
327+
if (
328+
config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis <= 0 ||
329+
table.partitionKeys.isEmpty ||
330+
!options.partitionedTableInMetastore ||
331+
table.catalogEnvironment.partitionHandler() == null
332+
) {
333+
return
334+
}
335+
336+
val partitionComputer = new InternalRowPartitionComputer(
337+
options.partitionDefaultName,
338+
table.schema.logicalPartitionType,
339+
table.partitionKeys.toArray(new Array[String](0)),
340+
options.legacyPartitionName()
341+
)
342+
val hmsReporter = new PartitionStatisticsReporter(
343+
table,
344+
table.catalogEnvironment.partitionHandler()
345+
)
346+
347+
val partitions = messages.map(_.partition()).distinct
348+
val currentTime = System.currentTimeMillis()
349+
try {
350+
partitions.foreach {
351+
partition =>
352+
val partitionPath = PartitionPathUtils.generatePartitionPath(
353+
partitionComputer.generatePartValues(partition))
354+
hmsReporter.report(partitionPath, currentTime)
355+
}
356+
} catch {
357+
case e: Throwable =>
358+
log.warn("Failed to report to hms", e)
359+
360+
} finally {
361+
hmsReporter.close()
362+
}
363+
}
364+
319365
def commit(commitMessages: Seq[CommitMessage]): Unit = {
320366
val tableCommit = writeBuilder.newCommit()
321367
try {
@@ -325,6 +371,8 @@ case class PaimonSparkWriter(table: FileStoreTable) {
325371
} finally {
326372
tableCommit.close()
327373
}
374+
375+
reportToHms(commitMessages)
328376
}
329377

330378
/** Bootstrap and repartition for cross partition mode. */

0 commit comments

Comments
 (0)