Skip to content

Commit 4385af7

Browse files
committed
source-stripe-native: reduce memory usage for connected accounts
When a user's Stripe account has several thousand connected accounts, memory usage limits how many accounts we can capture from concurrently (see 85aab13). This means any memory efficiency improvement can improve connector performance and allow us to increase the number of concurrent workers/subtasks allowed in the `PriorityQueueConfig`. This commit addresses some low-hanging fruit that has a notable impact on the connector's memory usage. Instead of each binding creating its own `initial_state` and `all_account_ids` list, these can be created once and passed into the resource creation functions. Testing locally with ~7,000 connected accounts, I've observed lower memory usage when all bindings are backfilling (~80% down to ~65%). I also want to see what the memory usage looks like when all bindings are backfilled and we're rapidly cycling through accounts to catch them up incrementally. I'm not confident my attempts to replicate that locally adequately reflect actual production captures, so I'd like to merge these memory improvements, observe memory usage in production, then modify settings in `priority_capture.py` to take advantage of the freshly gained memory efficiency.
1 parent 38e43d7 commit 4385af7

File tree

1 file changed

+32
-28
lines changed

1 file changed

+32
-28
lines changed

source-stripe-native/source_stripe_native/resources.py

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from copy import deepcopy
12
from datetime import datetime, UTC, timedelta
23
from logging import Logger
34
import functools
@@ -127,8 +128,8 @@ def _reconcile_connector_state(
127128
task.log.info(
128129
f"Initializing new subtask state for account id {account_id}."
129130
)
130-
state.inc[account_id] = initial_state.inc[account_id]
131-
state.backfill[account_id] = initial_state.backfill[account_id]
131+
state.inc[account_id] = deepcopy(initial_state.inc[account_id])
132+
state.backfill[account_id] = deepcopy(initial_state.backfill[account_id])
132133
should_checkpoint = True
133134
elif not inc_state_exists and backfill_state_exists:
134135
# Note: This case is to fix a legacy issue where the incremental state was not initialized
@@ -137,8 +138,8 @@ def _reconcile_connector_state(
137138
task.log.info(
138139
f"Backfilling subtask for account id {account_id} due to missing incremental state."
139140
)
140-
state.inc[account_id] = initial_state.inc[account_id]
141-
state.backfill[account_id] = initial_state.backfill[account_id]
141+
state.inc[account_id] = deepcopy(initial_state.inc[account_id])
142+
state.backfill[account_id] = deepcopy(initial_state.backfill[account_id])
142143
should_checkpoint = True
143144

144145
if should_checkpoint:
@@ -179,6 +180,11 @@ async def all_resources(
179180
}
180181
)
181182

183+
all_account_ids = [platform_account_id, *connected_account_ids]
184+
initial_state = _create_initial_state(
185+
all_account_ids if connected_account_ids else platform_account_id
186+
)
187+
182188
for element in STREAMS:
183189
is_accessible_stream = True
184190
base_stream = element.get("stream")
@@ -197,6 +203,8 @@ async def all_resources(
197203
config.start_date,
198204
platform_account_id,
199205
connected_account_ids,
206+
all_account_ids,
207+
initial_state,
200208
)
201209
if hasattr(base_stream, "EVENT_TYPES")
202210
else no_events_object(
@@ -205,6 +213,8 @@ async def all_resources(
205213
config.start_date,
206214
platform_account_id,
207215
connected_account_ids,
216+
all_account_ids,
217+
initial_state,
208218
)
209219
)
210220
all_streams.append(resource)
@@ -229,6 +239,8 @@ async def all_resources(
229239
config.start_date,
230240
platform_account_id,
231241
connected_account_ids,
242+
all_account_ids,
243+
initial_state,
232244
)
233245
)
234246
case _ if child_stream.NAME in SPLIT_CHILD_STREAM_NAMES:
@@ -240,6 +252,8 @@ async def all_resources(
240252
config.start_date,
241253
platform_account_id,
242254
connected_account_ids,
255+
all_account_ids,
256+
initial_state,
243257
)
244258
)
245259
case _:
@@ -251,6 +265,8 @@ async def all_resources(
251265
config.start_date,
252266
platform_account_id,
253267
connected_account_ids,
268+
all_account_ids,
269+
initial_state,
254270
)
255271
)
256272

@@ -265,6 +281,8 @@ async def all_resources(
265281
config.start_date,
266282
platform_account_id,
267283
connected_account_ids,
284+
all_account_ids,
285+
initial_state,
268286
)
269287
all_streams.append(resource)
270288

@@ -300,14 +318,12 @@ def base_object(
300318
start_date: datetime,
301319
platform_account_id: str,
302320
connected_account_ids: list[str],
321+
all_account_ids: list[str],
322+
initial_state: ResourceState,
303323
) -> Resource:
304324
"""Base Object handles the default case from source-stripe-native
305325
It requires a single, parent stream with a valid Event API Type
306326
"""
307-
all_account_ids = [platform_account_id, *connected_account_ids]
308-
initial_state = _create_initial_state(
309-
all_account_ids if connected_account_ids else platform_account_id
310-
)
311327

312328
def open(
313329
binding: CaptureBinding[ResourceConfig],
@@ -392,17 +408,14 @@ def child_object(
392408
start_date: datetime,
393409
platform_account_id: str,
394410
connected_account_ids: list[str],
411+
all_account_ids: list[str],
412+
initial_state: ResourceState,
395413
) -> Resource:
396414
"""Child Object handles the default child case from source-stripe-native
397415
It requires both the parent and child stream, with the parent stream having
398416
a valid Event API Type
399417
"""
400418

401-
all_account_ids = [platform_account_id, *connected_account_ids]
402-
initial_state = _create_initial_state(
403-
all_account_ids if connected_account_ids else platform_account_id
404-
)
405-
406419
def open(
407420
binding: CaptureBinding[ResourceConfig],
408421
binding_index: int,
@@ -493,18 +506,15 @@ def split_child_object(
493506
start_date: datetime,
494507
platform_account_id: str,
495508
connected_account_ids: list[str],
509+
all_account_ids: list[str],
510+
initial_state: ResourceState,
496511
) -> Resource:
497512
"""
498513
split_child_object handles the case where a stream is a child stream when backfilling
499514
but incrementally replicates based off events that contain the child stream resource directly
500515
in the API response. Meaning, the stream behaves like a non-chid stream incrementally.
501516
"""
502517

503-
all_account_ids = [platform_account_id, *connected_account_ids]
504-
initial_state = _create_initial_state(
505-
all_account_ids if connected_account_ids else platform_account_id
506-
)
507-
508518
def open(
509519
binding: CaptureBinding[ResourceConfig],
510520
binding_index: int,
@@ -593,17 +603,14 @@ def usage_records(
593603
start_date: datetime,
594604
platform_account_id: str,
595605
connected_account_ids: list[str],
606+
all_account_ids: list[str],
607+
initial_state: ResourceState,
596608
) -> Resource:
597609
"""Usage Records handles a specific stream (UsageRecords).
598610
This is required since Usage Records is a child stream from SubscriptionItem
599611
and requires special processing.
600612
"""
601613

602-
all_account_ids = [platform_account_id, *connected_account_ids]
603-
initial_state = _create_initial_state(
604-
all_account_ids if connected_account_ids else platform_account_id
605-
)
606-
607614
def open(
608615
binding: CaptureBinding[ResourceConfig],
609616
binding_index: int,
@@ -692,18 +699,15 @@ def no_events_object(
692699
start_date: datetime,
693700
platform_account_id: str,
694701
connected_account_ids: list[str],
702+
all_account_ids: list[str],
703+
initial_state: ResourceState,
695704
) -> Resource:
696705
"""No Events Object handles a edge-case from source-stripe-native,
697706
where the given parent stream does not contain a valid Events API type.
698707
It requires a single, parent stream with a valid list all API endpoint.
699708
It works very similar to the base object, but without the use of the Events APi.
700709
"""
701710

702-
all_account_ids = [platform_account_id, *connected_account_ids]
703-
initial_state = _create_initial_state(
704-
all_account_ids if connected_account_ids else platform_account_id
705-
)
706-
707711
def open(
708712
binding: CaptureBinding[ResourceConfig],
709713
binding_index: int,

0 commit comments

Comments
 (0)