
Commit 3c1b089

feat: Ingest CLI flags and test fixture updates (#227)
* Many command line options added. The sample ingest project is now an easy-to-use CLI (no code editing necessary), capable of processing large numbers of files from S3 in a re-entrant manner. See Ingest.md.
* Fixes issue where test fixtures had been truncated
* Adds a check to make sure this doesn't happen again
* Moves fixture outputs for the existing connector one subdir lower, to make room for future connector outputs.
1 parent 74e6b84 commit 3c1b089

13 files changed: +3049 −70 lines changed

Diff for: .gitignore

+2
```diff
@@ -133,6 +133,8 @@ dmypy.json
 /structured-output
 # ingest temporary files
 /tmp-ingest*
+# suggested ingest mirror directory
+/mirror
 
 ## https://github.com/github/gitignore/blob/main/Global/Emacs.gitignore (partial)
 
```

Diff for: CHANGELOG.md

+1 −1

```diff
@@ -1,6 +1,6 @@
 ## 0.4.9
 
-* Added ingest modules and s3 connector
+* Added ingest modules and s3 connector, sample ingest script
 * Default to `url=None` for `partition_pdf` and `partition_image`
 * Add ability to skip English specific check by setting the `UNSTRUCTURED_LANGUAGE` env var to `""`.
 * Document `Element` objects now track metadata
```

Diff for: Ingest.md

+46 −11

````diff
@@ -1,7 +1,50 @@
 # Batch Processing Documents
 
-Several classes are provided in the Unstructured library
-to enable effecient batch processing of documents.
+## Sample Connector: S3
+
+See the sample project [examples/ingest/s3-small-batch/main.py](examples/ingest/s3-small-batch/main.py), which processes all the documents under a given s3 URL with 2 parallel processes, writing the structured json output to `structured-outputs/`.
+
+You can try it out with:
+
+    PYTHONPATH=. python examples/ingest/s3-small-batch/main.py --s3-url s3://utic-dev-tech-fixtures/small-pdf-set/ --anonymous
+
+    # Note: the --anonymous flag indicates not to provide AWS credentials, needed
+    # for the boto3 lib. Remove this flag when local AWS credentials are required.
+
+This utility is ready to use with any s3 prefix!
+
+By default, it will not reprocess files from s3 if their outputs already exist in --structured-output-dir. Naturally, this may come in handy when processing a large number of files. However, you can force reprocessing of all documents with the --reprocess flag.
+
+
+```
+$ PYTHONPATH=. python examples/ingest/s3-small-batch/main.py --help
+Usage: main.py [OPTIONS]
+
+Options:
+  --s3-url TEXT                 Prefix of s3 objects (files) to download.
+                                E.g. s3://bucket1/path/. This value may also
+                                be a single file.
+  --re-download / --no-re-download
+                                Re-download files from s3 even if they are
+                                already present in --download-dir.
+  --download-dir TEXT           Where s3 files are downloaded to, defaults
+                                to tmp-ingest-<6 random chars>.
+  --preserve-downloads          Preserve downloaded s3 files. Otherwise each
+                                file is removed after being processed
+                                successfully.
+  --structured-output-dir TEXT  Where to place structured output .json
+                                files.
+  --reprocess                   Reprocess a downloaded file from s3 even if
+                                the relevant structured output .json file in
+                                --structured-output-dir already exists.
+  --num-processes INTEGER       Number of parallel processes to process docs
+                                in.  [default: 2]
+  --anonymous                   Connect to s3 without local AWS credentials.
+  -v, --verbose
+  --help                        Show this message and exit.
+```
+
+# Developer notes
 
 ## The Abstractions
 
@@ -25,12 +68,4 @@ sequenceDiagram
 Note over MainProcess: Optional - process structured data from all docs
 ```
 
-## Sample Connector: S3
-
-See the sample project [examples/ingest/s3-small-batch/main.py](examples/ingest/s3-small-batch/main.py), which processes all the documents under a given s3 URL with 2 parallel processes, writing the structured json output to `structured-outputs/`.
-
-You can try it out with
-
-PYTHONPATH=. python examples/ingest/s3-small-batch/main.py
-
-The abstractions in the above diagram are honored in this project (though ABC's are not yet written), with the exception of the StructuredDocWriter which may be added more formally at a later time.
+The abstractions in the above diagram are honored in the S3 Connector project (though ABC's are not yet written), with the exception of the StructuredDocWriter which may be added more formally at a later time.
````
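The re-entrant behavior called out in the commit message falls out of the skip-if-output-exists default documented above: a rerun only processes documents whose structured outputs are missing. As a sketch, assuming the public fixtures bucket is still reachable (the skip message comes from `_filter_docs_with_outputs` in main.py below):

    # First run: downloads and processes everything under the prefix.
    PYTHONPATH=. python examples/ingest/s3-small-batch/main.py \
        --s3-url s3://utic-dev-tech-fixtures/small-pdf-set/ --anonymous

    # Second identical run: exits early with "All docs have structured
    # outputs, nothing to do. Use --reprocess to process all."
    PYTHONPATH=. python examples/ingest/s3-small-batch/main.py \
        --s3-url s3://utic-dev-tech-fixtures/small-pdf-set/ --anonymous

    # Force a full redo, including docs whose outputs already exist.
    PYTHONPATH=. python examples/ingest/s3-small-batch/main.py \
        --s3-url s3://utic-dev-tech-fixtures/small-pdf-set/ --anonymous --reprocess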

Diff for: examples/ingest/s3-small-batch/main.py

+77 −22

```diff
@@ -1,54 +1,109 @@
 import multiprocessing as mp
 import os
+import random
+import string
+
+import click
+
 from unstructured.ingest.connector.s3_connector import S3Connector, SimpleS3Config
-from unstructured.ingest.doc_processor.generalized import process_document
+from unstructured.ingest.doc_processor.generalized import initialize, process_document
 
 
 class MainProcess:
 
-    def __init__(self, doc_connector, doc_processor_fn, num_processes):
+    def __init__(self, doc_connector, doc_processor_fn, num_processes, reprocess):
         # initialize the reader and writer
         self.doc_connector = doc_connector
         self.doc_processor_fn = doc_processor_fn
         self.num_processes = num_processes
-
+        self.reprocess = reprocess
 
     def initialize(self):
         """Slower initialization things: check connections, load things into memory, etc."""
-        self.doc_connector.initialize()
-
+        initialize()
+
     def cleanup(self):
         self.doc_connector.cleanup()
 
+    def _filter_docs_with_outputs(self, docs):
+        num_docs_all = len(docs)
+        docs = [doc for doc in docs if not doc.has_output()]
+        num_docs_to_process = len(docs)
+        if num_docs_to_process == 0:
+            print("All docs have structured outputs, nothing to do. Use --reprocess to process all.")
+            return None
+        elif num_docs_to_process != num_docs_all:
+            print(f"Skipping processing for {num_docs_all - num_docs_to_process} docs out of "
+                  f"{num_docs_all} since their structured outputs already exist, use --reprocess to "
+                  "reprocess those in addition to the unprocessed ones.")
+        return docs
+
     def run(self):
         self.initialize()
-
+
         self.doc_connector.fetch_docs()
 
         # fetch the list of lazy downloading IngestDoc obj's
         docs = self.doc_connector.fetch_docs()
 
+        # remove docs that have already been processed
+        if not self.reprocess:
+            docs = self._filter_docs_with_outputs(docs)
+            if not docs:
+                return
+
         # Debugging tip: use the below line and comment out the mp.Pool loop
         # block to remain in single process
-        #self.doc_processor_fn(docs[0])
-
+        # self.doc_processor_fn(docs[0])
+
         with mp.Pool(processes=self.num_processes) as pool:
             results = pool.map(self.doc_processor_fn, docs)
-
+
         self.cleanup()
 
-    @staticmethod
-    def main():
-        doc_connector = S3Connector(
-            config=SimpleS3Config(
-                s3_url="s3://utic-dev-tech-fixtures/small-pdf-set/",
-                output_dir="structured-output",
-                # set to False to use your AWS creds (not needed for this public s3 url)
-                anonymous=True,
-            ),
+@click.command()
+@click.option('--s3-url', default="s3://utic-dev-tech-fixtures/small-pdf-set/",
+              help="Prefix of s3 objects (files) to download. E.g. s3://bucket1/path/. This value may also be a single file.")
+@click.option('--re-download/--no-re-download', default=False,
+              help="Re-download files from s3 even if they are already present in --download-dir.")
+@click.option('--download-dir',
+              help="Where s3 files are downloaded to, defaults to tmp-ingest-<6 random chars>.")
+@click.option('--preserve-downloads', is_flag=True, default=False,
+              help="Preserve downloaded s3 files. Otherwise each file is removed after being processed successfully.")
+@click.option('--structured-output-dir', default="structured-output",
+              help="Where to place structured output .json files.")
+@click.option('--reprocess', is_flag=True, default=False,
+              help="Reprocess a downloaded file from s3 even if the relevant structured output .json file in --structured-output-dir already exists.")
+@click.option('--num-processes', default=2, show_default=True,
+              help="Number of parallel processes to process docs in.")
+@click.option('--anonymous', is_flag=True, default=False,
+              help="Connect to s3 without local AWS credentials.")
+@click.option('-v', '--verbose', is_flag=True, default=False)
+def main(s3_url, re_download, download_dir, preserve_downloads, structured_output_dir,
+         reprocess, num_processes, anonymous, verbose):
+    if not preserve_downloads and download_dir:
+        print("Warning: not preserving downloaded s3 files but --download-dir is specified")
+    if not download_dir:
+        download_dir = "tmp-ingest-" + "".join(
+            random.choice(string.ascii_letters) for i in range(6)
         )
-        MainProcess(doc_connector=doc_connector,
-                    doc_processor_fn=process_document,
-                    num_processes=2).run()
+    doc_connector = S3Connector(
+        config=SimpleS3Config(
+            download_dir=download_dir,
+            s3_url=s3_url,
+            output_dir=structured_output_dir,
+            # set to False to use your AWS creds (not needed for this public s3 url)
+            anonymous=anonymous,
+            re_download=re_download,
+            preserve_downloads=preserve_downloads,
+            verbose=verbose,
+        ),
+    )
+    MainProcess(doc_connector=doc_connector,
+                doc_processor_fn=process_document,
+                num_processes=num_processes,
+                reprocess=reprocess,
+                ).run()
 
+
 if __name__ == '__main__':
-    MainProcess.main()
+    main()
```
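The filtering in `_filter_docs_with_outputs` above hinges on `doc.has_output()`, which lives in the s3 connector and is not part of this diff. A minimal sketch of what such a check plausibly does, assuming one structured `.json` output per input document named after its s3 key; the class and attribute names here are hypothetical, not the connector's actual API:

```python
import os


class IngestDocSketch:
    """Hypothetical stand-in for the connector's IngestDoc; names are assumptions."""

    def __init__(self, s3_key: str, output_dir: str):
        self.s3_key = s3_key          # e.g. "small-pdf-set/report.pdf"
        self.output_dir = output_dir  # value of --structured-output-dir

    def _output_filename(self) -> str:
        # one structured .json output per input document
        return os.path.join(self.output_dir, f"{self.s3_key}.json")

    def has_output(self) -> bool:
        # a doc counts as already processed once its structured output exists
        return os.path.isfile(self._output_filename())
```

Under that contract, --reprocess simply bypasses the filter, and deleting a single output `.json` is enough to get just that document reprocessed on the next run.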
