Skip to content

Commit 2e507f7

Browse files
Add locking to katalogus service (1.13) (#2144)
Co-authored-by: JP Bruins Slot <jpbruinsslot@gmail.com>
1 parent b22b455 commit 2e507f7

1 file changed

Lines changed: 61 additions & 48 deletions

File tree

mula/scheduler/connectors/services/katalogus.py

Lines changed: 61 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import threading
12
from typing import Dict, List
23

34
from scheduler.connectors.errors import exception_handler
@@ -15,6 +16,8 @@ class Katalogus(HTTPService):
1516
def __init__(self, host: str, source: str, timeout: int = 5, cache_ttl: int = 30):
1617
super().__init__(host, source, timeout)
1718

19+
self.lock = threading.Lock()
20+
1821
# For every organisation we cache its plugins, it references the
1922
# plugin-id as key and the plugin as value.
2023
self.organisations_plugin_cache: dict_utils.ExpiringDict = dict_utils.ExpiringDict(lifetime=cache_ttl)
@@ -44,76 +47,82 @@ def flush_caches(self) -> None:
4447
def flush_organisations_plugin_cache(self) -> None:
4548
self.logger.debug("flushing plugin cache [cache=%s]", self.organisations_plugin_cache.cache)
4649

47-
# First, we reset the cache, to make sure we won't get any ExpiredError
48-
self.organisations_plugin_cache.expiration_enabled = False
49-
self.organisations_plugin_cache.reset()
50+
with self.lock:
51+
# First, we reset the cache, to make sure we won't get any ExpiredError
52+
self.organisations_plugin_cache.expiration_enabled = False
53+
self.organisations_plugin_cache.reset()
54+
55+
orgs = self.get_organisations()
56+
for org in orgs:
57+
if org.id not in self.organisations_plugin_cache:
58+
self.organisations_plugin_cache[org.id] = {}
59+
self.organisations_new_boefjes_cache[org.id] = {}
5060

51-
orgs = self.get_organisations()
52-
for org in orgs:
53-
if org.id not in self.organisations_plugin_cache:
54-
self.organisations_plugin_cache[org.id] = {}
55-
self.organisations_new_boefjes_cache[org.id] = {}
61+
plugins = self.get_plugins_by_organisation(org.id)
62+
self.organisations_plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}
5663

57-
plugins = self.get_plugins_by_organisation(org.id)
58-
self.organisations_plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}
64+
self.organisations_plugin_cache.expiration_enabled = True
5965

60-
self.organisations_plugin_cache.expiration_enabled = True
6166
self.logger.debug("flushed plugins cache [cache=%s]", self.organisations_plugin_cache.cache)
6267

6368
def flush_organisations_boefje_type_cache(self) -> None:
6469
"""boefje.consumes -> plugin type boefje"""
6570
self.logger.debug("flushing boefje cache [cache=%s]", self.organisations_boefje_type_cache.cache)
6671

67-
# First, we reset the cache, to make sure we won't get any ExpiredError
68-
self.organisations_boefje_type_cache.expiration_enabled = False
69-
self.organisations_boefje_type_cache.reset()
72+
with self.lock:
73+
# First, we reset the cache, to make sure we won't get any ExpiredError
74+
self.organisations_boefje_type_cache.expiration_enabled = False
75+
self.organisations_boefje_type_cache.reset()
7076

71-
orgs = self.get_organisations()
72-
for org in orgs:
73-
self.organisations_boefje_type_cache[org.id] = {}
77+
orgs = self.get_organisations()
78+
for org in orgs:
79+
self.organisations_boefje_type_cache[org.id] = {}
7480

75-
for plugin in self.get_plugins_by_organisation(org.id):
76-
if plugin.type != "boefje":
77-
continue
81+
for plugin in self.get_plugins_by_organisation(org.id):
82+
if plugin.type != "boefje":
83+
continue
7884

79-
if plugin.enabled is False:
80-
continue
85+
if plugin.enabled is False:
86+
continue
8187

82-
# NOTE: backwards compatibility, when it is a boefje the
83-
# consumes field is a string field.
84-
if isinstance(plugin.consumes, str):
85-
self.organisations_boefje_type_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
86-
continue
88+
# NOTE: backwards compatibility, when it is a boefje the
89+
# consumes field is a string field.
90+
if isinstance(plugin.consumes, str):
91+
self.organisations_boefje_type_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
92+
continue
8793

88-
for type_ in plugin.consumes:
89-
self.organisations_boefje_type_cache[org.id].setdefault(type_, []).append(plugin)
94+
for type_ in plugin.consumes:
95+
self.organisations_boefje_type_cache[org.id].setdefault(type_, []).append(plugin)
96+
97+
self.organisations_boefje_type_cache.expiration_enabled = True
9098

91-
self.organisations_boefje_type_cache.expiration_enabled = True
9299
self.logger.debug("flushed boefje cache [cache=%s]", self.organisations_boefje_type_cache.cache)
93100

94101
def flush_organisations_normalizer_type_cache(self) -> None:
95102
"""normalizer.consumes -> plugin type normalizer"""
96103
self.logger.debug("flushing normalizer cache [cache=%s]", self.organisations_normalizer_type_cache.cache)
97104

98-
# First, we reset the cache, to make sure we won't get any ExpiredError
99-
self.organisations_normalizer_type_cache.expiration_enabled = False
100-
self.organisations_normalizer_type_cache.reset()
105+
with self.lock:
106+
# First, we reset the cache, to make sure we won't get any ExpiredError
107+
self.organisations_normalizer_type_cache.expiration_enabled = False
108+
self.organisations_normalizer_type_cache.reset()
109+
110+
orgs = self.get_organisations()
111+
for org in orgs:
112+
self.organisations_normalizer_type_cache[org.id] = {}
101113

102-
orgs = self.get_organisations()
103-
for org in orgs:
104-
self.organisations_normalizer_type_cache[org.id] = {}
114+
for plugin in self.get_plugins_by_organisation(org.id):
115+
if plugin.type != "normalizer":
116+
continue
105117

106-
for plugin in self.get_plugins_by_organisation(org.id):
107-
if plugin.type != "normalizer":
108-
continue
118+
if plugin.enabled is False:
119+
continue
109120

110-
if plugin.enabled is False:
111-
continue
121+
for type_ in plugin.consumes:
122+
self.organisations_normalizer_type_cache[org.id].setdefault(type_, []).append(plugin)
112123

113-
for type_ in plugin.consumes:
114-
self.organisations_normalizer_type_cache[org.id].setdefault(type_, []).append(plugin)
124+
self.organisations_normalizer_type_cache.expiration_enabled = True
115125

116-
self.organisations_normalizer_type_cache.expiration_enabled = True
117126
self.logger.debug("flushed normalizer cache [cache=%s]", self.organisations_normalizer_type_cache.cache)
118127

119128
@exception_handler
@@ -147,28 +156,32 @@ def get_plugins_by_organisation(self, organisation_id: str) -> List[Plugin]:
147156

148157
def get_plugins_by_org_id(self, organisation_id: str) -> List[Plugin]:
149158
try:
150-
return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id])
159+
with self.lock:
160+
return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id])
151161
except dict_utils.ExpiredError:
152162
self.flush_organisations_plugin_cache()
153163
return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id])
154164

155165
def get_plugin_by_id_and_org_id(self, plugin_id: str, organisation_id: str) -> Plugin:
156166
try:
157-
return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id, plugin_id])
167+
with self.lock:
168+
return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id, plugin_id])
158169
except dict_utils.ExpiredError:
159170
self.flush_organisations_plugin_cache()
160171
return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id, plugin_id])
161172

162173
def get_boefjes_by_type_and_org_id(self, boefje_type: str, organisation_id: str) -> List[Plugin]:
163174
try:
164-
return dict_utils.deep_get(self.organisations_boefje_type_cache, [organisation_id, boefje_type])
175+
with self.lock:
176+
return dict_utils.deep_get(self.organisations_boefje_type_cache, [organisation_id, boefje_type])
165177
except dict_utils.ExpiredError:
166178
self.flush_organisations_boefje_type_cache()
167179
return dict_utils.deep_get(self.organisations_boefje_type_cache, [organisation_id, boefje_type])
168180

169181
def get_normalizers_by_org_id_and_type(self, organisation_id: str, normalizer_type: str) -> List[Plugin]:
170182
try:
171-
return dict_utils.deep_get(self.organisations_normalizer_type_cache, [organisation_id, normalizer_type])
183+
with self.lock:
184+
return dict_utils.deep_get(self.organisations_normalizer_type_cache, [organisation_id, normalizer_type])
172185
except dict_utils.ExpiredError:
173186
self.flush_organisations_normalizer_type_cache()
174187
return dict_utils.deep_get(self.organisations_normalizer_type_cache, [organisation_id, normalizer_type])

0 commit comments

Comments
 (0)