Skip to content

Commit

Permalink
Refactored log forwarder
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Aug 19, 2024
1 parent 7ebb285 commit 967dee5
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 18 deletions.
15 changes: 7 additions & 8 deletions internal/logging/forwarder.go → mq/log.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package logging
package mq

import (
"context"
Expand All @@ -7,18 +7,17 @@ import (

"github.com/rs/zerolog/log"
"github.com/runabol/tork"
"github.com/runabol/tork/mq"
)

type Forwarder struct {
Broker mq.Broker
type LogShipper struct {
Broker Broker
TaskID string
part int
q chan []byte
}

func NewForwarder(broker mq.Broker, taskID string) *Forwarder {
f := &Forwarder{
func NewLogShipper(broker Broker, taskID string) *LogShipper {
f := &LogShipper{
Broker: broker,
TaskID: taskID,
q: make(chan []byte, 1000),
Expand All @@ -27,7 +26,7 @@ func NewForwarder(broker mq.Broker, taskID string) *Forwarder {
return f
}

func (r *Forwarder) Write(p []byte) (int, error) {
func (r *LogShipper) Write(p []byte) (int, error) {
pc := make([]byte, len(p))
copy(pc, p)
select {
Expand All @@ -38,7 +37,7 @@ func (r *Forwarder) Write(p []byte) (int, error) {
}
}

func (r *Forwarder) startFlushTimer() {
func (r *LogShipper) startFlushTimer() {
ticker := time.NewTicker(time.Second)
buffer := make([]byte, 0)
for {
Expand Down
11 changes: 5 additions & 6 deletions internal/logging/forwarder_test.go → mq/log_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package logging
package mq

import (
"fmt"
"testing"
"time"

"github.com/runabol/tork"
"github.com/runabol/tork/mq"
"github.com/stretchr/testify/assert"
)

func TestForwardTimeout(t *testing.T) {
b := mq.NewInMemoryBroker()
b := NewInMemoryBroker()

processed := make(chan any)
err := b.SubscribeForTaskLogPart(func(p *tork.TaskLogPart) {
Expand All @@ -20,7 +19,7 @@ func TestForwardTimeout(t *testing.T) {
})
assert.NoError(t, err)

fwd := NewForwarder(b, "some-task-id")
fwd := NewLogShipper(b, "some-task-id")
for i := 0; i < 1; i++ {
_, err = fwd.Write([]byte("hello\n"))
assert.NoError(t, err)
Expand All @@ -31,7 +30,7 @@ func TestForwardTimeout(t *testing.T) {
}

func TestForwardBatch(t *testing.T) {
b := mq.NewInMemoryBroker()
b := NewInMemoryBroker()

processed := make(chan any)
err := b.SubscribeForTaskLogPart(func(p *tork.TaskLogPart) {
Expand All @@ -40,7 +39,7 @@ func TestForwardBatch(t *testing.T) {
})
assert.NoError(t, err)

fwd := NewForwarder(b, "some-task-id")
fwd := NewLogShipper(b, "some-task-id")

for i := 0; i < 5; i++ {
_, err = fwd.Write([]byte(fmt.Sprintf("hello %d\n", i)))
Expand Down
3 changes: 1 addition & 2 deletions runtime/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/runabol/tork"
"github.com/runabol/tork/internal/logging"
"github.com/runabol/tork/internal/syncx"
"github.com/runabol/tork/internal/uuid"
"github.com/runabol/tork/mq"
Expand Down Expand Up @@ -157,7 +156,7 @@ func (d *DockerRuntime) Run(ctx context.Context, t *tork.Task) error {
}
var logger io.Writer
if d.broker != nil {
logger = logging.NewForwarder(d.broker, t.ID)
logger = mq.NewLogShipper(d.broker, t.ID)
} else {
logger = os.Stdout
}
Expand Down
3 changes: 1 addition & 2 deletions runtime/shell/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/runabol/tork"
"github.com/runabol/tork/internal/logging"
"github.com/runabol/tork/internal/reexec"
"github.com/runabol/tork/internal/syncx"
"github.com/runabol/tork/internal/uuid"
Expand Down Expand Up @@ -101,7 +100,7 @@ func (r *ShellRuntime) Run(ctx context.Context, t *tork.Task) error {
}
var logger io.Writer
if r.broker != nil {
logger = logging.NewForwarder(r.broker, t.ID)
logger = mq.NewLogShipper(r.broker, t.ID)
} else {
logger = os.Stdout
}
Expand Down

0 comments on commit 967dee5

Please sign in to comment.