Skip to content

Commit 69fb4ff

Browse files
malvfr and claude committed
fix(salesforce): lock-aware token refresh for RTR safety
Replaces the fire-and-forget login() with a distributed-lock-aware implementation that coordinates with Argo (the same lock protocol used by mk-node-libs lockAwareRefreshFn). Fixes the 15-minute crash where the refresh timer reused a revoked refresh_token after Salesforce RTR rotation.

- Acquires the Argo refresh-lock before calling SF
- Re-reads credentials after lock acquire (adopts them if another service already refreshed)
- Persists the new AT + RT to Argo atomically via the lock-release endpoint
- Falls back to simple (in-memory-only) login when the ARGO_URL/TENANT env vars are absent

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
1 parent cddd1ab commit 69fb4ff

1 file changed

Lines changed: 170 additions & 15 deletions

File tree

tap_salesforce/salesforce/credentials.py

Lines changed: 170 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import logging
2+
import os
23
import threading
4+
import time
5+
import uuid as _uuid_mod
36
from collections import namedtuple
47

58
import requests
@@ -61,7 +64,6 @@ def from_credentials(cls, credentials, **kwargs):
6164

6265

6366
class SalesforceAuthOAuth(SalesforceAuth):
64-
# The minimum expiration setting for SF Refresh Tokens is 15 minutes
6567
REFRESH_TOKEN_EXPIRATION_PERIOD = 900
6668

6769
@property
@@ -70,45 +72,198 @@ def _login_body(self):
7072

7173
@property
7274
def _login_url(self):
73-
# Simulator override env var for login URL
74-
import os
7575
override = os.environ.get("SIMULATOR_TAP_SALESFORCE_LOGIN_URL")
7676
if override:
7777
return override
78-
79-
login_url = "https://login.salesforce.com/services/oauth2/token"
80-
8178
if self.is_sandbox:
82-
login_url = "https://test.salesforce.com/services/oauth2/token"
83-
84-
return login_url
79+
return "https://test.salesforce.com/services/oauth2/token"
80+
return "https://login.salesforce.com/services/oauth2/token"
8581

8682
def login(self):
    """Refresh the Salesforce access token, going through Argo when configured.

    When both the ``ARGO_URL`` and ``TENANT`` environment variables are set
    (non-empty), delegates to ``_lock_aware_login``. Otherwise logs a warning
    and delegates to ``_simple_login``.
    """
    argo_url = os.environ.get("ARGO_URL")
    tenant = os.environ.get("TENANT")

    if not (argo_url and tenant):
        LOGGER.warning(
            "SF RTR: ARGO_URL or TENANT not set — using simple login; "
            "rotated tokens will not be persisted (RTR unsafe)"
        )
        self._simple_login()
        return

    self._lock_aware_login(argo_url, tenant)
8993

94+
# ──────────────────────────────────────────────────────────
95+
# Simple login (fallback — no lock, no writeback)
96+
# ──────────────────────────────────────────────────────────
97+
98+
def _simple_login(self):
    """Log in with a single OAuth2 token request, without any Argo coordination.

    On success, stores the access token and instance URL on ``self``. If the
    response carries a ``refresh_token``, it replaces the one held in
    ``self._credentials`` (in memory only). A new login timer is scheduled
    whether the request succeeded or failed.

    Raises:
        Exception: on any failure; the message includes the Salesforce
            response body when a response was received.
    """
    resp = None
    try:
        LOGGER.info("Attempting login via OAuth2 (simple)")
        resp = requests.post(
            self._login_url,
            data=self._login_body,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            # requests has no default timeout; bound the call so a stalled
            # endpoint cannot hang the login (same value as the lock-aware path).
            timeout=30,
        )
        resp.raise_for_status()
        auth = resp.json()
        LOGGER.info("OAuth2 login successful")
        self._access_token = auth["access_token"]
        self._instance_url = auth["instance_url"]
        new_rt = auth.get("refresh_token")
        if new_rt:
            # In-memory only — no Argo writeback; the next call uses the new RT.
            self._credentials = self._credentials._replace(refresh_token=new_rt)
    except Exception as e:
        error_message = str(e)
        # `is not None`: a Response with an error status is falsy.
        if resp is not None:
            error_message += f", Response from Salesforce: {resp.text}"
        raise Exception(error_message) from e
    finally:
        LOGGER.info("Starting new login timer")
        self.login_timer = threading.Timer(self.REFRESH_TOKEN_EXPIRATION_PERIOD, self.login)
        self.login_timer.start()
111125

126+
# ──────────────────────────────────────────────────────────
127+
# Lock-aware login — mirrors lockAwareRefreshFn in mk-node-libs
128+
# ──────────────────────────────────────────────────────────
129+
130+
def _lock_aware_login(self, argo_url: str, tenant: str) -> None:
    """Refresh the access token while holding the Argo refresh lock.

    Steps:
      1. Acquire the refresh lock under a freshly generated lock id.
      2. Unless this is the first login (``self._access_token`` is None),
         re-read credentials from Argo; if they hold a different access
         token, adopt them, release the lock and return without calling
         Salesforce.
      3. Otherwise post to the Salesforce token endpoint, store the new
         tokens on ``self``, and send them to Argo in the same request that
         releases the lock.

    If sending the tokens fails after Salesforce has already answered, the
    token-carrying release is retried once before falling back to a bare
    lock release, so a rotated refresh token is not dropped on a transient
    Argo error. If that retry succeeds the login is treated as successful.

    A new login timer is scheduled whether the refresh succeeded or failed.

    Args:
        argo_url: Base URL of the Argo service.
        tenant: Tenant identifier used in the Argo URL paths.

    Raises:
        Exception: whatever the lock, Argo or Salesforce calls raised, after
            a best-effort lock release.
    """
    argo_api_key = os.environ.get("ARGO_CONNECTOR_API_KEY", "")
    lock_id = str(_uuid_mod.uuid4())
    old_rt = self._credentials.refresh_token
    lock_acquired = False
    # Set once Salesforce has answered; from then on the tokens must reach Argo.
    auth = None

    try:
        LOGGER.info("SF RTR: acquiring refresh lock tenant=%s lock_id=%s", tenant, lock_id)
        self._acquire_lock(argo_url, argo_api_key, tenant, lock_id)
        lock_acquired = True
        LOGGER.info("SF RTR: lock acquired tenant=%s lock_id=%s", tenant, lock_id)

        # Re-read after lock: another service may have refreshed while we waited.
        # Skip on first-ever login (self._access_token is None) — we must always
        # call SF on startup to get a fresh AT regardless of what Argo has cached.
        if self._access_token is not None:
            fresh = self._read_credentials_from_argo(argo_url, argo_api_key, tenant)
            if fresh and fresh.get("access_token") and fresh["access_token"] != self._access_token:
                LOGGER.info(
                    "SF RTR: adopting tokens refreshed by another service — releasing lock "
                    "tenant=%s lock_id=%s",
                    tenant, lock_id,
                )
                self._access_token = fresh["access_token"]
                new_rt = fresh.get("refresh_token")
                if new_rt:
                    self._credentials = self._credentials._replace(refresh_token=new_rt)
                self._instance_url = fresh.get("instance_url", self._instance_url)
                self._release_lock(argo_url, argo_api_key, tenant, lock_id)
                return

        # Token still stale — call Salesforce.
        LOGGER.info("SF RTR: calling Salesforce tenant=%s lock_id=%s", tenant, lock_id)
        resp = requests.post(
            self._login_url,
            data=self._login_body,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=30,
        )
        resp.raise_for_status()
        auth = resp.json()
        LOGGER.info("SF RTR: Salesforce returned new tokens tenant=%s lock_id=%s", tenant, lock_id)

        self._access_token = auth["access_token"]
        self._instance_url = auth.get("instance_url", self._instance_url)
        new_rt = auth.get("refresh_token")
        if new_rt:
            self._credentials = self._credentials._replace(refresh_token=new_rt)

        # Persist tokens to Argo and release the lock in one request.
        self._release_lock_with_tokens(argo_url, argo_api_key, tenant, lock_id, old_rt, auth)
        LOGGER.info("SF RTR: tokens persisted and lock released tenant=%s lock_id=%s", tenant, lock_id)

    except Exception as e:
        LOGGER.error("SF RTR: error during lock-aware login tenant=%s lock_id=%s: %s", tenant, lock_id, e)
        if lock_acquired:
            persisted = False
            if isinstance(auth, dict) and auth.get("access_token"):
                # Salesforce already issued new tokens. Releasing the lock
                # without them would leave Argo holding the pre-rotation
                # refresh token, so try the token-carrying release once more.
                try:
                    self._release_lock_with_tokens(
                        argo_url, argo_api_key, tenant, lock_id, old_rt, auth
                    )
                    persisted = True
                except Exception as persist_err:
                    LOGGER.error(
                        "SF RTR: retry of token persistence failed tenant=%s lock_id=%s: %s",
                        tenant, lock_id, persist_err,
                    )
            if persisted:
                LOGGER.warning(
                    "SF RTR: tokens persisted and lock released on retry tenant=%s lock_id=%s",
                    tenant, lock_id,
                )
                return
            try:
                self._release_lock(argo_url, argo_api_key, tenant, lock_id)
            except Exception as release_err:
                LOGGER.error(
                    "SF RTR: failed to release lock after error tenant=%s lock_id=%s: %s",
                    tenant, lock_id, release_err,
                )
        raise
    finally:
        LOGGER.info("Starting new login timer")
        self.login_timer = threading.Timer(self.REFRESH_TOKEN_EXPIRATION_PERIOD, self.login)
        self.login_timer.start()
195+
196+
# ── Argo helpers ───────────────────────────────────────────
197+
198+
def _acquire_lock(self, argo_url: str, argo_api_key: str, tenant: str, lock_id: str) -> None:
    """Poll Argo's refresh-lock endpoint until the lock is granted.

    Posts ``lock_id`` to the endpoint and returns as soon as the JSON response
    has a truthy ``acquired`` field. Between attempts it sleeps with a
    doubling delay (0.2s start, capped at 2s). It gives up once the next
    sleep would push the elapsed time to ``max_wait_s`` or beyond.

    Args:
        argo_url: Base URL of the Argo service.
        argo_api_key: Value sent in the ``X-Api-Key`` header.
        tenant: Tenant identifier used in the URL path.
        lock_id: Identifier for this lock attempt.

    Raises:
        TimeoutError: the lock was not granted within the wait budget.
        requests.HTTPError: Argo answered with an error status.
    """
    max_wait_s = 25.0
    delay_s = 0.2

    # Loop-invariant request parts, built once.
    lock_url = f"{argo_url}/v1/tenant/{tenant}/connectors/salesforce/refresh-lock"
    payload = {"lockId": lock_id, "calledBy": "mk-tap-salesforce"}
    headers = {"X-Api-Key": argo_api_key}

    start = time.monotonic()
    while True:
        resp = requests.post(lock_url, json=payload, headers=headers, timeout=10)
        resp.raise_for_status()
        if resp.json().get("acquired"):
            return

        elapsed = time.monotonic() - start
        if elapsed + delay_s >= max_wait_s:
            # TimeoutError is an Exception subclass, so existing
            # `except Exception` handlers still catch it.
            raise TimeoutError(
                f"SF RTR: lock acquire timeout after {max_wait_s}s "
                f"(tenant={tenant}, lock_id={lock_id})"
            )
        time.sleep(delay_s)
        delay_s = min(delay_s * 2, 2.0)
220+
221+
def _read_credentials_from_argo(self, argo_url: str, argo_api_key: str, tenant: str) -> dict:
    """Fetch the Salesforce connector record from Argo and return its credentials.

    Returns the ``credentials`` value of the JSON response, or an empty dict
    when that value is missing or falsy. Raises on an HTTP error status.
    """
    connector_url = f"{argo_url}/v1/tenant/{tenant}/connectors/salesforce"
    response = requests.get(
        connector_url,
        headers={"X-Api-Key": argo_api_key},
        timeout=10,
    )
    response.raise_for_status()
    credentials = response.json().get("credentials")
    return credentials if credentials else {}
229+
230+
def _release_lock(self, argo_url: str, argo_api_key: str, tenant: str, lock_id: str) -> None:
    """Release the Argo refresh lock without sending any tokens.

    Sends a PUT to the lock-release endpoint with the lock id. Raises on an
    HTTP error status.
    """
    release_url = f"{argo_url}/v1/tenant/{tenant}/connectors/salesforce/refresh-lock/release"
    payload = {"lockId": lock_id, "calledBy": "mk-tap-salesforce"}
    response = requests.put(
        release_url,
        json=payload,
        headers={"X-Api-Key": argo_api_key},
        timeout=10,
    )
    response.raise_for_status()
238+
239+
def _release_lock_with_tokens(
    self,
    argo_url: str,
    argo_api_key: str,
    tenant: str,
    lock_id: str,
    old_rt: str,
    tokens: dict,
) -> None:
    """Send the new tokens to Argo in the same request that releases the lock.

    The request body carries the lock id, the refresh token that was in use
    before the refresh, and a ``tokens`` object. ``access_token`` is always
    included (a missing key raises ``KeyError``); ``refresh_token``,
    ``signature`` and ``issued_at`` are included only when present and truthy.
    Raises on an HTTP error status.
    """
    persisted = {"access_token": tokens["access_token"]}
    for optional_key in ("refresh_token", "signature", "issued_at"):
        if tokens.get(optional_key):
            persisted[optional_key] = tokens[optional_key]

    body = {
        "lockId": lock_id,
        "oldRefreshToken": old_rt,
        "tokens": persisted,
        "calledBy": "mk-tap-salesforce",
    }
    release_url = f"{argo_url}/v1/tenant/{tenant}/connectors/salesforce/refresh-lock/release"
    response = requests.put(
        release_url,
        json=body,
        headers={"X-Api-Key": argo_api_key},
        timeout=10,
    )
    response.raise_for_status()
266+
112267

113268
class SalesforceAuthPassword(SalesforceAuth):
114269
def login(self):

0 commit comments

Comments
 (0)