
Commit 94d30ef

Merge branch 'main' into feat/versioning
2 parents a60db44 + bf8c9f5 commit 94d30ef

87 files changed: +3861 −264 lines changed

@@ -0,0 +1,50 @@
# Purpose of this module

The purpose of this module is to take Sphinx Furo themed documentation, pull the pages, and chunk the text
for further processing, e.g. creating embeddings. This is fairly generic code that is easy to change
and extend for your purposes. It runs anywhere that Python runs, and can be extended to run on Ray, Dask,
and even PySpark.

```python
# import sphinx_doc_chunking via the means that you want, e.g. `import sphinx_doc_chunking`.

from hamilton import driver
from hamilton.execution import executors

dr = (
    driver.Builder()
    .with_modules(sphinx_doc_chunking)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_config({})
    # defaults to multi-threading -- and tasks control max concurrency
    .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=25))
    .build()
)
```
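
If you want processes rather than threads for the parallel block, you can swap the remote executor. The following is a minimal sketch, not part of this module; it assumes the module file is importable as `sphinx_doc_chunking` and that your Hamilton version provides `executors.MultiProcessingExecutor` (check `hamilton.execution.executors` for what is available). For Ray or Dask, Hamilton ships plugin executors (e.g. under `hamilton.plugins`), though the exact class names depend on your version.

```python
# Sketch only: run each URL in its own process instead of a thread.
# Assumes sphinx_doc_chunking.py sits next to this script and that
# executors.MultiProcessingExecutor exists in your Hamilton version.
import sphinx_doc_chunking

from hamilton import driver
from hamilton.execution import executors

dr = (
    driver.Builder()
    .with_modules(sphinx_doc_chunking)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_config({})
    # processes instead of threads; the serialization caveats in the
    # Limitations section below then apply.
    .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=8))
    .build()
)
```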

## What you should modify

You'll likely want to:

1. play with what does the chunking and its settings.
2. change how URLs are sourced.
3. change how text is extracted from a page.
4. extend the code to hit an API to get embeddings (see the sketch after this list).
5. extend the code to push data to a vector database.
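
For item 4, here is a minimal sketch of what an embeddings step could look like. It assumes the OpenAI Python client (`openai>=1.0`) and an `OPENAI_API_KEY` in the environment; the function name `chunk_embeddings` and the model choice are illustrative, not part of this module:

```python
# Sketch only: a downstream Hamilton function that embeds the chunks for one URL
# by calling an external API. Swap in whatever embeddings provider you use.
from langchain_core import documents
from openai import OpenAI


def chunk_embeddings(chunked_text: list[documents.Document]) -> list[list[float]]:
    """Embeds each chunk of a single URL via the OpenAI embeddings endpoint."""
    client = OpenAI()  # reads OPENAI_API_KEY from the environment
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=[doc.page_content for doc in chunked_text],
    )
    return [item.embedding for item in response.data]
```

Because this depends on `chunked_text`, it runs inside the parallel block, once per URL; you would then add the embeddings to `url_result` so they get collected at the end.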

# Configuration Options

There is no configuration required for this module.

# Limitations

The general multiprocessing caveats apply if you choose an executor other than MultiThreading. For example:

1. Serialization -- objects need to be serializable between processes.
2. Concurrency/parallelism -- you're in control of this.
3. Failures -- you'll need to make your code do the right thing here.
4. Memory requirements -- the "collect" (or reduce) step pulls everything into memory. If you hit this, it just
means you need to redesign your code a little, e.g. write large things to a store and pass pointers (see the sketch after this list).
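
To illustrate that last point, here is a minimal sketch, not part of this module, that writes each URL's chunks to disk inside the parallel block and passes back only the file path; the function name `saved_chunks` and the `chunk_store` directory are made up for the example:

```python
# Sketch only: keep the collect step light by persisting chunks inside the
# parallel block and returning a pointer (a file path) instead of the data.
import hashlib
import json
from pathlib import Path

from langchain_core import documents


def saved_chunks(url: str, chunked_text: list[documents.Document]) -> str:
    """Writes the chunks for one URL to a JSON file and returns the file path."""
    out_dir = Path("chunk_store")
    out_dir.mkdir(exist_ok=True)
    out_path = out_dir / f"{hashlib.md5(url.encode()).hexdigest()}.json"
    out_path.write_text(json.dumps([doc.page_content for doc in chunked_text]))
    return str(out_path)
```

You would then have `url_result` return the path instead of the chunks themselves, so the collect step only materializes a small list of dictionaries.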

To extend this to PySpark, [see the examples folder](https://github.com/dagworks-inc/hamilton/tree/main/examples/LLM_Workflows/scraping_and_chunking/spark)
for the changes required to adjust the code to handle PySpark.
@@ -0,0 +1,177 @@
"""
Things this module does.

1. takes in a sitemap.xml file and creates a list of all the URLs in the file.
2. takes in a list of URLs and pulls the HTML from each URL.
3. it then strips the HTML to the relevant body of HTML. We assume `furo themed sphinx docs`:
   html/body/div[class="page"]/div[class="main"]/div[class="content"]/div[class="article-container"]/article
4. it then chunks the HTML into smaller pieces -- returning langchain documents.
5. what this doesn't do is create embeddings -- but that would be easy to extend.
"""

import logging
import re

logger = logging.getLogger(__name__)

from hamilton import contrib

with contrib.catch_import_errors(__name__, __file__, logger):
    import requests
    from langchain import text_splitter
    from langchain_core import documents

from hamilton.htypes import Collect, Parallelizable


def sitemap_text(sitemap_url: str = "https://hamilton.dagworks.io/en/latest/sitemap.xml") -> str:
    """Takes in a sitemap URL and returns the sitemap.xml file contents.

    :param sitemap_url: the URL of the sitemap.xml file
    :return: the raw text of the sitemap.xml file
    """
    sitemap = requests.get(sitemap_url)
    return sitemap.text


def urls_from_sitemap(sitemap_text: str) -> list[str]:
    """Takes in the contents of a sitemap.xml file and creates a list of all the URLs in it.

    :param sitemap_text: the contents of a sitemap.xml file
    :return: list of URLs
    """
    urls = re.findall(r"<loc>(.*?)</loc>", sitemap_text)
    return urls


def url(urls_from_sitemap: list[str], max_urls: int = 1000) -> Parallelizable[str]:
    """
    Takes in a list of URLs for parallel processing.

    Note: this could be in a separate module, but it's here for simplicity.
    """
    for url in urls_from_sitemap[0:max_urls]:
        yield url


# --- Start Parallel Code ---
# The following code is parallelized, once for each URL.
# This code could be in a separate module, but it's here for simplicity.


def article_regex() -> str:
    """This assumes you're using the furo theme for sphinx."""
    return r'<article role="main" id="furo-main-content">(.*?)</article>'


def article_text(url: str, article_regex: str) -> str:
    """Pulls the URL and extracts the relevant HTML.

    :param url: the URL to pull.
    :param article_regex: the regex to use to extract the article contents.
    :return: sub-portion of the HTML
    """
    html = requests.get(url)
    article = re.findall(article_regex, html.text, re.DOTALL)
    if not article:
        raise ValueError(f"No article found in {url}")
    text = article[0].strip()
    return text


def html_chunker() -> text_splitter.HTMLHeaderTextSplitter:
    """Returns the HTML chunker object.

    :return: the HTMLHeaderTextSplitter that splits on h1/h2/h3 headers.
    """
    headers_to_split_on = [
        ("h1", "Header 1"),
        ("h2", "Header 2"),
        ("h3", "Header 3"),
    ]
    return text_splitter.HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)


def text_chunker(
    chunk_size: int = 256, chunk_overlap: int = 32
) -> text_splitter.RecursiveCharacterTextSplitter:
    """Returns the text chunker object.

    :param chunk_size: the maximum size of a chunk.
    :param chunk_overlap: how much adjacent chunks overlap.
    :return: the RecursiveCharacterTextSplitter to chunk text with.
    """
    return text_splitter.RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )


def chunked_text(
    article_text: str,
    html_chunker: text_splitter.HTMLHeaderTextSplitter,
    text_chunker: text_splitter.RecursiveCharacterTextSplitter,
) -> list[documents.Document]:
    """This function takes in HTML, chunks it by header, and then chunks each section again by size.

    It then outputs a list of langchain "documents". Multiple documents for one HTML header section are possible.

    :param article_text: the article HTML to chunk.
    :param html_chunker: the HTML header splitter.
    :param text_chunker: the recursive character splitter.
    :return: a list of langchain documents.
    """
    header_splits = html_chunker.split_text(article_text)
    splits = text_chunker.split_documents(header_splits)
    return splits


def url_result(url: str, article_text: str, chunked_text: list[documents.Document]) -> dict:
    """Function to aggregate what we want to return from parallel processing.

    Note: this function is where you could cache the results to a datastore.

    :param url: the URL the chunks came from.
    :param article_text: the article HTML that was chunked.
    :param chunked_text: the list of chunked documents.
    :return: a dict of url, article_text, and chunks.
    """
    return {"url": url, "article_text": article_text, "chunks": chunked_text}


# --- END Parallel Code ---


def collect_chunked_url_text(url_result: Collect[dict]) -> list:
    """Function to collect the results from parallel processing.

    Note: all results for `url_result` are pulled into memory here.
    So, if you have a lot of results, you may want to write them to a datastore and pass pointers.
    """
    return list(url_result)


if __name__ == "__main__":
    # code for quickly testing that this module builds and runs.
    import __main__ as sphinx_doc_chunking

    from hamilton import driver
    from hamilton.execution import executors

    dr = (
        driver.Builder()
        .with_modules(sphinx_doc_chunking)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_config({})
        .with_local_executor(executors.SynchronousLocalTaskExecutor())
        .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=25))
        .build()
    )
    dr.display_all_functions("dag.png")
    result = dr.execute(
        ["collect_chunked_url_text"],
        inputs={"chunk_size": 256, "chunk_overlap": 32},
    )
    # do something with the result...
    import pprint

    for chunk in result["collect_chunked_url_text"]:
        pprint.pprint(chunk)
@@ -0,0 +1,6 @@
1+
langchain
2+
langchain-core
3+
sf-hamilton[dask]
4+
# optionally install Ray, or Dask, or both
5+
sf-hamilton[ray]
6+
sf-hamilton[visualization]
@@ -0,0 +1,10 @@
1+
{
2+
"schema": "1.0",
3+
"use_case_tags": ["data processing", "document chunking", "chunking", "langchain"],
4+
"secondary_tags": {
5+
"language": "English"
6+
},
7+
"driver_tags": {
8+
"executor": "multithreading"
9+
}
10+
}
