Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
cdb961b
feat: add total and synced record count management for streams
Itz-Agasta Oct 2, 2025
776fade
enhance backfill process to support resumed sync with record count re…
Itz-Agasta Oct 2, 2025
6398bf9
feat: persist total record count to state for resume capability in ba…
Itz-Agasta Oct 2, 2025
b79f70e
Merge branch 'staging' into improve
Itz-Agasta Oct 4, 2025
3b50a5d
Merge branch 'staging' into improve
vaibhav-datazip Oct 7, 2025
335571a
Merge branch 'staging' into improve
vaibhav-datazip Oct 14, 2025
bf1ac3a
Merge branch 'staging' into improve
vaibhav-datazip Oct 15, 2025
7961d2b
Merge branch 'staging' into improve
Itz-Agasta Oct 19, 2025
3332a36
Merge branch 'datazip-inc:master' into improve
Itz-Agasta Oct 19, 2025
f3e044a
fix: correct year limit handling in ReformatDate function
Itz-Agasta Oct 2, 2025
948b1fe
feat: track and report committed record count in writer and backfill …
Itz-Agasta Oct 19, 2025
c9f54db
update synced record count based on committed records in change strea…
Itz-Agasta Oct 19, 2025
0315a45
Merge branch 'staging' into improve
shubham19may Oct 20, 2025
628f7b3
Revert "update synced record count based on committed records in chan…
Itz-Agasta Oct 22, 2025
c7952c2
remove the unrelated file from this pr
Itz-Agasta Oct 22, 2025
ff82c4a
fix the log
Itz-Agasta Oct 22, 2025
e7aa390
Fix: Implement reviews
Itz-Agasta Oct 22, 2025
ebdb1ae
Merge upstream/staging: resolve conflicts in destination/writers.go a…
Itz-Agasta Oct 27, 2025
a27fdfd
fix fmts
Itz-Agasta Oct 27, 2025
e7a2dbc
refactor: Improve the backfill process
Itz-Agasta Oct 27, 2025
724d045
refactor: improved backfill logging and remove unused variable
Itz-Agasta Oct 27, 2025
3c225f4
refactor: remove unnecessary blank lines
Itz-Agasta Oct 28, 2025
d89c722
Add new methods for remaining record count with dual type handling
Itz-Agasta Oct 29, 2025
e74771b
refactor: update record count handling in backfill process
Itz-Agasta Oct 29, 2025
bb814a6
Add pre-load logic that loads remaining count
Itz-Agasta Oct 29, 2025
521caa2
Merge branch 'staging' into improve
vaibhav-datazip Oct 30, 2025
99699b5
Merge branch 'staging' into improve
vaibhav-datazip Oct 31, 2025
68c35fa
refactor: enhance remaining record count handling to support both int…
Itz-Agasta Nov 1, 2025
ba93364
refactor: rename remaining record count key
Itz-Agasta Nov 1, 2025
87751c1
Merge branch 'staging' into improve
vaibhav-datazip Nov 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions destination/writers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type (
batchSize int64
streamArtifact *writerSchema
group *utils.CxGroup
committedCount int64
}
)

Expand Down Expand Up @@ -249,6 +250,8 @@ func (wt *WriterThread) flush(ctx context.Context, buf []types.RawRecord) (err e
}

logger.Infof("Thread[%s]: successfully wrote %d records", wt.threadID, len(buf))
wt.committedCount += int64(len(buf)) // Track successfully committed records

return nil
}

Expand All @@ -274,6 +277,10 @@ func (wt *WriterThread) Close(ctx context.Context) error {
}
}

// GetCommittedCount returns the current value of the thread's committedCount
// field, i.e. the running total of records counted as committed by this
// writer thread.
func (wt *WriterThread) GetCommittedCount() int64 {
	committed := wt.committedCount
	return committed
}

func ClearDestination(ctx context.Context, config *types.WriterConfig, dropStreams []types.StreamInterface) error {
newfunc, found := RegisteredWriters[config.Type]
if !found {
Expand Down
15 changes: 14 additions & 1 deletion drivers/abstract/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func (a *AbstractDriver) Backfill(ctx context.Context, backfilledStreams chan st
}
// set state chunks
a.state.SetChunks(stream.Self(), chunksSet)

// Persist remaining record count from pool stats to state for resume capability
// Note: The initial count has already been added to pool stats before logger started
if pool.GetStats().TotalRecordsToSync.Load() > 0 && a.state != nil {
a.state.SetRemainingRecordCount(stream.Self(), pool.GetStats().TotalRecordsToSync.Load())
}
}
chunks := chunksSet.Array()
if len(chunks) == 0 {
Expand All @@ -37,7 +43,8 @@ func (a *AbstractDriver) Backfill(ctx context.Context, backfilledStreams chan st
sort.Slice(chunks, func(i, j int) bool {
return typeutils.Compare(chunks[i].Min, chunks[j].Min) < 0
})
logger.Infof("Starting backfill for stream[%s] with %d chunks", stream.GetStream().Name, len(chunks))
logger.Infof("Processing backfill for stream[%s] with %d chunks", stream.GetStream().Name, len(chunks))

// TODO: create writer instance again on retry
chunkProcessor := func(ctx context.Context, chunk types.Chunk) (err error) {
threadID := fmt.Sprintf("%s_%s", stream.ID(), utils.ULID())
Expand All @@ -60,6 +67,12 @@ func (a *AbstractDriver) Backfill(ctx context.Context, backfilledStreams chan st
if err == nil {
logger.Infof("finished chunk min[%v] and max[%v] of stream %s", chunk.Min, chunk.Max, stream.ID())
chunksLeft := a.state.RemoveChunk(stream.Self(), chunk)

// Decrement remaining record count by committed records
committedCount := inserter.GetCommittedCount()
a.state.DecrementRemainingRecordCount(stream.Self(), committedCount)
logger.Infof("Stream %s: chunk completed with %d committed records", stream.ID(), committedCount)

if chunksLeft == 0 && backfilledStreams != nil {
backfilledStreams <- stream.ID()
}
Expand Down
26 changes: 23 additions & 3 deletions protocol/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,34 @@ var syncCmd = &cobra.Command{
return err
}

// Setup state early to enable pre-loading remaining records before stats logger starts
connector.SetupState(state)

Copy link
Author

@Itz-Agasta Itz-Agasta Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After implementing the previous changes, I tested it and... still got "Not Determined" :(

So I started debugging. I added log statements everywhere to trace what was happening. That's when I discovered the timing issue in sync.go (probably):

The stats logger was starting BEFORE the remaining records were loaded from state

I guess the sequence was:

  1. Logger starts -> checks pool stats -> finds TotalRecordsToSync = 0 -> shows "Not Determined"
  2. Then backfill loads state and adds records to pool stats
  3. But logger already decided there's no data

That's why I added pre-load logic in sync.go that runs BEFORE starting the logger. It iterates through all streams (FullLoad, CDC, Incremental), loads their remaining counts from state, and adds them to pool stats.

// Pre-load remaining record counts from state to pool stats for accurate progress tracking
for _, stream := range selectedStreamsMetadata.FullLoadStreams {
if remainingRecords := state.GetRemainingRecordCount(stream.Self()); remainingRecords > 0 {
pool.AddRecordsToSyncStats(remainingRecords)
logger.Infof("Pre-loaded remaining records for stream %s: %d", stream.ID(), remainingRecords)
}
}
for _, stream := range selectedStreamsMetadata.CDCStreams {
if remainingRecords := state.GetRemainingRecordCount(stream.Self()); remainingRecords > 0 {
pool.AddRecordsToSyncStats(remainingRecords)
logger.Infof("Pre-loaded remaining records for stream %s: %d", stream.ID(), remainingRecords)
}
}
for _, stream := range selectedStreamsMetadata.IncrementalStreams {
if remainingRecords := state.GetRemainingRecordCount(stream.Self()); remainingRecords > 0 {
pool.AddRecordsToSyncStats(remainingRecords)
logger.Infof("Pre-loaded remaining records for stream %s: %d", stream.ID(), remainingRecords)
}
}

// start monitoring stats
logger.StatsLogger(cmd.Context(), func() (int64, int64, int64) {
stats := pool.GetStats()
return stats.ThreadCount.Load(), stats.TotalRecordsToSync.Load(), stats.ReadCount.Load()
})

// Setup State for Connector
connector.SetupState(state)
// Sync Telemetry tracking
telemetry.TrackSyncStarted(syncID, streams, selectedStreamsMetadata.SelectedStreams, selectedStreamsMetadata.CDCStreams, connector.Type(), destinationConfig, catalog)
defer func() {
Expand Down
77 changes: 77 additions & 0 deletions types/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
MixedType StateType = "MIXED"
// constant key for chunks
ChunksKey = "chunks"
// number of remaining records to be synced in resumable sync
RemainingRecordsCount = "remaining_records_count"
)

type GlobalState struct {
Expand Down Expand Up @@ -254,6 +256,81 @@ func (s *State) RemoveChunk(stream *ConfiguredStream, chunk Chunk) int {
return -1
}

// GetRemainingRecordCount retrieves the remaining record count for a stream from state
func (s *State) GetRemainingRecordCount(stream *ConfiguredStream) int64 {
s.RLock()
defer s.RUnlock()

index, contains := utils.ArrayContains(s.Streams, func(elem *StreamState) bool {
return elem.Namespace == stream.Namespace() && elem.Stream == stream.Name()
})
if contains {
if count, loaded := s.Streams[index].State.Load(RemainingRecordsCount); loaded {
if countInt64, ok := count.(int64); ok {
return countInt64
}
if countFloat64, ok := count.(float64); ok {
return int64(countFloat64)
}
Copy link
Author

@Itz-Agasta Itz-Agasta Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one took me a while to figure out. I added more logging and noticed something weird - even though the state file had "remaining_record_count": 377888, when I read it back, it was coming out as 0!

I traced through the code and found the issue in GetRemainingRecordCount():

if count, loaded := s.Streams[index].State.Load(RemainingRecordCountKey); loaded {
    if countInt64, ok := count.(int64); ok {
        return countInt64  // This was failing!
    }
}
return 0

I guess the cause is that Go's JSON unmarshaling converts ALL numbers to float64 by default when decoding into an interface value, not their original types. So when we saved remaining_record_count as int64, it came back from JSON as float64. The type assertion count.(int64) was failing silently, and we were returning 0.

}
}
return 0
}

// SetRemainingRecordCount records count under the RemainingRecordsCount key of
// the state entry matching the given stream (by namespace and name).
//
// When no entry exists yet for the stream, a new one is created through
// initStreamState and appended to s.Streams. In both cases the entry's
// HoldsValue flag is set to true and LogState is called before returning.
// The write lock is held for the whole operation.
func (s *State) SetRemainingRecordCount(stream *ConfiguredStream, count int64) {
	s.Lock()
	defer s.Unlock()

	// sameStream matches the state entry that belongs to the requested stream.
	sameStream := func(candidate *StreamState) bool {
		return candidate.Namespace == stream.Namespace() && candidate.Stream == stream.Name()
	}

	idx, found := utils.ArrayContains(s.Streams, sameStream)
	if !found {
		// First value for this stream: build a fresh entry and register it.
		created := s.initStreamState(stream)
		created.State.Store(RemainingRecordsCount, count)
		created.HoldsValue.Store(true)
		s.Streams = append(s.Streams, created)
		s.LogState()
		return
	}

	// Existing entry: overwrite the stored count in place.
	s.Streams[idx].State.Store(RemainingRecordsCount, count)
	s.Streams[idx].HoldsValue.Store(true)
	s.LogState()
}

// DecrementRemainingRecordCount subtracts count from the value stored under
// the RemainingRecordsCount key of the state entry matching the given stream,
// clamping the result at zero.
//
// Nothing is modified when the stream has no state entry, when the key has not
// been stored, or when the stored value is neither int64 nor float64. LogState
// is called exactly once on every path, and the write lock is held throughout.
func (s *State) DecrementRemainingRecordCount(stream *ConfiguredStream, count int64) {
	s.Lock()
	defer s.Unlock()

	idx, found := utils.ArrayContains(s.Streams, func(candidate *StreamState) bool {
		return candidate.Namespace == stream.Namespace() && candidate.Stream == stream.Name()
	})
	if !found {
		s.LogState()
		return
	}

	entry := s.Streams[idx]
	stored, loaded := entry.State.Load(RemainingRecordsCount)
	if !loaded {
		s.LogState()
		return
	}

	// The stored value is int64 when set in-process and float64 when it was
	// loaded from JSON; any other type is left untouched.
	var current int64
	switch v := stored.(type) {
	case int64:
		current = v
	case float64:
		current = int64(v)
	default:
		s.LogState()
		return
	}

	// Never let the remaining count drop below zero.
	updated := current - count
	if updated < 0 {
		updated = 0
	}

	entry.State.Store(RemainingRecordsCount, updated)
	entry.HoldsValue.Store(true)
	s.LogState()
}

func (s *State) MarshalJSON() ([]byte, error) {
type Alias State
p := Alias(*s)
Expand Down