Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions common-contract/pdo/contracts/guardian/common/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@
# limitations under the License.

import jsonschema
import os
import random
import string

from pdo.common.key_value import KeyValueStore

import logging
logger = logging.getLogger(__name__)

# -----------------------------------------------------------------
def ValidateJSON(instance, schema):
Expand All @@ -21,3 +29,69 @@ def ValidateJSON(instance, schema):
except jsonschema.exceptions.ValidationError as err:
return False
return True

# -----------------------------------------------------------------
# Size of chunks to store per key; this is the maximum size of a
# single key in the KeyValueStore
__CHUNK_SIZE__ = 1024

# -----------------------------------------------------------------
def send_file(file_name, block_store = None, **kwargs):
"""
Store the contents of a file in the KeyValueStore under a specified key. Sync any updated
blocks to the block store if specified. Returns a dictionary containing information that
can be used to receive the file from the KeyValueStore later.

:param file_name: Name of the file to be stored.
:param block_store: Optional parameter of type pdo.service_client.storage.StorageServiceClient

:return: A dictionary containing the base key, number of chunks, encryption key, and state hash.
"""

key = ''.join(random.choice(string.ascii_letters) for _ in range(16))

kv = KeyValueStore()

chunks = (os.path.getsize(file_name) + __CHUNK_SIZE__ - 1) // __CHUNK_SIZE__
with open(file_name, 'rb') as fp :
for chunk_number in range(0, chunks) :
chunk = fp.read(__CHUNK_SIZE__)
with kv : _ = kv.set(f'{key}_{chunk_number}', chunk, input_encoding='str', output_encoding='raw')

if block_store :
_ = kv.sync_to_block_store(block_store, **kwargs)

file_information = dict()
file_information['key_base'] = key
file_information['chunks'] = chunks
file_information['encryption_key'] = kv.encryption_key
file_information['state_hash'] = kv.hash_identity

return file_information

# -----------------------------------------------------------------
def recv_file(file_information, file_name, block_store = None, **kwargs) :
"""
Receive the contents of a file in the KeyValueStore under a specified key. Sync any updated
blocks from the block store if specified. Takes a dictionary containing the file information
as generated by `send_file`.

:param file_information: Dictionary containing the base key, number of chunks, encryption key, and state hash.
:param file_name: Name of the file to be received.
:param block_store: Optional parameter of type pdo.service_client.storage.StorageServiceClient
"""
key_base = file_information['key_base']
chunks = file_information['chunks']
encryption_key = file_information['encryption_key']
state_hash = file_information['state_hash']

kv = KeyValueStore(encryption_key, state_hash)
if block_store :
_ = kv.sync_from_block_store(state_hash, block_store, **kwargs)

with open(file_name, 'wb') as fp :
for chunk_number in range(chunks) :
with kv : chunk = kv.get(f'{key_base}_{chunk_number}', input_encoding='str', output_encoding='raw')
fp.write(bytes(chunk))

return True
20 changes: 12 additions & 8 deletions common-contract/pdo/contracts/guardian/wsgi/process_capability.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,28 +90,32 @@ def __call__(self, environ, start_response) :
return ErrorResponse(start_response, "invalid JSON")

except KeyError as ke :
logger.error('missing field in request: %s', ke)
return ErrorResponse(start_response, 'missing field {0}'.format(ke))
logger.error(f'missing field in request: {ke}')
return ErrorResponse(start_response, f'missing field in request: {ke}')
except Exception as e :
logger.error("unknown exception unpacking request (ProcessCapability); %s", str(e))
logger.error(f'unknown exception unpacking request (ProcessCapability); {e}')
return ErrorResponse(start_response, "unknown exception while unpacking request")

# dispatch the operation
try :
method_name = operation_message['method_name']
parameters = operation_message['parameters']
logger.info("process capability operation %s with parameters %s", method_name, parameters)
except KeyError as ke :
logger.error(f'missing field {ke}')
return ErrorResponse(start_response, f'missing field {ke}')

logger.info("process capability operation %s with parameters %s", method_name, parameters)

try :
operation = self.capability_handler_map[method_name]
operation_result = operation(parameters)
if operation_result is None :
return ErrorResponse(start_response, "operation failed")

except KeyError as ke :
logger.error('unknown operation: %s', )
return ErrorResponse(start_response, 'missing field {0}'.format(ke))
logger.error(f'unknown operation {ke}')
return ErrorResponse(start_response, f'unknown operation {ke}')
except Exception as e :
logger.error("unknown exception performing operation (ProcessCapability); %s", str(e))
logger.error(f'unknown exception performing operation (ProcessCapability); {e}')
return ErrorResponse(start_response, "unknown exception while performing operation")

# and process the result
Expand Down