Skip to content

Commit

Permalink
Fix postgres Handler error handling (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
acaloiaro authored Apr 26, 2023
1 parent 1c92db5 commit 71dd54b
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 4 deletions.
4 changes: 0 additions & 4 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,10 +786,6 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl

// execute the queue handler of this job
jobErr := handler.Exec(ctx, h)
if jobErr != nil {
err = fmt.Errorf("error executing handler: %w", jobErr)
return err
}

err = p.updateJob(ctx, jobErr)
if err != nil {
Expand Down
110 changes: 110 additions & 0 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"errors"
"fmt"
"log"
"os"
"strings"
"testing"
"time"

Expand All @@ -13,10 +15,38 @@ import (
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/internal"
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/testutils"
"github.com/jackc/pgx/v5"
)

var errPeriodicTimeout = errors.New("timed out waiting for periodic job")

func flushDB() {
dbURL := os.Getenv("TEST_DATABASE_URL")
if dbURL == "" {
return
}

conn, err := pgx.Connect(context.Background(), dbURL)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
os.Exit(1)
}
defer conn.Close(context.Background())

_, err = conn.Query(context.Background(), "DELETE FROM neoq_jobs") // nolint: gocritic
if err != nil {
fmt.Fprintf(os.Stderr, "'neoq_jobs' table flush failed: %v\n", err)
os.Exit(1) // nolint: gocritic
}
}

func TestMain(m *testing.M) {
flushDB()
code := m.Run()
os.Exit(code)
}

// TestBasicJobProcessing tests that the postgres backend is able to process the most basic jobs with the
// most basic configuration.
func TestBasicJobProcessing(t *testing.T) {
Expand Down Expand Up @@ -68,6 +98,10 @@ func TestBasicJobProcessing(t *testing.T) {
if err != nil {
t.Error(err)
}

t.Cleanup(func() {
flushDB()
})
}

// TestBasicJobMultipleQueue tests that the postgres backend is able to process jobs on multiple queues
Expand Down Expand Up @@ -149,6 +183,10 @@ results_loop:
if err != nil {
t.Error(err)
}

t.Cleanup(func() {
flushDB()
})
}

func TestCron(t *testing.T) {
Expand Down Expand Up @@ -195,4 +233,76 @@ func TestCron(t *testing.T) {
if err != nil {
t.Error(err)
}

t.Cleanup(func() {
flushDB()
})
}

// TestBasicJobProcessingWithErrors tests that the postgres backend is able to update the status of jobs that fail
func TestBasicJobProcessingWithErrors(t *testing.T) {
const queue = "testing"
done := make(chan bool)
defer close(done)

var timeoutTimer = time.After(5 * time.Second)

var connString = os.Getenv("TEST_DATABASE_URL")
if connString == "" {
t.Skip("Skipping: TEST_DATABASE_URL not set")
return
}

ctx := context.TODO()
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
if err != nil {
t.Fatal(err)
}
defer nq.Shutdown(ctx)

h := handler.New(func(_ context.Context) (err error) {
err = errors.New("something bad happened") // nolint: goerr113
done <- true
return
})

err = nq.Start(ctx, queue, h)
if err != nil {
t.Error(err)
}

buf := &strings.Builder{}
nq.SetLogger(testutils.TestLogger{L: log.New(buf, "", 0), Done: done})

jid, e := nq.Enqueue(ctx, &jobs.Job{
Queue: queue,
Payload: map[string]interface{}{
"message": "hello world",
},
})
if e != nil || jid == jobs.DuplicateJobID {
t.Error(e)
}

select {
case <-timeoutTimer:
err = jobs.ErrJobTimeout
case <-done:
// the error is returned after the done channel receives its message, so we need to give time for the logger to
// have logged the error that was returned by the handler
time.Sleep(100 * time.Millisecond)
expectedLogMsg := "job failed to process: something bad happened"
actualLogMsg := strings.Trim(buf.String(), "\n")
if strings.Contains(actualLogMsg, expectedLogMsg) {
t.Error(fmt.Errorf("'%s' NOT CONTAINS '%s'", actualLogMsg, expectedLogMsg)) //nolint:all
}
}

if err != nil {
t.Error(err)
}

t.Cleanup(func() {
flushDB()
})
}

0 comments on commit 71dd54b

Please sign in to comment.