
Commit f23dbe4

Move qkafka to bb-plugins

Log-triggers should only fire on TXN_APPLY.
Include librdkafka-dev in Dockerfile.install.

Signed-off-by: Mark Hannum <[email protected]>

1 parent: 2dcd065

7 files changed: +4 -157 lines

Diff for: CMakeLists.txt

+1 -11

@@ -65,6 +65,7 @@ find_package(OpenSSL REQUIRED)
 set(PROTOBUF_C_MIN_VERSION 1.1.0)
 find_package(Protobuf_C ${PROTOBUF_C_MIN_VERSION} REQUIRED)
 find_package(LibEvent REQUIRED)
+find_package(RdKafka REQUIRED)
 find_package(ZLIB REQUIRED)
 if (NOT ${CMAKE_SYSTEM_NAME} STREQUAL Darwin)
   find_package(UUID REQUIRED)
@@ -73,17 +74,6 @@ if(${CMAKE_SYSTEM_NAME} STREQUAL Linux OR ${CMAKE_SYSTEM_NAME} STREQUAL Darwin)
   find_package(Unwind REQUIRED)
 endif()
 
-option(WITH_RDKAFKA "Turn ON to compile with Kafka Publish" OFF)
-if(WITH_RDKAFKA)
-  find_package(RdKafka REQUIRED)
-  add_definitions(-DWITH_RDKAFKA)
-endif()
-
-option(WITH_QKAFKA "Turn ON to allow queue-adds published to Kafka" OFF)
-if(WITH_QKAFKA)
-  find_package(RdKafka REQUIRED)
-  add_definitions(-DWITH_QKAFKA)
-endif()
 
 option(COMDB2_LEGACY_DEFAULTS "Legacy defaults without lrl override" OFF)
 
Diff for: bdb/log_queue_dump.c

-106

@@ -3,12 +3,6 @@
 #include <translistener.h>
 #include <fsnapf.h>
 
-#ifdef WITH_QKAFKA
-
-#include "librdkafka/rdkafka.h"
-
-#endif
-
 #define copy(dest, type, src, conv) \
     dest = conv(*(type *)src); \
     src += sizeof(dest)
@@ -314,103 +308,3 @@ void register_dump_qtrigger(const char *filename, bdb_state_type *(*gethndl)(con
     register_logqueue_trigger(filename, gethndl, dump_qtrigger, f, maxsz, LOGQTRIGGER_PUSH);
 }
 
-#ifdef WITH_QKAFKA
-
-extern char *gbl_kafka_brokers;
-
-/* Outgoing kafka buffer */
-struct kafka_log_queue {
-    long long seq;
-    uint64_t genid;
-    uint8_t key[12];
-    size_t dtalen;
-    uint8_t data[1];
-};
-
-/* Kafka state */
-struct kafka_state {
-    char *topic;
-    rd_kafka_topic_t *rkt_p;
-    rd_kafka_t *rk_p;
-    uint64_t bytes_written;
-};
-
-static void write_kafka(bdb_state_type *bdb_state, const DB_LSN *commit_lsn, const char *filename, const DBT *key,
-                        const DBT *data, void *userptr)
-{
-    struct kafka_state *k = userptr;
-    size_t fnddtalen = 0, fnddtaoff = 0;
-    int consumer;
-    long long seq = 0;
-    uint64_t genid = 0;
-    struct bdb_queue_found *fnd = {0};
-
-    int rc = bdb_queuedb_trigger_unpack(bdb_state, key, data, &consumer, &genid, &fnd, &fnddtalen, &fnddtaoff, &seq);
-    if (rc != 0) {
-        logmsg(LOGMSG_ERROR, "%s failed to unpack record for %s rc=%d\n", __func__, filename, rc);
-        return;
-    }
-
-    assert(key->size == 12);
-
-    struct kafka_log_queue *kq =
-        (struct kafka_log_queue *)malloc(offsetof(struct kafka_log_queue, data) + fnd->data_len);
-    kq->seq = seq;
-    kq->genid = genid;
-    memcpy(kq->key, key->data, 12);
-    kq->dtalen = fnd->data_len;
-    uint8_t *payload = ((uint8_t *)fnd) + fnd->data_offset;
-    memcpy(kq->data, payload, fnd->data_len);
-
-    rc = rd_kafka_produce(k->rkt_p, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)kq,
-                          offsetof(struct kafka_log_queue, data) + fnd->data_len, NULL, 0, NULL);
-
-    free(kq);
-    if (rc == -1) {
-        logmsg(LOGMSG_ERROR, "%s Failed to produce to topic %s: %s\n", __func__, rd_kafka_topic_name(k->rkt_p),
-               rd_kafka_err2str(rd_kafka_last_error()));
-    }
-}
-
-int register_queue_kafka(const char *filename, const char *kafka_topic, bdb_state_type *(*gethndl)(const char *q),
-                         int maxsz)
-{
-    rd_kafka_conf_t *conf;
-    rd_kafka_t *rk_p;
-    rd_kafka_topic_t *rkt_p;
-    char errstr[512];
-
-    if (!kafka_topic || !gbl_kafka_brokers) {
-        logmsg(LOGMSG_ERROR, "%s kafka topic or broker not set\n", __func__);
-        return -1;
-    }
-    conf = rd_kafka_conf_new();
-    if (rd_kafka_conf_set(conf, "bootstrap.servers", gbl_kafka_brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
-        logmsg(LOGMSG_ERROR, "%s rd_kafka_conf_set error: %s\n", __func__, errstr);
-        return -1;
-    }
-
-    rk_p = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
-    if (!rk_p) {
-        logmsg(LOGMSG_ERROR, "%s rd_kafka_new error: %s\n", __func__, errstr);
-        return -1;
-    }
-
-    rkt_p = rd_kafka_topic_new(rk_p, kafka_topic, NULL);
-    if (!rkt_p) {
-        logmsg(LOGMSG_ERROR, "%s rd_kafka_topic_new error: %s\n", __func__, rd_kafka_err2str(rd_kafka_last_error()));
-        return -1;
-    }
-
-    struct kafka_state *kstate = calloc(sizeof(struct kafka_state), 1);
-    kstate->topic = strdup(kafka_topic);
-    kstate->rkt_p = rkt_p;
-    kstate->rk_p = rk_p;
-    register_logqueue_trigger(filename, gethndl, write_kafka, kstate, maxsz,
-                              LOGQTRIGGER_PUSH | LOGQTRIGGER_MASTER_ONLY);
-    logmsg(LOGMSG_USER, "Registered kafka-qwrite for %s -> %s\n", filename, kafka_topic);
-
-    return 0;
-}
-
-#endif
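
Note on the removed producer: write_kafka() passes RD_KAFKA_MSG_F_COPY, which tells librdkafka to copy the payload before rd_kafka_produce() returns, so the immediate free(kq) is safe. What the removed code never did is serve delivery reports or drain the producer on shutdown. As a hedged sketch (producer_teardown() is a hypothetical helper, not part of this commit or the bb-plugins port), a long-lived producer would normally pair the produce path with something like:

    /* Hedged sketch: hypothetical teardown for a librdkafka producer. */
    #include <librdkafka/rdkafka.h>

    static void producer_teardown(rd_kafka_t *rk, rd_kafka_topic_t *rkt)
    {
        rd_kafka_poll(rk, 0);          /* serve any queued delivery callbacks */
        rd_kafka_flush(rk, 10 * 1000); /* wait up to 10s for in-flight messages */
        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
    }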

Diff for: bdb/log_queue_trigger.h

-7

@@ -18,11 +18,4 @@ int logqueue_trigger_get(void *qfile, DBT *key, DBT *data, int timeoutms);
 void register_dump_qtrigger(const char *filename, bdb_state_type *(*gethndl)(const char *q), const char *outfile,
                             int maxsz);
 
-#ifdef WITH_QKAFKA
-
-int register_queue_kafka(const char *filename, const char *kafka_topic, bdb_state_type *(*gethndl)(const char *q),
-                         int maxsz);
-
-#endif
-
 #endif
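
With register_queue_kafka() gone from this header, an out-of-tree consumer such as the bb-plugins port would register through register_logqueue_trigger(), the same entry point the dump and kafka triggers used. A sketch of the callback shape, inferred from the removed write_kafka() in bdb/log_queue_dump.c (my_qtrigger and its body are hypothetical, not comdb2 API documentation):

    /* Hedged sketch: callback signature inferred from the removed write_kafka(). */
    static void my_qtrigger(bdb_state_type *bdb_state, const DB_LSN *commit_lsn,
                            const char *filename, const DBT *key, const DBT *data,
                            void *userptr)
    {
        /* unpack the queue record (the kafka writer used
         * bdb_queuedb_trigger_unpack()) and hand it to your sink */
    }

    /* registered the same way the dump trigger is:
     * register_logqueue_trigger(filename, gethndl, my_qtrigger, userptr, maxsz,
     *                           LOGQTRIGGER_PUSH); */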

Diff for: berkdb/db/trigger_subscription.c

-3

@@ -4,9 +4,6 @@
 #include "dbinc/trigger_subscription.h"
 #include <sys_wrap.h>
 
-#include <mem_berkdb.h>
-#include <mem_override.h>
-
 /*
  * Maintain mapping of qdb name and its signaling mechanism.
  * This needs a simple hash table (name -> pthread_cond_t).

Diff for: berkdb/dbinc/db_am.h

+2 -2

@@ -55,7 +55,7 @@ extern __thread DB_LSN commit_lsn;
 	} \
 	if (argp->type > 3000 || (argp->type > 1000 && argp->type < 2000)) { \
 		ret = __ufid_to_db(dbenv, argp->txnid, &file_dbp, argp->ufid_fileid, &log_trigger, lsnp); \
-		if ((ret == 0 || ret == DB_DELETED || ret == DB_IGNORED) && log_trigger != NULL && dbenv->rep_log_trigger_cb) { \
+		if ((ret == 0 || ret == DB_DELETED || ret == DB_IGNORED) && op == DB_TXN_APPLY && log_trigger != NULL && dbenv->rep_log_trigger_cb) { \
 			char *fname = NULL; \
 			__ufid_to_fname(dbenv, &fname, argp->ufid_fileid); \
 			dbenv->rep_log_trigger_cb(log_trigger, lsnp, &commit_lsn, fname, argp->type, argp); \
@@ -116,7 +116,7 @@ int __log_flush(DB_ENV *dbenv, const DB_LSN *);
 		ret = __dbreg_id_to_db(dbenv, argp->txnid, \
 		    &file_dbp, argp->fileid, inc_count, lsnp, 0); \
 	} \
-	if ((ret == 0 || ret == DB_IGNORED || ret == DB_DELETED ) && log_trigger != NULL && dbenv->rep_log_trigger_cb) { \
+	if ((ret == 0 || ret == DB_IGNORED || ret == DB_DELETED ) && op == DB_TXN_APPLY && log_trigger != NULL && dbenv->rep_log_trigger_cb) { \
 		char *fname = NULL; \
 		__ufid_to_fname(dbenv, &fname, argp->ufid_fileid); \
 		dbenv->rep_log_trigger_cb(log_trigger, lsnp, &commit_lsn, fname, argp->type, argp); \
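
This is the substance of the "Log-triggers should only fire on TXN_APPLY" fix: these macros run inside Berkeley DB recovery functions, which are invoked for every db_recops pass, so before this change rep_log_trigger_cb could also fire during recovery rolls and aborts. The added op == DB_TXN_APPLY test limits it to replicants applying committed transactions. For context, a hedged sketch of the conventional recovery-function shape that supplies op (__example_recover is hypothetical):

    /* Hedged sketch: where `op` comes from; __example_recover is hypothetical. */
    static int __example_recover(DB_ENV *dbenv, DBT *dbtp, DB_LSN *lsnp,
                                 db_recops op, void *info)
    {
        /* op == DB_TXN_APPLY only when a replicant applies a committed
         * transaction; DB_TXN_FORWARD_ROLL, DB_TXN_BACKWARD_ROLL and
         * DB_TXN_ABORT arrive during recovery and aborts, and previously
         * could reach rep_log_trigger_cb too. */
        (void)dbenv; (void)dbtp; (void)lsnp; (void)op; (void)info;
        return (0);
    }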

Diff for: db/config.c

-28

@@ -1534,34 +1534,6 @@ static int read_lrl_option(struct dbenv *dbenv, char *line,
         logmsg(LOGMSG_USER, "Adding qdump-trigger for %s -> %s max-queue %d\n", bdbq, outfile, maxsz);
         register_dump_qtrigger(bdbq, queuehndl, outfile, maxsz);
 
-#ifdef WITH_QKAFKA
-    } else if (tokcmp(tok, ltok, "qkafka") == 0) {
-        if ((tok = segtok(line, len, &st, &ltok)) == NULL || ltok <= 0) {
-            logmsg(LOGMSG_ERROR, "qkafka <qname> <topic> <maxsz>\n");
-            return -1;
-        }
-        char *queue = alloca(ltok + 1);
-        tokcpy(tok, ltok, queue);
-        char *bdbq = alloca(ltok + 4);
-        sprintf(bdbq, "%s%s", Q_TAG, queue);
-
-        if ((tok = segtok(line, len, &st, &ltok)) == NULL || ltok <= 0) {
-            logmsg(LOGMSG_ERROR, "Expected kafka topic\n");
-            return -1;
-        }
-
-        char *topic = alloca(ltok + 1);
-        tokcpy(tok, ltok, topic);
-
-        if ((tok = segtok(line, len, &st, &ltok)) == NULL || ltok <= 0) {
-            logmsg(LOGMSG_ERROR, "Expected max queue-size\n");
-            return -1;
-        }
-        int maxsz = toknum(tok, ltok);
-
-        logmsg(LOGMSG_USER, "Adding qkafka trigger for %s -> %s maxsz %d\n", bdbq, topic, maxsz);
-        register_queue_kafka(bdbq, topic, queuehndl, maxsz);
-#endif
     } else if (tokcmp(tok, ltok, "replicate_from") == 0) {
         /* 'replicate_from <dbname> <prod|beta|alpha|dev|host|@hst1,hst2,hst3..>' */
         tok = segtok(line, len, &st, &ltok);
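
For reference, the removed block parsed an lrl line of the form given by its own usage string, qkafka <qname> <topic> <maxsz>, for example (hypothetical queue and topic names):

    qkafka myq orders_topic 100000

With the move to bb-plugins, this lrl handling presumably travels with the qkafka plugin rather than living in db/config.c.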

Diff for: tests/docker/Dockerfile.install

+1

@@ -19,6 +19,7 @@ RUN apt-get update && \
     liblz4-tool \
     libprotobuf-c1 \
     libprotobuf-c-dev \
+    librdkafka-dev \
     libreadline-dev \
     libsqlite3-0 \
    libsqlite3-dev \
