From 70898ae91ba01642e0b8a309e9c2463c161361d8 Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Thu, 23 Jan 2025 17:00:12 +0000 Subject: [PATCH 01/10] docs: Add samples and tests for ingestion from Kafka sources --- ...CreateTopicWithAwsMskIngestionExample.java | 75 +++++++++++++ ...picWithAzureEventHubsIngestionExample.java | 93 ++++++++++++++++ ...picWithConfluentCloudIngestionExample.java | 85 +++++++++++++++ .../src/test/java/pubsub/AdminIT.java | 103 +++++++++++++++++- 4 files changed, 354 insertions(+), 2 deletions(-) create mode 100644 samples/snippets/src/main/java/pubsub/CreateTopicWithAwsMskIngestionExample.java create mode 100644 samples/snippets/src/main/java/pubsub/CreateTopicWithAzureEventHubsIngestionExample.java create mode 100644 samples/snippets/src/main/java/pubsub/CreateTopicWithConfluentCloudIngestionExample.java diff --git a/samples/snippets/src/main/java/pubsub/CreateTopicWithAwsMskIngestionExample.java b/samples/snippets/src/main/java/pubsub/CreateTopicWithAwsMskIngestionExample.java new file mode 100644 index 000000000..e04b41574 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/CreateTopicWithAwsMskIngestionExample.java @@ -0,0 +1,75 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_create_topic_with_aws_msk_ingestion] + +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminSettings; +import com.google.pubsub.v1.IngestionDataSourceSettings; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; + +public class CreateTopicWithAwsMskIngestionExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String topicId = "your-topic-id"; + // AWS MSK ingestion settings. + String clusterArn = "cluster-arn"; + String mskTopic = "msk-topic"; + String awsRoleArn = "aws-role-arn"; + String gcpServiceAccount = "gcp-service-account"; + + createTopicWithAwsMskIngestionExample( + projectId, topicId, clusterArn, mskTopic, awsRoleArn, gcpServiceAccount); + } + + public static void createTopicWithAwsMskIngestionExample( + String projectId, + String topicId, + String clusterArn, + String mskTopic, + String awsRoleArn, + String gcpServiceAccount) + throws IOException { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + TopicName topicName = TopicName.of(projectId, topicId); + + IngestionDataSourceSettings.AwsMsk awsMsk = + IngestionDataSourceSettings.AwsMsk.newBuilder() + .setClusterArn(clusterArn) + .setTopic(mskTopic) + .setAwsRoleArn(awsRoleArn) + .setGcpServiceAccount(gcpServiceAccount) + .build(); + IngestionDataSourceSettings ingestionDataSourceSettings = + IngestionDataSourceSettings.newBuilder().setAwsMsk(awsMsk).build(); + + Topic topic = + topicAdminClient.createTopic( + Topic.newBuilder() + .setName(topicName.toString()) + .setIngestionDataSourceSettings(ingestionDataSourceSettings) + .build()); + + System.out.println("Created topic with AWS MSK ingestion settings: " + topic.getAllFields()); + } + } +} +// [END pubsub_create_topic_with_aws_msk_ingestion] diff --git a/samples/snippets/src/main/java/pubsub/CreateTopicWithAzureEventHubsIngestionExample.java b/samples/snippets/src/main/java/pubsub/CreateTopicWithAzureEventHubsIngestionExample.java new file mode 100644 index 000000000..b2db75d70 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/CreateTopicWithAzureEventHubsIngestionExample.java @@ -0,0 +1,93 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_create_topic_with_azure_event_hubs_ingestion] + +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminSettings; +import com.google.pubsub.v1.IngestionDataSourceSettings; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; + +public class CreateTopicWithAzureEventHubsIngestionExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String topicId = "your-topic-id"; + // Azure Event Hubs ingestion settings. + String resourceGroup = "resource-group"; + String namespace = "namespace"; + String eventHub = "event-hub"; + String clientId = "client-id"; + String tenantId = "tenant-id"; + String subscriptionId = "subscription-id"; + String gcpServiceAccount = "gcp-service-account"; + + createTopicWithAzureEventHubsIngestionExample( + projectId, + topicId, + resourceGroup, + namespace, + eventHub, + clientId, + tenantId, + subscriptionId, + gcpServiceAccount); + } + + public static void createTopicWithAzureEventHubsIngestionExample( + String projectId, + String topicId, + String resourceGroup, + String namespace, + String eventHub, + String clientId, + String tenantId, + String subscriptionId, + String gcpServiceAccount) + throws IOException { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create(TopicAdminSettings.newBuilder().setEndpoint("staging-pubsub.sandbox.googleapis.com:443").build())) { + TopicName topicName = TopicName.of(projectId, topicId); + + IngestionDataSourceSettings.AzureEventHubs azureEventHubs = + IngestionDataSourceSettings.AzureEventHubs.newBuilder() + .setResourceGroup(resourceGroup) + .setNamespace(namespace) + .setEventHub(eventHub) + .setClientId(clientId) + .setTenantId(tenantId) + .setSubscriptionId(subscriptionId) + .setGcpServiceAccount(gcpServiceAccount) + .build(); + IngestionDataSourceSettings ingestionDataSourceSettings = + IngestionDataSourceSettings.newBuilder().setAzureEventHubs(azureEventHubs).build(); + + Topic topic = + topicAdminClient.createTopic( + Topic.newBuilder() + .setName(topicName.toString()) + .setIngestionDataSourceSettings(ingestionDataSourceSettings) + .build()); + + System.out.println("Created topic with Azure Event Hubs ingestion settings: " + + topic.getAllFields()); + } + } +} +// [END pubsub_create_topic_with_azure_event_hubs_ingestion] diff --git a/samples/snippets/src/main/java/pubsub/CreateTopicWithConfluentCloudIngestionExample.java b/samples/snippets/src/main/java/pubsub/CreateTopicWithConfluentCloudIngestionExample.java new file mode 100644 index 000000000..87990b5c9 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/CreateTopicWithConfluentCloudIngestionExample.java @@ -0,0 +1,85 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_create_topic_with_confluent_cloud_ingestion] + +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminSettings; +import com.google.pubsub.v1.IngestionDataSourceSettings; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; + +public class CreateTopicWithConfluentCloudIngestionExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String topicId = "your-topic-id"; + // Confluent Cloud ingestion settings. + String bootstrapServer = "bootstrap-server"; + String clusterId = "cluster-id"; + String confluentTopic = "confluent-topic"; + String identityPoolId = "identity-pool-id"; + String gcpServiceAccount = "gcp-service-account"; + + createTopicWithConfluentCloudIngestionExample( + projectId, + topicId, + bootstrapServer, + clusterId, + confluentTopic, + identityPoolId, + gcpServiceAccount); + } + + public static void createTopicWithConfluentCloudIngestionExample( + String projectId, + String topicId, + String bootstrapServer, + String clusterId, + String confluentTopic, + String identityPoolId, + String gcpServiceAccount) + throws IOException { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + TopicName topicName = TopicName.of(projectId, topicId); + + IngestionDataSourceSettings.ConfluentCloud confluentCloud = + IngestionDataSourceSettings.ConfluentCloud.newBuilder() + .setBootstrapServer(bootstrapServer) + .setClusterId(clusterId) + .setTopic(confluentTopic) + .setIdentityPoolId(identityPoolId) + .setGcpServiceAccount(gcpServiceAccount) + .build(); + IngestionDataSourceSettings ingestionDataSourceSettings = + IngestionDataSourceSettings.newBuilder().setConfluentCloud(confluentCloud).build(); + + Topic topic = + topicAdminClient.createTopic( + Topic.newBuilder() + .setName(topicName.toString()) + .setIngestionDataSourceSettings(ingestionDataSourceSettings) + .build()); + + System.out.println("Created topic with Confluent Cloud ingestion settings: " + + topic.getAllFields()); + } + } +} +// [END pubsub_create_topic_with_confluent_cloud_ingestion] diff --git a/samples/snippets/src/test/java/pubsub/AdminIT.java b/samples/snippets/src/test/java/pubsub/AdminIT.java index e0c45e8e1..ecf2fe22a 100644 --- a/samples/snippets/src/test/java/pubsub/AdminIT.java +++ b/samples/snippets/src/test/java/pubsub/AdminIT.java @@ -55,6 +55,11 @@ public class AdminIT { private static final String kinesisIngestionTopicId = "kinesis-ingestion-topic-" + _suffix; private static final String cloudStorageIngestionTopicId = "cloud-storage-ingestion-topic-" + _suffix; + private static final String awsMskIngestionTopicId = "aws-msk-ingestion-topic-" + _suffix; + private static final String confluentCloudIngestionTopicId = + "confluent-cloud-ingestion-topic-" + _suffix; + private static final String azureEventHubsIngestionTopicId = + "azure-event-hubs-ingestion-topic-" + _suffix; private static final String pullSubscriptionId = "iam-pull-subscription-" + _suffix; private static final String pushSubscriptionId = "iam-push-subscription-" + _suffix; private static final String orderedSubscriptionId = "iam-ordered-subscription-" + _suffix; @@ -66,6 +71,9 @@ public class AdminIT { "java_samples_data_set" + _suffix.replace("-", "_"); private static final String bigquerySubscriptionId = "iam-bigquery-subscription-" + _suffix; private static final String bigqueryTableId = "java_samples_table_" + _suffix; + private static final String gcpServiceAccount = + "fake-service-account@fake-gcp-project.iam.gserviceaccount.com"; + // AWS Kinesis ingestion settings. private static final String streamArn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name"; private static final String consumerArn = @@ -75,20 +83,41 @@ public class AdminIT { "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/" + "consumer/consumer-2:2222222222"; private static final String awsRoleArn = "arn:aws:iam::111111111111:role/fake-role-name"; - private static final String gcpServiceAccount = - "fake-service-account@fake-gcp-project.iam.gserviceaccount.com"; + // GCS ingestion settings. private static final String cloudStorageBucket = "pubsub-cloud-storage-bucket"; private static final String cloudStorageInputFormat = "text"; private static final String cloudStorageTextDelimiter = ","; private static final String cloudStorageMatchGlob = "**.txt"; private static final String cloudStorageMinimumObjectCreateTime = "1970-01-01T00:00:01Z"; private static final String cloudStorageMinimumObjectCreateTimeSeconds = "seconds: 1"; + // AWS MSK ingestion settings. + String clusterArn = + "arn:aws:kafka:us-east-1:111111111111:cluster/fake-cluster-name/11111111-1111-1"; + String mskTopic = "fake-msk-topic-name"; + // Confluent Cloud ingestion settings. + String bootstrapServer = "fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092"; + String clusterId = "fake-cluster-id"; + String confluentTopic = "fake-confluent-topic-name"; + String identityPoolId = "fake-pool-id" + // Azure Event Hubs ingestion settings. + String resourceGroup = "fake-resource-group"; + String namespace = "fake-namespace"; + String eventHub = "fake-event-hub"; + String clientId = "11111111-1111-1111-1111-111111111111"; + String tenantId = "22222222-2222-2222-2222-222222222222"; + String subscriptionId = "33333333-3333-3333-3333-333333333333"; private static final TopicName topicName = TopicName.of(projectId, topicId); private static final TopicName kinesisIngestionTopicName = TopicName.of(projectId, kinesisIngestionTopicId); private static final TopicName cloudStorageIngestionTopicName = TopicName.of(projectId, cloudStorageIngestionTopicId); + private static final TopicName awsMskIngestionTopicName = + TopicName.of(projectId, awsMskIngestionTopicId); + private static final TopicName confluentCloudIngestionTopicName = + TopicName.of(projectId, confluentCloudIngestionTopicId); + private static final TopicName azureEventHubsIngestionTopicName = + TopicName.of(projectId, azureEventHubsIngestionTopicId); private static final SubscriptionName pullSubscriptionName = SubscriptionName.of(projectId, pullSubscriptionId); private static final SubscriptionName pushSubscriptionName = @@ -361,5 +390,75 @@ public void testAdmin() throws Exception { // Test delete Cloud Storage ingestion topic. DeleteTopicExample.deleteTopicExample(projectId, cloudStorageIngestionTopicId); assertThat(bout.toString()).contains("Deleted topic."); + + bout.reset(); + // Test create topic with AWS MSK ingestion settings. + CreateTopicWithAwsMskIngestionExample.createTopicWithAwsMskIngestionExample( + projectId, + awsMskIngestionTopicId, + clusterArn, + mskTopic, + awsRoleArn, + gcpServiceAccount); + assertThat(bout.toString()) + .contains("google.pubsub.v1.Topic.name=" + awsMskIngestionTopicName.toString()); + assertThat(bout.toString()).contains(clusterArn); + assertThat(bout.toString()).contains(mskTopic); + assertThat(bout.toString()).contains(awsRoleArn); + assertThat(bout.toString()).contains(gcpServiceAccount); + + bout.reset(); + // Test delete AWS MSK ingestion topic. + DeleteTopicExample.deleteTopicExample(projectId, awsMskIngestionTopicId); + assertThat(bout.toString()).contains("Deleted topic."); + + bout.reset(); + // Test create topic with Confluent Cloud ingestion settings. + CreateTopicWithConfluentCloudIngestionExample.createTopicWithConfluentCloudIngestionExample( + projectId, + confluentCloudIngestionTopicId, + bootstrapServer, + clusterId, + confluentTopic, + identityPoolId, + gcpServiceAccount); + assertThat(bout.toString()) + .contains("google.pubsub.v1.Topic.name=" + confluentCloudIngestionTopicName.toString()); + assertThat(bout.toString()).contains(bootstrapServer); + assertThat(bout.toString()).contains(clusterId); + assertThat(bout.toString()).contains(confluentTopic); + assertThat(bout.toString()).contains(identityPoolId); + assertThat(bout.toString()).contains(gcpServiceAccount); + + bout.reset(); + // Test delete Confluent Cloud ingestion topic. + DeleteTopicExample.deleteTopicExample(projectId, confluentCloudIngestionTopicId); + assertThat(bout.toString()).contains("Deleted topic."); + + bout.reset(); + // Test create topic with Azure Event Hubs ingestion settings. + CreateTopicWithAzureEventHubsIngestionExample.createTopicWithAzureEventHubsIngestionExample( + projectId, + azureEventHubsIngestionTopicId, + resourceGroup, + namespace, + eventHub, + clientId, + tenantId, + subscriptionId, + gcpServiceAccount); + assertThat(bout.toString()).contains("google.pubsub.v1.Topic.name=" + azureEventHubsIngestionTopicName.toString()); + assertThat(bout.toString()).contains(resourceGroup); + assertThat(bout.toString()).contains(namespace); + assertThat(bout.toString()).contains(eventHub); + assertThat(bout.toString()).contains(clientId); + assertThat(bout.toString()).contains(tenantId); + assertThat(bout.toString()).contains(subscriptionId); + assertThat(bout.toString()).contains(gcpServiceAccount); + + bout.reset(); + // Test delete Azure Event Hubs ingestion topic. + DeleteTopicExample.deleteTopicExample(projectId, azureEventHubsIngestionTopicId); + assertThat(bout.toString()).contains("Deleted topic."); } } From bae301627d403c95e0fa5264475ec04f78c9c485 Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Thu, 23 Jan 2025 17:09:22 +0000 Subject: [PATCH 02/10] docs: Styles fixes for samples/tests --- .../CreateTopicWithAzureEventHubsIngestionExample.java | 6 +++--- .../CreateTopicWithConfluentCloudIngestionExample.java | 4 ++-- samples/snippets/src/test/java/pubsub/AdminIT.java | 5 +++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/samples/snippets/src/main/java/pubsub/CreateTopicWithAzureEventHubsIngestionExample.java b/samples/snippets/src/main/java/pubsub/CreateTopicWithAzureEventHubsIngestionExample.java index b2db75d70..29210b792 100644 --- a/samples/snippets/src/main/java/pubsub/CreateTopicWithAzureEventHubsIngestionExample.java +++ b/samples/snippets/src/main/java/pubsub/CreateTopicWithAzureEventHubsIngestionExample.java @@ -62,7 +62,7 @@ public static void createTopicWithAzureEventHubsIngestionExample( String subscriptionId, String gcpServiceAccount) throws IOException { - try (TopicAdminClient topicAdminClient = TopicAdminClient.create(TopicAdminSettings.newBuilder().setEndpoint("staging-pubsub.sandbox.googleapis.com:443").build())) { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { TopicName topicName = TopicName.of(projectId, topicId); IngestionDataSourceSettings.AzureEventHubs azureEventHubs = @@ -85,8 +85,8 @@ public static void createTopicWithAzureEventHubsIngestionExample( .setIngestionDataSourceSettings(ingestionDataSourceSettings) .build()); - System.out.println("Created topic with Azure Event Hubs ingestion settings: " + - topic.getAllFields()); + System.out.println( + "Created topic with Azure Event Hubs ingestion settings: " + topic.getAllFields()); } } } diff --git a/samples/snippets/src/main/java/pubsub/CreateTopicWithConfluentCloudIngestionExample.java b/samples/snippets/src/main/java/pubsub/CreateTopicWithConfluentCloudIngestionExample.java index 87990b5c9..e222b7ba0 100644 --- a/samples/snippets/src/main/java/pubsub/CreateTopicWithConfluentCloudIngestionExample.java +++ b/samples/snippets/src/main/java/pubsub/CreateTopicWithConfluentCloudIngestionExample.java @@ -77,8 +77,8 @@ public static void createTopicWithConfluentCloudIngestionExample( .setIngestionDataSourceSettings(ingestionDataSourceSettings) .build()); - System.out.println("Created topic with Confluent Cloud ingestion settings: " + - topic.getAllFields()); + System.out.println( + "Created topic with Confluent Cloud ingestion settings: " + topic.getAllFields()); } } } diff --git a/samples/snippets/src/test/java/pubsub/AdminIT.java b/samples/snippets/src/test/java/pubsub/AdminIT.java index ecf2fe22a..23b24f917 100644 --- a/samples/snippets/src/test/java/pubsub/AdminIT.java +++ b/samples/snippets/src/test/java/pubsub/AdminIT.java @@ -98,7 +98,7 @@ public class AdminIT { String bootstrapServer = "fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092"; String clusterId = "fake-cluster-id"; String confluentTopic = "fake-confluent-topic-name"; - String identityPoolId = "fake-pool-id" + String identityPoolId = "fake-pool-id"; // Azure Event Hubs ingestion settings. String resourceGroup = "fake-resource-group"; String namespace = "fake-namespace"; @@ -447,7 +447,8 @@ public void testAdmin() throws Exception { tenantId, subscriptionId, gcpServiceAccount); - assertThat(bout.toString()).contains("google.pubsub.v1.Topic.name=" + azureEventHubsIngestionTopicName.toString()); + assertThat(bout.toString()).contains( + "google.pubsub.v1.Topic.name=" + azureEventHubsIngestionTopicName.toString()); assertThat(bout.toString()).contains(resourceGroup); assertThat(bout.toString()).contains(namespace); assertThat(bout.toString()).contains(eventHub); From d9ffcd22dcf5dad6551b70ed7613a5586c28397a Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Fri, 24 Jan 2025 01:08:38 +0000 Subject: [PATCH 03/10] fix: Prevent excessive string parsing when publishing and receiving messages to improve performance --- .../cloud/pubsub/v1/MessageDispatcher.java | 5 ++++- .../com/google/cloud/pubsub/v1/Publisher.java | 4 +++- .../cloud/pubsub/v1/PubsubMessageWrapper.java | 20 ++++--------------- .../cloud/pubsub/v1/OpenTelemetryTest.java | 17 ++++++++-------- 4 files changed, 19 insertions(+), 27 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 7112d4c02..5b3dec090 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.SubscriptionName; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -105,6 +106,7 @@ class MessageDispatcher { private final Distribution ackLatencyDistribution; private final String subscriptionName; + private final SubscriptionName subscriptionNameObject; private final boolean enableOpenTelemetryTracing; private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); @@ -226,6 +228,7 @@ private MessageDispatcher(Builder builder) { sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor); subscriptionName = builder.subscriptionName; + subscriptionNameObject = SubscriptionName.parse(builder.subscriptionName); enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing; if (builder.tracer != null) { tracer = builder.tracer; @@ -408,7 +411,7 @@ void processReceivedMessages(List messages) { PubsubMessageWrapper messageWrapper = PubsubMessageWrapper.newBuilder( message.getMessage(), - subscriptionName, + subscriptionNameObject, message.getAckId(), message.getDeliveryAttempt()) .build(); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index af7a57471..3dba86f58 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -101,6 +101,7 @@ public class Publisher implements PublisherInterface { private final String topicName; private final int topicNameSize; + private final TopicName topicNameObject; private final BatchingSettings batchingSettings; private final boolean enableMessageOrdering; @@ -149,6 +150,7 @@ private Publisher(Builder builder) throws IOException { topicName = builder.topicName; topicNameSize = CodedOutputStream.computeStringSize(PublishRequest.TOPIC_FIELD_NUMBER, this.topicName); + topicNameObject = TopicName.parse(this.topicName); this.batchingSettings = builder.batchingSettings; FlowControlSettings flowControl = this.batchingSettings.getFlowControlSettings(); @@ -282,7 +284,7 @@ public ApiFuture publish(PubsubMessage message) { + "setEnableMessageOrdering(true) in the builder."); PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(messageTransform.apply(message), topicName).build(); + PubsubMessageWrapper.newBuilder(messageTransform.apply(message), topicNameObject).build(); tracer.startPublisherSpan(messageWrapper); final OutstandingPublish outstandingPublish = new OutstandingPublish(messageWrapper); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java index 94fd13085..839eecf91 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java @@ -73,12 +73,12 @@ private PubsubMessageWrapper(Builder builder) { this.deliveryAttempt = builder.deliveryAttempt; } - static Builder newBuilder(PubsubMessage message, String topicName) { + static Builder newBuilder(PubsubMessage message, TopicName topicName) { return new Builder(message, topicName); } static Builder newBuilder( - PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) { + PubsubMessage message, SubscriptionName subscriptionName, String ackId, int deliveryAttempt) { return new Builder(message, subscriptionName, ackId, deliveryAttempt); } @@ -395,21 +395,9 @@ static final class Builder { private String ackId = null; private int deliveryAttempt = 0; - public Builder(PubsubMessage message, String topicName) { + public Builder(PubsubMessage message, TopicName topicName) { this.message = message; - if (topicName != null) { - this.topicName = TopicName.parse(topicName); - } - } - - public Builder( - PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) { - this.message = message; - if (subscriptionName != null) { - this.subscriptionName = SubscriptionName.parse(subscriptionName); - } - this.ackId = ackId; - this.deliveryAttempt = deliveryAttempt; + this.topicName = topicName; } public Builder( diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java index b4433f41e..9c2ab18a9 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java @@ -104,7 +104,7 @@ public void testPublishSpansSuccess() { openTelemetryTesting.clearSpans(); PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build(); List messageWrappers = Arrays.asList(messageWrapper); long messageSize = messageWrapper.getPubsubMessage().getData().size(); @@ -218,7 +218,7 @@ public void testPublishFlowControlSpanFailure() { openTelemetryTesting.clearSpans(); PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build(); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); @@ -258,7 +258,7 @@ public void testPublishRpcSpanFailure() { openTelemetryTesting.clearSpans(); PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build(); List messageWrappers = Arrays.asList(messageWrapper); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); @@ -302,7 +302,7 @@ public void testSubscribeSpansSuccess() { OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); PubsubMessageWrapper publishMessageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build(); // Initialize the Publisher span to inject the context in the message tracer.startPublisherSpan(publishMessageWrapper); tracer.endPublisherSpan(publishMessageWrapper); @@ -310,8 +310,7 @@ public void testSubscribeSpansSuccess() { PubsubMessage publishedMessage = publishMessageWrapper.getPubsubMessage().toBuilder().setMessageId(MESSAGE_ID).build(); PubsubMessageWrapper subscribeMessageWrapper = - PubsubMessageWrapper.newBuilder( - publishedMessage, FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, 1) + PubsubMessageWrapper.newBuilder(publishedMessage, FULL_SUBSCRIPTION_NAME, ACK_ID, 1) .build(); List subscribeMessageWrappers = Arrays.asList(subscribeMessageWrapper); @@ -518,7 +517,7 @@ public void testSubscribeConcurrencyControlSpanFailure() { PubsubMessageWrapper messageWrapper = PubsubMessageWrapper.newBuilder( - getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) + getPubsubMessage(), FULL_SUBSCRIPTION_NAME, ACK_ID, DELIVERY_ATTEMPT) .build(); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); @@ -562,7 +561,7 @@ public void testSubscriberSpanFailure() { PubsubMessageWrapper messageWrapper = PubsubMessageWrapper.newBuilder( - getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) + getPubsubMessage(), FULL_SUBSCRIPTION_NAME, ACK_ID, DELIVERY_ATTEMPT) .build(); Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); @@ -595,7 +594,7 @@ public void testSubscribeRpcSpanFailures() { PubsubMessageWrapper messageWrapper = PubsubMessageWrapper.newBuilder( - getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) + getPubsubMessage(), FULL_SUBSCRIPTION_NAME, ACK_ID, DELIVERY_ATTEMPT) .build(); List messageWrappers = Arrays.asList(messageWrapper); From f316c0009e2ed9917a9019f04b47da1d7a1669cc Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Fri, 24 Jan 2025 01:22:02 +0000 Subject: [PATCH 04/10] test: Fix test to use proper subscription name --- .../cloud/pubsub/v1/StreamingSubscriberConnectionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java index 412dd2ad8..57a65df53 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java @@ -52,7 +52,7 @@ public class StreamingSubscriberConnectionTest { private FakeClock clock; private SubscriberStub mockSubscriberStub; - private static final String MOCK_SUBSCRIPTION_NAME = "MOCK-SUBSCRIPTION"; + private static final String MOCK_SUBSCRIPTION_NAME = "projects/MOCK-PROJECT/subscriptions/MOCK-SUBSCRIPTION"; private static final String MOCK_ACK_ID_SUCCESS = "MOCK-ACK-ID-SUCCESS"; private static final String MOCK_ACK_ID_SUCCESS_2 = "MOCK-ACK-ID-SUCCESS-2"; private static final String MOCK_ACK_ID_NACK_SUCCESS = "MOCK-ACK-ID-NACK-SUCCESS"; From 2b744063ef02ea5b4379d4b2b4091249dc129ffd Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Fri, 24 Jan 2025 01:28:51 +0000 Subject: [PATCH 05/10] test: Fix MessageDispatcherTest --- .../java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index bd3dccccf..d62142298 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -35,6 +35,7 @@ import org.mockito.stubbing.Answer; public class MessageDispatcherTest { + private static final String MOCK_SUBSCRIPTION_NAME = "projects/MOCK-PROJECT/subscriptions/MOCK-SUBSCRIPTION"; private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data"); private static final int DELIVERY_INFO_COUNT = 3; private static final String ACK_ID = "ACK-ID"; @@ -704,6 +705,7 @@ private MessageDispatcher getMessageDispatcherFromBuilder( .setAckLatencyDistribution(mock(Distribution.class)) .setFlowController(mock(FlowController.class)) .setExecutor(executor) + .setSubscriptionName(MOCK_SUBSCRIPTION_NAME) .setSystemExecutor(systemExecutor) .setApiClock(clock) .build(); From 58d4bca9ca0b2b415a38c0fb996b9bcbd75dbeff Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Fri, 24 Jan 2025 02:32:42 +0000 Subject: [PATCH 06/10] test: Add subscription name to all required builders in MessageDispatcherTest --- .../com/google/cloud/pubsub/v1/MessageDispatcherTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index d62142298..094a3630b 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -463,6 +463,7 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryDisabledThenEnabled() { .setMinDurationPerAckExtensionDefaultUsed(true) .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION) .setMaxDurationPerAckExtensionDefaultUsed(true) + .setSubscriptionName(MOCK_SUBSCRIPTION_NAME) .build(); // ExactlyOnceDeliveryEnabled is turned off by default @@ -495,6 +496,7 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryEnabledThenDisabled() { .setMinDurationPerAckExtensionDefaultUsed(true) .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION) .setMaxDurationPerAckExtensionDefaultUsed(true) + .setSubscriptionName(MOCK_SUBSCRIPTION_NAME) .build(); // This would normally be set from the streaming pull response in the @@ -606,6 +608,7 @@ public void testAckExtensionCustomMinExactlyOnceDeliveryDisabledThenEnabled() { .setMinDurationPerAckExtensionDefaultUsed(false) .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION) .setMaxDurationPerAckExtensionDefaultUsed(true) + .setSubscriptionName(MOCK_SUBSCRIPTION_NAME) .build(); // ExactlyOnceDeliveryEnabled is turned off by default @@ -635,6 +638,7 @@ public void testAckExtensionCustomMaxExactlyOnceDeliveryDisabledThenEnabled() { .setMinDurationPerAckExtensionDefaultUsed(true) .setMaxDurationPerAckExtension(Duration.ofSeconds(customMaxSeconds)) .setMaxDurationPerAckExtensionDefaultUsed(false) + .setSubscriptionName(MOCK_SUBSCRIPTION_NAME) .build(); // ExactlyOnceDeliveryEnabled is turned off by default From 951b11c0f504f64e09f429447388d8deb8de0f8c Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Fri, 24 Jan 2025 03:28:06 +0000 Subject: [PATCH 07/10] fix: Fix formatting of test files --- .../java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java | 3 ++- .../cloud/pubsub/v1/StreamingSubscriberConnectionTest.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 094a3630b..1de156939 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -35,7 +35,8 @@ import org.mockito.stubbing.Answer; public class MessageDispatcherTest { - private static final String MOCK_SUBSCRIPTION_NAME = "projects/MOCK-PROJECT/subscriptions/MOCK-SUBSCRIPTION"; + private static final String MOCK_SUBSCRIPTION_NAME = + "projects/MOCK-PROJECT/subscriptions/MOCK-SUBSCRIPTION"; private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data"); private static final int DELIVERY_INFO_COUNT = 3; private static final String ACK_ID = "ACK-ID"; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java index 57a65df53..8bf0113b5 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java @@ -52,7 +52,8 @@ public class StreamingSubscriberConnectionTest { private FakeClock clock; private SubscriberStub mockSubscriberStub; - private static final String MOCK_SUBSCRIPTION_NAME = "projects/MOCK-PROJECT/subscriptions/MOCK-SUBSCRIPTION"; + private static final String MOCK_SUBSCRIPTION_NAME = + "projects/MOCK-PROJECT/subscriptions/MOCK-SUBSCRIPTION"; private static final String MOCK_ACK_ID_SUCCESS = "MOCK-ACK-ID-SUCCESS"; private static final String MOCK_ACK_ID_SUCCESS_2 = "MOCK-ACK-ID-SUCCESS-2"; private static final String MOCK_ACK_ID_NACK_SUCCESS = "MOCK-ACK-ID-NACK-SUCCESS"; From e911ef8d8e1d01adc32654964bfddc6527e4e545 Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Fri, 24 Jan 2025 14:40:55 +0000 Subject: [PATCH 08/10] fix: Use TopicName instead of String for startPublishRpcSpan --- .../com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java | 3 +-- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 2 +- .../java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java index b946f44bf..7a9e54ca5 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java @@ -172,11 +172,10 @@ void endPublishBatchingSpan(PubsubMessageWrapper message) { * Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional * links with the publisher parent span are created for sampled messages in the batch. */ - Span startPublishRpcSpan(String topic, List messages) { + Span startPublishRpcSpan(TopicName topicName, List messages) { if (!enabled) { return null; } - TopicName topicName = TopicName.parse(topic); Attributes attributes = createCommonSpanAttributesBuilder( topicName.getTopic(), topicName.getProject(), "publishCall", "publish") diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 3dba86f58..113cbf932 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -492,7 +492,7 @@ private ApiFuture publishCall(OutstandingBatch outstandingBatch pubsubMessagesList.add(messageWrapper.getPubsubMessage()); } - outstandingBatch.publishRpcSpan = tracer.startPublishRpcSpan(topicName, messageWrappers); + outstandingBatch.publishRpcSpan = tracer.startPublishRpcSpan(topicNameObject, messageWrappers); return publisherStub .publishCallable() diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java index 9c2ab18a9..1914cd966 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java @@ -117,7 +117,7 @@ public void testPublishSpansSuccess() { tracer.endPublishFlowControlSpan(messageWrapper); tracer.startPublishBatchingSpan(messageWrapper); tracer.endPublishBatchingSpan(messageWrapper); - Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers); + Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME, messageWrappers); tracer.endPublishRpcSpan(publishRpcSpan); tracer.setPublisherMessageIdSpanAttribute(messageWrapper, MESSAGE_ID); tracer.endPublisherSpan(messageWrapper); @@ -265,7 +265,7 @@ public void testPublishRpcSpanFailure() { OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); tracer.startPublisherSpan(messageWrapper); - Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers); + Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME, messageWrappers); Exception e = new Exception("test-exception"); tracer.setPublishRpcSpanException(publishRpcSpan, e); From 7696870fcd2a403bcff512dcf0dd49d05b8221e3 Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Fri, 24 Jan 2025 14:44:38 +0000 Subject: [PATCH 09/10] fix: Use SubscriptionName instead of String for startSubscriberRpcSpan --- .../pubsub/v1/OpenTelemetryPubsubTracer.java | 3 +-- .../v1/StreamingSubscriberConnection.java | 8 ++++++-- .../cloud/pubsub/v1/OpenTelemetryTest.java | 18 ++++++------------ 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java index 7a9e54ca5..9ee751135 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java @@ -358,7 +358,7 @@ void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) { * to parent subscribe span for sampled messages are added. */ Span startSubscribeRpcSpan( - String subscription, + SubscriptionName subscriptionName, String rpcOperation, List messages, int ackDeadline, @@ -367,7 +367,6 @@ Span startSubscribeRpcSpan( return null; } String codeFunction = rpcOperation == "ack" ? "sendAckOperations" : "sendModAckOperations"; - SubscriptionName subscriptionName = SubscriptionName.parse(subscription); AttributesBuilder attributesBuilder = createCommonSpanAttributesBuilder( subscriptionName.getSubscription(), diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 3ad124f80..80ff2b4e1 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -47,6 +47,7 @@ import com.google.pubsub.v1.ModifyAckDeadlineRequest; import com.google.pubsub.v1.StreamingPullRequest; import com.google.pubsub.v1.StreamingPullResponse; +import com.google.pubsub.v1.SubscriptionName; import com.google.rpc.ErrorInfo; import io.grpc.Status; import io.grpc.protobuf.StatusProto; @@ -94,6 +95,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final SubscriberStub subscriberStub; private final int channelAffinity; private final String subscription; + private final SubscriptionName subscriptionNameObject; private final ScheduledExecutorService systemExecutor; private final MessageDispatcher messageDispatcher; @@ -124,6 +126,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private StreamingSubscriberConnection(Builder builder) { subscription = builder.subscription; + subscriptionNameObject = SubscriptionName.parse(builder.subscription); systemExecutor = builder.systemExecutor; // We need to set the default stream ack deadline on the initial request, this will be @@ -454,7 +457,8 @@ private void sendAckOperations( } } // Creates an Ack span to be passed to the callback - Span rpcSpan = tracer.startSubscribeRpcSpan(subscription, "ack", messagesInRequest, 0, false); + Span rpcSpan = + tracer.startSubscribeRpcSpan(subscriptionNameObject, "ack", messagesInRequest, 0, false); ApiFutureCallback callback = getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis, rpcSpan); ApiFuture ackFuture = @@ -493,7 +497,7 @@ private void sendModackOperations( // Creates either a ModAck span or a Nack span depending on the given ack deadline Span rpcSpan = tracer.startSubscribeRpcSpan( - subscription, + subscriptionNameObject, rpcOperation, messagesInRequest, deadlineExtensionSeconds, diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java index 1914cd966..2297f84bf 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java @@ -326,21 +326,17 @@ public void testSubscribeSpansSuccess() { tracer.endSubscribeProcessSpan(subscribeMessageWrapper, PROCESS_ACTION); Span subscribeModackRpcSpan = tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), - "modack", - subscribeMessageWrappers, - ACK_DEADLINE, - true); + FULL_SUBSCRIPTION_NAME, "modack", subscribeMessageWrappers, ACK_DEADLINE, true); tracer.endSubscribeRpcSpan(subscribeModackRpcSpan); tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, ACK_DEADLINE); Span subscribeAckRpcSpan = tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "ack", subscribeMessageWrappers, 0, false); + FULL_SUBSCRIPTION_NAME, "ack", subscribeMessageWrappers, 0, false); tracer.endSubscribeRpcSpan(subscribeAckRpcSpan); tracer.addEndRpcEvent(subscribeMessageWrapper, true, false, 0); Span subscribeNackRpcSpan = tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "nack", subscribeMessageWrappers, 0, false); + FULL_SUBSCRIPTION_NAME, "nack", subscribeMessageWrappers, 0, false); tracer.endSubscribeRpcSpan(subscribeNackRpcSpan); tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, 0); tracer.endSubscriberSpan(subscribeMessageWrapper); @@ -604,13 +600,11 @@ public void testSubscribeRpcSpanFailures() { tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); Span subscribeModackRpcSpan = tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "modack", messageWrappers, ACK_DEADLINE, true); + FULL_SUBSCRIPTION_NAME, "modack", messageWrappers, ACK_DEADLINE, true); Span subscribeAckRpcSpan = - tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "ack", messageWrappers, 0, false); + tracer.startSubscribeRpcSpan(FULL_SUBSCRIPTION_NAME, "ack", messageWrappers, 0, false); Span subscribeNackRpcSpan = - tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "nack", messageWrappers, 0, false); + tracer.startSubscribeRpcSpan(FULL_SUBSCRIPTION_NAME, "nack", messageWrappers, 0, false); Exception e = new Exception("test-exception"); tracer.setSubscribeRpcSpanException(subscribeModackRpcSpan, true, ACK_DEADLINE, e); From 01bf37a042c31a7ede311324ffc2d253b9790160 Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Mon, 27 Jan 2025 16:20:15 +0000 Subject: [PATCH 10/10] fix: Removed unused subscriptionName string --- .../main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 5b3dec090..34ef312f3 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -105,7 +105,6 @@ class MessageDispatcher { // To keep track of number of seconds the receiver takes to process messages. private final Distribution ackLatencyDistribution; - private final String subscriptionName; private final SubscriptionName subscriptionNameObject; private final boolean enableOpenTelemetryTracing; private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); @@ -227,7 +226,6 @@ private MessageDispatcher(Builder builder) { messagesWaiter = new Waiter(); sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor); - subscriptionName = builder.subscriptionName; subscriptionNameObject = SubscriptionName.parse(builder.subscriptionName); enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing; if (builder.tracer != null) {