Skip to content

Commit 0e74717

Browse files
committed
fix: replace mutex with atomic operations to optimize loggedEvents management
1 parent 68cefdf commit 0e74717

File tree

2 files changed

+9
-17
lines changed

2 files changed

+9
-17
lines changed

processor/internal/transformer/destination_transformer/destination_transformer.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,26 @@ import (
99
"strconv"
1010
"strings"
1111
"sync"
12+
"sync/atomic"
1213
"time"
1314

14-
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
15-
"github.com/rudderlabs/rudder-server/jsonrs"
16-
transformerfs "github.com/rudderlabs/rudder-server/services/transformer"
17-
18-
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
19-
2015
"github.com/cenkalti/backoff"
2116
"github.com/samber/lo"
2217

2318
"github.com/rudderlabs/rudder-go-kit/config"
2419
"github.com/rudderlabs/rudder-go-kit/filemanager"
2520
"github.com/rudderlabs/rudder-go-kit/logger"
2621
"github.com/rudderlabs/rudder-go-kit/stats"
27-
22+
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
23+
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
2824
transformerclient "github.com/rudderlabs/rudder-server/internal/transformer-client"
25+
"github.com/rudderlabs/rudder-server/jsonrs"
2926
"github.com/rudderlabs/rudder-server/processor/integrations"
3027
transformerutils "github.com/rudderlabs/rudder-server/processor/internal/transformer"
3128
"github.com/rudderlabs/rudder-server/processor/internal/transformer/destination_transformer/embedded/kafka"
3229
"github.com/rudderlabs/rudder-server/processor/internal/transformer/destination_transformer/embedded/pubsub"
3330
"github.com/rudderlabs/rudder-server/processor/types"
31+
transformerfs "github.com/rudderlabs/rudder-server/services/transformer"
3432
"github.com/rudderlabs/rudder-server/utils/httputil"
3533
reportingtypes "github.com/rudderlabs/rudder-server/utils/types"
3634
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
@@ -79,9 +77,6 @@ func New(conf *config.Config, log logger.Logger, stat stats.Stats, opts ...Opt)
7977
handle.stats.matchedEvents = handle.stat.NewStat("embedded_destination_transform_matched_events", stats.CountType)
8078
handle.stats.mismatchedEvents = handle.stat.NewStat("embedded_destination_transform_mismatched_events", stats.CountType)
8179

82-
handle.loggedEvents = 0
83-
handle.loggedEventsMu = sync.Mutex{}
84-
8580
var err error
8681
handle.samplingFileManager, err = getSamplingUploader(conf, log)
8782
if err != nil {
@@ -123,8 +118,7 @@ type Client struct {
123118
mismatchedEvents stats.Counter
124119
}
125120

126-
loggedEventsMu sync.Mutex
127-
loggedEvents int64
121+
loggedEvents atomic.Int64
128122
samplingFileManager *filemanager.S3Manager
129123
}
130124

processor/internal/transformer/destination_transformer/logger.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,7 @@ func (c *Client) CompareAndLog(
2626
return
2727
}
2828

29-
c.loggedEventsMu.Lock()
30-
defer c.loggedEventsMu.Unlock()
31-
32-
if c.loggedEvents >= int64(c.config.maxLoggedEvents.Load()) {
29+
if c.loggedEvents.Load() >= int64(c.config.maxLoggedEvents.Load()) {
3330
return
3431
}
3532

@@ -58,7 +55,8 @@ func (c *Client) CompareAndLog(
5855
logger.NewStringField("location", file.Location),
5956
logger.NewStringField("objectName", file.ObjectName),
6057
)
61-
c.loggedEvents += int64(len(differingResponse))
58+
59+
c.loggedEvents.And(int64(len(differingResponse)))
6260
}
6361

6462
func (c *Client) differingEvents(

0 commit comments

Comments
 (0)