Skip to content

Commit 1f6a573

Browse files
octo-gonebelopash
authored andcommitted
feat: add s3 ops metrics (#317)
* feat: add s3 ops metrics * changes * fix: add event emitter and change metric to Counter
1 parent 00d1d69 commit 1f6a573

File tree

13 files changed

+135
-14
lines changed

13 files changed

+135
-14
lines changed

.dockerignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,5 @@ docker-compose.yml
3232
/ops/docker-publish.sh
3333

3434
**/.DS_Store
35+
36+
*.temp

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ common/autoinstallers/*/.npmrc
1919

2020
# IDE
2121
.idea
22+
.vscode
2223

2324
# Built js libs
2425
/*/*/lib
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"changes": [
3+
{
4+
"packageName": "@subsquid/util-internal-dump-cli",
5+
"comment": "add prometheus metrics for S3 file system handler",
6+
"type": "patch"
7+
}
8+
],
9+
"packageName": "@subsquid/util-internal-dump-cli"
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"changes": [
3+
{
4+
"packageName": "@subsquid/util-internal-fs",
5+
"comment": "add metrics for S3 file system handler",
6+
"type": "patch"
7+
}
8+
],
9+
"packageName": "@subsquid/util-internal-fs"
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"changes": [
3+
{
4+
"packageName": "@subsquid/util-internal-ingest-cli",
5+
"comment": "add prometheus metrics",
6+
"type": "minor"
7+
}
8+
],
9+
"packageName": "@subsquid/util-internal-ingest-cli"
10+
}

common/config/rush/pnpm-lock.yaml

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

util/util-internal-dump-cli/src/dumper.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {createFs, Fs} from '@subsquid/util-internal-fs'
88
import {assertRange, printRange, Range, rangeEnd} from '@subsquid/util-internal-range'
99
import {Command} from 'commander'
1010
import {PrometheusServer} from './prometheus'
11+
import {EventEmitter} from 'events'
1112

1213

1314
export interface DumperOptions {
@@ -93,7 +94,7 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
9394
@def
9495
protected destination(): Fs {
9596
let dest = assertNotNull(this.options().dest)
96-
return createFs(dest)
97+
return createFs(dest, this.eventEmitter())
9798
}
9899

99100
@def
@@ -118,14 +119,21 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
118119
return true
119120
}
120121

122+
@def
123+
protected eventEmitter(): EventEmitter {
124+
return new EventEmitter()
125+
}
126+
121127
@def
122128
protected prometheus() {
123-
return new PrometheusServer(
129+
let server = new PrometheusServer(
124130
this.options().metrics ?? 0,
125131
() => this.getFinalizedHeight(),
126132
this.rpc(),
127133
this.log().child('prometheus')
128134
)
135+
this.eventEmitter().on('S3FsOperation', (op: string) => server.incS3Requests(op))
136+
return server
129137
}
130138

131139
private async *ingest(from?: number, prevHash?: string): AsyncIterable<B[]> {

util/util-internal-dump-cli/src/prometheus.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import {Logger} from '@subsquid/logger'
22
import {RpcClient} from '@subsquid/rpc-client'
33
import {createPrometheusServer, ListeningServer} from '@subsquid/util-internal-prometheus-server'
4-
import {collectDefaultMetrics, Gauge, Registry} from 'prom-client'
4+
import {collectDefaultMetrics, Gauge, Counter, Registry} from 'prom-client'
55

66

77
export class PrometheusServer {
88
private registry = new Registry()
99
private chainHeightGauge: Gauge
1010
private lastWrittenBlockGauge: Gauge
1111
private rpcRequestsGauge: Gauge
12+
private s3RequestsCounter: Counter
1213

1314
constructor(
1415
private port: number,
@@ -30,13 +31,13 @@ export class PrometheusServer {
3031
}
3132
this.set(chainHeight)
3233
}
33-
});
34+
})
3435

3536
this.lastWrittenBlockGauge = new Gauge({
3637
name: 'sqd_dump_last_written_block',
3738
help: 'Last saved block',
3839
registers: [this.registry]
39-
});
40+
})
4041

4142
this.rpcRequestsGauge = new Gauge({
4243
name: 'sqd_rpc_request_count',
@@ -56,7 +57,14 @@ export class PrometheusServer {
5657
kind: 'failure'
5758
}, metrics.connectionErrors)
5859
}
59-
});
60+
})
61+
62+
this.s3RequestsCounter = new Counter({
63+
name: 'sqd_s3_request_count',
64+
help: 'Number of s3 requests made',
65+
labelNames: ['kind'],
66+
registers: [this.registry],
67+
})
6068

6169
collectDefaultMetrics({register: this.registry})
6270
}
@@ -65,6 +73,10 @@ export class PrometheusServer {
6573
this.lastWrittenBlockGauge.set(block)
6674
}
6775

76+
incS3Requests(kind: string, value?: number) {
77+
this.s3RequestsCounter.inc({kind}, value)
78+
}
79+
6880
serve(): Promise<ListeningServer> {
6981
return createPrometheusServer(this.registry, this.port)
7082
}

util/util-internal-fs/src/factory.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@ import {S3Client} from '@aws-sdk/client-s3'
22
import {Fs} from './interface'
33
import {LocalFs} from './local'
44
import {S3Fs} from './s3'
5+
import {EventEmitter} from 'events'
56

67

7-
export function createFs(url: string): Fs {
8+
export function createFs(url: string, eventEmitter?: EventEmitter): Fs {
89
if (url.includes('://')) {
910
let protocol = new URL(url).protocol
1011
switch(protocol) {
1112
case 's3:':
12-
return createS3Fs(url.slice('s3://'.length))
13+
return createS3Fs(url.slice('s3://'.length), eventEmitter)
1314
default:
1415
throw new Error(`Unsupported protocol: ${protocol}`)
1516
}
@@ -19,12 +20,13 @@ export function createFs(url: string): Fs {
1920
}
2021

2122

22-
function createS3Fs(root: string): S3Fs {
23+
function createS3Fs(root: string, eventEmitter?: EventEmitter): S3Fs {
2324
let client = new S3Client({
2425
endpoint: process.env.AWS_S3_ENDPOINT
2526
})
2627
return new S3Fs({
2728
root,
28-
client
29+
client,
30+
eventEmitter
2931
})
3032
}

util/util-internal-fs/src/s3.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,25 @@ import assert from 'assert'
1010
import {Readable} from 'stream'
1111
import Upath from 'upath'
1212
import {Fs} from './interface'
13+
import {EventEmitter} from 'events'
1314

1415

1516
export interface S3FsOptions {
1617
root: string
1718
client: S3Client
19+
eventEmitter?: EventEmitter
1820
}
1921

2022

2123
export class S3Fs implements Fs {
2224
public readonly client: S3Client
2325
private root: string
26+
private eventEmitter?: EventEmitter
2427

2528
constructor(options: S3FsOptions) {
2629
this.client = options.client
2730
this.root = Upath.normalizeTrim(options.root)
31+
this.eventEmitter = options.eventEmitter
2832
splitPath(this.root)
2933
}
3034

@@ -52,7 +56,8 @@ export class S3Fs implements Fs {
5256
cd(...path: string[]): S3Fs {
5357
return new S3Fs({
5458
client: this.client,
55-
root: this.resolve(path)
59+
root: this.resolve(path),
60+
eventEmitter: this.eventEmitter
5661
})
5762
}
5863

@@ -74,6 +79,7 @@ export class S3Fs implements Fs {
7479
ContinuationToken
7580
})
7681
)
82+
this.eventEmitter?.emit('S3FsOperation', 'ListObjectsV2')
7783

7884
// process folder names
7985
if (res.CommonPrefixes) {
@@ -116,6 +122,7 @@ export class S3Fs implements Fs {
116122
Key,
117123
Body: content
118124
}))
125+
this.eventEmitter?.emit('S3FsOperation', 'PutObject')
119126
}
120127

121128
async delete(path: string): Promise<void> {
@@ -129,6 +136,7 @@ export class S3Fs implements Fs {
129136
ContinuationToken
130137
})
131138
)
139+
this.eventEmitter?.emit('S3FsOperation', 'ListObjectsV2')
132140

133141
if (list.Contents) {
134142
let Objects: ObjectIdentifier[] = []
@@ -144,6 +152,7 @@ export class S3Fs implements Fs {
144152
Objects
145153
}
146154
}))
155+
this.eventEmitter?.emit('S3FsOperation', 'DeleteObjects')
147156
}
148157

149158
if (list.IsTruncated) {
@@ -160,6 +169,7 @@ export class S3Fs implements Fs {
160169
Bucket,
161170
Key
162171
}))
172+
this.eventEmitter?.emit('S3FsOperation', 'GetObject')
163173
assert(res.Body instanceof Readable)
164174
return res.Body
165175
}

0 commit comments

Comments
 (0)