Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Prevent excessive string parsing when publishing and receiving messages to improve performance #2317

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.SubscriptionName;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
Expand Down Expand Up @@ -105,6 +106,7 @@ class MessageDispatcher {
private final Distribution ackLatencyDistribution;

private final String subscriptionName;
private final SubscriptionName subscriptionNameObject;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this new object interact with the previous subscriptionName string? Is the latter still necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old String subscriptionName is actually no longer used, so I've removed it. subscriptionNameObject is created in the constructor using the string that is passed in to the builder, but that string is not needed after.

private final boolean enableOpenTelemetryTracing;
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);

Expand Down Expand Up @@ -226,6 +228,7 @@ private MessageDispatcher(Builder builder) {
sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor);

subscriptionName = builder.subscriptionName;
subscriptionNameObject = SubscriptionName.parse(builder.subscriptionName);
enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
if (builder.tracer != null) {
tracer = builder.tracer;
Expand Down Expand Up @@ -408,7 +411,7 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
message.getMessage(),
subscriptionName,
subscriptionNameObject,
message.getAckId(),
message.getDeliveryAttempt())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,10 @@ void endPublishBatchingSpan(PubsubMessageWrapper message) {
* Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional
* links with the publisher parent span are created for sampled messages in the batch.
*/
Span startPublishRpcSpan(String topic, List<PubsubMessageWrapper> messages) {
Span startPublishRpcSpan(TopicName topicName, List<PubsubMessageWrapper> messages) {
if (!enabled) {
return null;
}
TopicName topicName = TopicName.parse(topic);
Attributes attributes =
createCommonSpanAttributesBuilder(
topicName.getTopic(), topicName.getProject(), "publishCall", "publish")
Expand Down Expand Up @@ -359,7 +358,7 @@ void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) {
* to parent subscribe span for sampled messages are added.
*/
Span startSubscribeRpcSpan(
String subscription,
SubscriptionName subscriptionName,
String rpcOperation,
List<PubsubMessageWrapper> messages,
int ackDeadline,
Expand All @@ -368,7 +367,6 @@ Span startSubscribeRpcSpan(
return null;
}
String codeFunction = rpcOperation == "ack" ? "sendAckOperations" : "sendModAckOperations";
SubscriptionName subscriptionName = SubscriptionName.parse(subscription);
AttributesBuilder attributesBuilder =
createCommonSpanAttributesBuilder(
subscriptionName.getSubscription(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class Publisher implements PublisherInterface {

private final String topicName;
private final int topicNameSize;
private final TopicName topicNameObject;

private final BatchingSettings batchingSettings;
private final boolean enableMessageOrdering;
Expand Down Expand Up @@ -149,6 +150,7 @@ private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;
topicNameSize =
CodedOutputStream.computeStringSize(PublishRequest.TOPIC_FIELD_NUMBER, this.topicName);
topicNameObject = TopicName.parse(this.topicName);

this.batchingSettings = builder.batchingSettings;
FlowControlSettings flowControl = this.batchingSettings.getFlowControlSettings();
Expand Down Expand Up @@ -282,7 +284,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
+ "setEnableMessageOrdering(true) in the builder.");

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(messageTransform.apply(message), topicName).build();
PubsubMessageWrapper.newBuilder(messageTransform.apply(message), topicNameObject).build();
tracer.startPublisherSpan(messageWrapper);

final OutstandingPublish outstandingPublish = new OutstandingPublish(messageWrapper);
Expand Down Expand Up @@ -490,7 +492,7 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
pubsubMessagesList.add(messageWrapper.getPubsubMessage());
}

outstandingBatch.publishRpcSpan = tracer.startPublishRpcSpan(topicName, messageWrappers);
outstandingBatch.publishRpcSpan = tracer.startPublishRpcSpan(topicNameObject, messageWrappers);

return publisherStub
.publishCallable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ private PubsubMessageWrapper(Builder builder) {
this.deliveryAttempt = builder.deliveryAttempt;
}

static Builder newBuilder(PubsubMessage message, String topicName) {
static Builder newBuilder(PubsubMessage message, TopicName topicName) {
return new Builder(message, topicName);
}

static Builder newBuilder(
PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) {
PubsubMessage message, SubscriptionName subscriptionName, String ackId, int deliveryAttempt) {
return new Builder(message, subscriptionName, ackId, deliveryAttempt);
}

Expand Down Expand Up @@ -395,21 +395,9 @@ static final class Builder {
private String ackId = null;
private int deliveryAttempt = 0;

public Builder(PubsubMessage message, String topicName) {
public Builder(PubsubMessage message, TopicName topicName) {
this.message = message;
if (topicName != null) {
this.topicName = TopicName.parse(topicName);
}
}

public Builder(
PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) {
this.message = message;
if (subscriptionName != null) {
this.subscriptionName = SubscriptionName.parse(subscriptionName);
}
this.ackId = ackId;
this.deliveryAttempt = deliveryAttempt;
this.topicName = topicName;
}

public Builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.pubsub.v1.SubscriptionName;
import com.google.rpc.ErrorInfo;
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
Expand Down Expand Up @@ -94,6 +95,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private final SubscriberStub subscriberStub;
private final int channelAffinity;
private final String subscription;
private final SubscriptionName subscriptionNameObject;
private final ScheduledExecutorService systemExecutor;
private final MessageDispatcher messageDispatcher;

Expand Down Expand Up @@ -124,6 +126,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements

private StreamingSubscriberConnection(Builder builder) {
subscription = builder.subscription;
subscriptionNameObject = SubscriptionName.parse(builder.subscription);
systemExecutor = builder.systemExecutor;

// We need to set the default stream ack deadline on the initial request, this will be
Expand Down Expand Up @@ -454,7 +457,8 @@ private void sendAckOperations(
}
}
// Creates an Ack span to be passed to the callback
Span rpcSpan = tracer.startSubscribeRpcSpan(subscription, "ack", messagesInRequest, 0, false);
Span rpcSpan =
tracer.startSubscribeRpcSpan(subscriptionNameObject, "ack", messagesInRequest, 0, false);
ApiFutureCallback<Empty> callback =
getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis, rpcSpan);
ApiFuture<Empty> ackFuture =
Expand Down Expand Up @@ -493,7 +497,7 @@ private void sendModackOperations(
// Creates either a ModAck span or a Nack span depending on the given ack deadline
Span rpcSpan =
tracer.startSubscribeRpcSpan(
subscription,
subscriptionNameObject,
rpcOperation,
messagesInRequest,
deadlineExtensionSeconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.mockito.stubbing.Answer;

public class MessageDispatcherTest {
private static final String MOCK_SUBSCRIPTION_NAME =
"projects/MOCK-PROJECT/subscriptions/MOCK-SUBSCRIPTION";
private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data");
private static final int DELIVERY_INFO_COUNT = 3;
private static final String ACK_ID = "ACK-ID";
Expand Down Expand Up @@ -462,6 +464,7 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryDisabledThenEnabled() {
.setMinDurationPerAckExtensionDefaultUsed(true)
.setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION)
.setMaxDurationPerAckExtensionDefaultUsed(true)
.setSubscriptionName(MOCK_SUBSCRIPTION_NAME)
.build();

// ExactlyOnceDeliveryEnabled is turned off by default
Expand Down Expand Up @@ -494,6 +497,7 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryEnabledThenDisabled() {
.setMinDurationPerAckExtensionDefaultUsed(true)
.setMaxDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION)
.setMaxDurationPerAckExtensionDefaultUsed(true)
.setSubscriptionName(MOCK_SUBSCRIPTION_NAME)
.build();

// This would normally be set from the streaming pull response in the
Expand Down Expand Up @@ -605,6 +609,7 @@ public void testAckExtensionCustomMinExactlyOnceDeliveryDisabledThenEnabled() {
.setMinDurationPerAckExtensionDefaultUsed(false)
.setMaxDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION)
.setMaxDurationPerAckExtensionDefaultUsed(true)
.setSubscriptionName(MOCK_SUBSCRIPTION_NAME)
.build();

// ExactlyOnceDeliveryEnabled is turned off by default
Expand Down Expand Up @@ -634,6 +639,7 @@ public void testAckExtensionCustomMaxExactlyOnceDeliveryDisabledThenEnabled() {
.setMinDurationPerAckExtensionDefaultUsed(true)
.setMaxDurationPerAckExtension(Duration.ofSeconds(customMaxSeconds))
.setMaxDurationPerAckExtensionDefaultUsed(false)
.setSubscriptionName(MOCK_SUBSCRIPTION_NAME)
.build();

// ExactlyOnceDeliveryEnabled is turned off by default
Expand Down Expand Up @@ -704,6 +710,7 @@ private MessageDispatcher getMessageDispatcherFromBuilder(
.setAckLatencyDistribution(mock(Distribution.class))
.setFlowController(mock(FlowController.class))
.setExecutor(executor)
.setSubscriptionName(MOCK_SUBSCRIPTION_NAME)
.setSystemExecutor(systemExecutor)
.setApiClock(clock)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void testPublishSpansSuccess() {
openTelemetryTesting.clearSpans();

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build();
List<PubsubMessageWrapper> messageWrappers = Arrays.asList(messageWrapper);

long messageSize = messageWrapper.getPubsubMessage().getData().size();
Expand All @@ -117,7 +117,7 @@ public void testPublishSpansSuccess() {
tracer.endPublishFlowControlSpan(messageWrapper);
tracer.startPublishBatchingSpan(messageWrapper);
tracer.endPublishBatchingSpan(messageWrapper);
Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers);
Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME, messageWrappers);
tracer.endPublishRpcSpan(publishRpcSpan);
tracer.setPublisherMessageIdSpanAttribute(messageWrapper, MESSAGE_ID);
tracer.endPublisherSpan(messageWrapper);
Expand Down Expand Up @@ -218,7 +218,7 @@ public void testPublishFlowControlSpanFailure() {
openTelemetryTesting.clearSpans();

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build();

Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
Expand Down Expand Up @@ -258,14 +258,14 @@ public void testPublishRpcSpanFailure() {
openTelemetryTesting.clearSpans();

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build();

List<PubsubMessageWrapper> messageWrappers = Arrays.asList(messageWrapper);
Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);

tracer.startPublisherSpan(messageWrapper);
Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers);
Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME, messageWrappers);

Exception e = new Exception("test-exception");
tracer.setPublishRpcSpanException(publishRpcSpan, e);
Expand Down Expand Up @@ -302,16 +302,15 @@ public void testSubscribeSpansSuccess() {
OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);

PubsubMessageWrapper publishMessageWrapper =
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build();
// Initialize the Publisher span to inject the context in the message
tracer.startPublisherSpan(publishMessageWrapper);
tracer.endPublisherSpan(publishMessageWrapper);

PubsubMessage publishedMessage =
publishMessageWrapper.getPubsubMessage().toBuilder().setMessageId(MESSAGE_ID).build();
PubsubMessageWrapper subscribeMessageWrapper =
PubsubMessageWrapper.newBuilder(
publishedMessage, FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, 1)
PubsubMessageWrapper.newBuilder(publishedMessage, FULL_SUBSCRIPTION_NAME, ACK_ID, 1)
.build();
List<PubsubMessageWrapper> subscribeMessageWrappers = Arrays.asList(subscribeMessageWrapper);

Expand All @@ -327,21 +326,17 @@ public void testSubscribeSpansSuccess() {
tracer.endSubscribeProcessSpan(subscribeMessageWrapper, PROCESS_ACTION);
Span subscribeModackRpcSpan =
tracer.startSubscribeRpcSpan(
FULL_SUBSCRIPTION_NAME.toString(),
"modack",
subscribeMessageWrappers,
ACK_DEADLINE,
true);
FULL_SUBSCRIPTION_NAME, "modack", subscribeMessageWrappers, ACK_DEADLINE, true);
tracer.endSubscribeRpcSpan(subscribeModackRpcSpan);
tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, ACK_DEADLINE);
Span subscribeAckRpcSpan =
tracer.startSubscribeRpcSpan(
FULL_SUBSCRIPTION_NAME.toString(), "ack", subscribeMessageWrappers, 0, false);
FULL_SUBSCRIPTION_NAME, "ack", subscribeMessageWrappers, 0, false);
tracer.endSubscribeRpcSpan(subscribeAckRpcSpan);
tracer.addEndRpcEvent(subscribeMessageWrapper, true, false, 0);
Span subscribeNackRpcSpan =
tracer.startSubscribeRpcSpan(
FULL_SUBSCRIPTION_NAME.toString(), "nack", subscribeMessageWrappers, 0, false);
FULL_SUBSCRIPTION_NAME, "nack", subscribeMessageWrappers, 0, false);
tracer.endSubscribeRpcSpan(subscribeNackRpcSpan);
tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, 0);
tracer.endSubscriberSpan(subscribeMessageWrapper);
Expand Down Expand Up @@ -518,7 +513,7 @@ public void testSubscribeConcurrencyControlSpanFailure() {

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT)
getPubsubMessage(), FULL_SUBSCRIPTION_NAME, ACK_ID, DELIVERY_ATTEMPT)
.build();

Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
Expand Down Expand Up @@ -562,7 +557,7 @@ public void testSubscriberSpanFailure() {

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT)
getPubsubMessage(), FULL_SUBSCRIPTION_NAME, ACK_ID, DELIVERY_ATTEMPT)
.build();

Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
Expand Down Expand Up @@ -595,7 +590,7 @@ public void testSubscribeRpcSpanFailures() {

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT)
getPubsubMessage(), FULL_SUBSCRIPTION_NAME, ACK_ID, DELIVERY_ATTEMPT)
.build();
List<PubsubMessageWrapper> messageWrappers = Arrays.asList(messageWrapper);

Expand All @@ -605,13 +600,11 @@ public void testSubscribeRpcSpanFailures() {
tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED);
Span subscribeModackRpcSpan =
tracer.startSubscribeRpcSpan(
FULL_SUBSCRIPTION_NAME.toString(), "modack", messageWrappers, ACK_DEADLINE, true);
FULL_SUBSCRIPTION_NAME, "modack", messageWrappers, ACK_DEADLINE, true);
Span subscribeAckRpcSpan =
tracer.startSubscribeRpcSpan(
FULL_SUBSCRIPTION_NAME.toString(), "ack", messageWrappers, 0, false);
tracer.startSubscribeRpcSpan(FULL_SUBSCRIPTION_NAME, "ack", messageWrappers, 0, false);
Span subscribeNackRpcSpan =
tracer.startSubscribeRpcSpan(
FULL_SUBSCRIPTION_NAME.toString(), "nack", messageWrappers, 0, false);
tracer.startSubscribeRpcSpan(FULL_SUBSCRIPTION_NAME, "nack", messageWrappers, 0, false);

Exception e = new Exception("test-exception");
tracer.setSubscribeRpcSpanException(subscribeModackRpcSpan, true, ACK_DEADLINE, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public class StreamingSubscriberConnectionTest {
private FakeClock clock;
private SubscriberStub mockSubscriberStub;

private static final String MOCK_SUBSCRIPTION_NAME = "MOCK-SUBSCRIPTION";
private static final String MOCK_SUBSCRIPTION_NAME =
"projects/MOCK-PROJECT/subscriptions/MOCK-SUBSCRIPTION";
private static final String MOCK_ACK_ID_SUCCESS = "MOCK-ACK-ID-SUCCESS";
private static final String MOCK_ACK_ID_SUCCESS_2 = "MOCK-ACK-ID-SUCCESS-2";
private static final String MOCK_ACK_ID_NACK_SUCCESS = "MOCK-ACK-ID-NACK-SUCCESS";
Expand Down
Loading