39
39
from airbyte_cdk .sources .declarative .parsers .manifest_component_transformer import (
40
40
ManifestComponentTransformer ,
41
41
)
42
+ from airbyte_cdk .sources .declarative .parsers .manifest_normalizer import (
43
+ ManifestNormalizer ,
44
+ )
42
45
from airbyte_cdk .sources .declarative .parsers .manifest_reference_resolver import (
43
46
ManifestReferenceResolver ,
44
47
)
57
60
from airbyte_cdk .utils .traced_exception import AirbyteTracedException
58
61
59
62
63
+ def _get_declarative_component_schema () -> Dict [str , Any ]:
64
+ try :
65
+ raw_component_schema = pkgutil .get_data (
66
+ "airbyte_cdk" , "sources/declarative/declarative_component_schema.yaml"
67
+ )
68
+ if raw_component_schema is not None :
69
+ declarative_component_schema = yaml .load (raw_component_schema , Loader = yaml .SafeLoader )
70
+ return declarative_component_schema # type: ignore
71
+ else :
72
+ raise RuntimeError (
73
+ "Failed to read manifest component json schema required for deduplication"
74
+ )
75
+ except FileNotFoundError as e :
76
+ raise FileNotFoundError (
77
+ f"Failed to read manifest component json schema required for deduplication: { e } "
78
+ )
79
+
80
+
60
81
class ManifestDeclarativeSource (DeclarativeSource ):
61
82
"""Declarative source defined by a manifest of low-code components that define source connector behavior"""
62
83
@@ -78,6 +99,8 @@ def __init__(
78
99
component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
79
100
"""
80
101
self .logger = logging .getLogger (f"airbyte.{ self .name } " )
102
+
103
+ self ._declarative_component_schema = _get_declarative_component_schema ()
81
104
# For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
82
105
manifest = dict (source_config )
83
106
if "type" not in manifest :
@@ -86,10 +109,15 @@ def __init__(
86
109
# If custom components are needed, locate and/or register them.
87
110
self .components_module : ModuleType | None = get_registered_components_module (config = config )
88
111
89
- self ._reduce_manifest_commons = True if emit_connector_builder_messages else False
90
- resolved_source_config = ManifestReferenceResolver ().preprocess_manifest (
91
- manifest , self ._reduce_manifest_commons
92
- )
112
+ resolved_source_config = ManifestReferenceResolver ().preprocess_manifest (manifest )
113
+
114
+ if emit_connector_builder_messages :
115
+ # reduce commonalities in the manifest after the references have been resolved,
116
+ # used mostly for Connector Builder use cases.
117
+ resolved_source_config = ManifestNormalizer (
118
+ resolved_source_config , self ._declarative_component_schema
119
+ ).normalize ()
120
+
93
121
propagated_source_config = ManifestComponentTransformer ().propagate_types_and_parameters (
94
122
"" , resolved_source_config , {}
95
123
)
@@ -269,22 +297,6 @@ def _validate_source(self) -> None:
269
297
"""
270
298
Validates the connector manifest against the declarative component schema
271
299
"""
272
- try :
273
- raw_component_schema = pkgutil .get_data (
274
- "airbyte_cdk" , "sources/declarative/declarative_component_schema.yaml"
275
- )
276
- if raw_component_schema is not None :
277
- declarative_component_schema = yaml .load (
278
- raw_component_schema , Loader = yaml .SafeLoader
279
- )
280
- else :
281
- raise RuntimeError (
282
- "Failed to read manifest component json schema required for validation"
283
- )
284
- except FileNotFoundError as e :
285
- raise FileNotFoundError (
286
- f"Failed to read manifest component json schema required for validation: { e } "
287
- )
288
300
289
301
streams = self ._source_config .get ("streams" )
290
302
dynamic_streams = self ._source_config .get ("dynamic_streams" )
@@ -294,7 +306,7 @@ def _validate_source(self) -> None:
294
306
)
295
307
296
308
try :
297
- validate (self ._source_config , declarative_component_schema )
309
+ validate (self ._source_config , self . _declarative_component_schema )
298
310
except ValidationError as e :
299
311
raise ValidationError (
300
312
"Validation against json schema defined in declarative_component_schema.yaml schema failed"
0 commit comments