9
9
10
10
"github.com/cosmos/cosmos-sdk/store/multiversion"
11
11
store "github.com/cosmos/cosmos-sdk/store/types"
12
+ "github.com/cosmos/cosmos-sdk/telemetry"
12
13
sdk "github.com/cosmos/cosmos-sdk/types"
13
14
"github.com/cosmos/cosmos-sdk/types/occ"
14
15
"github.com/cosmos/cosmos-sdk/utils/tracing"
@@ -78,6 +79,7 @@ type scheduler struct {
78
79
allTasks []* deliverTxTask
79
80
executeCh chan func ()
80
81
validateCh chan func ()
82
+ metrics * schedulerMetrics
81
83
}
82
84
83
85
// NewScheduler creates a new scheduler
@@ -86,6 +88,7 @@ func NewScheduler(workers int, tracingInfo *tracing.Info, deliverTxFunc func(ctx
86
88
workers : workers ,
87
89
deliverTx : deliverTxFunc ,
88
90
tracingInfo : tracingInfo ,
91
+ metrics : & schedulerMetrics {},
89
92
}
90
93
}
91
94
@@ -152,11 +155,16 @@ func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask {
152
155
return res
153
156
}
154
157
155
- func collectResponses (tasks []* deliverTxTask ) []types.ResponseDeliverTx {
158
+ func ( s * scheduler ) collectResponses (tasks []* deliverTxTask ) []types.ResponseDeliverTx {
156
159
res := make ([]types.ResponseDeliverTx , 0 , len (tasks ))
160
+ var maxIncarnation int
157
161
for _ , t := range tasks {
162
+ if t .Incarnation > maxIncarnation {
163
+ maxIncarnation = t .Incarnation
164
+ }
158
165
res = append (res , * t .Response )
159
166
}
167
+ s .metrics .maxIncarnation = maxIncarnation
160
168
return res
161
169
}
162
170
@@ -202,6 +210,19 @@ func (s *scheduler) PrefillEstimates(reqs []*sdk.DeliverTxEntry) {
202
210
}
203
211
}
204
212
213
+ // schedulerMetrics contains metrics for the scheduler
214
+ type schedulerMetrics struct {
215
+ // maxIncarnation is the highest incarnation seen in this set
216
+ maxIncarnation int
217
+ // retries is the number of tx attempts beyond the first attempt
218
+ retries int
219
+ }
220
+
221
+ func (s * scheduler ) emitMetrics () {
222
+ telemetry .IncrCounter (float32 (s .metrics .retries ), "scheduler" , "retries" )
223
+ telemetry .SetGauge (float32 (s .metrics .maxIncarnation ), "scheduler" , "max_incarnation" )
224
+ }
225
+
205
226
func (s * scheduler ) ProcessAll (ctx sdk.Context , reqs []* sdk.DeliverTxEntry ) ([]types.ResponseDeliverTx , error ) {
206
227
// initialize mutli-version stores if they haven't been initialized yet
207
228
s .tryInitMultiVersionStore (ctx )
@@ -211,6 +232,7 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
211
232
s .allTasks = tasks
212
233
s .executeCh = make (chan func (), len (tasks ))
213
234
s .validateCh = make (chan func (), len (tasks ))
235
+ defer s .emitMetrics ()
214
236
215
237
// default to number of tasks if workers is negative or 0 by this point
216
238
workers := s .workers
@@ -245,11 +267,13 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
245
267
if err != nil {
246
268
return nil , err
247
269
}
270
+ // these are retries which apply to metrics
271
+ s .metrics .retries += len (toExecute )
248
272
}
249
273
for _ , mv := range s .multiVersionStores {
250
274
mv .WriteLatestToStore ()
251
275
}
252
- return collectResponses (tasks ), nil
276
+ return s . collectResponses (tasks ), nil
253
277
}
254
278
255
279
func (s * scheduler ) shouldRerun (task * deliverTxTask ) bool {
0 commit comments