Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 31 additions & 22 deletions src/constants/event_emitter.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,40 @@
package constants

// Event Emitter Configuration
const (
// WebSocket ping/pong configuration
EVENT_EMITTER_PING_INTERVAL_SECONDS_ENV_VAR = "EVENT_EMITTER_PING_INTERVAL_SECONDS"
EVENT_EMITTER_PONG_TIMEOUT_SECONDS_ENV_VAR = "EVENT_EMITTER_PONG_TIMEOUT_SECONDS"

// Default values
DEFAULT_EVENT_EMITTER_PING_INTERVAL_SECONDS = 30
DEFAULT_EVENT_EMITTER_PONG_TIMEOUT_SECONDS = 10
)
// EventType is a type-safe wrapper for event types
// This prevents arbitrary strings from being used as event types
type EventType string

// Event Message Types - predefined types for event routing
// Clients subscribe to these types and receive messages of that type
const (
EVENT_TYPE_GLOBAL = "global" // Broadcasts to all subscribers
EVENT_TYPE_SYSTEM = "system" // System-level events
EVENT_TYPE_VM = "vm" // Virtual machine events
EVENT_TYPE_HOST = "host" // Host-level events
EVENT_TYPE_PDFM = "pdfm" // PDFM-specific events
EventTypeGlobal EventType = "global" // Broadcasts to all subscribers
EventTypeSystem EventType = "system" // System-level events
EventTypeVM EventType = "vm" // Virtual machine events
EventTypeHost EventType = "host" // Host-level events
EventTypePDFM EventType = "pdfm" // PDFM-specific events
)

// AllEventTypes returns all valid event types for subscription
var AllEventTypes = []string{
EVENT_TYPE_GLOBAL,
EVENT_TYPE_SYSTEM,
EVENT_TYPE_VM,
EVENT_TYPE_HOST,
EVENT_TYPE_PDFM,
func (e EventType) String() string {
return string(e)
}

// IsValid checks if the EventType is valid
func (e EventType) IsValid() bool {
switch e {
case EventTypeGlobal, EventTypeSystem, EventTypeVM, EventTypeHost, EventTypePDFM:
return true
default:
return false
}
}

// GetAllEventTypes returns all valid EventType values
func GetAllEventTypes() []EventType {
return []EventType{
EventTypeGlobal,
EventTypeSystem,
EventTypeVM,
EventTypeHost,
EventTypePDFM,
}
}
100 changes: 100 additions & 0 deletions src/controllers/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package controllers

import (
"net/http"

"github.com/Parallels/prl-devops-service/basecontext"
"github.com/Parallels/prl-devops-service/models"
"github.com/Parallels/prl-devops-service/restapi"
eventemitter "github.com/Parallels/prl-devops-service/serviceprovider/eventEmitter"
)

func registerEventHandlers(ctx basecontext.ApiContext, version string) {

restapi.NewController().
WithMethod(restapi.GET).
WithVersion(version).
WithPath("/ws/subscribe").
WithHandler(WebSocketSubscribeHandler()).
Register()

restapi.NewController().
WithMethod(restapi.POST).
WithVersion(version).
WithPath("/ws/unsubscribe").
WithHandler(WebSocketUnsubscribeHandler()).
Register()
}

// @Summary Subscribe to event notifications via WebSocket
// @Description This endpoint upgrades the HTTP connection to WebSocket and subscribes to event notifications. Supports both JWT Bearer tokens and API Keys for authentication.
// @Tags Events
// @Produce json
// @Param event_types query string false "Comma-separated event types to subscribe to (e.g., vm,host,system). Valid types: global, system, vm, host, pdfm. If omitted, subscribes to global events only."
// @Param token query string false "JWT token for authentication (alternative to Authorization header)"
// @Param api_key query string false "API key for authentication (alternative to X-API-KEY header)"
// @Success 101 {string} string "Switching Protocols"
// @Failure 400 {object} models.ApiErrorResponse
// @Failure 401 {object} models.ApiErrorResponse
// @Failure 409 {object} models.ApiErrorResponse
// @Failure 503 {object} models.ApiErrorResponse
// @Security ApiKeyAuth
// @Security BearerAuth
// @Router /v1/ws/subscribe [get]
func WebSocketSubscribeHandler() restapi.ControllerHandler {
return func(w http.ResponseWriter, r *http.Request) {
ctx := GetBaseContext(r)

emitter := eventemitter.Get()
if emitter == nil || !emitter.IsRunning() {
ctx.LogErrorf("EventEmitter service is not available")
ReturnApiError(ctx, w, models.ApiErrorResponse{
Message: "EventEmitter service is not available",
Code: http.StatusServiceUnavailable,
})
return
}
if err := eventemitter.HandleWebSocketConnection(w, r, ctx); err != nil {
ReturnApiError(ctx, w, *err)
return
}
}
}

// @Summary Unsubscribe from specific event types
// @Description Unsubscribe an active WebSocket client from specific event types without disconnecting. The client must belong to the authenticated user.
// @Tags Events
// @Accept json
// @Produce json
// @Param body body models.UnsubscribeRequest true "Unsubscribe request with client ID and event types"
// @Success 200 {string} string "OK"
// @Failure 400 {object} models.ApiErrorResponse
// @Failure 401 {object} models.ApiErrorResponse
// @Failure 404 {object} models.ApiErrorResponse
// @Failure 503 {object} models.ApiErrorResponse
// @Security ApiKeyAuth
// @Security BearerAuth
// @Router /v1/ws/unsubscribe [post]
func WebSocketUnsubscribeHandler() restapi.ControllerHandler {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
ctx := GetBaseContext(r)
defer Recover(ctx, r, w)

emitter := eventemitter.Get()
if emitter == nil || !emitter.IsRunning() {
ctx.LogErrorf("EventEmitter service is not available")
ReturnApiError(ctx, w, models.ApiErrorResponse{
Message: "EventEmitter service is not available",
Code: http.StatusServiceUnavailable,
})
return
}

if err := eventemitter.HandleUnsubscribe(r, ctx); err != nil {
ReturnApiError(ctx, w, *err)
return
}
ReturnApiCommonResponse(w)
}
}
1 change: 1 addition & 0 deletions src/controllers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func RegisterV1Handlers(ctx basecontext.ApiContext) error {
registerOrchestratorHostsHandlers(ctx, version)
registerPerformanceHandlers(ctx, version)
registerReverseProxyHandlers(ctx, version)
registerEventHandlers(ctx, version)

return nil
}
6 changes: 6 additions & 0 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Parallels/prl-devops-service/constants"
"github.com/Parallels/prl-devops-service/data"
"github.com/Parallels/prl-devops-service/serviceprovider"
eventemitter "github.com/Parallels/prl-devops-service/serviceprovider/eventEmitter"
"github.com/Parallels/prl-devops-service/serviceprovider/system"
"github.com/Parallels/prl-devops-service/telemetry"

Expand Down Expand Up @@ -137,6 +138,11 @@ func main() {
}

func cleanup(ctx basecontext.ApiContext, db *data.JsonDatabase) {
if emitter := eventemitter.Get(); emitter != nil && emitter.IsRunning() {
ctx.LogInfof("[Core] Shutting down EventEmitter service")
emitter.Shutdown()
}

if db != nil {
ctx.LogInfof("[Core] Saving database")
if err := db.SaveNow(ctx); err != nil {
Expand Down
57 changes: 35 additions & 22 deletions src/models/event_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@ package models
import (
"time"

"github.com/Parallels/prl-devops-service/constants"
"github.com/Parallels/prl-devops-service/helpers"
)

// EventMessage represents an event that is sent to clients
type EventMessage struct {
ID string `json:"id"` // Unique identifier for the event
Type string `json:"type"` // Type/routing key (e.g., pdfm, hi, bye, vm, host, system, global)
Timestamp time.Time `json:"timestamp"` // When the event occurred
Message string `json:"message"` // Human-readable message
Body map[string]interface{} `json:"body,omitempty"` // Event-specific data (internal application data)
ClientID string `json:"client_id,omitempty"` // Optional: Target specific client
ID string `json:"id"` // Unique identifier for the event
Type constants.EventType `json:"event_type"` // Type/routing key (e.g., pdfm, vm, host, system, global)
Timestamp time.Time `json:"timestamp"` // When the event occurred
Message string `json:"message"` // Human-readable message
Body interface{} `json:"body,omitempty"` // Event-specific data (internal application data)
ClientID string `json:"client_id,omitempty"` // Optional: Target specific client
}

// NewEventMessage creates a new event message with ID and timestamp
func NewEventMessage(eventType, message string, body map[string]interface{}) *EventMessage {
// Uses type-safe EventType to prevent arbitrary strings
func NewEventMessage(eventType constants.EventType, message string, body interface{}) *EventMessage {
return &EventMessage{
ID: helpers.GenerateId(),
Type: eventType,
Expand All @@ -29,23 +31,34 @@ func NewEventMessage(eventType, message string, body map[string]interface{}) *Ev

// EventClientInfo represents information about a connected WebSocket client
type EventClientInfo struct {
ID string `json:"id"` // Unique client identifier
UserID string `json:"user_id"` // User ID from authentication
Username string `json:"username"` // Username from authentication
ConnectedAt time.Time `json:"connected_at"` // Connection timestamp
LastPingAt time.Time `json:"last_ping_at"` // Last ping sent
LastPongAt time.Time `json:"last_pong_at"` // Last pong received
Subscriptions []string `json:"subscriptions"` // List of type subscriptions
IsAlive bool `json:"is_alive"` // Connection health status
ID string `json:"id"` // Unique client identifier
UserID string `json:"user_id"` // User ID from authentication
Username string `json:"username"` // Username from authentication
ConnectedAt time.Time `json:"connected_at"` // Connection timestamp
LastPingAt time.Time `json:"last_ping_at"` // Last ping sent
LastPongAt time.Time `json:"last_pong_at"` // Last pong received
Subscriptions []constants.EventType `json:"event_subscriptions"` // List of type subscriptions
IsAlive bool `json:"is_alive"` // Connection health status
}

// EventEmitterStats represents statistics about the event emitter
type EventEmitterStats struct {
TotalClients int `json:"total_clients"`
TotalSubscriptions int `json:"total_subscriptions"`
TypeStats map[string]int `json:"type_stats"` // Number of subscribers per type
Clients []EventClientInfo `json:"clients,omitempty"` // List of connected clients (admin only)
MessagesSent int64 `json:"messages_sent"` // Total messages sent since start
StartTime time.Time `json:"start_time"` // When the emitter started
Uptime string `json:"uptime"` // Human-readable uptime
TotalClients int `json:"total_clients"`
TotalSubscriptions int `json:"total_subscriptions"`
TypeStats map[constants.EventType]int `json:"type_stats"` // Number of subscribers per type
Clients []EventClientInfo `json:"clients,omitempty"` // List of connected clients (admin only)
MessagesSent int64 `json:"messages_sent"` // Total messages sent since start
StartTime time.Time `json:"start_time"` // When the emitter started
Uptime string `json:"uptime"` // Human-readable uptime
}

type UnsubscribeRequest struct {
ClientID string `json:"client_id"` // Unique client identifier
UserID string `json:"user_id"` // User ID from authentication
EventTypes []string `json:"event_types,omitempty"` // List of event types to unsubscribe from
}

type WebSocketConnectResponse struct {
ClientID string `json:"client_id"` // Unique client identifier
Subscriptions []constants.EventType `json:"subscriptions"` // List of event types the client is subscribed to
}
21 changes: 10 additions & 11 deletions src/models/event_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@ import (
"testing"
"time"

"github.com/Parallels/prl-devops-service/constants"
"github.com/stretchr/testify/assert"
)

func TestNewEventMessage(t *testing.T) {
eventType := "pdfm"
message := "Test message"
body := map[string]interface{}{
"key1": "value1",
"key2": 123,
}

msg := NewEventMessage(eventType, message, body)
msg := NewEventMessage(constants.EventTypePDFM, message, body)

assert.NotNil(t, msg)
assert.NotEmpty(t, msg.ID, "ID should be generated")
assert.Equal(t, eventType, msg.Type)
assert.Equal(t, constants.EventTypePDFM, msg.Type)
assert.Equal(t, message, msg.Message)
assert.Equal(t, body, msg.Body)
assert.Empty(t, msg.ClientID, "ClientID should be empty by default")
Expand All @@ -31,12 +31,11 @@ func TestNewEventMessage_NilBody(t *testing.T) {

assert.NotNil(t, msg)
assert.Nil(t, msg.Body)
assert.Equal(t, "system", msg.Type)
assert.Equal(t, constants.EventTypeSystem, msg.Type)
}

func TestNewEventMessage_EmptyMessage(t *testing.T) {
msg := NewEventMessage("vm", "", map[string]interface{}{})

msg := NewEventMessage(constants.EventTypeVM, "", map[string]interface{}{})
assert.NotNil(t, msg)
assert.Empty(t, msg.Message)
assert.NotEmpty(t, msg.ID)
Expand Down Expand Up @@ -66,7 +65,7 @@ func TestEventClientInfo_Fields(t *testing.T) {
ConnectedAt: now,
LastPingAt: now,
LastPongAt: now,
Subscriptions: []string{"pdfm", "global"},
Subscriptions: []constants.EventType{constants.EventTypePDFM, constants.EventTypeGlobal},
IsAlive: true,
}

Expand All @@ -84,9 +83,9 @@ func TestEventEmitterStats_Fields(t *testing.T) {
stats := EventEmitterStats{
TotalClients: 5,
TotalSubscriptions: 12,
TypeStats: map[string]int{
"pdfm": 2,
"global": 5,
TypeStats: map[constants.EventType]int{
constants.EventTypePDFM: 2,
constants.EventTypeGlobal: 5,
},
Clients: []EventClientInfo{},
MessagesSent: 100,
Expand All @@ -96,7 +95,7 @@ func TestEventEmitterStats_Fields(t *testing.T) {

assert.Equal(t, 5, stats.TotalClients)
assert.Equal(t, 12, stats.TotalSubscriptions)
assert.Equal(t, 2, stats.TypeStats["pdfm"])
assert.Equal(t, 2, stats.TypeStats[constants.EventTypePDFM])
assert.Equal(t, int64(100), stats.MessagesSent)
assert.Equal(t, now, stats.StartTime)
}
Loading
Loading