@@ -2,10 +2,29 @@ package targetnode
22
33import (
44 "context"
5+ "fmt"
6+ "strconv"
7+ "strings"
8+ "sync"
9+ "time"
510
611 "golang.org/x/sync/errgroup"
712
13+ "github.com/samber/lo"
14+
15+ "github.com/rudderlabs/rudder-go-kit/config"
16+ "github.com/rudderlabs/rudder-go-kit/etcdwatcher"
17+ "github.com/rudderlabs/rudder-go-kit/jsonrs"
18+ "github.com/rudderlabs/rudder-go-kit/logger"
19+ "github.com/rudderlabs/rudder-go-kit/stats"
20+ obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
821 etcdtypes "github.com/rudderlabs/rudder-schemas/go/cluster"
22+ "github.com/rudderlabs/rudder-server/cluster/migrator/etcdclient"
23+ "github.com/rudderlabs/rudder-server/cluster/migrator/etcdkeys"
24+ "github.com/rudderlabs/rudder-server/cluster/migrator/partitionmigration/server"
25+ "github.com/rudderlabs/rudder-server/cluster/migrator/retry"
26+ "github.com/rudderlabs/rudder-server/cluster/partitionbuffer"
27+ "github.com/rudderlabs/rudder-server/jobsdb"
928)
1029
1130// Migrator defines the interface for a target node migrator
@@ -16,3 +35,235 @@ type Migrator interface {
1635 // Run starts a gRPC server for accepting jobs from source nodes and watches for moved migration jobs assigned to this target node
1736 Run (ctx context.Context , wg * errgroup.Group ) error
1837}
38+
// migrator is the target-node implementation of Migrator: it buffers the
// partitions being migrated to this node, accepts migrated jobs over gRPC
// from source nodes, and flushes the buffered partitions once a migration
// job is marked as moved in etcd.
type migrator struct {
	nodeIndex int    // zero-based index of this node in the cluster; used to match migration jobs targeting this node
	nodeName  string // human-readable identifier of this node

	etcdClient      etcdclient.Client
	bufferedJobsDBs [][]partitionbuffer.JobsDBPartitionBuffer // hierarchy of partition buffer jobsdbs; groups are flushed in sequence, members within a group concurrently
	unbufferedJobsDBs []jobsdb.JobsDB                         // jobsdbs used for writing migrated jobs to (unbuffered)
	config          *config.Config
	logger          logger.Logger
	stats           stats.Stats

	// state
	pendingMigrationJobsMu sync.Mutex          // guards pendingMigrationJobs
	pendingMigrationJobs   map[string]struct{} // set of job IDs currently being handled, to dedupe repeated watch events; reset on every Run
}
54+
55+ // Handle marks the partitions assigned to this target node as buffered in all buffered jobsDBs.
56+ func (m * migrator ) Handle (ctx context.Context , migration * etcdtypes.PartitionMigration ) error {
57+ defer m .stats .NewTaggedStat ("partition_mig_target_handle" , stats .TimerType , m .statsTags ()).RecordDuration ()()
58+ targetPartitions := getTargetPartitions (migration , m .nodeIndex )
59+ if len (targetPartitions ) == 0 {
60+ return nil // no partitions assigned to this target node
61+ }
62+
63+ // Mark the partitions as buffered in all bufferedJobsDBs
64+ g , ctx := errgroup .WithContext (ctx )
65+ allBufferedDBs := lo .FlatMap (m .bufferedJobsDBs , func (x []partitionbuffer.JobsDBPartitionBuffer , _ int ) []partitionbuffer.JobsDBPartitionBuffer {
66+ return x
67+ })
68+ for _ , pb := range allBufferedDBs {
69+ g .Go (func () error {
70+ if err := pb .BufferPartitions (ctx , targetPartitions ); err != nil {
71+ return fmt .Errorf ("marking partitions as buffered in in %q jobsdb: %w" , pb .Identifier (), err )
72+ }
73+ return nil
74+ })
75+ }
76+ return g .Wait ()
77+ }
78+
// Run watches for moved migration jobs assigned to this target node and handles them asynchronously.
// It also starts a gRPC server for accepting jobs from source nodes.
// All go routines are added to the provided errgroup.Group.
// It returns an error if the watcher cannot be created, or if the gRPC server fails to start.
func (m *migrator) Run(ctx context.Context, wg *errgroup.Group) error {
	m.pendingMigrationJobs = make(map[string]struct{}) // reset pending migration jobs map

	// create a watcher for partition migration jobs
	jobWatcher, err := etcdwatcher.NewBuilder[*etcdtypes.PartitionMigrationJob](m.etcdClient,
		etcdkeys.MigrationJobKeyPrefix(m.config)).
		WithPrefix().
		WithWatchEventType(etcdwatcher.PutWatchEventType).
		WithWatchMode(etcdwatcher.AllMode).
		WithFilter(func(event *etcdwatcher.Event[*etcdtypes.PartitionMigrationJob]) bool {
			// only watch for moved migration jobs
			if event.Value.Status != etcdtypes.PartitionMigrationJobStatusMoved {
				return false
			}
			// where this node is a target node
			pmj := event.Value
			if pmj.TargetNode != m.nodeIndex {
				return false
			}
			// skip if migration job is already being processed,
			// otherwise add it to pending migration jobs.
			// The check-and-insert happens under the mutex so that two
			// events for the same job ID cannot both pass the filter.
			m.pendingMigrationJobsMu.Lock()
			_, exists := m.pendingMigrationJobs[event.Value.JobID]
			if !exists {
				m.pendingMigrationJobs[event.Value.JobID] = struct{}{}
			}
			m.pendingMigrationJobsMu.Unlock()
			return !exists
		}).
		Build()
	if err != nil {
		return fmt.Errorf("creating etcd watcher: %w", err)
	}
	// start gRPC server for accepting jobs from source nodes.
	// The server is started before the watch loop so that source nodes can
	// already stream jobs while this node waits for "moved" events.
	pms := server.NewPartitionMigrationServer(ctx, m.unbufferedJobsDBs,
		server.WithDedupEnabled(m.config.GetBoolVar(true, "PartitionMigration.Grpc.Server.dedupEnabled")),
		server.WithStreamTimeout(m.config.GetReloadableDurationVar(10, time.Minute, "PartitionMigration.Grpc.Server.streamTimeout")),
		server.WithLogger(m.logger),
		server.WithStats(m.stats),
	)
	grpcServer := server.NewGRPCServer(m.config, pms)
	if err := grpcServer.Start(); err != nil {
		return fmt.Errorf("starting gRPC server: %w", err)
	}

	// start watching for moved migration jobs
	wg.Go(func() error {
		// It keeps retrying on errors using an exponential backoff until the context is done.
		if err := retry.PerpetualExponentialBackoffWithNotify(ctx, m.config,
			func() error {
				// Watch for moved partition migration job events and handle them
				values, leave := jobWatcher.Watch(ctx)
				defer leave()
				for value := range values {
					if value.Error != nil {
						return fmt.Errorf("watching partition migration job events: %w", value.Error)
					}
					// handle moved migration event asynchronously.
					// NOTE(review): the closure captures the loop variable
					// `value`; this is only safe with Go 1.22+ per-iteration
					// loop variables — confirm go.mod's Go version.
					wg.Go(func() error {
						m.onNewJob(ctx, value.Event.Key, value.Event.Value)
						return nil
					})
				}
				return nil
			},
			func(err error, d time.Duration) {
				m.logger.Errorn("Error watching for moved partition migration jobs, retrying",
					logger.NewDurationField("retryIn", d),
					obskit.Error(err),
				)
			},
		); err != nil {
			// PerpetualExponentialBackoffWithNotify only returns once the
			// context is done; log and exit the goroutine without failing the group.
			m.logger.Errorn("Failed to watch moved partition migration job events, context done",
				obskit.Error(err),
			)
		}
		return nil
	})

	// stop gRPC server when context is done
	wg.Go(func() error {
		<-ctx.Done()
		grpcServer.Stop()
		return nil
	})
	return nil
}
170+
// onNewJob handles a moved partition migration job assigned to this target node:
// For each group of buffered JobsDBPartitionBuffers, it flushes the buffered partitions for
// this job concurrently in all partition buffers of that group. Then it marks the job as completed
//
// It retries on errors with an exponential backoff until the context is done.
// NOTE(review): on a retry after a partial failure (e.g. the etcd Put failed),
// the flush is re-run from the first group — this assumes FlushBufferedPartitions
// is idempotent; confirm with the partitionbuffer implementation.
func (m *migrator) onNewJob(ctx context.Context, key string, job *etcdtypes.PartitionMigrationJob) {
	start := time.Now()
	log := m.logger.Withn(
		logger.NewStringField("migrationID", job.MigrationID),
		logger.NewStringField("jobID", job.JobID),
		logger.NewIntField("sourceNode", int64(job.SourceNode)),
		logger.NewIntField("targetNode", int64(job.TargetNode)),
	)
	log.Infon("Received moved partition migration job",
		logger.NewStringField("partitions", fmt.Sprintf("%v", job.Partitions)),
	)

	// Keep retrying errors with a backoff until context is done
	if err := retry.PerpetualExponentialBackoffWithNotify(ctx, m.config,
		func() error {
			// flush jobs in sequence: groups are processed one after another,
			// members within a group concurrently
			for pbGroupIndex, pbGroup := range m.bufferedJobsDBs {
				// flush buffered partitions for this job in all partition buffers of this group concurrently
				g, jobCtx := errgroup.WithContext(ctx)
				groupIdentifiers := lo.Map(pbGroup, func(p partitionbuffer.JobsDBPartitionBuffer, _ int) string { return p.Identifier() })
				log.Infon("Flushing buffered partitions for group",
					logger.NewIntField("groupIndex", int64(pbGroupIndex)),
					logger.NewStringField("groupMembers", strings.Join(groupIdentifiers, ",")),
				)
				for _, pb := range pbGroup {
					pb := pb // shadow for pre-Go-1.22 loop-variable capture (redundant on 1.22+)
					g.Go(func() error {
						if err := pb.FlushBufferedPartitions(jobCtx, job.Partitions); err != nil {
							return fmt.Errorf("flushing buffered partitions of %q: %w", pb.Identifier(), err)
						}
						return nil
					})
				}
				if err := g.Wait(); err != nil {
					return fmt.Errorf("flushing buffered partitions for group index %d: %w", pbGroupIndex, err)
				}
				log.Infon("Flushed buffered partitions for group successfully",
					logger.NewIntField("groupIndex", int64(pbGroupIndex)),
					logger.NewStringField("groupMembers", strings.Join(groupIdentifiers, ",")),
				)
			}

			// mark partition migration job status as [completed] in etcd.
			// The status is mutated on the shared job value; on a retry the
			// job is simply re-marshaled with the same completed status.
			job.Status = etcdtypes.PartitionMigrationJobStatusCompleted
			v, err := jsonrs.Marshal(job)
			if err != nil {
				return fmt.Errorf("marshaling migration job to mark its status as completed: %w", err)
			}
			res, err := m.etcdClient.Put(ctx, key, string(v))
			if err != nil {
				return fmt.Errorf("marking partition migration job status as completed in etcd: %w", err)
			}

			// remove from pending migration jobs only after the completed
			// status is durably stored, so duplicate watch events for this
			// job stay filtered out until then
			m.pendingMigrationJobsMu.Lock()
			delete(m.pendingMigrationJobs, job.JobID)
			m.pendingMigrationJobsMu.Unlock()

			log.Infon("Partition migration job status marked as completed successfully",
				logger.NewIntField("revision", res.Header.Revision),
			)
			m.stats.NewTaggedStat("partition_mig_target_job", stats.TimerType, m.statsTags()).SendTiming(time.Since(start))
			return nil
		},
		func(err error, d time.Duration) {
			log.Errorn("Error handling moved partition migration job, retrying",
				logger.NewDurationField("retryIn", d),
				obskit.Error(err),
			)
		},
	); err != nil {
		// only reachable once the context is done
		log.Errorn("Failed to handle moved partition migration job, context done",
			obskit.Error(err),
		)
	}
}
252+
253+ func (m * migrator ) statsTags () stats.Tags {
254+ return stats.Tags {
255+ "nodeIndex" : strconv .Itoa (m .nodeIndex ),
256+ "component" : "processor" ,
257+ }
258+ }
259+
260+ // getTargetPartitions returns the list of partitions assigned to the given target node index in the migration jobs
261+ func getTargetPartitions (migration * etcdtypes.PartitionMigration , nodeIndex int ) []string {
262+ var targetPartitions []string
263+ for _ , job := range migration .Jobs {
264+ if job .TargetNode == nodeIndex {
265+ targetPartitions = append (targetPartitions , job .Partitions ... )
266+ }
267+ }
268+ return targetPartitions
269+ }
0 commit comments