Skip to content

Commit f76eee6

Browse files
dekkers, jpbruinsslot, underdarknl, ammar92
authored
Fix new boefjes issue for scheduler (1.16) (#3329)
Co-authored-by: JP Bruins Slot <jpbruinsslot@gmail.com>
Co-authored-by: Jan Klopper <janklopper+underdark@gmail.com>
Co-authored-by: ammar92 <ammar.abdulamir@gmail.com>
1 parent 2caf60d commit f76eee6

3 files changed

Lines changed: 463 additions & 127 deletions

File tree

mula/scheduler/connectors/services/katalogus.py

Lines changed: 90 additions & 80 deletions
Original file line number | Diff line number | Diff line change
@@ -22,67 +22,67 @@ def __init__(
2222
):
2323
super().__init__(host, source, timeout, pool_connections)
2424

25-
self.lock = threading.Lock()
26-
2725
# For every organisation we cache its plugins, it references the
2826
# plugin-id as key and the plugin as value.
29-
self.organisations_plugin_cache: dict_utils.ExpiringDict = dict_utils.ExpiringDict(lifetime=cache_ttl)
27+
self.plugin_cache_lock = threading.Lock()
28+
self.plugin_cache = dict_utils.ExpiringDict(lifetime=cache_ttl)
3029

3130
# For every organisation we cache on which type of object (consumes)
3231
# the boefjes consume, it references the object type (consumes)
3332
# as the key and a dict of boefjes as value.
34-
self.organisations_boefje_type_cache: dict_utils.ExpiringDict = dict_utils.ExpiringDict(lifetime=cache_ttl)
33+
self.boefje_cache_lock = threading.Lock()
34+
self.boefje_cache = dict_utils.ExpiringDict(lifetime=cache_ttl)
3535

3636
# For every organisation we cache on which type of object (consumes)
3737
# the normalizers consume, it references the object type (consumes)
3838
# as the key and a dict of normalizers as value.
39-
self.organisations_normalizer_type_cache: dict_utils.ExpiringDict = dict_utils.ExpiringDict(lifetime=cache_ttl)
39+
self.normalizer_cache_lock = threading.Lock()
40+
self.normalizer_cache = dict_utils.ExpiringDict(lifetime=cache_ttl)
4041

4142
# For every organisation we cache which new boefjes for an organisation
4243
# have been enabled.
43-
self.organisations_new_boefjes_cache: dict = {}
44+
self.new_boefjes_cache_lock = threading.Lock()
45+
self.new_boefjes_cache: dict = {}
4446

4547
# Initialise the cache.
4648
self.flush_caches()
4749

4850
def flush_caches(self) -> None:
49-
self.flush_organisations_plugin_cache()
50-
self.flush_organisations_normalizer_type_cache()
51-
self.flush_organisations_boefje_type_cache()
51+
self.flush_plugin_cache()
52+
self.flush_normalizer_cache()
53+
self.flush_boefje_cache()
5254

53-
def flush_organisations_plugin_cache(self) -> None:
55+
def flush_plugin_cache(self) -> None:
5456
self.logger.debug("Flushing the katalogus plugin cache for organisations")
5557

56-
with self.lock:
58+
with self.plugin_cache_lock:
5759
# First, we reset the cache, to make sure we won't get any ExpiredError
58-
self.organisations_plugin_cache.expiration_enabled = False
59-
self.organisations_plugin_cache.reset()
60+
self.plugin_cache.expiration_enabled = False
61+
self.plugin_cache.reset()
6062

6163
orgs = self.get_organisations()
6264
for org in orgs:
63-
if org.id not in self.organisations_plugin_cache:
64-
self.organisations_plugin_cache[org.id] = {}
65-
self.organisations_new_boefjes_cache[org.id] = {}
65+
self.plugin_cache.setdefault(org.id, {})
6666

6767
plugins = self.get_plugins_by_organisation(org.id)
68-
self.organisations_plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}
68+
self.plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}
6969

70-
self.organisations_plugin_cache.expiration_enabled = True
70+
self.plugin_cache.expiration_enabled = True
7171

7272
self.logger.debug("Flushed the katalogus plugin cache for organisations")
7373

74-
def flush_organisations_boefje_type_cache(self) -> None:
74+
def flush_boefje_cache(self) -> None:
7575
"""boefje.consumes -> plugin type boefje"""
7676
self.logger.debug("Flushing the katalogus boefje type cache for organisations")
7777

78-
with self.lock:
78+
with self.boefje_cache_lock:
7979
# First, we reset the cache, to make sure we won't get any ExpiredError
80-
self.organisations_boefje_type_cache.expiration_enabled = False
81-
self.organisations_boefje_type_cache.reset()
80+
self.boefje_cache.expiration_enabled = False
81+
self.boefje_cache.reset()
8282

8383
orgs = self.get_organisations()
8484
for org in orgs:
85-
self.organisations_boefje_type_cache[org.id] = {}
85+
self.boefje_cache[org.id] = {}
8686

8787
for plugin in self.get_plugins_by_organisation(org.id):
8888
if plugin.type != "boefje":
@@ -97,28 +97,28 @@ def flush_organisations_boefje_type_cache(self) -> None:
9797
# NOTE: backwards compatibility, when it is a boefje the
9898
# consumes field is a string field.
9999
if isinstance(plugin.consumes, str):
100-
self.organisations_boefje_type_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
100+
self.boefje_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
101101
continue
102102

103103
for type_ in plugin.consumes:
104-
self.organisations_boefje_type_cache[org.id].setdefault(type_, []).append(plugin)
104+
self.boefje_cache[org.id].setdefault(type_, []).append(plugin)
105105

106-
self.organisations_boefje_type_cache.expiration_enabled = True
106+
self.boefje_cache.expiration_enabled = True
107107

108108
self.logger.debug("Flushed the katalogus boefje type cache for organisations")
109109

110-
def flush_organisations_normalizer_type_cache(self) -> None:
110+
def flush_normalizer_cache(self) -> None:
111111
"""normalizer.consumes -> plugin type normalizer"""
112112
self.logger.debug("Flushing the katalogus normalizer type cache for organisations")
113113

114-
with self.lock:
114+
with self.normalizer_cache_lock:
115115
# First, we reset the cache, to make sure we won't get any ExpiredError
116-
self.organisations_normalizer_type_cache.expiration_enabled = False
117-
self.organisations_normalizer_type_cache.reset()
116+
self.normalizer_cache.expiration_enabled = False
117+
self.normalizer_cache.reset()
118118

119119
orgs = self.get_organisations()
120120
for org in orgs:
121-
self.organisations_normalizer_type_cache[org.id] = {}
121+
self.normalizer_cache[org.id] = {}
122122

123123
for plugin in self.get_plugins_by_organisation(org.id):
124124
if plugin.type != "normalizer":
@@ -131,9 +131,9 @@ def flush_organisations_normalizer_type_cache(self) -> None:
131131
continue
132132

133133
for type_ in plugin.consumes:
134-
self.organisations_normalizer_type_cache[org.id].setdefault(type_, []).append(plugin)
134+
self.normalizer_cache[org.id].setdefault(type_, []).append(plugin)
135135

136-
self.organisations_normalizer_type_cache.expiration_enabled = True
136+
self.normalizer_cache.expiration_enabled = True
137137

138138
self.logger.debug("Flushed the katalogus normalizer type cache for organisations")
139139

@@ -161,74 +161,84 @@ def get_organisations(self) -> list[Organisation]:
161161
response = self.get(url)
162162
return [Organisation(**organisation) for organisation in response.json().values()]
163163

164+
@exception_handler
164165
def get_plugins_by_organisation(self, organisation_id: str) -> list[Plugin]:
165166
url = f"{self.host}/v1/organisations/{organisation_id}/plugins"
166167
response = self.get(url)
167168
return [Plugin(**plugin) for plugin in response.json()]
168169

169170
def get_plugins_by_org_id(self, organisation_id: str) -> list[Plugin]:
171+
def _get_from_cache() -> list[Plugin]:
172+
with self.plugin_cache_lock:
173+
return dict_utils.deep_get(self.plugin_cache, [organisation_id])
174+
170175
try:
171-
with self.lock:
172-
return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id])
176+
return _get_from_cache()
173177
except dict_utils.ExpiredError:
174-
self.flush_organisations_plugin_cache()
175-
return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id])
178+
self.flush_plugin_cache()
179+
return _get_from_cache()
176180

177181
def get_plugin_by_id_and_org_id(self, plugin_id: str, organisation_id: str) -> Plugin:
182+
def _get_from_cache() -> Plugin:
183+
with self.plugin_cache_lock:
184+
return dict_utils.deep_get(self.plugin_cache, [organisation_id, plugin_id])
185+
178186
try:
179-
with self.lock:
180-
return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id, plugin_id])
187+
return _get_from_cache()
181188
except dict_utils.ExpiredError:
182-
self.flush_organisations_plugin_cache()
183-
return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id, plugin_id])
189+
self.flush_plugin_cache()
190+
return _get_from_cache()
184191

185192
def get_boefjes_by_type_and_org_id(self, boefje_type: str, organisation_id: str) -> list[Plugin]:
193+
def _get_from_cache() -> list[Plugin]:
194+
with self.boefje_cache_lock:
195+
return dict_utils.deep_get(self.boefje_cache, [organisation_id, boefje_type])
196+
186197
try:
187-
with self.lock:
188-
return dict_utils.deep_get(self.organisations_boefje_type_cache, [organisation_id, boefje_type])
198+
return _get_from_cache()
189199
except dict_utils.ExpiredError:
190-
self.flush_organisations_boefje_type_cache()
191-
return dict_utils.deep_get(self.organisations_boefje_type_cache, [organisation_id, boefje_type])
200+
self.flush_boefje_cache()
201+
return _get_from_cache()
192202

193203
def get_normalizers_by_org_id_and_type(self, organisation_id: str, normalizer_type: str) -> list[Plugin]:
194-
try:
195-
with self.lock:
204+
def _get_from_cache() -> list[Plugin]:
205+
with self.normalizer_cache_lock:
196206
return dict_utils.deep_get(
197-
self.organisations_normalizer_type_cache,
207+
self.normalizer_cache,
198208
[organisation_id, normalizer_type],
199209
)
210+
211+
try:
212+
return _get_from_cache()
200213
except dict_utils.ExpiredError:
201-
self.flush_organisations_normalizer_type_cache()
202-
return dict_utils.deep_get(
203-
self.organisations_normalizer_type_cache,
204-
[organisation_id, normalizer_type],
205-
)
214+
self.flush_normalizer_cache()
215+
return _get_from_cache()
206216

207217
def get_new_boefjes_by_org_id(self, organisation_id: str) -> list[Plugin]:
208-
# Get the enabled boefjes for the organisation from katalogus
209-
plugins = self.get_plugins_by_organisation(organisation_id)
210-
enabled_boefjes = {
211-
plugin.id: plugin
212-
for plugin in plugins
213-
if plugin.enabled is True and plugin.type == "boefje" and plugin.consumes
214-
}
215-
216-
# Check if there are new boefjes
217-
new_boefjes = []
218-
for boefje_id, boefje in enabled_boefjes.items():
219-
if boefje_id in self.organisations_new_boefjes_cache.get(organisation_id, {}):
220-
continue
221-
222-
new_boefjes.append(boefje)
223-
224-
self.organisations_new_boefjes_cache[organisation_id] = enabled_boefjes
225-
226-
self.logger.debug(
227-
"%d new boefjes found for organisation %s",
228-
len(new_boefjes),
229-
organisation_id,
230-
organisation_id=organisation_id,
231-
boefjes=[boefje.name for boefje in new_boefjes],
232-
)
233-
234-
return new_boefjes
218+
with self.new_boefjes_cache_lock:
219+
# Get the enabled boefjes for the organisation from katalogus
220+
plugins = self.get_plugins_by_organisation(organisation_id)
221+
enabled_boefjes = {
222+
plugin.id: plugin
223+
for plugin in plugins
224+
if plugin.enabled is True and plugin.type == "boefje" and plugin.consumes
225+
}
226+
227+
# Check if there are new boefjes
228+
new_boefjes = []
229+
for boefje_id, boefje in enabled_boefjes.items():
230+
if boefje_id not in self.new_boefjes_cache.get(organisation_id, {}):
231+
new_boefjes.append(boefje)
232+
233+
# Update the cache
234+
self.new_boefjes_cache[organisation_id] = enabled_boefjes
235+
236+
self.logger.debug(
237+
"%d new boefjes found for organisation %s",
238+
len(new_boefjes),
239+
organisation_id,
240+
organisation_id=organisation_id,
241+
boefjes=[boefje.name for boefje in new_boefjes],
242+
)
243+
244+
return new_boefjes

mula/scheduler/utils/dict_utils.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -95,3 +95,7 @@ def __len__(self) -> int:
9595
def __iter__(self) -> Iterator[str]:
9696
with self.lock:
9797
return iter(self.cache)
98+
99+
def setdefault(self, key: str, default: Any) -> Any:
100+
with self.lock:
101+
return self.cache.setdefault(key, default)

0 commit comments

Comments
 (0)