Skip to content

Commit ea27b72

Browse files
committed
👌 feat(message:deduplication) beginign of work
1 parent 60007d8 commit ea27b72

File tree

11 files changed

+384
-48
lines changed

11 files changed

+384
-48
lines changed

gbus/abstractions.go

+10
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ const (
2222
REPLY Semantics = "reply"
2323
)
2424

25+
type DeduplicationPolicy int
26+
27+
const (
28+
DeduplicationPolicyNone DeduplicationPolicy = iota
29+
DeduplicationPolicyReject
30+
DeduplicationPolicyAck
31+
)
32+
2533
//Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus
2634
type Bus interface {
2735
HandlerRegister
@@ -213,6 +221,8 @@ type Builder interface {
213221

214222
//WithLogger set custom logger instance
215223
WithLogger(logger logrus.FieldLogger) Builder
224+
225+
WithDeduplicationPolicy(method DeduplicationPolicy, age time.Duration) Builder
216226
}
217227

218228
//Invocation context for a specific processed message

gbus/builder/builder.go

+40-21
Original file line numberDiff line numberDiff line change
@@ -5,31 +5,35 @@ import (
55
"sync"
66
"time"
77

8+
"emperror.dev/errors"
89
"github.com/sirupsen/logrus"
910

1011
"github.com/wework/grabbit/gbus"
12+
"github.com/wework/grabbit/gbus/deduplicator/implementation"
1113
"github.com/wework/grabbit/gbus/saga"
1214
"github.com/wework/grabbit/gbus/serialization"
1315
"github.com/wework/grabbit/gbus/tx/mysql"
1416
)
1517

1618
type defaultBuilder struct {
17-
PrefetchCount uint
18-
connStr string
19-
purgeOnStartup bool
20-
sagaStoreConnStr string
21-
txnl bool
22-
txConnStr string
23-
txnlProvider string
24-
workerNum uint
25-
serializer gbus.Serializer
26-
dlx string
27-
defaultPolicies []gbus.MessagePolicy
28-
confirm bool
29-
dbPingTimeout time.Duration
30-
usingPingTimeout bool
31-
logger logrus.FieldLogger
32-
busCfg gbus.BusConfiguration
19+
PrefetchCount uint
20+
connStr string
21+
purgeOnStartup bool
22+
sagaStoreConnStr string
23+
txnl bool
24+
txConnStr string
25+
txnlProvider string
26+
workerNum uint
27+
serializer gbus.Serializer
28+
dlx string
29+
defaultPolicies []gbus.MessagePolicy
30+
confirm bool
31+
dbPingTimeout time.Duration
32+
usingPingTimeout bool
33+
logger logrus.FieldLogger
34+
busCfg gbus.BusConfiguration
35+
deduplicationPolicy gbus.DeduplicationPolicy
36+
deduplicationRetentionAge time.Duration
3337
}
3438

3539
func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
@@ -53,6 +57,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
5357
DefaultPolicies: builder.defaultPolicies,
5458
DbPingTimeout: 3,
5559
Confirm: builder.confirm,
60+
DeduplicationPolicy: builder.deduplicationPolicy,
5661
}
5762

5863
var finalLogger logrus.FieldLogger
@@ -107,6 +112,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
107112
if builder.usingPingTimeout {
108113
gb.DbPingTimeout = builder.dbPingTimeout
109114
}
115+
gb.Deduplicator = implementation.NewDeduplicator(svcName, builder.deduplicationPolicy, gb.TxProvider, builder.deduplicationRetentionAge, gb.Log())
110116

111117
//TODO move this into the NewSagaStore factory methods
112118
if builder.purgeOnStartup {
@@ -115,6 +121,11 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
115121
errMsg := fmt.Errorf("grabbit: saga store faild to purge. error: %v", err)
116122
panic(errMsg)
117123
}
124+
err = gb.Deduplicator.Purge()
125+
if err != nil {
126+
errMsg := errors.NewWithDetails("duplicator failed to purge", "component", "grabbit", "feature", "deduplicator")
127+
panic(errMsg)
128+
}
118129
}
119130
glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager)
120131
glue.SetLogger(gb.Log())
@@ -206,6 +217,12 @@ func (builder *defaultBuilder) WithLogger(logger logrus.FieldLogger) gbus.Builde
206217
return builder
207218
}
208219

220+
func (builder *defaultBuilder) WithDeduplicationPolicy(policy gbus.DeduplicationPolicy, age time.Duration) gbus.Builder {
221+
builder.deduplicationPolicy = policy
222+
builder.deduplicationRetentionAge = age
223+
return builder
224+
}
225+
209226
//New :)
210227
func New() Nu {
211228
return Nu{}
@@ -218,9 +235,11 @@ type Nu struct {
218235
//Bus inits a new BusBuilder
219236
func (Nu) Bus(brokerConnStr string) gbus.Builder {
220237
return &defaultBuilder{
221-
busCfg: gbus.BusConfiguration{},
222-
PrefetchCount: 1,
223-
connStr: brokerConnStr,
224-
serializer: serialization.NewGobSerializer(),
225-
defaultPolicies: make([]gbus.MessagePolicy, 0)}
238+
busCfg: gbus.BusConfiguration{},
239+
PrefetchCount: 1,
240+
connStr: brokerConnStr,
241+
serializer: serialization.NewGobSerializer(),
242+
defaultPolicies: make([]gbus.MessagePolicy, 0),
243+
deduplicationPolicy: gbus.DeduplicationPolicyNone,
244+
}
226245
}

gbus/bus.go

+18-10
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"sync"
1010
"time"
1111

12+
"github.com/wework/grabbit/gbus/deduplicator"
1213
"github.com/wework/grabbit/gbus/metrics"
1314

1415
"github.com/opentracing-contrib/go-amqp/amqptracer"
@@ -56,15 +57,17 @@ type DefaultBus struct {
5657
Glue SagaGlue
5758
TxProvider TxProvider
5859

59-
WorkerNum uint
60-
Serializer Serializer
61-
DLX string
62-
DefaultPolicies []MessagePolicy
63-
Confirm bool
64-
healthChan chan error
65-
backpressure bool
66-
DbPingTimeout time.Duration
67-
amqpConnected bool
60+
WorkerNum uint
61+
Serializer Serializer
62+
DLX string
63+
DeduplicationPolicy DeduplicationPolicy
64+
Deduplicator deduplicator.DeduplicatorStore
65+
DefaultPolicies []MessagePolicy
66+
Confirm bool
67+
healthChan chan error
68+
backpressure bool
69+
DbPingTimeout time.Duration
70+
amqpConnected bool
6871
}
6972

7073
var (
@@ -222,6 +225,8 @@ func (b *DefaultBus) Start() error {
222225
return startErr
223226
}
224227

228+
b.Deduplicator.Start()
229+
225230
//declare queue
226231
var q amqp.Queue
227232
if q, e = b.createServiceQueue(); e != nil {
@@ -294,7 +299,10 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
294299
registrations: b.Registrations,
295300
serializer: b.Serializer,
296301
b: b,
297-
amqpErrors: b.amqpErrors}
302+
amqpErrors: b.amqpErrors,
303+
delicatePolicy: b.DeduplicationPolicy,
304+
duplicateStore: b.Deduplicator,
305+
}
298306

299307
err := w.Start()
300308
if err != nil {
+138
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package implementation
2+
3+
import (
4+
"database/sql"
5+
"time"
6+
7+
"emperror.dev/errors"
8+
"github.com/sirupsen/logrus"
9+
10+
"github.com/wework/grabbit/gbus"
11+
"github.com/wework/grabbit/gbus/deduplicator"
12+
"github.com/wework/grabbit/gbus/tx"
13+
)
14+
15+
var _ deduplicator.DeduplicatorStore = &deduper{}
16+
17+
type deduper struct {
18+
*gbus.Glogged
19+
svcName string
20+
policy gbus.DeduplicationPolicy
21+
txProvider gbus.TxProvider
22+
age time.Duration
23+
ticker *time.Ticker
24+
done chan bool
25+
tableName string
26+
}
27+
28+
func (d *deduper) Purge() (err error) {
29+
truncateSQL := "TRUNCATE TABLE " + d.tableName
30+
txp, err := d.txProvider.New()
31+
if err != nil {
32+
return err
33+
}
34+
defer func() {
35+
if err != nil {
36+
serr := txp.Rollback()
37+
err = errors.Append(err, serr)
38+
}
39+
err = txp.Commit()
40+
}()
41+
_, err = txp.Exec(truncateSQL)
42+
if err != nil {
43+
return err
44+
}
45+
return nil
46+
}
47+
48+
func (d *deduper) Start() {
49+
d.ticker = time.NewTicker(time.Minute)
50+
d.done = make(chan bool)
51+
deleteQuery := "DELETE FROM " + d.tableName + " WHERE `created_at` < ?"
52+
go func() {
53+
for {
54+
select {
55+
case <-d.done:
56+
return
57+
case <-d.ticker.C:
58+
oldest := time.Now().Add(-1 * d.age)
59+
tx, err := d.txProvider.New()
60+
if err != nil {
61+
d.Log().WithError(err).Error("failed to acquire a tx")
62+
continue
63+
}
64+
result, err := tx.Exec(deleteQuery, oldest)
65+
if err != nil && err != sql.ErrNoRows {
66+
d.Log().WithError(err).Error("failed executing delete query")
67+
}
68+
n, err := result.RowsAffected()
69+
if err != nil {
70+
d.Log().WithError(err).Error("failed to get count of affected rows")
71+
} else {
72+
d.Log().WithField("table_name", d.tableName).WithField("rows_deleted", n).
73+
Info("successfully cleanup duplicates table")
74+
}
75+
}
76+
}
77+
}()
78+
}
79+
80+
func (d *deduper) Stop() {
81+
d.Log().Info("shutting down deduplicator")
82+
d.ticker.Stop()
83+
close(d.done)
84+
}
85+
86+
//
87+
func (d *deduper) StoreMessageId(tx *sql.Tx, id string) error {
88+
insertSQL := "INSERT INTO " + d.tableName + " (id) values (?)"
89+
_, err := tx.Exec(insertSQL, id)
90+
if err != nil {
91+
d.Log().WithError(err).Error("failed to insert the id of the message into the dedup table")
92+
return err
93+
}
94+
return nil
95+
}
96+
97+
// MessageExists checks if a message id is in the deduplication table and returns an error if it fails
98+
func (d *deduper) MessageExists(id string) (bool, error) {
99+
if d.policy == gbus.DeduplicationPolicyNone {
100+
return false, nil
101+
}
102+
tx, err := d.txProvider.New()
103+
if err != nil {
104+
return true, err
105+
}
106+
defer func() {
107+
err = tx.Rollback()
108+
if err != nil {
109+
d.Log().WithError(err).Error("could not commit tx for query MessageExists")
110+
}
111+
}()
112+
selectSQL := "SELECT EXISTS (SELECT id FROM " + d.tableName + " WHERE id = ? limit 1)"
113+
114+
var exists bool
115+
err = tx.QueryRow(selectSQL, id).Scan(&exists)
116+
if err != nil && err == sql.ErrNoRows {
117+
return false, nil
118+
}
119+
120+
if err != nil {
121+
return true, err
122+
}
123+
124+
return true, nil
125+
}
126+
127+
func NewDeduplicator(svcName string, policy gbus.DeduplicationPolicy, txProvider gbus.TxProvider, age time.Duration, logger logrus.FieldLogger) deduplicator.DeduplicatorStore {
128+
d := &deduper{
129+
svcName: svcName,
130+
policy: policy,
131+
txProvider: txProvider,
132+
age: age,
133+
tableName: tx.GrabbitTableNameTemplate(svcName, "duplicates"),
134+
}
135+
l := logger.WithField("grabbit", "deduplicator")
136+
d.SetLogger(l)
137+
return d
138+
}

gbus/deduplicator/store.go

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package deduplicator
2+
3+
import (
4+
"database/sql"
5+
)
6+
7+
// DeduplicatorStore abtracts the way deduplicateor manages the
8+
type DeduplicatorStore interface {
9+
StoreMessageId(tx *sql.Tx, id string) error
10+
MessageExists(id string) (bool, error)
11+
Purge() error
12+
Start()
13+
Stop()
14+
}

0 commit comments

Comments
 (0)