-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathauto_run_stage2.py
More file actions
302 lines (246 loc) · 11 KB
/
auto_run_stage2.py
File metadata and controls
302 lines (246 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Auto-Run Stage 2: Parallel OCR Processing Wrapper
Wrapper script that provides parallel processing for Stage 2 OCR processing.
Processes GCS directories discovered from a single GCS path.
"""
import os
import yaml
import logging
import subprocess
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any
# ----------------------------
# [1] Configuration Loader
# ----------------------------
def load_auto_run_config(config_path="auto_run.yaml"):
    """Load the auto-run configuration from a YAML file.

    Args:
        config_path (str): Path of the YAML file to read. Defaults to
            "auto_run.yaml", resolved against the current working directory.

    Returns:
        The object produced by ``yaml.safe_load`` (``None`` for an empty
        document), or ``None`` when the file cannot be opened or parsed.
    """
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    except Exception as e:
        # Best-effort loader: the caller treats None as "no usable config",
        # so the failure is reported and swallowed rather than raised. The
        # message goes through logging, like every other diagnostic in this
        # script, instead of a bare print to stdout.
        logging.error(f"Error loading auto-run config file: {e}")
        return None
# ----------------------------
# [2] Stage 2 OCR Processor
# ----------------------------
def run_stage2_ocr(gcs_directory_path: str, worker_id: int = 0) -> Dict[str, Any]:
    """
    Run Stage 2 OCR for a single GCS path by invoking ocr_stage_2.py as a subprocess.

    The child's stderr is merged into its stdout. Every non-empty output line
    is logged as it arrives and is also collected for the returned result.

    Args:
        gcs_directory_path (str): GCS path passed to ocr_stage_2.py via --gcs-path
            (e.g., gs://bucket/stage_1/biology/exam1)
        worker_id (int): Worker ID used as a log prefix and echoed in the result

    Returns:
        Dict[str, Any]: Always contains "success", "gcs_path", "worker_id",
        "stdout" and "stderr". "stderr" is always "" because the stream is
        merged into stdout. Failed runs additionally contain "error".
        Exceptions derived from ``Exception`` are not raised; they are logged
        and reported through the returned dict.
    """
    # Collected outside the try block so that output captured before an
    # unexpected error is still returned to the caller.
    stdout_lines = []
    try:
        # ocr_stage_2.py is resolved relative to this script's own directory.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        ocr_stage2_path = os.path.join(script_dir, "src", "stages", "ocr_stage_2.py")

        # Build command to run ocr_stage_2.py
        # Note: ocr_stage_2.py expects a parent path and discovers subdirectories automatically
        cmd = [
            "python3", ocr_stage2_path,
            "--gcs-path", gcs_directory_path
        ]
        logging.info(f"[Worker {worker_id}] Running Stage 2 OCR for: {gcs_directory_path}")
        logging.info(f"[Worker {worker_id}] Command: {' '.join(cmd)}")

        # Execute the command with real-time output
        logging.info(f"[Worker {worker_id}] Starting subprocess...")
        # The context manager closes the stdout pipe and waits for the child
        # even if reading fails part-way. errors="replace" keeps bytes that
        # are invalid in the locale encoding from aborting the whole run.
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                              universal_newlines=True, errors="replace",
                              bufsize=1) as process:
            for line in process.stdout:
                line = line.strip()
                if line:
                    logging.info(f"[Worker {worker_id}] {line}")
                    stdout_lines.append(line)

        # Leaving the with-block has already waited for the child, so the
        # return code is populated here.
        if process.returncode == 0:
            logging.info(f"[Worker {worker_id}] Stage 2 OCR completed successfully for: {gcs_directory_path}")
            return {
                "success": True,
                "gcs_path": gcs_directory_path,
                "worker_id": worker_id,
                "stdout": "\n".join(stdout_lines),
                "stderr": ""
            }

        logging.error(f"[Worker {worker_id}] Stage 2 OCR failed with return code {process.returncode}")
        return {
            "success": False,
            "gcs_path": gcs_directory_path,
            "worker_id": worker_id,
            "error": f"Process failed with return code {process.returncode}",
            "stdout": "\n".join(stdout_lines),
            "stderr": ""
        }
    except Exception as e:
        # Covers launch failures (e.g. interpreter not found) and read errors.
        # Popen never raises CalledProcessError, so no dedicated handler is
        # needed for it.
        logging.error(f"[Worker {worker_id}] Unexpected error in Stage 2 OCR for {gcs_directory_path}: {e}")
        return {
            "success": False,
            "gcs_path": gcs_directory_path,
            "worker_id": worker_id,
            "error": str(e),
            "stdout": "\n".join(stdout_lines),
            "stderr": ""
        }
# ----------------------------
# [3] Parallel Processing Manager
# ----------------------------
def process_gcs_paths_parallel(gcs_paths: List[str], max_workers: int = 4) -> List[Dict[str, Any]]:
    """
    Process multiple GCS paths in parallel using ThreadPoolExecutor.

    One run_stage2_ocr task is submitted per path, with 1-based worker IDs
    assigned in input order. Results are collected as tasks finish, so the
    returned list is in completion order, not input order.

    Args:
        gcs_paths (List[str]): List of GCS paths to process
        max_workers (int): Maximum number of parallel workers; values below 1
            are replaced by 1 (with a warning) instead of raising

    Returns:
        List[Dict[str, Any]]: One result dict per path; empty if gcs_paths is empty
    """
    results = []
    if not gcs_paths:
        logging.warning("No GCS paths to process")
        return results

    # ThreadPoolExecutor raises ValueError for max_workers <= 0; fall back to
    # a single worker so a bad setting degrades to serial processing.
    if max_workers is not None and max_workers < 1:
        logging.warning(f"Invalid max_workers={max_workers}; falling back to 1 worker")
        max_workers = 1

    logging.info(f"Starting parallel processing of {len(gcs_paths)} paths with {max_workers} workers")
    total_count = len(gcs_paths)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks, remembering the worker ID with the path so a
        # failure record can name the worker that produced it.
        future_to_job = {
            executor.submit(run_stage2_ocr, gcs_path, worker_id): (worker_id, gcs_path)
            for worker_id, gcs_path in enumerate(gcs_paths, 1)
        }

        # Collect results as they complete
        for completed_count, future in enumerate(as_completed(future_to_job), 1):
            worker_id, gcs_path = future_to_job[future]
            try:
                result = future.result()
            except Exception as e:
                logging.error(f"[{completed_count}/{total_count}] ✗ Exception for {gcs_path}: {e}")
                results.append({
                    "success": False,
                    "gcs_path": gcs_path,
                    "worker_id": worker_id,
                    "error": str(e),
                    "stdout": "",
                    "stderr": ""
                })
            else:
                # Kept out of the try block so a malformed result cannot
                # add a second record for the same path.
                results.append(result)
                if result.get('success'):
                    logging.info(f"[{completed_count}/{total_count}] ✓ Completed: {gcs_path}")
                else:
                    logging.error(f"[{completed_count}/{total_count}] ✗ Failed: {gcs_path}")
    return results
# ----------------------------
# [4] Main Auto-Run Function
# ----------------------------
def _run_stage2_dry_runs(gcs_paths, logger):
    """Run ocr_stage_2.py with --dry-run for each path; return True only if every run exits 0."""
    script_dir = os.path.dirname(os.path.abspath(__file__))
    ocr_stage2_path = os.path.join(script_dir, "src", "stages", "ocr_stage_2.py")
    all_ok = True
    for i, path in enumerate(gcs_paths, 1):
        logger.info(f"Dry-run {i}/{len(gcs_paths)}: {path}")
        try:
            cmd = [
                "python3", ocr_stage2_path,
                "--gcs-path", path,
                "--dry-run"
            ]
            logger.info(f"Running dry-run command: {' '.join(cmd)}")
            # The context manager closes the pipe and waits for the child,
            # even if reading its output fails.
            with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                  universal_newlines=True, bufsize=1) as process:
                logger.info(f"Dry-run output for {path}:")
                for line in process.stdout:
                    line = line.strip()
                    if line:
                        logger.info(f" {line}")
            if process.returncode != 0:
                logger.error(f"Dry-run failed for {path} with return code {process.returncode}")
                all_ok = False
        except Exception as e:
            # Keep going so every path gets its dry-run, but remember the failure.
            logger.error(f"Dry-run failed for {path}: {e}")
            all_ok = False
    return all_ok


def main():
    """
    Command-line entry point for the parallel Stage 2 OCR wrapper.

    Configures logging, parses the command line, loads the auto-run YAML
    config, validates the Stage 2 settings, then either dry-runs
    ocr_stage_2.py for each path (--dry-run) or processes all paths in
    parallel and logs a summary.

    Returns:
        bool: True when every path succeeded (in dry-run mode: when every
        dry-run subprocess exited 0); False on a configuration error or any
        failed path.
    """
    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[logging.StreamHandler()]
    )
    logger = logging.getLogger(__name__)

    parser = argparse.ArgumentParser(description="Auto-Run Stage 2: Parallel OCR Processing")
    parser.add_argument("--config", default="auto_run.yaml",
                        help="Auto-run configuration file path (default: auto_run.yaml)")
    parser.add_argument("--gcs-path",
                        help="Override GCS input path from config")
    parser.add_argument("--workers", type=int,
                        help="Override number of parallel workers from config")
    parser.add_argument("--dry-run", action="store_true",
                        help="Run in dry-run mode (pass to ocr_stage_2.py)")
    args = parser.parse_args()

    # Load auto-run configuration. A non-mapping document (e.g. a top-level
    # YAML list) cannot hold a 'stage2' section, so it is rejected here too.
    config = load_auto_run_config(args.config)
    if not config or not isinstance(config, dict):
        logger.error("Failed to load auto-run configuration")
        return False

    # Get Stage 2 configuration. `or` guards against keys that are present
    # but null in the YAML, which .get() would return as None.
    stage2_config = config.get('stage2') or {}
    gcs_input_paths = stage2_config.get('gcs_input_paths') or []
    parallel_workers = args.workers or stage2_config.get('parallel_workers', 4)

    # Accept a single scalar path in the config as a one-element list;
    # iterating a bare string would validate it character by character.
    if isinstance(gcs_input_paths, str):
        gcs_input_paths = [gcs_input_paths]

    # Override with single path if provided via command line
    if args.gcs_path:
        gcs_input_paths = [args.gcs_path]

    if not gcs_input_paths:
        logger.error("Stage 2 gcs_input_paths must be specified in config or via --gcs-path")
        return False

    # Validate all GCS paths
    for path in gcs_input_paths:
        if not isinstance(path, str) or not path.startswith('gs://'):
            logger.error(f"GCS input path must start with gs://: {path}")
            return False

    # The thread pool rejects non-positive or non-integer worker counts;
    # report that as a configuration error instead of a traceback.
    if not isinstance(parallel_workers, int) or parallel_workers < 1:
        logger.error(f"Parallel workers must be a positive integer: {parallel_workers}")
        return False

    logger.info("=== Auto-Run Stage 2: Parallel OCR Processing Starting ===")
    logger.info(f"GCS input paths: {gcs_input_paths}")
    logger.info(f"Parallel workers: {parallel_workers}")
    logger.info(f"Dry run mode: {args.dry_run}")

    gcs_paths_to_process = gcs_input_paths

    if args.dry_run:
        logger.info("=== Dry Run: GCS Paths to Process ===")
        for i, path in enumerate(gcs_paths_to_process, 1):
            logger.info(f" {i}. {path}")
        # Also run the actual dry-run of ocr_stage_2.py for each path
        logger.info("Running ocr_stage_2.py in dry-run mode for each path...")
        # A failed dry-run now makes main() return False, so the process
        # exit status reflects it.
        return _run_stage2_dry_runs(gcs_paths_to_process, logger)

    # Process every configured path, up to parallel_workers at a time
    results = process_gcs_paths_parallel(gcs_paths_to_process, parallel_workers)

    # Generate summary
    total_count = len(results)
    success_count = sum(1 for r in results if r.get('success', False))
    failure_count = total_count - success_count

    logger.info("=== Auto-Run Stage 2 Complete ===")
    logger.info(f"Total paths: {total_count}")
    logger.info(f"Successful: {success_count}")
    logger.info(f"Failed: {failure_count}")

    if failure_count > 0:
        logger.warning("Failed paths:")
        for result in results:
            if not result.get('success', False):
                logger.warning(f" - {result['gcs_path']}: {result.get('error', 'Unknown error')}")

    return failure_count == 0
if __name__ == "__main__":
    # Translate main()'s boolean result into the process exit status.
    # SystemExit is raised directly because the bare `exit` helper is
    # injected by the `site` module and is missing under `python -S` or in
    # frozen builds.
    success = main()
    raise SystemExit(0 if success else 1)