Skip to content

Commit c781e51

Browse files
committed
Almost all requested changes
1 parent 8414365 commit c781e51

9 files changed

Lines changed: 67 additions & 158 deletions

File tree

src/ets/core/aem_pack_registry.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@
3030
from schemapack.spec.schemapack import SchemaPack
3131

3232
from ets.config import Config
33+
from ets.core.config_manager import ConfigManager
3334
from ets.core.models import AEMPack, PersistedConfig, Workflow
3435
from ets.ports.inbound.aem_pack_registry import (
3536
AEMPackRegistryPort,
3637
)
37-
from ets.ports.inbound.config_manager import ConfigManagerPort
3838
from ets.ports.outbound.config_lock import ConfigLockPort
3939
from ets.ports.outbound.dao import AEMPackDao
4040
from ets.ports.outbound.incoming_aem_pack_queue import IncomingAEMPackQueuePort
@@ -56,7 +56,7 @@ def __init__(
5656
*,
5757
config: Config,
5858
aem_pack_dao: AEMPackDao,
59-
config_manager: ConfigManagerPort,
59+
config_manager: ConfigManager,
6060
config_lock: ConfigLockPort,
6161
incoming_aem_pack_queue: IncomingAEMPackQueuePort,
6262
):

src/ets/core/config_manager.py

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,20 @@
2121
from ets.core.config_comparison import compare_configs
2222
from ets.core.config_pruning import prune_unproductive_subgraphs
2323
from ets.core.config_validation import ConfigValidationError, validate
24-
from ets.core.model_derivation import ModelDeriver
24+
from ets.core.model_derivation import ModelDerivationError, ModelDeriver
2525
from ets.core.models import PersistedConfig, RawConfig
26-
from ets.ports.inbound.config_manager import ConfigManagerError, ConfigManagerPort
2726
from ets.ports.outbound.config_loader import ConfigLoaderPort
2827
from ets.ports.outbound.config_version import ConfigVersionerPort
2928
from ets.ports.outbound.config_writer import ConfigWriterPort
3029

3130
log = logging.getLogger(__name__)
3231

3332

34-
class ConfigManager(ConfigManagerPort):
33+
class ConfigManagerError(RuntimeError):
34+
"""Raised when an unexpected error happens while handling a transformation configuration."""
35+
36+
37+
class ConfigManager:
3538
"""Manages loading, comparison, validation and selection of an active config."""
3639

3740
def __init__(
@@ -64,7 +67,7 @@ def known_version(self) -> int:
6467
return self._known_version
6568

6669
async def update_config(self):
67-
"""Return the active config and its version, reloading from DB if the version changed."""
70+
"""Check if the active config is stale and reload from DB if the version changed."""
6871
current_version = await self._versioner.get_version()
6972
if self._current_config is None:
7073
log.info("Loading initial config (version %d).", current_version)
@@ -86,7 +89,7 @@ async def update_config(self):
8689
log.critical(inconsistent_version)
8790
raise inconsistent_version
8891

89-
async def resolve_and_persist(self, input_config_path: Path) -> None:
92+
async def resolve_and_persist(self, input_config_path: Path) -> bool:
9093
"""Load, resolve, and persist the transformation config.
9194
9295
- Loads the raw config from disk and the persisted one from the database.
@@ -100,19 +103,28 @@ async def resolve_and_persist(self, input_config_path: Path) -> None:
100103
Raises:
101104
ConfigManagerError: If the new config fails validation and no previous
102105
valid config exists in the database.
106+
Returns:
107+
True, if the persisted config is outdated and has been replaced with a new one
108+
False in all other cases
103109
"""
104110
raw_config = self._loader.load_config_from_file(input_config_path)
105111
persisted_config = await self._loader.load_config_from_db()
106112

107-
await self._resolve(raw_config=raw_config, persisted_config=persisted_config)
113+
return await self._resolve_has_config_changed(
114+
raw_config=raw_config, persisted_config=persisted_config
115+
)
108116

109-
async def _resolve(
117+
async def _resolve_has_config_changed(
110118
self, *, raw_config: RawConfig, persisted_config: PersistedConfig
111-
):
112-
"""Compare, validate/prune and derive schemas as needed."""
119+
) -> bool:
120+
"""Compare, validate/prune, derive schemas and persist the config as needed.
121+
Returns:
122+
True, if the persisted config is outdated and has been replaced with a new one
123+
False in all other cases
124+
"""
113125
result = compare_configs(raw_config, persisted_config)
114126
if not isinstance(result, RawConfig):
115-
return
127+
return False
116128

117129
try:
118130
validated = validate(raw_config)
@@ -134,11 +146,21 @@ async def _resolve(
134146
log.warning(
135147
"New config failed to validate, using existing, persisted config instead."
136148
)
137-
return persisted_config
138-
derived_models = self._model_deriver.derive_models(pruned)
149+
return False
150+
151+
try:
152+
derived_models = self._model_deriver.derive_models(pruned)
153+
except ModelDerivationError:
154+
log.warning(
155+
"Could not derive model schemas for new configuration."
156+
+ "\nFalling back to existing, persisted config instead."
157+
)
158+
return False
159+
139160
resolved = PersistedConfig(
140161
models=derived_models,
141162
routes=pruned.routes,
142163
workflows=pruned.workflows,
143164
)
144165
await self._writer.write_config(resolved)
166+
return True

src/ets/core/config_updater.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,31 +18,27 @@
1818
import logging
1919
from pathlib import Path
2020

21-
from ets.ports.inbound.config_manager import ConfigManagerPort
22-
from ets.ports.inbound.config_updater import ConfigUpdaterPort
21+
from ets.core.config_manager import ConfigManager
2322
from ets.ports.outbound.config_lock import ConfigLockPort
24-
from ets.ports.outbound.config_version import ConfigVersionerPort
2523
from ets.ports.outbound.incoming_aem_pack_queue import IncomingAEMPackQueuePort
2624

2725
log = logging.getLogger(__name__)
2826

2927

30-
class ConfigUpdater(ConfigUpdaterPort):
28+
class ConfigUpdater:
3129
"""Coordinates the startup config update across service instances."""
3230

3331
def __init__(
3432
self,
3533
*,
3634
input_config_path: Path,
3735
config_lock: ConfigLockPort,
38-
config_manager: ConfigManagerPort,
39-
versioner: ConfigVersionerPort,
36+
config_manager: ConfigManager,
4037
incoming_aem_pack_queue: IncomingAEMPackQueuePort,
4138
):
4239
self._input_config_path = input_config_path
4340
self._config_lock = config_lock
4441
self._config_manager = config_manager
45-
self._versioner = versioner
4642
self._incoming_aem_pack_queue = incoming_aem_pack_queue
4743

4844
async def run(self) -> None:
@@ -57,15 +53,15 @@ async def run(self) -> None:
5753
await self._config_lock.setup_index()
5854
if not await self._config_lock.try_acquire_lock():
5955
await self._config_lock.wait_for_lock_release()
60-
log.info("Update lock released, loading persisted config placeholder.")
56+
log.info("Update lock released, loading persisted config.")
6157
return
6258

6359
try:
6460
log.info("Lock acquired, starting config update.")
65-
previous_version = await self._versioner.get_version()
66-
await self._config_manager.resolve_and_persist(self._input_config_path)
67-
current_version = await self._versioner.get_version()
68-
if current_version != previous_version:
61+
config_has_changed = await self._config_manager.resolve_and_persist(
62+
self._input_config_path
63+
)
64+
if config_has_changed:
6965
await self._incoming_aem_pack_queue.mark_all_for_reprocessing()
7066
log.info("Config validation/update finished.")
7167
finally:

src/ets/core/config_validation.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def _validate_routes(raw_config: RawConfig) -> None:
8787
if output_model.is_ingress:
8888
raise ConfigValidationError(
8989
f"Route '{route.name}' output model '{output_model.name}' "
90-
f"must not be an ingress model (is_ingress must be False)."
90+
"must not be an ingress model (is_ingress must be False)."
9191
)
9292

9393

@@ -140,7 +140,7 @@ def _validate_models(raw_config: RawConfig) -> None:
140140
if model.is_ingress and model.schema_ is None:
141141
raise ConfigValidationError(
142142
f"Model '{model.name}' is marked as ingress but does not have a schema defined. "
143-
f"Ingress models must have a schema defined."
143+
"Ingress models must have a schema defined."
144144
)
145145
if not model.is_ingress and model.schema_:
146146
raise ConfigValidationError(
@@ -156,7 +156,7 @@ def _validate_graph_and_add_order(raw_config: RawConfig) -> ValidatedConfig:
156156
157157
This method performs the following steps:
158158
1. Validates that the directed graph formed by the models and routes is acyclic
159-
and had unique path properties.
159+
and has unique path properties.
160160
2. Computes a topological ordering of the models.
161161
3. Returns a new `ValidatedConfig` object where each model is wrapped as an
162162
`OrderedRawModel` with the corresponding `order` assigned.

src/ets/inject.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@
5151
from ets.core.config_updater import ConfigUpdater
5252
from ets.core.model_derivation import ModelDeriver
5353
from ets.ports.inbound.aem_pack_registry import AEMPackRegistryPort
54-
from ets.ports.inbound.config_manager import ConfigManagerPort
55-
from ets.ports.inbound.config_updater import ConfigUpdaterPort
5654
from ets.ports.outbound.config_loader import ConfigLoaderPort
5755
from ets.ports.outbound.config_lock import ConfigLockPort
5856
from ets.ports.outbound.config_version import ConfigVersionerPort
@@ -65,7 +63,7 @@
6563
class _BaseWiring:
6664
"""Contains everything reused across all higher-level preparation steps."""
6765

68-
config_manager: ConfigManagerPort
66+
config_manager: ConfigManager
6967
config_lock: ConfigLockPort
7068
versioner: ConfigVersionerPort
7169
incoming_aem_pack_queue: IncomingAEMPackQueuePort
@@ -141,9 +139,7 @@ async def _prepare_base_wiring(
141139

142140

143141
@asynccontextmanager
144-
async def prepare_config_updater(
145-
*, config: Config
146-
) -> AsyncGenerator[ConfigUpdaterPort]:
142+
async def prepare_config_updater(*, config: Config) -> AsyncGenerator[ConfigUpdater]:
147143
"""Construct and initialize a ConfigUpdater with all its dependencies."""
148144
async with (
149145
ConfiguredMongoClient(config=config) as client,
@@ -153,7 +149,6 @@ async def prepare_config_updater(
153149
input_config_path=config.input_config_path,
154150
config_lock=base.config_lock,
155151
config_manager=base.config_manager,
156-
versioner=base.versioner,
157152
incoming_aem_pack_queue=base.incoming_aem_pack_queue,
158153
)
159154

src/ets/ports/inbound/config_manager.py

Lines changed: 0 additions & 60 deletions
This file was deleted.

src/ets/ports/inbound/config_updater.py

Lines changed: 0 additions & 31 deletions
This file was deleted.

tests/test_config_manager.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,11 @@
2323
from yaml import safe_load
2424

2525
from ets.core import config_manager as config_manager_module
26-
from ets.core.config_manager import ConfigManager
26+
from ets.core.config_manager import ConfigManager, ConfigManagerError
2727
from ets.core.config_pruning import prune_unproductive_subgraphs
2828
from ets.core.config_validation import ConfigValidationError
2929
from ets.core.model_derivation import ModelDeriver
3030
from ets.core.models import Model, PersistedConfig, RawConfig, ValidatedConfig
31-
from ets.ports.inbound.config_manager import ConfigManagerError
3231
from ets.ports.outbound.config_loader import ConfigLoaderPort
3332
from ets.ports.outbound.config_version import ConfigVersionerPort
3433
from ets.ports.outbound.config_writer import ConfigWriterPort

0 commit comments

Comments
 (0)