-
Notifications
You must be signed in to change notification settings - Fork 91
Tdl 15459 update bookmarking for transactions #141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: crest-work
Are you sure you want to change the base?
Changes from all commits
e0c55a4
df05032
432068e
5a9d583
ff0a963
012c578
c925f2f
3c97292
8af075b
1814b1c
0675c47
eafd58e
5546bfc
43ee4b1
baa88bc
910567f
c989cfb
792562e
3f82903
037ee46
515640b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,8 +48,13 @@ def bookmarks_test(self, conn_id, testable_streams): | |
|
||
# Select all streams and no fields within streams | ||
found_catalogs = menagerie.get_catalogs(conn_id) | ||
# `transactions` is child stream of `orders` stream which is incremental. | ||
# We are writing a separate bookmark for the child stream in which we are storing | ||
# the bookmark based on the parent's replication key. | ||
# But, we are not using any fields from the child record for it. | ||
# That's why the `transactions` stream does not have a replication_key but is still incremental. | ||
incremental_streams = {key for key, value in self.expected_replication_method().items() | ||
if value == self.INCREMENTAL and key in testable_streams} | ||
if value == self.INCREMENTAL and key in testable_streams and key not in self.SKIPPED_STREAMS} | ||
|
||
# Our test data sets for Shopify do not have any abandoned_checkouts | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The abandoned_checkouts stream should remain under test. It is incorrectly removed on crest master. See base.py init There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
our_catalogs = [catalog for catalog in found_catalogs if | ||
|
@@ -101,6 +106,7 @@ def bookmarks_test(self, conn_id, testable_streams): | |
stream_bookmark_key) == 1 # There shouldn't be a compound replication key | ||
stream_bookmark_key = stream_bookmark_key.pop() | ||
|
||
|
||
state_value = first_sync_state.get("bookmarks", {}).get( | ||
stream, {None: None}).get(stream_bookmark_key) | ||
target_value = first_max_bookmarks.get( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,7 +75,7 @@ def bookmarks_test(self, conn_id, testable_streams): | |
first_sync_records = runner.get_records_from_target_output() | ||
# BUG:TDL-17087 : State has additional values which are not streams | ||
# Need to remove additional values from bookmark value | ||
extra_stuff = {'transaction_orders', 'metafield_products', 'refund_orders', 'product_variants'} | ||
extra_stuff = {'metafield_products', 'refund_orders', 'product_variants'} | ||
for keys in list(first_sync_bookmark['bookmarks'].keys()): | ||
if keys in extra_stuff: | ||
first_sync_bookmark['bookmarks'].pop(keys) | ||
|
@@ -88,7 +88,7 @@ def bookmarks_test(self, conn_id, testable_streams): | |
#simulated_states = self.calculated_states_by_stream(first_sync_bookmark) | ||
|
||
# We are hardcoding the updated state to ensure that we get at least 1 record in the second sync. These values have been provided after reviewing the max bookmark value for each of the streams | ||
simulated_states = {'products': {'updated_at': '2021-12-20T05:10:05.000000Z'}, 'collects': {'updated_at': '2021-09-01T09:08:28.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transactions': {'created_at': '2021-12-20T00:08:52-05:00'}, 'metafields': {'updated_at': '2021-09-07T21:18:05.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2021-12-20T17:41:18.000000Z'}} | ||
simulated_states = {'products': {'updated_at': '2021-12-20T05:10:05.000000Z'}, 'collects': {'updated_at': '2022-01-15T00:53:24.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transaction_orders': {'updated_at': '2021-12-12T00:08:33.000000Z'}, 'metafields': {'updated_at': '2021-09-07T21:18:05.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2021-12-20T17:41:18.000000Z'}} | ||
|
||
for stream, updated_state in simulated_states.items(): | ||
new_state['bookmarks'][stream] = updated_state | ||
|
@@ -115,15 +115,22 @@ def bookmarks_test(self, conn_id, testable_streams): | |
if record.get('action') == 'upsert'] | ||
second_sync_messages = [record.get('data') for record in second_sync_records.get(stream, {}).get('messages', []) | ||
if record.get('action') == 'upsert'] | ||
first_bookmark_value = first_sync_bookmark.get('bookmarks', {stream: None}).get(stream) | ||
if stream not in self.SKIPPED_STREAMS: | ||
first_bookmark_value = first_sync_bookmark.get('bookmarks', {stream: None}).get(stream) | ||
second_bookmark_value = second_sync_bookmark.get('bookmarks', {stream: None}).get(stream) | ||
else: | ||
first_bookmark_value = first_sync_bookmark.get('bookmarks').get('transaction_orders') | ||
|
||
second_bookmark_value = second_sync_bookmark.get('bookmarks').get('transaction_orders') | ||
first_bookmark_value = list(first_bookmark_value.values())[0] | ||
second_bookmark_value = second_sync_bookmark.get('bookmarks', {stream: None}).get(stream) | ||
second_bookmark_value = list(second_bookmark_value.values())[0] | ||
|
||
replication_key = next(iter(expected_replication_keys[stream])) | ||
first_bookmark_value_utc = self.convert_state_to_utc(first_bookmark_value) | ||
second_bookmark_value_utc = self.convert_state_to_utc(second_bookmark_value) | ||
simulated_bookmark = new_state['bookmarks'][stream] | ||
if stream not in self.SKIPPED_STREAMS: | ||
simulated_bookmark = new_state['bookmarks'][stream] | ||
else: | ||
simulated_bookmark = new_state['bookmarks']['transaction_orders'] | ||
simulated_bookmark_value = list(simulated_bookmark.values())[0] | ||
|
||
# verify the syncs sets a bookmark of the expected form | ||
|
@@ -133,27 +140,32 @@ def bookmarks_test(self, conn_id, testable_streams): | |
self.assertTrue(self.is_expected_date_format(second_bookmark_value)) | ||
|
||
# verify the 2nd bookmark is equal to 1st sync bookmark | ||
#NOT A BUG (IS the expected behaviour for shopify as they are using date windowing : TDL-17096 : 2nd bookmark value is getting assigned from the execution time rather than the actual bookmark time. This is an invalid assertion for shopify | ||
#NOT A BUG (IS the expected behavior for shopify as they are using date windowing : TDL-17096 : 2nd bookmark value is getting assigned from the execution time rather than the actual bookmark time. This is an invalid assertion for shopify | ||
#self.assertEqual(first_bookmark_value, second_bookmark_value) | ||
|
||
for record in first_sync_messages: | ||
replication_key_value = record.get(replication_key) | ||
# verify 1st sync bookmark value is the max replication key value for a given stream | ||
self.assertLessEqual(replication_key_value, first_bookmark_value_utc, msg="First sync bookmark was set incorrectly, a record with a greater replication key value was synced") | ||
|
||
for record in second_sync_messages: | ||
replication_key_value = record.get(replication_key) | ||
# verify the 2nd sync replication key value is greater or equal to the 1st sync bookmarks | ||
self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, msg="Second sync records do not respect the previous bookmark") | ||
# verify the 2nd sync bookmark value is the max replication key value for a given stream | ||
self.assertLessEqual(replication_key_value, second_bookmark_value_utc, msg="Second sync bookmark was set incorrectly, a record with a greater replication key value was synced") | ||
# The `transactions` stream is a child of the `orders` stream. Hence the bookmark for transactions is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's bookmark. | ||
# Hence it doesn't have its own replication key. | ||
if stream not in self.SKIPPED_STREAMS: | ||
replication_key = next(iter(expected_replication_keys[stream])) | ||
for record in first_sync_messages: | ||
replication_key_value = record.get(replication_key) | ||
# verify 1st sync bookmark value is the max replication key value for a given stream | ||
self.assertLessEqual(replication_key_value, first_bookmark_value_utc, msg="First sync bookmark was set incorrectly, a record with a greater replication key value was synced") | ||
|
||
for record in second_sync_messages: | ||
replication_key_value = record.get(replication_key) | ||
# verify the 2nd sync replication key value is greater or equal to the 1st sync bookmarks | ||
self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, msg="Second sync records do not respect the previous bookmark") | ||
# verify the 2nd sync bookmark value is the max replication key value for a given stream | ||
self.assertLessEqual(replication_key_value, second_bookmark_value_utc, msg="Second sync bookmark was set incorrectly, a record with a greater replication key value was synced") | ||
|
||
# verify that we get less data in the 2nd sync | ||
# collects has all the records with the same value of replication key, so we are removing from this assertion | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the manipulated state should be altered so this does not happen. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here, for the transactions stream, we had 2 bookmarks earlier i.e. transactions and transaction_orders(which stores the bookmark of the parent i.e. orders bookmark). However, as this card suggested removing the filtering of the transactions based on the transactions bookmark, we have now removed the transaction_orders completely. Hence this assertion of checking the replication key value against the bookmark value would now actually check the transaction_orders bookmark value against the replication value of the transactions. Thus, we have skipped this assertion for the stream. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I was not clear, I meant for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have generated more data for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! |
||
if stream not in ('collects'): | ||
# As the bookmark for `transactions` is solely dependent on the value of bookmark in 'transaction_orders' which stores the parent record's | ||
# bookmark, hence we'd now get all the data for transactions stream without filtering on `created_at` | ||
if stream not in self.SKIPPED_STREAMS: | ||
self.assertLess(second_sync_count, first_sync_count, | ||
msg="Second sync does not have less records, bookmark usage not verified") | ||
|
||
# verify that we get at least 1 record in the second sync | ||
if stream not in ('collects'): | ||
self.assertGreater(second_sync_count, 0, msg="Second sync did not yield any records") | ||
self.assertGreater(second_sync_count, 0, msg="Second sync did not yield any records") |
Uh oh!
There was an error while loading. Please reload this page.