Skip to content

Commit 7d910ee

Browse files
committed
deduplication version 2
1 parent a488ab3 commit 7d910ee

File tree

2 files changed

+109
-179
lines changed

2 files changed

+109
-179
lines changed

airbyte_cdk/sources/declarative/parsers/manifest_deduplicator.py

+107-177
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,30 @@
66
import hashlib
77
import json
88
from collections import defaultdict
9-
from typing import Any, DefaultDict, Dict, Hashable, List, Optional, Tuple
9+
from typing import Any, DefaultDict, Dict, List, Optional, Tuple
1010

1111
from airbyte_cdk.sources.declarative.parsers.custom_exceptions import ManifestDeduplicationException
1212

1313
# Type definitions for better readability
1414
ManifestType = Dict[str, Any]
1515
DefinitionsType = Dict[str, Any]
16-
FieldDuplicatesType = DefaultDict[Tuple[str, Any], List[Tuple[List[str], Dict]]]
17-
ComponentDuplicatesType = DefaultDict[str, List[Tuple[List[str], Dict, Dict]]]
16+
DuplicatesType = DefaultDict[str, List[Tuple[List[str], Dict, Dict]]]
1817

1918
# Configuration constants
2019
N_OCCURANCES = 2
2120

2221
DEF_TAG = "definitions"
2322
SHARED_TAG = "shared"
2423

25-
# SPECIFY COMPONENT TAGS FOR DEDUPLICATION
26-
COMPONENT_TAGS = [
24+
# SPECIFY TAGS FOR DEDUPLICATION
25+
TAGS = [
2726
"authenticator",
28-
]
29-
30-
# SPECIFY FIELD TAGS FOR DEDUPLICATION
31-
FIELD_TAGS = [
3227
"url_base",
3328
]
3429

30+
# the placeholder for collected duplicates
31+
DUPLICATES: DuplicatesType = defaultdict(list, {})
32+
3533

3634
def deduplicate_definitions(resolved_manifest: ManifestType) -> ManifestType:
3735
"""
@@ -41,124 +39,169 @@ def deduplicate_definitions(resolved_manifest: ManifestType) -> ManifestType:
4139
resolved_manifest: A dictionary representing a JSON structure to be analyzed.
4240
4341
Returns:
44-
A refactored JSON structure with common properties extracted.
42+
A refactored JSON structure with common properties extracted to `definitions.shared`,
43+
the duplicated properties replaced with references
4544
"""
4645

4746
try:
4847
_manifest = copy.deepcopy(resolved_manifest)
4948
definitions = _manifest.get(DEF_TAG, {})
50-
field_duplicates, component_duplicates = _collect_all_duplicates(definitions)
51-
_process_duplicates(definitions, field_duplicates, component_duplicates)
49+
50+
_collect_duplicates(definitions)
51+
_handle_duplicates(definitions)
52+
5253
return _manifest
5354
except ManifestDeduplicationException:
54-
# we don't want to fix every single error which might occur,
55-
# due to the variety of possible manifest configurations,
5655
# if any error occurs, we just return the original manifest.
5756
return resolved_manifest
5857

5958

60-
def _process_duplicates(
61-
definitions: DefinitionsType,
62-
field_duplicates: FieldDuplicatesType,
63-
component_duplicates: ComponentDuplicatesType,
64-
) -> None:
59+
def _replace_duplicates_with_refs(definitions: ManifestType) -> None:
60+
"""
61+
Process duplicate objects and replace them with references.
62+
63+
Args:
64+
definitions: The definitions dictionary to modify
65+
"""
66+
for _, occurrences in DUPLICATES.items():
67+
# Skip non-duplicates
68+
if len(occurrences) < N_OCCURANCES:
69+
continue
70+
71+
# Take the value from the first occurrence, as they are the same
72+
path, _, value = occurrences[0]
73+
# take the component's name as the last part of its path
74+
key = path[-1]
75+
# Create a meaningful reference key
76+
ref_key = _create_reference_key(definitions, key)
77+
# Add to definitions
78+
_add_to_shared_definitions(definitions, ref_key, value)
79+
80+
# Replace all occurrences with references
81+
for path, parent_obj, _ in occurrences:
82+
if path: # Make sure the path is valid
83+
key = path[-1]
84+
parent_obj[key] = _create_ref_object(ref_key)
85+
86+
87+
def _handle_duplicates(definitions: DefinitionsType) -> None:
6588
"""
66-
Process the duplicates and replace them with references.
89+
Process the DUPLICATES and replace them with references.
6790
6891
Args:
69-
field_duplicates: Dictionary of duplicate primitive values
70-
component_duplicates: Dictionary of duplicate objects
92+
DUPLICATES: Dictionary of duplicate objects
7193
"""
7294
# process duplicates only if there are any
73-
if len(field_duplicates) > 0 or len(component_duplicates) > 0:
95+
if len(DUPLICATES) > 0:
7496
if not SHARED_TAG in definitions:
7597
definitions[SHARED_TAG] = {}
7698

7799
try:
78-
_process_component_duplicates(definitions, component_duplicates)
79-
_process_field_duplicates(definitions, field_duplicates)
100+
_replace_duplicates_with_refs(definitions)
80101
except Exception as e:
81102
raise ManifestDeduplicationException(str(e))
82103

83104

84-
def _is_allowed_component(key: str) -> bool:
105+
def _is_allowed_tag(key: str) -> bool:
85106
"""
86-
Check if the key is an allowed component tag.
107+
Check if the key is an allowed tag for deduplication.
87108
88109
Args:
89110
key: The key to check
90111
91112
Returns:
92113
True if the key is allowed, False otherwise
93114
"""
94-
return key in COMPONENT_TAGS
115+
return key in TAGS
95116

96117

97-
def _is_allowed_field(key: str) -> bool:
118+
def _add_duplicate(
119+
current_path: List[str],
120+
obj: Dict,
121+
value: Any,
122+
key: Optional[str] = None,
123+
) -> None:
98124
"""
99-
Check if the key is an allowed field tag.
125+
Adds a duplicate record of an observed object by computing a unique hash for the provided value.
100126
101-
Args:
102-
key: The key to check
127+
This function computes a hash for the given value (or a dictionary composed of the key and value if a key is provided)
128+
and appends a tuple containing the current path, the original object, and the value to the global DUPLICATES
129+
dictionary under the corresponding hash.
103130
104-
Returns:
105-
True if the key is allowed, False otherwise
131+
Parameters:
132+
current_path (List[str]): The list of keys or indices representing the current location in the object hierarchy.
133+
obj (Dict): The original dictionary object where the duplicate is observed.
134+
value (Any): The value to be hashed and used for identifying duplicates.
135+
key (Optional[str]): An optional key that, if provided, wraps the value in a dictionary before hashing.
136+
"""
137+
# create hash for the duplicate observed
138+
value_to_hash = value if key is None else {key: value}
139+
obj_hash = _hash_object(value_to_hash)
140+
if obj_hash:
141+
DUPLICATES[obj_hash].append((current_path, obj, value))
142+
143+
144+
def _add_to_shared_definitions(
145+
definitions: DefinitionsType,
146+
key: str,
147+
value: Any,
148+
) -> DefinitionsType:
106149
"""
107-
return key in FIELD_TAGS
150+
Add a value to the shared definitions under the specified key.
151+
152+
Args:
153+
definitions: The definitions dictionary to modify
154+
key: The key to use
155+
value: The value to add
156+
"""
157+
158+
if key not in definitions[SHARED_TAG]:
159+
definitions[SHARED_TAG][key] = value
160+
161+
return definitions
108162

109163

110-
def _collect_all_duplicates(
111-
node: ManifestType,
112-
) -> Tuple[FieldDuplicatesType, ComponentDuplicatesType]:
164+
def _collect_duplicates(node: ManifestType, path: Optional[List[str]] = None) -> None:
113165
"""
114166
Traverse the JSON object and collect all potential duplicate values and objects.
115167
116168
Args:
117169
node: The JSON object to analyze
118170
119171
Returns:
120-
A tuple of (field_duplicates, component_duplicates)
172+
DUPLICATES: A dictionary of duplicate objects
121173
"""
122174

123-
field_duplicates: FieldDuplicatesType = defaultdict(list)
124-
component_duplicates: ComponentDuplicatesType = defaultdict(list)
125-
126-
def collect_duplicates(obj: Dict, path: Optional[List[str]] = None) -> None:
127-
if not isinstance(obj, dict):
175+
try:
176+
if not isinstance(node, dict):
128177
return
129178

130179
path = [] if path is None else path
180+
131181
# Walk the object's key/value pairs
132-
for key, value in obj.items():
182+
for key, value in node.items():
133183
current_path = path + [key]
134184

135185
if isinstance(value, dict):
136186
# First process nested dictionaries
137-
collect_duplicates(value, current_path)
138-
187+
_collect_duplicates(value, current_path)
139188
# Process allowed-only component tags
140-
if _is_allowed_component(key):
141-
obj_hash = _hash_object(value)
142-
if obj_hash:
143-
component_duplicates[obj_hash].append((current_path, obj, value))
189+
if _is_allowed_tag(key):
190+
_add_duplicate(current_path, node, value)
191+
192+
# handle primitive types
193+
elif isinstance(value, (str, int, float, bool)):
194+
# Process allowed-only field tags
195+
if _is_allowed_tag(key):
196+
_add_duplicate(current_path, node, value, key)
144197

145-
# handle list[dict] cases
198+
# handle list cases
146199
elif isinstance(value, list):
147200
for i, item in enumerate(value):
148-
collect_duplicates(item, current_path + [str(i)])
149-
150-
# Process allowed-only field tags
151-
elif _is_allowed_field(key):
152-
hashable_value = _make_hashable(value)
153-
field_duplicates[(key, hashable_value)].append((current_path, obj))
154-
155-
try:
156-
collect_duplicates(node)
201+
_collect_duplicates(item, current_path + [str(i)])
157202
except Exception as e:
158203
raise ManifestDeduplicationException(str(e))
159204

160-
return field_duplicates, component_duplicates
161-
162205

163206
def _hash_object(node: Dict) -> Optional[str]:
164207
"""
@@ -173,26 +216,10 @@ def _hash_object(node: Dict) -> Optional[str]:
173216
if isinstance(node, Dict):
174217
# Sort keys to ensure consistent hash for same content
175218
return hashlib.md5(json.dumps(node, sort_keys=True).encode()).hexdigest()
176-
177219
return None
178220

179221

180-
def _make_hashable(value: Any) -> Any:
181-
"""
182-
Convert a value to a hashable representation.
183-
184-
Args:
185-
value: The value to make hashable
186-
187-
Returns:
188-
A hashable representation of the value
189-
"""
190-
return json.dumps(value) if not isinstance(value, Hashable) else value
191-
192-
193-
def _create_reference_key(
194-
definitions: DefinitionsType, key: str, value: Optional[Any] = None
195-
) -> str:
222+
def _create_reference_key(definitions: DefinitionsType, key: str) -> str:
196223
"""
197224
Create a unique reference key and handle collisions.
198225
@@ -206,9 +233,6 @@ def _create_reference_key(
206233

207234
counter = 1
208235
while key in definitions[SHARED_TAG]:
209-
# If the value is already in shared definitions with this key, no need to rename
210-
if value is not None and _is_same_value(definitions[SHARED_TAG].get(key), value):
211-
return key
212236
key = f"{key}_{counter}"
213237
counter += 1
214238
return key
@@ -225,97 +249,3 @@ def _create_ref_object(ref_key: str) -> Dict[str, str]:
225249
A reference object in the proper format
226250
"""
227251
return {"$ref": f"#/{DEF_TAG}/{SHARED_TAG}/{ref_key}"}
228-
229-
230-
def _is_same_value(val1: Any, val2: Any) -> bool:
231-
"""
232-
Check if two values are the same by comparing their JSON representation.
233-
234-
Args:
235-
val1: First value
236-
val2: Second value
237-
238-
Returns:
239-
True if the values are the same, False otherwise
240-
"""
241-
return json.dumps(val1, sort_keys=True) == json.dumps(val2, sort_keys=True)
242-
243-
244-
def _process_component_duplicates(
245-
definitions: ManifestType,
246-
component_duplicates: ComponentDuplicatesType,
247-
) -> None:
248-
"""
249-
Process duplicate objects and replace them with references.
250-
251-
Args:
252-
definitions: The definitions dictionary to modify
253-
component_duplicates: Dictionary of duplicate objects
254-
"""
255-
for obj_hash, occurrences in component_duplicates.items():
256-
# Skip non-duplicates
257-
if len(occurrences) < N_OCCURANCES:
258-
continue
259-
260-
# Take the value from the first occurrence, as they are the same
261-
path, _, value = occurrences[0]
262-
# take the component's name as the last part of its path
263-
key = path[-1]
264-
# Create a meaningful reference key
265-
ref_key = _create_reference_key(definitions, key)
266-
# Add to definitions
267-
_add_to_shared_definitions(definitions, ref_key, value)
268-
269-
# Replace all occurrences with references
270-
for path, parent_obj, _ in occurrences:
271-
if path: # Make sure the path is valid
272-
key = path[-1]
273-
parent_obj[key] = _create_ref_object(ref_key)
274-
275-
276-
def _add_to_shared_definitions(
277-
definitions: DefinitionsType,
278-
key: str,
279-
value: Any,
280-
) -> DefinitionsType:
281-
"""
282-
Add a value to the shared definitions under the specified key.
283-
284-
Args:
285-
definitions: The definitions dictionary to modify
286-
key: The key to use
287-
value: The value to add
288-
"""
289-
290-
if key not in definitions[SHARED_TAG]:
291-
definitions[SHARED_TAG][key] = value
292-
293-
return definitions
294-
295-
296-
def _process_field_duplicates(
297-
definitions: ManifestType,
298-
field_duplicates: FieldDuplicatesType,
299-
) -> None:
300-
"""
301-
Process duplicate primitive values and replace them with references.
302-
303-
Args:
304-
definitions: The definitions dictionary to modify
305-
field_duplicates: Dictionary of duplicate primitive values
306-
"""
307-
308-
for (key, value), occurrences in field_duplicates.items():
309-
# Skip non-duplicates
310-
if len(occurrences) < N_OCCURANCES:
311-
continue
312-
313-
ref_key = _create_reference_key(definitions, key, value)
314-
# Add to definitions if not already there
315-
_add_to_shared_definitions(definitions, ref_key, value)
316-
317-
# Replace all occurrences with references
318-
for path, parent_obj in occurrences:
319-
if path: # Make sure the path is valid
320-
key = path[-1]
321-
parent_obj[key] = _create_ref_object(ref_key)

0 commit comments

Comments
 (0)