TDL-17512: Add missing tap-tester tests #134
base: crest-work
@@ -0,0 +1,81 @@
from tap_tester import runner, menagerie
from base import BaseTapTest


class AllFieldsTest(BaseTapTest):

    @staticmethod
    def name():
        return "tap_tester_shopify_all_fields_test"

    def test_run(self):
        """
        Ensure running the tap with all streams and fields selected results in the
        replication of all fields.
        - Verify no unexpected streams were replicated
        - Verify that more than just the automatic fields are replicated for each stream
        """

        self.start_date = "2021-04-01T00:00:00Z"
        expected_streams = self.expected_streams()

        # instantiate connection to run on "talenddatawearhouse"
        conn_id = self.create_connection(original_properties=False, original_credentials=False)

        # run check mode
        found_catalogs = menagerie.get_catalogs(conn_id)

        # table and field selection
        test_catalogs_all_fields = [catalog for catalog in found_catalogs
                                    if catalog.get('stream_name') in expected_streams]
        self.select_all_streams_and_fields(conn_id, test_catalogs_all_fields, select_all_fields=True)

        # grab metadata after performing table-and-field selection to set expectations
        stream_to_all_catalog_fields = dict()  # used for asserting all fields are replicated
        for catalog in test_catalogs_all_fields:
            stream_id, stream_name = catalog['stream_id'], catalog['stream_name']
            catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id)
            fields_from_field_level_md = [md_entry['breadcrumb'][1]
                                          for md_entry in catalog_entry['metadata']
                                          if md_entry['breadcrumb'] != []]
            stream_to_all_catalog_fields[stream_name] = set(fields_from_field_level_md)

        # run initial sync
        record_count_by_stream = self.run_sync(conn_id)
        synced_records = runner.get_records_from_target_output()

        # Verify no unexpected streams were replicated
        synced_stream_names = set(synced_records.keys())
        self.assertSetEqual(expected_streams, synced_stream_names)

        for stream in expected_streams:
            with self.subTest(stream=stream):

                # expected values
                expected_automatic_keys = self.expected_primary_keys().get(stream, set()) | self.expected_replication_keys().get(stream, set())
                # get all expected keys
                expected_all_keys = stream_to_all_catalog_fields[stream]

                # collect actual values from the upsert messages synced for this stream
                messages = synced_records.get(stream)
                actual_all_keys = set()
                for message in messages['messages']:
                    if message['action'] == 'upsert':
                        actual_all_keys.update(message['data'].keys())

                # Verify that you get some records for each stream
                self.assertGreater(record_count_by_stream.get(stream, -1), 0)

                # verify all fields for a stream were replicated
                self.assertGreater(len(expected_all_keys), len(expected_automatic_keys))
                self.assertTrue(expected_automatic_keys.issubset(expected_all_keys), msg=f'{expected_automatic_keys-expected_all_keys} is not in "expected_all_keys"')

                if stream == 'abandoned_checkouts':
                    # remove 'billing_address' as it is not expected in the replicated records
                    expected_all_keys.remove('billing_address')
                elif stream == 'orders':
                    # No field named 'order_adjustments' present in the 'order' object
                    # Documentation: https://shopify.dev/api/admin-rest/2021-10/resources/order#resource_object
                    expected_all_keys.remove('order_adjustments')

                self.assertSetEqual(expected_all_keys, actual_all_keys)
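As context for the metadata handling in the test above: in Singer catalog metadata, a field-level entry typically carries a breadcrumb of ['properties', '<field name>'], while the stream-level entry has an empty breadcrumb. The following is a minimal standalone sketch, with made-up metadata rather than a real Shopify catalog, of the same filtering the test performs:

# hypothetical annotated-schema metadata, shaped like a menagerie.get_annotated_schema() result
catalog_entry = {
    'metadata': [
        {'breadcrumb': [], 'metadata': {'selected': True}},  # stream-level entry
        {'breadcrumb': ['properties', 'id'], 'metadata': {'inclusion': 'automatic'}},
        {'breadcrumb': ['properties', 'updated_at'], 'metadata': {'inclusion': 'automatic'}},
        {'breadcrumb': ['properties', 'note'], 'metadata': {'inclusion': 'available'}},
    ]
}

# skip the stream-level entry and keep the field name (second breadcrumb element)
fields = {md_entry['breadcrumb'][1]
          for md_entry in catalog_entry['metadata']
          if md_entry['breadcrumb'] != []}
print(fields)  # {'id', 'updated_at', 'note'}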
@@ -31,6 +31,7 @@ def automatic_test(self, conn_id, testable_streams):
        """
        Verify that for each stream you can get multiple pages of data
        when no fields are selected and only the automatic fields are replicated.
+       Verify that all replicated records have unique primary key values.

        PREREQUISITE
        For EACH stream add enough data that you surpass the limit of a single

@@ -52,6 +53,7 @@ def automatic_test(self, conn_id, testable_streams):
        record_count_by_stream = self.run_sync(conn_id)

        actual_fields_by_stream = runner.examine_target_output_for_fields()
+       synced_records = runner.get_records_from_target_output()

        for stream in incremental_streams:
            with self.subTest(stream=stream):

@@ -60,6 +62,11 @@ def automatic_test(self, conn_id, testable_streams):
                # SKIP THIS ASSERTION FOR STREAMS WHERE YOU CANNOT GET
                # MORE THAN 1 PAGE OF DATA IN THE TEST ACCOUNT
                stream_metadata = self.expected_metadata().get(stream, {})
+               expected_primary_keys = self.expected_primary_keys().get(stream, set())
+
+               # collect records
+               messages = synced_records.get(stream)

                minimum_record_count = stream_metadata.get(
                    self.API_LIMIT,
                    self.get_properties().get('result_per_page', self.DEFAULT_RESULTS_PER_PAGE)

@@ -72,7 +79,15 @@ def automatic_test(self, conn_id, testable_streams):
                # verify that only the automatic fields are sent to the target
                self.assertEqual(
                    actual_fields_by_stream.get(stream, set()),
-                   self.expected_primary_keys().get(stream, set()) |
+                   expected_primary_keys |
                    self.expected_replication_keys().get(stream, set()),
                    msg="The fields sent to the target are not the automatic fields"
                )

+               # Verify that all replicated records have unique primary key values.
+               records_pks_set = {tuple([message.get('data').get(primary_key) for primary_key in expected_primary_keys])
+                                  for message in messages.get('messages')}
+               records_pks_list = [tuple([message.get('data').get(primary_key) for primary_key in expected_primary_keys])
+                                   for message in messages.get('messages')]
+               self.assertCountEqual(records_pks_set, records_pks_list,
+                                     msg="We have duplicate records for {}".format(stream))

Review comment: Good addition
@@ -24,6 +24,7 @@ def test_run(self):
        • Verify stream names follow naming convention
          streams should only have lowercase alphas and underscores
        • verify there is only 1 top level breadcrumb
+       • Verify there are no duplicate/conflicting metadata entries.
        • verify replication key(s)
        • verify primary key(s)
        • verify that if there is a replication key we are doing INCREMENTAL otherwise FULL

@@ -33,23 +34,24 @@ def test_run(self):
        • verify that all other fields have inclusion of available (metadata and schema)
        """
        conn_id = self.create_connection()
+       expected_streams = self.expected_streams()

        # Verify number of actual streams discovered match expected
        found_catalogs = menagerie.get_catalogs(conn_id)
        self.assertGreater(len(found_catalogs), 0,
                           msg="unable to locate schemas for connection {}".format(conn_id))
        self.assertEqual(len(found_catalogs),
-                        len(self.expected_streams()),
+                        len(expected_streams),
                         msg="Expected {} streams, actual was {} for connection {},"
                             " actual {}".format(
-                                len(self.expected_streams()),
+                                len(expected_streams),
                                 len(found_catalogs),
                                 found_catalogs,
                                 conn_id))

        # Verify the stream names discovered were what we expect
        found_catalog_names = {c['tap_stream_id'] for c in found_catalogs}
-       self.assertEqual(set(self.expected_streams()),
+       self.assertEqual(set(expected_streams),
                         set(found_catalog_names),
                         msg="Expected streams don't match actual streams")

@@ -58,7 +60,7 @@ def test_run(self):
        self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]),
                        msg="One or more streams don't follow standard naming")

-       for stream in self.expected_streams():
+       for stream in expected_streams:
            with self.subTest(stream=stream):
                catalog = next(iter([catalog for catalog in found_catalogs
                                     if catalog["stream_name"] == stream]))

@@ -74,6 +76,14 @@ def test_run(self):
                self.assertTrue(len(stream_properties) == 1,
                                msg="There is more than one top level breadcrumb")

+               # collect fields
+               actual_fields = []
+               for md_entry in metadata:
+                   if md_entry['breadcrumb'] != []:
+                       actual_fields.append(md_entry['breadcrumb'][1])
+               # Verify there are no duplicate/conflicting metadata entries.
+               self.assertEqual(len(actual_fields), len(set(actual_fields)), msg="There are duplicate entries in the fields of '{}' stream".format(stream))

Review comment: Add a comment about this assertion at the function (test_run()) level comment above.
Reply: Added function comment.
Review comment: Good addition

                # verify replication key(s)
                self.assertEqual(
                    set(stream_properties[0].get(
@@ -20,6 +20,8 @@ class StartDateTest(BaseTapTest):
    • verify that a sync with a later start date has at least one record synced
      and less records than the 1st sync with a previous start date
    • verify that each stream has less records than the earlier start date sync
+   • Verify by primary key values, that all records of the 2nd sync are included
+     in the 1st sync since 2nd sync has a later start date.
    • verify all data from the later start date has bookmark values >= start_date
    • verify that the minimum bookmark sent to the target for the later start_date sync
      is greater than or equal to the start date

@@ -59,6 +61,9 @@ def test_run(self):
        incremental_streams = {key for key, value in self.expected_replication_method().items()
                               if value == self.INCREMENTAL}

+       # get expected replication keys
+       expected_replication_keys = self.expected_replication_keys()

        # IF THERE ARE STREAMS THAT SHOULD NOT BE TESTED
        # REPLACE THE EMPTY SET BELOW WITH THOSE STREAMS

@@ -103,7 +108,6 @@ def test_run(self):
        second_sync_record_count = self.run_sync(conn_id)
        second_total_records = reduce(lambda a, b: a + b, second_sync_record_count.values(), 0)
        second_sync_records = runner.get_records_from_target_output()
-       second_min_bookmarks = self.min_bookmarks_by_stream(second_sync_records)

        # verify that at least one record synced and less records synced than the 1st connection
        self.assertGreater(second_total_records, 0)

@@ -112,27 +116,45 @@ def test_run(self):
        for stream in incremental_streams:
            with self.subTest(stream=stream):

+               # get primary key values for both sync records
+               expected_primary_keys = self.expected_primary_keys()[stream]
+               primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
+                                      for message in first_sync_records.get(stream).get('messages')
+                                      if message.get('action') == 'upsert']
+               primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
+                                      for message in second_sync_records.get(stream).get('messages')
+                                      if message.get('action') == 'upsert']
+               primary_keys_sync_1 = set(primary_keys_list_1)
+               primary_keys_sync_2 = set(primary_keys_list_2)

+               # get replication key-values for all records for both syncs
+               replication_key_sync_1 = [message.get('data').get(expected_rk) for expected_rk in expected_replication_keys.get(stream)
+                                         for message in first_sync_records.get(stream).get('messages')
+                                         if message.get('action') == 'upsert']
+               replication_key_sync_2 = [message.get('data').get(expected_rk) for expected_rk in expected_replication_keys.get(stream)
+                                         for message in second_sync_records.get(stream).get('messages')
+                                         if message.get('action') == 'upsert']

                # verify that each stream has less records than the first connection sync
                self.assertGreaterEqual(
                    first_sync_record_count.get(stream, 0),
                    second_sync_record_count.get(stream, 0),
                    msg="second had more records, start_date usage not verified")

-               # verify all data from 2nd sync >= start_date
-               target_mark = second_min_bookmarks.get(stream, {"mark": None})
-               target_value = next(iter(target_mark.values()))  # there should be only one
-
-               if target_value:
+               # Verify by primary key values, that all records of the 2nd sync are included in the 1st sync since 2nd sync has a later start date.
+               self.assertTrue(primary_keys_sync_2.issubset(primary_keys_sync_1))

Review comment: Add a comment about this assertion at the function (test_run()) level comment above.
Reply: Added function comment.

-                   # it's okay if there isn't target data for a stream
-                   try:
-                       target_value = self.local_to_utc(parse(target_value))
+               # get start dates for both syncs
+               first_sync_start_date = self.get_properties()["start_date"]
+               second_sync_start_date = self.start_date

-                       # verify that the minimum bookmark sent to the target for the second sync
-                       # is greater than or equal to the start date
-                       self.assertGreaterEqual(target_value,
-                                               self.local_to_utc(parse(self.start_date)))
+               # loop over the start date/state file date and replication key records for each sync
+               # to verify the records we synced are greater than the start date/state file date
+               for start_date, record_replication_keys in zip(
+                       (first_sync_start_date, second_sync_start_date),
+                       (replication_key_sync_1, replication_key_sync_2)):

-                   except (OverflowError, ValueError, TypeError):
-                       print("bookmarks cannot be converted to dates, "
-                             "can't test start_date for {}".format(stream))
+                   # loop over every replication key record and verify we have
+                   # synced records greater than the start date/state file date
+                   for record_replication_key in record_replication_keys:
+                       self.assertGreaterEqual(record_replication_key, start_date)
Comment on lines +159 to +160
Review comment: I think the try except and assertion above are unnecessary. The minimum replication key value is covered by this assertion.
Reply: Removed unnecessary assertion.
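One detail worth noting about the replication-key assertion kept above: it compares the raw values directly, which is valid when both sides are ISO 8601 timestamp strings in the same timezone, since those sort lexicographically in chronological order. A small sketch with hypothetical values:

start_date = "2021-04-01T00:00:00Z"
record_replication_keys = ["2021-04-01T05:30:00Z", "2021-06-15T12:00:00Z"]  # hypothetical synced values

for record_replication_key in record_replication_keys:
    # ISO 8601 strings in the same timezone compare chronologically as plain strings
    assert record_replication_key >= start_date, \
        "{} is earlier than the start date {}".format(record_replication_key, start_date)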