|
7 | 7 | from functools import partial |
8 | 8 | from typing import Any, Literal, Self, TypeAlias |
9 | 9 |
|
10 | | -import fsspec |
11 | 10 | import numpy as np |
| 11 | +from object_store import ObjectStore |
12 | 12 |
|
13 | 13 | import icechunk as ic |
14 | 14 | import icechunk.xarray |
@@ -129,22 +129,30 @@ def env_vars(self) -> dict[str, str]: |
129 | 129 | # return {"AWS_ENDPOINT_URL_IAM": "https://fly.iam.storage.tigris.dev"} |
130 | 130 | return {} |
131 | 131 |
|
132 | | - @property |
133 | | - def protocol(self) -> str: |
134 | | - if self.store in ("s3", "tigris"): |
135 | | - protocol = "s3" |
136 | | - elif self.store == "gcs": |
137 | | - protocol = "gcs" |
138 | | - else: |
139 | | - protocol = "file" |
140 | | - return protocol |
| 132 | + _SCHEMES = {"s3": "s3", "s3_ob": "s3", "gcs": "gs", "tigris": "s3", "r2": "s3"} |
| 133 | + |
| 134 | + def clear_prefix(self) -> None: |
| 135 | + """Delete all objects under this prefix using object_store.""" |
| 136 | + import shutil |
141 | 137 |
|
142 | | - def clear_uri(self) -> str: |
143 | | - """URI to clear when re-creating data from scratch.""" |
144 | 138 | if self.store == "local": |
145 | | - return f"{self.protocol}://{self.path}" |
146 | | - else: |
147 | | - return f"{self.protocol}://{self.bucket}/{self.prefix}" |
| 139 | + shutil.rmtree(self.path, ignore_errors=True) |
| 140 | + return |
| 141 | + |
| 142 | + scheme = self._SCHEMES.get(self.store) |
| 143 | + if scheme is None: |
| 144 | + warnings.warn( |
| 145 | + f"Clearing not supported for store {self.store!r}", |
| 146 | + RuntimeWarning, |
| 147 | + stacklevel=2, |
| 148 | + ) |
| 149 | + return |
| 150 | + |
| 151 | + store_url = f"{scheme}://{self.bucket}" |
| 152 | + store = ObjectStore(store_url) |
| 153 | + objects = store.list(prefix=self.prefix) |
| 154 | + for obj in objects: |
| 155 | + store.delete(obj["path"]) |
148 | 156 |
|
149 | 157 | def get_coiled_kwargs(self) -> dict[str, str]: |
150 | 158 | assert self.store is not None, ( |
@@ -174,22 +182,7 @@ def create( |
174 | 182 | self, clear: bool = False, config: ic.RepositoryConfig | None = None |
175 | 183 | ) -> ic.Repository: |
176 | 184 | if clear: |
177 | | - clear_uri = self.storage_config.clear_uri() |
178 | | - if clear_uri is None: |
179 | | - raise NotImplementedError |
180 | | - if self.storage_config.protocol not in ["file", "s3", "gcs"]: |
181 | | - warnings.warn( |
182 | | - f"Only clearing of GCS, S3-compatible URIs supported at the moment. Received {clear_uri!r}", |
183 | | - RuntimeWarning, |
184 | | - stacklevel=2, |
185 | | - ) |
186 | | - else: |
187 | | - fs = fsspec.filesystem(self.storage_config.protocol) |
188 | | - try: |
189 | | - logger.info(f"Clearing prefix: {clear_uri!r}") |
190 | | - fs.rm(clear_uri, recursive=True) |
191 | | - except FileNotFoundError: |
192 | | - pass |
| 185 | + self.storage_config.clear_prefix() |
193 | 186 | logger.info(repr(self.storage)) |
194 | 187 | return ic.Repository.create(self.storage, config=config) |
195 | 188 |
|
|
0 commit comments