Skip to content

Commit 61b8232

Browse files
authored
Remove 'datachain query' command (#1483)
1 parent 0ec9ee8 commit 61b8232

File tree

11 files changed: 9 additions and 653 deletions

src/datachain/catalog/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from .catalog import (
22
QUERY_DATASET_PREFIX,
33
QUERY_SCRIPT_CANCELED_EXIT_CODE,
4-
QUERY_SCRIPT_INVALID_LAST_STATEMENT_EXIT_CODE,
54
Catalog,
65
is_namespace_local,
76
)
@@ -10,7 +9,6 @@
109
__all__ = [
1110
"QUERY_DATASET_PREFIX",
1211
"QUERY_SCRIPT_CANCELED_EXIT_CODE",
13-
"QUERY_SCRIPT_INVALID_LAST_STATEMENT_EXIT_CODE",
1412
"Catalog",
1513
"get_catalog",
1614
"is_namespace_local",

src/datachain/catalog/catalog.py

Lines changed: 2 additions & 192 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,19 @@
33
import os
44
import os.path
55
import posixpath
6-
import signal
7-
import subprocess
8-
import sys
96
import time
107
import traceback
11-
from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence
8+
from collections.abc import Callable, Iterable, Iterator, Sequence
129
from copy import copy
1310
from dataclasses import dataclass
1411
from functools import cached_property, reduce
15-
from threading import Thread
16-
from typing import IO, TYPE_CHECKING, Any, NoReturn
12+
from typing import TYPE_CHECKING, Any
1713
from uuid import uuid4
1814

1915
import sqlalchemy as sa
2016
from sqlalchemy import Column
2117
from tqdm.auto import tqdm
2218

23-
from datachain import json
2419
from datachain.cache import Cache
2520
from datachain.client import Client
2621
from datachain.dataset import (
@@ -43,8 +38,6 @@
4338
DatasetVersionNotFoundError,
4439
NamespaceNotFoundError,
4540
ProjectNotFoundError,
46-
QueryScriptCancelError,
47-
QueryScriptRunError,
4841
)
4942
from datachain.lib.listing import get_listing
5043
from datachain.node import DirType, Node, NodeWithPath
@@ -71,8 +64,6 @@
7164

7265
INDEX_INTERNAL_ERROR_MESSAGE = "Internal error on indexing"
7366
DATASET_INTERNAL_ERROR_MESSAGE = "Internal error on creating dataset"
74-
# exit code we use if last statement in query script is not instance of DatasetQuery
75-
QUERY_SCRIPT_INVALID_LAST_STATEMENT_EXIT_CODE = 10
7667
# exit code we use if query script was canceled
7768
QUERY_SCRIPT_CANCELED_EXIT_CODE = 11
7869
QUERY_SCRIPT_SIGTERM_EXIT_CODE = -15 # if query script was terminated by SIGTERM
@@ -84,78 +75,11 @@
8475
PULL_DATASET_CHECK_STATUS_INTERVAL = 20 # interval to check export status in Studio
8576

8677

87-
def noop(_: str):
88-
pass
89-
90-
91-
class TerminationSignal(RuntimeError): # noqa: N818
92-
def __init__(self, signal):
93-
self.signal = signal
94-
super().__init__("Received termination signal", signal)
95-
96-
def __repr__(self):
97-
return f"{self.__class__.__name__}({self.signal})"
98-
99-
100-
if sys.platform == "win32":
101-
SIGINT = signal.CTRL_C_EVENT
102-
else:
103-
SIGINT = signal.SIGINT
104-
105-
10678
def is_namespace_local(namespace_name) -> bool:
10779
"""Checks if namespace is from local environment, i.e. is `local`"""
10880
return namespace_name == "local"
10981

11082

111-
def shutdown_process(
112-
proc: subprocess.Popen,
113-
interrupt_timeout: int | None = None,
114-
terminate_timeout: int | None = None,
115-
) -> int:
116-
"""Shut down the process gracefully with SIGINT -> SIGTERM -> SIGKILL."""
117-
118-
logger.info("sending interrupt signal to the process %s", proc.pid)
119-
proc.send_signal(SIGINT)
120-
121-
logger.info("waiting for the process %s to finish", proc.pid)
122-
try:
123-
return proc.wait(interrupt_timeout)
124-
except subprocess.TimeoutExpired:
125-
logger.info(
126-
"timed out waiting, sending terminate signal to the process %s", proc.pid
127-
)
128-
proc.terminate()
129-
try:
130-
return proc.wait(terminate_timeout)
131-
except subprocess.TimeoutExpired:
132-
logger.info("timed out waiting, killing the process %s", proc.pid)
133-
proc.kill()
134-
return proc.wait()
135-
136-
137-
def process_output(stream: IO[bytes], callback: Callable[[str], None]) -> None:
138-
buffer = b""
139-
140-
try:
141-
while byt := stream.read(1): # Read one byte at a time
142-
buffer += byt
143-
144-
if byt in (b"\n", b"\r"): # Check for newline or carriage return
145-
line = buffer.decode("utf-8", errors="replace")
146-
callback(line)
147-
buffer = b"" # Clear buffer for the next line
148-
149-
if buffer: # Handle any remaining data in the buffer
150-
line = buffer.decode("utf-8", errors="replace")
151-
callback(line)
152-
finally:
153-
try:
154-
stream.close() # Ensure output is closed
155-
except Exception: # noqa: BLE001, S110
156-
pass
157-
158-
15983
class DatasetRowsFetcher(NodesThreadPool):
16084
def __init__(
16185
self,
@@ -1781,120 +1705,6 @@ def clone(
17811705
recursive=recursive,
17821706
)
17831707

1784-
@staticmethod
1785-
def query(
1786-
query_script: str,
1787-
env: Mapping[str, str] | None = None,
1788-
python_executable: str = sys.executable,
1789-
stdout_callback: Callable[[str], None] | None = None,
1790-
stderr_callback: Callable[[str], None] | None = None,
1791-
params: dict[str, str] | None = None,
1792-
job_id: str | None = None,
1793-
reset: bool = False,
1794-
interrupt_timeout: int | None = None,
1795-
terminate_timeout: int | None = None,
1796-
) -> None:
1797-
if not isinstance(reset, bool):
1798-
raise TypeError(f"reset must be a bool, got {type(reset).__name__}")
1799-
1800-
cmd = [python_executable, "-c", query_script]
1801-
env = dict(env or os.environ)
1802-
env.update(
1803-
{
1804-
"DATACHAIN_QUERY_PARAMS": json.dumps(params or {}),
1805-
"DATACHAIN_JOB_ID": job_id or "",
1806-
"DATACHAIN_CHECKPOINTS_RESET": str(reset),
1807-
},
1808-
)
1809-
popen_kwargs: dict[str, Any] = {}
1810-
1811-
if stdout_callback is not None:
1812-
popen_kwargs = {"stdout": subprocess.PIPE}
1813-
if stderr_callback is not None:
1814-
popen_kwargs["stderr"] = subprocess.PIPE
1815-
1816-
def raise_termination_signal(sig: int, _: Any) -> NoReturn:
1817-
raise TerminationSignal(sig)
1818-
1819-
stdout_thread: Thread | None = None
1820-
stderr_thread: Thread | None = None
1821-
1822-
with subprocess.Popen(cmd, env=env, **popen_kwargs) as proc: # noqa: S603
1823-
logger.info("Starting process %s", proc.pid)
1824-
1825-
orig_sigint_handler = signal.getsignal(signal.SIGINT)
1826-
# ignore SIGINT in the main process.
1827-
# In the terminal, SIGINTs are received by all the processes in
1828-
# the foreground process group, so the script will receive the signal too.
1829-
# (If we forward the signal to the child, it will receive it twice.)
1830-
signal.signal(signal.SIGINT, signal.SIG_IGN)
1831-
1832-
orig_sigterm_handler = signal.getsignal(signal.SIGTERM)
1833-
signal.signal(signal.SIGTERM, raise_termination_signal)
1834-
try:
1835-
if stdout_callback is not None:
1836-
stdout_thread = Thread(
1837-
target=process_output,
1838-
args=(proc.stdout, stdout_callback),
1839-
daemon=True,
1840-
)
1841-
stdout_thread.start()
1842-
if stderr_callback is not None:
1843-
stderr_thread = Thread(
1844-
target=process_output,
1845-
args=(proc.stderr, stderr_callback),
1846-
daemon=True,
1847-
)
1848-
stderr_thread.start()
1849-
1850-
proc.wait()
1851-
except TerminationSignal as exc:
1852-
signal.signal(signal.SIGTERM, orig_sigterm_handler)
1853-
signal.signal(signal.SIGINT, orig_sigint_handler)
1854-
logger.info("Shutting down process %s, received %r", proc.pid, exc)
1855-
# Rather than forwarding the signal to the child, we try to shut it down
1856-
# gracefully. This is because we consider the script to be interactive
1857-
# and special, so we give it time to cleanup before exiting.
1858-
shutdown_process(proc, interrupt_timeout, terminate_timeout)
1859-
if proc.returncode:
1860-
raise QueryScriptCancelError(
1861-
"Query script was canceled by user", return_code=proc.returncode
1862-
) from exc
1863-
finally:
1864-
signal.signal(signal.SIGTERM, orig_sigterm_handler)
1865-
signal.signal(signal.SIGINT, orig_sigint_handler)
1866-
# wait for the reader thread
1867-
thread_join_timeout_seconds = 30
1868-
if stdout_thread is not None:
1869-
stdout_thread.join(timeout=thread_join_timeout_seconds)
1870-
if stdout_thread.is_alive():
1871-
logger.warning(
1872-
"stdout thread is still alive after %s seconds",
1873-
thread_join_timeout_seconds,
1874-
)
1875-
if stderr_thread is not None:
1876-
stderr_thread.join(timeout=thread_join_timeout_seconds)
1877-
if stderr_thread.is_alive():
1878-
logger.warning(
1879-
"stderr thread is still alive after %s seconds",
1880-
thread_join_timeout_seconds,
1881-
)
1882-
1883-
logger.info("Process %s exited with return code %s", proc.pid, proc.returncode)
1884-
if proc.returncode in (
1885-
QUERY_SCRIPT_CANCELED_EXIT_CODE,
1886-
QUERY_SCRIPT_SIGTERM_EXIT_CODE,
1887-
):
1888-
raise QueryScriptCancelError(
1889-
"Query script was canceled by user",
1890-
return_code=proc.returncode,
1891-
)
1892-
if proc.returncode:
1893-
raise QueryScriptRunError(
1894-
f"Query script exited with error code {proc.returncode}",
1895-
return_code=proc.returncode,
1896-
)
1897-
18981708
def cp(
18991709
self,
19001710
sources: list[str],

src/datachain/cli/__init__.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
index,
1616
list_datasets,
1717
ls,
18-
query,
1918
rm_dataset,
2019
show,
2120
)
@@ -90,7 +89,6 @@ def handle_command(args, catalog, client_config) -> int:
9089
"find": lambda: handle_find_command(args, catalog),
9190
"index": lambda: handle_index_command(args, catalog),
9291
"completion": lambda: handle_completion_command(args),
93-
"query": lambda: handle_query_command(args, catalog),
9492
"clear-cache": lambda: clear_cache(catalog),
9593
"gc": lambda: garbage_collect(catalog),
9694
"auth": lambda: process_auth_cli_args(args),
@@ -259,15 +257,6 @@ def handle_completion_command(args):
259257
print(completion(args.shell))
260258

261259

262-
def handle_query_command(args, catalog):
263-
query(
264-
catalog,
265-
args.script,
266-
parallel=args.parallel,
267-
params=args.param,
268-
)
269-
270-
271260
def handle_broken_pipe_error(exc):
272261
# Python flushes standard streams on exit; redirect remaining output
273262
# to devnull to avoid another BrokenPipeError at shutdown

src/datachain/cli/commands/__init__.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,8 @@
1-
from .datasets import (
2-
edit_dataset,
3-
list_datasets,
4-
list_datasets_local,
5-
rm_dataset,
6-
)
1+
from .datasets import edit_dataset, list_datasets, list_datasets_local, rm_dataset
72
from .du import du
83
from .index import index
94
from .ls import ls
105
from .misc import clear_cache, completion, garbage_collect
11-
from .query import query
126
from .show import show
137

148
__all__ = [
@@ -21,7 +15,6 @@
2115
"list_datasets",
2216
"list_datasets_local",
2317
"ls",
24-
"query",
2518
"rm_dataset",
2619
"show",
2720
]

src/datachain/cli/commands/query.py

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)