Skip to content

Commit a54876a

Browse files
authored
[iris] S3Lease: conditional writes for S3/R2 locking (#3874)
- Add `S3Lease` backend to `distributed_lock.py` that uses S3 conditional writes (`If-None-Match: *` for create, `If-Match: <etag>` for update) instead of the advisory write-sleep-readback in `FsspecLease`.
- Route `s3://` paths to `S3Lease` in the `create_lock()` factory. `FsspecLease` remains as the fallback for non-S3/non-GCS/non-local schemes.
- Uses botocore directly (already available via s3fs → aiobotocore) with `before-sign` event injection to add conditional headers that boto3/botocore don't expose natively.
- Supports custom endpoints via `AWS_ENDPOINT_URL_S3` / `AWS_ENDPOINT_URL` for R2/MinIO.
1 parent 2eede72 commit a54876a

1 file changed

Lines changed: 98 additions & 1 deletion

File tree

lib/iris/src/iris/distributed_lock.py

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
"""Generic distributed locking with lease-based semantics.
55
66
Provides lease-based distributed locks backed by a single lock file.
7-
Three backend implementations are available:
7+
Four backend implementations are available:
88
99
- **GcsLease**: generation-based conditional writes for atomicity.
10+
- **S3Lease**: conditional writes (``If-None-Match`` / ``If-Match``) for S3-compatible stores.
1011
- **LocalFileLease**: ``fcntl`` file locking for mutual exclusion.
1112
- **FsspecLease**: best-effort write-then-read-back (advisory only).
1213
@@ -64,6 +65,10 @@ def _is_gcs_path(path: str) -> bool:
6465
return path.startswith("gs://")
6566

6667

68+
def _is_s3_path(path: str) -> bool:
69+
return path.startswith("s3://")
70+
71+
6772
# ---------------------------------------------------------------------------
6873
# Abstract base
6974
# ---------------------------------------------------------------------------
@@ -224,6 +229,96 @@ def _delete(self) -> None:
224229
logger.debug("Lock blob %s already deleted", self.lock_path)
225230

226231

232+
# ---------------------------------------------------------------------------
233+
# S3 backend
234+
# ---------------------------------------------------------------------------
235+
236+
237+
class S3Lease(DistributedLease):
    """S3-backed lease using conditional writes (If-None-Match / If-Match).

    Works with any S3-compatible store that supports conditional PutObject
    (AWS S3, Cloudflare R2, MinIO, etc.). Uses botocore directly (available
    transitively via s3fs) to inject the conditional headers that the
    high-level SDKs do not expose.

    The integer "generation" exchanged with the base class is only a presence
    flag here (0 = object absent, 1 = object present). The ETag returned by
    the most recent read is cached on the instance and is what actually
    guards an update.
    """

    def __init__(self, lock_path: str, worker_id: str | None = None):
        super().__init__(lock_path, worker_id)
        # ETag observed by the last _read_with_generation(); None when the
        # object was absent or no read has happened yet.
        self._last_etag: str | None = None

    @staticmethod
    def _parse_s3_path(path: str) -> tuple[str, str]:
        """Split ``s3://bucket/key`` into ``(bucket, key)``.

        The key is the empty string when *path* names only a bucket.
        """
        # removeprefix() only strips a real ``s3://`` scheme instead of
        # blindly dropping the first five characters.
        bucket, _, key = path.removeprefix("s3://").partition("/")
        return (bucket, key)

    @staticmethod
    def _make_client():
        """Create a botocore S3 client.

        ``AWS_ENDPOINT_URL_S3`` takes precedence over ``AWS_ENDPOINT_URL``;
        when either is set the client targets that endpoint.
        """
        # Function-scope imports keep botocore off the module import path.
        import botocore.config
        import botocore.session

        session = botocore.session.get_session()
        endpoint_url = os.environ.get("AWS_ENDPOINT_URL_S3") or os.environ.get("AWS_ENDPOINT_URL")
        kwargs: dict = {}
        if endpoint_url:
            kwargs["endpoint_url"] = endpoint_url
            # Some S3-compatible endpoints (CoreWeave cwobject.com, cwlota.com)
            # reject path-style requests. Virtual-host style is the modern
            # default for AWS S3 anyway, so always prefer it when a custom
            # endpoint is in use.
            kwargs["config"] = botocore.config.Config(s3={"addressing_style": "virtual"})
        return session.create_client("s3", **kwargs)

    def _read_with_generation(self) -> tuple[int, Lease | None]:
        """Fetch the lock object and remember its ETag.

        Returns:
            ``(1, lease)`` when the object exists, or ``(0, None)`` when the
            store reports ``NoSuchKey``.

        Raises:
            botocore.exceptions.ClientError: for any other error response.
        """
        from botocore.exceptions import ClientError

        client = self._make_client()
        bucket, key = self._parse_s3_path(self.lock_path)
        try:
            resp = client.get_object(Bucket=bucket, Key=key)
        except ClientError as e:
            if e.response["Error"]["Code"] != "NoSuchKey":
                raise
            # No lock object: forget any ETag from an earlier read.
            self._last_etag = None
            return (0, None)

        data = json.loads(resp["Body"].read())
        self._last_etag = resp["ETag"]
        return (1, Lease(**data))

    def _write(self, lease: Lease, if_generation_match: int) -> None:
        """Conditionally write *lease* to the lock object.

        Args:
            lease: Lease record to serialise as JSON.
            if_generation_match: ``0`` to create the object only if it does
                not exist (``If-None-Match: *``); any other value to replace
                it only if its ETag still equals the one seen by the last
                read (``If-Match``).

        Raises:
            FileExistsError: the store rejected the precondition
                (``PreconditionFailed`` / ``412``).
            AssertionError: an update was requested without a prior read.
            botocore.exceptions.ClientError: for any other error response.
        """
        from botocore.exceptions import ClientError

        client = self._make_client()
        bucket, key = self._parse_s3_path(self.lock_path)
        body = json.dumps(asdict(lease)).encode()

        if if_generation_match == 0:
            header_name, header_value = "If-None-Match", "*"
        else:
            if self._last_etag is None:
                # Raised explicitly rather than via ``assert`` so the check
                # survives ``python -O``; otherwise ``If-Match: None`` would
                # be sent. The exception type is kept for compatibility.
                raise AssertionError("Cannot conditionally update without a prior read")
            header_name, header_value = "If-Match", self._last_etag

        def inject_condition(request, **kwargs):
            # put_object() has no parameter for these headers here, so add the
            # precondition to the outgoing request just before it is signed.
            request.headers[header_name] = header_value

        event_name = "before-sign.s3.PutObject"
        client.meta.events.register(event_name, inject_condition)
        try:
            client.put_object(Bucket=bucket, Key=key, Body=body)
        except ClientError as e:
            # NOTE(review): S3 can also answer racing conditional writes with
            # 409 ConditionalRequestConflict; that currently propagates as a
            # ClientError. Confirm whether it should count as a lost race.
            if e.response["Error"]["Code"] in ("PreconditionFailed", "412"):
                raise FileExistsError(f"Conditional write failed for {self.lock_path}") from e
            raise
        finally:
            client.meta.events.unregister(event_name, inject_condition)

    def _delete(self) -> None:
        """Delete the lock object."""
        client = self._make_client()
        bucket, key = self._parse_s3_path(self.lock_path)
        client.delete_object(Bucket=bucket, Key=key)
320+
321+
227322
# ---------------------------------------------------------------------------
228323
# Local filesystem backend
229324
# ---------------------------------------------------------------------------
@@ -331,6 +426,8 @@ def create_lock(lock_path: str, worker_id: str | None = None) -> DistributedLeas
331426
"""Create the appropriate lease implementation for *lock_path*."""
332427
if _is_gcs_path(lock_path):
333428
return GcsLease(lock_path, worker_id)
429+
elif _is_s3_path(lock_path):
430+
return S3Lease(lock_path, worker_id)
334431
elif _is_local_path(lock_path):
335432
return LocalFileLease(lock_path, worker_id)
336433
else:

0 commit comments

Comments
 (0)