diff --git a/docs/HTTP-transform.md b/docs/HTTP-transform.md
new file mode 100644
index 00000000..cd02c40a
--- /dev/null
+++ b/docs/HTTP-transform.md
@@ -0,0 +1,423 @@
+# HTTP Transform
+
+Description
+-----------
+This plugin reads data from HTTP/HTTPS page with an URL dynamically changing based on input data.
+Paginated APIs are supported. For paginated APIs plugin reads available data and than waits for new pages to appear.
+Data in JSON, XML, CSV, TSV, TEXT and BLOB formats is supported.
+
+Properties
+----------
+
+### General
+
+**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc.
+
+**URL:** Url to fetch to the first page.
+The url must start with a protocol (e.g. http://).
+
+**HTTP Method:** HTTP request method.
+
+**Headers:** Headers to send with each HTTP request.
+
+**Request body:** Body to send with each HTTP request.
+
+**Reused Input Fields:** List of fields to retrieve from the input record. The output record will be a concatenation of fields from the response of the HTTP query and the fields from the input record specified here.
+
+**Rename Reused Input Fields:** Mapping to rename fields reused from the input record. It is not mandatory to rename a reused field but it can be usefull in case the input record and the response of the HTTP query have fields with the same name.
+
+### Format
+
+**Format:** Format of the HTTP response. This determines how the response is converted into output records. Possible values are:
+JSON - retrieves all records from the given json path
+and transforms them into records according to the mapping.
+XML - retrieves all records from the given XPath
+and transforms them into records according to the mapping.
+TSV - tab separated values. Columns are mapped to record fields in the order they are
+listed in schema.
+CSV - comma separated values. Columns are mapped to record fields in the order they are
+listed in schema.
+Text - transforms a single line of text into a single record with a string field `body` containing the result.
+BLOB - transforms the entire response into a single record with a byte array field `body` containing the result.
+
+**JSON/XML Result Path:** Path to the results. When the format is XML, this is an XPath. When the format is JSON, this is a JSON path.
+
+JSON path example:
+```
+{
+ "errors": [],
+ "response": {
+ "books": [
+ {
+ "id": "1159142",
+ "title": "Agile Web Development with Rails",
+ "author": "Sam Ruby, Dave Thomas, David Heinemeier Hansson",
+ "printInfo": {
+ "page": 488,
+ "coverType": "hard",
+ "publisher": "Pragmatic Bookshelf"
+ }
+ },
+ {
+ "id": "2375753",
+ "title": "Flask Web Development",
+ "author": "Miguel Grinberg",
+ "printInfo": {
+ "page": 543,
+ "coverType": "hard",
+ "publisher": "O'Reilly Media, Inc"
+ }
+ },
+ {
+ "id": "547307",
+ "title": "Alex Homer, ASP.NET 2.0 Visual Web Developer 2005",
+ "author": "David Sussman",
+ "printInfo": {
+ "page": 543,
+ "coverType": "hard",
+ "publisher": "unknown"
+ }
+ }
+ ]
+ }
+}
+ ```
+Json path to fetch books is `/response/books`. However if we need to fetch only `printInfo` we can specify
+`/response/books/printInfo` as well.
+
+XPath example:
+```
+
+
+
+
+ Everyday Italian
+ Giada De Laurentiis
+ 2005
+
+ 15.0
+ Discount up to 50%
+
+
+
+ XQuery Kick Start
+ James McGovern
+ Per Bothner
+ 2003
+
+ 49.99
+ No discount
+
+
+ ...
+
+
+ ...
+
+
+```
+
+XPath to fetch all books is `/bookstores/bookstore/book`. However a more precise selections can be done. E.g.
+`/bookstores/bookstore/book[@category='web']`.
+
+**JSON/XML Fields Mapping:** Mapping of fields in a record to fields in retrieved element. The left column contains the
+name of schema field. The right column contains path to it within a relative to an element. It can be either XPath or
+JSON path.
+
+Example response:
+```
+{
+ "startAt":1,
+ "maxResults":5,
+ "total":15599,
+ "issues":[
+ {
+ "id":"20276",
+ "key":"NETTY-14",
+ "fields":{
+ "issuetype":{
+ "name":"Bug",
+ "subtask":false
+ },
+ "fixVersions":[
+ "4.1.37"
+ ],
+ "description":"Test description for NETTY-14",
+ "project":{
+ "id":"10301",
+ "key":"NETTY",
+ "name":"Netty-HTTP",
+ "projectCategory":{
+ "id":"10002",
+ "name":"Infrastructure"
+ }
+ }
+ }
+ },
+ {
+ "id":"19124",
+ "key":"NETTY-13",
+ "fields":{
+ "issuetype":{
+ "self":"https://issues.cask.co/rest/api/2/issuetype/4",
+ "name":"Improvement",
+ "subtask":false
+ },
+ "fixVersions":[
+
+ ],
+ "description":"Test description for NETTY-13",
+ "project":{
+ "id":"10301",
+ "key":"NETTY",
+ "name":"Netty-HTTP",
+ "projectCategory":{
+ "id":"10002",
+ "name":"Infrastructure"
+ }
+ }
+ }
+ }
+ ]
+}
+```
+
+Assume the result path is `/issues`.
+
+The mapping is:
+
+| Field Name | Field Path |
+| --------------- |:-----------------------------------------:|
+| type | /fields/issuetype/name |
+| description | /fields/description |
+| projectCategory | /fields/project/projectCategory/name |
+| isSubtask | /fields/issuetype/subtask |
+| fixVersions | /fields/fixVersions |
+
+The result records are:
+
+| key | type | isSubtask | description | projectCategory | fixVersions |
+| -------- | ----------- | --------- | ----------------------------- | ---------------- | ----------- |
+| NETTY-14 | Bug | false | Test description for NETTY-14 | Infrastructure | ["4.1.37"] |
+| NETTY-13 | Improvement | false | Test description for NETTY-13 | Infrastructure | [] |
+
+Note, that field `key` was mapped without being included into the mapping. Mapping entries like `key: /key`
+can be omitted as long as the field is present in schema.
+
+
+**CSV Skip First Row:** Whether to skip the first row of the HTTP response. This is usually set if the first row is a header row.
+
+### Basic Authentication
+
+**Username:** Username for basic authentication.
+
+**Password:** Password for basic authentication.
+
+### HTTP Proxy
+
+**Proxy URL:** Proxy URL. Must contain a protocol, address and port.
+
+**Username:** Proxy username.
+
+**Password:** Proxy password.
+
+### Error Handling
+
+**HTTP Errors Handling:** Defines the error handling strategy to use for certain HTTP response codes.
+The left column contains a regular expression for HTTP status code. The right column contains an action which
+is done in case of match. If HTTP status code matches multiple regular expressions, the first specified in mapping
+is matched.
+
+Example:
+
+| HTTP Code Regexp | Error Handling |
+| ----------------- |:-----------------------:|
+| 2.. | Success |
+| 401 | Retry and fail |
+| 4.. | Fail |
+| 5.. | Retry and skip |
+| .* | Fail |
+
+Note: pagination types "Link in response header", "Link in response body", "Token in response body" do not support
+"Skip", "Retry and skip" options.
+
+**Non-HTTP Error Handling:** Error handling strategy to use when the HTTP response cannot be transformed to an output record.
+Possible values are:
+Stop on error - Fails pipeline due to erroneous record.
+Send to error - Sends erroneous record's text to error port and continues.
+Skip on error - Ignores erroneous records.
+
+**Retry Policy:** Policy used to calculate delay between retries.
+
+**Linear Retry Interval:** Interval between retries. Is only used if retry policy is "linear".
+
+**Max Retry Duration:** Maximum time in seconds retries can take.
+
+**Connect Timeout:** Maximum time in seconds connection initialization is allowed to take.
+
+**Read Timeout:** Maximum time in seconds fetching data from the server is allowed to take.
+
+### Pagination
+
+**Pagination Type:** Strategy used to determine how to get next page.
+
+**Wait Time Between Pages:** Time in milliseconds to wait between HTTP requests for the next page.
+
+
+##### Pagination type: None
+Only single page is loaded.
+
+##### Pagination type: Link in response header
+In response there is a "Link" header, which contains an url marked as "next". Example:
+```
+; rel="first",
+; rel="next",
+; rel="last"`
+```
+
+
+##### Pagination type: Link in response body
+Every page contains a next page url. This pagination type is only supported for JSON and XML formats.
+Pagination happens until no next page field is present or until page contains no elements.
+
+**Next Page JSON/XML Field Path:** A JSON path or an XPath to a field which contains next page url.
+It can be either relative or absolute url.
+
+Example page response:
+```
+{
+ "results": [
+ ...
+ ]
+ "_links": {
+ "self": "https://confluence.atlassian.com/rest/api/space/ADMINJIRASERVER0710/content/page",
+ "next": "/rest/api/space/ADMINJIRASERVER0710/content/page?limit=100&start=100",
+ "base": "https://confluence.atlassian.com",
+ "context": ""
+ }
+}
+```
+Next page field path is `_links/next`.
+
+##### Pagination type: Token in response body
+Every page contains a token, which is appended as an url parameter to obtain next page.
+This type of pagination is only supported for JSON and XML formats. Pagination happens until no next page
+token is present on the page or until page contains no elements.
+
+**Next Page Token Path:** A JSON path or an XPath to a field which contains next page token.
+
+**Next Page Url Parameter:** A parameter which is appended to url in order to specify next page token.
+
+Example plugin config:
+```
+{
+ "url": "https://www.googleapis.com/youtube/v3/search?part=snippet&maxResults=20&q=cask+cdap",
+ "resultPath": "/items"
+ "paginationType": "Token in response body",
+ "nextPageTokenPath": "/nextPageToken",
+ "nextPageUrlParameter": "pageToken"
+}
+```
+
+First page response:
+```
+{
+ "nextPageToken": "CAEQAA",
+ "pageInfo": {
+ "totalResults": 208,
+ "resultsPerPage": 2
+ },
+ "items": [
+ ...
+ ]
+}
+```
+Next page fetched by plugin will be url with `&pageToken=CAEQAA` appended.
+
+##### Pagination type: Increment an index
+Pagination by incrementing a {pagination.index} placeholder value in url. For this pagination type url is required
+to contain above placeholder.
+
+**Start Index:** Start value of {pagination.index} placeholder
+
+**Max Index:** Maximum value of {pagination.index} placeholder. If empty, pagination will happen until the page with
+no elements.
+
+**Index Increment:** A value which the {pagination.index} placeholder is incremented by. Increment can be negative.
+
+##### Pagination type: Custom
+Pagination using user provided code. The code decides how to retrieve a next page url based on previous page contents
+and headers and when to finish pagination.
+
+**Custom Pagination Python Code:** A code which implements retrieving
+a next page url based on previous page contents and headers.
+
+Example code:
+```
+import json
+
+def get_next_page_url(url, page, headers):
+ """
+ Based on previous page data generates next page url, when "Custom pagination" is enabled.
+
+ Args:
+ url (string): previous page url
+ page (string): a body of previous page
+ headers (dict): a dictionary of headers from previous page
+
+ """
+ page_json = json.loads(page)
+ next_page_num = page_json['nextpage']
+
+ # stop the iteration
+ if next_page_num == None or next_page_num > 5:
+ return None
+
+ return "https://searchcode.com/api/codesearch_I/?q=curl&p={}".format(next_page_num)
+```
+The above code iterates over first five pages of searchcode.com results. When 'None' is returned the iteration
+is stopped.
+
+### OAuth2
+
+**OAuth2 Enabled:** If true, plugin will perform OAuth2 authentication.
+
+**Auth URL:** Endpoint for the authorization server used to retrieve the authorization code.
+
+**Token URL:** Endpoint for the resource server, which exchanges the authorization code for an access token.
+
+**Client ID:** Client identifier obtained during the Application registration process.
+
+**Client Secret:** Client secret obtained during the Application registration process.
+
+**Scopes:** Scope of the access request, which might have multiple space-separated values.
+
+**Refresh Token:** Token used to receive accessToken, which is end product of OAuth2.
+
+### SSL/TLS
+
+**Verify HTTPS Trust Certificates:** If false, untrusted trust certificates (e.g. self signed), will not lead to an
+error. Do not disable this in production environment on a network you do not entirely trust. Especially public internet.
+
+**Keystore File:** A path to a file which contains keystore.
+
+**Keystore Type:** Format of a keystore.
+
+**Keystore Password:** Password for a keystore. If a keystore is not password protected leave it empty.
+
+**Keystore Key Algorithm:** An algorithm used for keystore.
+
+**TrustStore File:** A path to a file which contains truststore.
+
+**TrustStore Type:** Format of a truststore.
+
+**TrustStore Password:** Password for a truststore. If a truststore is not password protected leave it empty.
+
+**TrustStore Key Algorithm:** An algorithm used for truststore.
+
+**Transport Protocols:** Transport protocols which are allowed for connection.
+
+**Cipher Suites:** Cipher suites which are allowed for connection.
+Colons, commas or spaces are also acceptable separators.
+
+**Schema:** Output schema. Is required to be set.
diff --git a/icons/HTTP-transform.png b/icons/HTTP-transform.png
new file mode 100644
index 00000000..933bf62b
Binary files /dev/null and b/icons/HTTP-transform.png differ
diff --git a/pom.xml b/pom.xml
index d49ef2e5..198f44a7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,7 +82,7 @@
2.8.5
2.3.0
4.5.9
- 2.4.0-SNAPSHOT
+ 2.6.0
2.9.9
4.11
2.7.1
diff --git a/src/main/java/io/cdap/plugin/http/source/batch/HttpRecordReader.java b/src/main/java/io/cdap/plugin/http/source/batch/HttpRecordReader.java
index 993c20f4..edba1dbf 100644
--- a/src/main/java/io/cdap/plugin/http/source/batch/HttpRecordReader.java
+++ b/src/main/java/io/cdap/plugin/http/source/batch/HttpRecordReader.java
@@ -36,9 +36,9 @@
*/
public class HttpRecordReader extends RecordReader {
private static final Logger LOG = LoggerFactory.getLogger(HttpRecordReader.class);
- private static final Gson gson = new GsonBuilder().create();
+ protected static final Gson GSON = new GsonBuilder().create();
- private BaseHttpPaginationIterator httpPaginationIterator;
+ protected BaseHttpPaginationIterator httpPaginationIterator;
private BasePage value;
/**
@@ -51,7 +51,7 @@ public class HttpRecordReader extends RecordReader {
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
Configuration conf = taskAttemptContext.getConfiguration();
String configJson = conf.get(HttpInputFormatProvider.PROPERTY_CONFIG_JSON);
- HttpBatchSourceConfig httpBatchSourceConfig = gson.fromJson(configJson, HttpBatchSourceConfig.class);
+ HttpBatchSourceConfig httpBatchSourceConfig = GSON.fromJson(configJson, HttpBatchSourceConfig.class);
httpPaginationIterator = PaginationIteratorFactory.createInstance(httpBatchSourceConfig, null);
}
diff --git a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java
index a554dd6c..289f7860 100644
--- a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java
+++ b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java
@@ -671,8 +671,16 @@ public List getTransportProtocolsList() {
}
public void validate() {
+ validate(true, true);
+ }
+
+ public void validate(boolean validateURL) {
+ validate(validateURL, true);
+ }
+
+ public void validate(boolean validateURL, boolean validateErrorHandling) {
// Validate URL
- if (!containsMacro(PROPERTY_URL)) {
+ if (validateURL && !containsMacro(PROPERTY_URL)) {
try {
// replace with placeholder with anything just during pagination
new URI(getUrl().replaceAll(PAGINATION_INDEX_PLACEHOLDER_REGEX, "0"));
@@ -683,7 +691,7 @@ public void validate() {
}
// Validate HTTP Error Handling Map
- if (!containsMacro(PROPERTY_HTTP_ERROR_HANDLING)) {
+ if (validateErrorHandling && !containsMacro(PROPERTY_HTTP_ERROR_HANDLING)) {
List httpErrorsHandlingEntries = getHttpErrorHandlingEntries();
boolean supportsSkippingPages = PaginationIteratorFactory
.createInstance(this, null).supportsSkippingPages();
@@ -774,9 +782,7 @@ PAGINATION_INDEX_PLACEHOLDER, getPaginationType()),
String reasonFormat = String.format("page format is '%s'", getFormat());
if (getFormat().equals(PageFormat.JSON) || getFormat().equals(PageFormat.XML)) {
- if (!getFormat().equals(PageFormat.JSON)) {
- assertIsSet(getResultPath(), PROPERTY_RESULT_PATH, reasonFormat);
- }
+ assertIsSet(getResultPath(), PROPERTY_RESULT_PATH, reasonFormat);
getFullFieldsMapping(); // can be null, but call getter to verify correctness of regexps
} else {
assertIsNotSet(getResultPath(), PROPERTY_RESULT_PATH, reasonFormat);
@@ -861,15 +867,20 @@ public static List getListFromString(String value) {
}
public static Map getMapFromKeyValueString(String keyValueString) {
+ return getMapFromKeyValueString(keyValueString, ",", ":");
+ }
+
+ public static Map getMapFromKeyValueString(String keyValueString, String delimiter,
+ String kvDelimiter) {
Map result = new LinkedHashMap<>();
if (Strings.isNullOrEmpty(keyValueString)) {
return result;
}
- String[] mappings = keyValueString.split(",");
+ String[] mappings = keyValueString.split(delimiter);
for (String map : mappings) {
- String[] columns = map.split(":");
+ String[] columns = map.split(kvDelimiter);
result.put(columns[0], columns[1]);
}
return result;
diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/BaseHttpPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/BaseHttpPaginationIterator.java
index ece34866..8c75dd97 100644
--- a/src/main/java/io/cdap/plugin/http/source/common/pagination/BaseHttpPaginationIterator.java
+++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/BaseHttpPaginationIterator.java
@@ -60,9 +60,9 @@ public abstract class BaseHttpPaginationIterator implements Iterator,
private Integer httpStatusCode;
private HttpResponse response;
- public BaseHttpPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) {
+ public BaseHttpPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, HttpClient httpClient) {
this.config = config;
- this.httpClient = new HttpClient(config);
+ this.httpClient = httpClient;
this.nextPageUrl = config.getUrl();
this.httpErrorHandler = new HttpErrorHandler(config);
diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/CustomPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/CustomPaginationIterator.java
index ad1e8323..18dfef57 100644
--- a/src/main/java/io/cdap/plugin/http/source/common/pagination/CustomPaginationIterator.java
+++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/CustomPaginationIterator.java
@@ -16,6 +16,7 @@
package io.cdap.plugin.http.source.common.pagination;
import io.cdap.plugin.http.source.common.BaseHttpSourceConfig;
+import io.cdap.plugin.http.source.common.http.HttpClient;
import io.cdap.plugin.http.source.common.http.HttpResponse;
import io.cdap.plugin.http.source.common.pagination.page.BasePage;
import io.cdap.plugin.http.source.common.pagination.state.PaginationIteratorState;
@@ -32,8 +33,8 @@
public class CustomPaginationIterator extends BaseHttpPaginationIterator {
private final JythonPythonExecutor pythonExecutor;
- public CustomPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) {
- super(config, state);
+ public CustomPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, HttpClient httpClient) {
+ super(config, state, httpClient);
pythonExecutor = new JythonPythonExecutor(config.getCustomPaginationCode());
pythonExecutor.initialize();
}
diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/IncrementAnIndexPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/IncrementAnIndexPaginationIterator.java
index 69084fad..51c4b7e8 100644
--- a/src/main/java/io/cdap/plugin/http/source/common/pagination/IncrementAnIndexPaginationIterator.java
+++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/IncrementAnIndexPaginationIterator.java
@@ -16,6 +16,7 @@
package io.cdap.plugin.http.source.common.pagination;
import io.cdap.plugin.http.source.common.BaseHttpSourceConfig;
+import io.cdap.plugin.http.source.common.http.HttpClient;
import io.cdap.plugin.http.source.common.http.HttpResponse;
import io.cdap.plugin.http.source.common.pagination.page.BasePage;
import io.cdap.plugin.http.source.common.pagination.state.IndexPaginationIteratorState;
@@ -36,8 +37,9 @@ public class IncrementAnIndexPaginationIterator extends BaseHttpPaginationIterat
private Long index;
- public IncrementAnIndexPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) {
- super(config, state);
+ public IncrementAnIndexPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state,
+ HttpClient httpClient) {
+ super(config, state, httpClient);
this.indexIncrement = config.getIndexIncrement();
this.maxIndex = config.getMaxIndex();
diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseBodyPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseBodyPaginationIterator.java
index cdc815d5..654d3b88 100644
--- a/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseBodyPaginationIterator.java
+++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseBodyPaginationIterator.java
@@ -16,6 +16,7 @@
package io.cdap.plugin.http.source.common.pagination;
import io.cdap.plugin.http.source.common.BaseHttpSourceConfig;
+import io.cdap.plugin.http.source.common.http.HttpClient;
import io.cdap.plugin.http.source.common.http.HttpResponse;
import io.cdap.plugin.http.source.common.pagination.page.BasePage;
import io.cdap.plugin.http.source.common.pagination.state.PaginationIteratorState;
@@ -34,10 +35,13 @@ public class LinkInResponseBodyPaginationIterator extends BaseHttpPaginationIter
private final String address;
- public LinkInResponseBodyPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) {
- super(config, state);
+ boolean isMultiQuery;
+ public LinkInResponseBodyPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state,
+ HttpClient httpClient, boolean isMultiQuery) {
+ super(config, state, httpClient);
URI uri = URI.create(config.getUrl());
this.address = uri.getScheme() + "://" + uri.getAuthority();
+ this.isMultiQuery = isMultiQuery;
}
@Override
@@ -62,6 +66,6 @@ protected String getNextPageUrl(HttpResponse response, BasePage page) {
@Override
public boolean supportsSkippingPages() {
- return false;
+ return isMultiQuery;
}
}
diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseHeaderPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseHeaderPaginationIterator.java
index 5e59b353..6253d8a9 100644
--- a/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseHeaderPaginationIterator.java
+++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/LinkInResponseHeaderPaginationIterator.java
@@ -16,6 +16,7 @@
package io.cdap.plugin.http.source.common.pagination;
import io.cdap.plugin.http.source.common.BaseHttpSourceConfig;
+import io.cdap.plugin.http.source.common.http.HttpClient;
import io.cdap.plugin.http.source.common.http.HttpResponse;
import io.cdap.plugin.http.source.common.pagination.page.BasePage;
import io.cdap.plugin.http.source.common.pagination.state.PaginationIteratorState;
@@ -34,8 +35,9 @@ public class LinkInResponseHeaderPaginationIterator extends BaseHttpPaginationIt
private static final Logger LOG = LoggerFactory.getLogger(LinkInResponseHeaderPaginationIterator.class);
private static final Pattern nextLinkPattern = Pattern.compile("<(.+)>; rel=next");
- public LinkInResponseHeaderPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) {
- super(config, state);
+ public LinkInResponseHeaderPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state,
+ HttpClient httpClient) {
+ super(config, state, httpClient);
}
@Override
diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/NonePaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/NonePaginationIterator.java
index 80f48e3e..1920bd98 100644
--- a/src/main/java/io/cdap/plugin/http/source/common/pagination/NonePaginationIterator.java
+++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/NonePaginationIterator.java
@@ -16,6 +16,7 @@
package io.cdap.plugin.http.source.common.pagination;
import io.cdap.plugin.http.source.common.BaseHttpSourceConfig;
+import io.cdap.plugin.http.source.common.http.HttpClient;
import io.cdap.plugin.http.source.common.http.HttpResponse;
import io.cdap.plugin.http.source.common.pagination.page.BasePage;
import io.cdap.plugin.http.source.common.pagination.state.PaginationIteratorState;
@@ -28,8 +29,12 @@
public class NonePaginationIterator extends BaseHttpPaginationIterator {
private static final Logger LOG = LoggerFactory.getLogger(NonePaginationIterator.class);
- public NonePaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) {
- super(config, state);
+ boolean isMultiQuery;
+
+ public NonePaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, HttpClient httpClient,
+ boolean isMultiQuery) {
+ super(config, state, httpClient);
+ this.isMultiQuery = isMultiQuery;
}
@Override
@@ -39,6 +44,6 @@ protected String getNextPageUrl(HttpResponse response, BasePage page) {
@Override
public boolean supportsSkippingPages() {
- return false;
+ return isMultiQuery;
}
}
diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorFactory.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorFactory.java
index c2cad0de..e3158d63 100644
--- a/src/main/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorFactory.java
+++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorFactory.java
@@ -16,6 +16,7 @@
package io.cdap.plugin.http.source.common.pagination;
import io.cdap.plugin.http.source.common.BaseHttpSourceConfig;
+import io.cdap.plugin.http.source.common.http.HttpClient;
import io.cdap.plugin.http.source.common.pagination.state.PaginationIteratorState;
/**
@@ -23,23 +24,34 @@
* the input config.
*/
public class PaginationIteratorFactory {
- public static BaseHttpPaginationIterator createInstance(BaseHttpSourceConfig config, PaginationIteratorState state) {
+ public static BaseHttpPaginationIterator createInstance(BaseHttpSourceConfig config, PaginationIteratorState state,
+ boolean isMultiQuery,
+ HttpClient httpClient) {
switch (config.getPaginationType()) {
case NONE:
- return new NonePaginationIterator(config, state);
+ return new NonePaginationIterator(config, state, httpClient, isMultiQuery);
case LINK_IN_RESPONSE_HEADER:
- return new LinkInResponseHeaderPaginationIterator(config, state);
+ return new LinkInResponseHeaderPaginationIterator(config, state, httpClient);
case LINK_IN_RESPONSE_BODY:
- return new LinkInResponseBodyPaginationIterator(config, state);
+ return new LinkInResponseBodyPaginationIterator(config, state, httpClient, isMultiQuery);
case TOKEN_IN_RESPONSE_BODY:
- return new TokenPaginationIterator(config, state);
+ return new TokenPaginationIterator(config, state, httpClient, isMultiQuery);
case INCREMENT_AN_INDEX:
- return new IncrementAnIndexPaginationIterator(config, state);
+ return new IncrementAnIndexPaginationIterator(config, state, httpClient);
case CUSTOM:
- return new CustomPaginationIterator(config, state);
+ return new CustomPaginationIterator(config, state, httpClient);
default:
throw new IllegalArgumentException(
- String.format("Unsupported pagination type: '%s'", config.getPaginationType()));
+ String.format("Unsupported pagination type: '%s'", config.getPaginationType()));
}
}
+
+ public static BaseHttpPaginationIterator createInstance(BaseHttpSourceConfig config, PaginationIteratorState state,
+ HttpClient httpClient) {
+ return createInstance(config, state, false, httpClient);
+ }
+
+ public static BaseHttpPaginationIterator createInstance(BaseHttpSourceConfig config, PaginationIteratorState state) {
+ return createInstance(config, state, false, new HttpClient(config));
+ }
}
diff --git a/src/main/java/io/cdap/plugin/http/source/common/pagination/TokenPaginationIterator.java b/src/main/java/io/cdap/plugin/http/source/common/pagination/TokenPaginationIterator.java
index d3f88041..cf242a34 100644
--- a/src/main/java/io/cdap/plugin/http/source/common/pagination/TokenPaginationIterator.java
+++ b/src/main/java/io/cdap/plugin/http/source/common/pagination/TokenPaginationIterator.java
@@ -16,6 +16,7 @@
package io.cdap.plugin.http.source.common.pagination;
import io.cdap.plugin.http.source.common.BaseHttpSourceConfig;
+import io.cdap.plugin.http.source.common.http.HttpClient;
import io.cdap.plugin.http.source.common.http.HttpResponse;
import io.cdap.plugin.http.source.common.pagination.page.BasePage;
import io.cdap.plugin.http.source.common.pagination.state.PaginationIteratorState;
@@ -33,8 +34,12 @@
public class TokenPaginationIterator extends BaseHttpPaginationIterator {
private static final Logger LOG = LoggerFactory.getLogger(TokenPaginationIterator.class);
- public TokenPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state) {
- super(config, state);
+ boolean isMultiQuery;
+
+ public TokenPaginationIterator(BaseHttpSourceConfig config, PaginationIteratorState state, HttpClient httpClient
+ , boolean isMultiQuery) {
+ super(config, state, httpClient);
+ this.isMultiQuery = isMultiQuery;
}
@Override
@@ -56,6 +61,6 @@ protected String getNextPageUrl(HttpResponse response, BasePage page) {
@Override
public boolean supportsSkippingPages() {
- return false;
+ return isMultiQuery;
}
}
diff --git a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpRecordReader.java b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpRecordReader.java
new file mode 100644
index 00000000..dd28ee97
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpRecordReader.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.plugin.http.transform;
+
+import io.cdap.plugin.http.source.common.http.HttpClient;
+import io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator;
+import io.cdap.plugin.http.source.common.pagination.PaginationIteratorFactory;
+import io.cdap.plugin.http.source.common.pagination.page.BasePage;
+
+import java.io.IOException;
+
+/**
+ * RecordReader implementation, which reads text records representations and http codes
+ * using {@link BaseHttpPaginationIterator} subclasses.
+ */
+public class DynamicHttpRecordReader {
+ protected BaseHttpPaginationIterator httpPaginationIterator;
+ private BasePage value;
+
+ public DynamicHttpRecordReader(DynamicHttpTransformConfig dynamicHttpTransformConfig, HttpClient httpClient) {
+ httpPaginationIterator = PaginationIteratorFactory.createInstance(dynamicHttpTransformConfig, null,
+ true, httpClient);
+ }
+
+ public boolean nextKeyValue() {
+ if (!httpPaginationIterator.hasNext()) {
+ return false;
+ }
+ value = httpPaginationIterator.next();
+ return true;
+ }
+
+ public BasePage getCurrentValue() {
+ return value;
+ }
+
+ public void close() throws IOException {
+ if (httpPaginationIterator != null) {
+ httpPaginationIterator.close();
+ }
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java
new file mode 100644
index 00000000..bc83a59b
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransform.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.plugin.http.transform;
+
+import com.google.common.util.concurrent.RateLimiter;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.InvalidEntry;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.Transform;
+import io.cdap.cdap.etl.api.TransformContext;
+import io.cdap.plugin.http.source.common.BaseHttpSourceConfig;
+import io.cdap.plugin.http.source.common.error.ErrorHandling;
+import io.cdap.plugin.http.source.common.error.HttpErrorHandler;
+import io.cdap.plugin.http.source.common.http.HttpClient;
+import io.cdap.plugin.http.source.common.http.HttpResponse;
+import io.cdap.plugin.http.source.common.pagination.page.BasePage;
+import io.cdap.plugin.http.source.common.pagination.page.PageEntry;
+import io.cdap.plugin.http.source.common.pagination.page.PageFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Plugin returns records from HTTP source specified by link. Pagination via APIs is supported.
+ */
+@Plugin(type = Transform.PLUGIN_TYPE)
+@Name(DynamicHttpTransform.NAME)
+@Description("Read data from HTTP endpoint that changes dynamically depending on inputs data.")
+public class DynamicHttpTransform extends Transform {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicHttpTransform.class);
+ static final String NAME = "HTTP";
+
+ private final DynamicHttpTransformConfig config;
+ private RateLimiter rateLimiter;
+ private final HttpClient httpClient;
+ private final HttpErrorHandler httpErrorHandler;
+
+ private List reusedInputs;
+
+ private Map reusedInputsNameMap;
+
+ // This is a reverse map compared to config.getReusedInputsNameMap()
+ // reversedReusedInputsNameMap uses the name of the field in the output schema as Key and
+ // the name of the field in the input schema as Value.
+ // In config.getReusedInputsNameMap(), the Key is the name of the field in the input schema,
+ // and the Value is the name of the field in the output schema (which is more intuitive for the user)
+ // Since the uniqueness of values is checked in configuration validation,
+ // this should not pause any problem and will be more efficient.
+ private Map reversedReusedInputsNameMap;
+
+ private Schema outputSchema;
+
+ /**
+ * Constructor used by Data Fusion
+ *
+ * @param config the plugin configuration
+ */
+ public DynamicHttpTransform(DynamicHttpTransformConfig config) {
+ this(config, new HttpClient(config));
+ }
+
+ /**
+ * Constructor used in unit tests
+ *
+ * @param config the plugin configuration
+ * @param httpClient the http client
+ */
+ public DynamicHttpTransform(DynamicHttpTransformConfig config, HttpClient httpClient) {
+ this.config = config;
+ this.httpClient = httpClient;
+ if (config.throttlingEnabled()) {
+ this.rateLimiter = RateLimiter.create(this.config.maxCallPerSeconds);
+ }
+ this.httpErrorHandler = new HttpErrorHandler(config);
+
+ this.reusedInputs = config.getReusedInputs();
+ this.reusedInputsNameMap = config.getReusedInputsNameMap();
+ this.reversedReusedInputsNameMap = new HashMap<>();
+ for (Map.Entry e: reusedInputsNameMap.entrySet()) {
+ this.reversedReusedInputsNameMap.put(e.getValue(), e.getKey());
+ }
+ this.outputSchema = config.getOutputSchema();
+ }
+
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ config.validate(pipelineConfigurer.getStageConfigurer().getInputSchema());
+ // validate when macros not yet substituted
+ config.validateSchema();
+
+ pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getOutputSchema());
+ }
+
+ @Override
+ public void initialize(TransformContext context) throws Exception {
+ super.initialize(context);
+ }
+
+ @Override
+ public void transform(StructuredRecord input, Emitter emitter) {
+ // Replace placeholders in URL
+ String url = config.getBaseUrl();
+ for (Map.Entry e : config.getUrlVariablesMap().entrySet()) {
+ String valueToUse = input.get(e.getValue());
+ if (valueToUse != null) {
+ String placeholder = "{" + e.getKey() + "}";
+ if (!url.contains(placeholder)) {
+ LOG.warn("Placeholder " + placeholder + " not found in url " + url);
+ } else {
+ url = url.replace(placeholder, valueToUse);
+ }
+ } else {
+ emitter.emitError(new InvalidEntry<>(
+ -1, "Cannot find required field " + e.getValue(), input));
+ }
+ }
+
+ config.setProcessedURL(url);
+
+ if (config.throttlingEnabled()) {
+ rateLimiter.acquire(); // Throttle
+ }
+
+ DynamicHttpRecordReader reader = new DynamicHttpRecordReader(config, httpClient);
+ while (reader.nextKeyValue()) {
+ BasePage page = reader.getCurrentValue();
+
+ while (page.hasNext()) {
+ PageEntry pageEntry = page.next();
+
+ if (!pageEntry.isError()) {
+ StructuredRecord retrievedDataRecord = pageEntry.getRecord();
+ StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
+
+ for (Schema.Field f : outputSchema.getFields()) {
+ String fieldName = f.getName();
+ Object fieldValue;
+
+ if (Util.isReusedField(f.getName(), reusedInputs, reusedInputsNameMap)) {
+ fieldValue = getReusedFieldValue(input, fieldName);
+ } else {
+ fieldValue = retrievedDataRecord.get(fieldName);
+ }
+ builder.set(fieldName, fieldValue);
+ }
+
+ emitter.emit(builder.build());
+ } else {
+ emitter.emitError(pageEntry.getError());
+ }
+ }
+ }
+ }
+
+ BasePage createPageInstance(BaseHttpSourceConfig config, HttpResponse httpResponse,
+ ErrorHandling postRetryStrategy) throws IOException {
+ return PageFactory.createInstance(config, httpResponse, httpErrorHandler,
+ !postRetryStrategy.equals(ErrorHandling.SUCCESS));
+ }
+
+ /**
+ * Retrieve the given field in the given record handling the case the field is reused.
+ * If the field name have been mapped, retrieve the original field name from the mapping and
+ * retrieve the field value associated to this name from record,
+ * Else, just retrieve the field value associated to the given fieldName
+ * @param inputRecord the record
+ * @param fieldName the field
+ * @return the field value
+ */
+ public Object getReusedFieldValue(StructuredRecord inputRecord, String fieldName) {
+ if (reversedReusedInputsNameMap.containsKey(fieldName)) {
+ String fieldNameInInput = reversedReusedInputsNameMap.get(fieldName);
+ return inputRecord.get(fieldNameInInput);
+ } else {
+ return inputRecord.get(fieldName);
+ }
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java
new file mode 100644
index 00000000..d3716d9b
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/http/transform/DynamicHttpTransformConfig.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.plugin.http.transform;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.http.source.common.BaseHttpSourceConfig;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Provides all the configurations required for configuring the plugin.
+ */
+public class DynamicHttpTransformConfig extends BaseHttpSourceConfig {
+ public static final String PROPERTY_URL_VARIABLES = "urlVariables";
+ public static final String PROPERTY_REUSED_INPUTS = "reusedInputs";
+ public static final String PROPERTY_RENAME_REUSED_INPUTS = "renameReusedInputs";
+ public static final String PROPERTY_MAX_CALL_PER_SECONDS = "maxCallPerSeconds";
+
+ @Name(PROPERTY_URL_VARIABLES)
+ @Nullable
+ @Description("Variables used to dynamically construct the URL through placeholder. " +
+ "Use the placeholder name as Key, and the field from input schema you want to use as Value.")
+ @Macro
+ protected String urlVariables;
+
+ @Nullable
+ @Name(PROPERTY_REUSED_INPUTS)
+ @Description("List of fields from inputSchema that will be added to the output schema. " +
+ "If left empty, the output record will contains only the result of the HTTP " +
+ "query and the input record will be lost." +
+ "If a field reused from inputSchema has the same name as a field in the output schema, " +
+ "use the \"Rename Reused Input Fields\" to rename the field.")
+ @Macro
+ protected String reusedInputs;
+
+ @Nullable
+ @Name(PROPERTY_RENAME_REUSED_INPUTS)
+ @Description("Rename a reused field from input schema. This should be used when a reused field " +
+ "from imput schema have the same name as a field from output schema.")
+ @Macro
+ protected String renameReusedInputs;
+
+ @Nullable
+ @Name(PROPERTY_MAX_CALL_PER_SECONDS)
+ @Description("The maximum number of call made per seconds. 0 = throttling disabled")
+ @Macro
+ protected int maxCallPerSeconds;
+
+
+ // processedURL is static so that it does not appears in plugin configuration panel
+ private static String processedURL;
+
+ public String getBaseUrl() {
+ return url;
+ }
+ @Override
+ public String getUrl() {
+ return processedURL;
+ }
+
+ public void setProcessedURL(String processedURL) {
+ this.processedURL = processedURL;
+ }
+
+ protected DynamicHttpTransformConfig(String referenceName) {
+ super(referenceName);
+ }
+
+ public void validate(Schema inputSchema) {
+ super.validate(false, false);
+
+ // Check that the needed fields exists in input schema
+ Map urlVariableMap = getUrlVariablesMap();
+ for (String value: urlVariableMap.values()) {
+ if (inputSchema.getField(value) == null) {
+ throw new IllegalArgumentException("Field " + value + " is required in input data schema " +
+ "but wasn't found. Current input schema is : " + inputSchema);
+ }
+ }
+
+ Map reusedInputsNameMap = getReusedInputsNameMap();
+ HashSet inputFields = new HashSet<>();
+ HashSet outputFields = new HashSet<>();
+ for (Map.Entry e: reusedInputsNameMap.entrySet()) {
+ if (inputSchema.getField(e.getKey()) == null) {
+ throw new IllegalArgumentException("Input field " + e.getKey() + " is configured to be renamed " +
+ "but is not present in the inputSchema");
+ }
+ if (getSchema().getField(e.getValue()) != null) {
+ throw new IllegalArgumentException("Input field " + e.getKey() + " is configured to be " +
+ "renamed as " + e.getValue() + " but this field is already present in the outputSchema");
+ }
+
+ if (inputFields.add(e.getKey()) == false) {
+ throw new IllegalArgumentException("Input field " + e.getKey() +
+ " is configured multiple times to be renamed");
+ }
+ if (outputFields.add(e.getValue()) == false) {
+ throw new IllegalArgumentException("Multiple fields configured to be renamed " + e.getValue());
+ }
+ }
+
+
+
+ }
+
+ public boolean throttlingEnabled() {
+ return maxCallPerSeconds > 0;
+ }
+
+
+ public Map getUrlVariablesMap() {
+ return getMapFromKeyValueString(urlVariables, ",", ":");
+ }
+
+
+ public Map getReusedInputsNameMap() {
+ return getMapFromKeyValueString(renameReusedInputs, ",", ":");
+ }
+
+ public List getReusedInputs() {
+ List uniqueFieldList = new ArrayList<>();
+ if (!Strings.isNullOrEmpty(reusedInputs)) {
+ for (String field : Splitter.on(',').trimResults().split(reusedInputs)) {
+ uniqueFieldList.add(field);
+ }
+ }
+ return uniqueFieldList;
+ }
+
+ @Override
+ /**
+ * Return the data schema.
+ * In case no fields are reused from the input schema, this function is equals to getOutput()
+ * Otherwise, this function return the data schema without the reused fields.
+ */
+ public Schema getSchema() {
+ Schema schema = getOutputSchema();
+ List reusedInput = getReusedInputs();
+ if (reusedInput.size() > 0) {
+ Map reusedInputsNameMap = getReusedInputsNameMap();
+
+ List fields = new ArrayList<>();
+ for (Schema.Field f : schema.getFields()) {
+ if (!Util.isReusedField(f.getName(), reusedInput, reusedInputsNameMap)) {
+ fields.add(f);
+ }
+ }
+ schema = Schema.recordOf("record", fields);
+ }
+ return schema;
+ }
+
+
+ public Schema getOutputSchema() {
+ return super.getSchema();
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/http/transform/Util.java b/src/main/java/io/cdap/plugin/http/transform/Util.java
new file mode 100644
index 00000000..298d43e4
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/http/transform/Util.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.http.transform;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utilitary class
+ */
+public class Util {
+ /**
+ * Return true if the field (in output schema) is a reused field (coming from the input schema), false else
+ * @param fieldName the name of the field
+ * @param reusedInputs the list of reused input fields
+ * @param reusedInputsMapping the mapping of reused fields
+ * @return is reused
+ */
+ public static boolean isReusedField(
+ String fieldName,
+ List reusedInputs,
+ Map reusedInputsMapping) {
+ return reusedInputsMapping.containsValue(fieldName) ||
+ (reusedInputs.contains(fieldName) && !reusedInputsMapping.containsKey(fieldName));
+ }
+}
diff --git a/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java b/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java
new file mode 100644
index 00000000..5c57769f
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/http/transform/DynamicHttpTransformTest.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.http.transform;
+
+import com.google.common.base.Joiner;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.Transform;
+import io.cdap.cdap.etl.mock.common.MockEmitter;
+import io.cdap.plugin.http.source.common.http.HttpClient;
+import org.apache.http.HttpEntity;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DynamicHttpTransformTest {
+ // The input schema
+ private static final Schema INPUT_SCHEMA = Schema.recordOf("input",
+ Schema.Field.of("firstName", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of("lastName", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of("mail", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("_id", Schema.of(Schema.Type.STRING))
+ );
+
+ private static final String OUTPUT_SCHEMA = Schema.recordOf("input",
+ Schema.Field.of("firstName", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of("lastName", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of("mail", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("_id", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("assignedPrograms", Schema.nullableOf(Schema.of(Schema.Type.INT))),
+ Schema.Field.of("averageScore", Schema.nullableOf(Schema.of(Schema.Type.FLOAT))),
+ Schema.Field.of("paths", Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.Type.STRING))))
+ ).toString();
+
+ private static final String OUTPUT_SCHEMA_WITH_REUSED = Schema.recordOf("input",
+ Schema.Field.of("firstName", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of("lastName", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of("mail", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("renamed_mail", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("renamed_id", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("_id", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("assignedPrograms", Schema.nullableOf(Schema.of(Schema.Type.INT))),
+ Schema.Field.of("averageScore", Schema.nullableOf(Schema.of(Schema.Type.FLOAT))),
+ Schema.Field.of("paths", Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.Type.STRING))))
+ ).toString();
+
+ private StructuredRecord generateData(String idValue) {
+ StructuredRecord record = StructuredRecord.builder(INPUT_SCHEMA)
+ .set("firstName", "toto")
+ .set("mail", "toto.tata@tutu.com")
+ .set("_id", idValue)
+ .build();
+ return record;
+ }
+
+ static class BaseTestConfigHttp extends DynamicHttpTransformConfig {
+ BaseTestConfigHttp(String outputSchema, String referenceName, String url,
+ String urlVariables, int maxCallPerSeconds, String reusedInputs, String renameReusedInputs) {
+ super(referenceName);
+
+ this.schema = outputSchema;
+ this.url = url;
+ this.urlVariables = urlVariables;
+ this.reusedInputs = reusedInputs;
+ this.renameReusedInputs = renameReusedInputs;
+ this.maxCallPerSeconds = maxCallPerSeconds;
+ this.httpMethod = "GET";
+ this.oauth2Enabled = "false";
+ this.httpErrorsHandling = "2..:Success,.*:Fail";
+ this.errorHandling = "stopOnError";
+ this.retryPolicy = "linear";
+ this.maxRetryDuration = 10L;
+ this.linearRetryInterval = 1L;
+ this.waitTimeBetweenPages = 0L;
+ this.connectTimeout = 60;
+ this.readTimeout = 120;
+ this.format = "text";
+ this.keystoreType = "Java KeyStore (JKS)";
+ this.trustStoreType = "Java KeyStore (JKS)";
+ this.transportProtocols = "TLSv1.2";
+ this.format = "json";
+ }
+ }
+
+ @Test
+ public void testHttpDynamicTransformNominal() throws Exception {
+ List outputRecords = testHttpDynamicTransform("user.json");
+
+ Assert.assertTrue(outputRecords.size() == 1);
+ StructuredRecord outputRecord = outputRecords.get(0);
+ Assert.assertEquals("toto", outputRecord.get("firstName"));
+ Assert.assertEquals("tata", outputRecord.get("lastName"));
+ Assert.assertEquals("toto.tata@tutu.com", outputRecord.get("mail"));
+ Assert.assertEquals("the_id_value", outputRecord.get("_id"));
+ Assert.assertEquals(Arrays.asList("RATkiller 🐭", "Bienvenue 🙋", "Pro amiante"), outputRecord.get("paths"));
+ }
+
+
+ @Test
+ public void testHttpDynamicTransformPartial() throws Exception {
+ List outputRecords = testHttpDynamicTransform("user_partial.json");
+
+ Assert.assertTrue(outputRecords.size() == 1);
+ StructuredRecord outputRecord = outputRecords.get(0);
+ Assert.assertEquals("toto.tata@tutu.com", outputRecord.get("mail"));
+ Assert.assertEquals("the_id_value", outputRecord.get("_id"));
+ }
+
+ public List testHttpDynamicTransform(String filepath)throws Exception {
+ HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+ CloseableHttpResponse mockHttpResponse = Mockito.mock(CloseableHttpResponse.class);
+ HttpEntity mockEntity = Mockito.mock(HttpEntity.class);
+ StatusLine statusLine = Mockito.mock(StatusLine.class);
+
+ String baseURL = "myfakeurl.com/{id}?apiKey=XX&company=xx";
+ Map urlVariables = new HashMap<>();
+ urlVariables.put("id", "_id");
+ String idValue = "the_id_value";
+ String targetURL = "myfakeurl.com/the_id_value?apiKey=XX&company=xx";
+ Mockito.when(mockHttpClient.executeHTTP(targetURL/*Mockito.any()*/)).thenReturn(mockHttpResponse);
+ Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockEntity);
+ Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(statusLine);
+ Mockito.when(statusLine.getStatusCode()).thenReturn(200);
+ Mockito.when(mockEntity.getContent()).thenReturn(getClass().getClassLoader().getResourceAsStream(filepath));
+
+ BaseTestConfigHttp config = new BaseTestConfigHttp(
+ OUTPUT_SCHEMA,
+ "HttpDynamicTransform-transform",
+ baseURL,
+ Joiner.on(",").withKeyValueSeparator(":").join(urlVariables),
+ 10,
+ "",
+ "");
+
+ Transform transform
+ = new DynamicHttpTransform(config, mockHttpClient);
+
+ transform.initialize(null);
+
+ MockEmitter emitter = new MockEmitter<>();
+
+ transform.transform(generateData(idValue), emitter);
+ transform.destroy();
+
+ return emitter.getEmitted();
+ }
+
+ @Test
+ public void testReuseInputs()throws Exception {
+ String filepath = "user.json";
+ HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+ CloseableHttpResponse mockHttpResponse = Mockito.mock(CloseableHttpResponse.class);
+ HttpEntity mockEntity = Mockito.mock(HttpEntity.class);
+ StatusLine statusLine = Mockito.mock(StatusLine.class);
+
+ Mockito.when(mockHttpClient.executeHTTP(Mockito.any())).thenReturn(mockHttpResponse);
+ Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockEntity);
+ Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(statusLine);
+ Mockito.when(statusLine.getStatusCode()).thenReturn(200);
+ Mockito.when(mockEntity.getContent()).thenReturn(getClass().getClassLoader().getResourceAsStream(filepath));
+
+ Map renamingMapping = new HashMap<>();
+ renamingMapping.put("_id", "renamed_id");
+ renamingMapping.put("mail", "renamed_mail");
+
+ BaseTestConfigHttp config = new BaseTestConfigHttp(
+ OUTPUT_SCHEMA_WITH_REUSED,
+ "HttpDynamicTransform-transform",
+ "",
+ "",
+ 10,
+ "_id,mail",
+ Joiner.on(",").withKeyValueSeparator(":").join(renamingMapping));
+
+ Transform transform
+ = new DynamicHttpTransform(config, mockHttpClient);
+
+ transform.initialize(null);
+
+ MockEmitter emitter = new MockEmitter<>();
+
+ transform.transform(generateData("id_value"), emitter);
+ transform.destroy();
+
+ List outputRecords = emitter.getEmitted();
+
+ Assert.assertTrue(outputRecords.size() == 1);
+ StructuredRecord outputRecord = outputRecords.get(0);
+ Assert.assertEquals("toto.tata@tutu.com", outputRecord.get("mail"));
+ Assert.assertEquals("id_value", outputRecord.get("renamed_id"));
+ Assert.assertEquals(Arrays.asList("RATkiller 🐭", "Bienvenue 🙋", "Pro amiante"), outputRecord.get("paths"));
+ }
+
+}
diff --git a/src/test/resources/user.json b/src/test/resources/user.json
new file mode 100644
index 00000000..277669a6
--- /dev/null
+++ b/src/test/resources/user.json
@@ -0,0 +1,221 @@
+{
+ "_id": "the_id_value",
+ "mail": "toto.tata@tutu.com",
+ "firstName": "toto",
+ "lastName": "tata",
+ "assignedPrograms": 11,
+ "averageScore": 75.2,
+ "paths": [
+ "RATkiller 🐭",
+ "Bienvenue 🙋",
+ "Pro amiante"
+ ],
+ "certifications": [
+ {
+ "_id": "someId1",
+ "name": "La circulation au travail",
+ "date": "2021-02-24T14:44:08.000Z"
+ },
+ {
+ "_id": "someId2",
+ "name": "Évaluation d'accueil sécurité métier du déchet",
+ "date": "2020-10-20T08:18:55.000Z"
+ },
+ {
+ "_id": "someId3",
+ "name": "Acciline + Module gestion des évènements",
+ "date": "2020-10-15T16:41:24.000Z"
+ },
+ {
+ "_id": "someId4",
+ "name": "Connaissances approfondies AMIANTE",
+ "date": "2020-09-29T08:39:03.000Z"
+ },
+ {
+ "_id": "someId15",
+ "name": "Pass Compliance - Anti corruption",
+ "date": "2020-09-28T08:39:46.000Z"
+ },
+ {
+ "_id": "someId6",
+ "name": "Mes réflexes santé",
+ "date": "2020-05-11T20:26:24.000Z"
+ },
+ {
+ "_id": "someId7",
+ "name": "Mes réflexes santé",
+ "date": "2020-05-02T09:25:23.000Z"
+ }
+ ],
+ "championAchievements": {
+ "coursesPublished": [
+ "Champion",
+ "Guide",
+ "Modèle"
+ ],
+ "authoringToolExplored": [
+ "Apprenti Champion"
+ ],
+ "repliesOnOwnActivitiesGiven": [
+ "Pédagogue",
+ "Instituteur"
+ ],
+ "positiveReactionsOnOwnContent": [
+ "Reconnu(e)",
+ "Renommé(e)",
+ "Acclamé(e)",
+ "Illustre"
+ ]
+ },
+ "comments": 93,
+ "completedPrograms": 25,
+ "groups": [
+ {
+ "_id": "someId1",
+ "name": "Learning@Veolia",
+ "public": false
+ },
+ {
+ "_id": "someId2",
+ "name": "VEOLIA MIB",
+ "public": false,
+ "parent": "someId1"
+ },
+ {
+ "_id": "someId3",
+ "name": "SARP - SESAME",
+ "public": false,
+ "parent": "someId2"
+ },
+ {
+ "_id": "someId4",
+ "name": "VEOLIA Energie France",
+ "public": false,
+ "parent": "someId2"
+ },
+ {
+ "_id": "someId5",
+ "name": "Grands Comptes",
+ "public": false,
+ "parent": "someId3"
+ }
+ ],
+ "imageUrl": "https://veolialearning.360learning.com/api/medias/user/the_id_value",
+ "labels": [
+ "[BU] SARP",
+ "SARP SIEGE",
+ "SARP SA",
+ "[ZONE] MIB",
+ "Manager"
+ ],
+ "lastLoginAt": "2021-04-10T19:43:18.977Z",
+ "championStatus": "champion",
+ "learnerAchievements": {
+ "reactionsGiven": [
+ "Membre de la communauté",
+ "Supporter"
+ ],
+ "repliesGiven": [
+ "Main tendue",
+ "Contributeur(trice)",
+ "Pilier"
+ ],
+ "coursesPlayed": [
+ "Curieux(se)",
+ "Assidu(e)"
+ ]
+ },
+ "managers": [],
+ "messages": 95,
+ "publications": 0,
+ "reactions": 104,
+ "skills": [
+ {
+ "_id": "5e53c15ab7004c4dfbc90fe6",
+ "name": "Conformité",
+ "assessmentScore": "",
+ "averageScore": 96,
+ "eLearningScore": 96
+ },
+ {
+ "_id": "5e53c16c0cc7e84db3c1c93c",
+ "name": "Anti corruption",
+ "assessmentScore": "",
+ "averageScore": 100,
+ "eLearningScore": 100
+ },
+ {
+ "_id": "59e8527ec6daad12ee0a860d",
+ "name": "Santé sécurité",
+ "assessmentScore": "",
+ "averageScore": 100,
+ "eLearningScore": 100
+ },
+ {
+ "_id": "5b3f4a973cb0111d4c2e02f8",
+ "name": "Standard sécurité",
+ "assessmentScore": "",
+ "averageScore": 100,
+ "eLearningScore": 100
+ },
+ {
+ "_id": "5b3f4a973cb0111d4c2e02d7",
+ "name": "Prévention des risques",
+ "assessmentScore": "",
+ "averageScore": 100,
+ "eLearningScore": 100
+ },
+ {
+ "_id": "5e71f3c6dd2b1734b380e040",
+ "name": "Suivi & reporting",
+ "assessmentScore": "",
+ "averageScore": 100,
+ "eLearningScore": 100
+ },
+ {
+ "_id": "5b3f4a963cb0111d4c2e026d",
+ "name": "Analyse de risques",
+ "assessmentScore": "",
+ "averageScore": 100,
+ "eLearningScore": 100
+ },
+ {
+ "_id": "5e71f35ef55a8837e4803d86",
+ "name": "Langage Preventeo",
+ "assessmentScore": "",
+ "averageScore": 100,
+ "eLearningScore": 100
+ },
+ {
+ "_id": "5e71f36b1602f434c12da815",
+ "name": "Fonctions Preventeo",
+ "assessmentScore": "",
+ "averageScore": 100,
+ "eLearningScore": 100
+ },
+ {
+ "_id": "5e71f377f55a8837e4803d88",
+ "name": "Veille réglementaire",
+ "assessmentScore": "",
+ "averageScore": 100,
+ "eLearningScore": 100
+ },
+ {
+ "_id": "59e85261c6daad12ee0a860a",
+ "name": "Management & Leadership",
+ "assessmentScore": "",
+ "averageScore": 100,
+ "eLearningScore": 100
+ },
+ {
+ "_id": "5d238c1fe451727e3a270a83",
+ "name": "Veolia",
+ "assessmentScore": "",
+ "averageScore": 100,
+ "eLearningScore": 100
+ }
+ ],
+ "subordinates": [],
+ "toDeactivateAt": "",
+ "totalTimeSpentInMinutes": 914
+}
\ No newline at end of file
diff --git a/widgets/HTTP-streamingsource.json b/widgets/HTTP-streamingsource.json
index e7abdb68..417bad69 100644
--- a/widgets/HTTP-streamingsource.json
+++ b/widgets/HTTP-streamingsource.json
@@ -503,7 +503,7 @@
"name": "Proxy authentication",
"condition": {
"property": "proxyUrl",
- "operator": "exists",
+ "operator": "exists"
},
"show": [
{
diff --git a/widgets/HTTP-transform.json b/widgets/HTTP-transform.json
new file mode 100644
index 00000000..7f949365
--- /dev/null
+++ b/widgets/HTTP-transform.json
@@ -0,0 +1,707 @@
+{
+ "metadata": {
+ "spec-version": "1.5"
+ },
+ "display-name": "Multi HTTP",
+ "configuration-groups": [
+ {
+ "label": "General",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Reference Name",
+ "name": "referenceName"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "URL",
+ "name": "url"
+ },
+ {
+ "widget-type": "keyvalue",
+ "label": "URL Variables",
+ "name": "urlVariables",
+ "widget-attributes": {
+ "showDelimiter": "false"
+ }
+ },
+ {
+ "widget-type": "select",
+ "label": "HTTP Method",
+ "name": "httpMethod",
+ "widget-attributes": {
+ "values": [
+ "GET",
+ "POST",
+ "PUT",
+ "DELETE",
+ "HEAD"
+ ],
+ "default": "GET"
+ }
+ },
+ {
+ "widget-type": "keyvalue",
+ "label": "Headers",
+ "name": "headers",
+ "widget-attributes": {
+ "showDelimiter": "false"
+ }
+ },
+ {
+ "widget-type": "textarea",
+ "name": "requestBody",
+ "label": "Request Body",
+ "widget-attributes": {
+ "rows": "5"
+ }
+ }
+ ]
+ }, {
+ "label": "Multi-HTTP",
+ "properties": [
+ {
+ "widget-type": "input-field-selector",
+ "label": "Reused Input Fields",
+ "name": "reusedInputs",
+ "widget-attributes": {
+ "multiselect": "true"
+ }
+ },
+ {
+ "widget-type": "keyvalue",
+ "label": "Rename Reused Input Fields",
+ "name": "renameReusedInputs",
+ "widget-attributes": {
+ "showDelimiter": "false"
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Max calls per seconds",
+ "name": "maxCallPerSeconds",
+ "widget-attributes": {
+ "min": "0",
+ "default": "0"
+ }
+ }
+ ]
+ },
+ {
+ "label": "Format",
+ "properties": [
+ {
+ "widget-type": "select",
+ "label": "Format",
+ "name": "format",
+ "widget-attributes": {
+ "values": [
+ "json",
+ "xml",
+ "tsv",
+ "csv",
+ "text",
+ "blob"
+ ],
+ "default": "json"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "JSON/XML Result Path",
+ "name": "resultPath"
+ },
+ {
+ "widget-type": "keyvalue",
+ "label": "JSON/XML Fields Mapping",
+ "name": "fieldsMapping",
+ "widget-attributes": {
+ "showDelimiter": "false"
+ }
+ },
+ {
+ "widget-type": "radio-group",
+ "label": "CSV Skip First Row",
+ "name": "csvSkipFirstRow",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "false",
+ "options": [
+ {
+ "id": "true",
+ "label": "true"
+ },
+ {
+ "id": "false",
+ "label": "false"
+ }
+ ]
+ }
+ }
+ ]
+ },
+ {
+ "label": "OAuth2",
+ "properties": [
+ {
+ "widget-type": "toggle",
+ "label": "OAuth2 Enabled",
+ "name": "oauth2Enabled",
+ "widget-attributes": {
+ "default": "false",
+ "on": {
+ "label": "True",
+ "value": "true"
+ },
+ "off": {
+ "label": "False",
+ "value": "false"
+ }
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Auth URL",
+ "name": "authUrl"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Token URL",
+ "name": "tokenUrl"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Client ID",
+ "name": "clientId"
+ },
+ {
+ "widget-type": "password",
+ "label": "Client Secret",
+ "name": "clientSecret"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Scopes",
+ "name": "scopes"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Refresh Token",
+ "name": "refreshToken"
+ }
+ ]
+ },
+ {
+ "label": "Basic Authentication",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Username",
+ "name": "username"
+ },
+ {
+ "widget-type": "password",
+ "label": "Password",
+ "name": "password"
+ }
+ ]
+ },
+ {
+ "label": "HTTP Proxy",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Proxy URL",
+ "name": "proxyUrl"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Username",
+ "name": "proxyUsername"
+ },
+ {
+ "widget-type": "password",
+ "label": "Password",
+ "name": "proxyPassword"
+ }
+ ]
+ },
+ {
+ "label": "Error Handling",
+ "properties": [
+ {
+ "widget-type": "keyvalue-dropdown",
+ "label": "HTTP Errors Handling",
+ "name": "httpErrorsHandling",
+ "widget-attributes": {
+ "default": "2..:Success,.*:Fail",
+ "showDelimiter": "false",
+ "dropdownOptions": [
+ "Success",
+ "Fail",
+ "Skip",
+ "Send to error",
+ "Retry and fail",
+ "Retry and skip",
+ "Retry and send to error"
+ ],
+ "key-placeholder": "HTTP Status Code Regex"
+ }
+ },
+ {
+ "widget-type": "radio-group",
+ "label": "Non-HTTP Error Handling",
+ "name": "errorHandling",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "stopOnError",
+ "options": [
+ {
+ "id": "stopOnError",
+ "label": "Stop on error"
+ },
+ {
+ "id": "sendToError",
+ "label": "Send to error"
+ },
+ {
+ "id": "skipOnError",
+ "label": "Skip on error"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "radio-group",
+ "label": "Retry Policy",
+ "name": "retryPolicy",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "exponential",
+ "options": [
+ {
+ "id": "exponential",
+ "label": "Exponential"
+ },
+ {
+ "id": "linear",
+ "label": "Linear"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Linear Retry Interval",
+ "name": "linearRetryInterval",
+ "widget-attributes": {
+ "min": "0",
+ "default": "30"
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Max Retry Duration",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "min": "0",
+ "default": "600"
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Connect Timeout",
+ "name": "connectTimeout",
+ "widget-attributes": {
+ "min": "0",
+ "default": "120"
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Read Timeout",
+ "name": "readTimeout",
+ "widget-attributes": {
+ "min": "0",
+ "default": "120"
+ }
+ }
+ ]
+ },
+ {
+ "label": "Pagination",
+ "properties": [
+ {
+ "widget-type": "select",
+ "label": "Pagination Type",
+ "name": "paginationType",
+ "widget-attributes": {
+ "values": [
+ "None",
+ "Link in response header",
+ "Link in response body",
+ "Token in response body",
+ "Increment an index",
+ "Custom"
+ ],
+ "default": "None"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Start Index",
+ "name": "startIndex",
+ "widget-attributes": {
+ "placeholder": "For pagination type \"Increment an index\""
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Max Index",
+ "name": "maxIndex",
+ "widget-attributes": {
+ "placeholder": "For pagination type \"Increment an index\""
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Index Increment",
+ "name": "indexIncrement",
+ "widget-attributes": {
+ "placeholder": "For pagination type \"Increment an index\""
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Next Page JSON/XML Field Path",
+ "name": "nextPageFieldPath",
+ "widget-attributes": {
+ "placeholder": "For pagination type \"Link in response body\""
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Next Page Token Path",
+ "name": "nextPageTokenPath",
+ "widget-attributes": {
+ "placeholder": "For pagination type \"Token in response body\""
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Next Page Url Parameter",
+ "name": "nextPageUrlParameter",
+ "widget-attributes": {
+ "placeholder": "For pagination type \"Token in response body\""
+ }
+ },
+ {
+ "widget-type": "python-editor",
+ "label": "Custom Pagination Python Code",
+ "name": "customPaginationCode",
+ "widget-attributes": {
+ "placeholder": "def get_next_page_url(url, page, headers):\n \"\"\"\n Based on previous page data generates next page url, when \"Custom pagination\" is enabled.\n Returns 'None' if no more pages to load \n\n Args:\n url (string): previous page url\n page (string): a body of previous page\n headers (dict): a dictionary of headers from previous page\n\n \"\"\"\n return \"https://next-page-url.com\""
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Wait Time Between Pages (milliseconds)",
+ "name": "waitTimeBetweenPages",
+ "widget-attributes": {
+ "min": "0",
+ "default": "0"
+ }
+ }
+ ]
+ },
+ {
+ "label": "SSL/TLS",
+ "properties": [
+ {
+ "widget-type": "toggle",
+ "label": "Verify HTTPS Trust Certificates",
+ "name": "verifyHttps",
+ "widget-attributes": {
+ "default": "true",
+ "on": {
+ "label": "True",
+ "value": "true"
+ },
+ "off": {
+ "label": "False",
+ "value": "false"
+ }
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Keystore File",
+ "name": "keystoreFile"
+ },
+ {
+ "widget-type": "select",
+ "label": "Keystore Type",
+ "name": "keystoreType",
+ "widget-attributes": {
+ "default": "Java KeyStore (JKS)",
+ "values": [
+ "Java KeyStore (JKS)",
+ "Java Cryptography Extension KeyStore (JCEKS)",
+ "PKCS #12"
+ ]
+ }
+ },
+ {
+ "widget-type": "password",
+ "label": "Keystore Password",
+ "name": "keystorePassword"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Keystore Key Algorithm",
+ "name": "keystoreKeyAlgorithm",
+ "widget-attributes": {
+ "default": "SunX509"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "TrustStore File",
+ "name": "trustStoreFile"
+ },
+ {
+ "widget-type": "select",
+ "label": "TrustStore Type",
+ "name": "trustStoreType",
+ "widget-attributes": {
+ "default": "Java KeyStore (JKS)",
+ "values": [
+ "Java KeyStore (JKS)",
+ "Java Cryptography Extension KeyStore (JCEKS)",
+ "PKCS #12"
+ ]
+ }
+ },
+ {
+ "widget-type": "password",
+ "label": "TrustStore Password",
+ "name": "trustStorePassword"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "TrustStore Key Algorithm",
+ "name": "trustStoreKeyAlgorithm",
+ "widget-attributes": {
+ "default": "SunX509"
+ }
+ },
+ {
+ "widget-type": "csv",
+ "label": "Transport Protocols",
+ "name": "transportProtocols",
+ "widget-attributes": {
+ "default": "TLSv1.2"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Cipher Suites",
+ "name": "cipherSuites"
+ }
+ ]
+ }
+ ],
+ "emit-errors": true,
+ "outputs": [
+ {
+ "name": "schema",
+ "label": "schema",
+ "widget-type": "schema",
+ "widget-attributes": {
+ "schema-types": [
+ "boolean",
+ "int",
+ "long",
+ "float",
+ "double",
+ "bytes",
+ "string",
+ "array",
+ "record",
+ "map",
+ "union"
+ ],
+ "schema-default-type": "string",
+ "property-watch": "format"
+ }
+ }
+ ],
+ "filters": [
+ {
+ "name": "Proxy authentication",
+ "condition": {
+ "property": "proxyUrl",
+ "operator": "exists"
+ },
+ "show": [
+ {
+ "name": "proxyUsername",
+ "type": "property"
+ },
+ {
+ "name": "proxyPassword",
+ "type": "property"
+ }
+ ]
+ },
+ {
+ "name": "Increment an index",
+ "condition": {
+ "property": "paginationType",
+ "operator": "equal to",
+ "value": "Increment an index"
+ },
+ "show": [
+ {
+ "name": "startIndex",
+ "type": "property"
+ },
+ {
+ "name": "maxIndex",
+ "type": "property"
+ },
+ {
+ "name": "indexIncrement",
+ "type": "property"
+ }
+ ]
+ },
+ {
+ "name": "Token in Response Body",
+ "condition": {
+ "property": "paginationType",
+ "operator": "equal to",
+ "value": "Token in response body"
+ },
+ "show": [
+ {
+ "name": "nextPageTokenPath",
+ "type": "property"
+ },
+ {
+ "name": "nextPageUrlParameter",
+ "type": "property"
+ }
+ ]
+ },
+ {
+ "name": "Link in response body",
+ "condition": {
+ "property": "paginationType",
+ "operator": "equal to",
+ "value": "Link in response body"
+ },
+ "show": [
+ {
+ "name": "nextPageFieldPath",
+ "type": "property"
+ }
+ ]
+ },
+ {
+ "name": "Custom pagination",
+ "condition": {
+ "property": "paginationType",
+ "operator": "equal to",
+ "value": "Custom"
+ },
+ "show": [
+ {
+ "name": "customPaginationCode",
+ "type": "property"
+ }
+ ]
+ },
+ {
+ "label": "OAuth2",
+ "properties": [
+ {
+ "widget-type": "toggle",
+ "label": "OAuth2 Enabled",
+ "name": "oauth2Enabled",
+ "widget-attributes": {
+ "default": "false",
+ "on": {
+ "label": "True",
+ "value": "true"
+ },
+ "off": {
+ "label": "False",
+ "value": "false"
+ }
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Auth URL",
+ "name": "authUrl"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Token URL",
+ "name": "tokenUrl"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Client ID",
+ "name": "clientId"
+ },
+ {
+ "widget-type": "password",
+ "label": "Client Secret",
+ "name": "clientSecret"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Scopes",
+ "name": "scopes"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Refresh Token",
+ "name": "refreshToken"
+ }
+ ]
+ },
+ {
+ "name": "JSON/XML Formatting",
+ "condition": {
+ "expression": "format == 'json' || format == 'xml'"
+ },
+ "show": [
+ {
+ "name": "resultPath",
+ "type": "property"
+ },
+ {
+ "name": "fieldsMapping",
+ "type": "property"
+ }
+ ]
+ },
+ {
+ "name": "CSV Formatting",
+ "condition": {
+ "property": "format",
+ "operator": "equal to",
+ "value": "csv"
+ },
+ "show": [
+ {
+ "name": "csvSkipFirstRow",
+ "type": "property"
+ }
+ ]
+ }
+ ]
+}
\ No newline at end of file