@@ -18,10 +18,13 @@ package bmcpfs
1818
1919import (
2020 "context"
21+ "errors"
2122 "fmt"
2223 "os"
24+ "path/filepath"
2325 "strconv"
2426 "strings"
27+ "time"
2528
2629 openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
2730 efloclient "github.com/alibabacloud-go/eflo-controller-20221215/v3/client"
@@ -33,16 +36,21 @@ import (
3336 "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/common"
3437 "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/credentials"
3538 "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/nas/cloud"
39+ "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
3640 "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/version"
41+
3742 "google.golang.org/grpc/codes"
3843 "google.golang.org/grpc/status"
44+ "k8s.io/apimachinery/pkg/util/wait"
3945 "k8s.io/klog/v2"
4046)
4147
4248type controllerServer struct {
4349 common.GenericControllerServer
4450 vscManager * internal.PrimaryVscManagerWithCache
4551 attachDetacher internal.CPFSAttachDetacher
52+ filesetManager internal.CPFSFileSetManager
53+ locks * utils.VolumeLocks
4654 nasClient * nasclient.Client
4755 skipDetach bool
4856}
@@ -62,18 +70,132 @@ func newControllerServer(region string) (*controllerServer, error) {
6270 if err != nil {
6371 return nil , err
6472 }
73+
6574 return & controllerServer {
6675 vscManager : internal .NewPrimaryVscManagerWithCache (efloClient ),
6776 attachDetacher : internal .NewCPFSAttachDetacher (nasClient ),
77+ filesetManager : internal .NewCPFSFileSetManager (nasClient ),
78+ locks : utils .NewVolumeLocks (),
6879 nasClient : nasClient ,
6980 skipDetach : skipDetach ,
7081 }, nil
7182}
7283
84+ // CreateVolume ...
85+ func (cs * controllerServer ) CreateVolume (ctx context.Context , req * csi.CreateVolumeRequest ) (* csi.CreateVolumeResponse , error ) {
86+ logger := klog .FromContext (ctx )
87+
88+ if ! cs .locks .TryAcquire (req .Name ) {
89+ return nil , status .Errorf (codes .Aborted , "There is already an operation for volume %s" , req .Name )
90+ }
91+ defer cs .locks .Release (req .Name )
92+
93+ // Validate parameters
94+ if err := validateFileSetParameters (req ); err != nil {
95+ logger .Error (err , "failed to validate parameters" , "volumeName" , req .Name )
96+ return nil , status .Errorf (codes .InvalidArgument , "Invalid parameters from input: %v, with error: %v" , req .Name , err )
97+ }
98+
99+ // Extract parameters
100+ params := req .GetParameters ()
101+ bmcpfsID := params ["bmcpfsId" ]
102+ if bmcpfsID == "" {
103+ return nil , status .Error (codes .InvalidArgument , "bmcpfsId parameter is required" )
104+ }
105+
106+ // Construct volume path
107+ volumeName := req .Name
108+ var fullPath string
109+ if rootPath , ok := params ["path" ]; ok && rootPath != "" {
110+ fullPath = filepath .Join (rootPath , volumeName , "/" )
111+ } else {
112+ fullPath = filepath .Join ("/" , volumeName , "/" )
113+ }
114+ klog .InfoS ("CreateVolume: Constructing volume path" , "fullpath" , fullPath )
115+
116+ volSizeBytes := int64 (req .GetCapacityRange ().GetRequiredBytes ())
117+
118+ // Prepare volume context
119+ volumeContext := req .GetParameters ()
120+ if volumeContext == nil {
121+ volumeContext = make (map [string ]string )
122+ }
123+ volumeContext = updateVolumeContext (volumeContext )
124+
125+ // Check if fileset already exists as nas doesn't provide idempotent create fileset API
126+ fileset , err := cs .filesetManager .DescribeFilesetByFilePath (ctx , bmcpfsID , fullPath )
127+ if err == nil && fileset != nil {
128+ tmpVol := createVolumeResponse (* fileset .FsetId , bmcpfsID , volSizeBytes , volumeContext )
129+ return & csi.CreateVolumeResponse {Volume : tmpVol }, nil
130+ }
131+
132+ // If the error indicates the fileset was not found, continue to create it
133+ // Otherwise, return the error
134+ if err != nil && ! errors .Is (err , internal .ErrFilesetNotFound ) {
135+ klog .ErrorS (err , "CreateVolume: failed to describe fileset" , "filesystemID" , bmcpfsID , "filesetPath" , fullPath )
136+ return nil , status .Error (codes .Internal , err .Error ())
137+ }
138+
139+ // Create fileset
140+ fileSetID , err := cs .filesetManager .CreateFileSet (ctx , bmcpfsID , req .Name , fullPath , 1000000 , volSizeBytes , false )
141+ if err != nil {
142+ return nil , status .Error (codes .Internal , err .Error ())
143+ }
144+
145+ err = wait .PollUntilContextCancel (ctx , time .Second * 2 , true , func (ctx context.Context ) (bool , error ) {
146+ var err error
147+ resp , err := cs .filesetManager .DescribeFilesetByFsetID (ctx , bmcpfsID , fileSetID )
148+ if err == nil {
149+ if resp .Status != nil && * resp .Status == "CREATED" {
150+ klog .InfoS ("CreateVolume: fileset created successfully" , "filesetID" , fileSetID , "filesystemID" , bmcpfsID )
151+ return true , nil
152+ }
153+ klog .InfoS ("CreateVolume: fileset status incorrected" , "filesetID" , fileSetID , "filesystemID" , bmcpfsID , "status" , * resp .Status )
154+ return false , nil
155+ }
156+ return false , err
157+ })
158+
159+ if err != nil {
160+ return nil , status .Errorf (codes .Internal , "CreateVolume: failed to describe fileset %s: %s" , fileSetID , err .Error ())
161+ }
162+
163+ klog .Infof ("CreateVolume: Successfully created FileSet %s: id[%s], filesystem[%s], path[%s]" , req .GetName (), fileSetID , bmcpfsID , fullPath )
164+
165+ tmpVol := createVolumeResponse (fileSetID , bmcpfsID , volSizeBytes , volumeContext )
166+
167+ return & csi.CreateVolumeResponse {Volume : tmpVol }, nil
168+ }
169+
170+ func (cs * controllerServer ) DeleteVolume (ctx context.Context , req * csi.DeleteVolumeRequest ) (* csi.DeleteVolumeResponse , error ) {
171+ logger := klog .FromContext (ctx )
172+ logger .V (2 ).Info ("starting" )
173+
174+ // Parse volume ID to extract filesystem ID and fileset ID
175+ fsID , fileSetID , err := parseVolumeID (req .VolumeId )
176+ if err != nil {
177+ klog .Errorf ("DeleteVolume: failed to parse volume ID %s: %v" , req .VolumeId , err )
178+ return nil , status .Error (codes .InvalidArgument , err .Error ())
179+ }
180+
181+ klog .Infof ("DeleteVolume: deleting fileset %s from filesystem %s" , fileSetID , fsID )
182+
183+ // Delete the fileset
184+ err = cs .filesetManager .DeleteFileSet (ctx , fsID , fileSetID )
185+ if err != nil {
186+ klog .Errorf ("DeleteVolume: failed to delete fileset %s from filesystem %s: %v" , fileSetID , fsID , err )
187+ return nil , status .Error (codes .Internal , err .Error ())
188+ }
189+
190+ klog .Infof ("DeleteVolume: successfully deleted fileset %s from filesystem %s" , fileSetID , fsID )
191+ return & csi.DeleteVolumeResponse {}, nil
192+ }
193+
73194func (cs * controllerServer ) ControllerGetCapabilities (ctx context.Context , req * csi.ControllerGetCapabilitiesRequest ) (* csi.ControllerGetCapabilitiesResponse , error ) {
74195 return & csi.ControllerGetCapabilitiesResponse {
75196 Capabilities : common .ControllerRPCCapabilities (
76197 csi .ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME ,
198+ csi .ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME ,
77199 ),
78200 }, nil
79201}
@@ -94,7 +216,7 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
94216 }
95217
96218 cpfsID , _ := parseVolumeHandle (req .VolumeId )
97- // Get VscMountTarget of filesystem
219+
98220 mt := req .VolumeContext [_vscMountTarget ]
99221 if mt == "" {
100222 var err error
@@ -105,18 +227,18 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
105227 }
106228
107229 // Get Primary vsc of Lingjun node
108- lingjunInstanceId := strings .TrimPrefix (req .NodeId , LingjunNodeIDPrefix )
230+ lingjunInstanceID := strings .TrimPrefix (req .NodeId , LingjunNodeIDPrefix )
109231 if LingjunNodeIDPrefix == "" {
110232 return nil , status .Error (codes .InvalidArgument , "invalid node id" )
111233 }
112- vscId , err := cs .vscManager .EnsurePrimaryVsc (ctx , lingjunInstanceId , false )
234+ vscID , err := cs .vscManager .EnsurePrimaryVsc (ctx , lingjunInstanceID , false )
113235 if err != nil {
114236 return nil , status .Error (codes .Internal , err .Error ())
115237 }
116- klog .Info ("Use VSC MountTarget for lingjun node" , "nodeId" , req .NodeId , "vscId" , vscId )
238+ klog .Info ("Use VSC MountTarget for lingjun node" , "nodeId" , req .NodeId , "vscId" , vscID )
117239
118240 // Attach CPFS to VSC
119- err = cs .attachDetacher .Attach (ctx , cpfsID , vscId )
241+ err = cs .attachDetacher .Attach (ctx , cpfsID , vscID )
120242 if err != nil {
121243 if autoSwitch , _ := strconv .ParseBool (req .VolumeContext [_mpAutoSwitch ]); autoSwitch && internal .IsAttachNotSupportedError (err ) {
122244 if req .VolumeContext [_vpcMountTarget ] == "" {
@@ -135,35 +257,34 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
135257
136258 // TODO: if the cached vscid is already deleted, try to recreate a new primary vsc for lingjun node
137259
138- klog .InfoS ("ControllerPublishVolume: attached cpfs to vsc" , "vscMountTarget" , mt , "vscId" , vscId , "node" , req .NodeId )
260+ klog .InfoS ("ControllerPublishVolume: attached cpfs to vsc" , "vscMountTarget" , mt , "vscId" , vscID , "node" , req .NodeId )
139261 return & csi.ControllerPublishVolumeResponse {
140262 PublishContext : map [string ]string {
141263 _networkType : networkTypeVSC ,
142- _vscId : vscId ,
264+ _vscID : vscID ,
143265 _vscMountTarget : mt ,
144266 },
145267 }, nil
146268}
147269
148- func (cs * controllerServer ) ControllerUnpublishVolume (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (
149- * csi.ControllerUnpublishVolumeResponse , error ,
150- ) {
270+ func (cs * controllerServer ) ControllerUnpublishVolume (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (* csi.ControllerUnpublishVolumeResponse , error ) {
151271 if ! strings .HasPrefix (req .NodeId , LingjunNodeIDPrefix ) || cs .skipDetach {
152272 return & csi.ControllerUnpublishVolumeResponse {}, nil
153273 }
154274 // Create Primary vsc for Lingjun node
155- lingjunInstanceId := strings .TrimPrefix (req .NodeId , LingjunNodeIDPrefix )
275+ lingjunInstanceID := strings .TrimPrefix (req .NodeId , LingjunNodeIDPrefix )
156276 if LingjunNodeIDPrefix == "" {
157277 return nil , status .Error (codes .InvalidArgument , "invalid node id" )
158278 }
159- vsc , err := cs .vscManager .GetPrimaryVscOf (lingjunInstanceId )
279+ vsc , err := cs .vscManager .GetPrimaryVscOf (lingjunInstanceID )
160280 if err != nil {
161- return nil , status .Error (codes .Internal , err . Error () )
281+ return nil , status .Errorf (codes .Internal , "get vsc error: %v" , err )
162282 }
163283 if vsc == nil {
164284 klog .InfoS ("ControllerUnpublishVolume: skip detaching cpfs from vsc as vsc not found" , "node" , req .NodeId )
165285 return & csi.ControllerUnpublishVolumeResponse {}, nil
166286 }
287+
167288 // If `req.VolumeId` is a combination of `cpfsID` and `fsetID`, Detach will trigger an error.
168289 err = cs .attachDetacher .Detach (ctx , req .VolumeId , vsc .VscID )
169290 if err != nil {
@@ -173,6 +294,7 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
173294 return & csi.ControllerUnpublishVolumeResponse {}, nil
174295}
175296
297+ // KubernetesAlicloudIdentity is the user agent string for Eflo client
176298var KubernetesAlicloudIdentity = fmt .Sprintf ("Kubernetes.Alicloud/CsiProvision.Bmcpfs-%s" , version .VERSION )
177299
178300const efloConnTimeout = 10
@@ -227,9 +349,9 @@ func parseVolumeHandle(volumeHandle string) (string, string) {
227349 return parts [0 ], ""
228350}
229351
230- func getMountTarget (client * nasclient.Client , fsId , networkType string ) (string , error ) {
352+ func getMountTarget (client * nasclient.Client , fsID , networkType string ) (string , error ) {
231353 resp , err := client .DescribeFileSystems (& nasclient.DescribeFileSystemsRequest {
232- FileSystemId : & fsId ,
354+ FileSystemId : & fsID ,
233355 })
234356 if err != nil {
235357 return "" , fmt .Errorf ("nas:DescribeFileSystems failed: %w" , err )
@@ -252,7 +374,7 @@ func getMountTarget(client *nasclient.Client, fsId, networkType string) (string,
252374 if t == networkType {
253375 mountTarget := tea .StringValue (mt .MountTargetDomain )
254376 status := tea .StringValue (mt .Status )
255- klog .V (2 ).InfoS ("Found cpfs mount target" , "filesystem" , fsId , "networkType" , networkType , "mountTarget" , mountTarget , "status" , status )
377+ klog .V (2 ).InfoS ("Found cpfs mount target" , "filesystem" , fsID , "networkType" , networkType , "mountTarget" , mountTarget , "status" , status )
256378 if status == "Active" {
257379 return mountTarget , nil
258380 }
0 commit comments