Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions historyserver/config/historyserver-azureblob.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
apiVersion: v1
kind: Service
metadata:
name: historyserver
labels:
app: historyserver
spec:
selector:
app: historyserver
ports:
- protocol: TCP
name: http
port: 30080
targetPort: 8080
type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: historyserver-demo
labels:
app: historyserver
spec:
replicas: 1
selector:
matchLabels:
app: historyserver
template:
metadata:
labels:
app: historyserver
spec:
serviceAccountName: historyserver
containers:
- name: historyserver
env:
- name: AZURE_STORAGE_CONNECTION_STRING
value: "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite-service.azurite-dev.svc.cluster.local:10000/devstoreaccount1;"
- name: AZURE_STORAGE_CONTAINER
value: "ray-historyserver"
image: historyserver:v0.1.0
imagePullPolicy: IfNotPresent
command:
- historyserver
- --runtime-class-name=azureblob
- --ray-root-dir=log
ports:
- containerPort: 8080
resources:
limits:
cpu: "500m"
3 changes: 2 additions & 1 deletion historyserver/docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ Sample configs are in the `config/` directory:
| `raycluster.yaml` | Ray cluster with collector sidecar (S3/MinIO) |
| `raycluster-azureblob.yaml` | Ray cluster with collector sidecar (Azure Blob) |
| `rayjob.yaml` | Sample Ray job for testing |
| `historyserver.yaml` | History Server deployment |
| `historyserver.yaml` | History Server deployment (S3/MinIO) |
| `historyserver-azureblob.yaml` | History Server deployment (Azure Blob) |
| `service_account.yaml` | Service account for History Server |

## Additional resources
Expand Down
6 changes: 3 additions & 3 deletions historyserver/test/e2e/azureblob_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func testAzureBlobUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev

_ = ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)

clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, RayClusterID)
clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, namespace.Name)
sessionID := GetSessionIDFromHeadPod(test, g, rayCluster)
headNodeID := GetNodeIDFromPod(test, g, HeadPod(test, rayCluster), "ray-head")
workerNodeID := GetNodeIDFromPod(test, g, FirstWorkerPod(test, rayCluster), "ray-worker")
Expand All @@ -84,7 +84,7 @@ func testAzureBlobSeparatesFilesBySession(test Test, g *WithT, namespace *corev1

_ = ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)

clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, RayClusterID)
clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, namespace.Name)
sessionID := GetSessionIDFromHeadPod(test, g, rayCluster)
headNodeID := GetNodeIDFromPod(test, g, HeadPod(test, rayCluster), "ray-head")
workerNodeID := GetNodeIDFromPod(test, g, FirstWorkerPod(test, rayCluster), "ray-worker")
Expand All @@ -108,7 +108,7 @@ func testAzureBlobResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1

dummySessionID := fmt.Sprintf("test-recovery-session-%s", namespace.Name)
dummyNodeID := fmt.Sprintf("head-node-%s", namespace.Name)
clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, RayClusterID)
clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, namespace.Name)
sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, dummySessionID)

headPod, err := GetHeadPod(test, rayCluster)
Expand Down
138 changes: 138 additions & 0 deletions historyserver/test/e2e/historyserver_azureblob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package e2e

import (
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"

. "github.com/ray-project/kuberay/historyserver/test/support"
. "github.com/ray-project/kuberay/ray-operator/test/support"
)

func TestAzureHistoryServer(t *testing.T) {
azureClient := EnsureAzureBlobClient(t)

tests := []struct {
name string
testFunc func(Test, *WithT, *corev1.Namespace, *azblob.Client)
}{
{
name: "Live cluster: historyserver endpoints should be accessible",
testFunc: testAzureLiveClusters,
},
{
name: "Dead cluster: historyserver endpoints should be accessible",
testFunc: testAzureDeadClusters,
},
{
name: "/v0/logs/file endpoint (live cluster)",
testFunc: testAzureLogFileEndpointLiveCluster,
},
{
name: "/v0/logs/file endpoint (dead cluster)",
testFunc: testAzureLogFileEndpointDeadCluster,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
test := With(t)
g := NewWithT(t)
namespace := test.NewTestNamespace()

tt.testFunc(test, g, namespace, azureClient)
})
}
}

func testAzureLiveClusters(test Test, g *WithT, namespace *corev1.Namespace, azureClient *azblob.Client) {
rayCluster := PrepareAzureBlobTestEnv(test, g, namespace, azureClient)
ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
ApplyHistoryServer(test, g, namespace, AzureHistoryServerManifestPath)
historyServerURL := GetHistoryServerURL(test, g, namespace)

clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
g.Expect(clusterInfo.SessionName).To(Equal(LiveSessionName), "Live cluster should have sessionName='live'")

client := CreateHTTPClientWithCookieJar(g)
setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)
verifyHistoryServerEndpoints(test, g, client, historyServerURL)

DeleteAzureBlobContainer(test, g, azureClient)
LogWithTimestamp(test.T(), "Azure live clusters E2E test completed successfully")
}

func testAzureDeadClusters(test Test, g *WithT, namespace *corev1.Namespace, azureClient *azblob.Client) {
rayCluster := PrepareAzureBlobTestEnv(test, g, namespace, azureClient)
ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)

DeleteRayClusterAndWait(test, g, namespace.Name, rayCluster.Name)

ApplyHistoryServer(test, g, namespace, AzureHistoryServerManifestPath)
historyServerURL := GetHistoryServerURL(test, g, namespace)

clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
g.Expect(clusterInfo.SessionName).NotTo(Equal(LiveSessionName), "Dead cluster should not have sessionName='live'")

client := CreateHTTPClientWithCookieJar(g)
setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)
verifyHistoryServerEndpoints(test, g, client, historyServerURL)

DeleteAzureBlobContainer(test, g, azureClient)
LogWithTimestamp(test.T(), "Azure dead clusters E2E test completed successfully")
}

func testAzureLogFileEndpointLiveCluster(test Test, g *WithT, namespace *corev1.Namespace, azureClient *azblob.Client) {
rayCluster := PrepareAzureBlobTestEnv(test, g, namespace, azureClient)
ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)
ApplyHistoryServer(test, g, namespace, AzureHistoryServerManifestPath)
historyServerURL := GetHistoryServerURL(test, g, namespace)

clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
client := CreateHTTPClientWithCookieJar(g)
setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)

nodeID := GetOneOfNodeID(g, client, historyServerURL)

test.T().Run("should return log content", func(t *testing.T) {
VerifyLogFileEndpointReturnsContent(test, NewWithT(t), client, historyServerURL, nodeID)
})

test.T().Run("should reject path traversal", func(t *testing.T) {
VerifyLogFileEndpointRejectsPathTraversal(test, NewWithT(t), client, historyServerURL, nodeID)
})

DeleteAzureBlobContainer(test, g, azureClient)
LogWithTimestamp(test.T(), "Azure log file endpoint tests completed")
}

func testAzureLogFileEndpointDeadCluster(test Test, g *WithT, namespace *corev1.Namespace, azureClient *azblob.Client) {
rayCluster := PrepareAzureBlobTestEnv(test, g, namespace, azureClient)
ApplyRayJobAndWaitForCompletion(test, g, namespace, rayCluster)

DeleteRayClusterAndWait(test, g, namespace.Name, rayCluster.Name)

ApplyHistoryServer(test, g, namespace, AzureHistoryServerManifestPath)
historyServerURL := GetHistoryServerURL(test, g, namespace)

clusterInfo := getClusterFromList(test, g, historyServerURL, rayCluster.Name, namespace.Name)
g.Expect(clusterInfo.SessionName).NotTo(Equal(LiveSessionName))

client := CreateHTTPClientWithCookieJar(g)
setClusterContext(test, g, client, historyServerURL, namespace.Name, rayCluster.Name, clusterInfo.SessionName)

nodeID := GetOneOfNodeID(g, client, historyServerURL)

test.T().Run("should return log content from Azure Blob", func(t *testing.T) {
VerifyLogFileEndpointReturnsContent(test, NewWithT(t), client, historyServerURL, nodeID)
})

test.T().Run("should reject path traversal from Azure Blob", func(t *testing.T) {
VerifyLogFileEndpointRejectsPathTraversal(test, NewWithT(t), client, historyServerURL, nodeID)
})

DeleteAzureBlobContainer(test, g, azureClient)
LogWithTimestamp(test.T(), "Azure dead cluster log file endpoint tests completed")
}
Loading
Loading