Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions src/controllers/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net/http"

"github.com/Parallels/prl-devops-service/basecontext"
"github.com/Parallels/prl-devops-service/constants"
"github.com/Parallels/prl-devops-service/models"
"github.com/Parallels/prl-devops-service/restapi"
eventemitter "github.com/Parallels/prl-devops-service/serviceprovider/eventEmitter"
Expand All @@ -15,24 +16,24 @@ func registerEventHandlers(ctx basecontext.ApiContext, version string) {
WithMethod(restapi.GET).
WithVersion(version).
WithPath("/ws/subscribe").
WithRequiredClaim(constants.READ_ONLY_CLAIM).
WithHandler(WebSocketSubscribeHandler()).
Register()

restapi.NewController().
WithMethod(restapi.POST).
WithVersion(version).
WithPath("/ws/unsubscribe").
WithRequiredClaim(constants.READ_ONLY_CLAIM).
WithHandler(WebSocketUnsubscribeHandler()).
Register()
}

// @Summary Subscribe to event notifications via WebSocket
// @Description This endpoint upgrades the HTTP connection to WebSocket and subscribes to event notifications. Supports both JWT Bearer tokens and API Keys for authentication.
// @Description This endpoint upgrades the HTTP connection to WebSocket and subscribes to event notifications. Authentication is required via Authorization header (Bearer token) or X-Api-Key header.
// @Tags Events
// @Produce json
// @Param event_types query string false "Comma-separated event types to subscribe to (e.g., vm,host,system). Valid types: global, system, vm, host, pdfm. If omitted, subscribes to global events only."
// @Param token query string false "JWT token for authentication (alternative to Authorization header)"
// @Param api_key query string false "API key for authentication (alternative to X-API-KEY header)"
// @Success 101 {string} string "Switching Protocols"
// @Failure 400 {object} models.ApiErrorResponse
// @Failure 401 {object} models.ApiErrorResponse
Expand Down Expand Up @@ -90,11 +91,6 @@ func WebSocketUnsubscribeHandler() restapi.ControllerHandler {
})
return
}

if err := eventemitter.HandleUnsubscribe(r, ctx); err != nil {
ReturnApiError(ctx, w, *err)
return
}
ReturnApiCommonResponse(w)
eventemitter.HandleUnsubscribe(w, r, ctx)
}
}
14 changes: 14 additions & 0 deletions src/models/event_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,17 @@ type WebSocketConnectResponse struct {
ClientID string `json:"client_id"` // Unique client identifier
Subscriptions []constants.EventType `json:"subscriptions"` // List of event types the client is subscribed to
}

type VmStateChange struct {
PreviousState string `json:"previous_state"`
CurrentState string `json:"current_state"`
VmID string `json:"vm_id"`
}

type VmAdded struct {
VmID string `json:"vm_id"`
NewVm ParallelsVM `json:"new_vm"`
}
type VmRemoved struct {
VmID string `json:"vm_id"`
}
6 changes: 4 additions & 2 deletions src/serviceprovider/eventEmitter/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ func TestHub_UnsubscribeClientFromTypes_ClientNotFound(t *testing.T) {
subscriptions: make(map[constants.EventType]map[string]bool),
}

result := hub.unsubscribeClientFromTypes("nonexistent", "user1", []constants.EventType{
result, err := hub.unsubscribeClientFromTypes("nonexistent", "user1", []constants.EventType{
constants.EventTypeVM,
})

assert.Empty(t, result)
assert.Error(t, err)
}

func TestHub_UnsubscribeClientFromTypes_NotSubscribed(t *testing.T) {
Expand All @@ -122,11 +123,12 @@ func TestHub_UnsubscribeClientFromTypes_NotSubscribed(t *testing.T) {
hub.clients["client1"] = &Client{ID: "client1"}
hub.subscriptions[constants.EventTypeVM] = map[string]bool{}

result := hub.unsubscribeClientFromTypes("client1", "user1", []constants.EventType{
result, err := hub.unsubscribeClientFromTypes("client1", "user1", []constants.EventType{
constants.EventTypeHost, // Not subscribed to this
})

assert.Empty(t, result)
assert.Error(t, err)
}

func TestEventEmitter_SendToType_NotRunning(t *testing.T) {
Expand Down
11 changes: 7 additions & 4 deletions src/serviceprovider/eventEmitter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Hub struct {
clientsByIP map[string]string // Map of IP address to client ID
subscriptions map[constants.EventType]map[string]bool // Map of event type to set of client IDs (type-safe)
broadcast chan *models.EventMessage // Channel for broadcasting messages
clientToHub chan hubCommand // Channel for commands from clients
clientToHub chan hubCommand // Channel for commands from websocket clients
shutdownChan chan struct{} // Closed to signal shutdown started (never sent to)
stopped chan struct{} // Closed to signal hub has fully stopped
mu sync.RWMutex
Expand Down Expand Up @@ -390,15 +390,16 @@ func (h *Hub) broadcastMessage(message *models.EventMessage) error {
return nil
}

func (h *Hub) unsubscribeClientFromTypes(clientID string, userId string, eventTypes []constants.EventType) []string {
func (h *Hub) unsubscribeClientFromTypes(clientID string, userId string, eventTypes []constants.EventType) ([]string, error) {

h.mu.Lock()
defer h.mu.Unlock()
unsubscribed := []string{}
err := error(nil)
_, clientExists := h.clients[clientID]
if !clientExists {
h.ctx.LogWarnf("[Client %s] Attempted to unsubscribe but client does not exist", clientID)
return unsubscribed
return unsubscribed, fmt.Errorf("client %s does not exist", clientID)
}
var globalAttempted bool
for _, eventType := range eventTypes {
Expand All @@ -417,6 +418,7 @@ func (h *Hub) unsubscribeClientFromTypes(clientID string, userId string, eventTy
}
} else {
h.ctx.LogWarnf("[Client %s] Not subscribed to event type %s, cannot unsubscribe", clientID, eventType)
err = fmt.Errorf("not subscribed to event type %s", eventType)
}
}
if len(unsubscribed) > 0 {
Expand All @@ -425,8 +427,9 @@ func (h *Hub) unsubscribeClientFromTypes(clientID string, userId string, eventTy

if globalAttempted {
h.ctx.LogWarnf("[Client %s] Cannot unsubscribe from global event type", clientID)
err = fmt.Errorf("cannot unsubscribe from %s event type", constants.EventTypeGlobal)
}
return unsubscribed
return unsubscribed, err
}

func (c *unregisterClientCmd) execute(h *Hub) {
Expand Down
6 changes: 4 additions & 2 deletions src/serviceprovider/eventEmitter/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,13 @@ func TestHub_UnsubscribeClientFromTypes(t *testing.T) {
hub.subscriptions[constants.EventTypeHost] = map[string]bool{"client1": true}
hub.subscriptions[constants.EventTypeGlobal] = map[string]bool{"client1": true}

result := hub.unsubscribeClientFromTypes("client1", "user1", []constants.EventType{
result, err := hub.unsubscribeClientFromTypes("client1", "user1", []constants.EventType{
constants.EventTypeVM,
constants.EventTypeHost,
})

assert.Len(t, result, 2)
assert.NoError(t, err)
assert.Contains(t, result, "vm")
assert.Contains(t, result, "host")
assert.Empty(t, hub.subscriptions[constants.EventTypeVM])
Expand All @@ -318,11 +319,12 @@ func TestHub_UnsubscribeClientFromTypes_CannotUnsubscribeGlobal(t *testing.T) {
hub.clients["client1"] = &Client{ID: "client1"}
hub.subscriptions[constants.EventTypeGlobal] = map[string]bool{"client1": true}

result := hub.unsubscribeClientFromTypes("client1", "user1", []constants.EventType{
result, err := hub.unsubscribeClientFromTypes("client1", "user1", []constants.EventType{
constants.EventTypeGlobal,
})

assert.Empty(t, result)
assert.Error(t, err)
// Global should still be there
assert.Len(t, hub.subscriptions[constants.EventTypeGlobal], 1)
}
Expand Down
49 changes: 30 additions & 19 deletions src/serviceprovider/eventEmitter/websocket_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eventemitter

import (
"encoding/json"
"fmt"
"net/http"
"strings"
Expand Down Expand Up @@ -46,12 +47,11 @@ func HandleWebSocketConnection(w http.ResponseWriter, r *http.Request, ctx basec

usr := ctx.GetUser()
if usr == nil {
// for testing purposes only
usr = &models.ApiUser{
ID: "anonymous",
Username: "anonymous",
ctx.LogErrorf("WebSocket connection without authenticated user")
return &models.ApiErrorResponse{
Message: "Authentication required",
Code: http.StatusUnauthorized,
}
ctx.LogWarnf("WebSocket connection established without authenticated user")
}
// Create client
client := &Client{
Expand All @@ -78,44 +78,55 @@ func HandleWebSocketConnection(w http.ResponseWriter, r *http.Request, ctx basec
return nil
}

func HandleUnsubscribe(r *http.Request, ctx basecontext.ApiContext) *models.ApiErrorResponse {
func HandleUnsubscribe(w http.ResponseWriter, r *http.Request, ctx basecontext.ApiContext) {
var request models.UnsubscribeRequest
if err := http_helper.MapRequestBody(r, &request); err != nil {
ctx.LogWarnf("Invalid unsubscribe request body: %v", err)
return &models.ApiErrorResponse{
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(models.ApiErrorResponse{
Message: "Invalid request body: " + err.Error(),
Code: http.StatusBadRequest,
}
})
}

if len(request.EventTypes) == 0 {
ctx.LogInfof("No event_types specified")
return &models.ApiErrorResponse{
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(models.ApiErrorResponse{
Message: "Invalid event_types body parameter",
Code: http.StatusBadRequest,
}
})
}

eventTypesList, err := stringToEventTypes(request.EventTypes)
if len(eventTypesList) <= 0 {
ctx.LogWarnf("No valid event types to unsubscribe: %v", err)
return &models.ApiErrorResponse{
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(models.ApiErrorResponse{
Message: "No valid event types to unsubscribe: " + err.Error(),
Code: http.StatusBadRequest,
}
})
return
}
usr := ctx.GetUser()

unsubscribed := Get().hub.unsubscribeClientFromTypes(request.ClientID, usr.ID, eventTypesList)
unsubscribed, err := Get().hub.unsubscribeClientFromTypes(request.ClientID, ctx.GetUser().Username, eventTypesList)

if len(unsubscribed) == 0 {

return &models.ApiErrorResponse{
Message: "No event types were unsubscribed",
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(models.ApiErrorResponse{
Message: err.Error(),
Code: http.StatusBadRequest,
}
})
return
}

if err != nil && len(unsubscribed) > 0 {
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(models.ApiErrorResponse{
Message: err.Error() + " unsubscribed from: " + strings.Join(unsubscribed, ", "),
Code: http.StatusOK,
})
return
}
ctx.LogInfof("Client %s unsubscribed from event types: %v", request.ClientID, unsubscribed)
return nil
}
42 changes: 42 additions & 0 deletions src/serviceprovider/parallelsdesktop/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/Parallels/prl-devops-service/helpers"
"github.com/Parallels/prl-devops-service/models"
"github.com/Parallels/prl-devops-service/processlauncher"
eventemitter "github.com/Parallels/prl-devops-service/serviceprovider/eventEmitter"
"github.com/Parallels/prl-devops-service/serviceprovider/git"
"github.com/Parallels/prl-devops-service/serviceprovider/interfaces"
"github.com/Parallels/prl-devops-service/serviceprovider/packer"
Expand Down Expand Up @@ -423,15 +424,30 @@ func (s *ParallelsService) processEvent(ctx basecontext.ApiContext, event models

func (s *ParallelsService) processVmStateChanged(ctx basecontext.ApiContext, event models.ParallelsServiceEvent) {
if event.AdditionalInfo != nil && event.AdditionalInfo.VmStateName != "" {
var prevState string
s.Lock()
for i, vm := range s.cachedLocalVms {
if vm.ID == event.VMID {
ctx.LogInfof("Updating cached state for VM %s from %s to %s", vm.ID, vm.State, event.AdditionalInfo.VmStateName)
prevState = vm.State
s.cachedLocalVms[i].State = event.AdditionalInfo.VmStateName
break
}
}
s.Unlock()
VmStateChangeEvent := models.VmStateChange{
PreviousState: prevState,
CurrentState: event.AdditionalInfo.VmStateName,
VmID: event.VMID,
}
go func() {
if ee := eventemitter.Get(); ee != nil && ee.IsRunning() {
msg := models.NewEventMessage(constants.EventTypePDFM, "VM_STATE_CHANGED", VmStateChangeEvent)
if err := ee.BroadcastMessage(msg); err != nil {
ctx.LogErrorf("Error broadcasting VM state change event: %v", err)
}
}
}()
}
}

Expand Down Expand Up @@ -477,6 +493,19 @@ func (s *ParallelsService) processVmAdded(ctx basecontext.ApiContext, event mode
s.cachedLocalVms = append(s.cachedLocalVms, machine)
s.Unlock()
ctx.LogInfof("Added VM %s to cache", event.VMID)
VmAddedEvent := models.VmAdded{
VmID: event.VMID,
NewVm: machine,
}

go func() {
if ee := eventemitter.Get(); ee != nil && ee.IsRunning() {
msg := models.NewEventMessage(constants.EventTypePDFM, "VM_ADDED", VmAddedEvent)
if err := ee.BroadcastMessage(msg); err != nil {
ctx.LogErrorf("Error broadcasting VM added event: %v", err)
}
}
}()
return
}
}
Expand All @@ -490,6 +519,19 @@ func (s *ParallelsService) processVmUnregistered(ctx basecontext.ApiContext, eve
s.cachedLocalVms = append(s.cachedLocalVms[:i], s.cachedLocalVms[i+1:]...)
s.Unlock()
ctx.LogInfof("Removed VM %s from cache", event.VMID)

VmRemoved := models.VmRemoved{
VmID: event.VMID,
}
go func() {
if ee := eventemitter.Get(); ee != nil && ee.IsRunning() {
msg := models.NewEventMessage(constants.EventTypePDFM, "VM_REMOVED", VmRemoved)
if err := ee.BroadcastMessage(msg); err != nil {
ctx.LogErrorf("Error broadcasting VM removed event: %v", err)
}
}
}()

break
}
}
Expand Down
Loading
Loading