Skip to content

Commit f2e262f

Browse files
#6501 feat(local): forward Lambda response streaming through start-api
sam local start-api was buffering Lambda invokes in a StringIO and then running the result through API Gateway proxy JSON parsing, which made it impossible to test functions that use awslambda.streamifyResponse (or any other Lambda response-streaming pattern, e.g. Server-Sent Events). The HTTP request to RIE was issued without stream=True and the response body was materialised in full before the caller saw a single byte. This change adds a parallel "streaming-aware" invoke pipeline that the API Gateway service uses by default while keeping the buffered code path exactly as it was for non-streaming responses: * docker/container.py: new wait_for_streaming_response / _make_streaming_http_request methods that issue the RIE invoke with stream=True and hand the caller the raw requests.Response. A matching release_streaming_slot keeps the per-container concurrency semaphore correctly balanced across the lifetime of the stream. * lambdafn/runtime.py: new LambdaRuntime.invoke_streaming that builds / runs the container the same way invoke() does but returns a tuple of (response, cleanup). The cleanup callable releases the slot and invokes _on_invoke_done so warm containers stay warm while cold containers are stopped after the stream is fully consumed. * commands/local/lib/local_lambda.py: thin wrapper plumbing invoke_streaming through LocalLambdaRunner. * apigw/streaming_response.py (new): turns the streaming RIE response into a flask.Response. When the upstream Content-Type is application/vnd.awslambda.http-integration-response, the JSON prelude is parsed and applied to the outgoing status/headers/cookies before the body bytes (after the 8-NUL delimiter) are forwarded chunk by chunk. Chunks are pulled with raw.read1() for minimal latency, and call_on_close hooks the cleanup callable to the Flask response lifecycle. * apigw/local_apigw_service.py: _request_handler now invokes through the streaming-aware helper. When the runtime advertises streaming (via Lambda-Runtime-Function-Response-Mode), the streamed response is forwarded straight to the browser with CORS headers merged in. Otherwise the previously-buffered body is parsed and rendered through the existing v1/v2 payload-format helpers. * local/rapid/aws-lambda-rie-*: refreshed RIE binaries built from the streaming-aware fork that lives in aws-lambda-runtime-interface-emulator (case-insensitive response mode comparison + streamingCopy pass-through). Validated end-to-end with a Node.js Lambda using awslambda.streamifyResponse: an EventSource client in a browser receives one SSE frame per second over a 10-second stream rather than seeing all 10 frames arrive together at the end. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 9ffa5ed commit f2e262f

7 files changed

Lines changed: 685 additions & 4 deletions

File tree

samcli/commands/local/lib/local_lambda.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,46 @@ def invoke(
271271

272272
return headers
273273

274+
def invoke_streaming(
275+
self,
276+
function_identifier: str,
277+
event: str,
278+
tenant_id: Optional[str] = None,
279+
stderr: Optional[StreamWriter] = None,
280+
override_runtime: Optional[str] = None,
281+
function: Optional[Function] = None,
282+
):
283+
"""
284+
Streaming counterpart of :meth:`invoke`.
285+
286+
Resolves the function, then asks the underlying LambdaRuntime to
287+
perform a streaming invocation. Returns a tuple
288+
``(response, cleanup)`` where ``response`` is the raw streaming
289+
:class:`requests.Response` from the local Runtime Interface
290+
Emulator (RIE) and ``cleanup`` is a callable the caller MUST run
291+
when it has finished consuming the body.
292+
293+
This is used by the API Gateway local service to forward
294+
Lambda response-streaming output (e.g. Server-Sent Events
295+
produced by ``awslambda.streamifyResponse`` in Node.js) to the
296+
browser without buffering.
297+
"""
298+
if not function:
299+
function = self.get_function(function_identifier, tenant_id)
300+
config = self.get_invoke_config(function, override_runtime)
301+
302+
return self.local_runtime.invoke_streaming(
303+
config,
304+
event,
305+
tenant_id,
306+
debug_context=self.debug_context,
307+
stderr=stderr,
308+
container_host=self.container_host,
309+
container_host_interface=self.container_host_interface,
310+
extra_hosts=self.extra_hosts,
311+
container_dns=self.container_dns,
312+
)
313+
274314
def is_debugging(self) -> bool:
275315
"""
276316
Are we debugging the invoke?

samcli/local/apigw/local_apigw_service.py

Lines changed: 194 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232
from samcli.local.apigw.path_converter import PathConverter
3333
from samcli.local.apigw.route import Route
3434
from samcli.local.apigw.service_error_responses import ServiceErrorResponses
35+
from samcli.local.apigw.streaming_response import (
36+
build_streaming_flask_response,
37+
is_streaming_response,
38+
)
3539
from samcli.local.docker.exceptions import DockerContainerCreationFailedException
3640
from samcli.local.events.api_event import (
3741
ContextHTTP,
@@ -60,6 +64,12 @@ def to_url(self, value):
6064
class LocalApigwService(BaseLocalService):
6165
_DEFAULT_PORT = 3000
6266
_DEFAULT_HOST = "127.0.0.1"
67+
# Hard ceiling on the size of a buffered (non-streaming) Lambda
68+
# response we are willing to materialize in memory. Slightly above
69+
# AWS Lambda's documented 6 MB synchronous response payload limit so
70+
# legitimate responses pass through unmodified, but stops a misbehaving
71+
# or hung runtime from blowing the SAM CLI process up.
72+
_BUFFERED_RESPONSE_BYTE_CAP = 8 * 1024 * 1024
6373

6474
def __init__(
6575
self,
@@ -630,6 +640,140 @@ def _invoke_lambda_function(
630640

631641
return lambda_response
632642

643+
def _route_uses_response_stream(self, route: Route) -> bool:
644+
"""Return ``True`` if the function backing ``route`` opts into
645+
Lambda response streaming via ``FunctionUrlConfig.InvokeMode ==
646+
RESPONSE_STREAM``. Routes whose function does not declare it
647+
(or whose Function can't be resolved) fall through to the
648+
historical buffered invoke path."""
649+
if not route.function_name:
650+
return False
651+
provider = getattr(self.lambda_runner, "provider", None)
652+
if provider is None:
653+
return False
654+
try:
655+
function = provider.get(route.function_name)
656+
except Exception: # pylint: disable=broad-except
657+
return False
658+
if function is None:
659+
return False
660+
url_config = getattr(function, "function_url_config", None) or {}
661+
invoke_mode = url_config.get("InvokeMode") if isinstance(url_config, dict) else None
662+
return isinstance(invoke_mode, str) and invoke_mode.upper() == "RESPONSE_STREAM"
663+
664+
def _invoke_lambda_function_streaming(
665+
self,
666+
lambda_function_name: str,
667+
event: dict,
668+
tenant_id: Optional[str],
669+
cors_headers: Dict[str, str],
670+
):
671+
"""
672+
Streaming counterpart of :meth:`_invoke_lambda_function`. Only
673+
used for routes whose Lambda function opts in via
674+
``FunctionUrlConfig.InvokeMode == RESPONSE_STREAM``.
675+
676+
Issues the invoke against the local RIE with ``stream=True`` so
677+
the response body is exposed to us as it arrives. If the runtime
678+
advertises a streaming response (via the
679+
``Lambda-Runtime-Function-Response-Mode`` header) we forward the
680+
bytes through to the browser via
681+
:func:`build_streaming_flask_response`. Otherwise we drain the
682+
body, parse the standard API Gateway proxy JSON and return a
683+
regular Flask response built from it — that case is rare for an
684+
opted-in route (it would mean the function is misconfigured) but
685+
keeps the behavior well-defined.
686+
"""
687+
event_str = json.dumps(event, sort_keys=True)
688+
response, cleanup = self.lambda_runner.invoke_streaming(
689+
lambda_function_name, event_str, tenant_id=tenant_id, stderr=self.stderr
690+
)
691+
692+
if is_streaming_response(response):
693+
LOG.debug("Lambda function returned a streaming response, forwarding chunks as they arrive")
694+
try:
695+
flask_response = build_streaming_flask_response(
696+
response,
697+
on_complete=cleanup,
698+
extra_headers=cors_headers,
699+
)
700+
except BaseException:
701+
# build_streaming_flask_response synchronously iterates the
702+
# upstream body to parse the http-integration JSON prelude.
703+
# If that read fails (connection reset, RIE crash, malformed
704+
# body) the Flask response is never built, ``call_on_close``
705+
# never fires, and the cleanup chain would leak the timeout
706+
# timer, the container's concurrency slot, the open
707+
# requests.Response, and (for cold runtimes) the container
708+
# itself. Run cleanup explicitly here so a construction-time
709+
# failure does not strand resources.
710+
cleanup()
711+
raise
712+
# cleanup is now owned by the Flask response lifecycle
713+
# (Response.call_on_close inside build_streaming_flask_response).
714+
return flask_response
715+
716+
# Buffered response: drain the body now so we can release the
717+
# container as soon as possible, then route through the existing
718+
# API Gateway parser.
719+
#
720+
# We use iter_content with a hard byte cap instead of
721+
# response.content because the latter would happily allocate
722+
# gigabytes if a misconfigured (or hung) runtime kept writing,
723+
# and would block the container's concurrency slot for the
724+
# entire read.
725+
try:
726+
chunks: list = []
727+
received = 0
728+
for chunk in response.iter_content(chunk_size=64 * 1024):
729+
if not chunk:
730+
continue
731+
received += len(chunk)
732+
if received > self._BUFFERED_RESPONSE_BYTE_CAP:
733+
LOG.error(
734+
"Buffered response from streaming-enabled Lambda %s exceeded %d bytes; aborting read",
735+
lambda_function_name,
736+
self._BUFFERED_RESPONSE_BYTE_CAP,
737+
)
738+
raise LambdaResponseParseException(
739+
f"Lambda response exceeds {self._BUFFERED_RESPONSE_BYTE_CAP} byte cap"
740+
)
741+
chunks.append(chunk)
742+
body_bytes = b"".join(chunks)
743+
finally:
744+
cleanup()
745+
746+
# Preserve the historical behaviour of get_lambda_output() which
747+
# round-trips through json.dumps to normalize ordering / encoding.
748+
body_text: Union[str, bytes]
749+
try:
750+
body_text = json.dumps(json.loads(body_bytes), ensure_ascii=False)
751+
except (ValueError, json.JSONDecodeError):
752+
try:
753+
body_text = body_bytes.decode("utf-8")
754+
except UnicodeDecodeError:
755+
body_text = body_bytes # type: ignore[assignment]
756+
757+
if isinstance(body_text, str) and LambdaOutputParser.is_lambda_error_response(body_text):
758+
raise LambdaResponseParseException
759+
760+
# An opted-in route should always return a streaming response.
761+
# If we end up here the function is misconfigured (declares
762+
# RESPONSE_STREAM but returned a buffered body). Surface the
763+
# raw body verbatim with CORS headers; we deliberately do NOT
764+
# try to parse it as an API Gateway proxy response — callers
765+
# that need that should remove InvokeMode=RESPONSE_STREAM.
766+
LOG.warning(
767+
"Lambda function %s is declared with InvokeMode=RESPONSE_STREAM but did not stream; "
768+
"returning the raw body verbatim",
769+
lambda_function_name,
770+
)
771+
return self.service_response(
772+
body_text if isinstance(body_text, str) else body_text.decode("utf-8", errors="replace"),
773+
Headers(cors_headers),
774+
200,
775+
)
776+
633777
def _request_handler(self, **kwargs):
634778
"""
635779
We handle all requests to the host:port. The general flow of handling a request is as follows
@@ -735,11 +879,57 @@ def _request_handler(self, **kwargs):
735879
return auth_service_error
736880

737881
endpoint_service_error = None
738-
try:
739-
# Extract tenant-id from HTTP request header
740-
tenant_id = request.headers.get("X-Amz-Tenant-Id")
741882

742-
# invoke the route's Lambda function
883+
# Extract tenant-id from HTTP request header
884+
tenant_id = request.headers.get("X-Amz-Tenant-Id")
885+
886+
# The streaming pipeline is opt-in per function via
887+
# FunctionUrlConfig.InvokeMode == RESPONSE_STREAM, mirroring the
888+
# contract Lambda Function URLs use in production. Functions that
889+
# don't declare it keep going through the historical buffered
890+
# invoke path untouched.
891+
use_streaming = self._route_uses_response_stream(route)
892+
893+
if use_streaming:
894+
streaming_response = None
895+
try:
896+
streaming_response = self._invoke_lambda_function_streaming(
897+
route.function_name, route_lambda_event, tenant_id, cors_headers
898+
)
899+
except TenantIdValidationError as e:
900+
endpoint_service_error = ServiceErrorResponses.tenant_id_validation_error(str(e))
901+
except FunctionNotFound:
902+
endpoint_service_error = ServiceErrorResponses.lambda_not_found_response()
903+
except UnsupportedInlineCodeError:
904+
endpoint_service_error = ServiceErrorResponses.not_implemented_locally(
905+
"Inline code is not supported for sam local commands. Please write your code in a separate file."
906+
)
907+
except DockerContainerCreationFailedException as ex:
908+
endpoint_service_error = ServiceErrorResponses.container_creation_failed(ex.message)
909+
except MissingFunctionNameException as ex:
910+
endpoint_service_error = ServiceErrorResponses.lambda_failure_response(
911+
f"Failed to execute endpoint. Got an invalid function name ({str(ex)})",
912+
)
913+
except LambdaResponseParseException:
914+
# Mirror the buffered path: a malformed / oversized
915+
# buffered fallback body inside the streaming pipeline
916+
# must surface as the same body-failure response and
917+
# status code, not as a generic invoke failure.
918+
endpoint_service_error = ServiceErrorResponses.lambda_body_failure_response()
919+
except Exception as ex: # pylint: disable=broad-except
920+
# Catch-all for unexpected failures in the streaming
921+
# invoke (e.g. requests.ConnectionError from the upstream
922+
# stream POST). Surface as a Lambda failure response
923+
# instead of a generic 500 from Flask.
924+
LOG.error("Failed to invoke streaming Lambda function: %s", ex, exc_info=True)
925+
endpoint_service_error = ServiceErrorResponses.lambda_failure_response()
926+
927+
if endpoint_service_error:
928+
return endpoint_service_error
929+
return streaming_response
930+
931+
try:
932+
# invoke the route's Lambda function (buffered path, unchanged)
743933
lambda_response = self._invoke_lambda_function(route.function_name, route_lambda_event, tenant_id)
744934
except TenantIdValidationError as e:
745935
endpoint_service_error = ServiceErrorResponses.tenant_id_validation_error(str(e))

0 commit comments

Comments
 (0)