diff --git a/.gitignore b/.gitignore index a490b5de..eaf88e7e 100644 --- a/.gitignore +++ b/.gitignore @@ -114,6 +114,7 @@ venv.bak/ # Milvus DB db/ *.db +*.db.lock # Project files tmp/ diff --git a/README.md b/README.md index 84565f5e..be8cd84f 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,22 @@ uv pip install "mmore[process,cpu]" > :warning: **Check the instructions for contributors directly at [`docs/for_devs.md`](./docs/for_devs.md)** +### Interactive TUI + +Prefer a guided experience over editing YAML by hand? Install the `tui` extra and launch the interactive Terminal UI: + +```bash +uv sync --extra tui +mmore tui +``` + +From the launcher you can: + +- run any stage (process / postprocess / index / rag / chat) interactively, +- chain the full pipeline (process → postprocess → index → chat), +- generate stage YAML configs through a guided wizard, +- pick from existing example configs without leaving the terminal. + ### Minimal Example You can use our predefined CLI commands to execute parts of the pipeline. Note that you might need to prepend `python -m` to the command if the package does not properly create bash aliases. 
diff --git a/docs/source/developer_documentation/for_devs.md b/docs/source/developer_documentation/for_devs.md index ecd179c4..5d9949ff 100644 --- a/docs/source/developer_documentation/for_devs.md +++ b/docs/source/developer_documentation/for_devs.md @@ -31,6 +31,7 @@ This guide will help you set up your development environment and contribute to t - [Writing tests](#writing-tests) - [π Pull Request Process](#-pull-request-process) - [PR checklist](#pr-checklist) + - [🖥️ Interactive TUI](#️-interactive-tui) - [π‘ Development tips](#-development-tips) - [Working with `uv`](#working-with-uv) - [β Questions](#-questions) @@ -256,6 +257,25 @@ def test_something_on_gpu(): - [ ] Examples are provided for new features - [ ] Commit messages are clear and descriptive +## 🖥️ Interactive TUI + +MMORE ships with a Terminal UI that wraps the CLI commands behind guided menus and config wizards. It is useful for trying the pipeline without writing YAML by hand. + +Launch it from a project working directory: + +```bash +mmore tui +``` + +From the main menu you can: + +- **Run a single command** — pick any stage (`process`, `postprocess`, `index`, `retrieve`, `rag`, `ragcli`, `websearch`), then either select an existing YAML, generate one through a guided wizard, or type a path manually. Generated configs are written to `./tui-configs/` and validated against the stage's dataclass before running. +- **Run full pipeline** — chains `process → postprocess → index` using existing configs. +- **Build a full pipeline config (guided wizard)** — walks through the three stages in order, wiring the postprocess output JSONL into the index config automatically. +- **Chat with indexed documents** — shortcut to `ragcli`. + +Stages whose extras are missing are disabled in the menu with an install hint (e.g. `uv sync --extra rag --extra cpu`). Press `Ctrl-C` inside any sub-flow to cancel back to the main menu; press it again at the main menu to quit. 
+ ## π‘ Development tips ### Working with `uv` diff --git a/pyproject.toml b/pyproject.toml index bb638af2..2a22fe4b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,7 +41,7 @@ dependencies = [ "python-dotenv>=1.0", "typing_extensions>=4.15.0,<5.0", "PyYAML>=6.0", - "setuptools<81" + "setuptools<81", ] [project.optional-dependencies] @@ -128,8 +128,15 @@ api = [ # --- Composite + variant extras --- +tui = [ + # Interactive terminal launcher (`mmore tui`) + "questionary>=2.0", + "rich>=13", + "uv", +] + all = [ - "mmore[process,rag,api,websearch]", + "mmore[process,rag,api,websearch,tui]", ] cpu = [ diff --git a/src/mmore/cli.py b/src/mmore/cli.py index ad952f58..1030a465 100644 --- a/src/mmore/cli.py +++ b/src/mmore/cli.py @@ -265,6 +265,19 @@ def ragcli(config_file: str): my_rag_cli.launch_cli() +@main.command() +def tui(): + """Launch the interactive Terminal UI.""" + try: + from .tui import run + except ModuleNotFoundError as e: + if e.name in ("questionary", "rich", "prompt_toolkit"): + click.echo("TUI dependency missing. Install with: uv sync --extra tui") + raise SystemExit(1) + raise + run() + + @main.group() def colpali(): """ColPali pipeline commands for PDF processing, indexing, and retrieval.""" diff --git a/src/mmore/run_process.py b/src/mmore/run_process.py index da53c62a..66484109 100644 --- a/src/mmore/run_process.py +++ b/src/mmore/run_process.py @@ -44,11 +44,19 @@ class ProcessInference: previous_results: Optional[str] = None +def merged_results_path(output_path: str) -> str: + """Path where `process` writes its final merged JSONL. + + Single source of truth for downstream tooling (TUI, scripts) that needs + to locate the JSONL produced by a `process` run from its config. 
+ """ + return os.path.join(output_path, "merged", "merged_results.jsonl") + + def _write_merged_results(output_path, reused_samples, dispatched=True): """Merge per-processor JSONL files and reused samples into a single output.""" - merged_output_path = os.path.join(output_path, "merged") - output_file = os.path.join(merged_output_path, "merged_results.jsonl") - os.makedirs(merged_output_path, exist_ok=True) + output_file = merged_results_path(output_path) + os.makedirs(os.path.dirname(output_file), exist_ok=True) total_results = 0 with open(output_file, "w") as f: diff --git a/src/mmore/tui/__init__.py b/src/mmore/tui/__init__.py new file mode 100644 index 00000000..3004c7fb --- /dev/null +++ b/src/mmore/tui/__init__.py @@ -0,0 +1,3 @@ +from mmore.tui.app import run + +__all__ = ["run"] diff --git a/src/mmore/tui/app.py b/src/mmore/tui/app.py new file mode 100644 index 00000000..37186654 --- /dev/null +++ b/src/mmore/tui/app.py @@ -0,0 +1,304 @@ +"""mmore TUI entry point.""" + +from __future__ import annotations + +import threading + +import questionary +from rich.panel import Panel +from rich.text import Text + +from mmore.tui.commands import REGISTRY, check_stage_available +from mmore.tui.config_builder import ( + build_full_pipeline_wizard, + pick_or_build_config, +) +from mmore.tui.exceptions import UserCancelledError +from mmore.tui.paths import cwd_default +from mmore.tui.pipeline import run_full_pipeline, run_pipeline_with_configs +from mmore.tui.theme import ( + ACCENT, + ACCENT2, + MUTED, + OK, + QMARK, + QSTYLE, + console, + run_step, + section, + show_banner, +) + +_PIPELINE_STAGES = ("process", "postprocess", "index") + + +def _warm_pipeline_dataclasses() -> None: + """Pre-load process/postprocess/index dataclasses in a daemon thread. + + Called when entering the wizard or full-pipeline flows, where several YAML + validations happen back-to-back. The import cost then overlaps with the + wizard's own prompts. Daemon = no impact on exit. 
Stages whose canary + imports are missing are skipped so partial installs don't crash the warm-up. + """ + + def _warm() -> None: + for stage in _PIPELINE_STAGES: + spec = REGISTRY[stage] + if check_stage_available(spec) is not None or spec.config_dataclass is None: + continue + try: + spec.config_dataclass() + except Exception: # noqa: BLE001 + pass + + threading.Thread(target=_warm, daemon=True).start() + + +def _show_missing_extras(spec_name: str, hint: str) -> None: + console.print( + Panel( + Text.assemble( + (f"Stage `{spec_name}` can't run.\n\n", "bold"), + (hint, "yellow"), + ), + title="[bold yellow]missing dependencies[/]", + border_style="yellow", + padding=(1, 2), + ) + ) + + +def _missing_extras_notice() -> Panel | None: + """One-line-per-install-command notice β kept compact so the banner stays visible.""" + install_to_stages: dict[str, list[str]] = {} + for name, spec in REGISTRY.items(): + hint = check_stage_available(spec) + if hint and "Install with: " in hint: + cmd = hint.split("Install with: ", 1)[1].strip() + install_to_stages.setdefault(cmd, []).append(name) + + if not install_to_stages: + return None + + body = Text() + for i, (cmd, stages) in enumerate(install_to_stages.items()): + if i > 0: + body.append("\n") + body.append(", ".join(stages), style="bold white") + body.append(" β ", style="yellow") + body.append(cmd, style="cyan") + + return Panel( + body, + title="[bold yellow]β missing extras[/]", + border_style="yellow", + padding=(0, 1), + ) + + +def _disabled_label(label: str) -> str: + """Prefix a menu label so its disabled state is immediately readable.""" + return f"β {label}" + + +def _run_single_command() -> None: + choices = [] + enabled_count = 0 + for spec in REGISTRY.values(): + hint = check_stage_available(spec) + label = f"{spec.name:<12} β {spec.description}" + if hint: + choices.append( + questionary.Choice( + _disabled_label(label), value=spec.name, disabled="missing extras" + ) + ) + else: + 
choices.append(questionary.Choice(label, value=spec.name)) + enabled_count += 1 + + # questionary crashes ("InquirerControl has no attribute 'pointed_at'") when + # every choice is disabled because it can't pick an initial pointer. Bail + # out with a clear notice instead. + if enabled_count == 0: + notice = _missing_extras_notice() + if notice is not None: + console.print(notice) + return + + name = questionary.select( + "Pick a command", + choices=choices, + style=QSTYLE, + qmark=QMARK, + ).ask() + if name is None: + return + spec = REGISTRY[name] + # Defensive re-check in case the user typed past the disabled state. + hint = check_stage_available(spec) + if hint: + _show_missing_extras(spec.name, hint) + return + config_file = pick_or_build_config(spec) + kwargs = {"config_file": config_file} + if spec.needs_input_data: + input_data = questionary.text( + "Input JSONL path", + default=cwd_default("outputs/process/merged/merged_results.jsonl"), + style=QSTYLE, + qmark=QMARK, + ).ask() + if input_data is None: + return + kwargs["input_data"] = input_data + + console.print() + console.print( + section( + f"Running {name}", + Text(f"config: {config_file}", style=MUTED), + style=ACCENT2, + ) + ) + interactive = name in {"ragcli", "retrieve", "rag"} + if interactive: + spec.run(**kwargs) + else: + run_step(spec.description, spec.run, **kwargs) + console.print(f"[{OK}]β {name} finished[/]") + + +def _chat_only() -> None: + config_file = pick_or_build_config(REGISTRY["ragcli"]) + console.print() + console.print(section("RAG chat", Text(f"config: {config_file}", style=MUTED))) + REGISTRY["ragcli"].run(config_file=config_file) + + +def _run_full_wizard() -> None: + _warm_pipeline_dataclasses() + paths = build_full_pipeline_wizard() + console.print() + console.print( + section( + "Wizard complete", + Text( + "process: " + paths["process"] + "\n" + "postprocess: " + paths["postprocess"] + "\n" + "index: " + paths["index"], + style=MUTED, + ), + style=ACCENT2, + ) + ) + if 
questionary.confirm( + "Run the pipeline now with these configs?", + default=True, + style=QSTYLE, + qmark=QMARK, + ).ask(): + run_pipeline_with_configs( + paths["process"], paths["postprocess"], paths["index"] + ) + + +def _pipeline_hint() -> str | None: + """Return a combined hint if any of process/postprocess/index is missing.""" + hints = [ + check_stage_available(REGISTRY[s]) for s in ("process", "postprocess", "index") + ] + hints = [h for h in hints if h] + return " | ".join(hints) if hints else None + + +def _main_menu() -> str | None: + notice = _missing_extras_notice() + if notice is not None: + console.print(notice) + + pipeline_hint = _pipeline_hint() + chat_hint = check_stage_available(REGISTRY["ragcli"]) + # The wizard validates each generated YAML against the stage's dataclass, + # which transitively imports torch / transformers / etc. β so it needs the + # same extras as the full pipeline. Reuse `_pipeline_hint()` to stay aligned. + wizard_hint = _pipeline_hint() + + pipeline_label = "π Run full pipeline (process β postprocess β index)" + wizard_label = "π§ Build a full pipeline config (guided wizard)" + chat_label = "π¬ Chat with indexed documents" + + pipeline_choice = questionary.Choice( + _disabled_label(pipeline_label) if pipeline_hint else pipeline_label, + value="pipeline", + disabled="missing extras" if pipeline_hint else None, + ) + wizard_choice = questionary.Choice( + _disabled_label(wizard_label) if wizard_hint else wizard_label, + value="wizard", + disabled="missing extras" if wizard_hint else None, + ) + chat_choice = questionary.Choice( + _disabled_label(chat_label) if chat_hint else chat_label, + value="chat", + disabled="missing extras" if chat_hint else None, + ) + + return questionary.select( + "What do you want to do?", + choices=[ + questionary.Choice("β Run a single command", value="single"), + pipeline_choice, + wizard_choice, + chat_choice, + questionary.Separator(), + questionary.Choice("π§ Setup (install dependencies)", 
value="setup"), + questionary.Choice("β Quit", value="quit"), + ], + style=QSTYLE, + qmark=QMARK, + ).ask() + + +def run() -> None: + console.clear() + show_banner("interactive launcher") + while True: + # Ctrl-C at the main menu itself quits; inside any sub-flow it + # cancels and returns here. + try: + mode = _main_menu() + except KeyboardInterrupt: + console.print(f"\n[{ACCENT}]bye![/]") + return + if mode in (None, "quit"): + console.print(f"[{ACCENT}]bye![/]") + return + + try: + if mode == "single": + _run_single_command() + elif mode == "pipeline": + _warm_pipeline_dataclasses() + run_full_pipeline() + elif mode == "wizard": + _run_full_wizard() + elif mode == "chat": + _chat_only() + elif mode == "setup": + from mmore.tui.setup import run_setup_wizard + + run_setup_wizard() + except (UserCancelledError, KeyboardInterrupt): + console.print(f"[{ACCENT2}]cancelled β back to menu.[/]") + continue + except Exception as e: # noqa: BLE001 + console.print(f"[bold red]error:[/] {e}") + try: + cont = questionary.confirm( + "Continue?", default=True, style=QSTYLE + ).ask() + except KeyboardInterrupt: + return + if not cont: + return diff --git a/src/mmore/tui/commands.py b/src/mmore/tui/commands.py new file mode 100644 index 00000000..650cbf6d --- /dev/null +++ b/src/mmore/tui/commands.py @@ -0,0 +1,211 @@ +"""Registry of mmore commands callable from the TUI. + +Each entry mirrors a Click command in `mmore.cli` so the TUI is a thin wrapper: +the `run` callable is the same `run_*` function the CLI uses. +""" + +import importlib.util +from dataclasses import dataclass, field +from typing import Any, Callable, Optional + + +@dataclass +class CommandSpec: + name: str + description: str + example_config: Optional[str] + run: Callable[..., None] + needs_input_data: bool = False + config_globs: list[str] = field(default_factory=list) + # Lazy importer returning the dataclass to validate YAML against. + # Returns None if no validation is wired up for this stage. 
+ config_dataclass: Optional[Callable[[], Any]] = None + # Extras the user has to `uv sync --extra ...` for this stage to import. + # Used only to build a friendly install hint. + required_extras: list[str] = field(default_factory=list) + # Module names probed via `importlib.util.find_spec` to verify the extras + # are actually installed. If any is missing, the stage is disabled in the + # menu with an install hint. + canary_imports: list[str] = field(default_factory=list) + + +def check_stage_available(spec: "CommandSpec") -> Optional[str]: + """Return None if all canary imports resolve, else an install-hint string.""" + missing: list[str] = [] + for mod in spec.canary_imports: + try: + if importlib.util.find_spec(mod) is None: + missing.append(mod) + except (ImportError, ValueError): + missing.append(mod) + if not missing: + return None + extras = " ".join(f"--extra {e}" for e in spec.required_extras) + return f"Missing: {', '.join(missing)}. Install with: uv sync {extras}".strip() + + +def _process(config_file: str, **_): + from mmore.run_process import process + + process(config_file) + + +def _postprocess(config_file: str, input_data: str, **_): + from mmore.run_postprocess import postprocess + + postprocess(config_file, input_data) + + +def _index( + config_file: str, + documents_path: Optional[str] = None, + collection_name: Optional[str] = None, + **_, +): + from mmore.run_index import index + + index(config_file, documents_path, collection_name) + + +def _retrieve(config_file: str, **_): + from mmore.run_retriever import run_api + + run_api(config_file, "0.0.0.0", 8001) + + +def _rag(config_file: str, **_): + from mmore.run_rag import rag + + rag(config_file) + + +def _ragcli(config_file: str, **_): + from mmore.run_ragcli import RagCLI + + RagCLI(config_file).launch_cli() + + +def _websearch(config_file: str, **_): + from mmore.run_websearch import run_websearch + + run_websearch(config_file) + + +# Lazy dataclass importers β keeps heavy deps out of TUI 
startup. +def _dc_process(): + from mmore.run_process import ProcessInference + + return ProcessInference + + +def _dc_postprocess(): + from mmore.process.post_processor.pipeline import PPPipelineConfig + + return PPPipelineConfig + + +def _dc_index(): + from mmore.run_index import IndexConfig + + return IndexConfig + + +def _dc_rag(): + from mmore.run_rag import RAGInferenceConfig + + return RAGInferenceConfig + + +REGISTRY: dict[str, CommandSpec] = { + "process": CommandSpec( + name="process", + description="Crawl + extract documents into a JSONL", + example_config="examples/process/config.yaml", + run=_process, + config_globs=[ + "examples/process/**/*.yaml", + "examples/process/**/*.yml", + ], + config_dataclass=_dc_process, + required_extras=["process", "cpu"], + canary_imports=["torch", "marker", "transformers"], + ), + "postprocess": CommandSpec( + name="postprocess", + description="Chunk / clean processed documents", + example_config="examples/postprocessor/config.yaml", + run=_postprocess, + needs_input_data=True, + config_globs=[ + "examples/postprocessor/**/*.yaml", + "examples/postprocessor/**/*.yml", + ], + config_dataclass=_dc_postprocess, + required_extras=["process", "cpu"], + canary_imports=["torch", "transformers"], + ), + "index": CommandSpec( + name="index", + description="Embed + store documents in Milvus", + example_config="examples/index/config.yaml", + run=_index, + config_globs=[ + "examples/index/**/*.yaml", + "examples/index/**/*.yml", + ], + config_dataclass=_dc_index, + required_extras=["index", "cpu"], + canary_imports=["pymilvus", "sentence_transformers", "torch"], + ), + "retrieve": CommandSpec( + name="retrieve", + description="Run retriever API server", + example_config="examples/rag/config.yaml", + run=_retrieve, + config_globs=[ + "examples/rag/**/*.yaml", + "examples/rag/**/*.yml", + ], + config_dataclass=_dc_rag, + required_extras=["rag", "api", "cpu"], + canary_imports=["fastapi", "pymilvus", "torch"], + ), + "rag": 
CommandSpec( + name="rag", + description="Run a one-shot RAG pipeline", + example_config="examples/rag/config.yaml", + run=_rag, + config_globs=[ + "examples/rag/**/*.yaml", + "examples/rag/**/*.yml", + ], + config_dataclass=_dc_rag, + required_extras=["rag", "cpu"], + canary_imports=["langchain", "pymilvus", "torch"], + ), + "ragcli": CommandSpec( + name="ragcli", + description="Interactive RAG chat", + example_config="examples/rag/config.yaml", + run=_ragcli, + config_globs=[ + "examples/rag/**/*.yaml", + "examples/rag/**/*.yml", + ], + config_dataclass=_dc_rag, + required_extras=["rag", "cpu"], + canary_imports=["langchain", "pymilvus", "torch"], + ), + "websearch": CommandSpec( + name="websearch", + description="Web search (+ optional RAG)", + example_config="examples/websearchRAG/config.yaml", + run=_websearch, + config_globs=[ + "examples/websearchRAG/**/*.yaml", + "examples/websearchRAG/**/*.yml", + ], + required_extras=["websearch"], + canary_imports=["ddgs"], + ), +} diff --git a/src/mmore/tui/config_builder.py b/src/mmore/tui/config_builder.py new file mode 100644 index 00000000..6c54c6a0 --- /dev/null +++ b/src/mmore/tui/config_builder.py @@ -0,0 +1,836 @@ +"""Generate YAML config files via guided prompts. + +Templates here mirror the example configs under `examples/`. The user is +asked only for the fields most likely to change between runs; everything else +falls back to the example defaults. The resulting dict is dumped to a YAML +file under `./tui-configs/`. 
+""" + +from __future__ import annotations + +import os +import shlex +import subprocess +import time +from pathlib import Path +from typing import Any, Optional + +import questionary +import yaml +from rich.live import Live +from rich.panel import Panel +from rich.spinner import Spinner +from rich.syntax import Syntax +from rich.text import Text + +from mmore.tui.commands import CommandSpec +from mmore.tui.exceptions import UserCancelledError +from mmore.tui.paths import cwd_default, repo_root, resolve_example +from mmore.tui.theme import ACCENT, ACCENT2, QMARK, QSTYLE, console, section + + +def _ask(prompt_obj: Any) -> Any: + """Call .ask() and translate Ctrl-C / Esc into UserCancelledError. + + questionary raises KeyboardInterrupt on Ctrl-C and returns None on Esc. + Both should land us back at the main menu, not exit the TUI. + """ + try: + answer = prompt_obj.ask() + except KeyboardInterrupt as e: + raise UserCancelledError("cancelled") from e + if answer is None: + raise UserCancelledError("cancelled") + return answer + + +CONFIG_DIR = Path("./tui-configs") + + +def _prompt(question: str, default: str = "") -> str: + return _ask(questionary.text(question, default=default, style=QSTYLE, qmark=QMARK)) + + +def _confirm(question: str, default: bool = False) -> bool: + return _ask( + questionary.confirm(question, default=default, style=QSTYLE, qmark=QMARK) + ) + + +def _prompt_int(question: str, default: int) -> int: + try: + return int(_prompt(question, str(default))) + except ValueError: + return default + + +def _prompt_float(question: str, default: float) -> float: + try: + return float(_prompt(question, str(default))) + except ValueError: + return default + + +def _save(name: str, data: dict[str, Any]) -> str: + CONFIG_DIR.mkdir(parents=True, exist_ok=True) + path = CONFIG_DIR / f"{name}-{time.time_ns()}.yaml" + with open(path, "w") as f: + yaml.safe_dump(data, f, sort_keys=False) + return str(path) + + +def _preview_config(path: str) -> None: + """Display a 
YAML file with syntax highlighting.""" + content = Path(path).read_text() + console.print( + Panel( + Syntax(content, "yaml", theme="monokai", line_numbers=True), + title=f"[bold]{path}[/bold]", + border_style=ACCENT, + padding=(1, 2), + ) + ) + + +def _edit_config(path: str) -> None: + """Open a config file in $EDITOR (falls back to vi). + + Supports editors with flags like ``EDITOR="code -w"`` via shlex.split. + """ + editor = os.environ.get("EDITOR", "vi") + subprocess.call([*shlex.split(editor), path]) + + +def _post_validation_menu(path: str, spec: CommandSpec) -> str: + """After validation, let the user preview, edit, or run the config. + + Returns the (potentially re-validated) path. + """ + while True: + action = _ask( + questionary.select( + "What next?", + choices=[ + questionary.Choice("βΆ Run with this config", value="run"), + questionary.Choice("π Preview config", value="preview"), + questionary.Choice("β Edit in $EDITOR", value="edit"), + ], + default="run", + style=QSTYLE, + qmark=QMARK, + ) + ) + if action == "run": + return path + if action == "preview": + _preview_config(path) + continue + if action == "edit": + _edit_config(path) + err = _validate_with_spinner(path, spec) + if err: + _show_error_panel(path, err) + continue + return path # unreachable but keeps mypy happy + + +def build_process_config() -> str: + data_path = _prompt( + "Data path (folder with documents to process)", + cwd_default("data"), + ) + output_path = _prompt( + "Output path (where merged_results.jsonl will be written)", + cwd_default("outputs/process"), + ) + use_fast = _confirm("Use fast (lower-quality) processors?", default=False) + distributed = _confirm("Use distributed processing (Dask)?", default=False) + extract_images = _confirm("Extract images from documents?", default=True) + + cfg = { + "data_path": data_path, + "google_drive_ids": [], + "previous_results": None, + "dispatcher_config": { + "output_path": output_path, + "use_fast_processors": use_fast, + 
"distributed": distributed, + "extract_images": extract_images, + "scheduler_file": None, + "process_batch_sizes": [ + {"URLProcessor": 40}, + {"DOCXProcessor": 100}, + {"PDFProcessor": 4000}, + {"MediaProcessor": 40}, + {"SpreadsheetProcessor": 100}, + {"TXTProcessor": 100}, + {"PPTXProcessor": 100}, + {"MarkdownProcessor": 100}, + {"EMLProcessor": 100}, + {"HTMLProcessor": 100}, + ], + "processor_config": { + "MediaProcessor": [ + {"normal_model": "openai/whisper-large-v3-turbo"}, + {"fast_model": "openai/whisper-tiny"}, + {"type": "automatic-speech-recognition"}, + {"sample_rate": 10}, + {"batch_size": 4}, + ], + "PDFProcessor": [ + {"PDFTEXT_CPU_WORKERS": 0}, + {"DETECTOR_BATCH_SIZE": 1}, + {"DETECTOR_POSTPROCESSING_CPU_WORKERS": 0}, + {"RECOGNITION_BATCH_SIZE": 1}, + {"OCR_PARALLEL_WORKERS": 0}, + {"TEXIFY_BATCH_SIZE": 1}, + {"LAYOUT_BATCH_SIZE": 1}, + {"ORDER_BATCH_SIZE": 1}, + {"TABLE_REC_BATCH_SIZE": 1}, + ], + }, + }, + } + return _save("process", cfg) + + +def build_postprocess_config() -> str: + strategy = _ask( + questionary.select( + "Chunking strategy", + choices=["sentence", "token", "word", "semantic"], + default="sentence", + style=QSTYLE, + qmark=QMARK, + ) + ) + table_handling = _ask( + questionary.select( + "Table handling", + choices=["single_row", "multi_rows", "keep_whole", "none"], + default="single_row", + style=QSTYLE, + qmark=QMARK, + ) + ) + output_path = _prompt( + "Output JSONL path", + cwd_default("outputs/postprocess/results.jsonl"), + ) + + cfg = { + "previous_results": None, + "pp_modules": [ + { + "type": "chunker", + "args": { + "chunking_strategy": strategy, + "table_handling": table_handling, + }, + }, + ], + "output": {"output_path": output_path, "save_each_step": True}, + } + return _save("postprocess", cfg) + + +def build_index_config(documents_path: Optional[str] = None) -> str: + dense = _prompt("Dense embedding model", "sentence-transformers/all-MiniLM-L6-v2") + sparse = _prompt("Sparse embedding model", "splade") + 
db_uri = _prompt( + "DB URI (Milvus Lite file or server URL)", cwd_default("proc_demo.db") + ) + db_name = _prompt("DB name", "my_db") + collection = _prompt("Collection name", "my_docs") + docs = documents_path or _prompt( + "Documents JSONL path", + cwd_default("outputs/postprocess/results.jsonl"), + ) + cfg = { + "indexer": { + "dense_model": {"model_name": dense, "is_multimodal": False}, + "sparse_model": {"model_name": sparse, "is_multimodal": False}, + "db": {"uri": db_uri, "name": db_name}, + }, + "collection_name": collection, + "documents_path": docs, + } + return _save("index", cfg) + + +def build_rag_config() -> str: + """Wizard for `rag` / `retrieve` / `ragcli` configs.""" + llm_name = _prompt("LLM name", "OpenMeditron/meditron3-8b") + max_new_tokens = _prompt_int("Max new tokens", 1200) + + db_uri = _prompt( + "DB URI (Milvus Lite file or server URL)", cwd_default("proc_demo.db") + ) + db_name = _prompt("DB name", "my_db") + collection = _prompt("Collection name", "my_docs") + k = _prompt_int("Number of docs to retrieve (k)", 5) + hybrid = _prompt_float("Hybrid search weight (0.0 dense β 1.0 sparse)", 0.5) + use_web = _confirm("Augment retrieval with web search?", default=False) + reranker = _prompt("Reranker model (blank to skip)", "BAAI/bge-reranker-base") + + mode = _ask( + questionary.select( + "Run mode", + choices=["local", "api"], + default="local", + style=QSTYLE, + qmark=QMARK, + ) + ) + + cfg: dict[str, Any] = { + "rag": { + "llm": {"llm_name": llm_name, "max_new_tokens": max_new_tokens}, + "retriever": { + "db": {"uri": db_uri, "name": db_name}, + "hybrid_search_weight": hybrid, + "k": k, + "collection_name": collection, + "use_web": use_web, + "reranker_model_name": reranker or None, + }, + "system_prompt": ( + "Use the following context to answer the questions.\n\n" + "Context:\n{context}" + ), + }, + "mode": mode, + } + if mode == "local": + input_file = _prompt( + "Queries JSONL path", resolve_example("examples/rag/queries.jsonl") + ) + 
output_file = _prompt( + "Output JSON path", cwd_default("outputs/rag/output.json") + ) + cfg["mode_args"] = {"input_file": input_file, "output_file": output_file} + else: + port = _prompt_int("API port", 8000) + cfg["mode_args"] = { + "endpoint": "/rag", + "host": "0.0.0.0", + "port": port, + } + return _save("rag", cfg) + + +def build_websearch_config() -> str: + """Wizard for `websearch` configs.""" + use_rag = _confirm("Combine web search with RAG?", default=True) + rag_path = "" + if use_rag: + rag_path = _prompt( + "Path to a RAG config YAML", + resolve_example("examples/rag/config.yaml"), + ) + llm_name = _prompt("LLM name", "OpenMeditron/meditron3-8b") + max_new_tokens = _prompt_int("Max new tokens", 1200) + input_queries = _prompt( + "Input queries JSONL", resolve_example("examples/rag/queries.jsonl") + ) + output_file = _prompt( + "Output JSON path", + cwd_default("outputs/websearch/enhanced_results.json"), + ) + n_subqueries = _prompt_int("Number of sub-queries per question", 2) + max_searches = _prompt_int("Max searches per query", 5) + provider = _ask( + questionary.select( + "Search provider", + choices=["duckduckgo"], + default="duckduckgo", + style=QSTYLE, + qmark=QMARK, + ) + ) + + cfg: dict[str, Any] = { + "websearch": { + "use_rag": use_rag, + "rag_config_path": rag_path, + "use_summary": True, + "n_subqueries": n_subqueries, + "input_queries": input_queries, + "output_file": output_file, + "n_loops": 2, + "max_searches": max_searches, + "search_provider": provider, + "max_retries": 3, + "max_context_tokens": 2048, + "fast_tokenizer": False, + "mode": "local", + "llm_config": { + "llm_name": llm_name, + "max_new_tokens": max_new_tokens, + }, + } + } + return _save("websearch", cfg) + + +BUILDERS = { + "process": build_process_config, + "postprocess": build_postprocess_config, + "index": build_index_config, + "rag": build_rag_config, + "retrieve": build_rag_config, + "ragcli": build_rag_config, + "websearch": build_websearch_config, +} + + +# 
def build_process_config_wizard() -> str:
    """Interactively build a `process` stage config and save it.

    Prompts, in order, for the data and output paths, the dispatcher flags,
    the processors to enable and (optionally) one batch size per enabled
    processor. When a merged results file from an earlier run exists under
    the chosen output path, the user is offered to resume from it.

    Returns:
        The value returned by `_save("process", cfg)` (annotated ``str``;
        presumably the path of the written YAML -- confirm against `_save`).
    """
    data_path = _prompt(
        "Data path (folder with documents to process)", cwd_default("data")
    )
    output_path = _prompt(
        "Output path (where merged_results.jsonl will be written)",
        cwd_default("outputs/process"),
    )
    use_fast = _confirm("Use fast (lower-quality) processors?", default=False)
    distributed = _confirm("Use distributed processing (Dask)?", default=False)
    extract_images = _confirm("Extract images from documents?", default=True)

    names = [n for n, _ in _ALL_PROCESSORS]
    selected = _ask(
        questionary.checkbox(
            "Select processors to enable",
            choices=[questionary.Choice(n, value=n, checked=True) for n in names],
            style=QSTYLE,
            qmark=QMARK,
        )
    )
    # An empty selection would mean a no-op pipeline; fall back to all.
    # A set keeps the membership tests below O(1); output order still follows
    # `_ALL_PROCESSORS` / `_PROCESSOR_DEFAULT_CONFIG`, not the selection.
    enabled = set(selected or names)

    customize = _confirm("Customize batch sizes?", default=False)
    sizes: list[dict[str, int]] = []
    for name, default in _ALL_PROCESSORS:
        if name not in enabled:
            continue
        value = _prompt_int(f"Batch size for {name}", default) if customize else default
        sizes.append({name: value})

    processor_config = {
        name: cfg for name, cfg in _PROCESSOR_DEFAULT_CONFIG.items() if name in enabled
    }

    # Incremental resume: detect previous results. The import is guarded so a
    # missing optional dependency cannot abort the wizard after the user has
    # already answered every prompt; resume detection is simply skipped.
    previous_results = None
    try:
        from mmore.run_process import merged_results_path
    except ImportError:
        console.print(
            "  [dim]resume detection skipped: mmore.run_process is not importable[/dim]"
        )
    else:
        prev_path = merged_results_path(output_path)
        if os.path.exists(prev_path) and _confirm(
            f"Previous results found at {prev_path}. Resume (skip unchanged files)?",
            default=True,
        ):
            previous_results = prev_path

    cfg = {
        "data_path": data_path,
        "google_drive_ids": [],
        "previous_results": previous_results,
        "dispatcher_config": {
            "output_path": output_path,
            "use_fast_processors": use_fast,
            "distributed": distributed,
            "extract_images": extract_images,
            "scheduler_file": None,
            "process_batch_sizes": sizes,
            "processor_config": processor_config,
        },
    }
    return _save("process", cfg)
def _ask_module_args(pp_type: str) -> dict[str, Any]:
    """Prompt for the `args` mapping of a single post-processor module.

    Args:
        pp_type: The post-processor `type` string chosen by the user.

    Returns:
        For ``"chunker"``: the selected chunking strategy and table handling.
        For ``"ner"``, ``"translator"`` and ``"metafuse"``: the mapping parsed
        from an optional one-line YAML snippet. An empty dict in every other
        case, including when the snippet is invalid or is not a mapping (a
        warning is printed so the input is not dropped silently).
    """
    if pp_type == "chunker":
        strategy = _ask(
            questionary.select(
                "Chunking strategy",
                choices=["sentence", "token", "word", "semantic"],
                default="sentence",
                style=QSTYLE,
                qmark=QMARK,
            )
        )
        table_handling = _ask(
            questionary.select(
                "Table handling",
                choices=["single_row", "multi_rows", "keep_whole", "none"],
                default="single_row",
                style=QSTYLE,
                qmark=QMARK,
            )
        )
        return {
            "chunking_strategy": strategy,
            "table_handling": table_handling,
        }

    if pp_type in {"ner", "translator", "metafuse"} and _confirm(
        f"Provide extra args for `{pp_type}` as YAML?", default=False
    ):
        raw = _prompt("YAML args (single line, e.g. {key: value})", "{}")
        try:
            # Only the parse can raise; an empty document loads as None.
            parsed = yaml.safe_load(raw) or {}
        except yaml.YAMLError as exc:
            questionary.print(
                f"Could not parse YAML args ({exc}); using no extra args.",
                style="fg:yellow",
            )
            return {}
        if isinstance(parsed, dict):
            return parsed
        questionary.print(
            f"YAML args must be a mapping, got {type(parsed).__name__}; "
            "using no extra args.",
            style="fg:yellow",
        )
    return {}
def build_index_config_wizard(documents_path: Optional[str] = None) -> str:
    """Interactively build an `index` stage config and save it.

    Args:
        documents_path: JSONL of documents to index. When falsy, the user is
            prompted for it after all other questions.

    Returns:
        The value returned by `_save("index", cfg)`.
    """
    dense_model = _prompt(
        "Dense embedding model", "sentence-transformers/all-MiniLM-L6-v2"
    )
    sparse_model = _prompt("Sparse embedding model", "splade")
    is_multimodal = _confirm("Multimodal embeddings?", default=False)
    uri = _prompt(
        "DB URI (Milvus Lite file or server URL)", cwd_default("proc_demo.db")
    )
    database = _prompt("DB name", "my_db")
    collection_name = _prompt("Collection name", "my_docs")

    # Only ask for the documents path when the caller did not wire one in.
    if documents_path:
        documents = documents_path
    else:
        documents = _prompt(
            "Documents JSONL path",
            cwd_default("outputs/postprocess/results.jsonl"),
        )

    indexer = {
        "dense_model": {"model_name": dense_model, "is_multimodal": is_multimodal},
        "sparse_model": {"model_name": sparse_model, "is_multimodal": is_multimodal},
        "db": {"uri": uri, "name": database},
    }
    return _save(
        "index",
        {
            "indexer": indexer,
            "collection_name": collection_name,
            "documents_path": documents,
        },
    )
def find_yaml_configs(spec: CommandSpec) -> list[str]:
    """Find candidate YAML configs scoped to this stage.

    Each pattern in `spec.config_globs` is evaluated against `repo_root()`,
    falling back to the current working directory when no root is found.
    Configs named `<spec.name>-*.yaml` in `./tui-configs/` (CWD-relative) are
    appended afterwards so users keep access to configs they just built.

    Returns:
        De-duplicated paths as strings. Pattern order is preserved and the
        matches of each pattern are sorted, so the list is the same on every
        call regardless of filesystem enumeration order.
    """
    root = repo_root() or Path.cwd()
    matches: list[str] = []
    for pattern in spec.config_globs:
        # Path.glob makes no ordering promise; sort for a stable menu.
        matches.extend(str(p) for p in sorted(root.glob(pattern)))

    generated = Path.cwd() / "tui-configs"
    if generated.exists():
        matches.extend(
            str(p) for p in sorted(generated.glob(f"{spec.name}-*.yaml"))
        )

    # dict preserves insertion order, so this drops repeats but keeps the
    # first occurrence of every path in place.
    return list(dict.fromkeys(matches))
choices.append( + questionary.Choice(f"β {rec_resolved} (recommended)", value=rec_resolved) + ) + rest.remove(rec_resolved) + elif rec_resolved and Path(rec_resolved).exists(): + choices.append( + questionary.Choice(f"β {rec_resolved} (recommended)", value=rec_resolved) + ) + if rest: + if choices: + choices.append(questionary.Separator("ββ other configs ββ")) + for c in rest: + choices.append(questionary.Choice(c, value=c)) + return choices + + +def pick_or_build_config( + spec: CommandSpec, documents_path: Optional[str] = None +) -> str: + """Ask the user to either pick an existing YAML or generate one. + + Validates the chosen YAML against the stage's dataclass and re-prompts + on failure rather than letting the run blow up later. + """ + while True: + choice = _ask( + questionary.select( + f"Config for `{spec.name}`?", + choices=[ + questionary.Choice("π Pick existing YAML", value="pick"), + questionary.Choice("β¨ Generate new YAML (guided)", value="build"), + questionary.Choice( + "β Edit an existing YAML in $EDITOR", value="edit" + ), + questionary.Choice("β¨ Type a path manually", value="manual"), + ], + style=QSTYLE, + qmark=QMARK, + ) + ) + + path: Optional[str] = None + + if choice in ("pick", "edit"): + candidates = find_yaml_configs(spec) + ranked = _ranked_choices(spec, candidates) + if not ranked: + questionary.print( + f"No YAML configs found for `{spec.name}`, " + "falling back to manual entry.", + style="fg:yellow", + ) + choice = "manual" + else: + picked = _ask( + questionary.select( + f"Select a config for `{spec.name}`", + choices=ranked, + style=QSTYLE, + qmark=QMARK, + ) + ) + path = picked + if choice == "edit": + _edit_config(path) + + if choice == "manual": + manual = _prompt("Path to YAML config") + manual = os.path.expandvars(os.path.expanduser(manual)) + if not os.path.exists(manual): + _show_error_panel(manual, "file not found") + continue + path = manual + + if choice == "build": + builder = BUILDERS.get(spec.name) + if builder is 
None: + questionary.print( + f"No guided builder for `{spec.name}` β pick an existing YAML.", + style="fg:yellow", + ) + continue + if spec.name == "index": + path = builder(documents_path=documents_path) # type: ignore[call-arg] + else: + path = builder() + + if path is None: + raise UserCancelledError("no config selected") + err = _validate_with_spinner(path, spec) + if err is None: + return _post_validation_menu(path, spec) + _show_error_panel(path, err) + if not _confirm("Try a different config?", default=True): + raise UserCancelledError("cancelled") diff --git a/src/mmore/tui/exceptions.py b/src/mmore/tui/exceptions.py new file mode 100644 index 00000000..eb310dae --- /dev/null +++ b/src/mmore/tui/exceptions.py @@ -0,0 +1,11 @@ +"""TUI-only exceptions.""" + +from __future__ import annotations + + +class UserCancelledError(Exception): + """Raised when the user cancels a sub-flow (Ctrl-C or Esc inside a prompt). + + Caught by the top-level menu loop so cancellation returns to the main menu + instead of exiting the whole TUI. + """ diff --git a/src/mmore/tui/inspector.py b/src/mmore/tui/inspector.py new file mode 100644 index 00000000..2d0dd033 --- /dev/null +++ b/src/mmore/tui/inspector.py @@ -0,0 +1,126 @@ +"""Lightweight JSONL inspector for TUI result previews. + +Streams the file line-by-line (no heavy imports like torch/transformers) +and prints a rich summary table + sample documents. 
def _iter_dicts(path: str):
    """Yield the JSON objects of a JSONL file without importing MultimodalSample.

    Blank lines are skipped. Lines that decode to something other than a JSON
    object (a list, a number, ...) are skipped too, because every consumer in
    this module treats a record as a mapping.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # Be explicit about UTF-8 instead of inheriting the locale's default.
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            record = json.loads(line)
            if isinstance(record, dict):
                yield record


def _metadata_of(doc: dict[str, Any]) -> dict[str, Any]:
    """Return `doc["metadata"]` when it is a mapping, otherwise an empty dict."""
    meta = doc.get("metadata")
    return meta if isinstance(meta, dict) else {}


def _collect_jsonl_stats(path: str, max_samples: int) -> dict[str, Any]:
    """Stream `path` once and gather the figures `inspect_jsonl` displays.

    Returns:
        A dict with keys ``total``, ``total_text_len``, ``processor_types``,
        ``file_extensions``, ``modality_types`` (Counters) and ``samples``
        (the first `max_samples` records).
    """
    total = 0
    total_text_len = 0
    processor_types: Counter = Counter()
    file_extensions: Counter = Counter()
    modality_types: Counter = Counter()
    samples: list[dict[str, Any]] = []

    for doc in _iter_dicts(path):
        total += 1

        meta = _metadata_of(doc)
        processor_types[meta.get("processor_type", "unknown")] += 1

        fp = meta.get("file_path", "")
        if isinstance(fp, str) and fp:
            # Extension-less files share the "(none)" bucket rather than
            # showing up under an empty label.
            ext = Path(fp).suffix.lower() or "(none)"
        else:
            ext = "(none)"
        file_extensions[ext] += 1

        text = doc.get("text", "")
        if isinstance(text, str):
            total_text_len += len(text)

        # `"modalities": null` or a malformed entry must not abort a preview.
        modalities = doc.get("modalities")
        if isinstance(modalities, list):
            for mod in modalities:
                kind = mod.get("type", "unknown") if isinstance(mod, dict) else "unknown"
                modality_types[kind] += 1

        if len(samples) < max_samples:
            samples.append(doc)

    return {
        "total": total,
        "total_text_len": total_text_len,
        "processor_types": processor_types,
        "file_extensions": file_extensions,
        "modality_types": modality_types,
        "samples": samples,
    }


def inspect_jsonl(path: str, max_samples: int = 3) -> None:
    """Print a summary of a JSONL file: counts, breakdowns, sample docs.

    Args:
        path: JSONL file to summarise. A missing file or a file without any
            JSON object prints a one-line notice instead of a table.
        max_samples: Maximum number of leading records shown as samples.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    if not os.path.exists(path):
        console.print(f" [dim]no output file at {path}[/dim]")
        return

    stats = _collect_jsonl_stats(path, max_samples)
    total = stats["total"]
    if total == 0:
        console.print(" [dim]empty JSONL (0 documents)[/dim]")
        return

    # --- Stats table ---
    table = Table(
        title="[bold]Results summary[/bold]",
        title_style=ACCENT2,
        border_style=ACCENT,
        header_style=f"bold {ACCENT}",
        show_lines=False,
        padding=(0, 2),
    )
    table.add_column("Metric", style="bold")
    table.add_column("Value")

    table.add_row("Total documents", str(total))
    table.add_row("Avg text length", f"{stats['total_text_len'] // total:,} chars")

    breakdowns = (
        ("Processor types", stats["processor_types"]),
        ("File types", stats["file_extensions"]),
        ("Modalities", stats["modality_types"]),
    )
    for label, counter in breakdowns:
        if counter:
            table.add_row(
                label, ", ".join(f"{k}: {v}" for k, v in counter.most_common())
            )

    console.print()
    console.print(table)

    # --- Sample documents ---
    samples = stats["samples"]
    if samples:
        sample_text = Text()
        for i, doc in enumerate(samples, 1):
            meta = _metadata_of(doc)
            fp = meta.get("file_path", "?")
            pt = meta.get("processor_type", "?")
            text = doc.get("text", "")
            if isinstance(text, str):
                preview = text[:200].replace("\n", " ")
                if len(text) > 200:
                    preview += "\u2026"  # horizontal ellipsis marks truncation
            else:
                preview = "(structured content)"
            sample_text.append(f"#{i} ", style="bold")
            sample_text.append(f"{fp} ")
            sample_text.append(f"({pt})", style="dim")
            sample_text.append("\n")
            sample_text.append(preview + "\n\n", style=MUTED)

        console.print(
            Panel(
                sample_text,
                title=f"[bold]Sample documents (first {len(samples)})[/bold]",
                border_style=ACCENT,
                padding=(1, 2),
            )
        )
def repo_root() -> Optional[Path]:
    """Return the nearest of CWD and its ancestors that holds `examples/`.

    Returns None when neither the working directory nor any parent contains
    an `examples` directory.
    """
    here = Path.cwd()
    return next(
        (folder for folder in (here, *here.parents) if (folder / "examples").is_dir()),
        None,
    )


def resolve_example(rel: str) -> str:
    """Turn a root-relative path into an absolute one when it exists.

    `rel` is returned unchanged when no root is found or when the joined
    path does not exist, so callers can still display it.
    """
    base = repo_root()
    if base is None:
        return rel
    target = base / rel
    return str(target) if target.exists() else rel


def resolve_glob(pattern: str) -> tuple[Path, str]:
    """Pair `pattern` (returned as-is) with the directory to glob it from.

    The directory is `repo_root()`, or the current working directory when no
    root is found.
    """
    base = repo_root()
    if base is None:
        base = Path.cwd()
    return base, pattern


def cwd_default(rel: str) -> str:
    """Prefix `rel` with `.` so the default is relative to the CWD."""
    return os.path.join(".", rel)
+ """ + from mmore.run_process import ProcessInference, merged_results_path + from mmore.utils import load_config + + cfg: ProcessInference = load_config(config_path, ProcessInference) + return merged_results_path(cfg.dispatcher_config.output_path) + + +def _postprocess_output_jsonl(config_path: str) -> str: + """Resolve the JSONL path `postprocess` writes to. + + Mirrors `PPPipeline`'s use of `mmore.process.utils.jsonl_path`: if the + configured `output_path` is a directory, the pipeline writes to + `