|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from typing import TYPE_CHECKING, BinaryIO |
| 4 | + |
| 5 | +from dissect.apfs.c_apfs import c_apfs |
| 6 | +from dissect.apfs.objects import NxSuperblock |
| 7 | + |
| 8 | +if TYPE_CHECKING: |
| 9 | + from uuid import UUID |
| 10 | + |
| 11 | + from dissect.apfs.objects.fs import FS |
| 12 | + from dissect.apfs.objects.keybag import ContainerKeybag |
| 13 | + |
| 14 | + |
| 15 | +class APFS: |
| 16 | + """Container class for APFS operations. |
| 17 | +
|
| 18 | + Args: |
| 19 | + fh: File-like object to read the APFS container from. |
| 20 | + """ |
| 21 | + |
| 22 | + def __init__(self, fh: BinaryIO): |
| 23 | + self.fh = fh |
| 24 | + self.fh.seek(0) |
| 25 | + |
| 26 | + self.sb = NxSuperblock.from_block(self, 0, self.fh.read(c_apfs.NX_DEFAULT_BLOCK_SIZE)) |
| 27 | + self.sb = sorted( |
| 28 | + [self.sb] + [obj for obj in self.sb.checkpoint_objects if isinstance(obj, NxSuperblock)], |
| 29 | + key=lambda obj: obj.xid, |
| 30 | + )[-1] |
| 31 | + |
| 32 | + @property |
| 33 | + def block_size(self) -> int: |
| 34 | + """The block size of the container.""" |
| 35 | + return self.sb.block_size |
| 36 | + |
| 37 | + @property |
| 38 | + def sectors_per_block(self) -> int: |
| 39 | + """The number of 512-byte sectors per block.""" |
| 40 | + return self.block_size // 512 |
| 41 | + |
| 42 | + @property |
| 43 | + def block_count(self) -> int: |
| 44 | + """The total number of blocks in the container.""" |
| 45 | + return self.sb.block_count |
| 46 | + |
| 47 | + @property |
| 48 | + def uuid(self) -> UUID: |
| 49 | + """The UUID of the container.""" |
| 50 | + return self.sb.uuid |
| 51 | + |
| 52 | + @property |
| 53 | + def keybag(self) -> ContainerKeybag | None: |
| 54 | + """The container keybag, if present.""" |
| 55 | + return self.sb.keylocker |
| 56 | + |
| 57 | + @property |
| 58 | + def volumes(self) -> list[FS]: |
| 59 | + """All the filesystems in the container.""" |
| 60 | + return self.sb.filesystems |
| 61 | + |
| 62 | + def _read_block(self, address: int, count: int = 1) -> bytes: |
| 63 | + """Read a block from the container. |
| 64 | +
|
| 65 | + Args: |
| 66 | + address: The block address to read. |
| 67 | + """ |
| 68 | + # TODO: Fusion tier2 |
| 69 | + self.fh.seek(address * self.block_size) |
| 70 | + return self.fh.read(count * self.block_size) |
0 commit comments