diff --git a/protocol/pubsub/v2/internal/connection.go b/protocol/pubsub/v2/internal/connection.go index 673231dfa..91be5f0d4 100644 --- a/protocol/pubsub/v2/internal/connection.go +++ b/protocol/pubsub/v2/internal/connection.go @@ -14,6 +14,7 @@ import ( "time" "cloud.google.com/go/pubsub" + pscontext "github.com/cloudevents/sdk-go/protocol/pubsub/v2/context" "github.com/cloudevents/sdk-go/v2/binding" ) @@ -62,6 +63,9 @@ type Connection struct { // ReceiveSettings is used to configure Pubsub pull subscription. ReceiveSettings *pubsub.ReceiveSettings + // PublishSettings is used to configure Publishing to a topic + PublishSettings *pubsub.PublishSettings + // AckDeadline is Pub/Sub AckDeadline. // Default is 30 seconds. // This can only be set prior to first call of any function. @@ -128,6 +132,12 @@ func (c *Connection) getOrCreateTopicInfo(ctx context.Context, getAlreadyOpenOnl } ti.wasCreated = true } + + // if publishSettings have been provided use them otherwise pubsub will use default settings + if c.PublishSettings != nil { + topic.PublishSettings = *c.PublishSettings + } + // Success. ti.topic = topic diff --git a/protocol/pubsub/v2/internal/connection_test.go b/protocol/pubsub/v2/internal/connection_test.go index 88b2ad727..96d87dbab 100644 --- a/protocol/pubsub/v2/internal/connection_test.go +++ b/protocol/pubsub/v2/internal/connection_test.go @@ -161,6 +161,58 @@ func TestPublishCreateTopic(t *testing.T) { verifyTopicDeleteWorks(t, client, psconn, topicID) } +// Test that publishing to a topic with non default publish settings +func TestPublishWithCustomPublishSettings(t *testing.T) { + t.Run("create topic and publish to it with custom settings", func(t *testing.T) { + ctx := context.Background() + pc := &testPubsubClient{} + defer pc.Close() + + projectID, topicID, subID := "test-project", "test-topic", "test-sub" + + client, err := pc.New(ctx, projectID, nil) + if err != nil { + t.Fatalf("failed to create pubsub client: %v", err) + } + defer client.Close() + + psconn := &Connection{ + AllowCreateSubscription: true, + AllowCreateTopic: true, + Client: client, + ProjectID: projectID, + TopicID: topicID, + SubscriptionID: subID, + PublishSettings: &pubsub.PublishSettings{ + DelayThreshold: 100 * time.Millisecond, + CountThreshold: 00, + ByteThreshold: 2e6, + Timeout: 120 * time.Second, + BufferedByteLimit: 20 * pubsub.MaxPublishRequestBytes, + FlowControlSettings: pubsub.FlowControlSettings{ + MaxOutstandingMessages: 10, + MaxOutstandingBytes: 0, + LimitExceededBehavior: pubsub.FlowControlBlock, + }, + }, + } + + topic, err := client.CreateTopic(ctx, topicID) + if err != nil { + t.Fatalf("failed to pre-create topic: %v", err) + } + topic.Stop() + + msg := &pubsub.Message{ + ID: "msg-id-1", + Data: []byte("msg-data-1"), + } + if _, err := psconn.Publish(ctx, msg); err != nil { + t.Errorf("failed to publish message: %v", err) + } + }) +} + // Test that publishing to an already created topic works and doesn't allow topic deletion func TestPublishExistingTopic(t *testing.T) { for _, allowCreate := range []bool{true, false} {