diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 7112d4c02..34ef312f3 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.SubscriptionName; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -104,7 +105,7 @@ class MessageDispatcher { // To keep track of number of seconds the receiver takes to process messages. private final Distribution ackLatencyDistribution; - private final String subscriptionName; + private final SubscriptionName subscriptionNameObject; private final boolean enableOpenTelemetryTracing; private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); @@ -225,7 +226,7 @@ private MessageDispatcher(Builder builder) { messagesWaiter = new Waiter(); sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor); - subscriptionName = builder.subscriptionName; + subscriptionNameObject = SubscriptionName.parse(builder.subscriptionName); enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing; if (builder.tracer != null) { tracer = builder.tracer; @@ -408,7 +409,7 @@ void processReceivedMessages(List messages) { PubsubMessageWrapper messageWrapper = PubsubMessageWrapper.newBuilder( message.getMessage(), - subscriptionName, + subscriptionNameObject, message.getAckId(), message.getDeliveryAttempt()) .build(); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java index b946f44bf..9ee751135 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java @@ -172,11 +172,10 @@ void endPublishBatchingSpan(PubsubMessageWrapper message) { * Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional * links with the publisher parent span are created for sampled messages in the batch. */ - Span startPublishRpcSpan(String topic, List messages) { + Span startPublishRpcSpan(TopicName topicName, List messages) { if (!enabled) { return null; } - TopicName topicName = TopicName.parse(topic); Attributes attributes = createCommonSpanAttributesBuilder( topicName.getTopic(), topicName.getProject(), "publishCall", "publish") @@ -359,7 +358,7 @@ void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) { * to parent subscribe span for sampled messages are added. */ Span startSubscribeRpcSpan( - String subscription, + SubscriptionName subscriptionName, String rpcOperation, List messages, int ackDeadline, @@ -368,7 +367,6 @@ Span startSubscribeRpcSpan( return null; } String codeFunction = rpcOperation == "ack" ? "sendAckOperations" : "sendModAckOperations"; - SubscriptionName subscriptionName = SubscriptionName.parse(subscription); AttributesBuilder attributesBuilder = createCommonSpanAttributesBuilder( subscriptionName.getSubscription(), diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index af7a57471..113cbf932 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -101,6 +101,7 @@ public class Publisher implements PublisherInterface { private final String topicName; private final int topicNameSize; + private final TopicName topicNameObject; private final BatchingSettings batchingSettings; private final boolean enableMessageOrdering; @@ -149,6 +150,7 @@ private Publisher(Builder builder) throws IOException { topicName = builder.topicName; topicNameSize = CodedOutputStream.computeStringSize(PublishRequest.TOPIC_FIELD_NUMBER, this.topicName); + topicNameObject = TopicName.parse(this.topicName); this.batchingSettings = builder.batchingSettings; FlowControlSettings flowControl = this.batchingSettings.getFlowControlSettings(); @@ -282,7 +284,7 @@ public ApiFuture publish(PubsubMessage message) { + "setEnableMessageOrdering(true) in the builder."); PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(messageTransform.apply(message), topicName).build(); + PubsubMessageWrapper.newBuilder(messageTransform.apply(message), topicNameObject).build(); tracer.startPublisherSpan(messageWrapper); final OutstandingPublish outstandingPublish = new OutstandingPublish(messageWrapper); @@ -490,7 +492,7 @@ private ApiFuture publishCall(OutstandingBatch outstandingBatch pubsubMessagesList.add(messageWrapper.getPubsubMessage()); } - outstandingBatch.publishRpcSpan = tracer.startPublishRpcSpan(topicName, messageWrappers); + outstandingBatch.publishRpcSpan = tracer.startPublishRpcSpan(topicNameObject, messageWrappers); return publisherStub .publishCallable() diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java index 94fd13085..839eecf91 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java @@ -73,12 +73,12 @@ private PubsubMessageWrapper(Builder builder) { this.deliveryAttempt = builder.deliveryAttempt; } - static Builder newBuilder(PubsubMessage message, String topicName) { + static Builder newBuilder(PubsubMessage message, TopicName topicName) { return new Builder(message, topicName); } static Builder newBuilder( - PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) { + PubsubMessage message, SubscriptionName subscriptionName, String ackId, int deliveryAttempt) { return new Builder(message, subscriptionName, ackId, deliveryAttempt); } @@ -395,21 +395,9 @@ static final class Builder { private String ackId = null; private int deliveryAttempt = 0; - public Builder(PubsubMessage message, String topicName) { + public Builder(PubsubMessage message, TopicName topicName) { this.message = message; - if (topicName != null) { - this.topicName = TopicName.parse(topicName); - } - } - - public Builder( - PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) { - this.message = message; - if (subscriptionName != null) { - this.subscriptionName = SubscriptionName.parse(subscriptionName); - } - this.ackId = ackId; - this.deliveryAttempt = deliveryAttempt; + this.topicName = topicName; } public Builder( diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 3ad124f80..80ff2b4e1 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -47,6 +47,7 @@ import com.google.pubsub.v1.ModifyAckDeadlineRequest; import com.google.pubsub.v1.StreamingPullRequest; import com.google.pubsub.v1.StreamingPullResponse; +import com.google.pubsub.v1.SubscriptionName; import com.google.rpc.ErrorInfo; import io.grpc.Status; import io.grpc.protobuf.StatusProto; @@ -94,6 +95,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final SubscriberStub subscriberStub; private final int channelAffinity; private final String subscription; + private final SubscriptionName subscriptionNameObject; private final ScheduledExecutorService systemExecutor; private final MessageDispatcher messageDispatcher; @@ -124,6 +126,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private StreamingSubscriberConnection(Builder builder) { subscription = builder.subscription; + subscriptionNameObject = SubscriptionName.parse(builder.subscription); systemExecutor = builder.systemExecutor; // We need to set the default stream ack deadline on the initial request, this will be @@ -454,7 +457,8 @@ private void sendAckOperations( } } // Creates an Ack span to be passed to the callback - Span rpcSpan = tracer.startSubscribeRpcSpan(subscription, "ack", messagesInRequest, 0, false); + Span rpcSpan = + tracer.startSubscribeRpcSpan(subscriptionNameObject, "ack", messagesInRequest, 0, false); ApiFutureCallback callback = getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis, rpcSpan); ApiFuture ackFuture = @@ -493,7 +497,7 @@ private void sendModackOperations( // Creates either a ModAck span or a Nack span depending on the given ack deadline Span rpcSpan = tracer.startSubscribeRpcSpan( - subscription, + subscriptionNameObject, rpcOperation, messagesInRequest, deadlineExtensionSeconds, diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index bd3dccccf..1de156939 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -35,6 +35,8 @@ import org.mockito.stubbing.Answer; public class MessageDispatcherTest { + private static final String MOCK_SUBSCRIPTION_NAME = + "projects/MOCK-PROJECT/subscriptions/MOCK-SUBSCRIPTION"; private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data"); private static final int DELIVERY_INFO_COUNT = 3; private static final String ACK_ID = "ACK-ID"; @@ -462,6 +464,7 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryDisabledThenEnabled() { .setMinDurationPerAckExtensionDefaultUsed(true) .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION) .setMaxDurationPerAckExtensionDefaultUsed(true) + .setSubscriptionName(MOCK_SUBSCRIPTION_NAME) .build(); // ExactlyOnceDeliveryEnabled is turned off by default @@ -494,6 +497,7 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryEnabledThenDisabled() { .setMinDurationPerAckExtensionDefaultUsed(true) .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION) .setMaxDurationPerAckExtensionDefaultUsed(true) + .setSubscriptionName(MOCK_SUBSCRIPTION_NAME) .build(); // This would normally be set from the streaming pull response in the @@ -605,6 +609,7 @@ public void testAckExtensionCustomMinExactlyOnceDeliveryDisabledThenEnabled() { .setMinDurationPerAckExtensionDefaultUsed(false) .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION) .setMaxDurationPerAckExtensionDefaultUsed(true) + .setSubscriptionName(MOCK_SUBSCRIPTION_NAME) .build(); // ExactlyOnceDeliveryEnabled is turned off by default @@ -634,6 +639,7 @@ public void testAckExtensionCustomMaxExactlyOnceDeliveryDisabledThenEnabled() { .setMinDurationPerAckExtensionDefaultUsed(true) .setMaxDurationPerAckExtension(Duration.ofSeconds(customMaxSeconds)) .setMaxDurationPerAckExtensionDefaultUsed(false) + .setSubscriptionName(MOCK_SUBSCRIPTION_NAME) .build(); // ExactlyOnceDeliveryEnabled is turned off by default @@ -704,6 +710,7 @@ private MessageDispatcher getMessageDispatcherFromBuilder( .setAckLatencyDistribution(mock(Distribution.class)) .setFlowController(mock(FlowController.class)) .setExecutor(executor) + .setSubscriptionName(MOCK_SUBSCRIPTION_NAME) .setSystemExecutor(systemExecutor) .setApiClock(clock) .build(); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java index b4433f41e..2297f84bf 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java @@ -104,7 +104,7 @@ public void testPublishSpansSuccess() { openTelemetryTesting.clearSpans(); PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build(); List messageWrappers = Arrays.asList(messageWrapper); long messageSize = messageWrapper.getPubsubMessage().getData().size(); @@ -117,7 +117,7 @@ public void testPublishSpansSuccess() { tracer.endPublishFlowControlSpan(messageWrapper); tracer.startPublishBatchingSpan(messageWrapper); tracer.endPublishBatchingSpan(messageWrapper); - Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers); + Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME, messageWrappers); tracer.endPublishRpcSpan(publishRpcSpan); tracer.setPublisherMessageIdSpanAttribute(messageWrapper, MESSAGE_ID); tracer.endPublisherSpan(messageWrapper); @@ -218,7 +218,7 @@ public void testPublishFlowControlSpanFailure() { openTelemetryTesting.clearSpans(); PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build(); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); @@ -258,14 +258,14 @@ public void testPublishRpcSpanFailure() { openTelemetryTesting.clearSpans(); PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build(); List messageWrappers = Arrays.asList(messageWrapper); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); tracer.startPublisherSpan(messageWrapper); - Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers); + Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME, messageWrappers); Exception e = new Exception("test-exception"); tracer.setPublishRpcSpanException(publishRpcSpan, e); @@ -302,7 +302,7 @@ public void testSubscribeSpansSuccess() { OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); PubsubMessageWrapper publishMessageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build(); // Initialize the Publisher span to inject the context in the message tracer.startPublisherSpan(publishMessageWrapper); tracer.endPublisherSpan(publishMessageWrapper); @@ -310,8 +310,7 @@ public void testSubscribeSpansSuccess() { PubsubMessage publishedMessage = publishMessageWrapper.getPubsubMessage().toBuilder().setMessageId(MESSAGE_ID).build(); PubsubMessageWrapper subscribeMessageWrapper = - PubsubMessageWrapper.newBuilder( - publishedMessage, FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, 1) + PubsubMessageWrapper.newBuilder(publishedMessage, FULL_SUBSCRIPTION_NAME, ACK_ID, 1) .build(); List subscribeMessageWrappers = Arrays.asList(subscribeMessageWrapper); @@ -327,21 +326,17 @@ public void testSubscribeSpansSuccess() { tracer.endSubscribeProcessSpan(subscribeMessageWrapper, PROCESS_ACTION); Span subscribeModackRpcSpan = tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), - "modack", - subscribeMessageWrappers, - ACK_DEADLINE, - true); + FULL_SUBSCRIPTION_NAME, "modack", subscribeMessageWrappers, ACK_DEADLINE, true); tracer.endSubscribeRpcSpan(subscribeModackRpcSpan); tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, ACK_DEADLINE); Span subscribeAckRpcSpan = tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "ack", subscribeMessageWrappers, 0, false); + FULL_SUBSCRIPTION_NAME, "ack", subscribeMessageWrappers, 0, false); tracer.endSubscribeRpcSpan(subscribeAckRpcSpan); tracer.addEndRpcEvent(subscribeMessageWrapper, true, false, 0); Span subscribeNackRpcSpan = tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "nack", subscribeMessageWrappers, 0, false); + FULL_SUBSCRIPTION_NAME, "nack", subscribeMessageWrappers, 0, false); tracer.endSubscribeRpcSpan(subscribeNackRpcSpan); tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, 0); tracer.endSubscriberSpan(subscribeMessageWrapper); @@ -518,7 +513,7 @@ public void testSubscribeConcurrencyControlSpanFailure() { PubsubMessageWrapper messageWrapper = PubsubMessageWrapper.newBuilder( - getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) + getPubsubMessage(), FULL_SUBSCRIPTION_NAME, ACK_ID, DELIVERY_ATTEMPT) .build(); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); @@ -562,7 +557,7 @@ public void testSubscriberSpanFailure() { PubsubMessageWrapper messageWrapper = PubsubMessageWrapper.newBuilder( - getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) + getPubsubMessage(), FULL_SUBSCRIPTION_NAME, ACK_ID, DELIVERY_ATTEMPT) .build(); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); @@ -595,7 +590,7 @@ public void testSubscribeRpcSpanFailures() { PubsubMessageWrapper messageWrapper = PubsubMessageWrapper.newBuilder( - getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) + getPubsubMessage(), FULL_SUBSCRIPTION_NAME, ACK_ID, DELIVERY_ATTEMPT) .build(); List messageWrappers = Arrays.asList(messageWrapper); @@ -605,13 +600,11 @@ public void testSubscribeRpcSpanFailures() { tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); Span subscribeModackRpcSpan = tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "modack", messageWrappers, ACK_DEADLINE, true); + FULL_SUBSCRIPTION_NAME, "modack", messageWrappers, ACK_DEADLINE, true); Span subscribeAckRpcSpan = - tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "ack", messageWrappers, 0, false); + tracer.startSubscribeRpcSpan(FULL_SUBSCRIPTION_NAME, "ack", messageWrappers, 0, false); Span subscribeNackRpcSpan = - tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "nack", messageWrappers, 0, false); + tracer.startSubscribeRpcSpan(FULL_SUBSCRIPTION_NAME, "nack", messageWrappers, 0, false); Exception e = new Exception("test-exception"); tracer.setSubscribeRpcSpanException(subscribeModackRpcSpan, true, ACK_DEADLINE, e); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java index 412dd2ad8..8bf0113b5 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java @@ -52,7 +52,8 @@ public class StreamingSubscriberConnectionTest { private FakeClock clock; private SubscriberStub mockSubscriberStub; - private static final String MOCK_SUBSCRIPTION_NAME = "MOCK-SUBSCRIPTION"; + private static final String MOCK_SUBSCRIPTION_NAME = + "projects/MOCK-PROJECT/subscriptions/MOCK-SUBSCRIPTION"; private static final String MOCK_ACK_ID_SUCCESS = "MOCK-ACK-ID-SUCCESS"; private static final String MOCK_ACK_ID_SUCCESS_2 = "MOCK-ACK-ID-SUCCESS-2"; private static final String MOCK_ACK_ID_NACK_SUCCESS = "MOCK-ACK-ID-NACK-SUCCESS";