Skip to content

Commit 3b3a053

Browse files
committed
bugfix: kafka message consumer with config
1 parent 2cef487 commit 3b3a053

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

packages/Kafka/src/Configuration/KafkaModule.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
117117

118118
foreach ($extensionObjects as $extensionObject) {
119119
if ($extensionObject instanceof KafkaConsumerConfiguration) {
120-
$consumerConfigurations[$extensionObject->getEndpointId()] = $consumerConfigurations;
120+
$consumerConfigurations[$extensionObject->getEndpointId()] = $extensionObject;
121121
} elseif ($extensionObject instanceof TopicConfiguration) {
122122
$topicConfigurations[$extensionObject->getTopicName()] = $extensionObject;
123123
$topicReferenceMapping[$extensionObject->referenceName] = $extensionObject->getTopicName();

packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use Ecotone\Kafka\Api\KafkaHeader;
88
use Ecotone\Kafka\Configuration\KafkaBrokerConfiguration;
9+
use Ecotone\Kafka\Configuration\KafkaConsumerConfiguration;
910
use Ecotone\Kafka\Configuration\KafkaPublisherConfiguration;
1011
use Ecotone\Kafka\Configuration\TopicConfiguration;
1112
use Ecotone\Kafka\Outbound\MessagePublishingException;
@@ -325,4 +326,38 @@ public function test_kafka_consumer_with_delayed_retry(): void
325326
// 5. This continues until max delayed retries reached or success
326327
// 6. If all retries fail, message goes to dead letter channel or is resent to Kafka (based on finalFailureStrategy)
327328
}
329+
330+
public function test_sending_and_receiving_with_kafka_consumer_configuration(): void
331+
{
332+
$topicName = Uuid::uuid4()->toString();
333+
$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
334+
[ExampleKafkaConsumer::class],
335+
[
336+
KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(),
337+
new ExampleKafkaConsumer(),
338+
],
339+
ServiceConfiguration::createWithDefaults()
340+
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE, ModulePackageList::KAFKA_PACKAGE]))
341+
->withExtensionObjects([
342+
KafkaPublisherConfiguration::createWithDefaults($topicName),
343+
TopicConfiguration::createWithReferenceName('exampleTopic', $topicName),
344+
KafkaConsumerConfiguration::createWithDefaults('exampleConsumer'),
345+
]),
346+
licenceKey: LicenceTesting::VALID_LICENCE,
347+
);
348+
349+
/** @var MessagePublisher $kafkaPublisher */
350+
$kafkaPublisher = $ecotoneLite->getGateway(MessagePublisher::class);
351+
352+
$kafkaPublisher->sendWithMetadata('exampleData', 'application/text', ['key' => 'value']);
353+
354+
$ecotoneLite->run('exampleConsumer', ExecutionPollingMetadata::createWithTestingSetup(
355+
maxExecutionTimeInMilliseconds: 30000
356+
));
357+
358+
$messages = $ecotoneLite->sendQueryWithRouting('getMessages');
359+
360+
self::assertCount(1, $messages);
361+
self::assertEquals('exampleData', $messages[0]['payload']);
362+
}
328363
}

0 commit comments

Comments
 (0)