fix: fix excessive connection counts for monitoring pending jobs #139

Merged 1 commit on Jan 26, 2025
71 changes: 29 additions & 42 deletions backends/postgres/postgres_backend.go
@@ -43,10 +43,9 @@ const (
AND status NOT IN ('processed')
FOR UPDATE SKIP LOCKED
LIMIT 1`
PendingJobIDsQuery = `SELECT id
PendingJobsQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
FROM neoq_jobs
WHERE queue = $1
AND status NOT IN ('processed')
WHERE status NOT IN ('processed')
AND run_after <= NOW()
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
@@ -191,6 +190,13 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
// monitor handlers for changes and LISTEN when new queues are added
go p.listenerManager(ctx)

// monitor queues for pending jobs, so neoq is resilient to LISTEN disconnects and reconnections
pendingJobsConn, err := p.pool.Acquire(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get a database connection: %w", err)
}
p.processPendingJobs(ctx, pendingJobsConn)

p.listenConnDown <- true

p.cron.Start()
@@ -200,7 +206,8 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
return pb, nil
}
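The change above is the heart of the fix: Backend() now acquires a single pooled connection for pending-job polling instead of start() acquiring one per queue. A minimal standalone sketch of that pattern (not neoq's actual code; the DSN, ticker interval, and function names are placeholders):

package main

import (
	"context"
	"log"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// pollPending holds exactly one pooled connection for the lifetime of the polling goroutine,
// releasing it back to the pool when the context is canceled.
func pollPending(ctx context.Context, conn *pgxpool.Conn) {
	go func() {
		defer conn.Release()
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()
		for {
			// query for pending jobs with conn here
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()
}

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, "postgres://localhost:5432/neoq") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	// one connection total, no matter how many queues have handlers
	conn, err := pool.Acquire(ctx)
	if err != nil {
		log.Fatal(err)
	}
	pollPending(ctx, conn)
	time.Sleep(100 * time.Millisecond) // let the sketch's goroutine start before exiting
}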

// listenerManager manages the LISTENer connection and adding queue to it
// listenerManager manages the LISTENer connection and adds queues to it
// nolint: cyclop
func (p *PgBackend) listenerManager(ctx context.Context) {
var err error
for {
@@ -675,7 +682,6 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, h.Queue)
}

pendingJobsChan := p.processPendingJobs(ctx, h.Queue)
// wait for the listener to connect and be ready to listen
for q := range p.readyQueues {
if q == h.Queue {
@@ -700,8 +706,6 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
select {
case n = <-listenJobChan:
err = p.handleJob(ctx, n.Payload)
case n = <-pendingJobsChan:
err = p.handleJob(ctx, n.Payload)
case <-ctx.Done():
return
case <-errCh:
@@ -825,40 +829,29 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {

// processPendingJobs starts a goroutine that periodically fetches pending jobs and announces them to workers.
//
// Past due jobs are fetched on the interval [neoq.Config.JobCheckInterval]
// Past due jobs are fetched on the interval [neoq.DefaultPendingJobFetchInterval]
// nolint: cyclop
func (p *PgBackend) processPendingJobs(ctx context.Context, queue string) (jobsCh chan *pgconn.Notification) {
conn, err := p.acquire(ctx)
if err != nil {
p.logger.Error(
"failed to acquire database connection to listen for pending queue items",
slog.String("queue", queue),
slog.Any("error", err),
)
return
}

// check for new past-due jobs on an interval
ticker := time.NewTicker(p.config.JobCheckInterval)
func (p *PgBackend) processPendingJobs(ctx context.Context, conn *pgxpool.Conn) {
go func(ctx context.Context) {
defer conn.Release()
ticker := time.NewTicker(neoq.DefaultPendingJobFetchInterval)

// check for pending jobs on an interval until the context is canceled
for {
jobIDs, err := p.getPendingJobIDs(ctx, conn, queue)
pendingJobs, err := p.getPendingJobs(ctx, conn)
if errors.Is(err, context.Canceled) {
return
}

if err != nil && !errors.Is(err, pgx.ErrNoRows) {
p.logger.Error(
"failed to fetch pending job",
slog.String("queue", queue),
"failed to fetch pending jobs",
slog.Any("error", err),
)
}

for _, jid := range jobIDs {
jobsCh <- &pgconn.Notification{Channel: queue, Payload: jid}
for _, job := range pendingJobs {
p.announceJob(ctx, job.Queue, fmt.Sprint(job.ID))
}
select {
case <-ctx.Done():
@@ -867,9 +860,6 @@ func (p *PgBackend) processPendingJobs(ctx context.Context, queue string) (jobsC
}
}
}(ctx)

jobsCh = make(chan *pgconn.Notification)
return jobsCh
}
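processPendingJobs now hands each fetched job to announceJob rather than pushing notifications into a per-queue Go channel. Assuming announceJob is backed by Postgres NOTIFY (an assumption, not shown in this diff), the underlying round trip looks roughly like this generic pgx sketch; the channel name, payload, and DSN are illustrative only:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	dsn := "postgres://localhost:5432/neoq" // placeholder DSN

	listener, err := pgx.Connect(ctx, dsn)
	if err != nil {
		log.Fatal(err)
	}
	defer listener.Close(ctx)
	// LISTEN takes an identifier, so the channel name cannot be a bind parameter
	if _, err := listener.Exec(ctx, `LISTEN "testing"`); err != nil {
		log.Fatal(err)
	}

	announcer, err := pgx.Connect(ctx, dsn)
	if err != nil {
		log.Fatal(err)
	}
	defer announcer.Close(ctx)
	// pg_notify(channel, payload) announces a job ID to anyone LISTENing on the queue's channel
	if _, err := announcer.Exec(ctx, `SELECT pg_notify($1, $2)`, "testing", "42"); err != nil {
		log.Fatal(err)
	}

	// the listening connection receives the announcement as a pgconn.Notification
	n, err := listener.WaitForNotification(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("queue:", n.Channel, "job id:", n.Payload)
}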

// handleJob is the workhorse of Neoq
@@ -955,9 +945,6 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) {
}

// listen uses Postgres LISTEN to listen for jobs on a queue
// TODO: There is currently no handling of listener disconnects in PgBackend.
// This will lead to jobs not getting processed until the worker is restarted.
// Implement disconnect handling.
func (p *PgBackend) listen(ctx context.Context) (c chan *pgconn.Notification, errCh chan error) {
c = make(chan *pgconn.Notification)
errCh = make(chan error)
@@ -1041,17 +1028,17 @@ func (p *PgBackend) getJob(ctx context.Context, tx pgx.Tx, jobID string) (job *j
return
}

func (p *PgBackend) getPendingJobIDs(ctx context.Context, conn *pgxpool.Conn, queue string) (jobIDs []string, err error) {
var rows pgx.Rows
var jid int64
rows, err = conn.Query(ctx, PendingJobIDsQuery, queue)
for rows.Next() {
err = rows.Scan(&jid)
if err != nil {
return
}
jobIDs = append(jobIDs, fmt.Sprint(jid))
func (p *PgBackend) getPendingJobs(ctx context.Context, conn *pgxpool.Conn) (pendingJobs []*jobs.Job, err error) {
rows, err := conn.Query(ctx, PendingJobsQuery)
if err != nil {
return
}

pendingJobs, err = pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[jobs.Job])
if err != nil {
return
}

return
}
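getPendingJobs leans on pgx.CollectRows with pgx.RowToAddrOfStructByName, which maps result columns to struct fields by name — the reason PendingJobsQuery now selects every column jobs.Job exposes rather than just id. A small self-contained sketch of that mapping (the job struct, query, and DSN here are illustrative, not neoq's types):

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// illustrative row type; the selected column names must match these fields (or their db tags)
type job struct {
	ID     int64  `db:"id"`
	Queue  string `db:"queue"`
	Status string `db:"status"`
}

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, "postgres://localhost:5432/neoq") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	rows, err := pool.Query(ctx, `SELECT id, queue, status FROM neoq_jobs WHERE status NOT IN ('processed')`)
	if err != nil {
		log.Fatal(err)
	}
	// CollectRows drains and closes rows, scanning each one into a freshly allocated *job by column name
	pending, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[job])
	if err != nil {
		log.Fatal(err)
	}
	for _, j := range pending {
		fmt.Println(j.ID, j.Queue, j.Status)
	}
}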

71 changes: 71 additions & 0 deletions backends/postgres/postgres_backend_test.go
@@ -1132,3 +1132,74 @@ func TestHandlerRecoveryCallback(t *testing.T) {
t.Error(err)
}
}

// TestProcessPendingJobs tests that unannounced jobs with a run_after before the current timestamp get run periodically.
// This ensures that when the LISTENER connection fails, jobs that would have been announced still get processed eventually
func TestProcessPendingJobs(t *testing.T) {
connString, conn := prepareAndCleanupDB(t)
const queue = "testing"
timeoutTimer := time.After(5 * time.Second)
done := make(chan bool)
defer close(done)

ctx := context.Background()

// INSERTing jobs into the job queue before neoq is listening on any queues ensures that the new job is not announced, and when
// neoq _is_ started, that there is a pending job waiting to be processed
payload := map[string]interface{}{
"message": "hello world",
}
var pendingJobID string
err := conn.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`,
queue, "dummy", payload, time.Now().UTC(), nil, 1).Scan(&pendingJobID)
if err != nil {
err = fmt.Errorf("unable to add job to queue: %w", err)
return
}

nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
if err != nil {
t.Fatal(err)
}
defer nq.Shutdown(ctx)

h := handler.New(queue, func(_ context.Context) (err error) {
return
})

// Start ensures that pending jobs will be processed
err = nq.Start(ctx, h)
if err != nil {
t.Error(err)
}

var status string
go func() {
// ensure job has failed/has the correct status
for {
err = conn.
QueryRow(context.Background(), "SELECT status FROM neoq_jobs WHERE id = $1", pendingJobID).
Scan(&status)
if err != nil {
break
}

if status != internal.JobStatusNew {
done <- true
break
}

time.Sleep(50 * time.Millisecond)
}
}()

select {
case <-timeoutTimer:
err = jobs.ErrJobTimeout
case <-done:
}
if err != nil {
t.Errorf("job should have resulted in a status of 'processed', but its status is %s", status)
}
}
16 changes: 8 additions & 8 deletions gomod2nix.toml
@@ -77,20 +77,20 @@ schema = 3
version = "v1.24.0"
hash = "sha256-yLzjFbMWnc5b033gcPLGP0KY1xWPJ3sjnUG/RndmC3o="
[mod."golang.org/x/crypto"]
version = "v0.17.0"
hash = "sha256-/vzBaeD/Ymyc7cpjBvSfJfuZ57zWa9LOaZM7b33eIx0="
version = "v0.31.0"
hash = "sha256-ZBjoG7ZOuTEmjaXPP9txAvjAjC46DeaLs0zrNzi8EQw="
[mod."golang.org/x/exp"]
version = "v0.0.0-20230713183714-613f0c0eb8a1"
hash = "sha256-VLE9CCOYpTdyBWaQ1YxXpGOBS74wpIR3JJ+JVzNyEkQ="
[mod."golang.org/x/sync"]
version = "v0.2.0"
hash = "sha256-hKk9zsy2aXY7R0qGFZhGOVvk5qD17f6KHEuK4rGpTsg="
version = "v0.10.0"
hash = "sha256-HWruKClrdoBKVdxKCyoazxeQV4dIYLdkHekQvx275/o="
[mod."golang.org/x/sys"]
version = "v0.15.0"
hash = "sha256-n7TlABF6179RzGq3gctPDKDPRtDfnwPdjNCMm8ps2KY="
version = "v0.28.0"
hash = "sha256-kzSlDo5FKsQU9cLefIt2dueGUfz9XuEW+mGSGlPATGc="
[mod."golang.org/x/text"]
version = "v0.14.0"
hash = "sha256-yh3B0tom1RfzQBf1RNmfdNWF1PtiqxV41jW1GVS6JAg="
version = "v0.21.0"
hash = "sha256-QaMwddBRnoS2mv9Y86eVC2x2wx/GZ7kr2zAJvwDeCPc="
[mod."golang.org/x/time"]
version = "v0.0.0-20190308202827-9d24e82272b4"
hash = "sha256-azbksMSLQf1CK0jF2i+ESjFenMDR88xRW1tov0metrg="
4 changes: 4 additions & 0 deletions neoq.go
@@ -12,6 +12,10 @@ import (

const (
DefaultIdleTxTimeout = 30000
// The duration of time between checking if any queues have pending jobs
// It's necessary to check for pending jobs periodically: if the LISTENer connection fails at any point, there is a period
// of time during which new jobs are announced by the new-job trigger, but no listeners are LISTENing for work.
DefaultPendingJobFetchInterval = 60 * time.Second
// the window of time between time.Now() and when a job's RunAfter comes due that neoq will schedule a goroutine to
// schedule the job for execution.
// E.g. right now is 16:00 and a job's RunAfter is 16:30 of the same date. This job will get a dedicated goroutine
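The DefaultPendingJobFetchInterval comment above is the rationale for the whole PR: LISTEN is the fast path, and the periodic fetch is a safety net for the window in which the LISTEN connection is down. A schematic of combining the two signal sources (stand-in package, channel, and function names, not neoq's code):

package pending

import (
	"context"
	"time"
)

// run is schematic only: listenCh stands in for LISTEN/NOTIFY deliveries, fetchPending for a
// database poll of past-due jobs, and handle for job execution.
func run(ctx context.Context, listenCh <-chan string, fetchPending func() []string, handle func(string)) {
	ticker := time.NewTicker(60 * time.Second) // mirrors DefaultPendingJobFetchInterval
	defer ticker.Stop()
	for {
		select {
		case id := <-listenCh: // normal path: a job was announced over NOTIFY
			handle(id)
		case <-ticker.C: // fallback path: pick up jobs announced while LISTEN was down
			for _, id := range fetchPending() {
				handle(id)
			}
		case <-ctx.Done():
			return
		}
	}
}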