Skip to content

Commit b6eb111

Browse files
#6501 feat(local): forward Lambda response streaming through start-api
sam local start-api was buffering Lambda invokes in a StringIO and then running the result through API Gateway proxy JSON parsing, which made it impossible to test functions that use awslambda.streamifyResponse (or any other Lambda response-streaming pattern, e.g. Server-Sent Events). The HTTP request to RIE was issued without stream=True and the response body was materialised in full before the caller saw a single byte. This change adds a parallel "streaming-aware" invoke pipeline that the API Gateway service uses by default while keeping the buffered code path exactly as it was for non-streaming responses: * docker/container.py: new wait_for_streaming_response / _make_streaming_http_request methods that issue the RIE invoke with stream=True and hand the caller the raw requests.Response. A matching release_streaming_slot keeps the per-container concurrency semaphore correctly balanced across the lifetime of the stream. * lambdafn/runtime.py: new LambdaRuntime.invoke_streaming that builds / runs the container the same way invoke() does but returns a tuple of (response, cleanup). The cleanup callable releases the slot and invokes _on_invoke_done so warm containers stay warm while cold containers are stopped after the stream is fully consumed. * commands/local/lib/local_lambda.py: thin wrapper plumbing invoke_streaming through LocalLambdaRunner. * apigw/streaming_response.py (new): turns the streaming RIE response into a flask.Response. When the upstream Content-Type is application/vnd.awslambda.http-integration-response, the JSON prelude is parsed and applied to the outgoing status/headers/cookies before the body bytes (after the 8-NUL delimiter) are forwarded chunk by chunk. Chunks are pulled with raw.read1() for minimal latency, and call_on_close hooks the cleanup callable to the Flask response lifecycle. * apigw/local_apigw_service.py: _request_handler now invokes through the streaming-aware helper. When the runtime advertises streaming (via Lambda-Runtime-Function-Response-Mode), the streamed response is forwarded straight to the browser with CORS headers merged in. Otherwise the previously-buffered body is parsed and rendered through the existing v1/v2 payload-format helpers. * local/rapid/aws-lambda-rie-*: refreshed RIE binaries built from the streaming-aware fork that lives in aws-lambda-runtime-interface-emulator (case-insensitive response mode comparison + streamingCopy pass-through). Validated end-to-end with a Node.js Lambda using awslambda.streamifyResponse: an EventSource client in a browser receives one SSE frame per second over a 10-second stream rather than seeing all 10 frames arrive together at the end. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 9ffa5ed commit b6eb111

7 files changed

Lines changed: 664 additions & 4 deletions

File tree

samcli/commands/local/lib/local_lambda.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,46 @@ def invoke(
271271

272272
return headers
273273

274+
def invoke_streaming(
275+
self,
276+
function_identifier: str,
277+
event: str,
278+
tenant_id: Optional[str] = None,
279+
stderr: Optional[StreamWriter] = None,
280+
override_runtime: Optional[str] = None,
281+
function: Optional[Function] = None,
282+
):
283+
"""
284+
Streaming counterpart of :meth:`invoke`.
285+
286+
Resolves the function, then asks the underlying LambdaRuntime to
287+
perform a streaming invocation. Returns a tuple
288+
``(response, cleanup)`` where ``response`` is the raw streaming
289+
:class:`requests.Response` from the local Runtime Interface
290+
Emulator (RIE) and ``cleanup`` is a callable the caller MUST run
291+
when it has finished consuming the body.
292+
293+
This is used by the API Gateway local service to forward
294+
Lambda response-streaming output (e.g. Server-Sent Events
295+
produced by ``awslambda.streamifyResponse`` in Node.js) to the
296+
browser without buffering.
297+
"""
298+
if not function:
299+
function = self.get_function(function_identifier, tenant_id)
300+
config = self.get_invoke_config(function, override_runtime)
301+
302+
return self.local_runtime.invoke_streaming(
303+
config,
304+
event,
305+
tenant_id,
306+
debug_context=self.debug_context,
307+
stderr=stderr,
308+
container_host=self.container_host,
309+
container_host_interface=self.container_host_interface,
310+
extra_hosts=self.extra_hosts,
311+
container_dns=self.container_dns,
312+
)
313+
274314
def is_debugging(self) -> bool:
275315
"""
276316
Are we debugging the invoke?

samcli/local/apigw/local_apigw_service.py

Lines changed: 173 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232
from samcli.local.apigw.path_converter import PathConverter
3333
from samcli.local.apigw.route import Route
3434
from samcli.local.apigw.service_error_responses import ServiceErrorResponses
35+
from samcli.local.apigw.streaming_response import (
36+
build_streaming_flask_response,
37+
is_streaming_response,
38+
)
3539
from samcli.local.docker.exceptions import DockerContainerCreationFailedException
3640
from samcli.local.events.api_event import (
3741
ContextHTTP,
@@ -60,6 +64,12 @@ def to_url(self, value):
6064
class LocalApigwService(BaseLocalService):
6165
_DEFAULT_PORT = 3000
6266
_DEFAULT_HOST = "127.0.0.1"
67+
# Hard ceiling on the size of a buffered (non-streaming) Lambda
68+
# response we are willing to materialize in memory. Slightly above
69+
# AWS Lambda's documented 6 MB synchronous response payload limit so
70+
# legitimate responses pass through unmodified, but stops a misbehaving
71+
# or hung runtime from blowing the SAM CLI process up.
72+
_BUFFERED_RESPONSE_BYTE_CAP = 8 * 1024 * 1024
6373

6474
def __init__(
6575
self,
@@ -630,6 +640,124 @@ def _invoke_lambda_function(
630640

631641
return lambda_response
632642

643+
def _route_uses_response_stream(self, route: Route) -> bool:
644+
"""Return ``True`` if the function backing ``route`` opts into
645+
Lambda response streaming via ``FunctionUrlConfig.InvokeMode ==
646+
RESPONSE_STREAM``. Routes whose function does not declare it
647+
(or whose Function can't be resolved) fall through to the
648+
historical buffered invoke path."""
649+
if not route.function_name:
650+
return False
651+
provider = getattr(self.lambda_runner, "provider", None)
652+
if provider is None:
653+
return False
654+
try:
655+
function = provider.get(route.function_name)
656+
except Exception: # pylint: disable=broad-except
657+
return False
658+
if function is None:
659+
return False
660+
url_config = getattr(function, "function_url_config", None) or {}
661+
invoke_mode = url_config.get("InvokeMode") if isinstance(url_config, dict) else None
662+
return isinstance(invoke_mode, str) and invoke_mode.upper() == "RESPONSE_STREAM"
663+
664+
def _invoke_lambda_function_streaming(
665+
self,
666+
lambda_function_name: str,
667+
event: dict,
668+
tenant_id: Optional[str],
669+
cors_headers: Dict[str, str],
670+
):
671+
"""
672+
Streaming counterpart of :meth:`_invoke_lambda_function`. Only
673+
used for routes whose Lambda function opts in via
674+
``FunctionUrlConfig.InvokeMode == RESPONSE_STREAM``.
675+
676+
Issues the invoke against the local RIE with ``stream=True`` so
677+
the response body is exposed to us as it arrives. If the runtime
678+
advertises a streaming response (via the
679+
``Lambda-Runtime-Function-Response-Mode`` header) we forward the
680+
bytes through to the browser via
681+
:func:`build_streaming_flask_response`. Otherwise we drain the
682+
body, parse the standard API Gateway proxy JSON and return a
683+
regular Flask response built from it — that case is rare for an
684+
opted-in route (it would mean the function is misconfigured) but
685+
keeps the behavior well-defined.
686+
"""
687+
event_str = json.dumps(event, sort_keys=True)
688+
response, cleanup = self.lambda_runner.invoke_streaming(
689+
lambda_function_name, event_str, tenant_id=tenant_id, stderr=self.stderr
690+
)
691+
692+
if is_streaming_response(response):
693+
LOG.debug("Lambda function returned a streaming response, forwarding chunks as they arrive")
694+
return build_streaming_flask_response(
695+
response,
696+
on_complete=cleanup,
697+
extra_headers=cors_headers,
698+
)
699+
700+
# Buffered response: drain the body now so we can release the
701+
# container as soon as possible, then route through the existing
702+
# API Gateway parser.
703+
#
704+
# We use iter_content with a hard byte cap instead of
705+
# response.content because the latter would happily allocate
706+
# gigabytes if a misconfigured (or hung) runtime kept writing,
707+
# and would block the container's concurrency slot for the
708+
# entire read.
709+
try:
710+
chunks: list = []
711+
received = 0
712+
for chunk in response.iter_content(chunk_size=64 * 1024):
713+
if not chunk:
714+
continue
715+
received += len(chunk)
716+
if received > self._BUFFERED_RESPONSE_BYTE_CAP:
717+
LOG.error(
718+
"Buffered response from streaming-enabled Lambda %s exceeded %d bytes; aborting read",
719+
lambda_function_name,
720+
self._BUFFERED_RESPONSE_BYTE_CAP,
721+
)
722+
raise LambdaResponseParseException(
723+
f"Lambda response exceeds {self._BUFFERED_RESPONSE_BYTE_CAP} byte cap"
724+
)
725+
chunks.append(chunk)
726+
body_bytes = b"".join(chunks)
727+
finally:
728+
cleanup()
729+
730+
# Preserve the historical behaviour of get_lambda_output() which
731+
# round-trips through json.dumps to normalize ordering / encoding.
732+
body_text: Union[str, bytes]
733+
try:
734+
body_text = json.dumps(json.loads(body_bytes), ensure_ascii=False)
735+
except (ValueError, json.JSONDecodeError):
736+
try:
737+
body_text = body_bytes.decode("utf-8")
738+
except UnicodeDecodeError:
739+
body_text = body_bytes # type: ignore[assignment]
740+
741+
if isinstance(body_text, str) and LambdaOutputParser.is_lambda_error_response(body_text):
742+
raise LambdaResponseParseException
743+
744+
# An opted-in route should always return a streaming response.
745+
# If we end up here the function is misconfigured (declares
746+
# RESPONSE_STREAM but returned a buffered body). Surface the
747+
# raw body verbatim with CORS headers; we deliberately do NOT
748+
# try to parse it as an API Gateway proxy response — callers
749+
# that need that should remove InvokeMode=RESPONSE_STREAM.
750+
LOG.warning(
751+
"Lambda function %s is declared with InvokeMode=RESPONSE_STREAM but did not stream; "
752+
"returning the raw body verbatim",
753+
lambda_function_name,
754+
)
755+
return self.service_response(
756+
body_text if isinstance(body_text, str) else body_text.decode("utf-8", errors="replace"),
757+
Headers(cors_headers),
758+
200,
759+
)
760+
633761
def _request_handler(self, **kwargs):
634762
"""
635763
We handle all requests to the host:port. The general flow of handling a request is as follows
@@ -735,11 +863,52 @@ def _request_handler(self, **kwargs):
735863
return auth_service_error
736864

737865
endpoint_service_error = None
738-
try:
739-
# Extract tenant-id from HTTP request header
740-
tenant_id = request.headers.get("X-Amz-Tenant-Id")
741866

742-
# invoke the route's Lambda function
867+
# Extract tenant-id from HTTP request header
868+
tenant_id = request.headers.get("X-Amz-Tenant-Id")
869+
870+
# The streaming pipeline is opt-in per function via
871+
# FunctionUrlConfig.InvokeMode == RESPONSE_STREAM, mirroring the
872+
# contract Lambda Function URLs use in production. Functions that
873+
# don't declare it keep going through the historical buffered
874+
# invoke path untouched.
875+
use_streaming = self._route_uses_response_stream(route)
876+
877+
if use_streaming:
878+
streaming_response = None
879+
try:
880+
streaming_response = self._invoke_lambda_function_streaming(
881+
route.function_name, route_lambda_event, tenant_id, cors_headers
882+
)
883+
except TenantIdValidationError as e:
884+
endpoint_service_error = ServiceErrorResponses.tenant_id_validation_error(str(e))
885+
except FunctionNotFound:
886+
endpoint_service_error = ServiceErrorResponses.lambda_not_found_response()
887+
except UnsupportedInlineCodeError:
888+
endpoint_service_error = ServiceErrorResponses.not_implemented_locally(
889+
"Inline code is not supported for sam local commands. Please write your code in a separate file."
890+
)
891+
except DockerContainerCreationFailedException as ex:
892+
endpoint_service_error = ServiceErrorResponses.container_creation_failed(ex.message)
893+
except MissingFunctionNameException as ex:
894+
endpoint_service_error = ServiceErrorResponses.lambda_failure_response(
895+
f"Failed to execute endpoint. Got an invalid function name ({str(ex)})",
896+
)
897+
except Exception as ex: # pylint: disable=broad-except
898+
# Mirror the catch-all behavior of the buffered invoke
899+
# path so unexpected failures during the streaming invoke
900+
# (e.g. requests.ConnectionError from the upstream stream
901+
# POST) surface to the caller as a Lambda failure response
902+
# instead of a generic 500 from Flask.
903+
LOG.error("Failed to invoke streaming Lambda function: %s", ex, exc_info=True)
904+
endpoint_service_error = ServiceErrorResponses.lambda_failure_response()
905+
906+
if endpoint_service_error:
907+
return endpoint_service_error
908+
return streaming_response
909+
910+
try:
911+
# invoke the route's Lambda function (buffered path, unchanged)
743912
lambda_response = self._invoke_lambda_function(route.function_name, route_lambda_event, tenant_id)
744913
except TenantIdValidationError as e:
745914
endpoint_service_error = ServiceErrorResponses.tenant_id_validation_error(str(e))

0 commit comments

Comments
 (0)