|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""Create minimized RAW test fixtures for rawpy thumbnail extraction tests. |
| 3 | +
|
| 4 | +
|
| 5 | +This script zeroes out sensor pixel data, so the total file size remains the same, but the resulting files compress |
| 6 | +extremely well in git's object store and retain enough structure for rawpy's extract_thumb() to work. |
| 7 | +
|
| 8 | +- Keep all TIFF metadata and IFD structures intact |
| 9 | +- Replace all IFD-referenced JPEG previews with a tiny 16x16 synthetic JPEG |
| 10 | +- Update the JPEGLength (tag 514) IFD entry if present |
| 11 | +- Zero out raw sensor pixel data (never needed for extract_thumb()) |
| 12 | +- Write the result to test/sample_data/<original filename> |
| 13 | +
|
| 14 | +Usage: |
| 15 | + test/minimize_raw.py path/to/original.RAW |
| 16 | +""" |
| 17 | + |
| 18 | +import argparse |
| 19 | +import struct |
| 20 | +import sys |
| 21 | +from io import BytesIO |
| 22 | +from pathlib import Path |
| 23 | + |
| 24 | +from PIL import Image |
| 25 | + |
| 26 | +from test.analyze_raw import parse_tiff_header, read_ifd, read_offsets_and_lengths |
| 27 | + |
| 28 | +SAMPLE_DATA = Path(__file__).parent / 'sample_data' |
| 29 | + |
| 30 | + |
def _scan_file(data: bytes) -> tuple[str, dict]:
    """Parse RAW file bytes and return (endian, results).

    Walks the top-level IFD chain, starting at the offset reported by
    ``parse_tiff_header``, and lets ``_collect_ifd`` gather what each IFD
    (and its SubIFDs) references.

    results has keys:
      'jpegs'       — list of dicts with label/offset/length/length_tag_entry_offset
      'raw_regions' — list of dicts with offset/length for raw sensor data

    The walk ends at a next-IFD offset of 0, or as soon as an offset repeats,
    so a corrupt file whose IFD chain loops back on itself cannot hang the scan.
    """
    endian, ifd0_offset = parse_tiff_header(data)

    results: dict = {'jpegs': [], 'raw_regions': []}
    visited: set[int] = set()
    ifd_offset = ifd0_offset
    ifd_index = 0
    while ifd_offset and ifd_offset not in visited:
        visited.add(ifd_offset)
        _collect_ifd(data, ifd_offset, endian, f'IFD{ifd_index}', results)
        # The second element returned by read_ifd() is used as the offset of
        # the next IFD in the chain; a falsy value terminates the walk.
        _, ifd_offset, _ = read_ifd(data, ifd_offset, endian)
        ifd_index += 1

    return endian, results
| 49 | + |
| 50 | + |
def _collect_ifd(
    data: bytes,
    ifd_offset: int,
    endian: str,
    label: str,
    results: dict,
    *,
    _ancestors: frozenset = frozenset(),
) -> None:
    """Recursively walk an IFD, collecting JPEG preview and raw sensor info.

    Appends to ``results['jpegs']`` and ``results['raw_regions']`` in place and
    then descends into every SubIFD listed under tag 330.

    Args:
        data: Complete file contents.
        ifd_offset: Byte offset of the IFD to read.
        endian: struct byte-order prefix, prepended to format codes.
        label: Human-readable path of this IFD (e.g. ``IFD0/SubIFD[1]``),
            stored with each JPEG entry for log output.
        results: Accumulator dict with 'jpegs' and 'raw_regions' lists.
        _ancestors: Internal. Offsets of the IFDs on the current recursion
            path; callers outside this function should not pass it.

    A SubIFD that points back at itself or at one of its ancestors is skipped,
    so a corrupt file cannot drive the recursion into a RecursionError.
    """
    if ifd_offset in _ancestors:
        return

    # NOTE(review): entries appears to map tag -> 4-tuple; index 1 is used as
    # the count, index 2 as the value and index 3 as the entry's file offset —
    # confirm against analyze_raw.read_ifd.
    entries, _, _ = read_ifd(data, ifd_offset, endian)
    if not entries:
        return

    # Embedded JPEG via tags 513 (offset) / 514 (length)
    if 513 in entries and 514 in entries:
        results['jpegs'].append(
            {
                'label': label,
                'offset': entries[513][2],
                'length': entries[514][2],
                'length_tag_entry_offset': entries[514][3],
            }
        )

    # Raw sensor strips (tags 273/279) and tiles (tags 324/325)
    for offset_tag, length_tag in ((273, 279), (324, 325)):
        for off, ln in read_offsets_and_lengths(data, entries, offset_tag, length_tag, endian):
            results['raw_regions'].append({'offset': off, 'length': ln})

    # SubIFDs (tag 330)
    if 330 in entries:
        _, cnt, val, _ = entries[330]
        # A single SubIFD offset is taken directly from the entry value;
        # otherwise the value is treated as the file offset of an array of
        # 32-bit offsets.
        sub_offsets = (
            [val]
            if cnt == 1
            else [struct.unpack_from(endian + 'I', data, val + i * 4)[0] for i in range(cnt)]
        )
        lineage = _ancestors | {ifd_offset}
        for i, sub_off in enumerate(sub_offsets):
            _collect_ifd(
                data, sub_off, endian, f'{label}/SubIFD[{i}]', results, _ancestors=lineage
            )
| 83 | + |
| 84 | + |
def _make_tiny_jpeg(size: tuple[int, int] = (16, 16), color: str = 'red') -> bytes:
    """Return the JPEG-encoded bytes of a solid *color* RGB image of *size*."""
    swatch = Image.new('RGB', size, color=color)
    stream = BytesIO()
    swatch.save(stream, format='JPEG')
    return stream.getvalue()
| 89 | + |
| 90 | + |
def minimize(src: Path, dest: Path) -> None:
    """Minimize a RAW file by replacing embedded JPEGs and zeroing raw sensor data.

    Reads src, writes the minimized result to dest (may be the same path; src
    is read completely before anything is written).

    Every edit is made in place, so the output has exactly the same size as
    the input:

    - Each embedded JPEG found by ``_scan_file`` is overwritten with a tiny
      synthetic JPEG, the rest of its original byte range is zero-filled, and
      its length entry (tag 514) is updated.
    - Each raw sensor region is zero-filled.

    An embedded JPEG whose recorded byte range is smaller than the synthetic
    JPEG, or extends past the end of the file, is left untouched and reported
    on stderr, because overwriting it would clobber neighbouring data or grow
    the file. Raw regions are clamped to the end of the file for the same
    reason.

    Raises:
        RuntimeError: If the scan finds neither embedded JPEGs nor raw regions.
    """
    print(f'Reading {src} ({src.stat().st_size / 1024 / 1024:.1f} MB)...')
    data = bytearray(src.read_bytes())
    total = len(data)
    endian, results = _scan_file(bytes(data))

    if not results['jpegs'] and not results['raw_regions']:
        raise RuntimeError(
            'No embedded JPEGs or raw sensor regions found — is this a TIFF-based RAW file?'
        )

    tiny_jpeg = _make_tiny_jpeg()
    tiny_len = len(tiny_jpeg)

    # Processing order (largest recorded length first) is kept from the
    # original implementation.
    for jpeg in sorted(results['jpegs'], key=lambda j: j['length'], reverse=True):
        old_offset = jpeg['offset']
        old_len = jpeg['length']

        if old_len < tiny_len or old_offset + old_len > total:
            print(
                f' Skipped JPEG [{jpeg["label"]}] @ {old_offset:,}: '
                f'{old_len:,}-byte slot cannot hold the {tiny_len}-byte replacement',
                file=sys.stderr,
            )
            continue

        # Same-length slice assignment: tiny JPEG followed by zero padding.
        data[old_offset : old_offset + old_len] = tiny_jpeg + bytes(old_len - tiny_len)
        print(
            f' Replaced JPEG [{jpeg["label"]}] @ {old_offset:,}: {old_len:,} → {tiny_len} bytes'
        )

        if jpeg['length_tag_entry_offset'] is not None:
            # NOTE(review): assumes length_tag_entry_offset is the start of the
            # 12-byte IFD entry, so +8 (tag 2 + type 2 + count 4) lands on its
            # value field, and that tag 514 is stored as a 32-bit LONG.
            struct.pack_into(endian + 'I', data, jpeg['length_tag_entry_offset'] + 8, tiny_len)
            print(f' Updated IFD tag 514 entry @ {jpeg["length_tag_entry_offset"]:,}')

    for region in results['raw_regions']:
        off, ln = region['offset'], region['length']
        if off >= total and ln:
            print(
                f' Skipped raw sensor region @ {off:,}: starts beyond end of file',
                file=sys.stderr,
            )
            continue
        # Clamp so a length that overruns EOF cannot extend the buffer.
        end = min(off + ln, total)
        data[off:end] = bytes(end - off)
        print(f' Zeroed raw sensor region @ {off:,}: {(end - off) / 1024 / 1024:.1f} MB')

    dest.parent.mkdir(parents=True, exist_ok=True)
    dest.write_bytes(data)
    print(f'Written {dest} ({dest.stat().st_size / 1024 / 1024:.1f} MB)')
| 132 | + |
| 133 | + |
def verify(path: Path) -> None:
    """Check that rawpy can pull a thumbnail out of the file at *path*.

    Prints a one-line summary for JPEG and BITMAP thumbnails.

    Raises:
        AssertionError: If rawpy reports any other thumbnail format.
    """
    import rawpy

    print(f'Verifying {path.name} with rawpy...')
    with rawpy.imread(str(path)) as raw:
        thumb = raw.extract_thumb()

    kind = thumb.format

    if kind == rawpy.ThumbFormat.JPEG:
        # Opening the payload with PIL confirms it parses as an image.
        decoded = Image.open(BytesIO(bytes(thumb.data)))
        print(f' OK: {kind}, size={decoded.size}')
        return

    if kind == rawpy.ThumbFormat.BITMAP:
        shape = getattr(thumb.data, "shape", "?")
        print(f' OK: {kind}, shape={shape}')
        return

    raise AssertionError(f'Unexpected thumbnail format: {kind}')
| 149 | + |
| 150 | + |
def main() -> None:
    """Command-line entry point: minimize one RAW file into the sample-data directory.

    Exits with status 1 (after printing an error to stderr) when the given
    source path does not exist; otherwise minimizes the file, verifies the
    result and prints 'Done.'.
    """
    cli = argparse.ArgumentParser(description='Minimize a RAW file for use in test data')
    cli.add_argument('src', type=Path, help='Path to the original RAW file')
    source = cli.parse_args().src

    if not source.exists():
        print(f'ERROR: File not found: {source}', file=sys.stderr)
        sys.exit(1)

    # The output keeps the original file name inside SAMPLE_DATA.
    target = SAMPLE_DATA / source.name
    minimize(source, target)
    verify(target)
    print('Done.')


if __name__ == '__main__':
    main()
0 commit comments