Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(postgres): jobs may never get processed if no workers hear the initial announcement #138

Merged
merged 1 commit into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 42 additions & 22 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ const (
AND status NOT IN ('processed')
FOR UPDATE SKIP LOCKED
LIMIT 1`
PendingJobIDQuery = `SELECT id
PendingJobIDsQuery = `SELECT id
FROM neoq_jobs
WHERE queue = $1
AND status NOT IN ('processed')
AND run_after <= NOW()
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1`
LIMIT 100`
FutureJobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
FROM neoq_jobs
WHERE queue = $1
Expand Down Expand Up @@ -674,8 +675,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, h.Queue)
}

pendingJobsChan := p.pendingJobs(ctx, h.Queue) // process overdue jobs *at startup*

pendingJobsChan := p.processPendingJobs(ctx, h.Queue)
// wait for the listener to connect and be ready to listen
for q := range p.readyQueues {
if q == h.Queue {
Expand All @@ -688,7 +688,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
p.readyQueues <- q
}

// process all future jobs and retries
// process all future jobs
go func() { p.scheduleFutureJobs(ctx, h.Queue) }()

for i := 0; i < h.Concurrency; i++ {
Expand Down Expand Up @@ -754,7 +754,7 @@ func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error
return
}

// scheduleFutureJobs announces future jobs using NOTIFY on an interval
// scheduleFutureJobs monitors the future job list for upcoming jobs and announces them to be processed by available workers
func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
err := p.initFutureJobs(ctx, queue)
if err != nil {
Expand All @@ -772,8 +772,8 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
if timeUntillRunAfter <= p.config.FutureJobWindow {
delete(p.futureJobs, jobID)
go func(jid string, j *jobs.Job) {
scheduleCh := time.After(timeUntillRunAfter)
<-scheduleCh
jobDue := time.After(timeUntillRunAfter)
<-jobDue
p.announceJob(ctx, j.Queue, jid)
}(jobID, job)
}
Expand Down Expand Up @@ -823,9 +823,11 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {
}
}

func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan *pgconn.Notification) {
jobsCh = make(chan *pgconn.Notification)

// processPendingJobs starts a goroutine that periodically fetches pending jobs and announces them to workers.
//
// Past due jobs are fetched on the interval [neoq.Config.JobCheckInterval]
// nolint: cyclop
func (p *PgBackend) processPendingJobs(ctx context.Context, queue string) (jobsCh chan *pgconn.Notification) {
conn, err := p.acquire(ctx)
if err != nil {
p.logger.Error(
Expand All @@ -836,28 +838,37 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
return
}

// check for new past-due jobs on an interval
ticker := time.NewTicker(p.config.JobCheckInterval)
go func(ctx context.Context) {
defer conn.Release()

// check for pending jobs on an interval until the context is canceled
for {
jobID, err := p.getPendingJobID(ctx, conn, queue)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, context.Canceled) {
break
}
jobIDs, err := p.getPendingJobIDs(ctx, conn, queue)
if errors.Is(err, context.Canceled) {
return
}

if err != nil && !errors.Is(err, pgx.ErrNoRows) {
p.logger.Error(
"failed to fetch pending job",
slog.String("queue", queue),
slog.Any("error", err),
slog.String("job_id", jobID),
)
} else {
jobsCh <- &pgconn.Notification{Channel: queue, Payload: jobID}
}

for _, jid := range jobIDs {
jobsCh <- &pgconn.Notification{Channel: queue, Payload: jid}
}
select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}(ctx)

jobsCh = make(chan *pgconn.Notification)
return jobsCh
}

Expand Down Expand Up @@ -1030,8 +1041,17 @@ func (p *PgBackend) getJob(ctx context.Context, tx pgx.Tx, jobID string) (job *j
return
}

func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID string, err error) {
err = conn.QueryRow(ctx, PendingJobIDQuery, queue).Scan(&jobID)
// getPendingJobIDs runs PendingJobIDsQuery for the given queue on conn and
// returns the matching job IDs formatted as decimal strings.
//
// A nil slice with a nil error means the query matched no rows. On any
// failure (query, scan, or an error that ends row iteration early) the
// returned slice is nil and the error wraps the underlying cause, so callers
// can still match it with errors.Is (e.g. against context.Canceled).
func (p *PgBackend) getPendingJobIDs(ctx context.Context, conn *pgxpool.Conn, queue string) (jobIDs []string, err error) {
	rows, err := conn.Query(ctx, PendingJobIDsQuery, queue)
	if err != nil {
		return nil, fmt.Errorf("querying pending job ids for queue %q: %w", queue, err)
	}
	// Close is safe to call more than once; deferring it guarantees the
	// result set is released on every return path.
	defer rows.Close()

	for rows.Next() {
		var jid int64
		if err = rows.Scan(&jid); err != nil {
			return nil, fmt.Errorf("scanning pending job id for queue %q: %w", queue, err)
		}
		jobIDs = append(jobIDs, fmt.Sprint(jid))
	}

	// rows.Next returns false both at the end of the result set and on
	// failure; rows.Err distinguishes the two so a truncated read is not
	// reported as success.
	if err = rows.Err(); err != nil {
		return nil, fmt.Errorf("reading pending job ids for queue %q: %w", queue, err)
	}

	return jobIDs, nil
}

Expand Down
7 changes: 6 additions & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
{
packages = with pkgs; [
automake
gcc
go_1_21
gomod2nix.legacyPackages.${system}.gomod2nix
gotools
Expand All @@ -41,7 +42,7 @@
];

enterShell = ''
export TEST_DATABASE_URL="postgres://postgres:postgres@localhost:${toString postgresPort}/neoq?sslmode=disable&pool_max_conns=100"
export TEST_DATABASE_URL="postgres://postgres:postgres@localhost:${toString postgresPort}/neoq?sslmode=disable&pool_max_conns=250"
export TEST_REDIS_URL=localhost:${toString redisPort}
export REDIS_PASSWORD=
'';
Expand All @@ -64,6 +65,10 @@
CREATE USER postgres WITH PASSWORD 'postgres' SUPERUSER;
CREATE DATABASE neoq;
'';
settings = {
max_connections = 250;
log_statement = "all";
};
};

redis = {
Expand Down
2 changes: 1 addition & 1 deletion neoq.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Config struct {
BackendAuthPassword string // password with which to authenticate to the backend
BackendConcurrency int // total number of backend processes available to process jobs
ConnectionString string // a string containing connection details for the backend
JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs
JobCheckInterval time.Duration // the interval of time between checking for new future/past-due jobs
FutureJobWindow time.Duration // time duration between current time and job.RunAfter that future jobs get scheduled
IdleTransactionTimeout int // number of milliseconds PgBackend transaction may idle before the connection is killed
ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown
Expand Down
Loading