@@ -30,7 +30,10 @@ import (
3030 "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
3131 "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
3232 "github.com/container-storage-interface/spec/lib/go/csi"
33+ "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/wrap"
3334 "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/common"
35+ "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/desc"
36+ "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/waitstatus"
3437 "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/features"
3538 "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
3639 "google.golang.org/grpc/codes"
@@ -42,11 +45,13 @@ import (
4245 "k8s.io/apimachinery/pkg/util/sets"
4346 "k8s.io/client-go/tools/record"
4447 "k8s.io/klog/v2"
48+ "k8s.io/utils/clock"
4549)
4650
4751// controller server try to create/delete volumes/snapshots
4852type controllerServer struct {
4953 recorder record.EventRecorder
54+ modify ModifyServer
5055 common.GenericControllerServer
5156}
5257
@@ -80,10 +85,24 @@ var veasp = struct {
8085
8186var delVolumeSnap sync.Map
8287
88+ func newTaskStatusWaiter () waitstatus.StatusWaiter [ecs.Task ] {
89+ client := desc.Task {Client : GlobalConfigVar .EcsClient }
90+ waiter := waitstatus .NewBatched (client , clock.RealClock {}, 3 * time .Second , 10 * time .Second )
91+ waiter .PollHook = func () desc.Client [ecs.Task ] {
92+ return desc.Task {Client : updateEcsClient (GlobalConfigVar .EcsClient )}
93+ }
94+ go waiter .Run (context .Background ())
95+ return waiter
96+ }
97+
8398// NewControllerServer is to create controller server
8499func NewControllerServer () csi.ControllerServer {
85100 c := & controllerServer {
86101 recorder : utils .NewEventRecorder (),
102+ modify : ModifyServer {
103+ ecsClient : GlobalConfigVar .EcsClient ,
104+ taskWaiter : newTaskStatusWaiter (),
105+ },
87106 }
88107 return c
89108}
@@ -96,6 +115,7 @@ func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *
96115 csi .ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT ,
97116 csi .ControllerServiceCapability_RPC_LIST_SNAPSHOTS ,
98117 csi .ControllerServiceCapability_RPC_EXPAND_VOLUME ,
118+ csi .ControllerServiceCapability_RPC_MODIFY_VOLUME ,
99119 ),
100120 }, nil
101121}
@@ -136,6 +156,14 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
136156 return nil , status .Errorf (codes .InvalidArgument , "Invalid parameters from input: %v, with error: %v" , req .Name , err )
137157 }
138158
159+ if len (req .MutableParameters ) > 0 {
160+ mutable , err := parseMutableParameters (req .MutableParameters )
161+ if err != nil {
162+ return nil , status .Errorf (codes .InvalidArgument , "Invalid mutable parameters: %v" , err )
163+ }
164+ importMutableParameters (diskVol , & mutable )
165+ }
166+
139167 // 兼容 serverless 拓扑感知场景;
140168 // req参数里面包含了云盘ID,则直接使用云盘ID进行返回;
141169 csiVolume , err := staticVolumeCreate (req , snapshotID )
@@ -970,3 +998,18 @@ func (cs *controllerServer) deleteUntagAutoSnapshot(snapshotID, diskID string) {
970998 klog .Errorf ("ControllerExpandVolume:: failed to untag volumeExpandAutoSnapshot: %s" , err .Error ())
971999 }
9721000}
1001+
1002+ func (cs * controllerServer ) ControllerModifyVolume (ctx context.Context , req * csi.ControllerModifyVolumeRequest ) (* csi.ControllerModifyVolumeResponse , error ) {
1003+ params , err := parseMutableParameters (req .MutableParameters )
1004+ if err != nil {
1005+ return nil , status .Error (codes .InvalidArgument , err .Error ())
1006+ }
1007+ err = cs .modify .Modify (ctx , req .VolumeId , params )
1008+ if err != nil {
1009+ if errors .Is (err , wrap .ErrorCode ("InvalidDiskId.NotFound" )) {
1010+ return nil , status .Error (codes .NotFound , err .Error ())
1011+ }
1012+ return nil , err
1013+ }
1014+ return & csi.ControllerModifyVolumeResponse {}, nil
1015+ }
0 commit comments