@@ -24,6 +24,7 @@ const EVENT_SOURCE_TYPE_KEY = `${EVENT_SOURCE_PREFIX}.eventType`
2424const NAMES = require ( '../metrics/names' )
2525
2626const EVENT_SOURCE_INFO = require ( './event-sources' )
27+ const HANDLER_STREAMING = Symbol . for ( 'aws.lambda.runtime.handler.streaming' )
2728
2829// A function with no references used to stub out closures
2930function cleanClosure ( ) { }
@@ -35,7 +36,7 @@ let transactionEnders = []
3536// the invocation transaction.
3637let uncaughtException = null
3738
38- // Tracking the first time patchLambdaHandler is called for one off functionality
39+ // Tracking the first time patchLambdaHandler is called for one- off functionality
3940let patchCalled = false
4041let coldStartRecorded = false
4142
@@ -105,6 +106,45 @@ class AwsLambda {
105106 } )
106107 }
107108
109+ /**
110+ * Response-streaming handlers are identified by symbol properties on the function.
111+ * We propagate any symbols if they're present, so that the handler keeps its signatue for any AWS features that rely on symbols
112+ * @param handler
113+ * @param nrHandler
114+ */
115+ propagateSymbols ( handler , nrHandler ) {
116+ for ( const symbol of Object . getOwnPropertySymbols ( handler ) ) {
117+ logger . trace ( `Setting symbol ${ symbol . toString ( ) } on handler` )
118+ nrHandler [ symbol ] = handler [ symbol ]
119+ }
120+ }
121+
122+ createSegment ( { event, context, transaction, recorder } ) {
123+ const shim = this . shim
124+ const functionName = context . functionName
125+ const group = NAMES . FUNCTION . PREFIX
126+ const transactionName = group + functionName
127+
128+ const activeSegment = shim . tracer . getSegment ( )
129+
130+ transaction . setPartialName ( transactionName )
131+ const txnEnder = endTransaction . bind ( null , transaction , transactionEnders . length )
132+
133+ transactionEnders . push ( txnEnder )
134+ const segment = shim . createSegment ( functionName , recorder , activeSegment )
135+ transaction . baseSegment = segment
136+ const awsAttributes = this . _getAwsAgentAttributes ( event , context )
137+ transaction . trace . attributes . addAttributes ( ATTR_DEST . TRANS_COMMON , awsAttributes )
138+
139+ shim . agent . setLambdaArn ( context . invokedFunctionArn )
140+
141+ shim . agent . setLambdaFunctionVersion ( context . functionVersion )
142+ segment . addSpanAttributes ( awsAttributes )
143+
144+ segment . start ( )
145+ return { segment, txnEnder }
146+ }
147+
108148 patchLambdaHandler ( handler ) {
109149 const awsLambda = this
110150 const shim = this . shim
@@ -114,6 +154,11 @@ class AwsLambda {
114154 return handler
115155 }
116156
157+ const isStreamHandler = handler [ HANDLER_STREAMING ] === 'response'
158+ if ( isStreamHandler ) {
159+ this . agent . recordSupportability ( 'Nodejs/Serverless/Lambda/ResponseStreaming' )
160+ }
161+
117162 if ( ! patchCalled ) {
118163 // Only wrap emit on process the first time patch is called.
119164 patchCalled = true
@@ -122,52 +167,87 @@ class AwsLambda {
122167 this . wrapFatal ( )
123168 }
124169
125- return shim . bindCreateTransaction ( wrappedHandler , new specs . TransactionSpec ( { type : shim . BG } ) )
170+ const wrapper = isStreamHandler ? wrappedStreamHandler : wrappedHandler
171+ const nrHandler = shim . bindCreateTransaction ( wrapper , new specs . TransactionSpec ( { type : shim . BG } ) )
172+ awsLambda . propagateSymbols ( handler , nrHandler )
173+
174+ return nrHandler
175+
176+ /**
177+ * Wraps a response streaming lambda handler.
178+ *
179+ * Creates and applies segment based on function name, assigns attributes to transaction trace,
180+ * listen when stream errors(log error), ends(end transaction)
181+ *
182+ * **Note**: AWS doesn't support response streaming with API gateway invoked lambdas.
183+ * This means we do not handle that as it would require intercepting the stream to parse
184+ * the response code and headers.
185+ */
186+ function wrappedStreamHandler ( ) {
187+ const transaction = shim . tracer . getTransaction ( )
188+ if ( ! transaction ) {
189+ logger . trace ( 'No active transaction, not wrapping streaming handler' )
190+ return handler . apply ( this , arguments )
191+ }
126192
127- function wrappedHandler ( ) {
128193 const args = shim . argsToArray . apply ( shim , arguments )
129-
130194 const event = args [ 0 ]
131- const context = args [ 1 ]
195+ const context = args [ 2 ]
196+ logger . trace ( 'In stream handler, lambda function name' , context ?. functionName )
197+ const { segment, txnEnder } = awsLambda . createSegment ( { context, event, transaction, recorder : recordBackground } )
198+ args [ 1 ] = awsLambda . wrapStreamAndCaptureError (
199+ transaction ,
200+ txnEnder ,
201+ args [ 1 ]
202+ )
132203
133- const functionName = context . functionName
134- const group = NAMES . FUNCTION . PREFIX
135- const transactionName = group + functionName
204+ let res
205+ try {
206+ res = shim . applySegment ( handler , segment , false , this , args )
207+ } catch ( err ) {
208+ uncaughtException = err
209+ txnEnder ( )
210+ throw err
211+ }
212+
213+ return res
214+ }
136215
216+ /**
217+ * Wraps a non response streaming lambda handler.
218+ *
219+ * Creates and applies segment based on function name, assigns attributes to transaction trace,
220+ * adds handlers if api gateway to wrap request/response
221+ * wraps the callback(if present), wraps the context `done`, `succeed`, `fail methods`, intercepts promise
222+ * and properly ends transaction
223+ */
224+ function wrappedHandler ( ) {
137225 const transaction = shim . tracer . getTransaction ( )
138226 if ( ! transaction ) {
227+ logger . trace ( 'No active transaction, not wrapping handler' )
139228 return handler . apply ( this , arguments )
140229 }
141- const activeSegment = shim . tracer . getSegment ( )
142-
143- transaction . setPartialName ( transactionName )
230+ const args = shim . argsToArray . apply ( shim , arguments )
144231
232+ const event = args [ 0 ]
233+ const context = args [ 1 ]
234+ logger . trace ( 'Lambda function name' , context ?. functionName )
145235 const isApiGatewayLambdaProxy = apiGateway . isLambdaProxyEvent ( event )
236+ logger . trace ( 'Is this Lambda event an API Gateway or ALB web proxy?' , isApiGatewayLambdaProxy )
237+ logger . trace ( 'Lambda event keys' , Object . keys ( event ) )
146238 const segmentRecorder = isApiGatewayLambdaProxy ? recordWeb : recordBackground
147- const segment = shim . createSegment ( functionName , segmentRecorder , activeSegment )
148- transaction . baseSegment = segment
239+ const { segment, txnEnder } = awsLambda . createSegment ( { context , event , transaction , recorder : segmentRecorder } )
240+
149241 // resultProcessor is used to execute additional logic based on the
150242 // payload supplied to the callback.
151243 let resultProcessor
152244
153- logger . trace ( 'Is this Lambda event an API Gateway or ALB web proxy?' , isApiGatewayLambdaProxy )
154- logger . trace ( 'Lambda event keys' , Object . keys ( event ) )
155-
156245 if ( isApiGatewayLambdaProxy ) {
157246 const webRequest = new apiGateway . LambdaProxyWebRequest ( event )
158247 setWebRequest ( shim , transaction , webRequest )
159248 resultProcessor = getApiGatewayLambdaProxyResultProcessor ( transaction )
160249 }
161-
162250 const cbIndex = args . length - 1
163-
164- // Add transaction ending closure to the list of functions to be called on
165- // beforeExit (i.e. in the case that context.{done,fail,succeed} or callback
166- // were not called).
167- const txnEnder = endTransaction . bind ( null , transaction , transactionEnders . length )
168-
169- transactionEnders . push ( txnEnder )
170-
171251 args [ cbIndex ] = awsLambda . wrapCallbackAndCaptureError (
172252 transaction ,
173253 txnEnder ,
@@ -186,16 +266,6 @@ class AwsLambda {
186266 }
187267 } )
188268
189- const awsAttributes = awsLambda . _getAwsAgentAttributes ( event , context )
190- transaction . trace . attributes . addAttributes ( ATTR_DEST . TRANS_COMMON , awsAttributes )
191-
192- shim . agent . setLambdaArn ( context . invokedFunctionArn )
193-
194- shim . agent . setLambdaFunctionVersion ( context . functionVersion )
195- segment . addSpanAttributes ( awsAttributes )
196-
197- segment . start ( )
198-
199269 let res
200270 try {
201271 res = shim . applySegment ( handler , segment , false , this , args )
@@ -251,6 +321,18 @@ class AwsLambda {
251321 }
252322 }
253323
324+ wrapStreamAndCaptureError ( transaction , txnEnder , stream ) {
325+ const shim = this . shim
326+ stream . on ( 'error' , ( error ) => {
327+ shim . agent . errors . add ( transaction , error )
328+ } )
329+
330+ stream . on ( 'close' , ( ) => {
331+ txnEnder ( )
332+ } )
333+ return stream
334+ }
335+
254336 _getAwsAgentAttributes ( event , context ) {
255337 const attributes = {
256338 'aws.lambda.arn' : context . invokedFunctionArn ,
@@ -372,7 +454,7 @@ function lowercaseObjectKeys(original) {
372454}
373455
374456function endTransaction ( transaction , enderIndex ) {
375- if ( transactionEnders [ enderIndex ] === cleanClosure ) {
457+ if ( transactionEnders . length === 0 || transactionEnders [ enderIndex ] === cleanClosure ) {
376458 // In the case where we have already been called, we return early. There may be a
377459 // case where this is called more than once, given the lambda is left in a dirty
378460 // state after thread suspension (e.g. timeouts)
@@ -411,7 +493,7 @@ function setWebResponse(transaction, response) {
411493
412494 // We are adding http.statusCode to base segment as
413495 // we found in testing async invoked lambdas, the
414- // active segement is not available at this point.
496+ // active segment is not available at this point.
415497 const segment = transaction . baseSegment
416498
417499 segment . addSpanAttribute ( 'http.statusCode' , responseCode )
0 commit comments