Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apiserver/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path"
"strings"
"sync/atomic"
"time"

assetfs "github.com/elazarl/go-bindata-assetfs"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
Expand Down Expand Up @@ -86,7 +87,7 @@ func startRPCServer(resourceManager *manager.ResourceManager) {

s := grpc.NewServer(
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(grpc_prometheus.UnaryServerInterceptor, interceptor.APIServerInterceptor)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(interceptor.TimeoutInterceptor(20*time.Second), grpc_prometheus.UnaryServerInterceptor, interceptor.APIServerInterceptor)),
grpc.MaxRecvMsgSize(math.MaxInt32))
api.RegisterClusterServiceServer(s, clusterServer)
api.RegisterComputeTemplateServiceServer(s, templateServer)
Expand Down
39 changes: 39 additions & 0 deletions apiserver/pkg/interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package interceptor

import (
"context"
"fmt"
"time"

"google.golang.org/grpc"
klog "k8s.io/klog/v2"
Expand All @@ -19,3 +21,40 @@ func APIServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary
klog.Infof("%v handler finished", info.FullMethod)
return
}

func TimeoutInterceptor(timeout time.Duration) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
_ *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// Create a context with timeout
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

// Channel to capture execution result
done := make(chan struct{})
var (
resp interface{}
err error
)

go func() {
resp, err = handler(ctx, req)
close(done)
}()

select {
case <-ctx.Done():
// Raise error if time out
if ctx.Err() == context.DeadlineExceeded {
return nil, fmt.Errorf("grpc server timed out")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we name the grpc server with KubeRay API server ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! Just changed

}
return nil, ctx.Err()
case <-done:
// Handler finished
return resp, err
}
}
}
Loading