Skip to content

Commit 0a3fdec

Browse files
authored
Merge pull request #2804 from ClickHouse/03/23/26/error_handling
[client-v2] Error handling Requires to merge to unblock testing.
2 parents d907c94 + e3fd015 commit 0a3fdec

4 files changed

Lines changed: 608 additions & 83 deletions

File tree

clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseColumn.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -952,6 +952,10 @@ public String getColumnName() {
952952
return columnName;
953953
}
954954

955+
public String getColumnIndexAndName() {
956+
return (columnIndex + 1) + " (`" + columnName + "`)";
957+
}
958+
955959
public String getOriginalTypeName() {
956960
return originalTypeName;
957961
}

client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm
7373
private Map[] convertions;
7474
private boolean hasNext = true;
7575
private boolean initialState = true; // reader is in initial state, no records have been read yet
76+
private long row = -1; // before first row
77+
private long lastNextCallTs; // for exception to detect slow reader
7678

7779
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,BinaryStreamReader.ByteBufferAllocator byteBufferAllocator, Map<ClickHouseDataType, Class<?>> defaultTypeHintMap) {
7880
this.input = inputStream;
@@ -92,6 +94,7 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
9294
setSchema(schema);
9395
}
9496
this.dataTypeConverter = DataTypeConverter.INSTANCE; // singleton while no need to customize conversion
97+
this.lastNextCallTs = System.currentTimeMillis();
9598
}
9699

97100
protected Object[] currentRecord;
@@ -181,6 +184,7 @@ protected boolean readRecord(Object[] record) throws IOException {
181184
return false;
182185
}
183186

187+
row++;
184188
boolean firstColumn = true;
185189
for (int i = 0; i < columns.length; i++) {
186190
try {
@@ -191,12 +195,12 @@ protected boolean readRecord(Object[] record) throws IOException {
191195
record[i] = null;
192196
}
193197
firstColumn = false;
194-
} catch (EOFException e) {
195-
if (firstColumn) {
198+
} catch (IOException e) {
199+
if (e instanceof EOFException && firstColumn) {
196200
endReached();
197201
return false;
198202
}
199-
throw e;
203+
throw new IOException(recordReadExceptionMsg(columns[i].getColumnIndexAndName()), e);
200204
}
201205
}
202206
return true;
@@ -238,35 +242,52 @@ protected void readNextRecord() {
238242
}
239243
} catch (IOException e) {
240244
endReached();
241-
throw new ClientException("Failed to read next row", e);
245+
throw new ClientException(recordReadExceptionMsg(), e);
242246
}
243247
}
244248

249+
private long timeSinceLastNext() {
250+
return System.currentTimeMillis() - lastNextCallTs;
251+
}
252+
253+
private String recordReadExceptionMsg() {
254+
return recordReadExceptionMsg(null);
255+
}
256+
257+
private String recordReadExceptionMsg(String column) {
258+
return "Reading " + (column != null ? "column " + column + " in " : "")
259+
+ " row " + (row + 1) + " (time since last next call " + timeSinceLastNext() + ")";
260+
}
261+
245262
@Override
246263
public Map<String, Object> next() {
247264
if (!hasNext) {
248265
return null;
249266
}
250267

251-
if (!nextRecordEmpty) {
252-
Object[] tmp = currentRecord;
253-
currentRecord = nextRecord;
254-
nextRecord = tmp;
255-
readNextRecord();
256-
return new RecordWrapper(currentRecord, schema);
257-
} else {
258-
try {
259-
if (readRecord(currentRecord)) {
260-
readNextRecord();
261-
return new RecordWrapper(currentRecord, schema);
262-
} else {
263-
currentRecord = null;
264-
return null;
268+
try {
269+
if (!nextRecordEmpty) {
270+
Object[] tmp = currentRecord;
271+
currentRecord = nextRecord;
272+
nextRecord = tmp;
273+
readNextRecord();
274+
return new RecordWrapper(currentRecord, schema);
275+
} else {
276+
try {
277+
if (readRecord(currentRecord)) {
278+
readNextRecord();
279+
return new RecordWrapper(currentRecord, schema);
280+
} else {
281+
currentRecord = null;
282+
return null;
283+
}
284+
} catch (IOException e) {
285+
endReached();
286+
throw new ClientException(recordReadExceptionMsg(), e);
265287
}
266-
} catch (IOException e) {
267-
endReached();
268-
throw new ClientException("Failed to read row", e);
269288
}
289+
} finally {
290+
lastNextCallTs = System.currentTimeMillis();
270291
}
271292
}
272293

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 104 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.hc.client5.http.config.ConnectionConfig;
2323
import org.apache.hc.client5.http.config.RequestConfig;
2424
import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder;
25-
import org.apache.hc.client5.http.entity.mime.MultipartPartBuilder;
2625
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
2726
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
2827
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
@@ -86,24 +85,24 @@
8685
import java.net.UnknownHostException;
8786
import java.nio.charset.StandardCharsets;
8887
import java.security.NoSuchAlgorithmException;
88+
import java.util.Arrays;
8989
import java.util.Base64;
9090
import java.util.Collection;
9191
import java.util.Collections;
92+
import java.util.HashMap;
9293
import java.util.HashSet;
9394
import java.util.List;
9495
import java.util.Map;
9596
import java.util.Objects;
96-
import java.util.Optional;
9797
import java.util.Properties;
98-
import java.util.Arrays;
99-
import java.util.HashMap;
10098
import java.util.Set;
10199
import java.util.concurrent.ConcurrentLinkedQueue;
102100
import java.util.concurrent.TimeUnit;
103101
import java.util.concurrent.atomic.AtomicLong;
104102
import java.util.function.BiConsumer;
105103
import java.util.function.Function;
106104
import java.util.regex.Pattern;
105+
import java.util.stream.Stream;
107106

108107
public class HttpAPIClientHelper {
109108

@@ -341,85 +340,130 @@ public CloseableHttpClient createHttpClient(boolean initSslContext, Map<String,
341340
return clientBuilder.build();
342341
}
343342

344-
// private static final String ERROR_CODE_PREFIX_PATTERN = "Code: %d. DB::Exception:";
345343
private static final String ERROR_CODE_PREFIX_PATTERN = "%d. DB::Exception:";
346344

347-
348345
/**
349346
* Reads status line and if error tries to parse response body to get server error message.
350347
*
351348
* @param httpResponse - HTTP response
352349
* @return exception object with server code
353350
*/
354-
public Exception readError(ClassicHttpResponse httpResponse) {
355-
final Header qIdHeader = httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID);
356-
final String queryId = qIdHeader == null ? "" : qIdHeader.getValue();
351+
public Exception readError(HttpPost req, ClassicHttpResponse httpResponse) {
352+
final Header serverQueryIdHeader = httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID);
353+
final Header clientQueryIdHeader = req.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID);
354+
final Header queryHeader = Stream.of(serverQueryIdHeader, clientQueryIdHeader).filter(Objects::nonNull).findFirst().orElse(null);
355+
final String queryId = queryHeader == null ? "" : queryHeader.getValue();
357356
int serverCode = getHeaderInt(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE), 0);
358-
InputStream body = null;
359357
try {
360-
body = httpResponse.getEntity().getContent();
361-
byte[] buffer = new byte[ERROR_BODY_BUFFER_SIZE];
362-
byte[] lookUpStr = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode).getBytes(StandardCharsets.UTF_8);
363-
StringBuilder msgBuilder = new StringBuilder();
364-
boolean found = false;
365-
while (true) {
366-
int rBytes = -1;
367-
try {
368-
rBytes = body.read(buffer);
369-
} catch (ClientException e) {
370-
// Invalid LZ4 Magic
371-
if (body instanceof ClickHouseLZ4InputStream) {
372-
ClickHouseLZ4InputStream stream = (ClickHouseLZ4InputStream) body;
373-
body = stream.getInputStream();
374-
byte[] headerBuffer = stream.getHeaderBuffer();
375-
System.arraycopy(headerBuffer, 0, buffer, 0, headerBuffer.length);
376-
rBytes = headerBuffer.length;
358+
return serverCode > 0 ? readClickHouseError(httpResponse.getEntity(), serverCode, queryId, httpResponse.getCode()) :
359+
readNotClickHouseError(httpResponse.getEntity(), queryId, httpResponse.getCode());
360+
} catch (Exception e) {
361+
LOG.error("Failed to read error message", e);
362+
String msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpResponse.getCode() + ")";
363+
return new ServerException(serverCode, msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId);
364+
}
365+
}
366+
367+
private ServerException readNotClickHouseError(HttpEntity httpEntity, String queryId, int httpCode) {
368+
369+
byte[] buffer = new byte[ERROR_BODY_BUFFER_SIZE];
370+
371+
String msg = null;
372+
InputStream body = null;
373+
int offset = 0;
374+
for (int i = 0; i < 2; i++) {
375+
try {
376+
if (body == null) {
377+
body = httpEntity.getContent();
378+
}
379+
int msgLen = body.read(buffer, offset, buffer.length - offset);
380+
if (msgLen > 0) {
381+
msg = new String(buffer, 0, msgLen, StandardCharsets.UTF_8).trim();
382+
if (msg.isEmpty()) {
383+
msg = "<empty body response>";
377384
}
378385
}
379-
if (rBytes == -1) {
380-
break;
386+
break;
387+
} catch (ClientException e) {
388+
// Invalid LZ4 Magic
389+
if (body instanceof ClickHouseLZ4InputStream) {
390+
ClickHouseLZ4InputStream stream = (ClickHouseLZ4InputStream) body;
391+
body = stream.getInputStream();
392+
byte[] lzHeader = stream.getHeaderBuffer(); // Here is read part of original body
393+
offset = Math.min(lzHeader.length, buffer.length);
394+
System.arraycopy(lzHeader, 0, buffer, 0, offset);
395+
continue;
396+
}
397+
throw e;
398+
} catch (Exception e) {
399+
LOG.warn("Failed to read error message (queryId = " + queryId + ")", e);
400+
break;
401+
}
402+
}
403+
404+
String errormsg = msg == null ? "unknown server error" : msg;
405+
return new ServerException(ServerException.CODE_UNKNOWN, errormsg + " (transport error: " + httpCode +")", httpCode, queryId);
406+
}
407+
408+
private static ServerException readClickHouseError(HttpEntity httpEntity, int serverCode, String queryId, int httpCode) throws Exception {
409+
InputStream body = httpEntity.getContent();
410+
byte[] buffer = new byte[ERROR_BODY_BUFFER_SIZE];
411+
byte[] lookUpStr = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode).getBytes(StandardCharsets.UTF_8);
412+
StringBuilder msgBuilder = new StringBuilder();
413+
boolean found = false;
414+
while (true) {
415+
int rBytes = -1;
416+
try {
417+
rBytes = body.read(buffer);
418+
} catch (ClientException e) {
419+
// Invalid LZ4 Magic
420+
if (body instanceof ClickHouseLZ4InputStream) {
421+
ClickHouseLZ4InputStream stream = (ClickHouseLZ4InputStream) body;
422+
body = stream.getInputStream();
423+
byte[] headerBuffer = stream.getHeaderBuffer();
424+
System.arraycopy(headerBuffer, 0, buffer, 0, headerBuffer.length);
425+
rBytes = headerBuffer.length;
381426
}
427+
}
428+
if (rBytes == -1) {
429+
break;
430+
}
382431

383-
for (int i = 0; i < rBytes; i++) {
384-
if (buffer[i] == lookUpStr[0]) {
385-
found = true;
386-
for (int j = 1; j < Math.min(rBytes - i, lookUpStr.length); j++) {
387-
if (buffer[i + j] != lookUpStr[j]) {
388-
found = false;
389-
break;
390-
}
391-
}
392-
if (found) {
393-
msgBuilder.append(new String(buffer, i, rBytes - i, StandardCharsets.UTF_8));
432+
for (int i = 0; i < rBytes; i++) {
433+
if (buffer[i] == lookUpStr[0]) {
434+
found = true;
435+
for (int j = 1; j < Math.min(rBytes - i, lookUpStr.length); j++) {
436+
if (buffer[i + j] != lookUpStr[j]) {
437+
found = false;
394438
break;
395439
}
396440
}
397-
}
398-
399-
if (found) {
400-
break;
441+
if (found) {
442+
msgBuilder.append(new String(buffer, i, rBytes - i, StandardCharsets.UTF_8));
443+
break;
444+
}
401445
}
402446
}
403447

404-
while (true) {
405-
int rBytes = body.read(buffer);
406-
if (rBytes == -1) {
407-
break;
408-
}
409-
msgBuilder.append(new String(buffer, 0, rBytes, StandardCharsets.UTF_8));
448+
if (found) {
449+
break;
410450
}
451+
}
411452

412-
String msg = msgBuilder.toString().replaceAll("\\s+", " ").replaceAll("\\\\n", " ")
413-
.replaceAll("\\\\/", "/");
414-
if (msg.trim().isEmpty()) {
415-
msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpResponse.getCode() + ")";
453+
while (true) {
454+
int rBytes = body.read(buffer);
455+
if (rBytes == -1) {
456+
break;
416457
}
417-
return new ServerException(serverCode, "Code: " + msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId);
418-
} catch (Exception e) {
419-
LOG.error("Failed to read error message", e);
420-
String msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpResponse.getCode() + ")";
421-
return new ServerException(serverCode, msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId);
458+
msgBuilder.append(new String(buffer, 0, rBytes, StandardCharsets.UTF_8));
459+
}
460+
461+
String msg = msgBuilder.toString().replaceAll("\\s+", " ").replaceAll("\\\\n", " ")
462+
.replaceAll("\\\\/", "/");
463+
if (msg.trim().isEmpty()) {
464+
msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpCode + ")";
422465
}
466+
return new ServerException(serverCode, "Code: " + msg + " (queryId= " + queryId + ")", httpCode, queryId);
423467
}
424468

425469
private static final long POOL_VENT_TIMEOUT = 10000L;
@@ -536,7 +580,7 @@ private ClassicHttpResponse doPostRequest(Map<String, Object> requestConfig, Htt
536580
throw new ClientException("Server returned '502 Bad gateway'. Check network and proxy settings.");
537581
} else if (httpResponse.getCode() >= HttpStatus.SC_BAD_REQUEST || httpResponse.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) {
538582
try {
539-
throw readError(httpResponse);
583+
throw readError(req, httpResponse);
540584
} finally {
541585
httpResponse.close();
542586
}

0 commit comments

Comments
 (0)