1212import org .apache .commons .lang3 .StringUtils ;
1313import org .apache .http .client .utils .URIBuilder ;
1414
15+ import javax .annotation .Nullable ;
1516import java .io .IOException ;
1617import java .io .InputStream ;
1718import java .io .OutputStream ;
@@ -63,7 +64,7 @@ public static JsonObject getResponseObject(String exception) {
6364 response .addProperty ("status" , "FAILURE" );
6465
6566 //logic for failure types here
66- JsonObject workflowException = new JsonParser (). parse (exception ).getAsJsonObject ();
67+ JsonObject workflowException = JsonParser . parseString (exception ).getAsJsonObject ();
6768
6869 String error = workflowException .get ("Error" ).getAsString ();
6970 AdapterLogger .LogWarning (CNMResponse .class .getName () + " error:" + error );
@@ -232,6 +233,50 @@ public String buildGeneralError(JsonObject input, String cause) {
232233 return new Gson ().toJson (failureJson );
233234 }
234235
236+ /**
237+ * Gets the value for the 'dataProcessingType' field, to be used
238+ * by the 'buildMessageAttributesHash' method.
239+ * <br><br>
240+ * Tries to use the 'OriginalCNM > Product > dataProcessingType' from the
241+ * 'input > config' section, of the provided Json, but in case of an error,
242+ * return a generic error string.
243+ *<br><br>
244+ * @param input the input > config section, as a JsonObject
245+ * @return the string to use as the 'dataProcessingType' value; null if not available
246+ */
247+ public String getDataProcessingType (JsonObject input ) {
248+ String dataProcessingType = null ;
249+ // Implement a recursive json search function (find "dataProcessingType" else)
250+ try {
251+ if (input .has ("OriginalCNM" ) &&
252+ input .getAsJsonObject ("OriginalCNM" ).has ("product" ) &&
253+ input .getAsJsonObject ("OriginalCNM" ).getAsJsonObject ("product" ).has ("dataProcessingType" )) {
254+ dataProcessingType = input .getAsJsonObject ("OriginalCNM" ).getAsJsonObject ("product" ).get ("dataProcessingType" ).getAsString ();
255+ }
256+ return dataProcessingType ;
257+ } catch (Exception e ) {
258+ AdapterLogger .LogError (this .className + " handleRequest error:\n " + e .getMessage ());
259+ AdapterLogger .LogInfo ("input content:\n " + input );
260+ return null ;
261+ }
262+ }
263+
264+ public String getTrace (JsonObject input ) {
265+ String trace = null ;
266+ // Implement a recursive json search function (find "trace" else)
267+ try {
268+ if (input .has ("OriginalCNM" ) &&
269+ input .getAsJsonObject ("OriginalCNM" ).has ("trace" )) {
270+ trace = input .getAsJsonObject ("OriginalCNM" ).get ("trace" ).getAsString ();
271+ }
272+ return trace ;
273+ } catch (Exception e ) {
274+ AdapterLogger .LogError (this .className + " handleRequest error:\n " + e .getMessage ());
275+ AdapterLogger .LogInfo ("input content:\n " + input );
276+ return null ;
277+ }
278+ }
279+
235280 /**
236281 * Gets the value for the 'DataVersion' field, to be used
237282 * by the 'buildMessageAttributesHash' method.
@@ -304,21 +349,23 @@ public String buildMessage(JsonObject inputKey, JsonObject inputConfig) throws E
304349 * using the 'response > status' field from the 'output'
305350 * along with the provided values for 'collection' and 'dataversion'
306351 * <br><br>
307- * @param output the final output json message, as String
308- * @param method the method to use, 'Kinesis' or 'Sns' when sending
309- * @param region the region to send the message to
310- * @param endpoint the actual endpoint for the message
311- * @param collection the collection value for the attribute hash
312- * @param dataVersion the dataVersion value for the attribute hash
352+ * @param output the final output json message, as String
353+ * @param method the method to use, 'Kinesis' or 'Sns' when sending
354+ * @param region the region to send the message to
355+ * @param endpoint the actual endpoint for the message
356+ * @param collection the collection value for the attribute hash
357+ * @param dataVersion the dataVersion value for the attribute hash
358+ * @oaram dataProcessingType the dataProcessingType value for the attribute hash (can be null)
313359 */
314360 public void sendMessage (String output , String method , String region ,
315361 JsonElement endpoint , String collection ,
316- String dataVersion ) {
362+ String dataVersion , @ Nullable String dataProcessingType , @ Nullable String trace ) {
317363 // convert the final output to a JsonObject, so we can get 'response status'
318364 JsonObject outputJsonObj = new JsonParser ().parse (output ).getAsJsonObject ();
319365 String final_status = outputJsonObj .getAsJsonObject ("response" ).get ("status" ).getAsString ();
320366 if (method != null ) {
321- Map <String , MessageAttribute > attributeBOMap = buildMessageAttributesHash (collection , dataVersion , final_status );
367+ Map <String , MessageAttribute > attributeBOMap =
368+ buildMessageAttributesHash (collection , dataVersion , final_status , dataProcessingType , trace );
322369 Sender sender = SenderFactory .getSender (region , method );
323370 sender .addMessageAttributes (attributeBOMap );
324371 if (endpoint .isJsonArray ()) {
@@ -346,10 +393,12 @@ public String PerformFunction(String input, Context context) throws Exception {
346393 // get collection and dataVersion for use in message attribute hash
347394 String collection = getCollection (inputKey );
348395 String dataVersion = getDataVersion (inputConfig );
396+ String dataProcessingType = getDataProcessingType (inputConfig );
397+ String trace = getTrace (inputConfig );
349398 String output ;
350399 try {
351400 output = buildMessage (inputKey , inputConfig );
352- sendMessage (output , method , region , responseEndpoint , collection , dataVersion );
401+ sendMessage (output , method , region , responseEndpoint , collection , dataVersion , dataProcessingType , trace );
353402 /* create new object:
354403 *
355404 * {cnm: output, input:input}
@@ -361,27 +410,51 @@ public String PerformFunction(String input, Context context) throws Exception {
361410 return new Gson ().toJson (bigOutput );
362411 } catch (Exception ex ) {
363412 AdapterLogger .LogError (this .className + " encountered exception with input String: " + input );
413+ AdapterLogger .LogError (this .className + " handleRequest error:" + ex .getMessage ());
364414 output = buildGeneralError (inputKey , ex .getMessage ());
365- sendMessage (output , method , region , responseEndpoint , collection , dataVersion );
415+ sendMessage (output , method , region , responseEndpoint , collection , dataVersion , dataProcessingType , trace );
366416 // re-throw the exception now.
367417 throw ex ;
368418 }
369419 }
370420
371- Map <String , MessageAttribute > buildMessageAttributesHash (String collection_name , String dataVersion , String status ) {
421+ Map <String , MessageAttribute > buildMessageAttributesHash (
422+ String collection_name ,
423+ String dataVersion ,
424+ String status ,
425+ @ Nullable String dataProcessingType ,
426+ @ Nullable String trace ) {
372427 Map <String , MessageAttribute > attributeBOMap = new HashMap <>();
428+
373429 MessageAttribute collectionNameBO = new MessageAttribute ();
374430 collectionNameBO .setType (MessageFilterTypeEnum .String );
375431 collectionNameBO .setValue (collection_name );
376432 attributeBOMap .put (this .COLLECTION_SHORT_NAME_ATTRIBUTE_KEY , collectionNameBO );
433+
377434 MessageAttribute statusBO = new MessageAttribute ();
378435 statusBO .setType (MessageFilterTypeEnum .String );
379436 statusBO .setValue (status );
380437 attributeBOMap .put (this .CNM_RESPONSE_STATUS_ATTRIBUTE_KEY , statusBO );
438+
381439 MessageAttribute dataVersionBO = new MessageAttribute ();
382440 dataVersionBO .setType (MessageFilterTypeEnum .String );
383441 dataVersionBO .setValue (dataVersion );
384442 attributeBOMap .put (this .DATA_VERSION_ATTRIBUTE_KEY , dataVersionBO );
443+
444+ if (null != dataProcessingType && !dataProcessingType .isEmpty ()){
445+ MessageAttribute dataProcessingTypeBO = new MessageAttribute ();
446+ dataProcessingTypeBO .setType (MessageFilterTypeEnum .String );
447+ dataProcessingTypeBO .setValue (dataProcessingType );
448+ attributeBOMap .put (this .DATA_PROCESSING_TYPE , dataProcessingTypeBO );
449+ }
450+
451+ if (null != trace && !trace .isEmpty ()){
452+ MessageAttribute traceBO = new MessageAttribute ();
453+ traceBO .setType (MessageFilterTypeEnum .String );
454+ traceBO .setValue (trace );
455+ attributeBOMap .put (this .TRACE , traceBO );
456+ }
457+
385458 return attributeBOMap ;
386459 }
387460}
0 commit comments