Skip to content

Commit 0570ddf

Browse files
committed
source-google-play: complete initial connector development
This commit finishes the initial development for `source-google-play`. Some notable decisions made include: - Title casing all field names. The CSV column headers are not consistently named across files. Although I had hoped to avoid transformations as much as possible, ensuring fields are consistently named makes downstream processing easier for users plus it allows us to reuse more code in the connector (ex: primary keys are the same, model field definitions are simpler, etc.). - The `_overview` suffix is used for statistics files that aren't split on dimensions, while there is no suffix for reviews that aren't split on dimensions. There _are_ other files in the bucket containing data split by certain dimensions, and it's very easy to add another binding to capture these by overriding the `suffix` class variable for a given resource. Those additional bindings aren't needed right now, but they'll be easy to add in the future if someone asks for them later. - Reviews have an "updated_at" type of field that appears to always be present. This means that instead of yielding every row of an updated file, we can instead only yield rows that have been updated since the previous sweep. - The "Row Number" field doesn't need to be part of any `Statistics` primary key since the "Date" and "Package Name" uniquely identify a row already. No such combination of unique identifiers exist for "Reviews", so we still add "Row Number" into those documents.
1 parent 69861eb commit 0570ddf

File tree

4 files changed

+102
-84
lines changed

4 files changed

+102
-84
lines changed

source-google-play/source_google_play/api.py

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from .models import (
88
GooglePlayRow,
9+
Reviews,
910
)
1011

1112
from .gcs import GCSClient, GCSFileMetadata
@@ -20,14 +21,6 @@ async def fetch_resources(
2021
) -> AsyncGenerator[GooglePlayRow | LogCursor, None]:
2122
assert isinstance(log_cursor, datetime)
2223

23-
# The code below this return is a best-effort implementation based on the Google Play
24-
# documentation about how the GCS bucket organizes data. It boils down to:
25-
# - Find all files updated on or after the log_cursor.
26-
# - Yield all rows from those files.
27-
#
28-
# Once we have valid credentials, development can continue and we can iterate on the code below.
29-
return
30-
3124
files: list[GCSFileMetadata] = []
3225
async for file in gcs_client.list_files(prefix=model.prefix, globPattern=model.get_glob_pattern()):
3326
if file.updated >= log_cursor:
@@ -39,7 +32,15 @@ async def fetch_resources(
3932
model,
4033
model.validation_context_model(filename=file.name),
4134
):
42-
yield row
35+
# Reviews have a "Review Last Update Date And Time" field that we can use to
36+
# only yield rows that have been updated since the last sweep.
37+
if isinstance(row, Reviews):
38+
if row.updated_at >= log_cursor:
39+
yield row
40+
# All other resources do not have an "updated_at" type field, so we have to
41+
# yield all rows for every file that's been updated since the last sweep.
42+
else:
43+
yield row
4344

4445
if len(files) > 0:
4546
latest_file = max(files, key=lambda f: f.updated)
@@ -60,15 +61,6 @@ async def backfill_resources(
6061
if cursor_month >= cutoff:
6162
return
6263

63-
# The code below this return is a best-effort implementation based on the Google Play
64-
# documentation about how the GCS bucket organizes data. It boils down to:
65-
# - Find all files containing data for the same month as the page cursor.
66-
# - Yield all rows from those files.
67-
# - Stop when the page cursor reaches the cutoff.
68-
#
69-
# Once we have valid credentials, development can continue and we can iterate on the code below.
70-
return
71-
7264
files: list[GCSFileMetadata] = []
7365
async for file in gcs_client.list_files(prefix=model.prefix, globPattern=model.get_glob_pattern(cursor_month)):
7466
files.append(file)

source-google-play/source_google_play/gcs.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@
66
from pydantic import BaseModel, Field
77

88
from estuary_cdk.http import HTTPSession
9-
from estuary_cdk.incremental_csv_processor import IncrementalCSVProcessor
9+
from estuary_cdk.incremental_csv_processor import CSVConfig, IncrementalCSVProcessor
1010

1111

12+
CSV_CONFIG = CSVConfig(
13+
encoding="utf-16",
14+
)
15+
1216
_CSVRow = TypeVar('_CSVRow', bound=BaseModel)
1317

1418

@@ -128,6 +132,7 @@ async def stream_csv(
128132
processor = IncrementalCSVProcessor(
129133
body(),
130134
model,
135+
config=CSV_CONFIG,
131136
validation_context=validation_context
132137
)
133138

source-google-play/source_google_play/models.py

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,24 @@
33
import re
44

55
from estuary_cdk.capture.common import (
6-
BaseDocument,
76
ConnectorState as GenericConnectorState,
8-
LogCursor,
9-
Logger,
107
ResourceConfig,
118
ResourceState,
129
)
1310
from estuary_cdk.flow import (
1411
GoogleServiceAccount,
1512
GoogleServiceAccountSpec,
1613
)
17-
14+
from estuary_cdk.incremental_csv_processor import BaseCSVRow
1815

1916
from pydantic import AwareDatetime, BaseModel, Field, model_validator, ValidationInfo
2017

2118

19+
PACKAGE_NAME_FIELD = "Package Name"
20+
ROW_NUMBER_FIELD = "Row Number"
21+
MONTH_FIELD = "Month"
22+
YEAR_FIELD = "Year"
23+
2224
EPOCH = datetime(1970, 1, 1, tzinfo=UTC)
2325

2426
GOOGLE_SPEC = GoogleServiceAccountSpec(
@@ -60,9 +62,10 @@ def increment(self):
6062
self.count += 1
6163

6264

63-
class GooglePlayRow(BaseDocument, extra="allow"):
65+
class GooglePlayRow(BaseCSVRow, extra="allow"):
6466
name: ClassVar[str]
6567
prefix: ClassVar[str]
68+
suffix: ClassVar[str | None] = None
6669
primary_keys: ClassVar[list[str]]
6770
validation_context_model: ClassVar[type[BaseValidationContext]] = BaseValidationContext
6871

@@ -74,27 +77,36 @@ def get_glob_pattern(cls, date: datetime | None = None) -> str:
7477
if date:
7578
year_month_pattern = f"{date.year:04d}{date.month:02d}"
7679

77-
return f"**_{year_month_pattern}.csv"
80+
pattern = f"**_{year_month_pattern}"
81+
82+
if cls.suffix:
83+
pattern += f"_{cls.suffix}"
7884

79-
package_name: str
80-
row_number: int
85+
pattern += ".csv"
8186

87+
return pattern
88+
89+
package_name: str = Field(alias=PACKAGE_NAME_FIELD)
90+
91+
# The column naming convention across CSVs is not inherently consistent. ex: Sometimes a column
92+
# is named "Package name" and other times it's "Package Name". We normalize the column names to
93+
# be title case, which is the predominant casing convention for these columns, before we
94+
# perform any transformations.
8295
@model_validator(mode="before")
8396
@classmethod
84-
def _add_row_number(cls, data: dict[str, Any], info: ValidationInfo) -> dict[str, Any]:
97+
def _normalize_field_names(cls, data: dict[str, Any], info: ValidationInfo) -> dict[str, Any]:
98+
normalized_data: dict[str, Any] = {}
99+
for key, value in data.items():
100+
normalized_key = key.title()
101+
normalized_data[normalized_key] = value
85102

86-
if not info.context or not isinstance(info.context, BaseValidationContext):
87-
raise RuntimeError(f"Validation context is not set or is not of type BaseValidationContext: {info.context}")
88-
89-
assert "row_number" not in data, "Row number should not be set before validation."
90-
data["row_number"] = info.context.count
91-
info.context.increment()
92-
return data
103+
return normalized_data
93104

94105

95106
class Statistics(GooglePlayRow):
96-
primary_keys: ClassVar[list[str]] = ["/date", "/package_name", "/row_number"]
97-
date: str
107+
suffix: ClassVar[str | None] = "overview"
108+
primary_keys: ClassVar[list[str]] = ["/Date", f"/{PACKAGE_NAME_FIELD}"]
109+
date: str = Field(alias="Date")
98110

99111

100112
class Crashes(Statistics):
@@ -110,9 +122,9 @@ class Installs(Statistics):
110122
class ReviewValidationContext(BaseValidationContext):
111123
def __init__(self, filename: str):
112124
super().__init__()
113-
self.year_month = self._extract_year_month(filename)
125+
self.year, self.month = self._extract_year_month(filename)
114126

115-
def _extract_year_month(self, filename: str) -> str:
127+
def _extract_year_month(self, filename: str) -> tuple[str, str]:
116128
"""
117129
Extract YYYYMM from review filenames in various formats:
118130
- /reviews/reviews_[package_name]_YYYYMM.csv
@@ -123,38 +135,47 @@ def _extract_year_month(self, filename: str) -> str:
123135
filename: The filename or path
124136
125137
Returns:
126-
The YYYYMM string.
138+
A tuple containing the year and month as strings.
127139
"""
128140
# Matches reviews_[anything]_YYYYMM[_optionalstuff].csv
129141
pattern = r'reviews_[^_]+_(\d{6})(?:_[^.]*)?\.csv$'
130142
match = re.search(pattern, filename)
131143

132144
assert match, f"Filename does not match expected pattern: {filename}"
133145

134-
return match.group(1)
146+
year_month = match.group(1) # YYYYMM
147+
year = year_month[:4]
148+
month = year_month[4:6]
149+
150+
return (year, month)
135151

136152

137-
# There _might_ be a "Review Last Update Date and Time" we could use to incrementally
138-
# capture updates within a specific file of Reviews. However, the documentation says it's optional
139-
# and we haven't see what this data actually looks like. Once we see real data, we can
140-
# evaluate whether or not the incremental replication strategy for Reviews can be improved.
141153
class Reviews(GooglePlayRow):
142154
name: ClassVar[str] = "reviews"
143155
prefix: ClassVar[str] = "reviews"
144-
primary_keys: ClassVar[list[str]] = ["/year_month", "/package_name", "/row_number"]
156+
primary_keys: ClassVar[list[str]] = [f"/{YEAR_FIELD}", f"/{MONTH_FIELD}", f"/{PACKAGE_NAME_FIELD}", f"/{ROW_NUMBER_FIELD}"]
145157
validation_context_model: ClassVar[type[BaseValidationContext]] = ReviewValidationContext
146158

147-
year_month: str
159+
row_number: int = Field(alias=ROW_NUMBER_FIELD)
160+
year: str = Field(alias=YEAR_FIELD)
161+
month: str = Field(alias=MONTH_FIELD)
162+
updated_at: AwareDatetime = Field(alias="Review Last Update Date And Time")
148163

149164
@model_validator(mode="before")
150165
@classmethod
151-
def _add_year_month(cls, data: dict[str, Any], info: ValidationInfo) -> dict[str, Any]:
166+
def _add_primary_key_components(cls, data: dict[str, Any], info: ValidationInfo) -> dict[str, Any]:
152167

153168
if not info.context or not isinstance(info.context, ReviewValidationContext):
154169
raise RuntimeError(f"Validation context is not set or is not of type ReviewValidationContext: {info.context}")
155170

156-
assert "year_month" not in data, "year_month should not be set before validation."
157-
data["year_month"] = info.context.year_month
171+
assert YEAR_FIELD not in data, f"{YEAR_FIELD} should not be set before validation."
172+
assert MONTH_FIELD not in data, f"{MONTH_FIELD} should not be set before validation."
173+
data[YEAR_FIELD] = info.context.year
174+
data[MONTH_FIELD] = info.context.month
175+
176+
assert ROW_NUMBER_FIELD not in data, f"{ROW_NUMBER_FIELD} should not be set before validation."
177+
data[ROW_NUMBER_FIELD] = info.context.count
178+
info.context.increment()
158179
return data
159180

160181

source-google-play/tests/snapshots/snapshots__discover__capture.stdout.json

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -41,32 +41,26 @@
4141
},
4242
"description": "Document metadata"
4343
},
44-
"package_name": {
44+
"Package Name": {
4545
"title": "Package Name",
4646
"type": "string"
4747
},
48-
"row_number": {
49-
"title": "Row Number",
50-
"type": "integer"
51-
},
52-
"date": {
48+
"Date": {
5349
"title": "Date",
5450
"type": "string"
5551
}
5652
},
5753
"required": [
58-
"package_name",
59-
"row_number",
60-
"date"
54+
"Package Name",
55+
"Date"
6156
],
6257
"title": "Crashes",
6358
"type": "object",
6459
"x-infer-schema": true
6560
},
6661
"key": [
67-
"/date",
68-
"/package_name",
69-
"/row_number"
62+
"/Date",
63+
"/Package Name"
7064
]
7165
},
7266
{
@@ -111,32 +105,26 @@
111105
},
112106
"description": "Document metadata"
113107
},
114-
"package_name": {
108+
"Package Name": {
115109
"title": "Package Name",
116110
"type": "string"
117111
},
118-
"row_number": {
119-
"title": "Row Number",
120-
"type": "integer"
121-
},
122-
"date": {
112+
"Date": {
123113
"title": "Date",
124114
"type": "string"
125115
}
126116
},
127117
"required": [
128-
"package_name",
129-
"row_number",
130-
"date"
118+
"Package Name",
119+
"Date"
131120
],
132121
"title": "Installs",
133122
"type": "object",
134123
"x-infer-schema": true
135124
},
136125
"key": [
137-
"/date",
138-
"/package_name",
139-
"/row_number"
126+
"/Date",
127+
"/Package Name"
140128
]
141129
},
142130
{
@@ -181,32 +169,44 @@
181169
},
182170
"description": "Document metadata"
183171
},
184-
"package_name": {
172+
"Package Name": {
185173
"title": "Package Name",
186174
"type": "string"
187175
},
188-
"row_number": {
176+
"Row Number": {
189177
"title": "Row Number",
190178
"type": "integer"
191179
},
192-
"year_month": {
193-
"title": "Year Month",
180+
"Year": {
181+
"title": "Year",
182+
"type": "string"
183+
},
184+
"Month": {
185+
"title": "Month",
186+
"type": "string"
187+
},
188+
"Review Last Update Date And Time": {
189+
"format": "date-time",
190+
"title": "Review Last Update Date And Time",
194191
"type": "string"
195192
}
196193
},
197194
"required": [
198-
"package_name",
199-
"row_number",
200-
"year_month"
195+
"Package Name",
196+
"Row Number",
197+
"Year",
198+
"Month",
199+
"Review Last Update Date And Time"
201200
],
202201
"title": "Reviews",
203202
"type": "object",
204203
"x-infer-schema": true
205204
},
206205
"key": [
207-
"/package_name",
208-
"/row_number",
209-
"/year_month"
206+
"/Month",
207+
"/Package Name",
208+
"/Row Number",
209+
"/Year"
210210
]
211211
}
212212
]

0 commit comments

Comments
 (0)