From 77af1c35cbbad093a49d80e4995b834d934f3cac Mon Sep 17 00:00:00 2001 From: Aleksander Valle Grunnvoll Date: Mon, 11 Dec 2023 11:31:57 +0100 Subject: [PATCH 1/4] DBZ-7244 Simplify, remove unused parameters --- .../server/eventhubs/BatchManager.java | 18 ++++-------------- .../eventhubs/EventHubsChangeConsumer.java | 2 +- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java index 6a160a68..dd5bcb0e 100644 --- a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java +++ b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java @@ -6,7 +6,6 @@ package io.debezium.server.eventhubs; import java.util.HashMap; -import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,8 +15,6 @@ import com.azure.messaging.eventhubs.models.CreateBatchOptions; import io.debezium.DebeziumException; -import io.debezium.engine.ChangeEvent; -import io.debezium.engine.DebeziumEngine; public class BatchManager { private static final Logger LOGGER = LoggerFactory.getLogger(BatchManager.class); @@ -32,8 +29,6 @@ public class BatchManager { // Prepare CreateBatchOptions for N partitions private final HashMap batchOptions = new HashMap<>(); private final HashMap batches = new HashMap<>(); - private List> records; - private DebeziumEngine.RecordCommitter> committer; public BatchManager(EventHubProducerClient producer, String configurePartitionId, String configuredPartitionKey, Integer maxBatchSize) { @@ -43,11 +38,7 @@ public BatchManager(EventHubProducerClient producer, String configurePartitionId this.maxBatchSize = maxBatchSize; } - public void initializeBatch(List> records, - DebeziumEngine.RecordCommitter> committer) { - this.records = records; - this.committer = committer; - + public void initializeBatch() { if (!configuredPartitionId.isEmpty() || !configuredPartitionKey.isEmpty()) { CreateBatchOptions op = new CreateBatchOptions(); @@ -99,7 +90,7 @@ public void closeAndEmitBatches() { batches.forEach((partitionId, batch) -> { if (batch.getCount() > 0) { LOGGER.trace("Dispatching {} events.", batch.getCount()); - emitBatchToEventHub(records, committer, batch); + emitBatchToEventHub(batch); } }); } @@ -118,15 +109,14 @@ public void sendEventToPartitionId(EventData eventData, Integer recordIndex, Int LOGGER.debug("Maximum batch size reached, dispatching {} events.", batch.getCount()); // Max size reached, dispatch the batch to EventHub - emitBatchToEventHub(records, committer, batch); + emitBatchToEventHub(batch); // Renew the batch proxy so we can continue. batch = new EventDataBatchProxy(producer, batchOptions.get(partitionId)); batches.put(partitionId, batch); } } - private void emitBatchToEventHub(List> records, DebeziumEngine.RecordCommitter> committer, - EventDataBatchProxy batch) { + private void emitBatchToEventHub(EventDataBatchProxy batch) { final int batchEventSize = batch.getCount(); if (batchEventSize > 0) { try { diff --git a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java index e4aa228f..99729ba6 100644 --- a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java +++ b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java @@ -126,7 +126,7 @@ public void handleBatch(List> records, throws InterruptedException { LOGGER.trace("Event Hubs sink adapter processing change events"); - batchManager.initializeBatch(records, committer); + batchManager.initializeBatch(); for (int recordIndex = 0; recordIndex < records.size();) { int start = recordIndex; From a24d1809ce08d586018881db0be7c054179ae36a Mon Sep 17 00:00:00 2001 From: Aleksander Valle Grunnvoll Date: Mon, 11 Dec 2023 12:09:01 +0100 Subject: [PATCH 2/4] DBZ-7244 Fix potential regression Seems like https://github.com/debezium/debezium-server/pull/51 introduced a potential regression, where if the sink is configured with a custom producer, it is not wrapped in a BatchManager and batchManager will be null when handleBatch is invoked. --- .../server/eventhubs/EventHubsChangeConsumer.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java index 99729ba6..fe3d5402 100644 --- a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java +++ b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java @@ -71,22 +71,25 @@ public class EventHubsChangeConsumer extends BaseChangeConsumer @PostConstruct void connect() { + final Config config = ConfigProvider.getConfig(); + + // optional config + maxBatchSize = config.getOptionalValue(PROP_MAX_BATCH_SIZE, Integer.class).orElse(0); + configuredPartitionId = config.getOptionalValue(PROP_PARTITION_ID, String.class).orElse(""); + configuredPartitionKey = config.getOptionalValue(PROP_PARTITION_KEY, String.class).orElse(""); + if (customProducer.isResolvable()) { producer = customProducer.get(); + batchManager = new BatchManager(producer, configuredPartitionId, configuredPartitionKey, maxBatchSize); LOGGER.info("Obtained custom configured Event Hubs client for namespace '{}'", customProducer.get().getFullyQualifiedNamespace()); return; } - final Config config = ConfigProvider.getConfig(); + // required config connectionString = config.getValue(PROP_CONNECTION_STRING_NAME, String.class); eventHubName = config.getValue(PROP_EVENTHUB_NAME, String.class); - // optional config - maxBatchSize = config.getOptionalValue(PROP_MAX_BATCH_SIZE, Integer.class).orElse(0); - configuredPartitionId = config.getOptionalValue(PROP_PARTITION_ID, String.class).orElse(""); - configuredPartitionKey = config.getOptionalValue(PROP_PARTITION_KEY, String.class).orElse(""); - String finalConnectionString = String.format(CONNECTION_STRING_FORMAT, connectionString, eventHubName); try { From 9775cea3dcd269959ab4a9dad4c4719ea664a48b Mon Sep 17 00:00:00 2001 From: Aleksander Valle Grunnvoll Date: Mon, 11 Dec 2023 13:21:29 +0100 Subject: [PATCH 3/4] DBZ-7244 Support multiple target event hubs and topic routing --- .../server/eventhubs/BatchManager.java | 16 ++- .../eventhubs/EventHubsChangeConsumer.java | 119 ++++++++++++------ 2 files changed, 96 insertions(+), 39 deletions(-) diff --git a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java index dd5bcb0e..2800ea09 100644 --- a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java +++ b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/BatchManager.java @@ -29,12 +29,14 @@ public class BatchManager { // Prepare CreateBatchOptions for N partitions private final HashMap batchOptions = new HashMap<>(); private final HashMap batches = new HashMap<>(); + private final Integer partitionCount; public BatchManager(EventHubProducerClient producer, String configurePartitionId, - String configuredPartitionKey, Integer maxBatchSize) { + String configuredPartitionKey, Integer partitionCount, Integer maxBatchSize) { this.producer = producer; this.configuredPartitionId = configurePartitionId; this.configuredPartitionKey = configuredPartitionKey; + this.partitionCount = partitionCount; this.maxBatchSize = maxBatchSize; } @@ -116,6 +118,18 @@ public void sendEventToPartitionId(EventData eventData, Integer recordIndex, Int } } + public Integer getPartitionCount() { + return partitionCount; + } + + public String getEventHubName() { + return producer.getEventHubName(); + } + + public void closeProducer() { + producer.close(); + } + private void emitBatchToEventHub(EventDataBatchProxy batch) { final int batchEventSize = batch.getCount(); if (batchEventSize > 0) { diff --git a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java index fe3d5402..ad29e255 100644 --- a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java +++ b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java @@ -5,7 +5,9 @@ */ package io.debezium.server.eventhubs; +import java.util.HashMap; import java.util.List; +import java.util.Map; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -46,6 +48,7 @@ public class EventHubsChangeConsumer extends BaseChangeConsumer private static final String PROP_PREFIX = "debezium.sink.eventhubs."; private static final String PROP_CONNECTION_STRING_NAME = PROP_PREFIX + "connectionstring"; private static final String PROP_EVENTHUB_NAME = PROP_PREFIX + "hubname"; + private static final String PROP_EVENTHUB_NAME_LIST = PROP_PREFIX + "hubname.list"; private static final String PROP_PARTITION_ID = PROP_PREFIX + "partitionid"; private static final String PROP_PARTITION_KEY = PROP_PREFIX + "partitionkey"; // maximum size for the batch of events (bytes) @@ -53,17 +56,15 @@ public class EventHubsChangeConsumer extends BaseChangeConsumer private String connectionString; private String eventHubName; + private String[] eventHubNameList; private String configuredPartitionId; private String configuredPartitionKey; private Integer maxBatchSize; - private Integer partitionCount; // connection string format - // Endpoint=sb:///;SharedAccessKeyName=;SharedAccessKey=;EntityPath= private static final String CONNECTION_STRING_FORMAT = "%s;EntityPath=%s"; - - private EventHubProducerClient producer = null; - private BatchManager batchManager = null; + private Map batchManagers = new HashMap<>(); @Inject @CustomConsumerBuilder @@ -73,53 +74,76 @@ public class EventHubsChangeConsumer extends BaseChangeConsumer void connect() { final Config config = ConfigProvider.getConfig(); - // optional config + // Required config + connectionString = config.getValue(PROP_CONNECTION_STRING_NAME, String.class); + eventHubName = config.getValue(PROP_EVENTHUB_NAME, String.class); + + // Optional config maxBatchSize = config.getOptionalValue(PROP_MAX_BATCH_SIZE, Integer.class).orElse(0); configuredPartitionId = config.getOptionalValue(PROP_PARTITION_ID, String.class).orElse(""); configuredPartitionKey = config.getOptionalValue(PROP_PARTITION_KEY, String.class).orElse(""); + eventHubNameList = config.getOptionalValue(PROP_EVENTHUB_NAME_LIST, String.class).orElse("").split(","); - if (customProducer.isResolvable()) { - producer = customProducer.get(); - batchManager = new BatchManager(producer, configuredPartitionId, configuredPartitionKey, maxBatchSize); - LOGGER.info("Obtained custom configured Event Hubs client for namespace '{}'", - customProducer.get().getFullyQualifiedNamespace()); - return; + try { + createDefaultProducerAndBatchManager(); + createBatchManagersFromEventHubNameList(); + } + catch (Exception e) { + throw new DebeziumException(e); } + } - // required config - connectionString = config.getValue(PROP_CONNECTION_STRING_NAME, String.class); - eventHubName = config.getValue(PROP_EVENTHUB_NAME, String.class); + private void createDefaultProducerAndBatchManager() { + EventHubProducerClient defaultProducer; + BatchManager defaultBatchManager; - String finalConnectionString = String.format(CONNECTION_STRING_FORMAT, connectionString, eventHubName); + if (customProducer.isResolvable()) { + defaultProducer = customProducer.get(); + int partitionCount = (int) defaultProducer.getPartitionIds().stream().count(); + validatePartitionId(partitionCount); - try { - producer = new EventHubClientBuilder().connectionString(finalConnectionString).buildProducerClient(); - batchManager = new BatchManager(producer, configuredPartitionId, configuredPartitionKey, maxBatchSize); + defaultBatchManager = new BatchManager(defaultProducer, configuredPartitionId, configuredPartitionKey, partitionCount, maxBatchSize); + LOGGER.info("Obtained custom configured Event Hubs client ({} partitions in hub) for namespace '{}'", + partitionCount, + customProducer.get().getFullyQualifiedNamespace()); } - catch (Exception e) { - throw new DebeziumException(e); + else { + String defaultConnectionString = String.format(CONNECTION_STRING_FORMAT, connectionString, eventHubName); + defaultProducer = new EventHubClientBuilder().connectionString(defaultConnectionString).buildProducerClient(); + int partitionCount = (int) defaultProducer.getPartitionIds().stream().count(); + validatePartitionId(partitionCount); + + defaultBatchManager = new BatchManager(defaultProducer, configuredPartitionId, configuredPartitionKey, partitionCount, maxBatchSize); + LOGGER.info("Obtained default configured Event Hubs client for event hub '{}' ({} partitions)", eventHubName, partitionCount); } - LOGGER.info("Using default Event Hubs client for namespace '{}'", producer.getFullyQualifiedNamespace()); + batchManagers.put(eventHubName, defaultBatchManager); + } + + private void createBatchManagersFromEventHubNameList() { + for (String hubName : eventHubNameList) { + if (!hubName.equals(eventHubName)) { + String finalConnectionString = String.format(CONNECTION_STRING_FORMAT, connectionString, hubName); + EventHubProducerClient producer = new EventHubClientBuilder().connectionString(finalConnectionString).buildProducerClient(); - // Retrieve available partition count for the EventHub - partitionCount = (int) producer.getPartitionIds().stream().count(); - LOGGER.trace("Event Hub '{}' has {} partitions available", producer.getEventHubName(), partitionCount); + int partitionCount = (int) producer.getPartitionIds().stream().count(); + validatePartitionId(partitionCount); - if (!configuredPartitionId.isEmpty() && Integer.parseInt(configuredPartitionId) > partitionCount - 1) { - throw new IndexOutOfBoundsException( - String.format("Target partition id %s does not exist in target EventHub %s", configuredPartitionId, eventHubName)); + BatchManager batchManager = new BatchManager(producer, configuredPartitionId, configuredPartitionKey, partitionCount, maxBatchSize); + batchManagers.put(hubName, batchManager); + LOGGER.info("Obtained Event Hubs client for event hub '{}' ({} partitions)", hubName, partitionCount); + } } } @PreDestroy void close() { try { - producer.close(); - LOGGER.info("Closed Event Hubs producer client"); + batchManagers.values().forEach(BatchManager::closeProducer); + LOGGER.info("Closed Event Hubs producer clients"); } catch (Exception e) { - LOGGER.warn("Exception while closing Event Hubs producer: {}", e); + LOGGER.warn("Exception while closing Event Hubs producers: {}", e); } } @@ -129,7 +153,7 @@ public void handleBatch(List> records, throws InterruptedException { LOGGER.trace("Event Hubs sink adapter processing change events"); - batchManager.initializeBatch(); + batchManagers.values().forEach(BatchManager::initializeBatch); for (int recordIndex = 0; recordIndex < records.size();) { int start = recordIndex; @@ -174,13 +198,25 @@ else if (!configuredPartitionKey.isEmpty()) { } } - // Check that the target partition exists. - if (targetPartitionId < BatchManager.BATCH_INDEX_FOR_NO_PARTITION_ID || targetPartitionId > partitionCount - 1) { - throw new IndexOutOfBoundsException( - String.format("Target partition id %d does not exist in target EventHub %s", targetPartitionId, eventHubName)); - } - try { + String destinationHub = record.destination(); + BatchManager batchManager = batchManagers.get(destinationHub); + + if (batchManager == null) { + batchManager = batchManagers.get(eventHubName); + + if (batchManager == null) { + throw new DebeziumException(String.format("Could not find batch manager for destination hub {}, nor for the default configured event hub {}", + destinationHub, eventHubName)); + } + } + + // Check that the target partition exists. + if (targetPartitionId < BatchManager.BATCH_INDEX_FOR_NO_PARTITION_ID || targetPartitionId > batchManager.getPartitionCount() - 1) { + throw new IndexOutOfBoundsException( + String.format("Target partition id %d does not exist in target EventHub %s", targetPartitionId, batchManager.getEventHubName())); + } + batchManager.sendEventToPartitionId(eventData, recordIndex, targetPartitionId); } catch (IllegalArgumentException e) { @@ -198,7 +234,7 @@ else if (!configuredPartitionKey.isEmpty()) { } } - batchManager.closeAndEmitBatches(); + batchManagers.values().forEach(BatchManager::closeAndEmitBatches); LOGGER.trace("Marking {} records as processed.", records.size()); for (ChangeEvent record : records) { @@ -207,4 +243,11 @@ else if (!configuredPartitionKey.isEmpty()) { committer.markBatchFinished(); LOGGER.trace("Batch marked finished"); } + + private void validatePartitionId(int partitionCount) { + if (!configuredPartitionId.isEmpty() && Integer.parseInt(configuredPartitionId) > partitionCount - 1) { + throw new IndexOutOfBoundsException( + String.format("Target partition id %s does not exist in target EventHub %s", configuredPartitionId, eventHubName)); + } + } } From f6629cecb1dec0e705a811a8266aeb930a464fca Mon Sep 17 00:00:00 2001 From: Aleksander Valle Grunnvoll Date: Thu, 14 Dec 2023 09:48:28 +0100 Subject: [PATCH 4/4] DBZ-7244 Simplify, no need for new config option --- .../eventhubs/EventHubsChangeConsumer.java | 64 +++++++------------ 1 file changed, 23 insertions(+), 41 deletions(-) diff --git a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java index ad29e255..6991eae5 100644 --- a/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java +++ b/debezium-server-eventhubs/src/main/java/io/debezium/server/eventhubs/EventHubsChangeConsumer.java @@ -48,15 +48,13 @@ public class EventHubsChangeConsumer extends BaseChangeConsumer private static final String PROP_PREFIX = "debezium.sink.eventhubs."; private static final String PROP_CONNECTION_STRING_NAME = PROP_PREFIX + "connectionstring"; private static final String PROP_EVENTHUB_NAME = PROP_PREFIX + "hubname"; - private static final String PROP_EVENTHUB_NAME_LIST = PROP_PREFIX + "hubname.list"; private static final String PROP_PARTITION_ID = PROP_PREFIX + "partitionid"; private static final String PROP_PARTITION_KEY = PROP_PREFIX + "partitionkey"; // maximum size for the batch of events (bytes) private static final String PROP_MAX_BATCH_SIZE = PROP_PREFIX + "maxbatchsize"; private String connectionString; - private String eventHubName; - private String[] eventHubNameList; + private String[] eventHubNames; private String configuredPartitionId; private String configuredPartitionKey; private Integer maxBatchSize; @@ -76,63 +74,47 @@ void connect() { // Required config connectionString = config.getValue(PROP_CONNECTION_STRING_NAME, String.class); - eventHubName = config.getValue(PROP_EVENTHUB_NAME, String.class); + eventHubNames = config.getValue(PROP_EVENTHUB_NAME, String.class).split(","); // Optional config maxBatchSize = config.getOptionalValue(PROP_MAX_BATCH_SIZE, Integer.class).orElse(0); configuredPartitionId = config.getOptionalValue(PROP_PARTITION_ID, String.class).orElse(""); configuredPartitionKey = config.getOptionalValue(PROP_PARTITION_KEY, String.class).orElse(""); - eventHubNameList = config.getOptionalValue(PROP_EVENTHUB_NAME_LIST, String.class).orElse("").split(","); try { - createDefaultProducerAndBatchManager(); - createBatchManagersFromEventHubNameList(); + createBatchManagersFromEventHubNames(); } catch (Exception e) { throw new DebeziumException(e); } } - private void createDefaultProducerAndBatchManager() { - EventHubProducerClient defaultProducer; - BatchManager defaultBatchManager; - + private void createBatchManagersFromEventHubNames() { if (customProducer.isResolvable()) { - defaultProducer = customProducer.get(); - int partitionCount = (int) defaultProducer.getPartitionIds().stream().count(); - validatePartitionId(partitionCount); + EventHubProducerClient producer = customProducer.get(); + int partitionCount = (int) producer.getPartitionIds().stream().count(); + validatePartitionId(partitionCount, eventHubNames[0]); + + BatchManager batchManager = new BatchManager(producer, configuredPartitionId, configuredPartitionKey, partitionCount, maxBatchSize); + batchManagers.put(eventHubNames[0], batchManager); - defaultBatchManager = new BatchManager(defaultProducer, configuredPartitionId, configuredPartitionKey, partitionCount, maxBatchSize); LOGGER.info("Obtained custom configured Event Hubs client ({} partitions in hub) for namespace '{}'", partitionCount, customProducer.get().getFullyQualifiedNamespace()); - } - else { - String defaultConnectionString = String.format(CONNECTION_STRING_FORMAT, connectionString, eventHubName); - defaultProducer = new EventHubClientBuilder().connectionString(defaultConnectionString).buildProducerClient(); - int partitionCount = (int) defaultProducer.getPartitionIds().stream().count(); - validatePartitionId(partitionCount); - - defaultBatchManager = new BatchManager(defaultProducer, configuredPartitionId, configuredPartitionKey, partitionCount, maxBatchSize); - LOGGER.info("Obtained default configured Event Hubs client for event hub '{}' ({} partitions)", eventHubName, partitionCount); - } - batchManagers.put(eventHubName, defaultBatchManager); - } + return; + } - private void createBatchManagersFromEventHubNameList() { - for (String hubName : eventHubNameList) { - if (!hubName.equals(eventHubName)) { - String finalConnectionString = String.format(CONNECTION_STRING_FORMAT, connectionString, hubName); - EventHubProducerClient producer = new EventHubClientBuilder().connectionString(finalConnectionString).buildProducerClient(); + for (String hubName : eventHubNames) { + String finalConnectionString = String.format(CONNECTION_STRING_FORMAT, connectionString, hubName); + EventHubProducerClient producer = new EventHubClientBuilder().connectionString(finalConnectionString).buildProducerClient(); - int partitionCount = (int) producer.getPartitionIds().stream().count(); - validatePartitionId(partitionCount); + int partitionCount = (int) producer.getPartitionIds().stream().count(); + validatePartitionId(partitionCount, hubName); - BatchManager batchManager = new BatchManager(producer, configuredPartitionId, configuredPartitionKey, partitionCount, maxBatchSize); - batchManagers.put(hubName, batchManager); - LOGGER.info("Obtained Event Hubs client for event hub '{}' ({} partitions)", hubName, partitionCount); - } + BatchManager batchManager = new BatchManager(producer, configuredPartitionId, configuredPartitionKey, partitionCount, maxBatchSize); + batchManagers.put(hubName, batchManager); + LOGGER.info("Obtained Event Hubs client for event hub '{}' ({} partitions)", hubName, partitionCount); } } @@ -203,11 +185,11 @@ else if (!configuredPartitionKey.isEmpty()) { BatchManager batchManager = batchManagers.get(destinationHub); if (batchManager == null) { - batchManager = batchManagers.get(eventHubName); + batchManager = batchManagers.get(eventHubNames[0]); if (batchManager == null) { throw new DebeziumException(String.format("Could not find batch manager for destination hub {}, nor for the default configured event hub {}", - destinationHub, eventHubName)); + destinationHub, eventHubNames[0])); } } @@ -244,7 +226,7 @@ else if (!configuredPartitionKey.isEmpty()) { LOGGER.trace("Batch marked finished"); } - private void validatePartitionId(int partitionCount) { + private void validatePartitionId(int partitionCount, String eventHubName) { if (!configuredPartitionId.isEmpty() && Integer.parseInt(configuredPartitionId) > partitionCount - 1) { throw new IndexOutOfBoundsException( String.format("Target partition id %s does not exist in target EventHub %s", configuredPartitionId, eventHubName));