Skip to content

Commit 00f7e4f

Browse files
authored
fix: reporting race condition between flusher and syncer (#6585)
# Description ## Problem This PR fixes a critical race condition in the reporting system that causes metrics data loss when transactions take longer than expected to commit. ### The Race Condition The issue occurs due to timing between three operations: 1. **Report()** - Writing metrics to the database (with transaction commit time) 2. **getReports()** - Reading and aggregating reports from a time bucket 3. **DELETE** - Removing processed reports after aggregation ### Data Loss Scenarios #### Scenario 1: Data Deleted Before Commit Completes If a `Report()` transaction (store stage) takes longer than the time between `getReports()` + `mainLoop()` iteration but **shorter** than reaching the DELETE operation: 1. Transaction starts writing metrics for minute bucket `T` 2. `getReports()` reads bucket `T` (doesn't see uncommitted transaction) 3. Transaction commits (data now in bucket `T`) 4. DELETE removes all data from bucket `T` 5. **Result: Metrics are deleted without ever being aggregated → Complete data loss** #### Scenario 2: Duplicate Send Causes Data Loss via Deduplication If the store stage (commit) takes **longer** than even the DELETE statement: 1. Transaction starts writing metrics for minute bucket `T` 2. `getReports()` reads bucket `T` (doesn't see uncommitted transaction) 3. Aggregated data sent to reporting service (without slow transaction data) 4. DELETE removes data from bucket `T` 5. Transaction finally commits (data appears in bucket `T`) 6. Next `getReports()` reads bucket `T` again (now includes slow transaction) 7. Data sent to reporting service again However, the reporting service contract expects **minute level aggregative data** and performs deduplication. When the same time bucket is sent twice: - First send: Missing the slow transaction's data - Second send: Contains only the slow transaction's data - **Result: Reporting service deduplicates and keeps only one, losing the other → Partial data loss** ## Solution This PR implements transaction tracking to ensure reports are only flushed when safe: 1. **Track active transactions by time bucket** - `activeTransactionsByReportedAt` map maintains a count of ongoing transactions per minute bucket 2. **Block unsafe flushes** - `getReports()` checks if any active transactions have `reportedAt < bucketEnd` before flushing 3. **Return specific error** - `errBlockedByActiveTransactions` signals blocking without logging noise 4. **Lifecycle management** - Transaction counter incremented on start, decremented on completion (via defer) 5. **New metric** - `StatReportingMainLoopBlockedDueToActiveTransactionsCount` tracks blocking frequency ### Key Implementation Details - Mutex-protected concurrent access to transaction tracking map - Defer ensures cleanup even on panics/early returns - Non-intrusive error handling (doesn't spam logs for expected blocking) - Proper cleanup (deletes map entry when count reaches 0) ## Linear Ticket [PIPE-2679](https://linear.app/rudderstack/issue/PIPE-2679/reporting-race-condition-between-flusher-and-syncer) ## Security - [ ] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent b99eee6 commit 00f7e4f

File tree

3 files changed

+448
-11
lines changed

3 files changed

+448
-11
lines changed

enterprise/reporting/reporting.go

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,18 @@ import (
3131

3232
const ReportsTable = "reports"
3333

34+
var errBlockedByActiveTransactions = errors.New("blocked by active transactions")
35+
3436
const (
35-
StatReportingMainLoopTime = "reporting_client_main_loop_time"
36-
StatReportingGetReportsTime = "reporting_client_get_reports_time"
37-
StatReportingGetReportsCount = "reporting_client_get_reports_count"
38-
StatReportingGetAggregatedReportsTime = "reporting_client_get_aggregated_reports_time"
39-
StatReportingGetAggregatedReportsCount = "reporting_client_get_aggregated_reports_count"
40-
StatReportingGetMinReportedAtQueryTime = "reporting_client_get_min_reported_at_query_time"
41-
StatReportingGetReportsQueryTime = "reporting_client_get_reports_query_time"
42-
StatReportingVacuumDuration = "reporting_vacuum_duration"
37+
StatReportingMainLoopTime = "reporting_client_main_loop_time"
38+
StatReportingGetReportsTime = "reporting_client_get_reports_time"
39+
StatReportingGetReportsCount = "reporting_client_get_reports_count"
40+
StatReportingGetAggregatedReportsTime = "reporting_client_get_aggregated_reports_time"
41+
StatReportingGetAggregatedReportsCount = "reporting_client_get_aggregated_reports_count"
42+
StatReportingGetMinReportedAtQueryTime = "reporting_client_get_min_reported_at_query_time"
43+
StatReportingGetReportsQueryTime = "reporting_client_get_reports_query_time"
44+
StatReportingVacuumDuration = "reporting_vacuum_duration"
45+
StatReportingMainLoopBlockedDueToActiveTransactionsCount = "reporting_main_loop_blocked_due_to_active_transactions_count"
4346
)
4447

4548
type DefaultReporter struct {
@@ -74,6 +77,9 @@ type DefaultReporter struct {
7477
eventNamePrefixLength config.ValueLoader[int]
7578
eventNameSuffixLength config.ValueLoader[int]
7679
commonClient *client.Client
80+
81+
activeTransactionsByReportedAtMutex sync.Mutex
82+
activeTransactionsByReportedAt map[int64]int64
7783
}
7884

7985
func NewDefaultReporter(ctx context.Context, conf *config.Config, log logger.Logger, configSubscriber *configSubscriber, stats stats.Stats) *DefaultReporter {
@@ -134,6 +140,7 @@ func NewDefaultReporter(ctx context.Context, conf *config.Config, log logger.Log
134140
eventNamePrefixLength: eventNamePrefixLength,
135141
eventNameSuffixLength: eventNameSuffixLength,
136142
commonClient: client.New(client.RouteMetrics, conf, log, stats),
143+
activeTransactionsByReportedAt: make(map[int64]int64),
137144
}
138145
}
139146

@@ -240,6 +247,20 @@ func (r *DefaultReporter) getReports(currentMs, aggregationIntervalMin int64, sy
240247
return nil, 0, nil
241248
}
242249

250+
safeToFlushCurrentBucket := true
251+
r.activeTransactionsByReportedAtMutex.Lock()
252+
for transactionReportedAt := range r.activeTransactionsByReportedAt {
253+
if transactionReportedAt < bucketEnd {
254+
safeToFlushCurrentBucket = false
255+
break
256+
}
257+
}
258+
r.activeTransactionsByReportedAtMutex.Unlock()
259+
260+
if !safeToFlushCurrentBucket {
261+
return nil, 0, errBlockedByActiveTransactions
262+
}
263+
243264
groupByColumns := "workspace_id, namespace, instance_id, source_definition_id, source_category, source_id, destination_definition_id, destination_id, source_task_run_id, source_job_id, source_job_run_id, transformation_id, transformation_version_id, tracking_plan_id, tracking_plan_version, in_pu, pu, status, terminal_state, initial_state, status_code, event_name, event_type, error_type"
244265
sqlStatement = fmt.Sprintf(`
245266
SELECT
@@ -415,6 +436,7 @@ func (r *DefaultReporter) mainLoop(ctx context.Context, c types.SyncerConfig) {
415436
getReportsCount := r.stats.NewTaggedStat(StatReportingGetReportsCount, stats.HistogramType, tags)
416437
getAggregatedReportsTimer := r.stats.NewTaggedStat(StatReportingGetAggregatedReportsTime, stats.TimerType, tags)
417438
getAggregatedReportsCount := r.stats.NewTaggedStat(StatReportingGetAggregatedReportsCount, stats.HistogramType, tags)
439+
loopBlockedDueToActiveTransactionsCount := r.stats.NewTaggedStat(StatReportingMainLoopBlockedDueToActiveTransactionsCount, stats.CountType, stats.Tags{"clientName": c.Label})
418440

419441
r.getMinReportedAtQueryTime = r.stats.NewTaggedStat(StatReportingGetMinReportedAtQueryTime, stats.TimerType, tags)
420442
r.getReportsQueryTime = r.stats.NewTaggedStat(StatReportingGetReportsQueryTime, stats.TimerType, tags)
@@ -452,7 +474,11 @@ func (r *DefaultReporter) mainLoop(ctx context.Context, c types.SyncerConfig) {
452474
aggregationIntervalMin := int64(aggregationInterval.Load().Minutes())
453475
reports, reportedAt, err := r.getReports(currentMin, aggregationIntervalMin, c.ConnInfo)
454476
if err != nil {
455-
r.log.Errorn("getting reports", obskit.Error(err))
477+
if errors.Is(err, errBlockedByActiveTransactions) {
478+
loopBlockedDueToActiveTransactionsCount.Increment()
479+
} else {
480+
r.log.Errorn("getting reports", obskit.Error(err))
481+
}
456482
select {
457483
case <-ctx.Done():
458484
r.log.Infon("stopping mainLoop for syncer",
@@ -633,7 +659,25 @@ func (r *DefaultReporter) Report(ctx context.Context, metrics []*types.PUReporte
633659
}
634660
defer func() { _ = stmt.Close() }()
635661

662+
r.activeTransactionsByReportedAtMutex.Lock()
636663
reportedAt := time.Now().UTC().Unix() / 60
664+
r.activeTransactionsByReportedAt[reportedAt]++
665+
r.activeTransactionsByReportedAtMutex.Unlock()
666+
667+
// Cleanup when transaction completes (success or failure)
668+
txn.AddFinallyListener(func() {
669+
r.activeTransactionsByReportedAtMutex.Lock()
670+
if r.activeTransactionsByReportedAt[reportedAt] == 1 {
671+
delete(r.activeTransactionsByReportedAt, reportedAt)
672+
} else if r.activeTransactionsByReportedAt[reportedAt] > 1 {
673+
r.activeTransactionsByReportedAt[reportedAt]--
674+
} else {
675+
// This should never happen - indicates a bug in transaction tracking
676+
r.log.Errorn("active transactions counter is invalid")
677+
}
678+
r.activeTransactionsByReportedAtMutex.Unlock()
679+
})
680+
637681
for _, metric := range metrics {
638682
workspaceID := r.configSubscriber.WorkspaceIDFromSource(metric.SourceID)
639683
metric := *metric

utils/tx/tx.go

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,76 @@ package tx
33
import "database/sql"
44

55
// Tx is a wrapper around sql.Tx that supports registering and executing
6-
// post-commit actions, a.k.a. success listeners.
6+
// post-commit actions (success listeners), post-rollback actions (failure listeners),
7+
// and post-completion actions (finally listeners that run regardless of outcome).
78
type Tx struct {
89
*sql.Tx
910
successListeners []func()
11+
failureListeners []func()
12+
finallyListeners []func()
1013
}
1114

1215
// AddSuccessListener registers a listener to be executed after the transaction has been committed successfully.
1316
func (tx *Tx) AddSuccessListener(listener func()) {
1417
tx.successListeners = append(tx.successListeners, listener)
1518
}
1619

17-
// Commit commits the transaction and executes all listeners.
20+
// AddFailureListener registers a listener to be executed after the transaction has been rolled back.
21+
func (tx *Tx) AddFailureListener(listener func()) {
22+
tx.failureListeners = append(tx.failureListeners, listener)
23+
}
24+
25+
// AddFinallyListener registers a listener to be executed after the transaction completes,
26+
// regardless of whether it was committed or rolled back. Finally listeners are executed
27+
// after success/failure listeners. Useful for cleanup that should always happen.
28+
func (tx *Tx) AddFinallyListener(listener func()) {
29+
tx.finallyListeners = append(tx.finallyListeners, listener)
30+
}
31+
32+
// Commit commits the transaction and executes all success listeners on success,
33+
// or failure listeners if the commit fails. Finally listeners are always executed.
34+
// Failure and finally listeners are cleared to prevent double-firing if Rollback is called afterward.
1835
func (tx *Tx) Commit() error {
1936
err := tx.Tx.Commit()
37+
2038
if err == nil {
2139
for _, successListener := range tx.successListeners {
2240
successListener()
2341
}
42+
} else {
43+
for _, failureListener := range tx.failureListeners {
44+
failureListener()
45+
}
46+
}
47+
for _, finallyListener := range tx.finallyListeners {
48+
finallyListener()
2449
}
50+
51+
// Clear failure and finally listeners to prevent double-firing if Rollback() is called after Commit()
52+
// Success listeners don't need clearing as they only fire in Commit() on success
53+
tx.failureListeners = nil
54+
tx.finallyListeners = nil
55+
56+
return err
57+
}
58+
59+
// Rollback rolls back the transaction and executes all failure listeners,
60+
// followed by finally listeners.
61+
// Failure and finally listeners are cleared to prevent double-firing if Commit is called afterward.
62+
func (tx *Tx) Rollback() error {
63+
err := tx.Tx.Rollback()
64+
65+
for _, failureListener := range tx.failureListeners {
66+
failureListener()
67+
}
68+
for _, finallyListener := range tx.finallyListeners {
69+
finallyListener()
70+
}
71+
72+
// Clear failure and finally listeners to prevent double-firing if Commit() is called after Rollback()
73+
// Success listeners don't need clearing as they won't fire in Commit() after a Rollback()
74+
tx.failureListeners = nil
75+
tx.finallyListeners = nil
76+
2577
return err
2678
}

0 commit comments

Comments
 (0)