Skip to content

Commit 4be897b

Browse files
malvfrclaude
andcommitted
fix(salesforce): lock-aware token refresh for RTR safety
Replaces the fire-and-forget login() with a distributed-lock-aware implementation that coordinates with Argo (same lock protocol used by mk-node-libs lockAwareRefreshFn). Fixes the 15-min crash where the refresh timer reused a revoked refresh_token after Salesforce RTR rotation. - Acquires Argo refresh-lock before calling SF - Re-reads credentials after lock acquire (adopts if another service already refreshed) - Persists new AT + RT to Argo atomically via lock-release endpoint - Falls back to simple (in-memory-only) login when ARGO_URL/TENANT env vars are absent Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
1 parent 67da403 commit 4be897b

1 file changed

Lines changed: 170 additions & 12 deletions

File tree

tap_salesforce/salesforce/credentials.py

Lines changed: 170 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import logging
2+
import os
23
import threading
4+
import time
5+
import uuid as _uuid_mod
36
from collections import namedtuple
47

58
import requests
@@ -61,7 +64,6 @@ def from_credentials(cls, credentials, **kwargs):
6164

6265

6366
class SalesforceAuthOAuth(SalesforceAuth):
64-
# The minimum expiration setting for SF Refresh Tokens is 15 minutes
6567
REFRESH_TOKEN_EXPIRATION_PERIOD = 900
6668

6769
@property
@@ -70,39 +72,195 @@ def _login_body(self):
7072

7173
@property
7274
def _login_url(self):
73-
login_url = "https://login.salesforce.com/services/oauth2/token"
74-
7575
if self.is_sandbox:
76-
login_url = "https://test.salesforce.com/services/oauth2/token"
77-
78-
return login_url
76+
return "https://test.salesforce.com/services/oauth2/token"
77+
return "https://login.salesforce.com/services/oauth2/token"
7978

8079
def login(self):
81-
try:
82-
LOGGER.info("Attempting login via OAuth2")
80+
argo_url = os.environ.get("ARGO_URL")
81+
tenant = os.environ.get("TENANT")
82+
if argo_url and tenant:
83+
self._lock_aware_login(argo_url, tenant)
84+
else:
85+
LOGGER.warning(
86+
"SF RTR: ARGO_URL or TENANT not set — using simple login; "
87+
"rotated tokens will not be persisted (RTR unsafe)"
88+
)
89+
self._simple_login()
90+
91+
# ──────────────────────────────────────────────────────────
92+
# Simple login (fallback — no lock, no writeback)
93+
# ──────────────────────────────────────────────────────────
8394

95+
def _simple_login(self):
96+
resp = None
97+
try:
98+
LOGGER.info("Attempting login via OAuth2 (simple)")
8499
resp = requests.post(
85100
self._login_url,
86101
data=self._login_body,
87102
headers={"Content-Type": "application/x-www-form-urlencoded"},
88103
)
89-
90104
resp.raise_for_status()
91105
auth = resp.json()
92-
93106
LOGGER.info("OAuth2 login successful")
94107
self._access_token = auth["access_token"]
95108
self._instance_url = auth["instance_url"]
109+
new_rt = auth.get("refresh_token")
110+
if new_rt:
111+
# ponytail: in-memory only — no Argo writeback; next call uses new RT
112+
self._credentials = self._credentials._replace(refresh_token=new_rt)
96113
except Exception as e:
97114
error_message = str(e)
98-
if resp:
99-
error_message = error_message + f", Response from Salesforce: {resp.text}"
115+
if resp is not None:
116+
error_message += f", Response from Salesforce: {resp.text}"
100117
raise Exception(error_message) from e
101118
finally:
102119
LOGGER.info("Starting new login timer")
103120
self.login_timer = threading.Timer(self.REFRESH_TOKEN_EXPIRATION_PERIOD, self.login)
104121
self.login_timer.start()
105122

123+
# ──────────────────────────────────────────────────────────
124+
# Lock-aware login — mirrors lockAwareRefreshFn in mk-node-libs
125+
# ──────────────────────────────────────────────────────────
126+
127+
def _lock_aware_login(self, argo_url: str, tenant: str) -> None:
128+
argo_api_key = os.environ.get("ARGO_CONNECTOR_API_KEY", "")
129+
lock_id = str(_uuid_mod.uuid4())
130+
old_rt = self._credentials.refresh_token
131+
lock_acquired = False
132+
133+
try:
134+
LOGGER.info("SF RTR: acquiring refresh lock tenant=%s lock_id=%s", tenant, lock_id)
135+
self._acquire_lock(argo_url, argo_api_key, tenant, lock_id)
136+
lock_acquired = True
137+
LOGGER.info("SF RTR: lock acquired tenant=%s lock_id=%s", tenant, lock_id)
138+
139+
# Re-read after lock: another service may have refreshed while we waited.
140+
# Skip on first-ever login (self._access_token is None) — we must always
141+
# call SF on startup to get a fresh AT regardless of what Argo has cached.
142+
if self._access_token is not None:
143+
fresh = self._read_credentials_from_argo(argo_url, argo_api_key, tenant)
144+
if fresh and fresh.get("access_token") and fresh["access_token"] != self._access_token:
145+
LOGGER.info(
146+
"SF RTR: adopting tokens refreshed by another service — releasing lock "
147+
"tenant=%s lock_id=%s",
148+
tenant, lock_id,
149+
)
150+
self._access_token = fresh["access_token"]
151+
new_rt = fresh.get("refresh_token")
152+
if new_rt:
153+
self._credentials = self._credentials._replace(refresh_token=new_rt)
154+
self._instance_url = fresh.get("instance_url", self._instance_url)
155+
self._release_lock(argo_url, argo_api_key, tenant, lock_id)
156+
return
157+
158+
# Token still stale — call Salesforce.
159+
LOGGER.info("SF RTR: calling Salesforce tenant=%s lock_id=%s", tenant, lock_id)
160+
resp = requests.post(
161+
self._login_url,
162+
data=self._login_body,
163+
headers={"Content-Type": "application/x-www-form-urlencoded"},
164+
timeout=30,
165+
)
166+
resp.raise_for_status()
167+
auth = resp.json()
168+
LOGGER.info("SF RTR: Salesforce returned new tokens tenant=%s lock_id=%s", tenant, lock_id)
169+
170+
self._access_token = auth["access_token"]
171+
self._instance_url = auth.get("instance_url", self._instance_url)
172+
new_rt = auth.get("refresh_token")
173+
if new_rt:
174+
self._credentials = self._credentials._replace(refresh_token=new_rt)
175+
176+
# Persist tokens to Argo and release lock atomically.
177+
self._release_lock_with_tokens(argo_url, argo_api_key, tenant, lock_id, old_rt, auth)
178+
LOGGER.info("SF RTR: tokens persisted and lock released tenant=%s lock_id=%s", tenant, lock_id)
179+
180+
except Exception as e:
181+
LOGGER.error("SF RTR: error during lock-aware login tenant=%s lock_id=%s: %s", tenant, lock_id, e)
182+
if lock_acquired:
183+
try:
184+
self._release_lock(argo_url, argo_api_key, tenant, lock_id)
185+
except Exception as re:
186+
LOGGER.error("SF RTR: failed to release lock after error tenant=%s lock_id=%s: %s", tenant, lock_id, re)
187+
raise
188+
finally:
189+
LOGGER.info("Starting new login timer")
190+
self.login_timer = threading.Timer(self.REFRESH_TOKEN_EXPIRATION_PERIOD, self.login)
191+
self.login_timer.start()
192+
193+
# ── Argo helpers ───────────────────────────────────────────
194+
195+
def _acquire_lock(self, argo_url: str, argo_api_key: str, tenant: str, lock_id: str) -> None:
196+
max_wait_s = 25.0
197+
delay_s = 0.2
198+
start = time.monotonic()
199+
while True:
200+
resp = requests.post(
201+
f"{argo_url}/v1/tenant/{tenant}/connectors/salesforce/refresh-lock",
202+
json={"lockId": lock_id, "calledBy": "mk-tap-salesforce"},
203+
headers={"X-Api-Key": argo_api_key},
204+
timeout=10,
205+
)
206+
resp.raise_for_status()
207+
if resp.json().get("acquired"):
208+
return
209+
elapsed = time.monotonic() - start
210+
if elapsed + delay_s >= max_wait_s:
211+
raise Exception(
212+
f"SF RTR: lock acquire timeout after {max_wait_s}s "
213+
f"(tenant={tenant}, lock_id={lock_id})"
214+
)
215+
time.sleep(delay_s)
216+
delay_s = min(delay_s * 2, 2.0)
217+
218+
def _read_credentials_from_argo(self, argo_url: str, argo_api_key: str, tenant: str) -> dict:
219+
resp = requests.get(
220+
f"{argo_url}/v1/tenant/{tenant}/connectors/salesforce",
221+
headers={"X-Api-Key": argo_api_key},
222+
timeout=10,
223+
)
224+
resp.raise_for_status()
225+
return resp.json().get("credentials") or {}
226+
227+
def _release_lock(self, argo_url: str, argo_api_key: str, tenant: str, lock_id: str) -> None:
228+
resp = requests.put(
229+
f"{argo_url}/v1/tenant/{tenant}/connectors/salesforce/refresh-lock/release",
230+
json={"lockId": lock_id, "calledBy": "mk-tap-salesforce"},
231+
headers={"X-Api-Key": argo_api_key},
232+
timeout=10,
233+
)
234+
resp.raise_for_status()
235+
236+
def _release_lock_with_tokens(
237+
self,
238+
argo_url: str,
239+
argo_api_key: str,
240+
tenant: str,
241+
lock_id: str,
242+
old_rt: str,
243+
tokens: dict,
244+
) -> None:
245+
body = {
246+
"lockId": lock_id,
247+
"oldRefreshToken": old_rt,
248+
"tokens": {
249+
"access_token": tokens["access_token"],
250+
**({"refresh_token": tokens["refresh_token"]} if tokens.get("refresh_token") else {}),
251+
**({"signature": tokens["signature"]} if tokens.get("signature") else {}),
252+
**({"issued_at": tokens["issued_at"]} if tokens.get("issued_at") else {}),
253+
},
254+
"calledBy": "mk-tap-salesforce",
255+
}
256+
resp = requests.put(
257+
f"{argo_url}/v1/tenant/{tenant}/connectors/salesforce/refresh-lock/release",
258+
json=body,
259+
headers={"X-Api-Key": argo_api_key},
260+
timeout=10,
261+
)
262+
resp.raise_for_status()
263+
106264

107265
class SalesforceAuthPassword(SalesforceAuth):
108266
def login(self):

0 commit comments

Comments
 (0)