diff --git a/tap_shopify/__init__.py b/tap_shopify/__init__.py index c593e5f7..1bf95b05 100644 --- a/tap_shopify/__init__.py +++ b/tap_shopify/__init__.py @@ -111,10 +111,9 @@ def discover(): 'tap_stream_id': schema_name, 'schema': catalog_schema, 'metadata': get_discovery_metadata(stream, schema), - 'key_properties': stream.key_properties, - 'replication_key': stream.replication_key, - 'replication_method': stream.replication_method + 'key_properties': stream.key_properties } + streams.append(catalog_entry) return {'streams': streams} @@ -143,7 +142,7 @@ def sync(): singer.write_schema(stream["tap_stream_id"], stream["schema"], stream["key_properties"], - bookmark_properties=stream["replication_key"]) + bookmark_properties=stream.get("replication_key", None)) Context.counts[stream["tap_stream_id"]] = 0 # If there is a currently syncing stream bookmark, shuffle the diff --git a/tap_shopify/streams/transactions.py b/tap_shopify/streams/transactions.py index fc9b2a55..0fd72a71 100644 --- a/tap_shopify/streams/transactions.py +++ b/tap_shopify/streams/transactions.py @@ -1,6 +1,5 @@ import shopify import singer -from singer.utils import strftime, strptime_to_utc from tap_shopify.context import Context from tap_shopify.streams.base import (Stream, shopify_error_handling) @@ -55,7 +54,8 @@ def canonicalize(transaction_dict, field_name): class Transactions(Stream): name = 'transactions' - replication_key = 'created_at' + # As it is a child of orders stream and it is incremental based on its parent. 
+ replication_key = None replication_object = shopify.Transaction # Added decorator over functions of shopify SDK replication_object.find = shopify_error_handling(replication_object.find) @@ -107,19 +107,10 @@ def get_objects(self): yield transaction def sync(self): - bookmark = self.get_bookmark() - max_bookmark = bookmark for transaction in self.get_objects(): transaction_dict = transaction.to_dict() - replication_value = strptime_to_utc(transaction_dict[self.replication_key]) - if replication_value >= bookmark: - for field_name in ['token', 'version', 'ack', 'timestamp', 'build']: - canonicalize(transaction_dict, field_name) - yield transaction_dict - - if replication_value > max_bookmark: - max_bookmark = replication_value - - self.update_bookmark(strftime(max_bookmark)) + for field_name in ['token', 'version', 'ack', 'timestamp', 'build']: + canonicalize(transaction_dict, field_name) + yield transaction_dict Context.stream_objects['transactions'] = Transactions diff --git a/tests/base.py b/tests/base.py index 8e453eb4..188645ba 100644 --- a/tests/base.py +++ b/tests/base.py @@ -28,6 +28,8 @@ class BaseTapTest(unittest.TestCase): START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ" BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT00:00:00+00:00" DEFAULT_RESULTS_PER_PAGE = 175 + # skipped this stream due to change in the bookmarking logic of the stream. + SKIPPED_STREAMS = ('transactions') @staticmethod def tap_name(): @@ -107,7 +109,11 @@ def expected_metadata(self): self.API_LIMIT: 250}, "metafields": meta, "transactions": { - self.REPLICATION_KEYS: {"created_at"}, + # `transactions` is child stream of `orders` stream which is incremental. + # We are writing a separate bookmark for the child stream in which we are storing + # the bookmark based on the parent's replication key. + # But, we are not using any fields from the child record for it. + # That's why the `transactions` stream does not have replication_key but still it is incremental. 
self.PRIMARY_KEYS: {"id"}, self.FOREIGN_KEYS: {"order_id"}, self.REPLICATION_METHOD: self.INCREMENTAL, @@ -245,46 +251,56 @@ def max_bookmarks_by_stream(self, sync_records): """ max_bookmarks = {} for stream, batch in sync_records.items(): - - upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] - stream_bookmark_key = self.expected_replication_keys().get(stream, set()) - assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key - stream_bookmark_key = stream_bookmark_key.pop() - - bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages] - max_bookmarks[stream] = {stream_bookmark_key: None} - for bk_value in bk_values: - if bk_value is None: - continue - - if max_bookmarks[stream][stream_bookmark_key] is None: - max_bookmarks[stream][stream_bookmark_key] = bk_value - - if bk_value > max_bookmarks[stream][stream_bookmark_key]: - max_bookmarks[stream][stream_bookmark_key] = bk_value + # `transactions` is child stream of `orders` stream which is incremental. + # We are writing a separate bookmark for the child stream in which we are storing + # the bookmark based on the parent's replication key. + # But, we are not using any fields from the child record for it. + # That's why the `transactions` stream does not have replication_key but still it is incremental. 
+ if stream not in self.SKIPPED_STREAMS: + upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] + stream_bookmark_key = self.expected_replication_keys().get(stream, set()) + assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key + stream_bookmark_key = stream_bookmark_key.pop() + + bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages] + max_bookmarks[stream] = {stream_bookmark_key: None} + for bk_value in bk_values: + if bk_value is None: + continue + + if max_bookmarks[stream][stream_bookmark_key] is None: + max_bookmarks[stream][stream_bookmark_key] = bk_value + + if bk_value > max_bookmarks[stream][stream_bookmark_key]: + max_bookmarks[stream][stream_bookmark_key] = bk_value return max_bookmarks def min_bookmarks_by_stream(self, sync_records): """Return the minimum value for the replication key for each stream""" min_bookmarks = {} for stream, batch in sync_records.items(): - - upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] - stream_bookmark_key = self.expected_replication_keys().get(stream, set()) - assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key - (stream_bookmark_key, ) = stream_bookmark_key - - bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages] - min_bookmarks[stream] = {stream_bookmark_key: None} - for bk_value in bk_values: - if bk_value is None: - continue - - if min_bookmarks[stream][stream_bookmark_key] is None: - min_bookmarks[stream][stream_bookmark_key] = bk_value - - if bk_value < min_bookmarks[stream][stream_bookmark_key]: - min_bookmarks[stream][stream_bookmark_key] = bk_value + # `transactions` is child stream of `orders` stream which is incremental. + # We are writing a separate bookmark for the child stream in which we are storing + # the bookmark based on the parent's replication key. + # But, we are not using any fields from the child record for it. 
+ # That's why the `transactions` stream does not have replication_key but still it is incremental. + if stream not in self.SKIPPED_STREAMS: + upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] + stream_bookmark_key = self.expected_replication_keys().get(stream, set()) + assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key + (stream_bookmark_key, ) = stream_bookmark_key + + bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages] + min_bookmarks[stream] = {stream_bookmark_key: None} + for bk_value in bk_values: + if bk_value is None: + continue + + if min_bookmarks[stream][stream_bookmark_key] is None: + min_bookmarks[stream][stream_bookmark_key] = bk_value + + if bk_value < min_bookmarks[stream][stream_bookmark_key]: + min_bookmarks[stream][stream_bookmark_key] = bk_value return min_bookmarks @staticmethod diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py index a6a1f135..bab9158b 100644 --- a/tests/test_bookmarks.py +++ b/tests/test_bookmarks.py @@ -48,8 +48,13 @@ def bookmarks_test(self, conn_id, testable_streams): # Select all streams and no fields within streams found_catalogs = menagerie.get_catalogs(conn_id) + # `transactions` is child stream of `orders` stream which is incremental. + # We are writing a separate bookmark for the child stream in which we are storing + # the bookmark based on the parent's replication key. + # But, we are not using any fields from the child record for it. + # That's why the `transactions` stream does not have replication_key but still it is incremental. 
incremental_streams = {key for key, value in self.expected_replication_method().items() - if value == self.INCREMENTAL and key in testable_streams} + if value == self.INCREMENTAL and key in testable_streams and key not in self.SKIPPED_STREAMS} # Our test data sets for Shopify do not have any abandoned_checkouts our_catalogs = [catalog for catalog in found_catalogs if @@ -101,6 +106,7 @@ def bookmarks_test(self, conn_id, testable_streams): stream_bookmark_key) == 1 # There shouldn't be a compound replication key stream_bookmark_key = stream_bookmark_key.pop() + state_value = first_sync_state.get("bookmarks", {}).get( stream, {None: None}).get(stream_bookmark_key) target_value = first_max_bookmarks.get( diff --git a/tests/test_bookmarks_updated.py b/tests/test_bookmarks_updated.py index 2dbfd466..7780e01c 100644 --- a/tests/test_bookmarks_updated.py +++ b/tests/test_bookmarks_updated.py @@ -75,7 +75,7 @@ def bookmarks_test(self, conn_id, testable_streams): first_sync_records = runner.get_records_from_target_output() # BUG:TDL-17087 : State has additional values which are not streams # Need to remove additional values from bookmark value - extra_stuff = {'transaction_orders', 'metafield_products', 'refund_orders', 'product_variants'} + extra_stuff = {'metafield_products', 'refund_orders', 'product_variants'} for keys in list(first_sync_bookmark['bookmarks'].keys()): if keys in extra_stuff: first_sync_bookmark['bookmarks'].pop(keys) @@ -88,7 +88,7 @@ def bookmarks_test(self, conn_id, testable_streams): #simulated_states = self.calculated_states_by_stream(first_sync_bookmark) # We are hardcoding the updated state to ensure that we get atleast 1 record in second sync. 
These values have been provided after reviewing the max bookmark value for each of the streams - simulated_states = {'products': {'updated_at': '2021-12-20T05:10:05.000000Z'}, 'collects': {'updated_at': '2021-09-01T09:08:28.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transactions': {'created_at': '2021-12-20T00:08:52-05:00'}, 'metafields': {'updated_at': '2021-09-07T21:18:05.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2021-12-20T17:41:18.000000Z'}} + simulated_states = {'products': {'updated_at': '2021-12-20T05:10:05.000000Z'}, 'collects': {'updated_at': '2022-01-15T00:53:24.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transaction_orders': {'updated_at': '2021-12-12T00:08:33.000000Z'}, 'metafields': {'updated_at': '2021-09-07T21:18:05.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2021-12-20T17:41:18.000000Z'}} for stream, updated_state in simulated_states.items(): new_state['bookmarks'][stream] = updated_state @@ -115,15 +115,22 @@ def bookmarks_test(self, conn_id, testable_streams): if record.get('action') == 'upsert'] 
second_sync_messages = [record.get('data') for record in second_sync_records.get(stream, {}).get('messages', []) if record.get('action') == 'upsert'] - first_bookmark_value = first_sync_bookmark.get('bookmarks', {stream: None}).get(stream) + if stream not in self.SKIPPED_STREAMS: + first_bookmark_value = first_sync_bookmark.get('bookmarks', {stream: None}).get(stream) + second_bookmark_value = second_sync_bookmark.get('bookmarks', {stream: None}).get(stream) + else: + first_bookmark_value = first_sync_bookmark.get('bookmarks').get('transaction_orders') + + second_bookmark_value = second_sync_bookmark.get('bookmarks').get('transaction_orders') first_bookmark_value = list(first_bookmark_value.values())[0] - second_bookmark_value = second_sync_bookmark.get('bookmarks', {stream: None}).get(stream) second_bookmark_value = list(second_bookmark_value.values())[0] - replication_key = next(iter(expected_replication_keys[stream])) first_bookmark_value_utc = self.convert_state_to_utc(first_bookmark_value) second_bookmark_value_utc = self.convert_state_to_utc(second_bookmark_value) - simulated_bookmark = new_state['bookmarks'][stream] + if stream not in self.SKIPPED_STREAMS: + simulated_bookmark = new_state['bookmarks'][stream] + else: + simulated_bookmark = new_state['bookmarks']['transaction_orders'] simulated_bookmark_value = list(simulated_bookmark.values())[0] # verify the syncs sets a bookmark of the expected form @@ -133,27 +140,32 @@ def bookmarks_test(self, conn_id, testable_streams): self.assertTrue(self.is_expected_date_format(second_bookmark_value)) # verify the 2nd bookmark is equal to 1st sync bookmark - #NOT A BUG (IS the expected behaviour for shopify as they are using date windowing : TDL-17096 : 2nd bookmark value is getting assigned from the execution time rather than the actual bookmark time. 
This is an invalid assertion for shopify + #NOT A BUG (IS the expected behavior for shopify as they are using date windowing : TDL-17096 : 2nd bookmark value is getting assigned from the execution time rather than the actual bookmark time. This is an invalid assertion for shopify #self.assertEqual(first_bookmark_value, second_bookmark_value) - for record in first_sync_messages: - replication_key_value = record.get(replication_key) - # verify 1st sync bookmark value is the max replication key value for a given stream - self.assertLessEqual(replication_key_value, first_bookmark_value_utc, msg="First sync bookmark was set incorrectly, a record with a greater replication key value was synced") - - for record in second_sync_messages: - replication_key_value = record.get(replication_key) - # verify the 2nd sync replication key value is greater or equal to the 1st sync bookmarks - self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, msg="Second sync records do not respect the previous bookmark") - # verify the 2nd sync bookmark value is the max replication key value for a given stream - self.assertLessEqual(replication_key_value, second_bookmark_value_utc, msg="Second sync bookmark was set incorrectly, a record with a greater replication key value was synced") + # The `transactions` stream is a child of the `orders` stream. Hence the bookmark for transactions is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's bookmark. + # Hence it doesn't have its own replication key. 
+ if stream not in self.SKIPPED_STREAMS: + replication_key = next(iter(expected_replication_keys[stream])) + for record in first_sync_messages: + replication_key_value = record.get(replication_key) + # verify 1st sync bookmark value is the max replication key value for a given stream + self.assertLessEqual(replication_key_value, first_bookmark_value_utc, msg="First sync bookmark was set incorrectly, a record with a greater replication key value was synced") + + for record in second_sync_messages: + replication_key_value = record.get(replication_key) + # verify the 2nd sync replication key value is greater or equal to the 1st sync bookmarks + self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, msg="Second sync records do not respect the previous bookmark") + # verify the 2nd sync bookmark value is the max replication key value for a given stream + self.assertLessEqual(replication_key_value, second_bookmark_value_utc, msg="Second sync bookmark was set incorrectly, a record with a greater replication key value was synced") # verify that we get less data in the 2nd sync # collects has all the records with the same value of replication key, so we are removing from this assertion - if stream not in ('collects'): + # As the bookmark for `transactions` is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's + # bookmark, hence we'd now get all the data for transactions stream without filtering on `created_at` + if stream not in self.SKIPPED_STREAMS: self.assertLess(second_sync_count, first_sync_count, msg="Second sync does not have less records, bookmark usage not verified") # verify that we get atleast 1 record in the second sync - if stream not in ('collects'): - self.assertGreater(second_sync_count, 0, msg="Second sync did not yield any records") + self.assertGreater(second_sync_count, 0, msg="Second sync did not yield any records") diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 
b35738c0..abd9cc67 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -107,8 +107,13 @@ def test_run(self): # verify that if there is a replication key we are doing INCREMENTAL otherwise FULL actual_replication_method = stream_properties[0].get( "metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD) + # `transactions` is child stream of `orders` stream which is incremental. + # We are writing a separate bookmark for the child stream in which we are storing + # the bookmark based on the parent's replication key. + # But, we are not using any fields from the child record for it. + # That's why the `transactions` stream does not have replication_key but still it is incremental. if stream_properties[0].get( - "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []): + "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []) or stream in self.SKIPPED_STREAMS: self.assertTrue(actual_replication_method == self.INCREMENTAL, msg="Expected INCREMENTAL replication " diff --git a/tests/test_start_date.py b/tests/test_start_date.py index c2b13c22..8c387094 100644 --- a/tests/test_start_date.py +++ b/tests/test_start_date.py @@ -151,7 +151,7 @@ def test_run(self): for start_date, target_mark in zip((first_sync_start_date, second_sync_start_date), (first_sync_target_mark, second_sync_target_mark)): target_value = next(iter(target_mark.values())) # there should be only one - if target_value: + if target_value and stream not in self.SKIPPED_STREAMS: # it's okay if there isn't target data for a stream try: diff --git a/tests/unittests/test_bookmark_for_transactions.py b/tests/unittests/test_bookmark_for_transactions.py new file mode 100644 index 00000000..1784a43b --- /dev/null +++ b/tests/unittests/test_bookmark_for_transactions.py @@ -0,0 +1,33 @@ +import unittest +from unittest import mock +from singer.utils import strptime_to_utc +from tap_shopify.context import Context +from tap_shopify.streams import 
transactions + +TRANSACTIONS_OBJECT = Context.stream_objects['transactions']() + +class Transaction(): + '''The Transaction object to return.''' + def __init__(self, id, created_at): + self.id = id + self.created_at = created_at + + def to_dict(self): + return {"id": self.id, "created_at": self.created_at} + +tx1 = Transaction("i11", "2021-08-11T01:57:05-04:00") +tx2 = Transaction("i12", "2021-08-12T01:57:05-04:00") +tx3 = Transaction("i21", "2021-08-13T01:57:05-04:00") +tx4 = Transaction("i22", "2021-08-14T01:57:05-04:00") + +class TestTransactionsBookmark(unittest.TestCase): + + @mock.patch("tap_shopify.streams.base.Stream.get_bookmark") + @mock.patch('tap_shopify.streams.transactions.Transactions.get_objects') + def test_sync(self, mock_get_transactions, mock_get_bookmark): + '''Verify that the sync returns all the records for transactions without filtering through bookmark.''' + mock_get_transactions.return_value = [tx1, tx2, tx3, tx4] + mock_get_bookmark.return_value = strptime_to_utc("2021-08-13T01:05:05-04:00") + expected_sync = [tx1.to_dict(), tx2.to_dict(), tx3.to_dict(), tx4.to_dict()] + actual_sync = list(TRANSACTIONS_OBJECT.sync()) + self.assertEqual(expected_sync, actual_sync)