Skip to content

Commit 834d3b1

Browse files
authored
Merge pull request #395 from nodestream-proj/optional-timestamp-update
Flag to Disable last_ingested_at update
2 parents 22706c5 + 39921c8 commit 834d3b1

9 files changed

+48
-4
lines changed

nodestream/databases/ingest_strategy.py

+1-1
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ class IngestionStrategy(ABC):
2424
How that data is committed and how it is stored internally is decoupled through the `IngestionStrategy` interface.
2525
2626
Generally, your usage of nodestream is decoupled from `IngestionStrategy` unless you intend to provide an implementation
27-
of your own database writer. The writer API will give you a `DesiredIngestion` or other `Ingestible` object that needs a
27+
of your own database writer. The writer API will give you a `DesiredIngestion` or other `Ingestible` object that needs an
2828
instance of an `IngestionStrategy` to apply operations to.
2929
"""
3030

nodestream/interpreting/interpretation_passes.py

+1-1
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,7 @@ def from_file_data(cls, interpretation_arg_list):
6363
def __init__(self, *interpretations: Interpretation):
6464
self.interpretations = interpretations
6565

66-
# Verifies that there is only one source node creator in all of the interpretations
66+
# Verifies that there is only one source node creator in all the interpretations
6767
def verify_uniqueness(self):
6868
source_node_generator_count = 0
6969
for interpretation in self.interpretations:

nodestream/interpreting/interpretations/property_mapping.py

+18
Original file line number | Diff line number | Diff line change
@@ -69,6 +69,16 @@ def __init__(self, map_of_value_providers: Dict[str, ValueProvider]):
6969
def key_value_generator(
7070
self, context: "ProviderContext", norm_args: Dict[str, bool]
7171
):
72+
"""
73+
Generates key-value pairs by normalizing values provided by ValueProviders.
74+
75+
Args:
76+
context (ProviderContext): The context in which the values are being interpreted.
77+
norm_args (Dict[str, bool]): Arguments used for normalizing the values.
78+
79+
Yields:
80+
Tuple[str, Any]: A tuple containing the property key and its normalized value.
81+
"""
7282
for key, provider in self.map_of_value_providers.items():
7383
v = provider.normalize_single_value(context, norm_args)
7484
yield key, v
@@ -79,6 +89,14 @@ def apply_to(
7989
property_set: PropertySet,
8090
norm_args: Dict[str, bool],
8191
):
92+
"""
93+
Applies the key-value pairs generated by the key_value_generator to the given PropertySet.
94+
95+
Args:
96+
context (ProviderContext): The context in which the values are being interpreted.
97+
property_set (PropertySet): The PropertySet to which the key-value pairs will be applied.
98+
norm_args (Dict[str, bool]): Arguments used for normalizing the values.
99+
"""
82100
property_set.apply(self.key_value_generator(context, norm_args))
83101

84102
def __iter__(self):

nodestream/interpreting/interpretations/relationship_interpretation.py

+14
Original file line number | Diff line number | Diff line change
@@ -94,6 +94,8 @@ class RelationshipInterpretation(Interpretation, alias="relationship"):
9494
"properties_normalization",
9595
"key_search_algorithm",
9696
"node_additional_types",
97+
"node_update_last_ingested",
98+
"relationship_update_last_ingested",
9799
)
98100

99101
@deprecated_arugment("match_strategy", "node_creation_rule")
@@ -113,6 +115,8 @@ def __init__(
113115
key_normalization: Optional[Dict[str, Any]] = None,
114116
properties_normalization: Optional[Dict[str, Any]] = None,
115117
node_additional_types: Optional[Iterable[str]] = None,
118+
node_update_last_ingested: bool = True,
119+
relationship_update_last_ingested: bool = True,
116120
):
117121
self.can_find_many = find_many or iterate_on is not None
118122
self.cardinality = Cardinality(cardinality)
@@ -146,6 +150,8 @@ def __init__(
146150
self.node_key, self.key_normalization
147151
)
148152
self.node_additional_types = tuple(node_additional_types or tuple())
153+
self.node_update_last_ingested = node_update_last_ingested
154+
self.relationship_update_last_ingested = relationship_update_last_ingested
149155

150156
def interpret(self, context: ProviderContext):
151157
for sub_context in self.decomposer.decompose_record(context):
@@ -156,6 +162,10 @@ def interpret(self, context: ProviderContext):
156162

157163
def find_relationship(self, context: ProviderContext) -> Relationship:
158164
rel = Relationship(type=self.relationship_type.single_value(context))
165+
166+
if not self.relationship_update_last_ingested:
167+
rel.properties.remove_last_ingested()
168+
159169
relationship_key_property_mapping = PropertyMappingFromDict(
160170
self.relationship_key
161171
)
@@ -174,6 +184,10 @@ def find_related_nodes(self, context: ProviderContext) -> Iterable[Node]:
174184
key_values=PropertySet(key_set),
175185
additional_types=self.node_additional_types,
176186
)
187+
188+
if not self.node_update_last_ingested:
189+
node.properties.remove_last_ingested()
190+
177191
if node.has_valid_id:
178192
self.node_properties.apply_to(
179193
context, node.properties, self.properties_normalization

nodestream/interpreting/interpretations/source_node_interpretation.py

+6
Original file line number | Diff line number | Diff line change
@@ -83,6 +83,7 @@ class SourceNodeInterpretation(Interpretation, alias="source_node"):
8383
"additional_types",
8484
"norm_args",
8585
"allow_create",
86+
"update_last_ingested",
8687
)
8788

8889
def __init__(
@@ -96,6 +97,7 @@ def __init__(
9697
properties_normalization: Optional[Dict[str, Any]] = None,
9798
key_normalization: Optional[Dict[str, Any]] = None,
9899
allow_create: bool = True,
100+
update_last_ingested: bool = True,
99101
):
100102
if normalization and (properties_normalization or key_normalization):
101103
raise ValueError(
@@ -118,6 +120,7 @@ def __init__(
118120
self.creation_rule = NodeCreationRule.EAGER
119121
else:
120122
self.creation_rule = NodeCreationRule.MATCH_ONLY
123+
self.update_last_ingested = update_last_ingested
121124

122125
def interpret(self, context: ProviderContext):
123126
normalized_key: PropertySet = PropertySet()
@@ -135,6 +138,9 @@ def interpret(self, context: ProviderContext):
135138
normalized_properties,
136139
)
137140

141+
if not self.update_last_ingested:
142+
context.desired_ingest.source.properties.remove_last_ingested()
143+
138144
def expand_source_node_schema(self, source_node_schema: GraphObjectSchema):
139145
source_node_schema.add_keys(self.key)
140146
source_node_schema.add_properties(self.properties)

nodestream/model/graph_objects.py

+4-1
Original file line number | Diff line number | Diff line change
@@ -86,10 +86,13 @@ def merge(self, properties: "PropertySet"):
8686
for key, val in properties.items():
8787
self.set_property(key, val)
8888

89+
def remove_last_ingested(self):
90+
self.pop("last_ingested_at", None)
91+
8992

9093
@dataclass(slots=True)
9194
class Node(DeduplicatableObject):
92-
"""A `Node` is a entity that has a distinct identity.
95+
"""A `Node` is an entity that has a distinct identity.
9396
9497
Each `Node` represents an entity (a person, place, thing, category or other piece of data) that has a distinct
9598
identity. Nodestream assumes the underlying graph database layer is a Labeled Property Graph. The identity

pyproject.toml

+1-1
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "nodestream"
3-
version = "0.13.7"
3+
version = "0.13.8"
44
description = "A Fast, Declarative ETL for Graph Databases."
55
license = "GPL-3.0-only"
66
authors = [

tests/unit/interpreting/interpretations/test_relationship_interpretation.py

+2
Original file line number | Diff line number | Diff line change
@@ -119,6 +119,8 @@ def test_relationship_interpretation_find_relationship_property_normalization(
119119
node_key={"hello": "world"},
120120
relationship_properties=supplied_properties,
121121
properties_normalization={"do_lowercase_strings": True},
122+
node_update_last_ingested=False,
123+
relationship_update_last_ingested=False,
122124
).interpret(blank_context)
123125
assert_that(
124126
blank_context.desired_ingest.relationships[0].relationship.properties,

tests/unit/interpreting/interpretations/test_source_node_interpretation.py

+1
Original file line number | Diff line number | Diff line change
@@ -66,6 +66,7 @@ def test_source_node_interpretation_applies_dynamic_properties(blank_context):
6666
node_type=EXPECTED_NODE_TYPE,
6767
key={},
6868
properties={"first_name": dynamic_first_name, "last_name": "Probst"},
69+
update_last_ingested=False,
6970
)
7071
subject.interpret(blank_context)
7172
actual_properties = blank_context.desired_ingest.source.properties

0 commit comments

Comments
 (0)