diff --git a/.github/workflows/todo_to_issue.yml b/.github/workflows/todo_to_issue.yml index 215828e..f9fb9a9 100644 --- a/.github/workflows/todo_to_issue.yml +++ b/.github/workflows/todo_to_issue.yml @@ -9,6 +9,9 @@ on: description: "By default, the commit entered above is compared to the one directly before it; to go back further, enter an earlier SHA here" required: false push: + branches: + - main + jobs: build: runs-on: "ubuntu-latest" diff --git a/.golangci.yaml b/.golangci.yaml index 23cce0c..871b62c 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -5,7 +5,7 @@ run: issues-exit-code: 1 tests: true # list of build tags, all linters use it. Default is empty list. - #build-tags: + build-tags: testing # which dirs to skip: issues from them won't be reported; # can use regexp here: generated.*, regexp is applied on full path; @@ -133,7 +133,7 @@ linters: - goconst - gocritic - gocyclo - - godox + # - godox - goerr113 - gofmt - goheader diff --git a/Makefile b/Makefile index c19fb69..4fa15cf 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,7 @@ test: install-gotestsum -count=5 \ -race \ -cover \ + -tags testing \ -coverprofile=tmp/coverage/coverage.out \ ./... diff --git a/README.md b/README.md index cb495b9..8cb9202 100644 --- a/README.md +++ b/README.md @@ -41,13 +41,13 @@ Queue handlers listen for Jobs on queues. Jobs may consist of any payload that i Queue Handlers are simple Go functions that accept a `Context` parameter. -**Example**: Add a listener on the `hello_world` queue +**Example**: Add a listener on the `hello_world` queue using the default in-memory backend ```go ctx := context.Background() nq, _ := neoq.New(ctx) -nq.Listen(ctx, "hello_world", neoq.NewHandler(func(ctx context.Context) (err error) { - j, err := neoq.JobFromContext(ctx) +nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) { + j, _ := handler.JobFromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) return })) @@ -55,12 +55,14 @@ nq.Listen(ctx, "hello_world", neoq.NewHandler(func(ctx context.Context) (err err ## Enqueue jobs -**Example**: Add a "Hello World" job to the `hello_world` queue +Enqueuing jobs adds jobs to the specified queue to be processed asynchronously. + +**Example**: Add a "Hello World" job to the `hello_world` queue using the default in-memory backend. ```go ctx := context.Background() nq, _ := neoq.New(ctx) -jid, _ := nq.Enqueue(ctx, neoq.Job{ +jid, _ := nq.Enqueue(ctx, &jobs.Job{ Queue: "hello_world", Payload: map[string]interface{}{ "message": "hello world", @@ -68,6 +70,30 @@ jid, _ := nq.Enqueue(ctx, neoq.Job{ }) ``` +## Postgres + +**Example**: Process jobs on the "hello_world" queue and add a job to it using the postgres backend + +```go +ctx := context.Background() +nq, _ := neoq.New(ctx, + neoq.WithBackend(postgres.Backend), + config.WithConnectionString("postgres://postgres:postgres@127.0.0.1:5432/neoq"), +) + +nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) { + j, _ := handler.JobFromContext(ctx) + log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) + return +})) + +nq.Enqueue(ctx, &jobs.Job{ + Queue: "hello_world", + Payload: map[string]interface{}{ + "message": "hello world", + }, +}) +``` # Example Code Additional example integration code can be found at https://github.com/acaloiaro/neoq/tree/main/examples diff --git a/backends/doc.go b/backends/doc.go new file mode 100644 index 0000000..ef8a689 --- /dev/null +++ b/backends/doc.go @@ -0,0 +1,4 @@ +// Package backends provides concrete implementations of [pkg/github.com/acaloiaro/neoq/types.Backend] +// +// These backends provide the bulk of Neoq's functionality. +package backends diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go new file mode 100644 index 0000000..4e2dc49 --- /dev/null +++ b/backends/memory/memory_backend.go @@ -0,0 +1,310 @@ +package memory + +import ( + "context" + "fmt" + "os" + "sync" + "time" + + "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/internal" + "github.com/acaloiaro/neoq/jobs" + "github.com/acaloiaro/neoq/logging" + "github.com/acaloiaro/neoq/types" + "github.com/guregu/null" + "github.com/iancoleman/strcase" // TODO factor out + "github.com/jsuar/go-cron-descriptor/pkg/crondescriptor" // TODO factor out + "github.com/pkg/errors" + "github.com/robfig/cron" + "golang.org/x/exp/slog" +) + +const ( + defaultMemQueueCapacity = 10000 // the default capacity of individual queues + emptyCapacity = 0 +) + +// MemBackend is a memory-backed neoq backend +type MemBackend struct { + types.Backend + config *config.Config + logger logging.Logger + handlers *sync.Map // map queue names [string] to queue handlers [Handler] + fingerprints *sync.Map // map fingerprints [string] to job [Job] + futureJobs *sync.Map // map jobIDs [int64] to job [Job] + queues *sync.Map // map queue names [string] to queue handler channels [chan Job] + cron *cron.Cron + mu *sync.Mutex // mutext to protect mutating state on a pgWorker + cancelFuncs []context.CancelFunc // A collection of cancel functions to be called upon Shutdown() + jobCount int64 // number of jobs that have been queued since start +} + +// Backend is a [config.BackendInitializer] that initializes a new memory-backed neoq backend +func Backend(ctx context.Context, opts ...config.Option) (backend types.Backend, err error) { + mb := &MemBackend{ + config: config.New(), + cron: cron.New(), + mu: &sync.Mutex{}, + queues: &sync.Map{}, + handlers: &sync.Map{}, + futureJobs: &sync.Map{}, + fingerprints: &sync.Map{}, + logger: slog.New(slog.NewTextHandler(os.Stdout)), + jobCount: 0, + cancelFuncs: []context.CancelFunc{}, + } + mb.cron.Start() + + for _, opt := range opts { + opt(mb.config) + } + + backend = mb + + return +} + +// Enqueue queues jobs to be executed asynchronously +func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, err error) { + var queueChan chan *jobs.Job + var qc any + var ok bool + + if qc, ok = m.queues.Load(job.Queue); !ok { + return jobs.UnqueuedJobID, fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, job.Queue) + } + + queueChan = qc.(chan *jobs.Job) + + // Make sure RunAfter is set to a non-zero value if not provided by the caller + // if already set, schedule the future job + now := time.Now() + if job.RunAfter.IsZero() { + job.RunAfter = now + } + + if job.Queue == "" { + err = errors.New("this job does not specify a Queue. Please specify a queue") + + return + } + + err = jobs.FingerprintJob(job) + if err != nil { + return + } + + // if the job fingerprint is already known, don't queue the job + if _, found := m.fingerprints.Load(job.Fingerprint); found { + return jobs.DuplicateJobID, nil + } + + m.fingerprints.Store(job.Fingerprint, job) + m.mu.Lock() + m.jobCount++ + m.mu.Unlock() + + job.ID = m.jobCount + jobID = m.jobCount + + if job.RunAfter.Equal(now) { + queueChan <- job + } else { + m.queueFutureJob(job) + } + + return jobID, nil +} + +// Start starts processing jobs with the specified queue and handler +func (m *MemBackend) Start(ctx context.Context, queue string, h handler.Handler) (err error) { + var queueCapacity = h.QueueCapacity + if queueCapacity == emptyCapacity { + queueCapacity = defaultMemQueueCapacity + } + + m.handlers.Store(queue, h) + m.queues.Store(queue, make(chan *jobs.Job, queueCapacity)) + + ctx, cancel := context.WithCancel(ctx) + + m.mu.Lock() + m.cancelFuncs = append(m.cancelFuncs, cancel) + m.mu.Unlock() + + err = m.start(ctx, queue) + if err != nil { + return + } + + return +} + +// StartCron starts processing jobs with the specified cron schedule and handler +// +// See: https://pkg.go.dev/github.com/robfig/cron?#hdr-CRON_Expression_Format for details on the cron spec format +func (m *MemBackend) StartCron(ctx context.Context, cronSpec string, h handler.Handler) (err error) { + cd, err := crondescriptor.NewCronDescriptor(cronSpec) + if err != nil { + return fmt.Errorf("error creating cron descriptor: %w", err) + } + + cdStr, err := cd.GetDescription(crondescriptor.Full) + if err != nil { + return fmt.Errorf("error getting cron description: %w", err) + } + + queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr)) + + ctx, cancel := context.WithCancel(ctx) + m.mu.Lock() + m.cancelFuncs = append(m.cancelFuncs, cancel) + m.mu.Unlock() + + err = m.Start(ctx, queue, h) + if err != nil { + return fmt.Errorf("error processing queue '%s': %w", queue, err) + } + + if err := m.cron.AddFunc(cronSpec, func() { + _, _ = m.Enqueue(ctx, &jobs.Job{Queue: queue}) + }); err != nil { + return fmt.Errorf("error adding cron: %w", err) + } + + return +} + +// SetLogger sets this backend's logger +func (m *MemBackend) SetLogger(logger logging.Logger) { + m.logger = logger +} + +// Shutdown halts the worker +func (m *MemBackend) Shutdown(ctx context.Context) { + for _, f := range m.cancelFuncs { + f() + } + + m.cancelFuncs = nil +} + +// start starts a processor that handles new incoming jobs and future jobs +func (m *MemBackend) start(ctx context.Context, queue string) (err error) { + var queueChan chan *jobs.Job + var qc any + var ht any + var h handler.Handler + var ok bool + + if ht, ok = m.handlers.Load(queue); !ok { + return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue) + } + + if qc, ok = m.queues.Load(queue); !ok { + return fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, queue) + } + + go func() { m.scheduleFutureJobs(ctx, queue) }() + + h = ht.(handler.Handler) + queueChan = qc.(chan *jobs.Job) + + for i := 0; i < h.Concurrency; i++ { + go func() { + var err error + var job *jobs.Job + + for { + select { + case job = <-queueChan: + err = m.handleJob(ctx, job, h) + case <-ctx.Done(): + return + } + + if err != nil { + m.logger.Error("job failed", err, "job_id", job.ID) + runAfter := internal.CalculateBackoff(job.Retries) + job.RunAfter = runAfter + m.queueFutureJob(job) + } + + m.fingerprints.Delete(job.Fingerprint) + } + }() + } + + return nil +} + +func (m *MemBackend) scheduleFutureJobs(ctx context.Context, queue string) { + // check for new future jobs on an interval + ticker := time.NewTicker(m.config.JobCheckInterval) + + for { + // loop over list of future jobs, scheduling goroutines to wait for jobs that are due within the next 30 seconds + m.futureJobs.Range(func(_, v any) bool { + job := v.(*jobs.Job) + var queueChan chan *jobs.Job + + timeUntilRunAfter := time.Until(job.RunAfter) + if timeUntilRunAfter <= m.config.FutureJobWindow { + m.removeFutureJob(job.ID) + go func(j *jobs.Job) { + scheduleCh := time.After(timeUntilRunAfter) + <-scheduleCh + if qc, ok := m.queues.Load(queue); ok { + queueChan = qc.(chan *jobs.Job) + queueChan <- j + } else { + m.logger.Error(fmt.Sprintf("no queue processor for queue '%s'", queue), errors.New("no queue processor configured")) + } + }(job) + } + + return true + }) + select { + case <-ticker.C: + continue + case <-ctx.Done(): + return + } + } +} + +func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Handler) (err error) { + ctxv := handler.CtxVars{Job: job} + hctx := handler.WithContext(ctx, ctxv) + + // check if the job is being retried and increment retry count accordingly + if job.Status != internal.JobStatusNew { + job.Retries++ + } + + // execute the queue handler of this job + err = handler.Exec(hctx, h) + if err != nil { + job.Error = null.StringFrom(err.Error()) + } + + return +} + +// queueFutureJob queues a future job for eventual execution +func (m *MemBackend) queueFutureJob(job *jobs.Job) { + m.fingerprints.Store(job.Fingerprint, job) + m.futureJobs.Store(job.ID, job) +} + +// removeFutureJob removes a future job from the in-memory list of jobs that will execute in the future +func (m *MemBackend) removeFutureJob(jobID int64) { + if j, ok := m.futureJobs.Load(jobID); ok { + job := j.(*jobs.Job) + m.fingerprints.Delete(job.Fingerprint) + m.futureJobs.Delete(job.ID) + } +} diff --git a/backends/memory/memory_backend_helper_test.go b/backends/memory/memory_backend_helper_test.go new file mode 100644 index 0000000..82a72d6 --- /dev/null +++ b/backends/memory/memory_backend_helper_test.go @@ -0,0 +1,43 @@ +//go:build testing + +package memory + +import ( + "context" + "sync" + + "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq/logging" + "github.com/acaloiaro/neoq/types" + "github.com/robfig/cron" +) + +// TestingBackend initializes a backend for testing purposes +func TestingBackend(conf *config.Config, + c *cron.Cron, + queues, h, futureJobs, fingerprints *sync.Map, + logger logging.Logger) config.BackendInitializer { + return func(ctx context.Context, opts ...config.Option) (backend types.Backend, err error) { + mb := &MemBackend{ + config: conf, + cron: c, + mu: &sync.Mutex{}, + queues: queues, + handlers: h, + futureJobs: futureJobs, + fingerprints: fingerprints, + logger: logger, + jobCount: 0, + cancelFuncs: []context.CancelFunc{}, + } + mb.cron.Start() + + for _, opt := range opts { + opt(mb.config) + } + + backend = mb + + return + } +} diff --git a/memory_backend_test.go b/backends/memory/memory_backend_test.go similarity index 52% rename from memory_backend_test.go rename to backends/memory/memory_backend_test.go index 31c5893..61ae737 100644 --- a/memory_backend_test.go +++ b/backends/memory/memory_backend_test.go @@ -1,52 +1,62 @@ -package neoq +package memory_test import ( "context" "fmt" + "log" + "strings" + "sync" "testing" "time" + "github.com/acaloiaro/neoq" + "github.com/acaloiaro/neoq/backends/memory" + "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/jobs" + "github.com/acaloiaro/neoq/testutils" "github.com/pkg/errors" + "github.com/robfig/cron" "golang.org/x/exp/slog" ) -// TestMemeoryBackendBasicJobProcessing tests that the memory backend is able to process the most basic jobs with the +const ( + queue = "testing" +) + +// TestBasicJobProcessing tests that the memory backend is able to process the most basic jobs with the // most basic configuration. -func TestMemeoryBackendBasicJobProcessing(t *testing.T) { - queue := "testing" +func TestBasicJobProcessing(t *testing.T) { numJobs := 1000 doneCnt := 0 done := make(chan bool) var timeoutTimer = time.After(5 * time.Second) ctx := context.TODO() - backend, err := NewMemBackend() - if err != nil { - t.Fatal(err) - } - - nq, err := New(ctx, WithBackend(backend)) + nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) if err != nil { t.Fatal(err) } - handler := NewHandler(func(ctx context.Context) (err error) { + h := handler.New(func(_ context.Context) (err error) { done <- true return }) - nq.Listen(ctx, queue, handler) + if err := nq.Start(ctx, queue, h); err != nil { + t.Fatal(err) + } go func() { for i := 0; i < numJobs; i++ { - jid, err := nq.Enqueue(ctx, Job{ + jid, e := nq.Enqueue(ctx, &jobs.Job{ Queue: queue, Payload: map[string]interface{}{ "message": fmt.Sprintf("hello world: %d", i), }, }) - if err != nil || jid == DuplicateJobID { - slog.Error("job was not enqueued. either it was duplicate or this error caused it:", err) + if e != nil || jid == jobs.DuplicateJobID { + slog.Error("job was not enqueued. either it was duplicate or this error caused it:", e) } } }() @@ -75,14 +85,14 @@ func TestMemeoryBackendBasicJobProcessing(t *testing.T) { nq.Shutdown(ctx) } -// TestMemoryBackendConfiguration tests that the memory backend receives and utilizes the MaxQueueCapacity handler +// TestBackendConfiguration tests that the memory backend receives and utilizes the MaxQueueCapacity handler // configuration. // // This test works by enqueueing 3 jobs. Each job sleeps for longer than the test is willing to wait for the jobs to // enqueue. The `done` channel is notified when all 3 jobs are enqueued. // -// By serializing handler execution with `WithOption(HandlerConcurrency(1))` and enqueueing jobs asynchronously, we can wait -// on `done` and a timeout channel to see which one completes first. +// By serializing handler execution by initializing neoq with `handler.Concurrency(1)` and enqueueing jobs +// asynchronously, we can wait on `done` and a timeout channel to see which one completes first. // // Since the queue has a capacity of 1 and the handler is serialized, we know that `done` cannot be notified until job 1 // is complete, job 2 is processing, and job 3 can be added to the queue. @@ -90,41 +100,37 @@ func TestMemeoryBackendBasicJobProcessing(t *testing.T) { // If `done` is notified before the timeout channel, this test would fail, because that would mean Enqueue() is not // blocking while the first Sleep()ing job is running. If the qeueue is blocking when it meets its capacity, we know // that the max queue capacity configuration has taken effect. -func TestMemeoryBackendConfiguration(t *testing.T) { +func TestBackendConfiguration(t *testing.T) { numJobs := 3 - queue := "testing" timeout := false done := make(chan bool) ctx := context.TODO() - backend, err := NewMemBackend() - if err != nil { - t.Fatal(err) - } - - nq, err := New(ctx, WithBackend(backend)) + nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) if err != nil { t.Fatal(err) } - handler := NewHandler(func(ctx context.Context) (err error) { + h := handler.New(func(_ context.Context) (err error) { time.Sleep(100 * time.Millisecond) return - }, HandlerConcurrency(1), MaxQueueCapacity(1)) + }, handler.Concurrency(1), handler.MaxQueueCapacity(1)) - nq.Listen(ctx, queue, handler) + if err := nq.Start(ctx, queue, h); err != nil { + t.Fatal(err) + } go func() { for i := 0; i < numJobs; i++ { - jid, err := nq.Enqueue(ctx, Job{ + jid, e := nq.Enqueue(ctx, &jobs.Job{ Queue: queue, Payload: map[string]interface{}{ "message": fmt.Sprintf("hello world: %d", i), }, }) - if err != nil || jid == DuplicateJobID { - slog.Error("job was not enqueued. either it was duplicate or this error caused it:", err) + if e != nil || jid == jobs.DuplicateJobID { + slog.Error("job was not enqueued. either it was duplicate or this error caused it:", e) } } @@ -145,43 +151,41 @@ func TestMemeoryBackendConfiguration(t *testing.T) { nq.Shutdown(ctx) } -func TestMemeoryBackendFutureJobScheduling(t *testing.T) { - queue := "testing" +var testFutureJobs = &sync.Map{} - ctx := context.TODO() - backend, err := NewMemBackend() - if err != nil { - t.Fatal(err) - } +func TestFutureJobScheduling(t *testing.T) { + ctx := context.Background() + testLogger := testutils.TestLogger{L: log.New(&strings.Builder{}, "", 0), Done: make(chan bool)} + testBackend := memory.TestingBackend(config.New(), cron.New(), &sync.Map{}, &sync.Map{}, testFutureJobs, &sync.Map{}, testLogger) - nq, err := New(ctx, WithBackend(backend)) + nq, err := neoq.New(ctx, neoq.WithBackend(testBackend)) if err != nil { t.Fatal(err) } + defer nq.Shutdown(ctx) - handler := NewHandler(func(ctx context.Context) (err error) { + h := handler.New(func(ctx context.Context) (err error) { return }) - nq.Listen(ctx, queue, handler) + if err := nq.Start(ctx, queue, h); err != nil { + t.Fatal(err) + } - jid, err := nq.Enqueue(ctx, Job{ + jid, err := nq.Enqueue(ctx, &jobs.Job{ Queue: queue, Payload: map[string]interface{}{ "message": "hello world", }, RunAfter: time.Now().Add(5 * time.Second), }) - if err != nil || jid == DuplicateJobID { - slog.Error("job was not enqueued. either it was duplicate or this error caused it:", err) + if err != nil || jid == jobs.DuplicateJobID { + err = fmt.Errorf("job was not enqueued. either it was duplicate or this error caused it: %w", err) + t.Error(err) } - mb := nq.(*MemBackend) - var ok bool - if _, ok = mb.futureJobs.Load(jid); !ok { + if _, ok = testFutureJobs.Load(jid); !ok { t.Error(err) } - - nq.Shutdown(ctx) } diff --git a/postgres_backend.go b/backends/postgres/postgres_backend.go similarity index 56% rename from postgres_backend.go rename to backends/postgres/postgres_backend.go index c61de69..73b1110 100644 --- a/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -1,16 +1,20 @@ -package neoq +package postgres import ( "context" "errors" "fmt" - "log" "os" "strconv" - "strings" "sync" "time" + "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/internal" + "github.com/acaloiaro/neoq/jobs" + "github.com/acaloiaro/neoq/logging" + "github.com/acaloiaro/neoq/types" "github.com/iancoleman/strcase" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" @@ -44,115 +48,145 @@ const ( ORDER BY run_after ASC LIMIT 100 FOR UPDATE SKIP LOCKED` + setIdleInTxSessionTimeout = `SET idle_in_transaction_session_timeout = 0` +) + +var ( + ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings") + ErrDuplicateJobID = errors.New("duplicate job id") + ErrNoQueue = errors.New("no queue specified") + ErrNoTransactionInContext = errors.New("context does not have a Tx set") ) // PgBackend is a Postgres-based Neoq backend type PgBackend struct { - cancelFuncs []context.CancelFunc // A collection of cancel functions to be called upon Shutdown() - config *pgConfig - cron *cron.Cron - listenConn *pgx.Conn - pool *pgxpool.Pool - handlers map[string]Handler // a map of queue names to queue handlers - mu *sync.Mutex // mutex to protect mutating state on a pgWorker - futureJobs map[int64]time.Time // map of future job IDs to their due time - jobCheckInterval time.Duration // the duration of time between checking for future jobs to schedule - logger Logger + types.Backend + config *config.Config + logger logging.Logger + cron *cron.Cron + listenConn *pgx.Conn + mu *sync.Mutex // mutex to protect mutating state on a pgWorker + pool *pgxpool.Pool + futureJobs map[int64]time.Time // map of future job IDs to their due time + handlers map[string]handler.Handler // a map of queue names to queue handlers + cancelFuncs []context.CancelFunc // A collection of cancel functions to be called upon Shutdown() } -// NewPgBackend creates a new neoq backend backed by Postgres +// Backend initializes a new postgres-backed neoq backend +// // If the database does not yet exist, Neoq will attempt to create the database and related tables by default. // -// Connection strings may be a URL or DSN-style connection string to neoq's database. The connection string supports multiple +// Backend requires that one of the [config.ConfigOption] is [config.WithConnectionString] +// +// Connection strings may be a URL or DSN-style connection strings. The connection string supports multiple // options detailed below. // // options: -// pool_max_conns: integer greater than 0 -// pool_min_conns: integer 0 or greater -// pool_max_conn_lifetime: duration string -// pool_max_conn_idle_time: duration string -// pool_health_check_period: duration string -// pool_max_conn_lifetime_jitter: duration string +// - pool_max_conns: integer greater than 0 +// - pool_min_conns: integer 0 or greater +// - pool_max_conn_lifetime: duration string +// - pool_max_conn_idle_time: duration string +// - pool_health_check_period: duration string +// - pool_max_conn_lifetime_jitter: duration string // // # Example DSN +// // user=worker password=secret host=workerdb.example.com port=5432 dbname=mydb sslmode=verify-ca pool_max_conns=10 // // # Example URL -// postgres://worker:secret@workerdb.example.com:5432/mydb?sslmode=verify-ca&pool_max_conns=10 // -// Available ConfigOption -// - WithTransactionTimeout(timeout int): configure the idle_in_transaction_timeout for the worker's database -// connection(s) -func NewPgBackend(ctx context.Context, connectString string, opts ...ConfigOption) (n Neoq, err error) { - w := &PgBackend{ - mu: &sync.Mutex{}, - config: &pgConfig{connectString: connectString}, - handlers: make(map[string]Handler), - futureJobs: make(map[int64]time.Time), - logger: slog.New(slog.NewTextHandler(os.Stdout)), - cron: cron.New(), - cancelFuncs: []context.CancelFunc{}, - jobCheckInterval: DefaultJobCheckInterval, - } - w.cron.Start() +// postgres://worker:secret@workerdb.example.com:5432/mydb?sslmode=verify-ca&pool_max_conns=10 +func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err error) { + p := &PgBackend{ + mu: &sync.Mutex{}, + config: config.New(), + handlers: make(map[string]handler.Handler), + futureJobs: make(map[int64]time.Time), + logger: slog.New(slog.NewTextHandler(os.Stdout)), + cron: cron.New(), + cancelFuncs: []context.CancelFunc{}, + } // Set all options for _, opt := range opts { - opt(w) + opt(p.config) } ctx, cancel := context.WithCancel(ctx) - w.mu.Lock() - w.cancelFuncs = append(w.cancelFuncs, cancel) - w.mu.Unlock() + p.mu.Lock() + p.cancelFuncs = append(p.cancelFuncs, cancel) + p.mu.Unlock() - err = w.initializeDB(ctx) + err = p.initializeDB(ctx) if err != nil { return } - if w.pool == nil { + if p.pool == nil { var poolConfig *pgxpool.Config - poolConfig, err = pgxpool.ParseConfig(w.config.connectString) - if err != nil || w.config.connectString == "" { - return nil, errors.New("invalid connecton string: see documentation for valid connection strings") + poolConfig, err = pgxpool.ParseConfig(p.config.ConnectionString) + if err != nil || p.config.ConnectionString == "" { + return nil, ErrCnxString } // ensure that workers don't consume connections with idle transactions poolConfig.AfterConnect = func(ctx context.Context, conn *pgx.Conn) (err error) { var query string - if w.config.idleTxTimeout > 0 { - query = fmt.Sprintf("SET idle_in_transaction_session_timeout = '%dms'", w.config.idleTxTimeout) + if p.config.IdleTransactionTimeout > 0 { + query = fmt.Sprintf("SET idle_in_transaction_session_timeout = '%dms'", p.config.IdleTransactionTimeout) } else { // there is no limit to the amount of time a worker's transactions may be idle - query = "SET idle_in_transaction_session_timeout = 0" + query = setIdleInTxSessionTimeout } _, err = conn.Exec(ctx, query) return } - w.pool, err = pgxpool.NewWithConfig(ctx, poolConfig) + p.pool, err = pgxpool.NewWithConfig(ctx, poolConfig) if err != nil { return } } - n = w + p.cron.Start() + + pb = p return } -// pgConfig configures NeoqPg's general behavior -type pgConfig struct { - idleTxTimeout int // time a worker's transaction may be idle before its connection is closed - connectString string // postgres connect string / DSN +// WithTransactionTimeout sets the time that PgBackend's transactions may be idle before its underlying connection is +// closed +// The timeout is the number of milliseconds that a transaction may sit idle before postgres terminates the +// transaction's underlying connection. The timeout should be longer than your longest job takes to complete. If set +// too short, job state will become unpredictable, e.g. retry counts may become incorrect. +func WithTransactionTimeout(txTimeout int) config.Option { + return func(c *config.Config) { + c.IdleTransactionTimeout = txTimeout + } +} + +// txFromContext gets the transaction from a context, if the transaction is already set +func txFromContext(ctx context.Context) (t pgx.Tx, err error) { + if v, ok := ctx.Value(handler.CtxVarsKey).(handler.CtxVars); ok { + var tx pgx.Tx + var ok bool + if tx, ok = v.Tx.(pgx.Tx); !ok { + return nil, ErrNoTransactionInContext + } + return tx, nil + } + + return nil, ErrNoTransactionInContext } // initializeDB initializes the tables, types, and indices necessary to operate Neoq -func (w PgBackend) initializeDB(ctx context.Context) (err error) { +// +//nolint:funlen,gocyclo,cyclop +func (p *PgBackend) initializeDB(ctx context.Context) (err error) { var pgxCfg *pgx.ConnConfig var tx pgx.Tx - pgxCfg, err = pgx.ParseConfig(w.config.connectString) + pgxCfg, err = pgx.ParseConfig(p.config.ConnectionString) if err != nil { return } @@ -160,7 +194,7 @@ func (w PgBackend) initializeDB(ctx context.Context) (err error) { connectStr := fmt.Sprintf("postgres://%s:%s@%s", pgxCfg.User, pgxCfg.Password, pgxCfg.Host) conn, err := pgx.Connect(ctx, connectStr) if err != nil { - w.logger.Error("unableto connect to database", err) + p.logger.Error("unableto connect to database", err) return } defer conn.Close(ctx) @@ -169,14 +203,12 @@ func (w PgBackend) initializeDB(ctx context.Context) (err error) { dbExistsQ := fmt.Sprintf(`SELECT EXISTS (SELECT datname FROM pg_catalog.pg_database WHERE datname = '%s');`, pgxCfg.Database) rows, err := conn.Query(ctx, dbExistsQ) if err != nil { - fmt.Fprintf(os.Stderr, "unable to determne if jobs table exists: %v", err) - return + return fmt.Errorf("unable to determne if jobs table exists: %w", err) } for rows.Next() { err = rows.Scan(&dbExists) if err != nil { - fmt.Fprintf(os.Stderr, "unable to determine if jobs table exists: %v", err) - return + return fmt.Errorf("unable to determine if jobs table exists: %w", err) } } defer rows.Close() @@ -184,17 +216,14 @@ func (w PgBackend) initializeDB(ctx context.Context) (err error) { conn.Close(ctx) conn, err = pgx.Connect(ctx, connectStr) if err != nil { - fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err) - os.Exit(1) + return fmt.Errorf("unable to connect to database: %w", err) } defer conn.Close(ctx) if !dbExists { createDBQ := fmt.Sprintf("CREATE DATABASE %s", pgxCfg.Database) - _, err := conn.Exec(ctx, createDBQ) - if err != nil { - fmt.Fprintf(os.Stderr, "unable to create neoq database: %v", err) - return err + if _, err = conn.Exec(ctx, createDBQ); err != nil { + return fmt.Errorf("unable to create neoq database: %w", err) } } @@ -207,7 +236,7 @@ func (w PgBackend) initializeDB(ctx context.Context) (err error) { if err != nil { return } - defer tx.Rollback(ctx) // rollback has no effect if the transaction has been committed + defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed jobsTableExistsQ := `SELECT EXISTS (SELECT FROM pg_tables @@ -272,37 +301,39 @@ func (w PgBackend) initializeDB(ctx context.Context) (err error) { _, err = tx.Exec(ctx, createTablesQ) if err != nil { - fmt.Fprintf(os.Stderr, "unable to create job status enum: %v", err) - return err + return fmt.Errorf("unable to create job status enum: %w", err) } - err = tx.Commit(ctx) + if err = tx.Commit(ctx); err != nil { + return fmt.Errorf("error committing transaction: %w", err) + } } - return + return nil } // Enqueue adds jobs to the specified queue -func (w *PgBackend) Enqueue(ctx context.Context, job Job) (jobID int64, err error) { +func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, err error) { ctx, cancel := context.WithCancel(ctx) - w.mu.Lock() - w.cancelFuncs = append(w.cancelFuncs, cancel) - w.mu.Unlock() - - conn, err := w.pool.Acquire(ctx) + p.mu.Lock() + p.cancelFuncs = append(p.cancelFuncs, cancel) + p.mu.Unlock() + conn, err := p.pool.Acquire(ctx) if err != nil { + err = fmt.Errorf("error acquiring connection: %w", err) return } defer conn.Release() tx, err := conn.Begin(ctx) if err != nil { + err = fmt.Errorf("error creating transaction: %w", err) return } // Rollback is safe to call even if the tx is already closed, so if // the tx commits successfully, this is a no-op - defer tx.Rollback(ctx) + defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed // Make sure RunAfter is set to a non-zero value if not provided by the caller // if already set, schedule the future job @@ -312,108 +343,113 @@ func (w *PgBackend) Enqueue(ctx context.Context, job Job) (jobID int64, err erro } if job.Queue == "" { - err = errors.New("this job does not specify a Queue. Please specify a queue") - + err = ErrNoQueue return } - jobID, err = w.enqueueJob(ctx, tx, job) - - if err != nil || jobID == DuplicateJobID { + jobID, err = p.enqueueJob(ctx, tx, job) + if err != nil { + err = fmt.Errorf("error enqueuing job: %w", err) + } + if jobID == jobs.DuplicateJobID { + err = ErrDuplicateJobID return } // notify listeners that a new job has arrived if it's not a future job - if job.RunAfter == now { + if job.RunAfter.Equal(now) { _, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%d'", job.Queue, jobID)) if err != nil { - return + err = fmt.Errorf("error executing transaction: %w", err) } } else { - w.mu.Lock() - w.futureJobs[jobID] = job.RunAfter - w.mu.Unlock() + p.mu.Lock() + p.futureJobs[jobID] = job.RunAfter + p.mu.Unlock() } err = tx.Commit(ctx) if err != nil { + err = fmt.Errorf("error committing transaction: %w", err) return } - return + return jobID, nil } -// Listen sets the queue handler function for the specified queue. -// -// Neoq will not process any queues until Listen() is called at least once. -func (w *PgBackend) Listen(ctx context.Context, queue string, h Handler) (err error) { +// Start starts processing jobs with the specified queue and handler +func (p *PgBackend) Start(ctx context.Context, queue string, h handler.Handler) (err error) { ctx, cancel := context.WithCancel(ctx) - w.mu.Lock() - w.cancelFuncs = append(w.cancelFuncs, cancel) - w.handlers[queue] = h - w.mu.Unlock() + p.mu.Lock() + p.cancelFuncs = append(p.cancelFuncs, cancel) + p.handlers[queue] = h + p.mu.Unlock() - err = w.start(ctx, queue) + err = p.start(ctx, queue) if err != nil { return } return } -// ListenCron listens for jobs on a cron schedule and handles them with the provided handler +// StartCron starts processing jobs with the specified cron schedule and handler // // See: https://pkg.go.dev/github.com/robfig/cron?#hdr-CRON_Expression_Format for details on the cron spec format -func (w *PgBackend) ListenCron(ctx context.Context, cronSpec string, h Handler) (err error) { +func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Handler) (err error) { cd, err := crondescriptor.NewCronDescriptor(cronSpec) if err != nil { - return err + return fmt.Errorf("error creating cron descriptor: %w", err) } cdStr, err := cd.GetDescription(crondescriptor.Full) if err != nil { - return err + return fmt.Errorf("error getting cron description: %w", err) } - queue := stripNonAlphanum(strcase.ToSnake(*cdStr)) + queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr)) ctx, cancel := context.WithCancel(ctx) - w.mu.Lock() - w.cancelFuncs = append(w.cancelFuncs, cancel) - w.mu.Unlock() + p.mu.Lock() + p.cancelFuncs = append(p.cancelFuncs, cancel) + p.mu.Unlock() - w.cron.AddFunc(cronSpec, func() { - w.Enqueue(ctx, Job{Queue: queue}) - }) + if err = p.cron.AddFunc(cronSpec, func() { + _, err := p.Enqueue(ctx, &jobs.Job{Queue: queue}) + if err != nil { + p.logger.Error("error queueing cron job", err) + } + }); err != nil { + return fmt.Errorf("error adding cron: %w", err) + } - err = w.Listen(ctx, queue, h) + err = p.Start(ctx, queue, h) return } // SetLogger sets this backend's logger -func (w *PgBackend) SetLogger(logger Logger) { - w.logger = logger +func (p *PgBackend) SetLogger(logger logging.Logger) { + p.logger = logger } -func (w *PgBackend) Shutdown(ctx context.Context) { - w.pool.Close() // also closes the hijacked listenConn - w.cron.Stop() +func (p *PgBackend) Shutdown(ctx context.Context) { + p.pool.Close() // also closes the hijacked listenConn + p.cron.Stop() - for _, f := range w.cancelFuncs { + for _, f := range p.cancelFuncs { f() } - w.cancelFuncs = nil + p.cancelFuncs = nil } // enqueueJob adds jobs to the queue, returning the job ID // // Jobs that are not already fingerprinted are fingerprinted before being added // Duplicate jobs are not added to the queue. Any two unprocessed jobs with the same fingerprint are duplicates -func (w PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j Job) (jobID int64, err error) { - - err = fingerprintJob(&j) +func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (jobID int64, err error) { + err = jobs.FingerprintJob(j) if err != nil { return } @@ -430,7 +466,7 @@ func (w PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j Job) (jobID int6 // this is a duplicate job; skip it if rowCount > 0 { - return DuplicateJobID, nil + return jobs.DuplicateJobID, nil } err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after) @@ -444,7 +480,7 @@ func (w PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j Job) (jobID int6 } // moveToDeadQueue moves jobs from the pending queue to the dead queue -func (w PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *Job, jobErr error) (err error) { +func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job, jobErr error) (err error) { _, err = tx.Exec(ctx, "DELETE FROM neoq_jobs WHERE id = $1", j.ID) if err != nil { return @@ -464,36 +500,37 @@ func (w PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *Job, jobEr // reflecting the status of the job and its number of retries. // TODO: Handle dropped connections when updating job status in PgBackend // e.g. acquiring a new connection in the event of connection failure -func (w PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { - status := JobStatusProcessed +// nolint: cyclop +func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { + status := internal.JobStatusProcessed errMsg := "" if jobErr != nil { - status = JobStatusFailed + status = internal.JobStatusFailed errMsg = jobErr.Error() } - var job *Job - if job, err = JobFromContext(ctx); err != nil { - return errors.New("unable to get job from context") + var job *jobs.Job + if job, err = handler.JobFromContext(ctx); err != nil { + return fmt.Errorf("error getting job from context: %w", err) } var tx pgx.Tx if tx, err = txFromContext(ctx); err != nil { - return errors.New("unable to get transaction from context") + return fmt.Errorf("error getting tx from context: %w", err) } if job.Retries >= job.MaxRetries { - err = w.moveToDeadQueue(ctx, tx, job, jobErr) + err = p.moveToDeadQueue(ctx, tx, job, jobErr) return } var runAfter time.Time - if job.Retries > 0 && status == JobStatusFailed { - runAfter = calculateBackoff(job.Retries) + if job.Retries > 0 && status == internal.JobStatusFailed { + runAfter = internal.CalculateBackoff(job.Retries) qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3, retries = $4, run_after = $5 WHERE id = $6" _, err = tx.Exec(ctx, qstr, time.Now(), errMsg, status, job.Retries, runAfter, job.ID) - } else if job.Retries > 0 && status != JobStatusFailed { + } else if job.Retries > 0 && status != internal.JobStatusFailed { qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3, retries = $4 WHERE id = $5" _, err = tx.Exec(ctx, qstr, time.Now(), errMsg, status, job.Retries, job.ID) } else { @@ -502,22 +539,23 @@ func (w PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { } if err == nil && time.Until(runAfter) > 0 { - w.mu.Lock() - w.futureJobs[job.ID] = runAfter - w.mu.Unlock() + p.mu.Lock() + p.futureJobs[job.ID] = runAfter + p.mu.Unlock() } - return + return nil } -// start starts a queue listener, processes pending job, and fires up goroutines to process future jobs -func (w PgBackend) start(ctx context.Context, queue string) (err error) { - var handler Handler +// start starts processing new, pending, and future jobs +// nolint: cyclop +func (p *PgBackend) start(ctx context.Context, queue string) (err error) { + var h handler.Handler var ok bool - if handler, ok = w.handlers[queue]; !ok { - return fmt.Errorf("no handler for queue: %s", queue) + if h, ok = p.handlers[queue]; !ok { + return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue) } - conn, err := w.pool.Acquire(ctx) + conn, err := p.pool.Acquire(ctx) if err != nil { return } @@ -526,20 +564,20 @@ func (w PgBackend) start(ctx context.Context, queue string) (err error) { // TODO: Give more thought to the implications of hijacking connections to LISTEN on in PgBackend // should this connecton not come from the pool, to avoid tainting it with connections that don't have an idle in // transaction time out set? - w.mu.Lock() - if w.listenConn == nil { - w.listenConn = conn.Hijack() + p.mu.Lock() + if p.listenConn == nil { + p.listenConn = conn.Hijack() } - w.mu.Unlock() + p.mu.Unlock() - listenJobChan := w.listen(ctx, queue) // listen for 'new' jobs - pendingJobsChan := w.pendingJobs(ctx, queue) // process overdue jobs *at startup* + listenJobChan := p.listen(ctx, queue) // listen for 'new' jobs + pendingJobsChan := p.pendingJobs(ctx, queue) // process overdue jobs *at startup* // process all future jobs and retries // TODO consider performance implications of `scheduleFutureJobs` in PgBackend - go func() { w.scheduleFutureJobs(ctx, queue) }() + go func() { p.scheduleFutureJobs(ctx, queue) }() - for i := 0; i < handler.concurrency; i++ { + for i := 0; i < h.Concurrency; i++ { go func() { var err error var jobID int64 @@ -547,9 +585,9 @@ func (w PgBackend) start(ctx context.Context, queue string) (err error) { for { select { case jobID = <-listenJobChan: - err = w.handleJob(ctx, jobID, handler) + err = p.handleJob(ctx, jobID, h) case jobID = <-pendingJobsChan: - err = w.handleJob(ctx, jobID, handler) + err = p.handleJob(ctx, jobID, h) case <-ctx.Done(): return } @@ -560,61 +598,62 @@ func (w PgBackend) start(ctx context.Context, queue string) (err error) { continue } - w.logger.Error("job failed", err, "job_id", jobID) + p.logger.Error("job failed", err, "job_id", jobID) continue } } }() } - return + + return nil } // removeFutureJob removes a future job from the in-memory list of jobs that will execute in the future -func (w PgBackend) removeFutureJob(jobID int64) { - if _, ok := w.futureJobs[jobID]; ok { - w.mu.Lock() - delete(w.futureJobs, jobID) - w.mu.Unlock() +func (p *PgBackend) removeFutureJob(jobID int64) { + if _, ok := p.futureJobs[jobID]; ok { + p.mu.Lock() + delete(p.futureJobs, jobID) + p.mu.Unlock() } } // initFutureJobs is intended to be run once to initialize the list of future jobs that must be monitored for // execution. it should be run only during system startup. -func (w PgBackend) initFutureJobs(ctx context.Context, queue string) { - rows, err := w.pool.Query(ctx, FutureJobQuery, queue) +func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) { + rows, err := p.pool.Query(ctx, FutureJobQuery, queue) if err != nil { - w.logger.Error("error fetching future jobs list", err) + p.logger.Error("error fetching future jobs list", err) return } var id int64 var runAfter time.Time - pgx.ForEachRow(rows, []any{&id, &runAfter}, func() error { - w.mu.Lock() - w.futureJobs[id] = runAfter - w.mu.Unlock() + _, _ = pgx.ForEachRow(rows, []any{&id, &runAfter}, func() error { + p.mu.Lock() + p.futureJobs[id] = runAfter + p.mu.Unlock() return nil }) } // scheduleFutureJobs announces future jobs using NOTIFY on an interval -func (w PgBackend) scheduleFutureJobs(ctx context.Context, queue string) { - w.initFutureJobs(ctx, queue) +func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) { + p.initFutureJobs(ctx, queue) // check for new future jobs on an interval - ticker := time.NewTicker(w.jobCheckInterval) + ticker := time.NewTicker(p.config.JobCheckInterval) for { // loop over list of future jobs, scheduling goroutines to wait for jobs that are due within the next 30 seconds - for jobID, runAfter := range w.futureJobs { + for jobID, runAfter := range p.futureJobs { timeUntillRunAfter := time.Until(runAfter) - if timeUntillRunAfter <= DefaultFutureJobWindow { - w.removeFutureJob(jobID) + if timeUntillRunAfter <= p.config.FutureJobWindow { + p.removeFutureJob(jobID) go func(jid int64) { scheduleCh := time.After(timeUntillRunAfter) <-scheduleCh - w.announceJob(ctx, queue, jid) + p.announceJob(ctx, queue, jid) }(jobID) } } @@ -631,8 +670,8 @@ func (w PgBackend) scheduleFutureJobs(ctx context.Context, queue string) { // announceJob announces jobs to queue listeners. // // Announced jobs are executed by the first worker to respond to the announcement. -func (w PgBackend) announceJob(ctx context.Context, queue string, jobID int64) { - conn, err := w.pool.Acquire(ctx) +func (p *PgBackend) announceJob(ctx context.Context, queue string, jobID int64) { + conn, err := p.pool.Acquire(ctx) if err != nil { return } @@ -645,7 +684,7 @@ func (w PgBackend) announceJob(ctx context.Context, queue string, jobID int64) { // Rollback is safe to call even if the tx is already closed, so if // the tx commits successfully, this is a no-op - defer tx.Rollback(ctx) + defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // notify listeners that a job is ready to run _, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%d'", queue, jobID)) @@ -657,15 +696,14 @@ func (w PgBackend) announceJob(ctx context.Context, queue string, jobID int64) { if err != nil { return } - } -func (w PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan int64) { +func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan int64) { jobsCh = make(chan int64) - conn, err := w.pool.Acquire(ctx) + conn, err := p.pool.Acquire(ctx) if err != nil { - w.logger.Error("failed to acquire database connection to listen for pending queue items", err) + p.logger.Error("failed to acquire database connection to listen for pending queue items", err) return } @@ -673,10 +711,10 @@ func (w PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan i defer conn.Release() for { - jobID, err := w.getPendingJobID(ctx, conn, queue) + jobID, err := p.getPendingJobID(ctx, conn, queue) if err != nil { if !errors.Is(err, pgx.ErrNoRows) { - w.logger.Error("failed to fetch pending job", err, "job_id", jobID) + p.logger.Error("failed to fetch pending job", err, "job_id", jobID) } else { // done fetching pending jobs break @@ -685,7 +723,6 @@ func (w PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan i jobsCh <- jobID } } - }(ctx) return @@ -695,9 +732,9 @@ func (w PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan i // it receives pending, periodic, and retry job ids asynchronously // 1. handleJob first creates a transactions inside of which a row lock is acquired for the job to be processed. // 2. handleJob secondly calls the handler on the job, and finally updates the job's status -func (w PgBackend) handleJob(ctx context.Context, jobID int64, handler Handler) (err error) { +func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handler) (err error) { var tx pgx.Tx - conn, err := w.pool.Acquire(ctx) + conn, err := p.pool.Acquire(ctx) if err != nil { return } @@ -707,76 +744,77 @@ func (w PgBackend) handleJob(ctx context.Context, jobID int64, handler Handler) if err != nil { return } - defer tx.Rollback(ctx) // rollback has no effect if the transaction has been committed + defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed - ctxv := handlerCtxVars{tx: tx} - var job Job - job, err = w.getPendingJob(ctx, tx, jobID) + ctxv := handler.CtxVars{Tx: tx} + var job *jobs.Job + job, err = p.getPendingJob(ctx, tx, jobID) if err != nil { return } - ctxv.job = &job - ctx = withHandlerContext(ctx, ctxv) + ctxv.Job = job + ctx = handler.WithContext(ctx, ctxv) // check if the job is being retried and increment retry count accordingly - if job.Status != JobStatusNew { - job.Retries = job.Retries + 1 + if job.Status != internal.JobStatusNew { + job.Retries++ } // execute the queue handler of this job - jobErr := execHandler(ctx, handler) - err = w.updateJob(ctx, jobErr) - if err != nil { - err = fmt.Errorf("error updating job status: %w", err) - return + jobErr := handler.Exec(ctx, h) + if jobErr != nil { + err = fmt.Errorf("error executing handler: %w", jobErr) + return err } - if jobErr != nil { - return jobErr + err = p.updateJob(ctx, jobErr) + if err != nil { + err = fmt.Errorf("error updating job status: %w", err) + return err } err = tx.Commit(ctx) if err != nil { - w.logger.Error("unable to commit job transaction. retrying this job may dupliate work", err, "job_id", job.ID) - err = fmt.Errorf("unable to commit job transaction. retrying this job may dupliate work: %w", err) + p.logger.Error("unable to commit job transaction. retrying this job may dupliate work", err, "job_id", job.ID) + return fmt.Errorf("unable to commit job transaction. retrying this job may dupliate work: %w", err) } - return + return nil } // listen uses Postgres LISTEN to listen for jobs on a queue // TODO: There is currently no handling of listener disconnects in PgBackend. // This will lead to jobs not getting processed until the worker is restarted. // Implement disconnect handling. -func (w PgBackend) listen(ctx context.Context, queue string) (c chan int64) { +func (p *PgBackend) listen(ctx context.Context, queue string) (c chan int64) { var err error c = make(chan int64) // set this connection's idle in transaction timeout to infinite so it is not intermittently disconnected - _, err = w.listenConn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue)) + _, err = p.listenConn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue)) if err != nil { err = fmt.Errorf("unable to create database connection for listener: %w", err) - w.logger.Error("unablet o create database connection for listener", err) + p.logger.Error("unablet o create database connection for listener", err) return } go func(ctx context.Context) { for { - notification, waitErr := w.listenConn.WaitForNotification(ctx) + notification, waitErr := p.listenConn.WaitForNotification(ctx) if waitErr != nil { if errors.Is(waitErr, context.Canceled) { return } - w.logger.Error("failed to wait for notification", waitErr) + p.logger.Error("failed to wait for notification", waitErr) time.Sleep(1 * time.Second) continue } var jobID int64 if jobID, err = strconv.ParseInt(notification.Payload, 0, 64); err != nil { - w.logger.Error("unable to fetch job", err) + p.logger.Error("unable to fetch job", err) continue } @@ -784,94 +822,24 @@ func (w PgBackend) listen(ctx context.Context, queue string) (c chan int64) { } }(ctx) - return + return c } -func (w PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID int64) (job Job, err error) { +func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID int64) (job *jobs.Job, err error) { row, err := tx.Query(ctx, PendingJobQuery, jobID) if err != nil { return } - var j *Job - j, err = pgx.CollectOneRow(row, pgx.RowToAddrOfStructByName[Job]) + job, err = pgx.CollectOneRow(row, pgx.RowToAddrOfStructByName[jobs.Job]) if err != nil { return } - return *j, err + return } -func (w PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID int64, err error) { +func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID int64, err error) { err = conn.QueryRow(ctx, PendingJobIDQuery, queue).Scan(&jobID) return } - -// PgTransactionTimeout sets the time that NeoqPg's transactions may be idle before its underlying connection is -// closed -// The timeout is the number of milliseconds that a transaction may sit idle before postgres terminates the -// transaction's underlying connection. The timeout should be longer than your longest job takes to complete. If set -// too short, job state will become unpredictable, e.g. retry counts may become incorrect. -// -// PgTransactionTimeout is best set when calling neoq.New() rather than after creation using WithConfig() because this -// setting results in the creation of a new database connection pool. -func PgTransactionTimeout(txTimeout int) ConfigOption { - return func(n Neoq) { - var ok bool - var npg *PgBackend - - if npg, ok = n.(*PgBackend); !ok { - return - } - - npg.config.idleTxTimeout = txTimeout - - // if the worker already has a pool configured, then setting the transaction timeout prints a warning and is a - // no-op - if npg.pool != nil { - log.Println("create a new Neoq instance to set transaction timeout") - return - } - - var err error - var poolConfig *pgxpool.Config - - poolConfig, err = pgxpool.ParseConfig(npg.config.connectString) - if err != nil || npg.config.connectString == "" { - return - } - - // ensure that workers don't consume connections with idle transactions - poolConfig.AfterConnect = func(ctx context.Context, conn *pgx.Conn) (err error) { - var query string - if npg.config.idleTxTimeout > 0 { - query = fmt.Sprintf("SET idle_in_transaction_session_timeout = '%dms'", npg.config.idleTxTimeout) - } else { - // there is no limit to the amount of time a worker's transactions may be idle - query = "SET idle_in_transaction_session_timeout = 0" - } - _, err = conn.Exec(ctx, query) - return - } - - npg.pool, err = pgxpool.NewWithConfig(context.Background(), poolConfig) - if err != nil { - return - } - } -} - -func stripNonAlphanum(s string) string { - var result strings.Builder - for i := 0; i < len(s); i++ { - b := s[i] - if (b == '_') || - ('a' <= b && b <= 'z') || - ('A' <= b && b <= 'Z') || - ('0' <= b && b <= '9') || - b == ' ' { - result.WriteByte(b) - } - } - return result.String() -} diff --git a/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go similarity index 57% rename from postgres_backend_test.go rename to backends/postgres/postgres_backend_test.go index a41d490..4bba363 100644 --- a/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -1,4 +1,4 @@ -package neoq +package postgres_test import ( "context" @@ -7,13 +7,18 @@ import ( "testing" "time" + "github.com/acaloiaro/neoq" + "github.com/acaloiaro/neoq/backends/postgres" + "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/jobs" "github.com/pkg/errors" "golang.org/x/exp/slog" ) -// TestPgBackendBasicJobProcessing tests that the postgres backend is able to process the most basic jobs with the +// TestBasicJobProcessing tests that the postgres backend is able to process the most basic jobs with the // most basic configuration. -func TestPgBackendBasicJobProcessing(t *testing.T) { +func TestBasicJobProcessing(t *testing.T) { queue := "testing" numJobs := 10 doneCnt := 0 @@ -27,28 +32,32 @@ func TestPgBackendBasicJobProcessing(t *testing.T) { } ctx := context.TODO() - nq, err := NewPgBackend(ctx, connString) + nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), config.WithConnectionString(connString)) if err != nil { t.Fatal(err) } + defer nq.Shutdown(ctx) - handler := NewHandler(func(ctx context.Context) (err error) { + h := handler.New(func(_ context.Context) (err error) { done <- true return }) - nq.Listen(ctx, queue, handler) + err = nq.Start(ctx, queue, h) + if err != nil { + t.Error(err) + } go func() { for i := 0; i < numJobs; i++ { - jid, err := nq.Enqueue(ctx, Job{ + jid, e := nq.Enqueue(ctx, &jobs.Job{ Queue: queue, Payload: map[string]interface{}{ "message": fmt.Sprintf("hello world: %d", i), }, }) - if err != nil || jid == DuplicateJobID { - slog.Error("job was not enqueued. either it was duplicate or this error caused it:", err) + if e != nil || jid == jobs.DuplicateJobID { + slog.Error("job was not enqueued. either it was duplicate or this error caused it:", e) } } }() @@ -73,6 +82,4 @@ func TestPgBackendBasicJobProcessing(t *testing.T) { if err != nil { t.Error(err) } - - nq.Shutdown(ctx) } diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..8ec1b40 --- /dev/null +++ b/config/config.go @@ -0,0 +1,49 @@ +package config + +import ( + "context" + "time" + + "github.com/acaloiaro/neoq/types" +) + +const ( + DefaultIdleTxTimeout = 30000 + // the window of time between time.Now() and when a job's RunAfter comes due that neoq will schedule a goroutine to + // schdule the job for execution. + // E.g. right now is 16:00 and a job's RunAfter is 16:30 of the same date. This job will get a dedicated goroutine to + // wait until the job's RunAfter, scheduling the job to be run exactly at RunAfter + DefaultFutureJobWindow = 30 * time.Second + DefaultJobCheckInterval = 5 * time.Second + DefaultTransactionTimeout = time.Minute +) + +type Config struct { + BackendInitializer BackendInitializer + ConnectionString string // a string containing connection details for the backend + FutureJobWindow time.Duration // the window of time between now and RunAfter that goroutines are scheduled for future jobs + JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs + IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed +} + +// Option is a function that sets optional backend configuration +type Option func(c *Config) + +// New initiailizes a new Config with defaults +func New() *Config { + return &Config{ + FutureJobWindow: DefaultFutureJobWindow, + JobCheckInterval: DefaultJobCheckInterval, + IdleTransactionTimeout: DefaultIdleTxTimeout, + } +} + +// WithConnectionString configures neoq to use the specified connection string when connecting to a backend +func WithConnectionString(connectionString string) Option { + return func(c *Config) { + c.ConnectionString = connectionString + } +} + +// BackendInitializer is a function that initializes a backend +type BackendInitializer func(ctx context.Context, opts ...Option) (backend types.Backend, err error) diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..bf25292 --- /dev/null +++ b/doc.go @@ -0,0 +1,8 @@ +// Package neoq provides background job processing for Go applications. +// +// Neoq's goal is to minimize the infrastructure necessary to add background job processing to +// Go applications. It does so by implementing queue durability with modular backends, rather +// than introducing a strict dependency on a particular backend such as Redis. +// +// An in-memory and Postgres backend are provided out of the box. +package neoq diff --git a/examples/add_future_postgres_job/main.go b/examples/add_future_postgres_job/main.go index 2d6f024..344e005 100644 --- a/examples/add_future_postgres_job/main.go +++ b/examples/add_future_postgres_job/main.go @@ -6,22 +6,25 @@ import ( "time" "github.com/acaloiaro/neoq" + "github.com/acaloiaro/neoq/backends/postgres" + "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq/jobs" ) func main() { const queue = "foobar" ctx := context.Background() nq, err := neoq.New(ctx, - neoq.PgTransactionTimeout(1000), - neoq.WithBackendName("postgres"), - neoq.WithConnectionString("postgres://postgres:postgres@127.0.0.1:5432/neoq")) - + config.WithConnectionString("postgres://postgres:postgres@127.0.0.1:5432/neoq"), + neoq.WithBackend(postgres.Backend), + postgres.WithTransactionTimeout(1000), // nolint: mnd, gomnd + ) if err != nil { - log.Fatalf("error initializing neoq: %v", err) + log.Fatalf("error initializing postgres backend: %v", err) } // Add a job that will execute 1 hour from now - jobID, err := nq.Enqueue(ctx, neoq.Job{ + jobID, err := nq.Enqueue(ctx, &jobs.Job{ Queue: queue, Payload: map[string]interface{}{ "message": "hello, future world", diff --git a/examples/add_job_with_custom_concurrency/main.go b/examples/add_job_with_custom_concurrency/main.go index cd24239..9730af6 100644 --- a/examples/add_job_with_custom_concurrency/main.go +++ b/examples/add_job_with_custom_concurrency/main.go @@ -5,13 +5,16 @@ import ( "log" "github.com/acaloiaro/neoq" + "github.com/acaloiaro/neoq/backends/memory" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/jobs" ) func main() { var err error const queue = "foobar" ctx := context.Background() - nq, err := neoq.New(ctx) + nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) if err != nil { log.Fatalf("error initializing neoq: %v", err) } @@ -22,24 +25,24 @@ func main() { // Concurrency and other options may be set on handlers both during creation (Option 1), or after the fact (Option 2) // Option 1: add options when creating the handler - handler := neoq.NewHandler(func(ctx context.Context) (err error) { - var j *neoq.Job - j, err = neoq.JobFromContext(ctx) + h := handler.New(func(ctx context.Context) (err error) { + var j *jobs.Job + j, err = handler.JobFromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) done <- true return - }, neoq.HandlerConcurrency(8)) + }, handler.Concurrency(8)) // Option 2: Set options after the handler is created - handler.WithOptions(neoq.HandlerConcurrency(8)) + h.WithOptions(handler.Concurrency(8)) - err = nq.Listen(ctx, queue, handler) + err = nq.Start(ctx, queue, h) if err != nil { log.Println("error listening to queue", err) } // enqueue a job - _, err = nq.Enqueue(ctx, neoq.Job{ + _, err = nq.Enqueue(ctx, &jobs.Job{ Queue: queue, Payload: map[string]interface{}{ "message": "hello, world", @@ -53,5 +56,5 @@ func main() { // job's status will be 'failed' and 'error' will be 'job exceeded its 10ms deadline' // until either the job's Sleep statement is decreased/removed or the handler's deadline is increased - // this job will continue to to fail and ultimately land on the dead jobs queue + // this job will continue to fail and ultimately land on the dead jobs queue } diff --git a/examples/add_job_with_deadline/main.go b/examples/add_job_with_deadline/main.go index c87ebe2..33fb364 100644 --- a/examples/add_job_with_deadline/main.go +++ b/examples/add_job_with_deadline/main.go @@ -6,6 +6,9 @@ import ( "time" "github.com/acaloiaro/neoq" + "github.com/acaloiaro/neoq/backends/memory" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/jobs" ) func main() { @@ -16,7 +19,7 @@ func main() { // by default neoq connects to a local postgres server using: [neoq.DefaultPgConnectionString] // connection strings can be set explicitly as follows: // neoq.New(neoq.ConnectionString("postgres://username:passsword@hostname/database")) - nq, err := neoq.New(ctx) + nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) if err != nil { log.Fatalf("error initializing neoq: %v", err) } @@ -25,24 +28,24 @@ func main() { // this is probably not a pattern you want to use in production jobs and you see it here only for testing reasons done := make(chan bool) - handler := neoq.NewHandler(func(ctx context.Context) (err error) { - var j *neoq.Job + h := handler.New(func(ctx context.Context) (err error) { + var j *jobs.Job time.Sleep(1 * time.Second) - j, err = neoq.JobFromContext(ctx) + j, err = handler.JobFromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) done <- true return }) // this 10ms deadline will cause our job that sleeps for 1s to fail - handler.WithOptions(neoq.HandlerDeadline(10 * time.Millisecond)) + h.WithOptions(handler.Deadline(10 * time.Millisecond)) - err = nq.Listen(ctx, queue, handler) + err = nq.Start(ctx, queue, h) if err != nil { log.Println("error listening to queue", err) } - _, err = nq.Enqueue(ctx, neoq.Job{ + _, err = nq.Enqueue(ctx, &jobs.Job{ Queue: queue, Payload: map[string]interface{}{ "message": "hello, world", @@ -56,5 +59,5 @@ func main() { // job's status will be 'failed' and 'error' will be 'job exceeded its 10ms deadline' // until either the job's Sleep statement is decreased/removed or the handler's deadline is increased - // this job will continue to to fail and ultimately land on the dead jobs queue + // this job will continue to fail and ultimately land on the dead jobs queue } diff --git a/examples/add_periodic_jobs/main.go b/examples/add_periodic_jobs/main.go index 10ab836..96bc8de 100644 --- a/examples/add_periodic_jobs/main.go +++ b/examples/add_periodic_jobs/main.go @@ -6,26 +6,28 @@ import ( "time" "github.com/acaloiaro/neoq" + "github.com/acaloiaro/neoq/backends/memory" + "github.com/acaloiaro/neoq/handler" ) func main() { ctx := context.Background() - nq, err := neoq.New(ctx) + nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend)) if err != nil { log.Fatalf("error initializing neoq: %v", err) } // run a job periodically - handler := neoq.NewHandler(func(ctx context.Context) (err error) { + h := handler.New(func(ctx context.Context) (err error) { log.Println("running periodic job") return }) - handler.WithOptions( - neoq.HandlerDeadline(500*time.Millisecond), - neoq.HandlerConcurrency(1), + h.WithOptions( + handler.Deadline(500*time.Millisecond), + handler.Concurrency(1), ) - nq.ListenCron(ctx, "* * * * * *", handler) + nq.StartCron(ctx, "* * * * * *", h) time.Sleep(5 * time.Second) } diff --git a/examples/listen_for_jobs/main.go b/examples/listen_for_jobs/main.go deleted file mode 100644 index fb8bd6c..0000000 --- a/examples/listen_for_jobs/main.go +++ /dev/null @@ -1,35 +0,0 @@ -package main - -import ( - "context" - "log" - - "github.com/acaloiaro/neoq" -) - -func main() { - var err error - const queue = "foobar" - ctx := context.Background() - nq, err := neoq.New(ctx, - neoq.WithBackendName("postgres"), - neoq.WithConnectionString("postgres://postgres:postgres@127.0.0.1:5432/neoq")) - if err != nil { - log.Fatalf("error initializing neoq: %v", err) - } - - handler := neoq.NewHandler(func(ctx context.Context) (err error) { - var j *neoq.Job - j, err = neoq.JobFromContext(ctx) - log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) - return - }) - - err = nq.Listen(ctx, queue, handler) - if err != nil { - log.Println("error listening to queue", err) - } - - // this code will exit quickly since since Listen() is asynchronous - // real applications should call Listen() on startup for every queue that needs to be handled -} diff --git a/examples/start_processing_jobs/main.go b/examples/start_processing_jobs/main.go new file mode 100644 index 0000000..7632929 --- /dev/null +++ b/examples/start_processing_jobs/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "context" + "log" + + "github.com/acaloiaro/neoq" + "github.com/acaloiaro/neoq/backends/postgres" + "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/jobs" +) + +func main() { + var err error + const queue = "foobar" + ctx := context.Background() + + nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), config.WithConnectionString("postgres://postgres:postgres@127.0.0.1:5432/neoq")) + if err != nil { + log.Fatalf("error initializing postgres backend: %v", err) + } + + h := handler.New(func(ctx context.Context) (err error) { + var j *jobs.Job + j, err = handler.JobFromContext(ctx) + log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) + return + }) + + err = nq.Start(ctx, queue, h) + if err != nil { + log.Println("error processing queue", err) + } + + // this code will exit quickly since Start() is asynchronous + // real applications should call Start() on startup for every queue that needs to be handled +} diff --git a/handler/handler.go b/handler/handler.go new file mode 100644 index 0000000..fb26561 --- /dev/null +++ b/handler/handler.go @@ -0,0 +1,149 @@ +package handler + +import ( + "context" + "errors" + "fmt" + "runtime" + "time" + + "github.com/acaloiaro/neoq/jobs" +) + +const ( + DefaultHandlerDeadline = 30 * time.Second +) + +type contextKey struct{} + +var ( + CtxVarsKey contextKey + ErrContextHasNoJob = errors.New("context has no Job") + ErrNoHandlerForQueue = errors.New("no handler for queue") + // TODO this error is here because cyclic imports with neoq + ErrNoProcessorForQueue = errors.New("no processor configured for queue") +) + +// Func is a function that Handlers execute for every Job on a queue +type Func func(ctx context.Context) error + +// Handler handles jobs on a queue +type Handler struct { + Handle Func + Concurrency int + Deadline time.Duration + QueueCapacity int64 +} + +// CtxVars are variables passed to every Handler context +type CtxVars struct { + Job *jobs.Job + // this is a bit hacky. Tx here contains a pgx.Tx for PgBackend, but because we're in the handlers package, and we don't + // want all neoq users to have pgx as a transitive dependency, we store Tx as any, and coerce it to a pgx.Tx inside + // the postgres backend + // TODO redesign HandlerCtxVars so it doesn't need to include a pgx.Tx + Tx any +} + +// Option is function that sets optional configuration for Handlers +type Option func(w *Handler) + +// WithOptions sets one or more options on handler +func (h *Handler) WithOptions(opts ...Option) { + for _, opt := range opts { + opt(h) + } +} + +// Deadline configures handlers with a time deadline for every executed job +// The deadline is the amount of time that can be spent executing the handler's Func +// when a deadline is exceeded, the job is failed and enters its retry phase +func Deadline(d time.Duration) Option { + return func(h *Handler) { + h.Deadline = d + } +} + +// Concurrency configures Neoq handlers to process jobs concurrently +// the default concurrency is the number of (v)CPUs on the machine running Neoq +func Concurrency(c int) Option { + return func(h *Handler) { + h.Concurrency = c + } +} + +// MaxQueueCapacity configures Handlers to enforce a maximum capacity on the queues that it handles +// queues that have reached capacity cause Enqueue() to block until the queue is below capacity +func MaxQueueCapacity(capacity int64) Option { + return func(h *Handler) { + h.QueueCapacity = capacity + } +} + +// New creates a new queue handler +func New(f Func, opts ...Option) (h Handler) { + h = Handler{ + Handle: f, + } + + h.WithOptions(opts...) + + // default to running one fewer threads than CPUs + if h.Concurrency == 0 { + Concurrency(runtime.NumCPU() - 1)(&h) + } + + // always set a job deadline if none is set + if h.Deadline == 0 { + Deadline(DefaultHandlerDeadline)(&h) + } + + return +} + +// WithContext creates a new context with the job and transaction set +func WithContext(ctx context.Context, v CtxVars) context.Context { + return context.WithValue(ctx, CtxVarsKey, v) +} + +// Exec executes handler functions with a concrete time deadline +func Exec(ctx context.Context, handler Handler) (err error) { + deadlineCtx, cancel := context.WithDeadline(ctx, time.Now().Add(handler.Deadline)) + defer cancel() + + var errCh = make(chan error, 1) + var done = make(chan bool) + go func(ctx context.Context) { + errCh <- handler.Handle(ctx) + done <- true + }(ctx) + + select { + case <-done: + err = <-errCh + if err != nil { + err = fmt.Errorf("job failed to process: %w", err) + } + + case <-deadlineCtx.Done(): + ctxErr := deadlineCtx.Err() + if errors.Is(ctxErr, context.DeadlineExceeded) { + err = fmt.Errorf("job exceeded its %s deadline: %w", handler.Deadline, ctxErr) + } else if errors.Is(ctxErr, context.Canceled) { + err = ctxErr + } else { + err = fmt.Errorf("job failed to process: %w", ctxErr) + } + } + + return +} + +// JobFromContext fetches the job from a context if the job context variable is already set +func JobFromContext(ctx context.Context) (*jobs.Job, error) { + if v, ok := ctx.Value(CtxVarsKey).(CtxVars); ok { + return v.Job, nil + } + + return nil, ErrContextHasNoJob +} diff --git a/internal/internal.go b/internal/internal.go new file mode 100644 index 0000000..430ac64 --- /dev/null +++ b/internal/internal.go @@ -0,0 +1,56 @@ +package internal + +import ( + crand "crypto/rand" + "math" + "math/big" + "math/rand" + "strings" + "time" +) + +const ( + JobStatusNew = "new" + JobStatusProcessed = "processed" + JobStatusFailed = "failed" +) + +// CalculateBackoff calculates the number of seconds to back off before the next retry +// this formula is unabashedly taken from Sidekiq because it is good. +func CalculateBackoff(retryCount int) time.Time { + const backoffExponent = 4 + const maxInt = 30 + p := int(math.Round(math.Pow(float64(retryCount), backoffExponent))) + return time.Now().Add(time.Duration(p+15+RandInt(maxInt)*retryCount+1) * time.Second) +} + +// RandInt returns a random integer up to max +func RandInt(max int) int { + if true { + r := rand.New(rand.NewSource(time.Now().UnixNano())) // nolint: gosec + return r.Intn(max) + } + + r, err := crand.Int(crand.Reader, big.NewInt(int64(max))) + if err != nil { + panic(err) + } + return int(r.Int64()) +} + +// StripNonALphanum strips nonalphanumeric characters from a string and returns a new one +// TODO Replace `StripNonAlphanum` with strings.ReplaceAll +func StripNonAlphanum(s string) string { + var result strings.Builder + for i := 0; i < len(s); i++ { + b := s[i] + if (b == '_') || + ('a' <= b && b <= 'z') || + ('A' <= b && b <= 'Z') || + ('0' <= b && b <= '9') || + b == ' ' { + result.WriteByte(b) + } + } + return result.String() +} diff --git a/jobs/jobs.go b/jobs/jobs.go new file mode 100644 index 0000000..94a1de7 --- /dev/null +++ b/jobs/jobs.go @@ -0,0 +1,64 @@ +package jobs + +import ( + "crypto/md5" // nolint: gosec + "encoding/json" + "fmt" + "io" + "time" + + "github.com/guregu/null" +) + +const ( + DuplicateJobID = -1 + UnqueuedJobID = -2 +) + +// Job contains all the data pertaining to jobs +// +// Jobs are what are placed on queues for processing. +// +// The Fingerprint field can be supplied by the user to impact job deduplication. +// TODO Factor out usage of the null package: github.com/guregu/null +type Job struct { + ID int64 `db:"id"` + Fingerprint string `db:"fingerprint"` // A md5 sum of the job's queue + payload, affects job deduplication + Status string `db:"status"` // The status of the job + Queue string `db:"queue"` // The queue the job is on + Payload map[string]any `db:"payload"` // JSON job payload for more complex jobs + RunAfter time.Time `db:"run_after"` // The time after which the job is elligible to be picked up by a worker + RanAt null.Time `db:"ran_at"` // The last time the job ran + Error null.String `db:"error"` // The last error the job elicited + Retries int `db:"retries"` // The number of times the job has retried + MaxRetries int `db:"max_retries"` // The maximum number of times the job can retry + CreatedAt time.Time `db:"created_at"` // The time the job was created +} + +// FingerprintJob fingerprints jobs as an md5 hash of its queue combined with its JSON-serialized payload +func FingerprintJob(j *Job) (err error) { + // only generate a fingerprint if the job is not already fingerprinted + if j.Fingerprint != "" { + return + } + + var js []byte + js, err = json.Marshal(j.Payload) + if err != nil { + return + } + h := md5.New() // nolint: gosec + _, err = io.WriteString(h, j.Queue) + if err != nil { + return + } + + _, err = h.Write(js) + if err != nil { + return + } + + j.Fingerprint = fmt.Sprintf("%x", h.Sum(nil)) + + return +} diff --git a/logging/logging.go b/logging/logging.go new file mode 100644 index 0000000..1df060c --- /dev/null +++ b/logging/logging.go @@ -0,0 +1,11 @@ +package logging + +// Logger interface is the interface that neoq's logger must implement +// +// This interface is a subset of [slog.Logger]. The slog interface was chosen under the assumption that its +// likely to be Golang's standard library logging interface. +type Logger interface { + Debug(msg string, args ...any) + Error(msg string, err error, args ...any) + Info(msg string, args ...any) +} diff --git a/memory_backend.go b/memory_backend.go deleted file mode 100644 index 2388da1..0000000 --- a/memory_backend.go +++ /dev/null @@ -1,303 +0,0 @@ -package neoq - -import ( - "context" - "fmt" - "os" - "sync" - "time" - - "github.com/guregu/null" - "github.com/iancoleman/strcase" - "github.com/jsuar/go-cron-descriptor/pkg/crondescriptor" - "github.com/pkg/errors" - "github.com/robfig/cron" - "golang.org/x/exp/slog" -) - -const ( - // TODO make MemBackend queue capacity configurable - defaultMemQueueCapacity = 10000 // the default capacity of individual queues - emptyCapacity = 0 -) - -// MemBackend is a memory-backed neoq backend -type MemBackend struct { - cancelFuncs []context.CancelFunc // A collection of cancel functions to be called upon Shutdown() - cron *cron.Cron - logger Logger - jobCheckInterval time.Duration // the duration of time between checking for future jobs to schedule - mu *sync.Mutex // mutext to protect mutating state on a pgWorker - jobCount int64 // number of jobs that have been queued since start - handlers sync.Map // map queue names [string] to queue handlers [Handler] - fingerprints sync.Map // map fingerprints [string] to their jobs [Job] - futureJobs sync.Map // map jobIDs [int64] to [Jobs] - queues sync.Map // map queue names [string] to queue handler channels [chan Job] -} - -func NewMemBackend(opts ...ConfigOption) (n Neoq, err error) { - mb := &MemBackend{ - cron: cron.New(), - mu: &sync.Mutex{}, - queues: sync.Map{}, - handlers: sync.Map{}, - futureJobs: sync.Map{}, - fingerprints: sync.Map{}, - logger: slog.New(slog.NewTextHandler(os.Stdout)), - jobCount: 0, - cancelFuncs: []context.CancelFunc{}, - jobCheckInterval: DefaultJobCheckInterval, - } - mb.cron.Start() - - for _, opt := range opts { - opt(mb) - } - - n = mb - - return -} - -// Enqueue queues jobs to be executed asynchronously -func (m *MemBackend) Enqueue(ctx context.Context, job Job) (jobID int64, err error) { - var queueChan chan Job - var qc any - var ok bool - - if qc, ok = m.queues.Load(job.Queue); !ok { - return UnqueuedJobID, fmt.Errorf("queue has no listeners: %s", job.Queue) - } - - queueChan = qc.(chan Job) - - // Make sure RunAfter is set to a non-zero value if not provided by the caller - // if already set, schedule the future job - now := time.Now() - if job.RunAfter.IsZero() { - job.RunAfter = now - } - - if job.Queue == "" { - err = errors.New("this job does not specify a Queue. Please specify a queue") - - return - } - - err = fingerprintJob(&job) - if err != nil { - return - } - - // if the job fingerprint is already known, don't queue the job - if _, found := m.fingerprints.Load(job.Fingerprint); found { - return DuplicateJobID, nil - } - - m.fingerprints.Store(job.Fingerprint, job) - m.mu.Lock() - m.jobCount++ - m.mu.Unlock() - - job.ID = m.jobCount - jobID = m.jobCount - - // notify listeners that a new job has arrived if it's not a future job - if job.RunAfter == now { - queueChan <- job - } else { - m.queueFutureJob(job) - } - - return -} - -// Listen listens for jobs on a queue and processes them with the given handler -func (m *MemBackend) Listen(ctx context.Context, queue string, h Handler) (err error) { - var queueCapacity = h.queueCapacity - if queueCapacity == emptyCapacity { - queueCapacity = defaultMemQueueCapacity - } - - m.handlers.Store(queue, h) - m.queues.Store(queue, make(chan Job, queueCapacity)) - - ctx, cancel := context.WithCancel(ctx) - - m.mu.Lock() - m.cancelFuncs = append(m.cancelFuncs, cancel) - m.mu.Unlock() - - err = m.start(ctx, queue) - if err != nil { - return - } - - return -} - -// ListenCron listens for jobs on a cron schedule and handles them with the provided handler -// -// See: https://pkg.go.dev/github.com/robfig/cron?#hdr-CRON_Expression_Format for details on the cron spec format -func (m *MemBackend) ListenCron(ctx context.Context, cronSpec string, h Handler) (err error) { - cd, err := crondescriptor.NewCronDescriptor(cronSpec) - if err != nil { - return err - } - - cdStr, err := cd.GetDescription(crondescriptor.Full) - if err != nil { - return err - } - - queue := stripNonAlphanum(strcase.ToSnake(*cdStr)) - - ctx, cancel := context.WithCancel(ctx) - m.mu.Lock() - m.cancelFuncs = append(m.cancelFuncs, cancel) - m.mu.Unlock() - - err = m.Listen(ctx, queue, h) - if err != nil { - return - } - - m.cron.AddFunc(cronSpec, func() { - m.Enqueue(ctx, Job{Queue: queue}) - }) - - return -} - -// SetLogger sets this backend's logger -func (m *MemBackend) SetLogger(logger Logger) { - m.logger = logger -} - -// Shutdown halts the worker -func (m *MemBackend) Shutdown(ctx context.Context) { - for _, f := range m.cancelFuncs { - f() - } - - m.cancelFuncs = nil - - return -} - -// start starts a queue listener, processes pending job, and fires up goroutines to process future jobs -func (m *MemBackend) start(ctx context.Context, queue string) (err error) { - var queueChan chan Job - var qc any - var h any - var handler Handler - var ok bool - if h, ok = m.handlers.Load(queue); !ok { - return fmt.Errorf("no handler for queue: %s", queue) - } - - if qc, ok = m.queues.Load(queue); !ok { - return fmt.Errorf("no listener configured for qeuue: %s", queue) - } - - go func() { m.scheduleFutureJobs(ctx, queue) }() - - handler = h.(Handler) - queueChan = qc.(chan Job) - - for i := 0; i < handler.concurrency; i++ { - go func() { - var err error - var job Job - - for { - select { - case job = <-queueChan: - err = m.handleJob(ctx, job, handler) - case <-ctx.Done(): - return - } - - if err != nil { - m.logger.Error("job failed", err, "job_id", job.ID) - runAfter := calculateBackoff(job.Retries) - job.RunAfter = runAfter - m.queueFutureJob(job) - } - - m.fingerprints.Delete(job.Fingerprint) - } - }() - } - return -} - -func (m *MemBackend) scheduleFutureJobs(ctx context.Context, queue string) { - // check for new future jobs on an interval - // TODO make the future jobs check interval configurable in MemBackend - ticker := time.NewTicker(m.jobCheckInterval) - - for { - // loop over list of future jobs, scheduling goroutines to wait for jobs that are due within the next 30 seconds - m.futureJobs.Range(func(k, v any) bool { - job := v.(Job) - var queueChan chan Job - - timeUntilRunAfter := time.Until(job.RunAfter) - if timeUntilRunAfter <= DefaultFutureJobWindow { - m.removeFutureJob(job.ID) - go func(j Job) { - scheduleCh := time.After(timeUntilRunAfter) - <-scheduleCh - if qc, ok := m.queues.Load(queue); ok { - queueChan = qc.(chan Job) - queueChan <- j - } else { - m.logger.Error(fmt.Sprintf("no listen channel configured for queue: %s", queue), errors.New("no listener configured")) - } - }(job) - } - - return true - }) - select { - case <-ticker.C: - continue - case <-ctx.Done(): - return - } - } -} - -func (m *MemBackend) handleJob(ctx context.Context, job Job, handler Handler) (err error) { - ctxv := handlerCtxVars{job: &job} - hctx := withHandlerContext(ctx, ctxv) - - // check if the job is being retried and increment retry count accordingly - if job.Status != JobStatusNew { - job.Retries = job.Retries + 1 - } - - // execute the queue handler of this job - err = execHandler(hctx, handler) - if err != nil { - job.Error = null.StringFrom(err.Error()) - } - - return -} - -// queueFutureJob queues a future job for eventual execution -func (m *MemBackend) queueFutureJob(job Job) { - m.fingerprints.Store(job.Fingerprint, job) - m.futureJobs.Store(job.ID, job) -} - -// removeFutureJob removes a future job from the in-memory list of jobs that will execute in the future -func (m *MemBackend) removeFutureJob(jobID int64) { - if j, ok := m.futureJobs.Load(jobID); ok { - job := j.(Job) - m.fingerprints.Delete(job.Fingerprint) - m.futureJobs.Delete(job.ID) - } -} diff --git a/neoq.go b/neoq.go index b263b64..31bc027 100644 --- a/neoq.go +++ b/neoq.go @@ -1,382 +1,59 @@ -// Package neoq provides background job processing for Go applications. -// -// Neoq's goal is to minimize the infrastructure necessary to add background job processing to Go applications. It does so by implementing queue durability with modular backends, rather than introducing a strict dependency on a particular backend such as Redis. -// -// An in-memory and Postgres backend are provided out of the box. package neoq -// TODO factor out the following dependencies -// "github.com/iancoleman/strcase" -// "github.com/jsuar/go-cron-descriptor/pkg/crondescriptor" import ( "context" - "crypto/md5" - "encoding/json" "errors" - "fmt" - "io" - "math" - "runtime" "time" - "math/rand" - - "github.com/guregu/null" - "github.com/jackc/pgx/v5" + "github.com/acaloiaro/neoq/backends/memory" + "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq/types" ) -type contextKey int - -var varsKey contextKey - -const ( - JobStatusNew = "new" - JobStatusProcessed = "processed" - JobStatusFailed = "failed" - DefaultTransactionTimeout = time.Minute - DefaultHandlerDeadline = 30 * time.Second - DuplicateJobID = -1 - UnqueuedJobID = -2 - DefaultJobCheckInterval = 5 * time.Second - // the window of time between time.Now() and when a job's RunAfter comes due that neoq will schedule a goroutine to - // schdule the job for execution. - // E.g. right now is 16:00 and a job's RunAfter is 16:30 of the same date. This job will get a dedicated goroutine to - // wait until the job's RunAfter, scheduling the job to be run exactly at RunAfter - DefaultFutureJobWindow = 30 * time.Second +var ( + ErrConnectionStringRequired = errors.New("a connection string is required for this backend. See [config.WithConnectionString]") + ErrNoBackendSpecified = errors.New("please specify a backend by using [config.WithBackend]") ) -// Neoq interface is Neoq's primary API -type Neoq interface { - // Enqueue queues jobs to be executed asynchronously - Enqueue(ctx context.Context, job Job) (jobID int64, err error) - - // Listen listens for jobs on a queue and processes them with the given handler - Listen(ctx context.Context, queue string, h Handler) (err error) - - // ListenCron listens for jobs on a cron schedule and processes them with the given handler - // - // See: https://pkg.go.dev/github.com/robfig/cron?#hdr-CRON_Expression_Format for details on the cron spec format - ListenCron(ctx context.Context, cron string, h Handler) (err error) - - // SetLogger sets the logger for a backend - SetLogger(logger Logger) - - // Shutdown halts job processing and releases resources - Shutdown(ctx context.Context) -} - -// Logger interface is the interface that neoq's logger must implement +// New creates a new backend instance for job processing. // -// This interface is a subset of [slog.Logger]. The slog interface was chosen under the assumption that its -// likely to be Golang's standard library logging interface. -type Logger interface { - Debug(msg string, args ...any) - Error(msg string, err error, args ...any) - Info(msg string, args ...any) -} - -// HandlerFunc is a function that Handlers execute for every Job on a queue -type HandlerFunc func(ctx context.Context) error - -// Handler handles jobs on a queue -type Handler struct { - handle HandlerFunc - concurrency int - deadline time.Duration - queueCapacity int64 -} - -// HandlerOption is function that sets optional configuration for Handlers -type HandlerOption func(w *Handler) - -// WithOptions sets one or more options on handler -func (h *Handler) WithOptions(opts ...HandlerOption) { - for _, opt := range opts { - opt(h) - } -} - -// HandlerDeadline configures handlers with a time deadline for every executed job -// The deadline is the amount of time that can be spent executing the handler's HandlerFunc -// when a deadline is exceeded, the job is failed and enters its retry phase -func HandlerDeadline(d time.Duration) HandlerOption { - return func(h *Handler) { - h.deadline = d - } -} - -// HandlerConcurrency configures Neoq handlers to process jobs concurrently -// the default concurrency is the number of (v)CPUs on the machine running Neoq -func HandlerConcurrency(c int) HandlerOption { - return func(h *Handler) { - h.concurrency = c - } -} - -// MaxQueueCapacity configures Handlers to enforce a maximum capacity on the queues that it handles -// queues that have reached capacity cause Enqueue() to block until the queue is below capacity -func MaxQueueCapacity(capacity int64) HandlerOption { - return func(h *Handler) { - h.queueCapacity = capacity - } -} - -// NewHandler creates a new queue handler -func NewHandler(f HandlerFunc, opts ...HandlerOption) (h Handler) { - h = Handler{ - handle: f, - } - - h.WithOptions(opts...) - - // default to running one fewer threads than CPUs - if h.concurrency == 0 { - HandlerConcurrency(runtime.NumCPU() - 1)(&h) - } - - // always set a job deadline if none is set - if h.deadline == 0 { - HandlerDeadline(DefaultHandlerDeadline)(&h) - } - - return -} - -// ConfigOption is a function that sets optional Neoq configuration -type ConfigOption func(n Neoq) - -// Job contains all the data pertaining to jobs +// By default, neoq initializes [memory.Backend] if New() is called without a backend configuration option. // -// Jobs are what are placed on queues for processing. +// Use [neoq.WithBackend] to initialize different backends. // -// The Fingerprint field can be supplied by the user to impact job deduplication. -// TODO Factor out usage of the null package: github.com/guregu/null -type Job struct { - ID int64 `db:"id"` - Fingerprint string `db:"fingerprint"` // A md5 sum of the job's queue + payload, affects job deduplication - Status string `db:"status"` // The status of the job - Queue string `db:"queue"` // The queue the job is on - Payload map[string]any `db:"payload"` // JSON job payload for more complex jobs - RunAfter time.Time `db:"run_after"` // The time after which the job is elligible to be picked up by a worker - RanAt null.Time `db:"ran_at"` // The last time the job ran - Error null.String `db:"error"` // The last error the job elicited - Retries int `db:"retries"` // The number of times the job has retried - MaxRetries int `db:"max_retries"` // The maximum number of times the job can retry - CreatedAt time.Time `db:"created_at"` // The time the job was created -} - -// New creates a new Neoq instance for listening to queues and enqueing new jobs -func New(ctx context.Context, opts ...ConfigOption) (n Neoq, err error) { - ic := internalConfig{} +// For available configuration options see [config.ConfigOption]. +func New(ctx context.Context, opts ...config.Option) (b types.Backend, err error) { + c := config.Config{} for _, opt := range opts { - opt(&ic) + opt(&c) } - if ic.backend != nil { - n = *ic.backend - return + if c.BackendInitializer == nil { + c.BackendInitializer = memory.Backend } - switch ic.backendName { - case postgresBackendName: - if ic.connectionString == "" { - err = errors.New("your must provide a postgres connection string to initialize the postgres backend: see neoq.ConnectionString(...)") - return - } - n, err = NewPgBackend(ctx, ic.connectionString, opts...) - default: - n, err = NewMemBackend(opts...) - } - - return -} - -// JobFromContext fetches the job from a context if the job context variable is already set -func JobFromContext(ctx context.Context) (j *Job, err error) { - if v, ok := ctx.Value(varsKey).(handlerCtxVars); ok { - j = v.job - } else { - err = errors.New("context does not have a Job set") + b, err = c.BackendInitializer(ctx, opts...) + if err != nil { + return } return } -// WithBackendName configures neoq to create a new backend with the given name upon -// initialization -func WithBackendName(backendName string) ConfigOption { - return func(n Neoq) { - switch c := n.(type) { - case *internalConfig: - c.backendName = backendName - default: - } +// WithBackend configures neoq to initialize a specific backend for job processing. +// +// Neoq provides two [config.BackendInitializer] that may be used with WithBackend +// - [pkg/github.com/acaloiaro/neoq/backends/memory.Backend] +// - [pkg/github.com/acaloiaro/neoq/backends/postgres.Backend] +func WithBackend(initializer config.BackendInitializer) config.Option { + return func(c *config.Config) { + c.BackendInitializer = initializer } } // WithJobCheckInterval configures the duration of time between checking for future jobs -func WithJobCheckInterval(interval time.Duration) ConfigOption { - return func(n Neoq) { - switch b := n.(type) { - case *internalConfig: - b.jobCheckInterval = interval - case *MemBackend: - b.jobCheckInterval = interval - case *PgBackend: - b.jobCheckInterval = interval - default: - } - } -} - -// WithBackend configures neoq to use the specified backend rather than initializing a new one -// during initialization -func WithBackend(backend Neoq) ConfigOption { - return func(n Neoq) { - switch c := n.(type) { - case *internalConfig: - c.backend = &backend - default: - } - } -} - -// WithConnectionString configures neoq to use connection string for backend initialization -func WithConnectionString(connectionString string) ConfigOption { - return func(n Neoq) { - switch c := n.(type) { - case *internalConfig: - c.connectionString = connectionString - case *PgBackend: - c.config.connectString = connectionString - default: - } - } -} - -// WithLogger configures neoq to use the spcified logger -func WithLogger(logger Logger) ConfigOption { - return func(n Neoq) { - switch b := n.(type) { - case *MemBackend: - b.logger = logger - case *PgBackend: - b.logger = logger - default: - } - } -} - -// handlerCtxVars are variables passed to every Handler context -type handlerCtxVars struct { - job *Job - tx pgx.Tx -} - -// withHandlerContext creates a new context with the job and transaction set -func withHandlerContext(ctx context.Context, v handlerCtxVars) context.Context { - return context.WithValue(ctx, varsKey, v) -} - -// txFromContext gets the transaction from a context, if the the transaction is already set -func txFromContext(ctx context.Context) (t pgx.Tx, err error) { - if v, ok := ctx.Value(varsKey).(handlerCtxVars); ok { - t = v.tx - } else { - err = errors.New("context does not have a Tx set") - } - - return -} - -// calculateBackoff calculates the number of seconds to back off before the next retry -// this formula is unabashedly taken from Sidekiq because it is good. -func calculateBackoff(retryCount int) time.Time { - p := int(math.Round(math.Pow(float64(retryCount), 4))) - return time.Now().Add(time.Duration(p+15+randInt(30)*retryCount+1) * time.Second) -} - -func randInt(max int) int { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - return r.Intn(max) -} - -// internalConfig models internal neoq configuratio not exposed to users -type internalConfig struct { - backendName string // the name of a known backend - backend *Neoq // user-provided backend to use - connectionString string // a connection string to use connecting to a backend - jobCheckInterval time.Duration // the duration of time between checking for future jobs to schedule -} - -func (i internalConfig) Enqueue(ctx context.Context, job Job) (jobID int64, err error) { - return -} - -func (i internalConfig) Listen(ctx context.Context, queue string, h Handler) (err error) { - return -} - -func (i internalConfig) ListenCron(ctx context.Context, cron string, h Handler) (err error) { - return -} - -func (i internalConfig) SetLogger(logger Logger) { - return -} - -func (i internalConfig) Shutdown(ctx context.Context) {} - -// exechandler executes handler functions with a concrete time deadline -func execHandler(ctx context.Context, handler Handler) (err error) { - deadlineCtx, cancel := context.WithDeadline(ctx, time.Now().Add(handler.deadline)) - defer cancel() - - var errCh = make(chan error, 1) - var done = make(chan bool) - go func(ctx context.Context) (e error) { - errCh <- handler.handle(ctx) - done <- true - return - }(ctx) - - select { - case <-done: - err = <-errCh - if err != nil { - err = fmt.Errorf("job failed to process: %w", err) - } - - case <-deadlineCtx.Done(): - ctxErr := deadlineCtx.Err() - if errors.Is(ctxErr, context.DeadlineExceeded) { - err = fmt.Errorf("job exceeded its %s deadline: %w", handler.deadline, ctxErr) - } else if errors.Is(ctxErr, context.Canceled) { - err = nil - } else { - err = fmt.Errorf("job failed to process: %w", ctxErr) - } - } - - return -} - -// fingerprintJob fingerprints jobs as an md5 hash of its queue combined with its JSON-serialized payload -func fingerprintJob(j *Job) (err error) { - // only generate a fingerprint if the job is not already fingerprinted - if j.Fingerprint != "" { - return +func WithJobCheckInterval(interval time.Duration) config.Option { + return func(c *config.Config) { + c.JobCheckInterval = interval } - - var js []byte - js, err = json.Marshal(j.Payload) - if err != nil { - return - } - h := md5.New() - io.WriteString(h, j.Queue) - io.WriteString(h, string(js)) - j.Fingerprint = fmt.Sprintf("%x", h.Sum(nil)) - - return } diff --git a/neoq_test.go b/neoq_test.go index e315e0a..43ff9f9 100644 --- a/neoq_test.go +++ b/neoq_test.go @@ -5,33 +5,89 @@ import ( "errors" "fmt" "log" + "os" "strings" "testing" "time" + + "github.com/acaloiaro/neoq/backends/memory" + "github.com/acaloiaro/neoq/backends/postgres" + "github.com/acaloiaro/neoq/config" + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/jobs" + "github.com/acaloiaro/neoq/testutils" ) var errTrigger = errors.New("triggerering a log error") var errPeriodicTimeout = errors.New("timed out waiting for periodic job") -type testLogger struct { - l *log.Logger - done chan bool +func ExampleNew() { + ctx := context.Background() + nq, err := New(ctx, WithBackend(memory.Backend)) + if err != nil { + fmt.Println("initializing a new Neoq with no params should not return an error:", err) + return + } + defer nq.Shutdown(ctx) + + fmt.Println("neoq initialized with default memory backend") + // Output: neoq initialized with default memory backend } -func (h testLogger) Info(m string, args ...any) { - h.l.Println(m) - h.done <- true +func ExampleNew_postgres() { + ctx := context.Background() + var pgURL string + var ok bool + if pgURL, ok = os.LookupEnv("TEST_DATABASE_URL"); !ok { + fmt.Println("Please set TEST_DATABASE_URL environment variable") + return + } + + nq, err := New(ctx, WithBackend(postgres.Backend), config.WithConnectionString(pgURL)) + if err != nil { + fmt.Println("neoq's postgres backend failed to initialize:", err) + return + } + defer nq.Shutdown(ctx) + + fmt.Println("neoq initialized with postgres backend") + // Output: neoq initialized with postgres backend } -func (h testLogger) Debug(m string, args ...any) { - h.l.Println(m) - h.done <- true + +func ExampleWithBackend() { + ctx := context.Background() + nq, err := New(ctx, WithBackend(memory.Backend)) + if err != nil { + fmt.Println("initializing a new Neoq with no params should not return an error:", err) + return + } + defer nq.Shutdown(ctx) + + fmt.Println("neoq initialized with memory backend") + // Output: neoq initialized with memory backend } -func (h testLogger) Error(m string, err error, args ...any) { - h.l.Println(m, err) - h.done <- true + +func ExampleWithBackend_postgres() { + ctx := context.Background() + var pgURL string + var ok bool + if pgURL, ok = os.LookupEnv("TEST_DATABASE_URL"); !ok { + fmt.Println("Please set TEST_DATABASE_URL environment variable") + return + } + + nq, err := New(ctx, WithBackend(postgres.Backend), config.WithConnectionString(pgURL)) + if err != nil { + fmt.Println("initializing a new Neoq with no params should not return an error:", err) + return + } + defer nq.Shutdown(ctx) + + fmt.Println("neoq initialized with postgres backend") + // Output: neoq initialized with postgres backend } -func TestWorkerListenConn(t *testing.T) { +func TestStart(t *testing.T) { const queue = "testing" timeout := false numJobs := 1 @@ -39,38 +95,32 @@ func TestWorkerListenConn(t *testing.T) { var done = make(chan bool, numJobs) ctx := context.TODO() - backend, err := NewMemBackend() - if err != nil { - t.Fatal(err) - } - - nq, err := New(ctx, WithBackend(backend)) + nq, err := New(ctx, WithBackend(memory.Backend)) if err != nil { t.Fatal(err) } defer nq.Shutdown(ctx) - handler := NewHandler(func(ctx context.Context) (err error) { + h := handler.New(func(ctx context.Context) (err error) { done <- true return }) - handler.WithOptions( - HandlerDeadline(500*time.Millisecond), - HandlerConcurrency(1), + h.WithOptions( + handler.Deadline(500*time.Millisecond), + handler.Concurrency(1), ) + // process jobs on the test queue + err = nq.Start(ctx, queue, h) if err != nil { t.Error(err) } - // Listen for jobs on the queue - nq.Listen(ctx, queue, handler) - - // allow time for listener to start + // allow time for processor to start time.Sleep(5 * time.Millisecond) for i := 0; i < numJobs; i++ { - jid, err := nq.Enqueue(ctx, Job{ + jid, err := nq.Enqueue(ctx, &jobs.Job{ Queue: queue, Payload: map[string]interface{}{ "message": fmt.Sprintf("hello world: %d", i), @@ -85,7 +135,7 @@ func TestWorkerListenConn(t *testing.T) { select { case <-time.After(5 * time.Second): timeout = true - err = errors.New("timed out waiting for job") + err = errors.New("timed out waiting for job") // nolint: goerr113 case <-done: doneCnt++ } @@ -104,32 +154,31 @@ func TestWorkerListenConn(t *testing.T) { } } -func TestWorkerListenCron(t *testing.T) { +func TestStartCron(t *testing.T) { const cron = "* * * * * *" ctx := context.TODO() - nq, err := New(ctx) + nq, err := New(ctx, WithBackend(memory.Backend)) if err != nil { t.Fatal(err) } defer nq.Shutdown(ctx) var done = make(chan bool) - handler := NewHandler(func(ctx context.Context) (err error) { + h := handler.New(func(ctx context.Context) (err error) { done <- true return }) - handler.WithOptions( - HandlerDeadline(500*time.Millisecond), - HandlerConcurrency(1), + h.WithOptions( + handler.Deadline(500*time.Millisecond), + handler.Concurrency(1), ) + err = nq.StartCron(ctx, cron, h) if err != nil { t.Error(err) } - nq.ListenCron(ctx, cron, handler) - // allow time for listener to start time.Sleep(5 * time.Millisecond) @@ -144,20 +193,21 @@ func TestWorkerListenCron(t *testing.T) { } } -func TestNeoqAddLogger(t *testing.T) { +func TestSetLogger(t *testing.T) { const queue = "testing" var done = make(chan bool) - + buf := &strings.Builder{} ctx := context.TODO() - buf := &strings.Builder{} - nq, err := New(ctx, WithLogger(testLogger{l: log.New(buf, "", 0), done: done})) + nq, err := New(ctx, WithBackend(memory.Backend)) if err != nil { t.Fatal(err) } defer nq.Shutdown(ctx) - handler := NewHandler(func(ctx context.Context) (err error) { + nq.SetLogger(testutils.TestLogger{L: log.New(buf, "", 0), Done: done}) + + h := handler.New(func(ctx context.Context) (err error) { err = errTrigger return }) @@ -165,13 +215,12 @@ func TestNeoqAddLogger(t *testing.T) { t.Error(err) } - // Listen for jobs on the queue - err = nq.Listen(ctx, queue, handler) + err = nq.Start(ctx, queue, h) if err != nil { t.Error(err) } - _, err = nq.Enqueue(ctx, Job{Queue: queue}) + _, err = nq.Enqueue(ctx, &jobs.Job{Queue: queue}) if err != nil { t.Error(err) } diff --git a/testutils/testutils.go b/testutils/testutils.go new file mode 100644 index 0000000..a90f0a6 --- /dev/null +++ b/testutils/testutils.go @@ -0,0 +1,29 @@ +//go:build testing + +package testutils + +import "log" + +// TestLogger is a utility for logging in tests +type TestLogger struct { + L *log.Logger + Done chan bool +} + +// Info prints to stdout and signals its done channel +func (h TestLogger) Info(m string, args ...any) { + h.L.Println(m) + h.Done <- true +} + +// Debug prints to stdout and signals its done channel +func (h TestLogger) Debug(m string, args ...any) { + h.L.Println(m) + h.Done <- true +} + +// Error prints to stdout and signals its done channel +func (h TestLogger) Error(m string, err error, args ...any) { + h.L.Println(m, err) + h.Done <- true +} diff --git a/types/types.go b/types/types.go new file mode 100644 index 0000000..76dece1 --- /dev/null +++ b/types/types.go @@ -0,0 +1,33 @@ +package types + +import ( + "context" + + "github.com/acaloiaro/neoq/handler" + "github.com/acaloiaro/neoq/jobs" + "github.com/acaloiaro/neoq/logging" +) + +// Backend interface is Neoq's primary API +// +// Backend is implemented by: +// - [pkg/github.com/acaloiaro/neoq/backends/memory.PgBackend] +// - [pkg/github.com/acaloiaro/neoq/backends/postgres.PgBackend] +type Backend interface { + // Enqueue queues jobs to be executed asynchronously + Enqueue(ctx context.Context, job *jobs.Job) (jobID int64, err error) + + // Start starts processing jobs with the specified queue and handler + Start(ctx context.Context, queue string, h handler.Handler) (err error) + + // StartCron starts processing jobs with the specified cron schedule and handler + // + // See: https://pkg.go.dev/github.com/robfig/cron?#hdr-CRON_Expression_Format for details on the cron spec format + StartCron(ctx context.Context, cron string, h handler.Handler) (err error) + + // SetLogger sets the backend logger + SetLogger(logger logging.Logger) + + // Shutdown halts job processing and releases resources + Shutdown(ctx context.Context) +}