Skip to content

Commit d84e7b2

Browse files
Implemented configurable retry logic to full load and cdc; updated docs
1 parent 3825141 commit d84e7b2

6 files changed

Lines changed: 232 additions & 41 deletions

File tree

config.yaml

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,18 @@ cloner:
158158
# A collection of 1M docs will be split into 100 segments of 10k docs
159159
segment_size_docs: 10000
160160

161+
# num_retries: How many times to retry failed operations (reads/writes)
162+
# Default: 5
163+
num_retries: 5
164+
165+
# retry_interval_ms: How long to wait (in ms) between retries
166+
# Default: 1000
167+
retry_interval_ms: 1000
168+
169+
# write_timeout_ms: Max time (in ms) for a single bulk write batch to complete
170+
# Default: 30000
171+
write_timeout_ms: 30000
172+
161173
# -----------------------------------------------
162174
# Change Data Capture (CDC) Settings
163175
# -----------------------------------------------
@@ -179,4 +191,18 @@ cdc:
179191
# Increasing this value helps if you have a very high volume of changes on the source (DocumentDB) and your target (MongoDB) has plenty of CPU/IO capacity.
180192
# It prevents the application from falling behind simply because it can't write fast enough.
181193
# Resource Usage: Setting this too high can saturate the connections or CPU on your target MongoDB cluster, potentially slowing down other operations.
182-
max_write_workers: 8
194+
max_write_workers: 8
195+
196+
# num_retries: How many times to retry a failed batch (due to network/connection issues)
197+
# before giving up and stopping the migration.
198+
# Default: 10 (the initial attempt is not counted, so a batch is tried up to 11 times in total)
199+
num_retries: 10
200+
201+
# retry_interval_ms: How long to wait (in milliseconds) between retry attempts.
202+
# Default: 1000 (1 second)
203+
retry_interval_ms: 1000
204+
205+
# write_timeout_ms: The maximum total time (in milliseconds) to wait for one batch flush to complete, covering every BulkWrite attempt and the waits between retries.
206+
# If the network hangs, this ensures the worker doesn't freeze forever.
207+
# Default: 30000 (30 seconds)
208+
write_timeout_ms: 30000

faq.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,17 @@ While the architecture supports sharding, there are important caveats, particula
231231

232232
---
233233

234+
## Q: Can I migrate from a non-sharded DocumentDB cluster to a sharded MongoDB cluster?
235+
**A:** Yes. However, docStreamer does not perform sharding setup operations (such as `enableSharding` or `shardCollection`), so you must configure the sharding topology manually before starting the migration.
236+
237+
Recommended Workflow:
238+
239+
1. Pre-Create and Shard: Before starting the migration, manually create your target databases and collections, and enable sharding with your desired shard keys.
240+
2. Start Migration: When docStreamer starts, it will detect that the collections already exist and skip the creation step.
241+
3. Data Loading: The tool will insert data through the mongos router, allowing MongoDB to automatically distribute the documents across shards based on your pre-configured setup.
242+
243+
---
244+
234245
## Q: Are new databases and collections migrated to the destination if they are created on the source while docStreamer is running?
235246
**A:** Yes. New collections and databases created while docStreamer is running (or while it is paused) will be detected and migrated.
236247

internal/cdc/cdc.go

Lines changed: 127 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"hash/fnv"
7+
"strings"
78
"sync"
89
"sync/atomic"
910
"time"
@@ -19,10 +20,9 @@ import (
1920
)
2021

2122
type CDCManager struct {
22-
sourceClient *mongo.Client
23-
targetClient *mongo.Client
24-
eventQueue chan *ChangeEvent
25-
// channel type carries Batch struct (Models + IDs)
23+
sourceClient *mongo.Client
24+
targetClient *mongo.Client
25+
eventQueue chan *ChangeEvent
2626
flushQueues []chan map[string]*Batch
2727
bulkWriters []*BulkWriter
2828
startAt bson.Timestamp
@@ -31,14 +31,30 @@ type CDCManager struct {
3131
statusManager *status.Manager
3232
tracker *validator.InFlightTracker
3333
store *validator.Store
34-
validatorMgr *validator.Manager // Access to validation manager
34+
validatorMgr *validator.Manager
3535
shutdownWG sync.WaitGroup
3636
workerWG sync.WaitGroup
3737
totalEventsApplied atomic.Int64
3838
checkpointDocID string
39-
// Maps for fast exclusion lookup
40-
excludeDBs map[string]bool
41-
excludeColls map[string]bool
39+
excludeDBs map[string]bool
40+
excludeColls map[string]bool
41+
fatalErrorChan chan error
42+
}
43+
44+
// shouldRetry reports whether err looks like a transient network or
// connection failure. The decision is based solely on the error text: the
// message is lower-cased and searched for a fixed set of substrings.
// A nil error is never retryable.
func shouldRetry(err error) bool {
	if err == nil {
		return false
	}

	// Lower-case substrings that mark an error message as transient.
	transientMarkers := [...]string{
		"connection",
		"network",
		"timeout",
		"deadline",
		"socket",
		"topology",
		"context canceled",
		"server selection error",
	}

	text := strings.ToLower(err.Error())
	for _, marker := range transientMarkers {
		if strings.Contains(text, marker) {
			return true
		}
	}
	return false
}
4359

4460
func NewManager(source, target *mongo.Client, checkpointDocID string, startAt bson.Timestamp, checkpoint *checkpoint.Manager, statusMgr *status.Manager, tracker *validator.InFlightTracker, store *validator.Store, valMgr *validator.Manager) *CDCManager {
@@ -98,6 +114,7 @@ func NewManager(source, target *mongo.Client, checkpointDocID string, startAt bs
98114
checkpointDocID: checkpointDocID,
99115
excludeDBs: excludeDBs,
100116
excludeColls: excludeColls,
117+
fatalErrorChan: make(chan error, workerCount+1), // Buffer slightly to prevent blocking
101118
}
102119
mgr.totalEventsApplied.Store(initialEvents)
103120
return mgr
@@ -107,9 +124,13 @@ func NewManager(source, target *mongo.Client, checkpointDocID string, startAt bs
107124
func (m *CDCManager) handleBulkWrite(ctx context.Context, batchMap map[string]*Batch) (int64, []string, bson.Timestamp, error) {
108125
var totalOps int64
109126
var namespaces []string
110-
var firstErr error
127+
var lastErr error
111128
var batchMaxTS bson.Timestamp
112129

130+
// Configurable Retry Settings
131+
maxRetries := config.Cfg.CDC.NumRetries
132+
retryInterval := time.Duration(config.Cfg.CDC.RetryIntervalMS) * time.Millisecond
133+
113134
for ns, batch := range batchMap {
114135
if len(batch.Models) == 0 {
115136
continue
@@ -130,21 +151,59 @@ func (m *CDCManager) handleBulkWrite(ctx context.Context, batchMap map[string]*B
130151
targetColl := m.targetClient.Database(db).Collection(coll)
131152
opts := options.BulkWrite().SetOrdered(true)
132153

133-
result, err := targetColl.BulkWrite(ctx, batch.Models, opts)
154+
// --- RETRY LOOP ---
155+
var success bool
156+
for i := 0; i <= maxRetries; i++ {
157+
if ctx.Err() != nil {
158+
return totalOps, namespaces, batchMaxTS, ctx.Err()
159+
}
134160

135-
if err != nil {
136-
logging.PrintError(fmt.Sprintf("[%s] BulkWrite failed: %v", ns, err), 0)
137-
if wErr, ok := err.(mongo.BulkWriteException); ok {
138-
logging.PrintError(fmt.Sprintf("[%s] ... %d write errors", ns, len(wErr.WriteErrors)), 0)
161+
// If this is a retry, wait before proceeding
162+
if i > 0 {
163+
time.Sleep(retryInterval)
139164
}
140-
if firstErr == nil {
141-
firstErr = err
165+
166+
result, err := targetColl.BulkWrite(ctx, batch.Models, opts)
167+
168+
if err != nil {
169+
// 1. Is it a BulkWriteException (partial failure)?
170+
if wErr, ok := err.(mongo.BulkWriteException); ok {
171+
// Check if any errors inside are non-transient (e.g. DuplicateKey if we weren't doing upserts, or Schema validation)
172+
// But we mostly care about network here. If the error itself is a connectivity one, shouldRetry will catch it.
173+
// For structural errors (e.g. type mismatch), we SHOULD NOT retry.
174+
logging.PrintError(fmt.Sprintf("[%s] BulkWrite Partial Error: %d write errors", ns, len(wErr.WriteErrors)), 0)
175+
lastErr = err
176+
// If it's a structural error, break immediately (don't retry infinite loops on bad data)
177+
if !shouldRetry(err) {
178+
break
179+
}
180+
}
181+
182+
// 2. Check if the error is Transient/Network related
183+
if shouldRetry(err) {
184+
logging.PrintWarning(fmt.Sprintf("[%s] CDC BulkWrite failed (attempt %d/%d): %v. Retrying...", ns, i+1, maxRetries+1, err), 0)
185+
lastErr = err
186+
continue // Retry loop
187+
}
188+
189+
// 3. If not retryable, log and break
190+
logging.PrintError(fmt.Sprintf("[%s] CDC BulkWrite FATAL error: %v", ns, err), 0)
191+
lastErr = err
192+
break
142193
}
143-
continue
194+
195+
// Success!
196+
totalOps += result.InsertedCount + result.ModifiedCount + result.UpsertedCount + result.DeletedCount
197+
namespaces = append(namespaces, ns)
198+
success = true
199+
break // Exit retry loop
144200
}
145201

146-
totalOps += result.InsertedCount + result.ModifiedCount + result.UpsertedCount + result.DeletedCount
147-
namespaces = append(namespaces, ns)
202+
if !success {
203+
// If we exhausted retries or hit a fatal error, we stop processing this batch map.
204+
// This causes the worker to report the error upstream, likely pausing the pipeline.
205+
return totalOps, namespaces, batchMaxTS, lastErr
206+
}
148207

149208
// --- EVENT-DRIVEN VALIDATION ---
150209
// The write succeeded. Now we queue these IDs for validation.
@@ -153,32 +212,47 @@ func (m *CDCManager) handleBulkWrite(ctx context.Context, batchMap map[string]*B
153212
}
154213
}
155214

156-
return totalOps, namespaces, batchMaxTS, firstErr
215+
return totalOps, namespaces, batchMaxTS, nil
157216
}
158217

159218
func (m *CDCManager) startFlushWorkers() {
160219
workerCount := len(m.flushQueues)
161220
logging.PrintInfo(fmt.Sprintf("[CDC] Starting %d partition-aware write workers...", workerCount), 0)
162221

222+
// Use configurable timeout
223+
writeTimeout := time.Duration(config.Cfg.CDC.WriteTimeoutMS) * time.Millisecond
224+
163225
for i := 0; i < workerCount; i++ {
164226
m.workerWG.Add(1)
165227
go func(workerID int, queue <-chan map[string]*Batch) {
166228
defer m.workerWG.Done()
167229

168230
for batch := range queue {
169231
start := time.Now()
170-
writeCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
171-
// Capture max timestamp returned by handleBulkWrite
232+
233+
// Apply configurable timeout for the write operation
234+
writeCtx, cancel := context.WithTimeout(context.Background(), writeTimeout)
235+
172236
flushedCount, namespaces, batchMaxTS, err := m.handleBulkWrite(writeCtx, batch)
173237
cancel()
174238

175239
logging.LogCDCOp(start, flushedCount, namespaces, err)
176240

177241
if err != nil {
178-
logging.PrintError(fmt.Sprintf("[CDC Worker %d] CRITICAL: Batch flush failed: %v", workerID, err), 0)
242+
errMsg := fmt.Errorf("[CDC Worker %d] FATAL: Batch flush failed after retries: %w", workerID, err)
243+
logging.PrintError(errMsg.Error(), 0)
244+
245+
// 1. Report Fatal Error
246+
select {
247+
case m.fatalErrorChan <- errMsg:
248+
default:
249+
// Channel full, another worker probably already reported an error
250+
}
251+
252+
// 2. Stop this worker immediately
253+
return
179254
} else {
180255
m.totalEventsApplied.Add(flushedCount)
181-
// Update status with the applied timestamp
182256
m.statusManager.UpdateAppliedStats(batchMaxTS)
183257
}
184258
}
@@ -189,7 +263,7 @@ func (m *CDCManager) startFlushWorkers() {
189263
func (m *CDCManager) Start(ctx context.Context) error {
190264
logging.PrintInfo(fmt.Sprintf("Starting cluster-wide CDC... Resuming from checkpoint: %v", m.startAt), 0)
191265

192-
// FIX: Reconcile stats on startup ---
266+
// Reconcile stats on startup ---
193267
go func() {
194268
logging.PrintInfo("[CDC] Reconciling validation statistics...", 0)
195269
if err := m.validatorMgr.ReconcileStats(ctx); err != nil {
@@ -199,17 +273,41 @@ func (m *CDCManager) Start(ctx context.Context) error {
199273
}
200274
}()
201275

276+
// 1. Wrap context to allow cancellation on fatal error
277+
ctx, cancel := context.WithCancel(ctx)
278+
defer cancel()
279+
280+
// 2. Start monitor for fatal errors
281+
go func() {
282+
select {
283+
case err := <-m.fatalErrorChan:
284+
logging.PrintError(fmt.Sprintf("FATAL CDC ERROR: %v. Initiating shutdown.", err), 0)
285+
m.statusManager.SetError(err.Error())
286+
cancel() // Cancel the main context
287+
case <-ctx.Done():
288+
// Normal shutdown
289+
}
290+
}()
291+
202292
m.startFlushWorkers()
203293
m.shutdownWG.Add(1)
204294
go m.processChanges(ctx)
205295

206296
err := m.watchChanges(ctx)
207297

298+
// Check if exit was due to fatal error
299+
select {
300+
case fatalErr := <-m.fatalErrorChan:
301+
err = fatalErr
302+
default:
303+
}
304+
208305
logging.PrintInfo("[CDC] Watcher stopped. Waiting for processor to finalize...", 0)
209306
m.shutdownWG.Wait()
210307
m.workerWG.Wait()
211308

212-
if (m.lastSuccessfulTS != bson.Timestamp{}) {
309+
// Only save checkpoint if we exited cleanly (no fatal errors)
310+
if err == nil && (m.lastSuccessfulTS != bson.Timestamp{}) {
213311
initialT0, _ := m.checkpoint.GetResumeTimestamp(context.Background(), m.checkpointDocID)
214312
if initialT0.T == 0 || m.lastSuccessfulTS.T > initialT0.T || (m.lastSuccessfulTS.T == initialT0.T && m.lastSuccessfulTS.I > initialT0.I) {
215313
saveCtx := context.Background()
@@ -218,7 +316,10 @@ func (m *CDCManager) Start(ctx context.Context) error {
218316
}
219317
m.statusManager.UpdateCDCStats(m.totalEventsApplied.Load(), m.lastSuccessfulTS)
220318
m.statusManager.Persist(context.Background())
319+
} else if err != nil {
320+
logging.PrintWarning("Skipping final checkpoint save due to error shutdown.", 0)
221321
}
322+
222323
return err
223324
}
224325

0 commit comments

Comments
 (0)