Skip to content

Commit 8a0996b

Browse files
Updated readme and fixed missing completion time from status
1 parent fb5c511 commit 8a0996b

6 files changed

Lines changed: 95 additions & 45 deletions

File tree

config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ validation:
118118

119119
# --------------------------------------------------------------------------
120120
# full_validation: Controls the scope of CDC data validation.
121-
# - true: (Default) Validates all operations (Inserts, Upserts, Updates, and Deletes).
122-
# - false: Validates ONLY Deletes. This is a massive performance optimization
121+
# - true: Validates all operations (Inserts, Upserts, Updates, and Deletes).
122+
# - false: (Default) Validates ONLY Deletes. This is a massive performance optimization
123123
# since Inserts and Updates are guaranteed by the source change stream
124124
# payload, whereas Deletes lose their full document payload.
125125
#

internal/cdc/cdc.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -427,14 +427,13 @@ func (m *CDCManager) hydrateAndFixOperations(ctx context.Context, ns string, bat
427427
}
428428

429429
foundDocs := make(map[string]bson.M)
430-
if err == nil {
431-
defer cursor.Close(ctx)
432-
for cursor.Next(ctx) {
433-
var doc bson.M
434-
if err := cursor.Decode(&doc); err == nil {
435-
idStr := formatID(doc["_id"])
436-
foundDocs[idStr] = doc
437-
}
430+
431+
defer cursor.Close(ctx)
432+
for cursor.Next(ctx) {
433+
var doc bson.M
434+
if err := cursor.Decode(&doc); err == nil {
435+
idStr := formatID(doc["_id"])
436+
foundDocs[idStr] = doc
438437
}
439438
}
440439

internal/config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func LoadConfig() {
224224

225225
// Validation Defaults
226226
viper.SetDefault("validation.enabled", true)
227-
viper.SetDefault("validation.full_validation", true)
227+
viper.SetDefault("validation.full_validation", false)
228228
viper.SetDefault("validation.batch_size", 100)
229229
viper.SetDefault("validation.retry_interval_ms", 500)
230230
viper.SetDefault("validation.max_validation_workers", 4)

internal/flow/monitor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ func (m *Manager) WaitIfPaused() {
112112
func (m *Manager) HandlePause(w http.ResponseWriter, r *http.Request) {
113113
m.emergencyPaused.Store(true)
114114
if m.statusMgr != nil {
115+
m.statusMgr.SetEmergencyPaused(true)
115116
m.statusMgr.SetState("paused", "Paused manually by user")
116117
m.statusMgr.Persist(context.Background())
117118
}
@@ -123,7 +124,7 @@ func (m *Manager) HandlePause(w http.ResponseWriter, r *http.Request) {
123124
func (m *Manager) HandleResume(w http.ResponseWriter, r *http.Request) {
124125
m.emergencyPaused.Store(false)
125126
if m.statusMgr != nil {
126-
// Restore state gracefully based on current migration phase
127+
m.statusMgr.SetEmergencyPaused(false)
127128
if m.statusMgr.IsMigrationFinalized() {
128129
m.statusMgr.SetState("completed", "Migration Finalized")
129130
} else if m.statusMgr.IsCloneCompleted() {

internal/status/status.go

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ type Manager struct {
121121
estimatedTotalDocs atomic.Int64
122122
clonedBytes atomic.Int64
123123
clonedDocs atomic.Int64
124+
initialSyncLag atomic.Value
124125

125126
// Timestamps (Stored as Unix Seconds)
126127
lastSourceEventTime atomic.Int64
@@ -146,6 +147,7 @@ type Manager struct {
146147
fcWriterQueue atomic.Int64
147148
fcNativeLagged atomic.Bool
148149
fcNativeSustainer atomic.Int64
150+
fcEmergencyPaused atomic.Bool
149151

150152
// Validation Stats (In-Memory)
151153
valTotalChecked atomic.Int64
@@ -155,6 +157,20 @@ type Manager struct {
155157
valQueueSize atomic.Int64
156158
}
157159

160+
func parseTime(v interface{}) time.Time {
161+
if v == nil {
162+
return time.Time{}
163+
}
164+
if t, ok := v.(time.Time); ok {
165+
return t
166+
}
167+
// Handle BSON DateTime (which is an int64 under the hood representing milliseconds)
168+
if dt, ok := v.(bson.DateTime); ok {
169+
return time.Unix(int64(dt)/1000, (int64(dt)%1000)*1000000).UTC()
170+
}
171+
return time.Time{}
172+
}
173+
158174
func NewManager(client *mongo.Client, isSource bool, version string) *Manager {
159175
dbName := config.Cfg.Migration.MetadataDB
160176
collName := config.Cfg.Migration.StatusCollection
@@ -169,7 +185,8 @@ func NewManager(client *mongo.Client, isSource bool, version string) *Manager {
169185
startTime: time.Now().UTC(),
170186
}
171187
m.fcReason.Store("")
172-
m.idxCurrentNs.Store("") // Safely initialize empty string
188+
m.idxCurrentNs.Store("")
189+
m.initialSyncLag.Store(0.0)
173190
return m
174191
}
175192

@@ -286,8 +303,15 @@ func (m *Manager) SetInitialSyncEnd(t time.Time) {
286303
func (m *Manager) IsInitialSyncCompleted() bool {
287304
return m.initialSyncCompleted.Load()
288305
}
306+
289307
func (m *Manager) SetInitialSyncCompleted(lagSeconds float64) {
290308
m.initialSyncCompleted.Store(true)
309+
m.initialSyncLag.Store(lagSeconds)
310+
}
311+
312+
func (m *Manager) SetEmergencyPaused(paused bool) {
313+
m.fcEmergencyPaused.Store(paused)
314+
go m.Persist(context.Background())
291315
}
292316

293317
func (m *Manager) IsCloneCompleted() bool {
@@ -387,16 +411,27 @@ func (m *Manager) LoadAndMerge(ctx context.Context) error {
387411
m.idxFinished.Store(val)
388412
}
389413
}
390-
391414
m.mu.Lock()
392-
if val, ok := doc["initialSyncStart"].(time.Time); ok {
393-
m.initialSyncStart = val
415+
416+
if val, ok := doc["initialSyncStart"]; ok {
417+
m.initialSyncStart = parseTime(val)
394418
}
395-
if val, ok := doc["initialSyncEnd"].(time.Time); ok {
396-
m.initialSyncEnd = val
419+
if val, ok := doc["initialSyncEnd"]; ok {
420+
m.initialSyncEnd = parseTime(val)
397421
}
398422
m.mu.Unlock()
399423

424+
if val, ok := doc["initialSyncLag"]; ok {
425+
switch v := val.(type) {
426+
case float64:
427+
m.initialSyncLag.Store(v)
428+
case int32:
429+
m.initialSyncLag.Store(float64(v))
430+
case int64:
431+
m.initialSyncLag.Store(float64(v))
432+
}
433+
}
434+
400435
if fc, ok := doc["flowControl"].(bson.M); ok {
401436
if val, ok := fc["isPaused"].(bool); ok {
402437
m.fcIsPaused.Store(val)
@@ -416,6 +451,9 @@ func (m *Manager) LoadAndMerge(ctx context.Context) error {
416451
if val, ok := fc["assignedRateLimit"]; ok {
417452
m.fcNativeSustainer.Store(parseInt64(val))
418453
}
454+
if val, ok := fc["emergencyPaused"].(bool); ok {
455+
m.fcEmergencyPaused.Store(val)
456+
}
419457
}
420458

421459
return nil
@@ -437,9 +475,12 @@ func (m *Manager) Persist(ctx context.Context) {
437475
"completed": m.idxFinished.Load(),
438476
}
439477

478+
lag, _ := m.initialSyncLag.Load().(float64)
479+
440480
fcStatus := bson.M{
441481
"enabled": config.Cfg.FlowControl.Enabled,
442482
"isPaused": m.fcIsPaused.Load(),
483+
"emergencyPaused": m.fcEmergencyPaused.Load(),
443484
"pauseReason": m.fcReason.Load().(string),
444485
"currentQueuedOps": m.fcQueuedOps.Load(),
445486
"currentWriterQueue": m.fcWriterQueue.Load(),
@@ -470,6 +511,7 @@ func (m *Manager) Persist(ctx context.Context) {
470511
"initialSyncCompleted": m.initialSyncCompleted.Load(),
471512
"initialSyncStart": start,
472513
"initialSyncEnd": end,
514+
"initialSyncLag": lag,
473515
"migrationFinalized": m.migrationFinalized.Load(),
474516
"flowControl": fcStatus,
475517
"indexing": idxStatus,
@@ -548,6 +590,7 @@ func (m *Manager) buildStatusOutput(valStore ValidatorStore) StatusOutput {
548590
lastSourceUnix := m.lastSourceEventTime.Load()
549591
lastAppliedUnix := m.lastAppliedEventTime.Load()
550592
lastBatchUnix := m.lastBatchAppliedAt.Load()
593+
lag, _ := m.initialSyncLag.Load().(float64)
551594

552595
idleTime := 0.0
553596
cdcLag := 0.0
@@ -670,20 +713,23 @@ func (m *Manager) buildStatusOutput(valStore ValidatorStore) StatusOutput {
670713
StartedAt: startStr,
671714
EndedAt: endStr,
672715
Duration: duration,
673-
CompletionLagSeconds: 0,
716+
CompletionLagSeconds: lag,
674717
},
675718
}
676-
677-
if config.Cfg.FlowControl.Enabled {
719+
isEmergencyPaused := m.fcEmergencyPaused.Load()
720+
if config.Cfg.FlowControl.Enabled || isEmergencyPaused {
678721
output.FlowControl = &FCStatus{
679-
Enabled: true,
680-
IsPaused: m.fcIsPaused.Load(),
722+
Enabled: config.Cfg.FlowControl.Enabled,
723+
IsPaused: m.fcIsPaused.Load() || isEmergencyPaused,
681724
PauseReason: m.fcReason.Load().(string),
682725
CurrentQueuedOps: int(m.fcQueuedOps.Load()),
683726
CurrentWriterQueue: int(m.fcWriterQueue.Load()),
684727
NativeLagged: m.fcNativeLagged.Load(),
685728
NativeSustainerRate: int(m.fcNativeSustainer.Load()),
686729
}
730+
if isEmergencyPaused {
731+
output.FlowControl.PauseReason = "Paused manually by user"
732+
}
687733
}
688734

689735
return output

readme.md

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,7 @@ This command is used when you are ready to cut over to your new environment. It
642642

643643
```bash
644644
--- docStreamer Status (Live) ---
645-
PID: 775378 (Querying http://localhost:8080/status)
645+
PID: 1321077 (Querying http://localhost:8080/status)
646646
{
647647
"version": "dev",
648648
"ok": true,
@@ -664,41 +664,41 @@ PID: 775378 (Querying http://localhost:8080/status)
664664
"isReplicationLagged": false,
665665
"assignedRateLimit": 0
666666
},
667-
"timeSinceLastEventSeconds": 1861.430341972,
667+
"timeSinceLastEventSeconds": 606.175643039,
668668
"cdcLagSeconds": 0,
669-
"totalEventsApplied": 6182,
670-
"adaptiveSerialBatches": 0,
671-
"insertedDocs": 2830,
672-
"updatedDocs": 2306,
673-
"deletedDocs": 1046,
669+
"totalEventsApplied": 43906,
670+
"adaptiveSerialBatches": 7,
671+
"insertedDocs": 34422,
672+
"updatedDocs": 4710,
673+
"deletedDocs": 4774,
674674
"validation": {
675675
"queuedBatches": 0,
676-
"totalChecked": 10266,
677-
"mismatchFound": 64,
678-
"mismatchFixed": 64,
676+
"totalChecked": 2519,
677+
"mismatchFound": 24,
678+
"mismatchFixed": 24,
679679
"pendingMismatches": 0,
680680
"hotKeysWaiting": 0,
681681
"syncPercent": 100,
682-
"lastValidatedAt": "2026-02-26T21:38:03Z"
682+
"lastValidatedAt": "2026-03-03T19:46:33Z"
683683
},
684684
"lastSourceEventTime": {
685-
"ts": "1772140028",
686-
"isoDate": "2026-02-26T21:07:08Z"
685+
"ts": "1772567184",
686+
"isoDate": "2026-03-03T19:46:24Z"
687687
},
688688
"lastAppliedEventTime": {
689-
"ts": "1772140028",
690-
"isoDate": "2026-02-26T21:07:08Z"
689+
"ts": "1772567184",
690+
"isoDate": "2026-03-03T19:46:24Z"
691691
},
692-
"lastBatchAppliedAt": "2026-02-26T21:38:09Z",
692+
"lastBatchAppliedAt": "2026-03-03T19:56:29Z",
693693
"initialSync": {
694694
"completed": true,
695695
"progressPercent": 100,
696-
"clonedDocs": 3459543,
697-
"estimatedTotalDocs": 3455411,
698-
"clonedBytes": 7026191966,
699-
"estimatedTotalBytes": 8853615388,
700-
"clonedSizeHuman": "6.5 GB",
701-
"estimatedCloneSizeHuman": "8.2 GB",
696+
"clonedDocs": 4945277,
697+
"estimatedTotalDocs": 4945205,
698+
"clonedBytes": 7864311434,
699+
"estimatedTotalBytes": 11430286453,
700+
"clonedSizeHuman": "7.3 GB",
701+
"estimatedCloneSizeHuman": "10.6 GB",
702702
"startedAt": "",
703703
"endedAt": "",
704704
"duration": "N/A",
@@ -821,6 +821,7 @@ Percona docStreamer generates three separate logs, each of the logs location and
821821
1. Application Log (`logs/docStreamer.log`): Tracks the overall application status and any errors encountered.
822822
2. Full Load Log (`logs/full_load.log`): Dedicated to the initial full synchronization process. This log, together with the status endpoint, helps you monitor the progress of the initial sync.
823823
3. CDC Log (`logs/cdc.log`): Dedicated to Change Data Capture (CDC) operations. These operations begin only after the full sync is complete, so this log will remain empty until that point. Use it, along with the status endpoint, to track CDC progress.
824+
4. Validation Log (`logs/validator.log`): Dedicated log for tracking the validation progress
824825

825826
#### Logging Configuration
826827

@@ -832,6 +833,7 @@ The application generates specialized logs to help you monitor different stages
832833
| `file_path` | `logs/docStreamer.log` | The primary application log containing system status and errors. |
833834
| `ops_log_path` | `logs/cdc.log` | Dedicated log for Change Data Capture (CDC) operations and batch details. |
834835
| `full_load_log_path` | `logs/full_load.log` | Dedicated log for tracking the progress and batches of the initial full sync. |
836+
| `validator_log_path` | `logs/validator.log` | Dedicated log for tracking the validation progress. |
835837

836838
<details>
837839
<summary>Application log sample:</summary>
@@ -1149,6 +1151,7 @@ In addition to monitoring lock queues, the Adaptive Flow Control mechanism uses
11491151
| `latency_threshold_ms` | `250` | The maximum acceptable latency for a `serverStatus` command. If exceeded, the system pauses to allow the target to recover. |
11501152
| `active_client_threshold` | `50` | The maximum number of total concurrent active clients allowed on the target before throttling occurs. |
11511153
| `min_wired_tiger_tickets` | `0` | The minimum number of available WiredTiger write tickets required. If it drops below this value, the system pauses. Set to `0` to disable. |
1154+
| `target_max_queued_ops` | `50` | The safety limit for the Target's Global Lock Queue. If any node exceeds this many queued operations, docStreamer pauses. |
11521155

11531156
#### Ad-Hoc Emergency Flow Control (Pause / Resume)
11541157

@@ -1185,6 +1188,7 @@ The data validation engine is highly configurable to balance performance impact
11851188
| Setting | Default | Description |
11861189
|--------:|--------:|-------------|
11871190
| `enabled` | `true` | Master switch for the validation engine. If false, final document verification after CDC writes are skipped. CDC is guaranteed to sync the documents; this is an optional additional validation check. |
1191+
| `full_validation` | `false` | Controls the scope of CDC data validation. If true, all ops are validated. If false (recommended), ONLY Deletes are validated (massive performance boost since inserts/updates are guaranteed by the stream payload). Note: This setting is hot-reloadable and can be changed without restarting the app. |
11881192
| `batch_size` | `100` | Network vs. Memory Trade-off. Controls how many document IDs are bundled into a single database lookup. Larger batches reduce network round-trips but increase memory usage. |
11891193
| `max_validation_workers` | `4` | Concurrency Control. The number of parallel worker threads fetching and comparing documents. Increase this if you have spare CPU/Network capacity and notice validation lagging behind CDC. |
11901194
| `queue_size` | `2000` | Buffer Capacity. The size of the channel buffering CDC events before validation. If the CDC writer is faster than the validator and this buffer fills up, validation requests will be queued and this could cause slowing down the replication stream. |

0 commit comments

Comments
 (0)