170 changes: 148 additions & 22 deletions README.md
@@ -1,50 +1,176 @@
# Portus: NL queries for data

Natural‑language queries for your data — connect SQL databases and DataFrames, ask questions in plain English, and get tables, plots, and explanations back. Portus runs agents on top of DuckDB and your connections, and can use both cloud and local LLMs.

Version: 0.0.4

## Overview
- Ask questions like “list all German shows” or “plot revenue by month”.
- Works with SQLAlchemy engines (e.g., Postgres) and in‑memory DataFrames via a session.
- Uses DuckDB under the hood to federate/query data.
- Built‑in visualization via a Vega‑Lite chat visualizer.
- Pluggable LLMs: OpenAI/Anthropic, or local models through Ollama or any OpenAI‑compatible server.

## Stack and Tooling
- Language: Python (>= 3.12)
- Packaging/build: Hatchling
- Package manager: uv (required version 0.9.5)
- Key deps: pandas, duckdb, langchain, langgraph, jinja2, psycopg2‑binary, matplotlib, edaplot‑vl
- Dev tools: ruff, mypy, pytest, pre‑commit

## Requirements
- Python 3.12+
- uv 0.9.5 (see pyproject `[tool.uv]`)
- Optional: Postgres client libs when using `psycopg2-binary` on your platform

## Installation
Using uv (recommended):
Contributor: Let's replace all this with simple `pip install portus-ai`.

```bash
# Install dependencies for the library
uv sync

# Optionally include example extras (notebooks, dotenv)
uv sync --extra examples
```

If you are developing, you may also want the `dev` dependency group:

```bash
uv sync --group dev
```

## Environment variables
Copy `.env.example` to `.env` and fill in your keys:
Contributor: Since we use `pip install` rather than uv, there should be another way to specify keys. Probably we can say: specify your API keys in the env vars (`OPENAI_API_KEY` for OpenAI models, `ANTHROPIC_API_KEY` for Anthropic models, and optionally `OPENAI_BASE_URL` / `OLLAMA_HOST` for local or OAI‑compatible servers).


```bash
cp .env.example .env
```

Supported variables:
- `OPENAI_API_KEY` — if using OpenAI models
- `ANTHROPIC_API_KEY` — if using Anthropic models
- Optional for local/OAI‑compatible servers:
  - `OPENAI_BASE_URL` (aka `api_base_url` in code)
  - `OLLAMA_HOST` (e.g., `127.0.0.1:11434`)
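
If you installed the `examples` extra (which the README above says includes dotenv), a quick way to load these at runtime is python-dotenv. A minimal sketch; the assertion is just a sanity check for OpenAI-backed runs:

```python
import os

from dotenv import load_dotenv  # ships with the `examples` extra

load_dotenv()  # reads .env from the current working directory
assert os.environ.get("OPENAI_API_KEY"), "set OPENAI_API_KEY when using OpenAI models"
```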

## Quickstart

### 1) Create a database connection (SQLAlchemy)
Do not hard‑code credentials in code. Use env vars or a secret manager. Example with placeholders:
Contributor (@kosstbarz, Nov 3, 2025): I think we should not give advice in the doc. We can just show how we read secrets from the env:

```python
user = os.environ.get("DATABASE_USER")
...
url = f"postgresql://{user}:{password}@{host}/{database}"
```

```python
from sqlalchemy import create_engine

engine = create_engine(
"postgresql://readonly_role:>sU9y95R([email protected]/netflix"
"postgresql://<user>:<password>@<host>/<database>"
)
```
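
Following the contributor's suggestion above, a minimal sketch that builds the URL from environment variables (the variable names are illustrative, not a Portus convention):

```python
import os

from sqlalchemy import create_engine

# Illustrative variable names; use whatever your deployment defines.
user = os.environ.get("DATABASE_USER")
password = os.environ.get("DATABASE_PASSWORD")
host = os.environ.get("DATABASE_HOST")
database = os.environ.get("DATABASE_NAME")

engine = create_engine(f"postgresql://{user}:{password}@{host}/{database}")
```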

### 2) Open a Portus session and register sources

```python
from portus.api import open_session
from portus.configs.llm import LLMConfig

llm_config = LLMConfig(name="gpt-4o-mini", temperature=0)
session = open_session(name="demo", llm_config=llm_config)

# Register your engine (also supports native DuckDB connections)
session.add_db(engine, context="Postgres database for streaming catalog")
```

Contributor: Let's use instead

```python
import portus

config = portus.LLMConfig(name="gpt-4o-mini", temperature=0)
session = portus.open_session(...)
```

### 3) Ask questions and materialize results

```python
# Start a conversational thread
thread = session.thread()

# Ask a question and get a DataFrame
df = thread.ask("list all german shows").df()
print(df.head())

# Get a textual answer
print(thread.text())

# Generate a visualization (Vega-Lite under the hood)
plot = thread.plot("bar chart of shows by country")
print(plot.code)  # access generated plot code if needed
```

## Local models
Portus can be used with local LLMs either using Ollama or OpenAI‑compatible servers (LM Studio, llama.cpp, vLLM, etc.).

### Ollama
1. Install Ollama for your OS and make sure it is running.
2. Use an `LLMConfig` with `name` of the form `"ollama:<model_name>"`. For an example see `examples/configs/qwen3-8b-ollama.yaml`.

The model will be downloaded automatically if it doesn't already exist. Alternatively, run `ollama pull <model_name>` to download it manually.
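
For instance, a minimal config sketch (the model tag below is illustrative; any model available to your Ollama install works):

```python
from portus.configs.llm import LLMConfig

# The "ollama:" prefix routes requests to the local Ollama instance.
llm_config = LLMConfig(name="ollama:qwen3:8b", temperature=0)
```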

### OpenAI‑compatible servers
You can use any OpenAI‑compatible server by setting `api_base_url` in the `LLMConfig`.
For an example, see `examples/configs/qwen3-8b-oai.yaml`.

Examples of compatible servers:
- [LM Studio](https://lmstudio.ai/) (macOS‑friendly; supports the [OpenAI Responses API](https://platform.openai.com/docs/api-reference/responses))
- [Ollama](https://ollama.com/) (`OLLAMA_HOST=127.0.0.1:8080 ollama serve`); we recommend using Ollama directly, as described [above](#ollama)
- [llama.cpp](https://github.com/ggml-org/llama.cpp/tree/master/tools/server) (`llama-server`)
- [vLLM](https://github.com/vllm-project/vllm)
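
As a sketch, pointing Portus at such a server only requires setting `api_base_url` (host, port, and model name below are illustrative):

```python
from portus.configs.llm import LLMConfig

# Example: a llama.cpp `llama-server` or LM Studio endpoint on localhost.
llm_config = LLMConfig(
    name="qwen3-8b",
    api_base_url="http://127.0.0.1:8080/v1",
)
```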

## Scripts and common tasks
Using Makefile targets:

```bash
# Lint and static checks (pre-commit on all files)
make check

# Run tests (loads .env if present)
make test
```

Using uv directly:

```bash
uv run pytest -v
uv run pre-commit run --all-files
```

## Tests
- Test suite uses `pytest`.
- Some tests are marked `@pytest.mark.apikey` and require provider API keys.

Run all tests:

```bash
uv run pytest -v
```

Run only tests that do NOT require API keys:

```bash
uv run pytest -v -m "not apikey"
```
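
For reference, a marked test looks like this (the test name and body are illustrative, not from the suite):

```python
import pytest

@pytest.mark.apikey  # deselected when running with -m "not apikey"
def test_provider_roundtrip():
    ...  # would exercise a code path that needs a provider API key
```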

## Project structure
```
portus/
  api.py          # public entry: open_session(...)
  core/           # Session, Pipe, Executor, Visualizer abstractions
  agents/         # Lighthouse (default) and React-DuckDB agents
  duckdb/         # DuckDB integration and tools
  visualizers/    # Vega-Lite chat visualizer and utilities
examples/         # notebooks, demo script, configs
tests/            # pytest suite
```

## Entry points
- Programmatic: `from portus.api import open_session`
- CLI: none at the moment. TODO: consider adding a CLI entry point via `[project.scripts]`.

## License
TODO: Add a LICENSE file and state the license here.

## Notes
- This README was updated on 2025-10-30 to remove hard‑coded credentials and reflect the current stack (uv + hatchling).
- If anything here is inaccurate or missing (e.g., additional environment variables, supported backends), please open an issue or a PR.
8 changes: 8 additions & 0 deletions portus/caches/in_mem_cache.py
@@ -4,15 +4,23 @@


class InMemCache(Cache):
    """Process-local, byte-based cache backed by a dict.

    Use `scoped()` to create namespaced views over the same underlying storage.
    """

    def __init__(self, prefix: str = "", shared_cache: dict[str, bytes] | None = None):
        self._cache: dict[str, bytes] = shared_cache if shared_cache is not None else {}
        self._prefix = prefix

    def put(self, key: str, source: BytesIO) -> None:
        """Store bytes under the current scope/prefix."""
        self._cache[self._prefix + key] = source.getvalue()

    def get(self, key: str, dest: BytesIO) -> None:
        """Write cached bytes for key into the provided buffer."""
        dest.write(self._cache[self._prefix + key])

    def scoped(self, scope: str) -> Cache:
        """Return a view of this cache with an additional scope prefix."""
        return InMemCache(prefix=self._prefix + scope + ":", shared_cache=self._cache)
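
A quick round-trip sketch of the scoped API (illustrative usage, not part of the diff; the import path matches the file above):

```python
from io import BytesIO

from portus.caches.in_mem_cache import InMemCache

cache = InMemCache().scoped("thread-1")  # keys are stored as "thread-1:<key>"
cache.put("result", BytesIO(b"hello"))

buf = BytesIO()
cache.get("result", buf)
assert buf.getvalue() == b"hello"
```
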
8 changes: 8 additions & 0 deletions portus/core/cache.py
@@ -3,15 +3,23 @@


class Cache(ABC):
    """Simple byte-oriented cache interface with optional scoping."""

    @abstractmethod
    def put(self, key: str, source: BytesIO) -> None:
        """Store bytes for a key from the given buffer."""
        raise NotImplementedError

    @abstractmethod
    def get(self, key: str, dest: BytesIO) -> None:
        """Load bytes for a key into the provided buffer.

        Implementations may raise KeyError if the key is missing.
        """
        # TODO Raise KeyError if key not found. Need a "contains" method as well.
        raise NotImplementedError

    @abstractmethod
    def scoped(self, scope: str) -> "Cache":
        """Return a new cache view with the given key prefix/scope."""
        raise NotImplementedError
18 changes: 18 additions & 0 deletions portus/core/executor.py
@@ -9,6 +9,14 @@


class ExecutionResult(BaseModel):
    """Immutable result of a single agent/executor step.

    Attributes:
        text: Human-readable response to the last user query.
        meta: Arbitrary metadata collected during execution (debug info, timings, etc.).
        code: Text of generated code (e.g., SQL/Vega spec) when applicable.
        df: Optional dataframe materialized by the executor.
    """
    text: str
    meta: dict[str, Any]
    code: str | None = None
@@ -19,6 +27,8 @@ class ExecutionResult(BaseModel):


class Executor(ABC):
    """Abstract interface for components that translate OPAs into results."""

    @abstractmethod
    def execute(
        self,
@@ -28,4 +38,12 @@ def execute(
        rows_limit: int = 100,
        cache_scope: str = "common_cache",
    ) -> ExecutionResult:
        """Execute a single OPA within a session.

        Args:
            session: Active session providing LLM, data connections, cache, etc.
            opa: User intent/query to process.
            rows_limit: Preferred row limit for data materialization (may be ignored by executors).
            cache_scope: Logical scope for caching per chat/thread.
        """
        pass
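
As an aside, constructing the result model is plain pydantic; a sketch with made-up field values, grounded only in the attributes documented above:

```python
result = ExecutionResult(
    text="There are 42 German shows.",
    meta={"duration_ms": 1234},
    code="SELECT COUNT(*) FROM shows WHERE country = 'Germany'",
)
```
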
32 changes: 29 additions & 3 deletions portus/core/pipe.py
@@ -12,10 +12,13 @@


class Pipe:
    """A single conversational thread within a session.

    - Maintains its own message history (isolated from other pipes).
    - Materializes data and visualizations lazily on demand and caches results per pipe.
    - Exposes helpers to get the latest dataframe/text/plot/code.
    """

    def __init__(self, session: "Session", *, default_rows_limit: int = 1000):
        self._session = session
        self._default_rows_limit = default_rows_limit
@@ -32,9 +35,14 @@ def __init__(self, session: "Session", *, default_rows_limit: int = 1000):
        self._opas: list[Opa] = []
        self._meta: dict[str, Any] = {}

        # A unique cache scope so agents can store per-thread state (e.g., message history)
        self._cache_scope = f"{self._session.name}/{uuid.uuid4()}"

    def _materialize_data(self, rows_limit: int | None) -> "ExecutionResult":
        """Materialize latest data state by executing pending OPAs if needed.

        Reuses cached result unless new OPAs were added or the row limit changed.
        """
        # TODO Recompute on rows_limit change without recomputing the last Opa
        rows_limit = rows_limit if rows_limit else self._default_rows_limit
        new_opas = self._opas[self._opas_processed_count :]
@@ -51,6 +59,7 @@ def _materialize_data(self, rows_limit: int | None) -> "ExecutionResult":
        return self._data_result

    def _materialize_visualization(self, request: str | None, rows_limit: int | None) -> "VisualisationResult":
        """Materialize latest visualization for the given request and current data."""
        data = self._materialize_data(rows_limit)
        if not self._visualization_materialized or request != self._visualization_request:
            # TODO Cache visualization results as in Executor.execute()?
@@ -64,26 +73,43 @@ def _materialize_visualization(self, request: str | None, rows_limit: int | None
        return self._visualization_result

    def df(self, *, rows_limit: int | None = None) -> DataFrame | None:
        """Return the latest dataframe, materializing data as needed.

        Args:
            rows_limit: Optional override for the number of rows to materialize.
        """
        return self._materialize_data(rows_limit if rows_limit else self._data_materialized_rows).df

    def plot(self, request: str | None = None, *, rows_limit: int | None = None) -> "VisualisationResult":
        """Generate or return the latest visualization for the current data.

        Args:
            request: Optional natural-language plotting request.
            rows_limit: Optional row limit for data materialization.
        """
        # TODO Currently, we can't chain calls or maintain a "plot history": pipe.plot("red").plot("blue").
        # We have to do pipe.plot("red"), but then pipe.plot("blue") is independent of the first call.
        return self._materialize_visualization(request, rows_limit if rows_limit else self._data_materialized_rows)

    def text(self) -> str:
        """Return the latest textual answer from the executor/LLM."""
        return self._materialize_data(self._data_materialized_rows).text

    def __str__(self) -> str:
        return self.text()

    def ask(self, query: str) -> "Pipe":
        """Append a new user query to this pipe and invalidate visualization cache.

        Returns self to allow chaining (e.g., pipe.ask("...")).
        """
        self._opas.append(Opa(query=query))
        self._visualization_materialized = False
        return self

    @property
    def meta(self) -> dict[str, Any]:
        """Aggregated metadata from executor/visualizer for this pipe."""
        return self._meta

    @property
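
To close, a hedged usage sketch of the Pipe API documented above (assumes an open session whose `thread()` returns a `Pipe`, as in the README quickstart):

```python
thread = session.thread()

# ask() returns the pipe itself, so calls chain naturally:
df = thread.ask("list all german shows").df(rows_limit=10)
print(thread.text())  # latest textual answer
print(thread.meta)    # aggregated executor/visualizer metadata
```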