Skip to content

Commit 14dfae7

Browse files
committed
test: add e2e tests for error paths, streaming, and aggregation
Signed-off-by: pmady <pavan4devops@gmail.com>
1 parent 62e4443 commit 14dfae7

1 file changed

Lines changed: 181 additions & 18 deletions

File tree

tests/e2e/grpc_scaler_test.go

Lines changed: 181 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,6 @@ package e2e
2121

2222
import (
2323
"context"
24-
"fmt"
2524
"net"
2625
"testing"
2726
"time"
@@ -37,8 +36,6 @@ import (
3736
"github.com/pmady/keda-gpu-scaler/pkg/scaler"
3837
)
3938

40-
// startTestServer starts a gRPC server with the given mock devices and returns
41-
// the address and a cleanup function.
4239
func startTestServer(t *testing.T, devices []gpu.Metrics) (string, func()) {
4340
t.Helper()
4441

@@ -82,7 +79,6 @@ func dialScaler(t *testing.T, addr string) (*grpc.ClientConn, pb.ExternalScalerC
8279
return conn, pb.NewExternalScalerClient(conn)
8380
}
8481

85-
// TestHealthCheck verifies the gRPC health check endpoint responds SERVING.
8682
func TestHealthCheck(t *testing.T) {
8783
devices := []gpu.Metrics{
8884
{Index: 0, GPUUtilization: 50, MemoryUsedMiB: 4096, MemoryTotalMiB: 8192},
@@ -112,7 +108,6 @@ func TestHealthCheck(t *testing.T) {
112108
}
113109
}
114110

115-
// TestIsActive verifies IsActive returns true when GPU utilization exceeds the activation threshold.
116111
func TestIsActive(t *testing.T) {
117112
tests := []struct {
118113
name string
@@ -181,7 +176,6 @@ func TestIsActive(t *testing.T) {
181176
}
182177
}
183178

184-
// TestGetMetricSpec verifies GetMetricSpec returns correct metric name and target value.
185179
func TestGetMetricSpec(t *testing.T) {
186180
devices := []gpu.Metrics{
187181
{Index: 0, GPUUtilization: 75},
@@ -245,7 +239,6 @@ func TestGetMetricSpec(t *testing.T) {
245239
}
246240
}
247241

248-
// TestGetMetrics verifies GetMetrics returns correct GPU metric values.
249242
func TestGetMetrics(t *testing.T) {
250243
tests := []struct {
251244
name string
@@ -338,11 +331,9 @@ func TestGetMetrics(t *testing.T) {
338331
}
339332
}
340333

341-
// TestScaleOutScaleIn simulates KEDA's scaling decision loop:
342-
// 1. Start with high GPU utilization → scaler reports active + high metric
343-
// 2. Drop GPU utilization → scaler reports inactive + low metric
334+
// Spin up a hot server, check it reports active + high metric,
335+
// then swap to a cold server and confirm it flips.
344336
func TestScaleOutScaleIn(t *testing.T) {
345-
// Phase 1: High utilization — should trigger scale out
346337
highDevices := []gpu.Metrics{
347338
{Index: 0, GPUUtilization: 95, MemoryUsedMiB: 7500, MemoryTotalMiB: 8192},
348339
{Index: 1, GPUUtilization: 88, MemoryUsedMiB: 7000, MemoryTotalMiB: 8192},
@@ -387,13 +378,13 @@ func TestScaleOutScaleIn(t *testing.T) {
387378
if highValue <= 80 {
388379
t.Errorf("expected metric > 80 (target) for scale-out, got %v", highValue)
389380
}
390-
t.Logf("Phase 1 (scale-out): metric=%v (max GPU util), target=80 → HPA would scale out", highValue)
381+
t.Logf("high phase: metric=%v", highValue)
391382

392383
cancel()
393384
conn.Close()
394385
cleanup()
395386

396-
// Phase 2: Low utilization — should trigger scale in
387+
// now swap to idle GPUs
397388
lowDevices := []gpu.Metrics{
398389
{Index: 0, GPUUtilization: 5, MemoryUsedMiB: 500, MemoryTotalMiB: 8192},
399390
{Index: 1, GPUUtilization: 3, MemoryUsedMiB: 400, MemoryTotalMiB: 8192},
@@ -436,10 +427,10 @@ func TestScaleOutScaleIn(t *testing.T) {
436427
if lowValue >= 80 {
437428
t.Errorf("expected metric < 80 (target) for scale-in, got %v", lowValue)
438429
}
439-
t.Logf("Phase 2 (scale-in): metric=%v (max GPU util), target=80 → HPA would scale in", lowValue)
430+
t.Logf("low phase: metric=%v", lowValue)
440431
}
441432

442-
// TestAllProfiles verifies that every pre-built profile produces valid gRPC responses.
433+
// Smoke-test all profiles: call IsActive, GetMetricSpec, GetMetrics.
443434
func TestAllProfiles(t *testing.T) {
444435
devices := []gpu.Metrics{
445436
{
@@ -465,7 +456,7 @@ func TestAllProfiles(t *testing.T) {
465456
defer conn.Close()
466457

467458
for _, profile := range profileNames {
468-
t.Run(fmt.Sprintf("profile-%s", profile), func(t *testing.T) {
459+
t.Run(profile, func(t *testing.T) {
469460
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
470461
defer cancel()
471462

@@ -503,12 +494,184 @@ func TestAllProfiles(t *testing.T) {
503494
t.Errorf("expected 1 metric value for %s, got %d", profile, len(metricsResp.MetricValues))
504495
}
505496

506-
t.Logf("profile=%s metric=%s value=%v target=%v",
497+
t.Logf("%s: val=%v target=%v",
507498
profile,
508-
specResp.MetricSpecs[0].MetricName,
509499
metricsResp.MetricValues[0].MetricValueFloat,
510500
specResp.MetricSpecs[0].TargetSizeFloat,
511501
)
512502
})
513503
}
514504
}
505+
506+
func TestBadMetadata(t *testing.T) {
507+
devices := []gpu.Metrics{
508+
{Index: 0, GPUUtilization: 50},
509+
}
510+
addr, cleanup := startTestServer(t, devices)
511+
defer cleanup()
512+
513+
conn, client := dialScaler(t, addr)
514+
defer conn.Close()
515+
516+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
517+
defer cancel()
518+
519+
badCases := []struct {
520+
name string
521+
metadata map[string]string
522+
}{
523+
{"bogus profile", map[string]string{"profile": "doesnt-exist"}},
524+
{"non-numeric targetValue", map[string]string{"targetValue": "abc"}},
525+
{"non-numeric gpuIndex", map[string]string{"gpuIndex": "xyz"}},
526+
{"bad aggregation", map[string]string{"aggregation": "median"}},
527+
}
528+
529+
for _, tc := range badCases {
530+
t.Run(tc.name, func(t *testing.T) {
531+
_, err := client.IsActive(ctx, &pb.ScaledObjectRef{
532+
Name: "bad",
533+
Namespace: "default",
534+
ScalerMetadata: tc.metadata,
535+
})
536+
if err == nil {
537+
t.Errorf("expected error for metadata %v, got nil", tc.metadata)
538+
}
539+
})
540+
}
541+
}
542+
543+
func TestStreamIsActive(t *testing.T) {
544+
devices := []gpu.Metrics{
545+
{Index: 0, GPUUtilization: 60, MemoryUsedMiB: 4096, MemoryTotalMiB: 8192},
546+
}
547+
addr, cleanup := startTestServer(t, devices)
548+
defer cleanup()
549+
550+
conn, client := dialScaler(t, addr)
551+
defer conn.Close()
552+
553+
// short poll so we don't wait forever
554+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
555+
defer cancel()
556+
557+
stream, err := client.StreamIsActive(ctx, &pb.ScaledObjectRef{
558+
Name: "stream-test",
559+
Namespace: "default",
560+
ScalerMetadata: map[string]string{
561+
"pollIntervalSeconds": "1",
562+
},
563+
})
564+
if err != nil {
565+
t.Fatalf("StreamIsActive call failed: %v", err)
566+
}
567+
568+
// read at least one message
569+
resp, err := stream.Recv()
570+
if err != nil {
571+
t.Fatalf("stream recv failed: %v", err)
572+
}
573+
// 60 > 0 (default activation), should be active
574+
if !resp.Result {
575+
t.Errorf("expected stream to report active, got false")
576+
}
577+
}
578+
579+
// gpuIndex out of range should error from the mock collector
580+
func TestGpuIndexOutOfRange(t *testing.T) {
581+
devices := []gpu.Metrics{
582+
{Index: 0, GPUUtilization: 50},
583+
}
584+
addr, cleanup := startTestServer(t, devices)
585+
defer cleanup()
586+
587+
conn, client := dialScaler(t, addr)
588+
defer conn.Close()
589+
590+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
591+
defer cancel()
592+
593+
_, err := client.GetMetrics(ctx, &pb.GetMetricsRequest{
594+
ScaledObjectRef: &pb.ScaledObjectRef{
595+
Name: "oob-test",
596+
Namespace: "default",
597+
ScalerMetadata: map[string]string{
598+
"gpuIndex": "99",
599+
},
600+
},
601+
MetricName: "keda_gpu_metric",
602+
})
603+
if err == nil {
604+
t.Error("expected error for gpuIndex=99 with 1 device, got nil")
605+
}
606+
}
607+
608+
// min aggregation across 4 GPUs
609+
func TestAggregationMin(t *testing.T) {
610+
devices := []gpu.Metrics{
611+
{Index: 0, GPUUtilization: 80},
612+
{Index: 1, GPUUtilization: 40},
613+
{Index: 2, GPUUtilization: 90},
614+
{Index: 3, GPUUtilization: 55},
615+
}
616+
addr, cleanup := startTestServer(t, devices)
617+
defer cleanup()
618+
619+
conn, client := dialScaler(t, addr)
620+
defer conn.Close()
621+
622+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
623+
defer cancel()
624+
625+
resp, err := client.GetMetrics(ctx, &pb.GetMetricsRequest{
626+
ScaledObjectRef: &pb.ScaledObjectRef{
627+
Name: "min-test",
628+
Namespace: "default",
629+
ScalerMetadata: map[string]string{
630+
"aggregation": "min",
631+
},
632+
},
633+
MetricName: "keda_gpu_metric",
634+
})
635+
if err != nil {
636+
t.Fatalf("GetMetrics failed: %v", err)
637+
}
638+
got := resp.MetricValues[0].MetricValueFloat
639+
if got != 40 {
640+
t.Errorf("min aggregation = %v, want 40", got)
641+
}
642+
}
643+
644+
// sum aggregation
645+
func TestAggregationSum(t *testing.T) {
646+
devices := []gpu.Metrics{
647+
{Index: 0, GPUUtilization: 20},
648+
{Index: 1, GPUUtilization: 30},
649+
{Index: 2, GPUUtilization: 50},
650+
}
651+
addr, cleanup := startTestServer(t, devices)
652+
defer cleanup()
653+
654+
conn, client := dialScaler(t, addr)
655+
defer conn.Close()
656+
657+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
658+
defer cancel()
659+
660+
resp, err := client.GetMetrics(ctx, &pb.GetMetricsRequest{
661+
ScaledObjectRef: &pb.ScaledObjectRef{
662+
Name: "sum-test",
663+
Namespace: "default",
664+
ScalerMetadata: map[string]string{
665+
"aggregation": "sum",
666+
},
667+
},
668+
MetricName: "keda_gpu_metric",
669+
})
670+
if err != nil {
671+
t.Fatalf("GetMetrics failed: %v", err)
672+
}
673+
got := resp.MetricValues[0].MetricValueFloat
674+
if got != 100 {
675+
t.Errorf("sum aggregation = %v, want 100", got)
676+
}
677+
}

0 commit comments

Comments
 (0)