Skip to content

Commit 61085a8

Browse files
authored
Merge pull request #119 from eosnetworkfoundation/kayan_ws_dest
route websocket proxy traffics to different destinations
2 parents 2e6bafb + 2146b6b commit 61085a8

File tree

4 files changed

+212
-87
lines changed

4 files changed

+212
-87
lines changed

peripherals/eos-evm-ws-proxy/block-monitor.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,10 @@ class BlockMonitor extends EventEmitter {
130130
}
131131
}
132132

133-
if( found_next_block == true ) {
133+
if( found_next_block == true && this.reversible_blocks.length > 180 + 6) { // reduce frequency of calling get_evm_lib
134134
const evm_lib = await this.get_evm_lib();
135-
while(this.reversible_blocks.length > 0 && this.reversible_blocks.peek().number < evm_lib) {
135+
// keep at least 180 blocks in case evm-node is out of sync with leap
136+
while(this.reversible_blocks.length > 180 && this.reversible_blocks.peek().number < evm_lib) {
136137
this.remove_front_block();
137138
}
138139
}

peripherals/eos-evm-ws-proxy/config.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
require('dotenv').config();
22
const ws_listening_port = parseInt(process.env.WS_LISTENING_PORT, 10) || 3333;
33
const ws_listening_host = process.env.WS_LISTENING_HOST || "localhost";
4-
const web3_rpc_endpoint = process.env.WEB3_RPC_ENDPOINT || "http://localhost:5000/";
4+
const web3_rpc_endpoint = process.env.WEB3_RPC_ENDPOINT || "http://127.0.0.1:8881/";
5+
const web3_rpc_test_endpoint = process.env.WEB3_RPC_TEST_ENDPOINT || "http://127.0.0.1:8882/";
56
const nodeos_rpc_endpoint = process.env.NODEOS_RPC_ENDPOINT || "http://127.0.0.1:8888/";
7+
const miner_rpc_endpoint = process.env.MINER_RPC_ENDPOINT || "http://127.0.0.1:18888/";
8+
const whitelist_methods = process.env.WHITELIST_METHODS || "net_version,eth_blockNumber,eth_chainId,eth_protocolVersion,eth_getBlockByHash,eth_getBlockByNumber,eth_getBlockTransactionCountByHash,eth_getBlockTransactionCountByNumber,eth_getUncleByBlockHashAndIndex,eth_getUncleByBlockNumberAndIndex,eth_getUncleCountByBlockHash,eth_getUncleCountByBlockNumber,eth_getTransactionByHash,eth_getRawTransactionByHash,eth_getTransactionByBlockHashAndIndex,eth_getRawTransactionByBlockHashAndIndex,eth_getTransactionByBlockNumberAndIndex,eth_getRawTransactionByBlockNumberAndIndex,eth_getTransactionReceipt,eth_getBlockReceipts,eth_estimateGas,eth_getBalance,eth_getCode,eth_getTransactionCount,eth_getStorageAt,eth_call,eth_callBundle,eth_createAccessList";
69
const poll_interval = parseInt(process.env.POLL_INTERVAL, 10) || 1000;
710
const max_logs_subs_per_connection = parseInt(process.env.MAX_LOGS_SUBS_PER_CONNECTION, 10) || 1;
811
const max_minedtx_subs_per_connection = parseInt(process.env.MAX_MINEDTX_SUBS_PER_CONNECTION, 10) || 1;
@@ -13,10 +16,13 @@ module.exports = {
1316
ws_listening_port,
1417
ws_listening_host,
1518
web3_rpc_endpoint,
19+
web3_rpc_test_endpoint,
1620
nodeos_rpc_endpoint,
21+
miner_rpc_endpoint,
1722
poll_interval,
1823
max_logs_subs_per_connection,
1924
max_minedtx_subs_per_connection,
2025
log_level,
26+
whitelist_methods,
2127
genesis_json
2228
};

peripherals/eos-evm-ws-proxy/subscription-server.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,17 @@ const {Web3} = require('web3');
55
const {bigint_replacer, load_json_file} = require('./utils');
66
class SubscriptionServer extends EventEmitter {
77

8-
constructor({web3_rpc_endpoint, nodeos_rpc_endpoint, ws_listening_host, ws_listening_port, poll_interval, max_logs_subs_per_connection, max_minedtx_subs_per_connection, genesis_json, logger}) {
8+
constructor({web3_rpc_endpoint, web3_rpc_test_endpoint, nodeos_rpc_endpoint, miner_rpc_endpoint, ws_listening_host, ws_listening_port, poll_interval, max_logs_subs_per_connection, max_minedtx_subs_per_connection, logger, whitelist_methods, genesis_json}) {
99
super();
1010

1111
const genesis = load_json_file(genesis_json);
1212

1313
this.block_monitor = new BlockMonitor({web3_rpc_endpoint, nodeos_rpc_endpoint, poll_interval, genesis, logger});
14-
this.web_socket_handler = new WebSocketHandler({ws_listening_host, ws_listening_port, web3_rpc_endpoint, logger});
14+
this.web_socket_handler = new WebSocketHandler({ws_listening_host, ws_listening_port, web3_rpc_endpoint, web3_rpc_test_endpoint, miner_rpc_endpoint, logger, whitelist_methods});
1515
this.max_logs_subs_per_connection = max_logs_subs_per_connection;
1616
this.max_minedtx_subs_per_connection = max_minedtx_subs_per_connection;
1717
this.web3 = new Web3(web3_rpc_endpoint);
18-
this.logger = logger
18+
this.logger = logger;
1919
this.genesis = genesis;
2020

2121
this.new_head_subs = new Map();

peripherals/eos-evm-ws-proxy/websocket-handler.js

Lines changed: 199 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,21 @@ const { is_plain_object } = require('./utils');
2828

2929
class WebSocketHandler extends EventEmitter {
3030

31-
constructor({ ws_listening_host, ws_listening_port, web3_rpc_endpoint, logger }) {
31+
constructor({ ws_listening_host, ws_listening_port, web3_rpc_endpoint, web3_rpc_test_endpoint, miner_rpc_endpoint, logger, whitelist_methods }) {
3232
super();
3333

3434
this.host = ws_listening_host;
3535
this.port = ws_listening_port;
3636
this.web3_rpc_endpoint = web3_rpc_endpoint;
37+
this.web3_rpc_test_endpoint = web3_rpc_test_endpoint;
38+
this.miner_rpc_endpoint = miner_rpc_endpoint;
3739
this.logger = logger;
40+
this.whitelist_methods = whitelist_methods.split(",");
41+
42+
console.log("web3_rpc_endpoint is:", this.web3_rpc_endpoint);
43+
console.log("web3_rpc_test_endpoint is:", this.web3_rpc_test_endpoint);
44+
console.log("miner_rpc_endpoint is:", this.miner_rpc_endpoint);
45+
console.log("whitelist_methods are:", this.whitelist_methods);
3846

3947
this.server = http.createServer((req, res) => {
4048
this.handle_http_request(req, res);
@@ -62,96 +70,94 @@ class WebSocketHandler extends EventEmitter {
6270
});
6371
}
6472

65-
async handle_eth_subscribe(ws, data) {
73+
handle_eth_subscribe(ws, data) {
6674

67-
try {
68-
if (!data.hasOwnProperty('params') || !Array.isArray(data.params) || data.params.length == 0) {
69-
throw new Error("No params");
70-
}
75+
if (!data.hasOwnProperty('params') || !Array.isArray(data.params) || data.params.length == 0) {
76+
throw new Error("No params");
77+
}
7178

72-
const subscription_type = data.params[0];
73-
if( subscription_type == "newHeads") {
74-
const subid = uuidv4();
75-
this.emit('newHeads', {subid, ws});
76-
ws.send(JSON.stringify({ jsonrpc: "2.0", result: subid, id: data.id }));
77-
} else if (subscription_type == "logs") {
78-
const subid = uuidv4();
79-
let filter = {};
80-
if(data.params.length > 1) {
81-
filter = data.params[1];
82-
if (!(is_plain_object(filter) && filter != null)) {
83-
throw new Error("Invalid filter");
84-
}
85-
if (filter.hasOwnProperty('address') && !(typeof filter.address === 'string' || Array.isArray(filter.address))) {
86-
throw new Error("Invalid address filter");
87-
}
88-
if (filter.hasOwnProperty('topics') && !Array.isArray(filter.topics)) {
89-
throw new Error("Invalid topics filter");
90-
}
79+
const subscription_type = data.params[0];
80+
if( subscription_type == "newHeads") {
81+
const subid = uuidv4();
82+
this.emit('newHeads', {subid, ws});
83+
return { jsonrpc: "2.0", result: subid, id: data.id };
84+
} else if (subscription_type == "logs") {
85+
const subid = uuidv4();
86+
let filter = {};
87+
if(data.params.length > 1) {
88+
filter = data.params[1];
89+
if (!(is_plain_object(filter) && filter != null)) {
90+
throw new Error("Invalid filter");
91+
}
92+
if (filter.hasOwnProperty('address') && !(typeof filter.address === 'string' || Array.isArray(filter.address))) {
93+
throw new Error("Invalid address filter");
94+
}
95+
if (filter.hasOwnProperty('topics') && !Array.isArray(filter.topics)) {
96+
throw new Error("Invalid topics filter");
97+
}
98+
}
99+
this.emit('logs', {subid, ws, filter});
100+
return { jsonrpc: "2.0", result: subid, id: data.id };
101+
} else if (subscription_type == "minedTransactions") {
102+
const subid = uuidv4();
103+
let filter = {};
104+
if(data.params.length > 1) {
105+
filter = data.params[1];
106+
if (!(is_plain_object(filter) && filter != null)) {
107+
throw new Error("Invalid filter");
91108
}
92-
this.emit('logs', {subid, ws, filter});
93-
ws.send(JSON.stringify({ jsonrpc: "2.0", result: subid, id: data.id }));
94-
} else if (subscription_type == "minedTransactions") {
95-
const subid = uuidv4();
96-
let filter = {};
97-
if(data.params.length > 1) {
98-
filter = data.params[1];
99-
if (!(is_plain_object(filter) && filter != null)) {
100-
throw new Error("Invalid filter");
109+
if (filter.hasOwnProperty('addresses')) {
110+
if(!Array.isArray(filter.addresses)) {
111+
throw new Error("Invalid addresses filter");
101112
}
102-
if (filter.hasOwnProperty('addresses')) {
103-
if(!Array.isArray(filter.addresses)) {
104-
throw new Error("Invalid addresses filter");
113+
for(const ofilter of filter.addresses) {
114+
this.logger.debug("ELEMENT: ", ofilter);
115+
if(!is_plain_object(ofilter) || (typeof(ofilter.to) != 'string' && typeof(ofilter.from) != 'string') ) {
116+
throw new Error("Invalid addresses filter element");
105117
}
106-
for(const ofilter of filter.addresses) {
107-
this.logger.debug("ELEMENT: ", ofilter);
108-
if(!is_plain_object(ofilter) || (typeof(ofilter.to) != 'string' && typeof(ofilter.from) != 'string') ) {
109-
throw new Error("Invalid addresses filter element");
110-
}
111-
if(typeof(ofilter.to) == 'string') {
112-
ofilter.to = ofilter.to.toLowerCase();
113-
}
114-
if(typeof(ofilter.from) == 'string') {
115-
ofilter.from = ofilter.from.toLowerCase();
116-
}
118+
if(typeof(ofilter.to) == 'string') {
119+
ofilter.to = ofilter.to.toLowerCase();
120+
}
121+
if(typeof(ofilter.from) == 'string') {
122+
ofilter.from = ofilter.from.toLowerCase();
117123
}
118-
}
119-
if (filter.hasOwnProperty('includeRemoved') && typeof(filter.includeRemoved) != 'boolean') {
120-
throw new Error("Invalid includeRemoved filter");
121-
}
122-
if (filter.hasOwnProperty('hashesOnly') && typeof(filter.hashesOnly) != 'boolean') {
123-
throw new Error("Invalid hashesOnly filter");
124124
}
125125
}
126-
this.emit('minedTransactions', {subid, ws, filter});
127-
ws.send(JSON.stringify({ jsonrpc: "2.0", result: subid, id: data.id }));
128-
} else {
129-
throw new Error(`${data.params[0]} not supported`);
126+
if (filter.hasOwnProperty('includeRemoved') && typeof(filter.includeRemoved) != 'boolean') {
127+
throw new Error("Invalid includeRemoved filter");
128+
}
129+
if (filter.hasOwnProperty('hashesOnly') && typeof(filter.hashesOnly) != 'boolean') {
130+
throw new Error("Invalid hashesOnly filter");
131+
}
130132
}
131-
} catch (error) {
132-
this.send_json_rpc_error(ws, data.id, -32000, error.message);
133+
this.emit('minedTransactions', {subid, ws, filter});
134+
return { jsonrpc: "2.0", result: subid, id: data.id };
135+
} else {
136+
throw new Error(`${data.params[0]} not supported`);
133137
}
134138
}
135139

136-
async handle_eth_unsubscribe(ws, data) {
137-
try {
138-
if (!data.hasOwnProperty('params') || !Array.isArray(data.params) || data.params.length == 0) {
139-
throw new Error("Invalid params");
140-
}
141-
const subid = data.params[0];
142-
this.emit('unsubscribe', {subid, ws});
143-
ws.send(JSON.stringify({ jsonrpc: "2.0", result: true, id: data.id }));
144-
} catch (error) {
145-
this.send_json_rpc_error(ws, data.id, -32000, error.message);
140+
handle_eth_unsubscribe(ws, data) {
141+
if (!data.hasOwnProperty('params') || !Array.isArray(data.params) || data.params.length == 0) {
142+
throw new Error("Invalid params");
146143
}
144+
const subid = data.params[0];
145+
this.emit('unsubscribe', {subid, ws});
146+
return { jsonrpc: "2.0", result: true, id: data.id };
147147
}
148148

149-
async handle_other_methods(ws, data) {
150-
try {
149+
async handle_miner_methods(data) {
150+
const response = await axios.post(this.miner_rpc_endpoint, data);
151+
return response.data;
152+
}
153+
154+
async handle_other_methods(data, use_test_rpc) {
155+
if (use_test_rpc) {
156+
const response = await axios.post(this.web3_rpc_test_endpoint, data);
157+
return response.data;
158+
} else {
151159
const response = await axios.post(this.web3_rpc_endpoint, data);
152-
ws.send(JSON.stringify(response.data));
153-
} catch (error) {
154-
this.send_json_rpc_error(ws, data.id, -32000, "Sever Error");
160+
return response.data;
155161
}
156162
}
157163

@@ -166,27 +172,139 @@ class WebSocketHandler extends EventEmitter {
166172
async handle_message(ws, message) {
167173

168174
let data;
175+
let use_test_rpc = false;
169176
try {
170177
data = JSON.parse(message);
171178
} catch (e) {
172179
ws.send(JSON.stringify({ error: "Invalid JSON" }));
173180
return;
174181
}
175182

176-
switch (data.method) {
183+
if (Array.isArray(data)) {
184+
let data2 = [];
185+
for (let i = 0; i < data.length; ++i) {
186+
switch(data[i].method) {
187+
case 'eth_subscribe':
188+
case 'eth_unsubscribe':
189+
case 'eth_gasPrice':
190+
case 'eth_sendRawTransaction':
191+
break;
192+
default:
193+
if ((this.whitelist_methods.indexOf(data[i].method) < 0)) {
194+
use_test_rpc = true;
195+
}
196+
data2.push(data[i]);
197+
}
198+
}
199+
let rpc_response_data = [];
200+
let rpc_error_message = null;
201+
if (data2.length > 0) {
202+
try {
203+
const rpc_response = await this.handle_other_methods(data2, use_test_rpc);
204+
if (Array.isArray(rpc_response)) {
205+
rpc_response_data = rpc_response;
206+
} else {
207+
rpc_error_message = "RPC Server Error:" + JSON.stringify({message: rpc_response});
208+
}
209+
} catch (error) {
210+
rpc_error_message = "RPC Server Error:" + error.message;
211+
}
212+
}
213+
let rpc_index = 0;
214+
let batch_response = [];
215+
for (let i = 0; i < data.length; ++i) {
216+
switch(data[i].method) {
217+
case 'eth_subscribe':
218+
try {
219+
batch_response.push(this.handle_eth_subscribe(ws, data[i]));
220+
} catch (error) {
221+
batch_response.push({
222+
id : data[i].id,
223+
jsonrpc : "2.0",
224+
error : { code: -32000, message: error.message }
225+
});
226+
}
227+
break;
228+
case 'eth_unsubscribe':
229+
try {
230+
batch_response.push(this.handle_eth_unsubscribe(ws, data[i]));
231+
} catch (error) {
232+
batch_response.push({
233+
id : data[i].id,
234+
jsonrpc : "2.0",
235+
error : { code: -32000, message: error.message }
236+
});
237+
}
238+
break;
239+
case 'eth_gasPrice':
240+
case 'eth_sendRawTransaction':
241+
try {
242+
const response_json = await this.handle_miner_methods(data[i]);
243+
batch_response.push(response_json);
244+
} catch (error) {
245+
batch_response.push({
246+
id : data[i].id,
247+
jsonrpc : "2.0",
248+
error : { code: -32000, message: error.message }
249+
});
250+
}
251+
break;
252+
default:
253+
if (rpc_index < rpc_response_data.length) {
254+
batch_response.push(rpc_response_data[rpc_index]);
255+
rpc_index++;
256+
} else {
257+
batch_response.push({
258+
id : data[i].id,
259+
jsonrpc : "2.0",
260+
error : { code: -32000, message: rpc_error_message }
261+
});
262+
}
263+
}
264+
}
265+
ws.send(JSON.stringify(batch_response));
266+
}
267+
else {
268+
switch (data.method) {
177269
case 'eth_subscribe':
178-
await this.handle_eth_subscribe(ws, data);
270+
try {
271+
const response_json = this.handle_eth_subscribe(ws, data);
272+
ws.send(JSON.stringify(response_json));
273+
} catch (error) {
274+
this.send_json_rpc_error(ws, data.id, -32000, error.message);
275+
}
179276
break;
180277

181278
case 'eth_unsubscribe':
182-
await this.handle_eth_unsubscribe(ws, data);
279+
try {
280+
const response_json = this.handle_eth_unsubscribe(ws, data);
281+
ws.send(JSON.stringify(response_json));
282+
} catch (error) {
283+
this.send_json_rpc_error(ws, data.id, -32000, error.message);
284+
}
285+
break;
286+
case 'eth_gasPrice':
287+
case 'eth_sendRawTransaction':
288+
try {
289+
const response_json = await this.handle_miner_methods(data);
290+
ws.send(JSON.stringify(response_json));
291+
} catch (error) {
292+
this.send_json_rpc_error(ws, data.id, -32000, "RPC Miner Error:" + error.message);
293+
}
183294
break;
184-
185295
default:
186-
await this.handle_other_methods(ws, data);
296+
try {
297+
if ((this.whitelist_methods.indexOf(data.method) < 0)) {
298+
use_test_rpc = true;
299+
}
300+
const response_json = await this.handle_other_methods(data, use_test_rpc);
301+
ws.send(JSON.stringify(response_json));
302+
} catch (error) {
303+
this.send_json_rpc_error(ws, data.id, -32000, "RPC Server Error:" + error.message);
304+
}
187305
break;
306+
}
188307
}
189-
190308
}
191309

192310
handle_close(ws) {

0 commit comments

Comments
 (0)