Skip to content

Commit 06418f8

Browse files
gyang94wuchong
authored andcommitted
[server] authorize InitWriter RPC with table path parameters
1 parent def979f commit 06418f8

File tree

6 files changed

+135
-4
lines changed

6 files changed

+135
-4
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/write/IdempotenceManager.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import com.alibaba.fluss.annotation.VisibleForTesting;
2121
import com.alibaba.fluss.exception.OutOfOrderSequenceException;
2222
import com.alibaba.fluss.exception.UnknownWriterIdException;
23+
import com.alibaba.fluss.metadata.PhysicalTablePath;
2324
import com.alibaba.fluss.metadata.TableBucket;
25+
import com.alibaba.fluss.metadata.TablePath;
2426
import com.alibaba.fluss.record.LogRecordBatch;
2527
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
2628
import com.alibaba.fluss.rpc.messages.InitWriterRequest;
@@ -32,6 +34,8 @@
3234
import javax.annotation.concurrent.ThreadSafe;
3335

3436
import java.util.Optional;
37+
import java.util.Set;
38+
import java.util.stream.Collectors;
3539

3640
import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
3741

@@ -277,11 +281,11 @@ synchronized boolean canRetry(WriteBatch batch, Errors error) {
277281
return false;
278282
}
279283

280-
void maybeWaitForWriterId() {
284+
void maybeWaitForWriterId(Set<PhysicalTablePath> tablePaths) {
281285
if (!isWriterIdValid()) {
282286
try {
283287
tabletServerGateway
284-
.initWriter(new InitWriterRequest())
288+
.initWriter(prepareInitWriterRequest(tablePaths))
285289
.thenAccept(response -> setWriterId(response.getWriterId()))
286290
.exceptionally(
287291
e -> {
@@ -297,6 +301,21 @@ void maybeWaitForWriterId() {
297301
}
298302
}
299303

304+
InitWriterRequest prepareInitWriterRequest(Set<PhysicalTablePath> physicalTables) {
305+
InitWriterRequest initWriterRequest = new InitWriterRequest();
306+
Set<TablePath> tables =
307+
physicalTables.stream()
308+
.map(PhysicalTablePath::getTablePath)
309+
.collect(Collectors.toSet());
310+
for (TablePath tablePath : tables) {
311+
initWriterRequest
312+
.addTablePath()
313+
.setDatabaseName(tablePath.getDatabaseName())
314+
.setTableName(tablePath.getTableName());
315+
}
316+
return initWriterRequest;
317+
}
318+
300319
private int maybeUpdateLastAckedSequence(TableBucket tableBucket, int sequence) {
301320
return idempotenceBucketMap.maybeUpdateLastAckedSequence(tableBucket, sequence);
302321
}

fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,10 @@ public Deque<WriteBatch> getDeque(PhysicalTablePath path, TableBucket tableBucke
360360
return bucketAndWriteBatches.batches.get(tableBucket.getBucket());
361361
}
362362

363+
public Set<PhysicalTablePath> getPhysicalTablePathsInBatches() {
364+
return writeBatches.keySet();
365+
}
366+
363367
private List<MemorySegment> allocateMemorySegments(WriteRecord writeRecord) throws IOException {
364368
if (writeRecord.getWriteFormat() == WriteFormat.ARROW_LOG) {
365369
// pre-allocate a batch memory size for Arrow, if it is not sufficient during batching,

fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,9 @@ public void run() {
172172
public void runOnce() throws Exception {
173173
if (idempotenceManager.idempotenceEnabled()) {
174174
// may be wait for writer id.
175-
idempotenceManager.maybeWaitForWriterId();
175+
Set<PhysicalTablePath> targetTables = accumulator.getPhysicalTablePathsInBatches();
176+
// TODO: only request to init writer_id when we have valid target tables
177+
idempotenceManager.maybeWaitForWriterId(targetTables);
176178
}
177179

178180
// do send.

fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.alibaba.fluss.client.Connection;
2020
import com.alibaba.fluss.client.ConnectionFactory;
21+
import com.alibaba.fluss.client.FlussConnection;
2122
import com.alibaba.fluss.client.admin.Admin;
2223
import com.alibaba.fluss.client.table.Table;
2324
import com.alibaba.fluss.client.table.scanner.batch.BatchScanner;
@@ -26,14 +27,19 @@
2627
import com.alibaba.fluss.config.ConfigOptions;
2728
import com.alibaba.fluss.config.Configuration;
2829
import com.alibaba.fluss.config.MemorySize;
30+
import com.alibaba.fluss.exception.AuthorizationException;
2931
import com.alibaba.fluss.metadata.DataLakeFormat;
3032
import com.alibaba.fluss.metadata.DatabaseDescriptor;
3133
import com.alibaba.fluss.metadata.TableBucket;
3234
import com.alibaba.fluss.metadata.TableDescriptor;
35+
import com.alibaba.fluss.metadata.TablePath;
3336
import com.alibaba.fluss.row.InternalRow;
3437
import com.alibaba.fluss.rpc.GatewayClientProxy;
3538
import com.alibaba.fluss.rpc.RpcClient;
3639
import com.alibaba.fluss.rpc.gateway.AdminGateway;
40+
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
41+
import com.alibaba.fluss.rpc.messages.InitWriterRequest;
42+
import com.alibaba.fluss.rpc.messages.InitWriterResponse;
3743
import com.alibaba.fluss.rpc.messages.MetadataRequest;
3844
import com.alibaba.fluss.rpc.metrics.TestingClientMetricGroup;
3945
import com.alibaba.fluss.security.acl.AccessControlEntry;
@@ -190,6 +196,14 @@ void testNoAuthorizer() throws Exception {
190196
.get();
191197
})
192198
.hasMessageContaining("No Authorizer is configured.");
199+
200+
// test initWriter without authorizer and empty table paths
201+
FlussConnection flussConnection = (FlussConnection) connection;
202+
TabletServerGateway tabletServerGateway =
203+
flussConnection.getMetadataUpdater().newTabletServerClientForNode(0);
204+
InitWriterResponse response =
205+
tabletServerGateway.initWriter(new InitWriterRequest()).get();
206+
assertThat(response.getWriterId()).isGreaterThanOrEqualTo(0);
193207
}
194208

195209
} finally {
@@ -430,6 +444,70 @@ void testGetMetaInfo() throws Exception {
430444
}
431445
}
432446

447+
@Test
448+
void testInitWriter() throws Exception {
449+
TablePath writeAclTable = TablePath.of("test_db_1", "write_acl_table");
450+
TablePath noWriteAclTable = TablePath.of("test_db_1", "no_write_acl_table");
451+
452+
TableDescriptor descriptor =
453+
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
454+
rootAdmin.createTable(writeAclTable, descriptor, false).get();
455+
// create acl to allow guest write.
456+
rootAdmin
457+
.createAcls(
458+
Collections.singletonList(
459+
new AclBinding(
460+
Resource.table(writeAclTable),
461+
new AccessControlEntry(
462+
guestPrincipal,
463+
"*",
464+
OperationType.WRITE,
465+
PermissionType.ALLOW))))
466+
.all()
467+
.get();
468+
469+
FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
470+
rootAdmin.getTableInfo(writeAclTable).get().getTableId());
471+
472+
FlussConnection flussConnection = (FlussConnection) guestConn;
473+
TabletServerGateway tabletServerGateway =
474+
flussConnection.getMetadataUpdater().newTabletServerClientForNode(0);
475+
476+
// test 1: empty table paths
477+
assertThatThrownBy(() -> tabletServerGateway.initWriter(new InitWriterRequest()).get())
478+
.cause()
479+
.isInstanceOf(AuthorizationException.class)
480+
.hasMessageContaining(
481+
"The request of InitWriter requires non empty table paths for authorization.");
482+
483+
// request contains a table path without permission
484+
InitWriterRequest noAclRequest = new InitWriterRequest();
485+
noAclRequest
486+
.addTablePath()
487+
.setDatabaseName(noWriteAclTable.getDatabaseName())
488+
.setTableName(noWriteAclTable.getTableName());
489+
490+
// test 2: no table has write permission
491+
assertThatThrownBy(() -> tabletServerGateway.initWriter(noAclRequest).get())
492+
.cause()
493+
.isInstanceOf(AuthorizationException.class)
494+
.hasMessageContaining(
495+
"No WRITE permission among all the tables: [test_db_1.no_write_acl_table]");
496+
497+
// request contains both a table path with/without permission
498+
InitWriterRequest request = new InitWriterRequest();
499+
request.addTablePath()
500+
.setTableName(writeAclTable.getTableName())
501+
.setDatabaseName(writeAclTable.getDatabaseName());
502+
request.addTablePath()
503+
.setTableName(noWriteAclTable.getTableName())
504+
.setDatabaseName(noWriteAclTable.getDatabaseName());
505+
506+
// test 3: one table has write permission, the other doesn't have permission
507+
InitWriterResponse response = tabletServerGateway.initWriter(request).get();
508+
assertThat(response.getWriterId()).isGreaterThanOrEqualTo(0);
509+
}
510+
433511
@Test
434512
void testProduceAndConsumer() throws Exception {
435513
TableDescriptor descriptor =

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ message GetFileSystemSecurityTokenResponse {
367367

368368
// init writer request and response
369369
message InitWriterRequest {
370+
repeated PbTablePath table_path = 1;
370371
}
371372

372373
message InitWriterResponse {

fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.alibaba.fluss.fs.FileSystem;
2323
import com.alibaba.fluss.metadata.PhysicalTablePath;
2424
import com.alibaba.fluss.metadata.TableBucket;
25+
import com.alibaba.fluss.metadata.TablePath;
2526
import com.alibaba.fluss.record.KvRecordBatch;
2627
import com.alibaba.fluss.record.MemoryLogRecords;
2728
import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
@@ -68,6 +69,7 @@
6869
import com.alibaba.fluss.server.log.ListOffsetsParam;
6970
import com.alibaba.fluss.server.metadata.ServerMetadataCache;
7071
import com.alibaba.fluss.server.replica.ReplicaManager;
72+
import com.alibaba.fluss.server.utils.ServerRpcMessageUtils;
7173
import com.alibaba.fluss.server.zk.ZooKeeperClient;
7274

7375
import javax.annotation.Nullable;
@@ -312,7 +314,11 @@ public CompletableFuture<ListOffsetsResponse> listOffsets(ListOffsetsRequest req
312314

313315
@Override
314316
public CompletableFuture<InitWriterResponse> initWriter(InitWriterRequest request) {
315-
// todo: add authorization for table acl until https://github.com/alibaba/fluss/issues/756.
317+
List<TablePath> tablePathsList =
318+
request.getTablePathsList().stream()
319+
.map(ServerRpcMessageUtils::toTablePath)
320+
.collect(Collectors.toList());
321+
authorizeAnyTable(WRITE, tablePathsList);
316322
CompletableFuture<InitWriterResponse> response = new CompletableFuture<>();
317323
response.complete(makeInitWriterResponse(metadataManager.initWriterId()));
318324
return response;
@@ -364,6 +370,27 @@ private void authorizeTable(OperationType operationType, long tableId) {
364370
}
365371
}
366372

373+
private void authorizeAnyTable(OperationType operationType, List<TablePath> tablePaths) {
374+
if (authorizer != null) {
375+
if (tablePaths.isEmpty()) {
376+
throw new AuthorizationException(
377+
"The request of InitWriter requires non empty table paths for authorization.");
378+
}
379+
380+
for (TablePath tablePath : tablePaths) {
381+
Resource tableResource = Resource.table(tablePath);
382+
if (authorizer.isAuthorized(currentSession(), operationType, tableResource)) {
383+
// authorized success if one of the tables has the permission
384+
return;
385+
}
386+
}
387+
throw new AuthorizationException(
388+
String.format(
389+
"No %s permission among all the tables: %s",
390+
operationType, tablePaths));
391+
}
392+
}
393+
367394
/**
368395
* Authorize the given request data for each table bucket, and return the successfully
369396
* authorized request data. The failed authorization will be put into the errorResponseMap.

0 commit comments

Comments
 (0)