Skip to content

Commit f4fa1a8

Browse files
committed
#6858 downgrade kafka in-loop debug info to DEBUG from INFO - too noisy
Signed-off-by: Nigel Jones <nigel.l.jones+git@gmail.com>
1 parent 7f3d849 commit f4fa1a8

File tree

2 files changed

+25
-25
lines changed

2 files changed

+25
-25
lines changed

open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/src/main/java/org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventConsumer.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public void run()
166166
//The connector queue is too big. Wait until the size goes down until
167167
//polling again. If we let the events just accumulate, we will
168168
//eventually run out of memory if the consumer cannot keep up.
169-
log.info("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", nUnprocessedEvents, maxQueueSize);
169+
log.debug("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", nUnprocessedEvents, maxQueueSize);
170170
awaitNextPollingTime();
171171
continue;
172172

@@ -183,7 +183,7 @@ public void run()
183183
String json = consumerRecord.value();
184184
log.debug("Received message: {}" ,json);
185185
countReceivedMessages++;
186-
log.info("Metrics: receivedMessages: {}", countReceivedMessages);
186+
log.debug("Metrics: receivedMessages: {}", countReceivedMessages);
187187
final KafkaIncomingEvent event = new KafkaIncomingEvent(json, consumerRecord.offset());
188188
final String recordKey=consumerRecord.key();
189189
final String recordValue=consumerRecord.value();
@@ -194,12 +194,12 @@ public void run()
194194
addUnprocessedEvent(consumerRecord.partition(), consumerRecord.topic(), event);
195195
connector.distributeToListeners(event);
196196
countMessagesToProcess++;
197-
log.info("Metrics: messagesToProcess: {}", countMessagesToProcess);
197+
log.debug("Metrics: messagesToProcess: {}", countMessagesToProcess);
198198
}
199199
catch (Exception error)
200200
{
201201
countMessagesFailedToProcess++;
202-
log.info("Metrics: messagesFailedToProcess: {}", countMessagesFailedToProcess);
202+
log.debug("Metrics: messagesFailedToProcess: {}", countMessagesFailedToProcess);
203203
log.warn("Error distributing inbound event: {}", error.getMessage());
204204

205205
if (auditLog != null)
@@ -217,7 +217,7 @@ public void run()
217217
{
218218
log.debug("Ignoring message with key: {} and value: {}",recordKey, recordValue);
219219
countIgnoredMessages++;
220-
log.info("Metrics: ignoredMessages: {}", countIgnoredMessages);
220+
log.debug("Metrics: ignoredMessages: {}", countIgnoredMessages);
221221
}
222222

223223
if ( isAutoCommitEnabled) {
@@ -231,14 +231,14 @@ public void run()
231231
final TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
232232
currentOffsets.put(partition, new OffsetAndMetadata(consumerRecord.offset() + 1));
233233
countCommits++;
234-
log.info("Metrics: messageCommits: {}", countCommits);
234+
log.debug("Metrics: messageCommits: {}", countCommits);
235235

236236
}
237237
}
238238
}
239239
catch (WakeupException e)
240240
{
241-
log.info("Received wakeup call, proceeding with graceful shutdown");
241+
log.debug("Received wakeup call, proceeding with graceful shutdown");
242242
}
243243
catch (Exception error)
244244
{
@@ -313,7 +313,7 @@ public void run()
313313
}
314314
consumer = null;
315315
}
316-
log.debug("Exiting main loop for topic {} & cleaning up", this.topicToSubscribe);
316+
log.info("Exiting main loop for topic {} & cleaning up", this.topicToSubscribe);
317317

318318
}
319319

@@ -358,7 +358,7 @@ private boolean checkForFullyProcessedMessages() {
358358
if (isAutoCommitEnabled) {
359359
return false;
360360
}
361-
log.info("Checking for fully processed messages whose offsets need to be committed");
361+
log.debug("Checking for fully processed messages whose offsets need to be committed");
362362

363363
//Check all the queues to see they have events initial events
364364
//that are fully processed
@@ -375,7 +375,7 @@ private boolean checkForFullyProcessedMessages() {
375375

376376
if (! commitData.isEmpty()) {
377377
currentOffsets.putAll(commitData);
378-
log.info("Committing: {}", commitData);
378+
log.debug("Committing: {}", commitData);
379379
try {
380380
consumer.commitSync(commitData);
381381
return true;
@@ -417,15 +417,15 @@ private KafkaIncomingEvent removeFullyProcessedEventsFromBeginningOfQueue(Queue<
417417
//The message at the beginning of the queue has been fully processed. Remove
418418
//it from the queue and repeat the check.
419419
lastRemoved = queue.remove();
420-
log.info("Message with offset {} has been fully processed.",lastRemoved.getOffset() );
420+
log.debug("Message with offset {} has been fully processed.",lastRemoved.getOffset() );
421421
countCommits++;
422-
log.info("Metrics: commits: {}", countCommits);
422+
log.debug("Metrics: commits: {}", countCommits);
423423
}
424424
KafkaIncomingEvent firstEvent = queue.peek();
425425
if (firstEvent != null) {
426426
//Queue is not empty, so we're waiting for the processing of first message in
427427
//the queue to finish
428-
log.info("Waiting for completing of processing of message with offset {}",firstEvent.getOffset());
428+
log.debug("Waiting for completing of processing of message with offset {}",firstEvent.getOffset());
429429
}
430430
return lastRemoved;
431431
}
@@ -551,12 +551,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
551551
// Check if we need to rewind to handle initial startup case -- but only on first assignment
552552
try {
553553
if (initialPartitionAssignment) {
554-
log.info("Received initial PartitionsAssigned event");
554+
log.debug("Received initial PartitionsAssigned event");
555555

556556
long partitionCount = partitions.size();
557557

558558
if (partitionCount != 1) {
559-
log.info("Received PartitionsAssigned event with {} partitions. This is not supported.",partitionCount);
559+
log.warn("Received PartitionsAssigned event with {} partitions. This is not supported.",partitionCount);
560560
} else {
561561
// there is only one partition, so we can just grab the first one - and we'll try this once only
562562
initialPartitionAssignment = false;

open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/src/main/java/org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventProducer.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
8484
long eventRetryCount = 0;
8585

8686
messagePublishRequestCount++;
87-
log.info("Metrics: messagePublishRequestCount {}", messagePublishRequestCount);
87+
log.debug("Metrics: messagePublishRequestCount {}", messagePublishRequestCount);
8888

8989
if (producer == null) {
9090
try {
@@ -107,11 +107,11 @@ private void publishEvent(String event) throws ConnectorCheckedException {
107107
log.debug("Sending message try {} [0 based] : {}", eventRetryCount,event);
108108
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, localServerId, event);
109109
kafkaSendAttemptCount++;
110-
log.info("Metrics: kafkaSendAttemptCount {}", kafkaSendAttemptCount);
110+
log.debug("Metrics: kafkaSendAttemptCount {}", kafkaSendAttemptCount);
111111
producer.send(producerRecord).get();
112112
eventSent = true;
113113
messageSendCount++;
114-
log.info("Metrics: messageSendCount {}", messageSendCount);
114+
log.debug("Metrics: messageSendCount {}", messageSendCount);
115115
} catch (ExecutionException error) {
116116
kafkaSendFailCount++;
117117
log.debug("Metrics: kafkaSendFailCount {}", kafkaSendFailCount);
@@ -129,7 +129,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
129129
producer = null;
130130

131131
messageFailedSendCount++;
132-
log.info(messageFailedCountString, messageFailedSendCount);
132+
log.warn(messageFailedCountString, messageFailedSendCount);
133133

134134
throw new ConnectorCheckedException(
135135
KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(
@@ -141,7 +141,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
141141
producer.close();
142142
producer = null;
143143
messageFailedSendCount++;
144-
log.info(messageFailedCountString, messageFailedSendCount);
144+
log.warn(messageFailedCountString, messageFailedSendCount);
145145
log.error("Retryable Exception closed producer after {} tries", eventRetryCount);
146146
break;
147147
} else {
@@ -171,7 +171,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
171171
}
172172

173173
messageFailedSendCount++;
174-
log.info(messageFailedCountString, messageFailedSendCount);
174+
log.warn(messageFailedCountString, messageFailedSendCount);
175175

176176
throw new ConnectorCheckedException(
177177
KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(
@@ -225,7 +225,7 @@ public void run() {
225225
}
226226
}
227227
} catch (InterruptedException error) {
228-
log.info("Woken up from sleep ");
228+
log.debug("Woken up from sleep ");
229229
Thread.currentThread().interrupt();
230230
} catch (Exception error) {
231231
log.warn("Bad exception from sending events: {}",error.getMessage());
@@ -240,7 +240,7 @@ public void run() {
240240
}
241241
}
242242
}
243-
log.debug("Exiting main loop for topic {} & cleaning up", topicName);
243+
log.info("Exiting main loop for topic {} & cleaning up", topicName);
244244

245245
/* producer may have already closed by exception handler in publishEvent */
246246
if (producer != null) {
@@ -265,8 +265,8 @@ public void run() {
265265
*/
266266
private void putEvent(String newEvent) {
267267
inmemoryPutMessageCount++;
268-
log.info("Metrics: inmemoryPutMessageCount {}", inmemoryPutMessageCount);
269-
log.info("Metrics: sendBufferSize {}", sendBuffer.size());
268+
log.debug("Metrics: inmemoryPutMessageCount {}", inmemoryPutMessageCount);
269+
log.debug("Metrics: sendBufferSize {}", sendBuffer.size());
270270
sendBuffer.add(newEvent);
271271
}
272272

0 commit comments

Comments (0)