Skip to content

Commit 691d16a

Browse files
committed
updated duplicates collection
1 parent 7d910ee commit 691d16a

File tree

3 files changed

+42
-28
lines changed

3 files changed

+42
-28
lines changed

airbyte_cdk/sources/declarative/parsers/manifest_deduplicator.py

+39-28
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# Type definitions for better readability
1414
ManifestType = Dict[str, Any]
1515
DefinitionsType = Dict[str, Any]
16-
DuplicatesType = DefaultDict[str, List[Tuple[List[str], Dict, Dict]]]
16+
DuplicatesType = DefaultDict[str, List[Tuple[List[str], Dict[str, Any], Dict[str, Any]]]]
1717

1818
# Configuration constants
1919
N_OCCURANCES = 2
@@ -27,9 +27,6 @@
2727
"url_base",
2828
]
2929

30-
# the placeholder for collected duplicates
31-
DUPLICATES: DuplicatesType = defaultdict(list, {})
32-
3330

3431
def deduplicate_definitions(resolved_manifest: ManifestType) -> ManifestType:
3532
"""
@@ -47,23 +44,23 @@ def deduplicate_definitions(resolved_manifest: ManifestType) -> ManifestType:
4744
_manifest = copy.deepcopy(resolved_manifest)
4845
definitions = _manifest.get(DEF_TAG, {})
4946

50-
_collect_duplicates(definitions)
51-
_handle_duplicates(definitions)
47+
duplicates = _collect_duplicates(definitions)
48+
_handle_duplicates(definitions, duplicates)
5249

5350
return _manifest
5451
except ManifestDeduplicationException:
5552
# if any error occurs, we just return the original manifest.
5653
return resolved_manifest
5754

5855

59-
def _replace_duplicates_with_refs(definitions: ManifestType) -> None:
56+
def _replace_duplicates_with_refs(definitions: ManifestType, duplicates: DuplicatesType) -> None:
6057
"""
6158
Process duplicate objects and replace them with references.
6259
6360
Args:
6461
definitions: The definitions dictionary to modify
6562
"""
66-
for _, occurrences in DUPLICATES.items():
63+
for _, occurrences in duplicates.items():
6764
# Skip non-duplicates
6865
if len(occurrences) < N_OCCURANCES:
6966
continue
@@ -84,20 +81,20 @@ def _replace_duplicates_with_refs(definitions: ManifestType) -> None:
8481
parent_obj[key] = _create_ref_object(ref_key)
8582

8683

87-
def _handle_duplicates(definitions: DefinitionsType) -> None:
84+
def _handle_duplicates(definitions: DefinitionsType, duplicates: DuplicatesType) -> None:
8885
"""
89-
Process the DUPLICATES and replace them with references.
86+
Process the duplicates and replace them with references.
9087
9188
Args:
92-
DUPLICATES: Dictionary of duplicate objects
89+
duplicates: Dictionary of duplicate objects
9390
"""
9491
# process duplicates only if there are any
95-
if len(DUPLICATES) > 0:
92+
if len(duplicates) > 0:
9693
if not SHARED_TAG in definitions:
9794
definitions[SHARED_TAG] = {}
9895

9996
try:
100-
_replace_duplicates_with_refs(definitions)
97+
_replace_duplicates_with_refs(definitions, duplicates)
10198
except Exception as e:
10299
raise ManifestDeduplicationException(str(e))
103100

@@ -116,19 +113,21 @@ def _is_allowed_tag(key: str) -> bool:
116113

117114

118115
def _add_duplicate(
116+
duplicates: DuplicatesType,
119117
current_path: List[str],
120-
obj: Dict,
118+
obj: Dict[str, Any],
121119
value: Any,
122120
key: Optional[str] = None,
123121
) -> None:
124122
"""
125123
Adds a duplicate record of an observed object by computing a unique hash for the provided value.
126124
127125
This function computes a hash for the given value (or a dictionary composed of the key and value if a key is provided)
128-
and appends a tuple containing the current path, the original object, and the value to the global DUPLICATES
126+
and appends a tuple containing the current path, the original object, and the value to the duplicates
129127
dictionary under the corresponding hash.
130128
131129
Parameters:
130+
duplicates (DuplicatesType): The dictionary to store duplicate records.
132131
current_path (List[str]): The list of keys or indices representing the current location in the object hierarchy.
133132
obj (Dict): The original dictionary object where the duplicate is observed.
134133
value (Any): The value to be hashed and used for identifying duplicates.
@@ -138,7 +137,7 @@ def _add_duplicate(
138137
value_to_hash = value if key is None else {key: value}
139138
obj_hash = _hash_object(value_to_hash)
140139
if obj_hash:
141-
DUPLICATES[obj_hash].append((current_path, obj, value))
140+
duplicates[obj_hash].append((current_path, obj, value))
142141

143142

144143
def _add_to_shared_definitions(
@@ -161,49 +160,61 @@ def _add_to_shared_definitions(
161160
return definitions
162161

163162

164-
def _collect_duplicates(node: ManifestType, path: Optional[List[str]] = None) -> None:
163+
def _collect_duplicates(node: ManifestType) -> DuplicatesType:
165164
"""
166165
Traverse the JSON object and collect all potential duplicate values and objects.
167166
168167
Args:
169-
node: The JSON object to analyze
168+
node: The JSON object to analyze.
170169
171170
Returns:
172-
DUPLICATES: A dictionary of duplicate objects
171+
duplicates: A dictionary of duplicate objects.
173172
"""
174173

175-
try:
176-
if not isinstance(node, dict):
174+
def _collect(obj: Dict[str, Any], path: Optional[List[str]] = None) -> None:
175+
"""
176+
The closure to recursively collect duplicates in the JSON object.
177+
178+
Args:
179+
obj: The current object being analyzed.
180+
path: The current path in the object hierarchy.
181+
"""
182+
if not isinstance(obj, dict):
177183
return
178184

179185
path = [] if path is None else path
180-
181186
# Check if the object is empty
182-
for key, value in node.items():
187+
for key, value in obj.items():
183188
current_path = path + [key]
184189

185190
if isinstance(value, dict):
186191
# First process nested dictionaries
187-
_collect_duplicates(value, current_path)
192+
_collect(value, current_path)
188193
# Process allowed-only component tags
189194
if _is_allowed_tag(key):
190-
_add_duplicate(current_path, node, value)
195+
_add_duplicate(duplicates, current_path, obj, value)
191196

192197
# handle primitive types
193198
elif isinstance(value, (str, int, float, bool)):
194199
# Process allowed-only field tags
195200
if _is_allowed_tag(key):
196-
_add_duplicate(current_path, node, value, key)
201+
_add_duplicate(duplicates, current_path, obj, value, key)
197202

198203
# handle list cases
199204
elif isinstance(value, list):
200205
for i, item in enumerate(value):
201-
_collect_duplicates(item, current_path + [str(i)])
206+
_collect(item, current_path + [str(i)])
207+
208+
duplicates: DuplicatesType = defaultdict(list, {})
209+
210+
try:
211+
_collect(node)
212+
return duplicates
202213
except Exception as e:
203214
raise ManifestDeduplicationException(str(e))
204215

205216

206-
def _hash_object(node: Dict) -> Optional[str]:
217+
def _hash_object(node: Dict[str, Any]) -> Optional[str]:
207218
"""
208219
Create a unique hash for a dictionary object.
209220

unit_tests/connector_builder/test_connector_builder_handler.py

+1
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file):
384384
},
385385
"record_selector": {"extractor": {"field_path": ["result"]}},
386386
},
387+
"shared": {},
387388
},
388389
"streams": [
389390
{

unit_tests/sources/declarative/interpolation/test_macros.py

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import datetime
66

77
import pytest
8+
from freezegun import freeze_time
89

910
from airbyte_cdk.sources.declarative.interpolation.macros import macros
1011

@@ -29,6 +30,7 @@ def test_macros_export(test_name, fn_name, found_in_macros):
2930
assert fn_name not in macros
3031

3132

33+
@freeze_time("2022-01-01")
3234
@pytest.mark.parametrize(
3335
"input_value, format, input_format, expected_output",
3436
[

0 commit comments

Comments
 (0)