feat: make pending job processing resilient to connection loss
Instead of passing a single connection into the pending job processor,
use the connection pool to acquire a new connection on each loop iteration,
releasing it after every use.
acaloiaro committed Jan 27, 2025
1 parent 17c86fa commit 08ccf0f
Showing 2 changed files with 32 additions and 16 deletions.
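The pattern in brief, as a standalone sketch: acquire a pooled connection on each tick, release it right after the query, and treat acquisition failures as transient so the polling loop survives connection loss. The names pollPending, fetchPending, and fetchInterval below are hypothetical stand-ins for illustration, not neoq's API.

// pendingpoll is a minimal sketch of the pattern this commit adopts in
// processPendingJobs: acquire a connection from the pool on every tick and
// release it immediately after use, so a dropped connection only costs one
// iteration instead of killing the polling loop.
package pendingpoll

import (
	"context"
	"errors"
	"log/slog"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// fetchInterval is a placeholder for neoq.DefaultPendingJobFetchInterval.
const fetchInterval = 30 * time.Second

// pollPending checks for pending work on an interval until ctx is canceled.
// fetchPending is a hypothetical helper standing in for getPendingJobs.
func pollPending(ctx context.Context, pool *pgxpool.Pool, fetchPending func(context.Context, *pgxpool.Conn) error) {
	ticker := time.NewTicker(fetchInterval)
	defer ticker.Stop()

	for {
		conn, err := pool.Acquire(ctx)
		if err != nil {
			// A failed acquire is treated as transient: log it and retry on
			// the next tick rather than returning and stopping the loop.
			slog.Error("unable to acquire database connection", slog.Any("error", err))
		} else {
			err = fetchPending(ctx, conn)
			conn.Release() // release before waiting so the connection is never held idle
			if errors.Is(err, context.Canceled) {
				return
			}
			if err != nil {
				slog.Error("failed to fetch pending jobs", slog.Any("error", err))
			}
		}

		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}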
39 changes: 27 additions & 12 deletions backends/postgres/postgres_backend.go
@@ -191,11 +191,7 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
go p.listenerManager(ctx)

// monitor queues for pending jobs, so neoq is resilient to LISTEN disconnects and reconnections
pendingJobsConn, err := p.pool.Acquire(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get a database connection: %w", err)
}
p.processPendingJobs(ctx, pendingJobsConn)
p.processPendingJobs(ctx)

p.listenConnDown <- true

@@ -547,7 +543,17 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha

// SetLogger sets this backend's logger
func (p *PgBackend) SetLogger(logger logging.Logger) {
p.mu.Lock()
p.logger = logger
p.mu.Unlock()
}

// Logger gets this backend's logger
func (p *PgBackend) Logger() (l logging.Logger) {
p.mu.Lock()
l = p.logger
p.mu.Unlock()
return
}

// Shutdown shuts this backend down
@@ -831,20 +837,29 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {
//
// Past due jobs are fetched on the interval [neoq.DefaultPendingJobFetchInterval]
// nolint: cyclop
func (p *PgBackend) processPendingJobs(ctx context.Context, conn *pgxpool.Conn) {
func (p *PgBackend) processPendingJobs(ctx context.Context) {
go func(ctx context.Context) {
defer conn.Release()
var err error
var conn *pgxpool.Conn
var pendingJobs []*jobs.Job
ticker := time.NewTicker(neoq.DefaultPendingJobFetchInterval)

// check for pending jobs on an interval until the context is canceled
for {
pendingJobs, err := p.getPendingJobs(ctx, conn)
conn, err = p.acquire(ctx)
if err != nil {
p.Logger().Error("[pending_jobs] unable to get database connection", slog.Any("error", err))
<-ticker.C
continue
}

pendingJobs, err = p.getPendingJobs(ctx, conn)
conn.Release()
if errors.Is(err, context.Canceled) {
return
}

if err != nil && !errors.Is(err, pgx.ErrNoRows) {
p.logger.Error(
p.Logger().Error(
"failed to fetch pending jobs",
slog.Any("error", err),
)
@@ -1049,7 +1064,7 @@ func (p *PgBackend) acquire(ctx context.Context) (conn *pgxpool.Conn, err error)
ctx, cancelFunc := context.WithDeadline(ctx, time.Now().Add(p.config.PGConnectionTimeout))
defer cancelFunc()

p.logger.Debug("acquiring connection with timeout", slog.Any("timeout", p.config.PGConnectionTimeout))
p.Logger().Debug("acquiring connection with timeout", slog.Any("timeout", p.config.PGConnectionTimeout))

connCh := make(chan *pgxpool.Conn)
errCh := make(chan error)
@@ -1069,7 +1084,7 @@ func (p *PgBackend) acquire(ctx context.Context) (conn *pgxpool.Conn, err error)
case err := <-errCh:
return nil, err
case <-ctx.Done():
p.logger.Error("exceeded timeout acquiring a connection from the pool", slog.Any("timeout", p.config.PGConnectionTimeout))
p.Logger().Error("exceeded timeout acquiring a connection from the pool", slog.Any("timeout", p.config.PGConnectionTimeout))
cancelFunc()
err = ErrExceededConnectionPoolTimeout
return
9 changes: 5 additions & 4 deletions backends/postgres/postgres_backend_test.go
@@ -1144,7 +1144,7 @@ func TestProcessPendingJobs(t *testing.T) {

ctx := context.Background()

// INSERTing jobs into the the job queue before noeq is listening on any queues ensures that the new job is not announced, and when
// INSERTing jobs into the job queue before noeq is listening on any queues ensures that the new job is not announced, and when
// neoq _is_ started, that there is a pending jobs waiting to be processed
payload := map[string]interface{}{
"message": "hello world",
@@ -1154,7 +1154,7 @@
VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`,
queue, "dummy", payload, time.Now().UTC(), nil, 1).Scan(&pendingJobID)
if err != nil {
err = fmt.Errorf("unable to add job to queue: %w", err)
t.Error(fmt.Errorf("unable to add job to queue: %w", err))
return
}

@@ -1174,8 +1174,9 @@
t.Error(err)
}

var status string
go func() {
var err error
var status string
// ensure job has failed/has the correct status
for {
err = conn.
@@ -1200,6 +1201,6 @@
case <-done:
}
if err != nil {
t.Errorf("job should have resulted in a status of 'processed', but its status is %s", status)
t.Errorf("job should have resulted in a status of 'processed', but did not")
}
}
