feat: integrate processor partition migrator in processor and embedded apps#6622
feat: integrate processor partition migrator in processor and embedded apps#6622
Conversation
97d5ca7 to
64de959
Compare
768aed6 to
ac24846
Compare
64de959 to
6fc3ce2
Compare
| if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX "idx_%[1]s_partid" ON %[1]q (partition_id)`, newDS.JobTable)); err != nil { | ||
| return fmt.Errorf("creating partition_id index: %w", err) | ||
| } | ||
| if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE INDEX "idx_%[1]s_partid" ON %[1]q (partition_id)`, newDS.JobTable)); err != nil { |
There was a problem hiding this comment.
Note: we are creating the index regardless of the numPartitions configuration option, since buffer JobsDBs will have numPartitions: 0, but still, we need to be able to get jobs out of them for specific partitions.
We don't want to enable numPartitions for buffer JobsDBs, since there is no need to create xx_buf_read_excluded_partitions and xx_buf_buffered_partitionstables for them or create any extra entries in the buffered_partitions_versions table.
| } | ||
|
|
||
| targetURLProvider, err := func() (func(targetNodeIndex int) (string, error), error) { | ||
| processorNodeHostPattern := config.GetStringVar("", "PROCESSOR_NODE_HOST_PATTERN") |
There was a problem hiding this comment.
Note: this is a new env variable we'll be adding in processor pods
| if err != nil { | ||
| return nil, nil, nil, nil, fmt.Errorf("getting etcd client: %w", err) | ||
| } | ||
| nodeIndex := config.GetIntVar(-1, 1, "PROCESSOR_INDEX") |
There was a problem hiding this comment.
Note: this is an existing env variable
| } | ||
| } | ||
|
|
||
| log := logger.NewLogger().Child("partitionmigrator") |
There was a problem hiding this comment.
Note: using the same logger across components so that it is easier to setup alerts
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #6622 +/- ##
==========================================
+ Coverage 79.56% 79.68% +0.12%
==========================================
Files 556 562 +6
Lines 62176 62829 +653
==========================================
+ Hits 49470 50068 +598
- Misses 9783 9794 +11
- Partials 2923 2967 +44 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
d430532 to
7e8deaf
Compare
ac24846 to
e2ec244
Compare
9f9adeb to
c5c0f76
Compare
e2ec244 to
f6fb61b
Compare
c2da87a to
316abaa
Compare
316abaa to
c9ea36a
Compare
…d apps 🔒 Scanned for secrets using gitleaks 8.30.0
c9ea36a to
40aa461
Compare
🔒 Scanned for secrets using gitleaks 8.30.0
37eeb9a to
f0368bc
Compare
🔒 Scanned for secrets using gitleaks 8.30.0
🤖 I have created a release *beep* *boop* --- ## [1.67.0-rc.1](v1.66.0...v1.67.0-rc.1) (2026-02-02) ### Features * gateway partition migrator ([#6631](#6631)) ([344c85e](344c85e)) * integrate processor partition migrator in processor and embedded apps ([#6622](#6622)) ([b5ee878](b5ee878)) * processor partition migrator watch for new migrations ([#6611](#6611)) ([8bfb2ca](8bfb2ca)) * source node partition migrator ([#6614](#6614)) ([9e8b458](9e8b458)) * target node partition migrator ([#6617](#6617)) ([1cffea7](1cffea7)) * transformation language support ([#6637](#6637)) ([c989784](c989784)) ### Bug Fixes * negative pending events are recorded for destinations after startup ([#6619](#6619)) ([1e43b02](1e43b02)) * openapi definition validation ([#6643](#6643)) ([905f2ee](905f2ee)) ### Miscellaneous * cleanup code to push image to dockerhub for rudder-server ent ([#6628](#6628)) ([cf59323](cf59323)) * sync release v1.66.0 to main branch ([#6618](#6618)) ([c0b9619](c0b9619)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
🤖 I have created a release *beep* *boop* --- ## [1.67.0-rc.2](v1.66.0...v1.67.0-rc.2) (2026-02-02) ### Features * gateway partition migrator ([#6631](#6631)) ([344c85e](344c85e)) * integrate processor partition migrator in processor and embedded apps ([#6622](#6622)) ([b5ee878](b5ee878)) * processor partition migrator watch for new migrations ([#6611](#6611)) ([8bfb2ca](8bfb2ca)) * source node partition migrator ([#6614](#6614)) ([9e8b458](9e8b458)) * target node partition migrator ([#6617](#6617)) ([1cffea7](1cffea7)) * transformation language support ([#6637](#6637)) ([c989784](c989784)) ### Bug Fixes * negative pending events are recorded for destinations after startup ([#6619](#6619)) ([1e43b02](1e43b02)) * openapi definition validation ([#6643](#6643)) ([905f2ee](905f2ee)) * tracking plan causing a panic in case of transformer non 200 status code response ([#6653](#6653)) ([b51ca68](b51ca68)) ### Miscellaneous * cleanup code to push image to dockerhub for rudder-server ent ([#6628](#6628)) ([cf59323](cf59323)) * ecr builds rate limited ([#6657](#6657)) ([59c2edb](59c2edb)) * migrate from PAT to GitHub App token (SEC-58) ([#6641](#6641)) ([3c78db1](3c78db1)) * sync release v1.66.0 to main branch ([#6618](#6618)) ([c0b9619](c0b9619)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please). Co-authored-by: rudderstack-github-actions[bot] <236995729+rudderstack-github-actions[bot]@users.noreply.github.com>
🤖 I have created a release *beep* *boop* --- ## [1.67.0](v1.66.0...v1.67.0) (2026-02-02) ### Features * gateway partition migrator ([#6631](#6631)) ([344c85e](344c85e)) * integrate processor partition migrator in processor and embedded apps ([#6622](#6622)) ([b5ee878](b5ee878)) * processor partition migrator watch for new migrations ([#6611](#6611)) ([8bfb2ca](8bfb2ca)) * source node partition migrator ([#6614](#6614)) ([9e8b458](9e8b458)) * target node partition migrator ([#6617](#6617)) ([1cffea7](1cffea7)) * transformation language support ([#6637](#6637)) ([c989784](c989784)) ### Bug Fixes * negative pending events are recorded for destinations after startup ([#6619](#6619)) ([1e43b02](1e43b02)) * openapi definition validation ([#6643](#6643)) ([905f2ee](905f2ee)) * tracking plan causing a panic in case of transformer non 200 status code response ([#6653](#6653)) ([b51ca68](b51ca68)) ### Miscellaneous * cleanup code to push image to dockerhub for rudder-server ent ([#6628](#6628)) ([cf59323](cf59323)) * ecr builds rate limited ([#6657](#6657)) ([59c2edb](59c2edb)) * migrate from PAT to GitHub App token (SEC-58) ([#6641](#6641)) ([3c78db1](3c78db1)) * sync release v1.66.0 to main branch ([#6618](#6618)) ([c0b9619](c0b9619)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please). Co-authored-by: rudderstack-github-actions[bot] <236995729+rudderstack-github-actions[bot]@users.noreply.github.com>
🔒 Scanned for secrets using gitleaks 8.30.0
Description
Integrating processor partition migrator in embedded and processor apps & introducing an integration test for the embedded scenario:
cancelfunction frommainto therunnerso that source migrator is able to shutdown the server in case of a timeout while waiting for inflight jobs to settle.clustertest.PartitionRoutingProxy, a simple http reverse proxy which routes requests to the proper server backend based on user id.clustertest.PartitionMigrationExecutor, a component which is able to orchestrate one single partition migration through etcd.X-Rudder-Instance-Idheader in its requests.readerCapacity&writerCapacityare uninitialized.partition_idindex in jobsdb regardless of thenumPartitionsconfiguration option, since buffer JobsDBs are going to havenumPartitions: 0, but still, we need to be able to get jobs out of them for specific partitions.Linear Ticket
resolves PIPE-2700
Security