Skip to content

Commit e685f7e

Browse files
committed
Add support for fatal errors
1 parent f4fdfca commit e685f7e

File tree

1 file changed

+16
-0
lines changed

1 file changed

+16
-0
lines changed

kafkacat.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,7 +1085,23 @@ static void term (int sig) {
10851085
*/
10861086
static void error_cb (rd_kafka_t *rk, int err,
10871087
const char *reason, void *opaque) {
1088+
#if RD_KAFKA_VERSION >= 0x01000000
1089+
if (err == RD_KAFKA_RESP_ERR__FATAL) {
1090+
/* A fatal error has been raised, extract the
1091+
* underlying error, and start graceful termination -
1092+
* this to make sure producer delivery reports are
1093+
* handled before exiting. */
1094+
char fatal_errstr[512];
1095+
rd_kafka_resp_err_t fatal_err;
1096+
1097+
fatal_err = rd_kafka_fatal_error(rk, fatal_errstr,
1098+
sizeof(fatal_errstr));
1099+
KC_INFO(0, "FATAL CLIENT ERROR: %s: %s: terminating\n",
1100+
rd_kafka_err2str(fatal_err), fatal_errstr);
1101+
conf.run = 0;
10881102

1103+
} else
1104+
#endif
10891105
if (err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN) {
10901106
KC_ERROR("%s: %s", rd_kafka_err2str(err),
10911107
reason ? reason : "");

0 commit comments

Comments
 (0)