Skip to content

Commit

Permalink
NBSNEBIUS-85: support for attaching nbs disk to container as nbd devi…
Browse files Browse the repository at this point in the history
…ce (#809)
  • Loading branch information
sanek325 authored Mar 25, 2024
1 parent d9ca877 commit 2ce3367
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,6 @@ func (c *nbsServerControllerService) ControllerGetCapabilities(
req *csi.ControllerGetCapabilitiesRequest,
) (*csi.ControllerGetCapabilitiesResponse, error) {

log.Printf("csi.ControllerGetCapabilitiesRequest: %+v", req)

return &csi.ControllerGetCapabilitiesResponse{
Capabilities: nbsServerControllerServiceCapabilities,
}, nil
Expand Down
91 changes: 65 additions & 26 deletions cloud/blockstore/tools/csi_driver/internal/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,18 @@ func (s *nodeService) NodeStageVolume(
"VolumeCapability missing im NodeStageVolumeRequest")
}

return &csi.NodeStageVolumeResponse{}, nil
}

func (s *nodeService) startEndpoint(
ctx context.Context,
volumeId string,
volumeCapability *csi.VolumeCapability,
volumeContext map[string]string) (*nbsapi.TStartEndpointResponse, error) {

var ipcType nbsapi.EClientIpcType
switch req.VolumeCapability.GetAccessType().(type) {

switch volumeCapability.GetAccessType().(type) {
case *csi.VolumeCapability_Block:
ipcType = nbsapi.EClientIpcType_IPC_NBD
case *csi.VolumeCapability_Mount:
Expand All @@ -92,7 +102,7 @@ func (s *nodeService) NodeStageVolume(
return nil, status.Error(codes.InvalidArgument, "Unknown access type")
}

endpointDir := filepath.Join(s.podSocketsDir, req.VolumeId)
endpointDir := filepath.Join(s.podSocketsDir, volumeId)
if err := os.MkdirAll(endpointDir, 0755); err != nil {
return nil, err
}
Expand All @@ -101,29 +111,32 @@ func (s *nodeService) NodeStageVolume(
return nil, err
}

if req.VolumeContext["backend"] == "nfs" {
if volumeContext != nil && volumeContext["backend"] == "nfs" {
// TODO: start nfs endpoint using nfsClient
return nil, status.Error(codes.Unimplemented, "nfs backend is unimplemented yet")
}

hostType := nbsapi.EHostType_HOST_TYPE_DEFAULT
socketPath := filepath.Join(s.nbsSocketsDir, req.VolumeId, socketName)
socketPath := filepath.Join(s.nbsSocketsDir, volumeId, socketName)
startEndpointRequest := &nbsapi.TStartEndpointRequest{
UnixSocketPath: socketPath,
DiskId: req.VolumeId,
DiskId: volumeId,
ClientId: s.clientID,
DeviceName: req.VolumeId,
DeviceName: volumeId,
IpcType: ipcType,
VhostQueuesCount: 8,
VolumeAccessMode: nbsapi.EVolumeAccessMode_VOLUME_ACCESS_READ_WRITE,
VolumeMountMode: nbsapi.EVolumeMountMode_VOLUME_MOUNT_REMOTE,
Persistent: true,
NbdDevice: &nbsapi.TStartEndpointRequest_UseFreeNbdDeviceFile{
ipcType == nbsapi.EClientIpcType_IPC_NBD,
},
ClientProfile: &nbsapi.TClientProfile{
HostType: &hostType,
},
}

_, err := s.nbsClient.StartEndpoint(ctx, startEndpointRequest)
resp, err := s.nbsClient.StartEndpoint(ctx, startEndpointRequest)
if err != nil {
return nil, status.Errorf(
codes.Internal,
Expand All @@ -146,7 +159,7 @@ func (s *nodeService) NodeStageVolume(
}
file.Close()

return &csi.NodeStageVolumeResponse{}, nil
return resp, nil
}

func (s *nodeService) NodeUnstageVolume(
Expand All @@ -166,24 +179,31 @@ func (s *nodeService) NodeUnstageVolume(
"StagingTargetPath missing in NodeUnstageVolumeRequest")
}

socketPath := filepath.Join(s.nbsSocketsDir, req.VolumeId, socketName)
return &csi.NodeUnstageVolumeResponse{}, nil
}

func (s *nodeService) stopEndpoint(
ctx context.Context,
volumeId string) error {

socketPath := filepath.Join(s.nbsSocketsDir, volumeId, socketName)
stopEndpointRequest := &nbsapi.TStopEndpointRequest{
UnixSocketPath: socketPath,
}

_, err := s.nbsClient.StopEndpoint(ctx, stopEndpointRequest)
if err != nil {
return nil, status.Errorf(
return status.Errorf(
codes.Internal,
"Failed to stop endpoint: %+v", err)
}

endpointDir := filepath.Join(s.podSocketsDir, req.VolumeId)
endpointDir := filepath.Join(s.podSocketsDir, volumeId)
if err := os.RemoveAll(endpointDir); err != nil {
return nil, err
return err
}

return &csi.NodeUnstageVolumeResponse{}, nil
return nil
}

func (s *nodeService) NodePublishVolume(
Expand All @@ -192,25 +212,37 @@ func (s *nodeService) NodePublishVolume(

log.Printf("csi.NodePublishVolumeRequest: %+v", req)

if req.VolumeId == "" {
return nil, status.Error(
codes.InvalidArgument,
"VolumeId missing in NodePublishVolumeRequest")
}
if req.VolumeCapability == nil {
return nil, status.Error(
codes.InvalidArgument,
"NodeStageVolume Volume Capability must be provided")
"VolumeCapability missing im NodePublishVolumeRequest")
}

resp, err := s.startEndpoint(
ctx,
req.VolumeId,
req.VolumeCapability,
req.VolumeContext)
if err != nil {
return nil, err
}

if resp.NbdDeviceFile != "" {
log.Printf("Endpoint started with device file: %q", resp.NbdDeviceFile)
}

options := []string{"bind"}

var err error
switch req.VolumeCapability.GetAccessType().(type) {
case *csi.VolumeCapability_Mount:
err = s.nodePublishVolumeForFileSystem(req, options)
case *csi.VolumeCapability_Block:
if req.VolumeContext["backend"] == "nfs" {
return nil, status.Error(
codes.InvalidArgument,
"'Block' volume mode is not supported for nfs backend")
}
err = s.nodePublishVolumeForBlock(req, options)
err = s.nodePublishVolumeForBlock(resp.NbdDeviceFile, req, options)
default:
return nil, status.Error(codes.InvalidArgument, "Unknown access type")
}
Expand Down Expand Up @@ -246,10 +278,17 @@ func (s *nodeService) nodePublishVolumeForFileSystem(
}

func (s *nodeService) nodePublishVolumeForBlock(
nbdDeviceFile string,
req *csi.NodePublishVolumeRequest,
mountOptions []string) error {

source := "/dev/nbd0" // TODO (issues/463): get from endpoint info
if req.VolumeContext != nil && req.VolumeContext["backend"] == "nfs" {
return status.Error(
codes.InvalidArgument,
"'Block' volume mode is not supported for nfs backend")
}

source := nbdDeviceFile
target := req.TargetPath

return s.mount(source, target, "", mountOptions...)
Expand All @@ -273,6 +312,10 @@ func (s *nodeService) NodeUnpublishVolume(
"Target Path missing in NodeUnpublishVolumeRequest")
}

if err := s.stopEndpoint(ctx, req.VolumeId); err != nil {
return nil, err
}

err := s.unmount(req.TargetPath)
if err != nil {
return nil, err
Expand All @@ -292,8 +335,6 @@ func (s *nodeService) NodeGetCapabilities(
req *csi.NodeGetCapabilitiesRequest,
) (*csi.NodeGetCapabilitiesResponse, error) {

log.Printf("csi.NodeGetCapabilitiesRequest: %+v", req)

return &csi.NodeGetCapabilitiesResponse{
Capabilities: capabilities,
}, nil
Expand All @@ -303,8 +344,6 @@ func (s *nodeService) NodeGetInfo(
ctx context.Context,
req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {

log.Printf("csi.NodeGetInfo: %+v", req)

return &csi.NodeGetInfoResponse{
NodeId: s.nodeID,
AccessibleTopology: &csi.Topology{
Expand Down

0 comments on commit 2ce3367

Please sign in to comment.