Skip to content

Commit 5492564

Browse files
swuferhongwuchong
authored andcommitted
[client] Sender trigger to abort all writeBatches if InitWriter failed with AuthorizationException
1 parent 06418f8 commit 5492564

File tree

16 files changed

+379
-29
lines changed

16 files changed

+379
-29
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/write/ArrowLogWriteBatch.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,9 @@ public long writerId() {
131131
public int batchSequence() {
132132
return recordsBuilder.batchSequence();
133133
}
134+
135+
@Override
136+
public void abortRecordAppends() {
137+
recordsBuilder.abort();
138+
}
134139
}

fluss-client/src/main/java/com/alibaba/fluss/client/write/IdempotenceManager.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import java.util.Optional;
3737
import java.util.Set;
38+
import java.util.concurrent.ExecutionException;
3839
import java.util.stream.Collectors;
3940

4041
import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
@@ -281,23 +282,13 @@ synchronized boolean canRetry(WriteBatch batch, Errors error) {
281282
return false;
282283
}
283284

284-
void maybeWaitForWriterId(Set<PhysicalTablePath> tablePaths) {
285+
void maybeWaitForWriterId(Set<PhysicalTablePath> tablePaths)
286+
throws ExecutionException, InterruptedException {
285287
if (!isWriterIdValid()) {
286-
try {
287-
tabletServerGateway
288-
.initWriter(prepareInitWriterRequest(tablePaths))
289-
.thenAccept(response -> setWriterId(response.getWriterId()))
290-
.exceptionally(
291-
e -> {
292-
LOG.error("Failed to get writer id from tablet server.", e);
293-
return null;
294-
})
295-
.get(); // TODO: can optimize into async response handling.
296-
} catch (Exception e) {
297-
LOG.error(
298-
"Received an exception while trying to get writer id from tablet server.",
299-
e);
300-
}
288+
tabletServerGateway
289+
.initWriter(prepareInitWriterRequest(tablePaths))
290+
.thenAccept(response -> setWriterId(response.getWriterId()))
291+
.get(); // TODO: can optimize into async response handling.
301292
}
302293
}
303294

fluss-client/src/main/java/com/alibaba/fluss/client/write/IncompleteBatches.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import javax.annotation.concurrent.ThreadSafe;
2222

23+
import java.util.ArrayList;
2324
import java.util.HashSet;
2425
import java.util.Set;
2526
import java.util.stream.Collectors;
@@ -53,6 +54,12 @@ public void remove(WriteBatch batch) {
5354
}
5455
}
5556

57+
public Iterable<WriteBatch> copyAll() {
58+
synchronized (incomplete) {
59+
return new ArrayList<>(this.incomplete);
60+
}
61+
}
62+
5663
public Iterable<WriteBatch.RequestFuture> requestResults() {
5764
synchronized (incomplete) {
5865
return incomplete.stream()

fluss-client/src/main/java/com/alibaba/fluss/client/write/IndexedLogWriteBatch.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ public int batchSequence() {
123123
return recordsBuilder.batchSequence();
124124
}
125125

126+
@Override
127+
public void abortRecordAppends() {
128+
recordsBuilder.abort();
129+
}
130+
126131
public void resetWriterState(long writerId, int batchSequence) {
127132
super.resetWriterState(writerId, batchSequence);
128133
recordsBuilder.resetWriterState(writerId, batchSequence);

fluss-client/src/main/java/com/alibaba/fluss/client/write/KvWriteBatch.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@ public int batchSequence() {
144144
return recordsBuilder.batchSequence();
145145
}
146146

147+
@Override
148+
public void abortRecordAppends() {
149+
recordsBuilder.abort();
150+
}
151+
147152
public void resetWriterState(long writerId, int batchSequence) {
148153
super.resetWriterState(writerId, batchSequence);
149154
recordsBuilder.resetWriterState(writerId, batchSequence);

fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,19 @@ public void reEnqueue(WriteBatch batch) {
294294
}
295295
}
296296

297+
/** Abort all incomplete batches (whether they have been sent or not). */
298+
public void abortBatches(final Exception reason) {
299+
for (WriteBatch batch : incomplete.copyAll()) {
300+
Deque<WriteBatch> dq = getDeque(batch.physicalTablePath(), batch.tableBucket());
301+
synchronized (dq) {
302+
batch.abortRecordAppends();
303+
dq.remove(batch);
304+
}
305+
batch.abort(reason);
306+
deallocate(batch);
307+
}
308+
}
309+
297310
/** Get the deque for the given table-bucket, creating it if necessary. */
298311
private Deque<WriteBatch> getOrCreateDeque(
299312
TableBucket tableBucket, PhysicalTablePath physicalTablePath) {

fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.alibaba.fluss.rpc.messages.PutKvResponse;
3939
import com.alibaba.fluss.rpc.protocol.ApiError;
4040
import com.alibaba.fluss.rpc.protocol.Errors;
41+
import com.alibaba.fluss.utils.ExceptionUtils;
4142

4243
import org.slf4j.Logger;
4344
import org.slf4j.LoggerFactory;
@@ -174,7 +175,19 @@ public void runOnce() throws Exception {
174175
// may be wait for writer id.
175176
Set<PhysicalTablePath> targetTables = accumulator.getPhysicalTablePathsInBatches();
176177
// TODO: only request to init writer_id when we have valid target tables
177-
idempotenceManager.maybeWaitForWriterId(targetTables);
178+
try {
179+
idempotenceManager.maybeWaitForWriterId(targetTables);
180+
} catch (Exception e) {
181+
Throwable t = ExceptionUtils.stripExecutionException(e);
182+
183+
// TODO: If 'only request to init writer_id when we have valid target tables' have
184+
// been down, this if check can be removed.
185+
if (!targetTables.isEmpty()) {
186+
maybeAbortBatches((Exception) t);
187+
} else {
188+
LOG.trace("No target tables, ignore init writer id error", t);
189+
}
190+
}
178191
}
179192

180193
// do send.
@@ -256,6 +269,13 @@ private void failBatch(WriteBatch batch, Exception exception, boolean adjustBatc
256269
}
257270
}
258271

272+
private void maybeAbortBatches(Exception exception) {
273+
if (accumulator.hasIncomplete()) {
274+
LOG.error("Aborting write batches due to fatal error", exception);
275+
accumulator.abortBatches(exception);
276+
}
277+
}
278+
259279
private void reEnqueueBatch(WriteBatch batch) {
260280
accumulator.reEnqueue(batch);
261281
maybeRemoveFromInflightBatches(batch);

fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteBatch.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ public abstract boolean tryAppend(WriteRecord writeRecord, WriteCallback callbac
111111

112112
public abstract int batchSequence();
113113

114+
public abstract void abortRecordAppends();
115+
114116
public boolean hasBatchSequence() {
115117
return batchSequence() != LogRecordBatch.NO_BATCH_SEQUENCE;
116118
}
@@ -124,6 +126,17 @@ public void resetWriterState(long writerId, int batchSequence) {
124126
reopened = true;
125127
}
126128

129+
/** Abort the batch and complete the future and callbacks. */
130+
public void abort(Exception exception) {
131+
if (!finalState.compareAndSet(null, FinalState.ABORTED)) {
132+
throw new IllegalStateException(
133+
"Batch has already been completed in final stata " + finalState.get());
134+
}
135+
136+
LOG.trace("Abort batch for tableBucket {}", tableBucket, exception);
137+
completeFutureAndFireCallbacks(exception);
138+
}
139+
127140
public boolean sequenceHasBeenReset() {
128141
return reopened;
129142
}
@@ -251,14 +264,8 @@ private boolean done(@Nullable Exception batchException) {
251264

252265
private enum FinalState {
253266
FAILED,
254-
SUCCEEDED
255-
}
256-
257-
/** The type of write batch. */
258-
public enum WriteBatchType {
259-
ARROW_LOG,
260-
INDEXED_LOG,
261-
KV
267+
SUCCEEDED,
268+
ABORTED
262269
}
263270

264271
/** The future for this batch. */

fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,85 @@ void testInitWriter() throws Exception {
508508
assertThat(response.getWriterId()).isGreaterThanOrEqualTo(0);
509509
}
510510

511+
@Test
512+
void testProduceWithNoWriteAuthorization() throws Exception {
513+
TablePath writeAclTable = TablePath.of("test_db_1", "write_acl_table_1");
514+
TablePath noWriteAclTable = TablePath.of("test_db_1", "no_write_acl_table_1");
515+
TableDescriptor descriptor =
516+
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
517+
rootAdmin.createTable(writeAclTable, descriptor, false).get();
518+
rootAdmin.createTable(noWriteAclTable, descriptor, false).get();
519+
520+
// create acl to allow guest write for writeAclTable.
521+
rootAdmin
522+
.createAcls(
523+
Collections.singletonList(
524+
new AclBinding(
525+
Resource.table(writeAclTable),
526+
new AccessControlEntry(
527+
guestPrincipal,
528+
"*",
529+
OperationType.WRITE,
530+
PermissionType.ALLOW))))
531+
.all()
532+
.get();
533+
rootAdmin
534+
.createAcls(
535+
Collections.singletonList(
536+
new AclBinding(
537+
Resource.table(noWriteAclTable),
538+
new AccessControlEntry(
539+
guestPrincipal,
540+
"*",
541+
OperationType.READ,
542+
PermissionType.ALLOW))))
543+
.all()
544+
.get();
545+
546+
FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
547+
rootAdmin.getTableInfo(writeAclTable).get().getTableId());
548+
FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
549+
rootAdmin.getTableInfo(noWriteAclTable).get().getTableId());
550+
551+
// 1. Try to write data to noWriteAclTable. It should throw AuthorizationException because
552+
// of request writeId failed.
553+
try (Table table = guestConn.getTable(noWriteAclTable)) {
554+
AppendWriter appendWriter = table.newAppend().createWriter();
555+
assertThatThrownBy(() -> appendWriter.append(row(1, "a")).get())
556+
.hasRootCauseInstanceOf(AuthorizationException.class)
557+
.rootCause()
558+
.hasMessageContaining(
559+
String.format(
560+
"No WRITE permission among all the tables: %s",
561+
Collections.singletonList(noWriteAclTable)));
562+
}
563+
564+
// 2. Try to write data to writeAclTable. It will success and writeId will be set.
565+
try (Table table = guestConn.getTable(writeAclTable)) {
566+
AppendWriter appendWriter = table.newAppend().createWriter();
567+
appendWriter.append(row(1, "a")).get();
568+
}
569+
570+
// 3. Try to write data to writeAclTable again. It will throw AuthorizationException because
571+
// of no write permission.
572+
// Note: If guestUser have permission for table lists: [writeAclTable, noWriteAclTable].
573+
// When we give WRITE permission to writeAclTable for guestUser, guestUser will have
574+
// INIT_WRITER permission for both writeAclTable and noWriteAclTable.
575+
// In this case, when guestUser try to write noWriteAclTable, Fluss client can get writerId
576+
// but can not to write to noWriteAclTable because of no WRITE permission.
577+
try (Table table = guestConn.getTable(noWriteAclTable)) {
578+
AppendWriter appendWriter = table.newAppend().createWriter();
579+
assertThatThrownBy(() -> appendWriter.append(row(1, "a")).get())
580+
.hasRootCauseInstanceOf(AuthorizationException.class)
581+
.rootCause()
582+
.hasMessageContaining(
583+
String.format(
584+
"No permission to WRITE table %s in database %s",
585+
noWriteAclTable.getTableName(),
586+
noWriteAclTable.getDatabaseName()));
587+
}
588+
}
589+
511590
@Test
512591
void testProduceAndConsumer() throws Exception {
513592
TableDescriptor descriptor =

fluss-client/src/test/java/com/alibaba/fluss/client/write/ArrowLogWriteBatchTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import java.util.ArrayList;
4242
import java.util.List;
43+
import java.util.concurrent.CompletableFuture;
4344

4445
import static com.alibaba.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
4546
import static com.alibaba.fluss.record.LogRecordReadContext.createArrowReadContext;
@@ -49,6 +50,7 @@
4950
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO;
5051
import static com.alibaba.fluss.testutils.DataTestUtils.row;
5152
import static org.assertj.core.api.Assertions.assertThat;
53+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5254

5355
/** Test for {@link ArrowLogWriteBatch}. */
5456
public class ArrowLogWriteBatchTest {
@@ -231,6 +233,59 @@ void testArrowCompressionRatioEstimated() throws Exception {
231233
assertThat(currentRatio).isLessThan(1.0f);
232234
}
233235

236+
@Test
237+
void testBatchAborted() throws Exception {
238+
int bucketId = 0;
239+
int maxSizeInBytes = 10240;
240+
ArrowLogWriteBatch arrowLogWriteBatch =
241+
createArrowLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), maxSizeInBytes);
242+
int recordCount = 5;
243+
List<CompletableFuture<Void>> futures = new ArrayList<>();
244+
for (int i = 0; i < recordCount; i++) {
245+
CompletableFuture<Void> future = new CompletableFuture<>();
246+
arrowLogWriteBatch.tryAppend(
247+
createWriteRecord(row(i, "a" + i)),
248+
exception -> {
249+
if (exception != null) {
250+
future.completeExceptionally(exception);
251+
} else {
252+
future.complete(null);
253+
}
254+
});
255+
futures.add(future);
256+
}
257+
258+
assertThat(writerProvider.freeWriters()).isEmpty();
259+
arrowLogWriteBatch.abortRecordAppends();
260+
arrowLogWriteBatch.abort(new RuntimeException("close with record batch abort"));
261+
262+
// first try to append.
263+
assertThatThrownBy(
264+
() ->
265+
arrowLogWriteBatch.tryAppend(
266+
createWriteRecord(row(1, "a")), newWriteCallback()))
267+
.isInstanceOf(IllegalStateException.class)
268+
.hasMessageContaining(
269+
"Tried to append a record, but MemoryLogRecordsArrowBuilder has already been aborted");
270+
271+
// try to build.
272+
assertThatThrownBy(arrowLogWriteBatch::build)
273+
.isInstanceOf(IllegalStateException.class)
274+
.hasMessageContaining("Attempting to build an aborted record batch");
275+
276+
// verify arrow writer have recycled.
277+
assertThat(writerProvider.freeWriters()).hasSize(1);
278+
assertThat(writerProvider.freeWriters().get("150001-1-ZSTD-3")).isNotNull();
279+
280+
// verify record append future is completed with exception.
281+
for (CompletableFuture<Void> future : futures) {
282+
assertThatThrownBy(future::join)
283+
.rootCause()
284+
.isInstanceOf(RuntimeException.class)
285+
.hasMessageContaining("close with record batch abort");
286+
}
287+
}
288+
234289
private WriteRecord createWriteRecord(GenericRow row) {
235290
return WriteRecord.forArrowAppend(DATA1_PHYSICAL_TABLE_PATH, row, null);
236291
}

0 commit comments

Comments
 (0)