|
| 1 | +import os |
| 2 | +from pathlib import Path |
| 3 | +import base64 |
| 4 | +import json |
| 5 | +import shutil |
| 6 | +import tempfile |
| 7 | +import pytest |
| 8 | + |
| 9 | +from engine.security.encryption import ( |
| 10 | + EncryptionManager, |
| 11 | + encrypt_sensitive_data, |
| 12 | + decrypt_sensitive_data, |
| 13 | +) |
| 14 | + |
| 15 | + |
| 16 | +@pytest.fixture() |
| 17 | +def temp_dir(tmp_path: Path): |
| 18 | + # Use a temp working dir to avoid polluting repo |
| 19 | + cwd = os.getcwd() |
| 20 | + os.chdir(tmp_path) |
| 21 | + try: |
| 22 | + yield tmp_path |
| 23 | + finally: |
| 24 | + os.chdir(cwd) |
| 25 | + |
| 26 | + |
| 27 | +def test_encrypt_decrypt_roundtrip_string(temp_dir: Path): |
| 28 | + mgr = EncryptionManager() |
| 29 | + plaintext = "hello secret" |
| 30 | + token = mgr.encrypt(plaintext) |
| 31 | + assert isinstance(token, (bytes, bytearray)) |
| 32 | + out = mgr.decrypt(token) |
| 33 | + assert out == plaintext |
| 34 | + |
| 35 | + |
| 36 | +def test_encrypt_decrypt_roundtrip_bytes(temp_dir: Path): |
| 37 | + mgr = EncryptionManager() |
| 38 | + data = b"\x00\x01binary" |
| 39 | + token = mgr.encrypt(data) |
| 40 | + out = mgr.decrypt(token) |
| 41 | + # decrypt returns str for non-JSON, so compare bytes->str decode |
| 42 | + assert out == data.decode() |
| 43 | + |
| 44 | + |
| 45 | +def test_encrypt_decrypt_roundtrip_dict(temp_dir: Path): |
| 46 | + mgr = EncryptionManager() |
| 47 | + payload = {"a": 1, "b": "two"} |
| 48 | + token = mgr.encrypt(payload) |
| 49 | + out = mgr.decrypt(token) |
| 50 | + assert isinstance(out, dict) |
| 51 | + assert out == payload |
| 52 | + |
| 53 | + |
| 54 | +def test_encrypt_decrypt_file_flow(temp_dir: Path): |
| 55 | + mgr = EncryptionManager() |
| 56 | + src = Path("sample.txt") |
| 57 | + src.write_text("file content", encoding="utf-8") |
| 58 | + |
| 59 | + enc_path = mgr.encrypt_file(src) |
| 60 | + assert enc_path.exists() |
| 61 | + assert enc_path.suffix == ".enc" |
| 62 | + |
| 63 | + dec_path = mgr.decrypt_file(enc_path) |
| 64 | + assert dec_path.exists() |
| 65 | + assert dec_path.read_text(encoding="utf-8") == "file content" |
| 66 | + |
| 67 | + |
| 68 | +def test_rotate_keys_and_multi_decrypt(temp_dir: Path): |
| 69 | + mgr = EncryptionManager() |
| 70 | + token_old = mgr.encrypt("keep me") |
| 71 | + |
| 72 | + # rotate keys to ensure a new primary is added; keep both |
| 73 | + mgr.rotate_keys(max_keys=2) |
| 74 | + |
| 75 | + # Should still decrypt using MultiFernet |
| 76 | + out = mgr.decrypt(token_old) |
| 77 | + assert out == "keep me" |
| 78 | + |
| 79 | + |
| 80 | +def test_reencrypt_file_and_directory(temp_dir: Path): |
| 81 | + mgr = EncryptionManager() |
| 82 | + directory = Path("vault") |
| 83 | + directory.mkdir() |
| 84 | + |
| 85 | + # Prepare two files |
| 86 | + for i in range(2): |
| 87 | + p = directory / f"f{i}.txt" |
| 88 | + p.write_text(f"data {i}", encoding="utf-8") |
| 89 | + mgr.encrypt_file(p) |
| 90 | + # remove plaintext |
| 91 | + p.unlink() |
| 92 | + |
| 93 | + # Now rotate keys and reencrypt directory |
| 94 | + mgr.rotate_keys(max_keys=3) |
| 95 | + success, total = mgr.reencrypt_directory(directory) |
| 96 | + assert total == 2 |
| 97 | + assert success == 2 |
| 98 | + |
| 99 | + |
| 100 | +def test_encrypt_sensitive_and_decrypt_sensitive_data(temp_dir: Path): |
| 101 | + data = { |
| 102 | + "username": "bob", |
| 103 | + "password": "secret123", |
| 104 | + "token": None, # None should be ignored |
| 105 | + } |
| 106 | + sensitive = ["password", "token"] |
| 107 | + |
| 108 | + enc = encrypt_sensitive_data(data, sensitive) |
| 109 | + assert enc["username"] == "bob" |
| 110 | + assert enc.get("password_encrypted") is True |
| 111 | + # base64 string stored |
| 112 | + enc_val = enc["password"] |
| 113 | + assert isinstance(enc_val, str) |
| 114 | + |
| 115 | + dec = decrypt_sensitive_data(enc) |
| 116 | + assert dec.get("password_encrypted") is None |
| 117 | + assert dec["password"] == "secret123" |
| 118 | + |
| 119 | + |
| 120 | +def test_init_with_password_derivation_creates_keys(temp_dir: Path): |
| 121 | + # ensure no keys exist |
| 122 | + keys_dir = Path("config") / "encryption_keys" |
| 123 | + if keys_dir.exists(): |
| 124 | + shutil.rmtree(keys_dir) |
| 125 | + |
| 126 | + mgr = EncryptionManager(password="passw0rd") |
| 127 | + # current_keys.json should be created |
| 128 | + key_file = keys_dir / "current_keys.json" |
| 129 | + assert key_file.exists() |
| 130 | + with open(key_file, "r") as f: |
| 131 | + serialized = json.load(f) |
| 132 | + assert isinstance(serialized, list) and len(serialized) >= 1 |
| 133 | + # keys are base64 strings |
| 134 | + base64.b64decode(serialized[0]["key"]) # will raise if invalid |
| 135 | + |
| 136 | + |
| 137 | +def test_reencrypt_directory_handles_empty_and_invalid_path(temp_dir: Path): |
| 138 | + mgr = EncryptionManager() |
| 139 | + |
| 140 | + empty_dir = Path("empty") |
| 141 | + empty_dir.mkdir() |
| 142 | + succ, tot = mgr.reencrypt_directory(empty_dir) |
| 143 | + assert (succ, tot) == (0, 0) |
| 144 | + |
| 145 | + # non-existing path -> method catches and returns (0,0) |
| 146 | + succ2, tot2 = mgr.reencrypt_directory(Path("no_such_dir")) |
| 147 | + assert (succ2, tot2) == (0, 0) |
0 commit comments