[MINOR] Add tests for auto keygen for immutable and mutable workflow #13889
base: master
Conversation
Force-pushed 2624dd8 to 7d8b75a
// Validate: table services are triggered.
assertFalse(metaClient.getActiveTimeline.getCleanerTimeline.getInstants.isEmpty)
assertFalse(metaClient.getArchivedTimeline.getInstants.isEmpty)
assertFalse(metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.isEmpty)
Can we add validation for the data as well?
for (i <- 0 until 10) {
  val ts: Long = 1695115999911L + i + 1
  val rider: String = s"rider-${('A' + new Random().nextInt(8)).toChar}"
Can we initialize Random once for the test and use it everywhere?
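A minimal sketch of the suggestion (class and method names are illustrative, not from the patch): create one seeded `Random` for the whole test and reuse it across iterations, which also makes failures reproducible. Note that `'A' + n` is an `int` in both Scala and Java, so a cast back to `char` is needed to get `rider-A` through `rider-H`.

```java
import java.util.Random;

public class RiderGen {
    // One Random for the whole test; a fixed seed makes failures reproducible.
    private static final Random RAND = new Random(42L);

    static String nextRider() {
        // ('A' + n) is an int, so cast back to char to get "rider-A".."rider-H".
        return "rider-" + (char) ('A' + RAND.nextInt(8));
    }
}
```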
// Validate: data integrity.
val noRecords = spark.sql(s"SELECT * FROM $tableName").count()
assertEquals(16, noRecords)
Can we declare a set for the riders to update and delete, just to ensure the random generator does not produce the same rider records again? That way we know for sure we are going to update or delete two different rider entries.
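One way to realize this suggestion, sketched with illustrative names: keep drawing riders into a set until the required number of distinct values is reached, so the update target and the delete target can never collide.

```java
import java.util.LinkedHashSet;
import java.util.Random;
import java.util.Set;

public class RiderPicker {
    // Draw until we have `count` distinct riders, so an update and a delete
    // are guaranteed to hit different rider entries.
    static Set<String> pickDistinctRiders(Random rand, int count) {
        Set<String> riders = new LinkedHashSet<>();
        while (riders.size() < count) {
            riders.add("rider-" + (char) ('A' + rand.nextInt(8)));
        }
        return riders;
    }
}
```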
assertTrue(metaClient.getTableConfig.getRecordKeyFields.isEmpty)
// Validate all records are unique.
val numRecords = spark.read.format("hudi").load(basePath).count()
assertEquals(45, numRecords)
Can we ascertain that the data is intact?
assertFalse(metaClient.getArchivedTimeline.getInstants.isEmpty)
assertFalse(metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.isEmpty)
if (tableType == HoodieTableType.MERGE_ON_READ) {
  assertFalse(metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.empty())
We should be able to ascertain the exact number of compaction commits. Same for the clean commits and replace commits above, instead of just checking isEmpty.
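The spirit of this suggestion, sketched without the Hudi classes (the action strings and counts here are illustrative, not taken from the patch): group the timeline's instants by action and assert exact counts per action, rather than only asserting non-emptiness.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TimelineCounts {
    // Group a timeline's instant actions and count each exactly,
    // instead of only checking that the timeline is non-empty.
    static Map<String, Long> countByAction(List<String> actions) {
        return actions.stream()
            .collect(Collectors.groupingBy(a -> a, Collectors.counting()));
    }
}
```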
assertFalse(metaClient.getArchivedTimeline.empty())
assertFalse(metaClient.getActiveTimeline.getCompletedReplaceTimeline.empty())
if (tableType == HoodieTableType.MERGE_ON_READ) {
  assertFalse(metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.empty())
We should be able to ascertain the exact number of compaction commits. Same for the clean commits and replace commits above, instead of just checking isEmpty.
assertFalse(metaClient.getArchivedTimeline.getInstants.isEmpty)
assertFalse(metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.isEmpty)
if (tableType == HoodieTableType.MERGE_ON_READ) {
  assertFalse(metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.empty())
We should be able to ascertain the exact number of compaction commits. Same for the clean commits and replace commits above, instead of just checking isEmpty.
@ParameterizedTest
@EnumSource(value = classOf[HoodieTableType])
def testStructuredStreamingWithAutoKeyGen(tableType: HoodieTableType): Unit = {
  val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
Can we try a mix of bulk insert and insert operations?
@ParameterizedTest
@EnumSource(HoodieTableType.class)
void testDeltaSyncWithAutoKeyGenAndImmutableOperations(HoodieTableType tableType) throws Exception {
Can we try a mix of bulk insert, insert, and upsert? Even though we might set upsert, for auto key gen Hudi should automatically switch it to insert. Let's validate that from the commit metadata as well.
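A hedged sketch of that metadata check, with the Hudi classes replaced by plain maps for illustration (the field name `operationType` and the expected values are assumptions, not confirmed by the patch): after writing with upsert under auto key gen, read each commit's recorded operation and assert it was switched to insert.

```java
import java.util.List;
import java.util.Map;

public class CommitMetadataCheck {
    // With auto key gen, an "upsert" issued by the writer should be
    // recorded as an insert in the commit metadata.
    static boolean allInserts(List<Map<String, String>> commitMetadatas) {
        return commitMetadatas.stream()
            .allMatch(m -> "insert".equalsIgnoreCase(m.getOrDefault("operationType", "")));
    }
}
```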
Force-pushed 7d8b75a to b81f6d4
Hey @linliu-code: this patch is still waiting for you to address the feedback.
Change Logs
Impact
Verified that auto keygen works as expected when there are tens of immutable or mutable operations with table services enabled.
Risk level (write none, low, medium or high below)
None.
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
ticket number here and follow the instruction to make
changes to the website.
Contributor's checklist