Skip to content

Commit

Permalink
rename messageReceiveFunc to messageSink
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado committed Sep 7, 2024
1 parent 2244b55 commit 1a16ec2
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 120 deletions.
26 changes: 13 additions & 13 deletions cmd/rabtap/cmd_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ import (

// CmdSubscribeArg contains arguments for the subscribe command
type CmdSubscribeArg struct {
amqpURL *url.URL
queue string
tlsConfig *tls.Config
messageReceiveFunc MessageReceiveFunc
termPred Predicate
filterPred Predicate
reject bool
requeue bool
args rabtap.KeyValueMap
timeout time.Duration
amqpURL *url.URL
queue string
tlsConfig *tls.Config
messageSink MessageSink
termPred Predicate
filterPred Predicate
reject bool
requeue bool
args rabtap.KeyValueMap
timeout time.Duration
}

// cmdSub subscribes to messages from the given queue
Expand All @@ -45,11 +45,11 @@ func cmdSubscribe(ctx context.Context, cmd CmdSubscribeArg) error {
g.Go(func() error { return subscriber.EstablishSubscription(ctx, cmd.queue, messageChannel, errorChannel) })
g.Go(func() error {

acknowledger := createAcknowledgeFunc(cmd.reject, cmd.requeue)
err := messageReceiveLoop(ctx,
acknowledger := CreateAcknowledgeFunc(cmd.reject, cmd.requeue)
err := MessageReceiveLoop(ctx,
messageChannel,
errorChannel,
cmd.messageReceiveFunc,
cmd.messageSink,
cmd.filterPred,
cmd.termPred,
acknowledger,
Expand Down
26 changes: 13 additions & 13 deletions cmd/rabtap/cmd_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ func TestCmdSubFailsEarlyWhenBrokerIsNotAvailable(t *testing.T) {
go func() {
// we expect cmdSubscribe to return
cmdSubscribe(ctx, CmdSubscribeArg{
amqpURL: amqpURL,
queue: "queue",
tlsConfig: &tls.Config{},
messageReceiveFunc: func(rabtap.TapMessage) error { return nil },
termPred: &constantPred{false},
timeout: time.Second * 10,
amqpURL: amqpURL,
queue: "queue",
tlsConfig: &tls.Config{},
messageSink: func(rabtap.TapMessage) error { return nil },
termPred: &constantPred{false},
timeout: time.Second * 10,
})
done <- true
}()
Expand Down Expand Up @@ -78,13 +78,13 @@ func TestCmdSub(t *testing.T) {

// subscribe to testQueue
go cmdSubscribe(ctx, CmdSubscribeArg{
amqpURL: amqpURL,
queue: testQueue,
tlsConfig: tlsConfig,
messageReceiveFunc: receiveFunc,
filterPred: constantPred{true},
termPred: constantPred{false},
timeout: time.Second * 10,
amqpURL: amqpURL,
queue: testQueue,
tlsConfig: tlsConfig,
messageSink: receiveFunc,
filterPred: constantPred{true},
termPred: constantPred{false},
timeout: time.Second * 10,
})

time.Sleep(time.Second * 1)
Expand Down
18 changes: 9 additions & 9 deletions cmd/rabtap/cmd_tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
)

type CmdTapArg struct {
tapConfig []rabtap.TapConfiguration
tlsConfig *tls.Config
messageReceiveFunc MessageReceiveFunc
termPred Predicate
filterPred Predicate
timeout time.Duration
tapConfig []rabtap.TapConfiguration
tlsConfig *tls.Config
messageSink MessageSink
termPred Predicate
filterPred Predicate
timeout time.Duration
}

// cmdTap taps to the given exchanges and displays or saves the received
Expand All @@ -43,11 +43,11 @@ func cmdTap(
})
}
g.Go(func() error {
acknowledger := createAcknowledgeFunc(false, false) // ACK
err := messageReceiveLoop(ctx,
acknowledger := CreateAcknowledgeFunc(false, false) // ACK
err := MessageReceiveLoop(ctx,
tapMessageChannel,
errorChannel,
cmd.messageReceiveFunc,
cmd.messageSink,
cmd.filterPred,
cmd.termPred,
acknowledger,
Expand Down
2 changes: 1 addition & 1 deletion cmd/rabtap/cmd_tap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestCmdTap(t *testing.T) {
// when
go cmdTap(ctx, CmdTapArg{tapConfig: tapConfig,
tlsConfig: &tls.Config{},
messageReceiveFunc: receiveFunc,
messageSink: receiveFunc,
filterPred: constantPred{true},
termPred: constantPred{false},
timeout: time.Second * 10})
Expand Down
40 changes: 20 additions & 20 deletions cmd/rabtap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ func startCmdPublish(ctx context.Context, args CommandLineArgs) {
}

func startCmdSubscribe(ctx context.Context, args CommandLineArgs) {
opts := MessageReceiveFuncOptions{
opts := MessageSinkOptions{
out: NewColorableWriter(os.Stdout),
format: args.Format,
silent: args.Silent,
optSaveDir: args.SaveDir,
filenameProvider: defaultFilenameProvider,
}
messageReceiveFunc, err := createMessageReceiveFunc(opts)
messageSink, err := NewMessageSink(opts)
failOnError(err, "options", os.Exit)

termPred, err := NewLoopCountPred(args.Limit)
Expand All @@ -167,29 +167,29 @@ func startCmdSubscribe(ctx context.Context, args CommandLineArgs) {
failOnError(err, fmt.Sprintf("invalid message filter predicate '%s'", args.Filter), os.Exit)

err = cmdSubscribe(ctx, CmdSubscribeArg{
amqpURL: args.AMQPURL,
queue: args.QueueName,
requeue: args.Requeue,
reject: args.Reject,
tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile),
messageReceiveFunc: messageReceiveFunc,
filterPred: filterPred,
termPred: termPred,
args: args.Args,
timeout: args.IdleTimeout,
amqpURL: args.AMQPURL,
queue: args.QueueName,
requeue: args.Requeue,
reject: args.Reject,
tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile),
messageSink: messageSink,
filterPred: filterPred,
termPred: termPred,
args: args.Args,
timeout: args.IdleTimeout,
})
failOnError(err, "error subscribing messages", os.Exit)
}

func startCmdTap(ctx context.Context, args CommandLineArgs) {
opts := MessageReceiveFuncOptions{
opts := MessageSinkOptions{
out: NewColorableWriter(os.Stdout),
format: args.Format,
silent: args.Silent,
optSaveDir: args.SaveDir,
filenameProvider: defaultFilenameProvider,
}
messageReceiveFunc, err := createMessageReceiveFunc(opts)
messageSink, err := NewMessageSink(opts)
failOnError(err, "options", os.Exit)
termPred, err := NewLoopCountPred(args.Limit)
failOnError(err, "invalid message limit predicate", os.Exit)
Expand All @@ -198,12 +198,12 @@ func startCmdTap(ctx context.Context, args CommandLineArgs) {

cmdTap(ctx,
CmdTapArg{
tapConfig: args.TapConfig,
tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile),
messageReceiveFunc: messageReceiveFunc,
filterPred: filterPred,
termPred: termPred,
timeout: args.IdleTimeout,
tapConfig: args.TapConfig,
tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile),
messageSink: messageSink,
filterPred: filterPred,
termPred: termPred,
timeout: args.IdleTimeout,
})
}

Expand Down
67 changes: 33 additions & 34 deletions cmd/rabtap/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ type FilenameProvider func() string

type AcknowledgeFunc func(rabtap.TapMessage) error

type MessageReceiveFuncOptions struct {
type MessageSinkOptions struct {
out io.Writer
format string // currently: raw, json, json-nopp
silent bool
optSaveDir *string
filenameProvider FilenameProvider
}

// MessageReceiveFunc processes receiced messages from a tap.
type MessageReceiveFunc func(rabtap.TapMessage) error
// MessageSink processes received messages
type MessageSink func(rabtap.TapMessage) error

// var ErrMessageLoopEnded = errors.New("message loop ended")

Expand Down Expand Up @@ -72,10 +72,10 @@ func NewLoopCountPred(limit int64) (*LoopCountPred, error) {
return &LoopCountPred{limit}, nil
}

// createAcknowledgeFunc returns the function used to acknowledge received
// CreateAcknowledgeFunc returns the function used to acknowledge received
// functions, wich will either be ACKed or REJECTED with optional REQUEUE
// flag set.
func createAcknowledgeFunc(reject, requeue bool) AcknowledgeFunc {
func CreateAcknowledgeFunc(reject, requeue bool) AcknowledgeFunc {
return func(message rabtap.TapMessage) error {
if reject {
if err := message.AmqpMessage.Reject(requeue); err != nil {
Expand All @@ -90,17 +90,17 @@ func createAcknowledgeFunc(reject, requeue bool) AcknowledgeFunc {
}
}

// messageReceiveLoop passes received AMQP messages to messageReceiveFunc and
// MessageReceiveLoop passes received AMQP messages to the messageSink and
// handles errors received on the errorChan. AMQP messages are ascknowledged by
// the provides acknowleder function. Each message is passed to the predicate
// termPred function. If true is returned, processing is ended. Timeout
// specifies an idle timeout, which will end processing when for the given
// duration no new messages are received on messageChan.
// TODO pass in struct, limit number of arguments
func messageReceiveLoop(ctx context.Context,
func MessageReceiveLoop(ctx context.Context,
messageChan rabtap.TapChannel,
errorChan rabtap.SubscribeErrorChannel,
messageReceiveFunc MessageReceiveFunc,
messageSink MessageSink,
filterPred Predicate,
termPred Predicate,
acknowledger AcknowledgeFunc,
Expand Down Expand Up @@ -147,7 +147,7 @@ func messageReceiveLoop(ctx context.Context,
}
count += 1

if err := messageReceiveFunc(message); err != nil {
if err := messageSink(message); err != nil {
log.Error(err)
}

Expand All @@ -166,13 +166,13 @@ func messageReceiveLoop(ctx context.Context,
}
}

// NullMessageReceiveFunc is used a sentinel to terminate a chain of
// nopMessageSink is used a sentinel to terminate a chain of
// MessageReceiveFuncs
func NullMessageReceiveFunc(rabtap.TapMessage) error {
func nopMessageSink(rabtap.TapMessage) error {
return nil
}

func chainedMessageReceiveFunc(first, second MessageReceiveFunc) MessageReceiveFunc {
func messageSinkTee(first, second MessageSink) MessageSink {
return func(message rabtap.TapMessage) error {
if err := first(message); err != nil {
return err
Expand All @@ -184,83 +184,82 @@ func chainedMessageReceiveFunc(first, second MessageReceiveFunc) MessageReceiveF
// createMessageReceiveFuncWriteToJSONFile return receive func that writes the
// message and metadata to separate files in the provided directory using the
// provided marshaller.
func createMessageReceiveFuncWriteToRawFiles(dir string, marshaller marshalFunc, filenameProvider FilenameProvider) MessageReceiveFunc {
func newWriteToRawFileMessageSink(dir string, marshaller marshalFunc, filenameProvider FilenameProvider) MessageSink {
return func(message rabtap.TapMessage) error {
basename := path.Join(dir, filenameProvider())
return SaveMessageToRawFiles(basename, message, marshaller)
}
}

// createMessageReceiveFuncWriteToJSONFile return receive func that writes the
// creatmMessageReceiveFuncWriteToJSONFile return receive func that writes the
// message to a file in the provided directory using the provided marshaller.
func createMessageReceiveFuncWriteToJSONFile(dir string, marshaller marshalFunc, filenameProvider FilenameProvider) MessageReceiveFunc {
func newWriteToJSONFileMessageSink(dir string, marshaller marshalFunc, filenameProvider FilenameProvider) MessageSink {
return func(message rabtap.TapMessage) error {
filename := path.Join(dir, filenameProvider()+".json")
return SaveMessageToJSONFile(filename, message, marshaller)
}
}

// createMessageReceiveFuncPrintJSON returns a function that prints messages as
// newPrintJSONMessageSink returns a function that prints messages as
// JSON to the provided writer
// messages as JSON messages
func createMessageReceiveFuncPrintJSON(out io.Writer, marshaller marshalFunc) MessageReceiveFunc {
func newPrintJSONMessageSink(out io.Writer, marshaller marshalFunc) MessageSink {
return func(message rabtap.TapMessage) error {
return WriteMessage(out, message, marshaller)
}
}

// createMessageReceiveFuncPrintPretty returns a function that pretty prints
// newPrettyPrintJSONMessageSink returns a function that pretty prints
// received messaged to the provided writer
func createMessageReceiveFuncPrintPretty(out io.Writer) MessageReceiveFunc {
func newPrettyPrintJSONMessageSink(out io.Writer) MessageSink {
return func(message rabtap.TapMessage) error {
return PrettyPrintMessage(out, message)
}
}

func createMessageReceivePrintFunc(format string, out io.Writer, silent bool) (MessageReceiveFunc, error) {
func newPrintMessageMessageSink(format string, out io.Writer, silent bool) (MessageSink, error) {
if silent {
return NullMessageReceiveFunc, nil
return nopMessageSink, nil
}

switch format {
case "json-nopp":
return createMessageReceiveFuncPrintJSON(out, JSONMarshal), nil
return newPrintJSONMessageSink(out, JSONMarshal), nil
case "json":
return createMessageReceiveFuncPrintJSON(out, JSONMarshalIndent), nil
return newPrintJSONMessageSink(out, JSONMarshalIndent), nil
case "raw":
return createMessageReceiveFuncPrintPretty(out), nil
return newPrettyPrintJSONMessageSink(out), nil
default:
return nil, fmt.Errorf("invalid format %s", format)
}
}

func createMessageReceiveSaveFunc(format string, optSaveDir *string, filenameProvider FilenameProvider) (MessageReceiveFunc, error) {
func newSaveFileMessageSink(format string, optSaveDir *string, filenameProvider FilenameProvider) (MessageSink, error) {
if optSaveDir == nil {
return NullMessageReceiveFunc, nil
return nopMessageSink, nil
}

switch format {
case "json-nopp":
fallthrough
case "json":
return createMessageReceiveFuncWriteToJSONFile(*optSaveDir, JSONMarshalIndent, filenameProvider), nil
return newWriteToJSONFileMessageSink(*optSaveDir, JSONMarshalIndent, filenameProvider), nil
case "raw":
return createMessageReceiveFuncWriteToRawFiles(*optSaveDir, JSONMarshalIndent, filenameProvider), nil
return newWriteToRawFileMessageSink(*optSaveDir, JSONMarshalIndent, filenameProvider), nil
default:
return nil, fmt.Errorf("invalid format %s", format)
}
}

// createMessageReceiveFunc returns a MessageReceiveFunc which is invoked on
// NewMessageSink returns a MessageReceiveFunc which is invoked on
// receival of a message during tap and subscribe. Depending on the options
// set, function that optionally prints to the proviced io.Writer and
// optionally to the provided directory is returned.
func createMessageReceiveFunc(opts MessageReceiveFuncOptions) (MessageReceiveFunc, error) {

printFunc, err := createMessageReceivePrintFunc(opts.format, opts.out, opts.silent)
func NewMessageSink(opts MessageSinkOptions) (MessageSink, error) {
printFunc, err := newPrintMessageMessageSink(opts.format, opts.out, opts.silent)
if err != nil {
return printFunc, err
}
saveFunc, err := createMessageReceiveSaveFunc(opts.format, opts.optSaveDir, opts.filenameProvider)
return chainedMessageReceiveFunc(printFunc, saveFunc), err
saveFunc, err := newSaveFileMessageSink(opts.format, opts.optSaveDir, opts.filenameProvider)
return messageSinkTee(printFunc, saveFunc), err
}
Loading

0 comments on commit 1a16ec2

Please sign in to comment.