Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add synchronous jobs #131

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ jobs:
# each job needs a unique name, it's used for logging and as a default label
- name: "example"
# interval defined the pause between the runs of this job
# set to 0 to make the queries synchronous
interval: '5m'
# cron_schedule when to execute the job in the standard CRON syntax
# if specified, the interval is ignored
Expand Down
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ func (c *cronConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
type Job struct {
log log.Logger
conns []*connection
Trigger chan bool // used to trigger execution
Done chan bool // used to tell state
Name string `yaml:"name"` // name of this job
KeepAlive bool `yaml:"keepalive"` // keep connection between runs?
Interval time.Duration `yaml:"interval"` // interval at which this job is run
Expand Down
35 changes: 35 additions & 0 deletions handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
)

// handlerFunc can be used as handler for http.HandleFunc()
// all synchronous jobs will be triggered and waited for,
// then the promhttp handler is executed
func (ex *Exporter) handlerFunc(w http.ResponseWriter, req *http.Request) {
// pull all triggers on jobs with interval 0
for _, job := range ex.jobs {
// if job is nil or is async then continue to next job
if job == nil || job.Interval > 0 {
continue
}
job.Trigger <- true
}

// wait for all sync jobs to finish
for _, job := range ex.jobs {
if job == nil || job.Interval > 0 {
continue
}
<-job.Done
}

// get the prometheus handler
handler := promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{})

// execute the ServeHTTP function
handler.ServeHTTP(w, req)
}
36 changes: 29 additions & 7 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ func (j *Job) Init(logger log.Logger, queries map[string]string) error {
}

func (j *Job) updateConnections() {
// if the interval is not set > 0, create needed channels
if j.Interval <= 0 {
if j.Trigger == nil {
j.Trigger = make(chan bool)
}

if j.Done == nil {
j.Done = make(chan bool)
}
}
// if there are no connection URLs for this job it can't be run
if j.Connections == nil {
level.Error(j.log).Log("msg", "no connections for job", "job_name", j.Name)
Expand Down Expand Up @@ -405,13 +415,25 @@ func (j *Job) markFailed(conn *connection) {

// Run the job queries with exponential backoff, implements the cron.Job interface
func (j *Job) Run() {
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = j.Interval
if bo.MaxElapsedTime == 0 {
bo.MaxElapsedTime = time.Minute
}
if err := backoff.Retry(j.runOnce, bo); err != nil {
level.Error(j.log).Log("msg", "Failed to run", "err", err)
// if the interval is 0 or lower, wait to be triggered
if j.Interval <= 0 {
// wait for trigger
<-j.Trigger
if err := j.runOnce(); err != nil {
level.Error(j.log).Log("msg", "Failed to run", "err", err)
}

// send true into done channel
j.Done <- true
} else {
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = j.Interval
if bo.MaxElapsedTime == 0 {
bo.MaxElapsedTime = time.Minute
}
if err := backoff.Retry(j.runOnce, bo); err != nil {
level.Error(j.log).Log("msg", "Failed to run", "err", err)
}
}
}

Expand Down
5 changes: 2 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
)

Expand Down Expand Up @@ -62,8 +61,8 @@ func main() {
}
prometheus.MustRegister(exporter)

// setup and start webserver
http.Handle(*metricsPath, promhttp.Handler())
// setup and start webserver with custom function
http.HandleFunc(*metricsPath, exporter.handlerFunc)
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { http.Error(w, "OK", http.StatusOK) })
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`<html>
Expand Down