
Conversation


@Itz-Agasta Itz-Agasta commented Oct 2, 2025

Description

This PR implements stat persistence in the state object, enabling accurate progress tracking and time estimation when resuming syncs. Previously, when a sync was interrupted and resumed, all progress metrics (total records, synced count, estimated time) were lost, leading to poor user experience and observability.

Closes #110

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

Changes

1. Added New State Tracking (types/state.go)

I added four functions that save and load these statistics:

  • SetTotalRecordCount() - Saves the total number of records
  • GetTotalRecordCount() - Loads the total number of records
  • SetSyncedRecordCount() - Saves how many records are done
  • GetSyncedRecordCount() - Loads how many records are done
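For reference, a minimal sketch of what the setter/getter pair could look like. The `StreamState` shape and key names here are illustrative stand-ins, not the exact olake types:

```go
package main

import (
	"fmt"
	"sync"
)

// Illustrative key names; the real constants live in types/state.go.
const (
	TotalRecordCountKey  = "total_record_count"
	SyncedRecordCountKey = "synced_record_count"
)

// StreamState is a stand-in for the per-stream state object, which keeps
// its key/value pairs in a sync.Map.
type StreamState struct {
	State sync.Map
}

// SetTotalRecordCount saves the total number of records for the stream.
func (s *StreamState) SetTotalRecordCount(count int64) {
	s.State.Store(TotalRecordCountKey, count)
}

// GetTotalRecordCount loads the saved total, returning 0 if nothing is set.
func (s *StreamState) GetTotalRecordCount() int64 {
	if v, ok := s.State.Load(TotalRecordCountKey); ok {
		if n, ok := v.(int64); ok {
			return n
		}
	}
	return 0
}

// SetSyncedRecordCount and GetSyncedRecordCount follow the same pattern,
// using SyncedRecordCountKey.

func main() {
	s := &StreamState{}
	s.SetTotalRecordCount(377888)
	fmt.Println(s.GetTotalRecordCount()) // 377888
}
```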

2. Updated Database Drivers

I updated the MongoDB, MySQL, and PostgreSQL drivers so each:

  • Calculates the total records at the start
  • Saves this number to the state file
  • Updates the "synced so far" number after each batch completes

3. Updated Resume Logic

Now when someone resumes a sync, the system:

  • Checks if it's a resumed sync (by looking for existing chunks in state)
  • Loads the saved statistics
  • Restores them so progress tracking works correctly
  • Continues from where it left off with accurate numbers
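The resume steps above can be sketched as follows. The `State` type, its fields, and `HasChunks` are hypothetical stand-ins; only the Get* method names mirror the ones added in types/state.go:

```go
package main

import "fmt"

// State is a stand-in for the sync state: chunks holds the pending chunks
// recorded in the state file, stats holds the persisted counters.
type State struct {
	chunks map[string]bool
	stats  map[string]int64
}

// HasChunks reports whether the state file already contains chunks,
// which is how a resumed sync is detected.
func (s *State) HasChunks() bool { return len(s.chunks) > 0 }

func (s *State) GetTotalRecordCount() int64  { return s.stats["total_record_count"] }
func (s *State) GetSyncedRecordCount() int64 { return s.stats["synced_record_count"] }

// restoreProgress mirrors the steps above: detect a resumed sync, load the
// saved counters, and hand them back so progress tracking can be seeded.
func restoreProgress(s *State) (total, synced int64, resumed bool) {
	if !s.HasChunks() { // fresh sync: nothing to restore
		return 0, 0, false
	}
	return s.GetTotalRecordCount(), s.GetSyncedRecordCount(), true
}

func main() {
	st := &State{
		chunks: map[string]bool{"chunk-3": true},
		stats:  map[string]int64{"total_record_count": 10000, "synced_record_count": 8000},
	}
	total, synced, resumed := restoreProgress(st)
	fmt.Println(resumed, total, synced) // true 10000 8000
}
```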

How Has This Been Tested?

# Build the entire project
go build ./...
# Success


CLAassistant commented Oct 2, 2025

CLA assistant check
All committers have signed the CLA.

@Itz-Agasta
Author

Hi @vaibhav-datazip, hope you are having a great day.

I'm a bit unsure how to test the e2e resume behaviour locally (I can run the types unit tests, but I don't yet have a clear integration test procedure that simulates a driver + destination pause/resume). Could you advise me on this?

@Itz-Agasta
Author

Also, I haven't implemented the change for the Oracle driver yet, because it had a TODO for counting rows/finding chunks and I wanted to confirm the expected approach before touching Oracle-specific SQL.

@vaibhav-datazip
Collaborator

Hi @Itz-Agasta,
You can run the e2e test using any of your databases with the Iceberg local-test or Parquet destination and a debugging environment. Once the sync has started you can see the stats being updated; from the debugger you can stop the sync and then run it again, this time with state, since the chunks that haven't been synced are still saved there.

Also, for this you need a good amount of rows in your table so that the sync takes a few minutes to finish.

And just a reminder: please raise your PR with the staging branch as the base.

@Itz-Agasta Itz-Agasta changed the base branch from master to staging October 4, 2025 17:47
@Itz-Agasta
Author

Itz-Agasta commented Oct 4, 2025

And just a reminder: please raise your PR with the staging branch as the base.

Sorry, my bad... I've updated it. (I should have rebased it though 😢)

You can run the e2e test using any of your databases with the Iceberg local-test or Parquet destination and a debugging environment. Once the sync has started you can see the stats being updated; from the debugger you can stop the sync and then run it again, this time with state, since the chunks that haven't been synced are still saved there.
Also, for this you need a good amount of rows in your table so that the sync takes a few minutes to finish.

Cool, I will let you know after testing it.

And what about the Oracle driver? Do you think I'm heading in the right direction?

@vaibhav-datazip
Collaborator

No problem, @Itz-Agasta
Do let me know once your changes are done, I’ll start the review then.
As for Oracle stats not being saved, that issue is already being worked on by someone else, so you can add your fix for the other drivers for now.

@vaibhav-datazip
Collaborator

vaibhav-datazip commented Oct 6, 2025

Hi @Itz-Agasta,
Just checking in: when can I expect this PR to be ready for review from your side?

@Itz-Agasta
Author

Hi @Itz-Agasta, just checking in: when can I expect this PR to be ready for review from your side?

By tomorrow; I have implemented it and just have to perform an e2e test and check for any unexpected behavior.

@vaibhav-datazip vaibhav-datazip added the hacktoberfest Issues open for Hacktoberfest contributors label Oct 7, 2025
@Itz-Agasta
Author

Itz-Agasta commented Oct 7, 2025

Hi @Itz-Agasta, you can run the e2e test using any of your databases with the Iceberg local-test or Parquet destination and a debugging environment. Once the sync has started you can see the stats being updated; from the debugger you can stop the sync and then run it again, this time with state, since the chunks that haven't been synced are still saved there.

2025-10-07.19-31-55.mp4

Hi @vaibhav-datazip, I have tried the e2e test as you told me. I followed the docs at https://olake.io/docs/community/setting-up-a-dev-env and, as you can see:

  • The sync starts normally and updates stats as expected.

  • I paused the sync using the debugger after ~8k records.

  • On resuming with the saved state, the sync continued from ~8k and updated stats correctly.

Can you please check it out now? Thanks!

@vaibhav-datazip
Collaborator

Hi @Itz-Agasta, maybe my explanation was not that clear.

  • By stopping the sync I mean stopping it completely, not pausing it (the leftmost red square).
  • By doing this we will replicate a scenario where the sync stops abruptly and the user runs it again with the state file that gets generated.
  • Also, you can replace the file mentioned under "click to view .vscode/launch.json" on this page. It has all the drivers and modes preset, so you can directly select and run those modes if your files are present.

@Itz-Agasta
Author

  • By stopping the sync I mean stopping it completely, not pausing it (the leftmost red square).

Okay, I see... I get it now. I will test it by tomorrow and let you know.

@Itz-Agasta
Author

@vaibhav-datazip, one more question: if I stopped the debugger, how am I supposed to restart the sync using the state file? Are there any commands I can run with the --state flag?

I mean

  1. I start the debugger.
  2. I stop the debugger completely.
  3. Let's say the state file shows ~8k rows synced.
  4. Now, using that state file, how am I supposed to continue the sync? At this point, if I start the debugger again, it will probably start from 0.
     Are there any commands I can use, passing the state file as a flag or argument?

@vaibhav-datazip
Collaborator

vaibhav-datazip commented Oct 7, 2025

@Itz-Agasta
The state file is generated when the chunks are created or the bookmark is noted (in the case of CDC or incremental sync).

  • This process takes place before the chunk iteration is done (the process where records are actually written to the destination).
  • So your state file has chunk data; put a breakpoint in the chunk iterator function and your program will stop, then go to your state file and check it. You will be able to see the chunks' min and max values.
  • Once you stop the sync you will be able to start it again with the leftover chunks, and the --state flag can be used for this.

To further help yourself out, you can use the debugger file mentioned earlier, or you can go through this doc as well, which mentions all the flags and commands that can be used in OLake.

If you still have some doubt, please feel free to ask.

@Itz-Agasta
Author

Itz-Agasta commented Oct 8, 2025

Before

2025-10-08.20-04-39.mp4

Now

2025-10-08.20-08-59.mp4

@vaibhav-datazip

Hi @vaibhav-datazip , you can check this out now.

One question though: in both the previous and current cases, when I start the sync with --state, the synced records in stats.json (right side) reset to 0. Shouldn't it continue from where it left off? For example, if the last sync reached 8k, shouldn't it resume from 8k instead of restarting? Or am I missing something?

@vaibhav-datazip
Collaborator

Records are written to Parquet files in chunks, so if possible, for testing, you can decrease the chunk size to 1000 and try it out.

Currently the batch size is set such that the generated Parquet file is 256 MB; you can find this value in constants.go. If you are unable to set the batch size to 1k, try decreasing this size to 1 MB and you will see some changes in the stats as well.

@Itz-Agasta
Author

Currently the batch size is set such that the generated Parquet file is 256 MB; you can find this value in constants.go. If you are unable to set the batch size to 1k, try decreasing this size to 1 MB and you will see some changes in the stats as well.

Yep, it's resuming from where it left off now.

2025-10-09.12-53-53.mp4

But why, with --state, is it exceeding the total record count? :(

@vaibhav-datazip
Collaborator

So, the total-records-synced stat in the stats file represents records in batches of 10k. These are yet to be committed; once they are committed, the user can see them in their destination.

But if the whole chunk (the parent unit of a batch) fails to sync before being committed, the whole chunk will be synced again. That's why you are seeing the synced count exceed the total number of records.

An easy way to solve this: don't update the state file based on the total records synced from stats; instead, update it only when a whole chunk gets committed. That way we will have correct numbers when the sync is resumed.

@vaibhav-datazip
Collaborator

@Itz-Agasta, can you check my previous comment and make the necessary changes?

@Itz-Agasta
Author

@Itz-Agasta there are some merge conflicts, can you resolve them?

Yep, I have resolved them; you can review it now :)

@Itz-Agasta
Author

@vaibhav-datazip, I have refactored the PR as you asked. I think it's good to go now; please check it.

@Itz-Agasta
Author

@vaibhav-datazip done, you can check it now.

@vaibhav-datazip
Collaborator

@Itz-Agasta I tried testing this using Postgres: I ran the sync after changing the chunk size and then stopped it. When trying to re-run the sync with state, I am unable to see the estimated remaining time. I have attached a video for your reference.

Screen.Recording.2025-10-28.at.4.06.10.PM.mov

Also, instead of using two variables (total records and synced records), you can use just a remaining-records variable, because that is what is used to compute the remaining time.

@Itz-Agasta
Author

Also, instead of using two variables (total records and synced records), you can use just a remaining-records variable, because that is what is used to compute the remaining time.

OK, you mean instead of storing total_record_count and synced_record_count, we will just store remaining_record_count and update it as we go. Meaning:

  • Fresh sync: remaining = total
  • After a chunk completes: remaining -= chunk_records
  • Resume: just load remaining and add it to the pool

Right?

@Itz-Agasta
Author

Itz-Agasta commented Oct 29, 2025

Hi @vaibhav-datazip, I implemented your suggestion. So, instead of tracking total_record_count and synced_record_count separately, I switched to tracking just remaining_record_count.

I added three methods:

  • SetRemainingRecordCount() - saves the count when starting a fresh sync
  • GetRemainingRecordCount() - loads it when resuming
  • DecrementRemainingRecordCount() - reduces it after each chunk completes
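In sketch form, assuming the same sync.Map-backed stream state used elsewhere in types/state.go (the key name and type shape here are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

const RemainingRecordCountKey = "remaining_record_count" // illustrative key name

// StreamState is a stand-in for the per-stream state object.
type StreamState struct {
	State sync.Map
}

// SetRemainingRecordCount saves the count when starting a fresh sync.
func (s *StreamState) SetRemainingRecordCount(n int64) {
	s.State.Store(RemainingRecordCountKey, n)
}

// GetRemainingRecordCount loads the count when resuming (0 if unset).
func (s *StreamState) GetRemainingRecordCount() int64 {
	if v, ok := s.State.Load(RemainingRecordCountKey); ok {
		if n, ok := v.(int64); ok {
			return n
		}
	}
	return 0
}

// DecrementRemainingRecordCount reduces the count after a chunk completes,
// clamping at zero so the estimate never goes negative.
func (s *StreamState) DecrementRemainingRecordCount(n int64) {
	remaining := s.GetRemainingRecordCount() - n
	if remaining < 0 {
		remaining = 0
	}
	s.State.Store(RemainingRecordCountKey, remaining)
}

func main() {
	s := &StreamState{}
	s.SetRemainingRecordCount(10000)
	s.DecrementRemainingRecordCount(1000) // one chunk of 1k committed
	fmt.Println(s.GetRemainingRecordCount()) // 9000
}
```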

Here is the final result:

2025-10-30.01-41-03.mp4

You can test it now ...

[Review comment on the diff at: connector.SetupState(state)]

Author

@Itz-Agasta Itz-Agasta Oct 29, 2025


After implementing the previous changes, I tested it and... still got "Not Determined" :(

So I started debugging. I added log statements everywhere to trace what was happening. That's when I discovered the timing issue in sync.go (probably):

The stats logger was starting BEFORE the remaining records were loaded from state.

I guess:

  1. Logger starts -> checks pool stats -> finds TotalRecordsToSync = 0 -> shows "Not Determined"
  2. Then backfill loads state and adds records to pool stats
  3. But logger already decided there's no data

That's why I added pre-load logic in sync.go that runs BEFORE starting the logger. It iterates through all streams (FullLoad, CDC, Incremental), loads their remaining counts from state, and adds them to pool stats.
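That ordering fix, reduced to a sketch (all the type and function names here are stand-ins for the pool/logger pieces in sync.go, not the real ones):

```go
package main

import "fmt"

// poolStats is a stand-in for the shared pool statistics the logger samples.
type poolStats struct {
	TotalRecordsToSync int64
}

// stream is a stand-in for a configured stream with a saved remaining count.
type stream struct {
	remaining int64
}

// preloadRemaining mirrors the pre-load step: walk every stream and add its
// saved remaining count to the pool BEFORE the stats logger starts sampling.
func preloadRemaining(pool *poolStats, streams []stream) {
	for _, s := range streams {
		pool.TotalRecordsToSync += s.remaining
	}
}

// estimate is what the logger effectively does: with a zero total it can
// only report "Not Determined".
func estimate(pool *poolStats) string {
	if pool.TotalRecordsToSync == 0 {
		return "Not Determined"
	}
	return "computable"
}

func main() {
	pool := &poolStats{}
	fmt.Println(estimate(pool)) // logger sampled too early: "Not Determined"

	preloadRemaining(pool, []stream{{remaining: 377888}})
	fmt.Println(estimate(pool)) // after pre-load: "computable"
}
```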

[Review comment on the diff lines:]

if countFloat64, ok := count.(float64); ok {
    return int64(countFloat64)
}
Author

@Itz-Agasta Itz-Agasta Oct 29, 2025


This one took me a while to figure out. I added more logging and noticed something weird: even though the state file had "remaining_record_count": 377888, when I read it back, it was coming out as 0!

I traced through the code and found the issue in GetRemainingRecordCount():

if count, loaded := s.Streams[index].State.Load(RemainingRecordCountKey); loaded {
    if countInt64, ok := count.(int64); ok {
        return countInt64  // This was failing!
    }
}
return 0

It turns out Go's JSON unmarshaling converts ALL numbers to float64 by default (when decoding into interface{}), not their original types. So when we saved remaining_record_count as int64, it came back from JSON as float64. The type assertion count.(int64) was failing silently, and we were returning 0.
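This encoding/json behavior is easy to reproduce: any JSON number decoded into an interface{} comes back as float64. A small demo, with a `toInt64` helper showing the shape of the fix (the helper name is mine, not olake's):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toInt64 accepts both int64 (values set in-process) and float64 (values
// read back from JSON), so the lookup no longer fails silently.
func toInt64(v interface{}) int64 {
	switch n := v.(type) {
	case int64:
		return n
	case float64:
		return int64(n)
	}
	return 0
}

func main() {
	var m map[string]interface{}
	if err := json.Unmarshal([]byte(`{"remaining_record_count": 377888}`), &m); err != nil {
		panic(err)
	}
	v := m["remaining_record_count"]

	_, isInt64 := v.(int64)
	fmt.Println(isInt64)    // false: JSON numbers decode as float64
	fmt.Println(toInt64(v)) // 377888
}
```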

@vaibhav-datazip
Collaborator

@Itz-Agasta, I tried testing it again. When I resumed the sync after stopping it, the remaining records count didn't decrease, as you can see in the attached video.
Can you do proper testing on your side before the next review request?

Screen.Recording.2025-10-30.at.4.11.31.PM.mov

@Itz-Agasta
Author

Itz-Agasta commented Nov 1, 2025

@Itz-Agasta, I tried testing it again. When I resumed the sync after stopping it, the remaining records count didn't decrease, as you can see in the attached video. Can you do proper testing on your side before the next review request?
Screen.Recording.2025-10-30.at.4.11.31.PM.mov

Ahhhh, it was the same JSON type-mismatch bug, but in DecrementRemainingRecordCount(). When the state loads from JSON, remaining_record_count becomes float64, and the old code checked only for int64, so the type assertion failed silently and never decremented the count. I fixed it; you can check now.

@Itz-Agasta
Author

@vaibhav-datazip please check this e2e test... I hope it's perfect now.

2025-11-01.19-58-59.mp4

@vaibhav-datazip
Collaborator

@Itz-Agasta , will be testing and reviewing soon



Successfully merging this pull request may close these issues.

improvement: enabling stats on resumed sync

5 participants