Skip to content

Commit 8097f3e

Browse files
committed
rsa_keys: refactor
1 parent 8aebf27 commit 8097f3e

File tree

11 files changed

+88
-51
lines changed

11 files changed

+88
-51
lines changed

job_executor/adapter/fs/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
from job_executor.adapter.fs.datastore_files import DatastoreDirectory
66
from job_executor.adapter.fs.input_files import InputDirectory
7+
from job_executor.adapter.fs.private_keys_directory import PrivateKeysDirectory
78
from job_executor.adapter.fs.working_files import WorkingDirectory
9+
from job_executor.config import environment
810

911

1012
class FileSystemAdapter(Protocol):
@@ -21,13 +23,17 @@ class LocalStorageAdapter:
2123
datastore_dir: DatastoreDirectory
2224
working_dir: WorkingDirectory
2325
input_dir: InputDirectory
26+
private_keys_dir: PrivateKeysDirectory
2427

25-
def __init__(self, datastore_dir_path: Path) -> None:
28+
def __init__(self, datastore_dir_path: Path, datastore_rdn: str) -> None:
2629
self.datastore_dir = DatastoreDirectory(datastore_dir_path)
2730
self.working_dir = WorkingDirectory(
2831
Path(f"{datastore_dir_path}_working")
2932
)
3033
self.input_dir = InputDirectory(Path(f"{datastore_dir_path}_input"))
34+
self.private_keys_dir = PrivateKeysDirectory(
35+
Path(environment.private_keys_dir) / datastore_rdn
36+
)
3137

3238
def move_working_dir_parquet_to_datastore(self, dataset_name: str) -> None:
3339
"""
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import os
2+
from dataclasses import dataclass
3+
from pathlib import Path
4+
5+
6+
@dataclass
7+
class PrivateKeysDirectory:
8+
path_with_rdn: Path
9+
10+
def create(self) -> bool:
11+
if not self.path_with_rdn.exists():
12+
os.makedirs(self.path_with_rdn)
13+
return True
14+
return False
15+
16+
def save_private_key(self, microdata_private_key_pem: bytes) -> None:
17+
with open(self._get_private_key_location(), "wb") as file:
18+
file.write(microdata_private_key_pem)
19+
20+
def clean_up(self) -> bool:
21+
if self._get_private_key_location().exists():
22+
os.remove(self._get_private_key_location())
23+
return True
24+
return False
25+
26+
def _get_private_key_location(self) -> Path:
27+
return self.path_with_rdn / "microdata_private_key.pem"

job_executor/app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def initialize_app() -> Manager:
2424
rollback.fix_interrupted_jobs()
2525
for rdn in datastore_api.get_datastores():
2626
local_storage = LocalStorageAdapter(
27-
datastore_api.get_datastore_directory(rdn)
27+
datastore_api.get_datastore_directory(rdn), rdn
2828
)
2929
if local_storage.datastore_dir.temporary_backup_exists():
3030
raise StartupException(f"tmp directory exists for {rdn}")

job_executor/domain/datastores.py

Lines changed: 14 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,4 @@
11
import logging
2-
import os
3-
from pathlib import Path
4-
5-
from cryptography.hazmat.backends import default_backend
6-
from cryptography.hazmat.primitives import serialization
7-
from cryptography.hazmat.primitives.asymmetric import rsa
82

93
from job_executor.adapter import datastore_api
104
from job_executor.adapter.datastore_api.models import JobStatus
@@ -25,12 +19,12 @@
2519
UnnecessaryUpdateException,
2620
VersioningException,
2721
)
28-
from job_executor.config import environment
2922
from job_executor.domain.models import JobContext
3023
from job_executor.domain.rollback import (
3124
rollback_bump,
3225
rollback_manager_phase_import_job,
3326
)
27+
from job_executor.domain.rsa_keys import generate_rsa_key_pair
3428

3529
logger = logging.getLogger()
3630

@@ -605,55 +599,34 @@ def generate_rsa_keys(
605599
"""
606600
job_id = job_context.job.job_id
607601
datastore_rdn = job_context.job.datastore_rdn
602+
private_keys_dir = job_context.local_storage.private_keys_dir
608603
try:
609604
logger.info(f"{job_id}: initiated")
610605
datastore_api.update_job_status(job_id, JobStatus.INITIATED)
611606

612-
target_dir = Path(environment.private_keys_dir) / datastore_rdn
613-
614-
if not target_dir.exists():
615-
logger.info(
616-
f"{job_id}: Creating private keys directory at {target_dir}"
617-
)
618-
os.makedirs(target_dir)
619-
620-
logger.info(f"{job_id}: Generating RSA key pair")
621-
private_key = rsa.generate_private_key(
622-
public_exponent=65537, key_size=2048, backend=default_backend()
623-
)
624-
public_key = private_key.public_key()
625-
626-
microdata_private_key_pem = private_key.private_bytes(
627-
encoding=serialization.Encoding.PEM,
628-
format=serialization.PrivateFormat.PKCS8,
629-
encryption_algorithm=serialization.NoEncryption(),
607+
logger.info(
608+
f"{job_id}: Checking private keys directory at "
609+
f"{private_keys_dir.path_with_rdn}"
630610
)
611+
if private_keys_dir.create():
612+
logger.info(f"{job_id}: Private keys directory created")
631613

632-
private_key_location = target_dir / "microdata_private_key.pem"
633-
with open(private_key_location, "wb") as file:
634-
file.write(microdata_private_key_pem)
635-
logger.info(f"{job_id}: Saved private key to {private_key_location}")
614+
logger.info(f"{job_id}: Generating RSA key pair")
615+
private_key_pem, public_key_pem = generate_rsa_key_pair()
636616

637-
microdata_public_key_pem = public_key.public_bytes(
638-
encoding=serialization.Encoding.PEM,
639-
format=serialization.PublicFormat.SubjectPublicKeyInfo,
640-
)
617+
private_keys_dir.save_private_key(private_key_pem)
618+
logger.info(f"{job_id}: Saved private key")
641619

642620
try:
643621
logger.info(f"{job_id}: Posting public key to datastore-api")
644-
datastore_api.post_public_key(
645-
datastore_rdn, microdata_public_key_pem
646-
)
622+
datastore_api.post_public_key(datastore_rdn, public_key_pem)
647623
except Exception as post_error:
648624
logger.error(
649625
f"{job_id}: Failed to post public key to datastore-api, "
650626
"cleaning up saved private key"
651627
)
652-
if private_key_location.exists():
653-
os.remove(private_key_location)
654-
logger.info(
655-
f"{job_id}: Deleted private key at {private_key_location}"
656-
)
628+
if private_keys_dir.clean_up():
629+
logger.info(f"{job_id}: Deleted private key due to error")
657630
raise post_error
658631

659632
logger.info(f"{job_id}: completed")

job_executor/domain/models.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ class JobContext:
1818

1919
def build_job_context(job: Job, handler: handler_type) -> JobContext:
2020
local_storage = LocalStorageAdapter(
21-
datastore_api.get_datastore_directory(job.datastore_rdn)
21+
datastore_api.get_datastore_directory(job.datastore_rdn),
22+
job.datastore_rdn,
2223
)
2324
job_size = (
2425
local_storage.input_dir.get_importable_tar_size_in_bytes(

job_executor/domain/rollback.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
def rollback_bump(job: Job, bump_manifesto: DatastoreVersion) -> None:
2929
job_id = job.job_id
3030
local_storage = LocalStorageAdapter(
31-
datastore_api.get_datastore_directory(job.datastore_rdn)
31+
datastore_api.get_datastore_directory(job.datastore_rdn),
32+
job.datastore_rdn,
3233
)
3334
try:
3435
logger.info(f"{job_id}: Restoring files from temporary backup")
@@ -138,7 +139,8 @@ def rollback_worker_phase_import_job(
138139
) -> None:
139140
job_id = job.job_id
140141
local_storage = LocalStorageAdapter(
141-
datastore_api.get_datastore_directory(job.datastore_rdn)
142+
datastore_api.get_datastore_directory(job.datastore_rdn),
143+
job.datastore_rdn,
142144
)
143145
logger.warning(
144146
f"{job_id}: Rolling back worker job "
@@ -194,7 +196,8 @@ def rollback_manager_phase_import_job(
194196
"""
195197
job_id = job.job_id
196198
local_storage = LocalStorageAdapter(
197-
datastore_api.get_datastore_directory(job.datastore_rdn)
199+
datastore_api.get_datastore_directory(job.datastore_rdn),
200+
job.datastore_rdn,
198201
)
199202
logger.warning(
200203
f"{job_id}: Rolling back import job "

job_executor/domain/rsa_keys.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import logging
2+
3+
from cryptography.hazmat.backends import default_backend
4+
from cryptography.hazmat.primitives import serialization
5+
from cryptography.hazmat.primitives.asymmetric import rsa
6+
7+
logger = logging.getLogger()
8+
9+
10+
def generate_rsa_key_pair() -> tuple[bytes, bytes]:
11+
private_key = rsa.generate_private_key(
12+
public_exponent=65537, key_size=2048, backend=default_backend()
13+
)
14+
public_key = private_key.public_key()
15+
16+
private_key_pem = private_key.private_bytes(
17+
encoding=serialization.Encoding.PEM,
18+
format=serialization.PrivateFormat.PKCS8,
19+
encryption_algorithm=serialization.NoEncryption(),
20+
)
21+
22+
public_key_pem = public_key.public_bytes(
23+
encoding=serialization.Encoding.PEM,
24+
format=serialization.PublicFormat.SubjectPublicKeyInfo,
25+
)
26+
27+
return private_key_pem, public_key_pem

tests/integration/test_datastore.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ def generate_job_parameters(
112112

113113
return JobContext(
114114
handler="worker",
115-
local_storage=LocalStorageAdapter(DATASTORE_DIR),
115+
local_storage=LocalStorageAdapter(DATASTORE_DIR, "TEST_DATASTORE"),
116116
job_size=100,
117117
job=Job(
118118
job_id="1",

tests/integration/test_import.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def set_up_resources():
8888
def generate_job_context(operation: Operation, target: str) -> JobContext:
8989
return JobContext(
9090
handler="worker",
91-
local_storage=LocalStorageAdapter(DATASTORE_DIR),
91+
local_storage=LocalStorageAdapter(DATASTORE_DIR, "TEST_DATASTORE"),
9292
job_size=100,
9393
job=Job(
9494
job_id="1",

tests/unit/adapter/fs/model/test_datastore_versions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def load_json(file_path):
1616

1717
DATASTORE_DIR = "tests/unit/resources/adapter/fs/TEST_DATASTORE"
1818
METADATA_DIR = f"{DATASTORE_DIR}/datastore"
19-
local_storage = LocalStorageAdapter(Path(DATASTORE_DIR))
19+
local_storage = LocalStorageAdapter(Path(DATASTORE_DIR), "TEST_DATASTORE")
2020
DATASTORE_VERSIONS_PATH = f"{METADATA_DIR}/datastore_versions.json"
2121

2222

0 commit comments

Comments
 (0)