Skip to content

Commit

Permalink
fix(service): run only single attach/detach operation simultaneously (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
peknur authored Oct 24, 2023
1 parent 1d65a40 commit 854cbca
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 3 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ clean-tests:

.PHONY: test
test:
go vet ./...
go test -race ./...

test-integration:
Expand Down
1 change: 0 additions & 1 deletion internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ func (c *Controller) ControllerPublishVolume(ctx context.Context, req *csi.Contr
if errors.As(err, &svcError) && svcError.Status != http.StatusConflict && svcError.ErrorCode() == upcloud.ErrCodeStorageDeviceLimitReached {
return nil, status.Error(codes.ResourceExhausted, "The limit of the number of attached devices has been reached")
}
// already attached to the node
return nil, err
}

Expand Down
108 changes: 108 additions & 0 deletions internal/service/mock/upcloud_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package mock

import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"

"github.com/UpCloudLtd/upcloud-go-api/v6/upcloud"
"github.com/UpCloudLtd/upcloud-go-api/v6/upcloud/request"
upsvc "github.com/UpCloudLtd/upcloud-go-api/v6/upcloud/service"
)

// UpCloudClient is an in-memory stand-in for the UpCloud API client used by tests.
// Server state lives in a sync.Map; the methods defined in this file read and
// mutate that state instead of talking to a real backend.
type UpCloudClient struct {
// Embedded so the mock exposes the upsvc.Storage method set.
// NOTE(review): nothing in this file assigns this field — presumably it stays nil,
// so calling a method that this file does not override would panic; confirm.
upsvc.Storage

// servers maps a server UUID (string) to its *upcloud.ServerDetails.
servers sync.Map
}

// StoreServer registers s in the mock backend under s.UUID.
// If an entry already exists for that UUID it is kept and s is discarded,
// so repeated registration of the same node never resets its state.
func (u *UpCloudClient) StoreServer(s *upcloud.ServerDetails) {
	key := s.UUID
	u.servers.LoadOrStore(key, s)
}

// getServer looks up the server stored under id.
// It returns nil when no server has been registered with that UUID.
func (u *UpCloudClient) getServer(id string) *upcloud.ServerDetails {
	entry, found := u.servers.Load(id)
	if !found {
		return nil
	}
	return entry.(*upcloud.ServerDetails)
}

// WaitForServerState returns the current details of the server identified by r.UUID.
//
// The mock does not wait or poll: only r.UUID is read and the stored server is
// returned immediately, whatever its state. An unknown UUID yields the lookup
// error from GetServerDetails instead of a silent (nil, nil) result, so callers
// never receive a nil server without an accompanying error.
func (u *UpCloudClient) WaitForServerState(ctx context.Context, r *request.WaitForServerStateRequest) (*upcloud.ServerDetails, error) {
	return u.GetServerDetails(ctx, &request.GetServerDetailsRequest{
		UUID: r.UUID,
	})
}

// GetServers lists every server registered in the mock backend.
// The returned slice is never nil (it is empty when nothing is stored) and
// the error is always nil. Iteration order follows sync.Map.Range.
func (u *UpCloudClient) GetServers(ctx context.Context) (*upcloud.Servers, error) {
	list := make([]upcloud.Server, 0)
	collect := func(_, value any) bool {
		details, ok := value.(*upcloud.ServerDetails)
		if ok {
			list = append(list, details.Server)
		}
		// Always continue so every stored entry is visited.
		return true
	}
	u.servers.Range(collect)
	return &upcloud.Servers{Servers: list}, nil
}

// GetServerDetails returns the stored details for the server with UUID r.UUID,
// or an error when no such server has been registered.
func (u *UpCloudClient) GetServerDetails(ctx context.Context, r *request.GetServerDetailsRequest) (*upcloud.ServerDetails, error) {
	details := u.getServer(r.UUID)
	if details == nil {
		return nil, fmt.Errorf("server '%s' not found", r.UUID)
	}
	return details, nil
}

// AttachStorage simulates attaching storage r.StorageUUID to server r.ServerUUID.
//
// The call fails when the server is unknown or is not in the started state.
// Otherwise the server is put into the maintenance state for a random
// 100-299 ms, switched back to started, and a new storage device is appended
// whose address is r.Address suffixed with the device's 1-based position.
// The stored server details are returned on success.
func (u *UpCloudClient) AttachStorage(ctx context.Context, r *request.AttachStorageRequest) (*upcloud.ServerDetails, error) {
	node := u.getServer(r.ServerUUID)
	switch {
	case node == nil:
		return nil, errors.New("server not found")
	case node.State != upcloud.ServerStateStarted:
		return nil, fmt.Errorf("server %s state is %s", r.ServerUUID, node.State)
	}

	// Mark the node busy while the simulated attach is in flight.
	node.State = upcloud.ServerStateMaintenance
	u.StoreServer(node)
	delay := time.Duration(rand.Intn(200)+100) * time.Millisecond //nolint:gosec // using weak random number doesn't affect the result.
	time.Sleep(delay)
	node.State = upcloud.ServerStateStarted

	if node.StorageDevices == nil {
		node.StorageDevices = make(upcloud.ServerStorageDeviceSlice, 0)
	}
	device := upcloud.ServerStorageDevice{
		Address: fmt.Sprintf("%s:%d", r.Address, len(node.StorageDevices)+1),
		UUID:    r.StorageUUID,
		Size:    10,
	}
	node.StorageDevices = append(node.StorageDevices, device)
	u.StoreServer(node)

	return u.getServer(r.ServerUUID), nil
}

// DetachStorage simulates detaching the storage device at r.Address from
// server r.ServerUUID.
//
// The call fails when the server is unknown or is not in the started state.
// Otherwise the server is put into the maintenance state for a random
// 100-299 ms, switched back to started, and every device whose address
// equals r.Address is dropped from its device list.
func (u *UpCloudClient) DetachStorage(ctx context.Context, r *request.DetachStorageRequest) (*upcloud.ServerDetails, error) {
	node := u.getServer(r.ServerUUID)
	switch {
	case node == nil:
		return nil, fmt.Errorf("server %s not found", r.ServerUUID)
	case node.State != upcloud.ServerStateStarted:
		return nil, fmt.Errorf("server %s state is %s", r.ServerUUID, node.State)
	}

	// Mark the node busy while the simulated detach is in flight.
	node.State = upcloud.ServerStateMaintenance
	u.StoreServer(node)
	delay := time.Duration(rand.Intn(200)+100) * time.Millisecond //nolint:gosec // using weak random number doesn't affect the result.
	time.Sleep(delay)

	// Re-read the entry after the delay before mutating it further.
	node = u.getServer(r.ServerUUID)
	node.State = upcloud.ServerStateStarted
	if len(node.StorageDevices) > 0 {
		remaining := make([]upcloud.ServerStorageDevice, 0)
		for _, dev := range node.StorageDevices {
			if dev.Address == r.Address {
				continue
			}
			remaining = append(remaining, dev)
		}
		node.StorageDevices = remaining
	}
	u.StoreServer(node)

	return node, nil
}
59 changes: 59 additions & 0 deletions internal/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/UpCloudLtd/upcloud-csi/internal/service"
"github.com/UpCloudLtd/upcloud-csi/internal/service/mock"
"github.com/UpCloudLtd/upcloud-go-api/v6/upcloud"
"github.com/UpCloudLtd/upcloud-go-api/v6/upcloud/client"
"github.com/UpCloudLtd/upcloud-go-api/v6/upcloud/request"
upsvc "github.com/UpCloudLtd/upcloud-go-api/v6/upcloud/service"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestUpCloudService_ListStorage(t *testing.T) {
Expand Down Expand Up @@ -168,3 +173,57 @@ func TestUpCloudService_ListStorageBackups(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, storages, 3)
}

// TestUpCloudService_AttachDetachStorage_Concurrency fires ten concurrent
// attach calls spread over two mock nodes, waits for them, and then detaches
// every attached device concurrently. Each call must finish without error.
func TestUpCloudService_AttachDetachStorage_Concurrency(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	backend := &mock.UpCloudClient{}
	svc := service.NewUpCloudService(backend)
	const volumes = 10

	var wg sync.WaitGroup

	// attach runs a single timed AttachStorage call and records its outcome.
	attach := func(volUUID, serverUUID string) {
		defer wg.Done()
		began := time.Now()
		err := svc.AttachStorage(ctx, volUUID, serverUUID)
		t.Logf("attached %s to node %s in %s", volUUID, serverUUID, time.Since(began))
		assert.NoError(t, err)
	}
	// detach runs a single timed DetachStorage call and records its outcome.
	detach := func(volUUID, serverUUID string) {
		defer wg.Done()
		began := time.Now()
		err := svc.DetachStorage(ctx, volUUID, serverUUID)
		t.Logf("detached %s from node %s in %s", volUUID, serverUUID, time.Since(began))
		assert.NoError(t, err)
	}

	for i := 0; i < volumes; i++ {
		wg.Add(1)
		// Populate the backend with two nodes and attach five storages per node.
		nodeID := fmt.Sprintf("test-node-%d", i%2)
		volID := fmt.Sprintf("test-vol-%d", i)
		backend.StoreServer(&upcloud.ServerDetails{
			Server: upcloud.Server{
				UUID:  nodeID,
				State: upcloud.ServerStateStarted,
			},
			StorageDevices: make([]upcloud.ServerStorageDevice, 0),
		})
		go attach(volID, nodeID)
	}
	wg.Wait()

	list, err := backend.GetServers(ctx)
	require.NoError(t, err)
	require.Len(t, list.Servers, 2)
	for _, srv := range list.Servers {
		details, err := backend.GetServerDetails(ctx, &request.GetServerDetailsRequest{UUID: srv.UUID})
		if !assert.NoError(t, err) {
			continue
		}
		for _, dev := range details.StorageDevices {
			wg.Add(1)
			go detach(dev.UUID, details.UUID)
		}
	}
	wg.Wait()
}
24 changes: 22 additions & 2 deletions internal/service/upcloud_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/UpCloudLtd/upcloud-go-api/v6/upcloud"
Expand All @@ -30,6 +31,9 @@ type upCloudClient interface {

// UpCloudService wraps an upCloudClient and serializes attach/detach
// operations per node.
type UpCloudService struct {
client upCloudClient

// nodeSync holds one mutex per node, keyed by server UUID (values are *sync.Mutex),
// so that only one attach/detach operation runs against a given node at a time.
nodeSync sync.Map
}

func NewUpCloudService(svc upCloudClient) *UpCloudService {
Expand Down Expand Up @@ -123,6 +127,13 @@ func (u *UpCloudService) DeleteStorage(ctx context.Context, storageUUID string)
}

func (u *UpCloudService) AttachStorage(ctx context.Context, storageUUID, serverUUID string) error {
// Serialize attach operations per node because a node can attach only a single storage at a time.
mu, _ := u.nodeSync.LoadOrStore(serverUUID, &sync.Mutex{})
if mu != nil {
mu.(*sync.Mutex).Lock()
defer mu.(*sync.Mutex).Unlock()
}

if err := u.waitForServerOnline(ctx, serverUUID); err != nil {
return fmt.Errorf("failed to attach storage, pre-condition failed: %w", err)
}
Expand All @@ -142,12 +153,21 @@ func (u *UpCloudService) AttachStorage(ctx context.Context, storageUUID, serverU
}

func (u *UpCloudService) DetachStorage(ctx context.Context, storageUUID, serverUUID string) error {
// Serialize detach operations per node because a node can detach only a single storage at a time.
mu, _ := u.nodeSync.LoadOrStore(serverUUID, &sync.Mutex{})
if mu != nil {
mu.(*sync.Mutex).Lock()
defer mu.(*sync.Mutex).Unlock()
}

sd, err := u.client.GetServerDetails(ctx, &request.GetServerDetailsRequest{UUID: serverUUID})
if err != nil {
return err
}
if err := u.waitForServerOnline(ctx, serverUUID); err != nil {
return fmt.Errorf("failed to detach storage, pre-condition failed: %w", err)
if sd.State != upcloud.ServerStateStarted {
if err := u.waitForServerOnline(ctx, serverUUID); err != nil {
return fmt.Errorf("failed to detach storage, pre-condition failed: %w", err)
}
}
for _, device := range sd.StorageDevices {
if device.UUID == storageUUID {
Expand Down

0 comments on commit 854cbca

Please sign in to comment.