Skip to content

Commit 94571b5

Browse files
committed
source-dynamics-365-finance-and-operations: remove Pydantic validation for documents
Pydantic validation is arguably not necessary for this connector. It primarily processes CSVs and performs very light transformation (convert empty strings to `None`, convert boolean fields to actual boolean values, add in the `_meta.op` field). Yielding validated Pydantic model instances also causes the CDK to serialize documents with model_dump_json(), performing some additional validation. This connector has high CPU usage, and removing some Pydantic validation should help speed up the connector. By performing the light transformation itself and yielding raw dicts, the connector uses the CDK's faster `orjson` serialization pathway when capturing documents, avoiding overhead from Pydantic validation and serialization. This ends up reducing steady state CPU usage from ~80% to ~35% when capturing a single data-heavy binding.
1 parent 03a6838 commit 94571b5

File tree

4 files changed

+140
-38
lines changed

4 files changed

+140
-38
lines changed

source-dynamics-365-finance-and-operations/source_dynamics_365_finance_and_operations/adls_gen2_client.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from datetime import datetime, timezone
22
from logging import Logger
3-
from typing import AsyncGenerator, Any, Optional, TypeVar
3+
from typing import AsyncGenerator, Any, Optional
44

55
from pydantic import BaseModel, Field, computed_field
66

@@ -11,8 +11,6 @@
1111

1212
NEXT_PAGE_HEADER = "x-ms-continuation"
1313

14-
_CSVRow = TypeVar('_CSVRow', bound=BaseModel)
15-
1614

1715
class ADLSFilesystemMetadata(BaseModel):
1816
"""Represents metadata for a single filesystem in Azure Data Lake Storage Gen2."""
@@ -119,11 +117,16 @@ async def read_file(
119117
return await self.http.request(self.log, url)
120118

121119
async def stream_csv(
122-
self,
120+
self,
123121
path: str,
124-
model: type[_CSVRow],
125122
field_names: list[str],
126-
) -> AsyncGenerator[_CSVRow, None]:
123+
) -> AsyncGenerator[dict[str, Any], None]:
124+
"""
125+
Stream and parse a CSV file, yielding dict rows.
126+
127+
Transformations applied:
128+
- Empty strings converted to None
129+
"""
127130
url_without_sas = f"{self.base_url}/{self.filesystem}/{path}"
128131

129132
self.log.debug(f"Streaming CSV contents from /{path}.", {
@@ -134,11 +137,10 @@ async def stream_csv(
134137

135138
_, body = await self.http.request_stream(self.log, url)
136139

137-
processor = IncrementalCSVProcessor(
138-
body(),
139-
model,
140-
fieldnames=field_names
141-
)
140+
async for row in IncrementalCSVProcessor(body(), fieldnames=field_names):
141+
# Convert empty strings to None
142+
for key, value in row.items():
143+
if value == "":
144+
row[key] = None
142145

143-
async for item in processor:
144-
yield item
146+
yield row

source-dynamics-365-finance-and-operations/source_dynamics_365_finance_and_operations/api.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
from datetime import datetime
33
from logging import Logger
4-
from typing import AsyncGenerator
4+
from typing import AsyncGenerator, cast
55

66
# Some functions that send a request to Azure for listing files or reading
77
# metadata files are wrapped with the alru_cache decorator. alru_cache
@@ -141,7 +141,7 @@ async def read_csvs_in_folder(
141141
folder: str,
142142
table_name: str,
143143
client: ADLSGen2Client,
144-
) -> AsyncGenerator[BaseTable, None]:
144+
) -> AsyncGenerator[dict, None]:
145145
folder_contents = await get_folder_contents_for_table(folder, table_name, client)
146146

147147
csvs: list[ADLSPathMetadata] = []
@@ -164,16 +164,36 @@ async def read_csvs_in_folder(
164164
csvs.sort(key=lambda c: c.last_modified_datetime)
165165

166166
for csv in csvs:
167-
async for row in client.stream_csv(csv.name, table_model, table_model.field_names):
168-
yield row
167+
async for row in client.stream_csv(csv.name, table_model.field_names):
168+
yield transform_row(row, table_model.boolean_fields)
169+
170+
171+
def transform_row(row: dict[str, str], boolean_fields: frozenset[str]) -> dict[str, str | bool | dict[str, str]]:
172+
"""
173+
Apply Dynamics 365-specific transformations to a CSV row.
174+
175+
Transformations:
176+
- Convert boolean fields from "True"/"False"/empty strings to actual booleans
177+
- Add _meta field with operation type based on IsDelete field
178+
(IsDelete is "True" for deletions, "" otherwise)
179+
"""
180+
result = cast(dict[str, str | bool | dict[str, str]], row)
181+
182+
for field_name in boolean_fields:
183+
value = row.get(field_name)
184+
result[field_name] = value.lower() == "true" if value else False
185+
186+
result["_meta"] = {"op": "d" if result.get("IsDelete") else "u"}
187+
188+
return result
169189

170190

171191
async def fetch_changes(
172192
client: ADLSGen2Client,
173193
table_name: str,
174194
log: Logger,
175195
log_cursor: LogCursor,
176-
) -> AsyncGenerator[BaseTable | LogCursor, None]:
196+
) -> AsyncGenerator[dict | LogCursor, None]:
177197
assert isinstance(log_cursor, datetime)
178198

179199
finalized_folders = await get_finalized_timestamp_folders(client)

source-dynamics-365-finance-and-operations/source_dynamics_365_finance_and_operations/models.py

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from enum import StrEnum
21
from typing import ClassVar, Literal, Annotated
32

43
from estuary_cdk.capture.common import (
@@ -8,7 +7,7 @@
87
)
98
from estuary_cdk.incremental_csv_processor import BaseCSVRow
109

11-
from pydantic import BaseModel, Field, model_validator, BeforeValidator
10+
from pydantic import BaseModel, Field, BeforeValidator
1211

1312

1413
class AzureSASToken(BaseModel):
@@ -60,6 +59,11 @@ class Attribute(BaseModel, extra="allow"):
6059

6160

6261
class BaseTable(BaseCSVRow, extra="allow"):
62+
"""
63+
Used for schema generation and table metadata. Not used to validate actual
64+
documents - we yield raw dicts to avoid Pydantic's serialization/validation
65+
overhead that's not necessary in this connector.
66+
"""
6367
name: ClassVar[str]
6468
field_names: ClassVar[list[str]]
6569
field_types: ClassVar[dict[str, str]]
@@ -70,24 +74,6 @@ class BaseTable(BaseCSVRow, extra="allow"):
7074
Id: str
7175
IsDelete: bool | None
7276

73-
@model_validator(mode='before')
74-
@classmethod
75-
def convert_boolean_fields(cls, values: dict) -> dict:
76-
for field_name in cls.boolean_fields:
77-
value = values.get(field_name)
78-
if value is not None:
79-
assert isinstance(value, str)
80-
values[field_name] = value.lower() == 'true'
81-
return values
82-
83-
@model_validator(mode='after')
84-
def set_meta_op(self) -> 'BaseTable':
85-
if self.IsDelete:
86-
self.meta_ = BaseTable.Meta(op='d')
87-
else:
88-
self.meta_ = BaseTable.Meta(op='u')
89-
return self
90-
9177

9278
def model_from_entity(entity: ModelDotJson.Entity) -> type[BaseTable]:
9379
field_names = [attr.name for attr in entity.attributes]
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
from source_dynamics_365_finance_and_operations.api import transform_row
2+
3+
4+
class TestTransformRow:
5+
"""Tests for the transform_row helper function."""
6+
7+
def test_converts_boolean_field_true(self):
8+
"""Boolean field with 'true' string should become True."""
9+
row = {"IsActive": "true", "Name": "Test"}
10+
boolean_fields = frozenset({"IsActive"})
11+
12+
result = transform_row(row, boolean_fields)
13+
14+
assert result["IsActive"] is True
15+
assert result["Name"] == "Test"
16+
17+
def test_converts_boolean_field_false(self):
18+
"""Boolean field with 'false' string should become False."""
19+
row = {"IsActive": "false", "Name": "Test"}
20+
boolean_fields = frozenset({"IsActive"})
21+
22+
result = transform_row(row, boolean_fields)
23+
24+
assert result["IsActive"] is False
25+
26+
def test_converts_boolean_field_case_insensitive(self):
27+
"""Boolean conversion should be case-insensitive."""
28+
row = {"IsActive": "TRUE", "IsEnabled": "False", "IsValid": "TrUe"}
29+
boolean_fields = frozenset({"IsActive", "IsEnabled", "IsValid"})
30+
31+
result = transform_row(row, boolean_fields)
32+
33+
assert result["IsActive"] is True
34+
assert result["IsEnabled"] is False
35+
assert result["IsValid"] is True
36+
37+
def test_boolean_field_none_becomes_false(self):
38+
"""Boolean field with None value should become False."""
39+
row = {"IsActive": None, "Name": "Test"}
40+
boolean_fields = frozenset({"IsActive"})
41+
42+
result = transform_row(row, boolean_fields)
43+
44+
assert result["IsActive"] is False
45+
46+
def test_multiple_boolean_fields(self):
47+
"""Multiple boolean fields should all be converted."""
48+
row = {"IsActive": "true", "IsDeleted": "false", "IsEnabled": "true"}
49+
boolean_fields = frozenset({"IsActive", "IsDeleted", "IsEnabled"})
50+
51+
result = transform_row(row, boolean_fields)
52+
53+
assert result["IsActive"] is True
54+
assert result["IsDeleted"] is False
55+
assert result["IsEnabled"] is True
56+
57+
def test_meta_op_delete_when_isdelete_true(self):
58+
"""_meta.op should be 'd' when IsDelete is True."""
59+
row = {"IsDelete": "True", "Name": "Test"}
60+
boolean_fields = frozenset()
61+
62+
result = transform_row(row, boolean_fields)
63+
64+
assert result["_meta"] == {"op": "d"}
65+
66+
def test_meta_op_update_when_isdelete_empty(self):
67+
"""_meta.op should be 'u' when IsDelete is empty string."""
68+
row = {"IsDelete": "", "Name": "Test"}
69+
boolean_fields = frozenset()
70+
71+
result = transform_row(row, boolean_fields)
72+
73+
assert result["_meta"] == {"op": "u"}
74+
75+
def test_meta_op_update_when_isdelete_false(self):
76+
"""_meta.op should be 'u' when IsDelete is 'False' and converted to bool."""
77+
row = {"IsDelete": "False", "Name": "Test"}
78+
boolean_fields = frozenset({"IsDelete"})
79+
80+
result = transform_row(row, boolean_fields)
81+
82+
assert result["IsDelete"] is False
83+
assert result["_meta"] == {"op": "u"}
84+
85+
def test_mutates_row_in_place(self):
86+
"""transform_row should mutate the row in place and return it."""
87+
row = {"IsActive": "true"}
88+
boolean_fields = frozenset({"IsActive"})
89+
90+
result = transform_row(row, boolean_fields)
91+
92+
assert result is row
93+
assert row["IsActive"] is True
94+
assert "_meta" in row

0 commit comments

Comments
 (0)