Skip to content

comdb2_oplog: Preserving seqno across truncates #5097

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bdb/bdb_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -1729,6 +1729,11 @@ int bdb_llmeta_get_schema_versions(tran_type *t, schema_version_row **data, int
int bdb_del_schema_change_history(tran_type *t, const char *tablename,
uint64_t seed);

/*
 * llmeta entry for comdb2_oplog seqno
 *
 * All three calls take the transaction handle to operate under and return 0
 * on success, or the non-zero return code of the underlying llmeta key/value
 * operation on failure (a warning is logged in that case).
 *
 * bdb_get_seqno: on success stores the NEXT seqno in *seqno, i.e. the value
 *                saved in llmeta plus one, or 1 when no entry exists.
 * bdb_set_seqno: saves seqno as the llmeta entry.
 * bdb_del_seqno: removes the llmeta entry.
 */
int bdb_get_seqno(tran_type *t, int64_t *seqno);
int bdb_set_seqno(tran_type *t, int64_t seqno);
int bdb_del_seqno(tran_type *t);

typedef struct {
uint64_t genid;
unsigned int file;
Expand Down
49 changes: 49 additions & 0 deletions bdb/llmeta.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ typedef enum {
LLMETA_SCHEMACHANGE_STATUS_V2 = 56,
LLMETA_SCHEMACHANGE_LIST = 57, /* list of all sc-s in a uuid txh */
LLMETA_SCHEMACHANGE_STATUS_PROTOBUF = 58, /* Indicate protobuf sc */
LLMETA_MAX_SEQNO = 59,
} llmetakey_t;

struct llmeta_file_type_key {
Expand Down Expand Up @@ -7354,6 +7355,9 @@ int bdb_llmeta_print_record(bdb_state_type *bdb_state, void *key, int keylen,
if (osql_scl_print((uint8_t*)p_buf_key, p_buf_end_key, (uint8_t*)p_buf_data, p_buf_end_data))
logmsg(LOGMSG_USER, " failed to deserialize object\n");
} break;
case LLMETA_MAX_SEQNO:
logmsg(LOGMSG_USER, "LLMETA_MAX_SEQNO: %ld\n", flibc_ntohll(*(int64_t *)p_buf_data));
break;
default:
logmsg(LOGMSG_USER, "Todo (type=%d)\n", type);
break;
Expand Down Expand Up @@ -11372,3 +11376,48 @@ void *buf_get_schemachange(struct schema_change_type *s, void *p_buf, void *p_bu
}
return NULL;
}

/*
 * Key used for the comdb2_oplog seqno entry in llmeta.
 *
 * The union pads the key out to LLMETA_IXLEN bytes; callers zero it with
 * `= {0}` and then store htonl(LLMETA_MAX_SEQNO) in file_type, so the key is
 * the big-endian record type followed by zero bytes.
 */
typedef union llmeta_seqno_key {
/* record type, stored in network byte order by the callers */
int file_type;
/* full-width view of the key */
uint8_t buf[LLMETA_IXLEN];
} llmeta_seqno_key;

/*
 * Fetch the next comdb2_oplog seqno from llmeta.
 *
 * t      transaction handle passed through to kv_get
 * seqno  out: stored value + 1, or 1 when llmeta holds no entry; left
 *        untouched when the lookup fails
 *
 * Returns the kv_get return code (0 on success). Failures are logged at
 * warning level.
 */
int bdb_get_seqno(tran_type *t, int64_t *seqno)
{
    llmeta_seqno_key k = {0};
    int64_t **v = NULL;
    int n = 0;
    int bdberr = 0;
    int rc;

    k.file_type = htonl(LLMETA_MAX_SEQNO);

    rc = kv_get(t, &k, sizeof(llmeta_seqno_key), (void ***)&v, &n, &bdberr);
    if (rc != 0) {
        logmsg(LOGMSG_WARN, "%s: kv_get rc %d bdberr %d\n", __func__, rc, bdberr);
        return rc;
    }

    if (n > 0 && v != NULL && v[0] != NULL)
        *seqno = flibc_ntohll(*v[0]) + 1; /* stored big-endian by bdb_set_seqno */
    else
        *seqno = 1;

    /* NOTE(review): kv_get hands results back through a void *** out-param;
     * this assumes the array and each value are heap-allocated and owned by
     * the caller, so they are released here instead of leaking on every
     * call -- confirm against kv_get. */
    if (v != NULL) {
        for (int i = 0; i < n; ++i)
            free(v[i]);
        free(v);
    }
    return rc;
}

/*
 * Save seqno as the comdb2_oplog seqno entry in llmeta.
 *
 * The value is converted with flibc_htonll before being written, matching the
 * flibc_ntohll conversion applied when it is read back.
 *
 * Returns the kv_put return code (0 on success); a failure is logged at
 * warning level.
 */
int bdb_set_seqno(tran_type *t, int64_t seqno)
{
    llmeta_seqno_key key = {0};
    int bdberr;
    int64_t wire_seqno = flibc_htonll(seqno);

    key.file_type = htonl(LLMETA_MAX_SEQNO);

    int rc = kv_put(t, &key, &wire_seqno, sizeof(int64_t), &bdberr);
    if (rc == 0)
        return 0;

    logmsg(LOGMSG_WARN, "%s: kv_put rc %d bdberr %d\n", __func__, rc, bdberr);
    return rc;
}

/*
 * Remove the comdb2_oplog seqno entry from llmeta.
 *
 * Returns the kv_del return code (0 on success); a failure is logged at
 * warning level.
 */
int bdb_del_seqno(tran_type *t)
{
    llmeta_seqno_key key = {0};
    int bdberr;

    key.file_type = htonl(LLMETA_MAX_SEQNO);

    int rc = kv_del(t, &key, &bdberr);
    if (rc == 0)
        return 0;

    logmsg(LOGMSG_WARN, "%s: kv_del rc %d bdberr %d\n", __func__, rc, bdberr);
    return rc;
}
2 changes: 1 addition & 1 deletion db/comdb2.h
Original file line number Diff line number Diff line change
Expand Up @@ -2979,7 +2979,7 @@ long long get_unique_longlong(struct dbenv *env);
void block_new_requests(struct dbenv *dbenv);
void allow_new_requests(struct dbenv *dbenv);

int get_next_seqno(void *tran, long long *seqno);
int get_next_seqno(void *tran, void *sc_tran, long long *seqno);
int add_oplog_entry(struct ireq *iq, void *trans, int type, void *logrec,
int logsz);
int local_replicant_write_clear(struct ireq *in_iq, void *in_trans,
Expand Down
1 change: 1 addition & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ extern int gbl_sc_status_max_rows;
extern int gbl_rep_process_pstack_time;
extern int gbl_sql_recover_time;
extern int gbl_legacy_requests_verbose;
extern int gbl_comdb2_oplog_preserve_seqno;

extern void set_snapshot_impl(snap_impl_enum impl);
extern const char *snap_impl_str(snap_impl_enum impl);
Expand Down
1 change: 1 addition & 0 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -2514,4 +2514,5 @@ REGISTER_TUNABLE("genshard_verbose",
TUNABLE_BOOLEAN, &gbl_gen_shard_verbose, 0, NULL, NULL, NULL,
NULL);
REGISTER_TUNABLE("legacy_verbose", "Log all legacy (opcode+old sql) requests (default: off)", TUNABLE_BOOLEAN, &gbl_legacy_requests_verbose, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("comdb2_oplog_preserve_seqno", "Preserve max value of the seqno in llmeta", TUNABLE_BOOLEAN, &gbl_comdb2_oplog_preserve_seqno, INTERNAL, NULL, NULL, NULL, NULL);
#endif /* _DB_TUNABLES_H */
2 changes: 1 addition & 1 deletion db/localrep.c
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ int local_replicant_write_clear(struct ireq *in_iq, void *in_trans,
goto done;
}

rc = get_next_seqno(trans, &seqno);
rc = get_next_seqno(trans, NULL, &seqno);
if (rc) {
if (rc != RC_INTERNAL_RETRY) {
printf("get_next_seqno unexpected rc %d\n", rc);
Expand Down
11 changes: 11 additions & 0 deletions db/osqlcomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -6894,6 +6894,8 @@ int osql_set_usedb(struct ireq *iq, const char *tablename, int tableversion, int
return 0;
}

/* Backs the "comdb2_oplog_preserve_seqno" tunable (on by default). When
 * non-zero, a fastinit of comdb2_oplog under replicate_local saves the next
 * seqno in llmeta before the schema change is finalized. */
int gbl_comdb2_oplog_preserve_seqno = 1;

/**
* Handle the finalize part of a chain of schema changes
*
Expand Down Expand Up @@ -6925,6 +6927,15 @@ int osql_finalize_scs(struct ireq *iq, tran_type *trans)
iq->usedb = iq->sc->db;
assert(iq->sc->nothrevent);

if (gbl_comdb2_oplog_preserve_seqno &&
IS_FASTINIT(iq->sc) &&
gbl_replicate_local &&
strcasecmp(iq->usedb->tablename, "comdb2_oplog") == 0) {
long long seqno;
if (get_next_seqno(trans, iq->sc_tran, &seqno) == 0)
bdb_set_seqno(iq->sc_tran, seqno);
}

rc = finalize_schema_change(iq, iq->sc_tran);
iq->usedb = NULL;
if (rc != SC_OK) {
Expand Down
2 changes: 2 additions & 0 deletions db/process_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -5478,6 +5478,8 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st)
}
} else if (tokcmp(tok, ltok, "do_not_use_modsnap_for_snapshot") == 0) {
gbl_use_modsnap_for_snapshot = 0;
} else if (tokcmp(tok, ltok, "del_llmeta_comdb2_seqno") == 0) {
bdb_del_seqno(NULL);
} else {
// see if any plugins know how to handle this
struct message_handler *h;
Expand Down
2 changes: 1 addition & 1 deletion db/sqlanalyze.c
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ static int local_replicate_write_analyze(char *table)
useqno = bdb_get_timestamp(thedb->bdb_env);
memcpy(&seqno, &useqno, sizeof(seqno));
} else
rc = get_next_seqno(trans, &seqno);
rc = get_next_seqno(trans, NULL, &seqno);
if (rc) {
if (rc != RC_INTERNAL_RETRY) {
logmsg(LOGMSG_ERROR, "get_next_seqno unexpected rc %d\n", rc);
Expand Down
17 changes: 9 additions & 8 deletions db/toblock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2725,7 +2725,7 @@ static int localrep_seqno(tran_type *trans, block_state_t *p_blkstate)
memcpy(&p_blkstate->seqno, &useqno, sizeof(unsigned long long));
} else {
long long seqno;
rc = get_next_seqno(trans, &seqno);
rc = get_next_seqno(trans, NULL, &seqno);
if (rc == 0)
p_blkstate->seqno = seqno;
}
Expand Down Expand Up @@ -4842,7 +4842,7 @@ static int toblock_main_int(struct javasp_trans_state *javasp_trans_handle, stru
if (gbl_replicate_local && get_dbtable_by_name("comdb2_oplog") &&
!gbl_replicate_local_concurrent) {
long long seqno;
rc = get_next_seqno(trans, &seqno);
rc = get_next_seqno(trans, iq->sc_tran, &seqno);
if (rc) {
if (rc != RC_INTERNAL_RETRY)
logmsg(LOGMSG_ERROR,
Expand Down Expand Up @@ -5017,7 +5017,7 @@ static int toblock_main_int(struct javasp_trans_state *javasp_trans_handle, stru
long long seqno;
struct ireq aiq;

rc = get_next_seqno(trans, &seqno);
rc = get_next_seqno(trans, NULL, &seqno);
if (rc)
GOTOBACKOUT;

Expand Down Expand Up @@ -6490,14 +6490,15 @@ static int keyless_range_delete_post_delete(void *record, size_t record_len,
return 0;
}

int get_next_seqno(void *tran, long long *seqno)
int get_next_seqno(void *tran, void *sc_tran, long long *seqno)
{
/* key is a long long + int + descriptor bytes for each */
char fndkey[14];
int rc;
int fndlen;
int outnull, outsz;
struct ireq iq;
int64_t stored_seqno = 0;

init_fake_ireq(thedb, &iq);

Expand All @@ -6517,9 +6518,10 @@ int get_next_seqno(void *tran, long long *seqno)
&fndlen);
if (rc)
return rc;
if (fndlen == 0)
*seqno = 1;
else {
if (fndlen == 0) {
bdb_get_seqno(sc_tran ? sc_tran : tran, &stored_seqno);
*seqno = stored_seqno;
} else {
long long next_seqno = 0;

/* convert to client format value so we can use it. Call the Routine
Expand All @@ -6540,4 +6542,3 @@ int get_next_seqno(void *tran, long long *seqno)
}
return 0;
}

5 changes: 5 additions & 0 deletions tests/comdb2_oplog_preserve_seqno.test/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Pull in the shared test rules: from the parent directory when TESTSROOTDIR
# is unset or empty, otherwise from the directory TESTSROOTDIR points at.
ifeq ($(TESTSROOTDIR),)
include ../testcase.mk
else
include $(TESTSROOTDIR)/testcase.mk
endif
23 changes: 23 additions & 0 deletions tests/comdb2_oplog_preserve_seqno.test/comdb2_oplog.csc2
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Schema of the comdb2_oplog table used by this test.
// NOTE(review): assumes csc2 accepts "//" line comments -- confirm before applying.

// On-disk record layout; ops is the only nullable field.
tag ondisk {
longlong seqno
int blkpos
int optype
blob ops null=yes
}

// Full log entry, including the ops blob.
tag "log" {
longlong seqno
int optype
int blkpos
blob ops
}

// Same fields as ondisk without the ops blob.
tag "justseq" {
longlong seqno
int blkpos
int optype
}

// Single key over (seqno, blkpos).
keys {
"seqno" = seqno + blkpos
}
19 changes: 19 additions & 0 deletions tests/comdb2_oplog_preserve_seqno.test/expected
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
(rows inserted=1)
(rows deleted=1)
(seqno=1, blkpos=0, optype=1)
(seqno=1, blkpos=1, optype=3)
(seqno=2, blkpos=0, optype=2)
(seqno=2, blkpos=1, optype=3)
(rows inserted=1)
(rows deleted=1)
(seqno=4, blkpos=0, optype=1)
(seqno=4, blkpos=1, optype=3)
(seqno=5, blkpos=0, optype=2)
(seqno=5, blkpos=1, optype=3)
LLMETA_MAX_SEQNO: 3
(rows inserted=1)
(rows deleted=1)
(seqno=1, blkpos=0, optype=1)
(seqno=1, blkpos=1, optype=3)
(seqno=2, blkpos=0, optype=2)
(seqno=2, blkpos=1, optype=3)
3 changes: 3 additions & 0 deletions tests/comdb2_oplog_preserve_seqno.test/lrl.options
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
replicate_local
comdb2_oplog_preserve_seqno 1
table comdb2_oplog comdb2_oplog.csc2
33 changes: 33 additions & 0 deletions tests/comdb2_oplog_preserve_seqno.test/runit
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env bash
# Abort early if this script does not parse. This must be `||`: with a pipe,
# `exit` runs in a subshell and the script carries on regardless.
bash -n "$0" || exit 1

# Verifies that the value of seqno is preserved across fastinits on comdb2_oplog

dbnm=$1
# ${CDB2_OPTIONS} is deliberately left unquoted so it can expand to several arguments.
master=$(cdb2sql --tabs ${CDB2_OPTIONS} "$dbnm" default "SELECT host FROM comdb2_cluster WHERE is_master='Y'")
cdb2sql "$dbnm" --host "$master" "EXEC PROCEDURE sys.cmd.send('comdb2_oplog_preserve_seqno 1')" >/dev/null

# Phase 1: tunable on. The trailing "-- N" comments give the seqno each
# statement is expected to consume; the truncate takes 3, so logging resumes at 4.
cdb2sql -s ${CDB2_OPTIONS} "$dbnm" default - >actual <<EOF
CREATE TABLE t1 (i INT, comdb2_seqno BIGINT)\$\$
INSERT INTO t1(i) VALUES(1) -- 1
DELETE FROM t1 WHERE 1 -- 2
SELECT seqno, blkpos, optype FROM comdb2_oplog ORDER BY seqno, blkpos
TRUNCATE comdb2_oplog -- 3
INSERT INTO t1(i) VALUES(1) -- 4
DELETE FROM t1 WHERE 1 -- 5
SELECT seqno, blkpos, optype FROM comdb2_oplog ORDER BY seqno, blkpos
EOF

# The value saved in llmeta should be the seqno consumed by the truncate.
cdb2sql --tabs ${CDB2_OPTIONS} "$dbnm" default "EXEC PROCEDURE sys.cmd.send('llmeta list')" | grep MAX_SEQNO >>actual

# Phase 2: drop the saved value, turn the tunable off, and drop it again so
# nothing is left in llmeta; after the truncate, seqnos restart at 1.
cdb2sql "$dbnm" --host "$master" "EXEC PROCEDURE sys.cmd.send('del_llmeta_comdb2_seqno')" >/dev/null
cdb2sql "$dbnm" --host "$master" "EXEC PROCEDURE sys.cmd.send('comdb2_oplog_preserve_seqno 0')" >/dev/null
cdb2sql "$dbnm" --host "$master" "EXEC PROCEDURE sys.cmd.send('del_llmeta_comdb2_seqno')" >/dev/null
cdb2sql -s ${CDB2_OPTIONS} "$dbnm" default - >>actual <<EOF
TRUNCATE comdb2_oplog
INSERT INTO t1(i) VALUES(1) -- 1
DELETE FROM t1 WHERE 1 -- 2
SELECT seqno, blkpos, optype FROM comdb2_oplog ORDER BY seqno, blkpos
EOF

# Last command on purpose: the exit status of diff is the result of the test.
diff actual expected