@@ -42,6 +42,11 @@ var ErrFollowerDivergedFromMaster = errors.New("follower diverged from master")
// Sentinel errors describing the replication state reported by the master.
var (
	// ErrNoSynchronousReplicationOnMaster signals that the master is not
	// running with synchronous replication.
	ErrNoSynchronousReplicationOnMaster = errors.New("master is not running with synchronous replication")

	// ErrInvalidReplicationMetadata signals that the replication metadata
	// that was retrieved is invalid.
	ErrInvalidReplicationMetadata = errors.New("invalid replication metadata retrieved")
)
4444
// prefetchTxEntry is the element type of the prefetch buffer: an exported
// transaction together with the moment it was queued.
type prefetchTxEntry struct {
	// data holds the exported transaction bytes handed to the replicators.
	data []byte

	// addedAt is the time the entry was pushed into the buffer; consumers
	// use it to measure how long the transaction waited in the queue.
	addedAt time.Time
}
49+
4550type TxReplicator struct {
4651 uuid xid.ID
4752
@@ -61,7 +66,7 @@ type TxReplicator struct {
6166
6267 lastTx uint64
6368
64- prefetchTxBuffer chan [] byte // buffered channel of exported txs
69+ prefetchTxBuffer chan prefetchTxEntry // buffered channel of exported txs
6570 replicationConcurrency int
6671
6772 allowTxDiscarding bool
@@ -72,6 +77,8 @@ type TxReplicator struct {
7277 running bool
7378
7479 mutex sync.Mutex
80+
81+ metrics metrics
7582}
7683
7784func NewTxReplicator (uuid xid.ID , db database.DB , opts * Options , logger logger.Logger ) (* TxReplicator , error ) {
@@ -86,10 +93,11 @@ func NewTxReplicator(uuid xid.ID, db database.DB, opts *Options, logger logger.L
8693 logger : logger ,
8794 _masterDB : fullAddress (opts .masterDatabase , opts .masterAddress , opts .masterPort ),
8895 streamSrvFactory : stream .NewStreamServiceFactory (opts .streamChunkSize ),
89- prefetchTxBuffer : make (chan [] byte , opts .prefetchTxBufferSize ),
96+ prefetchTxBuffer : make (chan prefetchTxEntry , opts .prefetchTxBufferSize ),
9097 replicationConcurrency : opts .replicationCommitConcurrency ,
9198 allowTxDiscarding : opts .allowTxDiscarding ,
9299 delayer : opts .delayer ,
100+ metrics : metricsForDb (db .GetName ()),
93101 }, nil
94102}
95103
@@ -164,46 +172,77 @@ func (txr *TxReplicator) Start() error {
164172 }
165173 }()
166174
175+ txr .metrics .reset ()
176+
167177 for i := 0 ; i < txr .replicationConcurrency ; i ++ {
168178 go func () {
179+ txr .metrics .replicators .Inc ()
180+ defer txr .metrics .replicators .Dec ()
181+
169182 for etx := range txr .prefetchTxBuffer {
170- consecutiveFailures := 0
171-
172- // replication must be retried as many times as necessary
173- for {
174- _ , err := txr .db .ReplicateTx (etx )
175- if err == nil {
176- break // transaction successfully replicated
177- }
178- if errors .Is (err , ErrAlreadyStopped ) {
179- return
180- }
181-
182- if strings .Contains (err .Error (), "tx already committed" ) {
183- break // transaction successfully replicated
184- }
185-
186- txr .logger .Infof ("Failed to replicate transaction from '%s' to '%s'. Reason: %s" , txr ._masterDB , txr .db .GetName (), err .Error ())
187-
188- consecutiveFailures ++
189-
190- timer := time .NewTimer (txr .delayer .DelayAfter (consecutiveFailures ))
191- select {
192- case <- txr .context .Done ():
193- timer .Stop ()
194- return
195- case <- timer .C :
196- }
183+ txr .metrics .txWaitQueueHistogram .Observe (time .Since (etx .addedAt ).Seconds ())
184+
185+ if ! txr .replicateSingleTx (etx .data ) {
186+ break
197187 }
198188 }
199189 }()
200190 }
201191
202- txr .logger .Infof ("Replication from '%s' to '%s' succesfully initialized" , txr ._masterDB , txr .db .GetName ())
192+ txr .logger .Infof ("Replication from '%s' to '%s' successfully initialized" , txr ._masterDB , txr .db .GetName ())
203193
204194 return nil
205195}
206196
197+ func (txr * TxReplicator ) replicateSingleTx (data []byte ) bool {
198+ txr .metrics .replicatorsActive .Inc ()
199+ defer txr .metrics .replicatorsActive .Dec ()
200+ defer txr .metrics .replicationTimeHistogramTimer ().ObserveDuration ()
201+
202+ consecutiveFailures := 0
203+
204+ // replication must be retried as many times as necessary
205+ for {
206+ _ , err := txr .db .ReplicateTx (data )
207+ if err == nil {
208+ break // transaction successfully replicated
209+ }
210+ if errors .Is (err , ErrAlreadyStopped ) {
211+ return false
212+ }
213+
214+ if strings .Contains (err .Error (), "tx already committed" ) {
215+ break // transaction successfully replicated
216+ }
217+
218+ txr .logger .Infof ("Failed to replicate transaction from '%s' to '%s'. Reason: %s" , txr ._masterDB , txr .db .GetName (), err .Error ())
219+
220+ consecutiveFailures ++
221+
222+ if ! txr .replicationFailureDelay (consecutiveFailures ) {
223+ return false
224+ }
225+ }
226+
227+ return true
228+ }
229+
230+ func (txr * TxReplicator ) replicationFailureDelay (consecutiveFailures int ) bool {
231+ txr .metrics .replicationRetries .Inc ()
232+
233+ txr .metrics .replicatorsInRetryDelay .Inc ()
234+ defer txr .metrics .replicatorsInRetryDelay .Dec ()
235+
236+ timer := time .NewTimer (txr .delayer .DelayAfter (consecutiveFailures ))
237+ select {
238+ case <- txr .context .Done ():
239+ timer .Stop ()
240+ return false
241+ case <- timer .C :
242+ return true
243+ }
244+ }
245+
// fullAddress renders a database location as "<db>@<address>:<port>".
func fullAddress(db, address string, port int) string {
	endpoint := fmt.Sprintf("%s:%d", address, port)

	return db + "@" + endpoint
}
@@ -358,7 +397,10 @@ func (txr *TxReplicator) fetchNextTx() error {
358397
359398 if len (etx ) > 0 {
360399 // in some cases the transaction is not provided but only the master commit state
361- txr .prefetchTxBuffer <- etx
400+ txr .prefetchTxBuffer <- prefetchTxEntry {
401+ data : etx ,
402+ addedAt : time .Now (),
403+ }
362404 txr .lastTx ++
363405 }
364406
0 commit comments