forked from pryv/lib-js
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSocketIO.js
More file actions
165 lines (150 loc) · 5.39 KB
/
SocketIO.js
File metadata and controls
165 lines (150 loc) · 5.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/**
* @license
* [BSD-3-Clause](https://github.com/pryv/lib-js/blob/master/LICENSE)
*/
const io = require('socket.io-client');
const { EventEmitter } = require('events');
const EVENTS = ['eventsChanged', 'streamsChanged', 'accessesChanged', 'disconnect', 'error'];
/**
 * Socket.IO transport for a Connection.
 * Use connection.socket to access the instance associated with a Connection.
 *
 * Relays the server-side notifications listed in EVENTS as events of this
 * emitter, and exposes api() to send batch calls over the socket.
 * @memberof pryv
 * @extends EventEmitter
 */
class SocketIO extends EventEmitter {
  /**
   * @param {Connection} connection - The connection to bind to
   */
  constructor (connection) {
    super();
    /** @type {Connection} */
    this.connection = connection;
    /** @type {boolean} true while an open() call is waiting for the first connect */
    this.connecting = false;
    /** Underlying socket.io-client socket; null until open() succeeds */
    this._io = null;
  }

  /**
   * Open the Socket.IO stream
   *
   * Resolves immediately when a socket handle already exists, and rejects when
   * another open() call is still in progress.
   * @throws {Error} On connection failures
   * @returns {Promise<SocketIO>} Promise resolving to this SocketIO instance
   */
  async open () {
    return new Promise((resolve, reject) => {
      if (this._io) return resolve(this);
      if (this.connecting) return reject(new Error('open() process in course'));
      this.connecting = true;
      this.connection.username()
        .then(username => {
          const socketEndpoint = this.connection.endpoint + username + '?auth=' + this.connection.token;
          // Cap reconnects so a server-side outage can't drive a runaway loop.
          // socket.io-client default is reconnectionAttempts: Infinity, max delay 5s.
          // With these settings a stuck client gives up after 10 attempts, the
          // delay between attempts being capped at 60s (randomized by +/-50%).
          // @ts-ignore - io is callable in socket.io-client
          const socket = io(socketEndpoint, {
            forceNew: true,
            transports: ['websocket'],
            reconnectionAttempts: 10,
            reconnectionDelayMax: 60000,
            randomizationFactor: 0.5
          });
          this._io = socket;

          // Terminal failure: socket.io-client gave up reconnecting.
          // Tear down our handle and surface 'error' so consumers (e.g. Monitor)
          // can clean up instead of leaving a zombie socket reference.
          // NOTE(review): socket.io-client v3+ reportedly emits 'reconnect_failed'
          // on the Manager (socket.io) rather than on the Socket - confirm
          // against the installed version.
          socket.on('reconnect_failed', () => {
            // A socket that is no longer our handle must not tear down a newer one.
            if (this._io !== socket) return;
            this._io = null;
            this.connecting = false;
            try { socket.close(); } catch (ex) { /* best effort: socket is already dead */ }
            this.emit('error', new Error('socket.io: reconnect_failed (gave up after configured attempts)'));
          });

          // handle failure
          for (const errcode of ['connect_error', 'connection_failed', 'error', 'connection_timeout']) {
            socket.on(errcode, (e) => {
              // do not care about errors if connected (or if this socket was replaced)
              if (!this.connecting || this._io !== socket) return;
              this._io = null;
              this.connecting = false;
              // Close through the local reference: this._io has just been cleared,
              // and an unclosed socket would keep reconnecting in the background.
              try { socket.close(); } catch (ex) { /* best effort */ }
              // Fall back to the event name when no payload (null or undefined) is given.
              const cause = (e == null) ? errcode : e;
              return reject(cause instanceof Error ? cause : new Error(cause));
            });
          }

          // handle success
          // 'once': 'connect' fires again on every reconnection; relays must be
          // registered a single time or each event would be emitted repeatedly.
          socket.once('connect', () => {
            this.connecting = false;
            registerListeners(this);
            resolve(this);
          });
        })
        .catch(e => {
          this._io = null;
          this.connecting = false;
          return reject(e);
        });
    });
  }

  /**
   * Close the socket
   *
   * The socket handle is kept after closing.
   * @throws {Error} If the socket has not been opened
   */
  close () {
    checkOpen(this);
    this._io.close();
  }

  /**
   * Add listener for Socket.IO events
   * @param {('eventsChanged'|'streamsChanged'|'accessesChanged'|'disconnect'|'error')} eventName - The event to listen for
   * @param {Function} listener - The callback function
   * @throws {Error} If the socket has not been opened or the event name is not allowed
   * @returns {SocketIO} this
   */
  // @ts-ignore - overriding EventEmitter.on with restricted signature
  on (eventName, listener) {
    checkOpen(this);
    if (!EVENTS.includes(eventName)) {
      throw new Error('Unkown event [' + eventName + ']. Allowed events are: ' + EVENTS);
    }
    // @ts-ignore
    return super.on(eventName, listener);
  }

  /**
   * Identical to Connection.api() but using Socket.IO transport
   * @param {Array<MethodCall>} arrayOfAPICalls - Array of Method Calls
   * @param {Function} [progress] - Return percentage of progress (0 - 100)
   * @throws {Error} If the socket has not been opened
   * @returns {Promise<Array>} Promise to Array of results matching each method call in order
   */
  async api (arrayOfAPICalls, progress) {
    checkOpen(this);
    // Sends one batch through the socket and settles with the acknowledgement.
    const socketHandler = (batchCall) => new Promise((resolve, reject) => {
      this._io.emit('callBatch', batchCall, (err, res) => {
        if (err) return reject(err);
        resolve(res);
      });
    });
    return await this.connection._chunkedBatchCall(arrayOfAPICalls, progress, socketHandler);
  }
}
/**
 * Private guard: refuses any use of the socket before it has been opened.
 * @param {SocketIO} socket - instance whose underlying `_io` handle is checked
 * @throws {Error} When `socket._io` is not set
 */
function checkOpen (socket) {
  const isOpen = Boolean(socket._io);
  if (isOpen) return;
  throw new Error('Initialize socket.io with connection.socket.open() before');
}
/**
 * Private helper: subscribes to every name in EVENTS on the underlying
 * socket and re-emits each occurrence, with the same arguments, on the
 * SocketIO instance itself.
 * @param {SocketIO} socket - instance whose `_io` handle is listened to
 */
function registerListeners (socket) {
  EVENTS.forEach((eventName) => {
    socket._io.on(eventName, (...payload) => {
      socket.emit(eventName, ...payload);
    });
  });
}
module.exports = function (Connection) {
Object.defineProperty(Connection.prototype, 'socket', {
get: function () {
if (this._socket) return this._socket;
this._socket = new SocketIO(this);
return this._socket;
}
});
};