Skip to content

Commit 12f6b2b

Browse files
authored
Merge pull request #2460 from telefonicaid/task/alarm_channel_full
log warn when channel is near full
2 parents 86c7990 + 28641ca commit 12f6b2b

File tree

6 files changed

+39
-1
lines changed

6 files changed

+39
-1
lines changed

CHANGES_NEXT_RELEASE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
- [cygnus-ngsi] Log warn when channel is near to be full (>90%) (#2459)
12
- [cygnus-common] Upgrade postgresql from 42.7.2 to 42.7.5
23
- [cygnus-ngsi] Upgrade Debian version from 12.9 to 12.10 in Dockerfile
34

cygnus-common/src/main/java/com/telefonica/iot/cygnus/channels/CygnusChannel.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,13 @@ public interface CygnusChannel {
6060
* @return The number of take operations on the channel that failed
6161
*/
6262
long getNumTakesFail();
63-
63+
64+
/**
65+
* Gets the number of usage percent of the channel.
66+
* @return The number of usage percente of the channel
67+
*/
68+
double getUsage();
69+
6470
/**
6571
* Sets the number of put operations on the channel that went OK.
6672
* @param n The number to be set

cygnus-common/src/main/java/com/telefonica/iot/cygnus/channels/CygnusFileChannel.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,5 +115,13 @@ public void setNumTakesOK(long n) {
115115
public void setNumTakesFail(long n) {
116116
accTakesFail = channelCounterRef.getEventTakeAttemptCount() - channelCounterRef.getEventTakeSuccessCount();
117117
} // setNumTakesFail
118+
119+
@Override
120+
public double getUsage() {
121+
long currentSize = channelCounterRef.getChannelSize();
122+
long maxCapacity = channelCounterRef.getChannelCapacity();
123+
double usagePercentage = (double) currentSize / maxCapacity * 100;
124+
return usagePercentage;
125+
} // getUsage
118126

119127
} // CygnusFileChannel

cygnus-common/src/main/java/com/telefonica/iot/cygnus/channels/CygnusMemoryChannel.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,13 @@ public void setNumTakesFail(long n) {
120120
accTakesFail = channelCounterRef.getEventTakeAttemptCount() - channelCounterRef.getEventTakeSuccessCount();
121121
} // setNumTakesFail
122122

123+
@Override
124+
public double getUsage() {
125+
long currentSize = channelCounterRef.getChannelSize();
126+
long maxCapacity = channelCounterRef.getChannelCapacity();
127+
double usagePercentage = (double) currentSize / maxCapacity * 100;
128+
return usagePercentage;
129+
} // getUsage
130+
123131
} // CygnusMemoryChannel
132+

cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSISink.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.telefonica.iot.cygnus.interceptors.NGSIEvent;
3232
import com.telefonica.iot.cygnus.log.CygnusLogger;
3333
import com.telefonica.iot.cygnus.sinks.Enums.DataModel;
34+
import com.telefonica.iot.cygnus.channels.CygnusChannel;
3435
import static com.telefonica.iot.cygnus.sinks.Enums.DataModel.DMBYATTRIBUTE;
3536
import static com.telefonica.iot.cygnus.sinks.Enums.DataModel.DMBYENTITY;
3637
import static com.telefonica.iot.cygnus.sinks.Enums.DataModel.DMBYENTITYTYPE;
@@ -58,6 +59,7 @@
5859
import org.apache.flume.Sink.Status;
5960
import org.apache.flume.Transaction;
6061
import org.apache.flume.ChannelException;
62+
import org.apache.flume.ChannelFullException;
6163
import org.apache.flume.conf.Configurable;
6264
import org.apache.logging.log4j.ThreadContext;
6365

@@ -500,15 +502,25 @@ protected void doRollbackAgain(Accumulator rollbackedAccumulation) {
500502
} // if else
501503
} // doRollbackAgain
502504

505+
503506
private Status processNewBatches() {
504507
// Get the channel
505508
Channel ch = getChannel();
509+
510+
double channelUsage = 0;
506511

507512
// Start a Flume transaction (it is not the same than a Cygnus transaction!)
508513
Transaction txn = ch.getTransaction();
509514
try {
510515
txn.begin();
511516

517+
CygnusChannel cygnusch = (CygnusChannel) ch;
518+
channelUsage = cygnusch.getUsage();
519+
LOGGER.debug("Channel usage: " + channelUsage + "% in sink " + this.getName());
520+
if (channelUsage > NGSIConstants.HIGH_CHANNEL_PERCENT_USAGE) {
521+
LOGGER.warn("High Channel usage: " + channelUsage + "% in sink " + this.getName());
522+
}
523+
512524
// Get and process as many events as the batch size
513525
int currentIndex;
514526

cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/utils/NGSIConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ private NGSIConstants() {
5353

5454
public static final int ROLLBACK_CHANNEL_EXCEPTION_THRESHOLD = 10;
5555
public static final int ROLLBACK_EXCEPTION_THRESHOLD = 10;
56+
57+
public static final int HIGH_CHANNEL_PERCENT_USAGE = 90;
5658

5759
// FIWARE service and FIWARE service path specific constants
5860
public static final int SERVICE_HEADER_MAX_LEN = 50;

0 commit comments

Comments
 (0)