Skip to content

Commit a8a4e2c

Browse files
sonthonaxrkdavidbrochart
authored andcommitted
Issue #12786: Create hook for dispatching messages out of order
1 parent 69a0b79 commit a8a4e2c

File tree

2 files changed

+30
-13
lines changed

2 files changed

+30
-13
lines changed

ipykernel/inprocess/client.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,8 @@ def _dispatch_to_kernel(self, msg):
173173
raise RuntimeError('Cannot send request. No kernel exists.')
174174

175175
stream = kernel.shell_stream
176-
self.session.send(stream, msg)
177-
msg_parts = stream.recv_multipart()
178176
loop = asyncio.get_event_loop()
179-
loop.run_until_complete(kernel.dispatch_shell(msg_parts))
177+
loop.run_until_complete(kernel.dispatch_shell(msg))
180178
idents, reply_msg = self.session.recv(stream, copy=False)
181179
self.shell_channel.call_handlers_later(reply_msg)
182180

ipykernel/kernelbase.py

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -300,18 +300,14 @@ def should_handle(self, stream, msg, idents):
300300
return False
301301
return True
302302

303-
async def dispatch_shell(self, msg):
303+
async def dispatch_shell(self, msg, idents=None):
304304
"""dispatch shell requests"""
305305

306306
# flush control queue before handling shell requests
307307
await self._flush_control_queue()
308308

309-
idents, msg = self.session.feed_identities(msg, copy=False)
310-
try:
311-
msg = self.session.deserialize(msg, content=True, copy=False)
312-
except Exception:
313-
self.log.error("Invalid Message", exc_info=True)
314-
return
309+
if idents is None:
310+
idents = []
315311

316312
# Set the parent message for side effects.
317313
self.set_parent(idents, msg, channel='shell')
@@ -465,15 +461,38 @@ async def dispatch_queue(self):
465461
def _message_counter_default(self):
466462
return itertools.count()
467463

468-
def schedule_dispatch(self, dispatch, *args):
464+
def should_dispatch_immediately(self, msg):
465+
"""
466+
This provides a hook for dispatching incoming messages
467+
from the frontend immediately, and out of order.
468+
469+
It could be used to allow asynchronous messages from
470+
GUIs to be processed.
471+
"""
472+
return False
473+
474+
def schedule_dispatch(self, msg, dispatch):
469475
"""schedule a message for dispatch"""
476+
477+
idents, msg = self.session.feed_identities(msg, copy=False)
478+
try:
479+
msg = self.session.deserialize(msg, content=True, copy=False)
480+
except:
481+
self.log.error("Invalid shell message", exc_info=True)
482+
return
483+
484+
new_args = (msg, idents)
485+
486+
if self.should_dispatch_immediately(msg):
487+
return self.io_loop.add_callback(dispatch, *new_args)
488+
470489
idx = next(self._message_counter)
471490

472491
self.msg_queue.put_nowait(
473492
(
474493
idx,
475494
dispatch,
476-
args,
495+
new_args,
477496
)
478497
)
479498
# ensure the eventloop wakes up
@@ -497,7 +516,7 @@ def start(self):
497516
self.shell_stream.on_recv(
498517
partial(
499518
self.schedule_dispatch,
500-
self.dispatch_shell,
519+
dispatch=self.dispatch_shell,
501520
),
502521
copy=False,
503522
)

0 commit comments

Comments
 (0)