Skip to content

Commit

Permalink
feat(docker): listens for container stopped event
Browse files Browse the repository at this point in the history
When a container is shutdown manually while it's registered as `ready` in Sablier it will remove it from the store.
Meaning externally events are now handled for docker.
  • Loading branch information
acouvreur committed Nov 6, 2022
1 parent eb83d39 commit 1ca1934
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 34 deletions.
34 changes: 33 additions & 1 deletion app/providers/docker_classic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package providers

import (
"context"
"errors"
"fmt"
"io"

"github.com/acouvreur/sablier/app/instance"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
log "github.com/sirupsen/logrus"
)

type DockerClassicProvider struct {
Client client.ContainerAPIClient
Client client.APIClient
desiredReplicas int
}

Expand Down Expand Up @@ -103,3 +106,32 @@ func (provider *DockerClassicProvider) GetState(name string) (instance.State, er
return instance.UnrecoverableInstanceState(name, fmt.Sprintf("container status \"%s\" not handled", spec.State.Status), provider.desiredReplicas)
}
}

func (provider *DockerClassicProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) {
msgs, errs := provider.Client.Events(ctx, types.EventsOptions{
Filters: filters.NewArgs(
filters.Arg("scope", "local"),
filters.Arg("type", "container"),
filters.Arg("event", "die"),
),
})

go func() {
for {
select {
case msg := <-msgs:
// Send the container that has died to the channel
instance <- msg.From
case err := <-errs:
if errors.Is(err, io.EOF) {
log.Debug("provider event stream closed")
close(instance)
return
}
case <-ctx.Done():
close(instance)
return
}
}
}()
}
80 changes: 61 additions & 19 deletions app/providers/docker_classic_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package providers

import (
"context"
"fmt"
"reflect"
"testing"

"github.com/acouvreur/sablier/app/instance"
"github.com/acouvreur/sablier/app/providers/mocks"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/stretchr/testify/mock"
)

func TestDockerClassicProvider_GetState(t *testing.T) {
type fields struct {
Client *mocks.ContainerAPIClientMock
Client *mocks.DockerAPIClientMock
}
type args struct {
name string
Expand All @@ -30,7 +32,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx created container state",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand All @@ -47,7 +49,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx running container state without healthcheck",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand All @@ -64,7 +66,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx running container state with \"starting\" health",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand All @@ -81,7 +83,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx running container state with \"unhealthy\" health",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand All @@ -99,7 +101,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx running container state with \"healthy\" health",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand All @@ -116,7 +118,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx paused container state",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand All @@ -133,7 +135,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx restarting container state",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand All @@ -150,7 +152,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx removing container state",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand All @@ -167,7 +169,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx exited container state with status code 0",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand All @@ -184,7 +186,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx exited container state with status code 137",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand All @@ -202,7 +204,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx dead container state",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand All @@ -220,7 +222,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "container inspect has an error",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand Down Expand Up @@ -260,7 +262,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {

func TestDockerClassicProvider_Stop(t *testing.T) {
type fields struct {
Client *mocks.ContainerAPIClientMock
Client *mocks.DockerAPIClientMock
}
type args struct {
name string
Expand All @@ -276,7 +278,7 @@ func TestDockerClassicProvider_Stop(t *testing.T) {
{
name: "container stop has an error",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand All @@ -294,7 +296,7 @@ func TestDockerClassicProvider_Stop(t *testing.T) {
{
name: "container stop as expected",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand Down Expand Up @@ -332,7 +334,7 @@ func TestDockerClassicProvider_Stop(t *testing.T) {

func TestDockerClassicProvider_Start(t *testing.T) {
type fields struct {
Client *mocks.ContainerAPIClientMock
Client *mocks.DockerAPIClientMock
}
type args struct {
name string
Expand All @@ -348,7 +350,7 @@ func TestDockerClassicProvider_Start(t *testing.T) {
{
name: "container start has an error",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand All @@ -366,7 +368,7 @@ func TestDockerClassicProvider_Start(t *testing.T) {
{
name: "container start as expected",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
Expand Down Expand Up @@ -401,3 +403,43 @@ func TestDockerClassicProvider_Start(t *testing.T) {
})
}
}

func TestDockerClassicProvider_NotifyInsanceStopped(t *testing.T) {
tests := []struct {
name string
want []string
events []events.Message
errors []error
}{
{
name: "container nginx is stopped",
want: []string{"nginx"},
events: []events.Message{
mocks.ContainerStoppedEvent("nginx"),
},
errors: []error{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
provider := &DockerClassicProvider{
Client: mocks.NewDockerAPIClientMockWithEvents(tt.events, tt.errors),
desiredReplicas: 1,
}

instanceC := make(chan string)

provider.NotifyInsanceStopped(context.Background(), instanceC)

var got []string

for i := range instanceC {
got = append(got, i)
}

if !reflect.DeepEqual(got, tt.want) {
t.Errorf("NotifyInsanceStopped() = %v, want %v", got, tt.want)
}
})
}
}
3 changes: 3 additions & 0 deletions app/providers/docker_swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,6 @@ func (provider *DockerSwarmProvider) getInstanceName(name string, service swarm.

return fmt.Sprintf("%s (%s)", name, service.Spec.Name)
}

func (provider *DockerSwarmProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) {
}
3 changes: 3 additions & 0 deletions app/providers/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,6 @@ func (provider *KubernetesProvider) getStatefulsetState(config *Config) (instanc

return instance.NotReadyInstanceState(config.OriginalName, int(ss.Status.ReadyReplicas), int(config.Replicas))
}

func (provider *KubernetesProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) {
}
Loading

0 comments on commit 1ca1934

Please sign in to comment.