diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoKeyGenForSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoKeyGenForSQL.scala index f8638f6ec9258..d11cf8ea377a5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoKeyGenForSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoKeyGenForSQL.scala @@ -19,15 +19,21 @@ package org.apache.hudi +import org.apache.hudi.TestAutoKeyGenForSQL.randomString +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.view.{FileSystemViewManager, HoodieTableFileSystemView} import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration import org.apache.hudi.testutils.SparkClientFunctionalTestHarness import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf import org.apache.spark.SparkConf -import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.CsvSource +import org.junit.jupiter.params.provider.{CsvSource, EnumSource} + +import scala.util.Random class TestAutoKeyGenForSQL extends SparkClientFunctionalTestHarness { override def conf: SparkConf = conf(getSparkSqlConf) @@ -36,7 +42,6 @@ class TestAutoKeyGenForSQL extends SparkClientFunctionalTestHarness { @CsvSource(value = Array("MERGE_ON_READ", "COPY_ON_WRITE")) def testAutoKeyGen(tableType: String): Unit = { // No record key is set, which should trigger auto key gen. - // MOR table is used to generate log files. val tableName = "hoodie_test_" + tableType spark.sql( s""" @@ -70,11 +75,8 @@ class TestAutoKeyGenForSQL extends SparkClientFunctionalTestHarness { | (1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'), | (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai'); """.stripMargin) - // Create the first log file by update. spark.sql(s"UPDATE $tableName SET fare = 25.0 WHERE rider = 'rider-D';") - // Create the second log file by delete. spark.sql(s"DELETE FROM $tableName WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';") - // Create the third log file by delete. spark.sql(s"DELETE FROM $tableName WHERE uuid = '9909a8b1-2d15-4d3d-8ec9-efc48c536a00';") // Validate: data integrity. @@ -97,4 +99,198 @@ class TestAutoKeyGenForSQL extends SparkClientFunctionalTestHarness { // Record key fields should be empty. assertTrue(metaClient.getTableConfig.getRecordKeyFields.isEmpty) } + + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testAutoKeyGenForImmutableWorkflow(tableType: HoodieTableType): Unit = { + // No record key is set, which should trigger auto key gen. 
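+    // The immutable workflow only issues inserts; the aggressive clean/archive/clustering
+    // settings below make the corresponding table services run within a few commits.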
+ val tableName = "hoodie_immutable_" + tableType + val compactionEnabled = if (tableType == HoodieTableType.MERGE_ON_READ) "true" else "false" + spark.sql( + s""" + |CREATE TABLE $tableName ( + | ts BIGINT, + | uuid STRING, + | rider STRING, + | driver STRING, + | fare DOUBLE, + | city STRING ) + | USING hudi + | OPTIONS ( + | hoodie.metadata.enable = 'true', + | hoodie.enable.data.skipping = 'true', + | hoodie.write.record.merge.mode = 'COMMIT_TIME_ORDERING', + | hoodie.clean.commits.retained = '5', + | hoodie.keep.max.commits = '3', + | hoodie.keep.min.commits = '2', + | hoodie.clustering.inline = 'true', + | hoodie.clustering.inline.max.commits = '2', + | hoodie.compact.inline = '$compactionEnabled') + | PARTITIONED BY(city) + | LOCATION '$basePath' + | TBLPROPERTIES (hoodie.datasource.write.table.type='${tableType.name}') + """.stripMargin) + spark.sql( + s""" + |INSERT INTO $tableName VALUES + | (1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'), + | (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'), + | (1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'), + | (1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'), + | (1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'), + | (1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'), + | (1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'), + | (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai'); + """.stripMargin) + + import java.util.UUID + import scala.util.Random + for (i <- 0 until 30) { + val ts: Long = 1695115999911L + i + 1 + val uuid: String = UUID.randomUUID.toString + val rider: String = s"rider-$i" + val driver: String = s"driver-$i" + val fare: Float = Random.nextFloat + val city: String = randomString(8) + + spark.sql( + s""" + |INSERT INTO $tableName VALUES + |($ts, '$uuid', '$rider', '$driver', $fare, '$city'); + """.stripMargin) + } + + // Validate: data integrity + val noRecords = spark.sql(s"SELECT * FROM $tableName").count() + assertEquals(38, noRecords) + // Validate: table property. + val metaClient: HoodieTableMetaClient = HoodieTableMetaClient + .builder() + .setBasePath(basePath) + .setConf(new HadoopStorageConfiguration(spark.sparkContext.hadoopConfiguration)) + .build() + // Validate: record key fields should be empty. + assertTrue(metaClient.getTableConfig.getRecordKeyFields.isEmpty) + // Validate: table services are triggered. 
+    assertFalse(metaClient.getActiveTimeline.getCleanerTimeline.getInstants.isEmpty)
+    assertFalse(metaClient.getArchivedTimeline.getInstants.isEmpty)
+    assertFalse(metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.isEmpty)
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      assertFalse(metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.empty())
+      val fsv: HoodieTableFileSystemView = FileSystemViewManager.createInMemoryFileSystemView(
+        context, metaClient, HoodieMetadataConfig.newBuilder.enable(true).build)
+      fsv.loadAllPartitions()
+      assertFalse(fsv.getAllFileGroups.flatMap(_.getAllFileSlices).anyMatch(_.hasLogFiles))
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testAutoKeyGenForMutableWorkflow(tableType: HoodieTableType): Unit = {
+    // No record key is set, which should trigger auto key gen.
+    val tableName = "hoodie_mutable_" + tableType
+    val compactionEnabled = if (tableType == HoodieTableType.MERGE_ON_READ) "true" else "false"
+    spark.sql(
+      s"""
+         |CREATE TABLE $tableName (
+         | ts BIGINT,
+         | uuid STRING,
+         | rider STRING,
+         | driver STRING,
+         | fare DOUBLE,
+         | city STRING )
+         | USING hudi
+         | OPTIONS (
+         | hoodie.metadata.enable = 'true',
+         | hoodie.enable.data.skipping = 'true',
+         | hoodie.write.record.merge.mode = 'COMMIT_TIME_ORDERING',
+         | hoodie.clean.commits.retained = '5',
+         | hoodie.keep.max.commits = '3',
+         | hoodie.keep.min.commits = '2',
+         | hoodie.clustering.inline = 'true',
+         | hoodie.clustering.inline.max.commits = '2',
+         | hoodie.compact.inline = '$compactionEnabled')
+         | PARTITIONED BY(city)
+         | LOCATION '$basePath'
+         | TBLPROPERTIES (hoodie.datasource.write.table.type='${tableType.name}')
+      """.stripMargin)
+    spark.sql(
+      s"""
+         |INSERT INTO $tableName VALUES
+         | (1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
+         | (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-B','driver-M',27.70 ,'san_francisco'),
+         | (1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-C','driver-L',33.90 ,'san_francisco'),
+         | (1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-D','driver-O',93.50,'san_francisco'),
+         | (1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-E','driver-P',34.15,'sao_paulo'),
+         | (1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-F','driver-Q',43.40 ,'sao_paulo'),
+         | (1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-G','driver-S',41.06 ,'chennai'),
+         | (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-H','driver-T',17.85,'chennai');
+      """.stripMargin)
+
+    import java.util.UUID
+    // Insert 10 more single-row batches with generated values.
+    for (i <- 0 until 10) {
+      val ts: Long = 1695115999911L + i + 1
+      val uuid: String = UUID.randomUUID.toString
+      val rider: String = s"rider-$i"
+      val driver: String = s"driver-$i"
+      val fare: Float = Random.nextFloat
+      val city: String = randomString(8)
+      spark.sql(
+        s"""
+           |INSERT INTO $tableName VALUES
+           |($ts, '$uuid', '$rider', '$driver', $fare, '$city');
+        """.stripMargin)
+    }
+
+    // Update the ts of a random seed rider; updates do not change the row count.
+    for (i <- 0 until 10) {
+      val ts: Long = 1695115999911L + i + 1
+      val rider: String = s"rider-${('A' + Random.nextInt(8)).toChar}"
+      spark.sql(
+        s"""
+           |UPDATE $tableName
+           |SET ts = $ts
+           |WHERE rider = '$rider'
+        """.stripMargin)
+    }
+
+    // Delete two distinct seed riders (rider-A and rider-B) so the final count is deterministic.
+    for (i <- 0 until 2) {
+      val rider: String = s"rider-${('A' + i).toChar}"
+      spark.sql(
+        s"""
+           |DELETE FROM $tableName
+           |WHERE rider = '$rider'
+        """.stripMargin)
+    }
+
+    // Validate: data integrity
+    val noRecords = spark.sql(s"SELECT * FROM $tableName").count()
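+    // 8 seed rows + 10 generated rows - 2 deleted riders (rider-A and rider-B) = 16.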
assertEquals(16, noRecords) + // Validate: table property. + val metaClient: HoodieTableMetaClient = HoodieTableMetaClient + .builder() + .setBasePath(basePath) + .setConf(new HadoopStorageConfiguration(spark.sparkContext.hadoopConfiguration)) + .build() + // Validate: record key fields should be empty. + assertTrue(metaClient.getTableConfig.getRecordKeyFields.isEmpty) + // Validate: table services are triggered. + assertFalse(metaClient.getActiveTimeline.getCleanerTimeline.getInstants.isEmpty) + assertFalse(metaClient.getArchivedTimeline.getInstants.isEmpty) + assertFalse(metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.isEmpty) + if (tableType == HoodieTableType.MERGE_ON_READ) { + assertFalse(metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.empty()) + val fsv: HoodieTableFileSystemView = FileSystemViewManager.createInMemoryFileSystemView( + context, metaClient, HoodieMetadataConfig.newBuilder.enable(true).build) + fsv.loadAllPartitions() + assertTrue(fsv.getAllFileGroups.flatMap(_.getAllFileSlices).anyMatch(_.hasLogFiles)) + } + } +} + +object TestAutoKeyGenForSQL { + val chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + + def randomString(length: Int): String = { + (1 to length).map(_ => chars(Random.nextInt(chars.length))).mkString + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoKeyGeneration.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoKeyGeneration.scala new file mode 100644 index 0000000000000..71c8db39b940b --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoKeyGeneration.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.functional + +import org.apache.hudi.{DataSourceWriteOptions, ScalaAssertionSupport} +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.table.view.{FileSystemViewManager, HoodieTableFileSystemView} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings +import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.SimpleKeyGenerator +import org.apache.hudi.testutils.HoodieSparkClientTestBase + +import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} +import org.apache.spark.sql.functions.col +import org.junit.jupiter.api.{AfterEach, BeforeEach} +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.EnumSource + +class TestAutoKeyGeneration extends HoodieSparkClientTestBase with ScalaAssertionSupport { + var spark: SparkSession = null + val verificationCol: String = "driver" + val updatedVerificationVal: String = "driver_update" + val commonOpts: Map[String, String] = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", + HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[SimpleKeyGenerator].getName, + HoodieClusteringConfig.INLINE_CLUSTERING.key -> "true", + HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key -> "3" + ) + + @BeforeEach override def setUp(): Unit = { + setTableName("hoodie_test") + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initHoodieStorage() + } + + @AfterEach override def tearDown(): Unit = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + } + + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testAutoKeyGenerationWithImmutableFlow(tableType: HoodieTableType): Unit = { + val compactionEnabled = if (tableType == HoodieTableType.MERGE_ON_READ) "true" else "false" + val writeOptions = commonOpts ++ Map( + DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), + HoodieCompactionConfig.INLINE_COMPACT.key -> compactionEnabled + ) + // Bulk insert first. + val records = recordsToStrings(dataGen.generateInserts("001", 5)) + val inputDF: Dataset[Row] = spark.read.json(jsc.parallelize(records, 2)) + inputDF.write.format("hudi").partitionBy("partition") + .options(writeOptions) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Overwrite).save(basePath) + metaClient = HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build + assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode()) + // Ensure no key fields are set. + assertTrue(metaClient.getTableConfig.getRecordKeyFields.isEmpty) + + // 30 inserts; every insert adds 5 records. 
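+    // Auto key generation assigns fresh keys to every batch, so the table should end up with
+    // 5 (bulk insert) + 30 * 5 = 155 unique records.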
+    for (i <- 0 until 30) {
+      val records = recordsToStrings(dataGen.generateInserts(f"$i%03d", 5))
+      val inputDF: Dataset[Row] = spark.read.json(jsc.parallelize(records, 2))
+      inputDF.write.format("hudi").partitionBy("partition")
+        .options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+        .mode(SaveMode.Append).save(basePath)
+    }
+
+    // Validate configs.
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertTrue(metaClient.getTableConfig.getRecordKeyFields.isEmpty)
+    // Validate all records are unique.
+    val numRecords = spark.read.format("hudi").load(basePath).count()
+    assertEquals(155, numRecords)
+    // Validate clean, archive, and clustering operations exist.
+    assertFalse(metaClient.getActiveTimeline.getCleanerTimeline.empty())
+    assertFalse(metaClient.getArchivedTimeline.empty())
+    assertFalse(metaClient.getActiveTimeline.getCompletedReplaceTimeline.empty())
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      assertFalse(metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.empty())
+      val fsv: HoodieTableFileSystemView = FileSystemViewManager.createInMemoryFileSystemView(
+        context, metaClient, HoodieMetadataConfig.newBuilder.enable(true).build)
+      fsv.loadAllPartitions()
+      assertFalse(fsv.getAllFileGroups.flatMap(_.getAllFileSlices).anyMatch(_.hasLogFiles))
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testAutoKeyGenerationWithMutableFlow(tableType: HoodieTableType): Unit = {
+    val compactionEnabled = if (tableType == HoodieTableType.MERGE_ON_READ) "true" else "false"
+    val writeOptions = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      HoodieCompactionConfig.INLINE_COMPACT.key -> compactionEnabled
+    )
+    // Bulk insert first.
+    val records = recordsToStrings(dataGen.generateInserts("001", 5))
+    val inputDF: Dataset[Row] = spark.read.json(jsc.parallelize(records, 2))
+    inputDF.write.format("hudi").partitionBy("partition")
+      .options(writeOptions)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite).save(basePath)
+    metaClient = HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build
+    assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
+    // Ensure no key fields are set.
+    assertTrue(metaClient.getTableConfig.getRecordKeyFields.isEmpty)
+    // 10 inserts; every insert adds 5 records.
+    for (i <- 0 until 10) {
+      val records = recordsToStrings(dataGen.generateInserts(f"$i%03d", 5))
+      val inputDF: Dataset[Row] = spark.read.json(jsc.parallelize(records, 2))
+      inputDF.write.format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+        .mode(SaveMode.Append).save(basePath)
+    }
+    // 10 updates: double the weight of every existing row; the row count is unchanged.
+    for (i <- 10 until 20) {
+      val inputDF = spark.read.format("hudi").load(basePath)
+        .withColumn("weight", col("weight") * 2)
+      inputDF.write.format("hudi").partitionBy("partition")
+        .options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+        .mode(SaveMode.Append).save(basePath)
+    }
+    // 10 deletes: each round removes a single row.
+    for (i <- 20 until 30) {
+      val inputDF = spark.read.format("hudi").load(basePath).limit(1)
+      inputDF.write.format("hudi").partitionBy("partition")
+        .options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+        .mode(SaveMode.Append).save(basePath)
+    }
+    // Validate configs.
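+    // Expected count: 5 (bulk insert) + 10 * 5 inserts = 55 rows, minus 10 single-row deletes
+    // = 45; the upsert rounds rewrite existing rows and do not change the count.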
+ metaClient = HoodieTableMetaClient.reload(metaClient) + assertTrue(metaClient.getTableConfig.getRecordKeyFields.isEmpty) + // Validate all records are unique. + val numRecords = spark.read.format("hudi").load(basePath).count() + assertEquals(45, numRecords) + // Validate clean, archive, and clustering operation exists. + assertFalse(metaClient.getActiveTimeline.getCleanerTimeline.getInstants.isEmpty) + assertFalse(metaClient.getArchivedTimeline.getInstants.isEmpty) + assertFalse(metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.isEmpty) + if (tableType == HoodieTableType.MERGE_ON_READ) { + assertFalse(metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.empty()) + val fsv: HoodieTableFileSystemView = FileSystemViewManager.createInMemoryFileSystemView( + context, metaClient, HoodieMetadataConfig.newBuilder.enable(true).build) + fsv.loadAllPartitions() + assertTrue(fsv.getAllFileGroups.flatMap(_.getAllFileSlices).anyMatch(_.hasLogFiles)) + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 834173ad33928..688d17bad6306 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger} import org.apache.spark.sql.types.StructType import org.junit.jupiter.api.{BeforeEach, Test} -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource} import org.slf4j.LoggerFactory @@ -567,4 +567,44 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase { } } } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testStructuredStreamingWithAutoKeyGen(tableType: HoodieTableType): Unit = { + val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest") + // First chunk of data + val records1 = recordsToStrings(dataGen.generateInserts("000", 10)).asScala.toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath) + val extraOpts = Map( + DataSourceWriteOptions.STREAMING_DISABLE_COMPACTION.key -> "true", + DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name, + "hoodie.datasource.compaction.async.enable" -> "false", + "hoodie.write.record.merge.mode" -> "COMMIT_TIME_ORDERING", + "hoodie.clean.commits.retained" -> "5", + "hoodie.keep.max.commits" -> "3", + "hoodie.keep.min.commits" -> "2", + "hoodie.clustering.inline" -> "true", + "hoodie.clustering.inline.max.commits" -> "2") + var opts = commonOpts ++ extraOpts + // Enable AutoKeyGen + opts -= "hoodie.datasource.write.recordkey.field" + streamingWrite(inputDF1.schema, sourcePath, destPath, opts) + var metaClient = HoodieTestUtils.createMetaClient(storage, destPath) + assertTrue(metaClient.getTableConfig.getRecordKeyFields.isEmpty) + + for (i <- 1 to 24) { + val id = String.format("%03d", new Integer(i)) + val records = recordsToStrings(dataGen.generateUpdates(id, 10)).asScala.toList + val inputDF = 
spark.read.json(spark.sparkContext.parallelize(records, 2)) + inputDF.coalesce(1).write.mode(SaveMode.Append).json(sourcePath) + streamingWrite(inputDF.schema, sourcePath, destPath, opts) + } + metaClient = HoodieTableMetaClient.reload(metaClient) + assertTrue(metaClient.getTableConfig.getRecordKeyFields.isEmpty) + assertFalse(metaClient.getActiveTimeline.getCompletedReplaceTimeline.empty()) + assertFalse(metaClient.getActiveTimeline.getCleanerTimeline.empty()) + assertFalse(metaClient.getArchivedTimeline.empty()) + assertEquals(250, spark.read.format("hudi").load(destPath).count()) + } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 4cbe5de74d73f..cb410a4b55ff6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -3376,6 +3376,45 @@ void testErrorTableSourcePersist(WriteOperationType writeOperationType, boolean assertRecordCount(950, tableBasePath, sqlContext); } + @ParameterizedTest + @EnumSource(HoodieTableType.class) + void testDeltaSyncWithAutoKeyGenAndImmutableOperations(HoodieTableType tableType) throws Exception { + PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum; + int parquetRecordsCount = 10; + HoodieTestDataGenerator dataGenerator = prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null); + TypedProperties extraProps = new TypedProperties(); + extraProps.setProperty("hoodie.datasource.write.table.type", tableType.name()); + extraProps.setProperty("hoodie.datasource.compaction.async.enable", "false"); + extraProps.setProperty("hoodie.write.record.merge.mode", "COMMIT_TIME_ORDERING"); + extraProps.setProperty("hoodie.clean.commits.retained", "5"); + extraProps.setProperty("hoodie.keep.max.commits", "3"); + extraProps.setProperty("hoodie.keep.min.commits", "2"); + extraProps.setProperty("hoodie.clustering.inline", "true"); + extraProps.setProperty("hoodie.clustering.inline.max.commits", "2"); + prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET, + PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps, true); + String tableBasePath = basePath + "test_parquet_table" + testNum; + HoodieDeltaStreamer.Config deltaCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(), + null, PROPS_FILENAME_TEST_PARQUET, false, + false, 100000, false, null, tableType.name(), "timestamp", null); + deltaCfg.retryLastPendingInlineCompactionJob = false; + HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc); + + for (int i = 0; i < 30; i++) { + deltaStreamer.sync(); + assertRecordCount(parquetRecordsCount + i * 5, tableBasePath, sqlContext); + prepareParquetDFSUpdates(5, PARQUET_SOURCE_ROOT, (i + 3) + ".parquet", false, null, null, dataGenerator, "001"); + deltaStreamer.sync(); + } + + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setStorage(storage).setBasePath(tableBasePath).build(); + assertTrue(metaClient.getTableConfig().getRecordKeyFields().isEmpty()); + assertRecordCount(160, tableBasePath, sqlContext); + assertFalse(metaClient.getActiveTimeline().getCleanerTimeline().getInstants().isEmpty()); + 
assertFalse(metaClient.getArchivedTimeline().getInstants().isEmpty()); + assertFalse(metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().isEmpty()); + } + private Set getAllFileIDsInTable(String tableBasePath, Option partition) { HoodieTableMetaClient metaClient = createMetaClient(jsc, tableBasePath); final HoodieTableFileSystemView fsView = HoodieTableFileSystemView.fileListingBasedFileSystemView(context, metaClient, metaClient.getCommitsAndCompactionTimeline());