Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions apiserver/pkg/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,14 +481,18 @@ func (krc *KuberayAPIServerClient) ListRayServices(request *api.ListRayServicesR
return response, nil, nil
}

// Finds all ray services in a given namespace.
func (krc *KuberayAPIServerClient) ListAllRayServices() (*api.ListAllRayServicesResponse, *rpcStatus.Status, error) {
// Finds all ray services in all namespaces.
func (krc *KuberayAPIServerClient) ListAllRayServices(request *api.ListAllRayServicesRequest) (*api.ListAllRayServicesResponse, *rpcStatus.Status, error) {
getURL := krc.baseURL + "/apis/v1/services"
httpRequest, err := http.NewRequestWithContext(context.TODO(), "GET", getURL, nil)
if err != nil {
return nil, nil, fmt.Errorf("failed to create http request for url '%s': %w", getURL, err)
}

q := httpRequest.URL.Query()
q.Set("pageSize", strconv.FormatInt(int64(request.PageSize), 10))
q.Set("pageToken", request.PageToken)
httpRequest.URL.RawQuery = q.Encode()
httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
Expand Down
18 changes: 0 additions & 18 deletions apiserver/pkg/manager/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,24 +298,6 @@ func (r *ResourceManager) ListServices(ctx context.Context, namespace string, pa
return rayServices, rayServiceList.Continue, nil
}

func (r *ResourceManager) ListAllServices(ctx context.Context) ([]*rayv1api.RayService, error) {
rayServices := make([]*rayv1api.RayService, 0)

namespaces, err := r.getKubernetesNamespaceClient().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, util.Wrap(err, "Failed to fetch all Kubernetes namespaces")
}

for _, namespace := range namespaces.Items {
servicesByNamespace, _, err := r.ListServices(ctx, namespace.Name, "" /*pageToken*/, 0 /*pageSize*/)
if err != nil {
return nil, util.Wrap(err, "List All Rayservices failed")
}
rayServices = append(rayServices, servicesByNamespace...)
}
return rayServices, nil
}

func (r *ResourceManager) DeleteService(ctx context.Context, serviceName, namespace string) error {
client := r.getRayServiceClient(namespace)
service, err := getServiceByName(ctx, client, serviceName)
Expand Down
7 changes: 4 additions & 3 deletions apiserver/pkg/server/serve_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func (s *RayServiceServer) ListRayServices(ctx context.Context, request *api.Lis
}, nil
}

func (s *RayServiceServer) ListAllRayServices(ctx context.Context, _ *api.ListAllRayServicesRequest) (*api.ListAllRayServicesResponse, error) {
services, err := s.resourceManager.ListAllServices(ctx)
func (s *RayServiceServer) ListAllRayServices(ctx context.Context, request *api.ListAllRayServicesRequest) (*api.ListAllRayServicesResponse, error) {
services, nextPageToken, err := s.resourceManager.ListServices(ctx, "" /*namespace*/, request.PageToken, request.PageSize)
if err != nil {
return nil, util.Wrap(err, "list all services failed.")
}
Expand All @@ -123,7 +123,8 @@ func (s *RayServiceServer) ListAllRayServices(ctx context.Context, _ *api.ListAl
serviceEventMap[service.Name] = serviceEvents
}
return &api.ListAllRayServicesResponse{
Services: model.FromCrdToAPIServices(services, serviceEventMap),
Services: model.FromCrdToAPIServices(services, serviceEventMap),
NextPageToken: nextPageToken,
}, nil
}

Expand Down
152 changes: 145 additions & 7 deletions apiserver/test/e2e/service_server_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func TestGetAllServices(t *testing.T) {
tCtx.DeleteRayService(t, testServiceRequest.Service.Name)
})

response, actualRPCStatus, err := tCtx.GetRayAPIServerClient().ListAllRayServices()
response, actualRPCStatus, err := tCtx.GetRayAPIServerClient().ListAllRayServices(&api.ListAllRayServicesRequest{})
require.NoError(t, err, "No error expected")
require.Nil(t, actualRPCStatus, "No RPC status expected")
require.NotNil(t, response, "A response is expected")
Expand All @@ -225,6 +225,144 @@ func TestGetAllServices(t *testing.T) {
require.Equal(t, tCtx.GetNamespaceName(), response.Services[0].Namespace)
}

func TestGetAllServicesWithPagination(t *testing.T) {
const numberOfNamespaces = 3
const numberOfService = 2
const totalServices = numberOfNamespaces * numberOfService

type targetService struct {
namespace string
service string
}

tCtxs := make([]*End2EndTestingContext, 0, numberOfNamespaces)
expectedServices := make([]targetService, 0, totalServices)

// Create services for each namespace
for i := 0; i < numberOfNamespaces; i++ {
tCtx, err := NewEnd2EndTestingContext(t)
require.NoError(t, err, "No error expected when creating testing context")

tCtx.CreateComputeTemplate(t)
t.Cleanup(func() {
tCtx.DeleteComputeTemplate(t)
})

for j := 0; j < numberOfService; j++ {
testServiceRequest := createTestServiceV2(t, tCtx)
t.Cleanup(func() {
tCtx.DeleteRayService(t, testServiceRequest.Service.Name)
})
expectedServices = append(expectedServices, targetService{
namespace: tCtx.GetNamespaceName(),
service: testServiceRequest.Service.Name,
})
}

tCtxs = append(tCtxs, tCtx)
}

var pageToken string
tCtx := tCtxs[0]

// Test pagination with limit less than the total number of services in all namespaces.
t.Run("Test pagination return part of the result services", func(t *testing.T) {
pageToken = ""
gotServices := make(map[targetService]bool, totalServices)
for _, expectedService := range expectedServices {
gotServices[expectedService] = false
}

for i := 0; i < totalServices; i++ {
response, actualRPCStatus, err := tCtx.GetRayAPIServerClient().ListAllRayServices(&api.ListAllRayServicesRequest{
PageToken: pageToken,
PageSize: int32(1),
})
require.NoError(t, err, "No error expected")
require.Nil(t, actualRPCStatus, "No RPC status expected")
require.NotNil(t, response, "A response is expected")
require.NotEmpty(t, response.Services, "A list of service is required")
require.Len(t, response.Services, 1, "Got %d services in response, expected %d", len(response.Services), 1)

pageToken = response.NextPageToken
if i == totalServices-1 {
require.Empty(t, pageToken, "No continue token is expected")
} else {
require.NotEmpty(t, pageToken, "A continue token is expected")
}

for _, service := range response.Services {
key := targetService{namespace: service.Namespace, service: service.Name}
seen, exist := gotServices[key]

// Check if this service is in expectedServices list
require.True(t, exist,
"ListAllRayServices returned an unexpected service: namespace=%s, name=%s",
key.namespace, key.service)

// Check if we've already seen this service before (duplicate)
require.False(t, seen,
"ListAllRayServices returned duplicated service: namespace=%s, name=%s",
key.namespace, key.service)

gotServices[key] = true
}
}

// Check all services were found
for _, expectedService := range expectedServices {
require.True(t, gotServices[expectedService],
"ListAllRayServices did not return expected service %s from namespace %s",
expectedService.service, expectedService.namespace)
}
})

// Test pagination with limit larger than the total number of services in all namespaces.
t.Run("Test pagination return all result services", func(t *testing.T) {
pageToken = ""
gotServices := make(map[targetService]bool, totalServices)
for _, expectedService := range expectedServices {
gotServices[expectedService] = false
}

response, actualRPCStatus, err := tCtx.GetRayAPIServerClient().ListAllRayServices(&api.ListAllRayServicesRequest{
PageToken: pageToken,
PageSize: int32(totalServices + 1),
})

require.NoError(t, err, "No error expected")
require.Nil(t, actualRPCStatus, "No RPC status expected")
require.NotNil(t, response, "A response is expected")
require.NotEmpty(t, response.Services, "A list of services is required")
require.Len(t, response.Services, totalServices, "Got %d services in response, expected %d", len(response.Services), totalServices)
require.Empty(t, response.NextPageToken, "Page token should be empty")

for _, service := range response.Services {
key := targetService{namespace: service.Namespace, service: service.Name}
seen, exist := gotServices[key]

// Check if this service is in expectedServices list
require.True(t, exist,
"ListAllRayServices returned an unexpected service: namespace=%s, name=%s",
key.namespace, key.service)

// Check if we've already seen this service before (duplicate)
require.False(t, seen,
"ListAllRayServices returned duplicated service: namespace=%s, name=%s",
key.namespace, key.service)

gotServices[key] = true
}

// Check all services were found
for _, expectedService := range expectedServices {
require.True(t, gotServices[expectedService],
"ListAllRayServices did not return expected service %s from namespace %s",
expectedService.service, expectedService.namespace)
}
})
}

func TestGetServicesInNamespace(t *testing.T) {
tCtx, err := NewEnd2EndTestingContext(t)
require.NoError(t, err, "No error expected when creating testing context")
Expand Down Expand Up @@ -308,9 +446,9 @@ func TestGetServicesInNamespaceWithPagination(t *testing.T) {

// Check all services created have been returned.
for idx := 0; idx < serviceCount; idx++ {
if !gotServices[idx] {
t.Errorf("ListServices did not return expected services %s", expectedServiceNames[idx])
}
require.True(t, gotServices[idx],
"ListServices did not return expected services %s",
expectedServiceNames[idx])
}
})

Expand Down Expand Up @@ -343,9 +481,9 @@ func TestGetServicesInNamespaceWithPagination(t *testing.T) {

// Check all services created have been returned.
for idx := 0; idx < serviceCount; idx++ {
if !gotServices[idx] {
t.Errorf("ListServices did not return expected services %s", expectedServiceNames[idx])
}
require.True(t, gotServices[idx],
"ListServices did not return expected services %s",
expectedServiceNames[idx])
}
})
}
Expand Down
1 change: 1 addition & 0 deletions proto/go_client/serve.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion proto/kuberay_api.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2034,7 +2034,7 @@
},
"nextPageToken": {
"type": "string",
"description": "The token to list the next page of RayServices.",
"description": "The token to list the next page of RayServices.\nIf there is no more service, this field will be empty.",
"readOnly": true
}
}
Expand Down
1 change: 1 addition & 0 deletions proto/serve.proto
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ message ListAllRayServicesResponse {
// The total number of RayServices for the given query.
int32 total_size = 2 [(google.api.field_behavior) = OUTPUT_ONLY];
// The token to list the next page of RayServices.
// If there is no more service, this field will be empty.
string next_page_token = 3 [(google.api.field_behavior) = OUTPUT_ONLY];
}

Expand Down
2 changes: 1 addition & 1 deletion proto/swagger/serve.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@
},
"nextPageToken": {
"type": "string",
"description": "The token to list the next page of RayServices.",
"description": "The token to list the next page of RayServices.\nIf there is no more service, this field will be empty.",
"readOnly": true
}
}
Expand Down
Loading