source-apple-app-store: finalize implementation#3216
Conversation
Introduces a new utility that leverages the built-in `zlib` library and provides the ability to stream decompressed chunks from a GZIP file without loading the entire archive into memory.
Report instance segments are signed URLs with URL parameters set for authentication. Setting authorization headers on top of this causes HTTP 400 errors. This update ensures we do not use the JWT token in this request. Additionally, the response of the request is a Gzip encoded file with the `Content-Type` header set to `application/gzip` which cannot be automatically decoded by `aiohttp`. The `GunzipStream` is set up to handle the decompression instead.
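The `GunzipStream` source isn't shown in this thread; a minimal sketch of streaming gzip decompression with the stdlib `zlib` module (the general approach the PR describes, not the CDK's actual implementation) could look like:

```python
import gzip
import zlib

def gunzip_stream(chunks):
    # Illustrative sketch: incrementally inflate gzip-encoded chunks
    # without buffering the whole archive in memory.
    # wbits=MAX_WBITS | 16 tells zlib to expect a gzip header/trailer.
    decompressor = zlib.decompressobj(wbits=zlib.MAX_WBITS | 16)
    for chunk in chunks:
        yield decompressor.decompress(chunk)
    # Emit any bytes still buffered inside the decompressor.
    yield decompressor.flush()

# Round-trip a payload through small 5-byte chunks.
payload = b"line1\nline2\n" * 1000
compressed = gzip.compress(payload)
chunks = [compressed[i:i + 5] for i in range(0, len(compressed), 5)]
assert b"".join(gunzip_stream(chunks)) == payload
```

In an `aiohttp` response this would be fed from the response's chunked byte iterator rather than a list.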
…nd use detailed analytics reports

Uses `filename` and `row_number` for the document keys for Analytics Reports. This has proven more reliable than using columns from the analytics report files themselves. This also updates the models to use the "detailed" reports, which offer more information than the "standard" ones.
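The filename/`row_number` keying could be sketched roughly like this (the helper name and CSV handling here are assumptions for illustration, not the connector's actual code):

```python
import csv
import io

def keyed_rows(filename, csv_text):
    # Hypothetical sketch: pair each parsed CSV row with the report's
    # filename and a synthetic row_number so the two together form a
    # unique, stable document key even when row fields are empty/null.
    reader = csv.DictReader(io.StringIO(csv_text))
    for row_number, row in enumerate(reader):
        yield {**row, "filename": filename, "row_number": row_number}
```

For example, `keyed_rows("reports/app/daily.csv.gz", "Date,Installs\n2024-01-01,10\n")` yields one document with `row_number` 0 and the source filename attached.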
Force-pushed 67003ec to 590e602
Alex-Bair left a comment:
LGTM, modulo a question around backfill completion when there's no ongoing report. I'm hazy on the various delays Apple can have in creating various reports, so I may be missing a reason for the backfill to finish early if no ongoing report exists.
```python
params: dict[str, Any] | None = None,
json: dict[str, Any] | None = None,
form: dict[str, Any] | None = None,
_with_token: bool = True,  # Unstable internal API.
```
nit: Since _with_token is now part of the public interface, could the leading underscore be removed so it's just with_token?
```python
if not chunk:
    continue
```
nit: When would chunk be falsy and cause us to hit continue? From the GunzipStream implementation, it looks like it always yields a chunk of some kind.
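For context on this question: zlib's incremental decompressor can legitimately return empty bytes when a fed input chunk completes no output (for example, a chunk containing only the gzip header), so an empty-chunk guard is plausible depending on how `GunzipStream` feeds the decompressor. A quick demonstration with the stdlib:

```python
import gzip
import zlib

# Feeding only the 10-byte gzip header produces no decompressed output,
# so a streaming decompressor can yield empty bytes for some inputs.
compressed = gzip.compress(b"hello world")
decompressor = zlib.decompressobj(wbits=zlib.MAX_WBITS | 16)
assert decompressor.decompress(compressed[:10]) == b""
```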
```python
@model_validator(mode="after")
def extract_filename_from_url(self):
    from urllib.parse import urlparse
```
nit: What was the motivation to import urlparse within the function rather than at the top of the file? IMO it's easier to track imports when they're grouped together at the top.
```python
import csv
import os
```
nit: It looks like the csv and os imports are unused in this file.
```python
app_id: str
record_date: date = Field(..., alias="Date")
filename: str = Field(default="")
```
nit: Does filename need to have a default value in the model? The _add_row_metadata before model validator looks like it always adds filename before validation and the default value is never used.
```python
if not ongoing_report_exists:
    log.warning(
        f"Skipping backfill for {model.report_name} since no ONGOING report request exists. "
        "Backfill requires an existing ONGOING report request to ensure proper data continuity.",
        {
            "app_id": app_id,
            "report_name": model.report_name,
        },
    )
    return
```
If there's not an ongoing report, does this mean the backfill completes early/doesn't capture any data? From the code, I think that's what's happening. Completing the backfill without capturing any data sounds like it'd be difficult to determine whether a completed backfill means "we got all historical data" vs. "we skipped the backfill & someone would need to re-backfill later to get historical data". Instead, should the connector wait somehow for an ongoing report to exist, then capture data for the backfill?
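One possible shape for the suggested wait, sketched with hypothetical names (none of these are from the connector): poll until an ONGOING report request exists before starting the backfill, rather than completing the backfill with no data captured.

```python
import time

def wait_for_ongoing_report(check_exists, poll_interval=60.0, max_attempts=10):
    # Hypothetical helper: repeatedly check whether an ONGOING report
    # request exists, sleeping between attempts. Returns True once one
    # is found, or False after max_attempts checks.
    for _ in range(max_attempts):
        if check_exists():
            return True
        time.sleep(poll_interval)
    return False
```

In the connector this would more likely be an async task checkpoint than a blocking sleep, but the shape of the wait is the same.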
Force-pushed 54a5faa to d2b4cd5
…nd processing strategy

Minor refactors to use the updated models, fields, and report processing flow. Instead of a complicated paging strategy that waits some number of days based on a completeness lag, reports are processed as long as there is a report with a processing date <= `cutoff`.
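The cutoff rule described above could be sketched as follows (field names and data structures are assumed for illustration, not taken from the connector):

```python
from datetime import date

def reports_to_process(instances, cutoff):
    # Illustrative sketch: yield report instances in chronological
    # order while their processing date is on or before the cutoff;
    # anything newer is left for a later sweep.
    for instance in sorted(instances, key=lambda i: i["processingDate"]):
        if instance["processingDate"] > cutoff:
            break
        yield instance
```

This replaces the older lag-based paging: the loop simply stops at the first report past the cutoff.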
Force-pushed d2b4cd5 to e8927c4
Description:
This PR finalizes the implementation of the Apple App Store capture connector after observing it in use and learning more about the quirks of Apple's API and its asynchronous Analytics Report endpoints.
Introduces a `GunzipStream` class in the CDK for streaming uncompressed bytes from a Gzip encoded file returned from APIs with the `Content-Type` header set to `application/gzip`. This is not automatically decompressed by `aiohttp` and requires manual decompression, which this class performs via the built-in `zlib` Python library.

Additionally, this PR simplifies the analytics report incremental streams' page and log cursors to only change when we process a report. I.e., we assume (and have observed this behavior thus far) that the analytics reports are processed and provided via the API in chronological / monotonically increasing order. Upon seeing a report with a `processingDate` within the window we are backfilling or incrementally capturing, we process that report and then yield that date as the next cursor. Backfill will stop when the last processed date is equal to the cutoff.

Lastly, the collection key for analytics report documents is the filename path provided by Apple in the AWS-Signed URL when finding report instances to process, combined with a synthetic `row_number` as an offset within that particular file. For example: `{..., "filename": "reports/<app_id>/discovery_and_engagement_detail/daily/snapshot/<date>/<some_id>/<some_unique_filename>.csv.gz", "row_number": 42936}`. This ensures we have a unique and distinguishable collection key. Note: when working with the data, it was observed that certain fields can be `null` when they were expected to be valid string values required for a proper composite key built from the file's data. For that reason, I have selected to use the filename/`row_number` pair as stated.

Workflow steps:
(How does one use this feature, and how has it changed)
Documentation links affected:
New documentation: estuary/flow#2364
Notes for reviewers:
(anything that might help someone review this PR)