@@ -368,16 +368,20 @@ def write_commands(
368368
369369 def _process_command_finished (self , pending_command , ignore_errors ):
370370 try :
371- if pending_command .future .exception () is None :
372- LOG .debug ('Command finished: %s' , pending_command )
373371 self ._writer .inflight_commands .remove (pending_command )
374372 except ValueError :
375373 LOG .warning ('Tried to remove %s even though it was already removed.' , pending_command )
376374
377- if pending_command .future .exception () is not None :
378- # TODO: more with this; maybe let the user respond to this
379- LOG .exception ('A command submission failed!' ,
380- exc_info = pending_command .future .exception ())
375+ if pending_command .future .cancelled ():
376+ LOG .debug ("Command cancelled: %s" , pending_command )
377+ else :
378+ ex = pending_command .future .exception ()
379+ if ex is not None :
380+ # TODO: more with this; maybe let the user respond to this
381+ LOG .exception ('A command submission failed!' ,
382+ exc_info = pending_command .future .exception ())
383+ else :
384+ LOG .debug ('Command finished: %s' , pending_command )
381385
382386 async def main_writer (self ):
383387 """
@@ -401,6 +405,12 @@ async def main_writer(self):
401405 ledger_effective_time = metadata .time_model .get_time ()
402406 command_payloads = [] # type: List[Tuple[_PendingCommand, Sequence[CommandPayload]]]
403407
408+ if p .future .done ():
409+ # PendingCommand instances that are already marked as done have either been marked
410+ # as a failure or cancelled by the caller. Do NOT send the corresponding Ledger API
411+ # command because the PendingCommand() has effectively been aborted.
412+ continue
413+
404414 self ._writer .inflight_commands .append (p )
405415 try :
406416 defaults = CommandDefaults (
@@ -419,7 +429,8 @@ async def main_writer(self):
419429 else :
420430 # This is a "null command"; don't even bother sending to the server. Immediately
421431 # resolve the future successfully and discard
422- p .future .set_result (None )
432+ if not p .future .done ():
433+ p .future .set_result (None )
423434 except Exception as ex :
424435 LOG .exception ("Tried to send a command and failed!" )
425436 p .notify_read_fail (ex )
@@ -491,15 +502,28 @@ def notify_read_done(self, command_id: str, ledger_time: Optional[datetime]):
491502 self .command_ids .discard (command_id )
492503 if not self .command_ids :
493504 # the command is finished
494- self .future .set_result (None )
505+ if not self .future .done ():
506+ self .future .set_result (None )
495507 return
496508
497509 if self .max_record_time is not None and ledger_time is not None and \
498510 self .max_record_time < ledger_time :
499- self .future .set_exception (CommandTimeoutError ())
511+ if self .future .done ():
512+ LOG .debug (
513+ 'A command timed out on the server and the client also cancelled the request.' )
514+ else :
515+ self .future .set_exception (CommandTimeoutError ())
500516
501517 def notify_read_fail (self , ex : Exception ):
502- self .future .set_exception (ex )
518+ if self .future .done ():
519+ # The user may have cancelled the command or otherwise terminated the Future; trying
520+ # to set an exception on a done Future will itself throw an exception, so the best
521+ # we can do is log
522+ LOG .exception (
523+ 'An exception was received and reported to a Future that has already been aborted! '
524+ 'This cannot be reported in a normal way.' , ex )
525+ else :
526+ self .future .set_exception (ex )
503527
504528 def __eq__ (self , other ):
505529 return self is other
0 commit comments