Skip to content

Commit ab80723

Browse files
[client] Improve: add detailed context to exception messages (#1806)
1 parent 968d3a4 commit ab80723

File tree

2 files changed

+60
-24
lines changed

2 files changed

+60
-24
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,11 @@ public Scan project(List<String> projectedColumnNames) {
7474
int index = rowType.getFieldIndex(projectedColumnNames.get(i));
7575
if (index < 0) {
7676
throw new IllegalArgumentException(
77-
"Field " + projectedColumnNames.get(i) + " not found in table schema.");
77+
String.format(
78+
"Field '%s' not found in table schema. Available fields: %s, Table: %s",
79+
projectedColumnNames.get(i),
80+
rowType.getFieldNames(),
81+
tableInfo.getTablePath()));
7882
}
7983
columnIndexes[i] = index;
8084
}
@@ -89,7 +93,10 @@ public Scan limit(int rowNumber) {
8993
@Override
9094
public LogScanner createLogScanner() {
9195
if (limit != null) {
92-
throw new UnsupportedOperationException("LogScanner doesn't support limit pushdown.");
96+
throw new UnsupportedOperationException(
97+
String.format(
98+
"LogScanner doesn't support limit pushdown. Table: %s, requested limit: %d",
99+
tableInfo.getTablePath(), limit));
93100
}
94101
return new LogScannerImpl(
95102
conn.getConfiguration(),
@@ -104,7 +111,9 @@ public LogScanner createLogScanner() {
104111
public BatchScanner createBatchScanner(TableBucket tableBucket) {
105112
if (limit == null) {
106113
throw new UnsupportedOperationException(
107-
"Currently, BatchScanner is only available when limit is set.");
114+
String.format(
115+
"Currently, BatchScanner is only available when limit is set. Table: %s, bucket: %s",
116+
tableInfo.getTablePath(), tableBucket));
108117
}
109118
return new LimitBatchScanner(
110119
tableInfo, tableBucket, conn.getMetadataUpdater(), projectedColumns, limit);
@@ -114,7 +123,9 @@ public BatchScanner createBatchScanner(TableBucket tableBucket) {
114123
public BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId) {
115124
if (limit != null) {
116125
throw new UnsupportedOperationException(
117-
"Currently, SnapshotBatchScanner doesn't support limit pushdown.");
126+
String.format(
127+
"Currently, SnapshotBatchScanner doesn't support limit pushdown. Table: %s, bucket: %s, snapshot ID: %d, requested limit: %d",
128+
tableInfo.getTablePath(), tableBucket, snapshotId, limit));
118129
}
119130
String scannerTmpDir =
120131
conn.getConfiguration().getString(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR);
@@ -123,7 +134,11 @@ public BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId)
123134
try {
124135
snapshotMeta = admin.getKvSnapshotMetadata(tableBucket, snapshotId).get();
125136
} catch (Exception e) {
126-
throw new FlussRuntimeException("Failed to get snapshot metadata", e);
137+
throw new FlussRuntimeException(
138+
String.format(
139+
"Failed to get snapshot metadata for table bucket %s, snapshot ID: %d, Table: %s",
140+
tableBucket, snapshotId, tableInfo.getTablePath()),
141+
e);
127142
}
128143

129144
return new KvSnapshotBatchScanner(

fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,16 @@ public WriterClient(
9292
MetadataUpdater metadataUpdater,
9393
ClientMetricGroup clientMetricGroup,
9494
Admin admin) {
95+
int maxRequestSizeLocal = -1;
96+
IdempotenceManager idempotenceManagerLocal = null;
9597
try {
9698
this.conf = conf;
9799
this.metadataUpdater = metadataUpdater;
98-
this.maxRequestSize =
100+
maxRequestSizeLocal =
99101
(int) conf.get(ConfigOptions.CLIENT_WRITER_REQUEST_MAX_SIZE).getBytes();
100-
this.idempotenceManager = buildIdempotenceManager();
102+
this.maxRequestSize = maxRequestSizeLocal;
103+
idempotenceManagerLocal = buildIdempotenceManager();
104+
this.idempotenceManager = idempotenceManagerLocal;
101105
this.writerMetricGroup = new WriterMetricGroup(clientMetricGroup);
102106

103107
short acks = configureAcks(idempotenceManager.idempotenceEnabled());
@@ -117,7 +121,14 @@ public WriterClient(
117121
this::maybeAbortBatches);
118122
} catch (Throwable t) {
119123
close(Duration.ofMillis(0));
120-
throw new FlussRuntimeException("Failed to construct writer", t);
124+
throw new FlussRuntimeException(
125+
String.format(
126+
"Failed to construct writer. Max request size: %d bytes, Idempotence enabled: %b",
127+
maxRequestSizeLocal,
128+
idempotenceManagerLocal != null
129+
? idempotenceManagerLocal.idempotenceEnabled()
130+
: false),
131+
t);
121132
}
122133
}
123134

@@ -148,7 +159,11 @@ public void flush() {
148159
try {
149160
accumulator.awaitFlushCompletion();
150161
} catch (InterruptedException e) {
151-
throw new FlussRuntimeException("Flush interrupted." + e);
162+
throw new FlussRuntimeException(
163+
String.format(
164+
"Flush interrupted after %d ms. Writer may be in inconsistent state",
165+
System.currentTimeMillis() - start),
166+
e);
152167
}
153168
LOG.trace(
154169
"Flushed accumulated records in writer in {} ms.",
@@ -196,7 +211,12 @@ private void doSend(WriteRecord record, WriteCallback callback) {
196211
// TODO add the wakeup logic refer to Kafka.
197212
}
198213
} catch (Exception e) {
199-
throw new FlussRuntimeException(e);
214+
throw new FlussRuntimeException(
215+
String.format(
216+
"Failed to send record to table %s. Writer state: %s",
217+
record.getPhysicalTablePath(),
218+
sender != null && sender.isRunning() ? "running" : "closed"),
219+
e);
200220
}
201221
}
202222

@@ -212,7 +232,10 @@ private void maybeAbortBatches(Throwable t) {
212232
private void throwIfWriterClosed() {
213233
if (sender == null || !sender.isRunning()) {
214234
throw new IllegalStateException(
215-
"Cannot perform operation after writer has been closed");
235+
String.format(
236+
"Cannot perform write operation after writer has been closed. Sender running: %b, Thread pool shutdown: %b",
237+
sender != null && sender.isRunning(),
238+
ioThreadPool == null || ioThreadPool.isShutdown()));
216239
}
217240
}
218241

@@ -225,11 +248,11 @@ private IdempotenceManager buildIdempotenceManager() {
225248
&& maxInflightRequestPerBucket
226249
> MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE) {
227250
throw new IllegalConfigurationException(
228-
"The value of "
229-
+ ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET.key()
230-
+ " should be less than or equal to "
231-
+ MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE
232-
+ " when idempotence writer enabled to ensure message ordering.");
251+
String.format(
252+
"Invalid configuration for idempotent writer. The value of %s (%d) should be less than or equal to %d when idempotence is enabled to ensure message ordering",
253+
ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET.key(),
254+
maxInflightRequestPerBucket,
255+
MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE));
233256
}
234257

235258
TabletServerGateway tabletServerGateway = metadataUpdater.newRandomTabletServerClient();
@@ -249,10 +272,9 @@ private short configureAcks(boolean idempotenceEnabled) {
249272

250273
if (idempotenceEnabled && ack != -1) {
251274
throw new IllegalConfigurationException(
252-
"Must set "
253-
+ ConfigOptions.CLIENT_WRITER_ACKS.key()
254-
+ " to 'all' in order to use the idempotent writer. Otherwise "
255-
+ "we cannot guarantee idempotence.");
275+
String.format(
276+
"Invalid acks configuration for idempotent writer. Must set %s to 'all' (current value: '%s') in order to use the idempotent writer. Otherwise we cannot guarantee idempotence",
277+
ConfigOptions.CLIENT_WRITER_ACKS.key(), acks));
256278
}
257279

258280
return ack;
@@ -262,10 +284,9 @@ private int configureRetries(boolean idempotenceEnabled) {
262284
int retries = conf.getInt(ConfigOptions.CLIENT_WRITER_RETRIES);
263285
if (idempotenceEnabled && retries == 0) {
264286
throw new IllegalConfigurationException(
265-
"Must set "
266-
+ ConfigOptions.CLIENT_WRITER_RETRIES.key()
267-
+ " to non-zero when using the idempotent writer. Otherwise "
268-
+ "we cannot guarantee idempotence.");
287+
String.format(
288+
"Invalid retries configuration for idempotent writer. Must set %s to non-zero (current value: %d) when using the idempotent writer. Otherwise we cannot guarantee idempotence",
289+
ConfigOptions.CLIENT_WRITER_RETRIES.key(), retries));
269290
}
270291
return retries;
271292
}

0 commit comments

Comments
 (0)