# Copyright (c) 2025 Apple Inc. Licensed under MIT License.

import json
import os
import pathlib
import shutil
import zipfile
from io import BytesIO
from typing import Any

import pandas as pd

from .cache import file_cache_get, file_cache_set
from .utils import to_parquet_bytes


def _deep_merge(base: dict, overrides: dict) -> dict:
    result = base.copy()
    for key, value in overrides.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = _deep_merge(result[key], value)
        else:
            result[key] = value
    return result
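

# Illustration of _deep_merge's behavior (example values are hypothetical):
# nested dicts are merged key by key, while non-dict values in `overrides`
# simply replace those in `base`, e.g.
#
#   _deep_merge({"a": {"x": 1}, "b": 2}, {"a": {"y": 3}, "b": 4})
#   # -> {"a": {"x": 1, "y": 3}, "b": 4}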


class DataSource:
    def __init__(
        self,
        identifier: str,
        dataset: pd.DataFrame,
        metadata: dict,
    ):
        self.identifier = identifier
        self.dataset = dataset
        self.metadata = metadata
        self._cache_index: set[str] = set(self._cache_index_load())

    def _cache_index_key(self):
        return [self.identifier, "__index__"]

    def _cache_index_load(self) -> list[str]:
        index = file_cache_get(self._cache_index_key(), scope="DataSource")
        if index is None:
            return []
        return index

    def _cache_index_save(self):
        file_cache_set(
            self._cache_index_key(), sorted(self._cache_index), scope="DataSource"
        )

    def _cache_index_add(self, name: str):
        if name not in self._cache_index:
            self._cache_index.add(name)
            # Re-read from disk and merge to avoid losing entries from other processes
            persisted = set(self._cache_index_load())
            merged = self._cache_index | persisted
            file_cache_set(self._cache_index_key(), sorted(merged), scope="DataSource")

    def cache_set(self, name: str, data):
        file_cache_set([self.identifier, name], data, scope="DataSource")
        self._cache_index_add(name)

    def cache_get(self, name: str):
        return file_cache_get([self.identifier, name], scope="DataSource")

    def cache_items(self) -> dict[str, Any]:
        """Return all cached entries as a dict of {name: value}."""
        result = {}
        for name in self._cache_index:
            value = self.cache_get(name)
            if value is not None:
                result[name] = value
        return result
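
    # Usage sketch (hypothetical names, and assuming file_cache_set /
    # file_cache_get round-trip values): entries set here are tracked in the
    # persisted index so cache_items can enumerate them later, e.g.
    #   ds.cache_set("projection", {"x": [0.1], "y": [0.2]})
    #   ds.cache_get("projection")   # -> {"x": [0.1], "y": [0.2]}
    #   ds.cache_items()             # -> {"projection": {...}, ...}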

    def _build_metadata(self, metadata_overrides: dict | None = None) -> dict:
        metadata = self.metadata | {
            "isStatic": True,
            "database": {"type": "wasm", "load": True},
        }
        if metadata_overrides:
            metadata = _deep_merge(metadata, metadata_overrides)
        return metadata
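
    # Example (hypothetical override): because overrides are deep-merged,
    # passing metadata_overrides={"database": {"load": False}} changes only
    # the "load" flag while keeping {"type": "wasm"} from the defaults above.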

    def make_archive(self, static_path: str, metadata_overrides: dict | None = None):
        io = BytesIO()
        with zipfile.ZipFile(io, "w", zipfile.ZIP_DEFLATED) as zip:
            # Write metadata and parquet data
            zip.writestr(
                "data/metadata.json",
                json.dumps(self._build_metadata(metadata_overrides)),
            )
            zip.writestr("data/dataset.parquet", to_parquet_bytes(self.dataset))
            # Copy static frontend files
            for root, _, files in os.walk(static_path):
                for fn in files:
                    p = os.path.relpath(os.path.join(root, fn), static_path)
                    zip.write(os.path.join(root, fn), p)
            # Write cache files
            for name, value in self.cache_items().items():
                zip.writestr(
                    f"data/cache/{name}",
                    json.dumps(value),
                )
        return io.getvalue()
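
    # Illustrative use (hypothetical paths): the return value is a complete
    # zip archive as bytes, so a caller could persist it directly, e.g.
    #   pathlib.Path("export.zip").write_bytes(ds.make_archive("frontend/dist"))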

    def export_to_folder(
        self,
        static_path: str,
        folder_path: str,
        metadata_overrides: dict | None = None,
    ):
        folder = pathlib.Path(folder_path)
        folder.mkdir(parents=True, exist_ok=True)
        # Write metadata and parquet data
        data_dir = folder / "data"
        data_dir.mkdir(exist_ok=True)
        (data_dir / "metadata.json").write_text(
            json.dumps(self._build_metadata(metadata_overrides))
        )
        (data_dir / "dataset.parquet").write_bytes(to_parquet_bytes(self.dataset))
        # Copy static frontend files
        for root, _, files in os.walk(static_path):
            for fn in files:
                src = os.path.join(root, fn)
                rel = os.path.relpath(src, static_path)
                dst = folder / rel
                dst.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(src, dst)
        # Write cache files
        cache_dir = data_dir / "cache"
        for name, value in self.cache_items().items():
            cache_file = cache_dir / name
            cache_file.parent.mkdir(parents=True, exist_ok=True)
            cache_file.write_text(json.dumps(value))
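

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module. The identifier,
    # metadata contents, and paths below are hypothetical placeholders, and
    # the relative imports above assume this file runs as part of its package.
    df = pd.DataFrame({"id": [0, 1], "text": ["hello", "world"]})
    ds = DataSource(
        identifier="example-dataset",  # hypothetical identifier
        dataset=df,
        metadata={"columns": {"id": "id", "text": "text"}},  # hypothetical schema
    )
    # Bundle the dataset, metadata, cached entries, and static frontend files
    # into a self-contained export folder.
    ds.export_to_folder("frontend/dist", "out/export")  # hypothetical paths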