Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: change name #3

Merged
merged 1 commit into from
Jun 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# goqu
GoQu - Golang Queue Wrapper for all queue platforms
# goqueue

GoQueue - Golang Queue Wrapper for all queue platforms
2 changes: 1 addition & 1 deletion consumer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package goqu
package goqueue

import "context"

Expand Down
6 changes: 3 additions & 3 deletions consumer/option.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package consumer

import "github.com/bxcodec/goqu"
import "github.com/bxcodec/goqueue"

// Option represents the configuration options for the consumer.
type Option struct {
Expand All @@ -9,7 +9,7 @@ type Option struct {
// QueueName specifies the name of the queue to consume messages from.
QueueName string
// Middlewares is a list of middleware functions to be applied to the inbound message handler.
Middlewares []goqu.InboundMessageHandlerMiddlewareFunc
Middlewares []goqueue.InboundMessageHandlerMiddlewareFunc
}

// OptionFunc is a function type that takes an `opt` parameter of type `*Option`.
Expand All @@ -35,7 +35,7 @@ func WithQueueName(name string) OptionFunc {
// WithMiddlewares is an OptionFunc that sets the provided middlewares for the consumer.
// Middlewares are used to process inbound messages before they are handled by the consumer.
// The middlewares are applied in the order they are provided.
func WithMiddlewares(middlewares ...goqu.InboundMessageHandlerMiddlewareFunc) OptionFunc {
func WithMiddlewares(middlewares ...goqueue.InboundMessageHandlerMiddlewareFunc) OptionFunc {
return func(opt *Option) {
opt.Middlewares = middlewares
}
Expand Down
26 changes: 13 additions & 13 deletions consumer/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus"

"github.com/bxcodec/goqu"
"github.com/bxcodec/goqu/consumer"
headerKey "github.com/bxcodec/goqu/headers/key"
headerVal "github.com/bxcodec/goqu/headers/value"
"github.com/bxcodec/goqu/middleware"
"github.com/bxcodec/goqueue"
"github.com/bxcodec/goqueue/consumer"
headerKey "github.com/bxcodec/goqueue/headers/key"
headerVal "github.com/bxcodec/goqueue/headers/value"
"github.com/bxcodec/goqueue/middleware"
)

// rabbitMQ is the subscriber handler for rabbitmq
Expand All @@ -28,7 +28,7 @@ type rabbitMQ struct {

var defaultOption = func() *consumer.Option {
return &consumer.Option{
Middlewares: []goqu.InboundMessageHandlerMiddlewareFunc{},
Middlewares: []goqueue.InboundMessageHandlerMiddlewareFunc{},
}
}

Expand All @@ -37,7 +37,7 @@ func NewConsumer(
client *amqp.Connection,
consumerChannel *amqp.Channel,
requeueChannel *amqp.Channel,
opts ...consumer.OptionFunc) goqu.Consumer {
opts ...consumer.OptionFunc) goqueue.Consumer {
opt := defaultOption()
for _, o := range opts {
o(opt)
Expand Down Expand Up @@ -84,7 +84,7 @@ func (r *rabbitMQ) initConsumer() {
false,
// queue arguments
// TODO(bxcodec): to support custom queue arguments on consumer initialization
// https://github.com/bxcodec/goqu/issues/1
// https://github.com/bxcodec/goqueue/issues/1
nil,
)
if err != nil {
Expand All @@ -99,7 +99,7 @@ func (r *rabbitMQ) initConsumer() {
// If the context is canceled, the method stops consuming messages and returns.
// The method returns an error if there was an issue consuming messages.
func (r *rabbitMQ) Consume(ctx context.Context,
h goqu.InboundMessageHandler,
h goqueue.InboundMessageHandler,
meta map[string]interface{}) (err error) {
logrus.WithFields(logrus.Fields{
"queue_name": r.option.QueueName,
Expand All @@ -115,7 +115,7 @@ func (r *rabbitMQ) Consume(ctx context.Context,
}).Info("stopping the worker")
return
case receivedMsg := <-r.msgReceiver:
msg := &goqu.Message{
msg := &goqueue.Message{
ID: extractHeaderString(receivedMsg.Headers, headerKey.AppID),
Timestamp: extractHeaderTime(receivedMsg.Headers, headerKey.PublishedTimestamp),
Action: receivedMsg.RoutingKey,
Expand All @@ -126,7 +126,7 @@ func (r *rabbitMQ) Consume(ctx context.Context,
ServiceAgent: headerVal.GoquServiceAgent(extractHeaderString(receivedMsg.Headers, headerKey.QueueServiceAgent)),
}
msg.SetSchemaVersion(extractHeaderString(receivedMsg.Headers, headerKey.SchemaVer))
m := goqu.InboundMessage{
m := goqueue.InboundMessage{
Message: *msg,
RetryCount: extractHeaderInt(receivedMsg.Headers, headerKey.RetryCount),
Metadata: map[string]interface{}{
Expand Down Expand Up @@ -161,9 +161,9 @@ func (r *rabbitMQ) Consume(ctx context.Context,
err = receivedMsg.Nack(false, false)
return
},
Requeue: func(ctx context.Context, delayFn goqu.DelayFn) (err error) {
Requeue: func(ctx context.Context, delayFn goqueue.DelayFn) (err error) {
if delayFn == nil {
delayFn = goqu.DefaultDelayFn
delayFn = goqueue.DefaultDelayFn
}
retries := extractHeaderInt(receivedMsg.Headers, headerKey.RetryCount)
retries++
Expand Down
2 changes: 1 addition & 1 deletion delayfn.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package goqu
package goqueue

type DelayFn func(retries int64) (delay int64)

Expand Down
4 changes: 2 additions & 2 deletions encoding.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package goqu
package goqueue

import (
"context"
"encoding/json"
"sync"

headerVal "github.com/bxcodec/goqu/headers/value"
headerVal "github.com/bxcodec/goqueue/headers/value"
)

type EncoderFn func(ctx context.Context, m Message) (data []byte, err error)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/bxcodec/goqu
module github.com/bxcodec/goqueue

go 1.20

Expand Down
16 changes: 8 additions & 8 deletions headers/key/const.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package key

const (
AppID = "goqu-app-id"
PublishedTimestamp = "goqu-published-timestamp"
RequeuedTimestamp = "goqu-requeued-timestamp"
RetryCount = "goqu-retry-count"
SchemaVer = "goqu-schema-version"
ContentType = "goqu-content-type"
QueueServiceAgent = "goqu-queue-service-agent"
MessageID = "goqu-message-id"
AppID = "goqueue-app-id"
PublishedTimestamp = "goqueue-published-timestamp"
RequeuedTimestamp = "goqueue-requeued-timestamp"
RetryCount = "goqueue-retry-count"
SchemaVer = "goqueue-schema-version"
ContentType = "goqueue-content-type"
QueueServiceAgent = "goqueue-queue-service-agent"
MessageID = "goqueue-message-id"
)
4 changes: 2 additions & 2 deletions message.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package goqu
package goqueue

import (
"time"

headerVal "github.com/bxcodec/goqu/headers/value"
headerVal "github.com/bxcodec/goqueue/headers/value"
)

// Message represents a message that will be published to the queue
Expand Down
30 changes: 15 additions & 15 deletions middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package middleware
import (
"context"

"github.com/bxcodec/goqu"
"github.com/bxcodec/goqueue"
"github.com/sirupsen/logrus"
)

// ApplyHandlerMiddleware applies a series of middleware functions to an inbound message handler function.
// It takes an inbound message handler function `h` and a variadic list of middleware functions `middleware`.
// Each middleware function is applied to the handler function in the order they are provided.
// The resulting handler function with all the middleware applied is returned.
func ApplyHandlerMiddleware(h goqu.InboundMessageHandlerFunc, middleware ...goqu.InboundMessageHandlerMiddlewareFunc) goqu.InboundMessageHandlerFunc {
func ApplyHandlerMiddleware(h goqueue.InboundMessageHandlerFunc, middleware ...goqueue.InboundMessageHandlerMiddlewareFunc) goqueue.InboundMessageHandlerFunc {
for _, middleware := range middleware {
h = middleware(h)
}
Expand All @@ -21,7 +21,7 @@ func ApplyHandlerMiddleware(h goqu.InboundMessageHandlerFunc, middleware ...goqu
// ApplyPublisherMiddleware applies the given publisher middleware functions to the provided publisher function.
// It iterates over the middleware functions and applies them in the order they are provided.
// The resulting publisher function is returned.
func ApplyPublisherMiddleware(p goqu.PublisherFunc, middleware ...goqu.PublisherMiddlewareFunc) goqu.PublisherFunc {
func ApplyPublisherMiddleware(p goqueue.PublisherFunc, middleware ...goqueue.PublisherMiddlewareFunc) goqueue.PublisherFunc {
for _, middleware := range middleware {
p = middleware(p)
}
Expand All @@ -32,9 +32,9 @@ func ApplyPublisherMiddleware(p goqu.PublisherFunc, middleware ...goqu.Publisher
// It wraps the provided `next` inbound message handler function and executes some additional logic after it.
// The additional logic includes logging any error that occurred during the execution of the `next` function
// and logging a message indicating that the middleware has been executed.
func HelloWorldMiddlewareExecuteAfterHandler() goqu.InboundMessageHandlerMiddlewareFunc {
return func(next goqu.InboundMessageHandlerFunc) goqu.InboundMessageHandlerFunc {
return func(ctx context.Context, m goqu.InboundMessage) (err error) {
func HelloWorldMiddlewareExecuteAfterHandler() goqueue.InboundMessageHandlerMiddlewareFunc {
return func(next goqueue.InboundMessageHandlerFunc) goqueue.InboundMessageHandlerFunc {
return func(ctx context.Context, m goqueue.InboundMessage) (err error) {
err = next(ctx, m)
if err != nil {
logrus.Error("Error: ", err, "processing to sent the error to Sentry")
Expand All @@ -46,9 +46,9 @@ func HelloWorldMiddlewareExecuteAfterHandler() goqu.InboundMessageHandlerMiddlew
}

// HelloWorldMiddlewareExecuteBeforeHandler returns a middleware function that logs a message before executing the handler.
func HelloWorldMiddlewareExecuteBeforeHandler() goqu.InboundMessageHandlerMiddlewareFunc {
return func(next goqu.InboundMessageHandlerFunc) goqu.InboundMessageHandlerFunc {
return func(ctx context.Context, m goqu.InboundMessage) (err error) {
func HelloWorldMiddlewareExecuteBeforeHandler() goqueue.InboundMessageHandlerMiddlewareFunc {
return func(next goqueue.InboundMessageHandlerFunc) goqueue.InboundMessageHandlerFunc {
return func(ctx context.Context, m goqueue.InboundMessage) (err error) {
logrus.Info("hello-world-first-middleware executed")
return next(ctx, m)
}
Expand All @@ -57,9 +57,9 @@ func HelloWorldMiddlewareExecuteBeforeHandler() goqu.InboundMessageHandlerMiddle

// HelloWorldMiddlewareExecuteAfterPublisher returns a PublisherMiddlewareFunc that executes after the publisher function.
// It logs any error that occurs during publishing and logs a message indicating that the last middleware has been executed.
func HelloWorldMiddlewareExecuteAfterPublisher() goqu.PublisherMiddlewareFunc {
return func(next goqu.PublisherFunc) goqu.PublisherFunc {
return func(ctx context.Context, m goqu.Message) (err error) {
func HelloWorldMiddlewareExecuteAfterPublisher() goqueue.PublisherMiddlewareFunc {
return func(next goqueue.PublisherFunc) goqueue.PublisherFunc {
return func(ctx context.Context, m goqueue.Message) (err error) {
err = next(ctx, m)
if err != nil {
logrus.Error("got error while publishing the message: ", err)
Expand All @@ -73,9 +73,9 @@ func HelloWorldMiddlewareExecuteAfterPublisher() goqu.PublisherMiddlewareFunc {

// HelloWorldMiddlewareExecuteBeforePublisher is a function that returns a PublisherMiddlewareFunc.
// It wraps the provided PublisherFunc with a middleware that logs a message before executing the next function.
func HelloWorldMiddlewareExecuteBeforePublisher() goqu.PublisherMiddlewareFunc {
return func(next goqu.PublisherFunc) goqu.PublisherFunc {
return func(ctx context.Context, e goqu.Message) (err error) {
func HelloWorldMiddlewareExecuteBeforePublisher() goqueue.PublisherMiddlewareFunc {
return func(next goqueue.PublisherFunc) goqueue.PublisherFunc {
return func(ctx context.Context, e goqueue.Message) (err error) {
logrus.Info("hello-world-first-middleware executed")
return next(ctx, e)
}
Expand Down
4 changes: 2 additions & 2 deletions option.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package goqu
package goqueue

type Option struct {
// number of consumer/worker/goroutine that will be spawned in one goqu instance
// number of consumer/worker/goroutine that will be spawned in one goqueue instance
NumberOfConsumer int
Consumer Consumer
Publisher Publisher
Expand Down
2 changes: 1 addition & 1 deletion publisher.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package goqu
package goqueue

import "context"

Expand Down
6 changes: 3 additions & 3 deletions publisher/option.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package publisher

import "github.com/bxcodec/goqu"
import "github.com/bxcodec/goqueue"

// Option define the option property
type Option struct {
PublisherID string
Middlewares []goqu.PublisherMiddlewareFunc
Middlewares []goqueue.PublisherMiddlewareFunc
}

// OptionFunc used for option chaining
Expand All @@ -17,7 +17,7 @@ func WithPublisherID(id string) OptionFunc {
}
}

func WithMiddlewares(middlewares ...goqu.PublisherMiddlewareFunc) OptionFunc {
func WithMiddlewares(middlewares ...goqueue.PublisherMiddlewareFunc) OptionFunc {
return func(opt *Option) {
opt.Middlewares = middlewares
}
Expand Down
22 changes: 11 additions & 11 deletions publisher/rabbitmq/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"time"

"github.com/bxcodec/goqu"
headerKey "github.com/bxcodec/goqu/headers/key"
headerVal "github.com/bxcodec/goqu/headers/value"
"github.com/bxcodec/goqu/middleware"
"github.com/bxcodec/goqu/publisher"
"github.com/bxcodec/goqueue"
headerKey "github.com/bxcodec/goqueue/headers/key"
headerVal "github.com/bxcodec/goqueue/headers/value"
"github.com/bxcodec/goqueue/middleware"
"github.com/bxcodec/goqueue/publisher"
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"
)
Expand All @@ -20,15 +20,15 @@ type rabbitMQ struct {

var defaultOption = func() *publisher.Option {
return &publisher.Option{
Middlewares: []goqu.PublisherMiddlewareFunc{},
Middlewares: []goqueue.PublisherMiddlewareFunc{},
PublisherID: uuid.New().String(),
}
}

func New(
publisherChannel *amqp.Channel,
opts ...publisher.OptionFunc,
) goqu.Publisher {
) goqueue.Publisher {
opt := defaultOption()
for _, o := range opts {
o(opt)
Expand All @@ -40,17 +40,17 @@ func New(
}
}

func (r *rabbitMQ) Publish(ctx context.Context, m goqu.Message) (err error) {
func (r *rabbitMQ) Publish(ctx context.Context, m goqueue.Message) (err error) {
publishFunc := middleware.ApplyPublisherMiddleware(
r.buildPublisher(),
r.option.Middlewares...,
)
return publishFunc(ctx, m)
}

func (r *rabbitMQ) buildPublisher() goqu.PublisherFunc {
return func(ctx context.Context, m goqu.Message) (err error) {
data, err := goqu.GetGoquEncoding(m.ContentType).Encode(ctx, m)
func (r *rabbitMQ) buildPublisher() goqueue.PublisherFunc {
return func(ctx context.Context, m goqueue.Message) (err error) {
data, err := goqueue.GetGoquEncoding(m.ContentType).Encode(ctx, m)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion queueservice.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package goqu
package goqueue

import (
"context"
Expand Down