21
21
22
22
import org .apache .eventmesh .connector .http .sink .HttpSinkConnector ;
23
23
import org .apache .eventmesh .connector .http .sink .config .HttpSinkConfig ;
24
+ import org .apache .eventmesh .connector .http .sink .config .HttpWebhookConfig ;
24
25
import org .apache .eventmesh .openconnect .offsetmgmt .api .data .ConnectRecord ;
25
26
import org .apache .eventmesh .openconnect .offsetmgmt .api .data .RecordOffset ;
26
27
import org .apache .eventmesh .openconnect .offsetmgmt .api .data .RecordPartition ;
34
35
import org .junit .jupiter .api .AfterEach ;
35
36
import org .junit .jupiter .api .BeforeEach ;
36
37
import org .junit .jupiter .api .Test ;
37
- import org .mockserver .client .MockServerClient ;
38
38
import org .mockserver .integration .ClientAndServer ;
39
39
import org .mockserver .model .HttpRequest ;
40
40
import org .mockserver .model .HttpResponse ;
41
41
import org .mockserver .model .MediaType ;
42
- import org .mockserver .verify .VerificationTimes ;
43
-
44
- import io .vertx .core .http .HttpMethod ;
45
42
46
43
import com .alibaba .fastjson2 .JSON ;
44
+ import com .alibaba .fastjson2 .JSONArray ;
47
45
import com .alibaba .fastjson2 .JSONObject ;
48
46
47
+ import okhttp3 .HttpUrl ;
48
+ import okhttp3 .OkHttpClient ;
49
+ import okhttp3 .Request ;
50
+ import okhttp3 .Response ;
51
+ import okhttp3 .ResponseBody ;
52
+
49
53
public class HttpSinkConnectorTest {
50
54
51
55
private HttpSinkConnector sinkConnector ;
@@ -68,8 +72,7 @@ void before() throws Exception {
68
72
this .severUri = URI .create (sinkConfig .connectorConfig .getUrls ()[0 ]);
69
73
// start mockServer
70
74
mockServer = ClientAndServer .startClientAndServer (severUri .getPort ());
71
- // mockServer response
72
- new MockServerClient (severUri .getHost (), severUri .getPort ())
75
+ mockServer .reset ()
73
76
.when (
74
77
request ()
75
78
.withMethod ("POST" )
@@ -113,46 +116,49 @@ void testPut() throws Exception {
113
116
Thread .sleep (5000 );
114
117
115
118
// verify request
116
- new MockServerClient (severUri .getHost (), severUri .getPort ())
117
- .verify (
118
- HttpRequest .request ()
119
- .withMethod (HttpMethod .POST .name ())
120
- .withPath (severUri .getPath ()),
121
- VerificationTimes .exactly (times ));
122
-
123
- /*
124
- **The following code is only required in webhook mode**
119
+ HttpRequest [] recordedRequests = mockServer .retrieveRecordedRequests (null );
120
+ assert recordedRequests .length == times ;
125
121
126
122
// verify response
127
123
HttpWebhookConfig webhookConfig = sinkConfig .connectorConfig .getWebhookConfig ();
128
- URI uri = new URIBuilder()
129
- .setScheme("http")
130
- .setHost(severUri.getHost())
131
- .setPort(webhookConfig.getPort())
132
- .setPath(webhookConfig.getExportPath())
133
- .addParameter("pageNum", "1")
134
- .addParameter("pageSize", "10")
135
- .addParameter("type", "poll")
124
+ String url = new HttpUrl .Builder ()
125
+ .scheme ("http" )
126
+ .host (severUri .getHost ())
127
+ .port (webhookConfig .getPort ())
128
+ .addPathSegments (webhookConfig .getExportPath ())
129
+ .addQueryParameter ("pageNum" , "1" )
130
+ .addQueryParameter ("pageSize" , "10" )
131
+ .addQueryParameter ("type" , "poll" )
132
+ .build ().toString ();
133
+
134
+ // build request
135
+ Request request = new Request .Builder ()
136
+ .url (url )
137
+ .addHeader ("Content-Type" , "application/json" )
136
138
.build ();
137
139
138
- CloseableHttpClient httpClient = HttpClients.createDefault();
139
- HttpGet httpGet = new HttpGet(uri);
140
- httpGet.setHeader("Content-Type", "application/json");
141
- CloseableHttpResponse response = httpClient.execute(httpGet);
142
- String body = EntityUtils.toString(response.getEntity());
143
- assert body != null;
144
- JSONArray pageItems = JSON.parseObject(body).getJSONArray("pageItems");
145
- assert pageItems != null && pageItems.size() == times;
146
- for (int i = 0; i < times; i++) {
147
- JSONObject pageItem = pageItems.getJSONObject(i);
148
- assert pageItem != null;
149
- assert pageItem.getJSONObject("data") != null;
150
- assert pageItem.getJSONObject("metadata") != null;
140
+ OkHttpClient client = new OkHttpClient ();
141
+ try (Response response = client .newCall (request ).execute ()) {
142
+ // check response code
143
+ if (!response .isSuccessful ()) {
144
+ throw new RuntimeException ("Unexpected response code: " + response );
145
+ }
146
+ // check response body
147
+ ResponseBody responseBody = response .body ();
148
+ if (responseBody != null ) {
149
+ JSONObject jsonObject = JSON .parseObject (responseBody .string ());
150
+ JSONArray pageItems = jsonObject .getJSONArray ("pageItems" );
151
+
152
+ assert pageItems != null && pageItems .size () == times ;
153
+
154
+ for (int i = 0 ; i < times ; i ++) {
155
+ JSONObject pageItem = pageItems .getJSONObject (i );
156
+ assert pageItem != null ;
157
+ assert pageItem .getJSONObject ("data" ) != null ;
158
+ assert pageItem .getJSONObject ("metadata" ) != null ;
159
+ }
160
+ }
151
161
}
152
-
153
- httpClient.close();
154
-
155
- */
156
162
}
157
163
158
164
private ConnectRecord createConnectRecord () {
0 commit comments