@@ -20,8 +20,10 @@ import (
2020 "context"
2121 "fmt"
2222 "os"
23+ "path/filepath"
2324 "strconv"
2425 "strings"
26+ "time"
2527
2628 openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
2729 efloclient "github.com/alibabacloud-go/eflo-controller-20221215/v3/client"
@@ -36,13 +38,15 @@ import (
3638 "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/version"
3739 "google.golang.org/grpc/codes"
3840 "google.golang.org/grpc/status"
41+ "k8s.io/apimachinery/pkg/util/wait"
3942 "k8s.io/klog/v2"
4043)
4144
4245type controllerServer struct {
4346 common.GenericControllerServer
4447 vscManager * internal.PrimaryVscManagerWithCache
4548 attachDetacher internal.CPFSAttachDetacher
49+ filesetManager internal.CPFSFileSetManager
4650 nasClient * nasclient.Client
4751 skipDetach bool
4852}
@@ -62,18 +66,114 @@ func newControllerServer(region string) (*controllerServer, error) {
6266 if err != nil {
6367 return nil , err
6468 }
69+
6570 return & controllerServer {
6671 vscManager : internal .NewPrimaryVscManagerWithCache (efloClient ),
6772 attachDetacher : internal .NewCPFSAttachDetacher (nasClient ),
73+ filesetManager : internal .NewCPFSFileSetManager (nasClient ),
6874 nasClient : nasClient ,
6975 skipDetach : skipDetach ,
7076 }, nil
7177}
7278
79+ // CreateVolume ...
80+ func (cs * controllerServer ) CreateVolume (ctx context.Context , req * csi.CreateVolumeRequest ) (* csi.CreateVolumeResponse , error ) {
81+ logger := klog .FromContext (ctx )
82+ logger .V (2 ).Info ("starting" )
83+
84+ // Validate parameters
85+ if err := validateFileSetParameters (req ); err != nil {
86+ klog .Errorf ("CreateVolume: error parameters from input: %v, with error: %v" , req .Name , err )
87+ return nil , status .Errorf (codes .InvalidArgument , "Invalid parameters from input: %v, with error: %v" , req .Name , err )
88+ }
89+
90+ // Extract parameters
91+ params := req .GetParameters ()
92+ bmcpfsID := params ["bmcpfsId" ]
93+ if bmcpfsID == "" {
94+ return nil , status .Error (codes .InvalidArgument , "bmcpfsId parameter is required" )
95+ }
96+
97+ // Construct volume path
98+ volumeName := req .Name
99+ var fullPath string
100+ if rootPath , ok := params ["path" ]; ok && rootPath != "" {
101+ fullPath = filepath .Join (rootPath , volumeName )
102+ } else {
103+ fullPath = filepath .Join ("/" , volumeName )
104+ }
105+ fullPath = fullPath + "/"
106+ klog .InfoS ("CreateVolume: Constructing volume path" , "fullpath" , fullPath )
107+
108+ volSizeBytes := int64 (req .GetCapacityRange ().GetRequiredBytes ())
109+
110+ // Create fileset
111+ fileSetID , err := cs .filesetManager .CreateFileSet (ctx , bmcpfsID , req .Name , fullPath , 1000000 , volSizeBytes , false )
112+ if err != nil {
113+ return nil , status .Error (codes .Internal , err .Error ())
114+ }
115+
116+ err = wait .PollUntilContextTimeout (ctx , time .Second * 5 , FILESET_DESCRIBE_TIMEOUT , true , func (ctx context.Context ) (bool , error ) {
117+ var err error
118+ resp , err := cs .filesetManager .DescribeFileset (ctx , bmcpfsID , fileSetID )
119+ if err == nil {
120+ if resp .Status != nil && * resp .Status == "CREATED" {
121+ klog .InfoS ("CreateVolume: fileset created successfully" , "filesetID" , fileSetID , "filesystemID" , bmcpfsID )
122+ return true , nil
123+ }
124+ klog .InfoS ("CreateVolume: fileset status incorrected" , "filesetID" , fileSetID , "filesystemID" , bmcpfsID , "status" , * resp .Status )
125+ return false , nil
126+ }
127+ return false , err
128+ })
129+
130+ if err != nil {
131+ return nil , status .Errorf (codes .Internal , "CreateVolume: failed to describe fileset %s: %s" , fileSetID , err .Error ())
132+ }
133+
134+ // Prepare volume context
135+ volumeContext := req .GetParameters ()
136+ if volumeContext == nil {
137+ volumeContext = make (map [string ]string )
138+ }
139+ volumeContext = updateVolumeContext (volumeContext )
140+
141+ klog .Infof ("CreateVolume: Successfully created FileSet %s: id[%s], filesystem[%s], path[%s]" , req .GetName (), fileSetID , bmcpfsID , fullPath )
142+
143+ tmpVol := createVolumeResponse (fileSetID , bmcpfsID , volSizeBytes , volumeContext )
144+
145+ return & csi.CreateVolumeResponse {Volume : tmpVol }, nil
146+ }
147+
148+ func (cs * controllerServer ) DeleteVolume (ctx context.Context , req * csi.DeleteVolumeRequest ) (* csi.DeleteVolumeResponse , error ) {
149+ logger := klog .FromContext (ctx )
150+ logger .V (2 ).Info ("starting" )
151+
152+ // Parse volume ID to extract filesystem ID and fileset ID
153+ fsID , fileSetID , err := parseVolumeID (req .VolumeId )
154+ if err != nil {
155+ klog .Errorf ("DeleteVolume: failed to parse volume ID %s: %v" , req .VolumeId , err )
156+ return nil , status .Error (codes .InvalidArgument , err .Error ())
157+ }
158+
159+ klog .Infof ("DeleteVolume: deleting fileset %s from filesystem %s" , fileSetID , fsID )
160+
161+ // Delete the fileset
162+ err = cs .filesetManager .DeleteFileSet (ctx , fsID , fileSetID )
163+ if err != nil {
164+ klog .Errorf ("DeleteVolume: failed to delete fileset %s from filesystem %s: %v" , fileSetID , fsID , err )
165+ return nil , status .Error (codes .Internal , err .Error ())
166+ }
167+
168+ klog .Infof ("DeleteVolume: successfully deleted fileset %s from filesystem %s" , fileSetID , fsID )
169+ return & csi.DeleteVolumeResponse {}, nil
170+ }
171+
73172func (cs * controllerServer ) ControllerGetCapabilities (ctx context.Context , req * csi.ControllerGetCapabilitiesRequest ) (* csi.ControllerGetCapabilitiesResponse , error ) {
74173 return & csi.ControllerGetCapabilitiesResponse {
75174 Capabilities : common .ControllerRPCCapabilities (
76175 csi .ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME ,
176+ csi .ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME ,
77177 ),
78178 }, nil
79179}
@@ -94,7 +194,7 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
94194 }
95195
96196 cpfsID , _ := parseVolumeHandle (req .VolumeId )
97- // Get VscMountTarget of filesystem
197+
98198 mt := req .VolumeContext [_vscMountTarget ]
99199 if mt == "" {
100200 var err error
@@ -105,18 +205,18 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
105205 }
106206
107207 // Get Primary vsc of Lingjun node
108- lingjunInstanceId := strings .TrimPrefix (req .NodeId , LingjunNodeIDPrefix )
208+ lingjunInstanceID := strings .TrimPrefix (req .NodeId , LingjunNodeIDPrefix )
109209 if LingjunNodeIDPrefix == "" {
110210 return nil , status .Error (codes .InvalidArgument , "invalid node id" )
111211 }
112- vscId , err := cs .vscManager .EnsurePrimaryVsc (ctx , lingjunInstanceId , false )
212+ vscID , err := cs .vscManager .EnsurePrimaryVsc (ctx , lingjunInstanceID , false )
113213 if err != nil {
114214 return nil , status .Error (codes .Internal , err .Error ())
115215 }
116- klog .Info ("Use VSC MountTarget for lingjun node" , "nodeId" , req .NodeId , "vscId" , vscId )
216+ klog .Info ("Use VSC MountTarget for lingjun node" , "nodeId" , req .NodeId , "vscId" , vscID )
117217
118218 // Attach CPFS to VSC
119- err = cs .attachDetacher .Attach (ctx , cpfsID , vscId )
219+ err = cs .attachDetacher .Attach (ctx , cpfsID , vscID )
120220 if err != nil {
121221 if autoSwitch , _ := strconv .ParseBool (req .VolumeContext [_mpAutoSwitch ]); autoSwitch && internal .IsAttachNotSupportedError (err ) {
122222 if req .VolumeContext [_vpcMountTarget ] == "" {
@@ -135,35 +235,34 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
135235
136236 // TODO: if the cached vscid is already deleted, try to recreate a new primary vsc for lingjun node
137237
138- klog .InfoS ("ControllerPublishVolume: attached cpfs to vsc" , "vscMountTarget" , mt , "vscId" , vscId , "node" , req .NodeId )
238+ klog .InfoS ("ControllerPublishVolume: attached cpfs to vsc" , "vscMountTarget" , mt , "vscId" , vscID , "node" , req .NodeId )
139239 return & csi.ControllerPublishVolumeResponse {
140240 PublishContext : map [string ]string {
141241 _networkType : networkTypeVSC ,
142- _vscId : vscId ,
242+ _vscID : vscID ,
143243 _vscMountTarget : mt ,
144244 },
145245 }, nil
146246}
147247
148- func (cs * controllerServer ) ControllerUnpublishVolume (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (
149- * csi.ControllerUnpublishVolumeResponse , error ,
150- ) {
248+ func (cs * controllerServer ) ControllerUnpublishVolume (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (* csi.ControllerUnpublishVolumeResponse , error ) {
151249 if ! strings .HasPrefix (req .NodeId , LingjunNodeIDPrefix ) || cs .skipDetach {
152250 return & csi.ControllerUnpublishVolumeResponse {}, nil
153251 }
154252 // Create Primary vsc for Lingjun node
155- lingjunInstanceId := strings .TrimPrefix (req .NodeId , LingjunNodeIDPrefix )
253+ lingjunInstanceID := strings .TrimPrefix (req .NodeId , LingjunNodeIDPrefix )
156254 if LingjunNodeIDPrefix == "" {
157255 return nil , status .Error (codes .InvalidArgument , "invalid node id" )
158256 }
159- vsc , err := cs .vscManager .GetPrimaryVscOf (lingjunInstanceId )
257+ vsc , err := cs .vscManager .GetPrimaryVscOf (lingjunInstanceID )
160258 if err != nil {
161- return nil , status .Error (codes .Internal , err . Error () )
259+ return nil , status .Errorf (codes .Internal , "get vsc error: %v" , err )
162260 }
163261 if vsc == nil {
164262 klog .InfoS ("ControllerUnpublishVolume: skip detaching cpfs from vsc as vsc not found" , "node" , req .NodeId )
165263 return & csi.ControllerUnpublishVolumeResponse {}, nil
166264 }
265+
167266 // If `req.VolumeId` is a combination of `cpfsID` and `fsetID`, Detach will trigger an error.
168267 err = cs .attachDetacher .Detach (ctx , req .VolumeId , vsc .VscID )
169268 if err != nil {
@@ -173,6 +272,7 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
173272 return & csi.ControllerUnpublishVolumeResponse {}, nil
174273}
175274
275+ // KubernetesAlicloudIdentity is the user agent string for Eflo client
176276var KubernetesAlicloudIdentity = fmt .Sprintf ("Kubernetes.Alicloud/CsiProvision.Bmcpfs-%s" , version .VERSION )
177277
178278const efloConnTimeout = 10
@@ -227,9 +327,9 @@ func parseVolumeHandle(volumeHandle string) (string, string) {
227327 return parts [0 ], ""
228328}
229329
230- func getMountTarget (client * nasclient.Client , fsId , networkType string ) (string , error ) {
330+ func getMountTarget (client * nasclient.Client , fsID , networkType string ) (string , error ) {
231331 resp , err := client .DescribeFileSystems (& nasclient.DescribeFileSystemsRequest {
232- FileSystemId : & fsId ,
332+ FileSystemId : & fsID ,
233333 })
234334 if err != nil {
235335 return "" , fmt .Errorf ("nas:DescribeFileSystems failed: %w" , err )
@@ -252,7 +352,7 @@ func getMountTarget(client *nasclient.Client, fsId, networkType string) (string,
252352 if t == networkType {
253353 mountTarget := tea .StringValue (mt .MountTargetDomain )
254354 status := tea .StringValue (mt .Status )
255- klog .V (2 ).InfoS ("Found cpfs mount target" , "filesystem" , fsId , "networkType" , networkType , "mountTarget" , mountTarget , "status" , status )
355+ klog .V (2 ).InfoS ("Found cpfs mount target" , "filesystem" , fsID , "networkType" , networkType , "mountTarget" , mountTarget , "status" , status )
256356 if status == "Active" {
257357 return mountTarget , nil
258358 }
0 commit comments