Skip to content

Commit e77545f

Browse files
committed
chore(pkg/replication): add replication lag metric
Signed-off-by: Stefano Scafiti <[email protected]>
1 parent 86e956f commit e77545f

File tree

5 files changed

+53
-16
lines changed

5 files changed

+53
-16
lines changed

pkg/database/lazy_db.go

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ package database
1919
import (
2020
"context"
2121
"crypto/sha256"
22-
"io"
22+
"errors"
2323
"path/filepath"
2424
"time"
2525

@@ -29,6 +29,8 @@ import (
2929
"github.com/codenotary/immudb/pkg/api/schema"
3030
)
3131

32+
// ErrNoNewTransactions is returned by lazyDB.ExportTxByID when pre-committed
// transactions are not allowed by the request and the requested transaction ID
// is greater than the state's TxId.
var ErrNoNewTransactions = errors.New("no new transactions")
33+
3234
type lazyDB struct {
3335
m *DBManager
3436

@@ -440,7 +442,7 @@ func (db *lazyDB) ExportTxByID(ctx context.Context, req *schema.ExportTxRequest)
440442

441443
if !req.AllowPreCommitted {
442444
if req.Tx > state.TxId {
443-
return nil, 0, [sha256.Size]byte{}, io.EOF
445+
return nil, 0, [sha256.Size]byte{}, ErrNoNewTransactions
444446
}
445447
}
446448

pkg/replication/metrics.go

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,11 @@ var (
6565
Name: "immudb_replication_allow_commit_up_to_tx_id",
6666
Help: "most recently received confirmation up to which commit id the replica is allowed to durably commit",
6767
}, []string{"db"})
68+
69+
_metricsReplicationLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
70+
Name: "immudb_replication_lag",
71+
Help: "The difference between the last transaction committed by the primary and replicated by the replica",
72+
}, []string{"db"})
6873
)
6974

7075
type metrics struct {
@@ -76,6 +81,7 @@ type metrics struct {
7681
replicatorsInRetryDelay prometheus.Gauge
7782
primaryCommittedTxID prometheus.Gauge
7883
allowCommitUpToTxID prometheus.Gauge
84+
replicationLag prometheus.Gauge
7985
}
8086

8187
// metricsForDb returns metrics object for particular database name
@@ -89,6 +95,7 @@ func metricsForDb(dbName string) metrics {
8995
replicatorsInRetryDelay: _metricsReplicatorsInRetryDelay.WithLabelValues(dbName),
9096
primaryCommittedTxID: _metricsReplicationPrimaryCommittedTxID.WithLabelValues(dbName),
9197
allowCommitUpToTxID: _metricsAllowCommitUpToTxID.WithLabelValues(dbName),
98+
replicationLag: _metricsReplicationLag.WithLabelValues(dbName),
9299
}
93100
}
94101

@@ -99,6 +106,7 @@ func (m *metrics) reset() {
99106
m.replicatorsInRetryDelay.Set(0)
100107
m.primaryCommittedTxID.Set(0)
101108
m.allowCommitUpToTxID.Set(0)
109+
m.replicationLag.Set(0)
102110
}
103111

104112
// replicationTimeHistogramTimer returns prometheus timer for replicationTimeHistogram

pkg/replication/replicator.go

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -377,7 +377,14 @@ func (txr *TxReplicator) fetchNextTx() error {
377377
defer txr.disconnect()
378378
}
379379

380+
txr.maybeUpdateReplicationLag(commitState.TxId, emd)
381+
380382
if err != nil && !errors.Is(err, io.EOF) {
383+
if strings.Contains(err.Error(), database.ErrNoNewTransactions.Error()) {
384+
txr.metrics.replicationLag.Set(0)
385+
return err
386+
}
387+
381388
if strings.Contains(err.Error(), "replica commit state diverged from primary") {
382389
txr.logger.Errorf("replica commit state at '%s' diverged from primary's", txr.db.GetName())
383390
return ErrReplicaDivergedFromPrimary
@@ -497,3 +504,13 @@ func (txr *TxReplicator) Error() error {
497504

498505
return txr.err
499506
}
507+
508+
func (txr *TxReplicator) maybeUpdateReplicationLag(lastCommittedTxID uint64, metadata map[string][]byte) {
509+
primaryLastCommittedTxIDBin, ok := metadata["committed-txid-bin"]
510+
if !ok {
511+
return
512+
}
513+
514+
lag := binary.BigEndian.Uint64(primaryLastCommittedTxIDBin) - lastCommittedTxID
515+
txr.metrics.replicationLag.Set(float64(lag))
516+
}

pkg/server/server.go

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -317,9 +317,18 @@ func (s *ImmuServer) Start() (err error) {
317317
startedAt = time.Now()
318318

319319
if s.Options.MetricsServer {
320-
s.metricsServer = StartMetrics(1*time.Minute, s.Options.MetricsBind(), s.Options.TLSConfig, s.Logger, s.metricFuncServerUptimeCounter,
321-
s.metricFuncComputeDBSizes, s.metricFuncComputeDBEntries, s.metricFuncComputeLoadedDBSize, s.metricFuncComputeSessionCount,
320+
s.metricsServer = StartMetrics(
321+
1*time.Minute,
322+
s.Options.MetricsBind(),
323+
s.Options.TLSConfig,
324+
s.Logger,
325+
s.metricFuncServerUptimeCounter,
326+
s.metricFuncComputeDBSizes,
327+
s.metricFuncComputeDBEntries,
328+
s.metricFuncComputeLoadedDBSize,
329+
s.metricFuncComputeSessionCount,
322330
s.Options.PProf)
331+
323332
defer func() {
324333
if err := s.metricsServer.Close(); err != nil {
325334
s.Logger.Errorf("failed to shutdown metric server: %s", err)

pkg/server/stream_replication.go

Lines changed: 13 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -62,23 +62,24 @@ func (s *ImmuServer) exportTx(req *schema.ExportTxRequest, txsServer schema.Immu
6262
return err
6363
}
6464

65-
var streamMetadata map[string][]byte
65+
var bCommittedTxID [8]byte
66+
state, err := db.CurrentState()
67+
if err == nil {
68+
binary.BigEndian.PutUint64(bCommittedTxID[:], state.TxId)
69+
}
70+
71+
// In asynchronous replication, the last committed transaction ID is sent to the replica
72+
// so that the replica can update its replication lag.
73+
streamMetadata := map[string][]byte{
74+
"committed-txid-bin": bCommittedTxID[:],
75+
}
6676

6777
if req.ReplicaState != nil {
6878
var bMayCommitUpToTxID [8]byte
6979
binary.BigEndian.PutUint64(bMayCommitUpToTxID[:], mayCommitUpToTxID)
7080

71-
var bCommittedTxID [8]byte
72-
state, err := db.CurrentState()
73-
if err == nil {
74-
binary.BigEndian.PutUint64(bCommittedTxID[:], state.TxId)
75-
}
76-
77-
streamMetadata = map[string][]byte{
78-
"may-commit-up-to-txid-bin": bMayCommitUpToTxID[:],
79-
"may-commit-up-to-alh-bin": mayCommitUpToAlh[:],
80-
"committed-txid-bin": bCommittedTxID[:],
81-
}
81+
streamMetadata["may-commit-up-to-txid-bin"] = bMayCommitUpToTxID[:]
82+
streamMetadata["may-commit-up-to-alh-bin"] = mayCommitUpToAlh[:]
8283

8384
if setTrailer {
8485
// trailer metadata is kept for backward compatibility

0 commit comments

Comments
 (0)