Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd: use ioctx locks for key rotation #4734

Merged
merged 3 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 11 additions & 27 deletions internal/cephfs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os"
"path"
"strings"
"syscall"
"time"

cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
Expand All @@ -34,6 +33,7 @@ import (
hc "github.com/ceph/ceph-csi/internal/health-checker"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/fscrypt"
iolock "github.com/ceph/ceph-csi/internal/util/lock"
"github.com/ceph/ceph-csi/internal/util/log"

"github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -134,12 +134,12 @@ func maybeUnlockFileEncryption(
}

// Define Mutex Lock variables
lockName := string(volID) + "-mutexLock"
lockDesc := "Lock for " + string(volID)
volIDStr := string(volID)
lockName := volIDStr + "-mutexLock"
lockDesc := "Lock for " + volIDStr
lockDuration := 150 * time.Second
// Generate a consistent lock cookie for the client using hostname and process ID
lockCookie := generateLockCookie()
var flags byte = 0

log.DebugLog(ctx, "Creating lock for the following volume ID %s", volID)

Expand All @@ -151,32 +151,16 @@ func maybeUnlockFileEncryption(
}
defer ioctx.Destroy()

res, err := ioctx.LockExclusive(string(volID), lockName, lockCookie, lockDesc, lockDuration, &flags)
if res != 0 {
switch res {
case -int(syscall.EBUSY):
return fmt.Errorf("Lock is already held by another client and cookie pair for %v volume", volID)
case -int(syscall.EEXIST):
return fmt.Errorf("Lock is already held by the same client and cookie pair for %v volume", volID)
default:
return fmt.Errorf("Failed to lock volume ID %v: %w", volID, err)
}
lock := iolock.NewLock(ioctx, volIDStr, lockName, lockCookie, lockDesc, lockDuration)
err = lock.LockExclusive(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to create lock for volume ID %s: %v", volID, err)

return err
}
defer lock.Unlock(ctx)
log.DebugLog(ctx, "Lock successfully created for volume ID %s", volID)

defer func() {
ret, unlockErr := ioctx.Unlock(string(volID), lockName, lockCookie)
switch ret {
case 0:
log.DebugLog(ctx, "Lock %s successfully released ", lockName)
case -int(syscall.ENOENT):
log.DebugLog(ctx, "Lock is not held by the specified %s, %s pair", lockCookie, lockName)
default:
log.ErrorLog(ctx, "Failed to release following lock, this will lead to orphan lock %s: %v",
lockName, unlockErr)
}
}()

log.DebugLog(ctx, "cephfs: unlocking fscrypt on volume %q path %s", volID, stagingTargetPath)
err = fscrypt.Unlock(ctx, volOptions.Encryption, stagingTargetPath, string(volID))
if err != nil {
Expand Down
8 changes: 0 additions & 8 deletions internal/csi-addons/rbd/encryptionkeyrotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@ import (
"context"
"errors"

csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"

"github.com/container-storage-interface/spec/lib/go/csi"
ekr "github.com/csi-addons/spec/lib/go/encryptionkeyrotation"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -55,12 +53,6 @@ func (ekrs *EncryptionKeyRotationServer) EncryptionKeyRotate(
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}

// Block key rotation for RWX/ROX volumes
_, isMultiNode := csicommon.IsBlockMultiNode([]*csi.VolumeCapability{req.GetVolumeCapability()})
if isMultiNode {
return nil, status.Error(codes.Unimplemented, "multi-node key rotation is not supported")
}

if acquired := ekrs.volLock.TryAcquire(volID); !acquired {
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
}
Expand Down
24 changes: 24 additions & 0 deletions internal/rbd/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"fmt"
"strconv"
"strings"
"time"

kmsapi "github.com/ceph/ceph-csi/internal/kms"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/lock"
"github.com/ceph/ceph-csi/internal/util/log"

librbd "github.com/ceph/go-ceph/rbd"
Expand Down Expand Up @@ -463,6 +465,28 @@ func (rv *rbdVolume) RotateEncryptionKey(ctx context.Context) error {
return errors.New("key rotation not supported for unencrypted device")
}

// Call open Ioctx to create a new ioctx object
// if the obj already exists, no error is returned
err = rv.openIoctx()
if err != nil {
return fmt.Errorf("failed to open ioctx, err: %w", err)
}

// Lock params
lockName := rv.VolID + "-mutexlock"
lockDesc := "Key rotation mutex lock for " + rv.VolID
lockDuration := 3 * time.Minute
lockCookie := rv.VolID + "-enc-key-rotate"

// Acquire the exclusive lock based on vol id
lck := lock.NewLock(rv.ioctx, rv.VolID, lockName, lockCookie, lockDesc, lockDuration)
err = lck.LockExclusive(ctx)
if err != nil {
return err
}
defer lck.Unlock(ctx)
log.DebugLog(ctx, "acquired ioctx lock for vol id: %s", rv.VolID)

// Get the device path for the underlying image
useNbd := rv.Mounter == rbdNbdMounter && hasNBD
devicePath, found := waitForPath(ctx, rv.Pool, rv.RadosNamespace, rv.RbdImageName, 1, useNbd)
Expand Down
108 changes: 108 additions & 0 deletions internal/util/lock/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
Copyright 2024 The Ceph-CSI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lock

import (
"context"
"fmt"
"syscall"
"time"

"github.com/ceph/go-ceph/rados"

"github.com/ceph/ceph-csi/internal/util/log"
)

// IOCtxLock provides methods for acquiring and releasing exclusive locks on a volume.
// using rados IO context locks.
type IOCtxLock interface {
LockExclusive(ctx context.Context) error
Unlock(ctx context.Context)
}

type lock struct {
volID string
lockName string
lockDesc string
lockCookie string
timeout time.Duration
ioctx *rados.IOContext
}

// NewLock returns `lock` type that implements the IOCtxLock interface.
func NewLock(
ioctx *rados.IOContext,
volID string,
lockName string,
lockCookie string,
lockDesc string,
timeout time.Duration,
) IOCtxLock {
return &lock{
volID: volID,
lockName: lockName,
lockDesc: lockDesc,
lockCookie: lockCookie,
timeout: timeout,
ioctx: ioctx,
}
}

// LockExclusive acquires an exclusive lock on the volume identified by
// the name and cookie pair.
func (lck *lock) LockExclusive(ctx context.Context) error {
var flags byte = 0
ret, err := lck.ioctx.LockExclusive(
lck.volID,
lck.lockName,
lck.lockCookie,
lck.lockDesc,
lck.timeout,
&flags)

if ret != 0 {
switch ret {
case -int(syscall.EBUSY):
return fmt.Errorf("lock is already held by another client and cookie pair for %v volume",
lck.volID)
case -int(syscall.EEXIST):
return fmt.Errorf("lock is already held by the same client and cookie pair for %v volume",
lck.volID)
default:
return fmt.Errorf("failed to lock volume ID %v: %w", lck.volID, err)
}
}

return nil
}

// Unlock releases the exclusive lock on the volume.
func (lck *lock) Unlock(ctx context.Context) {
ret, err := lck.ioctx.Unlock(lck.volID, lck.lockName, lck.lockCookie)

switch ret {
case 0:
log.DebugLog(ctx, "lock %s for vol id %s successfully released ",
lck.lockName, lck.volID)
case -int(syscall.ENOENT):
log.DebugLog(ctx, "lock is not held by the specified %s, %s pair",
lck.lockCookie, lck.lockName)
default:
log.ErrorLog(ctx, "failed to release lock %s: %v",
lck.lockName, err)
}
}