Skip to content

Commit d3f3973

Browse files
#6501 feat(local): forward Lambda response streaming through start-api
sam local start-api was buffering Lambda invokes in a StringIO and then running the result through API Gateway proxy JSON parsing, which made it impossible to test functions that use awslambda.streamifyResponse (or any other Lambda response-streaming pattern, e.g. Server-Sent Events). The HTTP request to RIE was issued without stream=True and the response body was materialised in full before the caller saw a single byte. This change adds a parallel "streaming-aware" invoke pipeline that the API Gateway service uses by default while keeping the buffered code path exactly as it was for non-streaming responses: * docker/container.py: new wait_for_streaming_response / _make_streaming_http_request methods that issue the RIE invoke with stream=True and hand the caller the raw requests.Response. A matching release_streaming_slot keeps the per-container concurrency semaphore correctly balanced across the lifetime of the stream. * lambdafn/runtime.py: new LambdaRuntime.invoke_streaming that builds / runs the container the same way invoke() does but returns a tuple of (response, cleanup). The cleanup callable releases the slot and invokes _on_invoke_done so warm containers stay warm while cold containers are stopped after the stream is fully consumed. * commands/local/lib/local_lambda.py: thin wrapper plumbing invoke_streaming through LocalLambdaRunner. * apigw/streaming_response.py (new): turns the streaming RIE response into a flask.Response. When the upstream Content-Type is application/vnd.awslambda.http-integration-response, the JSON prelude is parsed and applied to the outgoing status/headers/cookies before the body bytes (after the 8-NUL delimiter) are forwarded chunk by chunk. Chunks are pulled with raw.read1() for minimal latency, and call_on_close hooks the cleanup callable to the Flask response lifecycle. * apigw/local_apigw_service.py: _request_handler now invokes through the streaming-aware helper. When the runtime advertises streaming (via Lambda-Runtime-Function-Response-Mode), the streamed response is forwarded straight to the browser with CORS headers merged in. Otherwise the previously-buffered body is parsed and rendered through the existing v1/v2 payload-format helpers. * local/rapid/aws-lambda-rie-*: refreshed RIE binaries built from the streaming-aware fork that lives in aws-lambda-runtime-interface-emulator (case-insensitive response mode comparison + streamingCopy pass-through). Validated end-to-end with a Node.js Lambda using awslambda.streamifyResponse: an EventSource client in a browser receives one SSE frame per second over a 10-second stream rather than seeing all 10 frames arrive together at the end. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 9c2aecc commit d3f3973

7 files changed

Lines changed: 512 additions & 2 deletions

File tree

samcli/commands/local/lib/local_lambda.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,46 @@ def invoke(
271271

272272
return headers
273273

274+
def invoke_streaming(
275+
self,
276+
function_identifier: str,
277+
event: str,
278+
tenant_id: Optional[str] = None,
279+
stderr: Optional[StreamWriter] = None,
280+
override_runtime: Optional[str] = None,
281+
function: Optional[Function] = None,
282+
):
283+
"""
284+
Streaming counterpart of :meth:`invoke`.
285+
286+
Resolves the function, then asks the underlying LambdaRuntime to
287+
perform a streaming invocation. Returns a tuple
288+
``(response, cleanup)`` where ``response`` is the raw streaming
289+
:class:`requests.Response` from the local Runtime Interface
290+
Emulator (RIE) and ``cleanup`` is a callable the caller MUST run
291+
when it has finished consuming the body.
292+
293+
This is used by the API Gateway local service to forward
294+
Lambda response-streaming output (e.g. Server-Sent Events
295+
produced by ``awslambda.streamifyResponse`` in Node.js) to the
296+
browser without buffering.
297+
"""
298+
if not function:
299+
function = self.get_function(function_identifier, tenant_id)
300+
config = self.get_invoke_config(function, override_runtime)
301+
302+
return self.local_runtime.invoke_streaming(
303+
config,
304+
event,
305+
tenant_id,
306+
debug_context=self.debug_context,
307+
stderr=stderr,
308+
container_host=self.container_host,
309+
container_host_interface=self.container_host_interface,
310+
extra_hosts=self.extra_hosts,
311+
container_dns=self.container_dns,
312+
)
313+
274314
def is_debugging(self) -> bool:
275315
"""
276316
Are we debugging the invoke?

samcli/local/apigw/local_apigw_service.py

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232
from samcli.local.apigw.path_converter import PathConverter
3333
from samcli.local.apigw.route import Route
3434
from samcli.local.apigw.service_error_responses import ServiceErrorResponses
35+
from samcli.local.apigw.streaming_response import (
36+
build_streaming_flask_response,
37+
is_streaming_response,
38+
)
3539
from samcli.local.docker.exceptions import DockerContainerCreationFailedException
3640
from samcli.local.events.api_event import (
3741
ContextHTTP,
@@ -630,6 +634,55 @@ def _invoke_lambda_function(
630634

631635
return lambda_response
632636

637+
def _invoke_lambda_function_streaming(self, lambda_function_name: str, event: dict, tenant_id: Optional[str] = None):
638+
"""
639+
Streaming counterpart of :meth:`_invoke_lambda_function`.
640+
641+
Issues the invoke against the local RIE with ``stream=True`` so the
642+
response body is exposed to us as it arrives. Returns one of two
643+
shapes:
644+
645+
* ``("streaming", response, cleanup)`` when the RIE advertises that
646+
the function used response streaming. Caller should hand the
647+
response to :func:`build_streaming_flask_response`.
648+
* ``("buffered", body_str, cleanup)`` for regular buffered Lambda
649+
responses. The caller should run them through the normal
650+
API Gateway payload parser. ``cleanup`` MUST be called in
651+
either case once processing is done.
652+
"""
653+
event_str = json.dumps(event, sort_keys=True)
654+
response, cleanup = self.lambda_runner.invoke_streaming(
655+
lambda_function_name, event_str, tenant_id=tenant_id, stderr=self.stderr
656+
)
657+
658+
if is_streaming_response(response):
659+
LOG.debug("Lambda function returned a streaming response, forwarding chunks as they arrive")
660+
return "streaming", response, cleanup
661+
662+
# Buffered response: drain the body now so we can release the
663+
# container as soon as possible, then route through the existing
664+
# JSON parsing pipeline.
665+
try:
666+
body_bytes = response.content
667+
finally:
668+
cleanup()
669+
670+
# Preserve the historical behaviour of get_lambda_output() which
671+
# round-trips through json.dumps to normalize ordering / encoding.
672+
body_text: Union[str, bytes]
673+
try:
674+
body_text = json.dumps(json.loads(body_bytes), ensure_ascii=False)
675+
except (ValueError, json.JSONDecodeError):
676+
try:
677+
body_text = body_bytes.decode("utf-8")
678+
except UnicodeDecodeError:
679+
body_text = body_bytes # type: ignore[assignment]
680+
681+
if isinstance(body_text, str) and LambdaOutputParser.is_lambda_error_response(body_text):
682+
raise LambdaResponseParseException
683+
684+
return "buffered", body_text, cleanup
685+
633686
def _request_handler(self, **kwargs):
634687
"""
635688
We handle all requests to the host:port. The general flow of handling a request is as follows
@@ -735,12 +788,20 @@ def _request_handler(self, **kwargs):
735788
return auth_service_error
736789

737790
endpoint_service_error = None
791+
invoke_kind: Optional[str] = None
792+
invoke_payload = None
738793
try:
739794
# Extract tenant-id from HTTP request header
740795
tenant_id = request.headers.get("X-Amz-Tenant-Id")
741796

742-
# invoke the route's Lambda function
743-
lambda_response = self._invoke_lambda_function(route.function_name, route_lambda_event, tenant_id)
797+
# Always invoke via the streaming-aware path so Lambda response
798+
# streaming (e.g. SSE) works out of the box. The helper returns
799+
# either a streaming response (which we forward chunk by chunk
800+
# to the browser) or a buffered body that is then parsed by
801+
# the existing API Gateway payload format helpers.
802+
invoke_kind, invoke_payload, invoke_cleanup = self._invoke_lambda_function_streaming(
803+
route.function_name, route_lambda_event, tenant_id
804+
)
744805
except TenantIdValidationError as e:
745806
endpoint_service_error = ServiceErrorResponses.tenant_id_validation_error(str(e))
746807
except FunctionNotFound:
@@ -761,6 +822,19 @@ def _request_handler(self, **kwargs):
761822
if endpoint_service_error:
762823
return endpoint_service_error
763824

825+
# If the Lambda function streamed its response, forward it verbatim
826+
# to the client. The cleanup callback is wired into the Flask
827+
# response's lifecycle so the container slot is released after the
828+
# browser finishes reading or disconnects.
829+
if invoke_kind == "streaming":
830+
return build_streaming_flask_response(
831+
invoke_payload,
832+
on_complete=invoke_cleanup,
833+
extra_headers=cors_headers,
834+
)
835+
836+
lambda_response = invoke_payload
837+
764838
try:
765839
if route.event_type == Route.HTTP and (
766840
not route.payload_format_version or route.payload_format_version == "2.0"
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
"""
2+
Helpers for forwarding Lambda response streaming to the local API Gateway
3+
client.
4+
5+
When a Lambda function uses response streaming (e.g. via the Node.js
6+
``awslambda.streamifyResponse``) the runtime POSTs its response body to the
7+
Runtime API with the ``Lambda-Runtime-Function-Response-Mode: streaming``
8+
header. The streaming-enabled fork of the Runtime Interface Emulator
9+
forwards this header to the invoke caller (SAM CLI) and emits the body
10+
chunks straight through. This module turns that wire format into a
11+
:class:`flask.Response` that streams chunks to the browser as they arrive,
12+
parsing the optional Lambda HTTP-integration prelude (used by Function
13+
URLs and API Gateway HTTP API) when present.
14+
"""
15+
16+
from __future__ import annotations
17+
18+
import json
19+
import logging
20+
from typing import Callable, Dict, Iterable, Iterator, Optional, Tuple
21+
22+
from flask import Response
23+
24+
LOG = logging.getLogger(__name__)
25+
26+
# HTTP header set by the runtime / RIE on streaming responses.
27+
RESPONSE_MODE_HEADER = "Lambda-Runtime-Function-Response-Mode"
28+
RESPONSE_MODE_STREAMING = "streaming"
29+
30+
# Content type used by Lambda Function URLs / API Gateway HTTP API for
31+
# streamed responses. The body starts with a JSON prelude describing the
32+
# desired HTTP response status, headers and cookies, followed by 8 NUL
33+
# bytes, followed by the raw body bytes.
34+
HTTP_INTEGRATION_CONTENT_TYPE = "application/vnd.awslambda.http-integration-response"
35+
_PRELUDE_DELIMITER = b"\x00" * 8
36+
# Cap the size of the JSON prelude we are willing to scan for. AWS's
37+
# documented limit is much smaller than this, but we keep some headroom
38+
# for unusual header sets.
39+
_PRELUDE_MAX_BYTES = 64 * 1024
40+
41+
# How many bytes we ask urllib3 for on each read. Smaller values give
42+
# slightly lower latency for tiny SSE frames at the cost of more syscalls;
43+
# 4 KiB is a reasonable compromise that still lets a single TCP segment
44+
# carrying multiple SSE events be delivered in one yield.
45+
_STREAM_READ_SIZE = 4096
46+
47+
48+
def is_streaming_response(resp) -> bool:
49+
"""Return ``True`` if ``resp`` (a ``requests.Response``) carries the
50+
streaming response-mode header."""
51+
return resp.headers.get(RESPONSE_MODE_HEADER, "").lower() == RESPONSE_MODE_STREAMING
52+
53+
54+
def _read_streaming_chunks(resp) -> Iterator[bytes]:
55+
"""Yield raw bytes from a streaming response with minimal buffering.
56+
57+
We bypass ``iter_content`` and pull from ``raw.read1`` (when
58+
available) because we want each TCP segment to surface as soon as the
59+
network delivers it; ``iter_content`` would otherwise wait until
60+
``chunk_size`` bytes have accumulated.
61+
"""
62+
raw = getattr(resp, "raw", None)
63+
read1 = getattr(raw, "read1", None) if raw is not None else None
64+
65+
if callable(read1):
66+
while True:
67+
try:
68+
chunk = read1(_STREAM_READ_SIZE)
69+
except Exception: # pragma: no cover - network noise
70+
LOG.debug("Streaming read failed", exc_info=True)
71+
break
72+
if not chunk:
73+
break
74+
yield chunk
75+
else:
76+
for chunk in resp.iter_content(chunk_size=_STREAM_READ_SIZE):
77+
if chunk:
78+
yield chunk
79+
80+
81+
def _peek_prelude(byte_iter: Iterator[bytes]) -> Tuple[Optional[bytes], Iterator[bytes]]:
82+
"""
83+
Pull bytes from ``byte_iter`` until we see the 8-NUL prelude
84+
delimiter or hit ``_PRELUDE_MAX_BYTES`` without finding it.
85+
86+
Returns a tuple ``(prelude, rest_iter)`` where:
87+
88+
* ``prelude`` is the JSON-encoded prelude bytes (excluding the
89+
delimiter), or ``None`` if no delimiter was found within the cap
90+
(in which case the bytes already consumed are re-emitted at the
91+
start of ``rest_iter``).
92+
* ``rest_iter`` is an iterator that yields any leftover bytes
93+
followed by the rest of the original iterator.
94+
"""
95+
buffered = bytearray()
96+
for chunk in byte_iter:
97+
buffered.extend(chunk)
98+
idx = bytes(buffered).find(_PRELUDE_DELIMITER)
99+
if idx != -1:
100+
prelude = bytes(buffered[:idx])
101+
tail = bytes(buffered[idx + len(_PRELUDE_DELIMITER):])
102+
103+
def _rest_with_tail():
104+
if tail:
105+
yield tail
106+
yield from byte_iter
107+
108+
return prelude, _rest_with_tail()
109+
if len(buffered) >= _PRELUDE_MAX_BYTES:
110+
break
111+
112+
leftover = bytes(buffered)
113+
114+
def _rest_with_leftover():
115+
if leftover:
116+
yield leftover
117+
yield from byte_iter
118+
119+
return None, _rest_with_leftover()
120+
121+
122+
def _parse_prelude(prelude_bytes: bytes) -> Tuple[int, Dict[str, str], list]:
123+
"""Parse the JSON prelude. Falls back to defaults for missing fields."""
124+
try:
125+
decoded = json.loads(prelude_bytes.decode("utf-8"))
126+
except (UnicodeDecodeError, json.JSONDecodeError) as exc:
127+
LOG.warning("Failed to parse Lambda HTTP-integration prelude: %s", exc)
128+
return 200, {}, []
129+
130+
if not isinstance(decoded, dict):
131+
return 200, {}, []
132+
133+
status_code = int(decoded.get("statusCode", 200))
134+
headers = decoded.get("headers") or {}
135+
if not isinstance(headers, dict):
136+
headers = {}
137+
cookies = decoded.get("cookies") or []
138+
if not isinstance(cookies, list):
139+
cookies = []
140+
# Normalize header values to strings (Lambda allows numbers too).
141+
headers = {str(k): str(v) for k, v in headers.items()}
142+
return status_code, headers, cookies
143+
144+
145+
def build_streaming_flask_response(
146+
resp,
147+
on_complete: Callable[[], None],
148+
extra_headers: Optional[Dict[str, str]] = None,
149+
) -> Response:
150+
"""
151+
Build a Flask :class:`~flask.Response` that forwards a streaming
152+
Lambda response body to the HTTP client as it arrives.
153+
154+
Parameters
155+
----------
156+
resp : requests.Response
157+
The streaming response returned by the local RIE invoke. Must
158+
have been opened with ``stream=True``.
159+
on_complete : Callable
160+
Called exactly once after the response generator is fully
161+
consumed (or aborted). Used to release container resources.
162+
extra_headers : Optional[Dict[str, str]]
163+
Additional headers (e.g. CORS) to add to the outgoing response.
164+
165+
Returns
166+
-------
167+
flask.Response
168+
A response whose body is a generator producing the Lambda
169+
function's streamed bytes.
170+
"""
171+
extra_headers = extra_headers or {}
172+
upstream_content_type = resp.headers.get("Content-Type", "")
173+
is_http_integration = HTTP_INTEGRATION_CONTENT_TYPE in upstream_content_type
174+
175+
chunk_iter = _read_streaming_chunks(resp)
176+
177+
status_code = 200
178+
response_headers: Dict[str, str] = {}
179+
cookies: list = []
180+
body_iter: Iterable[bytes] = chunk_iter
181+
182+
if is_http_integration:
183+
prelude, rest_iter = _peek_prelude(chunk_iter)
184+
if prelude is not None:
185+
status_code, response_headers, cookies = _parse_prelude(prelude)
186+
else:
187+
LOG.warning(
188+
"Streaming response declared http-integration content type but did not "
189+
"contain the 8-NUL prelude delimiter within %d bytes; passing body through verbatim",
190+
_PRELUDE_MAX_BYTES,
191+
)
192+
body_iter = rest_iter
193+
else:
194+
# Non-integration streaming: pass content-type and other useful
195+
# headers through unchanged.
196+
if upstream_content_type:
197+
response_headers["Content-Type"] = upstream_content_type
198+
199+
# Make sure intermediaries do not buffer the response. Without this
200+
# some reverse proxies hold the bytes back until the connection
201+
# closes.
202+
response_headers.setdefault("Cache-Control", "no-cache")
203+
response_headers.setdefault("X-Accel-Buffering", "no")
204+
response_headers.update(extra_headers)
205+
206+
completion_called = {"done": False}
207+
208+
def _safe_complete() -> None:
209+
if completion_called["done"]:
210+
return
211+
completion_called["done"] = True
212+
try:
213+
on_complete()
214+
except Exception: # pragma: no cover - best effort
215+
LOG.debug("on_complete callback raised", exc_info=True)
216+
217+
def _wrapped_body() -> Iterator[bytes]:
218+
try:
219+
for chunk in body_iter:
220+
if chunk:
221+
yield chunk
222+
finally:
223+
_safe_complete()
224+
225+
flask_response = Response(_wrapped_body(), status=status_code)
226+
for key, value in response_headers.items():
227+
flask_response.headers[key] = value
228+
for cookie in cookies:
229+
if isinstance(cookie, str):
230+
flask_response.headers.add("Set-Cookie", cookie)
231+
232+
# Hook werkzeug's "response close" lifecycle in case the consumer
233+
# disconnects before the body generator finishes naturally.
234+
flask_response.call_on_close(_safe_complete)
235+
return flask_response

0 commit comments

Comments
 (0)