Skip to content

Commit 4741786

Browse files
authored
Feat block metrics (#389)
Block related metrics added for dumper and iongester.
1 parent 4d8980d commit 4741786

File tree

13 files changed

+287
-13
lines changed

13 files changed

+287
-13
lines changed

fuel/fuel-dump/src/dumper.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ export class FuelDumper extends Dumper<BlockData, Options> {
2828
return block.block.header.prevRoot
2929
}
3030

31+
protected getBlockTimestamp(block: BlockData): number {
32+
const TAI64_UNIX_OFFSET = BigInt("4611686018427387914");
33+
const taiTimestamp = BigInt(block.block.header.time);
34+
const unixTimeMs = taiTimestamp - TAI64_UNIX_OFFSET;
35+
return Number(unixTimeMs);
36+
}
37+
3138
protected validateChainContinuity(): boolean {
3239
return false
3340
}

fuel/fuel-ingest/src/ingest.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,15 @@ export class FuelIngest extends Ingest<IngestOptions> {
2929
})
3030
}
3131
}
32+
33+
protected getBlockHeight(block: any): number {
34+
return Number(block.header.height) || 0
35+
}
36+
37+
protected getBlockTimestamp(block: any): number {
38+
const TAI64_UNIX_OFFSET = BigInt("4611686018427387914");
39+
const taiTimestamp = BigInt(block.header.time);
40+
const unixTimeMs = taiTimestamp - TAI64_UNIX_OFFSET;
41+
return Number(unixTimeMs);
42+
}
3243
}

solana/solana-dump/src/dumper.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ export class SolanaDumper extends Dumper<Block, Options> {
4848
return block.block.previousBlockhash
4949
}
5050

51+
protected getBlockTimestamp(block: Block): number {
52+
return Number(block.block.blockTime) || 0
53+
}
54+
5155
@def
5256
private solanaRpc(): Rpc {
5357
return new Rpc(this.rpc(), 0, this.options().maxConfirmationAttempts)

solana/solana-ingest/src/ingest.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,12 @@ export class SolanaIngest extends Ingest<Options> {
4545
})
4646
}
4747
}
48+
49+
protected getBlockHeight(block: any): number {
50+
return Number(block.header.height) || 0
51+
}
52+
53+
protected getBlockTimestamp(block: any): number {
54+
return Number(block.header.timestamp) || 0
55+
}
4856
}

substrate/substrate-dump/src/dumper.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,8 @@ export class SubstrateDumper extends Dumper<BlockData, Options> {
113113
protected getPrevBlockHash(block: BlockData): string {
114114
return block.block.block.header.parentHash
115115
}
116+
117+
protected getBlockTimestamp(block: BlockData): number {
118+
return Math.floor(Date.now() / 1000); //mocked timestamp for now
119+
}
116120
}

substrate/substrate-ingest/src/ingest.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,12 @@ export class SubstrateIngest extends Ingest<Options> {
8888
yield toJSON(blocks)
8989
}
9090
}
91+
92+
protected getBlockHeight(block: any): number {
93+
return block.header.height || 0
94+
}
95+
96+
protected getBlockTimestamp(block: any): number {
97+
return Math.floor(block.header.timestamp / 1000) || 0
98+
}
9199
}

tron/tron-dump/src/dumper.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ export class TronDumper extends Dumper<BlockData, Options> {
2626
return block.block.block_header.raw_data.parentHash
2727
}
2828

29+
protected getBlockTimestamp(block: BlockData): number {
30+
return block.block.block_header.raw_data.timestamp || 0
31+
}
32+
2933
@def
3034
httpApi(): HttpApi {
3135
let client = new TronHttpClient({

tron/tron-ingest/src/ingest.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,12 @@ export class TronIngest extends Ingest<IngestOptions> {
2929
})
3030
}
3131
}
32+
33+
protected getBlockHeight(block: any): number {
34+
return block.header.height || 0
35+
}
36+
37+
protected getBlockTimestamp(block: any): number {
38+
return Math.floor(block.header.timestamp / 1000) || 0
39+
}
3240
}

util/rpc-client/src/client.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,15 @@ export interface RpcClientOptions {
6262
log?: Logger | null
6363
}
6464

65+
// Add interface for RPC metrics
66+
export interface RpcMetrics {
67+
url: string
68+
requestsServed: number
69+
connectionErrors: number
70+
notificationsReceived: number
71+
avg_response_time: number
72+
}
73+
6574

6675
export interface CallOptions<R=any> {
6776
priority?: number
@@ -113,6 +122,7 @@ export class RpcClient {
113122
private connectionErrors = 0
114123
private requestsServed = 0
115124
private notificationsReceived = 0
125+
private totalResponseTime = 0
116126
private backoffEpoch = 0
117127
private backoffTime?: number
118128
private notificationListeners: ((msg: RpcNotification) => void)[] = []
@@ -179,12 +189,13 @@ export class RpcClient {
179189
return this.maxCapacity
180190
}
181191

182-
getMetrics() {
192+
getMetrics(): RpcMetrics {
183193
return {
184194
url: this.url,
185195
requestsServed: this.requestsServed,
186196
connectionErrors: this.connectionErrors,
187-
notificationsReceived: this.notificationsReceived
197+
notificationsReceived: this.notificationsReceived,
198+
avg_response_time: this.requestsServed > 0 ? this.totalResponseTime / this.requestsServed : 0
188199
}
189200
}
190201

@@ -364,6 +375,7 @@ export class RpcClient {
364375
this.capacity -= 1
365376
let backoffEpoch = this.backoffEpoch
366377
let promise: Promise<any>
378+
const startTime = Date.now()
367379
if (Array.isArray(req.call)) {
368380
let call = req.call
369381
this.log?.debug({rpcBatchId: [call[0].id, last(call).id]}, 'rpc send')
@@ -382,6 +394,9 @@ export class RpcClient {
382394
})
383395
}
384396
promise.then(result => {
397+
const responseTimeSeconds = (Date.now() - startTime) / 1000
398+
this.totalResponseTime += responseTimeSeconds
399+
385400
this.requestsServed += 1
386401
if (this.backoffEpoch == backoffEpoch) {
387402
this.connectionErrorsInRow = 0

util/util-internal-dump-cli/src/dumper.ts

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,37 @@ export interface DumperOptions {
2323
writeBatchSize: number
2424
topDirSize: number
2525
metrics?: number
26+
maxCacheSize?: number
2627
}
2728

2829

2930
export abstract class Dumper<B extends {hash: string, height: number}, O extends DumperOptions = DumperOptions> {
31+
32+
private timestampCache = new Map<number, number>();
33+
34+
private addToCache(block: B): void {
35+
const maxCacheSize = this.options().maxCacheSize ?? this.getDefaultCacheSize();
36+
if (this.timestampCache.size >= maxCacheSize) {
37+
const heights = Array.from(this.timestampCache.keys()).sort((a, b) => a - b);
38+
const removeCount = Math.ceil(maxCacheSize * 0.2);
39+
const keysToRemove = heights.slice(0, removeCount);
40+
for (const key of keysToRemove) {
41+
this.timestampCache.delete(key);
42+
}
43+
this.log().debug(`Cache cleanup: removed ${keysToRemove.length} oldest block timestamps`);
44+
}
45+
46+
this.timestampCache.set(block.height, this.getBlockTimestamp(block));
47+
}
48+
3049
protected abstract getBlocks(range: Range): AsyncIterable<B[]>
3150

3251
protected abstract getFinalizedHeight(): Promise<number>
3352

3453
protected abstract getPrevBlockHash(block: B): string
3554

55+
protected abstract getBlockTimestamp(block: B): number
56+
3657
protected setUpProgram(program: Command): void {}
3758

3859
protected getDefaultChunkSize(): number {
@@ -43,6 +64,10 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
4364
return 1024
4465
}
4566

67+
protected getDefaultCacheSize(): number {
68+
return 10
69+
}
70+
4671
protected getLoggingNamespace(): string {
4772
return 'sqd:dump'
4873
}
@@ -61,6 +86,7 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
6186
program.option('--chunk-size <MB>', 'Data chunk size in megabytes', positiveInt, this.getDefaultChunkSize())
6287
program.option('--write-batch-size <number>', 'Number of blocks to write at a time', positiveInt, 10)
6388
program.option('--top-dir-size <number>', 'Number of items in a top level dir', positiveInt, this.getDefaultTopDirSize())
89+
program.option('--max-cache-size <number>', 'Maximum number of blocks to keep in memory cache', positiveInt, this.getDefaultCacheSize())
6490
program.option('--metrics <port>', 'Enable prometheus metrics server', nat)
6591
return program
6692
}
@@ -176,6 +202,17 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
176202
}
177203
}
178204

205+
const lastBlock = last(blocks)
206+
const mintedTimestamp = this.getBlockTimestamp(lastBlock)
207+
208+
for (const block of blocks) {
209+
this.addToCache(block);
210+
}
211+
212+
this.prometheus().setLatestBlockMetrics(lastBlock.height, mintedTimestamp)
213+
this.log().debug(`Received block ${lastBlock.height} with minted timestamp ${mintedTimestamp}`)
214+
this.log().debug(`Cache size: ${this.timestampCache.size}`)
215+
179216
yield blocks
180217

181218
progress.setCurrentValue(last(blocks).height)
@@ -206,7 +243,11 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
206243
for (let block of bb) {
207244
process.stdout.write(JSON.stringify(block) + '\n')
208245
}
209-
prometheus.setLastWrittenBlock(last(bb).height)
246+
const lastBlockHeight = last(bb).height;
247+
prometheus.setLastWrittenBlock(lastBlockHeight);
248+
const processedTimestamp = this.getBlockTimestamp(last(bb));
249+
prometheus.setProcessedBlockMetrics(processedTimestamp);
250+
this.log().debug(`Processed block ${lastBlockHeight} at ${processedTimestamp}`);
210251
}
211252
} else {
212253
let archive = new ArchiveLayout(this.destination(), {
@@ -217,7 +258,18 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
217258
range: this.range(),
218259
chunkSize: chunkSize * 1024 * 1024,
219260
writeBatchSize: this.options().writeBatchSize,
220-
onSuccessWrite: ctx => prometheus.setLastWrittenBlock(ctx.blockRange.to.height)
261+
onSuccessWrite: ctx => {
262+
const blockHeight = ctx.blockRange.to.height;
263+
prometheus.setLastWrittenBlock(blockHeight);
264+
265+
const cachedTimestamp = this.timestampCache.get(blockHeight);
266+
if (cachedTimestamp) {
267+
prometheus.setProcessedBlockMetrics(cachedTimestamp);
268+
this.log().debug(`Processed block ${blockHeight} at ${cachedTimestamp}`);
269+
} else {
270+
this.log().warn(`No cached timestamp available for height ${blockHeight}`);
271+
}
272+
}
221273
})
222274
}
223275
}, err => {

0 commit comments

Comments
 (0)