diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml
index e58cf70c4b..214aea68f4 100644
--- a/.github/workflows/verify.yml
+++ b/.github/workflows/verify.yml
@@ -77,5 +77,5 @@ jobs:
       - name: golangci-lint
         uses: golangci/golangci-lint-action@v7
         with:
-          version: v2.0.2
+          version: v2.1.1
           args: -v
diff --git a/Makefile b/Makefile
index 8f836481e6..b549f273f4 100644
--- a/Makefile
+++ b/Makefile
@@ -6,7 +6,7 @@ TESTFILE=_testok
 MOUNT_PATH=/local
 
 # go tools versions
-GOLANGCI=github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.0.2
+GOLANGCI=github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.1.1
 gofumpt=mvdan.cc/gofumpt@latest
 govulncheck=golang.org/x/vuln/cmd/govulncheck@latest
 goimports=golang.org/x/tools/cmd/goimports@latest
diff --git a/integration_test/docker_test/docker_test.go b/integration_test/docker_test/docker_test.go
index 02bfc8dda5..f4a2d79f74 100644
--- a/integration_test/docker_test/docker_test.go
+++ b/integration_test/docker_test/docker_test.go
@@ -175,12 +175,12 @@ func testCases(t *testing.T) {
 			eventSql := "select anonymous_id, user_id from dev_integration_test_1.identifies limit 1;"
 			_ = db.QueryRow(eventSql).Scan(&myEvent.anonymousID, &myEvent.userID)
 			return myEvent.anonymousID == "anonymousId_1"
-		}, time.Minute, 10*time.Millisecond)
+		}, 2*time.Minute, 10*time.Millisecond)
 		require.Eventually(t, func() bool {
 			eventSql := "select count(*) from dev_integration_test_1.identifies;"
 			_ = db.QueryRow(eventSql).Scan(&myEvent.count)
 			return myEvent.count == "2"
-		}, time.Minute, 10*time.Millisecond)
+		}, 2*time.Minute, 10*time.Millisecond)
 
 		// Verify User Transformation
 		eventSql := "select context_myuniqueid,context_id,context_ip from dev_integration_test_1.identifies;"
@@ -194,12 +194,12 @@ func testCases(t *testing.T) {
 			eventSql := "select anonymous_id, user_id from dev_integration_test_1.users limit 1;"
 			_ = db.QueryRow(eventSql).Scan(&myEvent.anonymousID, &myEvent.userID)
 			return myEvent.anonymousID == "anonymousId_1"
-		}, time.Minute, 10*time.Millisecond)
+		}, 2*time.Minute, 10*time.Millisecond)
 		require.Eventually(t, func() bool {
 			eventSql := "select count(*) from dev_integration_test_1.users;"
 			_ = db.QueryRow(eventSql).Scan(&myEvent.count)
 			return myEvent.count == "1"
-		}, time.Minute, 10*time.Millisecond)
+		}, 2*time.Minute, 10*time.Millisecond)
 
 		// Verify User Transformation
 		eventSql = "select context_myuniqueid,context_id,context_ip from dev_integration_test_1.users;"
@@ -213,12 +213,12 @@ func testCases(t *testing.T) {
 			eventSql := "select anonymous_id, user_id from dev_integration_test_1.screens limit 1;"
 			_ = db.QueryRow(eventSql).Scan(&myEvent.anonymousID, &myEvent.userID)
 			return myEvent.anonymousID == "anonymousId_1"
-		}, time.Minute, 10*time.Millisecond)
+		}, 2*time.Minute, 10*time.Millisecond)
 		require.Eventually(t, func() bool {
 			eventSql := "select count(*) from dev_integration_test_1.screens;"
 			_ = db.QueryRow(eventSql).Scan(&myEvent.count)
 			return myEvent.count == "1"
-		}, time.Minute, 10*time.Millisecond)
+		}, 2*time.Minute, 10*time.Millisecond)
 
 		// Verify User Transformation
 		eventSql = "select prop_key,myuniqueid,ip from dev_integration_test_1.screens;"
diff --git a/router/batchrouter/batchrouter_test.go b/router/batchrouter/batchrouter_test.go
index abc2a0fa00..fc183110f9 100644
--- a/router/batchrouter/batchrouter_test.go
+++ b/router/batchrouter/batchrouter_test.go
@@ -302,7 +302,7 @@ var _ = Describe("BatchRouter", func() {
 			<-batchrouter.backendConfigInitialized
 			batchrouter.minIdleSleep = config.SingleValueLoader(time.Microsecond)
-			batchrouter.uploadFreq = config.SingleValueLoader(time.Microsecond)
+			batchrouter.uploadFreq = config.SingleValueLoader(5 * time.Millisecond)
 			batchrouter.mainLoopFreq = config.SingleValueLoader(time.Microsecond)
 			ctx, cancel := context.WithCancel(context.Background())
 			var wg sync.WaitGroup
@@ -409,19 +409,13 @@ var _ = Describe("BatchRouter", func() {
 				},
 			).AnyTimes()
 
-			c.mockBatchRouterJobsDB.EXPECT().UpdateJobStatus(gomock.Any(), gomock.Any(), []string{CustomVal["S3"]}, gomock.Any()).Times(1).
-				Do(func(ctx context.Context, statuses []*jobsdb.JobStatusT, _, _ interface{}) {
-					assertJobStatus(toRetryJobsList[0], statuses[0], jobsdb.Executing.State, `{}`, 130)
-					assertJobStatus(toRetryJobsList[1], statuses[1], jobsdb.Executing.State, `{}`, 4)
-				}).Return(nil)
-
 			c.mockBatchRouterJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) {
 				_ = f(jobsdb.EmptyUpdateSafeTx())
 			}).Return(nil)
 			c.mockBatchRouterJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Any(), []string{CustomVal["S3"]}, gomock.Any()).Times(1).
 				Do(func(ctx context.Context, _ interface{}, statuses []*jobsdb.JobStatusT, _, _ interface{}) {
-					assertJobStatus(toRetryJobsList[0], statuses[0], jobsdb.Aborted.State, fmt.Sprintf(`{"firstAttemptedAt": "%s", "Error": "BRT: Batch destination source not found in config for sourceID: %s"}`, attempt1.Format(misc.RFC3339Milli), SourceIDEnabled+"random"), 130)
-					assertJobStatus(toRetryJobsList[1], statuses[1], jobsdb.Aborted.State, fmt.Sprintf(`{"firstAttemptedAt": "%s", "Error": "BRT: Batch destination source not found in config for sourceID: %s"}`, attempt2.Format(misc.RFC3339Milli), SourceIDEnabled+"random"), 4)
+					assertJobStatus(toRetryJobsList[0], statuses[0], jobsdb.Aborted.State, "{\"reason\":\"source_not_found\"}", 130)
+					assertJobStatus(toRetryJobsList[1], statuses[1], jobsdb.Aborted.State, "{\"reason\":\"source_not_found\"}", 4)
 				}).Return(nil)
 			c.mockProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(1).DoAndReturn(
 				func(ctx context.Context, _ []*jobsdb.JobT) error {
@@ -432,7 +426,7 @@ var _ = Describe("BatchRouter", func() {
 			<-batchrouter.backendConfigInitialized
 			batchrouter.minIdleSleep = config.SingleValueLoader(time.Microsecond)
-			batchrouter.uploadFreq = config.SingleValueLoader(time.Microsecond)
+			batchrouter.uploadFreq = config.SingleValueLoader(5 * time.Millisecond)
 			batchrouter.mainLoopFreq = config.SingleValueLoader(time.Microsecond)
 			done := make(chan struct{})
 			go func() {
@@ -620,11 +614,11 @@ func TestBatchRouter(t *testing.T) {
 						"table": "tracks"
 					}
 				}`),
-				Parameters: jsonb.RawMessage([]byte(fmt.Sprintf(`{
+				Parameters: []byte(fmt.Sprintf(`{
 					"source_id": %[1]q,
 					"destination_id": %[2]q,
 					"receivedAt": %[3]q
-				}`, bc.Sources[0].ID, s3Dest.ID, time.Now().Format(time.RFC3339)))),
+				}`, bc.Sources[0].ID, s3Dest.ID, time.Now().Format(time.RFC3339))),
 				CustomVal: s3Dest.DestinationDefinition.Name,
 				CreatedAt: time.Now(),
 			})
@@ -649,8 +643,8 @@ func TestBatchRouter(t *testing.T) {
 	)
 
 	batchrouter.minIdleSleep = config.SingleValueLoader(time.Microsecond)
-	batchrouter.uploadFreq = config.SingleValueLoader(time.Microsecond)
-	batchrouter.mainLoopFreq = config.SingleValueLoader(time.Microsecond)
+	batchrouter.uploadFreq = config.SingleValueLoader(5 * time.Millisecond)
+	batchrouter.mainLoopFreq = config.SingleValueLoader(time.Second)
 
 	err = routerDB.Store(context.Background(), jobs)
 	require.NoError(t, err)
@@ -665,7 +659,7 @@ func TestBatchRouter(t *testing.T) {
 			return false
 		}
 		return len(minioContents) == len(bcs)
-	}, 5*time.Second, 200*time.Millisecond)
+	}, 2*time.Minute, 200*time.Millisecond)
 
 	minioContents, err := minioResource.Contents(context.Background(), "")
 	require.NoError(t, err)
diff --git a/router/batchrouter/consumer_worker.go b/router/batchrouter/consumer_worker.go
new file mode 100644
index 0000000000..39e22dc873
--- /dev/null
+++ b/router/batchrouter/consumer_worker.go
@@ -0,0 +1,125 @@
+package batchrouter
+
+import (
+	"context"
+	"time"
+
+	"github.com/rudderlabs/rudder-go-kit/logger"
+	"github.com/rudderlabs/rudder-server/jobsdb"
+)
+
+// ConsumerWorker handles job consumption for a specific source-destination pair.
+// It acts as a buffer between job ingestion and batch processing, storing jobs
+// in partition queues for the batch worker to process.
+type ConsumerWorker struct {
+	sourceID  string
+	destID    string
+	jobChan   chan *jobsdb.JobT
+	ctx       context.Context
+	logger    logger.Logger
+	callbacks *ConsumerCallbacks
+}
+
+// ConsumerCallbacks contains callback functions needed by the consumer worker
+type ConsumerCallbacks struct {
+	GetUploadFrequency func() time.Duration
+	GetMaxBatchSize    func() int
+	ProcessJobs        func(ctx context.Context, jobs []*jobsdb.JobT) error
+	ReleaseWorker      func(sourceID, destID string)
+	PartitionJobs      func(jobs []*jobsdb.JobT) [][]*jobsdb.JobT
+}
+
+// NewConsumerWorker creates a new consumer worker for a source-destination pair
+func NewConsumerWorker(
+	sourceID string,
+	destID string,
+	jobChan chan *jobsdb.JobT,
+	ctx context.Context,
+	logger logger.Logger,
+	callbacks *ConsumerCallbacks,
+) *ConsumerWorker {
+	return &ConsumerWorker{
+		sourceID:  sourceID,
+		destID:    destID,
+		jobChan:   jobChan,
+		ctx:       ctx,
+		logger:    logger.Child("consumer-worker").With("sourceID", sourceID, "destID", destID),
+		callbacks: callbacks,
+	}
+}
+
+// Work implements the workerpool.Worker interface.
+// It consumes jobs from the channel and processes them in batches.
+// When either the batch size threshold or the upload frequency timer is reached,
+// it sends the collected jobs for processing using the callbacks.ProcessJobs method.
+// This directly processes the jobs without sending them back to the buffer.
+// Returns true if any jobs were processed.
+func (c *ConsumerWorker) Work() bool {
+	defer c.callbacks.ReleaseWorker(c.sourceID, c.destID)
+
+	uploadFrequency := c.callbacks.GetUploadFrequency()
+	maxBatchSize := c.callbacks.GetMaxBatchSize()
+
+	var jobs []*jobsdb.JobT
+	timer := time.NewTimer(uploadFrequency)
+	defer timer.Stop()
+
+	jobsProcessed := false
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			if len(jobs) > 0 {
+				partitionedJobs := c.callbacks.PartitionJobs(jobs)
+				for _, jobBatch := range partitionedJobs {
+					if err := c.callbacks.ProcessJobs(c.ctx, jobBatch); err != nil {
+						c.logger.Errorf("Error processing jobs: %v", err)
+					}
+				}
+				jobsProcessed = true
+			}
+			return jobsProcessed
+
+		case job := <-c.jobChan:
+			jobs = append(jobs, job)
+			if len(jobs) >= maxBatchSize {
+				partitionedJobs := c.callbacks.PartitionJobs(jobs)
+				for _, jobBatch := range partitionedJobs {
+					if err := c.callbacks.ProcessJobs(c.ctx, jobBatch); err != nil {
+						c.logger.Errorf("Error processing jobs: %v", err)
+					}
+				}
+				jobs = nil
+				timer.Reset(uploadFrequency)
+				jobsProcessed = true
+			}
+
+		case <-timer.C:
+			if len(jobs) > 0 {
+				partitionedJobs := c.callbacks.PartitionJobs(jobs)
+				for _, jobBatch := range partitionedJobs {
+					if err := c.callbacks.ProcessJobs(c.ctx, jobBatch); err != nil {
+						c.logger.Errorf("Error processing jobs: %v", err)
+					}
+				}
+				jobsProcessed = true
+			}
+			timer.Reset(uploadFrequency)
+			return jobsProcessed
+		}
+	}
+}
+
+// SleepDurations returns the min and max sleep durations for the worker when idle
+func (c *ConsumerWorker) SleepDurations() (min, max time.Duration) {
+	return time.Millisecond * 100, time.Second * 5
+}
+
+// Stop implements the workerpool.Worker interface
+func (c *ConsumerWorker) Stop() {
+	// No cleanup needed
+}
+
+// IsIdle reports whether the worker's job channel is currently empty
+func (c *ConsumerWorker) IsIdle() bool {
+	return len(c.jobChan) == 0
+}
diff --git a/router/batchrouter/handle.go b/router/batchrouter/handle.go
index b0e16dca81..695f2c660b 100644
--- a/router/batchrouter/handle.go
+++ b/router/batchrouter/handle.go
@@ -109,6 +109,8 @@ type Handle struct {
 	backgroundCancel context.CancelFunc
 	backgroundWait   func() error
 
+	jobBuffer *JobBuffer // buffers jobs per source-destination pair via channels
+
 	backendConfigInitializedOnce sync.Once
 	backendConfigInitialized     chan bool
 
@@ -121,9 +123,8 @@ type Handle struct {
 	encounteredMergeRuleMap map[string]map[string]bool
 
 	limiter struct {
-		read    kitsync.Limiter
-		process kitsync.Limiter
-		upload  kitsync.Limiter
+		read   kitsync.Limiter
+		upload kitsync.Limiter
 	}
 
 	lastExecTimesMu sync.RWMutex
@@ -190,8 +191,7 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob
 	if brt.skipFetchingJobs(partition) {
 		return
 	}
-
-	defer brt.limiter.read.Begin("")()
+	defer brt.limiter.read.Begin(partition)()
 
 	brt.configSubscriberMu.RLock()
 	destinationsMap := brt.destinationsMap
@@ -621,6 +621,7 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err
 	brt.failingDestinationsMu.Lock()
 	brt.failingDestinations[batchJobs.Connection.Destination.ID] = batchReqMetric.batchRequestFailed > 0
 	brt.failingDestinationsMu.Unlock()
+
 	var statusList []*jobsdb.JobStatusT
 
 	if isWarehouse && notifyWarehouseErr {
diff --git a/router/batchrouter/handle_lifecycle.go b/router/batchrouter/handle_lifecycle.go
index 96e51b58f8..176ed90f5c 100644
--- a/router/batchrouter/handle_lifecycle.go
+++ b/router/batchrouter/handle_lifecycle.go
@@ -141,15 +141,6 @@ func (brt *Handle) Setup(
 			return time.After(limiterStatsPeriod)
 		}),
 	)
-	brt.limiter.process = kitsync.NewLimiter(ctx, &limiterGroup, "brt_process",
-		getBatchRouterConfigInt("Limiter.process.limit", brt.destType, 20),
-		stats.Default,
-		kitsync.WithLimiterDynamicPeriod(config.GetDuration("BatchRouter.Limiter.process.dynamicPeriod", 1, time.Second)),
-		kitsync.WithLimiterTags(map[string]string{"destType": brt.destType}),
-		kitsync.WithLimiterStatsTriggerFunc(func() <-chan time.Time {
-			return time.After(limiterStatsPeriod)
-		}),
-	)
 	brt.limiter.upload = kitsync.NewLimiter(ctx, &limiterGroup, "brt_upload",
 		getBatchRouterConfigInt("Limiter.upload.limit", brt.destType, 50),
 		stats.Default,
@@ -161,9 +152,17 @@ func (brt *Handle) Setup(
 	)
 
 	brt.logger.Infof("BRT: Batch Router started: %s", destType)
-	brt.crashRecover()
+	if asynccommon.IsAsyncDestination(brt.destType) {
+		brt.startAsyncDestinationManager()
+	}
+
+	brt.backgroundGroup.Go(crash.Wrapper(func() error {
+		brt.backendConfigSubscriber()
+		return nil
+	}))
+
 	// periodically publish a zero counter for ensuring that stuck processing pipeline alert
 	// can always detect a stuck batch router
 	brt.backgroundGroup.Go(func() error {
@@ -192,14 +191,8 @@ func (brt *Handle) Setup(
 		return nil
 	}))
 
-	if asynccommon.IsAsyncDestination(destType) {
-		brt.startAsyncDestinationManager()
-	}
-
-	brt.backgroundGroup.Go(crash.Wrapper(func() error {
-		brt.backendConfigSubscriber()
-		return nil
-	}))
+	// Initialize the job buffer
+	brt.jobBuffer = NewJobBuffer(brt)
 }
 
 func (brt *Handle) setupReloadableVars() {
@@ -262,8 +255,21 @@ func (brt *Handle) Start() {
 
 // Shutdown stops the batch router
 func (brt *Handle) Shutdown() {
+	// Signal all goroutines to stop via context cancellation
+	brt.logger.Info("Initiating batch router shutdown")
 	brt.backgroundCancel()
+
+	// Stop all job buffer timers and workerPool
+	if brt.jobBuffer != nil {
+		brt.logger.Debug("Stopping JobBuffer and its resources")
+
+		// Stop the job buffer (which will also stop the worker pool)
+		brt.jobBuffer.Shutdown()
+	}
+
+	// Wait for all background goroutines to complete
 	_ = brt.backgroundWait()
+	brt.logger.Info("Batch router shutdown complete")
 }
 
 func (brt *Handle) initAsyncDestinationStruct(destination *backendconfig.DestinationT) {
@@ -399,7 +405,9 @@ func (brt *Handle) backendConfigSubscriber() {
 				if destination.DestinationDefinition.Name == brt.destType && destination.Enabled {
 					if _, ok := destinationsMap[destination.ID]; !ok {
 						destinationsMap[destination.ID] = &routerutils.DestinationWithSources{Destination: destination, Sources: []backendconfig.SourceT{}}
-						uploadIntervalMap[destination.ID] = brt.uploadInterval(destination.Config)
+						if asynccommon.IsAsyncDestination(brt.destType) {
+							uploadIntervalMap[destination.ID] = brt.uploadInterval(destination.Config)
+						}
 					}
 					destinationsMap[destination.ID].Sources = append(destinationsMap[destination.ID].Sources, source)
 					brt.refreshDestination(destination)
diff --git a/router/batchrouter/handle_observability.go b/router/batchrouter/handle_observability.go
index 3508e74c7b..9ec8609b55 100644
--- a/router/batchrouter/handle_observability.go
+++ b/router/batchrouter/handle_observability.go
@@ -25,32 +25,32 @@ func (brt *Handle) collectMetrics(ctx context.Context) {
 	}
 
 	for {
-		brt.batchRequestsMetricMu.RLock()
-		var diagnosisProperties map[string]interface{}
-		success := 0
-		failed := 0
-		for _, batchReqMetric := range brt.batchRequestsMetric {
-			success = success + batchReqMetric.batchRequestSuccess
-			failed = failed + batchReqMetric.batchRequestFailed
-		}
-		if len(brt.batchRequestsMetric) > 0 {
-			diagnosisProperties = map[string]interface{}{
-				brt.destType: map[string]interface{}{
-					diagnostics.BatchRouterSuccess: success,
-					diagnostics.BatchRouterFailed:  failed,
-				},
-			}
-
-			brt.Diagnostics.Track(diagnostics.BatchRouterEvents, diagnosisProperties)
-		}
-
-		brt.batchRequestsMetric = nil
-		brt.batchRequestsMetricMu.RUnlock()
-
 		select {
 		case <-ctx.Done():
 			return
+		case <-brt.diagnosisTicker.C:
+			brt.batchRequestsMetricMu.RLock()
+			var diagnosisProperties map[string]interface{}
+			success := 0
+			failed := 0
+			for _, batchReqMetric := range brt.batchRequestsMetric {
+				success = success + batchReqMetric.batchRequestSuccess
+				failed = failed + batchReqMetric.batchRequestFailed
+			}
+			if len(brt.batchRequestsMetric) > 0 {
+				diagnosisProperties = map[string]interface{}{
+					brt.destType: map[string]interface{}{
+						diagnostics.BatchRouterSuccess: success,
+						diagnostics.BatchRouterFailed:  failed,
+					},
+				}
+
+				brt.Diagnostics.Track(diagnostics.BatchRouterEvents, diagnosisProperties)
+			}
+
+			brt.batchRequestsMetric = nil
+			brt.batchRequestsMetricMu.RUnlock()
 		}
 	}
 }
@@ -128,7 +128,7 @@ func (brt *Handle) recordAsyncDestinationDeliveryStatus(sourceID, destinationID
 	brt.debugger.RecordEventDeliveryStatus(destinationID, &deliveryStatus)
 }
 
-func (brt *Handle) recordDeliveryStatus(batchDestination Connection, output UploadResult, isWarehouse bool) {
+func (brt *Handle) recordDeliveryStatus(destinationId, sourceId string, output UploadResult, isWarehouse bool) {
 	var (
 		errorCode string
 		jobState  string
@@ -162,15 +162,15 @@ func (brt *Handle) recordDeliveryStatus(batchDestination Connection, output Uplo
 		EventName: fmt.Sprint(output.TotalEvents) + " events",
 		EventType: "",
 		SentAt:    time.Now().Format(misc.RFC3339Milli),
-		DestinationID: batchDestination.Destination.ID,
-		SourceID:      batchDestination.Source.ID,
+		DestinationID: destinationId,
+		SourceID:      sourceId,
 		Payload:       payload,
 		AttemptNum:    1,
 		JobState:      jobState,
 		ErrorCode:     errorCode,
 		ErrorResponse: errorResp,
 	}
-	brt.debugger.RecordEventDeliveryStatus(batchDestination.Destination.ID, &deliveryStatus)
+	brt.debugger.RecordEventDeliveryStatus(destinationId, &deliveryStatus)
 }
 
 func (brt *Handle) trackRequestMetrics(batchReqDiagnostics batchRequestMetric) {
diff --git a/router/batchrouter/job_buffer.go b/router/batchrouter/job_buffer.go
new file mode 100644
index 0000000000..ecf9868ebc
--- /dev/null
+++ b/router/batchrouter/job_buffer.go
@@ -0,0 +1,295 @@
+package batchrouter
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/rudderlabs/rudder-go-kit/stats"
+	kitsync "github.com/rudderlabs/rudder-go-kit/sync"
+	backendconfig "github.com/rudderlabs/rudder-server/backend-config"
+	"github.com/rudderlabs/rudder-server/jobsdb"
+	asynccommon "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
+	routerutils "github.com/rudderlabs/rudder-server/router/utils"
+	"github.com/rudderlabs/rudder-server/utils/misc"
+	"github.com/rudderlabs/rudder-server/utils/workerpool"
+	warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
+)
+
+// JobBuffer manages job buffering and batch processing for source-destination pairs.
+// It handles the buffering of jobs through channels and manages batch workers that
+// process these jobs in configurable batch sizes.
+type JobBuffer struct {
+	// Core components
+	brt    *Handle
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	// Channel management
+	sourceDestMap map[string]chan *jobsdb.JobT
+	mu            sync.RWMutex
+
+	// Consumer management
+	consumerPool    workerpool.WorkerPool
+	workerLimiter   kitsync.Limiter
+	activeConsumers sync.Map
+}
+
+// NewJobBuffer creates and initializes a new JobBuffer instance
+func NewJobBuffer(brt *Handle) *JobBuffer {
+	ctx, cancel := context.WithCancel(context.Background())
+	maxConsumers := brt.conf.GetInt("BatchRouter.maxConsumers", 100)
+
+	// Create a WaitGroup for the limiter
+	var limiterGroup sync.WaitGroup
+
+	jb := &JobBuffer{
+		brt:           brt,
+		sourceDestMap: make(map[string]chan *jobsdb.JobT),
+		ctx:           ctx,
+		cancel:        cancel,
+		workerLimiter: kitsync.NewLimiter(ctx, &limiterGroup, "batch_router_consumer_workers", maxConsumers, stats.Default),
+	}
+
+	// Initialize the consumer worker pool
+	jb.consumerPool = workerpool.New(ctx, jb.createConsumerWorker, brt.logger)
+
+	return jb
+}
+
+// createConsumerWorker is a factory function for creating new consumer workers
+func (jb *JobBuffer) createConsumerWorker(key string) workerpool.Worker {
+	sourceID, destID := ParseConnectionKey(key)
+	if sourceID == "" || destID == "" {
+		jb.brt.logger.Errorf("Invalid connection key format: %s", key)
+		return nil
+	}
+
+	jobsChan := make(chan *jobsdb.JobT, 1000)
+	jb.mu.Lock()
+	jb.sourceDestMap[key] = jobsChan
+	jb.mu.Unlock()
+
+	callbacks := &ConsumerCallbacks{
+		GetUploadFrequency: func() time.Duration {
+			return jb.brt.uploadFreq.Load()
+		},
+		GetMaxBatchSize: func() int {
+			return jb.brt.maxEventsInABatch
+		},
+		ProcessJobs: func(ctx context.Context, jobs []*jobsdb.JobT) error {
+			// Process jobs for this destination using the batch router's destination processor
+			destWithSources, ok := jb.brt.destinationsMap[destID]
+			if !ok {
+				return fmt.Errorf("destination not found: %s", destID)
+			}
+
+			// Create batch job structure with connection info for processing
+			// Find appropriate source for this destination
+			var source backendconfig.SourceT
+			for _, s := range destWithSources.Sources {
+				if s.ID == sourceID {
+					source = s
+					break
+				}
+			}
+
+			batchedJobs := &BatchedJobs{
+				Jobs: jobs,
+				Connection: &Connection{
+					Destination: destWithSources.Destination,
+					Source:      source,
+				},
+			}
+
+			// Track connection details for metrics and reporting
+			jobIDConnectionDetailsMap := make(map[int64]jobsdb.ConnectionDetails)
+
+			for _, job := range jobs {
+				// Tag each job's parameters with the consumer key that handled it,
+				// so downstream reporting can attribute it to this source-destination pair
+				job.Parameters = routerutils.EnhanceJSON(job.Parameters, "consumer_key",
+					strings.Join([]string{sourceID, destID}, ":"))
+
+				jobIDConnectionDetailsMap[job.JobID] = jobsdb.ConnectionDetails{
+					SourceID:      sourceID,
+					DestinationID: destID,
+				}
+			}
+
+			// Process based on destination type
+			var uploadSuccess bool
+			var uploadError error
+
+			switch {
+			case IsObjectStorageDestination(jb.brt.destType):
+				// Object storage destinations (S3, GCS, etc)
+				output := jb.brt.upload(jb.brt.destType, batchedJobs, false)
+				defer misc.RemoveFilePaths(output.LocalFilePaths...)
+
+				jb.brt.recordDeliveryStatus(destID, sourceID, output, false)
+				jb.brt.updateJobStatus(batchedJobs, false, output.Error, false)
+
+				if output.JournalOpID > 0 {
+					jb.brt.jobsDB.JournalDeleteEntry(output.JournalOpID)
+				}
+
+				if output.Error == nil {
+					jb.brt.recordUploadStats(*batchedJobs.Connection, output)
+					uploadSuccess = true
+				} else {
+					uploadError = output.Error
+				}
+
+			case IsWarehouseDestination(jb.brt.destType):
+				// Warehouse destinations
+				useRudderStorage := misc.IsConfiguredToUseRudderObjectStorage(batchedJobs.Connection.Destination.Config)
+				objectStorageType := warehouseutils.ObjectStorageType(jb.brt.destType, batchedJobs.Connection.Destination.Config, useRudderStorage)
+
+				splitBatchJobs := jb.brt.splitBatchJobsOnTimeWindow(*batchedJobs)
+				allSuccess := true
+
+				for _, splitJob := range splitBatchJobs {
+					output := jb.brt.upload(objectStorageType, splitJob, true)
+					defer misc.RemoveFilePaths(output.LocalFilePaths...)
+
+					notifyWarehouseErr := false
+					if output.Error == nil && output.Key != "" {
+						output.Error = jb.brt.pingWarehouse(splitJob, output)
+						if output.Error != nil {
+							notifyWarehouseErr = true
+							allSuccess = false
+							uploadError = output.Error
+						}
+						warehouseutils.DestStat(stats.CountType, "generate_staging_files", destID).Count(1)
+						warehouseutils.DestStat(stats.CountType, "staging_file_batch_size", destID).Count(len(splitJob.Jobs))
+					} else if output.Error != nil {
+						allSuccess = false
+						uploadError = output.Error
+					}
+
+					jb.brt.recordDeliveryStatus(destID, sourceID, output, true)
+					jb.brt.updateJobStatus(splitJob, true, output.Error, notifyWarehouseErr)
+				}
+
+				uploadSuccess = allSuccess
+
+			case asynccommon.IsAsyncDestination(jb.brt.destType):
+				// Async destinations
+				jb.brt.sendJobsToStorage(*batchedJobs)
+				uploadSuccess = true
+
+			default:
+				uploadError = fmt.Errorf("unsupported destination type %s for job buffer", jb.brt.destType)
+				jb.brt.logger.Error(uploadError)
+			}
+
+			// Update failing destinations map
+			jb.brt.failingDestinationsMu.Lock()
+			jb.brt.failingDestinations[destID] = !uploadSuccess
+			jb.brt.failingDestinationsMu.Unlock()
+
+			if !uploadSuccess && uploadError != nil {
+				jb.brt.logger.Errorf("Upload failed for destination %s: %v", destID, uploadError)
+				return uploadError
+			}
+
+			return nil
+		},
+		ReleaseWorker: func(sourceID, destID string) {
+			key := fmt.Sprintf("%s:%s", sourceID, destID)
+			jb.mu.Lock()
+			delete(jb.sourceDestMap, key)
+			jb.mu.Unlock()
+			jb.activeConsumers.Delete(key)
+		},
+		PartitionJobs: func(jobs []*jobsdb.JobT) [][]*jobsdb.JobT {
+			// Group jobs into batches based on size and other criteria
+			var batches [][]*jobsdb.JobT
+			currentBatch := make([]*jobsdb.JobT, 0, jb.brt.maxEventsInABatch)
+
+			for _, job := range jobs {
+				currentBatch = append(currentBatch, job)
+				if len(currentBatch) >= jb.brt.maxEventsInABatch {
+					batches = append(batches, currentBatch)
+					currentBatch = make([]*jobsdb.JobT, 0, jb.brt.maxEventsInABatch)
+				}
+			}
+			if len(currentBatch) > 0 {
+				batches = append(batches, currentBatch)
+			}
+			return batches
+		},
+	}
+
+	worker := NewConsumerWorker(sourceID, destID, jobsChan, jb.ctx, jb.brt.logger, callbacks)
+	jb.activeConsumers.Store(key, worker)
+	return worker
+}
+
+// getSourceDestKey generates a unique key for a source-destination pair
+func getSourceDestKey(sourceID, destID string) string {
+	return fmt.Sprintf("%s:%s", sourceID, destID)
+}
+
+// ParseConnectionKey splits a "<sourceID>:<destID>" connection key into its parts
+func ParseConnectionKey(key string) (sourceID, destID string) {
+	parts := strings.Split(key, ":")
+	if len(parts) != 2 {
+		return "", ""
+	}
+	return parts[0], parts[1]
+}
+
+// getOrCreateJobChannel returns or creates a buffered channel for a source-destination pair
+func (jb *JobBuffer) getOrCreateJobChannel(sourceID, destID string) chan *jobsdb.JobT {
+	key := getSourceDestKey(sourceID, destID)
+
+	// Fast path: check if channel exists
+	jb.mu.RLock()
+	ch, exists := jb.sourceDestMap[key]
+	jb.mu.RUnlock()
+	if exists {
+		return ch
+	}
+
+	// Slow path: create new channel
+	jb.mu.Lock()
+	defer jb.mu.Unlock()
+
+	// Double-check to avoid race conditions
+	if ch, exists = jb.sourceDestMap[key]; exists {
+		return ch
+	}
+
+	// Determine buffer size
+	bufferSize := jb.brt.maxEventsInABatch
+	if customSize := jb.brt.conf.GetIntVar(0, 0, "BatchRouter."+jb.brt.destType+".channelBufferSize", "BatchRouter.channelBufferSize"); customSize > 0 {
+		bufferSize = customSize
+	}
+
+	// Create and initialize new channel
+	ch = make(chan *jobsdb.JobT, bufferSize)
+	jb.sourceDestMap[key] = ch
+
+	return ch
+}
+
+// AddJob adds a job to the appropriate buffer channel and ensures a consumer exists
+func (jb *JobBuffer) AddJob(sourceID, destID string, job *jobsdb.JobT) {
+	key := getSourceDestKey(sourceID, destID)
+	ch := jb.getOrCreateJobChannel(sourceID, destID)
+
+	// Ensure the consumer is active
+	jb.consumerPool.PingWorker(key)
+
+	// Send the job to the channel
+	ch <- job
+}
+
+// Shutdown gracefully shuts down the job buffer
+func (jb *JobBuffer) Shutdown() {
+	jb.cancel()
+	jb.consumerPool.Shutdown()
+}
diff --git a/router/batchrouter/worker.go b/router/batchrouter/worker.go
index 2aeeb2f3a3..c7cc99d46e 100644
--- a/router/batchrouter/worker.go
+++ b/router/batchrouter/worker.go
@@ -5,37 +5,73 @@ import (
 	"fmt"
 	"slices"
 	"strings"
-	"sync"
 	"time"
 
-	"github.com/samber/lo"
+	"github.com/sony/gobreaker"
 	"github.com/tidwall/gjson"
 
 	"github.com/rudderlabs/rudder-go-kit/logger"
 	"github.com/rudderlabs/rudder-go-kit/stats"
-	backendconfig "github.com/rudderlabs/rudder-server/backend-config"
 	"github.com/rudderlabs/rudder-server/jobsdb"
-	asynccommon "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
 	routerutils "github.com/rudderlabs/rudder-server/router/utils"
-	"github.com/rudderlabs/rudder-server/rruntime"
 	"github.com/rudderlabs/rudder-server/utils/misc"
-	warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
 )
 
-// newWorker creates a new worker for the provided partition.
+// newWorker creates a new batch router worker
 func newWorker(partition string, logger logger.Logger, brt *Handle) *worker {
 	w := &worker{
 		partition: partition,
 		logger:    logger,
 		brt:       brt,
 	}
+	w.cb = w.createCircuitBreaker()
 	return w
 }
 
+// worker represents a batch router worker that processes jobs for a specific partition
 type worker struct {
 	partition string
 	logger    logger.Logger
 	brt       *Handle
+	cb        *gobreaker.CircuitBreaker
+}
+
+// createCircuitBreaker creates a new circuit breaker for this worker
+func (w *worker) createCircuitBreaker() *gobreaker.CircuitBreaker {
+	maxFailures := w.brt.conf.GetIntVar(3, 1, "BatchRouter."+w.brt.destType+".circuitBreaker.maxFailures", "BatchRouter.circuitBreaker.maxFailures")
+	interval := w.brt.conf.GetDurationVar(1, time.Minute, "BatchRouter."+w.brt.destType+".circuitBreaker.interval", "BatchRouter.circuitBreaker.interval")
+	timeout := w.brt.conf.GetDurationVar(5, time.Minute, "BatchRouter."+w.brt.destType+".circuitBreaker.timeout", "BatchRouter.circuitBreaker.timeout")
+
+	cbSettings := gobreaker.Settings{
+		Name:        w.partition,
+		MaxRequests: uint32(maxFailures),
+		Interval:    interval,
+		Timeout:     timeout,
+		ReadyToTrip: func(counts gobreaker.Counts) bool {
+			failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
+			return counts.Requests >= uint32(maxFailures) && failureRatio >= 0.5
+		},
+		OnStateChange: func(name string, from, to gobreaker.State) {
+			if from != to {
+				w.logger.Infof("Circuit breaker %s state change from %s to %s", name, from, to)
+
+				// Update failing destinations map based on circuit breaker state
+				w.brt.failingDestinationsMu.Lock()
+				w.brt.failingDestinations[w.partition] = to == gobreaker.StateOpen
+				w.brt.failingDestinationsMu.Unlock()
+
+				// Emit metrics for circuit breaker state changes
+				statTags := stats.Tags{
+					"destType": w.brt.destType,
+					"key":      name,
+					"state":    to.String(),
+				}
+				stats.Default.NewTaggedStat("batch_router_circuit_breaker_state", stats.GaugeType, statTags).Gauge(1)
+			}
+		},
+	}
+
+	return gobreaker.NewCircuitBreaker(cbSettings)
 }
 
 // Work retrieves jobs from batch router for the worker's partition and processes them,
@@ -43,227 +79,224 @@ type worker struct {
 // The function returns when processing completes and the return value is true if at least 1 job was processed,
 // false otherwise.
 func (w *worker) Work() bool {
-	brt := w.brt
-	workerJobs := brt.getWorkerJobs(w.partition)
-	if len(workerJobs) == 0 {
+	// Execute work through circuit breaker
+	result, err := w.cb.Execute(func() (interface{}, error) {
+		workerJobs := w.brt.getWorkerJobs(w.partition)
+		if len(workerJobs) == 0 {
+			return false, nil
+		}
+
+		// Record worker activity - number of destination jobs being processed
+		stats.Default.NewTaggedStat("batch_router_worker_jobs", stats.GaugeType, stats.Tags{
+			"partition": w.partition,
+			"destType":  w.brt.destType,
+		}).Gauge(len(workerJobs))
+
+		for _, workerJob := range workerJobs {
+			w.routeJobsToBuffer(workerJob)
+		}
+		return true, nil
+	})
+	if err != nil {
+		w.logger.Errorf("Circuit breaker prevented processing for partition %s: %v", w.partition, err)
 		return false
 	}
-	var jobsWg sync.WaitGroup
-	jobsWg.Add(len(workerJobs))
-	for _, workerJob := range workerJobs {
-		w.processJobAsync(&jobsWg, workerJob)
-	}
-	jobsWg.Wait()
-	return true
+	return result.(bool)
 }
 
-// processJobAsync spawns a goroutine and processes the destination's jobs. The provided wait group is notified when the goroutine completes.
-func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *DestinationJobs) {
+// routeJobsToBuffer sends jobs to appropriate channels in the job buffer.
+// This is the entry point for jobs into the buffering system:
+//  1. Jobs are checked for drain conditions (e.g., source not found, expired)
+//  2. Jobs passing validation are marked as executing in JobsDB
+//  3. Valid jobs are sent to JobBuffer.AddJob which:
+//     - Ensures a consumer worker exists for the source-destination pair
+//     - Adds the job to a channel specific to that source-destination pair
+//  4. The ConsumerWorker then picks up jobs from the channel and processes them
+//     in batches based on batch size and time thresholds
+func (w *worker) routeJobsToBuffer(destinationJobs *DestinationJobs) {
 	brt := w.brt
-	rruntime.Go(func() {
-		defer brt.limiter.process.Begin("")()
-		defer jobsWg.Done()
-		destWithSources := destinationJobs.destWithSources
-		parameterFilters := []jobsdb.ParameterFilterT{{Name: "destination_id", Value: destWithSources.Destination.ID}}
-		var statusList []*jobsdb.JobStatusT
-		var drainList []*jobsdb.JobStatusT
-		var drainJobList []*jobsdb.JobT
-		drainStatsbyDest := make(map[string]*routerutils.DrainStats)
-		jobIDConnectionDetailsMap := make(map[int64]jobsdb.ConnectionDetails)
+	destWithSources := destinationJobs.destWithSources
+	parameterFilters := []jobsdb.ParameterFilterT{{Name: "destination_id", Value: destWithSources.Destination.ID}}
+	var statusList []*jobsdb.JobStatusT
+	var drainList []*jobsdb.JobStatusT
+	var drainJobList []*jobsdb.JobT
+	drainStatsbyDest := make(map[string]*routerutils.DrainStats)
+	jobIDConnectionDetailsMap := make(map[int64]jobsdb.ConnectionDetails)
 
-		jobsBySource := make(map[string][]*jobsdb.JobT)
-		for _, job := range destinationJobs.jobs {
-			sourceID := gjson.GetBytes(job.Parameters, "source_id").String()
-			destinationID := destWithSources.Destination.ID
-			jobIDConnectionDetailsMap[job.JobID] = jobsdb.ConnectionDetails{
-				SourceID:      sourceID,
-				DestinationID: destinationID,
-			}
-			if drain, reason := brt.drainer.Drain(
-				job.CreatedAt,
-				destWithSources.Destination.ID,
-				gjson.GetBytes(job.Parameters, "source_job_run_id").String(),
-			); drain {
-				status := jobsdb.JobStatusT{
-					JobID:         job.JobID,
-					AttemptNum:    job.LastJobStatus.AttemptNum + 1,
-					JobState:      jobsdb.Aborted.State,
-					ExecTime:      time.Now(),
-					RetryTime:     time.Now(),
-					ErrorCode:     routerutils.DRAIN_ERROR_CODE,
-					ErrorResponse: routerutils.EnhanceJSON([]byte(`{}`), "reason", reason),
-					Parameters:    []byte(`{}`), // check
-					JobParameters: job.Parameters,
-					WorkspaceId:   job.WorkspaceId,
-				}
-				// Enhancing job parameter with the drain reason.
-				job.Parameters = routerutils.EnhanceJSON(job.Parameters, "stage", "batch_router")
-				job.Parameters = routerutils.EnhanceJSON(job.Parameters, "reason", reason)
-				drainList = append(drainList, &status)
-				drainJobList = append(drainJobList, job)
-				if _, ok := drainStatsbyDest[destinationID]; !ok {
-					drainStatsbyDest[destinationID] = &routerutils.DrainStats{
-						Count:     0,
-						Reasons:   []string{},
-						Workspace: job.WorkspaceId,
-					}
-				}
-				drainStatsbyDest[destinationID].Count = drainStatsbyDest[destinationID].Count + 1
-				if !slices.Contains(drainStatsbyDest[destinationID].Reasons, reason) {
-					drainStatsbyDest[destinationID].Reasons = append(drainStatsbyDest[destinationID].Reasons, reason)
-				}
-			} else {
-				if _, ok := jobsBySource[sourceID]; !ok {
-					jobsBySource[sourceID] = []*jobsdb.JobT{}
-				}
-				jobsBySource[sourceID] = append(jobsBySource[sourceID], job)
+	// Organize jobs by destination and source
+	type jobEntry struct {
+		job      *jobsdb.JobT
+		status   *jobsdb.JobStatusT
+		sourceID string
+		destID   string
+	}
 
-				status := jobsdb.JobStatusT{
-					JobID:         job.JobID,
-					AttemptNum:    job.LastJobStatus.AttemptNum + 1,
-					JobState:      jobsdb.Executing.State,
-					ExecTime:      time.Now(),
-					RetryTime:     time.Now(),
-					ErrorCode:     "",
-					ErrorResponse: []byte(`{}`), // check
-					Parameters:    []byte(`{}`), // check
-					JobParameters: job.Parameters,
-					WorkspaceId:   job.WorkspaceId,
+	var jobsToBuffer []jobEntry
+
+	// Process jobs and check for drain conditions
+	for _, job := range destinationJobs.jobs {
+		sourceID := gjson.GetBytes(job.Parameters, "source_id").String()
+		destinationID := destWithSources.Destination.ID
+		jobIDConnectionDetailsMap[job.JobID] = jobsdb.ConnectionDetails{
+			SourceID:      sourceID,
+			DestinationID: destinationID,
+		}
+		sourceFound := false
+		for _, s := range destWithSources.Sources {
+			if s.ID == sourceID {
+				sourceFound = true
+				break
+			}
+		}
+		if !sourceFound {
+			status := jobsdb.JobStatusT{
+				JobID:         job.JobID,
+				AttemptNum:    job.LastJobStatus.AttemptNum + 1,
+				JobState:      jobsdb.Aborted.State,
+				ExecTime:      time.Now(),
+				RetryTime:     time.Now(),
+				ErrorCode:     routerutils.DRAIN_ERROR_CODE,
+				ErrorResponse: routerutils.EnhanceJSON([]byte(`{}`), "reason", "source_not_found"),
+				Parameters:    []byte(`{}`),
+				JobParameters: job.Parameters,
+				WorkspaceId:   job.WorkspaceId,
+			}
+			job.Parameters = routerutils.EnhanceJSON(job.Parameters, "stage", "batch_router")
+			job.Parameters = routerutils.EnhanceJSON(job.Parameters, "reason", "source_not_found")
+			drainList = append(drainList, &status)
+			drainJobList = append(drainJobList, job)
+			if _, ok := drainStatsbyDest[destinationID]; !ok {
+				drainStatsbyDest[destinationID] = &routerutils.DrainStats{
+					Count:     0,
+					Reasons:   []string{},
+					Workspace: job.WorkspaceId,
 				}
-				statusList = append(statusList, &status)
 			}
+			drainStatsbyDest[destinationID].Count = drainStatsbyDest[destinationID].Count + 1
+			if !slices.Contains(drainStatsbyDest[destinationID].Reasons, "source_not_found") {
+				drainStatsbyDest[destinationID].Reasons = append(drainStatsbyDest[destinationID].Reasons, "source_not_found")
+			}
+			continue
 		}
-		// Mark the drainList jobs as Aborted
-		if len(drainList) > 0 {
-			err := misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
-				return brt.errorDB.Store(ctx, drainJobList)
-			}, brt.sendRetryStoreStats)
-			if err != nil {
-				panic(fmt.Errorf("storing %s jobs into ErrorDB: %w", brt.destType, err))
+
+		// Normal job processing path - check custom drainer conditions first
+		if drain, reason := brt.drainer.Drain(
+			job.CreatedAt,
+			destWithSources.Destination.ID,
+			gjson.GetBytes(job.Parameters, "source_job_run_id").String(),
+		); drain {
+			status := jobsdb.JobStatusT{
+				JobID:         job.JobID,
+				AttemptNum:    job.LastJobStatus.AttemptNum + 1,
+				JobState:      jobsdb.Aborted.State,
+				ExecTime:      time.Now(),
+				RetryTime:     time.Now(),
+				ErrorCode:     routerutils.DRAIN_ERROR_CODE,
+				ErrorResponse: routerutils.EnhanceJSON([]byte(`{}`), "reason", reason),
+				Parameters:    []byte(`{}`),
+				JobParameters: job.Parameters,
+				WorkspaceId:   job.WorkspaceId,
 			}
-			reportMetrics := brt.getReportMetrics(getReportMetricsParams{
-				StatusList:    drainList,
-				ParametersMap: brt.getParamertsFromJobs(drainJobList),
-			})
-			err = misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
-				return brt.jobsDB.WithUpdateSafeTx(ctx, func(tx jobsdb.UpdateSafeTx) error {
-					err := brt.jobsDB.UpdateJobStatusInTx(ctx, tx, drainList, []string{brt.destType}, parameterFilters)
-					if err != nil {
-						return fmt.Errorf("marking %s job statuses as aborted: %w", brt.destType, err)
-					}
-					if brt.reporting != nil && brt.reportingEnabled {
-						if err = brt.reporting.Report(ctx, reportMetrics, tx.Tx()); err != nil {
-							return fmt.Errorf("reporting metrics: %w", err)
-						}
-					}
-					// rsources stats
-					return brt.updateRudderSourcesStats(ctx, tx, drainJobList, drainList)
-				})
-			}, brt.sendRetryUpdateStats)
-			if err != nil {
-				panic(err)
+			job.Parameters = routerutils.EnhanceJSON(job.Parameters, "stage", "batch_router")
+			job.Parameters = routerutils.EnhanceJSON(job.Parameters, "reason", reason)
+			drainList = append(drainList, &status)
+			drainJobList = append(drainJobList, job)
+			if _, ok := drainStatsbyDest[destinationID]; !ok {
+				drainStatsbyDest[destinationID] = &routerutils.DrainStats{
+					Count:     0,
+					Reasons:   []string{},
+					Workspace: job.WorkspaceId,
+				}
 			}
-			routerutils.UpdateProcessedEventsMetrics(stats.Default, module, brt.destType, statusList, jobIDConnectionDetailsMap)
-			for destID, destDrainStat := range drainStatsbyDest {
-				stats.Default.NewTaggedStat("drained_events", stats.CountType, stats.Tags{
-					"destType":    brt.destType,
-					"destId":      destID,
-					"module":      "batchrouter",
-					"reasons":     strings.Join(destDrainStat.Reasons, ", "),
-					"workspaceId": destDrainStat.Workspace,
-				}).Count(destDrainStat.Count)
-				w.brt.pendingEventsRegistry.DecreasePendingEvents("batch_rt", destDrainStat.Workspace, brt.destType, float64(destDrainStat.Count))
+			drainStatsbyDest[destinationID].Count = drainStatsbyDest[destinationID].Count + 1
+			if !slices.Contains(drainStatsbyDest[destinationID].Reasons, reason) {
+				drainStatsbyDest[destinationID].Reasons = append(drainStatsbyDest[destinationID].Reasons, reason)
 			}
+		} else {
+			status := jobsdb.JobStatusT{
+				JobID:         job.JobID,
+				AttemptNum:    job.LastJobStatus.AttemptNum + 1,
+				JobState:      jobsdb.Executing.State,
+				ExecTime:      time.Now(),
+				RetryTime:     time.Now(),
+				ErrorCode:     "",
+				ErrorResponse: []byte(`{}`),
+				Parameters:    []byte(`{}`),
+				JobParameters: job.Parameters,
+				WorkspaceId:   job.WorkspaceId,
+			}
+
+			statusList = append(statusList, &status)
+			jobsToBuffer = append(jobsToBuffer, jobEntry{
+				job:      job,
+				status:   &status,
+				sourceID: sourceID,
+				destID:   destinationID,
+			})
 		}
-		// Mark the jobs as executing
+	}
+
+	// Mark jobs as executing in a single batch operation
+	if len(statusList) > 0 {
+		brt.logger.Debugf("BRT: %s: updating %d job statuses to executing for parameter filters: %v", brt.destType, len(statusList), parameterFilters)
 		err := misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
 			return brt.jobsDB.UpdateJobStatus(ctx, statusList, []string{brt.destType}, parameterFilters)
 		}, brt.sendRetryUpdateStats)
 		if err != nil {
-			panic(fmt.Errorf("storing %s jobs into ErrorDB: %w", brt.destType, err))
+			panic(fmt.Errorf("updating %s job statuses: %w", brt.destType, err))
 		}
 		brt.logger.Debugf("BRT: %s: DB Status update complete for parameter Filters: %v", brt.destType, parameterFilters)
 
-		var wg sync.WaitGroup
-		wg.Add(len(jobsBySource))
-
-		for sourceID, jobs := range jobsBySource {
-			source, found := lo.Find(destWithSources.Sources, func(s backendconfig.SourceT) bool {
-				return s.ID == sourceID
-			})
-			batchedJobs := BatchedJobs{
-				Jobs: jobs,
-				Connection: &Connection{
-					Destination: destWithSources.Destination,
-					Source:      source,
-				},
-			}
-			if !found {
-				// TODO: Should not happen. Handle this
-				err := fmt.Errorf("BRT: Batch destination source not found in config for sourceID: %s", sourceID)
-				brt.updateJobStatus(&batchedJobs, false, err, false)
-				wg.Done()
-				continue
-			}
-			rruntime.Go(func() {
-				defer brt.limiter.upload.Begin("")()
-				switch {
-				case IsObjectStorageDestination(brt.destType):
-					destUploadStat := stats.Default.NewTaggedStat("batch_router_dest_upload_time", stats.TimerType, stats.Tags{
-						"destType": brt.destType,
-					})
-					destUploadStart := time.Now()
-					output := brt.upload(brt.destType, &batchedJobs, false)
-					brt.recordDeliveryStatus(*batchedJobs.Connection, output, false)
-					brt.updateJobStatus(&batchedJobs, false, output.Error, false)
-					misc.RemoveFilePaths(output.LocalFilePaths...)
-					if output.JournalOpID > 0 {
-						brt.jobsDB.JournalDeleteEntry(output.JournalOpID)
-					}
-					if output.Error == nil {
-						brt.recordUploadStats(*batchedJobs.Connection, output)
-					}
+		// Now that all statuses are updated, we can safely send jobs to buffer
+		for _, entry := range jobsToBuffer {
+			// Use the AddJob method which handles circuit breaker integration
+			brt.jobBuffer.AddJob(entry.sourceID, entry.destID, entry.job)
+		}
+	}
 
-					destUploadStat.Since(destUploadStart)
-				case IsWarehouseDestination(brt.destType):
-					useRudderStorage := misc.IsConfiguredToUseRudderObjectStorage(batchedJobs.Connection.Destination.Config)
-					objectStorageType := warehouseutils.ObjectStorageType(brt.destType, batchedJobs.Connection.Destination.Config, useRudderStorage)
-					destUploadStat := stats.Default.NewTaggedStat("batch_router_dest_upload_time", stats.TimerType, stats.Tags{
-						"destType":          brt.destType,
-						"objectStorageType": objectStorageType,
-					})
-					destUploadStart := time.Now()
-					splitBatchJobs := brt.splitBatchJobsOnTimeWindow(batchedJobs)
-					for _, batchJob := range splitBatchJobs {
-						output := brt.upload(objectStorageType, batchJob, true)
-						notifyWarehouseErr := false
-						if output.Error == nil && output.Key != "" {
-							output.Error = brt.pingWarehouse(batchJob, output)
-							if output.Error != nil {
-								notifyWarehouseErr = true
-							}
-							warehouseutils.DestStat(stats.CountType, "generate_staging_files", batchJob.Connection.Destination.ID).Count(1)
-							warehouseutils.DestStat(stats.CountType, "staging_file_batch_size", batchJob.Connection.Destination.ID).Count(len(batchJob.Jobs))
-						}
-						brt.recordDeliveryStatus(*batchJob.Connection, output, true)
-						brt.updateJobStatus(batchJob, true, output.Error, notifyWarehouseErr)
-						misc.RemoveFilePaths(output.LocalFilePaths...)
+	// Mark the drainList jobs as Aborted
+	if len(drainList) > 0 {
+		w.brt.logger.Debugf("BRT: %s: aborting %d drained jobs", brt.destType, len(drainList))
+		err := misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
+			return brt.errorDB.Store(ctx, drainJobList)
+		}, brt.sendRetryStoreStats)
+		if err != nil {
+			panic(fmt.Errorf("storing %s jobs into ErrorDB: %w", brt.destType, err))
+		}
+		reportMetrics := brt.getReportMetrics(getReportMetricsParams{
+			StatusList:    drainList,
+			ParametersMap: brt.getParamertsFromJobs(drainJobList),
+		})
+		err = misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
+			return brt.jobsDB.WithUpdateSafeTx(ctx, func(tx jobsdb.UpdateSafeTx) error {
+				err := brt.jobsDB.UpdateJobStatusInTx(ctx, tx, drainList, []string{brt.destType}, parameterFilters)
+				if err != nil {
+					return fmt.Errorf("marking %s job statuses as aborted: %w", brt.destType, err)
+				}
+				if brt.reporting != nil && brt.reportingEnabled {
+					if err = brt.reporting.Report(ctx, reportMetrics, tx.Tx()); err != nil {
+						return fmt.Errorf("reporting metrics: %w", err)
 					}
-					destUploadStat.Since(destUploadStart)
-				case asynccommon.IsAsyncDestination(brt.destType):
-					destUploadStat := stats.Default.NewTaggedStat("batch_router_dest_upload_time", stats.TimerType, stats.Tags{
-						"destType": brt.destType,
-					})
-					destUploadStart := time.Now()
-					brt.sendJobsToStorage(batchedJobs)
-					destUploadStat.Since(destUploadStart)
 				}
-				wg.Done()
+				return brt.updateRudderSourcesStats(ctx, tx, drainJobList, drainList)
 			})
-		}
-		wg.Wait()
-	})
+		}, brt.sendRetryUpdateStats)
+		if err != nil {
+			panic(err)
+		}
+		routerutils.UpdateProcessedEventsMetrics(stats.Default, module, brt.destType, statusList, jobIDConnectionDetailsMap)
+		for destID, destDrainStat := range drainStatsbyDest {
+			stats.Default.NewTaggedStat("drained_events", stats.CountType, stats.Tags{
+				"destType":    brt.destType,
+				"destId":      destID,
+				"module":      "batchrouter",
+				"reasons":     strings.Join(destDrainStat.Reasons, ", "),
+				"workspaceId": destDrainStat.Workspace,
+			}).Count(destDrainStat.Count)
+			w.brt.pendingEventsRegistry.DecreasePendingEvents("batch_rt", destDrainStat.Workspace, brt.destType, float64(destDrainStat.Count))
+		}
+	}
 }
 
 // SleepDurations returns the min and max sleep durations for the worker when idle, i.e when [Work] returns false.
@@ -282,5 +315,4 @@ func (w *worker) SleepDurations() (min, max time.Duration) {
 
 // Stop is no-op for this worker since the worker is not running any goroutine internally.
 func (w *worker) Stop() {
-	// no-op
 }
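Notes on the patterns introduced above.

The core of `ConsumerWorker.Work` is a size-or-time flush policy: accumulate jobs from a channel and flush either when the batch reaches `maxBatchSize` or when the upload-frequency timer fires. Below is a minimal, self-contained sketch of that same select loop, assuming only the standard library; `collectBatches` and the `int` payload are illustrative stand-ins, not names from this PR. One deliberate difference: `Work` returns after a single timer tick so the worker pool can reschedule it, while this sketch loops until its context is cancelled.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// collectBatches drains items from in, emitting a batch whenever either
// maxBatchSize items have accumulated or flushEvery has elapsed — the same
// size-or-time policy ConsumerWorker.Work applies to its job channel.
func collectBatches(ctx context.Context, in <-chan int, maxBatchSize int, flushEvery time.Duration, emit func([]int)) {
	var batch []int
	timer := time.NewTimer(flushEvery)
	defer timer.Stop()

	flush := func() {
		if len(batch) > 0 {
			emit(batch)
			batch = nil
		}
	}

	for {
		select {
		case <-ctx.Done():
			flush() // drain whatever is pending on shutdown
			return
		case v := <-in:
			batch = append(batch, v)
			if len(batch) >= maxBatchSize {
				flush()
				timer.Reset(flushEvery)
			}
		case <-timer.C:
			flush()
			timer.Reset(flushEvery)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	in := make(chan int, 16)
	for i := 0; i < 7; i++ {
		in <- i
	}
	// With maxBatchSize=3, the first six items flush as two size-triggered
	// batches; the seventh flushes when the 50ms timer fires.
	collectBatches(ctx, in, 3, 50*time.Millisecond, func(b []int) {
		fmt.Println("flushed batch:", b)
	})
}
```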
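On the `<sourceID>:<destID>` key scheme shared by `getSourceDestKey` and `ParseConnectionKey`: the `strings.Split`-based parser rejects any key that does not split into exactly two parts, so it silently assumes IDs never contain `:`. The sketch below is a hypothetical alternative (not code from this PR) using `strings.Cut`, which splits on the first `:` only and therefore tolerates the character inside the destination ID.

```go
package main

import (
	"fmt"
	"strings"
)

// parseConnectionKey mirrors job_buffer.go's "<sourceID>:<destID>" scheme but
// uses strings.Cut; the PR's strings.Split version returns ("", "") whenever
// the key contains more or fewer than one ':'.
func parseConnectionKey(key string) (sourceID, destID string) {
	before, after, found := strings.Cut(key, ":")
	if !found {
		return "", ""
	}
	return before, after
}

func main() {
	fmt.Println(parseConnectionKey("src-1:dest-1"))     // src-1 dest-1
	fmt.Println(parseConnectionKey("src-1:dest:extra")) // src-1 dest:extra (Split would reject this)
	fmt.Println(parseConnectionKey("malformed"))        // empty, empty
}
```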
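The trip rule in `createCircuitBreaker` opens the breaker once at least `maxFailures` requests have been counted and at least half of them failed. A runnable illustration of that exact rule with `sony/gobreaker`, using a hard-coded threshold in place of the config lookups:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/sony/gobreaker"
)

func main() {
	const maxFailures = 3
	cb := gobreaker.NewCircuitBreaker(gobreaker.Settings{
		Name: "example-partition",
		// Same ReadyToTrip predicate as worker.createCircuitBreaker: at least
		// maxFailures requests observed AND a failure ratio of 50% or more.
		ReadyToTrip: func(counts gobreaker.Counts) bool {
			failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
			return counts.Requests >= uint32(maxFailures) && failureRatio >= 0.5
		},
	})

	failing := errors.New("upstream failure")
	for i := 0; i < 5; i++ {
		_, err := cb.Execute(func() (interface{}, error) {
			return nil, failing // every attempt fails
		})
		fmt.Printf("attempt %d: state=%s err=%v\n", i+1, cb.State(), err)
	}
	// After the third failure the breaker opens; subsequent Execute calls
	// short-circuit with gobreaker.ErrOpenState — the path that makes
	// worker.Work() log the error and return false.
}
```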
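Finally, both `worker` and `ConsumerWorker` satisfy the worker pool's contract via `Work`/`SleepDurations`/`Stop`. The interface below is inferred from those method sets purely for illustration; the canonical definition lives in `utils/workerpool` and may differ (for instance, the pool presumably also consults the `IsIdle` hook that `ConsumerWorker` adds).

```go
package main

import (
	"fmt"
	"time"
)

// Worker is the assumed shape of the pool contract: one processing pass per
// Work call (returning whether anything was done), idle backoff bounds, and a
// cleanup hook invoked on pool shutdown.
type Worker interface {
	Work() bool
	SleepDurations() (min, max time.Duration)
	Stop()
}

type toyWorker struct{ passes int }

func (w *toyWorker) Work() bool {
	w.passes++
	fmt.Println("pass", w.passes)
	return w.passes < 3 // pretend we run out of work after three passes
}

func (w *toyWorker) SleepDurations() (min, max time.Duration) {
	// Same bounds ConsumerWorker reports: back off between 100ms and 5s.
	return 100 * time.Millisecond, 5 * time.Second
}

func (w *toyWorker) Stop() {}

func main() {
	var w Worker = &toyWorker{}
	// A pool keeps calling Work while it reports progress, then sleeps
	// somewhere between the bounds before pinging the worker again.
	for w.Work() {
	}
	lo, hi := w.SleepDurations()
	fmt.Printf("idle; pool would back off between %v and %v\n", lo, hi)
	w.Stop()
}
```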