Skip to content

Commit be5c552

Browse files
authored
Add file change detection for incremental reprocessing (#268)
1 parent e99e275 commit be5c552

12 files changed

Lines changed: 919 additions & 140 deletions

File tree

docs/process.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,20 @@ You can configure parameters by providing a custom config file. You can find an
7171

7272
:rotating_light: Not all parameters are configurable yet :wink:
7373

74+
### :recycle: Incremental reprocessing
75+
76+
The optional top-level `previous_results` parameter lets you reuse the results of a prior run so that unchanged files are not reprocessed, saving time and compute costs.
77+
78+
```yaml
79+
previous_results: examples/process/outputs/merged/merged_results.jsonl
80+
```
81+
82+
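Point it to a `merged_results.jsonl` produced by an earlier run. On the next run, each local input file is compared against that JSONL: a file counts as unchanged when its modification time is not later than the `processed_at` timestamp stored with its previous sample. URL inputs are always reprocessed. For local files: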
83+
84+
- Unchanged files: their previous samples are reused as-is.
85+
- New or modified files: they are processed normally.
86+
- Removed files: their samples are dropped from the output.
87+
7488
## :scroll: More information on what's under the hood
7589

7690
### :construction: Pipeline architecture
@@ -127,4 +141,12 @@ python3 -m mmore postprocess --config-file examples/postprocessor/config.yaml --
127141

128142
Specify with `--input-data` the path (absolute or relative to the root of the repository) to the JSONL recording of the output of the initial processing phase.
129143

144+
### :recycle: Incremental post-processing
145+
146+
Like the processing pipeline, the post-processor accepts an optional `previous_results` parameter to reuse results from a prior post-processing run and skip unchanged documents.
147+
148+
```yaml
149+
previous_results: examples/postprocessor/outputs/merged/results.jsonl
150+
```
151+
130152
New post-processors can easily be implemented, and pipelines can be configured through lightweight YAML files. The post-processing stage produces a new JSONL file containing cleaned and optionally enhanced document samples.

examples/postprocessor/config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
previous_results: null # Path to a previously post-processed JSONL to reuse when documents are unchanged
2+
13
pp_modules:
24
- type: chunker
35
args:

examples/process/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
data_path: examples/sample_data/ #put absolute path! Possible to pass a list of folders as well
22
google_drive_ids: [] #put ids of google drive folders
3+
previous_results: null # Path to a previously processed JSONL to reuse when documents are unchanged
34
dispatcher_config:
45
output_path: examples/process/outputs/ #put absolute path or relative to the root of the module
56
use_fast_processors: false

src/mmore/process/crawler.py

Lines changed: 1 addition & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
import logging
32
import os
43
from typing import Dict, List, Optional
@@ -97,72 +96,6 @@ def from_dict(data: dict):
9796
return DispatcherReadyResult(urls=urls, file_paths=file_paths)
9897

9998

100-
class FindAlreadyComputedFiles:
101-
"""
102-
This class is used to get the list of all files that have already been processed.
103-
It will traverse the output_path directory and get all the results.jsonl files
104-
where in each line (representing a sample) we have the metadata of the file_path that was used to create that sample.
105-
> See create_sample in utils.py (file_path is in metadata of the sample).
106-
107-
Reminder here is the structure of the output_path directory (see DispatcherConfig):
108-
output_path
109-
├── processors
110-
| ├── Processor_type_1
111-
| | └── results.jsonl
112-
| ├── Processor_type_2
113-
| | └── results.jsonl
114-
| ├── ...
115-
|
116-
└── merged
117-
└── merged_results.jsonl
118-
"""
119-
120-
def __init__(self, output_path: str):
121-
"""
122-
output_path: the path where the output of the Process is stored.
123-
"""
124-
if output_path is None:
125-
raise ValueError("output_path must be provided.")
126-
self.output_path = output_path
127-
128-
def _get_all_samples_jsonl_paths(self, output_path):
129-
# Get all the results.jsonl files in the output_path directory.
130-
samples_files = []
131-
for root, _, files in os.walk(output_path):
132-
for file in files:
133-
if file.endswith("results.jsonl"):
134-
samples_files.append(os.path.join(root, file))
135-
return samples_files
136-
137-
def _get_metadata_jsonl_path(self, results_jsonl_path):
138-
# read jsonl file and for each item in the file, get the metadata's file_path
139-
# return the list of all file_path in this jsonl file
140-
file_paths = []
141-
with open(results_jsonl_path, "r") as f:
142-
for i, line in enumerate(f):
143-
data = json.loads(line)
144-
if "metadata" in data and "file_path" in data["metadata"]:
145-
file_paths.append(data["metadata"]["file_path"])
146-
else:
147-
logger.error(
148-
f"Warning file_path not found in metadate (line{i} of {results_jsonl_path})"
149-
)
150-
return file_paths
151-
152-
def get_all_files_already_processed(self) -> set[str]:
153-
"""
154-
This function returns the set of all files path's that have already been processed.
155-
Returns:
156-
set of all files path's that have already been processed.
157-
"""
158-
samples = self._get_all_samples_jsonl_paths(self.output_path)
159-
files_already_processed = set()
160-
for f in samples:
161-
line = self._get_metadata_jsonl_path(f)
162-
files_already_processed.update(line)
163-
return files_already_processed
164-
165-
16699
class CrawlerConfig:
167100
"""
168101
Configuration for the Crawler.
@@ -308,40 +241,9 @@ def _traverse_directories(self) -> None:
308241
FileDescriptor.from_filename(filepath)
309242
)
310243

311-
def _filter_out_already_processed_files(
312-
self, files: Dict[str, List[FileDescriptor]], output_path: str
313-
) -> Dict[str, List[FileDescriptor]]:
314-
"""
315-
Avoid processing files that have already been processed.
316-
Immutable function.
317-
Args:
318-
files: the crawled files that want to be processed. We want to remove the files that have already been processed.
319-
output_path: the path where the outputs of the "process" is stored.
320-
Returns:
321-
filtered out 'files' to process.
322-
"""
323-
all_files_done: set[str] = FindAlreadyComputedFiles(
324-
output_path
325-
).get_all_files_already_processed()
326-
logger.info(f"Found {len(all_files_done)} files already processed.")
327-
328-
for root_dir, files_in_dir in files.items():
329-
files[root_dir] = [
330-
f for f in files_in_dir if f.file_path not in all_files_done
331-
]
332-
333-
if len(all_files_done) > 0:
334-
logger.info(f"Removed {len(all_files_done)} files already processed.")
335-
logger.info(
336-
f"New total files to process: {sum(len(files) for files in files.values())}"
337-
)
338-
return files
339-
340-
def crawl(self, skip_already_processed: bool = False) -> DispatcherReadyResult:
244+
def crawl(self) -> DispatcherReadyResult:
341245
"""
342246
Crawl the configured directories and URLs.
343-
Args:
344-
skip_already_processed (bool): if set to True, the crawler will scan the outputs folder and detect files that correspond to them, and skip them.
345247
Returns:
346248
DispatcherReadyResult: The result of the crawl operation, ready to be dispatched to the processors.
347249
"""
@@ -368,12 +270,4 @@ def crawl(self, skip_already_processed: bool = False) -> DispatcherReadyResult:
368270
urls: List[URLDescriptor] = self.files["url"]
369271
file_paths: Dict[str, List[FileDescriptor]] = self.files["local"]
370272

371-
if self.config.output_path and skip_already_processed:
372-
logger.info(
373-
"Checking if some of those files to process have already been processed."
374-
)
375-
file_paths = self._filter_out_already_processed_files(
376-
files=file_paths, output_path=self.config.output_path
377-
)
378-
379273
return DispatcherReadyResult(urls=urls, file_paths=file_paths)

src/mmore/process/dispatcher.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
import os
33
from dataclasses import dataclass
4+
from datetime import datetime
45
from operator import itemgetter
56
from typing import Dict, Iterator, List, Optional, Tuple, Type, Union, cast
67

@@ -321,10 +322,24 @@ def process_files(
321322

322323
return results
323324

325+
def _clear_per_processor_results(self) -> None:
326+
"""Clear per-processor result JSONL files.
327+
This is needed because :meth:`MultimodalSample.to_jsonl` uses append by default."""
328+
if not self.config.output_path:
329+
return
330+
processors_dir = os.path.join(self.config.output_path, "processors")
331+
if not os.path.isdir(processors_dir):
332+
return
333+
for processor_name in os.listdir(processors_dir):
334+
results_path = os.path.join(processors_dir, processor_name, "results.jsonl")
335+
if os.path.exists(results_path):
336+
os.remove(results_path)
337+
324338
def dispatch(self) -> List[List[MultimodalSample]]:
325339
"""
326340
Dispatches the result to the appropriate processor.
327341
"""
342+
self._clear_per_processor_results()
328343

329344
def batch_list(
330345
lst: List, obj_batch_size: int, processor: Type[Processor]
@@ -403,6 +418,11 @@ def save_individual_processor_results(
403418
if not self.config.output_path:
404419
return
405420

421+
processed_at = datetime.now().isoformat()
422+
for sample in results:
423+
sample.metadata["processed_at"] = processed_at
424+
sample.metadata["processor_type"] = cls_name
425+
406426
processor_output_path = os.path.join(
407427
self.config.output_path, "processors", cls_name
408428
)

src/mmore/process/incremental.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import json
2+
import logging
3+
import os
4+
from datetime import datetime
5+
from typing import Dict, List, Set
6+
7+
from ..type import MultimodalSample
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
def _iter_samples_jsonl(path: str):
13+
"""Helper function to stream line by line a JSONL to avoid loading it fully in memory."""
14+
if not os.path.exists(path):
15+
raise FileNotFoundError(f"Previous results file not found: {path}")
16+
with open(path, "r") as f:
17+
for line in f:
18+
line = line.strip()
19+
if not line:
20+
continue
21+
yield MultimodalSample.from_dict(json.loads(line))
22+
23+
24+
def load_previous_process_results(path: str) -> Dict[str, MultimodalSample]:
    """Index samples by ``metadata.file_path`` for the processing pipeline.

    When several samples share a ``file_path``, only the one with the latest
    ``processed_at`` is kept; a sample without ``processed_at`` ranks as the
    oldest possible. On a tie, the sample that appears first in the file wins.

    The file is consumed in a single streaming pass and only the current best
    sample per ``file_path`` is retained, so memory use grows with the number
    of distinct files rather than with the number of lines.

    Samples that carry no ``file_path`` in their metadata cannot be indexed;
    they are skipped with a warning instead of aborting the whole load.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
        ValueError: if a ``processed_at`` value is not a valid ISO timestamp.
    """
    index: Dict[str, MultimodalSample] = {}
    latest_seen: Dict[str, datetime] = {}
    dropped: Dict[str, int] = {}

    for sample in _iter_samples_jsonl(path):
        file_path = sample.metadata.get("file_path")
        if file_path is None:
            logger.warning(
                "Skipping sample without metadata.file_path in %s", path
            )
            continue

        raw_timestamp = sample.metadata.get("processed_at")
        timestamp = (
            datetime.fromisoformat(raw_timestamp)
            if raw_timestamp is not None
            else datetime.min
        )

        if file_path not in index:
            index[file_path] = sample
            latest_seen[file_path] = timestamp
            continue

        # Duplicate: exactly one of the two candidates is discarded.
        dropped[file_path] = dropped.get(file_path, 0) + 1
        # Strict comparison keeps the earliest-seen sample on ties.
        if timestamp > latest_seen[file_path]:
            index[file_path] = sample
            latest_seen[file_path] = timestamp

    for file_path, count in dropped.items():
        logger.warning(
            "Duplicate samples for file_path %s: keeping latest processed_at, "
            "dropping %d samples",
            file_path,
            count,
        )

    return index
48+
49+
50+
def load_previous_postprocess_results(
    path: str,
) -> Dict[str, List[MultimodalSample]]:
    """Group the samples of a post-processing JSONL by ``metadata.file_path``.

    Every sample is kept; samples sharing a ``file_path`` are collected, in
    file order, into the list stored under that path.
    """
    grouped: Dict[str, List[MultimodalSample]] = {}
    for sample in _iter_samples_jsonl(path):
        source_path = sample.metadata["file_path"]
        if source_path in grouped:
            grouped[source_path].append(sample)
        else:
            grouped[source_path] = [sample]
    return grouped
58+
59+
60+
def is_reusable_process(file_path: str, previous: Dict[str, MultimodalSample]) -> bool:
    """Check whether the previously processed sample of the given file can be reused.

    Conditions (all required):
    - ``file_path`` is present in ``previous``
    - the cached sample has a parsable ``processed_at`` ISO timestamp
    - the source file still exists
    - the source file has not been modified since (``file_mtime <= processed_at``)

    Any condition that cannot be verified yields ``False``, so the file is
    simply processed again rather than the run failing.
    """
    sample = previous.get(file_path)
    if sample is None:
        return False

    processed_at_str = sample.metadata.get("processed_at")
    if processed_at_str is None:
        return False

    try:
        processed_at = datetime.fromisoformat(processed_at_str)
    except (TypeError, ValueError):
        # An unreadable timestamp gives no evidence that the cache is fresh.
        return False

    try:
        # Query the mtime directly instead of checking existence first, so a
        # file deleted in between cannot cause a crash.
        mtime = os.path.getmtime(file_path)
    except OSError:
        return False

    # Build the mtime in the same timezone flavour as ``processed_at``:
    # ``tzinfo`` is None for naive timestamps (local time, as before) and set
    # for aware ones, which would otherwise be incomparable with a naive mtime.
    file_mtime = datetime.fromtimestamp(mtime, tz=processed_at.tzinfo)
    return file_mtime <= processed_at
81+
82+
83+
def is_reusable_postprocess(
    file_path: str,
    input_processed_at: str,
    previous: Dict[str, List[MultimodalSample]],
) -> bool:
    """Check whether the previous post-processed samples of the given file can be reused.

    Conditions (all required):
    - ``file_path`` has at least one cached sample in ``previous``
    - ``input_processed_at`` is a parsable ISO timestamp
    - every cached sample has a parsable ``processed_at`` ISO timestamp
    - ``input_processed_at <= min(cached processed_at)``

    Any condition that cannot be verified (missing or malformed timestamp,
    or timestamps that mix naive and timezone-aware values and therefore
    cannot be ordered) yields ``False``, so the document is post-processed
    again rather than the run failing.
    """
    samples = previous.get(file_path)
    if not samples:
        return False

    # Inputs produced before timestamps were recorded carry no ``processed_at``.
    if input_processed_at is None:
        return False
    try:
        input_timestamp = datetime.fromisoformat(input_processed_at)
    except (TypeError, ValueError):
        return False

    timestamps: List[datetime] = []
    for cached in samples:
        timestamp_str = cached.metadata.get("processed_at")
        if timestamp_str is None:
            return False
        try:
            timestamps.append(datetime.fromisoformat(timestamp_str))
        except (TypeError, ValueError):
            return False

    try:
        # The oldest cached sample must be at least as recent as its input.
        return input_timestamp <= min(timestamps)
    except TypeError:
        # Naive and timezone-aware datetimes cannot be compared.
        return False
107+
108+
109+
def merge_results(
    reused: Dict[str, List[MultimodalSample]],
    new_results: List[MultimodalSample],
    current_file_paths: Set[str],
) -> List[MultimodalSample]:
    """Build the final sample list from cached and freshly produced samples.

    Only samples whose file path belongs to ``current_file_paths`` are kept,
    which drops the samples of files that are no longer part of the input.
    Reused samples come first (in ``reused`` iteration order), followed by
    the new samples in their original order.
    """
    kept_from_cache: List[MultimodalSample] = [
        cached_sample
        for source_path, cached_samples in reused.items()
        if source_path in current_file_paths
        for cached_sample in cached_samples
    ]
    kept_new: List[MultimodalSample] = [
        fresh
        for fresh in new_results
        if fresh.metadata["file_path"] in current_file_paths
    ]
    return kept_from_cache + kept_new

0 commit comments

Comments
 (0)