diff --git a/ipykernel/inprocess/client.py b/ipykernel/inprocess/client.py index e6b3b358a..bff31cf9f 100644 --- a/ipykernel/inprocess/client.py +++ b/ipykernel/inprocess/client.py @@ -171,10 +171,7 @@ def _dispatch_to_kernel(self, msg): raise RuntimeError('Cannot send request. No kernel exists.') stream = kernel.shell_stream - self.session.send(stream, msg) - msg_parts = stream.recv_multipart() - kernel.dispatch_shell(msg_parts) - + kernel.dispatch_shell(msg) idents, reply_msg = self.session.recv(stream, copy=False) self.shell_channel.call_handlers_later(reply_msg) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 6495cbcf0..3c6d62349 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -240,14 +240,10 @@ def should_handle(self, stream, msg, idents): return True @gen.coroutine - def dispatch_shell(self, msg): + def dispatch_shell(self, msg, idents=None): """dispatch shell requests""" - idents, msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except: - self.log.error("Invalid Message", exc_info=True) - return + if idents is None: + idents = [] # Set the parent message for side effects. self.set_parent(idents, msg, channel='shell') @@ -403,15 +399,38 @@ def dispatch_queue(self): def _message_counter_default(self): return itertools.count() - def schedule_dispatch(self, dispatch, *args): + def should_dispatch_immediately(self, msg): + """ + This provides a hook for dispatching incoming messages + from the frontend immediately, and out of order. + + It could be used to allow asynchronous messages from + GUIs to be processed. + """ + return False + + def schedule_dispatch(self, msg, dispatch): """schedule a message for dispatch""" + + idents, msg = self.session.feed_identities(msg, copy=False) + try: + msg = self.session.deserialize(msg, content=True, copy=False) + except: + self.log.error("Invalid shell message", exc_info=True) + return + + new_args = (msg, idents) + + if self.should_dispatch_immediately(msg): + return self.io_loop.add_callback(dispatch, *new_args) + idx = next(self._message_counter) self.msg_queue.put_nowait( ( idx, dispatch, - args, + new_args, ) ) # ensure the eventloop wakes up @@ -428,7 +447,7 @@ def start(self): self.shell_stream.on_recv( partial( self.schedule_dispatch, - self.dispatch_shell, + dispatch=self.dispatch_shell, ), copy=False, )