Skip to content

Commit

Permalink
feat: change name (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
bxcodec authored Jun 9, 2024
1 parent a800af2 commit a9c6351
Show file tree
Hide file tree
Showing 15 changed files with 67 additions and 66 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# goqu
GoQu - Golang Queue Wrapper for all queue platforms
# goqueue

GoQueue - Golang Queue Wrapper for all queue platforms
2 changes: 1 addition & 1 deletion consumer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package goqu
package goqueue

import "context"

Expand Down
6 changes: 3 additions & 3 deletions consumer/option.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package consumer

import "github.com/bxcodec/goqu"
import "github.com/bxcodec/goqueue"

// Option represents the configuration options for the consumer.
type Option struct {
Expand All @@ -9,7 +9,7 @@ type Option struct {
// QueueName specifies the name of the queue to consume messages from.
QueueName string
// Middlewares is a list of middleware functions to be applied to the inbound message handler.
Middlewares []goqu.InboundMessageHandlerMiddlewareFunc
Middlewares []goqueue.InboundMessageHandlerMiddlewareFunc
}

// OptionFunc is a function type that takes an `opt` parameter of type `*Option`.
Expand All @@ -35,7 +35,7 @@ func WithQueueName(name string) OptionFunc {
// WithMiddlewares is an OptionFunc that sets the provided middlewares for the consumer.
// Middlewares are used to process inbound messages before they are handled by the consumer.
// The middlewares are applied in the order they are provided.
func WithMiddlewares(middlewares ...goqu.InboundMessageHandlerMiddlewareFunc) OptionFunc {
func WithMiddlewares(middlewares ...goqueue.InboundMessageHandlerMiddlewareFunc) OptionFunc {
return func(opt *Option) {
opt.Middlewares = middlewares
}
Expand Down
26 changes: 13 additions & 13 deletions consumer/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus"

"github.com/bxcodec/goqu"
"github.com/bxcodec/goqu/consumer"
headerKey "github.com/bxcodec/goqu/headers/key"
headerVal "github.com/bxcodec/goqu/headers/value"
"github.com/bxcodec/goqu/middleware"
"github.com/bxcodec/goqueue"
"github.com/bxcodec/goqueue/consumer"
headerKey "github.com/bxcodec/goqueue/headers/key"
headerVal "github.com/bxcodec/goqueue/headers/value"
"github.com/bxcodec/goqueue/middleware"
)

// rabbitMQ is the subscriber handler for rabbitmq
Expand All @@ -28,7 +28,7 @@ type rabbitMQ struct {

var defaultOption = func() *consumer.Option {
return &consumer.Option{
Middlewares: []goqu.InboundMessageHandlerMiddlewareFunc{},
Middlewares: []goqueue.InboundMessageHandlerMiddlewareFunc{},
}
}

Expand All @@ -37,7 +37,7 @@ func NewConsumer(
client *amqp.Connection,
consumerChannel *amqp.Channel,
requeueChannel *amqp.Channel,
opts ...consumer.OptionFunc) goqu.Consumer {
opts ...consumer.OptionFunc) goqueue.Consumer {
opt := defaultOption()
for _, o := range opts {
o(opt)
Expand Down Expand Up @@ -84,7 +84,7 @@ func (r *rabbitMQ) initConsumer() {
false,
// queue arguments
// TODO(bxcodec): to support custom queue arguments on consumer initialization
// https://github.com/bxcodec/goqu/issues/1
// https://github.com/bxcodec/goqueue/issues/1
nil,
)
if err != nil {
Expand All @@ -99,7 +99,7 @@ func (r *rabbitMQ) initConsumer() {
// If the context is canceled, the method stops consuming messages and returns.
// The method returns an error if there was an issue consuming messages.
func (r *rabbitMQ) Consume(ctx context.Context,
h goqu.InboundMessageHandler,
h goqueue.InboundMessageHandler,
meta map[string]interface{}) (err error) {
logrus.WithFields(logrus.Fields{
"queue_name": r.option.QueueName,
Expand All @@ -115,7 +115,7 @@ func (r *rabbitMQ) Consume(ctx context.Context,
}).Info("stopping the worker")
return
case receivedMsg := <-r.msgReceiver:
msg := &goqu.Message{
msg := &goqueue.Message{
ID: extractHeaderString(receivedMsg.Headers, headerKey.AppID),
Timestamp: extractHeaderTime(receivedMsg.Headers, headerKey.PublishedTimestamp),
Action: receivedMsg.RoutingKey,
Expand All @@ -126,7 +126,7 @@ func (r *rabbitMQ) Consume(ctx context.Context,
ServiceAgent: headerVal.GoquServiceAgent(extractHeaderString(receivedMsg.Headers, headerKey.QueueServiceAgent)),
}
msg.SetSchemaVersion(extractHeaderString(receivedMsg.Headers, headerKey.SchemaVer))
m := goqu.InboundMessage{
m := goqueue.InboundMessage{
Message: *msg,
RetryCount: extractHeaderInt(receivedMsg.Headers, headerKey.RetryCount),
Metadata: map[string]interface{}{
Expand Down Expand Up @@ -161,9 +161,9 @@ func (r *rabbitMQ) Consume(ctx context.Context,
err = receivedMsg.Nack(false, false)
return
},
Requeue: func(ctx context.Context, delayFn goqu.DelayFn) (err error) {
Requeue: func(ctx context.Context, delayFn goqueue.DelayFn) (err error) {
if delayFn == nil {
delayFn = goqu.DefaultDelayFn
delayFn = goqueue.DefaultDelayFn
}
retries := extractHeaderInt(receivedMsg.Headers, headerKey.RetryCount)
retries++
Expand Down
2 changes: 1 addition & 1 deletion delayfn.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package goqu
package goqueue

type DelayFn func(retries int64) (delay int64)

Expand Down
4 changes: 2 additions & 2 deletions encoding.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package goqu
package goqueue

import (
"context"
"encoding/json"
"sync"

headerVal "github.com/bxcodec/goqu/headers/value"
headerVal "github.com/bxcodec/goqueue/headers/value"
)

type EncoderFn func(ctx context.Context, m Message) (data []byte, err error)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/bxcodec/goqu
module github.com/bxcodec/goqueue

go 1.20

Expand Down
16 changes: 8 additions & 8 deletions headers/key/const.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package key

const (
AppID = "goqu-app-id"
PublishedTimestamp = "goqu-published-timestamp"
RequeuedTimestamp = "goqu-requeued-timestamp"
RetryCount = "goqu-retry-count"
SchemaVer = "goqu-schema-version"
ContentType = "goqu-content-type"
QueueServiceAgent = "goqu-queue-service-agent"
MessageID = "goqu-message-id"
AppID = "goqueue-app-id"
PublishedTimestamp = "goqueue-published-timestamp"
RequeuedTimestamp = "goqueue-requeued-timestamp"
RetryCount = "goqueue-retry-count"
SchemaVer = "goqueue-schema-version"
ContentType = "goqueue-content-type"
QueueServiceAgent = "goqueue-queue-service-agent"
MessageID = "goqueue-message-id"
)
4 changes: 2 additions & 2 deletions message.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package goqu
package goqueue

import (
"time"

headerVal "github.com/bxcodec/goqu/headers/value"
headerVal "github.com/bxcodec/goqueue/headers/value"
)

// Message represents a message that will be published to the queue
Expand Down
30 changes: 15 additions & 15 deletions middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package middleware
import (
"context"

"github.com/bxcodec/goqu"
"github.com/bxcodec/goqueue"
"github.com/sirupsen/logrus"
)

// ApplyHandlerMiddleware applies a series of middleware functions to an inbound message handler function.
// It takes an inbound message handler function `h` and a variadic list of middleware functions `middleware`.
// Each middleware function is applied to the handler function in the order they are provided.
// The resulting handler function with all the middleware applied is returned.
func ApplyHandlerMiddleware(h goqu.InboundMessageHandlerFunc, middleware ...goqu.InboundMessageHandlerMiddlewareFunc) goqu.InboundMessageHandlerFunc {
func ApplyHandlerMiddleware(h goqueue.InboundMessageHandlerFunc, middleware ...goqueue.InboundMessageHandlerMiddlewareFunc) goqueue.InboundMessageHandlerFunc {
for _, middleware := range middleware {
h = middleware(h)
}
Expand All @@ -21,7 +21,7 @@ func ApplyHandlerMiddleware(h goqu.InboundMessageHandlerFunc, middleware ...goqu
// ApplyPublisherMiddleware applies the given publisher middleware functions to the provided publisher function.
// It iterates over the middleware functions and applies them in the order they are provided.
// The resulting publisher function is returned.
func ApplyPublisherMiddleware(p goqu.PublisherFunc, middleware ...goqu.PublisherMiddlewareFunc) goqu.PublisherFunc {
func ApplyPublisherMiddleware(p goqueue.PublisherFunc, middleware ...goqueue.PublisherMiddlewareFunc) goqueue.PublisherFunc {
for _, middleware := range middleware {
p = middleware(p)
}
Expand All @@ -32,9 +32,9 @@ func ApplyPublisherMiddleware(p goqu.PublisherFunc, middleware ...goqu.Publisher
// It wraps the provided `next` inbound message handler function and executes some additional logic after it.
// The additional logic includes logging any error that occurred during the execution of the `next` function
// and logging a message indicating that the middleware has been executed.
func HelloWorldMiddlewareExecuteAfterHandler() goqu.InboundMessageHandlerMiddlewareFunc {
return func(next goqu.InboundMessageHandlerFunc) goqu.InboundMessageHandlerFunc {
return func(ctx context.Context, m goqu.InboundMessage) (err error) {
func HelloWorldMiddlewareExecuteAfterHandler() goqueue.InboundMessageHandlerMiddlewareFunc {
return func(next goqueue.InboundMessageHandlerFunc) goqueue.InboundMessageHandlerFunc {
return func(ctx context.Context, m goqueue.InboundMessage) (err error) {
err = next(ctx, m)
if err != nil {
logrus.Error("Error: ", err, "processing to sent the error to Sentry")
Expand All @@ -46,9 +46,9 @@ func HelloWorldMiddlewareExecuteAfterHandler() goqu.InboundMessageHandlerMiddlew
}

// HelloWorldMiddlewareExecuteBeforeHandler returns a middleware function that logs a message before executing the handler.
func HelloWorldMiddlewareExecuteBeforeHandler() goqu.InboundMessageHandlerMiddlewareFunc {
return func(next goqu.InboundMessageHandlerFunc) goqu.InboundMessageHandlerFunc {
return func(ctx context.Context, m goqu.InboundMessage) (err error) {
func HelloWorldMiddlewareExecuteBeforeHandler() goqueue.InboundMessageHandlerMiddlewareFunc {
return func(next goqueue.InboundMessageHandlerFunc) goqueue.InboundMessageHandlerFunc {
return func(ctx context.Context, m goqueue.InboundMessage) (err error) {
logrus.Info("hello-world-first-middleware executed")
return next(ctx, m)
}
Expand All @@ -57,9 +57,9 @@ func HelloWorldMiddlewareExecuteBeforeHandler() goqu.InboundMessageHandlerMiddle

// HelloWorldMiddlewareExecuteAfterPublisher returns a PublisherMiddlewareFunc that executes after the publisher function.
// It logs any error that occurs during publishing and logs a message indicating that the last middleware has been executed.
func HelloWorldMiddlewareExecuteAfterPublisher() goqu.PublisherMiddlewareFunc {
return func(next goqu.PublisherFunc) goqu.PublisherFunc {
return func(ctx context.Context, m goqu.Message) (err error) {
func HelloWorldMiddlewareExecuteAfterPublisher() goqueue.PublisherMiddlewareFunc {
return func(next goqueue.PublisherFunc) goqueue.PublisherFunc {
return func(ctx context.Context, m goqueue.Message) (err error) {
err = next(ctx, m)
if err != nil {
logrus.Error("got error while publishing the message: ", err)
Expand All @@ -73,9 +73,9 @@ func HelloWorldMiddlewareExecuteAfterPublisher() goqu.PublisherMiddlewareFunc {

// HelloWorldMiddlewareExecuteBeforePublisher is a function that returns a PublisherMiddlewareFunc.
// It wraps the provided PublisherFunc with a middleware that logs a message before executing the next function.
func HelloWorldMiddlewareExecuteBeforePublisher() goqu.PublisherMiddlewareFunc {
return func(next goqu.PublisherFunc) goqu.PublisherFunc {
return func(ctx context.Context, e goqu.Message) (err error) {
func HelloWorldMiddlewareExecuteBeforePublisher() goqueue.PublisherMiddlewareFunc {
return func(next goqueue.PublisherFunc) goqueue.PublisherFunc {
return func(ctx context.Context, e goqueue.Message) (err error) {
logrus.Info("hello-world-first-middleware executed")
return next(ctx, e)
}
Expand Down
4 changes: 2 additions & 2 deletions option.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package goqu
package goqueue

type Option struct {
// number of consumer/worker/goroutine that will be spawned in one goqu instance
// number of consumer/worker/goroutine that will be spawned in one goqueue instance
NumberOfConsumer int
Consumer Consumer
Publisher Publisher
Expand Down
2 changes: 1 addition & 1 deletion publisher.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package goqu
package goqueue

import "context"

Expand Down
6 changes: 3 additions & 3 deletions publisher/option.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package publisher

import "github.com/bxcodec/goqu"
import "github.com/bxcodec/goqueue"

// Option define the option property
type Option struct {
PublisherID string
Middlewares []goqu.PublisherMiddlewareFunc
Middlewares []goqueue.PublisherMiddlewareFunc
}

// OptionFunc used for option chaining
Expand All @@ -17,7 +17,7 @@ func WithPublisherID(id string) OptionFunc {
}
}

func WithMiddlewares(middlewares ...goqu.PublisherMiddlewareFunc) OptionFunc {
func WithMiddlewares(middlewares ...goqueue.PublisherMiddlewareFunc) OptionFunc {
return func(opt *Option) {
opt.Middlewares = middlewares
}
Expand Down
22 changes: 11 additions & 11 deletions publisher/rabbitmq/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"time"

"github.com/bxcodec/goqu"
headerKey "github.com/bxcodec/goqu/headers/key"
headerVal "github.com/bxcodec/goqu/headers/value"
"github.com/bxcodec/goqu/middleware"
"github.com/bxcodec/goqu/publisher"
"github.com/bxcodec/goqueue"
headerKey "github.com/bxcodec/goqueue/headers/key"
headerVal "github.com/bxcodec/goqueue/headers/value"
"github.com/bxcodec/goqueue/middleware"
"github.com/bxcodec/goqueue/publisher"
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"
)
Expand All @@ -20,15 +20,15 @@ type rabbitMQ struct {

var defaultOption = func() *publisher.Option {
return &publisher.Option{
Middlewares: []goqu.PublisherMiddlewareFunc{},
Middlewares: []goqueue.PublisherMiddlewareFunc{},
PublisherID: uuid.New().String(),
}
}

func New(
publisherChannel *amqp.Channel,
opts ...publisher.OptionFunc,
) goqu.Publisher {
) goqueue.Publisher {
opt := defaultOption()
for _, o := range opts {
o(opt)
Expand All @@ -40,17 +40,17 @@ func New(
}
}

func (r *rabbitMQ) Publish(ctx context.Context, m goqu.Message) (err error) {
func (r *rabbitMQ) Publish(ctx context.Context, m goqueue.Message) (err error) {
publishFunc := middleware.ApplyPublisherMiddleware(
r.buildPublisher(),
r.option.Middlewares...,
)
return publishFunc(ctx, m)
}

func (r *rabbitMQ) buildPublisher() goqu.PublisherFunc {
return func(ctx context.Context, m goqu.Message) (err error) {
data, err := goqu.GetGoquEncoding(m.ContentType).Encode(ctx, m)
func (r *rabbitMQ) buildPublisher() goqueue.PublisherFunc {
return func(ctx context.Context, m goqueue.Message) (err error) {
data, err := goqueue.GetGoquEncoding(m.ContentType).Encode(ctx, m)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion queueservice.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package goqu
package goqueue

import (
"context"
Expand Down

0 comments on commit a9c6351

Please sign in to comment.