Commit fde9640

joe4dev and claude committed
refactor(init): derive init reporting from rapid's lifecycle events; drop rapidcore additions
Simplify the init error-reporting and init-timeout implementation with no behavior change (TestLambdaErrors 12/12, timeout/PC/log-envelope tests, and full test_lambda.py green against the rebuilt RIE):

* Delete internal/lambda/rapidcore/server_localstack.go: the timeout-bounded await is now a goroutine running the exported AwaitInitialized() plus a select in main.go. On timeout, receiving the goroutine's result after the reset doubles as the init-failure drain and orders its cleanup before the ready signal, so it can never cancel the first invoke's fresh reservation. No LocalStack-owned code remains under internal/.
* Render all INIT_REPORT flavors (cold-start failure, timeout, suppressed init) in LocalStackEventsAPI from rapid's native InitStart/InitRuntimeDone/InitReport events instead of three hand-rolled call sites. The suppressed init's Phase: invoke line now reports that init's measured duration after its own logs, matching AWS more closely.
* Record the scrubbed fatal error type natively from SendInitRuntimeDone (which fires even for crashes that never call /init/error), replacing the initErrorType mirroring; the cold-start Init Duration becomes take-once state, replacing warmStart/initTimedOut; wall-clock fallbacks are dropped because rapid's InitReport event always fires.
* Collapse /status/error reporting into a single post site (ReportInitFailure: forward the stashed /init/error payload, synthesize otherwise), removing the initErrorForwarded duplicate-send guard.
* Add unit tests for the events API rendering matrix.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
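The first bullet's timeout-bounded await can be pictured with a small sketch. It is not the code from main.go (which is not part of this view): `awaitWithTimeout`, `simulate`, the `reset` callback and the error text are hypothetical stand-ins, and the only assumption is that resetting makes the blocked await return.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// awaitWithTimeout runs await in a goroutine and waits at most timeout for it.
// On timeout it calls reset (assumed to make await return) and then blocks for
// await's result, so the failed init is fully drained before the caller moves on.
// Hypothetical helper: the real code calls rapid's AwaitInitialized() instead.
func awaitWithTimeout(await func() error, reset func(), timeout time.Duration) (bool, error) {
	done := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() { done <- await() }()

	select {
	case err := <-done:
		return false, err
	case <-time.After(timeout):
		reset()
		// Receiving here doubles as the drain: nothing after this call can
		// race with cleanup of the timed-out init.
		return true, <-done
	}
}

// simulate models an init that takes initMS milliseconds unless it is reset first.
func simulate(initMS, timeoutMS int) (bool, error) {
	stop := make(chan struct{})
	await := func() error {
		select {
		case <-time.After(time.Duration(initMS) * time.Millisecond):
			return nil
		case <-stop:
			return errors.New("init reset")
		}
	}
	reset := func() { close(stop) }
	return awaitWithTimeout(await, reset, time.Duration(timeoutMS)*time.Millisecond)
}

func main() {
	fmt.Println(simulate(5, 500))   // init finishes well inside the timeout
	fmt.Println(simulate(2000, 10)) // init is reset after the timeout fires
}
```

Blocking on the second receive is what orders the cleanup before any later "ready" signal in this sketch; dropping it would let the caller continue while the old init is still unwinding.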
1 parent 64c9644 commit fde9640

6 files changed

Lines changed: 451 additions & 393 deletions

README-LOCALSTACK.md

Lines changed: 0 additions & 1 deletion
@@ -47,4 +47,3 @@ Document all custom changes with the following comment prefix `# LOCALSTACK CHAN
 * `Makefile` for debugging and building with Docker
 * `internal/lsapi` LocalStack-only package with the request/response types of the LocalStack <-> RIE HTTP API
 * 2023-10-17: `lambda/rapidcore/server.go` pass request metadata into .Reserve(invoke.ID, invoke.TraceID, invoke.LambdaSegmentID)
-* 2026-06-11: `lambda/rapidcore/server_localstack.go` new LocalStack-only file with additions to the rapidcore Server (timeout-aware init await, init-failure drain, structured init-failure interpretation)

cmd/localstack/custom_interop.go

Lines changed: 77 additions & 219 deletions
@@ -30,45 +30,14 @@ type CustomInteropServer struct {
 	localStackAdapter *LocalStackAdapter
 	port string
 	upstreamEndpoint string
-	// logCollector accumulates the runtime's stdout/stderr plus the synthetic START/REPORT/
-	// INIT_REPORT lines that are flushed to LocalStack with each invocation's logs.
-	logCollector *LogCollector
-	// eventsAPI provides rapid's authoritative Init-phase duration (see events.go), used for
-	// the REPORT/INIT_REPORT log lines instead of wall-clock measurements at invoke arrival.
+	// eventsAPI renders the synthetic START/INIT_REPORT log lines from rapid's lifecycle
+	// events and records the init outcome (error type, cold-start duration) — see events.go.
 	eventsAPI *LocalStackEventsAPI
-	// initStart is set once in Init() and warmStart is flipped on the first invoke.
-	// Both are accessed only from the single sequential init -> invoke flow (the RIE
-	// processes one invocation at a time), so they need no additional synchronization.
-	initStart time.Time
-	warmStart bool
-	// initTimedOut is set by ReportInitTimeout when the init phase exceeds its timeout. It is
-	// written from the init-await flow and read from the invoke flow, so it uses atomic access.
-	// When set, the first invocation's REPORT omits Init Duration (init was already reported as
-	// timed out and is re-run as a suppressed init during that invocation).
-	initTimedOut atomic.Bool
-	// initErrorForwarded is set once the runtime's own /init/error has been forwarded to
-	// LocalStack via SendInitErrorResponse, so the crash-path fallback (SendInitError) does
-	// not send a duplicate error status for the same failed initialization. Unlike
-	// initErrorType below it is never cleared: it only guards the one-shot init-phase report.
-	initErrorForwarded atomic.Bool
-	// initErrorType holds rapidcore's scrubbed fatal error type (e.g. Runtime.Unknown) when init
-	// failed, used to render the INIT_REPORT(phase=invoke) and REPORT Status/Error Type lines for
-	// the on-demand folded-into-invoke path. Stores a string; empty/unset means init did not fail.
-	// It persists while invocations keep failing (each one re-runs the init as a suppressed init
-	// and AWS re-emits the failure envelope), and is cleared by the invoke handler once an
-	// invocation succeeds so a recovered environment is not tainted by the original failure.
-	initErrorType atomic.Value
-	// onDemand is true for on-demand functions, where AWS folds a failed cold-start init into
-	// the first invocation (suppressed init). For these we do NOT report init failures via
-	// /status/error; instead we signal ready and let the first invoke surface the error with
-	// the full INIT_REPORT/START/END/REPORT envelope. Provisioned concurrency and Managed
-	// Instances keep the provisioning-time /status/error model. SnapStart environments are
-	// also classified on-demand here (LocalStack sets AWS_LAMBDA_INITIALIZATION_TYPE=on-demand
-	// for them and initializes them lazily at the first invoke, not at version publish), so the
-	// fold-into-invoke model applies to them too.
-	// TODO: set AWS_LAMBDA_INITIALIZATION_TYPE=snap-start on the LocalStack side for env-var
-	// parity with AWS once SnapStart environments get their own initialization type.
-	onDemand bool
+	// initErrorPayload stashes the structured error payload the runtime reported via
+	// /init/error ([]byte), so ReportInitFailure can forward the runtime's own error to
+	// LocalStack instead of a synthesized one. Written from the runtime API handler flow and
+	// read from the main flow after init failed, hence atomic.
+	initErrorPayload atomic.Value
 }

 type LocalStackAdapter struct {
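The new `initErrorPayload` field is written from one flow and read from another, which is why it is an `atomic.Value`. A minimal sketch of that stash-and-load pattern follows; `initErrorStash` and its methods are hypothetical names, not types from the repository. The comma-ok type assertion is what lets an unset value read back as a nil slice instead of panicking.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// initErrorStash is a hypothetical wrapper around the pattern used for the
// initErrorPayload field: one goroutine stores a []byte, another loads it later.
type initErrorStash struct {
	payload atomic.Value // always holds a []byte once set
}

// Store records the payload. atomic.Value requires every stored value to have
// the same concrete type, so only []byte is ever stored here.
func (s *initErrorStash) Store(p []byte) {
	s.payload.Store(p)
}

// Load returns the stashed payload, or nil if nothing was stored yet. Load()
// yields a nil interface before the first Store, and the comma-ok assertion
// turns that into a nil []byte.
func (s *initErrorStash) Load() []byte {
	p, _ := s.payload.Load().([]byte)
	return p
}

func main() {
	var stash initErrorStash
	fmt.Println(stash.Load() == nil) // nothing reported yet

	stash.Store([]byte(`{"errorType":"Runtime.ExitError"}`))
	fmt.Println(string(stash.Load()))
}
```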
@@ -136,9 +105,7 @@ func NewCustomInteropServer(lsOpts *LsOpts, adapter *LocalStackAdapter, delegate
 		port: lsOpts.InteropPort,
 		upstreamEndpoint: lsOpts.RuntimeEndpoint,
 		localStackAdapter: adapter,
-		logCollector: logCollector,
 		eventsAPI: eventsAPI,
-		onDemand: GetenvWithDefault("AWS_LAMBDA_INITIALIZATION_TYPE", "on-demand") == "on-demand",
 	}

 	// TODO: extract this
@@ -158,33 +125,16 @@ func NewCustomInteropServer(lsOpts *LsOpts, adapter *LocalStackAdapter, delegate
 				}

 				invokeResp := &standalone.ResponseWriterProxy{}
-				// The synthetic START line is emitted via LocalStackEventsAPI.SendInvokeStart so it
-				// lands after any inline (suppressed) init, matching AWS — see events.go.
-
-				initErrType, _ := server.initErrorType.Load().(string)
+				// The synthetic START and INIT_REPORT lines are emitted via LocalStackEventsAPI
+				// from rapid's lifecycle events, so they land at the AWS-faithful points (e.g.
+				// after an inline suppressed init's own logs) — see events.go.

 				// First invocation into a successfully initialized on-demand environment: REPORT
-				// carries the Init phase duration as measured by rapid (init start -> init end).
-				// Provisioned concurrency / Managed Instances initialize at provisioning time and
-				// AWS omits Init Duration from their invokes' REPORT lines.
+				// carries the Init phase duration as measured by rapid (take-once; empty on warm
+				// starts, failed/timed-out inits, and non-on-demand environments).
 				initDuration := ""
-				if server.onDemand && !server.warmStart && !server.initTimedOut.Load() && initErrType == "" {
-					if initTimeMS, ok := server.eventsAPI.InitDurationMS(); ok {
-						initDuration = fmt.Sprintf("Init Duration: %.2f ms\t", initTimeMS)
-					}
-				}
-				server.warmStart = true
-
-				// On-demand init failure folded into this invocation (AWS suppressed init): emit
-				// the INIT_REPORT(phase=invoke) line before START (emitted during Invoke below),
-				// reporting the failed init's duration (rapid's measurement when available; the
-				// wall-clock fallback covers inits that died before emitting INIT_REPORT).
-				if initErrType != "" {
-					initTimeMS, ok := server.eventsAPI.InitDurationMS()
-					if !ok {
-						initTimeMS = millisSince(server.initStart)
-					}
-					fprintInitReport(logCollector, initTimeMS, "invoke", "error", initErrType)
+				if initTimeMS, ok := server.eventsAPI.TakeColdStartInitDuration(); ok {
+					initDuration = fmt.Sprintf("Init Duration: %.2f ms\t", initTimeMS)
 				}

 				invokeStart := time.Now()
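The hunk above replaces the `warmStart`/`initTimedOut` checks with a single take-once call. The implementation of `TakeColdStartInitDuration` lives in events.go and is not shown here, so the following is only a sketch of what take-once state could look like; `coldStartInit`, `Record`, `Take` and `reportPrefix` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// coldStartInit is a hypothetical take-once holder for the cold-start Init
// duration: it can be read successfully exactly once after being recorded.
type coldStartInit struct {
	mu  sync.Mutex
	ms  float64
	set bool
}

// Record stores the measured Init duration in milliseconds.
func (c *coldStartInit) Record(ms float64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.ms, c.set = ms, true
}

// Take returns the recorded duration and clears it, so later callers (warm
// starts) get ok == false without any separate warm-start flag.
func (c *coldStartInit) Take() (float64, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.set {
		return 0, false
	}
	c.set = false
	return c.ms, true
}

// reportPrefix mirrors the invoke handler's formatting of the REPORT fragment.
func reportPrefix(c *coldStartInit) string {
	if ms, ok := c.Take(); ok {
		return fmt.Sprintf("Init Duration: %.2f ms\t", ms)
	}
	return ""
}

func main() {
	var c coldStartInit
	c.Record(12.5)
	fmt.Printf("%q\n", reportPrefix(&c)) // first invoke carries the duration
	fmt.Printf("%q\n", reportPrefix(&c)) // later invokes do not
}
```

In this sketch a failed or timed-out init would simply never call `Record`, which covers the "empty on failed/timed-out inits" case from the comment without extra flags.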
@@ -209,7 +159,12 @@ func NewCustomInteropServer(lsOpts *LsOpts, adapter *LocalStackAdapter, delegate
 				timeout := int(server.delegate.GetInvokeTimeout().Seconds())
 				isErr := false
 				status := ""
-				if err != nil {
+				if err == nil {
+					// The invocation succeeded: if an earlier init failure was folded into it
+					// and the suppressed init re-run recovered, the result stands on its own —
+					// AWS reports it as successful — so clear the recorded init failure.
+					server.eventsAPI.ClearInitError()
+				} else {
 					switch {
 					case errors.Is(err, rapidcore.ErrInvokeTimeout):
 						log.Debugf("Got invoke timeout")
@@ -232,25 +187,18 @@ func NewCustomInteropServer(lsOpts *LsOpts, adapter *LocalStackAdapter, delegate
 							log.Fatalln("unable to write to response")
 						}
 					case errors.Is(err, rapidcore.ErrInvokeDoneFailed):
-						// we can actually just continue here, error message is sent below
+						// The error response body was already written by rapid and is sent below.
+						// When an init failure was folded into this invocation (AWS suppressed
+						// init), the REPORT additionally carries the failure status and the
+						// scrubbed fatal error type (e.g. Runtime.Unknown).
+						if errType := server.eventsAPI.InitErrorType(); errType != "" {
+							isErr = true
+							status = "Status: error\tError Type: " + errType
+						}
 					default:
 						log.Fatalln(err)
 					}
 				}
-				// On-demand init failure folded into this invocation: when the suppressed init
-				// re-run (and thus the invoke) failed again, the REPORT carries the failure status
-				// and rapidcore's scrubbed fatal error type (e.g. Runtime.Unknown). When the
-				// invocation succeeded (the suppressed re-init recovered from a transient init
-				// failure), the result stands on its own — AWS reports it as successful — and the
-				// cached init failure is cleared so later invocations are not tainted by it.
-				if initErrType != "" {
-					if err != nil {
-						isErr = true
-						status = "Status: error\tError Type: " + initErrType
-					} else {
-						server.initErrorType.Store("")
-					}
-				}
 				// optional sleep. can be used for debugging purposes
 				if lsOpts.PostInvokeWaitMS != "" {
 					waitMS, err := strconv.Atoi(lsOpts.PostInvokeWaitMS)
@@ -294,93 +242,42 @@ func (c *CustomInteropServer) SendErrorResponse(invokeID string, resp *interop.E
 	return c.delegate.SendErrorResponse(invokeID, resp)
 }

-// SendInitErrorResponse forwards the init error reported by the runtime (via /init/error) to
-// LocalStack and then propagates it to the delegate. It marks initErrorForwarded so the
-// crash-path fallback in main.go (SendInitError) does not send a duplicate error status for
-// the same failed initialization.
-func (c *CustomInteropServer) SendInitErrorResponse(resp *interop.ErrorInvokeResponse) (err error) {
+// SendInitErrorResponse stashes the init error reported by the runtime (via /init/error) for
+// ReportInitFailure and propagates it to the delegate, which caches it so the first invoke
+// can surface it. The delegate's error is returned because the /runtime/init/error handler
+// renders an interop error to the runtime based on it (e.g. ErrResponseSent during a
+// suppressed init).
+func (c *CustomInteropServer) SendInitErrorResponse(resp *interop.ErrorInvokeResponse) error {
 	log.Traceln("SendInitErrorResponse called")
-	// Mark synchronously, before sending: this runs in the init flow before
-	// AwaitInitializedWithDetails unblocks in main.go, so the fallback observes the flag.
-	c.initErrorForwarded.Store(true)
-	// Record rapidcore's scrubbed fatal error type so the folded-into-invoke path can render the
-	// INIT_REPORT(phase=invoke) and REPORT Status/Error Type lines (on-demand).
-	c.initErrorType.Store(string(resp.FunctionError.Type))
-
-	// Always cache the structured error in the delegate so the first invoke can surface it, and
-	// return its error: the /runtime/init/error handler renders an interop error to the runtime
-	// based on it (e.g. ErrResponseSent during a suppressed init).
-	defer func() { err = c.delegate.SendInitErrorResponse(resp) }()
-
-	// On-demand folds the failed init into the first invocation, which carries the error and
-	// logs; reporting it here via /status/error too would race the invoke and fail the env
-	// startup before the invoke runs. PC/SnapStart/MI report at provisioning time below.
-	if c.onDemand {
-		return nil
-	}
-
-	// Forward the runtime's structured payload as-is and only inject the requestId. Decoding
-	// into a map rather than a typed struct preserves fields exactly as the runtime emitted
-	// them — in particular an empty but present "stackTrace": [] (e.g. Runtime.HandlerNotFound),
-	// which a typed struct with omitempty would drop on re-marshal.
-	var payload map[string]any
-	if err := json.Unmarshal(resp.Payload, &payload); err != nil {
-		log.WithError(err).Warn("Failed to parse init error payload; forwarding raw payload")
-		if err := c.localStackAdapter.SendStatus(Error, resp.Payload); err != nil {
-			log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId).
-				Error("Failed to send init error to LocalStack")
-		}
-		return nil
-	}
-
-	// No invocation is active during the init phase, so this is typically blank; AWS still
-	// includes a (blank) requestId in the init error payload.
-	payload["requestId"] = c.delegate.GetCurrentInvokeID()
-
-	body, err := json.Marshal(payload)
-	if err != nil {
-		log.WithError(err).Error("Failed to marshal adapted init error response")
-		body = resp.Payload
-	}
-
-	if err := c.localStackAdapter.SendStatus(Error, body); err != nil {
-		log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId).
-			Error("Failed to send init error to LocalStack")
-	}
-	return nil
+	c.initErrorPayload.Store(resp.Payload)
+	return c.delegate.SendInitErrorResponse(resp)
 }

-// SendInitError reports a structured init failure to LocalStack when the runtime failed to
-// initialize WITHOUT calling /init/error itself (e.g. it crashed, called sys.exit, or had an
-// invalid entrypoint). The init failure is detected by the existing rapidcore machinery
-// (watchEvents -> InitFailure -> AwaitInitializedWithDetails) and surfaced to main.go.
-// It is a no-op if SendInitErrorResponse already forwarded the runtime's own structured error.
-func (c *CustomInteropServer) SendInitError(errType fatalerror.ErrorType, errMsg error) {
-	if c.initErrorForwarded.Load() {
-		log.Debug("Init error already forwarded to LocalStack; skipping duplicate")
-		return
-	}
-
-	if errType == "" {
-		errType = fatalerror.RuntimeExit
-	}
-
-	message := "Runtime exited during initialization"
-	if errMsg != nil {
-		message = errMsg.Error()
-	}
-
-	// Match AWS's fault message format "RequestId: <id> Error: <msg>". No invocation is active
-	// during the init phase (LocalStack only dispatches an invoke after the runtime reports
-	// ready), so the request id is blank — matching the /init/error path, which forwards AWS's
-	// blank init-phase requestId (see SendInitErrorResponse).
-	payload, err := json.Marshal(lsapi.ErrorResponse{
-		ErrorType: string(errType),
-		ErrorMessage: fmt.Sprintf("RequestId: %s Error: %s", c.delegate.GetCurrentInvokeID(), message),
-	})
-	if err != nil {
-		log.WithError(err).Error("Failed to marshal init error response")
-		return
+// ReportInitFailure reports a failed initialization to LocalStack via /status/error, failing
+// the environment's startup. It forwards the runtime's own /init/error payload when one was
+// reported, and synthesizes a structured error from the given type and message otherwise
+// (e.g. when the runtime crashed, called sys.exit, or had an invalid entrypoint).
+// Only main.go calls this, and only for environments that fail provisioning-time (extended
+// init: provisioned concurrency / Managed Instances); on-demand environments fold init
+// failures into the first invocation instead.
+func (c *CustomInteropServer) ReportInitFailure(errType fatalerror.ErrorType, message string) {
+	payload, _ := c.initErrorPayload.Load().([]byte)
+	if payload == nil {
+		// Match AWS's fault message format "RequestId: <id> Error: <msg>". No invocation is
+		// active during the init phase (LocalStack only dispatches invokes after the runtime
+		// reports ready), so the request id is blank — matching the /init/error path below,
+		// which forwards AWS's blank init-phase requestId.
+		body, err := json.Marshal(lsapi.ErrorResponse{
+			ErrorType: string(errType),
+			ErrorMessage: fmt.Sprintf("RequestId: %s Error: %s", c.delegate.GetCurrentInvokeID(), message),
+		})
+		if err != nil {
+			log.WithError(err).Error("Failed to marshal init error response")
+			return
+		}
+		payload = body
+	} else if adapted := adaptInitErrorPayload(payload, c.delegate.GetCurrentInvokeID()); adapted != nil {
+		payload = adapted
 	}

 	if err := c.localStackAdapter.SendStatus(Error, payload); err != nil {
@@ -389,19 +286,24 @@ func (c *CustomInteropServer) SendInitError(errType fatalerror.ErrorType, errMsg
 	}
 }

-// RecordInitError records the structured init failure detected by rapidcore for runtimes that
-// failed WITHOUT calling /init/error (crash, sys.exit, invalid entrypoint), so the on-demand
-// folded-into-invoke path renders the same INIT_REPORT(phase=invoke) and REPORT Status/Error
-// Type lines as the /init/error-reported flavor. It must not overwrite a type already recorded
-// by SendInitErrorResponse: the runtime-reported error is the authoritative one.
-func (c *CustomInteropServer) RecordInitError(errType fatalerror.ErrorType) {
-	if recorded, _ := c.initErrorType.Load().(string); recorded != "" {
-		return
+// adaptInitErrorPayload injects the requestId into the runtime's structured /init/error
+// payload, preserving all other fields exactly as the runtime emitted them — in particular an
+// empty but present "stackTrace": [] (e.g. Runtime.HandlerNotFound), which a typed struct with
+// omitempty would drop on re-marshal. AWS includes a (blank) requestId in init error payloads.
+// Returns nil if the payload cannot be adapted (it is then forwarded unmodified).
+func adaptInitErrorPayload(payload []byte, requestID string) []byte {
+	var fields map[string]any
+	if err := json.Unmarshal(payload, &fields); err != nil {
+		log.WithError(err).Warn("Failed to parse init error payload; forwarding raw payload")
+		return nil
 	}
-	if errType == "" {
-		errType = fatalerror.RuntimeExit
+	fields["requestId"] = requestID
+	adapted, err := json.Marshal(fields)
+	if err != nil {
+		log.WithError(err).Error("Failed to marshal adapted init error payload")
+		return nil
 	}
-	c.initErrorType.Store(string(errType))
+	return adapted
 }

 func (c *CustomInteropServer) GetCurrentInvokeID() string {
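The comment on `adaptInitErrorPayload` says a typed struct with `omitempty` would drop an empty but present `"stackTrace": []`. The standalone sketch below shows that difference with `encoding/json`; `typedError`, `viaStruct`, `viaMap` and the sample payload are illustrative assumptions, not types or data from the repository.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// typedError is a hypothetical typed view of an init error payload.
type typedError struct {
	ErrorType    string   `json:"errorType"`
	ErrorMessage string   `json:"errorMessage"`
	RequestID    string   `json:"requestId"`
	StackTrace   []string `json:"stackTrace,omitempty"`
}

// viaStruct round-trips through the typed struct: omitempty drops the
// zero-length stackTrace slice when re-marshalling.
func viaStruct(payload []byte, requestID string) (string, error) {
	var e typedError
	if err := json.Unmarshal(payload, &e); err != nil {
		return "", err
	}
	e.RequestID = requestID
	out, err := json.Marshal(e)
	return string(out), err
}

// viaMap round-trips through a generic map: every field the runtime sent is
// kept, including the empty array, and only requestId is added.
func viaMap(payload []byte, requestID string) (string, error) {
	var fields map[string]any
	if err := json.Unmarshal(payload, &fields); err != nil {
		return "", err
	}
	fields["requestId"] = requestID
	out, err := json.Marshal(fields) // map keys are emitted in sorted order
	return string(out), err
}

func main() {
	// Sample payload, made up for illustration.
	payload := []byte(`{"errorType":"Runtime.HandlerNotFound","errorMessage":"handler not found","stackTrace":[]}`)

	s, _ := viaStruct(payload, "")
	m, _ := viaMap(payload, "")
	fmt.Println("struct:", s)
	fmt.Println("map:   ", m)
}
```

The map round-trip does reorder keys alphabetically, which is harmless for JSON consumers but worth knowing when comparing payloads byte for byte.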
@@ -416,53 +318,9 @@ func (c *CustomInteropServer) SendRuntimeReady() error {

 func (c *CustomInteropServer) Init(i *interop.Init, invokeTimeoutMs int64) error {
 	log.Traceln("Init called")
-	c.initStart = time.Now()
 	return c.delegate.Init(i, invokeTimeoutMs)
 }

-// ReportInitTimeout emits an AWS-style INIT_REPORT timeout line into the log collector and
-// marks the init as timed out. The init is then re-run as a suppressed init during the first
-// invocation (under the function timeout), and that invocation's REPORT omits Init Duration.
-func (c *CustomInteropServer) ReportInitTimeout() {
-	c.initTimedOut.Store(true)
-	fprintInitReport(c.logCollector, millisSince(c.initStart), "init", "timeout", "")
-}
-
-// ReportInitPhaseError emits the AWS-style INIT_REPORT(phase=init, status=error) line for an
-// on-demand cold-start init that failed (e.g. a runtime crash or exit during module load).
-// AWS performs a suppressed double init: the failed cold-start init reports Phase: init here,
-// and the retried init folded into the first invocation reports Phase: invoke (see the invoke
-// handler). It is a no-op when no init error was recorded. The duration is rapid's measurement
-// of the Init phase when available, falling back to wall-clock for inits that died before
-// emitting their INIT_REPORT lifecycle event.
-func (c *CustomInteropServer) ReportInitPhaseError() {
-	errType, _ := c.initErrorType.Load().(string)
-	if errType == "" {
-		return
-	}
-	initTimeMS, ok := c.eventsAPI.InitDurationMS()
-	if !ok {
-		initTimeMS = millisSince(c.initStart)
-	}
-	fprintInitReport(c.logCollector, initTimeMS, "init", "error", errType)
-}
-
-// millisSince returns the wall-clock milliseconds elapsed since start.
-func millisSince(start time.Time) float64 {
-	return float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)
-}
-
-// fprintInitReport emits an AWS-style INIT_REPORT log line, e.g.
-// "INIT_REPORT Init Duration: 9999.27 ms\tPhase: init\tStatus: timeout" or
-// "INIT_REPORT Init Duration: 0.91 ms\tPhase: invoke\tStatus: error\tError Type: Runtime.ExitError".
-func fprintInitReport(w io.Writer, durationMS float64, phase string, status string, errorType string) {
-	_, _ = fmt.Fprintf(w, "INIT_REPORT Init Duration: %.2f ms\tPhase: %s\tStatus: %s", durationMS, phase, status)
-	if errorType != "" {
-		_, _ = fmt.Fprintf(w, "\tError Type: %s", errorType)
-	}
-	_, _ = fmt.Fprintln(w)
-}
-
 func (c *CustomInteropServer) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error {
 	log.Traceln("Invoke called")
 	return c.delegate.Invoke(responseWriter, invoke)
