Skip to content

Commit f03a039

Browse files
authored
Merge pull request #516 from drift-labs/wphan/pyth-cranker-metrics
pyth-crankers: add tx recorder metrics and health check
2 parents 4cbce13 + 86c56fa commit f03a039

File tree

4 files changed

+913
-1136
lines changed

4 files changed

+913
-1136
lines changed

src/bots/common/txRecorder.ts

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,28 @@ import {
66
ExplicitBucketHistogramAggregation,
77
InstrumentType,
88
} from '@opentelemetry/sdk-metrics-base';
9-
import { metrics, type Histogram } from '@opentelemetry/api';
9+
import { metrics, ObservableGauge, type Histogram } from '@opentelemetry/api';
1010
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
1111
import { registerInstrumentations } from '@opentelemetry/instrumentation';
1212

1313
export class TxRecorder {
1414
private txLatency?: Histogram;
1515
private slotLatency?: Histogram;
16+
private txLatencyEma?: ObservableGauge;
1617
private attrs: Record<string, string> = {};
18+
private movingAvgLatencyMs?: number;
19+
private emaAlpha: number = 0.1;
20+
private healthThresholdMs: number = 20000;
21+
22+
constructor(
23+
botName: string,
24+
port?: number,
25+
disabled?: boolean,
26+
thresholdMs = 20000
27+
) {
28+
// set health threshold regardless of metrics being enabled
29+
this.healthThresholdMs = thresholdMs;
1730

18-
constructor(botName: string, port?: number, disabled?: boolean) {
1931
if (disabled || !port) {
2032
logger.info(`metrics disabled for bot ${botName}`);
2133
return;
@@ -71,10 +83,30 @@ export class TxRecorder {
7183
this.slotLatency = meter.createHistogram('tx_slot_latency', {
7284
unit: 'slots',
7385
});
86+
this.txLatencyEma = meter.createObservableGauge('tx_latency_ema', {
87+
unit: 'ms',
88+
});
89+
this.txLatencyEma.addCallback((result) => {
90+
result.observe(this.movingAvgLatencyMs ?? 0, this.attrs);
91+
});
7492
}
7593

7694
send(latencyMs: number, slotDelta = 0) {
7795
this.txLatency?.record(latencyMs, this.attrs);
7896
this.slotLatency?.record(Math.max(0, slotDelta), this.attrs);
97+
98+
// update exponential moving average of tx latency
99+
if (this.movingAvgLatencyMs === undefined) {
100+
this.movingAvgLatencyMs = latencyMs;
101+
} else {
102+
this.movingAvgLatencyMs =
103+
this.emaAlpha * latencyMs +
104+
(1 - this.emaAlpha) * this.movingAvgLatencyMs;
105+
}
106+
}
107+
108+
isHealthy(): boolean {
109+
if (this.movingAvgLatencyMs === undefined) return true;
110+
return this.movingAvgLatencyMs <= this.healthThresholdMs;
79111
}
80112
}

src/bots/pythCranker.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,12 @@ export class PythCrankerBot implements Bot {
130130
this.slotStalenessThresholdRestart =
131131
crankConfigs.slotStalenessThresholdRestart;
132132

133-
this.txRecorder = new TxRecorder(this.name, crankConfigs.metricsPort);
133+
this.txRecorder = new TxRecorder(
134+
this.name,
135+
crankConfigs.metricsPort,
136+
false,
137+
20_000
138+
);
134139
}
135140

136141
async init(): Promise<void> {
@@ -487,16 +492,14 @@ export class PythCrankerBot implements Bot {
487492
this.driftClient
488493
.sendTransaction(simResult.tx)
489494
.then((txSigAndSlot: TxSigAndSlot) => {
490-
this.txRecorder.send(
491-
Date.now() - startTime,
492-
txSigAndSlot.slot - sendSlot
493-
);
495+
const duration = Date.now() - startTime;
496+
this.txRecorder.send(duration, txSigAndSlot.slot - sendSlot);
494497
logger.info(
495498
`Posted multi pyth pull oracle for ${feedIds.map(
496499
(feedId) => feedId.baseSymbol
497-
)} update atomic tx: ${txSigAndSlot.txSig}, took ${
498-
Date.now() - startTime
499-
}ms, landed slot: ${
500+
)} update atomic tx: ${
501+
txSigAndSlot.txSig
502+
}, took ${duration}ms, landed slot: ${
500503
txSigAndSlot.slot
501504
}, sent slot: ${sendSlot}`
502505
);
@@ -534,6 +537,11 @@ export class PythCrankerBot implements Bot {
534537
}
535538

536539
async healthCheck(): Promise<boolean> {
540+
const txRecorderHealthy = this.txRecorder.isHealthy();
541+
if (!txRecorderHealthy) {
542+
logger.warn(`${this.name} bot tx recorder is unhealthy`);
543+
}
544+
this.health = txRecorderHealthy;
537545
return this.health;
538546
}
539547
}

src/bots/pythLazerCranker.ts

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import {
3030
PythLazerSubscriber,
3131
} from '../pythLazerSubscriber';
3232
import { Channel } from '@pythnetwork/pyth-lazer-sdk';
33+
import { TxRecorder } from './common/txRecorder';
3334

3435
setGlobalDispatcher(
3536
new Agent({
@@ -50,6 +51,8 @@ export class PythLazerCrankerBot implements Bot {
5051

5152
private blockhashSubscriber: BlockhashSubscriber;
5253
private health: boolean = true;
54+
// Metrics
55+
private txRecorder: TxRecorder;
5356

5457
constructor(
5558
private globalConfig: GlobalConfig,
@@ -150,6 +153,13 @@ export class PythLazerCrankerBot implements Bot {
150153
this.blockhashSubscriber = new BlockhashSubscriber({
151154
connection: driftClient.connection,
152155
});
156+
157+
this.txRecorder = new TxRecorder(
158+
this.name,
159+
crankConfigs.metricsPort,
160+
false,
161+
20_000
162+
);
153163
}
154164

155165
async init(): Promise<void> {
@@ -257,10 +267,10 @@ export class PythLazerCrankerBot implements Bot {
257267
this.driftClient
258268
.sendTransaction(simResult.tx)
259269
.then((txSigAndSlot: TxSigAndSlot) => {
270+
const duration = Date.now() - startTime;
271+
this.txRecorder.send(duration);
260272
logger.info(
261-
`Posted pyth lazer oracles for ${feedIds} update atomic tx: ${
262-
txSigAndSlot.txSig
263-
}, took ${Date.now() - startTime}ms`
273+
`Posted pyth lazer oracles for ${feedIds} update atomic tx: ${txSigAndSlot.txSig}, took ${duration}ms, skippedSim: false`
264274
);
265275
})
266276
.catch((e) => {
@@ -277,10 +287,10 @@ export class PythLazerCrankerBot implements Bot {
277287
this.driftClient
278288
.sendTransaction(tx)
279289
.then((txSigAndSlot: TxSigAndSlot) => {
290+
const duration = Date.now() - startTime;
291+
this.txRecorder.send(duration);
280292
logger.info(
281-
`Posted pyth lazer oracles for ${feedIds} update atomic tx: ${
282-
txSigAndSlot.txSig
283-
}, took ${Date.now() - startTime}ms`
293+
`Posted pyth lazer oracles for ${feedIds} update atomic tx: ${txSigAndSlot.txSig}, took ${duration}ms, skippedSim: true`
284294
);
285295
})
286296
.catch((e) => {
@@ -291,6 +301,11 @@ export class PythLazerCrankerBot implements Bot {
291301
}
292302

293303
async healthCheck(): Promise<boolean> {
304+
const txRecorderHealthy = this.txRecorder.isHealthy();
305+
if (!txRecorderHealthy) {
306+
logger.warn(`${this.name} bot tx recorder is unhealthy`);
307+
}
308+
this.health = txRecorderHealthy;
294309
return this.health;
295310
}
296311
}

0 commit comments

Comments
 (0)