-
Notifications
You must be signed in to change notification settings - Fork 521
Expand file tree
/
Copy pathindex.ts
More file actions
156 lines (134 loc) · 5.25 KB
/
index.ts
File metadata and controls
156 lines (134 loc) · 5.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import { TimeoutError } from '@libp2p/interface'
import { pbStream } from '@libp2p/utils'
import { Message, MessageType } from '../message/dht.js'
import { AddProviderHandler } from './handlers/add-provider.js'
import { FindNodeHandler } from './handlers/find-node.js'
import { GetProvidersHandler } from './handlers/get-providers.js'
import { GetValueHandler } from './handlers/get-value.js'
import { PingHandler } from './handlers/ping.js'
import { PutValueHandler } from './handlers/put-value.js'
import type { PeerInfoMapper, Selectors, Validators } from '../index.js'
import type { PeerRouting } from '../peer-routing/index.js'
import type { Providers } from '../providers.js'
import type { FindNodeHandlerComponents } from './handlers/find-node.js'
import type { GetProvidersHandlerComponents } from './handlers/get-providers.js'
import type { GetValueHandlerComponents } from './handlers/get-value.js'
import type { PutValueHandlerComponents } from './handlers/put-value.js'
import type { RoutingTable } from '../routing-table/index.js'
import type { CounterGroup, Logger, Metrics, PeerId, MetricGroup, Connection, Stream } from '@libp2p/interface'
export interface DHTMessageHandler {
handle(peerId: PeerId, msg: Message): Promise<Message | undefined>
}
export interface RPCInit {
routingTable: RoutingTable
providers: Providers
peerRouting: PeerRouting
validators: Validators
selectors: Selectors
logPrefix: string
metricsPrefix: string
datastorePrefix: string
peerInfoMapper: PeerInfoMapper
incomingMessageTimeout?: number
}
export interface RPCComponents extends GetValueHandlerComponents, PutValueHandlerComponents, FindNodeHandlerComponents, GetProvidersHandlerComponents {
metrics?: Metrics
}
export class RPC {
private readonly handlers: Record<string, DHTMessageHandler>
private readonly log: Logger
private readonly metrics: {
operations?: CounterGroup
errors?: CounterGroup
rpcTime?: MetricGroup
}
private readonly incomingMessageTimeout: number
constructor (components: RPCComponents, init: RPCInit) {
this.metrics = {
operations: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_inbound_rpc_requests_total`),
errors: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_inbound_rpc_errors_total`),
rpcTime: components.metrics?.registerMetricGroup(`${init.metricsPrefix}_inbound_rpc_time_seconds`, { label: 'operation' })
}
this.log = components.logger.forComponent(`${init.logPrefix}:rpc`)
this.incomingMessageTimeout = init.incomingMessageTimeout ?? 10_000
this.handlers = {
[MessageType.GET_VALUE.toString()]: new GetValueHandler(components, init),
[MessageType.PUT_VALUE.toString()]: new PutValueHandler(components, init),
[MessageType.FIND_NODE.toString()]: new FindNodeHandler(components, init),
[MessageType.ADD_PROVIDER.toString()]: new AddProviderHandler(components, init),
[MessageType.GET_PROVIDERS.toString()]: new GetProvidersHandler(components, init),
[MessageType.PING.toString()]: new PingHandler(components, init)
}
}
/**
* Process incoming DHT messages
*/
async handleMessage (peerId: PeerId, msg: Message): Promise<Message | undefined> {
// get handler & execute it
const handler = this.handlers[msg.type]
if (handler == null) {
this.log.error(`no handler found for message type: ${msg.type}`)
return
}
try {
this.metrics.operations?.increment({
[msg.type]: true
})
return await handler.handle(peerId, msg)
} catch {
this.metrics.errors?.increment({
[msg.type]: true
})
}
}
/**
* Handle incoming streams on the dht protocol
*/
async onIncomingStream (stream: Stream, connection: Connection): Promise<void> {
const abortListener = (): void => {
stream.abort(new TimeoutError())
}
let signal = AbortSignal.timeout(this.incomingMessageTimeout)
signal.addEventListener('abort', abortListener)
const messages = pbStream(stream).pb(Message)
while (true) {
// the remote will not send any more data
if (stream.readStatus !== 'readable') {
await stream.close({
signal
})
break
}
const message = await messages.read({
signal
})
const stopSuccessTimer = this.metrics?.rpcTime?.timer(message.type.toString())
const stopErrorTimer = this.metrics?.rpcTime?.timer(message.type.toString())
let errored = false
try {
// handle the message
this.log('incoming %s from %p', message.type, connection.remotePeer)
const res = await this.handleMessage(connection.remotePeer, message)
// Not all handlers will return a response
if (res != null) {
await messages.write(res, {
signal
})
}
} catch (err) {
errored = true
stopErrorTimer?.()
throw err
} finally {
if (!errored) {
stopSuccessTimer?.()
}
}
// we have received a message so reset the timeout controller to
// allow the remote to send another
signal.removeEventListener('abort', abortListener)
signal = AbortSignal.timeout(this.incomingMessageTimeout)
signal.addEventListener('abort', abortListener)
}
}
}