Skip to content

Commit

Permalink
middlewares: save middleware, save output and execution to a file
Browse files Browse the repository at this point in the history
  • Loading branch information
mcuadros committed Oct 7, 2015
1 parent c136333 commit b4b7032
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 11 deletions.
4 changes: 4 additions & 0 deletions cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const logFormat = "%{color}%{shortfile} ▶ %{level}%{color:reset} %{message}"
type Config struct {
Global struct {
middlewares.SlackConfig
middlewares.SaveConfig
}
Jobs map[string]*ExecJobConfig `gcfg:"Job"`
}
Expand Down Expand Up @@ -78,16 +79,19 @@ func (c *Config) buildLogger() core.Logger {

func (c *Config) buildSchedulerMiddlewares(sh *core.Scheduler) {
sh.Use(middlewares.NewSlack(&c.Global.SlackConfig))
sh.Use(middlewares.NewSave(&c.Global.SaveConfig))
}

// ExecJobConfig contains all configuration params needed to build a ExecJob
type ExecJobConfig struct {
core.ExecJob
middlewares.OverlapConfig
middlewares.SlackConfig
middlewares.SaveConfig
}

func (c *ExecJobConfig) buildMiddlewares() {
c.ExecJob.Use(middlewares.NewOverlap(&c.OverlapConfig))
c.ExecJob.Use(middlewares.NewSlack(&c.SlackConfig))
c.ExecJob.Use(middlewares.NewSave(&c.SaveConfig))
}
2 changes: 1 addition & 1 deletion core/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type Execution struct {
Skipped bool
Error error

OutputStream, ErrorStream io.ReadWriter
OutputStream, ErrorStream io.ReadWriter `json:"-"`
}

// NewExecution returns a new Execution, with a random ID
Expand Down
2 changes: 1 addition & 1 deletion core/execjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var ErrUnexpected = errors.New("error unexpected, docker has returned exit code

type ExecJob struct {
BareJob
Client *docker.Client
Client *docker.Client `json:"-"`
Container string
User string `default:"root"`
TTY bool `default:"false"`
Expand Down
6 changes: 6 additions & 0 deletions middlewares/overlap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package middlewares

import "github.com/mcuadros/ofelia/core"

// OverlapConfig configuration for the Overlap middleware
type OverlapConfig struct {
NoOverlap bool `gcfg:"no-overlap"`
}

// NewOverlap returns a Overlap middleware if the given configuration is not empty
func NewOverlap(c *OverlapConfig) core.Middleware {
var m core.Middleware
if !IsEmpty(c) {
Expand All @@ -15,14 +17,18 @@ func NewOverlap(c *OverlapConfig) core.Middleware {
return m
}

// Overlap when this middleware is enabled avoid to overlap executions from a
// specific job
type Overlap struct {
OverlapConfig
}

// ContinueOnStop Overlap is only called if the process is still running
func (m *Overlap) ContinueOnStop() bool {
return false
}

// Run stops the execution if the another execution is already running
func (m *Overlap) Run(ctx *core.Context) error {
if m.NoOverlap && ctx.Job.Running() > 1 {
ctx.Stop(core.ErrSkippedExecution)
Expand Down
102 changes: 102 additions & 0 deletions middlewares/save.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package middlewares

import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"

"github.com/mcuadros/ofelia/core"
)

// SaveConfig configuration for the Save middleware
type SaveConfig struct {
SaveFolder string `gcfg:"save-folder"`
SaveOnlyOnError bool `gcfg:"save-only-on-error"`
}

// NewSave returns a Save middleware if the given configuration is not empty
func NewSave(c *SaveConfig) core.Middleware {
var m core.Middleware
if !IsEmpty(c) {
m = &Save{*c}
}

return m
}

// Save the save middleware saves to disk a dump of the stdout and stderr after
// every execution of the process
type Save struct {
SaveConfig
}

// ContinueOnStop return allways true, we want always report the final status
func (m *Save) ContinueOnStop() bool {
return true
}

// Run save the result of the execution to disk
func (m *Save) Run(ctx *core.Context) error {
err := ctx.Next()
ctx.Stop(err)

if ctx.Execution.Failed || !m.SaveOnlyOnError {
err := m.saveToDisk(ctx)
if err != nil {
ctx.Logger.Error("Save error: %q", err)
}
}

return err
}

func (m *Save) saveToDisk(ctx *core.Context) error {
root := filepath.Join(m.SaveFolder, fmt.Sprintf(
"%s_%s",
ctx.Execution.Date.Format("20060102_150405"), ctx.Job.GetName(),
))

e := ctx.Execution
err := m.saveReaderToDisk(e.ErrorStream, fmt.Sprintf("%s.stderr.log", root))
if err != nil {
return err
}

err = m.saveReaderToDisk(e.OutputStream, fmt.Sprintf("%s.stdout.log", root))
if err != nil {
return err
}

err = m.saveContextToDisk(ctx, fmt.Sprintf("%s.json", root))
if err != nil {
return err
}

return nil
}

func (m *Save) saveContextToDisk(ctx *core.Context, filename string) error {
js, _ := json.MarshalIndent(map[string]interface{}{
"Job": ctx.Job,
"Execution": ctx.Execution,
}, "", " ")

return m.saveReaderToDisk(bytes.NewBuffer(js), filename)
}

func (m *Save) saveReaderToDisk(r io.Reader, filename string) error {
f, err := os.Create(filename)
if err != nil {
return err
}

defer f.Close()
if _, err := io.Copy(f, r); err != nil {
return err
}

return nil
}
60 changes: 60 additions & 0 deletions middlewares/save_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package middlewares

import (
"io/ioutil"
"os"
"path/filepath"
"time"

. "gopkg.in/check.v1"
)

type SuiteSave struct {
BaseSuite
}

var _ = Suite(&SuiteSave{})

func (s *SuiteSave) TestNewSlackEmpty(c *C) {
c.Assert(NewSave(&SaveConfig{}), IsNil)
}

func (s *SuiteSave) TestRunSuccess(c *C) {
dir, err := ioutil.TempDir("/tmp", "save")
c.Assert(err, IsNil)

s.ctx.Start()
s.ctx.Stop(nil)

s.job.Name = "foo"
s.ctx.Execution.Date = time.Time{}

m := NewSave(&SaveConfig{SaveFolder: dir})
c.Assert(m.Run(s.ctx), IsNil)

_, err = os.Stat(filepath.Join(dir, "00010101_000000_foo.json"))
c.Assert(err, IsNil)

_, err = os.Stat(filepath.Join(dir, "00010101_000000_foo.stdout.log"))
c.Assert(err, IsNil)

_, err = os.Stat(filepath.Join(dir, "00010101_000000_foo.stderr.log"))
c.Assert(err, IsNil)
}

func (s *SuiteSave) TestRunSuccessOnError(c *C) {
dir, err := ioutil.TempDir("/tmp", "save")
c.Assert(err, IsNil)

s.ctx.Start()
s.ctx.Stop(nil)

s.job.Name = "foo"
s.ctx.Execution.Date = time.Time{}

m := NewSave(&SaveConfig{SaveFolder: dir, SaveOnlyOnError: true})
c.Assert(m.Run(s.ctx), IsNil)

_, err = os.Stat(filepath.Join(dir, "00010101_000000_foo.json"))
c.Assert(err, Not(IsNil))
}
15 changes: 9 additions & 6 deletions middlewares/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ var (
slackPayloadVar = "payload"
)

// SlackConfig configuration for the Slack middleware
type SlackConfig struct {
URL string `gcfg:"slack-webhook"`
OnlyOnError bool `gcfg:"slack-only-on-error"`
SlackWebhook string `gcfg:"slack-webhook"`
SlackOnlyOnError bool `gcfg:"slack-only-on-error"`
}

// NewSlack returns a Slack middleware if the given configuration is not empty
func NewSlack(c *SlackConfig) core.Middleware {
var m core.Middleware
if !IsEmpty(c) {
Expand All @@ -29,6 +31,7 @@ func NewSlack(c *SlackConfig) core.Middleware {
return m
}

// Slack middleware calls to a Slack input-hook after every execution of a job
type Slack struct {
SlackConfig
}
Expand All @@ -44,7 +47,7 @@ func (m *Slack) Run(ctx *core.Context) error {
err := ctx.Next()
ctx.Stop(err)

if ctx.Execution.Failed || !m.OnlyOnError {
if ctx.Execution.Failed || !m.SlackOnlyOnError {
m.pushMessage(ctx)
}

Expand All @@ -56,11 +59,11 @@ func (m *Slack) pushMessage(ctx *core.Context) {
content, _ := json.Marshal(m.buildMessage(ctx))
values.Add(slackPayloadVar, string(content))

r, err := http.PostForm(m.URL, values)
r, err := http.PostForm(m.SlackWebhook, values)
if err != nil {
ctx.Logger.Error("Slack error calling %q error: %q", m.URL, err)
ctx.Logger.Error("Slack error calling %q error: %q", m.SlackWebhook, err)
} else if r.StatusCode != 200 {
ctx.Logger.Error("Slack error non-200 status code calling %q", m.URL)
ctx.Logger.Error("Slack error non-200 status code calling %q", m.SlackWebhook)
}
}

Expand Down
6 changes: 3 additions & 3 deletions middlewares/slack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (s *SuiteSlack) TestRunSuccess(c *C) {
s.ctx.Start()
s.ctx.Stop(nil)

m := NewSlack(&SlackConfig{URL: ts.URL})
m := NewSlack(&SlackConfig{SlackWebhook: ts.URL})
c.Assert(m.Run(s.ctx), IsNil)
}

Expand All @@ -47,7 +47,7 @@ func (s *SuiteSlack) TestRunSuccessFailed(c *C) {
s.ctx.Start()
s.ctx.Stop(errors.New("foo"))

m := NewSlack(&SlackConfig{URL: ts.URL})
m := NewSlack(&SlackConfig{SlackWebhook: ts.URL})
c.Assert(m.Run(s.ctx), IsNil)
}

Expand All @@ -61,6 +61,6 @@ func (s *SuiteSlack) TestRunSuccessOnError(c *C) {
s.ctx.Start()
s.ctx.Stop(nil)

m := NewSlack(&SlackConfig{URL: ts.URL, OnlyOnError: true})
m := NewSlack(&SlackConfig{SlackWebhook: ts.URL, SlackOnlyOnError: true})
c.Assert(m.Run(s.ctx), IsNil)
}

0 comments on commit b4b7032

Please sign in to comment.