feat: implement 2PC support in parquet writer #959
|
Updated the PR with the current Parquet 2PC shape:
Summary of the refactor:
Also reran the focused/unit/build/lint checks and the Postgres-backed manual demo for full refresh, incremental, and CDC recovery. |
|
@nayanj98 looping you in as @vaibhav-datazip suggested. This PR is ready for review now and the description is updated with the final flow and test coverage. |
|
@siiddhantt Assigning @vaibhav-datazip to review your PR. |
|
@vaibhav-datazip I found the CI failure. It looks test-harness related rather than a 2PC flow failure. The integration cleanup helper deletes only final I’ll push a small fix to clear |
| } | ||
|
|
||
| func (p *Parquet) closePqFiles(ctx context.Context, _ any, closeOnError bool) error { | ||
| func (p *Parquet) dataFiles() []parquetDataFile { |
Can you rename this function to something more appropriate, and also add a comment for it?
Done, renamed this to stagedDataFiles() and added a short comment. The helper returns the current thread’s staged parquet file identities used during close/upload/promotion, so the old dataFiles() name was too generic.
| if len(remainingStreams) > 0 { | ||
| // Destination 2PC metadata can be ahead of the external source state when | ||
| // the destination commit succeeded but saving the source state failed. If all | ||
| // streams are already committed and the slot is exactly at that committed LSN, | ||
| // there is no WAL left to replay; PostCDC will persist the recovered state. | ||
| skipValidation := recoveryLSN != nil && len(remainingStreams) == 0 && *recoveryLSN == slot.LSN | ||
| if !skipValidation { | ||
| // validate global state (might got invalid during full load) |
What's the need for this change?
This is for the stale external state recovery case.
Scenario:
- Parquet commits destination progress at LSN B in _completed.json
- the Postgres slot is already at B
- OLake crashes before the external source state file is saved, so the next run still starts from old state A
Without this guard, Postgres fails on the A vs B LSN mismatch before destination metadata can repair the state. The skip is intentionally narrow: it only applies when destination metadata is ahead, all streams are already committed, and the replication slot is exactly at the destination-committed LSN. Other mismatches still fail as before.
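For reviewers who want to see how narrow the skip is, here is a minimal sketch of the same predicate as a standalone function. The `lsn` type and the `canSkipValidation` name are hypothetical simplifications for illustration; the driver uses its own LSN type and computes the condition inline.

```go
package main

import "fmt"

// lsn is a hypothetical stand-in for the driver's LSN type, simplified to an integer.
type lsn uint64

// canSkipValidation mirrors the guard in the diff above: validation is skipped
// only when destination metadata supplied a recovery LSN, no streams remain to
// be synced, and the replication slot sits exactly at that recovered LSN.
func canSkipValidation(recoveryLSN *lsn, remainingStreams int, slotLSN lsn) bool {
	return recoveryLSN != nil && remainingStreams == 0 && *recoveryLSN == slotLSN
}

func main() {
	b := lsn(200) // LSN B committed by the destination

	fmt.Println(canSkipValidation(&b, 0, 200))  // stale state A, slot at B: true
	fmt.Println(canSkipValidation(&b, 1, 200))  // a stream is still pending: false
	fmt.Println(canSkipValidation(&b, 0, 150))  // slot is not at B: false
	fmt.Println(canSkipValidation(nil, 0, 200)) // no destination metadata: false
}
```

Only the first case skips validation; every other combination falls through to the existing check and fails as before.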
| return filepath.Join(p.local2PCPath(), p.stagingDirName(threadID)) | ||
| } | ||
|
|
||
| func (p *Parquet) stagingDataDir(basePath string) string { |
Can you rename the argument, and also add short comments for the functions in this file?
Done, renamed the argument to avoid confusion with p.basePath, and added short comments around the 2PC helpers.
| } | ||
|
|
||
| func (p *Parquet) listS3CompletedMarkers(ctx context.Context) ([]parquet2PCCompletedMarker, error) { | ||
| prefix := p.s3ObjectPath(filepath.Join(p.basePath, parquet2PCDir)) + "/" |
Why are we saving staging files under the _olake_2pc folder and not directly in the root folder?
Keeping staging under _olake_2pc is intentional. Staged files are not committed table data yet, so placing them under the table root can make raw Parquet readers see partial/uncommitted files. _olake_2pc keeps staging hidden but still table-local, and DropStreams() clears it naturally with the table prefix.
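To illustrate the layout, a rough sketch of how a staged key differs from a final key. The `stagingPrefix` and `isHidden` helpers are hypothetical, `url.PathEscape` is only an assumed escaping scheme for the thread ID, and the underscore/dot filter reflects a convention that many Parquet readers follow rather than anything this PR enforces.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
	"strings"
)

// twoPCDir is the table-local directory name used in this discussion.
const twoPCDir = "_olake_2pc"

// stagingPrefix builds <namespace>/<table>/_olake_2pc/<escaped-thread-id>.staging.
// url.PathEscape is an assumption here, not necessarily the PR's escaping.
func stagingPrefix(namespace, table, threadID string) string {
	return path.Join(namespace, table, twoPCDir, url.PathEscape(threadID)+".staging")
}

// isHidden reports whether any path segment starts with "_" or ".", the
// convention a reader would need to follow in order to skip staged files.
func isHidden(key string) bool {
	for _, seg := range strings.Split(key, "/") {
		if strings.HasPrefix(seg, "_") || strings.HasPrefix(seg, ".") {
			return true
		}
	}
	return false
}

func main() {
	staged := path.Join(stagingPrefix("public", "orders", "thread/1"), "part-0.parquet")
	final := path.Join("public", "orders", "part-0.parquet")

	fmt.Println(staged, isHidden(staged)) // staged file sits under a hidden segment
	fmt.Println(final, isHidden(final))   // promoted file is visible table data
}
```

Because the staging prefix stays under `<namespace>/<table>/`, deleting the table prefix removes staged and final files together.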
| type parquet2PCCompletedMarker struct { | ||
| ThreadID string | ||
| MetadataState *types.MetadataState | ||
| modTime time.Time |
Do we need modTime?
Yes, we need an ordering value because CDC/incremental/no-op syncs can leave multiple completed markers over time, and setup needs to restore the latest metadata state. I renamed modTime to completedAt and added a short comment to make that purpose clearer.
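A small sketch of why the ordering value matters, assuming setup simply restores the state from the most recently completed marker. The `completedMarker` struct is a simplified stand-in for `parquet2PCCompletedMarker` (the `State` string replaces `*types.MetadataState`), and `latestMarker` and `markerAt` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// completedMarker is a simplified stand-in for parquet2PCCompletedMarker.
type completedMarker struct {
	ThreadID    string
	State       string    // placeholder for *types.MetadataState
	completedAt time.Time // ordering value used to pick the latest state
}

// markerAt builds a marker completed at the given Unix second (demo helper).
func markerAt(threadID, state string, unixSec int64) completedMarker {
	return completedMarker{ThreadID: threadID, State: state, completedAt: time.Unix(unixSec, 0)}
}

// latestMarker returns the marker with the most recent completedAt,
// or nil when there are no markers at all.
func latestMarker(markers []completedMarker) *completedMarker {
	var latest *completedMarker
	for i := range markers {
		if latest == nil || markers[i].completedAt.After(latest.completedAt) {
			latest = &markers[i]
		}
	}
	return latest
}

func main() {
	markers := []completedMarker{
		markerAt("cdc-1", "lsn=A", 1000),
		markerAt("cdc-2", "lsn=B", 2000),
	}
	fmt.Println(latestMarker(markers).State) // prints "lsn=B"
}
```

Without `completedAt`, two markers from successive CDC runs would be indistinguishable and setup could restore the older state.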
| err := p.s3Client.ListObjectsPagesWithContext(ctx, &s3.ListObjectsInput{ | ||
| Bucket: aws.String(p.config.Bucket), | ||
| Prefix: aws.String(prefix), | ||
| }, func(page *s3.ListObjectsOutput, _ bool) bool { | ||
| keys := make([]string, 0, len(page.Contents)) | ||
| for _, obj := range page.Contents { | ||
| if obj.Key != nil { | ||
| keys = append(keys, *obj.Key) | ||
| } | ||
| } | ||
| if len(keys) == 0 { | ||
| return true | ||
| } | ||
| concurrency := min(len(keys), 8) | ||
| pageErr = utils.Concurrent(ctx, keys, concurrency, func(_ context.Context, key string, _ int) error { | ||
| return p.deleteS3Object(ctx, key) | ||
| }) | ||
| return pageErr == nil |
Retries should be added here and in the other S3-related functions as well, like we have in clearS3files.
Agreed, added retries for the new S3 2PC operations using the same RetryWithSkip + isRateLimitError pattern used by existing Parquet S3 cleanup. This now covers staging upload, marker read/write, listing, copy, and delete paths.
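As a rough illustration of the retry shape, the sketch below retries an operation only while its error is classified as a rate limit. Everything in it is hypothetical: `retry`, `isRetryable`, and the error values are stand-ins written for this comment and do not reproduce the signature or behavior of `RetryWithSkip` or `isRateLimitError`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Hypothetical error values standing in for S3 responses.
var (
	errRateLimited  = errors.New("SlowDown: rate limited")
	errAccessDenied = errors.New("AccessDenied")
)

// isRetryable is a hypothetical classifier: only rate limits are retried.
func isRetryable(err error) bool { return errors.Is(err, errRateLimited) }

// retry runs op up to attempts times with doubling backoff. It returns
// immediately on success, on a non-retryable error, or on context cancellation.
func retry(ctx context.Context, attempts int, backoff time.Duration, op func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil || !isRetryable(err) {
			return err
		}
		if i == attempts-1 {
			break // no point sleeping after the final attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
			backoff *= 2
		}
	}
	return err
}

// attemptsUntilSuccess simulates an S3 call that fails `failures` times with
// failWith before succeeding, and reports how many calls were made.
func attemptsUntilSuccess(failures int, failWith error) (int, error) {
	calls := 0
	err := retry(context.Background(), 3, time.Millisecond, func() error {
		calls++
		if calls <= failures {
			return failWith
		}
		return nil
	})
	return calls, err
}

func main() {
	fmt.Println(attemptsUntilSuccess(2, errRateLimited))  // retried, then succeeds
	fmt.Println(attemptsUntilSuccess(2, errAccessDenied)) // fails fast, no retry
}
```

The same wrapper can sit around upload, marker read/write, list, copy, and delete calls, since each is just an `op func() error`.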
| marker, err := p.readLocalCompletedMarker(entry.Name()) | ||
| if os.IsNotExist(err) { | ||
| continue | ||
| } |
Why don't we build a map here and not depend on the list-staging-dirs function at all? We would mark the folders without a completed marker as false and the others as true, and the data will be there.
Agreed, refactored this into a single staging-entry map. Each staging dir now maps to either a completed marker or nil, so setup can promote committed staging and delete incomplete staging without separately listing completed markers and staging dirs.
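A minimal sketch of the resulting decision, assuming the map is keyed by staging directory name. The `marker`, `action`, and `planRecovery` names are hypothetical and the real setup performs the promotion and deletion rather than returning a plan.

```go
package main

import (
	"fmt"
	"sort"
)

// marker is a simplified stand-in for a parsed _completed.json.
type marker struct{ State string }

// action records what setup would do with one staging directory.
type action struct{ Dir, Op string }

// planRecovery walks the staging-entry map: a non-nil marker means the thread
// committed and its staged files are promoted; nil means the attempt was
// incomplete and the staging directory is deleted.
func planRecovery(entries map[string]*marker) []action {
	dirs := make([]string, 0, len(entries))
	for dir := range entries {
		dirs = append(dirs, dir)
	}
	sort.Strings(dirs) // deterministic order for the demo

	plan := make([]action, 0, len(dirs))
	for _, dir := range dirs {
		op := "delete"
		if entries[dir] != nil {
			op = "promote"
		}
		plan = append(plan, action{Dir: dir, Op: op})
	}
	return plan
}

func main() {
	entries := map[string]*marker{
		"chunk-1.staging": {State: "{}"}, // _completed.json present
		"chunk-2.staging": nil,           // no marker: incomplete attempt
	}
	for _, a := range planRecovery(entries) {
		fmt.Println(a.Dir, a.Op)
	}
}
```

A single map keeps the "has marker" and "has staged data" views in one listing, so the two cannot drift apart between separate list calls.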
|
@siiddhantt once you are done with your changes after the review, please re-request the review from here so that I know the PR is ready again |

Description
Fixes #856
Adds table-local 2PC metadata for the Parquet destination under:
<namespace>/<table>/_olake_2pc/
The goal is to make Parquet recover from destination-side committed progress instead of relying only on the external source state file. This is needed across full refresh, incremental, and CDC syncs:
- _olake_2pc instead of leaving final-path orphan files for later retries to reason about
This PR keeps the main change inside destination/parquet and uses hidden per-thread staging plus a completed marker:
_olake_2pc/<escaped-thread-id>.staging/_completed.json
Protocol:
- _olake_2pc/<escaped-thread-id>.staging/
- Close() writes _completed.json
- _completed.json is kept as the durable Parquet-local progress marker for that thread
- Setup() calls load2PCState() to recover table-local metadata before a retry starts
- _completed.json, it treats that thread as committed and promotes any remaining staged files
- _completed.json, it treats that attempt as incomplete and removes the hidden staging prefix before retry
For full refresh, _completed.json can be {} because the committed thread/chunk ID is encoded in the staging folder name. For CDC, incremental, and no-op progress, _completed.json contains the optional types.MetadataState, tying cursor/LSN progress to the same completed marker.
Implementation details:
- parquetDataFile so close, upload, and staging promotion use one file identity instead of reconstructing paths in multiple places
- metadataState() to normalize destination final state into the same types.MetadataState shape used by destination recovery
- state.json, commits/*.json, preparing/*.json, and prepare-marker file-list protocol
The completed marker is the single Parquet-local progress record. If the process fails before _completed.json is written, setup treats staging as incomplete and removes it. If the process fails after _completed.json is written but before promotion finishes, setup rolls that completed staging forward.
Type of change
How Has This Been Tested?
- go test ./destination/parquet -count=1
- go test ./destination/... -count=1
- go test ./types ./pkg/parser -count=1
- go test ./drivers/postgres/internal -run TestConfig -count=1
- go build -o /private/tmp/olake-build-check .
- golangci-lint run ./destination/parquet
Screenshots or Recordings
WIP
Documentation
Related PR's (If Any):