@@ -90,7 +90,7 @@ public AsyncHTTPPushRequest(HandleMsgContext handleMsgContext,
90
90
91
91
@ Override
92
92
public void tryHTTPRequest () {
93
- String localAddress = IPUtils . getLocalAddress ();
93
+
94
94
currPushUrl = getUrl ();
95
95
96
96
if (StringUtils .isBlank (currPushUrl )) {
@@ -100,31 +100,30 @@ public void tryHTTPRequest() {
100
100
HttpPost builder = new HttpPost (currPushUrl );
101
101
102
102
String requestCode = "" ;
103
-
104
103
if (SubscriptionType .SYNC == handleMsgContext .getSubscriptionItem ().getType ()) {
105
104
requestCode = String .valueOf (RequestCode .HTTP_PUSH_CLIENT_SYNC .getRequestCode ());
106
105
} else {
107
106
requestCode = String .valueOf (RequestCode .HTTP_PUSH_CLIENT_ASYNC .getRequestCode ());
108
107
}
109
-
108
+ String localAddress = IPUtils . getLocalAddress ();
110
109
builder .addHeader (ProtocolKey .REQUEST_CODE , requestCode );
111
110
builder .addHeader (ProtocolKey .LANGUAGE , Constants .LANGUAGE_JAVA );
112
111
builder .addHeader (ProtocolKey .VERSION , ProtocolVersion .V1 .getVersion ());
113
112
builder .addHeader (ProtocolKey .EventMeshInstanceKey .EVENTMESHCLUSTER ,
114
- handleMsgContext .getEventMeshHTTPServer ()
115
- .getEventMeshHttpConfiguration ().getEventMeshCluster ());
113
+ handleMsgContext .getEventMeshHTTPServer ()
114
+ .getEventMeshHttpConfiguration ().getEventMeshCluster ());
116
115
builder .addHeader (ProtocolKey .EventMeshInstanceKey .EVENTMESHIP , localAddress );
117
116
builder .addHeader (ProtocolKey .EventMeshInstanceKey .EVENTMESHENV ,
118
- handleMsgContext .getEventMeshHTTPServer ().getEventMeshHttpConfiguration ().getEventMeshEnv ());
117
+ handleMsgContext .getEventMeshHTTPServer ().getEventMeshHttpConfiguration ().getEventMeshEnv ());
119
118
builder .addHeader (ProtocolKey .EventMeshInstanceKey .EVENTMESHIDC ,
120
- handleMsgContext .getEventMeshHTTPServer ().getEventMeshHttpConfiguration ().getEventMeshIDC ());
119
+ handleMsgContext .getEventMeshHTTPServer ().getEventMeshHttpConfiguration ().getEventMeshIDC ());
121
120
122
121
CloudEvent event = CloudEventBuilder .from (handleMsgContext .getEvent ())
123
- .withExtension (EventMeshConstants .REQ_EVENTMESH2C_TIMESTAMP ,
124
- String .valueOf (System .currentTimeMillis ()))
125
- .withExtension (EventMeshConstants .RSP_URL , currPushUrl )
126
- .withExtension (EventMeshConstants .RSP_GROUP , handleMsgContext .getConsumerGroup ())
127
- .build ();
122
+ .withExtension (EventMeshConstants .REQ_EVENTMESH2C_TIMESTAMP ,
123
+ String .valueOf (System .currentTimeMillis ()))
124
+ .withExtension (EventMeshConstants .RSP_URL , currPushUrl )
125
+ .withExtension (EventMeshConstants .RSP_GROUP , handleMsgContext .getConsumerGroup ())
126
+ .build ();
128
127
handleMsgContext .setEvent (event );
129
128
130
129
String content = "" ;
@@ -134,7 +133,7 @@ public void tryHTTPRequest() {
134
133
ProtocolAdaptor <ProtocolTransportObject > protocolAdaptor = ProtocolPluginFactory .getProtocolAdaptor (protocolType );
135
134
136
135
ProtocolTransportObject protocolTransportObject =
137
- protocolAdaptor .fromCloudEvent (handleMsgContext .getEvent ());
136
+ protocolAdaptor .fromCloudEvent (handleMsgContext .getEvent ());
138
137
if (protocolTransportObject instanceof HttpCommand ) {
139
138
content = ((HttpCommand ) protocolTransportObject ).getBody ().toMap ().get ("content" ).toString ();
140
139
} else {
@@ -158,37 +157,37 @@ public void tryHTTPRequest() {
158
157
body .add (new BasicNameValuePair (PushMessageRequestBody .CONTENT , content ));
159
158
if (StringUtils .isBlank (handleMsgContext .getBizSeqNo ())) {
160
159
body .add (new BasicNameValuePair (PushMessageRequestBody .BIZSEQNO ,
161
- RandomStringUtils .generateNum (20 )));
160
+ RandomStringUtils .generateNum (20 )));
162
161
} else {
163
162
body .add (new BasicNameValuePair (PushMessageRequestBody .BIZSEQNO ,
164
- handleMsgContext .getBizSeqNo ()));
163
+ handleMsgContext .getBizSeqNo ()));
165
164
}
166
165
if (StringUtils .isBlank (handleMsgContext .getUniqueId ())) {
167
166
body .add (new BasicNameValuePair (PushMessageRequestBody .UNIQUEID ,
168
- RandomStringUtils .generateNum (20 )));
167
+ RandomStringUtils .generateNum (20 )));
169
168
} else {
170
169
body .add (new BasicNameValuePair (PushMessageRequestBody .UNIQUEID ,
171
- handleMsgContext .getUniqueId ()));
170
+ handleMsgContext .getUniqueId ()));
172
171
}
173
172
174
173
body .add (new BasicNameValuePair (PushMessageRequestBody .RANDOMNO ,
175
- handleMsgContext .getMsgRandomNo ()));
174
+ handleMsgContext .getMsgRandomNo ()));
176
175
body .add (new BasicNameValuePair (PushMessageRequestBody .TOPIC , handleMsgContext .getTopic ()));
177
176
178
177
body .add (new BasicNameValuePair (PushMessageRequestBody .EXTFIELDS ,
179
- JsonUtils .serialize (EventMeshUtil .getEventProp (handleMsgContext .getEvent ()))));
178
+ JsonUtils .serialize (EventMeshUtil .getEventProp (handleMsgContext .getEvent ()))));
180
179
181
180
HttpEntity httpEntity = new UrlEncodedFormEntity (body , StandardCharsets .UTF_8 );
182
181
183
182
builder .setEntity (httpEntity );
184
183
185
184
// for CloudEvents Webhook spec
186
185
String urlAuthType = handleMsgContext .getConsumerGroupConfig ().getConsumerGroupTopicConf ()
187
- .get (handleMsgContext .getTopic ()).getHttpAuthTypeMap ().get (currPushUrl );
186
+ .get (handleMsgContext .getTopic ()).getHttpAuthTypeMap ().get (currPushUrl );
188
187
189
188
WebhookUtil .setWebhookHeaders (builder , httpEntity .getContentType ().getValue (),
190
- eventMeshHttpConfiguration .getEventMeshWebhookOrigin (),
191
- urlAuthType );
189
+ eventMeshHttpConfiguration .getEventMeshWebhookOrigin (),
190
+ urlAuthType );
192
191
193
192
194
193
eventMeshHTTPServer .metrics .getSummaryMetrics ().recordPushMsg ();
@@ -199,7 +198,7 @@ public void tryHTTPRequest() {
199
198
200
199
if (CMD_LOGGER .isInfoEnabled ()) {
201
200
CMD_LOGGER .info ("cmd={}|eventMesh2client|from={}|to={}" , requestCode ,
202
- localAddress , currPushUrl );
201
+ localAddress , currPushUrl );
203
202
}
204
203
205
204
try {
@@ -215,18 +214,18 @@ public Object handleResponse(HttpResponse response) {
215
214
String res = "" ;
216
215
try {
217
216
res = EntityUtils .toString (response .getEntity (),
218
- Charset .forName (EventMeshConstants .DEFAULT_CHARSET ));
217
+ Charset .forName (EventMeshConstants .DEFAULT_CHARSET ));
219
218
} catch (IOException e ) {
220
219
handleMsgContext .finish ();
221
220
return new Object ();
222
221
}
223
222
ClientRetCode result = processResponseContent (res );
224
223
if (MESSAGE_LOGGER .isInfoEnabled ()) {
225
224
MESSAGE_LOGGER .info (
226
- "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
227
- + "|uniqueId={}|cost={}" ,
228
- result , currPushUrl , handleMsgContext .getTopic (),
229
- handleMsgContext .getBizSeqNo (), handleMsgContext .getUniqueId (), cost );
225
+ "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
226
+ + "|uniqueId={}|cost={}" ,
227
+ result , currPushUrl , handleMsgContext .getTopic (),
228
+ handleMsgContext .getBizSeqNo (), handleMsgContext .getUniqueId (), cost );
230
229
}
231
230
if (result == ClientRetCode .OK || result == ClientRetCode .REMOTE_OK ) {
232
231
complete ();
@@ -253,9 +252,9 @@ public Object handleResponse(HttpResponse response) {
253
252
eventMeshHTTPServer .metrics .getSummaryMetrics ().recordHttpPushMsgFailed ();
254
253
if (MESSAGE_LOGGER .isInfoEnabled ()) {
255
254
MESSAGE_LOGGER .info (
256
- "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
257
- + "|uniqueId={}|cost={}" , currPushUrl , handleMsgContext .getTopic (),
258
- handleMsgContext .getBizSeqNo (), handleMsgContext .getUniqueId (), cost );
255
+ "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
256
+ + "|uniqueId={}|cost={}" , currPushUrl , handleMsgContext .getTopic (),
257
+ handleMsgContext .getBizSeqNo (), handleMsgContext .getUniqueId (), cost );
259
258
}
260
259
261
260
if (isComplete ()) {
@@ -268,14 +267,14 @@ public Object handleResponse(HttpResponse response) {
268
267
269
268
if (MESSAGE_LOGGER .isDebugEnabled ()) {
270
269
MESSAGE_LOGGER .debug ("message|eventMesh2client|url={}|topic={}|event={}" , currPushUrl ,
271
- handleMsgContext .getTopic (),
272
- handleMsgContext .getEvent ());
270
+ handleMsgContext .getTopic (),
271
+ handleMsgContext .getEvent ());
273
272
} else {
274
273
if (MESSAGE_LOGGER .isInfoEnabled ()) {
275
274
MESSAGE_LOGGER
276
- .info ("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}" ,
277
- currPushUrl , handleMsgContext .getTopic (),
278
- handleMsgContext .getBizSeqNo (), handleMsgContext .getUniqueId ());
275
+ .info ("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}" ,
276
+ currPushUrl , handleMsgContext .getTopic (),
277
+ handleMsgContext .getBizSeqNo (), handleMsgContext .getUniqueId ());
279
278
}
280
279
}
281
280
} catch (IOException e ) {
@@ -292,22 +291,22 @@ public Object handleResponse(HttpResponse response) {
292
291
public String toString () {
293
292
StringBuilder sb = new StringBuilder ();
294
293
sb .append ("asyncPushRequest={" )
295
- .append ("bizSeqNo=" ).append (handleMsgContext .getBizSeqNo ())
296
- .append (",startIdx=" ).append (startIdx )
297
- .append (",retryTimes=" ).append (retryTimes )
298
- .append (",uniqueId=" ).append (handleMsgContext .getUniqueId ())
299
- .append (",executeTime=" )
300
- .append (DateFormatUtils .format (executeTime , Constants .DATE_FORMAT ))
301
- .append (",lastPushTime=" )
302
- .append (DateFormatUtils .format (lastPushTime , Constants .DATE_FORMAT ))
303
- .append (",createTime=" )
304
- .append (DateFormatUtils .format (createTime , Constants .DATE_FORMAT )).append ("}" );
294
+ .append ("bizSeqNo=" ).append (handleMsgContext .getBizSeqNo ())
295
+ .append (",startIdx=" ).append (startIdx )
296
+ .append (",retryTimes=" ).append (retryTimes )
297
+ .append (",uniqueId=" ).append (handleMsgContext .getUniqueId ())
298
+ .append (",executeTime=" )
299
+ .append (DateFormatUtils .format (executeTime , Constants .DATE_FORMAT ))
300
+ .append (",lastPushTime=" )
301
+ .append (DateFormatUtils .format (lastPushTime , Constants .DATE_FORMAT ))
302
+ .append (",createTime=" )
303
+ .append (DateFormatUtils .format (createTime , Constants .DATE_FORMAT )).append ("}" );
305
304
return sb .toString ();
306
305
}
307
306
308
307
boolean processResponseStatus (int httpStatus , HttpResponse httpResponse ) {
309
308
if (httpStatus == HttpStatus .SC_OK || httpStatus == HttpStatus .SC_CREATED
310
- || httpStatus == HttpStatus .SC_NO_CONTENT || httpStatus == HttpStatus .SC_ACCEPTED ) {
309
+ || httpStatus == HttpStatus .SC_NO_CONTENT || httpStatus == HttpStatus .SC_ACCEPTED ) {
311
310
// success http response
312
311
return true ;
313
312
} else if (httpStatus == 429 ) {
@@ -337,8 +336,8 @@ ClientRetCode processResponseContent(String content) {
337
336
338
337
try {
339
338
Map <String , Object > ret =
340
- JsonUtils .deserialize (content , new TypeReference <Map <String , Object >>() {
341
- });
339
+ JsonUtils .deserialize (content , new TypeReference <Map <String , Object >>() {
340
+ });
342
341
Integer retCode = (Integer ) ret .get ("retCode" );
343
342
if (retCode != null && ClientRetCode .contains (retCode )) {
344
343
return ClientRetCode .get (retCode );
@@ -348,19 +347,19 @@ ClientRetCode processResponseContent(String content) {
348
347
} catch (NumberFormatException e ) {
349
348
if (MESSAGE_LOGGER .isWarnEnabled ()) {
350
349
MESSAGE_LOGGER .warn ("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}" , currPushUrl ,
351
- handleMsgContext .getBizSeqNo (), handleMsgContext .getUniqueId (), content );
350
+ handleMsgContext .getBizSeqNo (), handleMsgContext .getUniqueId (), content );
352
351
}
353
352
return ClientRetCode .FAIL ;
354
353
} catch (JsonException e ) {
355
354
if (MESSAGE_LOGGER .isWarnEnabled ()) {
356
355
MESSAGE_LOGGER .warn ("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}" , currPushUrl ,
357
- handleMsgContext .getBizSeqNo (), handleMsgContext .getUniqueId (), content );
356
+ handleMsgContext .getBizSeqNo (), handleMsgContext .getUniqueId (), content );
358
357
}
359
358
return ClientRetCode .FAIL ;
360
359
} catch (Throwable t ) {
361
360
if (MESSAGE_LOGGER .isWarnEnabled ()) {
362
361
MESSAGE_LOGGER .warn ("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}" , currPushUrl ,
363
- handleMsgContext .getBizSeqNo (), handleMsgContext .getUniqueId (), content );
362
+ handleMsgContext .getBizSeqNo (), handleMsgContext .getUniqueId (), content );
364
363
}
365
364
return ClientRetCode .FAIL ;
366
365
}
@@ -372,7 +371,7 @@ private void addToWaitingMap(AsyncHTTPPushRequest request) {
372
371
return ;
373
372
}
374
373
waitingRequests
375
- .put (request .handleMsgContext .getConsumerGroup (), Sets .newConcurrentHashSet ());
374
+ .put (request .handleMsgContext .getConsumerGroup (), Sets .newConcurrentHashSet ());
376
375
waitingRequests .get (request .handleMsgContext .getConsumerGroup ()).add (request );
377
376
}
378
377
0 commit comments