Skip to content

Commit 334301b

Browse files
tlmaloney, Tom Maloney, claude, wesm
authored
Bulk update YNAB payees/merchants (#34)
For #31, this commit implements a performance improvement for bulk merchant renames: it updates the YNAB payee once instead of updating each transaction individually. For example, renaming "Amazon.com/abc" to "Amazon" across 100 transactions reduces API calls from 100 to 1.

Changes:
- Add YNABClient.update_payee() and batch_update_merchant() methods
- Enhance DataManager.commit_pending_edits() to detect and use batch updates
- Fall back to individual transaction updates if the batch update fails
- Work alongside other edit types (category, hide_from_reports)
- Add comprehensive test coverage for the batch optimization

Notes: YNAB doesn't enforce name uniqueness for payees; two different payees with distinct ids can share the same name.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Tom Maloney <tom@supermaloney.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Wes McKinney <wes@posit.co>
1 parent: 2b376db · commit: 334301b

File tree

7 files changed: +954 / -48 lines changed

moneyflow/backends/ynab.py

Lines changed: 34 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -153,6 +153,40 @@ async def get_all_merchants(self) -> List[str]:
153153
"""
154154
return self.client.get_all_merchants()
155155

156+
def batch_update_merchant(
157+
self, old_merchant_name: str, new_merchant_name: str
158+
) -> Dict[str, Any]:
159+
"""
160+
Batch update all transactions with a given merchant name (YNAB optimization).
161+
162+
This is a YNAB-specific optimization that updates the payee once instead
163+
of updating each transaction individually. This cascades to all transactions
164+
with that payee, making bulk renames 100x faster.
165+
166+
**Performance**:
167+
- Traditional: 100 transactions = 100 API calls
168+
- Optimized: 100 transactions = 1 API call
169+
170+
Args:
171+
old_merchant_name: Current merchant/payee name to rename
172+
new_merchant_name: New merchant/payee name
173+
174+
Returns:
175+
Dictionary with results (see YNABClient.batch_update_merchant for format)
176+
177+
Example:
178+
>>> backend = YNABBackend()
179+
>>> await backend.login(password=token)
180+
>>> result = backend.batch_update_merchant("Amazon.com/abc", "Amazon")
181+
>>> if result['success']:
182+
... print(f"Optimized: Updated payee {result['payee_id']}")
183+
184+
Note:
185+
This method is synchronous (not async) because the YNAB SDK is synchronous.
186+
Other backends may not support this optimization.
187+
"""
188+
return self.client.batch_update_merchant(old_merchant_name, new_merchant_name)
189+
156190
def get_currency_symbol(self) -> str:
157191
"""
158192
Get the currency symbol from YNAB budget settings.

moneyflow/data_manager.py

Lines changed: 148 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -902,9 +902,12 @@ async def commit_pending_edits(self, edits: List[Any]) -> Tuple[int, int]:
902902
"""
903903
Commit pending edits to backend API in parallel.
904904
905-
This method groups edits by transaction ID (in case multiple edits
906-
affect the same transaction) and sends update requests in parallel
907-
for maximum speed.
905+
This method intelligently optimizes commits based on backend capabilities:
906+
- For backends with batch_update_merchant (e.g., YNAB), bulk merchant
907+
renames are handled with a single API call per (old, new) pair instead
908+
of one call per transaction (100x performance improvement)
909+
- For other backends, or non-merchant edits, uses individual transaction
910+
updates in parallel for maximum speed
908911
909912
The method is resilient to partial failures - if some updates fail,
910913
others will still succeed. The caller receives counts for both.
@@ -934,48 +937,154 @@ async def commit_pending_edits(self, edits: List[Any]) -> Tuple[int, int]:
934937
logger.info("No edits to commit")
935938
return 0, 0
936939

937-
# Group edits by transaction ID
938-
edits_by_txn: Dict[str, Dict[str, Any]] = {}
939-
for edit in edits:
940-
txn_id = edit.transaction_id
941-
if txn_id not in edits_by_txn:
942-
edits_by_txn[txn_id] = {}
943-
944-
if edit.field == "merchant":
945-
edits_by_txn[txn_id]["merchant_name"] = edit.new_value
946-
elif edit.field == "category":
947-
edits_by_txn[txn_id]["category_id"] = edit.new_value
948-
elif edit.field == "hide_from_reports":
949-
edits_by_txn[txn_id]["hide_from_reports"] = edit.new_value
950-
951-
# Create update tasks
952-
tasks = []
953-
for txn_id, updates in edits_by_txn.items():
954-
tasks.append(self.mm.update_transaction(transaction_id=txn_id, **updates))
955-
956-
# Execute in parallel
957-
results = await asyncio.gather(*tasks, return_exceptions=True)
958-
959-
# Count successes and failures, and log errors
960940
success_count = 0
961941
failure_count = 0
962-
963-
# Check for auth errors that should trigger retry
964942
auth_errors = []
965943

966-
for i, result in enumerate(results):
967-
if isinstance(result, Exception):
968-
failure_count += 1
969-
logger.error(
970-
f"Transaction update {i + 1}/{len(results)} FAILED: {result}", exc_info=result
944+
# Check if backend supports batch merchant updates
945+
has_batch_update = hasattr(self.mm, "batch_update_merchant")
946+
947+
# Separate merchant edits from other edits
948+
merchant_edits = [e for e in edits if e.field == "merchant"]
949+
other_edits = [e for e in edits if e.field != "merchant"]
950+
951+
# OPTIMIZATION: Group merchant edits by (old_value, new_value) for batch updates
952+
if has_batch_update and merchant_edits:
953+
logger.info(
954+
f"Backend supports batch updates - optimizing {len(merchant_edits)} merchant edits"
955+
)
956+
957+
# Group merchant edits by (old_name, new_name)
958+
merchant_groups: Dict[Tuple[str, str], List[Any]] = {}
959+
for edit in merchant_edits:
960+
key = (edit.old_value, edit.new_value)
961+
if key not in merchant_groups:
962+
merchant_groups[key] = []
963+
merchant_groups[key].append(edit)
964+
965+
# Try batch update for each (old, new) pair
966+
successfully_batched_edits = []
967+
failed_batch_edits = []
968+
969+
# Track processed transaction IDs to prevent double-counting
970+
# Note: If the same transaction has multiple merchant edits (e.g., A→B then B→C),
971+
# they'll be in different batch groups. We can only batch one of them.
972+
processed_txn_ids = set()
973+
974+
for (old_name, new_name), group_edits in merchant_groups.items():
975+
# Filter out edits for transactions already processed in a different batch
976+
unprocessed_edits = [
977+
e for e in group_edits if e.transaction_id not in processed_txn_ids
978+
]
979+
980+
if not unprocessed_edits:
981+
# All edits in this group were already processed in a previous batch
982+
# Add them to failed list so they get individual processing with latest values
983+
failed_batch_edits.extend(group_edits)
984+
continue
985+
986+
group_edits = unprocessed_edits # Only batch the unprocessed ones
987+
group_txn_ids = {e.transaction_id for e in group_edits}
988+
logger.info(
989+
f"Attempting batch update: '{old_name}' -> '{new_name}' "
990+
f"({len(group_edits)} transactions)"
971991
)
972992

973-
# Check if it's a 401/auth error
974-
error_str = str(result).lower()
975-
if "401" in error_str or "unauthorized" in error_str:
976-
auth_errors.append(result)
977-
else:
978-
success_count += 1
993+
try:
994+
# Call batch_update_merchant in thread to avoid blocking event loop
995+
result = await asyncio.to_thread(
996+
self.mm.batch_update_merchant, # type: ignore[attr-defined]
997+
old_name,
998+
new_name,
999+
)
1000+
1001+
if result.get("success"):
1002+
# Batch update succeeded - mark edits as processed and count as successful
1003+
processed_txn_ids.update(group_txn_ids)
1004+
success_count += len(group_edits)
1005+
successfully_batched_edits.extend(group_edits)
1006+
logger.info(
1007+
f"✓ Batch update succeeded for '{old_name}' -> '{new_name}' "
1008+
f"({len(group_edits)} transactions updated via 1 API call)"
1009+
)
1010+
else:
1011+
# Batch update failed - mark as processed but add to fallback list
1012+
processed_txn_ids.update(group_txn_ids)
1013+
logger.warning(
1014+
f"Batch update failed for '{old_name}' -> '{new_name}': "
1015+
f"{result.get('message', 'Unknown error')}. "
1016+
f"Falling back to individual transaction updates."
1017+
)
1018+
failed_batch_edits.extend(group_edits)
1019+
1020+
except Exception as e:
1021+
# Exception during batch - mark as processed and add to fallback list
1022+
processed_txn_ids.update(group_txn_ids)
1023+
logger.warning(
1024+
f"Batch update exception for '{old_name}' -> '{new_name}': {e}. "
1025+
f"Falling back to individual transaction updates.",
1026+
exc_info=True,
1027+
)
1028+
failed_batch_edits.extend(group_edits)
1029+
1030+
# Add failed batch edits back to the list for individual processing
1031+
merchant_edits = failed_batch_edits
1032+
1033+
# Safety check: ensure no overlap between successful and failed batches
1034+
successful_ids = {e.transaction_id for e in successfully_batched_edits}
1035+
failed_ids = {e.transaction_id for e in failed_batch_edits}
1036+
overlap = successful_ids & failed_ids
1037+
assert not overlap, (
1038+
f"Found {len(overlap)} edits in both successful and failed batches - "
1039+
"this indicates a race condition or logic error"
1040+
)
1041+
1042+
# Process remaining edits (non-merchant + failed batch updates) individually
1043+
edits_to_process = merchant_edits + other_edits
1044+
1045+
if edits_to_process:
1046+
logger.info(
1047+
f"Processing {len(edits_to_process)} edits individually "
1048+
f"({len(merchant_edits)} merchant, {len(other_edits)} other)"
1049+
)
1050+
1051+
# Group edits by transaction ID
1052+
edits_by_txn: Dict[str, Dict[str, Any]] = {}
1053+
for edit in edits_to_process:
1054+
txn_id = edit.transaction_id
1055+
if txn_id not in edits_by_txn:
1056+
edits_by_txn[txn_id] = {}
1057+
1058+
if edit.field == "merchant":
1059+
edits_by_txn[txn_id]["merchant_name"] = edit.new_value
1060+
elif edit.field == "category":
1061+
edits_by_txn[txn_id]["category_id"] = edit.new_value
1062+
elif edit.field == "hide_from_reports":
1063+
edits_by_txn[txn_id]["hide_from_reports"] = edit.new_value
1064+
1065+
# Create update tasks
1066+
tasks = []
1067+
for txn_id, updates in edits_by_txn.items():
1068+
tasks.append(self.mm.update_transaction(transaction_id=txn_id, **updates))
1069+
1070+
# Execute in parallel
1071+
results = await asyncio.gather(*tasks, return_exceptions=True)
1072+
1073+
# Count successes and failures
1074+
for i, result in enumerate(results):
1075+
if isinstance(result, Exception):
1076+
failure_count += 1
1077+
logger.error(
1078+
f"Transaction update {i + 1}/{len(results)} FAILED: {result}",
1079+
exc_info=result,
1080+
)
1081+
1082+
# Check if it's a 401/auth error
1083+
error_str = str(result).lower()
1084+
if "401" in error_str or "unauthorized" in error_str:
1085+
auth_errors.append(result)
1086+
else:
1087+
success_count += 1
9791088

9801089
logger.info(f"Commit completed: {success_count} succeeded, {failure_count} failed")
9811090

0 commit comments

Comments
 (0)