Skip to content

Commit 5e9e77e

Browse files
committed
nvmeof: add nvmeof ability
Create a prototype of the nvmeof CSI driver. Create/delete volume are implemented. Signed-off-by: gadi-didi <gadi.didi@ibm.com>
1 parent d97f3cf commit 5e9e77e

File tree

6 files changed

+9094
-0
lines changed

6 files changed

+9094
-0
lines changed
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
package controller
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
8+
"github.com/container-storage-interface/spec/lib/go/csi"
9+
"google.golang.org/grpc/codes"
10+
"google.golang.org/grpc/status"
11+
12+
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
13+
"github.com/ceph/ceph-csi/internal/nvmeof"
14+
"github.com/ceph/ceph-csi/internal/rbd"
15+
rbdutil "github.com/ceph/ceph-csi/internal/rbd"
16+
rbddriver "github.com/ceph/ceph-csi/internal/rbd/driver"
17+
"github.com/ceph/ceph-csi/internal/util/log"
18+
)
19+
20+
type Server struct {
21+
csi.UnimplementedControllerServer
22+
23+
// backendServer handles the RBD requests
24+
backendServer *rbd.ControllerServer
25+
26+
// gateway handles NVMe-oF operations
27+
gateway *nvmeof.GatewayRpcClient
28+
}
29+
30+
// NewControllerServer initialize a controller server for nvmeof CSI driver.
31+
func NewControllerServer(d *csicommon.CSIDriver, gatewayConfig *nvmeof.GatewayConfig) (*Server, error) {
32+
// Initialize gateway client
33+
gw, err := nvmeof.NewGatewayRpcClient(gatewayConfig)
34+
if err != nil {
35+
return nil, status.Errorf(codes.Unavailable, err.Error(), "failed to create NVMe-oF Gateway client: %v", err)
36+
}
37+
38+
return &Server{
39+
backendServer: rbddriver.NewControllerServer(d),
40+
gateway: gw,
41+
}, nil
42+
}
43+
44+
// ControllerGetCapabilities uses the RBD backendServer to return the
45+
// capabilities that were set in the Driver.Run() function.
46+
func (cs *Server) ControllerGetCapabilities(
47+
ctx context.Context,
48+
req *csi.ControllerGetCapabilitiesRequest,
49+
) (*csi.ControllerGetCapabilitiesResponse, error) {
50+
return cs.backendServer.ControllerGetCapabilities(ctx, req)
51+
}
52+
53+
// ValidateVolumeCapabilities checks whether the volume capabilities requested
54+
// are supported.
55+
func (cs *Server) ValidateVolumeCapabilities(
56+
ctx context.Context,
57+
req *csi.ValidateVolumeCapabilitiesRequest,
58+
) (*csi.ValidateVolumeCapabilitiesResponse, error) {
59+
return cs.backendServer.ValidateVolumeCapabilities(ctx, req)
60+
}
61+
62+
// CreateVolume creates a new RBD volume and exposes it through NVMe-oF Gateway
63+
func (cs *Server) CreateVolume(
64+
ctx context.Context,
65+
req *csi.CreateVolumeRequest,
66+
) (*csi.CreateVolumeResponse, error) {
67+
68+
// TODO - maybe need to check and modify the request to ensure it has the required fields NvmeOF supports (like nfs does)
69+
// TODO - in case of failure, we should clean up the created resources (RBD volume, subsystem, etc.)
70+
// TODO - move all hardcoded strings to constants
71+
72+
// Step 1: Create RBD volume through backend. if exists, it is ok.
73+
res, err := cs.backendServer.CreateVolume(ctx, req)
74+
if err != nil {
75+
log.ErrorLog(ctx, "failed to create RBD volume: %v", err)
76+
return nil, err
77+
}
78+
79+
backend := res.GetVolume()
80+
volumeID := backend.GetVolumeId()
81+
rbdImageName := res.GetVolume().GetVolumeContext()["imageName"]
82+
rbdPoolName := res.GetVolume().GetVolumeContext()["pool"]
83+
log.DebugLog(ctx, "RBD volume created: rbdPoolName=%s rbdImageName=%s ", rbdPoolName, rbdImageName)
84+
85+
// Step 2: check if subsystem exists, if not create it
86+
subsystemNQN := req.GetParameters()["subsystemNQN"]
87+
if subsystemNQN == "" {
88+
return nil, status.Errorf(codes.NotFound, "subsystemNQN parameter is required")
89+
}
90+
if err := cs.ensureSubsystem(ctx, subsystemNQN); err != nil {
91+
log.ErrorLog(ctx, "failed to ensure (OR create) subsystem %s exists: %v", subsystemNQN, err)
92+
return nil, err
93+
}
94+
// Step 3: Create namespace in the NVMe-oF subsystem
95+
nsid, err := cs.gateway.CreateNamespace(ctx, subsystemNQN, rbdPoolName, rbdImageName)
96+
if err != nil {
97+
log.ErrorLog(ctx, "failed to create namespace for RBD %s/%s in subsystem %s: %v",
98+
rbdPoolName, rbdImageName, subsystemNQN, err)
99+
return nil, err
100+
}
101+
102+
log.DebugLog(ctx, "NVMe-oF namespace created successfully: %s/%s with NSID: %d",
103+
rbdPoolName, rbdImageName, nsid)
104+
105+
// Step 4: store the namespace ID and subsystem nqn in the volume context for ControllerPublishVolume
106+
backend.VolumeContext["nvmeof.SubsystemNQN"] = subsystemNQN
107+
backend.VolumeContext["nvmeof.NamespaceID"] = fmt.Sprintf("%d", nsid)
108+
109+
// step 5: Store the subsystem NQN and namespace ID in the RBD volume metadata
110+
mgr := rbdutil.NewManager(cs.backendServer.Driver.GetInstanceID(), nil, req.GetSecrets())
111+
defer mgr.Destroy(ctx)
112+
113+
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
114+
if err != nil {
115+
return nil, status.Errorf(codes.Aborted, "failed to find volume with ID %q: %s", volumeID, err.Error())
116+
}
117+
defer rbdVol.Destroy(ctx)
118+
119+
err = rbdVol.SetMetadata("nvmeof.SubsystemNQN", subsystemNQN)
120+
if err != nil {
121+
log.ErrorLog(ctx, "failed to store subsystem metadata: %v", err)
122+
return nil, status.Errorf(codes.Internal, "failed to store subsystem metadata: %v", err)
123+
}
124+
125+
err = rbdVol.SetMetadata("nvmeof.NamespaceID", fmt.Sprintf("%d", nsid))
126+
if err != nil {
127+
log.ErrorLog(ctx, "failed to store NSID metadata: %v", err)
128+
return nil, status.Errorf(codes.Internal, "failed to store NSID metadata: %v", err)
129+
}
130+
return &csi.CreateVolumeResponse{Volume: backend}, nil
131+
}
132+
133+
func (cs *Server) DeleteVolume(
134+
ctx context.Context,
135+
req *csi.DeleteVolumeRequest,
136+
) (*csi.DeleteVolumeResponse, error) {
137+
138+
log.DebugLog(ctx, "NVMe-oF DeleteVolume: %s", req.GetVolumeId())
139+
140+
volumeID := req.GetVolumeId()
141+
if volumeID == "" {
142+
return nil, fmt.Errorf("volume ID is required")
143+
}
144+
// Step 1: Get the RBD volume by ID
145+
mgr := rbdutil.NewManager(cs.backendServer.Driver.GetInstanceID(), nil, req.GetSecrets())
146+
defer mgr.Destroy(ctx)
147+
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
148+
if err != nil {
149+
return nil, status.Errorf(codes.Aborted, "failed to find volume with ID %q: %s", volumeID, err.Error())
150+
}
151+
defer rbdVol.Destroy(ctx)
152+
153+
// Retrieve subsystem NQN and namespace ID from volume context
154+
subsystemNQN, err := rbdVol.GetMetadata("nvmeof.SubsystemNQN")
155+
156+
if err != nil {
157+
log.DebugLog(ctx, "No subsystem metadata found: %v", err)
158+
subsystemNQN = ""
159+
}
160+
nsidStr, err := rbdVol.GetMetadata("nvmeof.NamespaceID")
161+
if err != nil {
162+
log.DebugLog(ctx, "No NSID metadata found: %v", err)
163+
nsidStr = ""
164+
}
165+
166+
// step 1: Delete the namespace from the NVMe-oF subsystem first
167+
if subsystemNQN != "" && nsidStr != "" {
168+
nsid, err := strconv.ParseUint(nsidStr, 10, 32)
169+
if err != nil {
170+
log.ErrorLog(ctx, "failed to parse namespace ID %s: %v", nsidStr, err)
171+
return nil, status.Error(codes.InvalidArgument, err.Error())
172+
}
173+
err = cs.gateway.DeleteNamespace(ctx, subsystemNQN, uint32(nsid))
174+
if err != nil {
175+
log.ErrorLog(ctx, "failed to delete namespace %d from subsystem %s: %v", nsid, subsystemNQN, err)
176+
return nil, status.Errorf(codes.Internal, "failed to delete namespace %d from subsystem %s: %v", nsid, subsystemNQN, err)
177+
}
178+
log.DebugLog(ctx, "Namespace %d deleted from subsystem %s successfully", nsid, subsystemNQN)
179+
} else {
180+
log.WarningLogMsg("No subsystem NQN or namespace ID found in volume context, skipping namespace deletion")
181+
}
182+
183+
// step 2: Delete the subsystem if it is empty (for now is one-to-one mapping?)
184+
err = cs.cleanupEmptySubsystem(ctx, subsystemNQN)
185+
if err != nil {
186+
return nil, status.Errorf(codes.Internal, "failed to delete subsystem %s: %v", subsystemNQN, err)
187+
}
188+
189+
// step 3: Delete the RBD volume through backend
190+
res, err := cs.backendServer.DeleteVolume(ctx, req)
191+
if err != nil {
192+
log.ErrorLog(ctx, "failed to delete RBD volume %s: %v", volumeID, err)
193+
return nil, status.Error(codes.NotFound, err.Error())
194+
}
195+
log.DebugLog(ctx, "RBD volume %s deleted successfully", volumeID)
196+
197+
return res, nil
198+
}
199+
200+
func (cs *Server) ControllerPublishVolume(
201+
ctx context.Context,
202+
req *csi.ControllerPublishVolumeRequest,
203+
) (*csi.ControllerPublishVolumeResponse, error) {
204+
log.DebugLog(ctx, "NVMe-oF ControllerPublishVolume: %s", req.GetVolumeId())
205+
206+
// step 1: Validate the request parameters
207+
208+
// step 2: add host to the NVMe-oF subsystem OR maybe add ns host
209+
// (depends on the design how many subsystems we have one-to-one or one-to-many)
210+
211+
// step 3: add listener\s to the NVMe-oF subsystem
212+
213+
return &csi.ControllerPublishVolumeResponse{}, nil
214+
}
215+
216+
func (cs *Server) ControllerUnpublishVolume(
217+
ctx context.Context,
218+
req *csi.ControllerUnpublishVolumeRequest,
219+
) (*csi.ControllerUnpublishVolumeResponse, error) {
220+
log.DebugLog(ctx, "NVMe-oF ControllerUnpublishVolume: %s", req.GetVolumeId())
221+
222+
// step 1: remove host from the NVMe-oF subsystem\ns (depends on the design)
223+
224+
// step 2: remove listener\s from the NVMe-oF subsystem
225+
226+
return &csi.ControllerUnpublishVolumeResponse{}, nil
227+
}
228+
229+
// ensureSubsystem checks if the subsystem exists, and creates it if not.
230+
func (cs *Server) ensureSubsystem(ctx context.Context, subsystemNQN string) error {
231+
exists, err := cs.gateway.SubsystemExists(ctx, subsystemNQN)
232+
if err != nil {
233+
return err
234+
}
235+
if exists {
236+
return nil
237+
}
238+
// Create if doesn't exist (controller decision)
239+
return cs.gateway.CreateSubsystem(ctx, subsystemNQN)
240+
}
241+
242+
// cleanupEmptySubsystem checks if the subsystem is empty (no namespaces), if so, deletes it.
243+
func (cs *Server) cleanupEmptySubsystem(ctx context.Context, subsystemNQN string) error {
244+
if subsystemNQN == "" {
245+
return nil
246+
}
247+
248+
// Check if subsystem has any remaining namespaces
249+
// (You'd need to implement ListNamespaces in your gateway client)
250+
namespaces, err := cs.gateway.ListNamespaces(ctx, subsystemNQN)
251+
if err != nil {
252+
return fmt.Errorf("failed to list namespaces: %w", err)
253+
}
254+
255+
if len(namespaces.Namespaces) == 0 {
256+
log.DebugLog(ctx, "Subsystem %s is empty, deleting", subsystemNQN)
257+
err = cs.gateway.DeleteSubsystem(ctx, subsystemNQN)
258+
if err != nil {
259+
return fmt.Errorf("failed to delete empty subsystem: %w", err)
260+
}
261+
log.DebugLog(ctx, "Empty subsystem %s deleted", subsystemNQN)
262+
} else {
263+
log.DebugLog(ctx, "Subsystem %s still has %d namespaces, keeping",
264+
subsystemNQN, len(namespaces.Namespaces))
265+
}
266+
267+
return nil
268+
}

0 commit comments

Comments
 (0)