Skip to content

Commit 173c3c7

Browse files
committed
source-salesforce-native: use ValidationContext class instead of dict
While the previous dict validation context approach worked and still works with the IncrementalCSVProcessor typing changes in the previous commit, I'm updating the connector to use a more structured ValidationContext class so there's a clear example of the (currently) preferred way to use Pydantic's validation contexts with the IncrementalCSVProcessor.
1 parent b0dbbce commit 173c3c7

File tree

3 files changed

+17
-7
lines changed

3 files changed

+17
-7
lines changed

source-salesforce-native/source_salesforce_native/bulk_job_manager.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
BulkJobCheckStatusResponse,
1616
SalesforceDataSource,
1717
SalesforceRecord,
18+
ValidationContext,
1819
)
1920

2021
INITIAL_SLEEP = 0.2
@@ -35,6 +36,8 @@
3536
encoding='utf-8'
3637
)
3738

39+
BULK_VALIDATION_CONTEXT = ValidationContext(SalesforceDataSource.BULK_API)
40+
3841

3942
class BulkJobManager:
4043
def __init__(self, http: HTTPSession, log: Logger, instance_url: str):
@@ -103,12 +106,11 @@ async def _fetch_results(self, job_id: str, model_cls: type[SalesforceRecord]) -
103106
expected = int(count)
104107
received = 0
105108

106-
bulk_context = {'data_source': SalesforceDataSource.BULK_API}
107109
processor = IncrementalCSVProcessor(
108110
body(),
109111
model_cls,
110112
CSV_CONFIG,
111-
validation_context=bulk_context,
113+
validation_context=BULK_VALIDATION_CONTEXT,
112114
)
113115
async for record in processor:
114116
yield record

source-salesforce-native/source_salesforce_native/models.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,11 @@ class SalesforceDataSource(StrEnum):
332332
BULK_API = "bulk_api"
333333

334334

335+
class ValidationContext:
336+
def __init__(self, data_source: SalesforceDataSource):
337+
self.data_source = data_source
338+
339+
335340
class SalesforceRecord(BaseDocument, extra="allow"):
336341
field_details: ClassVar[FieldDetailsDict]
337342

@@ -353,10 +358,10 @@ def _transform_fields(cls, values: dict[str, Any], info: ValidationInfo) -> dict
353358
"field_details must be set on the SalesforceRecord subclass before validation."
354359
)
355360

356-
context = info.context or {}
357-
data_source: SalesforceDataSource | None = context.get('data_source', None)
358-
if data_source is None:
359-
raise ValueError("data_source must be provided in validation context")
361+
if not info.context or not isinstance(info.context, ValidationContext):
362+
raise RuntimeError(f"Validation context must be of type ValidationContext: {info.context}")
363+
364+
data_source = info.context.data_source
360365

361366
transformed: dict[str, Any] = {}
362367

source-salesforce-native/source_salesforce_native/rest_query_manager.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
QueryResponse,
1111
SalesforceDataSource,
1212
SalesforceRecord,
13+
ValidationContext,
1314
)
1415

1516

@@ -19,6 +20,8 @@
1920
MAX_URI_LENGTH = 16_384
2021
MAX_FIELDS_LENGTH = MAX_URI_LENGTH - 2_500
2122

23+
REST_VALIDATION_CONTEXT = ValidationContext(SalesforceDataSource.REST_API)
24+
2225
class Query:
2326
def __init__(self, http: HTTPSession, log: Logger, base_url: str, query: str):
2427
self.http = http
@@ -177,7 +180,7 @@ async def execute(
177180
# current state of the record in Saleforce.
178181
if str_to_dt(record[cursor_field]) <= end:
179182
yield model_cls.model_validate(
180-
record, context={'data_source': SalesforceDataSource.REST_API}
183+
record, context=REST_VALIDATION_CONTEXT
181184
)
182185

183186
# Delete completed records to avoid keeping them in memory.

0 commit comments

Comments
 (0)