<Issue-290> | Supports integration with pubsub emulator #381

43 changes: 22 additions & 21 deletions README.md
@@ -191,27 +191,28 @@ configurations:

#### Sink Connector

| Config | Value Range | Default | Description |
|----------------------------|-------------------------------|-----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| cps.topic | String | REQUIRED (No default) | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo". |
| cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub topic, e.g. "bar" from above. |
| cps.endpoint | String | "pubsub.googleapis.com:443" | The [Pub/Sub endpoint](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints) to use. |
| maxBufferSize | Integer | 100 | The maximum number of messages that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
| maxBufferBytes | Long | 10,000,000 | The maximum number of bytes that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
| maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
| maxOutstandingMessages | Long | Long.MAX_VALUE | The maximum number of messages that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
| maxDelayThresholdMs | Integer | 100 | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Pub/Sub. |
| maxRequestTimeoutMs | Integer | 10,000 | The timeout for individual publish requests to Pub/Sub. |
| maxTotalTimeoutMs | Integer | 60,000 | The total timeout for a call to publish (including retries) to Pub/Sub. |
| maxShutdownTimeoutMs | Integer | 60,000 | The maximum amount of time to wait for a publisher to shutdown when stopping task in Kafka Connect. |
| gcp.credentials.file.path | String | Optional | The filepath, which stores GCP credentials. If not defined, GOOGLE_APPLICATION_CREDENTIALS env is used. |
| gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
| metadata.publish | Boolean | false | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Pub/Sub. |
| headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Pub/Sub. |
| orderingKeySource | String (none, key, partition) | none | When set to "none", do not set the ordering key. When set to "key", uses a message's key as the ordering key. If set to "partition", converts the partition number to a String and uses that as the ordering key. Note that using "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
| messageBodyName | String | "cps_message_body" | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body. |
| enableCompression | Boolean | false | When true, enable [publish-side compression](https://cloud.google.com/pubsub/docs/publisher#compressing) in order to save on networking costs between Kafka Connect and Cloud Pub/Sub. |
| compressionBytesThreshold | Long | 240 | When enableCompression is true, the minimum size of publish request (in bytes) to compress.
| Config | Value Range | Default | Description |
|-----------------------------|-------------------------------|-----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| cps.topic | String | REQUIRED (No default) | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo". |
| cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub topic, e.g. "bar" from above. |
| cps.endpoint | String | "pubsub.googleapis.com:443" | The [Pub/Sub endpoint](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints) to use. |
| cps.host | String | "pubsub" | Set it to `emulator` to use Pub/Sub emulator. |
| maxBufferSize | Integer | 100 | The maximum number of messages that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
| maxBufferBytes | Long | 10,000,000 | The maximum number of bytes that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
| maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
| maxOutstandingMessages | Long | Long.MAX_VALUE | The maximum number of messages that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
| maxDelayThresholdMs | Integer | 100 | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Pub/Sub. |
| maxRequestTimeoutMs | Integer | 10,000 | The timeout for individual publish requests to Pub/Sub. |
| maxTotalTimeoutMs | Integer | 60,000 | The total timeout for a call to publish (including retries) to Pub/Sub. |
| maxShutdownTimeoutMs | Integer | 60,000 | The maximum amount of time to wait for a publisher to shutdown when stopping task in Kafka Connect. |
| gcp.credentials.file.path | String | Optional | The filepath, which stores GCP credentials. If not defined, GOOGLE_APPLICATION_CREDENTIALS env is used. |
| gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
| metadata.publish | Boolean | false | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Pub/Sub. |
| headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Pub/Sub. |
| orderingKeySource | String (none, key, partition) | none | When set to "none", do not set the ordering key. When set to "key", uses a message's key as the ordering key. If set to "partition", converts the partition number to a String and uses that as the ordering key. Note that using "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
| messageBodyName | String | "cps_message_body" | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body. |
| enableCompression | Boolean | false | When true, enable [publish-side compression](https://cloud.google.com/pubsub/docs/publisher#compressing) in order to save on networking costs between Kafka Connect and Cloud Pub/Sub. |
| compressionBytesThreshold | Long | 240 | When enableCompression is true, the minimum size of publish request (in bytes) to compress.
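
For reviewers who want to try the new `cps.host` row, here is a minimal sketch of sink connector properties aimed at a locally running emulator. It is not part of this PR. The README row only says to set `cps.host` to `emulator`; pointing `cps.endpoint` at the emulator's address is an assumption, and `localhost:8085`, `my-kafka-topic`, and the class name `EmulatorSinkConfigSketch` are placeholders chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EmulatorSinkConfigSketch {
  // Config keys as they appear in the README table and ConnectorUtils in this diff.
  static final String CPS_PROJECT = "cps.project";
  static final String CPS_TOPIC = "cps.topic";
  static final String CPS_ENDPOINT = "cps.endpoint";
  static final String CPS_HOST = "cps.host";

  /** Builds sink connector properties that target a Pub/Sub emulator. */
  static Map<String, String> emulatorSinkConfig(
      String project, String topic, String kafkaTopic, String emulatorHostPort) {
    Map<String, String> config = new LinkedHashMap<>();
    config.put("connector.class", "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector");
    // "topics" is the standard Kafka Connect setting for a sink connector's input topics.
    config.put("topics", kafkaTopic);
    config.put(CPS_PROJECT, project);
    config.put(CPS_TOPIC, topic);
    // Assumption: the endpoint has to point at the emulator's host:port.
    config.put(CPS_ENDPOINT, emulatorHostPort);
    // Value taken from the README row added in this PR.
    config.put(CPS_HOST, "emulator");
    return config;
  }

  public static void main(String[] args) {
    // Placeholder values; print the result in .properties form.
    emulatorSinkConfig("bar", "foo", "my-kafka-topic", "localhost:8085")
        .forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```

Leaving `cps.host` unset keeps the default `"pubsub"`, so existing configurations should be unaffected.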

### Pub/Sub Lite connector configs

@@ -29,6 +29,8 @@ public class ConnectorUtils {
   public static final String CPS_TOPIC_CONFIG = "cps.topic";
   public static final String CPS_ENDPOINT = "cps.endpoint";
   public static final String CPS_DEFAULT_ENDPOINT = "pubsub.googleapis.com:443";
+  public static final String CPS_HOST = "cps.host";
+  public static final String CPS_DEFAULT_HOST = "pubsub";
   public static final String CPS_MESSAGE_KEY_ATTRIBUTE = "key";
   public static final String CPS_ORDERING_KEY_ATTRIBUTE = "orderingKey";
   public static final String GCP_CREDENTIALS_FILE_PATH_CONFIG = "gcp.credentials.file.path";
@@ -273,7 +273,13 @@ public ConfigDef config() {
             Type.STRING,
             ConnectorUtils.CPS_DEFAULT_ENDPOINT,
             Importance.LOW,
-            "The Pub/Sub endpoint to use.");
+            "The Pub/Sub endpoint to use.")
+        .define(
+            ConnectorUtils.CPS_HOST,
+            Type.STRING,
+            ConnectorUtils.CPS_DEFAULT_HOST,
+            Importance.LOW,
+            "The Pub/Sub cps host to use.");
   }

   @Override
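
The hunks shown here define `cps.host` but not the code that reads it. As a rough, hypothetical sketch of how a task could branch on the value: the class `CpsHostSketch`, the helper `targetsEmulator`, and the constant `EMULATOR_HOST` are invented for illustration, and the exact, case-sensitive match on `"emulator"` is an assumption rather than the PR's actual behavior.

```java
import java.util.Map;

public class CpsHostSketch {
  // Key and default copied from ConnectorUtils in this diff.
  static final String CPS_HOST = "cps.host";
  static final String CPS_DEFAULT_HOST = "pubsub";
  // Hypothetical constant; the README only documents the literal value "emulator".
  static final String EMULATOR_HOST = "emulator";

  /** Returns true when the connector properties select the Pub/Sub emulator. */
  static boolean targetsEmulator(Map<String, String> props) {
    // Fall back to the default when the key is absent, mirroring the ConfigDef default.
    String host = props.getOrDefault(CPS_HOST, CPS_DEFAULT_HOST);
    // Assumption: exact match; the real connector may normalize the value differently.
    return EMULATOR_HOST.equals(host);
  }

  public static void main(String[] args) {
    System.out.println(targetsEmulator(Map.of(CPS_HOST, "emulator")));
    System.out.println(targetsEmulator(Map.of()));
  }
}
```

In a real task, a `true` result would typically lead to a plaintext channel without credentials, which is what the Pub/Sub emulator expects, while `false` would keep the existing publisher setup.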