diff --git a/tornado/curl_httpclient.py b/tornado/curl_httpclient.py index 3ed82b765b..3683953cba 100644 --- a/tornado/curl_httpclient.py +++ b/tornado/curl_httpclient.py @@ -22,6 +22,7 @@ import threading import time from io import BytesIO +import asyncio from tornado import httputil from tornado import ioloop @@ -96,9 +97,9 @@ def close(self) -> None: self._multi = None def fetch_impl( - self, request: HTTPRequest, callback: Callable[[HTTPResponse], None] + self, request: HTTPRequest, callback: Callable[[HTTPResponse], None], future: asyncio.Future ) -> None: - self._requests.append((request, callback, self.io_loop.time())) + self._requests.append((request, callback, future, self.io_loop.time())) self._process_queue() self._set_timeout(0) @@ -211,6 +212,12 @@ def _finish_pending_requests(self) -> None: self._finish(curl, errnum, errmsg) if num_q == 0: break + for curl in self._curls: + if curl not in self._free_list: + future = curl.info['future'] + if future and future.cancelled(): + curl.info['future'] = None + curl.close() self._process_queue() def _process_queue(self) -> None: @@ -219,13 +226,14 @@ def _process_queue(self) -> None: while self._free_list and self._requests: started += 1 curl = self._free_list.pop() - (request, callback, queue_start_time) = self._requests.popleft() + (request, callback, future, queue_start_time) = self._requests.popleft() # TODO: Don't smuggle extra data on an attribute of the Curl object. curl.info = { # type: ignore "headers": httputil.HTTPHeaders(), "buffer": BytesIO(), "request": request, "callback": callback, + "future": future, "queue_start_time": queue_start_time, "curl_start_time": time.time(), "curl_start_ioloop_time": self.io_loop.current().time(), diff --git a/tornado/httpclient.py b/tornado/httpclient.py index 673c7470ed..df59d1b664 100644 --- a/tornado/httpclient.py +++ b/tornado/httpclient.py @@ -42,6 +42,7 @@ import ssl import time import weakref +import asyncio from tornado.concurrent import ( Future, @@ -303,11 +304,11 @@ def handle_response(response: "HTTPResponse") -> None: return future_set_result_unless_cancelled(future, response) - self.fetch_impl(cast(HTTPRequest, request_proxy), handle_response) + self.fetch_impl(cast(HTTPRequest, request_proxy), handle_response, future) return future def fetch_impl( - self, request: "HTTPRequest", callback: Callable[["HTTPResponse"], None] + self, request: "HTTPRequest", callback: Callable[["HTTPResponse"], None], future: asyncio.Future ) -> None: raise NotImplementedError() diff --git a/tornado/simple_httpclient.py b/tornado/simple_httpclient.py index 1e05786510..305719942e 100644 --- a/tornado/simple_httpclient.py +++ b/tornado/simple_httpclient.py @@ -32,6 +32,7 @@ import time from io import BytesIO import urllib.parse +import asyncio from typing import Dict, Any, Callable, Optional, Type, Union from types import TracebackType @@ -163,7 +164,7 @@ def close(self) -> None: self.tcp_client.close() def fetch_impl( - self, request: HTTPRequest, callback: Callable[[HTTPResponse], None] + self, request: HTTPRequest, callback: Callable[[HTTPResponse], None], future: asyncio.Future ) -> None: key = object() self.queue.append((key, request, callback))