Skip to content

Commit

Permalink
fix(postgres): jobs may never get processed if no workers hear the in…
Browse files Browse the repository at this point in the history
…itial announcement

When jobs are inserted into the 'neoq_jobs' table, the `announce_job` trigger announces the job to
listeners of the job's queue. However, if no workers are available to receive the announcement,
or the announcement is somehow lost, those jobs are marooned on the queue.

This fix adds a periodic check for past-due pending jobs to ensure that a neoq restart is
not necessary to catch marooned, overdue jobs.
  • Loading branch information
acaloiaro committed Jan 25, 2025
1 parent a002d3f commit 2e890c4
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 23 deletions.
70 changes: 48 additions & 22 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ const (
AND status NOT IN ('processed')
FOR UPDATE SKIP LOCKED
LIMIT 1`
PendingJobIDQuery = `SELECT id
PendingJobIDsQuery = `SELECT id
FROM neoq_jobs
WHERE queue = $1
AND status NOT IN ('processed')
AND run_after <= NOW()
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1`
LIMIT 100`
FutureJobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
FROM neoq_jobs
WHERE queue = $1
Expand Down Expand Up @@ -674,8 +675,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, h.Queue)
}

pendingJobsChan := p.pendingJobs(ctx, h.Queue) // process overdue jobs *at startup*

pendingJobsChan := p.processPendingJobs(ctx, h.Queue)
// wait for the listener to connect and be ready to listen
for q := range p.readyQueues {
if q == h.Queue {
Expand All @@ -688,7 +688,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
p.readyQueues <- q
}

// process all future jobs and retries
// process all future jobs
go func() { p.scheduleFutureJobs(ctx, h.Queue) }()

for i := 0; i < h.Concurrency; i++ {
Expand Down Expand Up @@ -754,7 +754,7 @@ func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error
return
}

// scheduleFutureJobs announces future jobs using NOTIFY on an interval
// scheduleFutureJobs monitors the future job list for upcoming jobs and announces them to be processed by available workers
func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
err := p.initFutureJobs(ctx, queue)
if err != nil {
Expand All @@ -772,8 +772,8 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
if timeUntillRunAfter <= p.config.FutureJobWindow {
delete(p.futureJobs, jobID)
go func(jid string, j *jobs.Job) {
scheduleCh := time.After(timeUntillRunAfter)
<-scheduleCh
jobDue := time.After(timeUntillRunAfter)
<-jobDue
p.announceJob(ctx, j.Queue, jid)
}(jobID, job)
}
Expand Down Expand Up @@ -823,9 +823,11 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {
}
}

func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan *pgconn.Notification) {
jobsCh = make(chan *pgconn.Notification)

// processPendingJobs starts a goroutine that periodically fetches pending jobs and announces them to workers.
//
// Past due jobs are fetched on the interval [neoq.Config.JobCheckInterval]
// nolint: cyclop
func (p *PgBackend) processPendingJobs(ctx context.Context, queue string) (jobsCh chan *pgconn.Notification) {
conn, err := p.acquire(ctx)
if err != nil {
p.logger.Error(
Expand All @@ -836,28 +838,43 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
return
}

// check for new past-due jobs on an interval
ticker := time.NewTicker(p.config.JobCheckInterval)
go func(ctx context.Context) {
defer conn.Release()

// check for pending jobs on an interval until the context is canceled
for {
jobID, err := p.getPendingJobID(ctx, conn, queue)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, context.Canceled) {
break
}
select {
case <-ctx.Done():
return
default:
}
jobIDs, err := p.getPendingJobIDs(ctx, conn, queue)
if errors.Is(err, context.Canceled) {
return
}

if errors.Is(err, pgx.ErrNoRows) {
<-ticker.C
continue
}

if err != nil {
p.logger.Error(
"failed to fetch pending job",
slog.String("queue", queue),
slog.Any("error", err),
slog.String("job_id", jobID),
)
} else {
jobsCh <- &pgconn.Notification{Channel: queue, Payload: jobID}
break
}

for _, jid := range jobIDs {
jobsCh <- &pgconn.Notification{Channel: queue, Payload: jid}
}
}
}(ctx)

jobsCh = make(chan *pgconn.Notification)
return jobsCh
}

Expand Down Expand Up @@ -1030,8 +1047,17 @@ func (p *PgBackend) getJob(ctx context.Context, tx pgx.Tx, jobID string) (job *j
return
}

func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID string, err error) {
err = conn.QueryRow(ctx, PendingJobIDQuery, queue).Scan(&jobID)
// getPendingJobIDs runs PendingJobIDsQuery for queue on conn and returns the IDs of
// the matching jobs, formatted as decimal strings.
//
// IDs are scanned as int64 and converted with fmt.Sprint. If the query, a row scan, or
// row iteration fails, the error is returned and no IDs are returned. The result set is
// always closed before returning so that conn can be reused by the caller.
//
// NOTE(review): conn.Query, unlike QueryRow(...).Scan, is not expected to report
// pgx.ErrNoRows for an empty result; an empty result yields a nil slice and a nil error.
// Confirm that callers do not rely on pgx.ErrNoRows to detect "nothing pending".
func (p *PgBackend) getPendingJobIDs(ctx context.Context, conn *pgxpool.Conn, queue string) (jobIDs []string, err error) {
	var rows pgx.Rows
	rows, err = conn.Query(ctx, PendingJobIDsQuery, queue)
	if err != nil {
		return nil, err
	}
	// Closing is required before the connection can run another query; without it an
	// early return on a scan error would leave the connection busy.
	defer rows.Close()

	for rows.Next() {
		var jid int64
		if err = rows.Scan(&jid); err != nil {
			return nil, err
		}
		jobIDs = append(jobIDs, fmt.Sprint(jid))
	}

	// rows.Next() returns false both at end-of-results and on failure; rows.Err()
	// distinguishes the two (e.g. a context cancellation mid-iteration).
	if err = rows.Err(); err != nil {
		return nil, err
	}

	return jobIDs, nil
}

Expand Down
4 changes: 4 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
{
packages = with pkgs; [
automake
gcc
go_1_21
gomod2nix.legacyPackages.${system}.gomod2nix
gotools
Expand Down Expand Up @@ -64,6 +65,9 @@
CREATE USER postgres WITH PASSWORD 'postgres' SUPERUSER;
CREATE DATABASE neoq;
'';
settings = {
log_statement = "all";
};
};

redis = {
Expand Down
2 changes: 1 addition & 1 deletion neoq.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Config struct {
BackendAuthPassword string // password with which to authenticate to the backend
BackendConcurrency int // total number of backend processes available to process jobs
ConnectionString string // a string containing connection details for the backend
JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs
JobCheckInterval time.Duration // the interval of time between checking for new future/past-due jobs
FutureJobWindow time.Duration // time duration between current time and job.RunAfter that future jobs get scheduled
IdleTransactionTimeout int // number of milliseconds PgBackend transaction may idle before the connection is killed
ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown
Expand Down

0 comments on commit 2e890c4

Please sign in to comment.