Skip to content

Commit

Permalink
feat: add swarm event listener
Browse files Browse the repository at this point in the history
  • Loading branch information
acouvreur committed Nov 7, 2022
1 parent 8dc7198 commit a62f098
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 74 deletions.
31 changes: 30 additions & 1 deletion app/providers/docker_swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package providers

import (
"context"
"errors"
"fmt"
"io"
"strings"

"github.com/acouvreur/sablier/app/instance"
Expand All @@ -14,7 +16,7 @@ import (
)

type DockerSwarmProvider struct {
Client client.ServiceAPIClient
Client client.APIClient
desiredReplicas int
}

Expand Down Expand Up @@ -139,4 +141,31 @@ func (provider *DockerSwarmProvider) getInstanceName(name string, service swarm.
}

func (provider *DockerSwarmProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) {
msgs, errs := provider.Client.Events(ctx, types.EventsOptions{
Filters: filters.NewArgs(
filters.Arg("scope", "swarm"),
filters.Arg("type", "service"),
),
})

go func() {
for {
select {
case msg := <-msgs:
// Send the container that has died to the channel
if msg.Actor.Attributes["replicas.new"] == "0" {
instance <- msg.Actor.Attributes["name"]
}
case err := <-errs:
if errors.Is(err, io.EOF) {
log.Debug("provider event stream closed")
close(instance)
return
}
case <-ctx.Done():
close(instance)
return
}
}
}()
}
115 changes: 53 additions & 62 deletions app/providers/docker_swarm_test.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
package providers

import (
"context"
"reflect"
"testing"

"github.com/acouvreur/sablier/app/instance"
"github.com/acouvreur/sablier/app/providers/mocks"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/swarm"
"github.com/stretchr/testify/mock"
)

func TestDockerSwarmProvider_Start(t *testing.T) {
type fields struct {
Client *mocks.ServiceAPIClientMock
}
type args struct {
name string
}
tests := []struct {
name string
fields fields
args args
want instance.State
serviceList []swarm.Service
Expand All @@ -30,9 +28,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
}{
{
name: "scale nginx service to 1 replica",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -53,9 +48,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
},
{
name: "ambiguous service name",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -78,9 +70,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
},
{
name: "exact match service name",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -103,9 +92,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
},
{
name: "service match on suffix",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -127,9 +113,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
},
{
name: "nginx is not a replicated service",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -152,13 +135,14 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
clientMock := mocks.NewDockerAPIClientMock()
provider := &DockerSwarmProvider{
Client: tt.fields.Client,
Client: clientMock,
desiredReplicas: 1,
}

tt.fields.Client.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
tt.fields.Client.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil)
clientMock.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
clientMock.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil)

got, err := provider.Start(tt.args.name)
if (err != nil) != tt.wantErr {
Expand All @@ -173,15 +157,11 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
}

func TestDockerSwarmProvider_Stop(t *testing.T) {
type fields struct {
Client *mocks.ServiceAPIClientMock
}
type args struct {
name string
}
tests := []struct {
name string
fields fields
args args
want instance.State
serviceList []swarm.Service
Expand All @@ -191,9 +171,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
}{
{
name: "scale nginx service to 0 replica",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -214,9 +191,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
},
{
name: "ambiguous service name",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -239,9 +213,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
},
{
name: "exact match service name",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -264,9 +235,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
},
{
name: "service match on suffix",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -288,9 +256,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
},
{
name: "nginx is not a replicated service",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -313,13 +278,14 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
clientMock := mocks.NewDockerAPIClientMock()
provider := &DockerSwarmProvider{
Client: tt.fields.Client,
Client: clientMock,
desiredReplicas: 1,
}

tt.fields.Client.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
tt.fields.Client.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil)
clientMock.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
clientMock.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil)

got, err := provider.Stop(tt.args.name)
if (err != nil) != tt.wantErr {
Expand All @@ -334,25 +300,18 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
}

func TestDockerSwarmProvider_GetState(t *testing.T) {
type fields struct {
Client *mocks.ServiceAPIClientMock
}
type args struct {
name string
}
tests := []struct {
name string
fields fields
args args
want instance.State
serviceList []swarm.Service
wantErr bool
}{
{
name: "nginx service is ready",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -369,9 +328,6 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
},
{
name: "nginx service is not ready",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -388,9 +344,6 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
},
{
name: "nginx service is not ready",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -407,9 +360,6 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
},
{
name: "nginx is not a replicated service",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
Expand All @@ -428,12 +378,13 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
clientMock := mocks.NewDockerAPIClientMock()
provider := &DockerSwarmProvider{
Client: tt.fields.Client,
Client: clientMock,
desiredReplicas: 1,
}

tt.fields.Client.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
clientMock.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)

got, err := provider.GetState(tt.args.name)
if (err != nil) != tt.wantErr {
Expand All @@ -446,3 +397,43 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
})
}
}

func TestDockerSwarmProvider_NotifyInsanceStopped(t *testing.T) {
tests := []struct {
name string
want []string
events []events.Message
errors []error
}{
{
name: "service nginx is scaled to 0",
want: []string{"nginx"},
events: []events.Message{
mocks.SeviceScaledEvent("nginx", "1", "0"),
},
errors: []error{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
provider := &DockerSwarmProvider{
Client: mocks.NewDockerAPIClientMockWithEvents(tt.events, tt.errors),
desiredReplicas: 1,
}

instanceC := make(chan string)

provider.NotifyInsanceStopped(context.Background(), instanceC)

var got []string

for i := range instanceC {
got = append(got, i)
}

if !reflect.DeepEqual(got, tt.want) {
t.Errorf("NotifyInsanceStopped() = %v, want %v", got, tt.want)
}
})
}
}
29 changes: 18 additions & 11 deletions app/providers/mocks/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,12 @@ func ContainerStoppedEvent(name string) events.Message {
}
}

type ServiceAPIClientMock struct {
client.ServiceAPIClient
mock.Mock
}

func NewServiceAPIClientMock() *ServiceAPIClientMock {
return &ServiceAPIClientMock{}
}

func (client *ServiceAPIClientMock) ServiceUpdate(ctx context.Context, serviceID string, version swarm.Version, service swarm.ServiceSpec, options types.ServiceUpdateOptions) (types.ServiceUpdateResponse, error) {
func (client *DockerAPIClientMock) ServiceUpdate(ctx context.Context, serviceID string, version swarm.Version, service swarm.ServiceSpec, options types.ServiceUpdateOptions) (types.ServiceUpdateResponse, error) {
args := client.Mock.Called(ctx, serviceID, version, service, options)
return args.Get(0).(types.ServiceUpdateResponse), args.Error(1)
}

func (client *ServiceAPIClientMock) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) {
func (client *DockerAPIClientMock) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) {
args := client.Mock.Called(ctx, options)
return args.Get(0).([]swarm.Service), args.Error(1)
}
Expand Down Expand Up @@ -266,6 +257,22 @@ func ServiceGlobal(name string) swarm.Service {
}
}

func SeviceScaledEvent(name string, oldReplicas string, newReplicas string) events.Message {
return events.Message{
Scope: "swarm",
Action: "update",
Type: "service",
Actor: events.Actor{
ID: "randomid",
Attributes: map[string]string{
"name": name,
"replicas.new": newReplicas,
"replicas.old": oldReplicas,
},
},
}
}

type KubernetesAPIClientMock struct {
mockv1 AppsV1InterfaceMock

Expand Down

0 comments on commit a62f098

Please sign in to comment.