-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathextract_ocr.py
More file actions
2973 lines (2500 loc) · 113 KB
/
extract_ocr.py
File metadata and controls
2973 lines (2500 loc) · 113 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Electoral Roll PDF Data Extraction using OCR (Tesseract + OpenCV).
Extracts voter data from Tamil Nadu electoral roll PDFs (English + Tamil pairs)
into structured CSV files. Uses PyMuPDF for image extraction, OpenCV for grid
detection, and Tesseract OCR for text recognition.
Usage:
python extract_ocr.py AC-188 # Process one AC directory
python extract_ocr.py AC-188 --validate # Test on 1 pair
python extract_ocr.py --all --workers 4 # Process all AC directories
python extract_ocr.py AC-188 --dry-run # Show pairs only
python extract_ocr.py AC-188 --part 101 # Process only Part 101
python extract_ocr.py AC-188 --part 50-100 # Process Parts 50 to 100
python extract_ocr.py AC-188 --part 1,5,10-20 # Specific parts and ranges
python extract_ocr.py AC-188 --reset # Reset checkpoint (all parts)
python extract_ocr.py AC-188 --reset --part 101 # Reset only Part 101
python extract_ocr.py AC-188 --reset --part 50-100 # Reset Parts 50 to 100
python extract_ocr.py AC-188 --limit 10 # Process only 10 pairs
python extract_ocr.py # Interactive prompt
"""
import argparse
import csv
import json
import logging
import os
import re
import shutil
import sys
import traceback
import unicodedata
from collections import Counter, defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional
import cv2
import fitz # PyMuPDF
import numpy as np
from PIL import Image
import io
try:
import pytesseract
# Set Tesseract path for Windows (default install location)
_tess_path = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
if os.path.exists(_tess_path):
pytesseract.pytesseract.tesseract_cmd = _tess_path
except ImportError:
pytesseract = None
# ----- Configuration -----
# All inputs/outputs live beside this script, keyed by AC directory name.
OCR_DIR = Path(__file__).parent  # ER_OCR/
INPUT_BASE = OCR_DIR / "Input" / "split_files"
OUTPUT_BASE = OCR_DIR / "output" / "split_files"
MERGED_BASE = OCR_DIR / "output" / "merged_files" / "parts"
AC_MERGED_DIR = OCR_DIR / "output" / "merged_files" / "ac"
LOG_DIR = OCR_DIR / "logs"
# Legacy paths (for backward compatibility with old checkpoint location)
_LEGACY_CHECKPOINT_DIR = OCR_DIR / "checkpoints"
# Column order of the per-part output CSVs (14 columns).
CSV_HEADERS = [
    "AC No", "Part No", "Serial No", "EPIC ID",
    "Name (English)", "Name (Tamil)",
    "Relation Name (English)", "Relation Name (Tamil)",
    "Relation Type", "House No", "Age", "Gender",
    "DOB", "ContactNo",
]
# Extra columns appended when --cross-check (or --validate) is active.
# Default runs always produce the 14-column CSV above.
CSV_HEADERS_CROSSCHECK = CSV_HEADERS + ["Cross_Check", "Cross_Check_Notes"]
# ----- Logging -----
# Shared module logger; handlers are attached in setup_logging().
log = logging.getLogger("extract_ocr")
def setup_logging(log_file: Optional[Path] = None):
    """Configure dual logging: console + optional file.

    Safe to call more than once (e.g. once per worker process): any
    handlers from a previous call are removed first, so log lines are
    never duplicated.

    Args:
        log_file: optional path for a UTF-8 file handler; parent
            directories are created as needed.
    """
    log.setLevel(logging.INFO)
    # Remove handlers from any previous call to avoid duplicated output.
    for old in list(log.handlers):
        log.removeHandler(old)
        old.close()
    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S")
    console = logging.StreamHandler(sys.stdout)
    console.setFormatter(fmt)
    log.addHandler(console)
    if log_file:
        log_file.parent.mkdir(parents=True, exist_ok=True)
        fh = logging.FileHandler(log_file, encoding="utf-8")
        fh.setFormatter(fmt)
        log.addHandler(fh)
# ----- Part Range Parsing -----
def parse_part_range(part_str: str) -> set[int]:
    """Parse a --part argument into a set of part numbers.

    Accepts a single part ("101"), a range ("50-100"), or a
    comma-separated mix ("1,5,10-20").

    Raises:
        ValueError: on an empty segment, a non-numeric segment, or a
            reversed range. (Previously a reversed range like "100-50"
            silently produced no parts at all.)
    """
    parts: set[int] = set()
    for segment in part_str.split(","):
        segment = segment.strip()
        if not segment:
            raise ValueError(f"Empty segment in part specification: {part_str!r}")
        if "-" in segment:
            start_s, end_s = segment.split("-", 1)
            start, end = int(start_s), int(end_s)
            if start > end:
                raise ValueError(f"Reversed range {segment!r}: start must be <= end")
            parts.update(range(start, end + 1))
        else:
            parts.add(int(segment))
    return parts
# ----- Filename Parsing -----
# Matches English electoral-roll page filenames, e.g.
# "2024-EROLLGEN-S22-188-XYZ-FinalRoll-Revision1-ENG-101-WI_page_3.pdf"
FILENAME_RE = re.compile(
    r"^(\d+)-EROLLGEN-S(\d+)-(\d+)-(\w+)-FinalRoll-Revision(\d+)"
    r"-ENG-(\d+)-WI_page_(\d+)\.pdf$"
)
def parse_filename(filename: str) -> Optional[dict]:
    """Extract metadata from an English PDF filename, or None on no match."""
    match = FILENAME_RE.match(filename)
    if match is None:
        return None
    field_names = ("year", "state_code", "ac_no", "ac_abbr", "revision", "part_no")
    meta = dict(zip(field_names, match.groups()))
    meta["page_no"] = int(match.group(7))
    meta["filename"] = filename
    return meta
def discover_tamil_files(tamil_dir: Path) -> dict[str, dict[int, Path]]:
    """Index Tamil PDFs in *tamil_dir* as part_no -> {page_no: Path}.

    Returns an empty mapping when the directory does not exist.
    """
    tam_pattern = re.compile(
        r"^(\d+)-EROLLGEN-S(\d+)-(\d+)-(\w+)-FinalRoll-Revision(\d+)"
        r"-TAM-(\d+)-WI_page_(\d+)\.pdf$"
    )
    index: dict[str, dict[int, Path]] = defaultdict(dict)
    if not tamil_dir.exists():
        return index
    for pdf in tamil_dir.iterdir():
        if not pdf.name.endswith(".pdf"):
            continue
        match = tam_pattern.match(pdf.name)
        if match is None:
            continue
        # Group 6 = part number (string key), group 7 = page number
        index[match.group(6)][int(match.group(7))] = pdf
    return index
def discover_pairs(directory: Path) -> list[dict]:
    """Discover all English PDFs under *directory*/english.

    Tamil pages are only indexed per part here; actual English-Tamil page
    matching happens later via EPIC IDs, because page-number offsets
    between the two languages vary by part.
    """
    eng_dir = directory / "english"
    if not eng_dir.exists():
        log.error(f"English directory not found: {eng_dir}")
        return []
    # Index every Tamil file once for cheap per-part lookups below
    tamil_index = discover_tamil_files(directory / "tamil")
    pairs = []
    for pdf in sorted(eng_dir.iterdir()):
        if not pdf.name.endswith(".pdf"):
            continue
        meta = parse_filename(pdf.name)
        if meta is None:
            log.warning(f"Skipping unrecognized filename: {pdf.name}")
            continue
        # All Tamil pages for this part, for later EPIC-based matching
        part_pages = tamil_index.get(meta["part_no"], {})
        pairs.append({
            "english_path": str(pdf),
            "tamil_pages": {pg: str(path) for pg, path in part_pages.items()},
            "ac_no": meta["ac_no"],
            "part_no": meta["part_no"],
            "page_no": meta["page_no"],
            "key": pdf.name,
        })
    return pairs
# ----- Checkpoint -----
def _checkpoint_path(dir_name: str) -> Path:
    """Return checkpoint file path inside the split_files/AC-xxx/ directory."""
    cp_path = INPUT_BASE / dir_name / "checkpoint.json"
    cp_path.parent.mkdir(parents=True, exist_ok=True)
    if cp_path.exists():
        return cp_path
    # Auto-migrate from legacy checkpoints/ location
    legacy_path = _LEGACY_CHECKPOINT_DIR / f"{dir_name}.json"
    if legacy_path.exists():
        shutil.copy2(legacy_path, cp_path)
        log.info(f"Migrated checkpoint from {legacy_path} to {cp_path}")
    return cp_path
def load_checkpoint(dir_name: str) -> dict:
    """Load checkpoint data for a directory.

    Returns a fresh default checkpoint when the file is absent, unreadable,
    or contains invalid JSON — a corrupt checkpoint previously crashed the
    entire run with an unhandled JSONDecodeError.
    """
    cp_file = _checkpoint_path(dir_name)
    if cp_file.exists():
        try:
            with open(cp_file, "r", encoding="utf-8") as f:
                return json.load(f)
        except (json.JSONDecodeError, OSError):
            log.warning(f"Corrupt or unreadable checkpoint {cp_file} — starting fresh")
    return {"processed": [], "batch_number": 0}
def save_checkpoint(dir_name: str, data: dict):
    """Save checkpoint data for a directory atomically.

    Writes to a sibling temp file and then renames it into place with
    os.replace (atomic on POSIX and Windows within one filesystem), so a
    crash mid-write can never leave a truncated checkpoint.json behind.
    """
    cp_file = _checkpoint_path(dir_name)
    tmp_file = cp_file.parent / (cp_file.name + ".tmp")
    with open(tmp_file, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_file, cp_file)
# ----- Image Extraction -----
def extract_image_from_pdf(pdf_path: str) -> np.ndarray:
    """Extract the embedded PNG image from a single-page PDF as an array."""
    doc = fitz.open(pdf_path)
    try:
        embedded = doc[0].get_images(full=True)
        if not embedded:
            raise ValueError(f"No images found in {pdf_path}")
        # First tuple element of the first image entry is its xref
        payload = doc.extract_image(embedded[0][0])["image"]
        return np.array(Image.open(io.BytesIO(payload)))
    finally:
        doc.close()
# ----- Image Preprocessing -----
def preprocess_for_ocr(gray: np.ndarray, use_adaptive: bool = True) -> np.ndarray:
    """Prepare a grayscale image for OCR.

    Pipeline: CLAHE contrast boost -> non-local-means denoise -> binarize
    (adaptive Gaussian by default; fixed threshold at 150 otherwise).
    """
    contrast = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)).apply(gray)
    cleaned = cv2.fastNlMeansDenoising(contrast, None, 10, 7, 21)
    if not use_adaptive:
        return cv2.threshold(cleaned, 150, 255, cv2.THRESH_BINARY)[1]
    # Adaptive thresholding copes with uneven lighting/scanning
    return cv2.adaptiveThreshold(
        cleaned, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY, 31, 10
    )
# ----- Grid Detection -----
def detect_grid(image: np.ndarray) -> list[tuple[int, int, int, int]]:
    """
    Detect voter entry cell boundaries in the electoral roll page.
    Returns list of (x1, y1, x2, y2) tuples for each cell,
    ordered left-to-right, top-to-bottom (col-major within each row).

    Detection cascade: morphological line extraction -> Hough transform ->
    contour bounding boxes -> fixed proportional 3x10 grid (_fallback_grid).
    """
    h, w = image.shape[:2]
    # Convert to grayscale
    if len(image.shape) == 3:
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    else:
        gray = image.copy()
    # Binary threshold (inverted: ink becomes white for the morphology below)
    _, binary = cv2.threshold(gray, 128, 255, cv2.THRESH_BINARY_INV)
    # Multi-scale horizontal line detection — try decreasing kernel widths
    h_groups = []
    for kernel_width in [w // 4, w // 6, w // 8, w // 12]:
        h_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_width, 1))
        h_lines = cv2.morphologyEx(binary, cv2.MORPH_OPEN, h_kernel)
        h_proj = np.sum(h_lines, axis=1)
        # A real grid line covers at least ~30% of the page width
        h_positions = np.where(h_proj > w * 0.3 * 255)[0]
        h_groups = _group_positions(h_positions, min_gap=15)
        if len(h_groups) >= 2:
            break
    # If morphological detection failed, try Hough Line Transform
    if len(h_groups) < 2:
        h_groups = _detect_lines_hough(binary, h, w, direction="horizontal")
    if len(h_groups) < 2:
        log.warning("Grid detection failed: not enough horizontal lines")
        # Try contour-based detection before proportional fallback
        contour_cells = _detect_grid_contours(binary, h, w)
        if contour_cells:
            return contour_cells
        return _fallback_grid(h, w)
    # Detect vertical lines for column boundary validation
    v_groups = []
    for kernel_height in [h // 4, h // 6, h // 8]:
        v_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, kernel_height))
        v_lines = cv2.morphologyEx(binary, cv2.MORPH_OPEN, v_kernel)
        v_proj = np.sum(v_lines, axis=0)
        v_positions = np.where(v_proj > h * 0.2 * 255)[0]
        v_groups = _group_positions(v_positions, min_gap=15)
        if len(v_groups) >= 2:
            break
    # Determine column boundaries from ink density between the data rows
    col_boundaries = _detect_column_boundaries(binary, h_groups)
    # If vertical lines detected, use them to validate/replace column boundaries
    if len(v_groups) >= 4:
        # Vertical lines give us direct column boundaries
        col_boundaries = _validate_column_boundaries(v_groups, w)
    elif len(col_boundaries) < 2:
        # Fallback: assume 3 equal columns
        col_boundaries = [
            int(w * 0.02),  # left margin
            int(w * 0.34),
            int(w * 0.66),
            int(w * 0.98),  # right margin
        ]
    # Validate: electoral rolls always have 3 data columns (4 boundaries)
    col_boundaries = _enforce_three_columns(col_boundaries, w)
    # P4-1: Sanity check — if detected boundaries don't span at least 85% of page
    # width, the column detection collapsed (common on partial pages with <10 voters).
    # Fall back to proportional columns.
    col_span = col_boundaries[-1] - col_boundaries[0]
    if col_span < w * 0.85:
        log.info(f"Column boundaries span only {col_span/w:.0%} of page width — using proportional fallback")
        col_boundaries = [int(w * 0.02), int(w * 0.34), int(w * 0.66), int(w * 0.98)]
    # Build cell list: row-major over (row, column) boundary pairs
    cells = []
    for row_idx in range(len(h_groups) - 1):
        y1 = h_groups[row_idx]
        y2 = h_groups[row_idx + 1]
        for col_idx in range(len(col_boundaries) - 1):
            x1 = col_boundaries[col_idx]
            x2 = col_boundaries[col_idx + 1]
            cells.append((x1, y1, x2, y2))
    return cells
def _detect_lines_hough(binary: np.ndarray, h: int, w: int, direction: str = "horizontal") -> list[int]:
    """Locate grid-line positions via probabilistic Hough (tolerates gaps)."""
    horizontal = direction == "horizontal"
    segments = cv2.HoughLinesP(binary, 1, np.pi / 180, threshold=100,
                               minLineLength=w // 6 if horizontal else h // 6,
                               maxLineGap=20)
    if segments is None:
        return []
    positions = []
    for seg in segments:
        x1, y1, x2, y2 = seg[0]
        if horizontal:
            # Nearly flat with a wide x span -> treat as a horizontal line
            if abs(y2 - y1) < 10 and abs(x2 - x1) > w * 0.2:
                positions.append((y1 + y2) // 2)
        elif abs(x2 - x1) < 10 and abs(y2 - y1) > h * 0.2:
            # Nearly upright with a tall y span -> treat as a vertical line
            positions.append((x1 + x2) // 2)
    if not positions:
        return []
    return _group_positions(np.array(sorted(positions)), min_gap=15)
def _detect_grid_contours(binary: np.ndarray, h: int, w: int) -> list[tuple[int, int, int, int]]:
    """Fallback grid detection: treat plausible-sized contour boxes as cells."""
    contours, _ = cv2.findContours(binary, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    # A page holds ~3 columns x ~10 rows; size-filter around that cell area
    expected_area = (w / 3) * (h / 10)
    area_lo = expected_area * 0.3
    area_hi = expected_area * 2.5
    cells = []
    for contour in contours:
        x, y, cw, ch = cv2.boundingRect(contour)
        if ch <= 0:
            continue  # degenerate box: aspect undefined, never a cell
        area = cw * ch
        aspect = cw / ch
        if area_lo < area < area_hi and 0.5 < aspect < 4.0:
            cells.append((x, y, x + cw, y + ch))
    # Too few boxes means the page has no usable contour grid
    if len(cells) < 5:
        return []
    cells.sort(key=lambda c: (c[1], c[0]))  # row-major: top-to-bottom, then left-to-right
    return cells
def _validate_column_boundaries(v_groups: list[int], w: int) -> list[int]:
"""Use detected vertical lines to determine column boundaries."""
# Filter to boundaries that span a reasonable portion of the page width
# Expect: left margin, col1/col2 border, col2/col3 border, right margin
if len(v_groups) >= 4:
# Take the leftmost, rightmost, and 2 middle ones
boundaries = [v_groups[0]]
inner = v_groups[1:-1]
if len(inner) >= 2:
# Take the two that best split into thirds
third = w / 3
best_pair = None
best_score = float("inf")
for i in range(len(inner)):
for j in range(i + 1, len(inner)):
score = abs(inner[i] - third) + abs(inner[j] - 2 * third)
if score < best_score:
best_score = score
best_pair = (inner[i], inner[j])
if best_pair:
boundaries.extend(best_pair)
elif len(inner) == 1:
boundaries.append(inner[0])
boundaries.append(v_groups[-1])
return sorted(boundaries)
return []
def _enforce_three_columns(boundaries: list[int], w: int) -> list[int]:
"""Ensure exactly 4 boundaries (3 columns) for electoral roll pages."""
if len(boundaries) == 4:
return boundaries
if len(boundaries) > 4:
# Merge closest pair until we have 4
while len(boundaries) > 4:
min_gap = float("inf")
merge_idx = 0
for i in range(len(boundaries) - 1):
gap = boundaries[i + 1] - boundaries[i]
if gap < min_gap:
min_gap = gap
merge_idx = i
merged = (boundaries[merge_idx] + boundaries[merge_idx + 1]) // 2
boundaries = boundaries[:merge_idx] + [merged] + boundaries[merge_idx + 2:]
return boundaries
if len(boundaries) < 4 and len(boundaries) >= 2:
# Split widest span until we have 4
while len(boundaries) < 4:
max_gap = 0
split_idx = 0
for i in range(len(boundaries) - 1):
gap = boundaries[i + 1] - boundaries[i]
if gap > max_gap:
max_gap = gap
split_idx = i
mid = (boundaries[split_idx] + boundaries[split_idx + 1]) // 2
boundaries = boundaries[:split_idx + 1] + [mid] + boundaries[split_idx + 1:]
return boundaries
# Not enough data — use default proportional
return [int(w * 0.02), int(w * 0.34), int(w * 0.66), int(w * 0.98)]
def _detect_column_boundaries(binary: np.ndarray, h_groups: list[int]) -> list[int]:
"""Detect column boundaries by analyzing vertical density in the data rows."""
h, w = binary.shape
# Use the middle rows for analysis (avoid header/footer)
if len(h_groups) >= 3:
y_start = h_groups[1]
y_end = h_groups[-2]
else:
y_start = h_groups[0]
y_end = h_groups[-1]
row_slice = binary[y_start:y_end, :]
col_density = np.sum(row_slice, axis=0) / 255
# Find low-density gaps (column separators)
# Look for regions where density drops near zero
threshold = 10
gaps = []
in_gap = False
gap_start = 0
for x in range(w):
if col_density[x] < threshold:
if not in_gap:
gap_start = x
in_gap = True
else:
if in_gap:
gap_end = x
gap_width = gap_end - gap_start
if gap_width >= 3:
gaps.append((gap_start, gap_end))
in_gap = False
if in_gap:
gaps.append((gap_start, w))
if len(gaps) < 2:
return []
# First gap is left margin, last gap is right margin
# Gaps in between are column separators
boundaries = []
boundaries.append(gaps[0][1]) # right edge of left margin = start of col 1
for g in gaps[1:-1]:
mid = (g[0] + g[1]) // 2
boundaries.append(mid)
boundaries.append(gaps[-1][0]) # left edge of right margin = end of last col
return boundaries
def _group_positions(positions: np.ndarray, min_gap: int = 15) -> list[int]:
"""Group nearby pixel positions into single line positions."""
if len(positions) == 0:
return []
groups = []
current = [positions[0]]
for p in positions[1:]:
if p - current[-1] <= min_gap:
current.append(p)
else:
groups.append(int(np.mean(current)))
current = [p]
groups.append(int(np.mean(current)))
return groups
def _fallback_grid(h: int, w: int) -> list[tuple[int, int, int, int]]:
"""Fallback: return proportionally-split 3x10 grid."""
margin_x = int(w * 0.02)
margin_top = int(h * 0.033)
margin_bottom = int(h * 0.03)
content_w = w - 2 * margin_x
content_h = h - margin_top - margin_bottom
col_w = content_w // 3
row_h = content_h // 10
cells = []
for row in range(10):
y1 = margin_top + row * row_h
y2 = y1 + row_h
for col in range(3):
x1 = margin_x + col * col_w
x2 = x1 + col_w
cells.append((x1, y1, x2, y2))
return cells
# ----- Cell Content Detection -----
def _is_cell_empty(cell_img: np.ndarray, threshold: float = 0.02) -> bool:
    """Decide whether a cell holds no voter data, judged by ink density.

    Populated cells run 5-10%+ ink coverage; empty ones stay under 1%.
    A 5% border (min 2 px) is cropped on every side first so the grid
    lines themselves don't count as ink.
    """
    h, w = cell_img.shape[:2]
    if h < 10 or w < 10:
        return True
    gray = cv2.cvtColor(cell_img, cv2.COLOR_RGB2GRAY) if len(cell_img.shape) == 3 else cell_img
    # Invert so ink pixels become nonzero (same threshold as grid detection)
    _, binary = cv2.threshold(gray, 128, 255, cv2.THRESH_BINARY_INV)
    border_y = max(int(h * 0.05), 2)
    border_x = max(int(w * 0.05), 2)
    interior = binary[border_y:h - border_y, border_x:w - border_x]
    if interior.size == 0:
        return True
    dark_pixels = np.count_nonzero(interior)
    ink_ratio = dark_pixels / interior.size
    # Fewer than 200 dark pixels can never be readable text
    if dark_pixels < 200:
        log.debug(f"Empty cell: dark_pixels={dark_pixels}, ink_ratio={ink_ratio:.4f}")
        return True
    if ink_ratio < threshold:
        log.debug(f"Empty cell: ink_ratio={ink_ratio:.4f} < {threshold}")
        return True
    return False
def _is_record_valid(record: dict) -> bool:
"""
Check if an OCR'd record has enough valid signals to be real voter data.
A real voter entry always has 3+ valid fields; noise from empty cells rarely
produces 2+ valid signals simultaneously.
"""
signals = 0
if record.get("name_english") and len(record["name_english"]) >= 3:
signals += 1
if record.get("epic_id") and re.match(r"^[A-Z]{3}\d{7}$", record["epic_id"]):
signals += 1
if record.get("serial_no"):
signals += 1
if record.get("age") and record.get("gender"):
signals += 1
if record.get("house_no"):
signals += 1
return signals >= 2
def _trim_trailing_empty_rows(records: list[dict], num_cols: int = 3) -> list[dict]:
    """Drop records belonging to trailing rows with no valid record.

    Keeps bottom-of-page noise out of serial-number inference. Every
    record is expected to carry a '_cell_index' key.
    """
    if not records or num_cols < 1:
        return records
    max_cell = max(r.get("_cell_index", 0) for r in records)
    num_rows = max_cell // num_cols + 1
    # Walk rows bottom-up until one contains at least one valid record
    last_valid_row = -1
    for row in reversed(range(num_rows)):
        lo = row * num_cols
        hi = lo + num_cols
        in_row = (r for r in records if lo <= r.get("_cell_index", -1) < hi)
        if any(_is_record_valid(r) for r in in_row):
            last_valid_row = row
            break
    if last_valid_row < 0:
        return records
    cutoff = (last_valid_row + 1) * num_cols
    kept = [r for r in records if r.get("_cell_index", 0) < cutoff]
    if len(kept) < len(records):
        log.info(f"Trimmed {len(records) - len(kept)} records from trailing empty rows")
    return kept
# ----- OCR -----
def ocr_serial_targeted(cell_img: np.ndarray) -> str:
    """Read the serial number from the small box at a cell's top-left corner.

    Runs OCR under three binarization strategies and returns the majority
    answer, falling back to the first hit when there is no consensus.
    Returns "" when no strategy finds a digit sequence.
    """
    h, w = cell_img.shape[:2]
    # The serial box occupies roughly the top-left 25% x 20% of the cell
    roi = cell_img[0:int(h * 0.25), 0:int(w * 0.20)]
    upscaled = cv2.resize(roi, None, fx=4, fy=4, interpolation=cv2.INTER_LANCZOS4)
    gray = cv2.cvtColor(upscaled, cv2.COLOR_RGB2GRAY) if len(upscaled.shape) == 3 else upscaled
    config = "--psm 7 --oem 1 --dpi 300 -c tessedit_char_whitelist=0123456789#"
    # Three binarizations: fixed 150, Otsu, and a low 120 for faint print
    variants = [
        cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY)[1],
        cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1],
        cv2.threshold(gray, 120, 255, cv2.THRESH_BINARY)[1],
    ]
    candidates = []
    for variant in variants:
        text = pytesseract.image_to_string(variant, lang="eng", config=config).strip()
        m = re.search(r"(\d{1,4})", text)
        if m:
            candidates.append(m.group(1))
    if not candidates:
        return ""
    best, votes = Counter(candidates).most_common(1)[0]
    # Majority wins; otherwise trust the first strategy that produced digits
    return best if votes >= 2 else candidates[0]
def _extract_age_gender_targeted(cell_img: np.ndarray, record: dict) -> None:
    """
    Targeted age/gender extraction from the bottom portion of a cell.
    Age and Gender labels sit at the bottom of voter cells. When full-cell OCR
    misses them (e.g., due to noise or misalignment), this re-OCRs just the
    bottom region with focused settings.

    Mutates *record* in place (its "age" and "gender" keys); returns None.
    NOTE(review): AGE_GENDER_RE, AGE_RE, GENDER_RE and normalize_gender are
    defined elsewhere in this file — presumably AGE_GENDER_RE captures
    (age, gender) as groups 1 and 2; confirm against their definitions.
    """
    h, w = cell_img.shape[:2]
    # Bottom 25% of the cell where Age/Gender typically appears
    roi = cell_img[int(h * 0.75):, :]
    if roi.size == 0:
        return
    scale = 4
    upscaled = cv2.resize(roi, None, fx=scale, fy=scale, interpolation=cv2.INTER_LANCZOS4)
    if len(upscaled.shape) == 3:
        gray = cv2.cvtColor(upscaled, cv2.COLOR_RGB2GRAY)
    else:
        gray = upscaled
    thresh = preprocess_for_ocr(gray)
    # PSM 6: treat the strip as one uniform block of text
    text = pytesseract.image_to_string(thresh, lang="eng", config="--psm 6 --oem 1 --dpi 300")
    if not record["age"]:
        # Try the combined "age + gender" pattern first
        ag_match = AGE_GENDER_RE.search(text)
        if ag_match:
            try:
                age_val = int(ag_match.group(1))
                # Plausible voter-age window; discard OCR misreads outside it
                if 18 <= age_val <= 120:
                    record["age"] = ag_match.group(1)
            except ValueError:
                pass
            if not record["gender"]:
                record["gender"] = normalize_gender(ag_match.group(2))
        else:
            # Combined pattern failed: fall back to an age-only pattern
            age_match = AGE_RE.search(text)
            if age_match:
                try:
                    age_val = int(age_match.group(1))
                    if 18 <= age_val <= 120:
                        record["age"] = age_match.group(1)
                except ValueError:
                    pass
    if not record["gender"]:
        # Gender-only fallback, regardless of which age branch ran above
        gender_match = GENDER_RE.search(text)
        if gender_match:
            record["gender"] = normalize_gender(gender_match.group(1))
def ocr_cell_english(cell_img: np.ndarray) -> dict:
    """OCR a single cell from the English PDF and parse its fields."""
    # 4x Lanczos upscale sharpens the small print before OCR
    upscaled = cv2.resize(cell_img, None, fx=4, fy=4, interpolation=cv2.INTER_LANCZOS4)
    gray = cv2.cvtColor(upscaled, cv2.COLOR_RGB2GRAY) if len(upscaled.shape) == 3 else upscaled
    # CLAHE + denoise + adaptive threshold pipeline
    thresh = preprocess_for_ocr(gray)
    # PSM 4 (single text column) copes with the serial-box + text layout
    # better than PSM 6's flat-block assumption
    text = pytesseract.image_to_string(thresh, lang="eng", config="--psm 4 --oem 1 --dpi 300")
    return parse_english_text(text)
def _ocr_tamil_with_preprocessing(cell_img: np.ndarray, crop_ratio: float,
                                  use_otsu: bool = False) -> dict:
    """OCR a Tamil cell with the given top-crop ratio and binarization."""
    h = cell_img.shape[0]
    cropped = cell_img[0:int(h * crop_ratio), :]
    upscaled = cv2.resize(cropped, None, fx=4, fy=4, interpolation=cv2.INTER_LANCZOS4)
    gray = cv2.cvtColor(upscaled, cv2.COLOR_RGB2GRAY) if len(upscaled.shape) == 3 else upscaled
    if use_otsu:
        thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
    else:
        thresh = preprocess_for_ocr(gray)
    # tam+eng: Tamil names with occasional Latin field labels in the same cell
    text = pytesseract.image_to_string(thresh, lang="tam+eng", config="--psm 6 --oem 1 --dpi 300")
    return parse_tamil_text(text)
def ocr_cell_tamil(cell_img: np.ndarray) -> dict:
    """OCR a Tamil cell, retrying with alternate crops/binarization when
    the first pass yields a poor Tamil name."""
    # First pass: crop bottom 15%, default CLAHE pipeline
    result = _ocr_tamil_with_preprocessing(cell_img, crop_ratio=0.85)
    if _is_valid_tamil_name(result.get("name_tamil", "")):
        return result
    best = result
    best_count = _count_tamil_chars(result.get("name_tamil", ""))
    # Progressively gentler crops, then Otsu-thresholded variants
    for crop_ratio, use_otsu in ((0.90, False), (0.95, False), (0.85, True), (0.90, True)):
        attempt = _ocr_tamil_with_preprocessing(cell_img, crop_ratio, use_otsu)
        name = attempt.get("name_tamil", "")
        if _is_valid_tamil_name(name):
            count = _count_tamil_chars(name)
            if count > best_count:
                best = attempt
                best_count = count
        if best_count >= 3:
            # Good enough — stop retrying
            break
    return best
def ocr_epic_id_targeted(cell_img: np.ndarray) -> Optional[str]:
    """Read the EPIC ID from a cell's top-right area.

    Restricts OCR to an alphanumeric allowlist and widens the ROI in
    three steps to tolerate shifted cell boundaries. Returns None when no
    ROI yields a well-formed ID (3 letters + 7 digits).
    """
    h, w = cell_img.shape[:2]
    config = "--psm 7 --oem 1 --dpi 300 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    # (y1, y2, x1, x2): tight first, then progressively wider crops
    candidate_rois = (
        (0, int(h * 0.3), int(w * 0.4), w),
        (0, int(h * 0.35), int(w * 0.3), w),
        (0, int(h * 0.4), int(w * 0.2), w),
    )
    for y1, y2, x1, x2 in candidate_rois:
        roi = cell_img[y1:y2, x1:x2]
        if roi.size == 0:
            continue
        upscaled = cv2.resize(roi, None, fx=4, fy=4, interpolation=cv2.INTER_LANCZOS4)
        gray = cv2.cvtColor(upscaled, cv2.COLOR_RGB2GRAY) if len(upscaled.shape) == 3 else upscaled
        text = pytesseract.image_to_string(preprocess_for_ocr(gray), lang="eng", config=config).strip()
        epic_id = fix_epic_id(text)
        if epic_id and re.match(r"^[A-Z]{3}\d{7}$", epic_id):
            return epic_id
    return None
def _retry_epic_id_alt_preprocess(cell_img: np.ndarray) -> Optional[str]:
    """Second-chance EPIC ID extraction with alternative binarization.

    For cells where the standard CLAHE + adaptive-threshold pass failed:
    tries Otsu (good for high-contrast text), then sharpen + fixed
    threshold (rescues faint print). Returns None when nothing matches
    the EPIC shape (3 letters + 7 digits).
    """
    h, w = cell_img.shape[:2]
    config = "--psm 7 --oem 1 --dpi 300 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    epic_shape = re.compile(r"^[A-Z]{3}\d{7}$")
    for y1, y2, x1, x2 in ((0, int(h * 0.3), int(w * 0.4), w),
                           (0, int(h * 0.35), int(w * 0.3), w)):
        roi = cell_img[y1:y2, x1:x2]
        if roi.size == 0:
            continue
        upscaled = cv2.resize(roi, None, fx=4, fy=4, interpolation=cv2.INTER_LANCZOS4)
        gray = cv2.cvtColor(upscaled, cv2.COLOR_RGB2GRAY) if len(upscaled.shape) == 3 else upscaled
        # Strategy 1: Otsu's threshold
        otsu = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
        text = pytesseract.image_to_string(otsu, lang="eng", config=config).strip()
        epic_id = fix_epic_id(text)
        if epic_id and epic_shape.match(epic_id):
            return epic_id
        # Strategy 2: sharpen + fixed threshold at 140
        kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
        sharpened = cv2.filter2D(gray, -1, kernel)
        hard = cv2.threshold(sharpened, 140, 255, cv2.THRESH_BINARY)[1]
        text = pytesseract.image_to_string(hard, lang="eng", config=config).strip()
        epic_id = fix_epic_id(text)
        if epic_id and epic_shape.match(epic_id):
            return epic_id
    return None
def _ocr_epic_id_with_confidence(cell_img: np.ndarray) -> tuple[str, float]:
"""
Multi-strategy EPIC ID extraction with confidence scoring.
Runs all preprocessing strategies and uses voting/confidence to pick the best.
Returns (epic_id, confidence) where confidence is 0-100.
"""
h, w = cell_img.shape[:2]
config = "--psm 7 --oem 1 --dpi 300 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
# Primary ROI: top-right area where EPIC ID appears
roi = cell_img[0:int(h * 0.35), int(w * 0.3):w]
if roi.size == 0:
return ("", 0.0)
scale = 4
upscaled = cv2.resize(roi, None, fx=scale, fy=scale, interpolation=cv2.INTER_LANCZOS4)
if len(upscaled.shape) == 3:
gray = cv2.cvtColor(upscaled, cv2.COLOR_RGB2GRAY)
else:
gray = upscaled
# Three preprocessing strategies
strategies = {}
# Strategy 1: CLAHE + adaptive threshold (default)
thresh1 = preprocess_for_ocr(gray)
strategies["clahe"] = thresh1
# Strategy 2: Otsu's threshold
_, thresh2 = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
strategies["otsu"] = thresh2
# Strategy 3: Sharpen + simple threshold
sharpen_kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
sharpened = cv2.filter2D(gray, -1, sharpen_kernel)
_, thresh3 = cv2.threshold(sharpened, 140, 255, cv2.THRESH_BINARY)
strategies["sharpen"] = thresh3