Skip to content

Commit cd07eb9

Browse files
committed
Added process-wide temp directory utility and updated lancedb to support it.
1 parent c1911dd commit cd07eb9

7 files changed

Lines changed: 355 additions & 15 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@
1111
(assign, concat, longestStr, isIn, isNotIn, isTrue, isFalse, Hash), core.py (AbstractFieldSegment),
1212
search modules (whoosh.py, lancedb.py), llm modules (chat.py, embedding.py), data modules
1313
(mongo.py, text/chunking_units.py), and pipelines (basic_rag.py, vector_databases.py).
14+
- Added `get_process_temp_dir()` utility to allow creation of temporary directories that are removed
15+
when the process exits using `atexit`, but are consistent within a process.
16+
- Added support for `tmp://name` URI scheme in LanceDB path parameters. This enables process-scoped
17+
temporary databases that are automatically cleaned up on exit. Temporary databases with the same name
18+
share state within a process, making them ideal for testing or ephemeral workflows. Works alongside
19+
existing `memory://` (in-memory) and file path options. Implemented via `get_process_temp_dir()`.
1420

1521
## 0.10.0
1622
### New Features

src/talkpipe/app/chatterlang_workbench.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def emit(self, record):
9494
{
9595
"name": "RAG Pipeline with Vector Database",
9696
"description": "Build a complete RAG system with document indexing and querying",
97-
"code": '# This example demonstrates a complete RAG (Retrieval-Augmented Generation) workflow.\n# It indexes documents into a vector database and then queries them with an LLM.\n\n# Sample knowledge base documents (in a real scenario, these would be from files or a database)\nCONST docs = "TalkPipe is a Python toolkit for building AI workflows. It provides a Unix-like pipeline syntax for chaining data transformations and LLM operations.|TalkPipe supports multiple LLM providers including OpenAI, Ollama, and Anthropic. You can switch between providers easily using configuration.|With TalkPipe, you can build RAG systems, multi-agent debates, and document processing pipelines. It uses Python generators for memory-efficient streaming.";\n\n# Step 1: Index documents into a vector database\nINPUT FROM echo[data=docs, delimiter="|"] \n | toDict[field_list="_:text"] \n | makeVectorDatabase[\n path="./demo_knowledge_base",\n embedding_model="nomic-embed-text",\n embedding_source="ollama",\n embedding_field="text"\n ] \n | print;\n\n# Step 2: Query the knowledge base with RAG\nINPUT FROM echo[data="What are the key benefits of using TalkPipe?"] \n | toDict[field_list="_:text"] \n | ragToText[\n path="./demo_knowledge_base",\n embedding_model="nomic-embed-text",\n embedding_source="ollama",\n completion_model="llama3.2",\n completion_source="ollama",\n content_field="text",\n prompt_directive="Answer the question based on the background information provided.",\n limit=3\n ] \n | print'
97+
"code": '# This example demonstrates a complete RAG (Retrieval-Augmented Generation) workflow.\n# It indexes documents into a vector database and then queries them with an LLM.\n\n# Sample knowledge base documents (in a real scenario, these would be from files or a database)\nCONST docs = "TalkPipe is a Python toolkit for building AI workflows. It provides a Unix-like pipeline syntax for chaining data transformations and LLM operations.|TalkPipe supports multiple LLM providers including OpenAI, Ollama, and Anthropic. You can switch between providers easily using configuration.|With TalkPipe, you can build RAG systems, multi-agent debates, and document processing pipelines. It uses Python generators for memory-efficient streaming.";\n\n# Step 1: Index documents into a vector database\nINPUT FROM echo[data=docs, delimiter="|"] \n | toDict[field_list="_:text"] \n | makeVectorDatabase[\n path="tmp://demo_knowledge_base",\n embedding_model="nomic-embed-text",\n embedding_source="ollama",\n embedding_field="text"\n ] \n | print;\n\n# Step 2: Query the knowledge base with RAG\nINPUT FROM echo[data="What are the key benefits of using TalkPipe?"] \n | toDict[field_list="_:text"] \n | ragToText[\n path="tmp://demo_knowledge_base",\n embedding_model="nomic-embed-text",\n embedding_source="ollama",\n completion_model="llama3.2",\n completion_source="ollama",\n content_field="text",\n prompt_directive="Answer the question based on the background information provided.",\n limit=3\n ] \n | print'
9898
}
9999
]
100100
}
@@ -732,7 +732,7 @@ def get_ui():
732732
<div id="cursorPosition">Line: 0, Column: 0</div>
733733
</div>
734734
<div class="button-group">
735-
<button id="compileButton">Compile Script</button>
735+
<button id="compileButton">Compile and Run Script</button>
736736
<button id="toggle-examples">Toggle Examples</button>
737737
<button id="log-button">Toggle Logs</button>
738738
<span id="compileLoadingIndicator" class="loading hidden">Compiling script...</span>

src/talkpipe/pipelines/basic_rag.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,19 @@ def transform(self, input_iter):
6161

6262
class AbstractRAGPipeline(AbstractSegment):
6363
""" Convenience segment that runs a RAG pipeline from search to prompt creation to LLM completion.
64+
65+
Path supports multiple URI schemes:
66+
- File path: "./my_db" or "/path/to/db" - Persistent storage
67+
- Memory: "memory://" - Ephemeral in-memory database (faster, no disk I/O)
68+
- Temp: "tmp://name" - Process-scoped temporary database (shared by name, auto-cleanup on exit)
6469
"""
65-
70+
6671
def __init__(self,
6772
embedding_model: Annotated[str, "Embedding model to use"],
6873
embedding_source: Annotated[str, "Source of text to embed"],
6974
completion_model: Annotated[str, "LLM model to use for completion"],
7075
completion_source: Annotated[str, "Source of prompt for completion"],
71-
path: Annotated[str, "Path to the LanceDB database"],
76+
path: Annotated[str, "Path to LanceDB database. Supports file paths, 'memory://' for in-memory, or 'tmp://name' for process-scoped temp (auto-cleanup)"],
7277
content_field: Annotated[Any, "Field to evaluate relevance on"],
7378
prompt_directive: Annotated[str, "Directive to guide the evaluation"] = "Respond to the provided content based on the background information. If the background does not contain relevant information, respond with 'No relevant information found.'",
7479
set_as: Annotated[str, "The field to set/append the result as."] = None,
@@ -113,14 +118,19 @@ def transform(self, input_iter):
113118
@register_segment("ragToText")
114119
class RAGToText(AbstractRAGPipeline):
115120
""" RAG pipeline that outputs text completions from LLM.
121+
122+
Path supports multiple URI schemes:
123+
- File path: "./my_db" or "/path/to/db" - Persistent storage
124+
- Memory: "memory://" - Ephemeral in-memory database (faster, no disk I/O)
125+
- Temp: "tmp://name" - Process-scoped temporary database (shared by name, auto-cleanup on exit)
116126
"""
117127

118128
def __init__(self,
119129
embedding_model: Annotated[str, "Embedding model to use"],
120130
embedding_source: Annotated[str, "Source of text to embed"],
121131
completion_model: Annotated[str, "LLM model to use for completion"],
122132
completion_source: Annotated[str, "Source of prompt for completion"],
123-
path: Annotated[str, "Path to the LanceDB database"],
133+
path: Annotated[str, "Path to LanceDB database. Supports file paths, 'memory://' for in-memory, or 'tmp://name' for process-scoped temp (auto-cleanup)"],
124134
content_field: Annotated[Any, "Field to evaluate relevance on"],
125135
prompt_directive: Annotated[str, "Directive to guide the evaluation"] = "Respond to the provided content based on the background information. If the background does not contain relevant information, respond with 'No relevant information found.'",
126136
set_as: Annotated[str, "The field to set/append the result as."] = None,
@@ -146,14 +156,19 @@ def make_completion_segment(self) -> AbstractSegment:
146156
@register_segment("ragToBinaryAnswer")
147157
class RAGToBinaryAnswer(AbstractRAGPipeline):
148158
""" RAG pipeline that outputs binary answers from LLM.
159+
160+
Path supports multiple URI schemes:
161+
- File path: "./my_db" or "/path/to/db" - Persistent storage
162+
- Memory: "memory://" - Ephemeral in-memory database (faster, no disk I/O)
163+
- Temp: "tmp://name" - Process-scoped temporary database (shared by name, auto-cleanup on exit)
149164
"""
150165

151166
def __init__(self,
152167
embedding_model: Annotated[str, "Embedding model to use"],
153168
embedding_source: Annotated[str, "Source of text to embed"],
154169
completion_model: Annotated[str, "LLM model to use for completion"],
155170
completion_source: Annotated[str, "Source of prompt for completion"],
156-
path: Annotated[str, "Path to the LanceDB database"],
171+
path: Annotated[str, "Path to LanceDB database. Supports file paths, 'memory://' for in-memory, or 'tmp://name' for process-scoped temp (auto-cleanup)"],
157172
content_field: Annotated[Any, "Field to evaluate relevance on"],
158173
prompt_directive: Annotated[str, "Directive to guide the evaluation"] = "Answer the provided question as YES or NO. If the background does not contain relevant information, respond with 'NO'.",
159174
set_as: Annotated[str, "The field to set/append the result as."] = None,
@@ -180,14 +195,19 @@ def make_completion_segment(self) -> AbstractSegment:
180195
@register_segment("ragToScore")
181196
class RAGToScore(AbstractRAGPipeline):
182197
""" RAG pipeline that outputs scores from LLM.
198+
199+
Path supports multiple URI schemes:
200+
- File path: "./my_db" or "/path/to/db" - Persistent storage
201+
- Memory: "memory://" - Ephemeral in-memory database (faster, no disk I/O)
202+
- Temp: "tmp://name" - Process-scoped temporary database (shared by name, auto-cleanup on exit)
183203
"""
184204

185205
def __init__(self,
186206
embedding_model: Annotated[str, "Embedding model to use"],
187207
embedding_source: Annotated[str, "Source of text to embed"],
188208
completion_model: Annotated[str, "LLM model to use for completion"],
189209
completion_source: Annotated[str, "Source of prompt for completion"],
190-
path: Annotated[str, "Path to the LanceDB database"],
210+
path: Annotated[str, "Path to LanceDB database. Supports file paths, 'memory://' for in-memory, or 'tmp://name' for process-scoped temp (auto-cleanup)"],
191211
content_field: Annotated[Any, "Field to evaluate relevance on"],
192212
prompt_directive: Annotated[str, "Directive to guide the evaluation"] = "Answer the provided question on a scale of 1 to 10. If the background does not contain relevant information, respond with a score of 1.",
193213
set_as: Annotated[str, "The field to set/append the result as."] = None,

src/talkpipe/pipelines/vector_databases.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,18 @@ class MakeVectorDatabaseSegment(AbstractSegment):
1010
1111
This segment expects dictionary inputs representing documents.
1212
It embeds the specified field and stores the documents with their embeddings in LanceDB.
13+
14+
Path supports multiple URI schemes:
15+
- File path: "./my_db" or "/path/to/db" - Persistent storage
16+
- Memory: "memory://" - Ephemeral in-memory database (faster, no disk I/O)
17+
- Temp: "tmp://name" - Process-scoped temporary database (shared by name, auto-cleanup on exit)
1318
"""
1419

1520
def __init__(self,
1621
embedding_field: Annotated[str, "Field to use for embeddings"],
1722
embedding_model: Annotated[str, "Embedding model to use"],
1823
embedding_source: Annotated[str, "Source of text to embed"],
19-
path: Annotated[str, "Path to the LanceDB database"],
24+
path: Annotated[str, "Path to LanceDB database. Supports file paths, 'memory://', or 'tmp://name'"],
2025
table_name: Annotated[str, "Name of the table in the database"] = "docs",
2126
doc_id_field: Annotated[Optional[str], "Field containing document ID"] = None,
2227
overwrite: Annotated[bool, "If true, overwrite existing table"] = False,
@@ -52,12 +57,17 @@ class SearchVectorDatabaseSegment(AbstractSegment):
5257
search results are yielded (set_as must be None).
5358
- If query_field is specified: Expects dictionary inputs, embeds the specified field,
5459
and search results can be yielded directly (set_as=None) or attached to the input item.
60+
61+
Path supports multiple URI schemes:
62+
- File path: "./my_db" or "/path/to/db" - Persistent storage
63+
- Memory: "memory://" - Ephemeral in-memory database (faster, no disk I/O)
64+
- Temp: "tmp://name" - Process-scoped temporary database (shared by name, auto-cleanup on exit)
5565
"""
5666

5767
def __init__(self,
5868
embedding_model: Annotated[str, "Embedding model to use"],
5969
embedding_source: Annotated[str, "Source of text to embed"],
60-
path: Annotated[str, "Path to the LanceDB database"],
70+
path: Annotated[str, "Path to LanceDB database. Supports file paths, 'memory://' for in-memory, or 'tmp://name' for process-scoped temp (auto-cleanup)"],
6171
table_name: Annotated[str, "Name of the table in the database"] = "docs",
6272
query_field: Annotated[Optional[str], "Field containing the query text to embed. If None, expects string inputs."] = None,
6373
limit: Annotated[int, "Number of search results to return"] = 10,

src/talkpipe/search/lancedb.py

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,56 @@
77
from talkpipe.chatterlang import register_segment
88
from talkpipe import segment
99
from talkpipe.util.data_manipulation import extract_property, VectorLike, Document, DocID, toDict, assign_property
10+
from talkpipe.util.os import get_process_temp_dir
1011
from .abstract import DocumentStore, VectorAddable, VectorSearchable, SearchResult
1112

1213
logger = logging.getLogger(__name__)
1314

15+
16+
def parse_db_path(path: str) -> str:
17+
"""
18+
Parse database path, handling special URI schemes.
19+
20+
Supported schemes:
21+
- Regular paths: "/path/to/db" -> "/path/to/db"
22+
- Memory DBs: "memory://" or "" -> passes through to LanceDB
23+
- Temp DBs: "tmp://name" -> process-wide temp directory path
24+
25+
Args:
26+
path: Database path or URI
27+
28+
Returns:
29+
Resolved path suitable for lancedb.connect()
30+
31+
Examples:
32+
>>> parse_db_path("/data/mydb")
33+
"/data/mydb"
34+
35+
>>> parse_db_path("memory://")
36+
"memory://"
37+
38+
>>> parse_db_path("tmp://my_cache")
39+
"/tmp/talkpipe_tmp/my_cache" # actual temp dir
40+
41+
Raises:
42+
ValueError: If tmp:// URI has no name
43+
"""
44+
if path.startswith("tmp://"):
45+
# Extract name from URI
46+
name = path[6:] # Remove "tmp://" prefix
47+
if not name:
48+
raise ValueError("tmp:// URI requires a name (e.g., tmp://my_db)")
49+
50+
# Get process-wide temp directory
51+
return get_process_temp_dir(name)
52+
53+
# Pass through other URIs/paths
54+
return path
55+
1456
@register_segment("searchLanceDB", "searchLancDB")
1557
@segment()
1658
def search_lancedb(items: Annotated[object, "Items with the query vectors"],
17-
path: Annotated[str, "Path to the LanceDB database"],
59+
path: Annotated[str, "Path to the LanceDB database. Supports file paths, 'memory://' for in-memory, or 'tmp://name' for process-scoped temp (auto-cleanup)"],
1860
table_name: Annotated[str, "Table name in the LanceDB database"],
1961
all_results_at_once: Annotated[bool, "If true, return all results at once"]=False,
2062
field: Annotated[str, "Field with the vector"]=None,
@@ -25,6 +67,11 @@ def search_lancedb(items: Annotated[object, "Items with the query vectors"],
2567
):
2668
"""Search for similar vectors in LanceDB and return SearchResult objects.
2769
70+
The path parameter supports multiple URI schemes:
71+
- File path: "./my_db" or "/path/to/db" - Persistent storage
72+
- Memory: "memory://" - Ephemeral in-memory database (faster, no disk I/O)
73+
- Temp: "tmp://name" - Process-scoped temporary database (shared by name, auto-cleanup on exit)
74+
2875
Yields:
2976
SearchResult objects or lists of SearchResult objects.
3077
"""
@@ -59,7 +106,7 @@ def search_lancedb(items: Annotated[object, "Items with the query vectors"],
59106
@register_segment("addToLanceDB", "addToLancDB")
60107
@segment()
61108
def add_to_lancedb(items: Annotated[object, "Items with the vectors and documents"],
62-
path: Annotated[str, "Path to the LanceDB database"],
109+
path: Annotated[str, "Path to the LanceDB database. Supports file paths, 'memory://' for in-memory, or 'tmp://name' for process-scoped temp (auto-cleanup)"],
63110
table_name: Annotated[str, "Table name in the LanceDB database"],
64111
vector_field: Annotated[str, "The field containing the vector data"] = "vector",
65112
doc_id_field: Annotated[Optional[str], "Field containing document ID"] = None,
@@ -69,6 +116,11 @@ def add_to_lancedb(items: Annotated[object, "Items with the vectors and document
69116
):
70117
"""Add vectors and documents to LanceDB using LanceDBDocumentStore.
71118
119+
The path parameter supports multiple URI schemes:
120+
- File path: "./my_db" or "/path/to/db" - Persistent storage
121+
- Memory: "memory://" - Ephemeral in-memory database (faster, no disk I/O)
122+
- Temp: "tmp://name" - Process-scoped temporary database (shared by name, auto-cleanup on exit)
123+
72124
Returns:
73125
The original items with the document IDs added.
74126
"""
@@ -136,11 +188,15 @@ def __init__(self, path: str, table_name: str = "documents", vector_dim: Optiona
136188
Initialize the LanceDB document store.
137189
138190
Args:
139-
path: Path to the LanceDB database
191+
path: Path to the LanceDB database. Supports:
192+
- Regular paths: "/path/to/db"
193+
- Memory DBs: "memory://"
194+
- Temp DBs: "tmp://name" (process-wide, auto-cleanup)
140195
table_name: Name of the table to store documents in
141196
vector_dim: Expected dimension of vectors (optional, inferred from first vector)
142197
"""
143-
self.path = path
198+
self.original_path = path # Keep original for reference
199+
self.path = parse_db_path(path) # Resolve tmp:// and other URIs
144200
self.table_name = table_name
145201
self.vector_dim = vector_dim
146202
self._db = None

0 commit comments

Comments
 (0)