Skip to content

Commit 5de637f

Browse files
refactor(events): Update event types and related tests to remove deprecated types
1 parent 3c48a08 commit 5de637f

File tree

8 files changed

+319
-66
lines changed

8 files changed

+319
-66
lines changed
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
---
2+
layout: page
3+
title: Events Hub Guide
4+
show_sidebar: false
5+
---
6+
7+
# Events Hub Guide
8+
9+
The **Events Hub** is a real-time WebSocket service that enables DevOps engineers and SREs to subscribe to critical infrastructure events. Instead of polling REST APIs, you can receive instant updates about host health, VM lifecycle changes, and system alerts.
10+
11+
## Overview
12+
13+
- **Protocol**: WebSocket (RFC 6455)
14+
- **Endpoint**: `/v1/ws/subscribe`
15+
- **Auth**: Bearer Token or API Key
16+
- **Latency**: Sub-millisecond event propogation
17+
- **Connection Limit**: One active connection per IP address (enforced).
18+
19+
## Quick Start
20+
21+
Connect to the global channel (default) to start receiving events immediately.
22+
23+
### 1. Get your Token
24+
Obtain a Bearer token or API Key from your administrator.
25+
26+
### 2. Connect
27+
```bash
28+
wscat -H "Authorization: Bearer <YOUR_TOKEN>" -c "wss://<YOUR_HOST>/v1/ws/subscribe?event_types=pdfm,health"
29+
```
30+
31+
### 3. Verify
32+
You will receive a confirmation message:
33+
```json
34+
{
35+
"type": "global",
36+
"message": "WebSocket connection established subscribed to global by default",
37+
"body": {
38+
"subscriptions": ["pdfm", "health", "global"]
39+
}
40+
}
41+
```
42+
43+
## Core Concepts
44+
45+
### Event Channels
46+
Channels are topics you can subscribe to. You can specify multiple channels during connection.
47+
48+
| Channel | Description | Use Case |
49+
| :--- | :--- | :--- |
50+
| **`global`** | System-wide broadcasts. **Auto-subscribed**; cannot be removed. | Critical maintenance alerts, shutdowns. |
51+
| **`pdfm`** | Host & VM events. | Monitoring VM starts, stops, and host resource changes. |
52+
| **`orchestrator`** | Cluster-level orchestration events. | tracking task assignments, cluster scaling. |
53+
| **`health`** | Ping/Pong heartbeat channel. | Implementing liveness checks (see [Client Responsibilities](#client-responsibilities)). |
54+
| **`system`** | System & Metadata info. | Retrieving your unique `client_id`. |
55+
56+
### Connection Rules
57+
- **Single Connection per IP**: To prevent resource exhaustion, only one active WebSocket connection is allowed per client IP address. New connection attempts from the same IP will be rejected with `409 Conflict`.
58+
59+
## Real-world Use Cases
60+
61+
### 1. Monitoring VM Lifecycle
62+
Subscribe to `pdfm` to track VM states in real-time.
63+
- **Event**: VM Started, VM Stopped, VM Suspended.
64+
- **Action**: Update dashboard status, trigger billing events.
65+
66+
### 2. Cluster Health Monitoring
67+
Subscribe to `orchestrator` to detect node failures or capacity issues instantly.
68+
69+
### 3. Connection Liveness Checks
70+
Subscribe to `health` to implement a heartbeat. If the server stops responding to your pings, you can trigger a reconnection alert.
71+
72+
## Client Responsibilities
73+
74+
### 1. Heartbeat (Ping/Pong)
75+
The connection is bidirectional. To ensure the link is alive, your client **must** subscribe to the `health` channel and send periodic pings.
76+
77+
**Client Request:**
78+
```json
79+
{
80+
"type": "health",
81+
"id": "ping-1",
82+
"message": "ping"
83+
}
84+
```
85+
86+
**Server Response:**
87+
```json
88+
{
89+
"type": "health",
90+
"ref_id": "ping-1",
91+
"message": "pong",
92+
"body": {
93+
"server_time": "2023-10-27T10:00:00Z"
94+
}
95+
}
96+
```
97+
98+
### 2. Client ID Lookup
99+
After connecting, you might need your unique `client_id` for unsubscribe operations. Send a message to the `system` channel.
100+
101+
**Client Request:**
102+
```json
103+
{
104+
"type": "system",
105+
"message": "client-id"
106+
}
107+
```
108+
109+
**Server Response:**
110+
```json
111+
{
112+
"type": "system",
113+
"message": "client-id",
114+
"body": {
115+
"client-id": "550e8400-e29b-41d4-a716-446655440000"
116+
}
117+
}
118+
```
119+
120+
## API Reference
121+
122+
### Subscribe
123+
**GET** `/v1/ws/subscribe`
124+
125+
**Query Parameters:**
126+
- `event_types` (string, optional): Comma-separated list of channels (e.g., `pdfm,health,orchestrator`). `global` is always added automatically.
127+
128+
**Headers:**
129+
- `Authorization`: `Bearer <token>`
130+
131+
### Unsubscribe
132+
**POST** `/v1/ws/unsubscribe`
133+
134+
Allows you to remove specific channel subscriptions without dropping the connection. **Note**: You cannot unsubscribe from `global`.
135+
136+
**Body:**
137+
```json
138+
{
139+
"client_id": "<your-client-id>",
140+
"event_types": ["pdfm"]
141+
}
142+
```
143+
144+
## Code Examples
145+
146+
### Node.js (`ws`)
147+
148+
```javascript
149+
const WebSocket = require('ws');
150+
151+
const ws = new WebSocket('wss://api.example.com/v1/ws/subscribe?event_types=pdfm,health', {
152+
headers: { 'Authorization': 'Bearer <YOUR_TOKEN>' }
153+
});
154+
155+
ws.on('open', () => {
156+
console.log('Connected');
157+
// Send Heartbeat every 30s
158+
setInterval(() => {
159+
ws.send(JSON.stringify({ type: 'health', message: 'ping', id: Date.now().toString() }));
160+
}, 30000);
161+
});
162+
163+
ws.on('message', (data) => {
164+
const event = JSON.parse(data);
165+
if (event.type === 'health' && event.message === 'pong') {
166+
console.log('Heartbeat OK');
167+
} else {
168+
console.log('Event received:', event);
169+
}
170+
});
171+
```
172+
173+
### Python (`websockets`)
174+
175+
```python
176+
import asyncio
177+
import websockets
178+
import json
179+
import time
180+
181+
async def listen():
182+
uri = "wss://api.example.com/v1/ws/subscribe?event_types=pdfm,health"
183+
headers = {"Authorization": "Bearer <YOUR_TOKEN>"}
184+
185+
async with websockets.connect(uri, extra_headers=headers) as websocket:
186+
print("Connected")
187+
188+
while True:
189+
# Send Ping
190+
await websocket.send(json.dumps({
191+
"type": "health",
192+
"message": "ping",
193+
"id": str(time.time())
194+
}))
195+
196+
# Wait for messages
197+
message = await websocket.recv()
198+
event = json.loads(message)
199+
print(f"Received: {event}")
200+
201+
if __name__ == "__main__":
202+
asyncio.run(listen())
203+
```
204+
205+
### Go (`gorilla/websocket`)
206+
207+
```go
208+
package main
209+
210+
import (
211+
"log"
212+
"net/http"
213+
"time"
214+
215+
"github.com/gorilla/websocket"
216+
)
217+
218+
func main() {
219+
header := http.Header{}
220+
header.Add("Authorization", "Bearer <YOUR_TOKEN>")
221+
222+
c, _, err := websocket.DefaultDialer.Dial("wss://api.example.com/v1/ws/subscribe?event_types=pdfm,health", header)
223+
if err != nil {
224+
log.Fatal("dial:", err)
225+
}
226+
defer c.Close()
227+
228+
// Heartbeat routine
229+
go func() {
230+
for {
231+
time.Sleep(30 * time.Second)
232+
msg := map[string]string{"type": "health", "message": "ping"}
233+
c.WriteJSON(msg)
234+
}
235+
}()
236+
237+
for {
238+
_, message, err := c.ReadMessage()
239+
if err != nil {
240+
log.Println("read:", err)
241+
return
242+
}
243+
log.Printf("recv: %s", message)
244+
}
245+
}
246+
```
247+
248+
## Troubleshooting & FAQ
249+
250+
**Q: I get a `409 Conflict` error when connecting.**
251+
A: You likely have another active connection from the same IP. Close the existing connection or check for "zombie" processes.
252+
253+
**Q: Can I unsubscribe from `global`?**
254+
A: No, the `global` channel is mandatory for system-wide alerts.
255+
256+
**Q: I'm not receiving pongs.**
257+
A: Ensure you have subscribed to the `health` channel in your connection URL (`?event_types=health`).
258+
259+
**Q: My connection drops after 60 seconds.**
260+
A: Check if your client is sending heartbeats. Some load balancers drop idle WebSocket connections.

src/constants/event_emitter.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,8 @@ type EventType string
88
// Clients subscribe to these types and receive messages of that type
99
const (
1010
EventTypeGlobal EventType = "global" // Broadcasts to all subscribers
11-
EventTypeSystem EventType = "system" // System-level events
12-
EventTypeVM EventType = "vm" // Virtual machine events
13-
EventTypeHost EventType = "host" // Host-level events
1411
EventTypePDFM EventType = "pdfm" // PDFM-specific events
12+
EventTypeSystem EventType = "system" // System-level events
1513
EventTypeHealth EventType = "health" // Health check events
1614
EventTypeOrchestrator EventType = "orchestrator" // Orchestrator events
1715
)
@@ -23,7 +21,7 @@ func (e EventType) String() string {
2321
// IsValid checks if the EventType is valid
2422
func (e EventType) IsValid() bool {
2523
switch e {
26-
case EventTypeGlobal, EventTypeSystem, EventTypeVM, EventTypeHost, EventTypePDFM, EventTypeHealth, EventTypeOrchestrator:
24+
case EventTypeGlobal, EventTypePDFM, EventTypeSystem, EventTypeHealth, EventTypeOrchestrator:
2725
return true
2826
default:
2927
return false
@@ -34,9 +32,6 @@ func (e EventType) IsValid() bool {
3432
func GetAllEventTypes() []EventType {
3533
return []EventType{
3634
EventTypeGlobal,
37-
EventTypeSystem,
38-
EventTypeVM,
39-
EventTypeHost,
4035
EventTypePDFM,
4136
EventTypeHealth,
4237
EventTypeOrchestrator,

src/controllers/events.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func registerEventHandlers(ctx basecontext.ApiContext, version string) {
3333
// @Description This endpoint upgrades the HTTP connection to WebSocket and subscribes to event notifications. Authentication is required via Authorization header (Bearer token) or X-Api-Key header.
3434
// @Tags Events
3535
// @Produce json
36-
// @Param event_types query string false "Comma-separated event types to subscribe to (e.g., vm,host,system). Valid types: global, system, vm, host, pdfm. If omitted, subscribes to global events only."
36+
// @Param event_types query string false "Comma-separated event types to subscribe to (e.g., global,pdfm,system). Valid types: global,pdfm and orchestrator. If omitted, subscribes to global events only."
3737
// @Success 101 {string} string "Switching Protocols"
3838
// @Failure 400 {object} models.ApiErrorResponse
3939
// @Failure 401 {object} models.ApiErrorResponse

src/models/event_message_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestNewEventMessage_NilBody(t *testing.T) {
3535
}
3636

3737
func TestNewEventMessage_EmptyMessage(t *testing.T) {
38-
msg := NewEventMessage(constants.EventTypeVM, "", map[string]interface{}{})
38+
msg := NewEventMessage(constants.EventTypeHealth, "", map[string]interface{}{})
3939
assert.NotNil(t, msg)
4040
assert.Empty(t, msg.Message)
4141
assert.NotEmpty(t, msg.ID)

src/serviceprovider/eventEmitter/helpers_test.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ import (
1010
)
1111

1212
func TestStringToEventTypes_ValidTypes(t *testing.T) {
13-
result, err := stringToEventTypes([]string{"vm", "host", "global"})
13+
result, err := stringToEventTypes([]string{"pdfm", "system", "global"})
1414
require.NoError(t, err)
1515
expected := []constants.EventType{
16-
constants.EventTypeVM,
17-
constants.EventTypeHost,
16+
constants.EventTypePDFM,
17+
constants.EventTypeSystem,
1818
constants.EventTypeGlobal,
1919
}
2020
assert.Equal(t, expected, result)
@@ -27,41 +27,39 @@ func TestStringToEventTypes_EmptySlice(t *testing.T) {
2727
}
2828

2929
func TestStringToEventTypes_InvalidTypes(t *testing.T) {
30-
result, err := stringToEventTypes([]string{"invalid", "vm", "fake"})
30+
result, err := stringToEventTypes([]string{"invalid", "pdfm", "fake"})
3131
// Should return error but still include valid types
3232
assert.Error(t, err)
3333
assert.Len(t, result, 1)
34-
assert.Equal(t, constants.EventTypeVM, result[0])
34+
assert.Equal(t, constants.EventTypePDFM, result[0])
3535
}
3636

3737
func TestStringToEventTypes_MixedCase(t *testing.T) {
38-
result, err := stringToEventTypes([]string{"VM", "Host", "GLOBAL"})
38+
result, err := stringToEventTypes([]string{"PDFM", "System", "GLOBAL"})
3939
require.NoError(t, err)
4040
expected := []constants.EventType{
41-
constants.EventTypeVM,
42-
constants.EventTypeHost,
41+
constants.EventTypePDFM,
42+
constants.EventTypeSystem,
4343
constants.EventTypeGlobal,
4444
}
4545
assert.Equal(t, expected, result)
4646
}
4747

4848
func TestStringToEventTypes_ExtraWhitespace(t *testing.T) {
49-
result, err := stringToEventTypes([]string{" vm ", " host ", " global "})
49+
result, err := stringToEventTypes([]string{" pdfm ", " system ", " global "})
5050
require.NoError(t, err)
5151
expected := []constants.EventType{
52-
constants.EventTypeVM,
53-
constants.EventTypeHost,
52+
constants.EventTypePDFM,
53+
constants.EventTypeSystem,
5454
constants.EventTypeGlobal,
5555
}
5656
assert.Equal(t, expected, result)
5757
}
5858

5959
func TestStringToEventTypes_AllValidTypes(t *testing.T) {
60-
result, err := stringToEventTypes([]string{"vm", "host", "global", "system", "pdfm"})
60+
result, err := stringToEventTypes([]string{"global", "system", "pdfm"})
6161
require.NoError(t, err)
62-
assert.Len(t, result, 5)
63-
assert.Contains(t, result, constants.EventTypeVM)
64-
assert.Contains(t, result, constants.EventTypeHost)
62+
assert.Len(t, result, 3)
6563
assert.Contains(t, result, constants.EventTypeGlobal)
6664
assert.Contains(t, result, constants.EventTypeSystem)
6765
assert.Contains(t, result, constants.EventTypePDFM)

0 commit comments

Comments
 (0)