Skip to content

Commit 133923c

Browse files
committed
Migrate the hub codebase to use the latest Kafka connector records
1 parent be45575 commit 133923c

3 files changed

Lines changed: 7 additions & 7 deletions

File tree

examples/kafka-hub/consolidator/consolidator_service.bal

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ http:Service consolidatorService = service object {
3737
isolated function consolidateSystemState() returns error? {
3838
do {
3939
while true {
40-
kafka:ConsumerRecord[] records = check conn:websubEventConsumer->poll(config:POLLING_INTERVAL);
41-
foreach kafka:ConsumerRecord currentRecord in records {
40+
kafka:BytesConsumerRecord[] records = check conn:websubEventConsumer->poll(config:POLLING_INTERVAL);
41+
foreach kafka:BytesConsumerRecord currentRecord in records {
4242
string lastPersistedData = check string:fromBytes(currentRecord.value);
4343
error? result = processPersistedData(lastPersistedData);
4444
if result is error {

examples/kafka-hub/hub/hub_state_update.bal

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ function initializeHubState() returns error? {
3939

4040
function updateHubState() returns error? {
4141
while true {
42-
kafka:ConsumerRecord[] records = check conn:websubEventsConsumer->poll(config:POLLING_INTERVAL);
42+
kafka:BytesConsumerRecord[] records = check conn:websubEventsConsumer->poll(config:POLLING_INTERVAL);
4343
if records.length() <= 0 {
4444
continue;
4545
}
46-
foreach kafka:ConsumerRecord currentRecord in records {
46+
foreach kafka:BytesConsumerRecord currentRecord in records {
4747
string lastPersistedData = check string:fromBytes(currentRecord.value);
4848
error? result = processStateUpdateEvent(lastPersistedData);
4949
if result is error {

examples/kafka-hub/hub/websub_subscribers.bal

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubsc
8585
});
8686
do {
8787
while true {
88-
kafka:ConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
88+
kafka:BytesConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
8989
if !isValidConsumer(topicName, subscriberId) {
9090
fail error(string `Subscriber with Id ${subscriberId} or topic ${topicName} is invalid`);
9191
}
@@ -113,7 +113,7 @@ isolated function isValidSubscription(string subscriberId) returns boolean {
113113
}
114114
}
115115

116-
isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:HubClient clientEp, kafka:Consumer consumerEp) returns error? {
116+
isolated function notifySubscribers(kafka:BytesConsumerRecord[] records, websubhub:HubClient clientEp, kafka:Consumer consumerEp) returns error? {
117117
foreach var kafkaRecord in records {
118118
var message = deSerializeKafkaRecord(kafkaRecord);
119119
if message is websubhub:ContentDistributionMessage {
@@ -129,7 +129,7 @@ isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:Hu
129129
}
130130
}
131131

132-
isolated function deSerializeKafkaRecord(kafka:ConsumerRecord kafkaRecord) returns websubhub:ContentDistributionMessage|error {
132+
isolated function deSerializeKafkaRecord(kafka:BytesConsumerRecord kafkaRecord) returns websubhub:ContentDistributionMessage|error {
133133
byte[] content = kafkaRecord.value;
134134
string message = check string:fromBytes(content);
135135
json payload = check value:fromJsonString(message);

0 commit comments

Comments
 (0)