diff --git a/Makefile b/Makefile index 71612111..213899cd 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,7 @@ clean-tests: .PHONY: test test: + go vet ./... go test -race ./... test-integration: diff --git a/internal/controller/controller.go b/internal/controller/controller.go index a3559bb9..cd765a5b 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -272,7 +272,6 @@ func (c *Controller) ControllerPublishVolume(ctx context.Context, req *csi.Contr if errors.As(err, &svcError) && svcError.Status != http.StatusConflict && svcError.ErrorCode() == upcloud.ErrCodeStorageDeviceLimitReached { return nil, status.Error(codes.ResourceExhausted, "The limit of the number of attached devices has been reached") } - // already attached to the node return nil, err } diff --git a/internal/service/mock/upcloud_client.go b/internal/service/mock/upcloud_client.go new file mode 100644 index 00000000..7b1d6f0a --- /dev/null +++ b/internal/service/mock/upcloud_client.go @@ -0,0 +1,108 @@ +package mock + +import ( + "context" + "errors" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/UpCloudLtd/upcloud-go-api/v6/upcloud" + "github.com/UpCloudLtd/upcloud-go-api/v6/upcloud/request" + upsvc "github.com/UpCloudLtd/upcloud-go-api/v6/upcloud/service" +) + +type UpCloudClient struct { + upsvc.Storage + + servers sync.Map +} + +func (u *UpCloudClient) StoreServer(s *upcloud.ServerDetails) { + u.servers.LoadOrStore(s.UUID, s) +} + +func (u *UpCloudClient) getServer(id string) *upcloud.ServerDetails { + if s, ok := u.servers.Load(id); ok { + return s.(*upcloud.ServerDetails) + } + return nil +} + +func (u *UpCloudClient) WaitForServerState(ctx context.Context, r *request.WaitForServerStateRequest) (*upcloud.ServerDetails, error) { + s, _ := u.GetServerDetails(ctx, &request.GetServerDetailsRequest{ + UUID: r.UUID, + }) + return s, nil +} + +func (u *UpCloudClient) GetServers(ctx context.Context) (*upcloud.Servers, error) { + s := []upcloud.Server{} + u.servers.Range(func(key, value any) bool { + if d, ok := value.(*upcloud.ServerDetails); ok { + s = append(s, d.Server) + } + return true + }) + return &upcloud.Servers{Servers: s}, nil +} + +func (u *UpCloudClient) GetServerDetails(ctx context.Context, r *request.GetServerDetailsRequest) (*upcloud.ServerDetails, error) { + if s := u.getServer(r.UUID); s != nil { + return s, nil + } + return nil, fmt.Errorf("server '%s' not found", r.UUID) +} + +func (u *UpCloudClient) AttachStorage(ctx context.Context, r *request.AttachStorageRequest) (*upcloud.ServerDetails, error) { + server := u.getServer(r.ServerUUID) + if server == nil { + return server, errors.New("server not found") + } + if server.State != upcloud.ServerStateStarted { + return nil, fmt.Errorf("server %s state is %s", r.ServerUUID, server.State) + } + server.State = upcloud.ServerStateMaintenance + u.StoreServer(server) + time.Sleep(time.Duration(rand.Intn(200)+100) * time.Millisecond) //nolint:gosec // using weak random number doesn't affect the result. + server.State = upcloud.ServerStateStarted + if server.StorageDevices == nil { + server.StorageDevices = make(upcloud.ServerStorageDeviceSlice, 0) + } + server.StorageDevices = append(server.StorageDevices, upcloud.ServerStorageDevice{ + Address: fmt.Sprintf("%s:%d", r.Address, len(server.StorageDevices)+1), + UUID: r.StorageUUID, + Size: 10, + }) + u.StoreServer(server) + + return u.getServer(r.ServerUUID), nil +} + +func (u *UpCloudClient) DetachStorage(ctx context.Context, r *request.DetachStorageRequest) (*upcloud.ServerDetails, error) { + server := u.getServer(r.ServerUUID) + if server == nil { + return server, fmt.Errorf("server %s not found", r.ServerUUID) + } + if server.State != upcloud.ServerStateStarted { + return nil, fmt.Errorf("server %s state is %s", r.ServerUUID, server.State) + } + server.State = upcloud.ServerStateMaintenance + u.StoreServer(server) + time.Sleep(time.Duration(rand.Intn(200)+100) * time.Millisecond) //nolint:gosec // using weak random number doesn't affect the result. + server = u.getServer(r.ServerUUID) + server.State = upcloud.ServerStateStarted + if len(server.StorageDevices) > 0 { + storage := make([]upcloud.ServerStorageDevice, 0) + for i := range server.StorageDevices { + if server.StorageDevices[i].Address != r.Address { + storage = append(storage, server.StorageDevices[i]) + } + } + server.StorageDevices = storage + } + u.StoreServer(server) + + return server, nil +} diff --git a/internal/service/service_test.go b/internal/service/service_test.go index 001a3eff..f43cfe5a 100644 --- a/internal/service/service_test.go +++ b/internal/service/service_test.go @@ -5,13 +5,18 @@ import ( "fmt" "net/http" "net/http/httptest" + "sync" "testing" + "time" "github.com/UpCloudLtd/upcloud-csi/internal/service" + "github.com/UpCloudLtd/upcloud-csi/internal/service/mock" "github.com/UpCloudLtd/upcloud-go-api/v6/upcloud" "github.com/UpCloudLtd/upcloud-go-api/v6/upcloud/client" + "github.com/UpCloudLtd/upcloud-go-api/v6/upcloud/request" upsvc "github.com/UpCloudLtd/upcloud-go-api/v6/upcloud/service" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestUpCloudService_ListStorage(t *testing.T) { @@ -168,3 +173,57 @@ func TestUpCloudService_ListStorageBackups(t *testing.T) { assert.NoError(t, err) assert.Len(t, storages, 3) } + +func TestUpCloudService_AttachDetachStorage_Concurrency(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + client := &mock.UpCloudClient{} + s := service.NewUpCloudService(client) + c := 10 + + var wg sync.WaitGroup + for i := 0; i < c; i++ { + wg.Add(1) + // populate backend with two nodes and add 5 storages per node + serverUUID := fmt.Sprintf("test-node-%d", i%2) + volUUID := fmt.Sprintf("test-vol-%d", i) + client.StoreServer(&upcloud.ServerDetails{ + Server: upcloud.Server{ + UUID: serverUUID, + State: upcloud.ServerStateStarted, + }, + StorageDevices: make([]upcloud.ServerStorageDevice, 0), + }) + go func(volUUID, serverUUID string) { + defer wg.Done() + t1 := time.Now() + err := s.AttachStorage(ctx, volUUID, serverUUID) + t.Logf("attached %s to node %s in %s", volUUID, serverUUID, time.Since(t1)) + assert.NoError(t, err) + }(volUUID, serverUUID) + } + wg.Wait() + servers, err := client.GetServers(ctx) + require.NoError(t, err) + require.Len(t, servers.Servers, 2) + for _, srv := range servers.Servers { + d, err := client.GetServerDetails(ctx, &request.GetServerDetailsRequest{UUID: srv.UUID}) + if !assert.NoError(t, err) { + continue + } + for _, storage := range d.StorageDevices { + wg.Add(1) + go func(volUUID, serverUUID string) { + defer wg.Done() + t1 := time.Now() + err := s.DetachStorage(ctx, volUUID, serverUUID) + t.Logf("detached %s from node %s in %s", volUUID, serverUUID, time.Since(t1)) + assert.NoError(t, err) + }(storage.UUID, d.UUID) + } + } + wg.Wait() +} diff --git a/internal/service/upcloud_service.go b/internal/service/upcloud_service.go index 41562e5c..9fe755e6 100644 --- a/internal/service/upcloud_service.go +++ b/internal/service/upcloud_service.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/UpCloudLtd/upcloud-go-api/v6/upcloud" @@ -30,6 +31,9 @@ type upCloudClient interface { type UpCloudService struct { client upCloudClient + + // nodeSync holds per node mutex lock so that only one detach/attach operation can run simultaneously towards the node. + nodeSync sync.Map } func NewUpCloudService(svc upCloudClient) *UpCloudService { @@ -123,6 +127,13 @@ func (u *UpCloudService) DeleteStorage(ctx context.Context, storageUUID string) } func (u *UpCloudService) AttachStorage(ctx context.Context, storageUUID, serverUUID string) error { + // Lock attach operation per node because node can only attach single storage at the time. + mu, _ := u.nodeSync.LoadOrStore(serverUUID, &sync.Mutex{}) + if mu != nil { + mu.(*sync.Mutex).Lock() + defer mu.(*sync.Mutex).Unlock() + } + if err := u.waitForServerOnline(ctx, serverUUID); err != nil { return fmt.Errorf("failed to attach storage, pre-condition failed: %w", err) } @@ -142,12 +153,21 @@ func (u *UpCloudService) AttachStorage(ctx context.Context, storageUUID, serverU } func (u *UpCloudService) DetachStorage(ctx context.Context, storageUUID, serverUUID string) error { + // Lock detach operation per node because node can only detach single storage at the time. + mu, _ := u.nodeSync.LoadOrStore(serverUUID, &sync.Mutex{}) + if mu != nil { + mu.(*sync.Mutex).Lock() + defer mu.(*sync.Mutex).Unlock() + } + sd, err := u.client.GetServerDetails(ctx, &request.GetServerDetailsRequest{UUID: serverUUID}) if err != nil { return err } - if err := u.waitForServerOnline(ctx, serverUUID); err != nil { - return fmt.Errorf("failed to detach storage, pre-condition failed: %w", err) + if sd.State != upcloud.ServerStateStarted { + if err := u.waitForServerOnline(ctx, serverUUID); err != nil { + return fmt.Errorf("failed to detach storage, pre-condition failed: %w", err) + } } for _, device := range sd.StorageDevices { if device.UUID == storageUUID {