Skip to content

Commit 21bf92e

Browse files
committed
fix: Bind to Hubot's Express server
- list commands from worker instances with discovery.help - fix error serliazation.
1 parent 335c2fb commit 21bf92e

12 files changed

Lines changed: 1077 additions & 69 deletions

DiscoveryService.mjs

Lines changed: 199 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,3 @@
1-
// Description:
2-
// Hubot service discovery script for distributed Hubot clusters
3-
//
4-
// Configuration (Discovery Service Server):
5-
// HUBOT_SERVICE_NAME - Service name for registration (default: 'hubot')
6-
// HUBOT_INSTANCE_ID - Unique instance identifier (default: generated as hubot-<Date.now()>)
7-
// HUBOT_HOST - Host address for this instance (default: 'localhost')
8-
// HUBOT_PORT - Port for this instance (default: 8080)
9-
// HUBOT_HEARTBEAT_INTERVAL - Heartbeat interval in ms (default: 15000)
10-
// HUBOT_DISCOVERY_PORT - Port for the discovery server (default: 3100)
11-
// HUBOT_DISCOVERY_STORAGE - Storage directory for event store (default: ../data from current working directory)
12-
// HUBOT_DISCOVERY_TIMEOUT - Heartbeat timeout in ms (default: 30000)
13-
// HUBOT_LB_STRATEGY - Load balancing strategy: round-robin, random, least-connections (default: round-robin)
14-
// HUBOT_ALLOWED_ORIGINS - Comma-separated list of allowed WebSocket origins for security (default: none, allows all)
15-
// HUBOT_DISCOVERY_TOKEN - Shared secret token for authentication (default: none, allows all)
16-
// HUBOT_MAX_CONNECTIONS_PER_IP - Maximum connections per IP address (default: 5)
17-
// HUBOT_RATE_LIMIT_WINDOW_MS - Rate limit window in ms (default: 60000 = 1 minute)
18-
// HUBOT_RATE_LIMIT_MAX_ATTEMPTS - Max connection attempts per window (default: 10)
19-
//
20-
// Commands:
21-
// hubot discover services - Show all registered services
22-
// hubot discovery status - Show service discovery status
23-
// hubot load balancer status - Show load balancer statistics
24-
// hubot lb strategy <strategy> - Change load balancing strategy
25-
// hubot lb reset - Reset round-robin counter
26-
// hubot test routing [message] - Test message routing
27-
//
28-
// Author:
29-
// Joey Guerra
30-
311
import EventStore from './lib/EventStore.mjs'
322
import ServiceRegistry from './lib/ServiceRegistry.mjs'
333
import LoadBalancer from './lib/LoadBalancer.mjs'
@@ -39,6 +9,16 @@ import { TextMessage, TextListener } from 'hubot'
399
const __filename = fileURLToPath(import.meta.url)
4010
const __dirname = dirname(__filename)
4111

12+
function serializeError(error) {
13+
if (error.name && error.message && error.stack) {
14+
return `${error.name}: ${error.message}\n${error.stack}`
15+
}
16+
if (error.message && error.stack) {
17+
return `${error.message}\n${error.stack}`
18+
}
19+
return error
20+
}
21+
4222
export class DiscoveryService {
4323
constructor(robot) {
4424
this.robot = robot
@@ -53,9 +33,22 @@ export class DiscoveryService {
5333
this.storageDir = process.env.HUBOT_DISCOVERY_STORAGE || join(process.cwd(), '../data')
5434
this.heartbeatTimeoutMs = parseInt(process.env.HUBOT_DISCOVERY_TIMEOUT || 30000)
5535

36+
// Parse WebSocket path from HUBOT_DISCOVERY_URL if set
37+
this.discoveryWsPath = null
38+
if (process.env.HUBOT_DISCOVERY_URL) {
39+
try {
40+
const url = new URL(process.env.HUBOT_DISCOVERY_URL)
41+
this.discoveryWsPath = url.pathname || '/'
42+
} catch (error) {
43+
this.robot?.logger?.warn(`Failed to parse HUBOT_DISCOVERY_URL: ${error.message}`)
44+
}
45+
}
46+
5647
// State - DiscoveryService is always a server
5748
this.registry = null
5849
this.wss = null
50+
this.upgradeHandler = null // Store reference to upgrade handler for cleanup
51+
this.started = false
5952
this.isRegistered = false
6053

6154
// Load balancing state
@@ -76,17 +69,23 @@ export class DiscoveryService {
7669

7770
async start() {
7871
try {
72+
if (this.started) {
73+
return
74+
}
75+
this.started = true
76+
7977
await this.startDiscoveryServer()
80-
78+
8179
// Start periodic cleanup of pending responses
8280
this.cleanupTimer = setInterval(() => {
8381
this.cleanupPendingResponses()
8482
}, 30000) // Clean up every 30 seconds
85-
83+
8684
this.registerCommands()
8785
this.robot.logger.info(`Service discovery server initialized for ${this.instanceId}`)
8886
} catch (error) {
89-
this.robot.logger.error('Failed to initialize service discovery:', error)
87+
this.started = false
88+
this.robot.logger.error(`Failed to initialize service discovery: ${serializeError(error)}`)
9089
}
9190
}
9291

@@ -111,7 +110,7 @@ export class DiscoveryService {
111110
? process.env.HUBOT_ALLOWED_ORIGINS.split(',').map(o => o.trim())
112111
: null // null means no origin validation (backward compatible but insecure)
113112

114-
// Log security configuration
113+
// Log security configuration
115114
if (!allowedOrigins) {
116115
this.robot.logger.warn('⚠️ Origin validation is DISABLED. Set HUBOT_ALLOWED_ORIGINS for security.')
117116
}
@@ -123,8 +122,9 @@ export class DiscoveryService {
123122
}
124123

125124
// Start WebSocket server for service discovery
126-
this.wss = new WebSocketServer({
127-
port: this.discoveryPort,
125+
// Prefer binding to robot.server if available (shares port with Express)
126+
// Otherwise create standalone server on separate port
127+
const wsOptions = {
128128
verifyClient: (info, callback) => {
129129
const ip = info.req.socket.remoteAddress
130130

@@ -162,7 +162,46 @@ export class DiscoveryService {
162162
this.trackConnection(ip)
163163
callback(true)
164164
}
165-
})
165+
}
166+
if (this.robot.server) {
167+
// Bind to existing Express server (shares port)
168+
// Use noServer mode and manually handle upgrades
169+
wsOptions.noServer = true
170+
this.wss = new WebSocketServer(wsOptions)
171+
172+
// Manually handle upgrade requests
173+
this.upgradeHandler = (request, socket, head) => {
174+
// Validate this is a WebSocket upgrade request
175+
const upgrade = (request.headers.upgrade || '').toLowerCase()
176+
if (upgrade !== 'websocket') {
177+
this.robot.logger.debug(`Rejected non-WebSocket upgrade request: ${request.url}`)
178+
socket.destroy()
179+
return
180+
}
181+
182+
// Validate path if configured
183+
if (this.discoveryWsPath && request.url !== this.discoveryWsPath) {
184+
this.robot.logger.warn(`Rejected WebSocket upgrade on invalid path: ${request.url} (expected: ${this.discoveryWsPath})`)
185+
socket.destroy()
186+
return
187+
}
188+
189+
this.robot.logger.debug(`WebSocket upgrade request: ${request.url}`)
190+
this.wss.handleUpgrade(request, socket, head, (ws) => {
191+
this.wss.emit('connection', ws, request)
192+
})
193+
}
194+
this.robot.server.on('upgrade', this.upgradeHandler)
195+
196+
this.robot.logger.info(`🔍 Service discovery WebSocket server attached to Express on port ${this.port}`)
197+
this.robot.logger.info(` Workers should connect to: ws://localhost:${this.port}`)
198+
} else {
199+
// Create standalone WebSocket server on separate port
200+
wsOptions.port = this.discoveryPort
201+
this.wss = new WebSocketServer(wsOptions)
202+
this.robot.logger.info(`🔍 Service discovery server started on separate port ${this.discoveryPort}`)
203+
this.robot.logger.info(` Workers should connect to: ws://localhost:${this.discoveryPort}`)
204+
}
166205

167206
this.wss.on('connection', (ws, req) => {
168207
const workerId = `${req.socket.remoteAddress}:${req.socket.remotePort}`
@@ -217,8 +256,6 @@ export class DiscoveryService {
217256
this.wss.on('error', (error) => {
218257
this.robot.logger.error('Discovery WebSocket server error:', error)
219258
})
220-
221-
this.robot.logger.info(`🔍 Service discovery server started on port ${this.discoveryPort}`)
222259

223260
// Initialize the registry before registering self
224261
await this.registry.initialize()
@@ -281,7 +318,8 @@ export class DiscoveryService {
281318

282319
if (ws && !message.data.isServer) {
283320
this.connectedWorkers.set(message.data.instanceId, ws)
284-
this.robot.logger.debug(`Registered worker instance for load balancing: ${message.data.instanceId}, group: ${message.data.metadata?.group || 'default'}`)
321+
ws.group = message.data.metadata?.group || 'default'
322+
this.robot.logger.debug(`Registered worker instance for load balancing: ${message.data.instanceId}, group: ${ws.group}`)
285323
this.robot.logger.debug(`connectedWorkers now has ${this.connectedWorkers.size} entries`)
286324
} else if (ws && message.data.isServer) {
287325
this.robot.logger.debug(`Server instance registered: ${message.data.instanceId}, NOT added to connectedWorkers`)
@@ -343,6 +381,11 @@ export class DiscoveryService {
343381
// This is a response from a client instance back to the chat provider
344382
return this.handleMessageResponse(message.data)
345383

384+
case 'commands_response':
385+
// This is a response to a get_commands request (handled by temporary listener in help command)
386+
// Just acknowledge receipt without processing
387+
return { success: true, received: true }
388+
346389
default:
347390
throw new Error(`Unknown message type: ${message.type}`)
348391
}
@@ -645,6 +688,28 @@ export class DiscoveryService {
645688
status.push(`Round-Robin Index: ${stats.roundRobinIndex}`)
646689
}
647690

691+
// List all instances
692+
status.push('\n📋 Registered Instances:')
693+
for (const serviceName of Object.keys(allServices)) {
694+
const instances = allServices[serviceName]
695+
if (instances.length === 0) continue
696+
697+
status.push(`\n ${serviceName}:`)
698+
for (const instance of instances) {
699+
const group = instance.metadata?.group || 'default'
700+
const isConnected = this.connectedWorkers.has(instance.instanceId) &&
701+
this.connectedWorkers.get(instance.instanceId)?.readyState === 1
702+
const isHealthy = this.registry.getHealthyInstances(serviceName).some(i => i.instanceId === instance.instanceId)
703+
const status_icon = isConnected ? '✅' : isHealthy ? '⚠️' : '❌'
704+
705+
status.push(` ${status_icon} ${instance.instanceId}`)
706+
status.push(` Group: ${group}, Host: ${instance.host}:${instance.port}`)
707+
if (instance.metadata?.adapter) {
708+
status.push(` Adapter: ${instance.metadata.adapter}`)
709+
}
710+
}
711+
}
712+
648713
return `⚖️ Load Balancer Status:\n${status.join('\n')}`
649714
}
650715
})
@@ -710,6 +775,94 @@ export class DiscoveryService {
710775
}
711776
})
712777

778+
// Help command - shows worker commands from connected instances
779+
this.robot.commands.register({
780+
id: 'discovery.help',
781+
description: 'Show available worker commands',
782+
aliases: ['discovery help', 'help discovery'],
783+
handler: async (ctx) => {
784+
const lines = []
785+
786+
// Request commands from all connected workers
787+
if (this.connectedWorkers.size === 0) {
788+
lines.push('_No worker instances connected yet_')
789+
return lines.join('\n')
790+
}
791+
792+
lines.push('👷 **Worker Commands (by group):**')
793+
lines.push('')
794+
795+
const messageId = `get-commands-${Date.now()}`
796+
const commandsByGroup = new Map()
797+
const responses = []
798+
799+
// Send get_commands request to all workers
800+
for (const [instanceId, ws] of this.connectedWorkers.entries()) {
801+
if (ws.readyState === 1) { // OPEN
802+
ws.send(JSON.stringify({
803+
type: 'get_commands',
804+
messageId: messageId
805+
}))
806+
807+
// Wait for response with timeout
808+
responses.push(
809+
new Promise((resolve) => {
810+
const timeout = setTimeout(() => resolve(null), 2000) // 2 second timeout
811+
812+
const handler = (data) => {
813+
try {
814+
const msg = JSON.parse(data.toString())
815+
if (msg.type === 'commands_response' && msg.messageId === messageId) {
816+
clearTimeout(timeout)
817+
ws.off('message', handler)
818+
resolve({ instanceId, commands: msg.commands, group: ws.group, serviceName: ws.serviceName })
819+
}
820+
} catch (e) {
821+
// Ignore parse errors
822+
}
823+
}
824+
825+
ws.on('message', handler)
826+
})
827+
)
828+
}
829+
}
830+
831+
// Collect all responses
832+
const results = await Promise.all(responses)
833+
834+
// Organize commands by service:group
835+
for (const result of results) {
836+
if (result && result.commands && result.commands.length > 0) {
837+
const groupKey = `${result.serviceName || 'hubot'}:${result.group || 'default'}`
838+
if (!commandsByGroup.has(groupKey)) {
839+
commandsByGroup.set(groupKey, {
840+
serviceName: result.serviceName || 'hubot',
841+
group: result.group || 'default',
842+
commands: result.commands
843+
})
844+
}
845+
}
846+
}
847+
848+
// Display commands by group
849+
if (commandsByGroup.size > 0) {
850+
for (const [groupKey, groupEntry] of commandsByGroup.entries()) {
851+
lines.push(`**${groupEntry.serviceName} - ${groupEntry.group}**`)
852+
for (const cmd of groupEntry.commands) {
853+
const aliases = cmd.aliases?.length > 0 ? ` (${cmd.aliases.join(', ')})` : ''
854+
lines.push(` • **${cmd.id}**${aliases} - ${cmd.description}`)
855+
}
856+
lines.push('')
857+
}
858+
} else {
859+
lines.push('_No commands available from connected workers_')
860+
}
861+
862+
return lines.join('\n')
863+
}
864+
})
865+
713866
// Message routing middleware - not a command, but part of the routing logic
714867
this.robot.receiveMiddleware(async context => {
715868
if (!this.robot.listeners.some(listener => {
@@ -732,6 +885,12 @@ export class DiscoveryService {
732885
this.cleanupTimer = null
733886
}
734887

888+
// Remove upgrade handler if attached to Express
889+
if (this.upgradeHandler && this.robot?.server) {
890+
this.robot.server.removeListener('upgrade', this.upgradeHandler)
891+
this.upgradeHandler = null
892+
}
893+
735894
// Close discovery server with timeout
736895
if (this.wss) {
737896
try {

0 commit comments

Comments
 (0)