Skip to content

Commit

Permalink
Initial Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
zprobst committed Oct 23, 2024
1 parent 1a866c6 commit 38f9147
Show file tree
Hide file tree
Showing 9 changed files with 1,181 additions and 635 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
# See here: https://docs.github.com/en/actions/deployment/targeting-different-environments/using-environments-for-deployment
environment:
name: pypi
url: https://pypi.org/p/nodestream-plugin-dotenv/
url: https://pypi.org/p/nodestream-plugin-semantic/

steps:
# Checkout the repository subject to the release.
Expand Down
15 changes: 1 addition & 14 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,2 @@
# Nodestream Dotenv Plugin
# Nodestream Semantic Plugin

This plugin allows you to load environment variables from a `.env` file into your nodestream application.

## Installation

```bash
pip install nodestream-plugin-dotenv
```

## Usage

By default, the plugin will look for a `.env` file in the current working directory. You can specify a different path by setting the `NODESTREAM_DOTENV_PATH` environment variable.

For more information on how to use the `.env` file, see the [python-dotenv](https://github.com/theskumar/python-dotenv#file-format) documentation.
37 changes: 37 additions & 0 deletions nodestream_plugin_semantic/chunk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from abc import ABC, abstractmethod
from typing import Iterable

from nodestream.subclass_registry import SubclassRegistry
from nodestream.pluggable import Pluggable

from .model import Content

CHUNKER_SUBCLASS_REGISTRY = SubclassRegistry()


@CHUNKER_SUBCLASS_REGISTRY.connect_baseclass
class Chunker(ABC, Pluggable):
    """Chunker is a mechanism to split a large document into smaller chunks.

    The chunker is useful when the document is too large to be
    semantically meaningful as one piece of content.
    """

    entrypoint_name = "chunkers"

    @classmethod
    def from_file_data(cls, type, **chunker_kwargs) -> "Chunker":
        """Builds a chunker from declarative (file) configuration data.

        Args:
            type: The registry alias of the chunker subclass to construct.
            **chunker_kwargs: Keyword arguments forwarded to the subclass
                constructor.

        Returns:
            Chunker: The constructed chunker instance.
        """
        # Import all chunker plugins first so externally-provided subclasses
        # are registered before the lookup; mirrors Embedder.from_file_data.
        cls.import_all()
        return CHUNKER_SUBCLASS_REGISTRY.get(type)(**chunker_kwargs)

    @abstractmethod
    def chunk(self, content: Content) -> Iterable[Content]:
        """Yields smaller pieces of the given content."""
        ...


class SplitOnDelimiterChunker(Chunker):
    """Chunker that breaks content apart at a fixed delimiter string."""

    def __init__(self, delimiter: str):
        # The literal string the content is split on (e.g. "\n\n").
        self.delimiter = delimiter

    def chunk(self, content: Content) -> Iterable[Content]:
        """Yields one chunk per delimiter-separated segment of the content."""
        yield from content.split_on_delimiter(self.delimiter)
61 changes: 61 additions & 0 deletions nodestream_plugin_semantic/content_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from abc import ABC, abstractmethod
from typing import Iterable
from pathlib import Path

from nodestream.subclass_registry import SubclassRegistry


CONTENT_TYPE_SUBCLASS_REGISTRY = SubclassRegistry()
PLAIN_TEXT_ALIAS = "plain_text"
PLAIN_TEXT_EXTENSIONS = {".txt", ".md"}


@CONTENT_TYPE_SUBCLASS_REGISTRY.connect_baseclass
class ContentType(ABC):
    """Describes the mechanism to read a file of a specific content type."""

    @classmethod
    def all(cls) -> Iterable["ContentType"]:
        """Yields one instance of every registered ContentType subclass.

        Returns:
            Iterable[ContentType]: A fresh instance of each registered
            subclass.
        """
        # NOTE(review): unlike Chunker/Embedder, ContentType inherits only ABC
        # and not Pluggable, which appears to be what supplies import_all —
        # confirm this attribute actually exists on ContentType.
        cls.import_all()  # Import all content types to register them.
        for sub in CONTENT_TYPE_SUBCLASS_REGISTRY.all_subclasses():
            yield sub()

    @classmethod
    def by_name(cls, name: str) -> "ContentType":
        """Builds the content type registered under the given alias.

        Args:
            name (str): The registry alias (e.g. ``plain_text``).

        Returns:
            ContentType: A new instance of the matching subclass.
        """
        cls.import_all()  # Import all content types to register them.
        return CONTENT_TYPE_SUBCLASS_REGISTRY.get(name)()

    @abstractmethod
    def is_supported(self, file_path: Path) -> bool:
        """Returns True if the file extension is supported.

        Args:
            file_path (Path): The file path to check.

        Returns:
            bool: True if the file extension is supported, False otherwise.
        """
        ...

    @abstractmethod
    def read(self, file_path: Path) -> str:
        """Reads the content of the file.

        Args:
            file_path (Path): The file path to read.

        Returns:
            str: The content of the file.
        """
        ...


class PlainText(ContentType, alias=PLAIN_TEXT_ALIAS):
    """Content type for plain-text files (.txt, .md)."""

    def is_supported(self, file_path: Path) -> bool:
        """Returns True when the file's suffix is a known plain-text extension."""
        return file_path.suffix in PLAIN_TEXT_EXTENSIONS

    def read(self, file_path: Path) -> str:
        """Returns the entire text contents of the file."""
        # Path.read_text opens, reads, and closes the file in one call,
        # using the same platform-default encoding as open("r").
        return file_path.read_text()
33 changes: 33 additions & 0 deletions nodestream_plugin_semantic/embed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from abc import ABC, abstractmethod

from nodestream.subclass_registry import SubclassRegistry
from nodestream.pluggable import Pluggable

from .model import Content, Embedding


EMBEDDER_SUBCLASS_REGISTRY = SubclassRegistry()


@EMBEDDER_SUBCLASS_REGISTRY.connect_baseclass
class Embedder(ABC, Pluggable):
    """Embedder is a mechanism to embed content into a vector space."""

    entrypoint_name = "embedders"

    @classmethod
    def from_file_data(cls, type, **embedder_kwargs) -> "Embedder":
        """Builds an embedder from declarative (file) configuration data.

        Args:
            type: The registry alias of the embedder subclass to construct.
            **embedder_kwargs: Keyword arguments forwarded to the subclass
                constructor.

        Returns:
            Embedder: The constructed embedder instance.
        """
        cls.import_all()  # Import all embedders to register them.
        embedder_class = EMBEDDER_SUBCLASS_REGISTRY.get(type)
        return embedder_class(**embedder_kwargs)

    @abstractmethod
    async def embed(self, content: Content) -> Embedding:
        """Embeds the content into a vector space.

        Args:
            content (Content): The content to embed.

        Returns:
            Embedding: The embedding of the content.
        """
        ...
73 changes: 73 additions & 0 deletions nodestream_plugin_semantic/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from dataclasses import dataclass
import hashlib
from typing import List, Optional, Iterable

from nodestream.model import DesiredIngestion, Node


Embedding = List[float | int]
CONTENT_NODE_TYPE_ID_PROPERTY = "id"


def hash(content: str) -> str:
    """Returns the hex SHA-1 digest of the given text.

    Used to derive stable content IDs — not a security boundary.
    NOTE: intentionally shadows the builtin ``hash`` within this module.
    """
    return hashlib.sha1(content.encode()).hexdigest()


@dataclass(slots=True)
class Content:
    """A piece of text that can be embedded into a vector space."""

    id: str                                  # stable identifier (SHA-1 of text)
    content: str                             # the raw text
    parent: Optional["Content"] = None       # content this was chunked from
    embedding: Optional[Embedding] = None    # vector, once assigned
    metadata: Optional[dict] = None          # extra node properties

    @classmethod
    def from_text(
        cls,
        content: str,
        parent: Optional["Content"] = None,
    ) -> "Content":
        """Builds content whose id is the hash of its text."""
        return cls(id=hash(content), content=content, parent=parent)

    def add_metadata(self, key: str, value: str):
        """Records a metadata entry, creating the mapping on first use."""
        if not self.metadata:
            self.metadata = {}
        self.metadata[key] = value

    def split_on_delimiter(self, delimiter: str) -> Iterable["Content"]:
        """Yields one child Content (parented to self) per delimited segment."""
        pieces = self.content.split(delimiter)
        return (Content.from_text(piece, parent=self) for piece in pieces)

    def assign_embedding(self, embedding: Embedding):
        """Stores the embedding vector on this content."""
        self.embedding = embedding

    def apply_to_node(self, node_type: str, node: Node):
        """Copies this content's id, text, embedding, and metadata onto a node."""
        node.type = node_type
        node.key_values.set_property(CONTENT_NODE_TYPE_ID_PROPERTY, self.id)
        properties = node.properties
        properties.set_property("content", self.content)
        if self.embedding:
            properties.set_property("embedding", self.embedding)
        if self.metadata:
            for meta_key, meta_value in self.metadata.items():
                properties.set_property(meta_key, meta_value)

    def make_ingestible(
        self, node_type: str, relationship_type: str
    ) -> DesiredIngestion:
        """Builds an ingestion for this content, linked inbound from its parent."""
        ingest = DesiredIngestion()
        self.apply_to_node(node_type, ingest.source)

        if self.parent:
            parent_node = Node()
            self.parent.apply_to_node(node_type, parent_node)
            ingest.add_relationship(
                related_node=parent_node, type=relationship_type, outbound=False
            )

        return ingest
Loading

0 comments on commit 38f9147

Please sign in to comment.