-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathSubscription.js
More file actions
114 lines (103 loc) · 4.29 KB
/
Subscription.js
File metadata and controls
114 lines (103 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import { match_queries, query_match_data } from './TaiiNet.js';
import { EventBase } from "./EventBase.js";
/**
 * A subscription to one query on a swarm.
 *
 * Tracks three peer lists for the query:
 *  - `upstream_connections`: connections this subscription initiated (via
 *    `swarm.connect`) that have not yet been reported as connected,
 *  - `upstream_peers`: initiated connections that the swarm has reported as
 *    connected,
 *  - `downstream_peers`: connected peers that this subscription did not
 *    initiate.
 *
 * Events triggered on the subscription itself: "peer-connected",
 * "upstream-peer", "downstream-peer" and "data".
 */
export class Subscription extends EventBase {
    /**
     * @param sn      Object exposing a `signaller` with `emit` and `on`.
     * @param swarm   Object exposing `all_peers`, `on`, `connect` and `send`.
     * @param query   The query this subscription is interested in.
     * @param backlog Stored on the instance; not otherwise used by this class.
     */
    constructor(sn, swarm, query, backlog) {
        super();
        this.query = query;
        this.maximum_upstream_peers = 3;
        this.upstream_peers = [];
        this.upstream_connections = [];
        this.downstream_peers = [];
        this.sn = sn;
        this.swarm = swarm;
        this.connection_pool = [];
        this.messages = [];
        this.backlog = backlog;

        // Remove `peer` from `list` in place; a no-op when it is not present.
        const drop_peer = (list, peer) => {
            const index = list.indexOf(peer);
            if (index !== -1) {
                list.splice(index, 1);
            }
        };

        // Offer every peer already in the swarm whose query matches ours.
        // NOTE(review): offer_connection takes (id, query, via) but only the
        // peer object is passed here, so `query` is undefined inside it.
        // Left as-is because the peer's shape is not visible from this file —
        // confirm what the intended arguments are.
        for (const key in swarm.all_peers) {
            const known_peer = this.swarm.all_peers[key];
            if (match_queries(this.query, known_peer.query)) {
                this.offer_connection(known_peer);
            }
        }

        // send ourselves to existing clients
        this.sn.signaller.emit("socket_broadcast", [{
            query: this.query
        }]);

        // every single socket we get notified about from the signaller
        this.sn.signaller.on("socket_broadcast", (socket) => {
            console.log("got socket");
            console.log(socket);
            this.offer_connection(socket.id, socket.query);
        });

        swarm.on("peer-connected", (peer) => {
            console.log("swarm fires peer-connected");
            // only peers matching our query are of interest
            if (!match_queries(this.query, peer.query)) {
                return;
            }
            this.trigger("peer-connected", peer);

            // did we initiate this connection?
            const pending_index = this.upstream_connections.indexOf(peer);
            if (pending_index !== -1) {
                this.upstream_peers.push(peer);
                // it is connected now, so it is no longer pending
                this.upstream_connections.splice(pending_index, 1);
                this.trigger('upstream-peer', peer);
            } else {
                this.downstream_peers.push(peer);
                this.trigger('downstream-peer', peer);
            }
        });

        swarm.on("peer-disconnected", (dead_peer) => {
            console.log("peer-disconnected");
            // remove peer from our lists
            drop_peer(this.upstream_connections, dead_peer);
            drop_peer(this.upstream_peers, dead_peer);
            drop_peer(this.downstream_peers, dead_peer);
        });

        // when we get data in from the swarm, forward only what matches
        swarm.on("data", (data, e) => {
            if (!query_match_data(this.query, data.data)) {
                return;
            }
            this.handle_data(data, e);
        });
    }

    /**
     * Handles what the subscription does on receipt of data: re-triggers it
     * as a "data" event carrying the payload (`data.data`) and `e`.
     */
    handle_data(data, e) {
        this.trigger("data", data.data, e);
    }

    /**
     * Supply this function with all the peers we know about, and it will
     * connect to them if needed, and store them for later if not.
     *
     * Does nothing when `query` does not match this subscription's query.
     * Otherwise, while the number of upstream peers plus pending upstream
     * connections is below `maximum_upstream_peers`, it calls
     * `swarm.connect(id, query, via)` and records the result as pending;
     * beyond that limit the `[id, query, via]` triple is appended to
     * `connection_pool` (nothing in this class reads the pool back).
     */
    offer_connection(id, query, via) {
        // check if this socket's query matches the query for this subscription
        if (!match_queries(query, this.query)) {
            return;
        }
        const upstream_count = this.upstream_peers.length + this.upstream_connections.length;
        if (upstream_count < this.maximum_upstream_peers) {
            // we still need more peers: initiate a connection to this one
            const peer = this.swarm.connect(id, query, via);
            this.upstream_connections.push(peer);
        } else {
            this.connection_pool.push([id, query, via]);
        }
    }

    /** Forwards `data` to `swarm.send`. */
    send(data) {
        this.swarm.send(data);
    }
}