Skip to content

Commit 04cfd7f

Browse files
committed
Update rapidjson/python-rapidjson to use the yggdrasil specific versions
Add option for running without redirecting output from models to queue via dont_queue_output Fix bug where cpp was not checked as language to disable use of direct connection Fix bug in broker where responses could get sent to the wrong client
1 parent 586898d commit 04cfd7f

File tree

9 files changed

+67
-18
lines changed

9 files changed

+67
-18
lines changed

.gitmodules

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[submodule "yggdrasil/rapidjson"]
22
path = yggdrasil/rapidjson
3-
url = https://github.com/cropsinsilico/rapidjson
3+
url = https://github.com/cropsinsilico/yggdrasil-rapidjson
44
branch = yggdrasil
55
[submodule "yggdrasil/FSPM2020_yggdrasil_workshop"]
66
path = yggdrasil/demos/fspm2020
@@ -11,5 +11,5 @@
1111
url = https://github.com/cropsinsilico/CiS2021-hackathon
1212
[submodule "python-rapidjson"]
1313
path = yggdrasil/python-rapidjson
14-
url = https://github.com/cropsinsilico/python-rapidjson.git
14+
url = https://github.com/cropsinsilico/yggdrasil-python-rapidjson
1515
branch = yggdrasil

yggdrasil/.ygg_schema.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3621,6 +3621,10 @@ definitions:
36213621
default: false
36223622
description: If True, the Python C API will be disabled. Defaults to False.
36233623
type: boolean
3624+
dont_queue_output:
3625+
default: false
3626+
description: If True, model output will not be captured.
3627+
type: boolean
36243628
driver:
36253629
deprecated: true
36263630
description: '[DEPRECATED] Name of driver class that should be used.'

yggdrasil/broker.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__(self, yaml=None):
4949
response_kwargs={'datatype': {'type': 'any'}},
5050
)
5151
self._delayed_requests = []
52+
self._awaiting_comm = []
5253
self._current_request = None
5354
self._comms = None
5455
self._connections = {}
@@ -230,12 +231,14 @@ def run_loop(self):
230231
timeout=0.1, return_message_object=True,
231232
quiet_timeout=True)
232233
if msg.flag in [CommBase.FLAG_EOF, CommBase.FLAG_EMPTY]:
234+
if self._delayed_requests:
235+
self.complete_request(self._delayed_requests.pop(0))
233236
return
234237
elif msg.flag != CommBase.FLAG_SUCCESS:
235238
raise BrokerError(f"Error receiving inside loop: "
236239
f"{CommBase.FLAG_TO_STRING[msg.flag]}")
237-
if self._delayed_requests:
238-
self.complete_request(self._delayed_requests.pop(0))
240+
msg.args['id'] = msg.header['__meta__']['request_id']
241+
# self.server_comm.info(f"REQUEST [{msg.args['action']}]")
239242
self.complete_request(msg.args)
240243

241244
def run_finally(self):
@@ -264,15 +267,21 @@ def complete_request(self, request):
264267
"""
265268
error = None
266269
try:
270+
# self.server_comm.info(f"PROCESS REQUEST "
271+
# f"[{request['action']}]: {request}")
267272
response = self.process_request(request)
268273
except DelayedRequestError:
274+
# self.server_comm.info(f"DELAYED REQUEST "
275+
# f"[{request['action']}]")
269276
self._delayed_requests.append(request)
270277
return
271278
except BaseException as e:
272279
error = e
273280
tb = traceback.format_exc()
274281
response = {'error': str(e), 'traceback': tb}
275-
flag = self.server_comm.send(response)
282+
# self.server_comm.info(f"RESPONSE [{request['action']}]: "
283+
# f"{response}")
284+
flag = self.server_comm.send_to(request['id'], response)
276285
if error:
277286
raise error
278287
if not flag:
@@ -483,15 +492,22 @@ def _send_request(cls, action, *args, **kwargs):
483492
'kwargs': kwargs,
484493
}
485494
client = cls.get_client()
486-
flag, response = client.call(request)
495+
# client.info(f"REQUEST [{action}]")
496+
flag = client.send(request)
487497
if not flag:
488498
raise BrokerError(f"{model}: Failed to send request to "
489499
f"the broker {request}")
500+
msg = client.recv(return_message_object=True, timeout=False)
501+
if msg.flag != CommBase.FLAG_SUCCESS:
502+
raise BrokerError(f"{model}: Failed to receive response from "
503+
f"the broker {request} ({msg.flag})")
504+
response = msg.args
490505
if 'error' in response:
491506
raise BrokerError(f"{model}: Error on broker process during "
492507
f"response to request {request}.\n"
493508
f"{response['error']}\n"
494509
f"{response['traceback']}")
510+
# client.info(f"RESPONSE [{action}]: {response['return']}")
495511
return response['return']
496512

497513
@classmethod
@@ -513,6 +529,10 @@ def update_model_comm_kwargs(cls, name, kwargs, self=None):
513529
assert isinstance(comm, dict)
514530
comm.update(kwargs)
515531
assert comm['direction'] == direction
532+
if name in self._awaiting_comm:
533+
assert 'address' in comm
534+
# self.server_comm.info(f"UNLOCKING: {name}")
535+
self._awaiting_comm.remove(name)
516536

517537
@classmethod
518538
def model_comm_kwargs(cls, name, direction, self=None):
@@ -536,6 +556,11 @@ def model_comm_kwargs(cls, name, direction, self=None):
536556
out = {}
537557
is_split_server = False
538558
language = None
559+
if name in self._awaiting_comm:
560+
# self.server_comm.info(f"LOCKED {name}")
561+
raise DelayedRequestError(
562+
f"Delaying creationg of {name} comm until "
563+
f"the partner comm finishing creation")
539564
with self.lock:
540565
model_driver = self._models[model]
541566
language = model_driver.language
@@ -564,6 +589,9 @@ def model_comm_kwargs(cls, name, direction, self=None):
564589
)
565590
if is_split_server or out['commtype'] == 'model_function':
566591
out['global_scope'] = model
592+
if 'address' not in out and 'partner_name' in out:
593+
# self.server_comm.info(f"LOCKING {out['partner_name']}")
594+
self._awaiting_comm.append(out['partner_name'])
567595
return out
568596

569597
@classmethod

yggdrasil/communication/AsyncComm.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ def recv_message(self, timeout=None, **kwargs):
478478
"""
479479
# Sleep until there is a message
480480
if timeout is None:
481-
timeout = kwargs.get('timeout', self.recv_timeout)
481+
timeout = self.recv_timeout
482482
T = self.start_timeout(timeout, key_suffix='.recv:backlog')
483483
while (not T.is_out) and (not self.backlog_ready.is_set()):
484484
self.backlog_ready.wait(self.sleeptime)

yggdrasil/drivers/DirectConnectionDriver.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def _init_single_comm(self, io, comm_list):
6060
x['model_copies'] = x.pop('partner_copies')
6161
if 'partner_language' in x:
6262
x['language'] = x.pop('partner_language')
63-
if x['language'] in ['c', 'c++', 'fortran']:
63+
if x['language'] in ['c', 'c++', 'cpp', 'fortran']:
6464
raise DirectConnectionError(
6565
"C use of comm broker not yet implemented")
6666
x.setdefault('commtype', comm_type)

yggdrasil/drivers/ModelDriver.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,8 @@ class ModelDriver(Driver):
418418
preserve_cache (bool, optional): [DEPRECATED] If True model
419419
products will be kept following the run, otherwise all
420420
products will be cleaned up.
421+
dont_queue_output (bool, optional): If True, model output will not
422+
be captured.
421423
with_strace (bool, optional): If True, the command is run with strace (on
422424
Linux) or dtrace (on MacOS). Defaults to False.
423425
strace_flags (list, optional): Flags to pass to strace (or dtrace).
@@ -673,6 +675,7 @@ class ModelDriver(Driver):
673675
'overwrite': {'type': 'boolean', 'default': False},
674676
'remove_products': {'type': 'boolean', 'default': False},
675677
'preserve_cache': {'type': 'boolean', 'default': True},
678+
'dont_queue_output': {'type': 'boolean', 'default': False},
676679
'function': {'type': 'string'},
677680
'iter_function_over': {'type': 'array', 'default': [],
678681
'items': {'type': 'string'}},
@@ -827,8 +830,10 @@ def __init__(self, name, args, model_index=0, copy_index=-1, clients=[],
827830
# TODO: Pass removable_source_exts used by Compiled drivers
828831
# Setup process things
829832
self.model_process = None
830-
self.queue = multitasking.Queue()
833+
self.queue = None
831834
self.queue_thread = None
835+
if not self.dont_queue_output:
836+
self.queue = multitasking.Queue()
832837
self.event_process_kill_called = multitasking.Event()
833838
self.event_process_kill_complete = multitasking.Event()
834839
# Tools
@@ -1525,7 +1530,8 @@ def run_model(self, command=None, return_process=True, **kwargs):
15251530
# not running locally.
15261531
default_kwargs = dict(env=env, working_dir=self.working_dir,
15271532
forward_signals=False,
1528-
shell=platform._is_win)
1533+
shell=platform._is_win,
1534+
allow_buffer=self.dont_queue_output)
15291535
for k, v in default_kwargs.items():
15301536
kwargs.setdefault(k, v)
15311537
return self.run_executable(command, return_process=return_process, **kwargs)
@@ -2025,7 +2031,7 @@ def before_start(self, no_queue_thread=False, **kwargs):
20252031
"""
20262032
self.model_process = self.run_model(**kwargs)
20272033
# Start thread to queue output
2028-
if not no_queue_thread:
2034+
if not (no_queue_thread or self.dont_queue_output):
20292035
self.queue_thread = multitasking.YggTaskLoop(
20302036
target=self.enqueue_output_loop,
20312037
name=self.name + '.EnqueueLoop')
@@ -2035,10 +2041,12 @@ def before_start(self, no_queue_thread=False, **kwargs):
20352041

20362042
def queue_close(self):
20372043
r"""Close the queue for messages from the model process."""
2038-
self.model_process.stdout.close()
2044+
if not self.dont_queue_output:
2045+
self.model_process.stdout.close()
20392046

20402047
def queue_recv(self):
20412048
r"""Receive a message from the model process."""
2049+
assert not self.dont_queue_output
20422050
return self.model_process.stdout.readline()
20432051

20442052
def enqueue_output_loop(self):
@@ -2185,6 +2193,13 @@ def run_loop(self):
21852193
self.set_break_flag()
21862194
for k in self._mpi_requests_checked:
21872195
self.check_mpi_request(k)
2196+
if self.dont_queue_output:
2197+
if self.model_process_complete:
2198+
self.debug("Model process complete")
2199+
self.set_break_flag()
2200+
else:
2201+
self.sleep()
2202+
return
21882203
try:
21892204
line = self.queue.get_nowait()
21902205
except Empty:

yggdrasil/tools.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1576,17 +1576,19 @@ class YggPopen(subprocess.Popen):
15761576
**kwargs: Additional keywords arguments are passed to Popen.
15771577
15781578
"""
1579-
def __init__(self, cmd_args, forward_signals=True, for_matlab=False, **kwargs):
1579+
def __init__(self, cmd_args, forward_signals=True, for_matlab=False,
1580+
allow_buffer=False, **kwargs):
15801581
# stdbuf only for linux
15811582
if platform._is_linux:
15821583
stdbuf_args = ['stdbuf', '-o0', '-e0']
15831584
if isinstance(cmd_args, str):
15841585
cmd_args = ' '.join(stdbuf_args + [cmd_args])
15851586
else:
15861587
cmd_args = stdbuf_args + cmd_args
1587-
kwargs.setdefault('bufsize', 0)
1588-
kwargs.setdefault('stdout', subprocess.PIPE)
1589-
kwargs.setdefault('stderr', subprocess.STDOUT)
1588+
if not allow_buffer:
1589+
kwargs.setdefault('bufsize', 0)
1590+
kwargs.setdefault('stdout', subprocess.PIPE)
1591+
kwargs.setdefault('stderr', subprocess.STDOUT)
15901592
# To prevent forward of signals, process will have a new process group
15911593
if not forward_signals:
15921594
if platform._is_win: # pragma: windows

0 commit comments

Comments
 (0)