Description
I'm trying to use the unblob API for what I think should be a fairly straightforward task but I'm having some difficulties and hoping to get some help. I haven't found many examples of API usage so I'm hoping this issue might also help other users get started with the API from the code I have and learn from my mistakes.
My goal here is to use unblob to do a recursive extraction of a blob but to produce a clean copy of each extraction without any sub-extractions (i.e., have no _extract
files within any of my output directories). Instead I want each of the extractions to be stored one directory deep within an output directory (e.g., output/extraction1, output/extraction2). I've previously implemented something like this by just running unblob then parsing the generated outputs looking for files named *_extract
but I think it should be much cleaner to do this with the API.
I've written the code below which successfully logs a lot of information about the extraction process and almost gets me what I want, but I find that extraction files sometimes still end up in my output so I suspect I'm doing something wrong or missing something obvious here.
I have 3 specific questions, but any advice or guidance would be much appreciated! Thanks
- Am I missing a much easier way to do this with the API?
- Are
blob_id
values supposed to be unique per blob? It seems like the same blob_id will show up with distinct paths for example if a blob is carved into 2 files, both the base blob and the 2 generated files will have the same blob_id. Am I just misunderstanding this interface? - Are there examples of API usage somewhere?
Example usage after installing unblob dependencies, unblob itself, and saving the below script as extracator.py
wget https://dlcdnets.asus.com/pub/ASUS/wireless/RT-N66U_C1/FW_RT_N66U_C1_300438510000.zip
python3 extractor.py FW_RT_N66U_C1_300438510000.zip output
In the generated output directory I see one of the extracted directories contains two _extract
directories
$ ls output/extracted/*
output/extracted/00dd3c22e77b9382941bf19afb8370b1897535fb.unblob:
Firmware_Release
output/extracted/31c56af333e9f4652626f6e0e10418e27dd1af33.unblob:
part0 part0_extract part1 part1_extract
output/extracted/a0f5794c735f16dd4a7b12042ef23e1d95dcb134.unblob:
bin cifs1 cifs2 dev etc home jffs lib media mmc mnt opt proc rom root sbin sys sysroot tmp usr var www
output/extracted/e008abdad83a71b718083b949b91dafaf071b3c5.unblob:
lzma.uncompressed
It's almost right, but the part0_extract
and part1_extract
directories within output/extracted/31c56af333e9f4652626f6e0e10418e27dd1af33.unblob
shouldn't be there!
import os
import sys
import multiprocessing
import shutil
import hashlib
import ipdb
import subprocess
from pathlib import Path
from typing import Dict, List
from unblob.processing import ExtractionConfig, process_file
from unblob.logging import configure_logger
from unblob import report
def sha1sum_file(filename):
sha1 = hashlib.sha1() # XXX: if minimum python version >= 3.9, pass usedforsecurity=False
buf = bytearray(128*1024)
bufview = memoryview(buf)
with open(filename, 'rb', buffering=0) as f:
for n in iter(lambda : f.readinto(bufview), 0):
sha1.update(bufview[:n])
return sha1.hexdigest()
def extract(filesystem: Path, outdir: Path):
'''
Given a filesystem blob, extract it into outdir. Return a dictionary
mapping a unique ID for each extracted blob to (ID of source file, source file, extracted directory)
Note that one ID may be an empty string (e.g., for the input file).
'''
configure_logger(0, outdir, Path('unblob.log'))
unblob_results = process_file(
ExtractionConfig(extract_root=outdir / 'unblob.root',
entropy_depth=0,
max_depth=3,
verbose=False,
keep_extracted_chunks=True),
filesystem)
known_tasks = {} # task_id -> [task_file, subtask.path, [files_in_subtask_path]]
for task_result in unblob_results.results:
task_file = task_result.task.path
task_id = task_result.task.blob_id
for subtask in task_result.subtasks:
if subtask.blob_id not in known_tasks:
# XXX: We'll see the same subtask.blob_id for each time we extract more data from a blob. E.g., we could have
# blob_id=1 for foo.zip which has a subtask.path of foo.zip_extracted. Then we'll see blob_id=1 for foo.zip_extracted/nextfile
# because the nextfile is derived from foo.zip.
known_tasks[subtask.blob_id] = [task_file, subtask.path, [subtask.path]]
#print(f"New blob_id in subtask with ID={subtask.blob_id}\n\tExtracted from {task_file} (ID={task_id})\n\tResults at {subtask.path}")
else:
known_tasks[subtask.blob_id][2].append(subtask.path)
#print(f"Duplicate blob_id with ID={subtask.blob_id}\n\tExtracted from {task_file} (ID={task_id})\n\tResults at {subtask.path}")
return known_tasks
def package(known_tasks, output_dir):
'''
We want to create a clean set of directories in outdir.
For each extraction we want to create
outdir/[sha1sum_of_blob].[unblob]/
'''
# Create results directories. output/blobs/<sha1sum> and output/extracted/<sha1sum>.<extractor>
blob_dir = output_dir / 'blobs'
blob_dir.mkdir()
extracted_dir = output_dir / 'extracted'
extracted_dir.mkdir()
# Copy each blob to the blob directory
for task_id, info in known_tasks.items():
input_hash = sha1sum_file(info[0])
shutil.copy(info[0], blob_dir / input_hash)
# Copy each extraction to the extracted directory
# Identify all extraction directories that were created (so we don't copy later)
extraction_dirs = set()
for task_id, info in known_tasks.items():
extraction_dirs.add(info[1])
for task_id, info in known_tasks.items():
input_hash = sha1sum_file(info[0])
out_dir = extracted_dir / f"{input_hash}.unblob"
# We want to copy everything from info[1] to out_dir but exclude any directories in extraction_dirs
# First copy everything - we'll use CP over shutil to handle weird files
subprocess.check_output(['cp', '-r', info[1], out_dir])
# Now for each file in extraction_dirs (except info[1]), delete
for extraction_dir in extraction_dirs:
if extraction_dir == info[1]:
continue
# Rewrite path to be relative to out_dir
try:
relative_path = Path(extraction_dir).relative_to(info[1])
except ValueError:
# Not a subpath, skip
continue
# Ensure output is not-empty and within the output directory
if not len(str(out_dir / relative_path)) or not str(out_dir / relative_path).startswith(str(out_dir)):
print("XXX skipping rm of unsafe path", out_dir / relative_path)
continue
subprocess.check_output(['rm', '-rf', str(out_dir / relative_path)])
def extract_and_package(firmware, output_dir):
# Initial extraction
known_tasks = extract(firmware, output_dir)
# Log extraction results
for task_id, info in known_tasks.items():
print(f"\nID {task_id}:")
print(f"\tPath: {info[0]} {sha1sum_file(info[0])}")
print(f"\tExtraction directory {info[1]}")
print(f"\tExtracted children:")
for subtask_path in info[2]:
print(f"\t - {subtask_path}")
package(known_tasks, output_dir)
if __name__ == '__main__':
firmware = Path(sys.argv[1])
if not firmware.exists():
print(f"File {firmware} does not exist")
os.exit(1)
output_dir = Path(sys.argv[2])
if output_dir.exists():
print(f"Output directory {output_dir} already exists. Removing it.")
shutil.rmtree(output_dir)
output_dir.mkdir(parents=True)
extract_and_package(firmware, output_dir)