diff --git a/src/components/requester.js b/src/components/requester.js index 63ae540..0a3ba8b 100644 --- a/src/components/requester.js +++ b/src/components/requester.js @@ -5,6 +5,7 @@ const axon = require('@dashersw/axon'); const debug = require('debug')('axon:req'); const SUBSET_IDENTIFIER = '__subset'; +const TYPE_IDENTIFIER = 'type'; module.exports = class Requester extends Monitorable(Configurable(Component)) { @@ -36,6 +37,24 @@ module.exports = class Requester extends Monitorable(Configurable(Component)) { return possibleSocks; } + filterRespondsToInSocks(type, socks) { + // Find correct nodes + const possibleNodes = Object.values(this.discovery.nodes).filter((node) => { + return undefined != node.advertisement.respondsTo && + node.advertisement.respondsTo.includes(type); + }); + + // Find corresponding sockets + const possibleSocks = possibleNodes.map((node) => { + return socks.find((sock) => { + return sock.remoteAddress == node.address && + sock.remotePort == node.advertisement.port; + }); + }).filter((sock) => sock); + + return possibleSocks; + } + // This function overwrites the axon socket's send() function. // The socketSend() function's `this` is bound to this class in // order to have access to the advertisement of other nodes. @@ -55,8 +74,10 @@ module.exports = class Requester extends Monitorable(Configurable(Component)) { // existence of the SUBSET_IDENTIFIER const data = args[0]; const subset = data[SUBSET_IDENTIFIER]; + const type = data[TYPE_IDENTIFIER]; - const possibleSocks = subset ? this.filterSubsetInSocks(subset, socks) : socks; + const possibleSocks = subset ? this.filterSubsetInSocks(subset, socks) : this.filterRespondsToInSocks(type, socks); + // Enqueue if the correct nodes did not connect yet/does not exist if (!possibleSocks.length) return this.sock.enqueue(args);