
Commit 59edd9a

committed
Adds document chunking example to contrib
Things this module does:

1. takes in a sitemap.xml file and creates a list of all the URLs in the file.
2. takes in a list of URLs and pulls the HTML from each URL.
3. it then strips the HTML to the relevant body of HTML. We assume `furo themed sphinx docs`, i.e. html/body/div[class="page"]/div[class="main"]/div[class="content"]/div[class="article-container"]/article.
4. it then chunks the HTML into smaller pieces -- returning langchain documents.
5. what this doesn't do is create embeddings -- but that would be easy to extend.
1 parent e93b829 commit 59edd9a

File tree

9 files changed: +678 −0 lines changed

@@ -0,0 +1,50 @@
# Purpose of this module

The purpose of this module is to take Sphinx Furo themed documentation, pull the pages, and chunk the text
for further processing, e.g. creating embeddings. This is fairly generic code that is easy to change
and extend for your purposes. It runs anywhere that Python runs, and can be extended to run on Ray, Dask,
and even PySpark.

```python
# import sphinx_doc_chunking via the means that you want. See above code.

from hamilton import driver
from hamilton.execution import executors

dr = (
    driver.Builder()
    .with_modules(sphinx_doc_chunking)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_config({})
    # defaults to multi-threading -- and tasks control max concurrency
    .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=25))
    .build()
)
```

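If you want to scale beyond threads, one option is to swap the remote executor. This is a minimal sketch, not part of this module -- it assumes your Hamilton version ships `executors.MultiProcessingExecutor`; Ray and Dask would need their respective plugin executors and extras installed:

```python
# Sketch, not part of this module: use processes instead of threads for the per-URL work.
from hamilton import driver
from hamilton.execution import executors

dr = (
    driver.Builder()
    .with_modules(sphinx_doc_chunking)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_config({})
    # processes instead of threads -- the serialization caveats below then apply
    .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=8))
    .build()
)
```
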
## What you should modify

You'll likely want to:

1. tune what does the chunking and the settings for it.
2. change how URLs are sourced.
3. change how text is extracted from a page.
4. extend the code to hit an API to get embeddings (see the sketch after this list).
5. extend the code to push data to a vector database.

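For items 4 and 5, here is a rough sketch of what such an extension could look like. It is hypothetical -- it assumes the `openai` package and an `OPENAI_API_KEY` environment variable -- and a sibling node following the same pattern could push the vectors to a vector database:

```python
# Hypothetical extension, not part of this module: embed each chunk inside the parallel block.
from langchain_core import documents
from openai import OpenAI


def chunk_embeddings(chunked_text: list[documents.Document]) -> list[list[float]]:
    """Embeds each chunk for a URL. Assumes OPENAI_API_KEY is set in the environment."""
    client = OpenAI()
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=[doc.page_content for doc in chunked_text],
    )
    return [item.embedding for item in response.data]
```
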
# Configuration Options

There is no configuration required for this module.

# Limitations

Your general multiprocessing caveats apply if you choose an executor other than the MultiThreadingExecutor. For example:

1. Serialization -- objects need to be serializable between processes.
2. Concurrency/parallelism -- you're in control of this.
3. Failures -- you'll need to make your code do the right thing here.
4. Memory requirements -- the "collect" (or reduce) step pulls things into memory. If you hit this, it just
means you need to redesign your code a little, e.g. write large things to a store and pass pointers (see the sketch after this list).

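For point 4, one workaround (a sketch, not what this module does) is to have `url_result` persist the chunks and return only a pointer, so the collect step holds small dicts instead of every document:

```python
# Hypothetical variant of url_result: write chunks to disk and return a path instead.
import hashlib
import json

from langchain_core import documents


def url_result(url: str, article_text: str, chunked_text: list[documents.Document]) -> dict:
    path = f"chunks_{hashlib.md5(url.encode()).hexdigest()}.json"
    with open(path, "w") as f:
        json.dump([doc.page_content for doc in chunked_text], f)
    return {"url": url, "chunk_path": path}  # a pointer, not the chunks themselves
```
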
To extend this to PySpark, [see the examples folder](https://github.com/dagworks-inc/hamilton/tree/main/examples/LLM_Workflows/scraping_and_chunking/spark)
for the changes required to adjust the code to handle PySpark.
@@ -0,0 +1,162 @@
"""
Things this module does.

1. takes in a sitemap.xml file and creates a list of all the URLs in the file.
2. takes in a list of URLs and pulls the HTML from each URL.
3. it then strips the HTML to the relevant body of HTML. We assume `furo themed sphinx docs`.
   html/body/div[class="page"]/div[class="main"]/div[class="content"]/div[class="article-container"]/article
4. it then chunks the HTML into smaller pieces -- returning langchain documents
5. what this doesn't do is create embeddings -- but that would be easy to extend.
"""

import re

import requests
from langchain import text_splitter
from langchain_core import documents

from hamilton.htypes import Collect, Parallelizable


def sitemap_text(sitemap_url: str = "https://hamilton.dagworks.io/en/latest/sitemap.xml") -> str:
    """Takes in a sitemap URL and returns the sitemap.xml file contents.

    :param sitemap_url: the URL of the sitemap.xml file
    :return: the raw text of the sitemap.xml file
    """
    sitemap = requests.get(sitemap_url)
    return sitemap.text


def urls_from_sitemap(sitemap_text: str) -> list[str]:
    """Takes in sitemap.xml file contents and creates a list of all the URLs in the file.

    :param sitemap_text: the contents of a sitemap.xml file
    :return: list of URLs
    """
    urls = re.findall(r"<loc>(.*?)</loc>", sitemap_text)
    return urls


def url(urls_from_sitemap: list[str], max_urls: int = 1000) -> Parallelizable[str]:
    """
    Takes in a list of URLs for parallel processing.

    Note: this could be in a separate module, but it's here for simplicity.
    """
    for url in urls_from_sitemap[0:max_urls]:
        yield url

# --- Start Parallel Code ---
# The following code is parallelized, once for each url.
# This code could be in a separate module, but it's here for simplicity.


def article_regex() -> str:
    """This assumes you're using the furo theme for sphinx."""
    return r'<article role="main" id="furo-main-content">(.*?)</article>'


def article_text(url: str, article_regex: str) -> str:
    """Pulls the URL and extracts the relevant article HTML.

    :param url: the url to pull.
    :param article_regex: the regex to use to extract the article contents.
    :return: sub-portion of the HTML
    """
    html = requests.get(url)
    article = re.findall(article_regex, html.text, re.DOTALL)
    if not article:
        raise ValueError(f"No article found in {url}")
    text = article[0].strip()
    return text


def html_chunker() -> text_splitter.HTMLHeaderTextSplitter:
    """Returns the HTML chunker object.

    :return: the HTMLHeaderTextSplitter object
    """
    headers_to_split_on = [
        ("h1", "Header 1"),
        ("h2", "Header 2"),
        ("h3", "Header 3"),
    ]
    return text_splitter.HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)


def text_chunker(
    chunk_size: int = 256, chunk_overlap: int = 32
) -> text_splitter.RecursiveCharacterTextSplitter:
    """Returns the text chunker object.

    :param chunk_size: the maximum number of characters per chunk
    :param chunk_overlap: the number of characters adjacent chunks overlap by
    :return: the RecursiveCharacterTextSplitter object
    """
    return text_splitter.RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )

def chunked_text(
    article_text: str,
    html_chunker: text_splitter.HTMLHeaderTextSplitter,
    text_chunker: text_splitter.RecursiveCharacterTextSplitter,
) -> list[documents.Document]:
    """This function takes in HTML, chunks it, and then chunks it again.

    It then outputs a list of langchain "documents". Multiple documents for one HTML header section are possible.

    :param article_text: the article HTML to chunk
    :param html_chunker: the HTML header splitter
    :param text_chunker: the recursive character splitter
    :return: list of langchain documents
    """
    header_splits = html_chunker.split_text(article_text)
    splits = text_chunker.split_documents(header_splits)
    return splits


def url_result(url: str, article_text: str, chunked_text: list[documents.Document]) -> dict:
    """Function to aggregate what we want to return from parallel processing.

    Note: this function is where you could cache the results to a datastore.

    :param url: the url that was processed
    :param article_text: the extracted article HTML
    :param chunked_text: the chunked documents
    :return: dict of url, article text, and chunks
    """
    return {"url": url, "article_text": article_text, "chunks": chunked_text}


# --- END Parallel Code ---


def collect_chunked_url_text(url_result: Collect[dict]) -> list:
    """Function to collect the results from parallel processing.
    Note: All results for `url_result` are pulled into memory here.
    So, if you have a lot of results, you may want to write them to a datastore and pass pointers.
    """
    return list(url_result)


if __name__ == "__main__":
    # code here for quickly testing the build of the code here.
    import __main__ as sphinx_doc_chunking

    from hamilton import driver
    from hamilton.execution import executors

    dr = (
        driver.Builder()
        .with_modules(sphinx_doc_chunking)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_config({})
        .with_local_executor(executors.SynchronousLocalTaskExecutor())
        .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=25))
        .build()
    )
    dr.display_all_functions("dag.png")
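    # A usage sketch (not part of this commit): requesting the collect node executes the whole
    # pipeline -- read the sitemap, pull each page, chunk it, and gather the per-URL results.
    results = dr.execute(["collect_chunked_url_text"])
    print(f"processed {len(results['collect_chunked_url_text'])} URLs")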
@@ -0,0 +1,6 @@
langchain
langchain-core
sf-hamilton[dask]
# optionally install Ray, or Dask, or both
sf-hamilton[ray]
sf-hamilton[visualization]
@@ -0,0 +1,7 @@
{
  "schema": "1.0",
  "use_case_tags": ["data processing", "document chunking", "chunking", "langchain"],
  "secondary_tags": {
    "language": "English"
  }
}

0 commit comments
