Skip to content

Commit d37016b

Browse files
authored
[test] Fix unstable test cases in FlussAuthorizationITCase (#1102)
1 parent 23f6b1c commit d37016b

File tree

5 files changed

+109
-94
lines changed

5 files changed

+109
-94
lines changed

fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java

Lines changed: 55 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -171,32 +171,31 @@ void testNoAuthorizer() throws Exception {
171171
conf.setString("client.security.sasl.password", "password");
172172
try (Connection connection = ConnectionFactory.createConnection(conf);
173173
Admin admin = connection.getAdmin()) {
174-
assertThatThrownBy(
175-
() -> {
176-
admin.listAcls(AclBindingFilter.ANY).get();
177-
})
174+
assertThatThrownBy(() -> admin.listAcls(AclBindingFilter.ANY).get())
178175
.hasMessageContaining("No Authorizer is configured.");
179176
assertThatThrownBy(
180-
() -> {
181-
admin.createAcls(
182-
Collections.singletonList(
183-
new AclBinding(
184-
Resource.cluster(),
185-
new AccessControlEntry(
186-
WILD_CARD_PRINCIPAL,
187-
WILD_CARD_HOST,
188-
OperationType.CREATE,
189-
PermissionType.ALLOW))))
190-
.all()
191-
.get();
192-
})
177+
() ->
178+
admin.createAcls(
179+
Collections.singletonList(
180+
new AclBinding(
181+
Resource.cluster(),
182+
new AccessControlEntry(
183+
WILD_CARD_PRINCIPAL,
184+
WILD_CARD_HOST,
185+
OperationType
186+
.CREATE,
187+
PermissionType
188+
.ALLOW))))
189+
.all()
190+
.get())
193191
.hasMessageContaining("No Authorizer is configured.");
194192
assertThatThrownBy(
195-
() -> {
196-
admin.dropAcls(Collections.singletonList(AclBindingFilter.ANY))
197-
.all()
198-
.get();
199-
})
193+
() ->
194+
admin.dropAcls(
195+
Collections.singletonList(
196+
AclBindingFilter.ANY))
197+
.all()
198+
.get())
200199
.hasMessageContaining("No Authorizer is configured.");
201200

202201
// test initWriter without authorizer and empty table paths
@@ -455,21 +454,15 @@ void testInitWriter() throws Exception {
455454
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
456455
rootAdmin.createTable(writeAclTable, descriptor, false).get();
457456
// create acl to allow guest write.
458-
rootAdmin
459-
.createAcls(
460-
Collections.singletonList(
461-
new AclBinding(
462-
Resource.table(writeAclTable),
463-
new AccessControlEntry(
464-
guestPrincipal,
465-
"*",
466-
OperationType.WRITE,
467-
PermissionType.ALLOW))))
468-
.all()
469-
.get();
457+
AclBinding aclBinding =
458+
new AclBinding(
459+
Resource.table(writeAclTable),
460+
new AccessControlEntry(
461+
guestPrincipal, "*", OperationType.WRITE, PermissionType.ALLOW));
462+
rootAdmin.createAcls(Collections.singletonList(aclBinding)).all().get();
470463

471-
FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
472-
rootAdmin.getTableInfo(writeAclTable).get().getTableId());
464+
FLUSS_CLUSTER_EXTENSION.waitUtilTableReadyWithAuthorization(
465+
rootAdmin.getTableInfo(writeAclTable).get().getTableId(), aclBinding);
473466

474467
FlussConnection flussConnection = (FlussConnection) guestConn;
475468
TabletServerGateway tabletServerGateway =
@@ -520,35 +513,24 @@ void testProduceWithNoWriteAuthorization() throws Exception {
520513
rootAdmin.createTable(noWriteAclTable, descriptor, false).get();
521514

522515
// create acl to allow guest write for writeAclTable.
523-
rootAdmin
524-
.createAcls(
525-
Collections.singletonList(
526-
new AclBinding(
527-
Resource.table(writeAclTable),
528-
new AccessControlEntry(
529-
guestPrincipal,
530-
"*",
531-
OperationType.WRITE,
532-
PermissionType.ALLOW))))
533-
.all()
534-
.get();
535-
rootAdmin
536-
.createAcls(
537-
Collections.singletonList(
538-
new AclBinding(
539-
Resource.table(noWriteAclTable),
540-
new AccessControlEntry(
541-
guestPrincipal,
542-
"*",
543-
OperationType.READ,
544-
PermissionType.ALLOW))))
545-
.all()
546-
.get();
547-
548-
FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
549-
rootAdmin.getTableInfo(writeAclTable).get().getTableId());
550-
FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
551-
rootAdmin.getTableInfo(noWriteAclTable).get().getTableId());
516+
AclBinding aclBindingOfWriteAclTable =
517+
new AclBinding(
518+
Resource.table(writeAclTable),
519+
new AccessControlEntry(
520+
guestPrincipal, "*", OperationType.WRITE, PermissionType.ALLOW));
521+
AclBinding aclBindingOfNoWriteAclTable =
522+
new AclBinding(
523+
Resource.table(noWriteAclTable),
524+
new AccessControlEntry(guestPrincipal, "*", READ, PermissionType.ALLOW));
525+
rootAdmin.createAcls(Collections.singletonList(aclBindingOfWriteAclTable)).all().get();
526+
rootAdmin.createAcls(Collections.singletonList(aclBindingOfNoWriteAclTable)).all().get();
527+
528+
FLUSS_CLUSTER_EXTENSION.waitUtilTableReadyWithAuthorization(
529+
rootAdmin.getTableInfo(writeAclTable).get().getTableId(),
530+
aclBindingOfWriteAclTable);
531+
FLUSS_CLUSTER_EXTENSION.waitUtilTableReadyWithAuthorization(
532+
rootAdmin.getTableInfo(noWriteAclTable).get().getTableId(),
533+
aclBindingOfNoWriteAclTable);
552534

553535
// 1. Try to write data to noWriteAclTable. It should throw AuthorizationException because
554536
// of request writeId failed.
@@ -595,20 +577,14 @@ void testProduceAndConsumer() throws Exception {
595577
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
596578
rootAdmin.createTable(DATA1_TABLE_PATH, descriptor, false).get();
597579
// create acl to allow guest write.
598-
rootAdmin
599-
.createAcls(
600-
Collections.singletonList(
601-
new AclBinding(
602-
Resource.table(DATA1_TABLE_PATH),
603-
new AccessControlEntry(
604-
guestPrincipal,
605-
"*",
606-
OperationType.WRITE,
607-
PermissionType.ALLOW))))
608-
.all()
609-
.get();
610-
FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
611-
rootAdmin.getTableInfo(DATA1_TABLE_PATH).get().getTableId());
580+
AclBinding aclBinding =
581+
new AclBinding(
582+
Resource.table(DATA1_TABLE_PATH),
583+
new AccessControlEntry(
584+
guestPrincipal, "*", OperationType.WRITE, PermissionType.ALLOW));
585+
rootAdmin.createAcls(Collections.singletonList(aclBinding)).all().get();
586+
FLUSS_CLUSTER_EXTENSION.waitUtilTableReadyWithAuthorization(
587+
rootAdmin.getTableInfo(DATA1_TABLE_PATH).get().getTableId(), aclBinding);
612588
try (Table table = guestConn.getTable(DATA1_TABLE_PATH)) {
613589
AppendWriter appendWriter = table.newAppend().createWriter();
614590
appendWriter.append(row(1, "a")).get();

fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR
137137
private final ApiManager apiManager;
138138
protected final ZooKeeperClient zkClient;
139139
protected final MetadataManager metadataManager;
140-
protected final Authorizer authorizer;
140+
protected final @Nullable Authorizer authorizer;
141141

142142
private long tokenLastUpdateTimeMs = 0;
143143
private ObtainedSecurityToken securityToken = null;

fluss-server/src/main/java/com/alibaba/fluss/server/authorizer/DefaultAuthorizer.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.alibaba.fluss.server.authorizer;
1818

19+
import com.alibaba.fluss.annotation.VisibleForTesting;
1920
import com.alibaba.fluss.config.ConfigOptions;
2021
import com.alibaba.fluss.config.Configuration;
2122
import com.alibaba.fluss.exception.ApiException;
@@ -116,7 +117,6 @@ public class DefaultAuthorizer extends AbstractAuthorizer implements FatalErrorH
116117
OPS_MAPPING = Collections.unmodifiableMap(map);
117118
}
118119

119-
private final Configuration configuration;
120120
private final Set<FlussPrincipal> superUsers;
121121

122122
private final ZooKeeperClient zooKeeperClient;
@@ -131,7 +131,7 @@ public class DefaultAuthorizer extends AbstractAuthorizer implements FatalErrorH
131131
private final HashMap<ResourceTypeKey, Set<String>> resourceCache = new HashMap<>();
132132

133133
public DefaultAuthorizer(AuthorizationPlugin.Context context) {
134-
this.configuration = context.getConfiguration();
134+
Configuration configuration = context.getConfiguration();
135135
this.superUsers = parseSuperUsers(configuration);
136136
if (context.getZooKeeperClient().isPresent()) {
137137
this.zooKeeperClient = context.getZooKeeperClient().get();
@@ -203,21 +203,20 @@ public List<AclCreateResult> addAcls(Session session, List<AclBinding> aclBindin
203203
});
204204
entries.values()
205205
.forEach(
206-
idx -> {
207-
results[idx] =
208-
AclCreateResult.success(
209-
aclBindings.get(idx));
210-
});
206+
idx ->
207+
results[idx] =
208+
AclCreateResult.success(
209+
aclBindings.get(idx)));
211210

212211
} catch (Throwable e) {
213212
ApiException exception = ApiError.fromThrowable(e).exception();
214213
entries.values()
215214
.forEach(
216-
idx -> {
217-
results[idx] =
218-
new AclCreateResult(
219-
aclBindings.get(idx), exception);
220-
});
215+
idx ->
216+
results[idx] =
217+
new AclCreateResult(
218+
aclBindings.get(idx),
219+
exception));
221220
}
222221
});
223222
}
@@ -470,7 +469,8 @@ private void updateAclChangedFlag(Resource resource) {
470469
}
471470
}
472471

473-
boolean aclsAllowAccess(
472+
@VisibleForTesting
473+
public boolean aclsAllowAccess(
474474
Resource resource, FlussPrincipal principal, OperationType operation, String host) {
475475
Set<AccessControlEntry> accessControlEntries = matchingAcls(resource);
476476
return isEmptyAclAndAuthorized(resource, accessControlEntries)
@@ -684,7 +684,7 @@ public String toString() {
684684
* zknode version.
685685
*/
686686
public static class VersionedAcls {
687-
Set<AccessControlEntry> acls;
687+
public Set<AccessControlEntry> acls;
688688
int zkVersion;
689689

690690
public VersionedAcls(int zkVersion, Set<AccessControlEntry> acls) {

fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,11 @@ public ReplicaManager getReplicaManager() {
422422
return replicaManager;
423423
}
424424

425+
@VisibleForTesting
426+
public @Nullable Authorizer getAuthorizer() {
427+
return authorizer;
428+
}
429+
425430
private static void validateConfigs(Configuration conf) {
426431
Optional<Integer> serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
427432
if (!serverId.isPresent()) {

fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@
3838
import com.alibaba.fluss.rpc.messages.PbNotifyLeaderAndIsrReqForBucket;
3939
import com.alibaba.fluss.rpc.messages.StopReplicaRequest;
4040
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
41+
import com.alibaba.fluss.security.acl.AccessControlEntry;
42+
import com.alibaba.fluss.security.acl.AclBinding;
43+
import com.alibaba.fluss.server.authorizer.Authorizer;
44+
import com.alibaba.fluss.server.authorizer.DefaultAuthorizer;
4145
import com.alibaba.fluss.server.coordinator.CoordinatorServer;
4246
import com.alibaba.fluss.server.coordinator.MetadataManager;
4347
import com.alibaba.fluss.server.entity.NotifyLeaderAndIsrData;
@@ -508,6 +512,13 @@ public void waitUtilAllGatewayHasSameMetadata() {
508512

509513
/** Wait until all the table assignments buckets are ready for table. */
510514
public void waitUtilTableReady(long tableId) {
515+
waitUtilTableReadyWithAuthorization(tableId, null);
516+
}
517+
518+
/**
519+
* Wait until all the table assignments buckets and required authorization are ready for table.
520+
*/
521+
public void waitUtilTableReadyWithAuthorization(long tableId, @Nullable AclBinding aclBinding) {
511522
ZooKeeperClient zkClient = getZooKeeperClient();
512523
retry(
513524
Duration.ofMinutes(1),
@@ -516,6 +527,29 @@ public void waitUtilTableReady(long tableId) {
516527
zkClient.getTableAssignment(tableId);
517528
assertThat(tableAssignmentOpt).isPresent();
518529
waitReplicaInAssignmentReady(zkClient, tableAssignmentOpt.get(), tableId, null);
530+
531+
if (aclBinding != null) {
532+
getTabletServers()
533+
.forEach(
534+
ts -> {
535+
Authorizer authorizer = ts.getAuthorizer();
536+
assertThat(authorizer).isNotNull();
537+
AccessControlEntry accessControlEntry =
538+
aclBinding.getAccessControlEntry();
539+
assertThat(
540+
((DefaultAuthorizer) authorizer)
541+
.aclsAllowAccess(
542+
aclBinding
543+
.getResource(),
544+
accessControlEntry
545+
.getPrincipal(),
546+
accessControlEntry
547+
.getOperationType(),
548+
accessControlEntry
549+
.getHost()))
550+
.isTrue();
551+
});
552+
}
519553
});
520554
}
521555

0 commit comments

Comments
 (0)