Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions loadtest/hub_load.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Usage:
// 1. Allow multiple WebSocket connections from the same IP:
// - build the binary with `-tags=debug`, or
// - temporarily change src/serviceprovider/eventEmitter/connection_limit_release.go to return true.
// 2. Start the API locally (http://localhost:5680) and run `prldevops test event-load` to stream events.
// 3. Execute: k6 run loadtest/hub_load.js

import ws from 'k6/ws';
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Trend, Counter } from 'k6/metrics';

const latencyTrend = new Trend('message_latency');
const messageLossCounter = new Counter('message_loss');
const connectionErrors = new Counter('connection_errors');
const messagesReceived = new Counter('messages_received');
const pingsSent = new Counter('pings_sent');
const pongsReceived = new Counter('pongs_received');

export const options = {
scenarios: {
hub_load: {
executor: 'ramping-vus',
startVUs: 0,
stages: [
{ duration: '30s', target: 400 },
{ duration: '2m', target: 400 },
{ duration: '30s', target: 0 },
],
gracefulStop: '30s',
},
},
};

export default function () {
const data = { email: 'root@localhost', password: '' };
const authRes = http.post('http://localhost:5680/api/v1/auth/token', data);
check(authRes, { 'status is 200': (r) => r.status === 200 });

const token = authRes.json('token');
const url = `ws://localhost:5680/api/v1/ws/subscribe?event_types=pdfm,health`;
const params = {
headers: { 'Authorization': `Bearer ${token}` },
tags: { my_tag: 'hub_load' }
};

// 2. Connect to WebSocket
const response = ws.connect(url, params, function (socket) {
let lastSeq = -1;
let pingInterval;
let localPongCount = 0;
let localPingCount = 0;

socket.on('open', function open() {
// Send first ping immediately
const sendPing = function () {
const pingMsg = {
type: 'health',
message: 'ping'
};
socket.send(JSON.stringify(pingMsg));
localPingCount++;
pingsSent.add(1);
};

// Send first ping right away
sendPing();

// Then send pings every 5 seconds
pingInterval = setInterval(sendPing, 5000);
});

socket.on('message', function (message) {
const msg = JSON.parse(message);

// Track pong responses
if (msg.event_type === 'health' && msg.message === 'pong') {
localPongCount++;
pongsReceived.add(1);
}

// Track PDFM VM events with bigger payload
if (msg.event_type === 'pdfm' && msg.body && msg.body.new_vm && msg.body.new_vm.ID) {
messagesReceived.add(1);

// Parse seq and timestamp from ID format: "seq-{seq}-ts-{timestamp}"
const vmId = msg.body.new_vm.ID;
const parts = vmId.split('-');
if (parts.length >= 4 && parts[0] === 'seq' && parts[2] === 'ts') {
const seq = parseInt(parts[1]);
const sentTime = parseInt(parts[3]);
const now = new Date().getTime() * 1000000; // ns

// Calculate latency (ms)
const latencyMs = (now - sentTime) / 1000000;
if (latencyMs > 0) {
latencyTrend.add(latencyMs);
}

// Check for loss
if (lastSeq !== -1) {
const diff = seq - lastSeq;
if (diff > 1) {
messageLossCounter.add(diff - 1);
}
}
lastSeq = seq;
}
}
});

socket.on('close', function () {
if (pingInterval) {
clearInterval(pingInterval);
}
});

socket.on('error', function (e) {
connectionErrors.add(1);
console.log('error: ', e.error());
});

// Keep connection open for a while
socket.setTimeout(function () {
if (pingInterval) {
clearInterval(pingInterval);
}
socket.close();
}, 30000); // 30s session per VU iteration
});

check(response, { 'status is 101': (r) => r && r.status === 101 });
sleep(1);
}
5 changes: 5 additions & 0 deletions src/cmd/test_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func processTestProviders(ctx basecontext.ApiContext, cmd string) {
os.Exit(1)
}
}
case "event-load":
if err := tests.RunEventLoadTest(ctx); err != nil {
ctx.LogErrorf(err.Error())
os.Exit(1)
}
default:
processHelp(constants.TEST_COMMAND)
}
Expand Down
3 changes: 3 additions & 0 deletions src/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,9 @@ func (c *Config) GetBoolKey(key string) bool {
}

func (c *Config) SetKey(key string, value string) {
if c.config.Environment == nil {
c.config.Environment = make(map[string]string)
}
c.config.Environment[strings.ToLower(key)] = value
}

Expand Down
96 changes: 96 additions & 0 deletions src/tests/event_load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package tests

import (
"context"
"fmt"
"net/http"
"strings"
"time"

"github.com/Parallels/prl-devops-service/basecontext"
"github.com/Parallels/prl-devops-service/config"
"github.com/Parallels/prl-devops-service/constants"
"github.com/Parallels/prl-devops-service/models"
eventemitter "github.com/Parallels/prl-devops-service/serviceprovider/eventEmitter"
"github.com/Parallels/prl-devops-service/startup"
)

// RunEventLoadTest boots the service in API mode and broadcasts load-test events for WebSocket clients.
func RunEventLoadTest(ctx basecontext.ApiContext) error {
startup.Init(ctx)

cfg := config.Get()
ctx.LogInfof("Starting Event Load Test API on port %s", cfg.ApiPort())
startup.Start(ctx)

listener := startup.InitApi()
go listener.Start("Event Load Test API", "dev")

apiPort := cfg.ApiPort()
apiPrefix := cfg.ApiPrefix()
if !strings.HasPrefix(apiPrefix, "/") {
apiPrefix = "/" + apiPrefix
}
healthEndpoint := fmt.Sprintf("http://localhost:%s%s/health/probe", apiPort, strings.TrimSuffix(apiPrefix, "/"))

readyCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

for {
req, err := http.NewRequestWithContext(readyCtx, http.MethodGet, healthEndpoint, nil)
if err != nil {
return fmt.Errorf("failed to create readiness request: %w", err)
}

resp, err := http.DefaultClient.Do(req)
if err == nil {
_ = resp.Body.Close()
if resp.StatusCode == http.StatusOK {
ctx.LogInfof("Event Load Test API listener initialized")
break
}
}

if readyCtx.Err() != nil {
return fmt.Errorf("timed out waiting for API initialization: %w", readyCtx.Err())
}

time.Sleep(500 * time.Millisecond)
}

func() {
ticker := time.NewTicker(10 * time.Millisecond) // 100 events/sec
defer ticker.Stop()

seq := 0
for range ticker.C {
seq++

vmPayload := models.VmAdded{
VmID: fmt.Sprintf("vm-load-test-%d", seq),
NewVm: models.ParallelsVM{
ID: fmt.Sprintf("seq-%d-ts-%d", seq, time.Now().UnixNano()),
Name: fmt.Sprintf("LoadTest-VM-%d", seq),
Description: "Load test virtual machine with realistic payload size",
State: "running",
OS: "ubuntu-22.04",
User: "loadtest",
HostId: "load-test-host",
Type: "VM",
Template: "ubuntu-22.04-template",
},
}

msg := models.NewEventMessage(constants.EventTypePDFM, "VM Added - Load Test", vmPayload)

if err := eventemitter.Get().Broadcast(msg); err != nil {
ctx.LogErrorf("Failed to broadcast load test message: %v", err)
}
if seq%1000 == 0 {
ctx.LogInfof("Broadcasted %d VM events", seq)
}
}
}()

return nil
}