
Queue-agnostic background job library for Go, with a pleasant API and powerful features.

Neoq

Background job processing for Go

Usage

go get -u github.com/acaloiaro/neoq

About

Neoq is a queue-agnostic background job framework for Go.

Queue-agnostic means that whether you're using an in-memory queue for development and testing or a Postgres or Redis queue in production, your job processing code doesn't change. Job handlers are agnostic to the queue providing jobs. It also means that you can mix queue types within a single application: if you have ephemeral or periodic tasks, you may want to process them in an in-memory queue and use Postgres or Redis queues for jobs that require durability.
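To make the idea concrete, here is a minimal standard-library sketch of a handler that does not know which queue feeds it. The `Backend` interface, `memoryBackend` type, and `Drain` method are hypothetical names invented for this illustration; they are not neoq's actual types or API.

```go
package main

import (
	"context"
	"fmt"
)

// Handler processes one job payload. It has no knowledge of the queue behind it.
type Handler func(ctx context.Context, payload map[string]interface{}) error

// Backend is a hypothetical interface for this sketch, not neoq's real one.
type Backend interface {
	Enqueue(ctx context.Context, queue string, payload map[string]interface{}) error
	// Drain runs h for each pending job on queue and reports how many succeeded.
	Drain(ctx context.Context, queue string, h Handler) (int, error)
}

// memoryBackend stores jobs in a map of slices. It is not safe for concurrent use.
type memoryBackend struct {
	queues map[string][]map[string]interface{}
}

func newMemoryBackend() *memoryBackend {
	return &memoryBackend{queues: make(map[string][]map[string]interface{})}
}

func (m *memoryBackend) Enqueue(_ context.Context, queue string, payload map[string]interface{}) error {
	m.queues[queue] = append(m.queues[queue], payload)
	return nil
}

func (m *memoryBackend) Drain(ctx context.Context, queue string, h Handler) (int, error) {
	n := 0
	for len(m.queues[queue]) > 0 {
		if err := h(ctx, m.queues[queue][0]); err != nil {
			return n, err // the failed job stays at the head of the queue
		}
		m.queues[queue] = m.queues[queue][1:]
		n++
	}
	return n, nil
}

// demo depends only on the Backend interface, so any implementation can be passed in.
func demo(b Backend) (int, string) {
	ctx := context.Background()
	_ = b.Enqueue(ctx, "hello_world", map[string]interface{}{"message": "hello world"})
	var seen string
	n, _ := b.Drain(ctx, "hello_world", func(_ context.Context, p map[string]interface{}) error {
		seen, _ = p["message"].(string)
		return nil
	})
	return n, seen
}

func main() {
	n, msg := demo(newMemoryBackend())
	fmt.Println("processed:", n, "message:", msg)
}
```

Swapping in a durable implementation of the same interface would leave `demo` and its handler unchanged, which is the property the paragraph above describes.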

Neoq aims to be simple, reliable, and easy to integrate, and to demand a minimal infrastructure footprint by providing queue backends that match your existing tech stack.

What it does

  • Multiple Backends: In-memory, Postgres, Redis, or user-supplied custom backends.
  • Retries: Jobs may be retried a configurable number of times with exponential backoff and jitter to prevent thundering herds.
  • Job Uniqueness: Jobs are fingerprinted based on their payload and status to prevent duplication (multiple jobs with the same payload are not re-queued).
  • Job Timeouts: Queue handlers can be configured with per-job timeouts with millisecond accuracy.
  • Periodic Jobs: Jobs can be scheduled periodically using standard cron syntax.
  • Future Jobs: Jobs can be scheduled to run at a future time.
  • Concurrency: Concurrency is configurable for every queue.
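Two of the features above, retry backoff and job fingerprinting, can be illustrated with the standard library alone. The sketch below is an assumption about how such mechanisms commonly work, not neoq's implementation: the `backoff` and `fingerprint` functions, the jitter range, and the choice of SHA-256 are all hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"math/rand"
	"time"
)

// backoff returns the wait before retry number attempt (0-based): base doubled
// attempt times, capped at max, plus random jitter in [0, delay/2].
// The jitter spreads retries out so failed jobs do not all retry at once.
func backoff(attempt int, base, max time.Duration) time.Duration {
	delay := base
	for i := 0; i < attempt && delay < max; i++ {
		delay *= 2
	}
	if delay > max {
		delay = max
	}
	jitter := time.Duration(rand.Int63n(int64(delay)/2 + 1))
	return delay + jitter
}

// fingerprint hashes a payload together with a status string.
// json.Marshal sorts map keys, so equal payloads always produce equal hashes.
func fingerprint(payload map[string]interface{}, status string) (string, error) {
	b, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(append(b, status...))
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	for attempt := 0; attempt < 4; attempt++ {
		fmt.Println("retry", attempt, "after", backoff(attempt, 100*time.Millisecond, 10*time.Second))
	}
	a, _ := fingerprint(map[string]interface{}{"message": "hello world"}, "new")
	b, _ := fingerprint(map[string]interface{}{"message": "hello world"}, "new")
	fmt.Println("duplicate:", a == b)
}
```

A queue could refuse to insert a job whose fingerprint matches one already pending, which would give the de-duplication behavior described in the list.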

Getting Started

Getting started is as simple as declaring queue handlers and adding jobs. You can create multiple neoq instances with different backends to meet your application's needs, e.g. an in-memory backend instance for ephemeral jobs and a Postgres backend instance for queue durability between application restarts.

Additional documentation can be found in the wiki: https://github.com/acaloiaro/neoq/wiki

Error handling in this section is omitted for simplicity.

Add queue handlers

Queue handlers listen for Jobs on queues. Jobs may consist of any payload that is JSON-serializable.
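One practical consequence of JSON-serializable payloads is worth seeing in code. The sketch below uses only `encoding/json`; the `roundTrip` helper is hypothetical, and whether a particular backend actually serializes payloads this way is an assumption. It shows that numbers decoded into a `map[string]interface{}` come back as `float64`, and that values such as channels cannot be encoded at all.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// roundTrip encodes a payload to JSON and decodes it again, roughly what
// happens when a job is stored in a durable queue and read back later.
func roundTrip(payload map[string]interface{}) (map[string]interface{}, error) {
	b, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	var out map[string]interface{}
	if err := json.Unmarshal(b, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(map[string]interface{}{"message": "hello world", "attempts": 3})
	if err != nil {
		panic(err)
	}
	// The int 3 was encoded as a JSON number and decodes as float64.
	fmt.Printf("%T %T\n", out["message"], out["attempts"]) // string float64

	// Channels have no JSON representation, so encoding fails.
	_, err = roundTrip(map[string]interface{}{"ch": make(chan int)})
	fmt.Println(err != nil) // true
}
```

Handlers that read numeric payload fields should therefore be prepared to assert `float64` rather than `int`.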

Queue handlers are plain Go functions that accept a context.Context parameter.

Example: Add a listener on the hello_world queue using the default in-memory backend.

ctx := context.Background()
nq, _ := neoq.New(ctx)
nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
  j, _ := jobs.FromContext(ctx)
  log.Println("got job id:", j.ID, "message:", j.Payload["message"])
  return
}))
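The handler above receives only a context and retrieves its job from it. The following standard-library sketch shows one common way a library can pass a value through a context; the `Job`, `withJob`, `fromContext`, and `dispatch` names are hypothetical and are not neoq's implementation. Unlike the abbreviated README snippets, it checks the error returned when no job is present.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Job is a hypothetical stand-in for a library's job type.
type Job struct {
	ID      int64
	Payload map[string]interface{}
}

// jobKey is an unexported type, so no other package can collide with this key.
type jobKey struct{}

// withJob returns a copy of ctx that carries j.
func withJob(ctx context.Context, j *Job) context.Context {
	return context.WithValue(ctx, jobKey{}, j)
}

// fromContext extracts the job stored by withJob, or reports that none exists.
func fromContext(ctx context.Context) (*Job, error) {
	j, ok := ctx.Value(jobKey{}).(*Job)
	if !ok || j == nil {
		return nil, errors.New("no job in context")
	}
	return j, nil
}

// handler has the same shape as the handlers shown in this README.
func handler(ctx context.Context) error {
	j, err := fromContext(ctx)
	if err != nil {
		return err
	}
	fmt.Println("got job id:", j.ID, "message:", j.Payload["message"])
	return nil
}

// dispatch plays the library's role: attach the job, then call the handler.
func dispatch(j *Job) error {
	return handler(withJob(context.Background(), j))
}

func main() {
	_ = dispatch(&Job{ID: 1, Payload: map[string]interface{}{"message": "hello world"}})
	fmt.Println(dispatch(nil))
}
```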

Enqueue jobs

Enqueuing adds jobs to the specified queue to be processed asynchronously.

Example: Add a "Hello World" job to the hello_world queue using the default in-memory backend.

ctx := context.Background()
nq, _ := neoq.New(ctx)
nq.Enqueue(ctx, &jobs.Job{
  Queue: "hello_world",
  Payload: map[string]interface{}{
    "message": "hello world",
  },
})
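Because processing is asynchronous, a short-lived program that enqueues a job and returns immediately may exit before any handler runs. The sketch below illustrates that general point with a buffered channel and a `sync.WaitGroup`; the `process` function is hypothetical and does not use neoq.

```go
package main

import (
	"fmt"
	"sync"
)

// process enqueues every message on a buffered channel, lets one worker
// goroutine handle them, and returns only after the worker has finished.
func process(messages []string) []string {
	queue := make(chan string, len(messages))
	var handled []string
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		for m := range queue {
			handled = append(handled, m) // only this goroutine writes to handled
		}
	}()

	for _, m := range messages {
		queue <- m // does not block: the buffer has room for every message
	}
	close(queue) // lets the worker's range loop end
	wg.Wait()    // without this, process could return before the jobs are handled
	return handled
}

func main() {
	fmt.Println(process([]string{"hello world", "goodbye"}))
}
```

In a long-running service the process stays alive on its own, so explicit waiting like this mostly matters for examples, tests, and one-off commands.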

Redis

Example: Process jobs on the "hello_world" queue and add a job to it using the Redis backend.

ctx := context.Background()
nq, _ := neoq.New(ctx,
  neoq.WithBackend(redis.Backend),
  redis.WithAddr("localhost:6379"),
  redis.WithPassword(""))

nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
  j, _ := jobs.FromContext(ctx)
  log.Println("got job id:", j.ID, "message:", j.Payload["message"])
  return
}))

nq.Enqueue(ctx, &jobs.Job{
  Queue: "hello_world",
  Payload: map[string]interface{}{
    "message": "hello world",
  },
})

Postgres

Example: Process jobs on the "hello_world" queue and add a job to it using the Postgres backend.

ctx := context.Background()
nq, _ := neoq.New(ctx,
  neoq.WithBackend(postgres.Backend),
  postgres.WithConnectionString("postgres://postgres:postgres@127.0.0.1:5432/neoq"),
)

nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
  j, _ := jobs.FromContext(ctx)
  log.Println("got job id:", j.ID, "message:", j.Payload["message"])
  return
}))

nq.Enqueue(ctx, &jobs.Job{
  Queue: "hello_world",
  Payload: map[string]interface{}{
    "message": "hello world",
  },
})

Example Code

Additional example integration code can be found at https://github.com/acaloiaro/neoq/tree/main/examples

Status

This project is currently in alpha. Future releases may change the API.