Skip to content

fix(salesforce): lock-aware token refresh for RTR safety#97

Open
malvfr wants to merge 48 commits into
MeltanoLabs:mainfrom
HGData:fix/rtr-lock-aware-refresh
Open

fix(salesforce): lock-aware token refresh for RTR safety#97
malvfr wants to merge 48 commits into
MeltanoLabs:mainfrom
HGData:fix/rtr-lock-aware-refresh

Conversation

@malvfr

@malvfr malvfr commented Jun 23, 2026

Copy link
Copy Markdown

Summary

  • Fixes the RTR crash where SalesforceAuthOAuth.login() ignored the rotated refresh_token returned by Salesforce, leaving a revoked token in memory. The 15-minute refresh timer would then call SF again with that revoked token and get invalid_grant.
  • Adds Argo distributed lock protocol (same as lockAwareRefreshFn in mk-node-libs) so token rotation is coordinated across mk-tap-salesforce, mk-bongo, mk-push, and mk-pull.
  • Falls back to a simple (un-coordinated) in-memory login when ARGO_URL or TENANT env vars are absent — maintains backward-compat for local/test runs.

What changed

tap_salesforce/salesforce/credentials.pySalesforceAuthOAuth only:

  1. login() now dispatches to _lock_aware_login() (when Argo env vars present) or _simple_login() (fallback).
  2. _simple_login() — original behavior plus captures refresh_token from SF response in-memory.
  3. _lock_aware_login() — acquires Argo refresh-lock → re-reads credentials (adopts if another service already refreshed) → calls SF → persists new AT + RT to Argo atomically via lock-release endpoint → releases lock.
  4. Helper methods: _acquire_lock, _read_credentials_from_argo, _release_lock, _release_lock_with_tokens.

All other classes (SalesforceAuth, SalesforceAuthPassword, OAuthCredentials, PasswordCredentials, parse_credentials) are unchanged.

Argo lock endpoints used

  • POST /v1/tenant/{tenant}/connectors/salesforce/refresh-lock — acquire
  • GET /v1/tenant/{tenant}/connectors/salesforce — re-read after lock
  • PUT /v1/tenant/{tenant}/connectors/salesforce/refresh-lock/release — release (with or without tokens)

Test plan

  • Run tap locally with ARGO_URL/TENANT unset → confirm warning log + simple login path (existing behavior)
  • Run tap with ARGO_URL/TENANT/ARGO_CONNECTOR_API_KEY set against staging Argo → confirm lock acquired, tokens written to Argo on refresh
  • Simulate concurrent refresh (two tap processes, same tenant) → confirm second process adopts tokens from first and releases lock without calling SF
  • Confirm invalid_grant no longer occurs after 15 minutes when RTR is enabled on the SF connected app

🤖 Generated with Claude Code

richard-merrick and others added 30 commits August 4, 2025 14:14
fix: add a hard 12 month limit to tasks extraction
fix: increase maximum fields limit to 500 and ensure unique field selection
fix (salesforce): prioritise pull config selected fields
RGI-368 : Salesforce Quota Tracking
RGI-552 - tap-salesforce : Fix concurrent write race condition and bookmark corruption
RGI:646 -Extended bisection logic to also cover OPERATION_TOO_LARGE errors.
fix: Switch from queryAll to query endpoint to exclude soft-deleted records. This will continue bisect until it hits the 1 hour floor
soundarya-sambath-23 and others added 11 commits April 2, 2026 10:53
RGI: 755 - MDI since handling the tasks queryall / data too large issue, there is duplication in events being extracted
https://hgdata.atlassian.net/browse/RGI-765

When a new column/attribute is added to a Salesforce pull config, the system previously had no way to automatically backfill historical data for that column. The start date was also set too recently (2025-01-01) to capture full history for core entity objects.

The objective it to have a full history for data from start date(2000-01-01) for Lead, Contact, Account, Opportunity, User and 9 months max data for Task,Campaign,CampaignMember when a new column is in added in UI. This also triggers dbt full refresh when a new column is added
RGI-964 : Add per-SFDC-call observability to surface composite-batch cost
feat: add support for login url overrides for simulators
Replaces the fire-and-forget login() with a distributed-lock-aware
implementation that coordinates with Argo (same lock protocol used by
mk-node-libs lockAwareRefreshFn). Fixes the 15-min crash where the
refresh timer reused a revoked refresh_token after Salesforce RTR
rotation.

- Acquires Argo refresh-lock before calling SF
- Re-reads credentials after lock acquire (adopts if another service
  already refreshed)
- Persists new AT + RT to Argo atomically via lock-release endpoint
- Falls back to simple (in-memory-only) login when ARGO_URL/TENANT
  env vars are absent

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@malvfr malvfr force-pushed the fix/rtr-lock-aware-refresh branch from 4be897b to 69fb4ff Compare June 23, 2026 15:29
malvfr and others added 7 commits June 23, 2026 12:48
…rd, tests

- _release_lock_with_tokens: retry 3× with backoff; on all failures log
  CRITICAL with the new refresh_token (manual recovery path) then raise
- lock_released set optimistically before adopt-path _release_lock call so
  the except block never double-releases when Argo is down (I4 from review)
- Remove uuid alias (no clash), inline _read_credentials_from_argo (called
  once), dict comprehension for token body, delete ascii-art banners, log
  polling event in _acquire_lock, fix timeout check to cap sleep at remaining
  budget, warn on empty ARGO_CONNECTOR_API_KEY
- Add tests/test_credentials.py: 8 tests covering simple login, happy path,
  adopt path, first-login skip, persist retry, double-release guard, polling

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…rt; 22 tests

Logging:
- Log SF response body (status + text) before raise_for_status on SF 4xx —
  critical for invalid_grant diagnosis in RTR incidents
- Log event=sf_rtr_disabled when SF returns no new refresh_token (RTR off)
- Log event=adopt_no_rt WARNING when Argo returns new AT without refresh_token
  (in-memory RT may be stale; next cycle may get invalid_grant)
- Lock polling now includes heldBy/heldByService/ttlRemainingMs from Argo
  reject body in both lock_polling and lock_acquire_timeout events
- Log event=token_persist_rejected on 409 (no retry warranted)

Logic:
- _release_lock_with_tokens: fast-fail on Argo 409 — CAS/lock mismatch is
  permanent, retrying 3x wastes 4.5s and emits misleading warnings

Tests (22 total, up from 8):
- T6: 409 fast-fail verified (no sleep, no retry)
- T7: adopt-path no RT — old RT preserved, warning fired
- T8: persist all fail + cleanup release also fails
- SF timeout path, Argo GET fails after lock, SF 4xx body, timer restart on
  failure, acquire timeout with heldBy in message, API key warning

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…ignores

Argo validates requests with Authorization: Basic base64(key + ":").
All four Argo calls (_acquire_lock, re-read GET, _release_lock,
_release_lock_with_tokens) were sending X-Api-Key and would have
received 401 from staging/prod.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
- _simple_login: add timeout=30 to match lock-aware path
- RT in CRITICAL log: log only first 8 chars (hint) not the full value —
  refresh tokens are credentials, must not land in shared log systems
- Replace # ponytail: comment with a real explanation of intent
- SF timeout log: clarify that cleanup release is attempted (was misleading)
- lock_released=True pre-call: expand comment explaining the double-release
  guard is intentional (Copilot flagged as wrong; it is correct by design)
- wait_ms/total_ms added to lock_acquired and tokens_persisted log events
- test: monotonic mock gets 3 values after acquire_start call was added

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
- old_rt: add comment that it must be captured before any _credentials
  mutation — it is the CAS value sent to Argo's lock-release endpoint
- max_wait_s → poll_budget_s: the constant bounds polling budget, not
  hard wall-clock time (each HTTP call can consume up to timeout=10s
  before elapsed is checked); rename + comment avoids false precision

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants