Skip to content

Commit d66df35

Browse files
kerenbe4Guy Baron
authored and
Guy Baron
committed
Add saga latency metric (#224)
records the execution time of the entire saga, from it's creation until deletion (in ms). having the "Service", "SagaType", "Initiator" as labels|
1 parent 4fdd9fc commit d66df35

File tree

7 files changed

+63
-24
lines changed

7 files changed

+63
-24
lines changed

docs/METRICS.md

+5-4
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ grabbit exposes and reports the following metrics to Prometheus
99
| grabbit | handlers | result | records and counts each run of a handler, having the handler's name, message type and the result as labels|
1010
| grabbit | handlers | latency | records the execution time of each run of a handler, having the handler's name, message type as labels|
1111
| grabbit | messages | rejected_messages | increments each time a message gets rejected |
12-
| grabbit | saga | timedout_sagas | counting the number of timedout saga instances |
13-
| grabbit | outbox | outbox_total_records | reports the total amount of records currently in the outbox |
14-
| grabbit | outbox | outbox_pending_delivery | reports the total amount of records pending delivery currently in the outbox |
15-
| grabbit | outbox | outbox_pending_removal | reports the total amount of records that were sent and pending removal currently in the outbox |
12+
| grabbit | saga | timedout_sagas | counting the number of timedout saga instances |
13+
| grabbit | saga | latency | records the execution time of the entire saga, from it's creation until deletion (in ms). having the "Service", "SagaType", "Initiator" as labels|
14+
| grabbit | outbox | outbox_total_records | reports the total amount of records currently in the outbox |
15+
| grabbit | outbox | outbox_pending_delivery | reports the total amount of records pending delivery currently in the outbox |
16+
| grabbit | outbox | outbox_pending_removal | reports the total amount of records that were sent and pending removal currently in the outbox |
1617

gbus/builder/builder.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
7979
providerLogger := gb.Log().WithField("provider", "mysql")
8080
mysqltx, err := mysql.NewTxProvider(builder.txConnStr)
8181
if err != nil {
82-
panic(err)
82+
errMsg := fmt.Errorf("grabbit: transaction provider failed creating a transaction. %v", err)
83+
panic(errMsg)
8384
}
8485
gb.TxProvider = mysqltx
8586

@@ -91,7 +92,8 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
9192
if builder.purgeOnStartup {
9293
err := sagaStore.Purge()
9394
if err != nil {
94-
panic(err)
95+
errMsg := fmt.Errorf("grabbit: saga store faild to purge. error: %v", err)
96+
panic(errMsg)
9597
}
9698
}
9799
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup, builder.busCfg.OutboxCfg)
@@ -110,7 +112,8 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
110112
if builder.purgeOnStartup {
111113
err := sagaStore.Purge()
112114
if err != nil {
113-
panic(err)
115+
errMsg := fmt.Errorf("grabbit: saga store faild to purge. error: %v", err)
116+
panic(errMsg)
114117
}
115118
}
116119
glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager)

gbus/metrics/saga_metrics.go

+16-11
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,22 @@ import (
66
io_prometheus_client "github.com/prometheus/client_model/go"
77
)
88

9-
//SagaTimeoutCounter is the prometheus counter counting timed out saga instances
10-
var SagaTimeoutCounter = newSagaTimeoutCounter()
9+
var (
10+
//SagaTimeoutCounter is the prometheus counter counting timed out saga instances
11+
SagaTimeoutCounter = promauto.NewCounter(prometheus.CounterOpts{
12+
Namespace: grabbitPrefix,
13+
Subsystem: "saga",
14+
Name: "timedout_sagas",
15+
Help: "counting the number of timedout saga instances",
16+
})
17+
//SagaLatencySummary is the prometheus summary for the total duration of a saga
18+
SagaLatencySummary = promauto.NewSummaryVec(prometheus.SummaryOpts{
19+
Namespace: grabbitPrefix,
20+
Subsystem: "saga",
21+
Name: "latency",
22+
Help: "The latency of the entire saga",
23+
}, []string{"Service", "SagaType", "Initiator"})
24+
)
1125

1226
//GetSagaTimeoutCounterValue gets the counter value of timed out sagas reported to prometheus
1327
func GetSagaTimeoutCounterValue() (float64, error) {
@@ -20,12 +34,3 @@ func GetSagaTimeoutCounterValue() (float64, error) {
2034

2135
return m.GetCounter().GetValue(), nil
2236
}
23-
24-
func newSagaTimeoutCounter() prometheus.Counter {
25-
return promauto.NewCounter(prometheus.CounterOpts{
26-
Namespace: grabbitPrefix,
27-
Subsystem: "saga",
28-
Name: "timedout_sagas",
29-
Help: "counting the number of timedout saga instances",
30-
})
31-
}

gbus/saga/glue.go

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"reflect"
99
"strings"
1010
"sync"
11+
"time"
1112

1213
"github.com/opentracing/opentracing-go"
1314
slog "github.com/opentracing/opentracing-go/log"
@@ -256,6 +257,7 @@ func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error {
256257

257258
if instance.isComplete() {
258259
imsm.Log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted")
260+
metrics.SagaLatencySummary.WithLabelValues(imsm.svcName, reflect.TypeOf(instance.UnderlyingInstance).String(), instance.StartedBy).Observe(float64(time.Since(instance.CreatedAt) / time.Millisecond))
259261

260262
deleteErr := imsm.sagaStore.DeleteSaga(tx, instance)
261263
if deleteErr != nil {

gbus/saga/instance.go

+4
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ type Instance struct {
3434
StartedByMessageID string
3535
//StartedByRPCID the rpc id of the message that created the saga
3636
StartedByRPCID string
37+
38+
//CreatedAt the time.Now() timestamp when the saga was created
39+
CreatedAt time.Time
3740
}
3841

3942
func (si *Instance) log() logrus.FieldLogger {
@@ -151,6 +154,7 @@ func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair) *Instan
151154
ID: xid.New().String(),
152155
UnderlyingInstance: newSaga,
153156
MsgToMethodMap: msgToMethodMap,
157+
CreatedAt: time.Now(),
154158
}
155159
return newInstance
156160
}

gbus/tx/mysql/migrations.go

+17
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,22 @@ func sagaStoreAddRPCIDDetails(svcName string) *migrator.Migration {
6363
}
6464
}
6565

66+
func sagaStoreAddCreatedAtDetails(svcName string) *migrator.Migration {
67+
tblName := tx.GrabbitTableNameTemplate(svcName, "sagas")
68+
69+
addCreatorDetailsSQL := `ALTER TABLE ` + tblName + ` ADD COLUMN created_at DATETIME NOT NULL DEFAULT NOW() AFTER version`
70+
71+
return &migrator.Migration{
72+
Name: "adding the created_at column to the saga table",
73+
Func: func(tx *sql.Tx) error {
74+
if _, err := tx.Exec(addCreatorDetailsSQL); err != nil {
75+
return err
76+
}
77+
return nil
78+
},
79+
}
80+
}
81+
6682
func outboxMigrations(svcName string) *migrator.Migration {
6783

6884
tblName := tx.GrabbitTableNameTemplate(svcName, "outbox")
@@ -158,6 +174,7 @@ func EnsureSchema(db *sql.DB, svcName string) {
158174
outboxChangeColumnLength(svcName),
159175
sagaStoreAddSagaCreatorDetails(svcName),
160176
sagaStoreAddRPCIDDetails(svcName),
177+
sagaStoreAddCreatedAtDetails(svcName),
161178
))
162179
if err != nil {
163180
panic(err)

gbus/tx/sagastore.go

+13-6
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import (
99
"reflect"
1010
"regexp"
1111
"strings"
12+
"time"
1213

1314
log "github.com/sirupsen/logrus"
1415

16+
"github.com/go-sql-driver/mysql"
1517
"github.com/wework/grabbit/gbus"
1618
"github.com/wework/grabbit/gbus/saga"
1719
)
@@ -37,8 +39,9 @@ func (store *SagaStore) scanInstances(rows *sql.Rows) ([]*saga.Instance, error)
3739
var startedBySaga sql.NullString
3840
var startedByMsgID sql.NullString
3941
var startedByRPCID sql.NullString
42+
var createdAt mysql.NullTime
4043

41-
error := rows.Scan(&sagaID, &sagaType, &sagaData, &startedBy, &startedByMsgID, &startedByRPCID, &startedBySaga, &version)
44+
error := rows.Scan(&sagaID, &sagaType, &sagaData, &startedBy, &startedByMsgID, &startedByRPCID, &startedBySaga, &version, &createdAt)
4245
if error == sql.ErrNoRows {
4346
return nil, error
4447
} else if error != nil {
@@ -67,6 +70,11 @@ func (store *SagaStore) scanInstances(rows *sql.Rows) ([]*saga.Instance, error)
6770
instance.StartedByRPCID = startedByRPCID.String
6871
}
6972

73+
if createdAt.Valid {
74+
value, _ := createdAt.Value()
75+
instance.CreatedAt = value.(time.Time)
76+
}
77+
7078
if decErr != nil {
7179
store.Log().WithError(decErr).Error("failed to decode saga instance")
7280
return nil, decErr
@@ -83,7 +91,7 @@ func (store *SagaStore) scanInstances(rows *sql.Rows) ([]*saga.Instance, error)
8391
func (store *SagaStore) GetSagasByType(tx *sql.Tx, sagaType reflect.Type) (instances []*saga.Instance, err error) {
8492

8593
tblName := GetSagatableName(store.SvcName)
86-
selectSQL := "SELECT saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_msg_id, started_by_rpcid, started_by_request_of_saga, version FROM " + tblName + " WHERE saga_type=" + store.ParamsMarkers[0]
94+
selectSQL := "SELECT saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_msg_id, started_by_rpcid, started_by_request_of_saga, version, created_at FROM " + tblName + " WHERE saga_type=" + store.ParamsMarkers[0]
8795

8896
rows, err := tx.Query(selectSQL, sagaType.String())
8997
defer func() {
@@ -153,7 +161,7 @@ func (store *SagaStore) DeleteSaga(tx *sql.Tx, instance *saga.Instance) error {
153161
func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance, error) {
154162

155163
tblName := GetSagatableName(store.SvcName)
156-
selectSQL := `SELECT saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_msg_id, started_by_rpcid, started_by_request_of_saga, version FROM ` + tblName + ` WHERE saga_id=` + store.ParamsMarkers[0] + ``
164+
selectSQL := `SELECT saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_msg_id, started_by_rpcid, started_by_request_of_saga, version, created_at FROM ` + tblName + ` WHERE saga_id=` + store.ParamsMarkers[0] + ``
157165

158166
rows, err := tx.Query(selectSQL, sagaID)
159167
defer func() {
@@ -187,14 +195,14 @@ func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance,
187195
func (store *SagaStore) SaveNewSaga(tx *sql.Tx, sagaType reflect.Type, newInstance *saga.Instance) (err error) {
188196
store.RegisterSagaType(newInstance.UnderlyingInstance)
189197
tblName := GetSagatableName(store.SvcName)
190-
insertSQL := `INSERT INTO ` + tblName + ` (saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_msg_id, started_by_rpcid, started_by_request_of_saga, version) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
198+
insertSQL := `INSERT INTO ` + tblName + ` (saga_id, saga_type, saga_data, started_by_request_of_svc, started_by_msg_id, started_by_rpcid, started_by_request_of_saga, version, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
191199

192200
var buf []byte
193201
if buf, err = store.serilizeSaga(newInstance); err != nil {
194202
store.Log().WithError(err).WithField("saga_id", newInstance.ID).Error("failed to encode saga with sagaID")
195203
return err
196204
}
197-
_, err = tx.Exec(insertSQL, newInstance.ID, sagaType.String(), buf, newInstance.StartedBy, newInstance.StartedByMessageID, newInstance.StartedByRPCID, newInstance.StartedBySaga, newInstance.ConcurrencyCtrl)
205+
_, err = tx.Exec(insertSQL, newInstance.ID, sagaType.String(), buf, newInstance.StartedBy, newInstance.StartedByMessageID, newInstance.StartedByRPCID, newInstance.StartedBySaga, newInstance.ConcurrencyCtrl, newInstance.CreatedAt)
198206
if err != nil {
199207
store.Log().WithError(err).Error("failed saving new saga")
200208
return err
@@ -251,4 +259,3 @@ func GetSagatableName(svcName string) string {
251259

252260
return strings.ToLower("grabbit_" + sanitized + "_sagas")
253261
}
254-

0 commit comments

Comments
 (0)