Skip to content

Commit 2a5855e

Browse files
Merge remote-tracking branch 'vb-ce/master'
2 parents ec2538a + 4abe7ba commit 2a5855e

131 files changed

Lines changed: 4868 additions & 3048 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ public void process(CalculatedFieldStateRestoreMsg msg) {
132132
} else {
133133
removeState(cfId);
134134
}
135+
msg.getCallback().onSuccess();
135136
}
136137

137138
public void process(CalculatedFieldStatePartitionRestoreMsg msg) {

application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,13 @@ public void onStateRestoreMsg(CalculatedFieldStateRestoreMsg msg) {
167167

168168
if (ctx != null) {
169169
msg.setCtx(ctx);
170-
log.debug("Pushing CF state restore msg to specific actor [{}]", msg.getId().entityId());
170+
log.debug("[{}] Pushing CF state restore msg to specific actor [{}]", tenantId, msg.getId().entityId());
171171
getOrCreateActor(msg.getId().entityId()).tellWithHighPriority(msg);
172-
} else {
172+
} else if (msg.getState() != null) {
173+
log.debug("[{}] Received CF state restore msg for non-existing CF [{}]. Removing state", tenantId, cfId);
173174
cfStateService.deleteState(msg.getId(), msg.getCallback());
175+
} else {
176+
msg.getCallback().onSuccess();
174177
}
175178
}
176179

application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.thingsboard.server.common.data.id.TenantId;
2020
import org.thingsboard.server.common.msg.MsgType;
2121
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
22+
import org.thingsboard.server.common.msg.queue.TbCallback;
2223
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
2324
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
2425
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
@@ -30,6 +31,7 @@ public class CalculatedFieldStateRestoreMsg implements ToCalculatedFieldSystemMs
3031
private final CalculatedFieldEntityCtxId id;
3132
private final CalculatedFieldState state;
3233
private final TopicPartitionInfo partition;
34+
private final TbCallback callback;
3335
private CalculatedFieldCtx ctx;
3436

3537
@Override
@@ -41,4 +43,5 @@ public MsgType getMsgType() {
4143
public TenantId getTenantId() {
4244
return id.tenantId();
4345
}
46+
4447
}

application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
4444
import org.thingsboard.server.common.data.rule.RuleChain;
4545
import org.thingsboard.server.common.data.rule.RuleChainType;
46-
import org.thingsboard.server.common.msg.MsgType;
4746
import org.thingsboard.server.common.msg.TbActorMsg;
4847
import org.thingsboard.server.common.msg.TbActorStopReason;
4948
import org.thingsboard.server.common.msg.TbMsg;
@@ -139,13 +138,22 @@ public void destroy(TbActorStopReason stopReason, Throwable cause) {
139138
@Override
140139
protected boolean doProcess(TbActorMsg msg) {
141140
if (cantFindTenant) {
142-
log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg);
143-
if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) {
144-
QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg;
145-
queueMsg.getMsg().getCallback().onSuccess();
146-
} else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)) {
147-
TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg;
148-
transportMsg.getCallback().onSuccess();
141+
log.debug("[{}] Processing message for non-existing tenant: {}", tenantId, msg);
142+
switch (msg.getMsgType()) {
143+
case QUEUE_TO_RULE_ENGINE_MSG -> {
144+
((QueueToRuleEngineMsg) msg).getMsg().getCallback().onSuccess();
145+
}
146+
case TRANSPORT_TO_DEVICE_ACTOR_MSG -> {
147+
((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
148+
}
149+
case CF_STATE_RESTORE_MSG -> {
150+
((CalculatedFieldStateRestoreMsg) msg).getCallback().onSuccess();
151+
}
152+
default -> {
153+
if (!log.isDebugEnabled()) {
154+
log.info("[{}] Processing message for non-existing tenant: {}", tenantId, msg);
155+
}
156+
}
149157
}
150158
return true;
151159
}

application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public final void deleteState(CalculatedFieldEntityCtxId stateId, TbCallback cal
6868

6969
protected abstract void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback);
7070

71-
protected void processRestoredState(CalculatedFieldStateProto stateMsg, TopicPartitionInfo partition) {
71+
protected void processRestoredState(CalculatedFieldStateProto stateMsg, TopicPartitionInfo partition, TbCallback callback) {
7272
var id = fromProto(stateMsg.getId());
7373
if (partition == null) {
7474
try {
@@ -79,12 +79,12 @@ protected void processRestoredState(CalculatedFieldStateProto stateMsg, TopicPar
7979
}
8080
}
8181
var state = fromProto(id, stateMsg);
82-
processRestoredState(id, state, partition);
82+
processRestoredState(id, state, partition, callback);
8383
}
8484

85-
protected void processRestoredState(CalculatedFieldEntityCtxId id, CalculatedFieldState state, TopicPartitionInfo partition) {
85+
protected void processRestoredState(CalculatedFieldEntityCtxId id, CalculatedFieldState state, TopicPartitionInfo partition, TbCallback callback) {
8686
partition = partition.withTopic(DataConstants.CF_STATES_QUEUE_NAME);
87-
actorSystemContext.tellWithHighPriority(new CalculatedFieldStateRestoreMsg(id, state, partition));
87+
actorSystemContext.tellWithHighPriority(new CalculatedFieldStateRestoreMsg(id, state, partition, callback));
8888
}
8989

9090
@Override

application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService;
4444
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
4545

46+
import java.util.concurrent.CountDownLatch;
47+
import java.util.concurrent.TimeUnit;
4648
import java.util.concurrent.atomic.AtomicInteger;
4749

4850
import static org.thingsboard.server.queue.common.AbstractTbQueueTemplate.bytesToString;
@@ -61,6 +63,8 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
6163

6264
@Value("${queue.calculated_fields.poll_interval:25}")
6365
private long pollInterval;
66+
@Value("${queue.calculated_fields.pack_processing_timeout:60000}")
67+
private long packProcessingTimeout;
6468

6569
private TbKafkaProducerTemplate<TbProtoQueueMsg<CalculatedFieldStateProto>> stateProducer;
6670

@@ -74,21 +78,39 @@ public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFie
7478
.topic(partitionService.getTopic(queueKey))
7579
.pollInterval(pollInterval)
7680
.msgPackProcessor((msgs, consumer, consumerKey, config) -> {
81+
CountDownLatch completionLatch = new CountDownLatch(msgs.size());
7782
for (TbProtoQueueMsg<CalculatedFieldStateProto> msg : msgs) {
83+
TbCallback callback = new TbCallback() {
84+
@Override
85+
public void onSuccess() {
86+
int processedMsgCount = counter.incrementAndGet();
87+
if (processedMsgCount % 10000 == 0) {
88+
log.info("Processed {} CF state messages", processedMsgCount);
89+
}
90+
completionLatch.countDown();
91+
}
92+
93+
@Override
94+
public void onFailure(Throwable t) {
95+
log.error("Failed to process CF state message: {}", msg, t);
96+
completionLatch.countDown();
97+
}
98+
};
99+
78100
try {
79101
if (msg.getValue() != null) {
80-
processRestoredState(msg.getValue(), consumerKey.partition());
102+
processRestoredState(msg.getValue(), consumerKey.partition(), callback);
81103
} else {
82-
processRestoredState(getStateId(msg.getHeaders()), null, consumerKey.partition());
104+
processRestoredState(getStateId(msg.getHeaders()), null, consumerKey.partition(), callback);
83105
}
84106
} catch (Throwable t) {
85-
log.error("Failed to process state message: {}", msg, t);
107+
callback.onFailure(t);
86108
}
109+
}
87110

88-
int processedMsgCount = counter.incrementAndGet();
89-
if (processedMsgCount % 10000 == 0) {
90-
log.info("Processed {} calculated field state msgs", processedMsgCount);
91-
}
111+
boolean success = completionLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS);
112+
if (!success) {
113+
log.error("Timeout to process CF state messages pack of size {}", msgs.size());
92114
}
93115
})
94116
.consumerCreator((queueConfig, tpi) -> queueFactory.createCalculatedFieldStateConsumer())

application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBCalculatedFieldStateService.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,22 @@ protected void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback)
6262
public void restore(QueueKey queueKey, Set<TopicPartitionInfo> partitions) {
6363
if (stateService.getPartitions().isEmpty()) {
6464
cfRocksDb.forEach((key, value) -> {
65+
CalculatedFieldStateProto stateMsg;
6566
try {
66-
processRestoredState(CalculatedFieldStateProto.parseFrom(value), null);
67+
stateMsg = CalculatedFieldStateProto.parseFrom(value);
6768
} catch (Exception e) {
68-
log.error("[{}] Failed to process restored state", key, e);
69+
log.error("Failed to parse CalculatedFieldStateProto for key {}", key, e);
70+
return;
6971
}
72+
processRestoredState(stateMsg, null, new TbCallback() {
73+
@Override
74+
public void onSuccess() {}
75+
76+
@Override
77+
public void onFailure(Throwable t) {
78+
log.error("Failed to process CF state message: {}", stateMsg, t);
79+
}
80+
});
7081
});
7182
}
7283
super.restore(queueKey, partitions);

application/src/main/java/org/thingsboard/server/service/device/DeviceBulkImportService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,7 @@ private DeviceProfile setUpLwM2mDeviceProfile(TenantId tenantId, Device device)
258258

259259
Lwm2mDeviceProfileTransportConfiguration transportConfiguration = new Lwm2mDeviceProfileTransportConfiguration();
260260
transportConfiguration.setBootstrap(Collections.emptyList());
261-
transportConfiguration.setClientLwM2mSettings(new OtherConfiguration(false,1, 1, 1, PowerMode.DRX, null, null, null, null, null, V1_0.toString()));
262-
transportConfiguration.setObserveAttr(new TelemetryMappingConfiguration(Collections.emptyMap(), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), false, SINGLE));
261+
transportConfiguration.setClientLwM2mSettings(new OtherConfiguration());
263262

264263
DeviceProfileData deviceProfileData = new DeviceProfileData();
265264
DefaultDeviceProfileConfiguration configuration = new DefaultDeviceProfileConfiguration();

application/src/main/resources/tb-edge.yml

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -589,14 +589,14 @@ actors:
589589
response_timeout_ms: "${ACTORS_RPC_RESPONSE_TIMEOUT_MS:30000}"
590590
# Close transport session if RPC delivery timed out. If enabled, RPC will be reverted to the queued state.
591591
# Note:
592-
# - For MQTT transport:
593-
# - QoS level 0: This feature does not apply, as no acknowledgment is expected, and therefore no timeout is triggered.
594-
# - QoS level 1: This feature applies, as an acknowledgment is expected.
595-
# - QoS level 2: Unsupported.
596-
# - For CoAP transport:
597-
# - Confirmable requests: This feature applies, as delivery confirmation is expected.
598-
# - Non-confirmable requests: This feature does not apply, as no delivery acknowledgment is expected.
599-
# - For HTTP and SNPM transports: RPC is considered delivered immediately, and there is no logic to await acknowledgment.
592+
# <ul><li> For MQTT transport:
593+
# <ul><li> QoS level 0: This feature does not apply, as no acknowledgment is expected, and therefore no timeout is triggered.</li>
594+
# <li> QoS level 1: This feature applies, as an acknowledgment is expected.</li>
595+
# <li> QoS level 2: Unsupported.</li></ul></li>
596+
# <li> For CoAP transport:
597+
# <ul><li> Confirmable requests: This feature applies, as delivery confirmation is expected.</li>
598+
# <li> Non-confirmable requests: This feature does not apply, as no delivery acknowledgment is expected.</li></ul></li>
599+
# <li> For HTTP and SNPM transports: RPC is considered delivered immediately, and there is no logic to await acknowledgment.</li></ul>
600600
close_session_on_rpc_delivery_timeout: "${ACTORS_RPC_CLOSE_SESSION_ON_RPC_DELIVERY_TIMEOUT:false}"
601601
statistics:
602602
# Enable/disable actor statistics
@@ -806,13 +806,13 @@ redis:
806806
# if set false will be used pool config build from values of the pool config section
807807
useDefaultPoolConfig: "${REDIS_USE_DEFAULT_POOL_CONFIG:true}"
808808
sentinel:
809-
# name of the master node
809+
# Name of the master node
810810
master: "${REDIS_MASTER:}"
811-
# comma-separated list of "host:port" pairs of sentinels
811+
# Comma-separated list of "host:port" pairs of sentinels
812812
sentinels: "${REDIS_SENTINELS:}"
813-
# password to authenticate with sentinel
813+
# Password to authenticate with sentinel
814814
password: "${REDIS_SENTINEL_PASSWORD:}"
815-
# if set false will be used pool config build from values of the pool config section
815+
# If set false will be used pool config build from values of the pool config section
816816
useDefaultPoolConfig: "${REDIS_USE_DEFAULT_POOL_CONFIG:true}"
817817
# db index
818818
db: "${REDIS_DB:0}"
@@ -1112,10 +1112,10 @@ transport:
11121112
activity:
11131113
# This property specifies the strategy for reporting activity events within each reporting period.
11141114
# The accepted values are 'FIRST', 'LAST', 'FIRST_AND_LAST' and 'ALL'.
1115-
# - 'FIRST': Only the first activity event in each reporting period is reported.
1116-
# - 'LAST': Only the last activity event in the reporting period is reported.
1117-
# - 'FIRST_AND_LAST': Both the first and last activity events in the reporting period are reported.
1118-
# - 'ALL': All activity events in the reporting period are reported.
1115+
# <ul><li> 'FIRST': Only the first activity event in each reporting period is reported.</li>
1116+
# <li> 'LAST': Only the last activity event in the reporting period is reported.</li>
1117+
# <li> 'FIRST_AND_LAST': Both the first and last activity events in the reporting period are reported.</li>
1118+
# <li> 'ALL': All activity events in the reporting period are reported.</li></ul>
11191119
reporting_strategy: "${TB_TRANSPORT_ACTIVITY_REPORTING_STRATEGY:LAST}"
11201120
json:
11211121
# Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
@@ -1233,15 +1233,15 @@ transport:
12331233
dtls:
12341234
# RFC7925_RETRANSMISSION_TIMEOUT_IN_MILLISECONDS = 9000
12351235
retransmission_timeout: "${LWM2M_DTLS_RETRANSMISSION_TIMEOUT_MS:9000}"
1236-
# CoAP DTLS connection ID length for LWM2M. RFC 9146, Connection Identifier for DTLS 1.2
1237-
# Default: off
1236+
# LWM2M DTLS connection ID length for LWM2M. RFC 9146, Connection Identifier for DTLS 1.2
1237+
# Default: off. <br>
12381238
# Control usage of DTLS connection ID length (CID).
1239-
# - 'off' to deactivate it.
1240-
# - 'on' to activate Connection ID support (same as CID 0 or more 0).
1241-
# - A positive value defines generated CID size in bytes.
1242-
# - A value of 0 means we accept using CID but will not generate one for foreign peer (enables support but not for incoming traffic).
1243-
# - A value between 0 and <= 4: SingleNodeConnectionIdGenerator is used
1244-
# - A value that are > 4: MultiNodeConnectionIdGenerator is used
1239+
# <ul> <li> 'off' to deactivate it. </li>
1240+
# <li> 'on' to activate Connection ID support (same as CID 0 or more 0). </li>
1241+
# <li> A positive value defines generated CID size in bytes.</li>
1242+
# <li> A value of 0 means we accept using CID but will not generate one for foreign peer (enables support but not for incoming traffic). </li>
1243+
# <li> A value between 0 and <= 4: SingleNodeConnectionIdGenerator is used </li>
1244+
# <li> A value that are > 4: MultiNodeConnectionIdGenerator is used </li> </ul>
12451245
connection_id_length: "${LWM2M_DTLS_CONNECTION_ID_LENGTH:8}"
12461246
server:
12471247
# LwM2M Server ID
@@ -1424,14 +1424,14 @@ coap:
14241424
# CoAP DTLS bind port
14251425
bind_port: "${COAP_DTLS_BIND_PORT:5684}"
14261426
# CoAP DTLS connection ID length. RFC 9146, Connection Identifier for DTLS 1.2
1427-
# Default: off
1427+
# Default: off. <br>
14281428
# Control usage of DTLS connection ID length (CID).
1429-
# - 'off' to deactivate it.
1430-
# - 'on' to activate Connection ID support (same as CID 0 or more 0).
1431-
# - A positive value defines generated CID size in bytes.
1432-
# - A value of 0 means we accept using CID but will not generate one for foreign peer (enables support but not for incoming traffic).
1433-
# - A value between 0 and <= 4: SingleNodeConnectionIdGenerator is used
1434-
# - A value that are > 4: MultiNodeConnectionIdGenerator is used
1429+
# <ul> <li> 'off' to deactivate it. </li>
1430+
# <li> 'on' to activate Connection ID support (same as CID 0 or more 0). </li>
1431+
# <li> A positive value defines generated CID size in bytes.</li>
1432+
# <li> A value of 0 means we accept using CID but will not generate one for foreign peer (enables support but not for incoming traffic).</li>
1433+
# <li> A value between 0 and <= 4: SingleNodeConnectionIdGenerator is used</li>
1434+
# <li> A value that are > 4: MultiNodeConnectionIdGenerator is used</li> </ul>
14351435
connection_id_length: "${COAP_DTLS_CONNECTION_ID_LENGTH:8}"
14361436
# Specify the MTU (Maximum Transmission Unit).
14371437
# Should be used if LAN MTU is not used, e.g. if IP tunnels are used or if the client uses a smaller value than the LAN MTU.
@@ -1448,13 +1448,13 @@ coap:
14481448
# In order to negotiate smaller maximum fragment lengths,
14491449
# clients MAY include an extension of type "max_fragment_length" in the (extended) client hello.
14501450
# The "extension_data" field of this extension SHALL contain:
1451-
# enum {
1451+
# <pre> enum {
14521452
# 2^9(1) == 512,
14531453
# 2^10(2) == 1024,
14541454
# 2^11(3) == 2048,
14551455
# 2^12(4) == 4096,
14561456
# (255)
1457-
# } MaxFragmentLength;
1457+
# } MaxFragmentLength; </pre>
14581458
# TLS already requires clients and servers to support fragmentation of handshake messages.
14591459
max_fragment_length: "${COAP_DTLS_MAX_FRAGMENT_LENGTH:1024}"
14601460
# Server DTLS credentials
@@ -1832,6 +1832,8 @@ queue:
18321832
print-interval-ms: "${TB_QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}"
18331833
# Time to wait for the stats-loading requests to Kafka to finish
18341834
kafka-response-timeout-ms: "${TB_QUEUE_KAFKA_CONSUMER_STATS_RESPONSE_TIMEOUT_MS:1000}"
1835+
# Topics cache TTL in milliseconds. 5 minutes by default
1836+
topics_cache_ttl_ms: "${TB_QUEUE_KAFKA_TOPICS_CACHE_TTL_MS:300000}"
18351837
partitions:
18361838
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256
18371839
transport_api:

0 commit comments

Comments
 (0)