From 7a7716e940d82e367112aa6d4a45202f5273ecbe Mon Sep 17 00:00:00 2001 From: tfreville Date: Tue, 10 Nov 2020 10:19:03 +0100 Subject: [PATCH] feat(GCP/publisher): Implement publisher for GCP --- README.md | 16 +- go.mod | 2 + go.sum | 13 ++ google/gcp.go | 209 ------------------------- google/google.go | 21 +++ {internal => google/internal}/types.go | 0 google/listener.go | 30 ++++ google/message.go | 34 ++++ google/publisher.go | 112 +++++++++++++ google/publisher_test.go | 131 ++++++++++++++++ google/pubsub.go | 74 +++++++++ google/pubsub_test.go | 81 ++++++++++ google/receiver.go | 34 ++++ google/registry.go | 44 +++++- google/send.go | 1 - google/send_result.go | 39 +++++ google/utils_test.go | 84 ++++++++++ messages.go | 52 +++--- publisher.go | 47 ++++++ pubsub.go | 104 +++--------- receivers.go | 56 +++++++ 21 files changed, 858 insertions(+), 326 deletions(-) delete mode 100644 google/gcp.go create mode 100644 google/google.go rename {internal => google/internal}/types.go (100%) create mode 100644 google/message.go create mode 100644 google/publisher.go create mode 100644 google/publisher_test.go create mode 100644 google/pubsub.go create mode 100644 google/pubsub_test.go delete mode 100644 google/send.go create mode 100644 google/send_result.go create mode 100644 google/utils_test.go create mode 100644 publisher.go create mode 100644 receivers.go diff --git a/README.md b/README.md index 5204e9c..755bd88 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,17 @@ # pubsub [![PkgGoDev][godoc_img]]() [![Coverage Status][coverage_img]][coverage] [![Build Status][status_img]][status] [![Go Report Card][report_img]][report] -This package aims to simplify google pubsub usage within go application. It mostly -provides helpers and structure to easily send and listen to message while -cleanly managing topics and subscriptions. + +This package aims to simplify pubsub management pubsub implementation in go programs. It allows to regroup any pubsub +client through a single interface lessening the burdening of provider switching. + +## Usage + +- `go get github.com/elmagician/pubsub` + +## Implementations + +- Mock: testify mock implementation for unit testing +- GCP: google pubsub implementation + [//]: <> (Badges links and images) [coverage]: https://pkg.go.dev/github.com/elmagician/pubsub?tab=overview diff --git a/go.mod b/go.mod index 9e8f113..1d76f5b 100644 --- a/go.mod +++ b/go.mod @@ -7,4 +7,6 @@ require ( github.com/go-errors/errors v1.1.1 github.com/smartystreets/goconvey v1.6.4 github.com/stretchr/testify v1.6.1 + google.golang.org/api v0.32.0 + google.golang.org/grpc v1.32.0 ) diff --git a/go.sum b/go.sum index 8a119c5..572eadb 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,7 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/go-errors/errors v1.1.1 h1:ljK/pL5ltg3qoN+OtN6yCv9HWSfMwxSx90GJCZQxYNg= github.com/go-errors/errors v1.1.1/go.mod h1:psDX2osz5VnTOnFWbDeWwS7yejl+uV3FEWEp4lssFEs= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -56,6 +57,7 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -76,6 +78,7 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -86,6 +89,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -99,6 +103,7 @@ github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200905233945-acf8798be1f7/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= +github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -106,6 +111,7 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -115,7 +121,9 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -130,6 +138,7 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -208,6 +217,7 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -241,6 +251,7 @@ golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fq golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -368,6 +379,7 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= +google.golang.org/grpc v1.32.0 h1:zWTV+LMdc3kaiJMSTOFz2UgSBgx8RNQoTGiZu3fR9S0= google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -378,6 +390,7 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= +google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/google/gcp.go b/google/gcp.go deleted file mode 100644 index 463c9f5..0000000 --- a/google/gcp.go +++ /dev/null @@ -1,209 +0,0 @@ -package google - -// -// var ( -// _ pubsub.Pubsub = &GCP{} -// _ pubsub.Listener = &GCPListener{} -// ) -// -// type ( -// // Config config object to init a new publisher -// // ProjectID google cloud project -// // JSONConfigPath path to credential json file used for a publisher instance -// // Concurrency represent the number of messages that can be stored in listening channel -// // Timeout set the maximal duration for a pubsub request -// Config struct { -// ProjectID string `yaml:"projectId"` -// JSONConfigPath string `yaml:"configFile"` -// Concurrency int `yaml:"concurrency"` -// Timeout time.Duration `yaml:"timeout"` -// } -// -// // GCP structure to support Pubsub implementation from Google Cloud Platform -// GCP struct { -// Client *googlePubSub.Client -// Config Config -// injectionKey string -// } -// -// // GCPListener Pubsub listener implementation for GCP -// GCPListener struct { -// client *googlePubSub.Client -// subscription *googlePubSub.Subscription -// channelBufferSize int -// -// chError chan error -// chUnMatched chan *Unmatch -// -// ctx context.Context -// ctxCancel context.CancelFunc -// listeners map[listenerKey]listenerValue -// } -// -// listenerKey struct { -// messageVersion string -// messageType string -// } -// -// listenerValue struct { -// channel chan Message -// message func() Message -// } -// ) -// -// // New initialize a GCP Pubsub -// func New(conf Config, injectionKey string, opts ...option.ClientOption) (pubsub pubsub.Pubsub, err error) { -// if conf.JSONConfigPath != "" { -// opts = append(opts, option.WithCredentialsFile(conf.JSONConfigPath)) -// } -// var cli *googlePubSub.Client -// cli, err = googlePubSub.NewClient( -// context.Background(), conf.ProjectID, -// opts..., -// ) -// return &GCP{Client: cli, Config: conf, injectionKey: injectionKey}, err -// } -// -// // Send message to topics. Stop on first error -// func (gcp *GCP) Send(providedContext context.Context, topics []string, message Message) error { -// ctx, cancel := gcp.ctx(providedContext) -// defer cancel() -// -// attr, data, err := message.ToPubsubMessage() -// if err != nil { -// return err // error returned from ToPubsubMessage are matchable with Is. No need to wrap them -// } -// -// messageHandler := &googlePubSub.Message{ -// Data: data, -// Attributes: attr, -// } -// -// for _, topicKey := range topics { -// resp := gcp.Client.Topic(topicKey).Publish(ctx, messageHandler) -// if _, err := resp.Get(ctx); err != nil { -// return err -// } -// } -// -// return nil -// } -// -// // Listener initialize a pubSub listener for GCP. -// // It will fail with unknown subscription if subscription could not be validated from cloud. -// func (gcp *GCP) Listener(providedContext context.Context, subscriptionID string) (pubsub.Listener, error) { -// ctx, cancel := gcp.ctx(providedContext) -// -// subscription := gcp.Client.Subscription(subscriptionID) -// if exists, err := subscription.Exists(ctx); !exists || err != nil { -// cancel() -// return nil, err -// } -// -// return &GCPListener{ -// subscription: subscription, -// client: gcp.Client, -// channelBufferSize: gcp.Config.Concurrency, -// ctx: ctx, -// ctxCancel: cancel, -// listeners: map[listenerKey]listenerValue{}, -// }, nil -// } -// -// // Implement Godim interface to set injection Key -// func (gcp *GCP) Key() string { -// return gcp.injectionKey -// } -// -// // Listener to messageType/messageVersion couple using messages.Message transport. -// func (listener *GCPListener) Listen( -// messageType, messageVersion string, initMessage func() Message, -// ) (chMessage chan Message) { -// chMessage = make(chan Message, listener.channelBufferSize) -// listener.listeners[listenerKey{ -// messageVersion: messageVersion, -// messageType: messageType, -// }] = listenerValue{ -// channel: chMessage, -// message: initMessage, -// } -// return chMessage -// } -// -// // Unmatched allow to receive all message unmatched in a Listener clause. -// func (listener *GCPListener) Unmatched() (chUnmatched chan *Unmatch) { -// if listener.chUnMatched == nil { -// listener.chUnMatched = make(chan *Unmatch, listener.channelBufferSize) -// } -// return listener.chUnMatched -// } -// -// // Error listen to errors -// func (listener *GCPListener) Error() (chError chan error) { -// if listener.chError == nil { -// listener.chError = make(chan error) -// } -// return listener.chError -// } -// -// // Start listening to GCP subscription -// func (listener *GCPListener) Start() { -// go func() { -// err := listener.subscription.Receive(listener.ctx, func(ctx context.Context, m *googlePubSub.Message) { -// attr := m.Attributes -// version, okVersion := attr["version"] -// msgType, okType := attr["type"] -// -// // Try to match message to declared transport -// if okVersion && okType { -// listenInfo, ok := listener.listeners[listenerKey{ -// messageVersion: version, -// messageType: msgType, -// }] -// -// if ok { -// msg := listenInfo.message() -// if err := msg.FromPubsubMessage(attr, m.Data, m.Ack, m.Nack); err == nil { -// listenInfo.channel <- msg -// return -// } -// } -// } -// -// if listener.chUnMatched != nil { -// listener.chUnMatched <- &Unmatch{ -// ID: m.ID, -// Raw: m.Data, -// Attributes: attr, -// Ack: m.Ack, -// Nack: m.Nack, -// } -// } else { -// m.Ack() // acknowledge unmatched messages by default if they are not listened -// } -// }) -// if err != nil && !errors.Is(err, context.Canceled) { -// if listener.chError != nil { -// listener.chError <- err -// } -// } -// }() -// } -// -// // Stop listening to GCP subscription -// func (listener *GCPListener) Stop() { -// if listener.chError != nil { -// close(listener.chError) -// } -// if listener.chUnMatched != nil { -// close(listener.chUnMatched) -// } -// for _, val := range listener.listeners { -// close(val.channel) -// } -// listener.ctxCancel() -// } -// -// func (gcp *GCP) ctx(ctx context.Context) (context.Context, context.CancelFunc) { -// return context.WithTimeout(ctx, gcp.Config.Timeout) -// } diff --git a/google/google.go b/google/google.go new file mode 100644 index 0000000..617e957 --- /dev/null +++ b/google/google.go @@ -0,0 +1,21 @@ +// Package google implement elMagician pubsub interfaces for GCP Pubsub provider. +package google + +import ( + "time" +) + +// Config for pubsub instance +type Config struct { + // ProjectID in GCP + ProjectID string `yaml:"projectId" json:"projectId"` + + // CredentialsPath to your JSON credential provided by GCP. + CredentialsPath string `yaml:"credentialsPath" json:"credentialsPath"` + + // Concurrency is the default concurrency for listening process. + Concurrency int `yaml:"concurrency" json:"concurrency"` + + // Timeout is the default timeout for GCP calls + Timeout time.Duration `yaml:"timeout" json:"timeout"` +} diff --git a/internal/types.go b/google/internal/types.go similarity index 100% rename from internal/types.go rename to google/internal/types.go diff --git a/google/listener.go b/google/listener.go index 71664db..b96c19b 100644 --- a/google/listener.go +++ b/google/listener.go @@ -1 +1,31 @@ package google + +import ( + "context" + + "github.com/elmagician/pubsub" +) + +var _ pubsub.Listener = (*Listener)(nil) + +type Listener struct{} + +func (l Listener) OnMessage(envelop pubsub.Envelop, newMessage func() pubsub.Message) chan pubsub.Message { + panic("implement me") +} + +func (l Listener) OnUnmatched() chan pubsub.Message { + panic("implement me") +} + +func (l Listener) OnError() chan error { + panic("implement me") +} + +func (l Listener) Listen(ctx context.Context) { + panic("implement me") +} + +func (l Listener) Stop() { + panic("implement me") +} diff --git a/google/message.go b/google/message.go new file mode 100644 index 0000000..6fcfaed --- /dev/null +++ b/google/message.go @@ -0,0 +1,34 @@ +package google + +import ( + googlePubSub "cloud.google.com/go/pubsub" + + "github.com/elmagician/pubsub" +) + +var _ pubsub.Message = &message{} + +// message provide a private structure to match pubsub.Message (elMagician) interface from pubsub.Message (Google) structure +type message struct { + Message *googlePubSub.Message +} + +func (m *message) ID() interface{} { + return m.Message.ID +} + +func (m *message) Ack() { + m.Message.Ack() +} + +func (m *message) Nack() { + m.Message.Nack() +} + +func (m *message) Metadata() map[string]string { + return m.Message.Attributes +} + +func (m *message) Data() []byte { + return m.Message.Data +} diff --git a/google/publisher.go b/google/publisher.go new file mode 100644 index 0000000..b93f034 --- /dev/null +++ b/google/publisher.go @@ -0,0 +1,112 @@ +package google + +import ( + "context" + "errors" + + googlePubSub "cloud.google.com/go/pubsub" + + "github.com/elmagician/pubsub" + "github.com/elmagician/pubsub/google/internal" +) + +var ErrPublisherDestroyed = errors.New("publisher instance was destroyed") + +// Publisher implements pubsub.Publisher interface for GCP. +type Publisher struct { + client *googlePubSub.Client + config Config + registry *Registry + + // following value HAS TO BE reset after each call to Send + destroyed bool + topics map[string]*googlePubSub.Topic + newTopics map[string]*internal.Topic + nbTopics int +} + +// To adds topic to sent message to. +// +// If Destroy is called, unknown topic will not be saved in registry +// and https://pkg.go.dev/cloud.google.com/go/pubsub#Topic.Stop will be called. +// +// This method apply last registered send configuration for topic. If no configuration where registered, it use default +// configuration +func (p *Publisher) To(topics ...string) pubsub.Publisher { + defaultConfig := &googlePubSub.PublishSettings{Timeout: p.config.Timeout} + + // add topic to destination + for _, topicKey := range topics { + topic, ok := p.registry.topics[topicKey] + if !ok { + // Create a new topic with default send configuration if not registered + topic = &internal.Topic{ + Topic: p.client.Topic(topicKey), + PublishSettings: &googlePubSub.PublishSettings{Timeout: p.config.Timeout}, + } + + p.newTopics[topicKey] = topic + } + + p.topics[topicKey] = topic.Topic + + // Apply last registered configuration for topic + if topic.PublishSettings != nil { + topic.Topic.PublishSettings = *topic.PublishSettings + } else { + topic.Topic.PublishSettings = *defaultConfig + } + + p.nbTopics++ + } + return p +} + +// WithOption provide send settings to apply to call. +// +// It will be applied to all topics added to sent process before +// the WithOption call. +func (p *Publisher) WithOption(opt interface{}) pubsub.Publisher { + panic("implement me") +} + +// Send message to topics registered. Any new topic will be saved to the registry if Destroy was not called. +// If Destroy was called, all topics will have their connection stopped and known topics will be kept in registry. +// +// Send will return a single string ID if sending to a single topic, else a list of string. +func (p *Publisher) Send(ctx context.Context, msg pubsub.Envelop) (pubsub.SendResults, error) { + if p.destroyed { + return nil, ErrPublisherDestroyed + } + res := SendResults{results: make(map[string]*googlePubSub.PublishResult)} + + pubsubMessage, err := msg.ToPubsubMessage() // transform envelop to pubsub.Message + if err != nil { + return nil, nil + } + + // send message to all topics demanded + for topicKey, topic := range p.topics { + res.results[topicKey] = topic.Publish(ctx, &googlePubSub.Message{Data: pubsubMessage.Data(), Attributes: pubsubMessage.Metadata()}) + + // Add new topics to registry + if newTopic, ok := p.newTopics[topicKey]; ok { + p.registry.topics[topicKey] = newTopic + } + } + + p.reset() + return res, nil +} + +func (p *Publisher) Destroy() { + p.destroyed = true +} + +// reset restore default value for variables used for send process. +// It has to be called after each send process. +func (p *Publisher) reset() { + p.topics = make(map[string]*googlePubSub.Topic) + p.newTopics = make(map[string]*internal.Topic) + p.nbTopics = 0 +} diff --git a/google/publisher_test.go b/google/publisher_test.go new file mode 100644 index 0000000..91b2d95 --- /dev/null +++ b/google/publisher_test.go @@ -0,0 +1,131 @@ +package google_test + +import ( + "context" + "fmt" + "strings" + "time" + + googlePubSub "cloud.google.com/go/pubsub" + "google.golang.org/api/option" + "google.golang.org/grpc" + + "github.com/elmagician/pubsub/google" +) + +func ExamplePublisher_Send() { + var conn *grpc.ClientConn + var yourEnvelop *Envelop // we are using a mock implementation here + // This setup a test instance for Google Pubsub. + // You don't need it outside of unit testing. + { + srv, cli := initTestClient("aSuperCoolProject") + defer func() { + if err := srv.Close(); err != nil { + panic(err) + } + }() + + _, err := cli.CreateTopic(context.Background(), "mine") + if err != nil { + panic(err) + } + _, err = cli.CreateTopic(context.Background(), "tropical") + if err != nil { + panic(err) + } + _, err = cli.CreateTopic(context.Background(), "someTopic") + if err != nil { + panic(err) + } + + conn, err = grpc.Dial(srv.Addr, grpc.WithInsecure()) + if err != nil { + panic(err) + } + + yourEnvelop = &Envelop{} + yourEnvelop.message = &message{ + Message: &googlePubSub.Message{ + ID: "test", + Data: []byte("someData"), + Attributes: map[string]string{"version": "v1", "type": "test", "new": "false"}, + }, + } + } + + ctx := context.Background() + conf := google.Config{ + ProjectID: "aSuperCoolProject", + CredentialsPath: "path/to/credentials.json", + Timeout: 10 * time.Second, + Concurrency: 0, + } + + // Initialize pubsub instance. + ps, err := google.NewPubsub(ctx, conf, option.WithGRPCConn(conn)) + if err != nil { + // TODO manage error + panic(err) + return + } + + // Register topics that will be used from the instance. + ps.Registry(). + MustAddTopic("topicAnna", nil). // Will not fail if topic does not exists using MustAddTopic. Use AddTopic to check existence on add. + MustAddTopic("tropical", nil) + + if err := ps.Registry().AddTopic("mine", nil); err != nil { + // TODO manager error + panic(err) + return + } + + // Send message to registered topics + results, err := ps.Publish().To("mine").Send(ctx, yourEnvelop) + if err != nil { + // TODO manage error + fmt.Println(err.Error()) + return + } + + idsStr := "" + for topic, res := range results.Results(context.Background()) { + idsStr += topic + ": " + res.ID + } + + idsStr = strings.TrimSuffix(idsStr, ",") + fmt.Println("Msg:", idsStr) + + // Send message to registered topics + results, err = ps.Publish().To("tropical", "mine").Send(ctx, yourEnvelop) + if err != nil { + // TODO manage error + fmt.Println(err.Error()) + return + } + + idsStr = "" + for topic, res := range results.Results(context.Background()) { + if res.Error != nil { + fmt.Println("got unexpected error", res.Error) + } + if topic == "tropical" { + idsStr = topic + ", " + idsStr + } else { + idsStr += topic + } + } + + idsStr = strings.TrimSuffix(idsStr, ",") + fmt.Println(idsStr) + // Stop topics connection to server without discarding them.s + ps.Registry().StopTopics("mine", "someTopic") + + // Reset registry after stopping all topics. + ps.Registry().Clear() + + // Output: + // Msg: mine: m0 + // tropical, mine +} diff --git a/google/pubsub.go b/google/pubsub.go new file mode 100644 index 0000000..c526f07 --- /dev/null +++ b/google/pubsub.go @@ -0,0 +1,74 @@ +package google + +import ( + "context" + + googlePubSub "cloud.google.com/go/pubsub" + "google.golang.org/api/option" + + "github.com/elmagician/pubsub" + "github.com/elmagician/pubsub/google/internal" +) + +var _ pubsub.Pubsub = (*Pubsub)(nil) + +// Pubsub implements pubsub.Pubsub interface for GCP. +type Pubsub struct { + // Client is the gcp instance used to send requests. + // It is passed as a private parameter to all structures + // derived from Pubsub + Client *googlePubSub.Client + + // Config for running instance. + // It is passed as a private parameter to all structures + // derived from Pubsub + Config Config + + registry *Registry +} + +// NewPubsub initializes a GCP implementation for pubsub. +func NewPubsub(ctx context.Context, config Config, opts ...option.ClientOption) (pubsub.Pubsub, error) { + if config.CredentialsPath != "" { + opts = append(opts, option.WithCredentialsFile(config.CredentialsPath)) + } + + cli, err := googlePubSub.NewClient(ctx, config.ProjectID, opts...) + if err != nil { + return nil, err + } + + return &Pubsub{ + Client: cli, Config: config, + registry: &Registry{ + client: cli, + topics: make(map[string]*internal.Topic), + subscriptions: make(map[string]*internal.Subscription), + }, + }, + nil +} + +// Publisher set up an instance to send message to pubsub. +// It use client, config and registry from main Pubsub instance. +func (p Pubsub) Publish() pubsub.Publisher { + return &Publisher{ + client: p.Client, + config: p.Config, + registry: p.registry, + newTopics: make(map[string]*internal.Topic), + topics: make(map[string]*googlePubSub.Topic), + } +} + +func (p Pubsub) Registry() pubsub.Registry { + return p.registry +} + +func (p Pubsub) Listen(subscription string) pubsub.Listener { + panic("implement me") +} + +func (p Pubsub) Receive(subscription string) pubsub.Receiver { + panic("implement me") +} diff --git a/google/pubsub_test.go b/google/pubsub_test.go new file mode 100644 index 0000000..0db371d --- /dev/null +++ b/google/pubsub_test.go @@ -0,0 +1,81 @@ +package google_test + +import ( + "context" + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" + "google.golang.org/api/option" + + "github.com/elmagician/pubsub/google" +) + +func TestPubsub_Publisher(t *testing.T) { + Convey("When I wish to publish things to google Pubsub", t, func() { + expectedConfig := google.Config{ + ProjectID: "testProject", + CredentialsPath: "path/to/cred.yml", + Concurrency: 150, + Timeout: 666 * time.Second, + } + + src, cli := initTestClient("something") + defer src.Close() + + ps := google.Pubsub{ + Client: cli, + Config: expectedConfig, + } + + Convey("I should be able to initialize a Publish implementation", func() { + publisher := ps.Publish() + So(publisher, ShouldNotBeZeroValue) + }) + }) +} + +func TestPubsub_Listen(t *testing.T) { + +} + +func TestPubsub_Receive(t *testing.T) { + +} + +func TestPubsub_Registry(t *testing.T) { + +} + +func ExampleNewPubsub_withoutOption() { + conf := google.Config{ + ProjectID: "aSuperCoolProject", + CredentialsPath: "path/to/credentials.json", + Timeout: 10 * time.Second, + Concurrency: 10, + } + + _, err := google.NewPubsub(context.Background(), conf) + if err != nil { + panic(err) + } +} + +func ExampleNewPubsub_withOption() { + conf := google.Config{ + ProjectID: "aSuperCoolProject", + CredentialsPath: "path/to/credentials.json", + } + + _, err := google.NewPubsub( + context.Background(), conf, + option.WithCredentialsFile("some/other/credentials.json"), + option.WithEndpoint("dont.evil/know/where"), + ) + if err != nil { + panic(err) + } + + // in this example, client uses credentials path from Config. Passing an option will not override + // credentials values. +} diff --git a/google/receiver.go b/google/receiver.go index 71664db..d8c72f9 100644 --- a/google/receiver.go +++ b/google/receiver.go @@ -1 +1,35 @@ package google + +import ( + "context" + + "github.com/elmagician/pubsub" +) + +var _ pubsub.Receiver = (*Receiver)(nil) + +type Receiver struct{} + +func (r Receiver) OnMessage(envelop pubsub.Envelop, callback pubsub.MessageCallback) { + panic("implement me") +} + +func (r Receiver) OnUnmatched(callback pubsub.MessageCallback) { + panic("implement me") +} + +func (r Receiver) OnError(callback func(ctx context.Context)) { + panic("implement me") +} + +func (r Receiver) Start(ctx context.Context) { + panic("implement me") +} + +func (r Receiver) Receive(ctx context.Context) error { + panic("implement me") +} + +func (r Receiver) Stop() { + panic("implement me") +} diff --git a/google/registry.go b/google/registry.go index b30d3f9..e2e361c 100644 --- a/google/registry.go +++ b/google/registry.go @@ -7,21 +7,21 @@ import ( googlePubSub "cloud.google.com/go/pubsub" "github.com/elmagician/pubsub" - "github.com/elmagician/pubsub/internal" + "github.com/elmagician/pubsub/google/internal" ) const timeout = 10 * time.Second -var _ pubsub.Registry = (*LocalRegistry)(nil) +var _ pubsub.Registry = (*Registry)(nil) -type LocalRegistry struct { +type Registry struct { client *googlePubSub.Client topics map[string]*internal.Topic subscriptions map[string]*internal.Subscription } // nolint: dupl -func (l *LocalRegistry) AddTopic(key string, publishSettings *googlePubSub.PublishSettings) error { +func (l *Registry) AddTopic(key string, publishSettings *googlePubSub.PublishSettings) error { if _, ok := l.topics[key]; ok { return nil } @@ -45,7 +45,7 @@ func (l *LocalRegistry) AddTopic(key string, publishSettings *googlePubSub.Publi } // nolint: dupl -func (l *LocalRegistry) AddSubscription(key string, receiveSettings *googlePubSub.ReceiveSettings) error { +func (l *Registry) AddSubscription(key string, receiveSettings *googlePubSub.ReceiveSettings) error { if _, ok := l.subscriptions[key]; ok { return nil } @@ -68,13 +68,41 @@ func (l *LocalRegistry) AddSubscription(key string, receiveSettings *googlePubSu return nil } -func (l *LocalRegistry) StopTopics(topics ...string) { +// nolint: dupl +func (l *Registry) MustAddTopic(key string, publishSettings *googlePubSub.PublishSettings) pubsub.Registry { + if _, ok := l.topics[key]; ok { + return nil + } + + l.topics[key] = &internal.Topic{ + Topic: l.client.Topic(key), + PublishSettings: publishSettings, + } + + return l +} + +// nolint: dupl +func (l *Registry) MustAddSubscription(key string, receiveSettings *googlePubSub.ReceiveSettings) pubsub.Registry { + if _, ok := l.subscriptions[key]; ok { + return nil + } + + l.subscriptions[key] = &internal.Subscription{ + Subscription: l.client.Subscription(key), + ReceiveSettings: receiveSettings, + } + + return l +} + +func (l *Registry) StopTopics(topics ...string) { for _, key := range topics { l.StopTopic(key) } } -func (l *LocalRegistry) Clear() { +func (l *Registry) Clear() { for key := range l.topics { l.StopTopic(key) } @@ -82,7 +110,7 @@ func (l *LocalRegistry) Clear() { l.subscriptions = make(map[string]*internal.Subscription) } -func (l *LocalRegistry) StopTopic(key string) { +func (l *Registry) StopTopic(key string) { if topic, ok := l.topics[key]; ok { topic.Stop() } diff --git a/google/send.go b/google/send.go deleted file mode 100644 index 71664db..0000000 --- a/google/send.go +++ /dev/null @@ -1 +0,0 @@ -package google diff --git a/google/send_result.go b/google/send_result.go new file mode 100644 index 0000000..923e82a --- /dev/null +++ b/google/send_result.go @@ -0,0 +1,39 @@ +package google + +import ( + "context" + + googlePubSub "cloud.google.com/go/pubsub" + + "github.com/elmagician/pubsub" +) + +var _ pubsub.SendResults = (*SendResults)(nil) + +type SendResults struct { + results map[string]*googlePubSub.PublishResult +} + +func (s SendResults) Results(ctx context.Context) pubsub.Results { + res := make(pubsub.Results) + + for topic, result := range s.results { + id, err := result.Get(ctx) + res[topic] = pubsub.Result{ID: id, Error: err} + } + + return res +} + +func (s SendResults) OnResults(ctx context.Context, callback func(topic string, result pubsub.Result)) { + for topic, result := range s.results { + // avoid loop variable overwrite in routing + result := result + topic := topic + + go func() { + id, err := result.Get(ctx) + callback(topic, pubsub.Result{ID: id, Error: err}) + }() + } +} diff --git a/google/utils_test.go b/google/utils_test.go new file mode 100644 index 0000000..6cef30f --- /dev/null +++ b/google/utils_test.go @@ -0,0 +1,84 @@ +package google_test + +import ( + "context" + + googlePubSub "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/pstest" + "google.golang.org/api/option" + "google.golang.org/grpc" + + "github.com/elmagician/pubsub" +) + +func initTestClient(project string, opts ...pstest.ServerReactorOption) (*pstest.Server, *googlePubSub.Client) { + psTest := pstest.NewServer(opts...) + + conn, err := grpc.Dial(psTest.Addr, grpc.WithInsecure()) + if err != nil { + panic(err) + } + + cli, err := googlePubSub.NewClient(context.Background(), project, option.WithGRPCConn(conn)) + if err != nil { + panic(err) + } + + return psTest, cli +} + +type Envelop struct { + err error + message pubsub.Message + filter pubsub.MessageFilter +} + +func (e *Envelop) ToPubsubMessage() (pubsub.Message, error) { + if e.err != nil { + return nil, e.err + } + return e.message, nil +} + +func (e *Envelop) FromPubsubMessage(msg pubsub.Message) error { + if e.err != nil { + e.message = msg + } + return e.err +} + +func (e *Envelop) Filter() pubsub.MessageFilter { + return e.filter +} + +func (e *Envelop) New() pubsub.Envelop { + return &Envelop{} +} + +type message struct { + Message *googlePubSub.Message +} + +func (m *message) ID() interface{} { + return m.Message.ID +} + +func (m *message) Ack() { + m.Message.Ack() +} + +func (m *message) Nack() { + m.Message.Nack() +} + +func (m *message) Metadata() map[string]string { + return m.Message.Attributes +} + +func (m *message) Data() []byte { + return m.Message.Data +} + +var _ pubsub.Envelop = &Envelop{} + +var _ pubsub.Message = &message{} diff --git a/messages.go b/messages.go index e9783d3..7cca6e1 100644 --- a/messages.go +++ b/messages.go @@ -1,5 +1,9 @@ package pubsub +import ( + "context" +) + type ( // Message is an interface to manage pubsub messages relevant data // It has to represent the message payload and can include relevant information @@ -7,16 +11,20 @@ type ( // allowing any struct to be converted to a pubsub message while masking // some pubsub logic. Message interface { - // ID of the message in the service provider + // ID of the message in the service provider. ID() interface{} - // Data payload + + // Data payload. Data() []byte + // Metadata are all the tags witch can identify the message and group it with others but are not - // relevant as information + // relevant as information. Metadata() map[string]string - // Ack acknowledge message + + // Ack acknowledges message. Ack() - // Nack refuse to acknowledge message + + // Nack refuses to acknowledge message. Nack() } @@ -24,23 +32,25 @@ type ( // relevant data to a Message interface. It act as a DTO and // use Filter method to get relevant filtering data. Envelop interface { - // ToPubsubMessage convert the envelop to JSON representative byte table - // Used before emitting message to queue + // ToPubsubMessage converts the envelop to JSON representative byte table. + // Used before emitting message to queue. ToPubsubMessage() (Message, error) - // FromPubsubMessage set envelop data from message json payload - FromPubsubMessage(msg Message, ack func(), nack func()) error - // Filter return a logical map to filter value. The key as to match the expected pubsub metadata key - // while the value represent the expected value to filter - Filter() map[string]string - } - // Unmatch represent messages not matching an expected couple of Version/Type or the Message interface - // It provides the id of the message and full data as well as methods to Acknowledge or not the message - Unmatch struct { - ID interface{} - Raw []byte - Attributes map[string]string - Ack func() `json:"-"` - Nack func() `json:"-"` + // FromPubsubMessage set envelop data from message json payload. + FromPubsubMessage(msg Message) error + + // Filter returns a logical map to filter value. The key as to match the expected pubsub metadata key + // while the value represent the expected value to filter. + Filter() MessageFilter + + // New generates a new empty envelop. + // Used form message reception. + New() Envelop } + + // MessageCallback enforces callback function type on reception. + MessageCallback func(ctx context.Context, msg Message) + + // MessageFilter represents filtering data for message reception. + MessageFilter map[string]string ) diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..8a0136e --- /dev/null +++ b/publisher.go @@ -0,0 +1,47 @@ +package pubsub + +import ( + "context" +) + +type ( + // Publisher interface allow to build messages to be sent through pubsub instance. + Publisher interface { + // To indicates topic in witch we would like to send the message + // if topic is used for the first time, a connection will be created + // and kept alive regarding this topic. + // Call Clean method to clear all saved topic. + To(topics ...string) Publisher + + // WithOption allows to configure locally a send call. + WithOption(opt interface{}) Publisher + + // Send message to topics listed in Send instance. It will returns a SendResults interface + // witch you can safely discard if you don't need to check that your message + // was correctly sent. + Send(ctx context.Context, msg Envelop) (SendResults, error) + + // Destroy has to be called at the end of life of the publisher instance to ensure all messages are correctly + // sent. Destroy method will only return after ensuring messages were sent or errored then it will + // destroy connection to pubsub instance definitively. + // Publisher cannot be used any more after Destroy. + Destroy() + } + + // SendResults allows to manage publish results + SendResults interface { + // Results recovers send response and return the list of result corresponding Results structure. + // This is a locking process. Results will await server response before returning. + Results(ctx context.Context) Results + + // OnResults will apply callback function when server respond. + OnResults(ctx context.Context, allback func(topic string, result Result)) + } + + Result struct { + ID string + Error error + } + + Results map[string]Result +) diff --git a/pubsub.go b/pubsub.go index 3c985a5..2eb515f 100644 --- a/pubsub.go +++ b/pubsub.go @@ -1,108 +1,44 @@ package pubsub import ( - "context" - googlePubSub "cloud.google.com/go/pubsub" ) type ( - // Pubsub provide method to setup and use a pubsub client + // Pubsub provides method to setup and use a pubsub client. Pubsub interface { - // Publisher prepare pubsub to emit a message - Publisher(ctx context.Context, msg interface{}) Publisher + // Publish prepare pubsub to emit a message. + Publish() Publisher - // Registry allow to add Topic or Subscription to Pubsub instance + // Registry allow to add Topic or Subscription to Pubsub instance. Registry() Registry - // Listen initialize a lister instance for provided subscription - Listen() Listener - - // Receive initialize a receiver instance for provided subscription - Receive() Receiver + // Listen initialize a lister instance for provided subscription. + Listen(subscription string) Listener - // Clean all known topics and subscriptions - Clean() error + // Receive initialize a receiver instance for provided subscription. + Receive(subscription string) Receiver } - // Registry manage known topics and subscriptions + // Registry manages known topics and subscriptions. Registry interface { - // AddTopic register a new topic using provided publication settings - // publication settings comes from LINK TO GOOGLE - // It returns an error if topic does not exists or pubsub client call failed + // AddTopic registers a new topic using provided publication settings. AddTopic(key string, publishSettings *googlePubSub.PublishSettings) error - // AddSubscription register a subscription topic using provided receive settings - // receive option comes from LINK TO GOOGLE - // It returns an error if subscription does not exists or pubsub client call failed + // MustAddTopic registers a new topic using provided publication settings or panic. + MustAddTopic(key string, publishSettings *googlePubSub.PublishSettings) Registry + + // AddSubscription registers a subscription topic using provided receive settings. AddSubscription(key string, receiveSettings *googlePubSub.ReceiveSettings) error - // StopTopics has to be called to kill connection to topic instance + // MustAddSubscription registers a subscription topic using provided receive settings or panic. + MustAddSubscription(key string, receiveSettings *googlePubSub.ReceiveSettings) Registry + + // StopTopics has to be called to kill connection to topic instance. Passing no arguments + // will stop all known topics. StopTopics(topics ...string) - // Clear registry of all known Topics && Subscriptions. Clear will stop all topics removed + // Clear registry of all known Topics && Subscriptions. Clear will stop all topics removed. Clear() } - - // Publisher interface allow to build messages to be sent through pubsub instance - Publisher interface { - // To indicate topic in witch we would like to send the message - // if topic is used for the first time, a connection will be created - // and kept alive regarding this topic - // Call Clean method to clear all saved topic - To(topic string) Publisher - - // WithOption allow to configure locally a send call - WithOption(opt interface{}) Publisher - - // Go send message to topics listed in Send instance - Send() (id string, err error) - - // Destroy indicate to sender that the topics created has to be discarded after - // publication - Destroy() Publisher - } - - // Listener provide method to setup listening process on subscription. Listened messages will - // be transformed to a Message interface and sent through channel - Listener interface { - // Message initialize a channel to listen to messages of provided Type/Version couple. - // The provided channel uses the interface type messages.Message but you can - // safely match it to the provided message type as it is assured that the message emitted in the channel - // match the type/channel couple - // newMessage has to be a function witch returns a new Message object. It will be call upon - // receiving messages to ensure we are using different instances of the messages for each receive messages. - Message(messageType, messageVersion string, newMessage func() Message) chan Message - - // Unmatched provide a channel to retrieve all messages that could not be matched against provided types/versions. - Unmatched() chan *Unmatch - - // Error initialize a channel to manage errors - Error() chan error - - // Start listening - Start() - - // Stop listening - Stop() - } - - // Receiver provide method to setup reception process on subscription. Received messages will - // be transformed to a Message then process through provided processes. - Receiver interface { - // Action apply provided callback method to message matching expected Type && Version - Action(messageType, messageVersion string, callback func(ctx context.Context, newMessage func() Message)) - - // Unmatched apply provided callback to unexpected message Type or Version - Unmatched(callback func(ctx context.Context, unmatch Unmatch)) - - // Error apply a callback method to all received errors - Error(callback func(ctx context.Context)) - - // Start receiving - Start() - - // Stop receiving - Stop() - } ) diff --git a/receivers.go b/receivers.go new file mode 100644 index 0000000..05be57b --- /dev/null +++ b/receivers.go @@ -0,0 +1,56 @@ +package pubsub + +import ( + "context" +) + +type ( + // Listener provides method to setup listening process on subscription. Listened messages will + // be transformed to a Message interface and sent through channel. + Listener interface { + // OnMessage initializes a channel to listen to messages of provided Type/Version couple. + // The provided channel uses the interface type messages.Message but you can + // safely match it to the provided message type as it is assured that the message emitted in the channel + // match the type/channel couple + // newMessage has to be a function witch returns a new Message object. It will be call upon + // receiving messages to ensure we are using different instances of the messages for each receive messages. + OnMessage(envelop Envelop, newMessage func() Message) chan Message + + // OnUnmatched provides a channel to retrieve all messages that could not be matched against provided types/versions. + OnUnmatched() chan Message + + // OnError initializes a channel to manage errors. + OnError() chan error + + // Listen starts listening process in background. + Listen(ctx context.Context) + + // Stop listening. + Stop() + } + + // Receiver provides method to setup reception process on subscription. Received messages will + // be transformed to a Message then process through provided processes. + Receiver interface { + // OnMessage applies provided callback method to message matching expected Type && Version. + OnMessage(envelop Envelop, callback MessageCallback) + + // OnUnmatched applies provided callback to unexpected message Type or Version. + OnUnmatched(callback MessageCallback) + + // OnError applies a callback method to all received errors. + OnError(callback func(ctx context.Context)) + + // Start receiving as separated process. + // Errors are managed through Error callback. + Start(ctx context.Context) + + // Receive messages in current process. + // Process will stop at the first error received and return it. + // No errors are returned when Stop is used. + Receive(ctx context.Context) error + + // Stop receiving. + Stop() + } +)