Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,27 +356,29 @@ func startRPCServer(resourceManager *resource.ResourceManager, tlsCfg *tls.Confi
ReportServerV1 := server.NewReportServerV1(resourceManager)
ReportServer := server.NewReportServer(resourceManager)

VisualizationServerV1 := server.NewVisualizationServerV1(resourceManager)
VisualizationServer := server.NewVisualizationServer(resourceManager)

AuthServerV1 := server.NewAuthServerV1(resourceManager)
AuthServer := server.NewAuthServer(resourceManager)
Comment thread
Amr-Shams marked this conversation as resolved.

apiv1beta1.RegisterExperimentServiceServer(s, ExperimentServerV1)
apiv1beta1.RegisterPipelineServiceServer(s, PipelineServerV1)
apiv1beta1.RegisterJobServiceServer(s, JobServerV1)
apiv1beta1.RegisterRunServiceServer(s, RunServerV1)
apiv1beta1.RegisterTaskServiceServer(s, server.NewTaskServer(resourceManager))
apiv1beta1.RegisterReportServiceServer(s, ReportServerV1)

apiv1beta1.RegisterVisualizationServiceServer(
s,
server.NewVisualizationServer(
resourceManager,
common.GetStringConfig(cm.VisualizationServiceHost),
common.GetStringConfig(cm.VisualizationServicePort),
))
apiv1beta1.RegisterAuthServiceServer(s, server.NewAuthServer(resourceManager))
apiv1beta1.RegisterVisualizationServiceServer(s, VisualizationServerV1)
apiv1beta1.RegisterAuthServiceServer(s, AuthServerV1)

apiv2beta1.RegisterExperimentServiceServer(s, ExperimentServer)
apiv2beta1.RegisterPipelineServiceServer(s, PipelineServer)
apiv2beta1.RegisterRecurringRunServiceServer(s, JobServer)
apiv2beta1.RegisterRunServiceServer(s, RunServer)
apiv2beta1.RegisterReportServiceServer(s, ReportServer)
apiv2beta1.RegisterAuthServiceServer(s, AuthServer)
apiv2beta1.RegisterVisualizationServiceServer(s, VisualizationServer)

// Register reflection service on gRPC server.
reflection.Register(s)
Expand Down Expand Up @@ -424,6 +426,8 @@ func startHTTPProxy(resourceManager *resource.ResourceManager, usePipelinesKuber
register(apiv2beta1.RegisterRecurringRunServiceHandlerFromEndpoint, "RecurringRunService")
register(apiv2beta1.RegisterRunServiceHandlerFromEndpoint, "RunService")
register(apiv2beta1.RegisterReportServiceHandlerFromEndpoint, "ReportService")
register(apiv2beta1.RegisterVisualizationServiceHandlerFromEndpoint, "Visualization")
register(apiv2beta1.RegisterAuthServiceHandlerFromEndpoint, "AuthService")

sharedPipelineUploadServer := server.NewPipelineUploadServer(resourceManager, &server.PipelineUploadServerOptions{CollectMetrics: *collectMetricsFlag})
runLogServer := server.NewRunLogServer(resourceManager)
Expand Down
63 changes: 57 additions & 6 deletions backend/src/apiserver/server/auth_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"strings"

apiv1beta1 "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
apiv2beta1 "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client"

api "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/common/util"
Expand All @@ -37,12 +37,45 @@ var rbacResourceTypeToGroup = map[string]string{
common.RbacResourceTypeVisualizations: common.RbacPipelinesGroup,
}

type AuthServer struct {
type AuthServerV1 struct {
resourceManager *resource.ResourceManager
apiv1beta1.UnimplementedAuthServiceServer
}

func (s *AuthServer) AuthorizeV1(ctx context.Context, request *api.AuthorizeRequest) (
type AuthServer struct {
resourceManager *resource.ResourceManager
apiv2beta1.UnimplementedAuthServiceServer
}

func (s *AuthServerV1) AuthorizeV1(ctx context.Context, request *apiv1beta1.AuthorizeRequest) (
*emptypb.Empty, error,
) {
err := ValidateAuthorizeRequestV1(request)
if err != nil {
return nil, util.Wrap(err, "Authorize request is not valid")
}

namespace := strings.ToLower(request.GetNamespace())
verb := strings.ToLower(request.GetVerb().String())
resource := strings.ToLower(request.GetResources().String())
resourceAttributes := &authorizationv1.ResourceAttributes{
Namespace: namespace,
Verb: verb,
Group: rbacResourceTypeToGroup[resource],
Version: common.RbacPipelinesVersion,
Resource: resource,
Subresource: "",
Name: "",
}
err = s.resourceManager.IsAuthorized(ctx, resourceAttributes)
if err != nil {
return nil, util.Wrap(err, "Failed to authorize the request")
}

return &emptypb.Empty{}, nil
}

func (s *AuthServer) Authorize(ctx context.Context, request *apiv2beta1.AuthorizeRequest) (
*emptypb.Empty, error,
) {
err := ValidateAuthorizeRequest(request)
Expand Down Expand Up @@ -70,22 +103,40 @@ func (s *AuthServer) AuthorizeV1(ctx context.Context, request *api.AuthorizeRequ
return &emptypb.Empty{}, nil
}

func ValidateAuthorizeRequest(request *api.AuthorizeRequest) error {
func ValidateAuthorizeRequestV1(request *apiv1beta1.AuthorizeRequest) error {
if request == nil {
return util.NewInvalidInputError("request object is empty")
}
if len(request.Namespace) == 0 {
return util.NewInvalidInputError("Namespace is empty. Please specify a valid namespace")
}
if request.Resources == apiv1beta1.AuthorizeRequest_UNASSIGNED_RESOURCES {
return util.NewInvalidInputError("Resources not specified. Please specify a valid resources")
}
if request.Verb == apiv1beta1.AuthorizeRequest_UNASSIGNED_VERB {
return util.NewInvalidInputError("Verb not specified. Please specify a valid verb")
}
return nil
}
func ValidateAuthorizeRequest(request *apiv2beta1.AuthorizeRequest) error {
if request == nil {
return util.NewInvalidInputError("request object is empty")
}
if len(request.Namespace) == 0 {
return util.NewInvalidInputError("Namespace is empty. Please specify a valid namespace")
}
if request.Resources == api.AuthorizeRequest_UNASSIGNED_RESOURCES {
if request.Resources == apiv2beta1.AuthorizeRequest_UNASSIGNED_RESOURCES {
return util.NewInvalidInputError("Resources not specified. Please specify a valid resources")
}
if request.Verb == api.AuthorizeRequest_UNASSIGNED_VERB {
if request.Verb == apiv2beta1.AuthorizeRequest_UNASSIGNED_VERB {
return util.NewInvalidInputError("Verb not specified. Please specify a valid verb")
}
return nil
}

func NewAuthServerV1(resourceManager *resource.ResourceManager) *AuthServerV1 {
return &AuthServerV1{resourceManager: resourceManager}
}
func NewAuthServer(resourceManager *resource.ResourceManager) *AuthServer {
return &AuthServer{resourceManager: resourceManager}
}
156 changes: 150 additions & 6 deletions backend/src/apiserver/server/auth_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

api "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
apiv2beta1 "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/spf13/viper"
Expand All @@ -30,7 +31,7 @@ import (
func TestAuthorizeRequest_SingleUserMode(t *testing.T) {
clients, manager, _ := initWithExperiment(t)
defer clients.Close()
authServer := AuthServer{resourceManager: manager}
authServer := AuthServerV1{resourceManager: manager}
clients.SubjectAccessReviewClientFake = client.NewFakeSubjectAccessReviewClientUnauthorized()

md := metadata.New(map[string]string{})
Expand All @@ -53,7 +54,7 @@ func TestAuthorizeRequest_InvalidRequest(t *testing.T) {

clients, manager, _ := initWithExperiment(t)
defer clients.Close()
authServer := AuthServer{resourceManager: manager}
authServer := AuthServerV1{resourceManager: manager}

md := metadata.New(map[string]string{})
ctx := metadata.NewIncomingContext(context.Background(), md)
Expand All @@ -75,7 +76,7 @@ func TestAuthorizeRequest_Authorized(t *testing.T) {

clients, manager, _ := initWithExperiment(t)
defer clients.Close()
authServer := AuthServer{resourceManager: manager}
authServer := AuthServerV1{resourceManager: manager}

md := metadata.New(map[string]string{common.GoogleIAPUserIdentityHeader: "accounts.google.com:user@google.com"})
ctx := metadata.NewIncomingContext(context.Background(), md)
Expand All @@ -96,7 +97,7 @@ func TestAuthorizeRequest_Unauthorized(t *testing.T) {

clients, manager, _ := initWithExperiment_SubjectAccessReview_Unauthorized(t)
defer clients.Close()
authServer := AuthServer{resourceManager: manager}
authServer := AuthServerV1{resourceManager: manager}

userIdentity := "user@google.com"
md := metadata.New(map[string]string{common.GoogleIAPUserIdentityHeader: common.GoogleIAPUserIdentityPrefix + userIdentity})
Expand Down Expand Up @@ -129,7 +130,7 @@ func TestAuthorizeRequest_EmptyUserIdPrefix(t *testing.T) {

clients, manager, _ := initWithExperiment(t)
defer clients.Close()
authServer := AuthServer{resourceManager: manager}
authServer := AuthServerV1{resourceManager: manager}

md := metadata.New(map[string]string{common.GoogleIAPUserIdentityHeader: "user@google.com"})
ctx := metadata.NewIncomingContext(context.Background(), md)
Expand All @@ -150,7 +151,7 @@ func TestAuthorizeRequest_Unauthenticated(t *testing.T) {

clients, manager, _ := initWithExperiment(t)
defer clients.Close()
authServer := AuthServer{resourceManager: manager}
authServer := AuthServerV1{resourceManager: manager}

md := metadata.New(map[string]string{"no-identity-header": "user"})
ctx := metadata.NewIncomingContext(context.Background(), md)
Expand All @@ -169,3 +170,146 @@ func TestAuthorizeRequest_Unauthenticated(t *testing.T) {
"there is no user identity header",
)
}

func TestAuthorizeV2Request_SingleUserMode(t *testing.T) {
clients, manager, _ := initWithExperiment(t)
defer clients.Close()
authServer := AuthServer{resourceManager: manager}
clients.SubjectAccessReviewClientFake = client.NewFakeSubjectAccessReviewClientUnauthorized()

md := metadata.New(map[string]string{})
ctx := metadata.NewIncomingContext(context.Background(), md)

request := &apiv2beta1.AuthorizeRequest{
Namespace: "ns1",
Resources: apiv2beta1.AuthorizeRequest_VIEWERS,
Verb: apiv2beta1.AuthorizeRequest_GET,
}

_, err := authServer.Authorize(ctx, request)
// Authz is completely skipped without checking anything.
assert.Nil(t, err)
}

func TestAuthorizeV2Request_InvalidRequest(t *testing.T) {
viper.Set(common.MultiUserMode, "true")
defer viper.Set(common.MultiUserMode, "false")

clients, manager, _ := initWithExperiment(t)
defer clients.Close()
authServer := AuthServer{resourceManager: manager}

md := metadata.New(map[string]string{})
ctx := metadata.NewIncomingContext(context.Background(), md)

request := &apiv2beta1.AuthorizeRequest{
Namespace: "",
Resources: apiv2beta1.AuthorizeRequest_UNASSIGNED_RESOURCES,
Verb: apiv2beta1.AuthorizeRequest_UNASSIGNED_VERB,
}

_, err := authServer.Authorize(ctx, request)
assert.Error(t, err)
assert.EqualError(t, err, "Authorize request is not valid: Invalid input error: Namespace is empty. Please specify a valid namespace")
}

func TestAuthorizeV2Request_Authorized(t *testing.T) {
viper.Set(common.MultiUserMode, "true")
defer viper.Set(common.MultiUserMode, "false")

clients, manager, _ := initWithExperiment(t)
defer clients.Close()
authServer := AuthServer{resourceManager: manager}

md := metadata.New(map[string]string{common.GoogleIAPUserIdentityHeader: "accounts.google.com:user@google.com"})
ctx := metadata.NewIncomingContext(context.Background(), md)

request := &apiv2beta1.AuthorizeRequest{
Namespace: "ns1",
Resources: apiv2beta1.AuthorizeRequest_VIEWERS,
Verb: apiv2beta1.AuthorizeRequest_GET,
}

_, err := authServer.Authorize(ctx, request)
assert.Nil(t, err)
}

func TestAuthorizeV2Request_Unauthorized(t *testing.T) {
viper.Set(common.MultiUserMode, "true")
defer viper.Set(common.MultiUserMode, "false")

clients, manager, _ := initWithExperiment_SubjectAccessReview_Unauthorized(t)
defer clients.Close()
authServer := AuthServer{resourceManager: manager}

userIdentity := "user@google.com"
md := metadata.New(map[string]string{common.GoogleIAPUserIdentityHeader: common.GoogleIAPUserIdentityPrefix + userIdentity})
ctx := metadata.NewIncomingContext(context.Background(), md)

request := &apiv2beta1.AuthorizeRequest{
Namespace: "ns1",
Resources: apiv2beta1.AuthorizeRequest_VIEWERS,
Verb: apiv2beta1.AuthorizeRequest_GET,
}

_, err := authServer.Authorize(ctx, request)
assert.Error(t, err)

resourceAttributes := &authorizationv1.ResourceAttributes{
Namespace: "ns1",
Verb: common.RbacResourceVerbGet,
Group: common.RbacKubeflowGroup,
Version: common.RbacPipelinesVersion,
Resource: common.RbacResourceTypeViewers,
}
assert.EqualError(t, err, wrapFailedAuthzRequestError(getPermissionDeniedError(userIdentity, resourceAttributes)).Error())
}

func TestAuthorizeV2Request_EmptyUserIdPrefix(t *testing.T) {
viper.Set(common.MultiUserMode, "true")
defer viper.Set(common.MultiUserMode, "false")
viper.Set(common.KubeflowUserIDPrefix, "")
defer viper.Set(common.KubeflowUserIDPrefix, common.GoogleIAPUserIdentityPrefix)

clients, manager, _ := initWithExperiment(t)
defer clients.Close()
authServer := AuthServer{resourceManager: manager}

md := metadata.New(map[string]string{common.GoogleIAPUserIdentityHeader: "user@google.com"})
ctx := metadata.NewIncomingContext(context.Background(), md)

request := &apiv2beta1.AuthorizeRequest{
Namespace: "ns1",
Resources: apiv2beta1.AuthorizeRequest_VIEWERS,
Verb: apiv2beta1.AuthorizeRequest_GET,
}

_, err := authServer.Authorize(ctx, request)
assert.Nil(t, err)
}

func TestAuthorizeV2Request_Unauthenticated(t *testing.T) {
viper.Set(common.MultiUserMode, "true")
defer viper.Set(common.MultiUserMode, "false")

clients, manager, _ := initWithExperiment(t)
defer clients.Close()
authServer := AuthServer{resourceManager: manager}

md := metadata.New(map[string]string{"no-identity-header": "user"})
ctx := metadata.NewIncomingContext(context.Background(), md)

request := &apiv2beta1.AuthorizeRequest{
Namespace: "ns1",
Resources: apiv2beta1.AuthorizeRequest_VIEWERS,
Verb: apiv2beta1.AuthorizeRequest_GET,
}

_, err := authServer.Authorize(ctx, request)
assert.NotNil(t, err)
assert.Contains(
t,
err.Error(),
"there is no user identity header",
)
}
Loading
Loading