Skip to content

Commit dd689c7

Browse files
author
Bartłomiej Święcki
committed
chore(pkg/replication): Add TX gap metrics
Added metrics: * immudb_replication_primary_committed_tx_id - this is the latest known committed transaction ID of the prinary * immudb_replication_allow_commit_up_to_tx_id - tx ID the replica is allowed to commit up to Signed-off-by: Bartłomiej Święcki <[email protected]>
1 parent 0b877ba commit dd689c7

File tree

3 files changed

+36
-5
lines changed

3 files changed

+36
-5
lines changed

pkg/replication/metrics.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,16 @@ var (
5555
Name: "immudb_replication_replicators_retries",
5656
Help: "number of retries while replicating transactions caused by errors",
5757
}, []string{"db"})
58+
59+
_metricsReplicationPrimaryCommittedTxID = promauto.NewGaugeVec(prometheus.GaugeOpts{
60+
Name: "immudb_replication_primary_committed_tx_id",
61+
Help: "the latest know transaction ID committed on the primary node",
62+
}, []string{"db"})
63+
64+
_metricsAllowCommitUpToTxID = promauto.NewGaugeVec(prometheus.GaugeOpts{
65+
Name: "immudb_replication_allow_commit_up_to_tx_id",
66+
Help: "most recently received confirmation up to which commit id the replica is allowed to durably commit",
67+
}, []string{"db"})
5868
)
5969

6070
type metrics struct {
@@ -64,6 +74,8 @@ type metrics struct {
6474
replicators prometheus.Gauge
6575
replicatorsActive prometheus.Gauge
6676
replicatorsInRetryDelay prometheus.Gauge
77+
primaryCommittedTxID prometheus.Gauge
78+
allowCommitUpToTxID prometheus.Gauge
6779
}
6880

6981
// metricsForDb returns metrics object for particular database name
@@ -75,6 +87,8 @@ func metricsForDb(dbName string) metrics {
7587
replicators: _metricsReplicators.WithLabelValues(dbName),
7688
replicatorsActive: _metricsReplicatorsActive.WithLabelValues(dbName),
7789
replicatorsInRetryDelay: _metricsReplicatorsInRetryDelay.WithLabelValues(dbName),
90+
primaryCommittedTxID: _metricsReplicationPrimaryCommittedTxID.WithLabelValues(dbName),
91+
allowCommitUpToTxID: _metricsAllowCommitUpToTxID.WithLabelValues(dbName),
7892
}
7993
}
8094

@@ -83,6 +97,8 @@ func (m *metrics) reset() {
8397
m.replicators.Set(0)
8498
m.replicatorsActive.Set(0)
8599
m.replicatorsInRetryDelay.Set(0)
100+
m.primaryCommittedTxID.Set(0)
101+
m.allowCommitUpToTxID.Set(0)
86102
}
87103

88104
// replicationTimeHistogramTimer returns prometheus timer for replicationTimeHistogram

pkg/replication/replicator.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,19 +369,27 @@ func (txr *TxReplicator) fetchNextTx() error {
369369
if syncReplicationEnabled {
370370
md := exportTxStream.Trailer()
371371

372-
if len(md.Get("may-commit-up-to-txid-bin")) == 0 || len(md.Get("may-commit-up-to-alh-bin")) == 0 {
372+
if len(md.Get("may-commit-up-to-txid-bin")) == 0 ||
373+
len(md.Get("may-commit-up-to-alh-bin")) == 0 ||
374+
len(md.Get("committed-txid-bin")) == 0 {
373375
return ErrNoSynchronousReplicationOnMaster
374376
}
375377

376-
if len(md.Get("may-commit-up-to-txid-bin")[0]) != 8 || len(md.Get("may-commit-up-to-alh-bin")[0]) != sha256.Size {
378+
if len(md.Get("may-commit-up-to-txid-bin")[0]) != 8 ||
379+
len(md.Get("may-commit-up-to-alh-bin")[0]) != sha256.Size ||
380+
len(md.Get("committed-txid-bin")[0]) != 8 {
377381
return ErrInvalidReplicationMetadata
378382
}
379383

380384
mayCommitUpToTxID := binary.BigEndian.Uint64([]byte(md.Get("may-commit-up-to-txid-bin")[0]))
385+
committedTxID := binary.BigEndian.Uint64([]byte(md.Get("committed-txid-bin")[0]))
381386

382387
var mayCommitUpToAlh [sha256.Size]byte
383388
copy(mayCommitUpToAlh[:], []byte(md.Get("may-commit-up-to-alh-bin")[0]))
384389

390+
txr.metrics.primaryCommittedTxID.Set(float64(committedTxID))
391+
txr.metrics.allowCommitUpToTxID.Set(float64(mayCommitUpToTxID))
392+
385393
if mayCommitUpToTxID > commitState.TxId {
386394
err = txr.db.AllowCommitUpto(mayCommitUpToTxID, mayCommitUpToAlh)
387395
if err != nil {

pkg/server/stream_replication.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,19 @@ func (s *ImmuServer) ExportTx(req *schema.ExportTxRequest, txsServer schema.Immu
3838

3939
defer func() {
4040
if req.FollowerState != nil {
41-
var bTxID [8]byte
42-
binary.BigEndian.PutUint64(bTxID[:], mayCommitUpToTxID)
41+
var bMayCommitUpToTxID [8]byte
42+
binary.BigEndian.PutUint64(bMayCommitUpToTxID[:], mayCommitUpToTxID)
43+
44+
var bCommittedTxID [8]byte
45+
state, err := db.CurrentState()
46+
if err == nil {
47+
binary.BigEndian.PutUint64(bCommittedTxID[:], state.TxId)
48+
}
4349

4450
md := metadata.Pairs(
45-
"may-commit-up-to-txid-bin", string(bTxID[:]),
51+
"may-commit-up-to-txid-bin", string(bMayCommitUpToTxID[:]),
4652
"may-commit-up-to-alh-bin", string(mayCommitUpToAlh[:]),
53+
"committed-txid-bin", string(bCommittedTxID[:]),
4754
)
4855

4956
txsServer.SetTrailer(md)

0 commit comments

Comments
 (0)