diff --git a/common-contract/pdo/contracts/guardian/common/utility.py b/common-contract/pdo/contracts/guardian/common/utility.py index a76d6a8..207d1a8 100644 --- a/common-contract/pdo/contracts/guardian/common/utility.py +++ b/common-contract/pdo/contracts/guardian/common/utility.py @@ -13,6 +13,14 @@ # limitations under the License. import jsonschema +import os +import random +import string + +from pdo.common.key_value import KeyValueStore + +import logging +logger = logging.getLogger(__name__) # ----------------------------------------------------------------- def ValidateJSON(instance, schema): @@ -21,3 +29,69 @@ def ValidateJSON(instance, schema): except jsonschema.exceptions.ValidationError as err: return False return True + +# ----------------------------------------------------------------- +# Size of chunks to store per key; this is the maximum size of a +# single key in the KeyValueStore +__CHUNK_SIZE__ = 1024 + +# ----------------------------------------------------------------- +def send_file(file_name, block_store = None, **kwargs): + """ + Store the contents of a file in the KeyValueStore under a specified key. Sync any updated + blocks to the block store if specified. Returns a dictionary containing information that + can be used to receive the file from the KeyValueStore later. + + :param file_name: Name of the file to be stored. + :param block_store: Optional parameter of type pdo.service_client.storage.StorageServiceClient + + :return: A dictionary containing the base key, number of chunks, encryption key, and state hash. + """ + + key = ''.join(random.choice(string.ascii_letters) for _ in range(16)) + + kv = KeyValueStore() + + chunks = (os.path.getsize(file_name) + __CHUNK_SIZE__ - 1) // __CHUNK_SIZE__ + with open(file_name, 'rb') as fp : + for chunk_number in range(0, chunks) : + chunk = fp.read(__CHUNK_SIZE__) + with kv : _ = kv.set(f'{key}_{chunk_number}', chunk, input_encoding='str', output_encoding='raw') + + if block_store : + _ = kv.sync_to_block_store(block_store, **kwargs) + + file_information = dict() + file_information['key_base'] = key + file_information['chunks'] = chunks + file_information['encryption_key'] = kv.encryption_key + file_information['state_hash'] = kv.hash_identity + + return file_information + +# ----------------------------------------------------------------- +def recv_file(file_information, file_name, block_store = None, **kwargs) : + """ + Receive the contents of a file in the KeyValueStore under a specified key. Sync any updated + blocks from the block store if specified. Takes a dictionary containing the file information + as generated by `send_file`. + + :param file_information: Dictionary containing the base key, number of chunks, encryption key, and state hash. + :param file_name: Name of the file to be received. + :param block_store: Optional parameter of type pdo.service_client.storage.StorageServiceClient + """ + key_base = file_information['key_base'] + chunks = file_information['chunks'] + encryption_key = file_information['encryption_key'] + state_hash = file_information['state_hash'] + + kv = KeyValueStore(encryption_key, state_hash) + if block_store : + _ = kv.sync_from_block_store(state_hash, block_store, **kwargs) + + with open(file_name, 'wb') as fp : + for chunk_number in range(chunks) : + with kv : chunk = kv.get(f'{key_base}_{chunk_number}', input_encoding='str', output_encoding='raw') + fp.write(bytes(chunk)) + + return True diff --git a/common-contract/pdo/contracts/guardian/wsgi/process_capability.py b/common-contract/pdo/contracts/guardian/wsgi/process_capability.py index b715673..d8080a8 100644 --- a/common-contract/pdo/contracts/guardian/wsgi/process_capability.py +++ b/common-contract/pdo/contracts/guardian/wsgi/process_capability.py @@ -90,28 +90,32 @@ def __call__(self, environ, start_response) : return ErrorResponse(start_response, "invalid JSON") except KeyError as ke : - logger.error('missing field in request: %s', ke) - return ErrorResponse(start_response, 'missing field {0}'.format(ke)) + logger.error(f'missing field in request: {ke}') + return ErrorResponse(start_response, f'missing field in request: {ke}') except Exception as e : - logger.error("unknown exception unpacking request (ProcessCapability); %s", str(e)) + logger.error(f'unknown exception unpacking request (ProcessCapability); {e}') return ErrorResponse(start_response, "unknown exception while unpacking request") # dispatch the operation try : method_name = operation_message['method_name'] parameters = operation_message['parameters'] - logger.info("process capability operation %s with parameters %s", method_name, parameters) + except KeyError as ke : + logger.error(f'missing field {ke}') + return ErrorResponse(start_response, f'missing field {ke}') + + logger.info("process capability operation %s with parameters %s", method_name, parameters) + try : operation = self.capability_handler_map[method_name] operation_result = operation(parameters) if operation_result is None : return ErrorResponse(start_response, "operation failed") - except KeyError as ke : - logger.error('unknown operation: %s', ) - return ErrorResponse(start_response, 'missing field {0}'.format(ke)) + logger.error(f'unknown operation {ke}') + return ErrorResponse(start_response, f'unknown operation {ke}') except Exception as e : - logger.error("unknown exception performing operation (ProcessCapability); %s", str(e)) + logger.error(f'unknown exception performing operation (ProcessCapability); {e}') return ErrorResponse(start_response, "unknown exception while performing operation") # and process the result