This {product} workbook provides detailed examples and practices for managing the {product} platform with the {devops-api}s. It covers the most commonly used APIs for managing {product} and {pulsar-reg} instances, including the required parameters and the expected output of each API. The workbook is designed to fill the gap between the detailed API reference docs and how-to guides, so that customers can operate {product} and use the {devops-api}s to automate common tasks.
The workbook covers a wide range of topics, including resource provisioning, monitoring, and troubleshooting. It provides instructions for operations such as creating tenants, namespaces, topics, geo-replication, and access tokens; setting up monitoring and alerting; and troubleshooting common issues.
This workbook is intended for customers who want to manage their {product} streaming environment with the {devops-api}s. Following the practices and guidelines outlined in the workbook helps keep streaming applications secure, performant, and reliable.
- An active {product-short} account with access to {product}
- Credentials and values required to form HTTP requests
Due to their frequency in {product} API calls, you might find it helpful to set the following environment variables:
export PULSAR_TOKEN="PULSAR_TOKEN"
export WEB_SERVICE_URL="PULSAR_WEB_SERVICE_URL"
export NAMESPACE="STREAMING_NAMESPACE_NAME"
export TENANT="STREAMING_TENANT_NAME"
export TOPIC="STREAMING_TOPIC_NAME"
export NUM_OF_PARTITIONS="PARTITIONS"
export SUBSCRIPTION="SUBSCRIPTION_NAME"
export INSTANCE="TENANT_INSTANCE_NAME"
export SOURCE="SOURCE_CONNECTOR_NAME"
export SINK="SINK_NAME"
export FUNCTION="FUNCTION_DISPLAY_NAME"
export TOKENID="PULSAR_TOKEN_UUID"
export ASTRA_TOKEN="ASTRA_DB_APPLICATION_TOKEN"
The examples in this guide use environment variables for these values.
Additionally, some examples in this guide use python3 -mjson.tool to format the JSON response. This is optional; it is not required to execute API requests.
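Because every example depends on these variables, it can help to confirm they are set before sending a request. The following is a minimal sketch of such a check; the `require_vars` function name is hypothetical and not part of {product} or curl.

```shell
# Report any of the named environment variables that are unset or empty.
# Returns 0 when all of them are set, 1 otherwise.
require_vars() {
  missing=0
  for name in "$@"; do
    # Look up the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example: check the variables used by the namespace and topic examples.
require_vars PULSAR_TOKEN WEB_SERVICE_URL TENANT NAMESPACE \
  || echo "Set the missing variables before running the examples."
```

The check only tests for empty values. It cannot tell whether a variable still holds a placeholder such as "PULSAR_TOKEN".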
The following examples demonstrate how to use the {product} {devops-api} to manage {product} resources.
curl --location --request GET 'https://api.astra.datastax.com/v2/streaming/tenants' --header "Authorization: Bearer $ASTRA_TOKEN" | python3 -mjson.tool
Result
[
{
"id": "14b77c47-bdfd-4ba1. . .",
"tenantName": "mytenant",
"clusterName": "pulsar-aws-useast2",
"webServiceUrl": "https://pulsar-aws-useast2",
"brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
"websocketUrl": "wss://pulsar-aws-useast2:8001/ws/v2",
"websocketQueryParamUrl": "wss://pulsar-aws-useast2:8964/ws/v2",
"pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpX…..",
"plan": "payg",
"planCode": "1",
"astraOrgGUID": "b282a256-b129-......",
"cloudProvider": "aws",
"cloudProviderCode": "1",
"cloudRegion": "useast2",
"status": "active",
"jvmVersion": "JDK11",
"pulsarVersion": "2.10.2",
"regionZone": "na",
"Email": "",
"userMetricsUrl": "https://prometheus-aws-useast2….",
"pulsarInstance": "prod0"
},
{
"id": "e8bf25d8-a6a1-4169-. . .",
"tenantName": "mytenant2",
"clusterName": "pulsar-gcp-useast1",
"webServiceUrl": "https://pulsar-gcp-useast1",
"brokerServiceUrl": "pulsar+ssl://pulsar-gcp-useast1:6651",
"websocketUrl": "wss://pulsar-gcp-useast1m:8001/ws/v2",
"websocketQueryParamUrl": "wss://pulsar-gcp-useast1:8964/ws/v2",
"pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.ey. . .",
"plan": "payg",
"planCode": "1",
"astraOrgGUID": "b282a256-b129-43e9. . .",
"cloudProvider": "gcp",
"cloudProviderCode": "2",
"cloudRegion": "useast1",
"status": "active",
"jvmVersion": "JDK11",
"pulsarVersion": "2.10.2",
"regionZone": "na",
"Email": "",
"userMetricsUrl": "https://prometheus-gcp-useast1. . .",
"pulsarInstance": "prod0"
}
]
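Several later examples need values from this response. The sketch below picks one field for a named tenant out of the list. It assumes that the CLUSTER value used by the token examples corresponds to the "clusterName" field, which this guide does not state explicitly. The `tenant_field` function name is hypothetical, and the sample data is abbreviated from the result above.

```shell
# Print one field of the tenant whose "tenantName" matches.
# Usage: tenant_field TENANT_NAME FIELD < tenants.json
tenant_field() {
  python3 -c '
import json, sys
name, field = sys.argv[1], sys.argv[2]
for tenant in json.load(sys.stdin):
    if tenant.get("tenantName") == name:
        print(tenant[field])
        break
else:
    sys.exit(1)  # no tenant with that name
' "$1" "$2"
}

# Abbreviated sample of the response shown above.
tenants='[{"tenantName": "mytenant", "clusterName": "pulsar-aws-useast2", "webServiceUrl": "https://pulsar-aws-useast2"},
          {"tenantName": "mytenant2", "clusterName": "pulsar-gcp-useast1", "webServiceUrl": "https://pulsar-gcp-useast1"}]'

# Assumption: CLUSTER is the tenant's clusterName.
CLUSTER="$(printf '%s' "$tenants" | tenant_field mytenant2 clusterName)"
echo "$CLUSTER"   # pulsar-gcp-useast1
```

In practice, you would pipe the curl output into `tenant_field` instead of using the inline sample.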
curl --location --request GET 'https://api.astra.datastax.com/v2/streaming/providers' --header "Authorization: Bearer $ASTRA_TOKEN" | python3 -mjson.tool
Result
{
"aws": [
"useast1",
"uswest2",
"useast2"
],
"azure": [
"westus2",
"eastus",
"australiaeast"
],
"gcp": [
"useast1",
"uscentral1",
"australiase1",
"europewest1",
"useast4"
]
}
Create a tenant:
curl --location --request POST 'https://api.astra.datastax.com/v2/streaming/tenants' --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_TOKEN" --data-raw '{
"cloudProvider": "aws",
"cloudRegion": "useast2",
"tenantName": "mytenant",
"userEmail": "[email protected]"
}' | python3 -mjson.tool
Create a tenant with file input:
curl --fail --location --request POST 'https://api.astra.datastax.com/v2/streaming/tenants' --header 'Content-Type: application/json' --header "Authorization: Bearer $ASTRA_TOKEN" --data "@mytenant-config.json" | python3 -mjson.tool
Result
The output includes the "pulsarToken" value, which is the JWT for this {pulsar-short} instance.
{
"namespace": "default",
"topic": "",
"id": "",
"tenantName": "mytenant",
"clusterName": "pulsar-aws-useast2",
"webServiceUrl": "https://pulsar-aws-useast2",
"brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
"websocketUrl": "wss://pulsar-aws-useast2:8001/ws/v2",
"websocketQueryParamUrl": "wss://pulsar-aws-useast2:8964/ws/v2",
"pulsarToken": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9. . .",
"plan": "payg",
"planCode": "",
"astraOrgGUID": "b282a256-b129-43e9. . .",
"cloudProvider": "aws",
"cloudProviderCode": "",
"cloudRegion": "useast2",
"status": "active",
"jvmVersion": "JDK11",
"pulsarVersion": "2.10.2",
"regionZone": "",
"Email": "",
"userMetricsUrl": "",
"pulsarInstance": ""
}
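The file-input variant of the create request reads its body from mytenant-config.json, but this guide does not show that file. The following sketch creates one, assuming the file carries the same fields as the inline example. All values, including the email address, are placeholders.

```shell
# Write the request body to the file that --data "@mytenant-config.json" reads.
# The field names mirror the inline example; the values are placeholders.
cat > mytenant-config.json <<'EOF'
{
  "cloudProvider": "aws",
  "cloudRegion": "useast2",
  "tenantName": "mytenant",
  "userEmail": "user@example.com"
}
EOF

# Optional sanity check that the file exists and names the tenant.
grep -q '"tenantName": "mytenant"' mytenant-config.json && echo "config written"
```

Keeping the body in a file avoids shell quoting issues and makes the request easier to review and reuse.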
To manage {product} namespaces, use the {pulsar-short} REST APIs.
curl --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
"mytenant/default",
"mytenant/mynamespace"
]
curl -sS --fail --location --request PUT --header "Authorization: Bearer $PULSAR_TOKEN" "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE"
No response indicates success.
curl -sS --fail --location --request DELETE --header "Authorization: Bearer $PULSAR_TOKEN" "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE"
No response indicates success.
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/retention" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"retentionTimeInMinutes": 0,
"retentionSizeInMB": 0
}
curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/retention" --header 'Content-Type: application/json' --header "Authorization: Bearer $PULSAR_TOKEN" --data '{
"retentionTimeInMinutes": 360,
"retentionSizeInMB": 102
}'
No response indicates success.
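The retention body expresses time in minutes and size in MB. If you think in hours, a small helper can do the conversion and build the body for you. This is a sketch only; the `retention_payload` function name is hypothetical.

```shell
# Print a retention policy body from a retention time in hours and a size in MB.
retention_payload() {
  hours="$1"
  size_mb="$2"
  printf '{"retentionTimeInMinutes": %d, "retentionSizeInMB": %d}\n' \
    "$((hours * 60))" "$size_mb"
}

retention_payload 6 102
# {"retentionTimeInMinutes": 360, "retentionSizeInMB": 102}
```

You could then pass the result to curl with `--data "$(retention_payload 6 102)"` in place of the inline JSON.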
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/backlogQuotaMap" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"destination_storage": {
"limit": -1,
"limitSize": 102400,
"limitTime": 3600,
"policy": "producer_exception"
}
}
curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/backlogQuota" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data '{
"limit": -1,
"limitSize": 102400,
"limitTime": 3600,
"policy": "producer_exception"
}'
No response indicates success.
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/messageTTL" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
The response is a number, such as 3600.
curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/messageTTL" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data 3600
No response indicates success.
The "topicType" input parameter must be either "non-partitioned" or "partitioned".
curl -sS --fail --location --request POST --header "Authorization: Bearer $PULSAR_TOKEN" "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/autoTopicCreation" --header 'Content-Type: application/json' --data '{
"allowAutoTopicCreation": false,
"topicType": "non-partitioned"
}'
No response indicates success.
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxConsumersPerTopic" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
The response is a number, such as 50.
curl -sS --fail --location --request POST "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxConsumersPerTopic" --header "Authorization: Bearer $PULSAR_TOKEN" --header 'Content-Type: application/json' --data 100
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/maxTopicsPerNamespace" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
The response is a number.
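The namespace policy calls above share one base path and differ only in the trailing policy name, such as retention, backlogQuotaMap, or messageTTL. The following sketch builds those URLs from the environment variables used in this guide. The `namespace_url` function name is hypothetical, and the variable values shown are examples only.

```shell
# Example values; in practice these come from the environment variables set earlier.
WEB_SERVICE_URL="https://pulsar-aws-useast2"
TENANT="mytenant"
NAMESPACE="mynamespace"

# Print the admin URL for the namespace, optionally followed by a policy name.
namespace_url() {
  printf '%s/admin/v2/namespaces/%s/%s%s\n' \
    "$WEB_SERVICE_URL" "$TENANT" "$NAMESPACE" "${1:+/$1}"
}

namespace_url              # https://pulsar-aws-useast2/admin/v2/namespaces/mytenant/mynamespace
namespace_url messageTTL   # https://pulsar-aws-useast2/admin/v2/namespaces/mytenant/mynamespace/messageTTL
```

A request then becomes, for example, `curl -sS --fail "$(namespace_url retention)" --header "Authorization: Bearer $PULSAR_TOKEN"`.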
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
"persistent://testtenant/ns0/mytopic-partition-0",
"persistent://testtenant/ns0/mytopic-partition-1",
"persistent://testtenant/ns0/topic1",
"persistent://testtenant/ns0/topic2",
"persistent://testtenant/ns0/tp1-partition-0",
"persistent://testtenant/ns0/tp1-partition-1",
"persistent://testtenant/ns0/tp1-partition-2",
"persistent://testtenant/ns0/tp1-partition-3"
]
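The list returns each partition of a partitioned topic as a separate entry. To see each logical topic once, you can strip the partition suffix. This sketch assumes that partitions follow the "-partition-N" naming shown in the result above and that topic names contain no quotation marks. The `logical_topics` function name is hypothetical.

```shell
# Read a JSON array of topic names on stdin and print each logical topic once,
# with any "-partition-N" suffix removed.
logical_topics() {
  grep -o 'persistent://[^"]*' | sed -E 's/-partition-[0-9]+$//' | sort -u
}

# Abbreviated sample of the response shown above.
sample='["persistent://testtenant/ns0/mytopic-partition-0",
         "persistent://testtenant/ns0/mytopic-partition-1",
         "persistent://testtenant/ns0/topic1",
         "persistent://testtenant/ns0/tp1-partition-0"]'

printf '%s\n' "$sample" | logical_topics
```

A non-partitioned topic whose own name happens to end in "-partition-N" would also be shortened, so treat the output as a summary rather than an exact inventory.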
curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitions" --header "Authorization: Bearer $PULSAR_TOKEN" --header "Content-Type: application/json" --data "$NUM_OF_PARTITIONS"
No response indicates success.
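The request body for a partitioned topic is a bare number, so a leftover placeholder such as "PARTITIONS" would be sent as-is. The following sketch validates the value before you make the call. The `is_positive_int` function name is hypothetical.

```shell
# Succeed only when the argument is a whole number greater than zero.
is_positive_int() {
  case "$1" in
    ''|*[!0-9]*) return 1 ;;   # empty or contains a non-digit
    *) [ "$1" -gt 0 ] ;;       # digits only: require a value above zero
  esac
}

NUM_OF_PARTITIONS="4"   # example value
if is_positive_int "$NUM_OF_PARTITIONS"; then
  echo "ok: $NUM_OF_PARTITIONS partitions"
else
  echo "NUM_OF_PARTITIONS must be a positive integer" >&2
fi
```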
curl -sS --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitions" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/internalStats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"entriesAddedCounter": 0,
"numberOfEntries": 0,
"totalSize": 0,
"currentLedgerEntries": 0,
"currentLedgerSize": 0,
"lastLedgerCreatedTimestamp": "2023-04-25T15:35:45.136Z",
"waitingCursorsCount": 0,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "275812:-1",
"state": "LedgerOpened",
"ledgers": [
{
"ledgerId": 275812,
"entries": 0,
"size": 0,
"offloaded": false,
"underReplicated": false
}
],
"cursors": {},
"schemaLedgers": [],
"compactedLedger": {
"ledgerId": -1,
"entries": -1,
"size": -1,
"offloaded": false,
"underReplicated": false
}
}
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"msgRateIn": 0.0,
"msgThroughputIn": 0.0,
"msgRateOut": 0.0,
"msgThroughputOut": 0.0,
"bytesInCounter": 0,
"msgInCounter": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"averageMsgSize": 0.0,
"msgChunkPublished": false,
"storageSize": 0,
"backlogSize": 0,
"publishRateLimitedTimes": 0,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize": 0,
"lastOffloadLedgerId": 0,
"lastOffloadSuccessTimeStamp": 0,
"lastOffloadFailureTimeStamp": 0,
"publishers": [],
"waitingPublishers": 0,
"subscriptions": {},
"replication": {},
"deduplicationStatus": "Disabled",
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"compaction": {
"lastCompactionRemovedEventCount": 0,
"lastCompactionSucceedTimestamp": 0,
"lastCompactionFailedTimestamp": 0,
"lastCompactionDurationTimeInMills": 0
}
...TRUNCATED FOR READABILITY...
}
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/partitioned-stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"msgRateIn": 0.0,
"msgThroughputIn": 0.0,
"msgRateOut": 0.0,
"msgThroughputOut": 0.0,
"bytesInCounter": 0,
"msgInCounter": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"averageMsgSize": 0.0,
"msgChunkPublished": false,
"storageSize": 0,
"backlogSize": 0,
"publishRateLimitedTimes": 0,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize": 0,
"lastOffloadLedgerId": 0,
"lastOffloadSuccessTimeStamp": 0,
"lastOffloadFailureTimeStamp": 0,
"publishers": [],
"waitingPublishers": 0,
"subscriptions": {},
"replication": {},
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"compaction": {
"lastCompactionRemovedEventCount": 0,
"lastCompactionSucceedTimestamp": 0,
"lastCompactionFailedTimestamp": 0,
"lastCompactionDurationTimeInMills": 0
},
"metadata": {
"partitions": 2,
"deleted": false
},
"partitions": {
"persistent://testcreate/ns0/mytopic-partition-1": {
"msgRateIn": 0.0,
"msgThroughputIn": 0.0,
"msgRateOut": 0.0,
"msgThroughputOut": 0.0,
"bytesInCounter": 0,
"msgInCounter": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"averageMsgSize": 0.0,
"msgChunkPublished": false,
"storageSize": 0,
"backlogSize": 0,
"publishRateLimitedTimes": 0,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize": 0,
"lastOffloadLedgerId": 0,
"lastOffloadSuccessTimeStamp": 0,
"lastOffloadFailureTimeStamp": 0,
"publishers": [],
"waitingPublishers": 0,
"subscriptions": {},
"replication": {},
"deduplicationStatus": "Disabled",
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"compaction": {
"lastCompactionRemovedEventCount": 0,
"lastCompactionSucceedTimestamp": 0,
"lastCompactionFailedTimestamp": 0,
"lastCompactionDurationTimeInMills": 0
}
},
"persistent://testcreate/ns0/mytopic-partition-0": {
"msgRateIn": 0.0,
"msgThroughputIn": 0.0,
"msgRateOut": 0.0,
"msgThroughputOut": 0.0,
"bytesInCounter": 0,
"msgInCounter": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"averageMsgSize": 0.0,
"msgChunkPublished": false,
"storageSize": 0,
"backlogSize": 0,
"publishRateLimitedTimes": 0,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize": 0,
"lastOffloadLedgerId": 0,
"lastOffloadSuccessTimeStamp": 0,
"lastOffloadFailureTimeStamp": 0,
"publishers": [],
"waitingPublishers": 0,
"subscriptions": {},
"replication": {},
"deduplicationStatus": "Disabled",
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"compaction": {
"lastCompactionRemovedEventCount": 0,
"lastCompactionSucceedTimestamp": 0,
"lastCompactionFailedTimestamp": 0,
"lastCompactionDurationTimeInMills": 0
}
}
}
...TRUNCATED FOR READABILITY...
}
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/stats/topics/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"persistent://testcreate/ns0/mytopic3": {
"name": "persistent://testcreate/ns0/mytopic3",
"totalMessagesIn": 0,
"totalMessagesOut": 0,
"totalBytesIn": 0,
"totalBytesOut": 0,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 0,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 0,
"backlogStorageByteSize": 0,
"msgBacklogNumber": 0,
"updatedAt": "2023-04-25T16:00:24.252397617Z"
},
"persistent://testcreate/ns0/t1": {
"name": "persistent://testcreate/ns0/t1",
"totalMessagesIn": 0,
"totalMessagesOut": 0,
"totalBytesIn": 0,
"totalBytesOut": 0,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 0,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 0,
"backlogStorageByteSize": 0,
"msgBacklogNumber": 0,
"updatedAt": "2023-04-25T16:00:24.252466612Z"
},
"persistent://testcreate/ns0/t1-partition-0": {
"name": "persistent://testcreate/ns0/t1-partition-0",
"totalMessagesIn": 516,
"totalMessagesOut": 514,
"totalBytesIn": 637776,
"totalBytesOut": 637674,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 1,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 1899200,
"backlogStorageByteSize": 0,
"msgBacklogNumber": 0,
"updatedAt": "2023-04-25T16:00:24.252410963Z"
},
"persistent://testcreate/ns0/t1-partition-1": {
"name": "persistent://testcreate/ns0/t1-partition-1",
"totalMessagesIn": 534,
"totalMessagesOut": 531,
"totalBytesIn": 696340,
"totalBytesOut": 692347,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 1,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 2020678,
"backlogStorageByteSize": 2151,
"msgBacklogNumber": 3,
"updatedAt": "2023-04-25T16:00:24.252425482Z"
},
"persistent://testcreate/ns0/t1-partition-2": {
"name": "persistent://testcreate/ns0/t1-partition-2",
"totalMessagesIn": 522,
"totalMessagesOut": 519,
"totalBytesIn": 653487,
"totalBytesOut": 649286,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 1,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 1916574,
"backlogStorageByteSize": 0,
"msgBacklogNumber": 0,
"updatedAt": "2023-04-25T16:00:24.252438306Z"
},
"persistent://testcreate/ns0/t1-partition-3": {
"name": "persistent://testcreate/ns0/t1-partition-3",
"totalMessagesIn": 516,
"totalMessagesOut": 514,
"totalBytesIn": 631638,
"totalBytesOut": 631536,
"msgRateIn": 0,
"msgRateOut": 0,
"throughputIn": 0,
"throughputOut": 0,
"subscriptionCount": 1,
"producerCount": 0,
"consumerCount": 0,
"subscriptionDelayed": 0,
"storageSize": 1890920,
"backlogStorageByteSize": 1586,
"msgBacklogNumber": 4,
"updatedAt": "2023-04-25T16:00:24.252452735Z"
...TRUNCATED FOR READABILITY...
}
...TRUNCATED FOR READABILITY...
}
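For monitoring, the "msgBacklogNumber" field in this response is often the value of interest. The sketch below lists only the topics that have a backlog. It assumes the full, untruncated response is a JSON object keyed by topic name, as in the result above. The `topics_with_backlog` function name is hypothetical, and the sample is abbreviated.

```shell
# Read namespace topic stats (a JSON object keyed by topic name) on stdin and
# print "<backlog> <topic>" for every topic with a non-zero msgBacklogNumber.
topics_with_backlog() {
  python3 -c '
import json, sys
for name, stats in sorted(json.load(sys.stdin).items()):
    backlog = stats.get("msgBacklogNumber", 0)
    if backlog > 0:
        print(backlog, name)
'
}

# Abbreviated sample of the response shown above.
sample='{"persistent://testcreate/ns0/t1-partition-0": {"msgBacklogNumber": 0},
         "persistent://testcreate/ns0/t1-partition-1": {"msgBacklogNumber": 3},
         "persistent://testcreate/ns0/t1-partition-3": {"msgBacklogNumber": 4}}'

printf '%s' "$sample" | topics_with_backlog
```

Pipe the curl output into `topics_with_backlog` in place of the `python3 -mjson.tool` formatter to run the same check against a live namespace.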
curl -sS --fail --location --request GET "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscriptions" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
"mysub",
"subscript2"
]
Create a replicated or non-replicated subscription. For a non-replicated subscription, set the "replicated" query parameter to "false".
curl -sS --fail --location --request PUT "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION?replicated=true" --header "Authorization: Bearer $PULSAR_TOKEN" --header "Content-Type: application/json"
No response indicates success.
curl -sS --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/subscription/$SUBSCRIPTION" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
curl --location --fail --request GET "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE" --header "Authorization: Bearer $ASTRA_TOKEN" | python3 -mjson.tool
Result
{
"pulsarInstance": "prod0",
"tenant": "mytenant",
"namespace": "mynamespace",
"replications": {
"pulsar-aws-useast2": [
"pulsar-aws-uswest2",
"pulsar-aws-useast2"
],
"pulsar-aws-uswest2": [
"pulsar-aws-uswest2",
"pulsar-aws-useast2"
]
},
"clusters": {
"pulsar-aws-useast2": {
"clusterName": "pulsar-aws-useast2",
"cloudProvider": "aws",
"cloudRegion": "useast2",
"clusterType": "cloud",
"webServiceUrl": "https://pvt-pulsar-aws-useast2:8443",
"brokerServiceUrl": "pulsar+ssl://pulsar-aws-useast2:6651",
"websocketUrl": "",
"pulsarInstance": "prod0",
"regionZone": ""
},
"pulsar-aws-uswest2": {
"clusterName": "pulsar-aws-uswest2",
"cloudProvider": "aws",
"cloudRegion": "uswest2",
"clusterType": "cloud",
"webServiceUrl": "https://pvt-pulsar-aws-uswest2:8443",
"brokerServiceUrl": "pulsar+ssl://pulsar-aws-uswest2:6651",
"websocketUrl": "",
"pulsarInstance": "prod0",
"regionZone": ""
}
...TRUNCATED FOR READABILITY...
}
}
You can obtain the JSON input parameters from the "List Tenants with Details" and "Get a list of cloud providers of {product}" sections of this guide.
curl --location --fail --request POST "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE" --header "Content-Type: application/json" --header "Authorization: Bearer $ASTRA_TOKEN" --data-raw '{
"bidirection": true,
"destCluster": "pulsar-aws-uswest2",
"email": "[email protected]",
"namespace": "mynamespace",
"originCluster": "pulsar-aws-useast2"
}'
No response indicates success.
You can obtain the JSON input parameters from the "List Tenants with Details" and "Get a list of cloud providers of {product}" sections of this guide.
curl --location --fail --request DELETE "https://api.astra.datastax.com/v2/streaming/replications/$INSTANCE/$TENANT/$NAMESPACE" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $ASTRA_TOKEN" \
--data-raw '{
"bidirection": true,
"destCluster": "pulsar-aws-uswest2",
"email": "[email protected]",
"namespace": "ns0",
"originCluster": "pulsar-aws-useast2"
}'
No response indicates success.
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
"testfunction1"
]
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"numInstances": 1,
"numRunning": 0,
"instances": [
{
"instanceId": 0,
"status": {
"running": false,
"error": "",
"numRestarts": 0,
"numReceived": 0,
"numSuccessfullyProcessed": 0,
"numUserExceptions": 0,
"latestUserExceptions": null,
"numSystemExceptions": 0,
"latestSystemExceptions": null,
"averageLatency": 0.0,
"lastInvocationTime": 0,
"workerId": "pulsar-aws-useast2-function-0"
}
}
]
}
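In a script, you may want a simple pass or fail result instead of the full status document. The sketch below compares "numRunning" with "numInstances", the two top-level fields in the result above. The `all_instances_running` function name is hypothetical.

```shell
# Read a status document on stdin; succeed only when every instance is running.
all_instances_running() {
  python3 -c '
import json, sys
status = json.load(sys.stdin)
sys.exit(0 if status["numRunning"] == status["numInstances"] else 1)
'
}

# Abbreviated sample matching the result above (one instance, none running).
stopped='{"numInstances": 1, "numRunning": 0}'
if printf '%s' "$stopped" | all_instances_running; then
  echo "all instances running"
else
  echo "some instances are not running"
fi
```

The source and sink status responses in this guide carry the same two fields, so the same check should apply to them as well.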
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/stats" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"receivedTotal": 0,
"processedSuccessfullyTotal": 0,
"systemExceptionsTotal": 0,
"userExceptionsTotal": 0,
"avgProcessLatency": null,
"1min": {
"receivedTotal": 0,
"processedSuccessfullyTotal": 0,
"systemExceptionsTotal": 0,
"userExceptionsTotal": 0,
"avgProcessLatency": null
},
"lastInvocation": null,
"instances": [
{
"instanceId": 0,
"metrics": {
"receivedTotal": 0,
"processedSuccessfullyTotal": 0,
"systemExceptionsTotal": 0,
"userExceptionsTotal": 0,
"avgProcessLatency": null,
"1min": {
"receivedTotal": 0,
"processedSuccessfullyTotal": 0,
"systemExceptionsTotal": 0,
"userExceptionsTotal": 0,
"avgProcessLatency": null
},
"lastInvocation": null,
"userMetrics": {}
}
}
]
}
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"runtimeFlags": null,
"tenant": "mytenant",
"namespace": "mynamespace",
"name": "testfunction1",
"className": "TransformFunction",
"inputs": null,
"customSerdeInputs": null,
"topicsPattern": null,
"customSchemaInputs": null,
"customSchemaOutputs": null,
"inputSpecs": {
"testcreate/ns0/tp1": {
"schemaType": null,
"serdeClassName": null,
"schemaProperties": {},
"consumerProperties": {},
"receiverQueueSize": null,
"cryptoConfig": null,
"poolMessages": false,
"regexPattern": false
}
},
"output": "mytenant/mynamespace/tp2",
"producerConfig": {
"maxPendingMessages": null,
"maxPendingMessagesAcrossPartitions": null,
"useThreadLocalProducers": false,
"cryptoConfig": null,
"batchBuilder": ""
},
"outputSchemaType": null,
"outputSerdeClassName": null,
"logTopic": null,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"retainKeyOrdering": false,
"batchBuilder": null,
"forwardSourceMessageProperty": true,
"userConfig": {
"steps": [
{
"schema-type": "STRING",
"type": "cast"
}
]
},
"secrets": null,
"runtime": "JAVA",
"autoAck": true,
"maxMessageRetries": null,
"deadLetterTopic": null,
"subName": null,
"parallelism": 1,
"resources": {
"cpu": 0.25,
"ram": 1000000000,
"disk": 1000000000
},
"fqfn": null,
"windowConfig": null,
"timeoutMs": 11000,
"jar": null,
"py": null,
"go": null,
"functionType": null,
"cleanupSubscription": false,
"customRuntimeOptions": "",
"maxPendingAsyncRequests": null,
"exposePulsarAdminClientEnabled": null,
"subscriptionPosition": "Latest"
}
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/start" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/functions/$TENANT/$NAMESPACE/$FUNCTION/stop" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
Get a list of token IDs for your cluster. With a token ID, you can then look up and obtain the {pulsar-short} JWT string. The token IDs are also listed in the {astra-ui} for that tenant and cluster.
The required parameter "CLUSTER" is obtained from the "List Tenants with Details" API command.
curl --location --request GET "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens" --header "Authorization: Bearer $ASTRA_TOKEN" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" | python3 -mjson.tool
Result
[
{
"iat": 1679335276,
"iss": "datastax",
"sub": "client;b282a256-b129-43e9-b870. . .",
"tokenid": "cdb87797. . ."
}
]
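The "iat" field is the standard JWT issued-at claim, expressed in seconds since the Unix epoch. The following sketch converts it to a readable UTC timestamp, which can help when deciding which tokens to rotate. The `epoch_to_utc` function name is hypothetical.

```shell
# Convert an epoch-seconds timestamp (such as a token's "iat") to ISO 8601 UTC.
epoch_to_utc() {
  python3 -c '
import datetime, sys
ts = int(sys.argv[1])
print(datetime.datetime.fromtimestamp(ts, datetime.timezone.utc).isoformat())
' "$1"
}

epoch_to_utc 1679335276   # 2023-03-20T18:01:16+00:00
```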
curl --fail --location --request GET "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens/$TOKENID" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_TOKEN"
Result
Output: Raw string JWT
eyJhbGciOiJSUzI1NiIsI . . .
Create a new {pulsar-short} JWT. The new JWT is also visible in the {astra-ui} for that tenant and cluster.
The required parameter "CLUSTER" is obtained from the "List Tenants with Details" API command.
curl --fail --location --request POST "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_TOKEN"
Result
Output: new raw string JWT
eyJhbGciOiJSUzI1NiIsI . . .
The required parameter "CLUSTER" is obtained from the "List Tenants with Details" API command. The list of "TOKENID" values can be obtained from "List Existing Tokens IDs".
curl --fail --location --request DELETE "https://api.astra.datastax.com/v2/streaming/tenants/$TENANT/tokens/$TOKENID" --header "X-DataStax-Pulsar-Cluster: $CLUSTER" --header "Authorization: Bearer $ASTRA_TOKEN"
No response indicates success.
{pulsar-short} Sources and Sinks share a similar API structure for most methods.
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
"mysource1"
]
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
[
"mysink1"
]
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"numInstances": 1,
"numRunning": 1,
"instances": [
{
"instanceId": 0,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 0,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 0,
"lastReceivedTime": 0,
"workerId": "pulsar-aws-useast2-function-0"
}
}
]
}
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/status" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"numInstances": 1,
"numRunning": 0,
"instances": [
{
"instanceId": 0,
"status": {
"running": false,
"error": "",
"numRestarts": 0,
"numReadFromPulsar": 0,
"numSystemExceptions": 0,
"latestSystemExceptions": null,
"numSinkExceptions": 0,
"latestSinkExceptions": null,
"numWrittenToSink": 0,
"lastReceivedTime": 0,
"workerId": "pulsar-useast-function-1"
}
}
]
}
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"archive": "builtin://netty",
"batchBuilder": null,
"batchSourceConfig": null,
"className": "org.apache.pulsar.io.netty.NettySource",
"configs": {
"host": "127.0.0.1",
"numberOfThreads": "1",
"port": "10999",
"type": "tcp"
},
"customRuntimeOptions": "internal_data",
"name": "mysource",
"namespace": "ns0",
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"producerConfig": {
"batchBuilder": "",
"cryptoConfig": null,
"maxPendingMessages": null,
"maxPendingMessagesAcrossPartitions": null,
"useThreadLocalProducers": false
},
"resources": {
"cpu": 0.25,
"disk": 1000000000,
"ram": 1000000000
},
"runtimeFlags": null,
"schemaType": null,
"secrets": null,
"serdeClassName": null,
"tenant": "testcreate",
"topicName": "persistent://testcreate/ns0/t1"
}
curl --fail --location --request GET "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN" | python3 -mjson.tool
Result
{
"archive": "builtin://data-generator",
"autoAck": true,
"className": "org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink",
"cleanupSubscription": false,
"configs": {},
"customRuntimeOptions": "internal_data",
"deadLetterTopic": null,
"inputSpecs": {
"persistent://testcreate/ns0/tp1": {
"consumerProperties": {},
"cryptoConfig": null,
"poolMessages": false,
"receiverQueueSize": null,
"regexPattern": false,
"schemaProperties": {},
"schemaType": null,
"serdeClassName": null
}
},
"inputs": [
"persistent://testcreate/ns0/tp1"
],
"maxMessageRetries": null,
"name": "mysink1",
"namespace": "ns0",
"negativeAckRedeliveryDelayMs": null,
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"resources": {
"cpu": 0.15,
"disk": 500000000,
"ram": 400000000
},
"retainKeyOrdering": false,
"retainOrdering": false,
"runtimeFlags": null,
"secrets": null,
"sourceSubscriptionName": null,
"sourceSubscriptionPosition": "Latest",
"tenant": "testcreate",
"timeoutMs": 5000,
"topicToSchemaProperties": null,
"topicToSchemaType": null,
"topicToSerdeClassName": null,
"topicsPattern": null,
"transformFunction": null,
"transformFunctionClassName": null,
"transformFunctionConfig": null
}
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/start" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/start" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE/stop" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK/stop" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
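Because sources and sinks use the same path layout, the two are easy to mix up when you copy a command. The following sketch builds the URL from an explicit connector kind and rejects anything other than "sources" or "sinks". The `connector_url` function name is hypothetical, and the variable values are examples only.

```shell
# Example values; in practice these come from the environment variables set earlier.
WEB_SERVICE_URL="https://pulsar-aws-useast2"
TENANT="mytenant"
NAMESPACE="mynamespace"

# Usage: connector_url sources|sinks NAME [ACTION]
connector_url() {
  case "$1" in
    sources|sinks) ;;
    *) echo "kind must be 'sources' or 'sinks'" >&2; return 1 ;;
  esac
  printf '%s/admin/v3/%s/%s/%s/%s%s\n' \
    "$WEB_SERVICE_URL" "$1" "$TENANT" "$NAMESPACE" "$2" "${3:+/$3}"
}

connector_url sources mysource1 status
connector_url sinks mysink1 stop
```

For example, to stop a sink: `curl --fail --location --request POST "$(connector_url sinks "$SINK" stop)" --header "Authorization: Bearer $PULSAR_TOKEN"`.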
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN" --form "sourceConfig=@mynetty-source-config.json;type=application/json"
No response indicates success.
In the example above, a configuration file is provided as input to curl. The file is named "mynetty-source-config.json", and it contains the configuration for the built-in "netty" source connector in {product}.
curl --fail --location --request DELETE "$WEB_SERVICE_URL/admin/v3/sources/$TENANT/$NAMESPACE/$SOURCE" --header "Authorization: Bearer $PULSAR_TOKEN"
No response indicates success.
curl --fail --location --request POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK" --header "Authorization: Bearer $PULSAR_TOKEN" --form "sinkConfig=@mykafka-sink-config.json;type=application/json"
No response indicates success.
In the example above, a configuration file is provided as input to curl. The file is named "mykafka-sink-config.json", and it has the following content for the built-in "kafka" sink connector in {product}.
{
"tenant": "testcreate",
"namespace": "ns0",
"name": "mykafkaconnector",
"archive": "builtin://kafka",
"parallelism": 1,
"autoAck": true,
"cleanupSubscription": false,
"configs": {
"acks": "1",
"batchSize": "16384",
"bootstrapServers": "localhost:55200,localhost:55201",
"maxRequestSize": "1048576",
"producerConfigProperties": {
"client.id": "astra-streaming-client",
"sasl.jaas.config": "sensitive_data_removed",
"sasl.mechanism": "PLAIN",
"sasl.password": "sensitive_data_removed",
"sasl.username": "myuserid",
"security.protocol": "SASL_SSL"
},
"topic": "mykafka-topic"
},
"inputs": [ "persistent://testcreate/ns0/mytopic3" ]
}
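A malformed configuration file is a common cause of a failed create request, so it can be worth checking that the file parses as JSON before you upload it. The sketch below reuses the `json.tool` module that this guide already uses for formatting. The `is_valid_json` function name is hypothetical, and the throwaway file stands in for your own configuration file.

```shell
# Succeed only when the named file parses as JSON.
is_valid_json() {
  python3 -m json.tool "$1" > /dev/null 2>&1
}

# Demonstration with a throwaway file; substitute your own config file name.
tmp_config="$(mktemp)"
printf '%s\n' '{"name": "mykafkaconnector", "parallelism": 1}' > "$tmp_config"

if is_valid_json "$tmp_config"; then
  echo "valid JSON"
else
  echo "invalid JSON" >&2
fi

rm -f "$tmp_config"
```

This confirms only that the file is syntactically valid JSON. It does not check that the connector accepts the fields in it.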