|
1 | 1 | import base64
|
2 | 2 | import io
|
| 3 | +import ipaddress |
3 | 4 | import logging
|
4 | 5 | import os
|
5 | 6 | import time
|
6 | 7 | from typing import Any, Dict
|
7 | 8 |
|
| 9 | +import psutil |
8 | 10 | from api_inference_community.validation import (
|
9 | 11 | AUDIO,
|
10 | 12 | AUDIO_INPUTS,
|
|
27 | 29 | logger = logging.getLogger(__name__)
|
28 | 30 |
|
29 | 31 |
|
| 32 | +def already_left(request: Request) -> bool: |
| 33 | + """ |
| 34 | + Check if the caller has already left without waiting for the answer to come. This can help during burst to relieve |
| 35 | + the pressure on the worker by cancelling jobs whose results don't matter as they won't be fetched anyway |
| 36 | + :param request: |
| 37 | + :return: bool |
| 38 | + """ |
| 39 | + # NOTE(rg): Starlette method request.is_disconnected is totally broken, consumes the payload, does not return |
| 40 | + # the correct status. So we use the good old way to identify if the caller is still there. |
| 41 | + # In any case, if we are not sure, we return False |
| 42 | + logger.info("Checking if request caller already left") |
| 43 | + try: |
| 44 | + client = request.client |
| 45 | + host = client.host |
| 46 | + if not host: |
| 47 | + return False |
| 48 | + |
| 49 | + port = int(client.port) |
| 50 | + host = ipaddress.ip_address(host) |
| 51 | + |
| 52 | + if port <= 0 or port > 65535: |
| 53 | + logger.warning("Unexpected source port format for caller %s", port) |
| 54 | + return False |
| 55 | + counter = 0 |
| 56 | + for connection in psutil.net_connections(kind="tcp"): |
| 57 | + counter += 1 |
| 58 | + if connection.status != "ESTABLISHED": |
| 59 | + continue |
| 60 | + if not connection.laddr: |
| 61 | + continue |
| 62 | + if int(connection.laddr.port) != port: |
| 63 | + continue |
| 64 | + if ( |
| 65 | + not connection.laddr.ip |
| 66 | + or ipaddress.ip_address(connection.laddr.ip) != host |
| 67 | + ): |
| 68 | + continue |
| 69 | + logger.info( |
| 70 | + "Found caller connection still established, caller is most likely still there, %s", |
| 71 | + connection, |
| 72 | + ) |
| 73 | + return False |
| 74 | + except Exception as e: |
| 75 | + logger.warning( |
| 76 | + "Unexpected error while checking if caller already left, assuming still there" |
| 77 | + ) |
| 78 | + logger.exception(e) |
| 79 | + return False |
| 80 | + |
| 81 | + logger.info( |
| 82 | + "%d connections checked. No connection found matching to the caller, probably left", |
| 83 | + counter, |
| 84 | + ) |
| 85 | + return True |
| 86 | + |
| 87 | + |
30 | 88 | async def pipeline_route(request: Request) -> Response:
|
31 | 89 | start = time.time()
|
| 90 | + |
| 91 | + if already_left(request): |
| 92 | + logger.info("Discarding request as the caller already left") |
| 93 | + return Response(status_code=204) |
| 94 | + |
32 | 95 | payload = await request.body()
|
33 | 96 | task = os.environ["TASK"]
|
34 | 97 | if os.getenv("DEBUG", "0") in {"1", "true"}:
|
|
0 commit comments