Skip to content

Commit

Permalink
fix(postgres): jobs may never get processed if no workers hear the in…
Browse files Browse the repository at this point in the history
…itial announcement

When jobs are inserted into the 'neoq_jobs' table, the `announce_job` trigger announces the job to
listeners of the job's queue. However, if no workers were available to receive the announcement,
or the announcement was somehow lost, those jobs were marooned on the queue.

This fix adds a periodic check for past-due pending jobs to ensure that a neoq restart is
not necessary to catch marooned, overdue jobs.
  • Loading branch information
acaloiaro committed Jan 25, 2025
1 parent 245711b commit 553ff5e
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 24 deletions.
64 changes: 42 additions & 22 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ const (
AND status NOT IN ('processed')
FOR UPDATE SKIP LOCKED
LIMIT 1`
PendingJobIDQuery = `SELECT id
PendingJobIDsQuery = `SELECT id
FROM neoq_jobs
WHERE queue = $1
AND status NOT IN ('processed')
AND run_after <= NOW()
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1`
LIMIT 100`
FutureJobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
FROM neoq_jobs
WHERE queue = $1
Expand Down Expand Up @@ -674,8 +675,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, h.Queue)
}

pendingJobsChan := p.pendingJobs(ctx, h.Queue) // process overdue jobs *at startup*

pendingJobsChan := p.processPendingJobs(ctx, h.Queue)
// wait for the listener to connect and be ready to listen
for q := range p.readyQueues {
if q == h.Queue {
Expand All @@ -688,7 +688,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
p.readyQueues <- q
}

// process all future jobs and retries
// process all future jobs
go func() { p.scheduleFutureJobs(ctx, h.Queue) }()

for i := 0; i < h.Concurrency; i++ {
Expand Down Expand Up @@ -754,7 +754,7 @@ func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error
return
}

// scheduleFutureJobs announces future jobs using NOTIFY on an interval
// scheduleFutureJobs monitors the future job list for upcoming jobs and announces them to be processed by available workers
func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
err := p.initFutureJobs(ctx, queue)
if err != nil {
Expand All @@ -772,8 +772,8 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
if timeUntillRunAfter <= p.config.FutureJobWindow {
delete(p.futureJobs, jobID)
go func(jid string, j *jobs.Job) {
scheduleCh := time.After(timeUntillRunAfter)
<-scheduleCh
jobDue := time.After(timeUntillRunAfter)
<-jobDue
p.announceJob(ctx, j.Queue, jid)
}(jobID, job)
}
Expand Down Expand Up @@ -823,9 +823,11 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {
}
}

func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan *pgconn.Notification) {
jobsCh = make(chan *pgconn.Notification)

// processPendingJobs starts a goroutine that periodically fetches pending jobs and announces them to workers.
//
// Past due jobs are fetched on the interval [neoq.Config.JobCheckInterval]
// nolint: cyclop
func (p *PgBackend) processPendingJobs(ctx context.Context, queue string) (jobsCh chan *pgconn.Notification) {
conn, err := p.acquire(ctx)
if err != nil {
p.logger.Error(
Expand All @@ -836,28 +838,37 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
return
}

// check for new past-due jobs on an interval
ticker := time.NewTicker(p.config.JobCheckInterval)
go func(ctx context.Context) {
defer conn.Release()

// check for pending jobs on an interval until the context is canceled
for {
jobID, err := p.getPendingJobID(ctx, conn, queue)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, context.Canceled) {
break
}
jobIDs, err := p.getPendingJobIDs(ctx, conn, queue)
if errors.Is(err, context.Canceled) {
return
}

if err != nil && !errors.Is(err, pgx.ErrNoRows) {
p.logger.Error(
"failed to fetch pending job",
slog.String("queue", queue),
slog.Any("error", err),
slog.String("job_id", jobID),
)
} else {
jobsCh <- &pgconn.Notification{Channel: queue, Payload: jobID}
}

for _, jid := range jobIDs {
jobsCh <- &pgconn.Notification{Channel: queue, Payload: jid}
}
select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}(ctx)

jobsCh = make(chan *pgconn.Notification)
return jobsCh
}

Expand Down Expand Up @@ -1030,8 +1041,17 @@ func (p *PgBackend) getJob(ctx context.Context, tx pgx.Tx, jobID string) (job *j
return
}

func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID string, err error) {
err = conn.QueryRow(ctx, PendingJobIDQuery, queue).Scan(&jobID)
// getPendingJobIDs runs PendingJobIDsQuery for queue on conn and returns the matching job IDs
// formatted as decimal strings, in the order the query yields them.
//
// A nil slice with a nil error means no rows matched. On any failure (issuing the query,
// scanning a row, or an error that ended iteration early) the IDs collected so far are
// discarded and a wrapped error is returned, so callers can still match the cause with
// errors.Is (e.g. context.Canceled).
func (p *PgBackend) getPendingJobIDs(ctx context.Context, conn *pgxpool.Conn, queue string) (jobIDs []string, err error) {
	rows, err := conn.Query(ctx, PendingJobIDsQuery, queue)
	if err != nil {
		return nil, fmt.Errorf("querying pending job ids: %w", err)
	}
	// Close on every return path so the connection is never left with an open result set.
	defer rows.Close()

	for rows.Next() {
		var jid int64
		if err = rows.Scan(&jid); err != nil {
			return nil, fmt.Errorf("scanning pending job id: %w", err)
		}
		jobIDs = append(jobIDs, fmt.Sprint(jid))
	}

	// rows.Next also returns false when iteration stops on an error; without this check a
	// failure mid-stream would be reported as a successful, silently truncated result.
	if err = rows.Err(); err != nil {
		return nil, fmt.Errorf("reading pending job ids: %w", err)
	}

	return jobIDs, nil
}

Expand Down
7 changes: 6 additions & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
{
packages = with pkgs; [
automake
gcc
go_1_21
gomod2nix.legacyPackages.${system}.gomod2nix
gotools
Expand All @@ -41,7 +42,7 @@
];

enterShell = ''
export TEST_DATABASE_URL="postgres://postgres:postgres@localhost:${toString postgresPort}/neoq?sslmode=disable&pool_max_conns=100"
export TEST_DATABASE_URL="postgres://postgres:postgres@localhost:${toString postgresPort}/neoq?sslmode=disable&pool_max_conns=250"
export TEST_REDIS_URL=localhost:${toString redisPort}
export REDIS_PASSWORD=
'';
Expand All @@ -64,6 +65,10 @@
CREATE USER postgres WITH PASSWORD 'postgres' SUPERUSER;
CREATE DATABASE neoq;
'';
settings = {
max_connections = 250;
log_statement = "all";
};
};

redis = {
Expand Down
2 changes: 1 addition & 1 deletion neoq.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Config struct {
BackendAuthPassword string // password with which to authenticate to the backend
BackendConcurrency int // total number of backend processes available to process jobs
ConnectionString string // a string containing connection details for the backend
JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs
JobCheckInterval time.Duration // the interval of time between checking for new future/past-due jobs
FutureJobWindow time.Duration // time duration between current time and job.RunAfter that future jobs get scheduled
IdleTransactionTimeout int // number of milliseconds PgBackend transaction may idle before the connection is killed
ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown
Expand Down

0 comments on commit 553ff5e

Please sign in to comment.