Skip to content

Commit d3ba020

Browse files
author
Muhammad Fadli Gunardi
committed
fix: draft event name and product metric
1 parent 0f85e97 commit d3ba020

15 files changed

Lines changed: 492 additions & 157 deletions

File tree

config/dedup.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ type dedupConfig struct {
2020
// Enabled controls whether deduplication is active.
2121
// Set DEDUP_ENABLED=true to enable.
2222
Enabled bool
23-
// IdentifierMapping holds the user and session ID field mappings for a connection group.
24-
IdentifierMapping map[string]Identifier
2523
// ProtoClassNameMapping maps event_type to proto class name.
2624
ProtoClassNameMapping map[string]string
2725
// WhitelistConnGroup is a list of connection groups that are processed with dedup.
@@ -31,14 +29,6 @@ type dedupConfig struct {
3129
func dedupConfigLoader() {
3230
viper.SetDefault("DEDUP_ENABLED", "false")
3331

34-
connGroupIdentifierMap := make(map[string]Identifier)
35-
rawMapping := util.MustGetString("DEDUP_IDENTIFIER_MAPPING")
36-
if rawMapping != "" {
37-
if err := json.Unmarshal([]byte(rawMapping), &connGroupIdentifierMap); err != nil {
38-
panic("config: invalid DEDUP_IDENTIFIER_MAPPING: " + err.Error())
39-
}
40-
}
41-
4232
var rawWhitelist []string
4333

4434
rawWhitelistStr := util.MustGetString("DEDUP_WHITELIST_CONN_GROUP")
@@ -62,7 +52,6 @@ func dedupConfigLoader() {
6252
DedupCfg = dedupConfig{
6353
Enabled: util.MustGetBool("DEDUP_ENABLED"),
6454
WhitelistConnGroup: whitelistMap,
65-
IdentifierMapping: connGroupIdentifierMap,
6655
ProtoClassNameMapping: protoClassNameMap,
6756
}
6857
}

ingestionrule/action/deactivate.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
pb "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/raccoon/v1beta1"
99
"github.com/goto/raccoon/config"
10+
"github.com/goto/raccoon/ingestionrule/action/dedup/schemaregistry"
1011
"github.com/goto/raccoon/ingestionrule/action/eval/cache"
1112
"github.com/goto/raccoon/logger"
1213
"github.com/goto/raccoon/metrics"
@@ -17,11 +18,16 @@ import (
1718
type Deactivate struct {
1819
cache *cache.Cache
1920
evalChain Chain
21+
stencil schemaregistry.StencilClient
2022
}
2123

2224
// NewDeactivate creates a new Deactivate action with the given cache and evaluator chain.
23-
func NewDeactivate(c *cache.Cache, evalChain Chain) *Deactivate {
24-
return &Deactivate{cache: c, evalChain: evalChain}
25+
func NewDeactivate(c *cache.Cache, evalChain Chain, stencil schemaregistry.StencilClient) *Deactivate {
26+
return &Deactivate{
27+
cache: c,
28+
evalChain: evalChain,
29+
stencil: stencil,
30+
}
2531
}
2632

2733
// Apply evaluates every event in the batch against the deactivate policy rules.
@@ -31,7 +37,12 @@ func (d *Deactivate) Apply(_ context.Context, events []*pb.Event, connGroup stri
3137
filtered := make([]*pb.Event, 0, len(events))
3238

3339
for _, event := range events {
34-
meta := ExtractMetadata(event, connGroup, config.PolicyCfg.PublisherMapping, config.EventDistribution.PublisherPattern)
40+
meta, err := ExtractMetadata(event, connGroup, config.PolicyCfg.PublisherMapping, config.EventDistribution.PublisherPattern, d.stencil)
41+
if err != nil {
42+
logger.Errorf("deactivate: failed to extract metadata: %v", err)
43+
metrics.Increment(metricNameEventDeserializationError, fmt.Sprintf("conn_group=%s,reason=%s,event_type=%s,product=%s,event_name=%s", connGroup, getErrorReason(err), event.Type, event.Product, event.EventName))
44+
}
45+
3546
logger.Debugf("[deactivate.Apply] meta: event_name=%s, product=%s, publisher=%s, topic=%s, conn_group=%s", meta.EventName, meta.Product, meta.Publisher, meta.TopicName, meta.ConnGroup)
3647

3748
if d.evalChain.Run(meta, d.cache) {

ingestionrule/action/deactivate_test.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88
pb "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/raccoon/v1beta1"
99
"github.com/goto/raccoon/config"
1010
"github.com/goto/raccoon/ingestionrule/action"
11+
"github.com/goto/raccoon/ingestionrule/action/dedup/schemaregistry"
1112
"github.com/goto/raccoon/ingestionrule/action/eval/cache"
1213
"github.com/stretchr/testify/assert"
14+
"google.golang.org/protobuf/reflect/protoreflect"
1315
"google.golang.org/protobuf/types/known/timestamppb"
1416
)
1517

@@ -24,7 +26,7 @@ func buildDeactivateEventCache(name, product, publisher string) *cache.Cache {
2426
}
2527

2628
func newDeactivate(c *cache.Cache) *action.Deactivate {
27-
return action.NewDeactivate(c, action.DefaultChain())
29+
return action.NewDeactivate(c, action.DefaultChain(), schemaregistry.StencilClient{})
2830
}
2931

3032
func TestDeactivate_DropsMatchingEvent(t *testing.T) {
@@ -92,3 +94,52 @@ func TestDeactivate_DropsMatchingTopicRule(t *testing.T) {
9294
}}
9395
assert.Empty(t, newDeactivate(c).Apply(context.Background(), events, "pub-a"))
9496
}
97+
98+
func TestDeactivate_UsesDeserializedPayload(t *testing.T) {
99+
config.DedupCfg.ProtoClassNameMapping = map[string]string{
100+
"component": "ClickEventProto",
101+
}
102+
103+
metaMsg := &mockMessage{
104+
fields: map[string]any{
105+
"event_guid": "test-guid-123",
106+
},
107+
}
108+
109+
staleTime := time.Now()
110+
tsMsg := &mockMessage{
111+
fullName: "google.protobuf.Timestamp",
112+
fields: map[string]any{
113+
"seconds": staleTime.Unix(),
114+
"nanos": int32(staleTime.Nanosecond()),
115+
},
116+
}
117+
118+
parsedMsg := &mockMessage{
119+
fields: map[string]any{
120+
"meta": metaMsg,
121+
"event_name": "deserialized-click",
122+
"product": protoreflect.EnumNumber(1),
123+
"event_timestamp": tsMsg,
124+
},
125+
}
126+
127+
ms := &mockStencilClient{
128+
parseFunc: func(className string, data []byte) (protoreflect.ProtoMessage, error) {
129+
return parsedMsg, nil
130+
},
131+
}
132+
133+
c := buildDeactivateEventCache("deserialized-click", "clickstream", "pub-a")
134+
d := action.NewDeactivate(c, action.DefaultChain(), schemaregistry.StencilClient{Client: ms})
135+
136+
events := []*pb.Event{{
137+
Type: "component",
138+
EventName: "wrapper-click",
139+
Product: "wrapper-app",
140+
EventBytes: []byte("some-bytes"),
141+
EventTimestamp: timestamppb.New(time.Now()),
142+
}}
143+
144+
assert.Empty(t, d.Apply(context.Background(), events, "pub-a"))
145+
}

ingestionrule/action/dedup.go

Lines changed: 18 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -3,45 +3,17 @@ package action
33
import (
44
"context"
55
"fmt"
6-
"strings"
76
"time"
87

98
pb "buf.build/gen/go/gotocompany/proton/protocolbuffers/go/gotocompany/raccoon/v1beta1"
10-
"github.com/spf13/cast"
11-
"google.golang.org/protobuf/reflect/protoreflect"
129

1310
"github.com/goto/raccoon/config"
1411
"github.com/goto/raccoon/ingestionrule/action/dedup/cache"
15-
"github.com/goto/raccoon/ingestionrule/action/dedup/protoutil"
1612
"github.com/goto/raccoon/ingestionrule/action/dedup/schemaregistry"
1713
"github.com/goto/raccoon/logger"
1814
"github.com/goto/raccoon/metrics"
1915
)
2016

21-
const (
22-
metricNameEventDeserializationError = "event_deserialization_error"
23-
metricNameEventDeserializationLatency = "event_deserialization_latency"
24-
metricNameEventDuplicateCheckerLatency = "event_duplicate_checker_latency"
25-
)
26-
27-
const (
28-
reasonProtoClassNotFound = "proto class not found"
29-
reasonStencilParseError = "stencil parse error"
30-
reasonPublisherNotFound = "publisher not found"
31-
32-
reasonUserIDNotFound = "userID not found"
33-
reasonUserIDTypeInvalid = "userID type invalid"
34-
35-
reasonSessionIDNotFound = "sessionID not found"
36-
reasonSessionIDTypeInvalid = "sessionID type invalid"
37-
38-
reasonEventNameNotFound = "event_name not found"
39-
reasonEventNameTypeInvalid = "event_name type invalid"
40-
41-
reasonEventTimestampNotFound = "event_timestamp not found"
42-
reasonEventTimestampTypeInvalid = "event_timestamp type invalid"
43-
)
44-
4517
// DuplicateChecker defines the capability to verify event uniqueness.
4618
type DuplicateChecker interface {
4719
AreDuplicates(ctx context.Context, events []cache.EventMetadata) ([]bool, error)
@@ -51,8 +23,6 @@ type DuplicateChecker interface {
5123

5224
// processState holds the state of each event being processed.
5325
type processState struct {
54-
// event is the original event being processed.
55-
event *pb.Event
5626
// isValid indicates whether the event has valid metadata and should be checked for duplication.
5727
isValid bool
5828
}
@@ -91,23 +61,29 @@ func (d *Dedup) Apply(ctx context.Context, events []*pb.Event, connGroup string)
9161

9262
for i, event := range events {
9363
startDeserialize := time.Now()
94-
meta, err := d.extractMetadata(event, connGroup)
64+
meta, err := ExtractMetadata(event, connGroup, config.PolicyCfg.PublisherMapping, config.EventDistribution.PublisherPattern, d.stencil)
9565
metrics.Timing(metricNameEventDeserializationLatency, time.Since(startDeserialize).Milliseconds(), fmt.Sprintf("conn_group=%s", connGroup))
9666

9767
if err != nil {
9868
logger.Errorf("dedup: failed to extract metadata: %v", err)
99-
states[i] = processState{event: event, isValid: false}
69+
metrics.Increment(metricNameEventDeserializationError, fmt.Sprintf("conn_group=%s,reason=%s,event_type=%s,product=%s,event_name=%s", connGroup, getErrorReason(err), event.Type, event.Product, event.EventName))
70+
states[i] = processState{isValid: false}
10071
continue
10172
}
10273

10374
if meta.EventGUID == "" || meta.Publisher == "" {
10475
logger.Errorf("dedup: missing metadata fields: %+v for conn_group=%s,product=%s,event_name=%s", meta, connGroup, event.Product, event.EventName)
105-
states[i] = processState{event: event, isValid: false}
76+
states[i] = processState{isValid: false}
10677
continue
10778
}
10879

109-
states[i] = processState{event: event, isValid: true}
110-
metadataBatch = append(metadataBatch, meta)
80+
states[i] = processState{isValid: true}
81+
metadataBatch = append(metadataBatch, cache.EventMetadata{
82+
Publisher: meta.Publisher,
83+
EventGUID: meta.EventGUID,
84+
EventName: meta.EventName,
85+
Product: meta.Product,
86+
})
11187
}
11288

11389
var isDuplicateResults []bool
@@ -122,9 +98,10 @@ func (d *Dedup) Apply(ctx context.Context, events []*pb.Event, connGroup string)
12298
uniqueEvents := make([]*pb.Event, 0, len(events))
12399
resultIdx := 0 // Tracks our position in the isDuplicateResults slice
124100

125-
for _, state := range states {
101+
for i, state := range states {
102+
event := events[i]
126103
if !state.isValid {
127-
uniqueEvents = append(uniqueEvents, state.event)
104+
uniqueEvents = append(uniqueEvents, event)
128105
continue
129106
}
130107

@@ -133,86 +110,22 @@ func (d *Dedup) Apply(ctx context.Context, events []*pb.Event, connGroup string)
133110
logger.Errorf("dedup: cache batch verification failed, bypassing filter: %v", cacheErr)
134111
}
135112

136-
uniqueEvents = append(uniqueEvents, state.event)
113+
uniqueEvents = append(uniqueEvents, event)
137114
resultIdx++
138115
continue
139116
}
140117

141118
isDuplicate := isDuplicateResults[resultIdx]
119+
meta := metadataBatch[resultIdx]
142120
resultIdx++
143121

144122
if isDuplicate {
145-
metrics.Increment(metricEventLossCount, fmt.Sprintf("reason=DEDUP_POLICY,event_name=%s,product=%s,conn_group=%s,event_type=%s", state.event.EventName, state.event.Product, connGroup, state.event.Type))
123+
metrics.Increment(metricEventLossCount, fmt.Sprintf("reason=DEDUP_POLICY,event_name=%s,product=%s,conn_group=%s,event_type=%s", meta.EventName, meta.Product, connGroup, event.Type))
146124
continue
147125
}
148126

149-
uniqueEvents = append(uniqueEvents, state.event)
127+
uniqueEvents = append(uniqueEvents, event)
150128
}
151129

152130
return uniqueEvents
153131
}
154-
155-
// extractMetadata deserializes dynamic protobuf payloads using Stencil and handles identity field extractions.
156-
func (d *Dedup) extractMetadata(event *pb.Event, connGroup string) (cache.EventMetadata, error) {
157-
protoClass, ok := config.DedupCfg.ProtoClassNameMapping[event.Type]
158-
if !ok {
159-
metrics.Increment(metricNameEventDeserializationError,
160-
fmt.Sprintf("conn_group=%s,reason=%s,event_type=%s,product=%s,event_name=%s", connGroup, reasonProtoClassNotFound, event.Type, event.Product, event.EventName))
161-
return cache.EventMetadata{}, fmt.Errorf("failed to find proto class for conn_group=%s,event_type=%s,product=%s,event_name=%s", connGroup, event.Type, event.Product, event.EventName)
162-
}
163-
164-
publisher, ok := config.PolicyCfg.PublisherMapping[connGroup]
165-
if !ok {
166-
metrics.Increment(metricNameEventDeserializationError,
167-
fmt.Sprintf("conn_group=%s,reason=%s,event_type=%s,product=%s,event_name=%s", connGroup, reasonPublisherNotFound, event.Type, event.Product, event.EventName))
168-
return cache.EventMetadata{}, fmt.Errorf("failed to publisher for conn_group=%s,event_type=%s,product=%s,event_name=%s", connGroup, event.Type, event.Product, event.EventName)
169-
}
170-
171-
parsedMsg, err := d.stencil.Client.Parse(protoClass, event.EventBytes)
172-
if err != nil {
173-
metrics.Increment(metricNameEventDeserializationError,
174-
fmt.Sprintf("conn_group=%s,reason=%s,event_type=%s,product=%s,event_name=%s", connGroup, reasonStencilParseError, event.Type, event.Product, event.EventName))
175-
return cache.EventMetadata{}, fmt.Errorf("failed to parse proto class for conn_group=%s,event_type=%s,product=%s,event_name=%s", connGroup, event.Type, event.Product, event.EventName)
176-
}
177-
178-
ref := parsedMsg.ProtoReflect()
179-
180-
const protoFieldEventGUID = "meta.event_guid"
181-
182-
eventGUID, err := d.getStringField(ref, protoFieldEventGUID, connGroup, event, protoFieldEventGUID, reasonEventNameNotFound, reasonEventNameTypeInvalid)
183-
if err != nil {
184-
return cache.EventMetadata{}, err
185-
}
186-
187-
return cache.EventMetadata{
188-
EventGUID: eventGUID,
189-
Publisher: publisher,
190-
}, nil
191-
}
192-
193-
// getStringField is a helper function to safely extract, convert to string, and handle error telemetry for identifier fields.
194-
func (d *Dedup) getStringField(
195-
ref protoreflect.Message,
196-
path string,
197-
connGroup string,
198-
event *pb.Event,
199-
fieldName string,
200-
reasonNotFound string,
201-
reasonTypeInvalid string,
202-
) (string, error) {
203-
rawVal, ok := protoutil.GetFieldValue(ref, strings.Split(path, "."))
204-
if !ok {
205-
metrics.Increment(metricNameEventDeserializationError,
206-
fmt.Sprintf("conn_group=%s,reason=%s,event_type=%s,product=%s,event_name=%s", connGroup, reasonNotFound, event.Type, event.Product, event.EventName))
207-
return "", fmt.Errorf("failed to find %s for conn_group=%s,event_type=%s,product=%s,event_name=%s", fieldName, connGroup, event.Type, event.Product, event.EventName)
208-
}
209-
210-
val, err := cast.ToStringE(rawVal)
211-
if err != nil {
212-
metrics.Increment(metricNameEventDeserializationError,
213-
fmt.Sprintf("conn_group=%s,reason=%s,event_type=%s,product=%s,event_name=%s", connGroup, reasonTypeInvalid, event.Type, event.Product, event.EventName))
214-
return "", fmt.Errorf("%s field type is not convertible to string for conn_group=%s,event_type=%s,product=%s,event_name=%s: %w", fieldName, connGroup, event.Type, event.Product, event.EventName, err)
215-
}
216-
217-
return val, nil
218-
}

ingestionrule/action/dedup/cache/cache.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type Store struct {
4040
type EventMetadata struct {
4141
Publisher string
4242
EventGUID string
43+
EventName string
44+
Product string
4345
}
4446

4547
// NewStore instantiates the unified storage framework wrapper.

0 commit comments

Comments
 (0)