Skip to content

Commit 5777390

Browse files
committed
Added -O option: print offsets
1 parent 83205d9 commit 5777390

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

kafkacat.c

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ static struct conf {
5252
char mode;
5353
int flags;
5454
#define CONF_F_KEY_DELIM 0x2
55+
#define CONF_F_OFFSET 0x4 /* Print offsets */
5556
int delim;
5657
int key_delim;
5758
int msg_size;
@@ -76,6 +77,7 @@ static struct conf {
7677
.partition = RD_KAFKA_PARTITION_UA,
7778
.msg_size = 1024*1024,
7879
.delim = '\n',
80+
.key_delim = '\t',
7981
};
8082

8183

@@ -195,7 +197,8 @@ static ssize_t produce_file (const char *path) {
195197
return -1;
196198
}
197199

198-
INFO(4, "Producing file %s (%"PRIdMAX" bytes)\n", path, (intmax_t)st.st_size);
200+
INFO(4, "Producing file %s (%"PRIdMAX" bytes)\n",
201+
path, (intmax_t)st.st_size);
199202
produce(ptr, st.st_size, NULL, 0, RD_KAFKA_MSG_F_COPY);
200203

201204
munmap(ptr, st.st_size);
@@ -363,6 +366,10 @@ static void consume_cb (rd_kafka_message_t *rkmessage, void *opaque) {
363366
rd_kafka_message_errstr(rkmessage));
364367
}
365368

369+
/* Print offset (using key delim), if desired */
370+
if (conf.flags & CONF_F_OFFSET)
371+
fprintf(fp, "%"PRId64"%c", rkmessage->offset, conf.key_delim);
372+
366373
/* Print key, if desired */
367374
if (conf.flags & CONF_F_KEY_DELIM)
368375
fprintf(fp, "%.*s%c",
@@ -669,6 +676,7 @@ static void __attribute__((noreturn)) usage (const char *argv0, int exitcode,
669676
" -D <delim> Delimiter to separate messages on output\n"
670677
" -K <delim> Print message keys prefixing the message\n"
671678
" with specified delimiter.\n"
679+
" -O Print message offset using -K delimiter\n"
672680
" -c <cnt> Exit after consuming this number "
673681
"of messages\n"
674682
" -u Unbuffered output\n"
@@ -727,7 +735,7 @@ static void argparse (int argc, char **argv) {
727735
int opt;
728736

729737
while ((opt = getopt(argc, argv,
730-
"PCLt:p:b:z:o:eD:K:d:qvX:c:u")) != -1) {
738+
"PCLt:p:b:z:o:eD:K:Od:qvX:c:u")) != -1) {
731739
switch (opt) {
732740
case 'P':
733741
case 'C':
@@ -773,6 +781,9 @@ static void argparse (int argc, char **argv) {
773781
conf.key_delim = parse_delim(optarg);
774782
conf.flags |= CONF_F_KEY_DELIM;
775783
break;
784+
case 'O':
785+
conf.flags |= CONF_F_OFFSET;
786+
break;
776787
case 'c':
777788
conf.msg_cnt = strtoll(optarg, NULL, 10);
778789
break;

0 commit comments

Comments
 (0)