Skip to content

Commit 0e36063

Browse files
authored
feat: put method in http postprocessor (#217)
* feat: put method in http postprocessor * feat: add endpointVariables functionality in http GET postprocessor * chore: bump version * add: unit test to parse endpointVariables * remove: generated source * add: include generated-sources in gitignore * fix: null string validation * doc: update documentation * refactor: use []Object instead of string to handle endpointVariables * remove: empty string validation * test: add more unit test for multiple endpointVariables case
1 parent e46f634 commit 0e36063

19 files changed

+282
-106
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ target/
1212
bin
1313
.settings
1414
.gradletasknamecache
15-
.DS_Store
15+
.DS_Store
16+
dagger-common/src/generated-sources/

dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpAsyncConnector.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,13 @@ protected void process(Row input, ResultFuture<Row> resultFuture) {
9898
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, httpSourceConfig.getRequestVariables(), resultFuture);
9999
Object[] dynamicHeaderVariablesValues = getEndpointHandler()
100100
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.HEADER_VARIABLES, httpSourceConfig.getHeaderVariables(), resultFuture);
101+
Object[] endpointVariablesValues = getEndpointHandler()
102+
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.ENDPOINT_VARIABLE, httpSourceConfig.getEndpointVariables(), resultFuture);
101103
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getRequestVariables(), requestVariablesValues) || getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getHeaderVariables(), dynamicHeaderVariablesValues)) {
102104
return;
103105
}
104106

105-
BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues, dynamicHeaderVariablesValues);
107+
BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues, dynamicHeaderVariablesValues, endpointVariablesValues);
106108
HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getMeterStatsManager(),
107109
rowManager, getColumnNameManager(), getOutputDescriptor(resultFuture), resultFuture, getErrorReporter(), new PostResponseTelemetry());
108110
httpResponseHandler.startTimer();

dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpSourceConfig.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
public class HttpSourceConfig implements Serializable, SourceConfig {
1919
private String endpoint;
20+
private String endpointVariables;
2021
private String verb;
2122
private String requestPattern;
2223
private String requestVariables;
@@ -39,6 +40,7 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
3940
* Instantiates a new Http source config.
4041
*
4142
* @param endpoint the endpoint
43+
* @param endpointVariables the endpoint variables
4244
* @param verb the verb
4345
* @param requestPattern the request pattern
4446
* @param requestVariables the request variables
@@ -54,8 +56,9 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
5456
* @param metricId the metric id
5557
* @param retainResponseType the retain response type
5658
*/
57-
public HttpSourceConfig(String endpoint, String verb, String requestPattern, String requestVariables, String headerPattern, String headerVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
59+
public HttpSourceConfig(String endpoint, String endpointVariables, String verb, String requestPattern, String requestVariables, String headerPattern, String headerVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
5860
this.endpoint = endpoint;
61+
this.endpointVariables = endpointVariables;
5962
this.verb = verb;
6063
this.requestPattern = requestPattern;
6164
this.requestVariables = requestVariables;
@@ -90,6 +93,16 @@ public String getEndpoint() {
9093
return endpoint;
9194
}
9295

96+
/**
97+
* Gets endpoint variables.
98+
*
99+
* @return the endpointVariables
100+
*/
101+
public String getEndpointVariables() {
102+
return endpointVariables;
103+
}
104+
105+
93106
/**
94107
* Gets verb.
95108
*
@@ -237,6 +250,6 @@ public boolean equals(Object o) {
237250

238251
@Override
239252
public int hashCode() {
240-
return Objects.hash(endpoint, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType);
253+
return Objects.hash(endpoint, endpointVariables, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType);
241254
}
242255
}

dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpGetRequestHandler.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,29 @@ public class HttpGetRequestHandler implements HttpRequestHandler {
1919
private AsyncHttpClient httpClient;
2020
private Object[] requestVariablesValues;
2121
private Object[] dynamicHeaderVariablesValues;
22+
private Object[] endpointVariablesValues;
2223

2324
/**
2425
* Instantiates a new Http get request handler.
2526
*
26-
* @param httpSourceConfig the http source config
27-
* @param httpClient the http client
28-
* @param requestVariablesValues the request variables values
27+
* @param httpSourceConfig the http source config
28+
* @param httpClient the http client
29+
* @param requestVariablesValues the request variables values
30+
* @param endpointVariablesValues the request variables values
2931
*/
30-
public HttpGetRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] dynamicHeaderVariablesValues) {
32+
public HttpGetRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] dynamicHeaderVariablesValues, Object[] endpointVariablesValues) {
3133
this.httpSourceConfig = httpSourceConfig;
3234
this.httpClient = httpClient;
3335
this.requestVariablesValues = requestVariablesValues;
3436
this.dynamicHeaderVariablesValues = dynamicHeaderVariablesValues;
37+
this.endpointVariablesValues = endpointVariablesValues;
3538
}
3639

3740
@Override
3841
public BoundRequestBuilder create() {
3942
String endpointPath = String.format(httpSourceConfig.getPattern(), requestVariablesValues);
40-
String endpoint = httpSourceConfig.getEndpoint();
43+
String endpoint = String.format(httpSourceConfig.getEndpoint(), endpointVariablesValues);
44+
4145
String requestEndpoint = endpoint + endpointPath;
4246
BoundRequestBuilder getRequest = httpClient.prepareGet(requestEndpoint);
4347
Map<String, String> headers = httpSourceConfig.getHeaders();

dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpPostRequestHandler.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,28 @@ public class HttpPostRequestHandler implements HttpRequestHandler {
1919
private AsyncHttpClient httpClient;
2020
private Object[] requestVariablesValues;
2121
private Object[] dynamicHeaderVariablesValues;
22+
private Object[] endpointVariablesValues;
2223
/**
2324
* Instantiates a new Http post request handler.
2425
*
25-
* @param httpSourceConfig the http source config
26-
* @param httpClient the http client
27-
* @param requestVariablesValues the request variables values
26+
* @param httpSourceConfig the http source config
27+
* @param httpClient the http client
28+
* @param requestVariablesValues the request variables values
29+
* @param endpointVariablesValues the endpoint variables values
2830
*/
29-
public HttpPostRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] dynamicHeaderVariablesValues) {
31+
public HttpPostRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] dynamicHeaderVariablesValues, Object[] endpointVariablesValues) {
3032
this.httpSourceConfig = httpSourceConfig;
3133
this.httpClient = httpClient;
3234
this.requestVariablesValues = requestVariablesValues;
3335
this.dynamicHeaderVariablesValues = dynamicHeaderVariablesValues;
36+
this.endpointVariablesValues = endpointVariablesValues;
3437
}
3538

3639
@Override
3740
public BoundRequestBuilder create() {
3841
String requestBody = String.format(httpSourceConfig.getPattern(), requestVariablesValues);
39-
String endpoint = httpSourceConfig.getEndpoint();
42+
String endpoint = String.format(httpSourceConfig.getEndpoint(), endpointVariablesValues);
43+
4044
BoundRequestBuilder postRequest = httpClient
4145
.preparePost(endpoint)
4246
.setBody(requestBody);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.odpf.dagger.core.processors.external.http.request;
2+
3+
import com.google.gson.Gson;
4+
import io.netty.util.internal.StringUtil;
5+
import io.odpf.dagger.core.exception.InvalidConfigurationException;
6+
import io.odpf.dagger.core.processors.external.http.HttpSourceConfig;
7+
import org.asynchttpclient.AsyncHttpClient;
8+
import org.asynchttpclient.BoundRequestBuilder;
9+
10+
import java.util.HashMap;
11+
import java.util.Map;
12+
import java.util.UnknownFormatConversionException;
13+
14+
/**
15+
* The Http post request handler.
16+
*/
17+
public class HttpPutRequestHandler implements HttpRequestHandler {
18+
private HttpSourceConfig httpSourceConfig;
19+
private AsyncHttpClient httpClient;
20+
private Object[] requestVariablesValues;
21+
private Object[] dynamicHeaderVariablesValues;
22+
private Object[] endpointVariablesValues;
23+
/**
24+
* Instantiates a new Http post request handler.
25+
*
26+
* @param httpSourceConfig the http source config
27+
* @param httpClient the http client
28+
* @param requestVariablesValues the request variables values
29+
* @param endpointVariablesValues the endpoint variables values
30+
*/
31+
public HttpPutRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] dynamicHeaderVariablesValues, Object[] endpointVariablesValues) {
32+
this.httpSourceConfig = httpSourceConfig;
33+
this.httpClient = httpClient;
34+
this.requestVariablesValues = requestVariablesValues;
35+
this.dynamicHeaderVariablesValues = dynamicHeaderVariablesValues;
36+
this.endpointVariablesValues = endpointVariablesValues;
37+
}
38+
39+
@Override
40+
public BoundRequestBuilder create() {
41+
String requestBody = String.format(httpSourceConfig.getPattern(), requestVariablesValues);
42+
String endpoint = String.format(httpSourceConfig.getEndpoint(), endpointVariablesValues);
43+
44+
BoundRequestBuilder putRequest = httpClient
45+
.preparePut(endpoint)
46+
.setBody(requestBody);
47+
Map<String, String> headers = httpSourceConfig.getHeaders();
48+
if (!StringUtil.isNullOrEmpty(httpSourceConfig.getHeaderPattern())) {
49+
try {
50+
String dynamicHeader = String.format(httpSourceConfig.getHeaderPattern(), dynamicHeaderVariablesValues);
51+
headers.putAll(new Gson().fromJson(dynamicHeader, HashMap.class));
52+
} catch (UnknownFormatConversionException e) {
53+
throw new InvalidConfigurationException(String.format("pattern config '%s' is invalid", httpSourceConfig.getHeaderPattern()));
54+
} catch (IllegalArgumentException e) {
55+
throw new InvalidConfigurationException(String.format("pattern config '%s' is incompatible with the variable config '%s'", httpSourceConfig.getHeaderPattern(), httpSourceConfig.getHeaderVariables()));
56+
}
57+
}
58+
return addHeaders(putRequest, headers);
59+
}
60+
61+
@Override
62+
public boolean canCreate() {
63+
return httpSourceConfig.getVerb().equalsIgnoreCase("put");
64+
}
65+
}

dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpRequestFactory.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ public class HttpRequestFactory {
1919
* @param requestVariablesValues the request variables values
2020
* @return the bound request builder
2121
*/
22-
public static BoundRequestBuilder createRequest(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] headerVariablesValues) {
22+
public static BoundRequestBuilder createRequest(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] headerVariablesValues, Object[] endpointVariablesValues) {
2323

2424
ArrayList<HttpRequestHandler> httpRequestHandlers = new ArrayList<>();
25-
httpRequestHandlers.add(new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues, headerVariablesValues));
26-
httpRequestHandlers.add(new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues, headerVariablesValues));
25+
httpRequestHandlers.add(new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues, headerVariablesValues, endpointVariablesValues));
26+
httpRequestHandlers.add(new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues, headerVariablesValues, endpointVariablesValues));
27+
httpRequestHandlers.add(new HttpPutRequestHandler(httpSourceConfig, httpClient, requestVariablesValues, headerVariablesValues, endpointVariablesValues));
2728

2829
HttpRequestHandler httpRequestHandler = httpRequestHandlers
2930
.stream()

dagger-core/src/test/java/io/odpf/dagger/core/processors/PostProcessorConfigTest.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void shouldReturnHttpExternalSourceConfig() {
6464
outputMapping = new OutputMapping("$.data.tensor.values[0]");
6565
outputMappings.put("surge_factor", outputMapping);
6666

67-
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8000", "post", null, null, null, null, "5000", "5000", true, null, null, headerMap, outputMappings, null, false);
67+
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8000", "", "post", null, null, null, null, "5000", "5000", true, null, null, headerMap, outputMappings, null, false);
6868

6969
assertEquals(httpSourceConfig, defaultPostProcessorConfig.getExternalSource().getHttpConfig().get(0));
7070
}
@@ -120,7 +120,7 @@ public void shouldBeEmptyWhenNoneOfTheConfigsExist() {
120120
@Test
121121
public void shouldNotBeEmptyWhenExternalSourceHasHttpConfigExist() {
122122
ArrayList<HttpSourceConfig> http = new ArrayList<>();
123-
http.add(new HttpSourceConfig("", "", "", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), "metricId_01", false));
123+
http.add(new HttpSourceConfig("", "", "", "", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), "metricId_01", false));
124124
ArrayList<EsSourceConfig> es = new ArrayList<>();
125125
ArrayList<PgSourceConfig> pg = new ArrayList<>();
126126
ExternalSourceConfig externalSourceConfig = new ExternalSourceConfig(http, es, pg, new ArrayList<>());
@@ -290,4 +290,18 @@ public void shouldParseInternalProcessorConfigForInternalSourceConfig() {
290290
PostProcessorConfig postProcessorConfig = PostProcessorConfig.parse(configuration);
291291
assertNotNull(postProcessorConfig.getInternalSource().get(0).getInternalProcessorConfig());
292292
}
293+
294+
@Test
295+
public void shouldParseEndpointVariablesConfig() {
296+
String configuration = "{\"external_source\":{\"es\":[{\"host\":\"localhost\",\"port\":\"9200\",\"output_mapping\":{\"customer_profile\":{\"path\":\"$._source\"}},\"endpoint_pattern\":\"/customers/customer/%s\",\"endpoint_variables\":\"customer_id\",\"retry_timeout\":\"5000\",\"socket_timeout\":\"6000\",\"stream_timeout\":\"5000\",\"type\":\"TestLogMessage\"}],\"http\":[{\"body_column_from_sql\":\"request_body\",\"connect_timeout\":\"5000\",\"endpoint\":\"http://localhost:8000/%s\",\"endpoint_variables\":\"some-id\",\"fail_on_errors\":\"true\",\"headers\":{\"content-type\":\"application/json\"},\"output_mapping\":{\"surge_factor\":{\"path\":\"$.data.tensor.values[0]\"}},\"stream_timeout\":\"5000\",\"verb\":\"put\"}]},\"internal_source\":[{\"output_field\":\"event_timestamp\",\"value\":\"CURRENT_TIMESTAMP\",\"type\":\"function\"},{\"output_field\":\"s2_id_level\",\"value\":\"7\",\"type\":\"constant\"}],\"transformers\":[{\"transformation_arguments\":{\"keyColumnName\":\"s2id\",\"valueColumnName\":\"features\"},\"transformation_class\":\"test.postprocessor.FeatureTransformer\"}]}";
297+
PostProcessorConfig postProcessorConfig = PostProcessorConfig.parse(configuration);
298+
assertEquals("some-id", postProcessorConfig.getExternalSource().getHttpConfig().get(0).getEndpointVariables());
299+
}
300+
301+
@Test
302+
public void shouldParseEmptyEndpointVariablesConfig() {
303+
String configuration = "{\"external_source\":{\"es\":[{\"host\":\"localhost\",\"port\":\"9200\",\"output_mapping\":{\"customer_profile\":{\"path\":\"$._source\"}},\"endpoint_pattern\":\"/customers/customer/%s\",\"endpoint_variables\":\"customer_id\",\"retry_timeout\":\"5000\",\"socket_timeout\":\"6000\",\"stream_timeout\":\"5000\",\"type\":\"TestLogMessage\"}],\"http\":[{\"body_column_from_sql\":\"request_body\",\"connect_timeout\":\"5000\",\"endpoint\":\"http://localhost:8000/%s\",\"fail_on_errors\":\"true\",\"headers\":{\"content-type\":\"application/json\"},\"output_mapping\":{\"surge_factor\":{\"path\":\"$.data.tensor.values[0]\"}},\"stream_timeout\":\"5000\",\"verb\":\"put\"}]},\"internal_source\":[{\"output_field\":\"event_timestamp\",\"value\":\"CURRENT_TIMESTAMP\",\"type\":\"function\"},{\"output_field\":\"s2_id_level\",\"value\":\"7\",\"type\":\"constant\"}],\"transformers\":[{\"transformation_arguments\":{\"keyColumnName\":\"s2id\",\"valueColumnName\":\"features\"},\"transformation_class\":\"test.postprocessor.FeatureTransformer\"}]}";
304+
PostProcessorConfig postProcessorConfig = PostProcessorConfig.parse(configuration);
305+
assertEquals(null, postProcessorConfig.getExternalSource().getHttpConfig().get(0).getEndpointVariables());
306+
}
293307
}

dagger-core/src/test/java/io/odpf/dagger/core/processors/external/ExternalPostProcessorTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void setup() {
7272
HashMap<String, OutputMapping> httpColumnNames = new HashMap<>();
7373
httpColumnNames.put("http_field_1", new OutputMapping(""));
7474
httpColumnNames.put("http_field_2", new OutputMapping(""));
75-
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), httpColumnNames, "metricId_01", false);
75+
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), httpColumnNames, "metricId_01", false);
7676
HashMap<String, OutputMapping> esOutputMapping = new HashMap<>();
7777
esOutputMapping.put("es_field_1", new OutputMapping(""));
7878
EsSourceConfig esSourceConfig = new EsSourceConfig("host", "port", "", "", "endpointPattern",
@@ -135,7 +135,7 @@ public void shouldProcessWithRightConfiguration() {
135135
outputMapping.put("order_id", new OutputMapping("path"));
136136

137137
List<HttpSourceConfig> httpSourceConfigs = new ArrayList<>();
138-
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), outputMapping, "metricId_01", false);
138+
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), outputMapping, "metricId_01", false);
139139
httpSourceConfigs.add(httpSourceConfig);
140140

141141
List<EsSourceConfig> esSourceConfigs = new ArrayList<>();

dagger-core/src/test/java/io/odpf/dagger/core/processors/external/ExternalSourceConfigTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public void setUp() {
2929
HashMap<String, OutputMapping> httpOutputMapping = new HashMap<>();
3030
httpOutputMapping.put("http_field_1", new OutputMapping(""));
3131
httpOutputMapping.put("http_field_2", new OutputMapping(""));
32-
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), httpOutputMapping, "metricId_01", false);
32+
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), httpOutputMapping, "metricId_01", false);
3333
http = new ArrayList<>();
3434
http.add(httpSourceConfig);
3535
es = new ArrayList<>();

0 commit comments

Comments
 (0)