From 823003b38815c40598ad36cef5d196b7b6554bbe Mon Sep 17 00:00:00 2001
From: Iftekhar Ahmed Khan
Date: Fri, 23 May 2025 20:24:32 +0530
Subject: [PATCH] feat: adds support for integration with pubsub emulator

---
 README.md                                        | 43 ++++++++++---------
 .../pubsub/kafka/common/ConnectorUtils.java      |  2 +
 .../kafka/sink/CloudPubSubSinkConnector.java     |  8 +++-
 .../kafka/sink/CloudPubSubSinkTask.java          | 31 ++++++++++---
 .../kafka/sink/CloudPubSubSinkTaskTest.java      | 18 ++++++++
 5 files changed, 73 insertions(+), 29 deletions(-)

diff --git a/README.md b/README.md
index b86b9353..7861d9d1 100644
--- a/README.md
+++ b/README.md
@@ -191,27 +191,44 @@ configurations:
 
 #### Sink Connector
 
-| Config | Value Range | Default | Description |
-|----------------------------|-------------------------------|-----------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| cps.topic                  | String                        | REQUIRED (No default)       | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo". |
-| cps.project                | String                        | REQUIRED (No default)       | The project containing the Pub/Sub topic, e.g. "bar" from above. |
-| cps.endpoint               | String                        | "pubsub.googleapis.com:443" | The [Pub/Sub endpoint](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints) to use. |
-| maxBufferSize              | Integer                       | 100                         | The maximum number of messages that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
-| maxBufferBytes             | Long                          | 10,000,000                  | The maximum number of bytes that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
-| maxOutstandingRequestBytes | Long                          | Long.MAX_VALUE              | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
-| maxOutstandingMessages     | Long                          | Long.MAX_VALUE              | The maximum number of messages that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
-| maxDelayThresholdMs        | Integer                       | 100                         | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Pub/Sub. |
-| maxRequestTimeoutMs        | Integer                       | 10,000                      | The timeout for individual publish requests to Pub/Sub. |
-| maxTotalTimeoutMs          | Integer                       | 60,000                      | The total timeout for a call to publish (including retries) to Pub/Sub. |
-| maxShutdownTimeoutMs       | Integer                       | 60,000                      | The maximum amount of time to wait for a publisher to shutdown when stopping task in Kafka Connect. |
-| gcp.credentials.file.path  | String                        | Optional                    | The filepath, which stores GCP credentials. If not defined, GOOGLE_APPLICATION_CREDENTIALS env is used. |
-| gcp.credentials.json       | String                        | Optional                    | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
-| metadata.publish           | Boolean                       | false                       | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Pub/Sub. |
-| headers.publish            | Boolean                       | false                       | When true, include any headers as attributes when a message is published to Pub/Sub. |
-| orderingKeySource          | String (none, key, partition) | none                        | When set to "none", do not set the ordering key. When set to "key", uses a message's key as the ordering key. If set to "partition", converts the partition number to a String and uses that as the ordering key. Note that using "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
-| messageBodyName            | String                        | "cps_message_body"          | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body. |
-| enableCompression          | Boolean                       | false                       | When true, enable [publish-side compression](https://cloud.google.com/pubsub/docs/publisher#compressing) in order to save on networking costs between Kafka Connect and Cloud Pub/Sub. |
-| compressionBytesThreshold  | Long                          | 240                         | When enableCompression is true, the minimum size of publish request (in bytes) to compress.
+| Config                      | Value Range                   | Default                     | Description |
+|-----------------------------|-------------------------------|-----------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| cps.topic                   | String                        | REQUIRED (No default)       | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo". |
+| cps.project                 | String                        | REQUIRED (No default)       | The project containing the Pub/Sub topic, e.g. "bar" from above. |
+| cps.endpoint                | String                        | "pubsub.googleapis.com:443" | The [Pub/Sub endpoint](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints) to use. |
+| cps.host                    | String                        | "pubsub"                    | Set to "emulator" to publish to a local Pub/Sub emulator at the address given by cps.endpoint (see the example below); with the default value "pubsub", the connector publishes to Google Cloud Pub/Sub using the configured credentials. |
+| maxBufferSize               | Integer                       | 100                         | The maximum number of messages that can be received for a topic partition before publishing them to Pub/Sub. |
+| maxBufferBytes              | Long                          | 10,000,000                  | The maximum number of bytes that can be received for a topic partition before publishing them to Pub/Sub. |
+| maxOutstandingRequestBytes  | Long                          | Long.MAX_VALUE              | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
+| maxOutstandingMessages      | Long                          | Long.MAX_VALUE              | The maximum number of messages that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
+| maxDelayThresholdMs         | Integer                       | 100                         | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Pub/Sub. |
+| maxRequestTimeoutMs         | Integer                       | 10,000                      | The timeout for individual publish requests to Pub/Sub. |
+| maxTotalTimeoutMs           | Integer                       | 60,000                      | The total timeout for a call to publish (including retries) to Pub/Sub. |
+| maxShutdownTimeoutMs        | Integer                       | 60,000                      | The maximum amount of time to wait for a publisher to shut down when stopping a task in Kafka Connect. |
+| gcp.credentials.file.path   | String                        | Optional                    | The path to the file that stores GCP credentials. If not defined, the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. |
+| gcp.credentials.json        | String                        | Optional                    | GCP credentials JSON blob. If specified, these credentials are used explicitly. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
+| metadata.publish            | Boolean                       | false                       | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Pub/Sub. |
+| headers.publish             | Boolean                       | false                       | When true, include any headers as attributes when a message is published to Pub/Sub. |
+| orderingKeySource           | String (none, key, partition) | none                        | When set to "none", do not set the ordering key. When set to "key", uses a message's key as the ordering key. If set to "partition", converts the partition number to a String and uses that as the ordering key. Note that "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
+| messageBodyName             | String                        | "cps_message_body"          | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body. |
+| enableCompression           | Boolean                       | false                       | When true, enable [publish-side compression](https://cloud.google.com/pubsub/docs/publisher#compressing) in order to save on networking costs between Kafka Connect and Cloud Pub/Sub. |
+| compressionBytesThreshold   | Long                          | 240                         | When enableCompression is true, the minimum size (in bytes) of a publish request to compress. |
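+
+For example, a sink connector publishing to a local emulator might be
+configured as follows. This sketch assumes the emulator is listening on
+`localhost:8085` (the conventional default); the connector name, Kafka topic,
+project, and Pub/Sub topic below are placeholders (the emulator accepts any project ID):
+
+```properties
+name=pubsub-emulator-sink
+connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
+tasks.max=1
+topics=my-kafka-topic
+cps.project=test-project
+cps.topic=test-topic
+cps.host=emulator
+cps.endpoint=localhost:8085
+```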
 
 ### Pub/Sub Lite connector configs
 
diff --git a/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java b/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java
index 597ff5ef..2b155ddb 100644
--- a/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java
+++ b/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java
@@ -29,6 +29,8 @@ public class ConnectorUtils {
   public static final String CPS_TOPIC_CONFIG = "cps.topic";
   public static final String CPS_ENDPOINT = "cps.endpoint";
   public static final String CPS_DEFAULT_ENDPOINT = "pubsub.googleapis.com:443";
+  public static final String CPS_HOST = "cps.host";
+  public static final String CPS_DEFAULT_HOST = "pubsub";
   public static final String CPS_MESSAGE_KEY_ATTRIBUTE = "key";
   public static final String CPS_ORDERING_KEY_ATTRIBUTE = "orderingKey";
   public static final String GCP_CREDENTIALS_FILE_PATH_CONFIG = "gcp.credentials.file.path";
diff --git a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java
index d9736fe1..0d8f2631 100644
--- a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java
+++ b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java
@@ -273,7 +273,13 @@ public ConfigDef config() {
             Type.STRING,
             ConnectorUtils.CPS_DEFAULT_ENDPOINT,
             Importance.LOW,
-            "The Pub/Sub endpoint to use.");
+            "The Pub/Sub endpoint to use.")
+        .define(
+            ConnectorUtils.CPS_HOST,
+            Type.STRING,
+            ConnectorUtils.CPS_DEFAULT_HOST,
+            Importance.LOW,
+            "The Pub/Sub host to use. Set to 'emulator' to publish to a Pub/Sub emulator at cps.endpoint.");
   }
 
   @Override
diff --git a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
index ce5fe152..29397bcb 100644
--- a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
+++ b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
@@ -15,15 +15,14 @@
  */
 package com.google.pubsub.kafka.sink;
 
-import static com.google.pubsub.kafka.common.ConnectorUtils.getSystemExecutor;
-
 import com.google.api.core.ApiFuture;
 import com.google.api.core.ApiFutures;
 import com.google.api.core.SettableApiFuture;
 import com.google.api.gax.batching.BatchingSettings;
 import com.google.api.gax.batching.FlowControlSettings;
 import com.google.api.gax.batching.FlowController;
-import com.google.api.gax.core.FixedExecutorProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
 import com.google.api.gax.retrying.RetrySettings;
 import com.google.cloud.pubsub.v1.Publisher;
 import com.google.common.annotations.VisibleForTesting;
@@ -69,6 +68,7 @@ public class CloudPubSubSinkTask extends SinkTask {
   private String cpsProject;
   private String cpsTopic;
   private String cpsEndpoint;
+  private String cpsHost;
   private String messageBodyName;
   private long maxBufferSize;
   private long maxBufferBytes;
@@ -118,6 +118,7 @@ public void start(Map<String, String> props) {
     cpsProject = validatedProps.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString();
     cpsTopic = validatedProps.get(ConnectorUtils.CPS_TOPIC_CONFIG).toString();
     cpsEndpoint = validatedProps.get(ConnectorUtils.CPS_ENDPOINT).toString();
+    cpsHost = validatedProps.get(ConnectorUtils.CPS_HOST).toString();
     maxBufferSize = (Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_SIZE_CONFIG);
     maxBufferBytes = (Long) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_BYTES_CONFIG);
     maxOutstandingRequestBytes =
@@ -399,7 +400,6 @@ private void createPublisher() {
 
     com.google.cloud.pubsub.v1.Publisher.Builder builder =
         com.google.cloud.pubsub.v1.Publisher.newBuilder(fullTopic)
-            .setCredentialsProvider(gcpCredentialsProvider)
            .setBatchingSettings(batchingSettings.build())
            .setRetrySettings(
                RetrySettings.newBuilder()
@@ -412,9 +412,28 @@
                     .setMaxRetryDelay(Duration.ofMillis(Long.MAX_VALUE))
                     .setInitialRpcTimeout(Duration.ofSeconds(10))
                     .setRpcTimeoutMultiplier(2)
-                    .build())
-            .setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor()))
-            .setEndpoint(cpsEndpoint);
+                    .build());
+
+    log.info("CPS host value: {}", cpsHost);
+
+    if ("emulator".equals(cpsHost)) {
+      // The emulator accepts unauthenticated plaintext connections, so skip
+      // credentials and disable TLS on the channel.
+      log.info("Connecting to Pub/Sub emulator at {}", cpsEndpoint);
+      builder.setCredentialsProvider(NoCredentialsProvider.create());
+      builder.setChannelProvider(
+          InstantiatingGrpcChannelProvider.newBuilder()
+              .setEndpoint(cpsEndpoint)
+              .setChannelConfigurator(channel -> channel.usePlaintext())
+              .build());
+    } else {
+      builder.setCredentialsProvider(gcpCredentialsProvider);
+      builder.setChannelProvider(
+          InstantiatingGrpcChannelProvider.newBuilder().setEndpoint(cpsEndpoint).build());
+    }
+
+    log.info("Final CPS endpoint: {}", cpsEndpoint);
+
     if (orderingKeySource != OrderingKeySource.NONE) {
       builder.setEnableMessageOrdering(true);
     }
diff --git a/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java b/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
index d25918f3..806aacbd 100644
--- a/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
+++ b/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
@@ -754,6 +754,28 @@ record =
     assertEquals(expectedMessages, requestArgs);
   }
 
+  @Test
+  public void testCreatePublisherWithCustomHost() {
+    props.put(ConnectorUtils.CPS_HOST, "emulator");
+    props.put(ConnectorUtils.CPS_ENDPOINT, "custom-endpoint:443");
+    // A null publisher forces start() to build a real one; the test passes if
+    // publisher creation with the emulator settings does not throw.
+    CloudPubSubSinkTask task = new CloudPubSubSinkTask(null);
+    task.start(props);
+    assertEquals(CloudPubSubSinkTask.class, task.getClass());
+  }
+
+  @Test
+  public void testCreatePublisherWithDefaultHost() {
+    props.remove(ConnectorUtils.CPS_HOST);
+    props.put(ConnectorUtils.CPS_ENDPOINT, "default-endpoint:443");
"default-endpoint:443"); + CloudPubSubSinkTask task = new CloudPubSubSinkTask(null); + task.start(props); + assertEquals(CloudPubSubSinkTask.class, task.getClass()); + } + /** * Get some PubsubMessage's which correspond to the SinkRecord's created in {@link * #getSampleRecords()}.