Skip to content

Commit

Permalink
fix: #98 retries can be picked up by wrong handler
Browse files Browse the repository at this point in the history
Fixes a bug that can allow retries to end up on
the wrong queue in settings where there are
multiple handlers.
  • Loading branch information
acaloiaro committed Oct 21, 2023
1 parent 8b21247 commit 4ec478c
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 25 deletions.
49 changes: 25 additions & 24 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,20 @@ import (
var migrationsFS embed.FS

const (
PendingJobIDQuery = `SELECT id
JobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
FROM neoq_jobs
WHERE queue = $1
WHERE id = $1
AND status NOT IN ('processed')
AND run_after <= NOW()
FOR UPDATE SKIP LOCKED
LIMIT 1`
PendingJobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
PendingJobIDQuery = `SELECT id
FROM neoq_jobs
WHERE id = $1
WHERE queue = $1
AND status NOT IN ('processed')
AND run_after <= NOW()
FOR UPDATE SKIP LOCKED
LIMIT 1`
FutureJobQuery = `SELECT id,run_after
FutureJobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
FROM neoq_jobs
WHERE queue = $1
AND status NOT IN ('processed')
Expand Down Expand Up @@ -75,7 +74,7 @@ type PgBackend struct {
cron *cron.Cron
mu *sync.RWMutex // mutex to protect mutating state on a pgWorker
pool *pgxpool.Pool
futureJobs map[string]time.Time // map of future job IDs to their due time
futureJobs map[string]*jobs.Job // map of future job IDs to the corresponding job record
handlers map[string]handler.Handler // a map of queue names to queue handlers
cancelFuncs []context.CancelFunc // A collection of cancel functions to be called upon Shutdown()
}
Expand Down Expand Up @@ -112,7 +111,7 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
mu: &sync.RWMutex{},
config: cfg,
handlers: make(map[string]handler.Handler),
futureJobs: make(map[string]time.Time),
futureJobs: make(map[string]*jobs.Job),
cron: cron.New(),
cancelFuncs: []context.CancelFunc{},
}
Expand Down Expand Up @@ -322,7 +321,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
// add future jobs to the future job list
if job.RunAfter.After(time.Now().UTC()) {
p.mu.Lock()
p.futureJobs[jobID] = job.RunAfter
p.futureJobs[jobID] = job
p.mu.Unlock()
p.logger.Debug("added job to future jobs list", "queue", job.Queue, "job_id", jobID, "run_after", job.RunAfter)
}
Expand Down Expand Up @@ -511,7 +510,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {

if time.Until(runAfter) > 0 {
p.mu.Lock()
p.futureJobs[fmt.Sprint(job.ID)] = runAfter
p.futureJobs[fmt.Sprint(job.ID)] = job
p.mu.Unlock()
}

Expand Down Expand Up @@ -579,14 +578,16 @@ func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error
return
}

var id string
var runAfter time.Time
_, err = pgx.ForEachRow(rows, []any{&id, &runAfter}, func() error {
futureJobs, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[jobs.Job])
if err != nil {
return
}

for _, job := range futureJobs {
p.mu.Lock()
p.futureJobs[id] = runAfter
p.futureJobs[fmt.Sprintf("%d", job.ID)] = job
p.mu.Unlock()
return nil
})
}

return
}
Expand All @@ -604,15 +605,15 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
for {
// loop over list of future jobs, scheduling goroutines to wait for jobs that are due within the next 30 seconds
p.mu.Lock()
for jobID, runAfter := range p.futureJobs {
timeUntillRunAfter := time.Until(runAfter)
for jobID, job := range p.futureJobs {
timeUntillRunAfter := time.Until(job.RunAfter)
if timeUntillRunAfter <= p.config.FutureJobWindow {
delete(p.futureJobs, jobID)
go func(jid string) {
go func(jid string, j *jobs.Job) {
scheduleCh := time.After(timeUntillRunAfter)
<-scheduleCh
p.announceJob(ctx, queue, jid)
}(jobID)
p.announceJob(ctx, j.Queue, jid)
}(jobID, job)
}
}
p.mu.Unlock()
Expand Down Expand Up @@ -708,7 +709,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl
}
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed

job, err = p.getPendingJob(ctx, tx, jobID)
job, err = p.getJob(ctx, tx, jobID)
if err != nil {
return
}
Expand Down Expand Up @@ -815,8 +816,8 @@ func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue strin
conn.Release()
}

func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID string) (job *jobs.Job, err error) {
row, err := tx.Query(ctx, PendingJobQuery, jobID)
func (p *PgBackend) getJob(ctx context.Context, tx pgx.Tx, jobID string) (job *jobs.Job, err error) {
row, err := tx.Query(ctx, JobQuery, jobID)
if err != nil {
return
}
Expand Down
101 changes: 100 additions & 1 deletion backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func TestMultipleProcessors(t *testing.T) {

// From one of the neoq clients, enqueue several jobs. At least one per processor registered above.
nq := neos[0]
wg.Add(ConcurrentWorkers)
for i := 0; i < ConcurrentWorkers; i++ {
wg.Add(1)
ctx := context.Background()
deadline := time.Now().UTC().Add(10 * time.Second)
jid, e := nq.Enqueue(ctx, &jobs.Job{
Expand Down Expand Up @@ -498,6 +498,7 @@ results_loop:
}

// Test_MoveJobsToDeadQueue tests that when a job's MaxRetries is reached, the job is moved to the dead queue successfully
// https://github.com/acaloiaro/neoq/issues/98
func Test_MoveJobsToDeadQueue(t *testing.T) {
connString, conn := prepareAndCleanupDB(t)
const queue = "testing"
Expand Down Expand Up @@ -639,3 +640,101 @@ func TestJobEnqueuedSeparately(t *testing.T) {
t.Error(err)
}
}

// TestBasicJobMultipleQueueWithError tests that the postgres backend is able to process jobs on multiple queues
// and retries occur.
//
// Two handlers are started on the same neoq instance: one on "testing" that always succeeds and one on
// "testing2" that always panics. A job with MaxRetries=1 is enqueued on "testing2" and the test polls the
// neoq_dead_jobs table (for up to 30 seconds) until that job shows up there with a failed status.
// NOTE(review): presumably a retry picked up by the succeeding handler would never reach the dead queue,
// which is how this guards against the regression below -- confirm against the backend's retry logic.
//
// https://github.com/acaloiaro/neoq/issues/98
// nolint: gocyclo
func TestBasicJobMultipleQueueWithError(t *testing.T) {
	connString, conn := prepareAndCleanupDB(t)
	const queue = "testing"
	const queue2 = "testing2"

	ctx := context.TODO()
	nq, err := neoq.New(ctx,
		neoq.WithBackend(postgres.Backend),
		neoq.WithLogLevel(logging.LogLevelDebug),
		postgres.WithConnectionString(connString))
	if err != nil {
		t.Fatal(err)
	}
	defer nq.Shutdown(ctx)

	// handler for the first queue: always succeeds
	h := handler.New(queue, func(_ context.Context) (err error) {
		return
	})

	// handler for the second queue: always fails, so its job has to go through retries
	h2 := handler.New(queue2, func(_ context.Context) (err error) {
		panic("no good")
	})

	// Without both handlers listening the remainder of the test is meaningless, so stop immediately.
	if err = nq.Start(ctx, h); err != nil {
		t.Fatalf("starting handler for queue %q: %v", queue, err)
	}
	if err = nq.Start(ctx, h2); err != nil {
		t.Fatalf("starting handler for queue %q: %v", queue2, err)
	}

	// Enqueue synchronously: the job ID is required before polling can begin, and failing here avoids
	// polling for the full timeout with an empty job ID.
	jid, err := nq.Enqueue(ctx, &jobs.Job{
		Queue: queue,
		Payload: map[string]interface{}{
			"message": fmt.Sprintf("should not fail: %d", internal.RandInt(10000000000)),
		},
	})
	if err != nil {
		t.Fatalf("enqueueing job on queue %q: %v", queue, err)
	}
	if jid == jobs.DuplicateJobID {
		t.Fatalf("enqueueing job on queue %q returned the duplicate job ID", queue)
	}

	maxRetries := 1
	jid2, err := nq.Enqueue(ctx, &jobs.Job{
		Queue: queue2,
		Payload: map[string]interface{}{
			"message": fmt.Sprintf("should fail: %d", internal.RandInt(10000000000)),
		},
		MaxRetries: &maxRetries,
	})
	if err != nil {
		t.Fatalf("enqueueing job on queue %q: %v", queue2, err)
	}
	if jid2 == jobs.DuplicateJobID {
		t.Fatalf("enqueueing job on queue %q returned the duplicate job ID", queue2)
	}

	// poll the dead jobs table until the failing job appears or the wait budget is spent
	maxWait := time.Now().Add(30 * time.Second)
	var status string
	for !time.Now().After(maxWait) {
		err = conn.
			QueryRow(context.Background(), "SELECT status FROM neoq_dead_jobs WHERE id = $1", jid2).
			Scan(&status)
		if err == nil {
			break
		}

		// any error other than "row not there yet" is unexpected
		if !errors.Is(err, pgx.ErrNoRows) {
			t.Fatalf("querying dead jobs for job %s: %v", jid2, err)
		}

		time.Sleep(100 * time.Millisecond)
	}

	// status is still empty here if the job never showed up in neoq_dead_jobs within the wait budget
	if status != internal.JobStatusFailed {
		t.Errorf("job %s on queue %q should be dead: got status %q, want %q", jid2, queue2, status, internal.JobStatusFailed)
	}
}

0 comments on commit 4ec478c

Please sign in to comment.