151 changes: 129 additions & 22 deletions README.md
@@ -1,50 +1,157 @@
# Portus: NL queries for data

Natural‑language queries for your data — connect SQL databases and DataFrames, ask questions in plain English, and get tables, plots, and explanations back.
Portus runs agents on top of dataframes and your DB connections, and can use both cloud and local LLMs.


## Overview
- Ask questions like “list all German shows” or “plot revenue by month”.
- Works with SQLAlchemy engines (e.g., Postgres) and in‑memory DataFrames via a session.
- Uses DuckDB under the hood to federate/query data.
- Built‑in visualization via a Vega‑Lite chat visualizer.
- Pluggable LLMs: OpenAI/Anthropic, or local models through Ollama or any OpenAI‑compatible server.


## Installation
Using pip:
```bash
pip install portus-ai
```

Using uv (for development):
Clone this repo and run:
```bash
# Install dependencies for the library
uv sync

# Optionally include example extras (notebooks, dotenv)
uv sync --extra examples
```

## Environment variables

Specify your API keys in the environment variables:
- `OPENAI_API_KEY` — if using OpenAI models
- `ANTHROPIC_API_KEY` — if using Anthropic models
- Optional for local/OpenAI‑compatible servers:
  - `OPENAI_BASE_URL` (aka `api_base_url` in code)
  - `OLLAMA_HOST` (e.g., `127.0.0.1:11434`)
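
If you installed the example extras, one way to load these variables from a local `.env` file (a sketch; it assumes the file sits in your working directory) is:

```python
import os

from dotenv import load_dotenv  # python-dotenv, available via the `examples` extra

load_dotenv()  # reads OPENAI_API_KEY, ANTHROPIC_API_KEY, etc. from .env into the environment
print("OpenAI key set:", bool(os.environ.get("OPENAI_API_KEY")))
```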

## Quickstart

### 1) Create a database connection (SQLAlchemy)
```python
import os

from sqlalchemy import create_engine

user = os.environ.get("DATABASE_USER")
password = os.environ.get("DATABASE_PASSWORD")
host = os.environ.get("DATABASE_HOST")
database = os.environ.get("DATABASE_NAME")

engine = create_engine(
    f"postgresql://{user}:{password}@{host}/{database}"
)
```

### 2) Open a Portus session and register sources

```python
import portus

llm_config = portus.LLMConfig(name="gpt-4o-mini", temperature=0)
session = portus.open_session(name="demo", llm_config=llm_config)

# Register your engine (also supports native DuckDB connections)
session.add_db(engine)
```

### 3) Ask questions and materialize results

```python
# Start a conversational thread
thread = session.thread()

# Ask a question and get a DataFrame
df = thread.ask("list all german shows").df()
print(df.head())

# Get a textual answer
print(thread.text())

# Generate a visualization (Vega-Lite under the hood)
plot = thread.plot("bar chart of shows by country")
print(plot.code) # access generated plot code if needed
```

## Local models
Portus can be used with local LLMs either using Ollama or OpenAI‑compatible servers (LM Studio, llama.cpp, vLLM, etc.).

### Ollama
1. Install [Ollama](https://ollama.com/download) for your OS and make sure it is running.
2. Use an `LLMConfig` with `name` of the form `"ollama:<model_name>"`.
   For an example, see `examples/configs/qwen3-8b-ollama.yaml`.

The model will be downloaded automatically if it doesn't already exist. Alternatively, run `ollama pull <model_name>` to download it manually.
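
For instance, a minimal Python config pointing at a local Ollama model might look like this (the model tag below is only an example; use whichever model you have pulled):

```python
import portus

# "ollama:<model_name>" tells Portus to use the local Ollama server.
# The qwen3:8b tag is an assumption -- substitute any model available in your Ollama install.
llm_config = portus.LLMConfig(name="ollama:qwen3:8b", temperature=0)
session = portus.open_session(name="local-demo", llm_config=llm_config)
```
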
### OpenAI‑compatible servers
You can use any OpenAI‑compatible server by setting `api_base_url` in the `LLMConfig`.
For an example, see `examples/configs/qwen3-8b-oai.yaml`.

Examples of compatible servers:
- LM Studio (macOS‑friendly; supports the OpenAI Responses API)
- Ollama (`OLLAMA_HOST=127.0.0.1:8080 ollama serve`)
- llama.cpp (`llama-server`)
- vLLM
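
As a sketch, the equivalent Python config might look like this (the base URL and model name are placeholders; match them to your server's settings):

```python
import portus

# Any OpenAI-compatible endpoint works, e.g. LM Studio or llama.cpp's llama-server.
# Both values below are placeholders, not defaults shipped with Portus.
llm_config = portus.LLMConfig(
    name="qwen3-8b",
    temperature=0,
    api_base_url="http://127.0.0.1:8080/v1",
)
session = portus.open_session(name="local-demo", llm_config=llm_config)
```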

## Scripts and common tasks
Using Makefile targets:

```bash
# Lint and static checks (pre-commit on all files)
make check

# Run tests (loads .env if present)
make test
```

Using uv directly:

```bash
uv run pytest -v
uv run pre-commit run --all-files
```

## Tests
- Test suite uses `pytest`.
- Some tests are marked `@pytest.mark.apikey` and require provider API keys.

Run all tests:

```bash
uv run pytest -v
```

Run only tests that do NOT require API keys:

```bash
uv run pytest -v -m "not apikey"
```

## Project structure
```
portus/
  api.py          # public entry: open_session(...)
  core/           # Session, Pipe, Executor, Visualizer abstractions
  agents/         # Lighthouse (default) and React-DuckDB agents
  duckdb/         # DuckDB integration and tools
  visualizers/    # Vega-Lite chat visualizer and utilities
examples/         # notebooks, demo script, configs
tests/            # pytest suite
```

## Entry points
- Programmatic: `from portus.api import open_session`

## License
TODO: Add a LICENSE file and state the license here.

8 changes: 8 additions & 0 deletions portus/caches/in_mem_cache.py
@@ -4,15 +4,23 @@


class InMemCache(Cache):
    """Process-local, byte-based cache backed by a dict.

    Use `scoped()` to create namespaced views over the same underlying storage.
    """

    def __init__(self, prefix: str = "", shared_cache: dict[str, bytes] | None = None):
        self._cache: dict[str, bytes] = shared_cache if shared_cache is not None else {}
        self._prefix = prefix

    def put(self, key: str, source: BytesIO) -> None:
        """Store bytes under the current scope/prefix."""
        self._cache[self._prefix + key] = source.getvalue()

    def get(self, key: str, dest: BytesIO) -> None:
        """Write cached bytes for key into the provided buffer."""
        dest.write(self._cache[self._prefix + key])

    def scoped(self, scope: str) -> Cache:
        """Return a view of this cache with an additional scope prefix."""
        return InMemCache(prefix=self._prefix + scope + ":", shared_cache=self._cache)
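
As a side note, a minimal sketch of how the scoped view above is expected to behave (illustrative only; assumes `InMemCache` is importable from `portus.caches.in_mem_cache`):

```python
from io import BytesIO

from portus.caches.in_mem_cache import InMemCache

cache = InMemCache()
scoped = cache.scoped("thread-1")  # keys are stored under the "thread-1:" prefix

scoped.put("answer", BytesIO(b"42"))

buf = BytesIO()
scoped.get("answer", buf)
print(buf.getvalue())  # b"42"
```
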
8 changes: 8 additions & 0 deletions portus/core/cache.py
@@ -3,15 +3,23 @@


class Cache(ABC):
    """Simple byte-oriented cache interface with optional scoping."""

    @abstractmethod
    def put(self, key: str, source: BytesIO) -> None:
        """Store bytes for a key from the given buffer."""
        raise NotImplementedError

    @abstractmethod
    def get(self, key: str, dest: BytesIO) -> None:
        """Load bytes for a key into the provided buffer.

        Implementations may raise KeyError if the key is missing.
        """
        # TODO Raise KeyError if key not found. Need a "contains" method as well.
        raise NotImplementedError

    @abstractmethod
    def scoped(self, scope: str) -> "Cache":
        """Return a new cache view with the given key prefix/scope."""
        raise NotImplementedError
25 changes: 25 additions & 0 deletions portus/core/executor.py
@@ -9,6 +9,15 @@


class ExecutionResult(BaseModel):
    """Immutable result of a single agent/executor step.

    Attributes:
        text: Human-readable response to the last user query.
        meta: Arbitrary metadata collected during execution (debug info, timings, etc.).
        code: Text of generated code when applicable.
        df: Optional dataframe materialized by the executor.
    """

    text: str
    meta: dict[str, Any]
    code: str | None = None
@@ -19,6 +28,14 @@ class ExecutionResult(BaseModel):


class Executor(ABC):
    """
    Defines the Executor interface as an abstract base class for execution of
    operations within a given session.

    Methods:
        execute: Abstract method to execute a single OPA within a session.
    """

    @abstractmethod
    def execute(
        self,
@@ -28,4 +45,12 @@ def execute(
        rows_limit: int = 100,
        cache_scope: str = "common_cache",
    ) -> ExecutionResult:
        """Execute a single OPA within a session.

        Args:
            session: Active session providing LLM, data connections, cache, etc.
            opa: User intent/query to process.
            rows_limit: Preferred row limit for data materialization (may be ignored by executors).
            cache_scope: Logical scope for caching per chat/thread.
        """
        pass
1 change: 1 addition & 0 deletions portus/core/opa.py
@@ -4,4 +4,5 @@
@dataclass(frozen=True)
class Opa:
    """User question to the LLM"""

    query: str
32 changes: 29 additions & 3 deletions portus/core/pipe.py
@@ -12,10 +12,13 @@


class Pipe:
    """A single conversational thread within a session.

    - Maintains its own message history (isolated from other pipes).
    - Materializes data and visualizations lazily on demand and caches results per pipe.
    - Exposes helpers to get the latest dataframe/text/plot/code.
    """

    def __init__(self, session: "Session", *, default_rows_limit: int = 1000):
        self._session = session
        self._default_rows_limit = default_rows_limit
@@ -32,9 +35,14 @@ def __init__(self, session: "Session", *, default_rows_limit: int = 1000):
        self._opas: list[Opa] = []
        self._meta: dict[str, Any] = {}

        # A unique cache scope so agents can store per-thread state (e.g., message history)
        self._cache_scope = f"{self._session.name}/{uuid.uuid4()}"

    def _materialize_data(self, rows_limit: int | None) -> "ExecutionResult":
        """Materialize latest data state by executing pending OPAs if needed.

        Reuses cached result unless new OPAs were added or the row limit changed.
        """
        # TODO Recompute on rows_limit change without recomputing the last Opa
        rows_limit = rows_limit if rows_limit else self._default_rows_limit
        new_opas = self._opas[self._opas_processed_count :]
@@ -51,6 +59,7 @@ def _materialize_data(self, rows_limit: int | None) -> "ExecutionResult":
        return self._data_result

    def _materialize_visualization(self, request: str | None, rows_limit: int | None) -> "VisualisationResult":
        """Materialize latest visualization for the given request and current data."""
        data = self._materialize_data(rows_limit)
        if not self._visualization_materialized or request != self._visualization_request:
            # TODO Cache visualization results as in Executor.execute()?
@@ -64,26 +73,43 @@ def _materialize_visualization(self, request: str | None, rows_limit: int | None
        return self._visualization_result

    def df(self, *, rows_limit: int | None = None) -> DataFrame | None:
        """Return the latest dataframe, materializing data as needed.

        Args:
            rows_limit: Optional override for the number of rows to materialize.
        """
        return self._materialize_data(rows_limit if rows_limit else self._data_materialized_rows).df

    def plot(self, request: str | None = None, *, rows_limit: int | None = None) -> "VisualisationResult":
        """Generate or return the latest visualization for the current data.

        Args:
            request: Optional natural-language plotting request.
            rows_limit: Optional row limit for data materialization.
        """
        # TODO Currently, we can't chain calls or maintain a "plot history": pipe.plot("red").plot("blue").
        # We have to do pipe.plot("red"), but then pipe.plot("blue") is independent of the first call.
        return self._materialize_visualization(request, rows_limit if rows_limit else self._data_materialized_rows)

    def text(self) -> str:
        """Return the latest textual answer from the executor/LLM."""
        return self._materialize_data(self._data_materialized_rows).text

    def __str__(self) -> str:
        return self.text()

    def ask(self, query: str) -> "Pipe":
        """Append a new user query to this pipe.

        Returns self to allow chaining (e.g., pipe.ask("..."))
        """
        self._opas.append(Opa(query=query))
        self._visualization_materialized = False
        return self

    @property
    def meta(self) -> dict[str, Any]:
        """Aggregated metadata from executor/visualizer for this pipe."""
        return self._meta

    @property