Skip to content

Commit 3d4dca0

Browse files
authored
[server] Fix unstable test DefaultAuthorizerTest#testHighConcurrencyModificationOfResourceAcls (apache#828)
1 parent 931382a commit 3d4dca0

File tree

1 file changed

+33
-12
lines changed

1 file changed

+33
-12
lines changed

fluss-server/src/test/java/com/alibaba/fluss/server/authorizer/DefaultAuthorizerTest.java

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -480,10 +480,9 @@ void testDistributedConcurrentModificationOfResourceAcls() throws Exception {
480480

481481
retry(
482482
Duration.ofMinutes(1),
483-
() -> {
484-
assertThat(listAcls(authorizer, commonResource))
485-
.isEqualTo(new HashSet<>(Arrays.asList(acl1, acl2)));
486-
});
483+
() ->
484+
assertThat(listAcls(authorizer, commonResource))
485+
.isEqualTo(new HashSet<>(Arrays.asList(acl1, acl2))));
487486

488487
retry(
489488
Duration.ofMinutes(1),
@@ -499,6 +498,7 @@ void testHighConcurrencyModificationOfResourceAcls() throws Exception {
499498
// generate 50 concurrent acl operation tasks.
500499
List<Runnable> concurrentTasks = new ArrayList<>();
501500
Set<AccessControlEntry> expectedAcls = new HashSet<>();
501+
502502
for (int i = 0; i < 50; i++) {
503503
// each task represent that add acl to different user on same resource.
504504
final AccessControlEntry accessControlEntry =
@@ -519,10 +519,22 @@ void testHighConcurrencyModificationOfResourceAcls() throws Exception {
519519
}
520520

521521
if (finalI % 10 == 0) {
522-
dropAcls(
523-
authorizer2,
524-
commonResource,
525-
Collections.singleton(accessControlEntry));
522+
// If cache is still empty because modify notification is not arrived,
523+
// AclBindingFilter will match nothing. Thus retry to make sure that the
524+
// acl is deleted.
525+
retry(
526+
Duration.ofMinutes(1),
527+
() ->
528+
assertThat(
529+
dropAcls(
530+
authorizer2,
531+
commonResource,
532+
Collections.singleton(
533+
accessControlEntry)))
534+
.containsExactly(
535+
new AclBinding(
536+
commonResource,
537+
accessControlEntry)));
526538
}
527539
};
528540
concurrentTasks.add(runnable);
@@ -533,14 +545,13 @@ void testHighConcurrencyModificationOfResourceAcls() throws Exception {
533545
}
534546

535547
runInConcurrent(concurrentTasks);
548+
// Make sure all acl notifacations are arrived.
536549
retry(
537550
Duration.ofMinutes(1),
538551
() -> assertThat(listAcls(authorizer, commonResource)).isEqualTo(expectedAcls));
539552
retry(
540553
Duration.ofMinutes(1),
541-
() -> {
542-
assertThat(listAcls(authorizer2, commonResource)).isEqualTo(expectedAcls);
543-
});
554+
() -> assertThat(listAcls(authorizer2, commonResource)).isEqualTo(expectedAcls));
544555
}
545556

546557
@Test
@@ -597,7 +608,8 @@ Set<AccessControlEntry> listAcls(Authorizer authorizer, Resource resource) {
597608
.collect(Collectors.toSet());
598609
}
599610

600-
void dropAcls(Authorizer authorizer, Resource resource, Set<AccessControlEntry> entries) {
611+
List<AclBinding> dropAcls(
612+
Authorizer authorizer, Resource resource, Set<AccessControlEntry> entries) {
601613
List<AclBindingFilter> aclBindings =
602614
entries.stream()
603615
.map(
@@ -611,14 +623,23 @@ void dropAcls(Authorizer authorizer, Resource resource, Set<AccessControlEntry>
611623
entry.getOperationType(),
612624
entry.getPermissionType())))
613625
.collect(Collectors.toList());
626+
List<AclBinding> deleteAclBindings = new ArrayList<>();
614627
authorizer
615628
.dropAcls(createRootUserSession(), aclBindings)
616629
.forEach(
617630
result -> {
618631
if (result.error().isPresent()) {
619632
throw result.error().get().exception();
633+
} else {
634+
deleteAclBindings.addAll(
635+
result.aclBindingDeleteResults().stream()
636+
.map(
637+
AclDeleteResult.AclBindingDeleteResult
638+
::aclBinding)
639+
.collect(Collectors.toList()));
620640
}
621641
});
642+
return deleteAclBindings;
622643
}
623644

624645
/**

0 commit comments

Comments
 (0)