Skip to content

Commit 99c9183

Browse files
authored
Merge pull request #4 from sabeechen/unicode-nonsense
Some fixes
2 parents ee31341 + fc1b1b9 commit 99c9183

File tree

4 files changed

+156
-29
lines changed

4 files changed

+156
-29
lines changed

decrypt-ha-backup/__main__.py

+43-25
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,27 @@
99
import random
1010
import getpass
1111
from typing import IO
12-
import securetar
1312
import tempfile
1413
import platform
1514
from pathlib import Path
15+
from .hacked_secure_tar_file import HackedSecureTarFile
16+
17+
class FailureError(Exception):
18+
"""Indicates a failure with a user readable message attached"""
19+
20+
def __init__(self, message: str) -> None:
21+
"""Initialize failure error."""
22+
super().__init__(message)
23+
self.message = message
24+
25+
def __str__(self) -> str:
26+
"""Return string representation of failure error."""
27+
return self.message
1628

17-
#PATH = "EncryptedFolders.tar"
18-
PATH = "EncryptedFolders.tar"
19-
PASSWORD = "orcsorcs"
2029

2130
def password_to_key(password: str) -> bytes:
2231
"""Generate a AES Key from password."""
23-
key: bytes = password.encode()
32+
key: bytes = password.encode("utf-8")
2433
for _ in range(100):
2534
key = hashlib.sha256(key).digest()
2635
return key[:16]
@@ -31,15 +40,8 @@ def key_to_iv(key: bytes) -> bytes:
3140
key = hashlib.sha256(key).digest()
3241
return key[:16]
3342

34-
def _generate_iv(key: bytes, salt: bytes) -> bytes:
35-
"""Generate an iv from data."""
36-
temp_iv = key + salt
37-
for _ in range(100):
38-
temp_iv = hashlib.sha256(temp_iv).digest()
39-
return temp_iv[:16]
40-
4143
def overwrite(line: str):
42-
sys.stdout.write(f"\r{line}\033[K")
44+
sys.stdout.write(f"\r{line.encode('utf-8', 'replace').decode()}\033[K")
4345

4446
def readTarMembers(tar: tarfile.TarFile):
4547
while(True):
@@ -57,6 +59,8 @@ def __init__(self, tarfile: tarfile.TarFile):
5759
except KeyError:
5860
self._configMember = self._tarfile.getmember("./backup.json")
5961
json_file = self._tarfile.extractfile(self._configMember)
62+
if not json_file:
63+
raise FailureError("Backup doesn't contain a metadata file named 'snapshot.json' or 'backup.json'")
6064
self._config = json.loads(json_file.read())
6165
json_file.close()
6266
self._items = [BackupItem(entry['slug'], entry['name'], self) for entry in self._config.get("addons")]
@@ -118,6 +122,8 @@ def __init__(self, slug, name, backup: Backup):
118122
self._name = name
119123
self._backup = backup
120124
self._info = self._backup._tarfile.getmember(self.fileName)
125+
if not self._info:
126+
raise FailureError(f"Backup file doesn't contain a file for {self._name} with the name '{self.fileName}'")
121127

122128
@property
123129
def fileName(self):
@@ -141,7 +147,10 @@ def size(self):
141147
return self.info.size
142148

143149
def _open(self):
144-
return self._backup._tarfile.extractfile(self.info)
150+
data = self._backup._tarfile.extractfile(self.info)
151+
if not data:
152+
raise FailureError(f"Backup file doesn't contain a file named {self.info.name}")
153+
return data
145154

146155
def _extractTo(self, file: IO):
147156
progress = 0
@@ -154,9 +163,7 @@ def _extractTo(self, file: IO):
154163
file.write(data)
155164
overwrite(f"Extracting '{self.name}' {round(100 * progress/self.size, 1)}%")
156165
progress += len(data)
157-
file.flush()
158166
overwrite(f"Extracting '{self.name}' {round(100 * progress/self.size, 1)}%")
159-
file.seek(0)
160167
print()
161168

162169
def _copyTar(self, source: tarfile.TarFile, dest: tarfile.TarFile):
@@ -167,12 +174,13 @@ def _copyTar(self, source: tarfile.TarFile, dest: tarfile.TarFile):
167174
else:
168175
dest.addfile(member, source.extractfile(member))
169176

170-
def addTo(self, output: tarfile, key: bytes):
171-
with tempfile.NamedTemporaryFile() as extracted:
172-
self._extractTo(extracted)
173-
overwrite(f"Decrypting '{self.name}'")
174-
extracted.seek(0)
175-
with securetar.SecureTarFile(Path(extracted.name), "r", key=key, gzip=self._backup.compressed) as decrypted:
177+
def addTo(self, temp_folder: str, output: tarfile.TarFile, key: bytes):
178+
temp_file = os.path.join(temp_folder, os.urandom(24).hex())
179+
try:
180+
with open(temp_file, "wb") as f:
181+
self._extractTo(f)
182+
overwrite(f"Decrypting '{self.name}'")
183+
with HackedSecureTarFile(Path(temp_file), key=key, gzip=self._backup.compressed) as decrypted:
176184
with tempfile.NamedTemporaryFile() as processed:
177185
tarmode = "w|" + ("gz" if self._backup.compressed else "")
178186
with tarfile.open(f"{self.slug}.tar", tarmode, fileobj=processed) as archivetar:
@@ -187,6 +195,10 @@ def addTo(self, output: tarfile, key: bytes):
187195
output.addfile(info, processed)
188196
overwrite(f"Saving '{self.name}' done")
189197
print()
198+
finally:
199+
if os.path.isfile(temp_file):
200+
os.remove(temp_file)
201+
pass
190202

191203

192204
def main():
@@ -209,12 +221,13 @@ def main():
209221
resp = input(f"The output file '{args.output_file}' already exists, do you want to overwrite it [y/n]?")
210222
if not resp.startswith("y"):
211223
print("Aborted")
212-
exit()
224+
exit(1)
213225

214226
if args.password is None:
215227
# ask fro a password
216228
args.password = getpass.getpass("Backup Password:")
217229

230+
temp_dir = tempfile.gettempdir()
218231
try:
219232
with tarfile.open(Path(args.backup_file), "r:") as backup_file:
220233
backup = Backup(backup_file)
@@ -226,10 +239,11 @@ def main():
226239
return
227240

228241
_key = password_to_key(args.password)
242+
print(_key)
229243

230244
with tarfile.open(args.output_file, "w:") as output:
231245
for archive in backup.items:
232-
archive.addTo(output, _key)
246+
archive.addTo(temp_dir, output, _key)
233247

234248
# Add the modified backup config
235249
backup.addModifiedConfig(output)
@@ -238,11 +252,15 @@ def main():
238252
except tarfile.ReadError as e:
239253
if "not a gzip file" in str(e):
240254
print("The file could not be read as a gzip file. Please ensure your password is correct.")
255+
else:
256+
raise
257+
except FailureError as e:
258+
print(e)
259+
exit(1)
241260

242261

243262
if __name__ == '__main__':
244263
if platform.system() == 'Windows':
245264
from ctypes import windll
246265
windll.kernel32.SetConsoleMode(windll.kernel32.GetStdHandle(-11), 7)
247-
248266
main()
+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
"""Tarfile fileobject handler for encrypted files."""
2+
import hashlib
3+
import logging
4+
from pathlib import Path
5+
import tarfile
6+
from typing import IO, Optional
7+
8+
from cryptography.hazmat.backends import default_backend
9+
from cryptography.hazmat.primitives.ciphers import (
10+
Cipher,
11+
CipherContext,
12+
algorithms,
13+
modes,
14+
)
15+
16+
_LOGGER: logging.Logger = logging.getLogger(__name__)
17+
18+
DEFAULT_BUFSIZE = 10240
19+
20+
class HackedSecureTarFile:
21+
"""Thsi is a hacked up verison of SecureTarFile that works around a bunch of windows specific issues the library has"""
22+
23+
def __init__(
24+
self,
25+
name: Path,
26+
key: bytes,
27+
gzip: bool = True,
28+
bufsize: int = DEFAULT_BUFSIZE,
29+
) -> None:
30+
"""Initialize encryption handler."""
31+
self._file: Optional[IO[bytes]] = None
32+
self._name: Path = name
33+
self._bufsize: int = bufsize
34+
35+
# Tarfile options
36+
self._tar: Optional[tarfile.TarFile] = None
37+
self._tar_mode: str = f"r|gz" if gzip else f"r|"
38+
39+
# Encryption/Description
40+
self._aes: Optional[Cipher] = None
41+
self._key = key
42+
43+
# Function helper
44+
self._decrypt: Optional[CipherContext] = None
45+
self._init = True
46+
47+
def __enter__(self) -> tarfile.TarFile:
48+
try:
49+
# Encrypted/Decryped Tarfile
50+
self._file = open(self._name, "rb")
51+
52+
# Extract IV for CBC
53+
cbc_rand = self._file.read(16)
54+
55+
# Create Cipher
56+
self._aes = Cipher(
57+
algorithms.AES(self._key),
58+
modes.CBC(_generate_iv(self._key, cbc_rand)),
59+
backend=default_backend(),
60+
)
61+
self._decrypt = self._aes.decryptor()
62+
self._tar = tarfile.open(
63+
fileobj=self, mode=self._tar_mode, dereference=False, bufsize=self._bufsize
64+
)
65+
return self._tar
66+
except:
67+
self.__exit__(None, None, None)
68+
raise
69+
70+
def __exit__(self, exc_type, exc_value, traceback) -> None:
71+
"""Close file."""
72+
if self._tar:
73+
self._tar.close()
74+
self._tar = None
75+
if self._file:
76+
self._file.close()
77+
self._file = None
78+
79+
def read(self, size: int = 0) -> bytes:
80+
"""Read data."""
81+
assert self._decrypt is not None
82+
assert self._file is not None
83+
data = self._decrypt.update(self._file.read(size))
84+
return data
85+
86+
@property
87+
def path(self) -> Path:
88+
"""Return path object of tarfile."""
89+
return self._name
90+
91+
@property
92+
def size(self) -> float:
93+
"""Return backup size."""
94+
if not self._name.is_file():
95+
return 0
96+
return round(self._name.stat().st_size / 1_048_576, 2) # calc mbyte
97+
98+
99+
def _generate_iv(key: bytes, salt: bytes) -> bytes:
100+
"""Generate an iv from data."""
101+
temp_iv = key + salt
102+
for _ in range(100):
103+
temp_iv = hashlib.sha256(temp_iv).digest()
104+
return temp_iv[:16]
105+
106+
107+

scripts/publish.sh

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
#!/bin/sh
2+
13
pip install -q --upgrade setupext-janitor twine build
2-
python3 setup.py clean --dist --eggs
4+
python3 setup.py clean
35
python3 -m build
46
keyring --disable
57
python3 -m twine upload dist/*

setup.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010
'Programming Language :: Python :: 3',
1111
'Programming Language :: Python :: 3.9',
1212
],
13-
packages=find_packages(include=['decrypt-ha-backup']),
14-
version='2022.7.14.4',
13+
packages=find_packages(),
14+
version='2023.10.28.1',
1515
description='Decryption utility for Home Assistant backups',
1616
long_description=long_description,
1717
long_description_content_type="text/markdown",
18-
install_requires=["securetar"],
18+
install_requires=["cryptography"],
1919
author="Stephen Beechen",
2020
author_email="[email protected]",
2121
python_requires=">=3.9",

0 commit comments

Comments
 (0)