This repository has been archived by the owner on May 8, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 32
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Show a scrolling output for each in progress task.
- Loading branch information
Tom Elliott
committed
Nov 8, 2018
1 parent
1af0c53
commit 36487b2
Showing
17 changed files
with
374 additions
and
58 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
package servicelogs | ||
|
||
import ( | ||
"log" | ||
"os" | ||
"time" | ||
|
||
"github.com/hpcloud/tail" | ||
"github.com/pkg/errors" | ||
) | ||
|
||
type LogFollower struct { | ||
logs chan LogLine | ||
done chan struct{} | ||
|
||
runLog string | ||
} | ||
|
||
// NewLogFollower creates a log follower that tails a log file for the specified service | ||
func NewLogFollower(runLog string) *LogFollower { | ||
return &LogFollower{ | ||
runLog: runLog, | ||
done: make(chan struct{}), | ||
} | ||
} | ||
|
||
func (f *LogFollower) Start() <-chan LogLine { | ||
logs := make(chan LogLine) | ||
go f.doStart(logs) | ||
return logs | ||
} | ||
|
||
func (f *LogFollower) doStart(logs chan<- LogLine) { | ||
// Wait for file to exist | ||
var exists bool | ||
for !exists { | ||
_, err := os.Stat(f.runLog) | ||
exists = !os.IsNotExist(err) | ||
select { | ||
case <-f.done: | ||
close(logs) | ||
return | ||
default: | ||
time.Sleep(time.Millisecond * 100) | ||
} | ||
} | ||
|
||
err := doFollowServiceLog(f.runLog, 0, logs, f.done) | ||
if err != nil { | ||
log.Print("error", err) | ||
return | ||
} | ||
} | ||
|
||
func (f *LogFollower) Stop() { | ||
close(f.done) | ||
} | ||
|
||
func doFollowServiceLog(file string, skipLines int, logChannel chan<- LogLine, done <-chan struct{}) error { | ||
t, err := tail.TailFile(file, tail.Config{ | ||
Follow: true, | ||
Logger: tail.DiscardingLogger, | ||
Location: &tail.SeekInfo{ | ||
Offset: 0, | ||
Whence: 0, | ||
}, | ||
}) | ||
if err != nil { | ||
return errors.WithStack(err) | ||
} | ||
defer func() { | ||
close(logChannel) | ||
}() | ||
var linesSkipped int | ||
for line := range t.Lines { | ||
if linesSkipped < skipLines { | ||
linesSkipped++ | ||
continue | ||
} | ||
lineData, err := ParseLogLine(line.Text) | ||
if err != nil { | ||
t.Err() | ||
} | ||
logChannel <- lineData | ||
|
||
select { | ||
case <-done: | ||
return nil | ||
default: | ||
} | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
package servicelogs_test | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"io/ioutil" | ||
"os" | ||
"path/filepath" | ||
"testing" | ||
"time" | ||
|
||
"github.com/yext/edward/instance/servicelogs" | ||
) | ||
|
||
func TestFollowLogsExisting(t *testing.T) { | ||
dir, err := ioutil.TempDir("", "example") | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
defer os.RemoveAll(dir) // clean up | ||
|
||
tmpfn := filepath.Join(dir, "tmpfile") | ||
f, err := os.Create(tmpfn) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
for i := 0; i < 10; i++ { | ||
lineData := servicelogs.LogLine{ | ||
Name: "MyService", | ||
Time: time.Now(), | ||
Stream: "stream", | ||
Message: fmt.Sprint(i), | ||
} | ||
|
||
jsonContent, err := json.Marshal(lineData) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
fmt.Fprintln(f, string(jsonContent)) | ||
} | ||
|
||
lf := servicelogs.NewLogFollower(tmpfn) | ||
lc := lf.Start() | ||
defer lf.Stop() | ||
|
||
success := make(chan struct{}) | ||
var count int | ||
go func() { | ||
for range lc { | ||
fmt.Println(count) | ||
count++ | ||
if count == 20 { | ||
close(success) | ||
} | ||
} | ||
}() | ||
|
||
for i := 10; i < 20; i++ { | ||
lineData := servicelogs.LogLine{ | ||
Name: "MyService", | ||
Time: time.Now(), | ||
Stream: "stream", | ||
Message: fmt.Sprint(i), | ||
} | ||
|
||
jsonContent, err := json.Marshal(lineData) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
fmt.Fprintln(f, string(jsonContent)) | ||
} | ||
|
||
select { | ||
case <-success: | ||
return | ||
case <-time.After(time.Second): | ||
t.Errorf("Timed out waiting for results") | ||
} | ||
|
||
if t.Failed() { | ||
t.Logf("Got %d results", count) | ||
} | ||
|
||
} | ||
|
||
func TestFollowLogsWaitForCreation(t *testing.T) { | ||
dir, err := ioutil.TempDir("", "example") | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer os.RemoveAll(dir) // clean up | ||
|
||
tmpfn := filepath.Join(dir, "tmpfile") | ||
|
||
lf := servicelogs.NewLogFollower(tmpfn) | ||
lc := lf.Start() | ||
defer lf.Stop() | ||
|
||
f, err := os.Create(tmpfn) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
success := make(chan struct{}) | ||
var count int | ||
go func() { | ||
for range lc { | ||
fmt.Println(count) | ||
count++ | ||
if count == 20 { | ||
close(success) | ||
return | ||
} | ||
} | ||
}() | ||
|
||
for i := 0; i < 20; i++ { | ||
lineData := servicelogs.LogLine{ | ||
Name: "MyService", | ||
Time: time.Now(), | ||
Stream: "stream", | ||
Message: fmt.Sprint(i), | ||
} | ||
|
||
jsonContent, err := json.Marshal(lineData) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
fmt.Fprintln(f, string(jsonContent)) | ||
time.Sleep(time.Millisecond) | ||
} | ||
|
||
select { | ||
case <-success: | ||
return | ||
case <-time.After(time.Second): | ||
t.Errorf("Timed out waiting for results") | ||
} | ||
|
||
if t.Failed() { | ||
t.Logf("Got %d results", count) | ||
} | ||
|
||
} |
Oops, something went wrong.