Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
cdb961b
feat: add total and synced record count management for streams
Itz-Agasta Oct 2, 2025
776fade
enhance backfill process to support resumed sync with record count re…
Itz-Agasta Oct 2, 2025
6398bf9
feat: persist total record count to state for resume capability in ba…
Itz-Agasta Oct 2, 2025
b79f70e
Merge branch 'staging' into improve
Itz-Agasta Oct 4, 2025
3b50a5d
Merge branch 'staging' into improve
vaibhav-datazip Oct 7, 2025
335571a
Merge branch 'staging' into improve
vaibhav-datazip Oct 14, 2025
bf1ac3a
Merge branch 'staging' into improve
vaibhav-datazip Oct 15, 2025
7961d2b
Merge branch 'staging' into improve
Itz-Agasta Oct 19, 2025
3332a36
Merge branch 'datazip-inc:master' into improve
Itz-Agasta Oct 19, 2025
f3e044a
fix: correct year limit handling in ReformatDate function
Itz-Agasta Oct 2, 2025
948b1fe
feat: track and report committed record count in writer and backfill …
Itz-Agasta Oct 19, 2025
c9f54db
update synced record count based on committed records in change strea…
Itz-Agasta Oct 19, 2025
0315a45
Merge branch 'staging' into improve
shubham19may Oct 20, 2025
628f7b3
Revert "update synced record count based on committed records in chan…
Itz-Agasta Oct 22, 2025
c7952c2
remove the unrelated file from this pr
Itz-Agasta Oct 22, 2025
ff82c4a
fix the log
Itz-Agasta Oct 22, 2025
e7aa390
Fix: Implement reviews
Itz-Agasta Oct 22, 2025
ebdb1ae
Merge upstream/staging: resolve conflicts in destination/writers.go a…
Itz-Agasta Oct 27, 2025
a27fdfd
fix fmts
Itz-Agasta Oct 27, 2025
e7a2dbc
refactor: Improve the backfill process
Itz-Agasta Oct 27, 2025
724d045
refactor: improved backfill logging and remove unused variable
Itz-Agasta Oct 27, 2025
3c225f4
refactor: remove unnecessary blank lines
Itz-Agasta Oct 28, 2025
d89c722
Add new methods for remaining record count with dual type handling
Itz-Agasta Oct 29, 2025
e74771b
refactor: update record count handling in backfill process
Itz-Agasta Oct 29, 2025
bb814a6
Add pre-load logic that loads remaining count
Itz-Agasta Oct 29, 2025
521caa2
Merge branch 'staging' into improve
vaibhav-datazip Oct 30, 2025
99699b5
Merge branch 'staging' into improve
vaibhav-datazip Oct 31, 2025
68c35fa
refactor: enhance remaining record count handling to support both int…
Itz-Agasta Nov 1, 2025
ba93364
refactor: rename remaining record count key
Itz-Agasta Nov 1, 2025
87751c1
Merge branch 'staging' into improve
vaibhav-datazip Nov 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion destination/writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type (
batchSize int64
streamArtifact *writerSchema
group *utils.CxGroup
committedCount int64
}
)

Expand Down Expand Up @@ -254,7 +255,9 @@ func (wt *WriterThread) flush(ctx context.Context, buf []types.RawRecord) (err e
return fmt.Errorf("failed to write records: %s", err)
}

logger.Infof("Thread[%s]: successfully wrote %d records", wt.threadID, len(buf))
// Track successfully committed records
wt.committedCount += int64(len(buf))
logger.Infof("Thread[%s]: successfully wrote %d records (total committed: %d)", wt.threadID, len(buf), wt.committedCount)
return nil
}

Expand All @@ -279,3 +282,8 @@ func (wt *WriterThread) Close(ctx context.Context) error {
return wt.writer.Close(ctx)
}
}

// GetCommittedCount returns the total number of records successfully committed by this writer thread
func (wt *WriterThread) GetCommittedCount() int64 {
return wt.committedCount
}
26 changes: 25 additions & 1 deletion drivers/abstract/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,24 @@ import (
func (a *AbstractDriver) Backfill(ctx context.Context, backfilledStreams chan string, pool *destination.WriterPool, stream types.StreamInterface) error {
chunksSet := a.state.GetChunks(stream.Self())
var err error
isResumedSync := false
if chunksSet == nil || chunksSet.Len() == 0 {
chunksSet, err = a.driver.GetOrSplitChunks(ctx, pool, stream)
if err != nil {
return fmt.Errorf("failed to get or split chunks: %s", err)
}
// set state chunks
a.state.SetChunks(stream.Self(), chunksSet)
} else {
// This is a resumed sync - restore stats from state
isResumedSync = true
totalCount := a.state.GetTotalRecordCount(stream.Self())

if totalCount > 0 {
logger.Infof("Resuming sync for stream %s: total records = %d", stream.ID(), totalCount)
// Restore total count to pool stats for progress tracking
pool.AddRecordsToSyncStats(totalCount)
}
}
chunks := chunksSet.Array()
if len(chunks) == 0 {
Expand All @@ -37,7 +48,11 @@ func (a *AbstractDriver) Backfill(ctx context.Context, backfilledStreams chan st
sort.Slice(chunks, func(i, j int) bool {
return typeutils.Compare(chunks[i].Min, chunks[j].Min) < 0
})
logger.Infof("Starting backfill for stream[%s] with %d chunks", stream.GetStream().Name, len(chunks))
if isResumedSync {
logger.Infof("Resuming backfill for stream[%s] with %d remaining chunks", stream.GetStream().Name, len(chunks))
} else {
logger.Infof("Starting backfill for stream[%s] with %d chunks", stream.GetStream().Name, len(chunks))
}
// TODO: create writer instance again on retry
chunkProcessor := func(ctx context.Context, chunk types.Chunk) (err error) {
threadID := fmt.Sprintf("%s_%s", stream.ID(), utils.ULID())
Expand All @@ -60,6 +75,15 @@ func (a *AbstractDriver) Backfill(ctx context.Context, backfilledStreams chan st
if err == nil {
logger.Infof("finished chunk min[%v] and max[%v] of stream %s", chunk.Min, chunk.Max, stream.ID())
chunksLeft := a.state.RemoveChunk(stream.Self(), chunk)

// Update synced record count in state based on committed records only
// This represents records that have been successfully written to the destination
committedRecords := inserter.GetCommittedCount()
previousSyncedCount := a.state.GetSyncedRecordCount(stream.Self())
totalSyncedCount := previousSyncedCount + committedRecords
a.state.SetSyncedRecordCount(stream.Self(), totalSyncedCount)
logger.Infof("Stream %s: chunk completed with %d committed records (total synced: %d)", stream.ID(), committedRecords, totalSyncedCount)

if chunksLeft == 0 && backfilledStreams != nil {
backfilledStreams <- stream.ID()
}
Expand Down
24 changes: 24 additions & 0 deletions drivers/abstract/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ func (a *AbstractDriver) RunChangeStream(ctx context.Context, pool *destination.
err = fmt.Errorf("post cdc error: %s, cdc insert thread error: %s", postCDCErr, err)
}

// Update synced record count based on committed records
if err == nil {
committedRecords := inserter.GetCommittedCount()
if committedRecords > 0 {
previousSyncedCount := a.state.GetSyncedRecordCount(streams[index].Self())
totalSyncedCount := previousSyncedCount + committedRecords
a.state.SetSyncedRecordCount(streams[index].Self(), totalSyncedCount)
logger.Infof("Stream %s cdc: committed %d records (total synced: %d)", streams[index].ID(), committedRecords, totalSyncedCount)
}
}

if err != nil {
err = fmt.Errorf("thread[%s]: %s", threadID, err)
}
Expand Down Expand Up @@ -135,6 +146,19 @@ func (a *AbstractDriver) RunChangeStream(ctx context.Context, pool *destination.
if postCDCErr != nil {
err = fmt.Errorf("post cdc error: %s, cdc insert thread error: %s", postCDCErr, err)
}

// Update synced record counts based on committed records
if err == nil {
for stream, insert := range inserters {
committedRecords := insert.GetCommittedCount()
if committedRecords > 0 {
previousSyncedCount := a.state.GetSyncedRecordCount(stream.Self())
totalSyncedCount := previousSyncedCount + committedRecords
a.state.SetSyncedRecordCount(stream.Self(), totalSyncedCount)
logger.Infof("Stream %s cdc: committed %d records (total synced: %d)", stream.ID(), committedRecords, totalSyncedCount)
}
}
}
}()
return RetryOnBackoff(a.driver.MaxRetries(), constants.DefaultRetryTimeout, func() error {
return a.driver.StreamChanges(ctx, nil, func(ctx context.Context, change CDCChange) error {
Expand Down
9 changes: 9 additions & 0 deletions drivers/abstract/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ func (a *AbstractDriver) Incremental(ctx context.Context, pool *destination.Writ
if err == nil {
a.state.SetCursor(stream.Self(), primaryCursor, a.reformatCursorValue(maxPrimaryCursorValue))
a.state.SetCursor(stream.Self(), secondaryCursor, a.reformatCursorValue(maxSecondaryCursorValue))

// Update synced record count based on committed records
committedRecords := inserter.GetCommittedCount()
if committedRecords > 0 {
previousSyncedCount := a.state.GetSyncedRecordCount(stream.Self())
totalSyncedCount := previousSyncedCount + committedRecords
a.state.SetSyncedRecordCount(stream.Self(), totalSyncedCount)
logger.Infof("Stream %s incremental: committed %d records (total synced: %d)", stream.ID(), committedRecords, totalSyncedCount)
}
} else {
err = fmt.Errorf("thread[%s]: %s", threadID, err)
}
Expand Down
5 changes: 5 additions & 0 deletions drivers/mongodb/internal/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func (m *Mongo) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo

logger.Infof("Total expected count for stream %s: %d", stream.ID(), recordCount)
pool.AddRecordsToSyncStats(recordCount)

// Persist total record count to state for resume capability
if m.state != nil {
m.state.SetTotalRecordCount(stream.Self(), recordCount)
}

// Generate and update chunks
var retryErr error
Expand Down
6 changes: 6 additions & 0 deletions drivers/mysql/internal/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func (m *MySQL) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo
}

pool.AddRecordsToSyncStats(approxRowCount)

// Persist total record count to state for resume capability
if m.state != nil {
m.state.SetTotalRecordCount(stream.Self(), approxRowCount)
}

// avgRowSize is returned as []uint8 which is converted to float64
avgRowSizeFloat, err := typeutils.ReformatFloat64(avgRowSize)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions drivers/postgres/internal/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ func (p *Postgres) GetOrSplitChunks(_ context.Context, pool *destination.WriterP
return nil, fmt.Errorf("failed to get approx row count: %s", err)
}
pool.AddRecordsToSyncStats(approxRowCount)

// Persist total record count to state for resume capability
if p.state != nil {
p.state.SetTotalRecordCount(stream.Self(), approxRowCount)
}

return p.splitTableIntoChunks(stream)
}

Expand Down
87 changes: 87 additions & 0 deletions types/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ const (
MixedType StateType = "MIXED"
// constant key for chunks
ChunksKey = "chunks"
// constant keys for stats
TotalRecordCountKey = "total_record_count"
SyncedRecordCountKey = "synced_record_count"
)

type GlobalState struct {
Expand Down Expand Up @@ -254,6 +257,90 @@ func (s *State) RemoveChunk(stream *ConfiguredStream, chunk Chunk) int {
return -1
}

// GetTotalRecordCount retrieves the total record count for a stream from state
func (s *State) GetTotalRecordCount(stream *ConfiguredStream) int64 {
s.RLock()
defer s.RUnlock()

index, contains := utils.ArrayContains(s.Streams, func(elem *StreamState) bool {
return elem.Namespace == stream.Namespace() && elem.Stream == stream.Name()
})
if contains {
if count, loaded := s.Streams[index].State.Load(TotalRecordCountKey); loaded {
if countInt64, ok := count.(int64); ok {
return countInt64
}
// Handle case where count might be stored as float64 (from JSON unmarshalling)
if countFloat64, ok := count.(float64); ok {
return int64(countFloat64)
}
Copy link
Author

@Itz-Agasta Itz-Agasta Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one took me a while to figure out. I added more logging and noticed something weird - even though the state file had "remaining_record_count": 377888, when I read it back, it was coming out as 0!

I traced through the code and found the issue in GetRemainingRecordCount():

if count, loaded := s.Streams[index].State.Load(RemainingRecordCountKey); loaded {
    if countInt64, ok := count.(int64); ok {
        return countInt64  // This was failing!
    }
}
return 0

Ig Go's JSON unmarshaling converts ALL numbers to float64 by default, not their original types....So when we saved remaining_record_count as int64, it came back from JSON as float64. The type assertion count.(int64) was failing silently, and we were returning 0.

}
}
return 0
}

// SetTotalRecordCount stores the total record count for a stream in state
func (s *State) SetTotalRecordCount(stream *ConfiguredStream, count int64) {
s.Lock()
defer s.Unlock()

index, contains := utils.ArrayContains(s.Streams, func(elem *StreamState) bool {
return elem.Namespace == stream.Namespace() && elem.Stream == stream.Name()
})
if contains {
s.Streams[index].State.Store(TotalRecordCountKey, count)
s.Streams[index].HoldsValue.Store(true)
} else {
newStream := s.initStreamState(stream)
newStream.State.Store(TotalRecordCountKey, count)
newStream.HoldsValue.Store(true)
s.Streams = append(s.Streams, newStream)
}
s.LogState()
}

// GetSyncedRecordCount retrieves the synced record count for a stream from state
func (s *State) GetSyncedRecordCount(stream *ConfiguredStream) int64 {
s.RLock()
defer s.RUnlock()

index, contains := utils.ArrayContains(s.Streams, func(elem *StreamState) bool {
return elem.Namespace == stream.Namespace() && elem.Stream == stream.Name()
})
if contains {
if count, loaded := s.Streams[index].State.Load(SyncedRecordCountKey); loaded {
if countInt64, ok := count.(int64); ok {
return countInt64
}
// Handle case where count might be stored as float64 (from JSON unmarshalling)
if countFloat64, ok := count.(float64); ok {
return int64(countFloat64)
}
}
}
return 0
}

// SetSyncedRecordCount stores the synced record count for a stream in state
func (s *State) SetSyncedRecordCount(stream *ConfiguredStream, count int64) {
s.Lock()
defer s.Unlock()

index, contains := utils.ArrayContains(s.Streams, func(elem *StreamState) bool {
return elem.Namespace == stream.Namespace() && elem.Stream == stream.Name()
})
if contains {
s.Streams[index].State.Store(SyncedRecordCountKey, count)
s.Streams[index].HoldsValue.Store(true)
} else {
newStream := s.initStreamState(stream)
newStream.State.Store(SyncedRecordCountKey, count)
newStream.HoldsValue.Store(true)
s.Streams = append(s.Streams, newStream)
}
s.LogState()
}

func (s *State) MarshalJSON() ([]byte, error) {
type Alias State
p := Alias(*s)
Expand Down
6 changes: 3 additions & 3 deletions utils/typeutils/reformat.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ func ReformatDate(v interface{}) (time.Time, error) {
}

// manage year limit
// even after data being parsed if year doesn't lie in range [0,9999] it failed to get marshaled
if parsed.Year() < 0 {
parsed = parsed.AddDate(0-parsed.Year(), 0, 0)
// even after data being parsed if year doesn't lie in range [1,9999] it failed to get marshaled
if parsed.Year() <= 0 {
parsed = parsed.AddDate(1-parsed.Year(), 0, 0)
} else if parsed.Year() > 9999 {
parsed = parsed.AddDate(-(parsed.Year() - 9999), 0, 0)
}
Expand Down
Loading