Commit cfd4b52

bugfix/uncompress process in pipeline (#83)
1 parent f819f02 commit cfd4b52

File tree

8 files changed: +48 -37 lines changed


CHANGELOG.md

Lines changed: 10 additions & 4 deletions
@@ -1,10 +1,14 @@
-## 0.0.9-dev1
+## 0.0.9
 
 ### Enhancements
 
-**Chroma dict settings should allow string inputs
-**Move opensearch non-secret fields out of access config
-**Support string inputs for dict type model fields** Use the `BeforeValidator` support from pydantic to map a string value to a dict if that's provided.
+* **Chroma dict settings should allow string inputs**
+* **Move opensearch non-secret fields out of access config**
+* **Support string inputs for dict type model fields** Use the `BeforeValidator` support from pydantic to map a string value to a dict if that's provided.
+
+### Fixes
+
+* **Fix uncompress logic** The uncompress process wasn't being leveraged correctly in the pipeline. Updated to use the new local download path as the location where the partitioner looks for the new file.
 
 
 ## 0.0.8
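For context on the `BeforeValidator` entry above, here is a minimal sketch of the pattern, assuming a hypothetical `ChromaSettings` model (not code from this commit):

# Minimal sketch of mapping a string input to a dict with pydantic's
# BeforeValidator; the model and field names here are hypothetical.
import json
from typing import Annotated

from pydantic import BaseModel, BeforeValidator


def coerce_str_to_dict(value: object) -> object:
    # Parse a JSON string into a dict; pass any other value through untouched.
    if isinstance(value, str):
        return json.loads(value)
    return value


class ChromaSettings(BaseModel):
    settings: Annotated[dict, BeforeValidator(coerce_str_to_dict)] = {}


# Both forms validate to the same dict value:
print(ChromaSettings(settings={"host": "localhost"}).settings)
print(ChromaSettings(settings='{"host": "localhost"}').settings)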

test_e2e/src/s3-compression.sh

Lines changed: 3 additions & 2 deletions
@@ -34,8 +34,9 @@ PYTHONPATH=${PYTHONPATH:-.} "$RUN_SCRIPT" \
   --remote-url s3://utic-dev-tech-fixtures/small-pdf-set-w-compression/ \
   --anonymous \
   --work-dir "$WORK_DIR" \
-  --uncompress
+  --uncompress \
+  --file-glob "*.pdf,*.zip,*.tgz"
 
-"$SCRIPT_DIR"/check-num-files-output.sh 12 $OUTPUT_FOLDER_NAME
+"$SCRIPT_DIR"/check-num-files-output.sh 18 $OUTPUT_FOLDER_NAME
 
 "$SCRIPT_DIR"/evaluation-ingest-cp.sh "$OUTPUT_DIR" "$OUTPUT_FOLDER_NAME"

test_e2e/test-src.sh

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ all_tests=(
   'biomed-path.sh'
   # NOTE(yuming): The pdf-fast-reprocess test should be put after any tests that save downloaded files
   'pdf-fast-reprocess.sh'
+  's3-compression.sh'
   'salesforce.sh'
   'box.sh'
   'discord.sh'

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "0.0.9-dev1"  # pragma: no cover
+__version__ = "0.0.9"  # pragma: no cover

unstructured_ingest/v2/interfaces/file_data.py

Lines changed: 1 addition & 0 deletions
@@ -42,6 +42,7 @@ class FileData(DataClassJsonMixin):
     metadata: FileDataSourceMetadata = field(default_factory=lambda: FileDataSourceMetadata())
     additional_metadata: dict[str, Any] = field(default_factory=dict)
     reprocess: bool = False
+    local_download_path: Optional[str] = None
 
     @classmethod
     def from_file(cls, path: str) -> "FileData":
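The new field records where the downloader actually wrote the file, so later steps can read it directly instead of inferring it from source identifiers. A simplified stand-in for the class (the real `FileData` carries more fields plus JSON serialization):

# Simplified stand-in for FileData showing the role of the new field;
# the real class also carries source metadata and (de)serialization helpers.
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class FileData:
    identifier: str
    additional_metadata: dict[str, Any] = field(default_factory=dict)
    reprocess: bool = False
    local_download_path: Optional[str] = None  # set by the download step


fd = FileData(identifier="abc123")
fd.local_download_path = "/tmp/downloads/report.zip"  # recorded after download
print(fd)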

unstructured_ingest/v2/pipeline/steps/download.py

Lines changed: 4 additions & 7 deletions
@@ -68,10 +68,9 @@ def should_download(self, file_data: FileData, file_data_path: str) -> bool:
     def update_file_data(
         self, file_data: FileData, file_data_path: Path, download_path: Path
     ) -> None:
+        file_data.local_download_path = str(download_path.resolve())
         file_size_bytes = download_path.stat().st_size
-        changed = False
         if not file_data.metadata.filesize_bytes and file_size_bytes:
-            changed = True
             file_data.metadata.filesize_bytes = file_size_bytes
         if (
             file_data.metadata.filesize_bytes
@@ -82,12 +81,10 @@ def update_file_data(
                 f"({file_data.metadata.filesize_bytes}) doesn't "
                 f"match size of local file: {file_size_bytes}, updating"
             )
-            changed = True
             file_data.metadata.filesize_bytes = file_size_bytes
-        if changed:
-            logger.debug(f"Updating file data with new content: {file_data.to_dict()}")
-            with file_data_path.open("w") as file:
-                json.dump(file_data.to_dict(), file, indent=2)
+        logger.debug(f"Updating file data with new content: {file_data.to_dict()}")
+        with file_data_path.open("w") as file:
+            json.dump(file_data.to_dict(), file, indent=2)
 
     async def _run_async(self, fn: Callable, file_data_path: str) -> list[DownloadStepResponse]:
         file_data = FileData.from_file(path=file_data_path)
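The step now stamps the resolved download path onto the file data and always rewrites the serialized JSON, rather than only when the file size changed, so `local_download_path` is reliably persisted for the uncompress step. A minimal sketch of that persistence pattern, with illustrative paths and payload:

# Illustrative sketch of the unconditional rewrite: after every download the
# file-data JSON is updated on disk, so local_download_path is always persisted.
import json
from pathlib import Path

file_data_path = Path("/tmp/file_data/abc123.json")  # hypothetical location
file_data = {
    "identifier": "abc123",
    "local_download_path": "/tmp/downloads/report.zip",
}

file_data_path.parent.mkdir(parents=True, exist_ok=True)
with file_data_path.open("w") as file:
    json.dump(file_data, file, indent=2)  # written every time, not just on size change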

unstructured_ingest/v2/pipeline/steps/uncompress.py

Lines changed: 3 additions & 16 deletions
@@ -1,4 +1,5 @@
 import asyncio
+from dataclasses import dataclass
 from pathlib import Path
 from typing import Callable, TypedDict
 
@@ -15,6 +16,7 @@ class UncompressStepResponse(TypedDict):
     path: str
 
 
+@dataclass
 class UncompressStep(PipelineStep):
     process: Uncompressor
     identifier: str = STEP_ID
@@ -23,21 +25,6 @@ def __post_init__(self):
         config = self.process.config.json() if self.process.config else None
         logger.info(f"Created {self.identifier} with configs: {config}")
 
-    def _run(self, path: str, file_data_path: str) -> list[UncompressStepResponse]:
-        file_data = FileData.from_file(path=file_data_path)
-        new_file_data = self.process.run(file_data=file_data)
-        responses = []
-        for new_file in new_file_data:
-            new_file_data_path = Path(file_data_path).parent / f"{new_file.identifier}.json"
-            new_file.to_file(path=str(new_file_data_path.resolve()))
-            responses.append(
-                UncompressStepResponse(
-                    path=new_file.source_identifiers.fullpath,
-                    file_data_path=str(new_file_data_path),
-                )
-            )
-        return responses
-
     async def _run_async(
         self, fn: Callable, path: str, file_data_path: str
     ) -> list[UncompressStepResponse]:
@@ -56,7 +43,7 @@ async def _run_async(
             new_file.to_file(path=str(new_file_data_path.resolve()))
             responses.append(
                 UncompressStepResponse(
-                    path=new_file.source_identifiers.fullpath,
+                    path=new_file.local_download_path,
                     file_data_path=str(new_file_data_path),
                 )
            )
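Besides dropping the synchronous `_run` duplicate in favor of the async path, `UncompressStep` gains `@dataclass`, which is what turns its annotated `process` and `identifier` attributes into real constructor fields. A minimal illustration with stand-in classes (not the repo's types):

# Why the decorator matters: on an undecorated subclass of a dataclass, an
# annotated attribute stays a plain class attribute and never reaches __init__.
from dataclasses import dataclass


@dataclass
class Base:
    identifier: str = "base"


class WithoutDecorator(Base):
    process: str = "uncompress"  # plain class attribute, not an init field


@dataclass
class WithDecorator(Base):
    process: str = "uncompress"  # real dataclass field


print(WithoutDecorator())             # WithoutDecorator(identifier='base')
print(WithDecorator(process="gzip"))  # WithDecorator(identifier='base', process='gzip')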

unstructured_ingest/v2/processes/uncompress.py

Lines changed: 25 additions & 7 deletions
@@ -3,12 +3,14 @@
 from dataclasses import dataclass, field
 from pathlib import Path
 from typing import Any
+from uuid import NAMESPACE_DNS, uuid5
 
 from pydantic import BaseModel
 
 from unstructured_ingest.utils.compression import TAR_FILE_EXT, ZIP_FILE_EXT, uncompress_file
-from unstructured_ingest.v2.interfaces import FileData
+from unstructured_ingest.v2.interfaces import FileData, SourceIdentifiers
 from unstructured_ingest.v2.interfaces.process import BaseProcess
+from unstructured_ingest.v2.logger import logger
 
 
 class UncompressConfig(BaseModel):
@@ -23,19 +25,35 @@ def is_async(self) -> bool:
         return True
 
     def run(self, file_data: FileData, **kwargs: Any) -> list[FileData]:
-        local_filepath = Path(file_data.source_identifiers.fullpath)
+        local_filepath = Path(file_data.local_download_path)
         if local_filepath.suffix not in TAR_FILE_EXT + ZIP_FILE_EXT:
             return [file_data]
         new_path = uncompress_file(filename=str(local_filepath))
         new_files = [i for i in Path(new_path).rglob("*") if i.is_file()]
         responses = []
+        logger.debug(
+            "uncompressed {} files from original file {}: {}".format(
+                len(new_files), local_filepath, ", ".join([str(f) for f in new_files])
+            )
+        )
         for f in new_files:
             new_file_data = copy(file_data)
-            new_file_data.source_identifiers.fullpath = str(f)
-            if new_file_data.source_identifiers.rel_path:
-                new_file_data.source_identifiers.rel_path = str(f).replace(
-                    str(local_filepath.parent), ""
-                )[1:]
+            new_file_data.identifier = str(uuid5(NAMESPACE_DNS, str(f)))
+            new_file_data.local_download_path = str(f.resolve())
+            new_rel_download_path = str(f).replace(str(Path(local_filepath.parent)), "")[1:]
+            new_file_data.source_identifiers = SourceIdentifiers(
+                filename=f.name,
+                fullpath=file_data.source_identifiers.fullpath.replace(
+                    file_data.source_identifiers.filename, new_rel_download_path
+                ),
+                rel_path=(
+                    file_data.source_identifiers.rel_path.replace(
+                        file_data.source_identifiers.filename, new_rel_download_path
+                    )
+                    if file_data.source_identifiers.rel_path
+                    else None
+                ),
+            )
             responses.append(new_file_data)
         return responses
 
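A worked sketch of the new identifier and path rewriting above, using throwaway paths rather than repo code: `uuid5` makes each extracted file's identifier deterministic, and the extracted file's relative path replaces the archive's filename inside the original fullpath.

# Worked sketch of the rewriting logic with throwaway paths (not repo code).
from pathlib import Path
from uuid import NAMESPACE_DNS, uuid5

local_filepath = Path("/tmp/downloads/archive.zip")
extracted = Path("/tmp/downloads/archive/inner/doc.pdf")  # one file from the archive

# Deterministic: the same extracted path always maps to the same identifier.
identifier = str(uuid5(NAMESPACE_DNS, str(extracted)))

# Relative path of the extracted file under the archive's parent directory,
# mirroring: str(f).replace(str(Path(local_filepath.parent)), "")[1:]
new_rel_download_path = str(extracted).replace(str(local_filepath.parent), "")[1:]
print(new_rel_download_path)  # archive/inner/doc.pdf

# The original fullpath has the archive filename swapped for the relative path:
fullpath = "s3://bucket/archive.zip".replace("archive.zip", new_rel_download_path)
print(fullpath)  # s3://bucket/archive/inner/doc.pdf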
