-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathutils.py
More file actions
91 lines (70 loc) · 3.14 KB
/
utils.py
File metadata and controls
91 lines (70 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from __future__ import annotations
import os
import sys
from hashlib import sha256
from os import PathLike
from typing import AsyncGenerator
from urllib.parse import quote
import anyio
import yarl
from anyio import open_file
from otaclient_common._typing import StrOrPath
from .config import config as cfg
if sys.version_info >= (3, 12):
    # the standard library ships batched starting from python 3.12
    from itertools import batched  # noqa: F401
else:
    from itertools import islice

    def batched(iterable, n, *, strict=False):  # pragma: no cover
        """Backport of itertools.batched for interpreters older than 3.12.

        Yields consecutive tuples holding up to `n` items taken from `iterable`,
            for example: batched('ABCDEFG', 3) → ABC DEF G

        Taken from the recipe in the python documentation,
            see https://docs.python.org/3/library/itertools.html#itertools.batched.

        Raises:
            ValueError: if `n` is smaller than one, or if `strict` is set and
                the last batch holds fewer than `n` items.
        """
        if n < 1:
            raise ValueError("n must be at least one")

        source = iter(iterable)
        while True:
            chunk = tuple(islice(source, n))
            if not chunk:
                # source is exhausted
                return
            if strict and len(chunk) != n:
                raise ValueError("batched(): incomplete batch")
            yield chunk
async def read_file(
    fpath: PathLike, chunk_size: int = cfg.LOCAL_READ_SIZE
) -> AsyncGenerator[bytes]:
    """Asynchronously stream the content of a file chunk by chunk.

    The file is opened in binary mode and each yielded item is the result
        of one read of up to `chunk_size` bytes.

    NOTE: the POSIX_FADV_DONTNEED advice is only issued once the file has
        been read through to the end.
    """
    async with await open_file(fpath, "rb") as src:
        raw_fd = src.wrapped.fileno()
        # offset=0 and len=0: the advice applies to the whole file
        os.posix_fadvise(raw_fd, 0, 0, os.POSIX_FADV_SEQUENTIAL)

        while True:
            chunk = await src.read(chunk_size)
            if not chunk:  # EOF
                break
            yield chunk

        # the whole file has been consumed, its cached pages are not needed anymore
        os.posix_fadvise(raw_fd, 0, 0, os.POSIX_FADV_DONTNEED)
def read_file_once(fpath: StrOrPath | anyio.Path) -> bytes:
    """Return the entire content of a file with a single read call.

    This function is meant for reading small files.

    NOTE(20260420): reading small files grows the kernel page cache much
        more slowly and by a much smaller amount, so no cache advice is
        given here and the kernel is left to manage the cached pages.
    """
    with open(fpath, "rb") as src:
        content = src.read()
    return content
def url_based_hash(raw_url: str) -> str:
    """Build the URL based hash string from the unquoted raw_url.

    The result is cfg.URL_BASED_HASH_PREFIX followed by the sha256 hex
        digest of the encoded raw_url.
    """
    hasher = sha256()
    hasher.update(raw_url.encode())
    return f"{cfg.URL_BASED_HASH_PREFIX}{hasher.hexdigest()}"
def process_raw_url(raw_url: str, enable_https: bool) -> yarl.URL:
    """Rebuild the raw URL handed over by the upper uvicorn app for forwarding.

    NOTE: the raw_url we get from uvicorn is unquoted, so it must be quoted
        again before it is sent to the remote.
    NOTE(20221003): as otaproxy, everything after the netloc is treated as path
        and is otherwise left untouched, as the request should be forwarded
        as it is to the remote.
    NOTE(20221003): the scheme is unconditionally set to https if enable_https,
        otherwise it is unconditionally set to http.
    NOTE(20260410): a yarl.URL is returned to prevent aiohttp from encoding
        the URL again.

    Raises:
        ValueError: if raw_url does not contain "://".
    """
    scheme = "http"
    if enable_https:
        scheme = "https"

    # raw_url looks like "<scheme>://<netloc>/<path>...",
    #   the netloc boundaries are located with plain string searching.
    netloc_start = raw_url.index("://") + 3
    path_start = raw_url.find("/", netloc_start)
    if path_start == -1:  # no path component
        path_start = len(raw_url)

    netloc = raw_url[netloc_start:path_start]
    remainder = raw_url[path_start:]

    base: yarl.URL = yarl.URL(f"{scheme}://{netloc}")
    # all the contents after netloc are quoted back and attached as already-encoded path
    return base.with_path(quote(remainder), encoded=True)