@@ -85,9 +85,9 @@ public class DataPullRequestProcessor implements DataPullClientService {
85
85
86
86
private final ThreadPoolTaskScheduler scheduler ;
87
87
88
- private String userEmail ;
89
- private String failureEmail ;
88
+ HashMap <String ,String > successEmails = new HashMap <>();
90
89
90
+ HashMap <String ,String > failureEmails = new HashMap <>();
91
91
public DataPullRequestProcessor (){
92
92
scheduler = new ThreadPoolTaskScheduler ();
93
93
scheduler .setPoolSize (POOL_SIZE );
@@ -127,6 +127,10 @@ public void runSimpleDataPull(String awsenv, String pipelinename) {
127
127
}
128
128
129
129
private void runDataPull (String json , boolean isStart , boolean validateJson ) throws ProcessingException {
130
+
131
+ String userEmail ;
132
+ String failureEmail ;
133
+
130
134
String originalInputJson = json ;
131
135
json = extractUserJsonFromS3IfProvided (json , isStart );
132
136
@@ -148,6 +152,9 @@ private void runDataPull(String json, boolean isStart, boolean validateJson) thr
148
152
JsonNode jsonNode = objectMapper .readTree (json );
149
153
userEmail = null != jsonNode .get ("useremailaddress" ) ? jsonNode .get ("useremailaddress" ).asText (): "" ;
150
154
155
+ String taskId = jsonNode .get ("cluster" ).get ("awsenv" ).asText ().concat ("-emr-" ).concat (jsonNode .get ("cluster" ).get ("pipelinename" ).asText ()).concat ("-pipeline" );
156
+ successEmails .put (taskId ,userEmail );
157
+
151
158
JsonNode failureEmailNode = jsonNode .get ("failureemailaddress" );
152
159
if (StringUtils .isNotEmpty (userEmail )){
153
160
failureEmail = (failureEmailNode != null ) ? userEmail .concat ("," ).concat (failureEmailNode .asText ()): userEmail ;
@@ -156,7 +163,7 @@ private void runDataPull(String json, boolean isStart, boolean validateJson) thr
156
163
failureEmail = (failureEmailNode != null ) ? (failureEmailNode .asText ()): "" ;
157
164
158
165
}
159
-
166
+ failureEmails . put ( taskId , failureEmail );
160
167
161
168
162
169
List <Map .Entry <String , JsonNode >> result = new LinkedList <Map .Entry <String , JsonNode >>();
@@ -218,13 +225,13 @@ private void runDataPull(String json, boolean isStart, boolean validateJson) thr
218
225
log .debug ("runDataPull <- return" );
219
226
}
220
227
221
- public String successMailAddress () throws ProcessingException {
222
- return userEmail ;
228
+ public HashMap < String , String > successMailAddress () throws ProcessingException {
229
+ return successEmails ;
223
230
}
224
231
225
232
226
- public String failureMailAddress () throws ProcessingException {
227
- return failureEmail ;
233
+ public HashMap < String , String > failureMailAddress () throws ProcessingException {
234
+ return failureEmails ;
228
235
}
229
236
230
237
0 commit comments