From 046aef64c81e88ed99f4c0979524ff9ff538d046 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 25 Jan 2023 15:42:54 -0500 Subject: [PATCH 1/8] add http-sync --- fsspec/implementations/http_sync.py | 706 ++++++++++++++++++++++++++++ 1 file changed, 706 insertions(+) create mode 100644 fsspec/implementations/http_sync.py diff --git a/fsspec/implementations/http_sync.py b/fsspec/implementations/http_sync.py new file mode 100644 index 000000000..8ed37c972 --- /dev/null +++ b/fsspec/implementations/http_sync.py @@ -0,0 +1,706 @@ +from __future__ import absolute_import, division, print_function + +import io +import logging +import re +import weakref +from copy import copy +from urllib.parse import urlparse + +import requests +import yarl + +from fsspec.callbacks import _DEFAULT_CALLBACK +from fsspec.spec import AbstractBufferedFile, AbstractFileSystem +from fsspec.utils import DEFAULT_BLOCK_SIZE, isfilelike, nullcontext, tokenize + +from ..caching import AllBytes + +# https://stackoverflow.com/a/15926317/3821154 +ex = re.compile(r"""<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P[^"']+)""") +ex2 = re.compile(r"""(?Phttp[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""") +logger = logging.getLogger("fsspec.http") + + +class HTTPFileSystem(AbstractFileSystem): + """ + Simple File-System for fetching data via HTTP(S) + + ``ls()`` is implemented by loading the parent page and doing a regex + match on the result. If simple_link=True, anything of the form + "http(s)://server.com/stuff?thing=other"; otherwise only links within + HTML href tags will be used. + """ + + sep = "/" + + def __init__( + self, + simple_links=True, + block_size=None, + same_scheme=True, + cache_type="bytes", + cache_options=None, + client_kwargs=None, + encoded=False, + **storage_options, + ): + """ + + Parameters + ---------- + block_size: int + Blocks to read bytes; if 0, will default to raw requests file-like + objects instead of HTTPFile instances + simple_links: bool + If True, will consider both HTML tags and anything that looks + like a URL; if False, will consider only the former. + same_scheme: True + When doing ls/glob, if this is True, only consider paths that have + http/https matching the input URLs. + size_policy: this argument is deprecated + client_kwargs: dict + Passed to aiohttp.ClientSession, see + https://docs.aiohttp.org/en/stable/client_reference.html + For example, ``{'auth': aiohttp.BasicAuth('user', 'pass')}`` + storage_options: key-value + Any other parameters passed on to requests + cache_type, cache_options: defaults used in open + """ + super().__init__(self, **storage_options) + self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE + self.simple_links = simple_links + self.same_schema = same_scheme + self.cache_type = cache_type + self.cache_options = cache_options + self.client_kwargs = client_kwargs or {} + self.encoded = encoded + self.kwargs = storage_options + + session = requests.Session(**(client_kwargs or {})) + weakref.finalize(self, session.close) + self.session = session + + # Clean caching-related parameters from `storage_options` + # before propagating them as `request_options` through `self.kwargs`. + # TODO: Maybe rename `self.kwargs` to `self.request_options` to make + # it clearer. 
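(Aside, illustration only, not part of the patch: the lines that follow split the constructor's keyword arguments in two. The listings-cache settings are consumed by the filesystem itself, while everything left over stays in `self.kwargs` and is merged into every requests call. A hypothetical construction makes the split concrete; the values are invented.)

    fs = HTTPFileSystem(
        use_listings_cache=True,         # consumed here: enables ls() caching
        listings_expiry_time=30,         # popped below, never sent to the server
        headers={"User-Agent": "demo"},  # stays in self.kwargs -> sent with every request
    )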
+ request_options = copy(storage_options) + self.use_listings_cache = request_options.pop("use_listings_cache", False) + request_options.pop("listings_expiry_time", None) + request_options.pop("max_paths", None) + request_options.pop("skip_instance_cache", None) + self.kwargs = request_options + + @property + def fsid(self): + return "http" + + def encode_url(self, url): + return yarl.URL(url, encoded=self.encoded) + + @classmethod + def _strip_protocol(cls, path): + """For HTTP, we always want to keep the full URL""" + return path + + @classmethod + def _parent(cls, path): + # override, since _strip_protocol is different for URLs + par = super()._parent(path) + if len(par) > 7: # "http://..." + return par + return "" + + def _ls_real(self, url, detail=True, **kwargs): + # ignoring URL-encoded arguments + kw = self.kwargs.copy() + kw.update(kwargs) + logger.debug(url) + r = self.session.get(self.encode_url(url), **self.kwargs) + self._raise_not_found_for_status(r, url) + text = r.text() + if self.simple_links: + links = ex2.findall(text) + [u[2] for u in ex.findall(text)] + else: + links = [u[2] for u in ex.findall(text)] + out = set() + parts = urlparse(url) + for l in links: + if isinstance(l, tuple): + l = l[1] + if l.startswith("/") and len(l) > 1: + # absolute URL on this server + l = parts.scheme + "://" + parts.netloc + l + if l.startswith("http"): + if self.same_schema and l.startswith(url.rstrip("/") + "/"): + out.add(l) + elif l.replace("https", "http").startswith( + url.replace("https", "http").rstrip("/") + "/" + ): + # allowed to cross http <-> https + out.add(l) + else: + if l not in ["..", "../"]: + # Ignore FTP-like "parent" + out.add("/".join([url.rstrip("/"), l.lstrip("/")])) + if not out and url.endswith("/"): + out = self._ls_real(url.rstrip("/"), detail=False) + if detail: + return [ + { + "name": u, + "size": None, + "type": "directory" if u.endswith("/") else "file", + } + for u in out + ] + else: + return list(sorted(out)) + return out + + def ls(self, url, detail=True, **kwargs): + + if self.use_listings_cache and url in self.dircache: + out = self.dircache[url] + else: + out = self._ls_real(url, detail=detail, **kwargs) + self.dircache[url] = out + return out + + def _raise_not_found_for_status(self, response, url): + """ + Raises FileNotFoundError for 404s, otherwise uses raise_for_status. 
+ """ + if response.status_code == 404: + raise FileNotFoundError(url) + response.raise_for_status() + + def cat_file(self, url, start=None, end=None, **kwargs): + kw = self.kwargs.copy() + kw.update(kwargs) + logger.debug(url) + + if start is not None or end is not None: + if start == end: + return b"" + headers = kw.pop("headers", {}).copy() + + headers["Range"] = self._process_limits(url, start, end) + kw["headers"] = headers + r = self.session.get(self.encode_url(url), **kw) + self._raise_not_found_for_status(r, url) + return r.content + + def get_file( + self, rpath, lpath, chunk_size=5 * 2**20, callback=_DEFAULT_CALLBACK, **kwargs + ): + kw = self.kwargs.copy() + kw.update(kwargs) + logger.debug(rpath) + r = self.session.get(self.encode_url(rpath), **kw) + try: + size = int(r.headers["content-length"]) + except (ValueError, KeyError): + size = None + + callback.set_size(size) + self._raise_not_found_for_status(r, rpath) + if not isfilelike(lpath): + lpath = open(lpath, "wb") + chunk = True + while chunk: + r.raw.decode_content = True + chunk = r.raw.read(chunk_size) + lpath.write(chunk) + callback.relative_update(len(chunk)) + + def put_file( + self, + lpath, + rpath, + chunk_size=5 * 2**20, + callback=_DEFAULT_CALLBACK, + method="post", + **kwargs, + ): + def gen_chunks(): + # Support passing arbitrary file-like objects + # and use them instead of streams. + if isinstance(lpath, io.IOBase): + context = nullcontext(lpath) + use_seek = False # might not support seeking + else: + context = open(lpath, "rb") + use_seek = True + + with context as f: + if use_seek: + callback.set_size(f.seek(0, 2)) + f.seek(0) + else: + callback.set_size(getattr(f, "size", None)) + + chunk = f.read(chunk_size) + while chunk: + yield chunk + callback.relative_update(len(chunk)) + chunk = f.read(chunk_size) + + kw = self.kwargs.copy() + kw.update(kwargs) + + method = method.lower() + if method not in ("post", "put"): + raise ValueError( + f"method has to be either 'post' or 'put', not: {method!r}" + ) + + meth = getattr(self.ession, method) + resp = meth(rpath, data=gen_chunks(), **kw) + self._raise_not_found_for_status(resp, rpath) + + def exists(self, path, **kwargs): + kw = self.kwargs.copy() + kw.update(kwargs) + try: + logger.debug(path) + session = self.set_session() + r = session.get(self.encode_url(path), **kw) + return r.status_code < 400 + except requests.HTTPError: + return False + + def isfile(self, path, **kwargs): + return self.exists(path, **kwargs) + + def _open( + self, + path, + mode="rb", + block_size=None, + autocommit=None, # XXX: This differs from the base class. + cache_type=None, + cache_options=None, + size=None, + **kwargs, + ): + """Make a file-like object + + Parameters + ---------- + path: str + Full URL with protocol + mode: string + must be "rb" + block_size: int or None + Bytes to download in one request; use instance value if None. If + zero, will return a streaming Requests file-like instance. 
+ kwargs: key-value + Any other parameters, passed to requests calls + """ + if mode != "rb": + raise NotImplementedError + block_size = block_size if block_size is not None else self.block_size + kw = self.kwargs.copy() + kw.update(kwargs) + size = size or self.info(path, **kwargs)["size"] + if block_size and size: + return HTTPFile( + self, + path, + session=self.session, + block_size=block_size, + mode=mode, + size=size, + cache_type=cache_type or self.cache_type, + cache_options=cache_options or self.cache_options, + **kw, + ) + else: + return HTTPStreamFile( + self, + path, + mode=mode, + session=self.session, + **kw, + ) + + def ukey(self, url): + """Unique identifier; assume HTTP files are static, unchanging""" + return tokenize(url, self.kwargs, self.protocol) + + def info(self, url, **kwargs): + """Get info of URL + + Tries to access location via HEAD, and then GET methods, but does + not fetch the data. + + It is possible that the server does not supply any size information, in + which case size will be given as None (and certain operations on the + corresponding file will not work). + """ + info = {} + for policy in ["head", "get"]: + try: + info.update( + _file_info( + self.encode_url(url), + size_policy=policy, + session=self.session, + **self.kwargs, + **kwargs, + ) + ) + if info.get("size") is not None: + break + except Exception as exc: + if policy == "get": + # If get failed, then raise a FileNotFoundError + raise FileNotFoundError(url) from exc + logger.debug(str(exc)) + + return {"name": url, "size": None, **info, "type": "file"} + + def glob(self, path, **kwargs): + """ + Find files by glob-matching. + + This implementation is idntical to the one in AbstractFileSystem, + but "?" is not considered as a character for globbing, because it is + so common in URLs, often identifying the "query" part. + """ + import re + + ends = path.endswith("/") + path = self._strip_protocol(path) + indstar = path.find("*") if path.find("*") >= 0 else len(path) + indbrace = path.find("[") if path.find("[") >= 0 else len(path) + + ind = min(indstar, indbrace) + + detail = kwargs.pop("detail", False) + + if not has_magic(path): + root = path + depth = 1 + if ends: + path += "/*" + elif self.exists(path): + if not detail: + return [path] + else: + return {path: self.info(path)} + else: + if not detail: + return [] # glob of non-existent returns empty + else: + return {} + elif "/" in path[:ind]: + ind2 = path[:ind].rindex("/") + root = path[: ind2 + 1] + depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1 + else: + root = "" + depth = None if "**" in path else path[ind + 1 :].count("/") + 1 + + allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs) + # Escape characters special to python regex, leaving our supported + # special characters in place. + # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html + # for shell globbing details. 
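(Aside, illustration only: the net effect of the translation just below is that `*` matches within a single path segment, `**` matches across segments, and `?` is not treated as a wildcard at all, since it usually introduces the query part of a URL.)

    #   "*.nc4"       -> matches one path segment, e.g. ".../file.nc4"
    #   "a/**/b.nc4"  -> matches at any depth below a/
    #   "file?x=1"    -> "?" is left alone; it is not a glob character here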
+ pattern = ( + "^" + + ( + path.replace("\\", r"\\") + .replace(".", r"\.") + .replace("+", r"\+") + .replace("//", "/") + .replace("(", r"\(") + .replace(")", r"\)") + .replace("|", r"\|") + .replace("^", r"\^") + .replace("$", r"\$") + .replace("{", r"\{") + .replace("}", r"\}") + .rstrip("/") + ) + + "$" + ) + pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern) + pattern = re.sub("[*]", "[^/]*", pattern) + pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*")) + out = { + p: allpaths[p] + for p in sorted(allpaths) + if pattern.match(p.replace("//", "/").rstrip("/")) + } + if detail: + return out + else: + return list(out) + + def isdir(self, path): + # override, since all URLs are (also) files + try: + return bool(self._ls(path)) + except (FileNotFoundError, ValueError): + return False + + +class HTTPFile(AbstractBufferedFile): + """ + A file-like object pointing to a remove HTTP(S) resource + + Supports only reading, with read-ahead of a predermined block-size. + + In the case that the server does not supply the filesize, only reading of + the complete file in one go is supported. + + Parameters + ---------- + url: str + Full URL of the remote resource, including the protocol + session: requests.Session or None + All calls will be made within this session, to avoid restarting + connections where the server allows this + block_size: int or None + The amount of read-ahead to do, in bytes. Default is 5MB, or the value + configured for the FileSystem creating this file + size: None or int + If given, this is the size of the file in bytes, and we don't attempt + to call the server to find the value. + kwargs: all other key-values are passed to requests calls. + """ + + def __init__( + self, + fs, + url, + session=None, + block_size=None, + mode="rb", + cache_type="bytes", + cache_options=None, + size=None, + **kwargs, + ): + if mode != "rb": + raise NotImplementedError("File mode not supported") + self.url = url + self.session = session + self.details = {"name": url, "size": size, "type": "file"} + super().__init__( + fs=fs, + path=url, + mode=mode, + block_size=block_size, + cache_type=cache_type, + cache_options=cache_options, + **kwargs, + ) + + def read(self, length=-1): + """Read bytes from file + + Parameters + ---------- + length: int + Read up to this many bytes. If negative, read all content to end of + file. If the server has not supplied the filesize, attempting to + read only part of the data will raise a ValueError. + """ + if ( + (length < 0 and self.loc == 0) # explicit read all + # but not when the size is known and fits into a block anyways + and not (self.size is not None and self.size <= self.blocksize) + ): + self._fetch_all() + if self.size is None: + if length < 0: + self._fetch_all() + else: + length = min(self.size - self.loc, length) + return super().read(length) + + def _fetch_all(self): + """Read whole file in one shot, without caching + + This is only called when position is still at zero, + and read() is called without a byte-count. 
+ """ + logger.debug(f"Fetch all for {self}") + if not isinstance(self.cache, AllBytes): + r = self.session.get(self.fs.encode_url(self.url), **self.kwargs) + r.raise_for_status() + out = r.content + self.cache = AllBytes(size=len(out), fetcher=None, blocksize=None, data=out) + self.size = len(out) + + def _parse_content_range(self, headers): + """Parse the Content-Range header""" + s = headers.get("Content-Range", "") + m = re.match(r"bytes (\d+-\d+|\*)/(\d+|\*)", s) + if not m: + return None, None, None + + if m[1] == "*": + start = end = None + else: + start, end = [int(x) for x in m[1].split("-")] + total = None if m[2] == "*" else int(m[2]) + return start, end, total + + def _fetch_range(self, start, end): + """Download a block of data + + The expectation is that the server returns only the requested bytes, + with HTTP code 206. If this is not the case, we first check the headers, + and then stream the output - if the data size is bigger than we + requested, an exception is raised. + """ + logger.debug(f"Fetch range for {self}: {start}-{end}") + kwargs = self.kwargs.copy() + headers = kwargs.pop("headers", {}).copy() + headers["Range"] = "bytes=%i-%i" % (start, end - 1) + logger.debug(str(self.url) + " : " + headers["Range"]) + r = self.session.get(self.fs.encode_url(self.url), headers=headers, **kwargs) + if r.status_code == 416: + # range request outside file + return b"" + r.raise_for_status() + + # If the server has handled the range request, it should reply + # with status 206 (partial content). But we'll guess that a suitable + # Content-Range header or a Content-Length no more than the + # requested range also mean we have got the desired range. + response_is_range = ( + r.status_code == 206 + or self._parse_content_range(r.headers)[0] == start + or int(r.headers.get("Content-Length", end + 1)) <= end - start + ) + + if response_is_range: + # partial content, as expected + out = r.content + elif start > 0: + raise ValueError( + "The HTTP server doesn't appear to support range requests. " + "Only reading this file from the beginning is supported. " + "Open with block_size=0 for a streaming file interface." + ) + else: + # Response is not a range, but we want the start of the file, + # so we can read the required amount anyway. 
+ cl = 0 + out = [] + while True: + r.raw.decode_content = True + chunk = r.raw.read(2**20) + # data size unknown, let's read until we have enough + if chunk: + out.append(chunk) + cl += len(chunk) + if cl > end - start: + break + else: + break + r.raw.close() + out = b"".join(out)[: end - start] + return out + + +magic_check = re.compile("([*[])") + + +def has_magic(s): + match = magic_check.search(s) + return match is not None + + +class HTTPStreamFile(AbstractBufferedFile): + def __init__(self, fs, url, mode="rb", session=None, **kwargs): + self.url = url + self.session = session + if mode != "rb": + raise ValueError + self.details = {"name": url, "size": None} + super().__init__(fs=fs, path=url, mode=mode, cache_type="readahead", **kwargs) + + r = self.session.get(self.fs.encode_url(url), stream=True, **kwargs) + r.raw.decode_content = True + self.fs._raise_not_found_for_status(r, url) + + self.r = r + + def seek(self, *args, **kwargs): + raise ValueError("Cannot seek streaming HTTP file") + + def read(self, num=-1): + if num < 0: + return self.content + bufs = [] + leng = 0 + while not self.r.raw.closed and leng < num: + out = self.r.raw.read(num) + if out: + bufs.append(out) + leng += len(out) + self.loc += leng + return b"".join(bufs) + + def close(self): + self.r.close() + + +def get_range(session, url, start, end, **kwargs): + # explicit get a range when we know it must be safe + kwargs = kwargs.copy() + headers = kwargs.pop("headers", {}).copy() + headers["Range"] = "bytes=%i-%i" % (start, end - 1) + r = session.get(url, headers=headers, **kwargs) + r.raise_for_status() + return r.content + + +def _file_info(url, session, size_policy="head", **kwargs): + """Call HEAD on the server to get details about the file (size/checksum etc.) + + Default operation is to explicitly allow redirects and use encoding + 'identity' (no compression) to get the true size of the target. 
+ """ + logger.debug("Retrieve file size for %s" % url) + kwargs = kwargs.copy() + ar = kwargs.pop("allow_redirects", True) + head = kwargs.get("headers", {}).copy() + head["Accept-Encoding"] = "identity" + kwargs["headers"] = head + + info = {} + if size_policy == "head": + r = session.head(url, allow_redirects=ar, **kwargs) + elif size_policy == "get": + r = session.get(url, allow_redirects=ar, **kwargs) + else: + raise TypeError('size_policy must be "head" or "get", got %s' "" % size_policy) + r.raise_for_status() + + # TODO: + # recognise lack of 'Accept-Ranges', + # or 'Accept-Ranges': 'none' (not 'bytes') + # to mean streaming only, no random access => return None + if "Content-Length" in r.headers: + info["size"] = int(r.headers["Content-Length"]) + elif "Content-Range" in r.headers: + info["size"] = int(r.headers["Content-Range"].split("/")[1]) + + for checksum_field in ["ETag", "Content-MD5", "Digest"]: + if r.headers.get(checksum_field): + info[checksum_field] = r.headers[checksum_field] + + return info + + +def _file_size(url, session, *args, **kwargs): + info = _file_info(url, session=session, *args, **kwargs) + return info.get("size") From e2562d7e714e63152caba4565c8b50bcf1c7c9fb Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 7 Mar 2023 13:07:47 -0500 Subject: [PATCH 2/8] Add pyodide shim to sync http --- fsspec/implementations/http_sync.py | 181 +++++++++++++++++++++++++--- 1 file changed, 167 insertions(+), 14 deletions(-) diff --git a/fsspec/implementations/http_sync.py b/fsspec/implementations/http_sync.py index 8ed37c972..958a9ff1c 100644 --- a/fsspec/implementations/http_sync.py +++ b/fsspec/implementations/http_sync.py @@ -3,14 +3,17 @@ import io import logging import re -import weakref +import urllib.error +import urllib.parse from copy import copy +from json import dumps, loads from urllib.parse import urlparse import requests import yarl from fsspec.callbacks import _DEFAULT_CALLBACK +from fsspec.registry import register_implementation from fsspec.spec import AbstractBufferedFile, AbstractFileSystem from fsspec.utils import DEFAULT_BLOCK_SIZE, isfilelike, nullcontext, tokenize @@ -22,6 +25,154 @@ logger = logging.getLogger("fsspec.http") +class JsHttpException(urllib.error.HTTPError): + ... 
+ + +class ResponseProxy: + """Looks like a requests response""" + + def __init__(self, req): + self.request = req + self._data = None + self._headers = None + + @property + def raw(self): + if self._data is None: + self._data = str(self.request.response).encode() + return self._data + + @property + def headers(self): + if self._headers is None: + self._headers = dict( + [ + _.split(": ") + for _ in self.request.getAllResponseHeaders().strip().split("\r\n") + ] + ) + return self._headers + + @property + def status_code(self): + return int(self.request.status) + + def raise_for_status(self): + if not self.ok: + raise JsHttpException( + self.url, self.status_code, self.reason, self.headers, None + ) + + @property + def reason(self): + return self.request.statusText + + @property + def ok(self): + return self.status_code < 400 + + @property + def url(self): + return self.request.response.responseURL + + @property + def text(self): + # TODO: encoding from headers + return self.raw.decode() + + @property + def content(self): + return self.raw + + @property + def json(self): + return loads(self.text) + + +class RequestsSessionShim: + def __init__(self): + self.headers = {} + + def request( + self, + method, + url, + params=None, + data=None, + headers=None, + cookies=None, + files=None, + auth=None, + timeout=None, + allow_redirects=None, + proxies=None, + hooks=None, + stream=None, + verify=None, + cert=None, + json=None, + ): + from js import Blob, XMLHttpRequest + + logger.debug("JS request: %s %s", method, url) + + if ( + cert + or verify + or proxies + or files + or cookies + or hooks + or stream + or allow_redirects + ): + raise NotImplementedError + if data and json: + raise ValueError("Use json= or data=, not both") + req = XMLHttpRequest.new() + extra = auth if auth else () + if params: + url = f"{url}?{urllib.parse.urlencode(params)}" + req.open(method, url, False, *extra) + if timeout: + req.timeout = timeout + if headers: + for k, v in headers.items(): + req.setRequestHeader(k, v) + + req.setRequestHeader("Accept", "application/octet-stream") + if json: + blob = Blob.new([dumps(data)], {type: "application/json"}) + req.send(blob) + elif data: + if isinstance(data, io.IOBase): + data = data.read() + blob = Blob.new([data], {type: "application/octet-stream"}) + req.send(blob) + else: + req.send(None) + return ResponseProxy(req) + + def get(self, url, **kwargs): + return self.request("GET", url, **kwargs) + + def head(self, url, **kwargs): + return self.request("HEAD", url, **kwargs) + + def post(self, url, **kwargs): + return self.request("POST}", url, **kwargs) + + def put(self, url, **kwargs): + return self.request("PUT", url, **kwargs) + + def patch(self, url, **kwargs): + return self.request("PATCH", url, **kwargs) + + def delete(self, url, **kwargs): + return self.request("DELETE", url, **kwargs) + + class HTTPFileSystem(AbstractFileSystem): """ Simple File-System for fetching data via HTTP(S) @@ -39,7 +190,7 @@ def __init__( simple_links=True, block_size=None, same_scheme=True, - cache_type="bytes", + cache_type="readahead", cache_options=None, client_kwargs=None, encoded=False, @@ -77,14 +228,17 @@ def __init__( self.encoded = encoded self.kwargs = storage_options - session = requests.Session(**(client_kwargs or {})) - weakref.finalize(self, session.close) - self.session = session + try: + import js # noqa: F401 + + logger.debug("Starting JS session") + self.session = RequestsSessionShim() + self.js = True + except Exception as e: + logger.debug("Starting cpython session because 
of: %s", e) + self.session = requests.Session(**(client_kwargs or {})) + self.js = False - # Clean caching-related parameters from `storage_options` - # before propagating them as `request_options` through `self.kwargs`. - # TODO: Maybe rename `self.kwargs` to `self.request_options` to make - # it clearer. request_options = copy(storage_options) self.use_listings_cache = request_options.pop("use_listings_cache", False) request_options.pop("listings_expiry_time", None) @@ -157,7 +311,6 @@ def _ls_real(self, url, detail=True, **kwargs): ] else: return list(sorted(out)) - return out def ls(self, url, detail=True, **kwargs): @@ -256,7 +409,7 @@ def gen_chunks(): f"method has to be either 'post' or 'put', not: {method!r}" ) - meth = getattr(self.ession, method) + meth = getattr(self.session, method) resp = meth(rpath, data=gen_chunks(), **kw) self._raise_not_found_for_status(resp, rpath) @@ -701,6 +854,6 @@ def _file_info(url, session, size_policy="head", **kwargs): return info -def _file_size(url, session, *args, **kwargs): - info = _file_info(url, session=session, *args, **kwargs) - return info.get("size") +# importing this is enough to register it +register_implementation("http", HTTPFileSystem, clobber=True) +register_implementation("https", HTTPFileSystem, clobber=True) From 863d6091f046d7942306fc61750cd7a5ee28a031 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 8 Mar 2023 09:57:03 -0500 Subject: [PATCH 3/8] Put in JS stuff --- fsspec/implementations/http_sync.py | 51 +++++++++++++++++++---------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/fsspec/implementations/http_sync.py b/fsspec/implementations/http_sync.py index 958a9ff1c..684550117 100644 --- a/fsspec/implementations/http_sync.py +++ b/fsspec/implementations/http_sync.py @@ -9,8 +9,10 @@ from json import dumps, loads from urllib.parse import urlparse -import requests -import yarl +try: + import yarl +except (ImportError, ModuleNotFoundError, OSError): + yarl = False from fsspec.callbacks import _DEFAULT_CALLBACK from fsspec.registry import register_implementation @@ -29,20 +31,33 @@ class JsHttpException(urllib.error.HTTPError): ... +class StreamIO(io.BytesIO): + # fake class, so you can set attributes on it + # will eventually actually stream + ... 
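(Aside, illustration only, not part of the patch: a hypothetical round-trip through the shim, which only works under pyodide where the js module and XMLHttpRequest exist. The URL is a placeholder, and as the comment on StreamIO says, the "streamed" body is still fully buffered in memory for now.)

    session = RequestsSessionShim()
    r = session.get("https://example.com/file.bin", stream=True)
    r.raise_for_status()      # raises JsHttpException on a 4xx/5xx status
    first = r.raw.read(1024)  # StreamIO wrapper: buffered, but read()-able in chunks
    # for a non-streaming request, r.content returns the whole body as bytes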
+ + class ResponseProxy: """Looks like a requests response""" - def __init__(self, req): + def __init__(self, req, stream=False): self.request = req + self.stream = stream self._data = None self._headers = None @property def raw(self): if self._data is None: - self._data = str(self.request.response).encode() + if self.stream: + self._data = StreamIO(str(self.request.response).encode()) + else: + self._data = str(self.request.response).encode() return self._data + def close(self): + del self._data + @property def headers(self): if self._headers is None: @@ -83,6 +98,7 @@ def text(self): @property def content(self): + self.stream = False return self.raw @property @@ -117,16 +133,7 @@ def request( logger.debug("JS request: %s %s", method, url) - if ( - cert - or verify - or proxies - or files - or cookies - or hooks - or stream - or allow_redirects - ): + if cert or verify or proxies or files or cookies or hooks: raise NotImplementedError if data and json: raise ValueError("Use json= or data=, not both") @@ -142,6 +149,8 @@ def request( req.setRequestHeader(k, v) req.setRequestHeader("Accept", "application/octet-stream") + # TODO: can only do this in a worker + # req.responseType = "arraybuffer" if json: blob = Blob.new([dumps(data)], {type: "application/json"}) req.send(blob) @@ -152,7 +161,7 @@ def request( req.send(blob) else: req.send(None) - return ResponseProxy(req) + return ResponseProxy(req, stream=stream) def get(self, url, **kwargs): return self.request("GET", url, **kwargs) @@ -235,6 +244,8 @@ def __init__( self.session = RequestsSessionShim() self.js = True except Exception as e: + import requests + logger.debug("Starting cpython session because of: %s", e) self.session = requests.Session(**(client_kwargs or {})) self.js = False @@ -251,7 +262,9 @@ def fsid(self): return "http" def encode_url(self, url): - return yarl.URL(url, encoded=self.encoded) + if yarl: + return yarl.URL(url, encoded=self.encoded) + return url @classmethod def _strip_protocol(cls, path): @@ -421,7 +434,7 @@ def exists(self, path, **kwargs): session = self.set_session() r = session.get(self.encode_url(path), **kw) return r.status_code < 400 - except requests.HTTPError: + except Exception: return False def isfile(self, path, **kwargs): @@ -791,13 +804,15 @@ def seek(self, *args, **kwargs): def read(self, num=-1): if num < 0: - return self.content + return self.r.content bufs = [] leng = 0 while not self.r.raw.closed and leng < num: out = self.r.raw.read(num) if out: bufs.append(out) + else: + break leng += len(out) self.loc += leng return b"".join(bufs) From c2b95ab650836efe04ab23c87767558ca27bd510 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 8 Mar 2023 12:56:59 -0500 Subject: [PATCH 4/8] binary for JS --- fsspec/implementations/http_sync.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/fsspec/implementations/http_sync.py b/fsspec/implementations/http_sync.py index 684550117..d183050eb 100644 --- a/fsspec/implementations/http_sync.py +++ b/fsspec/implementations/http_sync.py @@ -49,10 +49,11 @@ def __init__(self, req, stream=False): @property def raw(self): if self._data is None: + b = self.request.response.to_bytes() if self.stream: - self._data = StreamIO(str(self.request.response).encode()) + self._data = StreamIO(b) else: - self._data = str(self.request.response).encode() + self._data = b return self._data def close(self): @@ -129,8 +130,12 @@ def request( cert=None, json=None, ): + import js from js import Blob, XMLHttpRequest + if hasattr(js, 
"document"): + raise RuntimeError("Filesystem can only be run from a worker, not main") + logger.debug("JS request: %s %s", method, url) if cert or verify or proxies or files or cookies or hooks: @@ -149,8 +154,7 @@ def request( req.setRequestHeader(k, v) req.setRequestHeader("Accept", "application/octet-stream") - # TODO: can only do this in a worker - # req.responseType = "arraybuffer" + req.responseType = "arraybuffer" if json: blob = Blob.new([dumps(data)], {type: "application/json"}) req.send(blob) @@ -803,11 +807,9 @@ def seek(self, *args, **kwargs): raise ValueError("Cannot seek streaming HTTP file") def read(self, num=-1): - if num < 0: - return self.r.content bufs = [] leng = 0 - while not self.r.raw.closed and leng < num: + while not self.r.raw.closed and (leng < num or num < 0): out = self.r.raw.read(num) if out: bufs.append(out) @@ -841,7 +843,8 @@ def _file_info(url, session, size_policy="head", **kwargs): kwargs = kwargs.copy() ar = kwargs.pop("allow_redirects", True) head = kwargs.get("headers", {}).copy() - head["Accept-Encoding"] = "identity" + # TODO: not allowed in JS + # head["Accept-Encoding"] = "identity" kwargs["headers"] = head info = {} From b49e34538185c451ecb9b87a46408875f543f789 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Sat, 11 Mar 2023 15:48:31 -0500 Subject: [PATCH 5/8] add sync tests Mostly copied from async Question: should async FS use sync/requests stuff for things that don't multiplex? We would be able to share code. --- fsspec/implementations/http_sync.py | 141 ++++--- fsspec/implementations/tests/test_http.py | 2 +- .../implementations/tests/test_http_sync.py | 350 ++++++++++++++++++ 3 files changed, 449 insertions(+), 44 deletions(-) create mode 100644 fsspec/implementations/tests/test_http_sync.py diff --git a/fsspec/implementations/http_sync.py b/fsspec/implementations/http_sync.py index d183050eb..f9b1cf93c 100644 --- a/fsspec/implementations/http_sync.py +++ b/fsspec/implementations/http_sync.py @@ -57,7 +57,8 @@ def raw(self): return self._data def close(self): - del self._data + if hasattr(self, "_data"): + del self._data @property def headers(self): @@ -80,6 +81,14 @@ def raise_for_status(self): self.url, self.status_code, self.reason, self.headers, None ) + def iter_content(self, chunksize, *_, **__): + while True: + out = self.raw.read(chunksize) + if out: + yield out + else: + break + @property def reason(self): return self.request.statusText @@ -95,7 +104,7 @@ def url(self): @property def text(self): # TODO: encoding from headers - return self.raw.decode() + return self.content.decode() @property def content(self): @@ -190,10 +199,13 @@ class HTTPFileSystem(AbstractFileSystem): """ Simple File-System for fetching data via HTTP(S) - ``ls()`` is implemented by loading the parent page and doing a regex - match on the result. If simple_link=True, anything of the form - "http(s)://server.com/stuff?thing=other"; otherwise only links within - HTML href tags will be used. + This is the BLOCKING version of the normal HTTPFileSystem. It uses + requests in normal python and the JS runtime in pyodide. + + Note that + for pyodide, it only runs in a webworker, because we require binary + blocking fetches. Also, all requests must pass the browser's CORS + checks, which requires the server to send the right headers. 
""" sep = "/" @@ -290,7 +302,7 @@ def _ls_real(self, url, detail=True, **kwargs): logger.debug(url) r = self.session.get(self.encode_url(url), **self.kwargs) self._raise_not_found_for_status(r, url) - text = r.text() + text = r.text if self.simple_links: links = ex2.findall(text) + [u[2] for u in ex.findall(text)] else: @@ -370,18 +382,18 @@ def get_file( logger.debug(rpath) r = self.session.get(self.encode_url(rpath), **kw) try: - size = int(r.headers["content-length"]) - except (ValueError, KeyError): + size = int( + r.headers.get("content-length", None) + or r.headers.get("Content-Length", None) + ) + except (ValueError, KeyError, TypeError): size = None callback.set_size(size) self._raise_not_found_for_status(r, rpath) if not isfilelike(lpath): lpath = open(lpath, "wb") - chunk = True - while chunk: - r.raw.decode_content = True - chunk = r.raw.read(chunk_size) + for chunk in r.iter_content(chunk_size, decode_unicode=False): lpath.write(chunk) callback.relative_update(len(chunk)) @@ -430,13 +442,38 @@ def gen_chunks(): resp = meth(rpath, data=gen_chunks(), **kw) self._raise_not_found_for_status(resp, rpath) + def _process_limits(self, url, start, end): + """Helper for "Range"-based _cat_file""" + size = None + suff = False + if start is not None and start < 0: + # if start is negative and end None, end is the "suffix length" + if end is None: + end = -start + start = "" + suff = True + else: + size = size or self.info(url)["size"] + start = size + start + elif start is None: + start = 0 + if not suff: + if end is not None and end < 0: + if start is not None: + size = size or self.info(url)["size"] + end = size + end + elif end is None: + end = "" + if isinstance(end, int): + end -= 1 # bytes range is inclusive + return "bytes=%s-%s" % (start, end) + def exists(self, path, **kwargs): kw = self.kwargs.copy() kw.update(kwargs) try: logger.debug(path) - session = self.set_session() - r = session.get(self.encode_url(path), **kw) + r = self.session.get(self.encode_url(path), **kw) return r.status_code < 400 except Exception: return False @@ -613,7 +650,7 @@ def glob(self, path, **kwargs): def isdir(self, path): # override, since all URLs are (also) files try: - return bool(self._ls(path)) + return bool(self.ls(path)) except (FileNotFoundError, ValueError): return False @@ -744,10 +781,11 @@ def _fetch_range(self, start, end): # with status 206 (partial content). But we'll guess that a suitable # Content-Range header or a Content-Length no more than the # requested range also mean we have got the desired range. + cl = r.headers.get("Content-Length", r.headers.get("content-length", end + 1)) response_is_range = ( r.status_code == 206 or self._parse_content_range(r.headers)[0] == start - or int(r.headers.get("Content-Length", end + 1)) <= end - start + or int(cl) <= end - start ) if response_is_range: @@ -764,18 +802,9 @@ def _fetch_range(self, start, end): # so we can read the required amount anyway. 
cl = 0 out = [] - while True: - r.raw.decode_content = True - chunk = r.raw.read(2**20) - # data size unknown, let's read until we have enough - if chunk: - out.append(chunk) - cl += len(chunk) - if cl > end - start: - break - else: - break - r.raw.close() + for chunk in r.iter_content(2**20, False): + out.append(chunk) + cl += len(chunk) out = b"".join(out)[: end - start] return out @@ -798,8 +827,9 @@ def __init__(self, fs, url, mode="rb", session=None, **kwargs): super().__init__(fs=fs, path=url, mode=mode, cache_type="readahead", **kwargs) r = self.session.get(self.fs.encode_url(url), stream=True, **kwargs) - r.raw.decode_content = True self.fs._raise_not_found_for_status(r, url) + self.it = r.iter_content(1024, False) + self.leftover = b"" self.r = r @@ -807,20 +837,30 @@ def seek(self, *args, **kwargs): raise ValueError("Cannot seek streaming HTTP file") def read(self, num=-1): - bufs = [] - leng = 0 - while not self.r.raw.closed and (leng < num or num < 0): - out = self.r.raw.read(num) + bufs = [self.leftover] + leng = len(self.leftover) + while leng < num or num < 0: + try: + out = self.it.__next__() + except StopIteration: + break if out: bufs.append(out) else: break leng += len(out) - self.loc += leng - return b"".join(bufs) + out = b"".join(bufs) + if num >= 0: + self.leftover = out[num:] + out = out[:num] + else: + self.leftover = b"" + self.loc += len(out) + return out def close(self): self.r.close() + self.closed = True def get_range(session, url, start, end, **kwargs): @@ -854,16 +894,20 @@ def _file_info(url, session, size_policy="head", **kwargs): r = session.get(url, allow_redirects=ar, **kwargs) else: raise TypeError('size_policy must be "head" or "get", got %s' "" % size_policy) - r.raise_for_status() + r.raise_for_status() - # TODO: - # recognise lack of 'Accept-Ranges', - # or 'Accept-Ranges': 'none' (not 'bytes') - # to mean streaming only, no random access => return None + # TODO: + # recognise lack of 'Accept-Ranges', + # or 'Accept-Ranges': 'none' (not 'bytes') + # to mean streaming only, no random access => return None if "Content-Length" in r.headers: info["size"] = int(r.headers["Content-Length"]) elif "Content-Range" in r.headers: info["size"] = int(r.headers["Content-Range"].split("/")[1]) + elif "content-length" in r.headers: + info["size"] = int(r.headers["content-length"]) + elif "content-range" in r.headers: + info["size"] = int(r.headers["content-range"].split("/")[1]) for checksum_field in ["ETag", "Content-MD5", "Digest"]: if r.headers.get(checksum_field): @@ -873,5 +917,16 @@ def _file_info(url, session, size_policy="head", **kwargs): # importing this is enough to register it -register_implementation("http", HTTPFileSystem, clobber=True) -register_implementation("https", HTTPFileSystem, clobber=True) +def register(): + register_implementation("http", HTTPFileSystem, clobber=True) + register_implementation("https", HTTPFileSystem, clobber=True) + + +register() + + +def unregister(): + from fsspec.implementations.http import HTTPFileSystem + + register_implementation("http", HTTPFileSystem, clobber=True) + register_implementation("https", HTTPFileSystem, clobber=True) diff --git a/fsspec/implementations/tests/test_http.py b/fsspec/implementations/tests/test_http.py index 3b1c3ff86..234fedc9d 100644 --- a/fsspec/implementations/tests/test_http.py +++ b/fsspec/implementations/tests/test_http.py @@ -227,7 +227,7 @@ def test_random_access(server, headers): @pytest.mark.parametrize( "headers", [ - {"ignore_range": "true", "head_ok": "true", 
"head_give_length": "true"}, + {"ignore_range": "true", "head_ok": "true", "give_length": "true"}, {"ignore_range": "true", "give_length": "true"}, {"ignore_range": "true", "give_range": "true"}, ], diff --git a/fsspec/implementations/tests/test_http_sync.py b/fsspec/implementations/tests/test_http_sync.py new file mode 100644 index 000000000..4dd357ce8 --- /dev/null +++ b/fsspec/implementations/tests/test_http_sync.py @@ -0,0 +1,350 @@ +import io +import json +import os +import time + +import pytest + +import fsspec.utils +from fsspec.tests.conftest import data, reset_files, server, win # noqa: F401 + + +@pytest.fixture() +def sync(): + from fsspec.implementations.http_sync import register, unregister + + register() + yield + unregister() + + +def test_list(server, sync): + h = fsspec.filesystem("http") + out = h.glob(server + "/index/*") + assert out == [server + "/index/realfile"] + + +def test_list_invalid_args(server, sync): + with pytest.raises(TypeError): + h = fsspec.filesystem("http", use_foobar=True) + h.glob(server + "/index/*") + + +def test_list_cache(server, sync): + h = fsspec.filesystem("http", use_listings_cache=True) + out = h.glob(server + "/index/*") + assert out == [server + "/index/realfile"] + + +def test_list_cache_with_expiry_time_cached(server, sync): + h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=30) + + # First, the directory cache is not initialized. + assert not h.dircache + + # By querying the filesystem with "use_listings_cache=True", + # the cache will automatically get populated. + out = h.glob(server + "/index/*") + assert out == [server + "/index/realfile"] + + # Verify cache content. + assert len(h.dircache) == 1 + + out = h.glob(server + "/index/*") + assert out == [server + "/index/realfile"] + + +def test_list_cache_with_expiry_time_purged(server, sync): + h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=0.3) + + # First, the directory cache is not initialized. + assert not h.dircache + + # By querying the filesystem with "use_listings_cache=True", + # the cache will automatically get populated. + out = h.glob(server + "/index/*") + assert out == [server + "/index/realfile"] + assert len(h.dircache) == 1 + + # Verify cache content. + assert server + "/index/" in h.dircache + assert len(h.dircache.get(server + "/index/")) == 1 + + # Wait beyond the TTL / cache expiry time. + time.sleep(0.31) + + # Verify that the cache item should have been purged. + cached_items = h.dircache.get(server + "/index/") + assert cached_items is None + + # Verify that after clearing the item from the cache, + # it can get populated again. + out = h.glob(server + "/index/*") + assert out == [server + "/index/realfile"] + cached_items = h.dircache.get(server + "/index/") + assert len(cached_items) == 1 + + +def test_list_cache_reuse(server, sync): + h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5) + + # First, the directory cache is not initialized. + assert not h.dircache + + # By querying the filesystem with "use_listings_cache=True", + # the cache will automatically get populated. + out = h.glob(server + "/index/*") + assert out == [server + "/index/realfile"] + + # Verify cache content. + assert len(h.dircache) == 1 + + # Verify another instance without caching enabled does not have cache content. + h = fsspec.filesystem("http", use_listings_cache=False) + assert not h.dircache + + # Verify that yet another new instance, with caching enabled, + # will see the same cache content again. 
+ h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5) + assert len(h.dircache) == 1 + + # However, yet another instance with a different expiry time will also not have + # any valid cache content. + h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=666) + assert len(h.dircache) == 0 + + +def test_ls_raises_filenotfound(server, sync): + h = fsspec.filesystem("http") + + with pytest.raises(FileNotFoundError): + h.ls(server + "/not-a-key") + + +def test_list_cache_with_max_paths(server, sync): + h = fsspec.filesystem("http", use_listings_cache=True, max_paths=5) + out = h.glob(server + "/index/*") + assert out == [server + "/index/realfile"] + + +def test_list_cache_with_skip_instance_cache(server, sync): + h = fsspec.filesystem("http", use_listings_cache=True, skip_instance_cache=True) + out = h.glob(server + "/index/*") + assert out == [server + "/index/realfile"] + + +def test_isdir(server, sync): + h = fsspec.filesystem("http") + assert h.isdir(server + "/index/") + assert not h.isdir(server + "/index/realfile") + assert not h.isdir(server + "doesnotevenexist") + + +def test_exists(server, sync): + h = fsspec.filesystem("http") + assert not h.exists(server + "/notafile") + with pytest.raises(FileNotFoundError): + h.cat(server + "/notafile") + + +def test_read(server, sync): + h = fsspec.filesystem("http") + out = server + "/index/realfile" + # with h.open(out, "rb") as f: + # assert f.read() == data + # with h.open(out, "rb", block_size=0) as f: + # assert f.read() == data + with h.open(out, "rb") as f: + o1 = f.read(100) + o2 = f.read() + assert o1 + o2 == data + + +def test_methods(server, sync): + h = fsspec.filesystem("http") + url = server + "/index/realfile" + assert h.exists(url) + assert h.cat(url) == data + + +@pytest.mark.parametrize( + "headers", + [ + {}, + {"give_length": "true"}, + {"give_length": "true", "head_ok": "true"}, + {"give_range": "true"}, + {"give_length": "true", "head_not_auth": "true"}, + {"give_range": "true", "head_not_auth": "true"}, + {"use_206": "true", "head_ok": "true", "head_give_length": "true"}, + {"use_206": "true", "give_length": "true"}, + {"use_206": "true", "give_range": "true"}, + ], +) +def test_random_access(server, headers, sync): + h = fsspec.filesystem("http", headers=headers) + url = server + "/index/realfile" + with h.open(url, "rb") as f: + if headers: + assert f.size == len(data) + assert f.read(5) == data[:5] + + if headers: + f.seek(5, 1) + assert f.read(5) == data[10:15] + else: + with pytest.raises(ValueError): + f.seek(5, 1) + assert f.closed + + +@pytest.mark.parametrize( + "headers", + [ + {"ignore_range": "true", "head_ok": "true", "give_length": "true"}, + {"ignore_range": "true", "give_length": "true"}, + {"ignore_range": "true", "give_range": "true"}, + ], +) +def test_no_range_support(server, headers, sync): + h = fsspec.filesystem("http", headers=headers) + url = server + "/index/realfile" + with h.open(url, "rb") as f: + # Random access is not possible if the server doesn't respect Range + f.seek(5) + with pytest.raises(ValueError): + f.read(10) + + # Reading from the beginning should still work + f.seek(0) + assert f.read(10) == data[:10] + + +def test_mapper_url(server, sync): + h = fsspec.filesystem("http") + mapper = h.get_mapper(server + "/index/") + assert mapper.root.startswith("http:") + assert list(mapper) + + mapper2 = fsspec.get_mapper(server + "/index/") + assert mapper2.root.startswith("http:") + assert list(mapper) == list(mapper2) + + +def 
test_content_length_zero(server, sync): + h = fsspec.filesystem( + "http", headers={"give_length": "true", "zero_length": "true"} + ) + url = server + "/index/realfile" + + with h.open(url, "rb") as f: + assert f.read() == data + + +def test_download(server, tmpdir, sync): + h = fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true "}) + url = server + "/index/realfile" + fn = os.path.join(tmpdir, "afile") + h.get(url, fn) + assert open(fn, "rb").read() == data + + +def test_multi_download(server, tmpdir, sync): + h = fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true "}) + urla = server + "/index/realfile" + urlb = server + "/index/otherfile" + fna = os.path.join(tmpdir, "afile") + fnb = os.path.join(tmpdir, "bfile") + h.get([urla, urlb], [fna, fnb]) + assert open(fna, "rb").read() == data + assert open(fnb, "rb").read() == data + + +def test_ls(server, sync): + h = fsspec.filesystem("http") + l = h.ls(server + "/data/20020401/", detail=False) + nc = server + "/data/20020401/GRACEDADM_CLSM0125US_7D.A20020401.030.nc4" + assert nc in l + assert len(l) == 11 + assert all(u["type"] == "file" for u in h.ls(server + "/data/20020401/")) + assert h.glob(server + "/data/20020401/*.nc4") == [nc] + + +def test_mcat(server, sync): + h = fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true "}) + urla = server + "/index/realfile" + urlb = server + "/index/otherfile" + out = h.cat([urla, urlb]) + assert out == {urla: data, urlb: data} + + +def test_cat_file_range(server, sync): + h = fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true "}) + urla = server + "/index/realfile" + assert h.cat(urla, start=1, end=10) == data[1:10] + assert h.cat(urla, start=1) == data[1:] + + assert h.cat(urla, start=-10) == data[-10:] + assert h.cat(urla, start=-10, end=-2) == data[-10:-2] + + assert h.cat(urla, end=-10) == data[:-10] + + +def test_mcat_cache(server, sync): + urla = server + "/index/realfile" + urlb = server + "/index/otherfile" + fs = fsspec.filesystem("simplecache", target_protocol="http") + assert fs.cat([urla, urlb]) == {urla: data, urlb: data} + + +def test_mcat_expand(server, sync): + h = fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true "}) + out = h.cat(server + "/index/*") + assert out == {server + "/index/realfile": data} + + +def test_info(server, sync): + fs = fsspec.filesystem("http", headers={"give_etag": "true", "head_ok": "true"}) + info = fs.info(server + "/index/realfile") + assert info["ETag"] == "xxx" + + +@pytest.mark.parametrize("method", ["POST", "PUT"]) +def test_put_file(server, tmp_path, method, reset_files, sync): + src_file = tmp_path / "file_1" + src_file.write_bytes(data) + + dwl_file = tmp_path / "down_1" + + fs = fsspec.filesystem("http", headers={"head_ok": "true", "give_length": "true"}) + with pytest.raises(FileNotFoundError): + fs.info(server + "/hey") + + fs.put_file(src_file, server + "/hey", method=method) + assert fs.info(server + "/hey")["size"] == len(data) + + fs.get_file(server + "/hey", dwl_file) + assert dwl_file.read_bytes() == data + + src_file.write_bytes(b"xxx") + with open(src_file, "rb") as stream: + fs.put_file(stream, server + "/hey_2", method=method) + assert fs.cat(server + "/hey_2") == b"xxx" + + fs.put_file(io.BytesIO(b"yyy"), server + "/hey_3", method=method) + assert fs.cat(server + "/hey_3") == b"yyy" + + +def test_encoded(server, sync): + fs = fsspec.filesystem("http", encoded=False) + out = fs.cat(server + "/Hello: Günter", 
headers={"give_path": "true"}) + assert json.loads(out)["path"] == "/Hello:%20G%C3%BCnter" + + +def test_with_cache(server, sync): + fs = fsspec.filesystem("http", headers={"head_ok": "true", "give_length": "true"}) + fn = server + "/index/realfile" + fs1 = fsspec.filesystem("blockcache", fs=fs) + with fs1.open(fn, "rb") as f: + out = f.read() + assert out == fs1.cat(fn) From 9db78bf71e9bd4b23054cce06b670713b05993c3 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 6 Mar 2025 16:47:57 -0500 Subject: [PATCH 6/8] No check --- fsspec/implementations/http_sync.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/fsspec/implementations/http_sync.py b/fsspec/implementations/http_sync.py index f9b1cf93c..a45ed384b 100644 --- a/fsspec/implementations/http_sync.py +++ b/fsspec/implementations/http_sync.py @@ -142,9 +142,6 @@ def request( import js from js import Blob, XMLHttpRequest - if hasattr(js, "document"): - raise RuntimeError("Filesystem can only be run from a worker, not main") - logger.debug("JS request: %s %s", method, url) if cert or verify or proxies or files or cookies or hooks: From 4a4a44be66ddbdbe24f108a49de714dd626fb786 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 7 Mar 2025 14:48:53 -0500 Subject: [PATCH 7/8] Change protocol name This specifically for pandas, which looks for URLs starting with "http" to pass to requests --- fsspec/implementations/http_sync.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fsspec/implementations/http_sync.py b/fsspec/implementations/http_sync.py index 805746271..ac29f2d8e 100644 --- a/fsspec/implementations/http_sync.py +++ b/fsspec/implementations/http_sync.py @@ -201,7 +201,7 @@ class HTTPFileSystem(AbstractFileSystem): you are testing pyodide/pyscript integration*** """ - protocol = ("http", "https", "http_sync", "https_sync") + protocol = ("http", "https", "sync_http", "sync_https") sep = "/" def __init__( @@ -916,8 +916,8 @@ def _file_info(url, session, size_policy="head", **kwargs): def register(): register_implementation("http", HTTPFileSystem, clobber=True) register_implementation("https", HTTPFileSystem, clobber=True) - register_implementation("http_sync", HTTPFileSystem, clobber=True) - register_implementation("https_sync", HTTPFileSystem, clobber=True) + register_implementation("sync_http", HTTPFileSystem, clobber=True) + register_implementation("sync_https", HTTPFileSystem, clobber=True) register() From a12d364c1031ce96ec89d2da22a2494d1f711ca5 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 7 Mar 2025 14:59:38 -0500 Subject: [PATCH 8/8] fix tests --- fsspec/implementations/http_sync.py | 6 +- .../implementations/tests/test_http_sync.py | 122 +++++++++--------- 2 files changed, 65 insertions(+), 63 deletions(-) diff --git a/fsspec/implementations/http_sync.py b/fsspec/implementations/http_sync.py index ac29f2d8e..2a1caf1b0 100644 --- a/fsspec/implementations/http_sync.py +++ b/fsspec/implementations/http_sync.py @@ -565,7 +565,7 @@ def info(self, url, **kwargs): return {"name": url, "size": None, **info, "type": "file"} - def glob(self, path, **kwargs): + def glob(self, path, maxdepth=None, **kwargs): """ Find files by glob-matching. 
@@ -607,7 +607,9 @@ def glob(self, path, **kwargs): root = "" depth = None if "**" in path else path[ind + 1 :].count("/") + 1 - allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs) + allpaths = self.find( + root, maxdepth=maxdepth or depth, withdirs=True, detail=True, **kwargs + ) # Escape characters special to python regex, leaving our supported # special characters in place. # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html diff --git a/fsspec/implementations/tests/test_http_sync.py b/fsspec/implementations/tests/test_http_sync.py index 4dd357ce8..330cf4d07 100644 --- a/fsspec/implementations/tests/test_http_sync.py +++ b/fsspec/implementations/tests/test_http_sync.py @@ -20,20 +20,20 @@ def sync(): def test_list(server, sync): h = fsspec.filesystem("http") - out = h.glob(server + "/index/*") - assert out == [server + "/index/realfile"] + out = h.glob(server.address + "/index/*") + assert out == [server.address + "/index/realfile"] def test_list_invalid_args(server, sync): with pytest.raises(TypeError): h = fsspec.filesystem("http", use_foobar=True) - h.glob(server + "/index/*") + h.glob(server.address + "/index/*") def test_list_cache(server, sync): h = fsspec.filesystem("http", use_listings_cache=True) - out = h.glob(server + "/index/*") - assert out == [server + "/index/realfile"] + out = h.glob(server.address + "/index/*") + assert out == [server.address + "/index/realfile"] def test_list_cache_with_expiry_time_cached(server, sync): @@ -44,14 +44,14 @@ def test_list_cache_with_expiry_time_cached(server, sync): # By querying the filesystem with "use_listings_cache=True", # the cache will automatically get populated. - out = h.glob(server + "/index/*") - assert out == [server + "/index/realfile"] + out = h.glob(server.address + "/index/*") + assert out == [server.address + "/index/realfile"] # Verify cache content. assert len(h.dircache) == 1 - out = h.glob(server + "/index/*") - assert out == [server + "/index/realfile"] + out = h.glob(server.address + "/index/*") + assert out == [server.address + "/index/realfile"] def test_list_cache_with_expiry_time_purged(server, sync): @@ -62,26 +62,26 @@ def test_list_cache_with_expiry_time_purged(server, sync): # By querying the filesystem with "use_listings_cache=True", # the cache will automatically get populated. - out = h.glob(server + "/index/*") - assert out == [server + "/index/realfile"] + out = h.glob(server.address + "/index/*") + assert out == [server.address + "/index/realfile"] assert len(h.dircache) == 1 # Verify cache content. - assert server + "/index/" in h.dircache - assert len(h.dircache.get(server + "/index/")) == 1 + assert server.address + "/index/" in h.dircache + assert len(h.dircache.get(server.address + "/index/")) == 1 # Wait beyond the TTL / cache expiry time. time.sleep(0.31) # Verify that the cache item should have been purged. - cached_items = h.dircache.get(server + "/index/") + cached_items = h.dircache.get(server.address + "/index/") assert cached_items is None # Verify that after clearing the item from the cache, # it can get populated again. 
- out = h.glob(server + "/index/*") - assert out == [server + "/index/realfile"] - cached_items = h.dircache.get(server + "/index/") + out = h.glob(server.address + "/index/*") + assert out == [server.address + "/index/realfile"] + cached_items = h.dircache.get(server.address + "/index/") assert len(cached_items) == 1 @@ -93,8 +93,8 @@ def test_list_cache_reuse(server, sync): # By querying the filesystem with "use_listings_cache=True", # the cache will automatically get populated. - out = h.glob(server + "/index/*") - assert out == [server + "/index/realfile"] + out = h.glob(server.address + "/index/*") + assert out == [server.address + "/index/realfile"] # Verify cache content. assert len(h.dircache) == 1 @@ -118,38 +118,38 @@ def test_ls_raises_filenotfound(server, sync): h = fsspec.filesystem("http") with pytest.raises(FileNotFoundError): - h.ls(server + "/not-a-key") + h.ls(server.address + "/not-a-key") def test_list_cache_with_max_paths(server, sync): h = fsspec.filesystem("http", use_listings_cache=True, max_paths=5) - out = h.glob(server + "/index/*") - assert out == [server + "/index/realfile"] + out = h.glob(server.address + "/index/*") + assert out == [server.address + "/index/realfile"] def test_list_cache_with_skip_instance_cache(server, sync): h = fsspec.filesystem("http", use_listings_cache=True, skip_instance_cache=True) - out = h.glob(server + "/index/*") - assert out == [server + "/index/realfile"] + out = h.glob(server.address + "/index/*") + assert out == [server.address + "/index/realfile"] def test_isdir(server, sync): h = fsspec.filesystem("http") - assert h.isdir(server + "/index/") - assert not h.isdir(server + "/index/realfile") - assert not h.isdir(server + "doesnotevenexist") + assert h.isdir(server.address + "/index/") + assert not h.isdir(server.address + "/index/realfile") + assert not h.isdir(server.address + "doesnotevenexist") def test_exists(server, sync): h = fsspec.filesystem("http") - assert not h.exists(server + "/notafile") + assert not h.exists(server.address + "/notafile") with pytest.raises(FileNotFoundError): - h.cat(server + "/notafile") + h.cat(server.address + "/notafile") def test_read(server, sync): h = fsspec.filesystem("http") - out = server + "/index/realfile" + out = server.address + "/index/realfile" # with h.open(out, "rb") as f: # assert f.read() == data # with h.open(out, "rb", block_size=0) as f: @@ -162,7 +162,7 @@ def test_read(server, sync): def test_methods(server, sync): h = fsspec.filesystem("http") - url = server + "/index/realfile" + url = server.address + "/index/realfile" assert h.exists(url) assert h.cat(url) == data @@ -183,7 +183,7 @@ def test_methods(server, sync): ) def test_random_access(server, headers, sync): h = fsspec.filesystem("http", headers=headers) - url = server + "/index/realfile" + url = server.address + "/index/realfile" with h.open(url, "rb") as f: if headers: assert f.size == len(data) @@ -208,7 +208,7 @@ def test_random_access(server, headers, sync): ) def test_no_range_support(server, headers, sync): h = fsspec.filesystem("http", headers=headers) - url = server + "/index/realfile" + url = server.address + "/index/realfile" with h.open(url, "rb") as f: # Random access is not possible if the server doesn't respect Range f.seek(5) @@ -222,11 +222,11 @@ def test_no_range_support(server, headers, sync): def test_mapper_url(server, sync): h = fsspec.filesystem("http") - mapper = h.get_mapper(server + "/index/") + mapper = h.get_mapper(server.address + "/index/") assert mapper.root.startswith("http:") 
assert list(mapper) - mapper2 = fsspec.get_mapper(server + "/index/") + mapper2 = fsspec.get_mapper(server.address + "/index/") assert mapper2.root.startswith("http:") assert list(mapper) == list(mapper2) @@ -235,7 +235,7 @@ def test_content_length_zero(server, sync): h = fsspec.filesystem( "http", headers={"give_length": "true", "zero_length": "true"} ) - url = server + "/index/realfile" + url = server.address + "/index/realfile" with h.open(url, "rb") as f: assert f.read() == data @@ -243,7 +243,7 @@ def test_content_length_zero(server, sync): def test_download(server, tmpdir, sync): h = fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true "}) - url = server + "/index/realfile" + url = server.address + "/index/realfile" fn = os.path.join(tmpdir, "afile") h.get(url, fn) assert open(fn, "rb").read() == data @@ -251,8 +251,8 @@ def test_download(server, tmpdir, sync): def test_multi_download(server, tmpdir, sync): h = fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true "}) - urla = server + "/index/realfile" - urlb = server + "/index/otherfile" + urla = server.address + "/index/realfile" + urlb = server.address + "/index/otherfile" fna = os.path.join(tmpdir, "afile") fnb = os.path.join(tmpdir, "bfile") h.get([urla, urlb], [fna, fnb]) @@ -262,25 +262,25 @@ def test_multi_download(server, tmpdir, sync): def test_ls(server, sync): h = fsspec.filesystem("http") - l = h.ls(server + "/data/20020401/", detail=False) - nc = server + "/data/20020401/GRACEDADM_CLSM0125US_7D.A20020401.030.nc4" + l = h.ls(server.address + "/data/20020401/", detail=False) + nc = server.address + "/data/20020401/GRACEDADM_CLSM0125US_7D.A20020401.030.nc4" assert nc in l assert len(l) == 11 - assert all(u["type"] == "file" for u in h.ls(server + "/data/20020401/")) - assert h.glob(server + "/data/20020401/*.nc4") == [nc] + assert all(u["type"] == "file" for u in h.ls(server.address + "/data/20020401/")) + assert h.glob(server.address + "/data/20020401/*.nc4") == [nc] def test_mcat(server, sync): h = fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true "}) - urla = server + "/index/realfile" - urlb = server + "/index/otherfile" + urla = server.address + "/index/realfile" + urlb = server.address + "/index/otherfile" out = h.cat([urla, urlb]) assert out == {urla: data, urlb: data} def test_cat_file_range(server, sync): h = fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true "}) - urla = server + "/index/realfile" + urla = server.address + "/index/realfile" assert h.cat(urla, start=1, end=10) == data[1:10] assert h.cat(urla, start=1) == data[1:] @@ -291,21 +291,21 @@ def test_cat_file_range(server, sync): def test_mcat_cache(server, sync): - urla = server + "/index/realfile" - urlb = server + "/index/otherfile" + urla = server.address + "/index/realfile" + urlb = server.address + "/index/otherfile" fs = fsspec.filesystem("simplecache", target_protocol="http") assert fs.cat([urla, urlb]) == {urla: data, urlb: data} def test_mcat_expand(server, sync): h = fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true "}) - out = h.cat(server + "/index/*") - assert out == {server + "/index/realfile": data} + out = h.cat(server.address + "/index/*") + assert out == {server.address + "/index/realfile": data} def test_info(server, sync): fs = fsspec.filesystem("http", headers={"give_etag": "true", "head_ok": "true"}) - info = fs.info(server + "/index/realfile") + info = fs.info(server.address + "/index/realfile") assert info["ETag"] 
== "xxx" @@ -318,32 +318,32 @@ def test_put_file(server, tmp_path, method, reset_files, sync): fs = fsspec.filesystem("http", headers={"head_ok": "true", "give_length": "true"}) with pytest.raises(FileNotFoundError): - fs.info(server + "/hey") + fs.info(server.address + "/hey") - fs.put_file(src_file, server + "/hey", method=method) - assert fs.info(server + "/hey")["size"] == len(data) + fs.put_file(src_file, server.address + "/hey", method=method) + assert fs.info(server.address + "/hey")["size"] == len(data) - fs.get_file(server + "/hey", dwl_file) + fs.get_file(server.address + "/hey", dwl_file) assert dwl_file.read_bytes() == data src_file.write_bytes(b"xxx") with open(src_file, "rb") as stream: - fs.put_file(stream, server + "/hey_2", method=method) - assert fs.cat(server + "/hey_2") == b"xxx" + fs.put_file(stream, server.address + "/hey_2", method=method) + assert fs.cat(server.address + "/hey_2") == b"xxx" - fs.put_file(io.BytesIO(b"yyy"), server + "/hey_3", method=method) - assert fs.cat(server + "/hey_3") == b"yyy" + fs.put_file(io.BytesIO(b"yyy"), server.address + "/hey_3", method=method) + assert fs.cat(server.address + "/hey_3") == b"yyy" def test_encoded(server, sync): fs = fsspec.filesystem("http", encoded=False) - out = fs.cat(server + "/Hello: Günter", headers={"give_path": "true"}) + out = fs.cat(server.address + "/Hello: Günter", headers={"give_path": "true"}) assert json.loads(out)["path"] == "/Hello:%20G%C3%BCnter" def test_with_cache(server, sync): fs = fsspec.filesystem("http", headers={"head_ok": "true", "give_length": "true"}) - fn = server + "/index/realfile" + fn = server.address + "/index/realfile" fs1 = fsspec.filesystem("blockcache", fs=fs) with fs1.open(fn, "rb") as f: out = f.read()