Skip to content

Commit 8213ea5

Browse files
JamesBLewisduglin
authored andcommitted
exposed custom publish settings for pubsub client
Signed-off-by: James Lewis <[email protected]>
1 parent 2db66e0 commit 8213ea5

File tree

2 files changed

+62
-0
lines changed

2 files changed

+62
-0
lines changed

protocol/pubsub/v2/internal/connection.go

+10
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"time"
1515

1616
"cloud.google.com/go/pubsub"
17+
1718
pscontext "github.com/cloudevents/sdk-go/protocol/pubsub/v2/context"
1819
"github.com/cloudevents/sdk-go/v2/binding"
1920
)
@@ -62,6 +63,9 @@ type Connection struct {
6263
// ReceiveSettings is used to configure Pubsub pull subscription.
6364
ReceiveSettings *pubsub.ReceiveSettings
6465

66+
// PublishSettings is used to configure Publishing to a topic
67+
PublishSettings *pubsub.PublishSettings
68+
6569
// AckDeadline is Pub/Sub AckDeadline.
6670
// Default is 30 seconds.
6771
// This can only be set prior to first call of any function.
@@ -128,6 +132,12 @@ func (c *Connection) getOrCreateTopicInfo(ctx context.Context, getAlreadyOpenOnl
128132
}
129133
ti.wasCreated = true
130134
}
135+
136+
// if publishSettings have been provided use them otherwise pubsub will use default settings
137+
if c.PublishSettings != nil {
138+
topic.PublishSettings = *c.PublishSettings
139+
}
140+
131141
// Success.
132142
ti.topic = topic
133143

protocol/pubsub/v2/internal/connection_test.go

+52
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,58 @@ func TestPublishCreateTopic(t *testing.T) {
161161
verifyTopicDeleteWorks(t, client, psconn, topicID)
162162
}
163163

164+
// Test that publishing to a topic with non default publish settings
165+
func TestPublishWithCustomPublishSettings(t *testing.T) {
166+
t.Run("create topic and publish to it with custom settings", func(t *testing.T) {
167+
ctx := context.Background()
168+
pc := &testPubsubClient{}
169+
defer pc.Close()
170+
171+
projectID, topicID, subID := "test-project", "test-topic", "test-sub"
172+
173+
client, err := pc.New(ctx, projectID, nil)
174+
if err != nil {
175+
t.Fatalf("failed to create pubsub client: %v", err)
176+
}
177+
defer client.Close()
178+
179+
psconn := &Connection{
180+
AllowCreateSubscription: true,
181+
AllowCreateTopic: true,
182+
Client: client,
183+
ProjectID: projectID,
184+
TopicID: topicID,
185+
SubscriptionID: subID,
186+
PublishSettings: &pubsub.PublishSettings{
187+
DelayThreshold: 100 * time.Millisecond,
188+
CountThreshold: 00,
189+
ByteThreshold: 2e6,
190+
Timeout: 120 * time.Second,
191+
BufferedByteLimit: 20 * pubsub.MaxPublishRequestBytes,
192+
FlowControlSettings: pubsub.FlowControlSettings{
193+
MaxOutstandingMessages: 10,
194+
MaxOutstandingBytes: 0,
195+
LimitExceededBehavior: pubsub.FlowControlBlock,
196+
},
197+
},
198+
}
199+
200+
topic, err := client.CreateTopic(ctx, topicID)
201+
if err != nil {
202+
t.Fatalf("failed to pre-create topic: %v", err)
203+
}
204+
topic.Stop()
205+
206+
msg := &pubsub.Message{
207+
ID: "msg-id-1",
208+
Data: []byte("msg-data-1"),
209+
}
210+
if _, err := psconn.Publish(ctx, msg); err != nil {
211+
t.Errorf("failed to publish message: %v", err)
212+
}
213+
})
214+
}
215+
164216
// Test that publishing to an already created topic works and doesn't allow topic deletion
165217
func TestPublishExistingTopic(t *testing.T) {
166218
for _, allowCreate := range []bool{true, false} {

0 commit comments

Comments
 (0)