From 967dee53c2b395f84dd22eae31f3df2036aa0ef5 Mon Sep 17 00:00:00 2001 From: Arik Cohen Date: Mon, 19 Aug 2024 11:51:21 -0400 Subject: [PATCH] Refactored log forwarder --- internal/logging/forwarder.go => mq/log.go | 15 +++++++-------- .../logging/forwarder_test.go => mq/log_test.go | 11 +++++------ runtime/docker/docker.go | 3 +-- runtime/shell/shell.go | 3 +-- 4 files changed, 14 insertions(+), 18 deletions(-) rename internal/logging/forwarder.go => mq/log.go (78%) rename internal/logging/forwarder_test.go => mq/log_test.go (82%) diff --git a/internal/logging/forwarder.go b/mq/log.go similarity index 78% rename from internal/logging/forwarder.go rename to mq/log.go index a04b8d23..13a8e033 100644 --- a/internal/logging/forwarder.go +++ b/mq/log.go @@ -1,4 +1,4 @@ -package logging +package mq import ( "context" @@ -7,18 +7,17 @@ import ( "github.com/rs/zerolog/log" "github.com/runabol/tork" - "github.com/runabol/tork/mq" ) -type Forwarder struct { - Broker mq.Broker +type LogShipper struct { + Broker Broker TaskID string part int q chan []byte } -func NewForwarder(broker mq.Broker, taskID string) *Forwarder { - f := &Forwarder{ +func NewLogShipper(broker Broker, taskID string) *LogShipper { + f := &LogShipper{ Broker: broker, TaskID: taskID, q: make(chan []byte, 1000), @@ -27,7 +26,7 @@ func NewForwarder(broker mq.Broker, taskID string) *Forwarder { return f } -func (r *Forwarder) Write(p []byte) (int, error) { +func (r *LogShipper) Write(p []byte) (int, error) { pc := make([]byte, len(p)) copy(pc, p) select { @@ -38,7 +37,7 @@ func (r *Forwarder) Write(p []byte) (int, error) { } } -func (r *Forwarder) startFlushTimer() { +func (r *LogShipper) startFlushTimer() { ticker := time.NewTicker(time.Second) buffer := make([]byte, 0) for { diff --git a/internal/logging/forwarder_test.go b/mq/log_test.go similarity index 82% rename from internal/logging/forwarder_test.go rename to mq/log_test.go index 66f0e89c..85f2ee1c 100644 --- a/internal/logging/forwarder_test.go +++ b/mq/log_test.go @@ -1,4 +1,4 @@ -package logging +package mq import ( "fmt" @@ -6,12 +6,11 @@ import ( "time" "github.com/runabol/tork" - "github.com/runabol/tork/mq" "github.com/stretchr/testify/assert" ) func TestForwardTimeout(t *testing.T) { - b := mq.NewInMemoryBroker() + b := NewInMemoryBroker() processed := make(chan any) err := b.SubscribeForTaskLogPart(func(p *tork.TaskLogPart) { @@ -20,7 +19,7 @@ func TestForwardTimeout(t *testing.T) { }) assert.NoError(t, err) - fwd := NewForwarder(b, "some-task-id") + fwd := NewLogShipper(b, "some-task-id") for i := 0; i < 1; i++ { _, err = fwd.Write([]byte("hello\n")) assert.NoError(t, err) @@ -31,7 +30,7 @@ func TestForwardTimeout(t *testing.T) { } func TestForwardBatch(t *testing.T) { - b := mq.NewInMemoryBroker() + b := NewInMemoryBroker() processed := make(chan any) err := b.SubscribeForTaskLogPart(func(p *tork.TaskLogPart) { @@ -40,7 +39,7 @@ func TestForwardBatch(t *testing.T) { }) assert.NoError(t, err) - fwd := NewForwarder(b, "some-task-id") + fwd := NewLogShipper(b, "some-task-id") for i := 0; i < 5; i++ { _, err = fwd.Write([]byte(fmt.Sprintf("hello %d\n", i))) diff --git a/runtime/docker/docker.go b/runtime/docker/docker.go index 406b59ac..b4bf21fc 100644 --- a/runtime/docker/docker.go +++ b/runtime/docker/docker.go @@ -30,7 +30,6 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog/log" "github.com/runabol/tork" - "github.com/runabol/tork/internal/logging" "github.com/runabol/tork/internal/syncx" "github.com/runabol/tork/internal/uuid" "github.com/runabol/tork/mq" @@ -157,7 +156,7 @@ func (d *DockerRuntime) Run(ctx context.Context, t *tork.Task) error { } var logger io.Writer if d.broker != nil { - logger = logging.NewForwarder(d.broker, t.ID) + logger = mq.NewLogShipper(d.broker, t.ID) } else { logger = os.Stdout } diff --git a/runtime/shell/shell.go b/runtime/shell/shell.go index 36f7dbd3..b65faee9 100644 --- a/runtime/shell/shell.go +++ b/runtime/shell/shell.go @@ -15,7 +15,6 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog/log" "github.com/runabol/tork" - "github.com/runabol/tork/internal/logging" "github.com/runabol/tork/internal/reexec" "github.com/runabol/tork/internal/syncx" "github.com/runabol/tork/internal/uuid" @@ -101,7 +100,7 @@ func (r *ShellRuntime) Run(ctx context.Context, t *tork.Task) error { } var logger io.Writer if r.broker != nil { - logger = logging.NewForwarder(r.broker, t.ID) + logger = mq.NewLogShipper(r.broker, t.ID) } else { logger = os.Stdout }