Skip to content

Commit 9436167

Browse files
authored
New automatic_keys Support and Bulk State Persistence Logic (#237)
* Store the bulk operation details in the state once it is complete for the extraction if the existing fails due to the time limit of 23 hours * add automatic keys properties * update the automatic key name * Increase the chunk size to 1MB * remove chunk size modifications * update setup and changelog * update discovery test * update automatic test for the order_refunds stream * include extra automatic keys logic in the automatic test
1 parent 0ca13f8 commit 9436167

File tree

8 files changed

+25
-5
lines changed

8 files changed

+25
-5
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Changelog
22

3+
### 3.7.2
4+
* New automatic_keys Support and Bulk State Persistence Logic [#237](https://github.com/singer-io/tap-shopify/pull/237)
5+
36
### 3.7.1
47
* Query and extract multiple pages of products for each collection [#227](https://github.com/singer-io/tap-shopify/pull/231)
58

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
setup(
66
name="tap-shopify",
7-
version="3.7.1",
7+
version="3.7.2",
88
description="Singer.io tap for extracting Shopify data",
99
author="Stitch",
1010
url="http://github.com/singer-io/tap-shopify",

tap_shopify/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ def get_discovery_metadata(stream, schema):
8686
mdata = metadata.write(mdata, (), 'valid-replication-keys', [stream.replication_key])
8787

8888
for field_name in schema['properties'].keys():
89-
if field_name in stream.key_properties or field_name == stream.replication_key:
89+
if field_name in stream.key_properties or field_name == stream.replication_key \
90+
or field_name in stream.automatic_keys:
9091
mdata = metadata.write(mdata, ('properties', field_name), 'inclusion', 'automatic')
9192
elif field_name in UNSUPPORTED_FIELDS and not has_read_users_access():
9293
mdata = metadata.write(mdata, ('properties', field_name), 'inclusion', 'unsupported')

tap_shopify/streams/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ class Stream():
132132
name = None
133133
replication_method = 'INCREMENTAL'
134134
replication_key = 'updatedAt'
135+
automatic_keys = []
135136
key_properties = ['id']
136137
date_window_size = None
137138
data_key = None

tap_shopify/streams/order_refunds.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class OrderRefunds(Stream):
1111
data_key = "orders"
1212
child_data_key = "refunds"
1313
replication_key = "updatedAt"
14+
automatic_keys = ["order"]
1415

1516
# pylint: disable=too-many-locals
1617
def get_objects(self):

tap_shopify/streams/orders.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,6 +1118,15 @@ def fetch_bulk_operation(op_id):
11181118

11191119
if current_status == "COMPLETED":
11201120
LOGGER.info("Bulk operation completed. File size: %s bytes", op.get("fileSize"))
1121+
self.update_bookmark(
1122+
bookmark_value=utils.strftime(current_bookmark),
1123+
bulk_op_metadata={
1124+
"bulk_operation_id": op.get("id"),
1125+
"status": current_status,
1126+
"created_at": op.get("createdAt"),
1127+
"last_date_window": self.date_window_size,
1128+
}
1129+
)
11211130
return op.get("url")
11221131

11231132
if current_status in ["FAILED", "CANCELED"]:
@@ -1281,12 +1290,11 @@ def get_objects(self):
12811290
current_bookmark = max(current_bookmark, replication_value)
12821291

12831292
yield obj
1284-
1285-
self.clear_bulk_operation_state()
12861293
else:
12871294
LOGGER.info("No data returned for the date range: %s to %s",
12881295
last_updated_at, query_end)
12891296

1297+
self.clear_bulk_operation_state()
12901298
last_updated_at = query_end
12911299
max_bookmark_value = min(sync_start, current_bookmark)
12921300
self.update_bookmark(utils.strftime(max_bookmark_value))

tests/test_automatic_fields.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ def automatic_test(self, conn_id, testable_streams):
6464
stream_metadata = self.expected_metadata().get(stream, {})
6565
expected_primary_keys = self.expected_primary_keys().get(stream, set())
6666

67+
extra_automatic_keys = {"order"} if stream == "order_refunds" else set()
68+
6769
# collect records
6870
messages = synced_records.get(stream)
6971

@@ -79,7 +81,7 @@ def automatic_test(self, conn_id, testable_streams):
7981
# verify that only the automatic fields are sent to the target
8082
self.assertEqual(
8183
actual_fields_by_stream.get(stream, set()),
82-
expected_primary_keys |
84+
expected_primary_keys | extra_automatic_keys |
8385
self.expected_replication_keys().get(stream, set()),
8486
msg="The fields sent to the target are not the automatic fields"
8587
)

tests/test_discovery.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ def test_run(self):
129129
expected_primary_keys = self.expected_primary_keys()[stream]
130130
expected_replication_keys = self.expected_replication_keys()[stream]
131131
expected_automatic_fields = expected_primary_keys | expected_replication_keys
132+
133+
# In the order_refunds stream, order field is explicitly marked as automatic
134+
if stream == "order_refunds":
135+
expected_automatic_fields = expected_automatic_fields | {"order"}
132136

133137
# verify that primary, replication and foreign keys
134138
# are given the inclusion of automatic in annotated schema.

0 commit comments

Comments
 (0)