Skip to content

Commit 839bafc

Browse files
committed
Add guardian helper functions to send/recv files
Add send_file and recv_file to the guardian utility functions. These routines will use the keyvalue store for moving files to and from another service. While useful beyond the guardian, this is where they are most needed. Eventually, it would be good to add comparable C++ routines that could be implemented within a contract. Also improved the error handling and message generation for capability processing. Signed-off-by: Mic Bowman <mic.bowman@intel.com>
1 parent fd4dab3 commit 839bafc

File tree

2 files changed

+89
-8
lines changed

2 files changed

+89
-8
lines changed

common-contract/pdo/contracts/guardian/common/utility.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,15 @@
1313
# limitations under the License.
1414

1515
import jsonschema
16+
import os
17+
import random
18+
import string
19+
20+
import pdo.common.crypto as crypto
21+
from pdo.common.key_value import KeyValueStore
22+
23+
import logging
24+
logger = logging.getLogger(__name__)
1625

1726
# -----------------------------------------------------------------
1827
def ValidateJSON(instance, schema):
@@ -21,3 +30,71 @@ def ValidateJSON(instance, schema):
2130
except jsonschema.exceptions.ValidationError as err:
2231
return False
2332
return True
33+
34+
# -----------------------------------------------------------------
35+
# Size of chunks to store per key; this is the maximum size of a
36+
# single key in the KeyValueStore
37+
__CHUNK_SIZE__ = 1024
38+
39+
# -----------------------------------------------------------------
40+
def send_file(file_name, block_store = None, **kwargs):
41+
"""
42+
Store the contents of a file in the KeyValueStore under a specified key. Sync any updated
43+
blocks to the block store if specified. Returns a dictionary containing information that
44+
can be used to receive the file from the KeyValueStore later.
45+
46+
:param file_name: Name of the file to be stored.
47+
:param block_store: Optional parameter of type pdo.service_client.storage.StorageServiceClient
48+
49+
:return: A dictionary containing the base key, number of chunks, encryption key, and state hash.
50+
"""
51+
52+
key = ''.join(random.choice(string.ascii_letters) for _ in range(16))
53+
54+
kv = KeyValueStore()
55+
56+
chunks = int(os.path.getsize(file_name) / __CHUNK_SIZE__) + 1
57+
with open(file_name, 'rb') as fp :
58+
for chunk_number in range(0, chunks) :
59+
chunk = fp.read(__CHUNK_SIZE__)
60+
with kv : _ = kv.set(f'{key}_{chunk_number}', chunk, input_encoding='str', output_encoding='raw')
61+
62+
if block_store :
63+
n = kv.sync_to_block_store(block_store, **kwargs)
64+
if n != chunk_number :
65+
raise ValueError(f'incorrect blocks writen; {n} != {chunk_number}')
66+
67+
file_information = dict()
68+
file_information['key_base'] = key
69+
file_information['chunks'] = chunks
70+
file_information['encryption_key'] = kv.encryption_key
71+
file_information['state_hash'] = kv.hash_identity
72+
73+
return file_information
74+
75+
# -----------------------------------------------------------------
76+
def recv_file(file_information, file_name, block_store = None, **kwargs) :
77+
"""
78+
Receive the contents of a file in the KeyValueStore under a specified key. Sync any updated
79+
blocks from the block store if specified. Takes a dictionary containing the file information
80+
as generated by `send_file`.
81+
82+
:param file_information: Dictionary containing the base key, number of chunks, encryption key, and state hash.
83+
:param file_name: Name of the file to be received.
84+
:param block_store: Optional parameter of type pdo.service_client.storage.StorageServiceClient
85+
"""
86+
key_base = file_information['key_base']
87+
chunks = file_information['chunks']
88+
encryption_key = file_information['encryption_key']
89+
state_hash = file_information['state_hash']
90+
91+
kv = KeyValueStore(encryption_key, state_hash)
92+
if block_store :
93+
_ = kv.sync_from_block_store(state_hash, block_store, **kwargs)
94+
95+
with open(file_name, 'wb') as fp :
96+
for chunk_number in range(chunks) :
97+
with kv : chunk = kv.get(f'{key_base}_{chunk_number}', input_encoding='str', output_encoding='raw')
98+
fp.write(bytes(chunk))
99+
100+
return True

common-contract/pdo/contracts/guardian/wsgi/process_capability.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,28 +90,32 @@ def __call__(self, environ, start_response) :
9090
return ErrorResponse(start_response, "invalid JSON")
9191

9292
except KeyError as ke :
93-
logger.error('missing field in request: %s', ke)
94-
return ErrorResponse(start_response, 'missing field {0}'.format(ke))
93+
logger.error(f'missing field in request: {ke}')
94+
return ErrorResponse(start_response, f'missing field in request: {ke}')
9595
except Exception as e :
96-
logger.error("unknown exception unpacking request (ProcessCapability); %s", str(e))
96+
logger.error(f'unknown exception unpacking request (ProcessCapability); {e}')
9797
return ErrorResponse(start_response, "unknown exception while unpacking request")
9898

9999
# dispatch the operation
100100
try :
101101
method_name = operation_message['method_name']
102102
parameters = operation_message['parameters']
103-
logger.info("process capability operation %s with parameters %s", method_name, parameters)
103+
except KeyError as ke :
104+
logger.error(f'missing field {ke}')
105+
return ErrorResponse(start_response, f'missing field {ke}')
106+
107+
logger.info("process capability operation %s with parameters %s", method_name, parameters)
104108

109+
try :
105110
operation = self.capability_handler_map[method_name]
106111
operation_result = operation(parameters)
107112
if operation_result is None :
108113
return ErrorResponse(start_response, "operation failed")
109-
110114
except KeyError as ke :
111-
logger.error('unknown operation: %s', )
112-
return ErrorResponse(start_response, 'missing field {0}'.format(ke))
115+
logger.error(f'unknown operation {ke}')
116+
return ErrorResponse(start_response, f'unknown operation {ke}')
113117
except Exception as e :
114-
logger.error("unknown exception performing operation (ProcessCapability); %s", str(e))
118+
logger.error(f'unknown exception performing operation (ProcessCapability); {e}')
115119
return ErrorResponse(start_response, "unknown exception while performing operation")
116120

117121
# and process the result

0 commit comments

Comments
 (0)