Skip to content

Move qkafka to bb-plugins / Log-triggers should only fire on TXN_APPLY #5103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,6 @@ if(${CMAKE_SYSTEM_NAME} STREQUAL Linux OR ${CMAKE_SYSTEM_NAME} STREQUAL Darwin)
find_package(Unwind REQUIRED)
endif()

option(WITH_RDKAFKA "Turn ON to compile with Kafka Publish" OFF)
if(WITH_RDKAFKA)
find_package(RdKafka REQUIRED)
add_definitions(-DWITH_RDKAFKA)
endif()

option(WITH_QKAFKA "Turn ON to allow queue-adds published to Kafka" OFF)
if(WITH_QKAFKA)
find_package(RdKafka REQUIRED)
add_definitions(-DWITH_QKAFKA)
endif()

option(COMDB2_LEGACY_DEFAULTS "Legacy defaults without lrl override" OFF)

Expand Down
106 changes: 0 additions & 106 deletions bdb/log_queue_dump.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@
#include <translistener.h>
#include <fsnapf.h>

#ifdef WITH_QKAFKA

#include "librdkafka/rdkafka.h"

#endif

#define copy(dest, type, src, conv) \
dest = conv(*(type *)src); \
src += sizeof(dest)
Expand Down Expand Up @@ -314,103 +308,3 @@ void register_dump_qtrigger(const char *filename, bdb_state_type *(*gethndl)(con
register_logqueue_trigger(filename, gethndl, dump_qtrigger, f, maxsz, LOGQTRIGGER_PUSH);
}

#ifdef WITH_QKAFKA

extern char *gbl_kafka_brokers;

/* Outgoing kafka message buffer: fixed header (sequence, genid, 12-byte
 * queue key, payload length) followed by the variable-length payload.
 * Allocated as offsetof(struct kafka_log_queue, data) + dtalen and sent
 * verbatim as the kafka message body. */
struct kafka_log_queue {
    long long seq;      /* queue sequence number from trigger unpack */
    uint64_t genid;     /* record genid from trigger unpack */
    uint8_t key[12];    /* raw queue key (key->size is asserted to be 12) */
    size_t dtalen;      /* number of valid bytes in data[] */
    uint8_t data[];     /* C99 flexible array member (was data[1] hack) */
};

/* Per-registration kafka producer state, passed as the userptr to the
 * write_kafka trigger callback (one instance per registered queue). */
struct kafka_state {
    char *topic;            /* heap copy (strdup) of the destination topic name */
    rd_kafka_topic_t *rkt_p; /* librdkafka topic handle used by rd_kafka_produce */
    rd_kafka_t *rk_p;        /* librdkafka producer handle that owns rkt_p */
    uint64_t bytes_written;  /* NOTE(review): not updated in this file -- confirm intended use */
};

/* Log-queue trigger callback: unpacks one queue record (key/data) and
 * publishes its payload to the kafka topic held in userptr (a struct
 * kafka_state set up by register_queue_kafka).  Errors are logged and the
 * record is dropped; there is no retry path here. */
static void write_kafka(bdb_state_type *bdb_state, const DB_LSN *commit_lsn, const char *filename, const DBT *key,
                        const DBT *data, void *userptr)
{
    struct kafka_state *k = userptr;
    size_t fnddtalen = 0, fnddtaoff = 0;
    int consumer;
    long long seq = 0;
    uint64_t genid = 0;
    /* was `= {0}`: brace-initializing a scalar pointer is legal but
     * misleading -- use NULL for a pointer */
    struct bdb_queue_found *fnd = NULL;

    int rc = bdb_queuedb_trigger_unpack(bdb_state, key, data, &consumer, &genid, &fnd, &fnddtalen, &fnddtaoff, &seq);
    if (rc != 0) {
        logmsg(LOGMSG_ERROR, "%s failed to unpack record for %s rc=%d\n", __func__, filename, rc);
        return;
    }
    /* NOTE(review): fnd looks like it is populated/allocated by the unpack
     * call and is never freed here -- confirm ownership with
     * bdb_queuedb_trigger_unpack before adding a free */

    assert(key->size == 12); /* kq->key below copies exactly 12 bytes */

    struct kafka_log_queue *kq =
        malloc(offsetof(struct kafka_log_queue, data) + fnd->data_len);
    if (kq == NULL) {
        /* original dereferenced an unchecked malloc result */
        logmsg(LOGMSG_ERROR, "%s malloc failed for %s\n", __func__, filename);
        return;
    }
    kq->seq = seq;
    kq->genid = genid;
    memcpy(kq->key, key->data, 12);
    kq->dtalen = fnd->data_len;
    uint8_t *payload = ((uint8_t *)fnd) + fnd->data_offset;
    memcpy(kq->data, payload, fnd->data_len);

    /* RD_KAFKA_MSG_F_COPY: librdkafka copies the buffer, so kq can be freed
     * immediately after produce regardless of outcome */
    rc = rd_kafka_produce(k->rkt_p, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)kq,
                          offsetof(struct kafka_log_queue, data) + fnd->data_len, NULL, 0, NULL);

    free(kq);
    if (rc == -1) {
        logmsg(LOGMSG_ERROR, "%s Failed to produce to topic %s: %s\n", __func__, rd_kafka_topic_name(k->rkt_p),
               rd_kafka_err2str(rd_kafka_last_error()));
    }
}

/* Creates a kafka producer for kafka_topic and registers write_kafka as a
 * master-only push trigger on the given queue file.  Returns 0 on success,
 * -1 on any failure (with all partially-created librdkafka objects released
 * -- the original leaked conf/rk_p on the error paths). */
int register_queue_kafka(const char *filename, const char *kafka_topic, bdb_state_type *(*gethndl)(const char *q),
                         int maxsz)
{
    rd_kafka_conf_t *conf;
    rd_kafka_t *rk_p;
    rd_kafka_topic_t *rkt_p;
    char errstr[512];

    if (!kafka_topic || !gbl_kafka_brokers) {
        logmsg(LOGMSG_ERROR, "%s kafka topic or broker not set\n", __func__);
        return -1;
    }
    conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", gbl_kafka_brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        logmsg(LOGMSG_ERROR, "%s rd_kafka_conf_set error: %s\n", __func__, errstr);
        rd_kafka_conf_destroy(conf); /* conf not yet owned by a handle */
        return -1;
    }

    rk_p = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk_p) {
        logmsg(LOGMSG_ERROR, "%s rd_kafka_new error: %s\n", __func__, errstr);
        /* rd_kafka_new only takes ownership of conf on success */
        rd_kafka_conf_destroy(conf);
        return -1;
    }

    rkt_p = rd_kafka_topic_new(rk_p, kafka_topic, NULL);
    if (!rkt_p) {
        logmsg(LOGMSG_ERROR, "%s rd_kafka_topic_new error: %s\n", __func__, rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_destroy(rk_p); /* original leaked the producer here */
        return -1;
    }

    /* calloc(nmemb, size): original had the arguments swapped (harmless but
     * unidiomatic) and never checked the result */
    struct kafka_state *kstate = calloc(1, sizeof(struct kafka_state));
    if (!kstate) {
        logmsg(LOGMSG_ERROR, "%s out of memory\n", __func__);
        rd_kafka_topic_destroy(rkt_p);
        rd_kafka_destroy(rk_p);
        return -1;
    }
    kstate->topic = strdup(kafka_topic);
    kstate->rkt_p = rkt_p;
    kstate->rk_p = rk_p;
    register_logqueue_trigger(filename, gethndl, write_kafka, kstate, maxsz,
                              LOGQTRIGGER_PUSH | LOGQTRIGGER_MASTER_ONLY);
    logmsg(LOGMSG_USER, "Registered kafka-qwrite for %s -> %s\n", filename, kafka_topic);

    return 0;
}

#endif
7 changes: 0 additions & 7 deletions bdb/log_queue_trigger.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,4 @@ int logqueue_trigger_get(void *qfile, DBT *key, DBT *data, int timeoutms);
void register_dump_qtrigger(const char *filename, bdb_state_type *(*gethndl)(const char *q), const char *outfile,
int maxsz);

#ifdef WITH_QKAFKA

int register_queue_kafka(const char *filename, const char *kafka_topic, bdb_state_type *(*gethndl)(const char *q),
int maxsz);

#endif

#endif
3 changes: 0 additions & 3 deletions berkdb/db/trigger_subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
#include "dbinc/trigger_subscription.h"
#include <sys_wrap.h>

#include <mem_berkdb.h>
#include <mem_override.h>

/*
* Maintain mapping of qdb name and its signaling mechanism.
* This needs a simple hash table (name -> pthread_cond_t).
Expand Down
4 changes: 2 additions & 2 deletions berkdb/dbinc/db_am.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ extern __thread DB_LSN commit_lsn;
} \
if (argp->type > 3000 || (argp->type > 1000 && argp->type < 2000)) { \
ret = __ufid_to_db(dbenv, argp->txnid, &file_dbp, argp->ufid_fileid, &log_trigger, lsnp); \
if ((ret == 0 || ret == DB_DELETED || ret == DB_IGNORED) && log_trigger != NULL && dbenv->rep_log_trigger_cb) { \
if ((ret == 0 || ret == DB_DELETED || ret == DB_IGNORED) && op == DB_TXN_APPLY && log_trigger != NULL && dbenv->rep_log_trigger_cb) { \
char *fname = NULL; \
__ufid_to_fname(dbenv, &fname, argp->ufid_fileid); \
dbenv->rep_log_trigger_cb(log_trigger, lsnp, &commit_lsn, fname, argp->type, argp); \
Expand Down Expand Up @@ -116,7 +116,7 @@ int __log_flush(DB_ENV *dbenv, const DB_LSN *);
ret = __dbreg_id_to_db(dbenv, argp->txnid, \
&file_dbp, argp->fileid, inc_count, lsnp, 0); \
} \
if ((ret == 0 || ret == DB_IGNORED || ret == DB_DELETED ) && log_trigger != NULL && dbenv->rep_log_trigger_cb) { \
if ((ret == 0 || ret == DB_IGNORED || ret == DB_DELETED ) && op == DB_TXN_APPLY && log_trigger != NULL && dbenv->rep_log_trigger_cb) { \
char *fname = NULL; \
__ufid_to_fname(dbenv, &fname, argp->ufid_fileid); \
dbenv->rep_log_trigger_cb(log_trigger, lsnp, &commit_lsn, fname, argp->type, argp); \
Expand Down
28 changes: 0 additions & 28 deletions db/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -1534,34 +1534,6 @@ static int read_lrl_option(struct dbenv *dbenv, char *line,
logmsg(LOGMSG_USER, "Adding qdump-trigger for %s -> %s max-queue %d\n", bdbq, outfile, maxsz);
register_dump_qtrigger(bdbq, queuehndl, outfile, maxsz);

#ifdef WITH_QKAFKA
} else if (tokcmp(tok, ltok, "qkafka") == 0) {
if ((tok = segtok(line, len, &st, &ltok)) == NULL || ltok <= 0) {
logmsg(LOGMSG_ERROR, "qkafka <qname> <topic> <maxsz>\n");
return -1;
}
char *queue = alloca(ltok + 1);
tokcpy(tok, ltok, queue);
char *bdbq = alloca(ltok + 4);
sprintf(bdbq, "%s%s", Q_TAG, queue);

if ((tok = segtok(line, len, &st, &ltok)) == NULL || ltok <= 0) {
logmsg(LOGMSG_ERROR, "Expected kafka topic\n");
return -1;
}

char *topic = alloca(ltok + 1);
tokcpy(tok, ltok, topic);

if ((tok = segtok(line, len, &st, &ltok)) == NULL || ltok <= 0) {
logmsg(LOGMSG_ERROR, "Expected max queue-size\n");
return -1;
}
int maxsz = toknum(tok, ltok);

logmsg(LOGMSG_USER, "Adding qkafka trigger for %s -> %s maxsz %d\n", bdbq, topic, maxsz);
register_queue_kafka(bdbq, topic, queuehndl, maxsz);
#endif
} else if (tokcmp(tok, ltok, "replicate_from") == 0) {
/* 'replicate_from <dbname> <prod|beta|alpha|dev|host|@hst1,hst2,hst3..>' */
tok = segtok(line, len, &st, &ltok);
Expand Down