-
-
Notifications
You must be signed in to change notification settings - Fork 461
[YAML thing provider] Clean retry queue when a thing is updated/removed #5361
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -103,16 +103,19 @@ public class YamlThingProvider extends AbstractProvider<Thing> | |||||||||||||||||||||||||
| private final Map<String, Collection<Thing>> thingsMap = new ConcurrentHashMap<>(); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| private final List<QueueContent> queue = new CopyOnWriteArrayList<>(); | ||||||||||||||||||||||||||
| private final Object queueLock = new Object(); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| private final Runnable lazyRetryRunnable = new Runnable() { | ||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||
| public void run() { | ||||||||||||||||||||||||||
| logger.debug("Starting lazy retry thread"); | ||||||||||||||||||||||||||
| while (!queue.isEmpty()) { | ||||||||||||||||||||||||||
| for (QueueContent qc : queue) { | ||||||||||||||||||||||||||
| if (retryCreateThing(qc.thingHandlerFactory, qc.thingTypeUID, qc.configuration, qc.thingUID, | ||||||||||||||||||||||||||
| qc.bridgeUID)) { | ||||||||||||||||||||||||||
| queue.remove(qc); | ||||||||||||||||||||||||||
| synchronized (queueLock) { | ||||||||||||||||||||||||||
| for (QueueContent qc : queue) { | ||||||||||||||||||||||||||
| if (retryCreateThing(qc.thingHandlerFactory, qc.thingTypeUID, qc.configuration, qc.thingUID, | ||||||||||||||||||||||||||
| qc.bridgeUID)) { | ||||||||||||||||||||||||||
| queue.remove(qc); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if (!queue.isEmpty()) { | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
|
@@ -198,6 +201,11 @@ public void addedModel(String modelName, Collection<YamlThingDTO> elements) { | |||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||
| public void updatedModel(String modelName, Collection<YamlThingDTO> elements) { | ||||||||||||||||||||||||||
| boolean isolated = isIsolatedModel(modelName); | ||||||||||||||||||||||||||
| if (!isolated) { | ||||||||||||||||||||||||||
| elements.stream().map(this::buildThingUID).filter(Objects::nonNull).forEach(uid -> { | ||||||||||||||||||||||||||
| removeFromRetryQueue(uid); | ||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| List<Thing> updated = elements.stream().map(t -> mapThing(t, isolated)).filter(Objects::nonNull).toList(); | ||||||||||||||||||||||||||
| Collection<Thing> modelThings = Objects | ||||||||||||||||||||||||||
| .requireNonNull(thingsMap.computeIfAbsent(modelName, k -> new ArrayList<>())); | ||||||||||||||||||||||||||
|
|
@@ -224,6 +232,9 @@ public void removedModel(String modelName, Collection<YamlThingDTO> elements) { | |||||||||||||||||||||||||
| boolean isolated = isIsolatedModel(modelName); | ||||||||||||||||||||||||||
| Collection<Thing> modelThings = thingsMap.getOrDefault(modelName, List.of()); | ||||||||||||||||||||||||||
| elements.stream().map(this::buildThingUID).filter(Objects::nonNull).forEach(uid -> { | ||||||||||||||||||||||||||
| if (!isolated) { | ||||||||||||||||||||||||||
| removeFromRetryQueue(uid); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| modelThings.stream().filter(th -> th.getUID().equals(uid)).findFirst().ifPresentOrElse(oldThing -> { | ||||||||||||||||||||||||||
| modelThings.remove(oldThing); | ||||||||||||||||||||||||||
| logger.debug("model {} removed thing {}", modelName, uid); | ||||||||||||||||||||||||||
|
|
@@ -544,7 +555,9 @@ private void mergeThing(Thing target, Thing source, boolean keepSourceConfig) { | |||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| private void queueRetryThingCreation(ThingHandlerFactory handlerFactory, ThingTypeUID thingTypeUID, | ||||||||||||||||||||||||||
| Configuration configuration, ThingUID thingUID, @Nullable ThingUID bridgeUID) { | ||||||||||||||||||||||||||
| queue.add(new QueueContent(handlerFactory, thingTypeUID, configuration, thingUID, bridgeUID)); | ||||||||||||||||||||||||||
| synchronized (queueLock) { | ||||||||||||||||||||||||||
| queue.add(new QueueContent(handlerFactory, thingTypeUID, configuration, thingUID, bridgeUID)); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| Thread thread = lazyRetryThread; | ||||||||||||||||||||||||||
| if (thread == null || !thread.isAlive()) { | ||||||||||||||||||||||||||
| thread = new Thread(lazyRetryRunnable); | ||||||||||||||||||||||||||
|
|
@@ -553,6 +566,16 @@ private void queueRetryThingCreation(ThingHandlerFactory handlerFactory, ThingTy | |||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| private void removeFromRetryQueue(ThingUID thingUID) { | ||||||||||||||||||||||||||
| synchronized (queueLock) { | ||||||||||||||||||||||||||
| for (QueueContent qc : queue) { | ||||||||||||||||||||||||||
| if (thingUID.equals(qc.thingUID)) { | ||||||||||||||||||||||||||
| queue.remove(qc); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+571
to
+575
|
||||||||||||||||||||||||||
| for (QueueContent qc : queue) { | |
| if (thingUID.equals(qc.thingUID)) { | |
| queue.remove(qc); | |
| } | |
| } | |
| queue.removeIf(qc -> thingUID.equals(qc.thingUID)); |
Copilot
AI
Feb 16, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The removeFromRetryQueue method continues iterating through all queue elements even after finding and removing a match. This is inefficient and could also remove multiple entries if duplicate thingUID entries exist in the queue. Consider adding a break statement after queue.remove(qc) to exit the loop once a match is found, or use queue.removeIf(qc -> thingUID.equals(qc.thingUID)) which is more efficient and idiomatic.
| for (QueueContent qc : queue) { | |
| if (thingUID.equals(qc.thingUID)) { | |
| queue.remove(qc); | |
| } | |
| } | |
| queue.removeIf(qc -> thingUID.equals(qc.thingUID)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The queue.isEmpty() check at line 112 is performed outside the synchronized block, which could lead to a race condition. If the queue becomes empty between the check and entering the synchronized block, or if items are added/removed by other threads, the loop might not behave as expected. Consider moving the isEmpty() check inside the synchronized block or restructuring the loop to check emptiness within the synchronized section.