diff --git a/docs/en/connector-v2/source/Http.md b/docs/en/connector-v2/source/Http.md index 8b922edc230..992542b50c8 100644 --- a/docs/en/connector-v2/source/Http.md +++ b/docs/en/connector-v2/source/Http.md @@ -44,37 +44,38 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor ## Source Options -| Name | Type | Required | Default | Description | -|-------------------------------|---------|----------|-------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| url | String | Yes | - | Http request url. | -| schema | Config | No | - | Http and seatunnel data structure mapping | -| schema.fields | Config | No | - | The schema fields of upstream data | -| json_field | Config | No | - | This parameter helps you configure the schema,so this parameter must be used with schema. | -| pageing | Config | No | - | This parameter is used for paging queries | -| pageing.page_field | String | No | - | This parameter is used to specify the page field name in the request. It can be used in headers, params, or body with placeholders like ${page_field}. | -| pageing.use_placeholder_replacement | Boolean | No | false | If true, use placeholder replacement (${field}) for headers, parameters and body values, otherwise use key-based replacement. | -| pageing.total_page_size | Int | No | - | This parameter is used to control the total number of pages | -| pageing.batch_size | Int | No | - | The batch size returned per request is used to determine whether to continue when the total number of pages is unknown | -| pageing.start_page_number | Int | No | 1 | Specify the page number from which synchronization starts | +| Name | Type | Required | Default | Description | +|-------------------------------|---------|----------|-------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | Http request url. | +| schema | Config | No | - | Http and seatunnel data structure mapping | +| schema.fields | Config | No | - | The schema fields of upstream data | +| json_field | Config | No | - | This parameter helps you configure the schema,so this parameter must be used with schema. | +| pageing | Config | No | - | This parameter is used for paging queries | +| pageing.page_field | String | No | - | This parameter is used to specify the page field name in the request. It can be used in headers, params, or body with placeholders like ${page_field}. | +| pageing.use_placeholder_replacement | Boolean | No | false | If true, use placeholder replacement (${field}) for headers, parameters and body values, otherwise use key-based replacement. | +| pageing.total_page_size | Int | No | - | This parameter is used to control the total number of pages | +| pageing.batch_size | Int | No | - | The batch size returned per request is used to determine whether to continue when the total number of pages is unknown | +| pageing.start_page_number | Int | No | 1 | Specify the page number from which synchronization starts | | pageing.page_type | String | No | PageNumber | this parameter is used to specify the page type ,or PageNumber if not set, only support `PageNumber` and `Cursor`. | | pageing.cursor_field | String | No | - | this parameter is used to specify the Cursor field name in the request parameter. | -| pageing.cursor_response_field | String | No | - | This parameter specifies the field in the response from which the cursor is retrieved. | -| content_json | String | No | - | This parameter can get some json data.If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`. | -| format | String | No | text | The format of upstream data, now only support `json` `text`, default `text`. | -| method | String | No | get | Http request method, only supports GET, POST method. | -| headers | Map | No | - | Http headers. | -| params | Map | No | - | Http params. | -| body | String | No | - | Http body,the program will automatically add http header application/json,body is jsonbody. | -| poll_interval_millis | Int | No | - | Request http api interval(millis) in stream mode. | -| retry | Int | No | - | The max retry times if request http return to `IOException`. | -| retry_backoff_multiplier_ms | Int | No | 100 | The retry-backoff times(millis) multiplier if request http failed. | -| retry_backoff_max_ms | Int | No | 10000 | The maximum retry-backoff times(millis) if request http failed | -| enable_multi_lines | Boolean | No | false | | -| connect_timeout_ms | Int | No | 12000 | Connection timeout setting, default 12s. | -| socket_timeout_ms | Int | No | 60000 | Socket timeout setting, default 60s. | -| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details | +| pageing.cursor_response_field | String | No | - | This parameter specifies the field in the response from which the cursor is retrieved. | +| content_json | String | No | - | This parameter can get some json data.If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`. | +| format | String | No | text | The format of upstream data, now only support `json` `text`, default `text`. | +| method | String | No | get | Http request method, only supports GET, POST method. | +| headers | Map | No | - | Http headers. | +| params | Map | No | - | Http params. | +| body | String | No | - | Http body,the program will automatically add http header application/json,body is jsonbody. | +| poll_interval_millis | Int | No | - | Request http api interval(millis) in stream mode. | +| retry | Int | No | - | The max retry times if request http return to `IOException`. | +| retry_backoff_multiplier_ms | Int | No | 100 | The retry-backoff times(millis) multiplier if request http failed. | +| retry_backoff_max_ms | Int | No | 10000 | The maximum retry-backoff times(millis) if request http failed | +| enable_multi_lines | Boolean | No | false | | +| connect_timeout_ms | Int | No | 12000 | Connection timeout setting, default 12s. | +| socket_timeout_ms | Int | No | 60000 | Socket timeout setting, default 60s. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details | | keep_params_as_form | Boolean | No | false | Whether the params are submitted according to the form, used for compatibility with legacy behaviors. When true, the value of the params parameter is submitted through the form. | -| keep_page_param_as_http_param | Boolean | No | false | Whether to set the paging parameters to params. For compatibility with legacy behaviors. | +| keep_page_param_as_http_param | Boolean | No | false | Whether to set the paging parameters to params. For compatibility with legacy behaviors.| +| json_filed_missed_return_null | Boolean | No | false | When the json field is missing, set true return null else error.| ## How to Create a Http Data Synchronization Jobs @@ -216,7 +217,7 @@ By default, the parameters will be added to the url path. If you need to keep the old version behavior, please check keep_params_as_form. ### body -The HTTP body is used to carry the actual data in requests or responses, including JSON, form submissions. +The HTTP body is used to carry the actual data in requests or responses, including JSON, form submissions. The reference format is as follows: ```hocon diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml index f18d6f9fef1..e19e9ab9848 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml @@ -33,6 +33,7 @@ 4.5.13 4.4.4 2.0.0 + 3.12.4 @@ -69,5 +70,29 @@ json-path ${json-path.version} + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.mockito + mockito-core + ${mockito.version} + test + + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test + diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java index 272f2023298..94493a6c600 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java @@ -45,6 +45,7 @@ public class HttpParameter implements Serializable { protected boolean arrayMode = false; protected int batchSize = 1; protected int requestIntervalMs = 0; + protected boolean jsonFiledMissedReturnNull; public void buildWithConfig(ReadonlyConfig pluginConfig) { // set url @@ -83,5 +84,7 @@ public void buildWithConfig(ReadonlyConfig pluginConfig) { this.setEnableMultilines(pluginConfig.get(HttpSourceOptions.ENABLE_MULTI_LINES)); this.setConnectTimeoutMs(pluginConfig.get(HttpSourceOptions.CONNECT_TIMEOUT_MS)); this.setSocketTimeoutMs(pluginConfig.get(HttpSourceOptions.SOCKET_TIMEOUT_MS)); + this.setJsonFiledMissedReturnNull( + pluginConfig.get(HttpSourceOptions.JSON_FILED_MISSED_RETURN_NULL)); } } diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSourceOptions.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSourceOptions.java index cc179ea027f..5014a01c4a2 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSourceOptions.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSourceOptions.java @@ -147,4 +147,10 @@ public class HttpSourceOptions extends HttpCommonOptions { .intType() .defaultValue(DEFAULT_SOCKET_TIMEOUT_MS) .withDescription("Socket timeout setting, default 60s."); + + public static final Option JSON_FILED_MISSED_RETURN_NULL = + Options.key("json_filed_missed_return_null") + .booleanType() + .defaultValue(false) + .withDescription("When the json field is missing, return null"); } diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java index b82fe3fe0c3..fdb2b50adc5 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java @@ -36,6 +36,9 @@ import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo; import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException; +import org.apache.seatunnel.connectors.seatunnel.http.util.JsonPathProcessor; +import org.apache.seatunnel.connectors.seatunnel.http.util.JsonPathProcessorFactory; +import org.apache.seatunnel.connectors.seatunnel.http.util.JsonPathUtils; import org.apache.commons.collections4.MapUtils; @@ -418,47 +421,24 @@ private void collect(Collector output, String data) throws IOExcep deserializationCollector.collect(contentData.getBytes(), output); } - private List> parseToMap(List> datas, JsonField jsonField) { - List> decodeDatas = new ArrayList<>(datas.size()); - String[] keys = jsonField.getFields().keySet().toArray(new String[] {}); - - for (List data : datas) { - Map decodeData = new HashMap<>(jsonField.getFields().size()); - final int[] index = {0}; - data.forEach( - field -> { - decodeData.put(keys[index[0]], field); - index[0]++; - }); - decodeDatas.add(decodeData); - } - - return decodeDatas; - } - private List> decodeJSON(String data) { ReadContext jsonReadContext = JsonPath.using(jsonConfiguration).parse(data); - List> results = new ArrayList<>(jsonPaths.length); - for (JsonPath path : jsonPaths) { - List result = jsonReadContext.read(path); - results.add(result); - } - for (int i = 1; i < results.size(); i++) { - List result0 = results.get(0); - List result = results.get(i); - if (result0.size() != result.size()) { - throw new HttpConnectorException( - HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT, - String.format( - "[%s](%d) and [%s](%d) the number of parsing records is inconsistent.", - jsonPaths[0].getPath(), - result0.size(), - jsonPaths[i].getPath(), - result.size())); - } + boolean jsonFiledMissedReturnNull = httpParameter.isJsonFiledMissedReturnNull(); + if (jsonFiledMissedReturnNull) { + // Get the appropriate processor for the JsonPaths, passing the + // jsonFiledMissedReturnNull flag + JsonPathProcessor processor = + JsonPathProcessorFactory.getProcessor(this.jsonPaths, true); + return processor.processJsonData(jsonReadContext, this.jsonPaths); + } else { + // Standard processing for strict fields + return JsonPathProcessorFactory.getProcessor(this.jsonPaths) + .processJsonData(jsonReadContext, this.jsonPaths); } + } - return dataFlip(results); + private List> parseToMap(List> datas, JsonField jsonField) { + return JsonPathUtils.parseToMap(datas, jsonField); } private String getPartOfJson(String data) { @@ -467,8 +447,8 @@ private String getPartOfJson(String data) { } private List> dataFlip(List> results) { - List> datas = new ArrayList<>(); + for (int i = 0; i < results.size(); i++) { List result = results.get(i); if (i == 0) { @@ -487,15 +467,11 @@ private List> dataFlip(List> results) { } } } + return datas; } private void initJsonPath(JsonField jsonField) { - jsonPaths = new JsonPath[jsonField.getFields().size()]; - for (int index = 0; index < jsonField.getFields().keySet().size(); index++) { - jsonPaths[index] = - JsonPath.compile( - jsonField.getFields().values().toArray(new String[] {})[index]); - } + jsonPaths = JsonPathUtils.createJsonPaths(jsonField); } } diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/AbstractJsonPathProcessor.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/AbstractJsonPathProcessor.java new file mode 100644 index 00000000000..3c7b4a2014a --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/AbstractJsonPathProcessor.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.http.util; + +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException; + +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.ReadContext; + +import java.util.ArrayList; +import java.util.List; + +/** Abstract implementation of JsonPathProcessor providing common functionality. */ +public abstract class AbstractJsonPathProcessor implements JsonPathProcessor { + + /** Flag to indicate whether to return null for missing fields */ + private boolean jsonFiledMissedReturnNull = false; + + /** + * Set whether to return null for missing fields. + * + * @param jsonFiledMissedReturnNull true to return null for missing fields, false otherwise + */ + public void setJsonFiledMissedReturnNull(boolean jsonFiledMissedReturnNull) { + this.jsonFiledMissedReturnNull = jsonFiledMissedReturnNull; + } + + /** + * Check if json fields with missing values should return null. This is used to determine + * whether to validate result consistency. + * + * @return true if missing fields should return null, false otherwise + */ + protected boolean isJsonFiledMissedReturnNull() { + return jsonFiledMissedReturnNull; + } + + /** {@inheritDoc} */ + @Override + public List> processJsonData(ReadContext jsonReadContext, JsonPath[] paths) { + // Default implementation - can be overridden by subclasses + List> results = new ArrayList<>(paths.length); + + // Read all paths + for (JsonPath path : paths) { + results.add(jsonReadContext.read(path)); + } + + // Only validate consistency if jsonFiledMissedReturnNull is false + boolean shouldValidate = !isJsonFiledMissedReturnNull(); + if (shouldValidate) { + validateResultsConsistency(results, paths); + } + + return dataFlip(results); + } + + /** + * Helper method to validate that all results have the same size. + * + * @param results The list of results to validate + * @param paths The JsonPath objects used to generate the results + * @throws HttpConnectorException if results are inconsistent + */ + protected void validateResultsConsistency(List> results, JsonPath[] paths) { + if (results.isEmpty()) { + return; + } + + int expectedSize = results.get(0).size(); + for (int i = 1; i < results.size(); i++) { + if (results.get(i).size() != expectedSize) { + throw new HttpConnectorException( + HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT, + String.format( + "[%s](%d) and [%s](%d) the number of parsing records is inconsistent.", + paths[0].getPath(), + expectedSize, + paths[i].getPath(), + results.get(i).size())); + } + } + } + + /** + * Flips a matrix of results so that rows become columns and vice versa. + * + * @param results The original data matrix + * @return The flipped data matrix + */ + protected List> dataFlip(List> results) { + List> datas = new ArrayList<>(); + + for (int i = 0; i < results.size(); i++) { + List result = results.get(i); + if (i == 0) { + for (Object o : result) { + String val = o == null ? null : o.toString(); + List row = new ArrayList<>(results.size()); + row.add(val); + datas.add(row); + } + } else { + for (int j = 0; j < result.size(); j++) { + Object o = result.get(j); + String val = o == null ? null : o.toString(); + List row = datas.get(j); + row.add(val); + } + } + } + + return datas; + } + + /** + * Extract value from a JSON context using a relative path. + * + * @param objContext The JSON read context + * @param relativePath The relative path to extract from + * @return The extracted value as a string + */ + protected String extractValue(ReadContext objContext, String relativePath) { + try { + Object value = objContext.read(relativePath); + if (value == null) { + return null; + } + if (value instanceof String) { + // For string types, return the original value directly without JSON serialization, + // otherwise "value" will become "\"value"\" + return (String) value; + } + if (value instanceof List) { + List list = (List) value; + return !list.isEmpty() ? JsonUtils.toJsonString(list) : null; + } + // For other non-string values, use JsonUtils to serialize them. + return JsonUtils.toJsonString(value); + } catch (Exception e) { + return null; + } + } +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/ArrayJsonPathProcessor.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/ArrayJsonPathProcessor.java new file mode 100644 index 00000000000..21caa1a59df --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/ArrayJsonPathProcessor.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.http.util; + +import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException; + +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.ReadContext; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** Processor for handling JsonPath with array notation (using [*]). */ +public class ArrayJsonPathProcessor extends AbstractJsonPathProcessor { + /** + * Extract the common parent path from an array of JsonPaths. + * + * @param paths Array of JsonPath objects + * @return The common parent path as a string + */ + private String extractCommonParentPath(JsonPath[] paths) { + if (paths == null || paths.length == 0) { + return null; + } + + // Get all paths as strings + String[] pathStrings = new String[paths.length]; + for (int i = 0; i < paths.length; i++) { + pathStrings[i] = paths[i].getPath(); + } + + String firstPath = pathStrings[0]; + int arrayPos = firstPath.indexOf("[*]"); + + if (arrayPos == -1) { + return null; // Not an array path, cannot process + } + + String parentPath = firstPath.substring(0, arrayPos + 3); + + // Verify all other paths have the same parent + for (int i = 1; i < pathStrings.length; i++) { + if (!pathStrings[i].startsWith(parentPath)) { + throw new HttpConnectorException( + HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT, + String.format( + "Paths have different array parents. Expected '%s' but found path starting with '%s'", + parentPath, pathStrings[i])); + } + } + + return parentPath; + } + + /** + * Get a relative path based on a parent path and a full path. + * + * @param parentPath The parent path + * @param fullPath The complete path + * @return The relative path from parent to full path + */ + private String getRelativePath(String parentPath, String fullPath) { + if (!parentPath.contains("[*]")) { + throw new IllegalArgumentException( + "Parent path must contain [*] for ArrayJsonPathProcessor"); + } + + if (!fullPath.contains("[*]")) { + // For non-array paths when parent has [*], extract the correct relative part + String commonPart = parentPath.substring(0, parentPath.indexOf("[*]")); + String relativePart = fullPath.substring(commonPart.length()); + + // If the relative part starts with a dot, remove it + if (relativePart.startsWith(".")) { + relativePart = relativePart.substring(1); + } + + return "$." + relativePart; + } else { + // Original implementation for array paths + String relativePart = fullPath.substring(parentPath.length()); + + // If the relative part starts with a dot, remove it + if (relativePart.startsWith(".")) { + relativePart = relativePart.substring(1); + } + + return "$." + relativePart; + } + } + + /** + * Read objects from a specific path in JSON. + * + * @param jsonReadContext The JSON read context + * @param path The path to read from + * @return List of objects read from the path + */ + private List> readObjectsFromPath( + ReadContext jsonReadContext, String path) { + try { + return jsonReadContext.read(path); + } catch (Exception e) { + throw new HttpConnectorException( + HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT, + String.format( + "Failed to read data from JSON using path %s: %s", + path, e.getMessage())); + } + } + + /** {@inheritDoc} */ + @Override + public List> processJsonData(ReadContext jsonReadContext, JsonPath[] paths) { + String commonParentPath = extractCommonParentPath(paths); + if (commonParentPath == null) { + throw new HttpConnectorException( + HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT, + "Could not find common parent path in JsonPaths. All paths must share a common array parent."); + } + + List> objects = readObjectsFromPath(jsonReadContext, commonParentPath); + + // If we're allowing null values for missing fields, we don't need additional validation + return processObjects(objects, commonParentPath, paths); + } + + /** + * Process objects extracted from JSON and convert them to the result format. + * + * @param objects List of objects extracted from JSON + * @param commonParentPath The common parent path used for extraction + * @param paths Array of JsonPath objects + * @return List of processed data + */ + private List> processObjects( + List> objects, String commonParentPath, JsonPath[] paths) { + List> results = initializeResults(paths.length, objects.size()); + + for (int objIndex = 0; objIndex < objects.size(); objIndex++) { + Map obj = objects.get(objIndex); + ReadContext objContext = JsonPath.parse(obj); + + for (int pathIndex = 0; pathIndex < paths.length; pathIndex++) { + String fieldPath = paths[pathIndex].getPath(); + String relativePath = getRelativePath(commonParentPath, fieldPath); + String value = extractValue(objContext, relativePath); + results.get(pathIndex).add(value); + } + } + + return dataFlip(results); + } + + /** + * Initialize a results list with the given dimensions. + * + * @param pathCount Number of paths (rows) + * @param objectCount Number of objects (columns) + * @return Initialized results list + */ + private List> initializeResults(int pathCount, int objectCount) { + List> results = new ArrayList<>(pathCount); + for (int i = 0; i < pathCount; i++) { + List row = new ArrayList<>(objectCount); + results.add(row); + } + return results; + } +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/JsonPathProcessor.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/JsonPathProcessor.java new file mode 100644 index 00000000000..2b914563272 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/JsonPathProcessor.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.http.util; + +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.ReadContext; + +import java.util.List; + +/** + * Interface for processing JsonPath operations. Different implementations can handle various + * JsonPath formats. + */ +public interface JsonPathProcessor { + /** + * Process objects from a JSON structure based on JsonPaths. + * + * @param jsonReadContext The JSON read context + * @param paths Array of JsonPath objects + * @return List of extracted data + */ + List> processJsonData(ReadContext jsonReadContext, JsonPath[] paths); +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/JsonPathProcessorFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/JsonPathProcessorFactory.java new file mode 100644 index 00000000000..c23e002f582 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/JsonPathProcessorFactory.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.http.util; + +import com.jayway.jsonpath.JsonPath; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +/** Factory for creating appropriate JsonPathProcessor instances based on the JsonPath format. */ +public class JsonPathProcessorFactory { + + // List of processor suppliers in order of precedence + private static final List PROCESSOR_MATCHERS = new ArrayList<>(); + + static { + // Register all available processor matchers in order of precedence + PROCESSOR_MATCHERS.add( + new ProcessorMatcher( + path -> path.contains("[*]"), () -> new ArrayJsonPathProcessor())); + PROCESSOR_MATCHERS.add( + new ProcessorMatcher( + path -> true, // Default matcher + () -> new ObjectJsonPathProcessor())); + } + + /** + * Get the appropriate processor for a single JsonPath. + * + * @param jsonPath The JsonPath to process + * @return The appropriate JsonPathProcessor + */ + public static JsonPathProcessor getProcessor(JsonPath jsonPath) { + return getProcessor(jsonPath.getPath()); + } + + /** + * Get the appropriate processor for a JsonPath string. + * + * @param pathString The JsonPath string to process + * @return The appropriate JsonPathProcessor + */ + public static JsonPathProcessor getProcessor(String pathString) { + for (ProcessorMatcher matcher : PROCESSOR_MATCHERS) { + if (matcher.matches(pathString)) { + return matcher.createProcessor(); + } + } + + // Default to ObjectJsonPathProcessor if no other processor matches + return new ObjectJsonPathProcessor(); + } + + /** + * Get the appropriate processor for an array of JsonPaths. This will choose a processor based + * on the first path in the array. + * + * @param paths Array of JsonPath objects + * @return The appropriate JsonPathProcessor + */ + public static JsonPathProcessor getProcessor(JsonPath[] paths) { + if (paths == null || paths.length == 0) { + throw new IllegalArgumentException("JsonPath array cannot be null or empty"); + } + + return getProcessor(paths[0]); + } + + /** + * Get the appropriate processor for an array of JsonPaths with jsonFiledMissedReturnNull flag. + * + * @param paths Array of JsonPath objects + * @param jsonFiledMissedReturnNull Whether to return null for missing fields + * @return The appropriate JsonPathProcessor + */ + public static JsonPathProcessor getProcessor( + JsonPath[] paths, boolean jsonFiledMissedReturnNull) { + if (paths == null || paths.length == 0) { + throw new IllegalArgumentException("JsonPath array cannot be null or empty"); + } + + JsonPathProcessor processor = getProcessor(paths[0]); + + // If this processor is an AbstractJsonPathProcessor and jsonFiledMissedReturnNull is true, + // we need to set the flag + if (processor instanceof AbstractJsonPathProcessor && jsonFiledMissedReturnNull) { + ((AbstractJsonPathProcessor) processor).setJsonFiledMissedReturnNull(true); + } + + return processor; + } + + /** Helper class to match and create JsonPathProcessors. */ + private static class ProcessorMatcher { + private final PathMatcher matcher; + private final Supplier processorSupplier; + + public ProcessorMatcher( + PathMatcher matcher, Supplier processorSupplier) { + this.matcher = matcher; + this.processorSupplier = processorSupplier; + } + + public boolean matches(String pathString) { + return matcher.matches(pathString); + } + + public JsonPathProcessor createProcessor() { + return processorSupplier.get(); + } + } + + /** Interface for path matching. */ + @FunctionalInterface + private interface PathMatcher { + boolean matches(String pathString); + } +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/JsonPathUtils.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/JsonPathUtils.java new file mode 100644 index 00000000000..2da87dd95c6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/JsonPathUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.http.util; + +import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField; +import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException; + +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.ReadContext; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Utility class for JsonPath operations. */ +public class JsonPathUtils { + + private static final Option[] DEFAULT_OPTIONS = { + Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, Option.DEFAULT_PATH_LEAF_TO_NULL + }; + + private static final Configuration JSON_CONFIGURATION = + Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS); + + /** + * Creates a ReadContext from a JSON string. + * + * @param json The JSON string + * @return A ReadContext for the JSON + */ + public static ReadContext parseJson(String json) { + return JsonPath.using(JSON_CONFIGURATION).parse(json); + } + + /** + * Creates JsonPath array from JsonField. + * + * @param jsonField The JsonField to convert + * @return Array of JsonPath objects + */ + public static JsonPath[] createJsonPaths(JsonField jsonField) { + if (jsonField == null || jsonField.getFields() == null || jsonField.getFields().isEmpty()) { + throw new HttpConnectorException( + HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT, + "JsonField cannot be null or empty"); + } + + JsonPath[] jsonPaths = new JsonPath[jsonField.getFields().size()]; + int index = 0; + for (String pathString : jsonField.getFields().values()) { + jsonPaths[index++] = JsonPath.compile(pathString); + } + + return jsonPaths; + } + + /** + * Converts parsed data to a list of maps. + * + * @param data The raw data (list of lists) + * @param jsonField The JsonField containing field names + * @return List of maps with field names as keys + */ + public static List> parseToMap( + List> data, JsonField jsonField) { + List> resultList = new ArrayList<>(data.size()); + String[] keys = jsonField.getFields().keySet().toArray(new String[0]); + + for (List row : data) { + Map resultMap = new HashMap<>(jsonField.getFields().size()); + for (int i = 0; i < row.size(); i++) { + resultMap.put(keys[i], row.get(i)); + } + resultList.add(resultMap); + } + + return resultList; + } +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/ObjectJsonPathProcessor.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/ObjectJsonPathProcessor.java new file mode 100644 index 00000000000..5ddfe2100cb --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/ObjectJsonPathProcessor.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.http.util; + +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.ReadContext; + +import java.util.ArrayList; +import java.util.List; + +/** Processor for handling JsonPath with dot notation (standard object notation). */ +public class ObjectJsonPathProcessor extends AbstractJsonPathProcessor { + + /** {@inheritDoc} */ + @Override + public List> processJsonData(ReadContext jsonReadContext, JsonPath[] paths) { + // Default implementation - can be overridden by subclasses + List> results = new ArrayList<>(paths.length); + + // Read all paths + for (JsonPath path : paths) { + results.add(jsonReadContext.read(path)); + } + + // Only validate consistency if jsonFiledMissedReturnNull is false + boolean shouldValidate = !isJsonFiledMissedReturnNull(); + if (shouldValidate) { + validateResultsConsistency(results, paths); + } + + return dataFlip(results); + } +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/JsonFieldMissedReturnNullComplexTest.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/JsonFieldMissedReturnNullComplexTest.java new file mode 100644 index 00000000000..ea0f03906a6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/JsonFieldMissedReturnNullComplexTest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.http; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider; +import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse; +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod; +import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField; +import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader; +import org.apache.seatunnel.connectors.seatunnel.http.source.SimpleTextDeserializationSchema; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class JsonFieldMissedReturnNullComplexTest { + + private HttpParameter httpParameter; + private JsonField jsonField; + private SimpleTextDeserializationSchema deserializationSchema; + + @Mock private SingleSplitReaderContext context; + + @Mock private Collector collector; + + @Mock private HttpClientProvider httpClientProvider; + + @Mock private HttpResponse httpResponse; + + @BeforeEach + public void setUp() { + MockitoAnnotations.openMocks(this); + + httpParameter = new HttpParameter(); + httpParameter.setUrl("http://test-url.com"); + httpParameter.setMethod(HttpRequestMethod.GET); + + Map fields = new HashMap<>(); + fields.put("key1_1", "$.result.rows[*].key1.key1_1"); + fields.put("key2_1", "$.result.rows[*].key2.key2_1"); + jsonField = JsonField.builder().fields(fields).build(); + + // Create the schema with two string fields + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"key1_1", "key2_1"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.STRING_TYPE}); + deserializationSchema = new SimpleTextDeserializationSchema(rowType); + + // Setup mocks + when(httpResponse.getCode()).thenReturn(200); + when(collector.getCheckpointLock()).thenReturn(new Object()); + } + + @Test + public void testJsonFieldMissedReturnNull() throws Exception { + // Test data with missing fields Array with common parent path + String testJsonData = + "{\n" + + " \"result\": {\n" + + " \"rows\": [\n" + + " {\n" + + " \"rowNumber\": 1,\n" + + " \"key1\": {\n" + + " \"key1_1\": \"value11\"\n" + + " },\n" + + " \"key2\": {\n" + + " \"key2_1\": 100\n" + + " }\n" + + " },\n" + + " {\n" + + " \"rowNumber\": 2,\n" + + " \"key1\": {\n" + + " },\n" + + " \"key2\": {\n" + + " \"key2_1\": 200\n" + + " }\n" + + " },\n" + + " {\n" + + " \"rowNumber\": 3,\n" + + " \"key1\": {\n" + + " \"key1_1\": \"value33\"\n" + + " },\n" + + " \"key2\": {\n" + + " }\n" + + " },\n" + + " {\n" + + " \"rowNumber\": 4,\n" + + " \"key1\": {\n" + + " \"key1_1\": \"value44\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"rowNumber\": 5,\n" + + " \"key2\": {\n" + + " \"key2_1\": 500\n" + + " }\n" + + " },\n" + + " {\n" + + " \"rowNumber\": 6,\n" + + " \"key1\": null,\n" + + " \"key2\": {\n" + + " \"key2_1\": 600\n" + + " }\n" + + " },\n" + + " {\n" + + " \"rowNumber\": 7,\n" + + " \"key1\": {\n" + + " \"key1_1\": \"value77\"\n" + + " },\n" + + " \"key2\": null\n" + + " }\n" + + " ]\n" + + " }\n" + + "}"; + + // Set json_filed_missed_return_null to true + httpParameter.setJsonFiledMissedReturnNull(true); + + // Setup HTTP response + when(httpResponse.getContent()).thenReturn(testJsonData); + when(httpClientProvider.execute( + anyString(), anyString(), any(), any(), any(), any(Boolean.class))) + .thenReturn(httpResponse); + + // Create HttpSourceReader + HttpSourceReader sourceReader = + new HttpSourceReader( + httpParameter, context, deserializationSchema, jsonField, null); + + // Use reflection to inject our mocked HTTP client + sourceReader.open(); // This creates the real HTTP client + sourceReader.setHttpClient(httpClientProvider); + + // Field httpClientField = HttpSourceReader.class.getDeclaredField("httpClient"); + // httpClientField.setAccessible(true); + // httpClientField.set(sourceReader, httpClientProvider); + + // Capture the rows collected + ArgumentCaptor rowCaptor = ArgumentCaptor.forClass(SeaTunnelRow.class); + + // Call the method that processes data + sourceReader.pollNext(collector); + + // Verify collector.collect was called 3 times (once for each JSON object) + verify(collector, times(1)).collect(rowCaptor.capture()); + + // Get the captured rows + try { + String result = (rowCaptor.getValue().getFields())[0].toString(); + ObjectMapper objectMapper = new ObjectMapper(); + List list = objectMapper.readValue(result, List.class); + + // Check the first row (has both fields) + Assertions.assertEquals("value11", ((Map) list.get(0)).get("key1_1")); + Assertions.assertEquals("100", ((Map) list.get(0)).get("key2_1")); + + // Check the second row (missing key1) + Assertions.assertNull( + ((Map) list.get(1)).get("key1_1"), "Field key1 should be a JSON null"); + Assertions.assertEquals("200", ((Map) list.get(1)).get("key2_1")); + + Assertions.assertNull( + ((Map) list.get(2)).get("key2_1"), "Field key1 should be a JSON null"); + Assertions.assertEquals("value33", ((Map) list.get(2)).get("key1_1")); + + Assertions.assertNull( + ((Map) list.get(3)).get("key2_1"), "Field key1 should be a JSON null"); + Assertions.assertEquals("value44", ((Map) list.get(3)).get("key1_1")); + + Assertions.assertNull( + ((Map) list.get(4)).get("key1_1"), "Field key1 should be a JSON null"); + Assertions.assertEquals("500", ((Map) list.get(4)).get("key2_1")); + + Assertions.assertNull( + ((Map) list.get(5)).get("key1_1"), "Field key1 should be a JSON null"); + Assertions.assertEquals("600", ((Map) list.get(5)).get("key2_1")); + + Assertions.assertNull( + ((Map) list.get(6)).get("key2_1"), "Field key1 should be a JSON null"); + Assertions.assertEquals("value77", ((Map) list.get(6)).get("key1_1")); + + } catch (Exception e) { + throw new RuntimeException( + "set JsonFiledMissedReturnNull is True Unit Test is failed!", e); + } + } +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/JsonFieldMissedReturnNullTest.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/JsonFieldMissedReturnNullTest.java new file mode 100644 index 00000000000..21e47cd8a75 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/JsonFieldMissedReturnNullTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.http; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider; +import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse; +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod; +import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField; +import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader; +import org.apache.seatunnel.connectors.seatunnel.http.source.SimpleTextDeserializationSchema; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class JsonFieldMissedReturnNullTest { + + private HttpParameter httpParameter; + private JsonField jsonField; + private SimpleTextDeserializationSchema deserializationSchema; + + @Mock private SingleSplitReaderContext context; + + @Mock private Collector collector; + + @Mock private HttpClientProvider httpClientProvider; + + @Mock private HttpResponse httpResponse; + + @BeforeEach + public void setUp() { + MockitoAnnotations.openMocks(this); + + httpParameter = new HttpParameter(); + httpParameter.setUrl("http://test-url.com"); + httpParameter.setMethod(HttpRequestMethod.GET); + + Map fields = new HashMap<>(); + fields.put("key1", "$.result.key1"); + fields.put("key2", "$.result2.key2.key2"); + jsonField = JsonField.builder().fields(fields).build(); + + // Create the schema with two string fields + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"key1", "key2"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.STRING_TYPE}); + deserializationSchema = new SimpleTextDeserializationSchema(rowType); + + // Setup mocks + when(httpResponse.getCode()).thenReturn(200); + when(collector.getCheckpointLock()).thenReturn(new Object()); + } + + @Test + public void testJsonFieldMissedReturnNull() throws Exception { + // Test data with missing fields Non-array, no common parent path + String testJsonData = + "{\n" + + " \"result\": {\n" + + " \"key1\": \"value1\"\n" + + " },\n" + + " \"result2\": {}\n" + + "}"; + + // Set json_filed_missed_return_null to true + httpParameter.setJsonFiledMissedReturnNull(true); + + // Setup HTTP response + when(httpResponse.getContent()).thenReturn(testJsonData); + when(httpClientProvider.execute( + anyString(), anyString(), any(), any(), any(), any(Boolean.class))) + .thenReturn(httpResponse); + + // Create HttpSourceReader + HttpSourceReader sourceReader = + new HttpSourceReader( + httpParameter, context, deserializationSchema, jsonField, null); + + // Use reflection to inject our mocked HTTP client + sourceReader.open(); // This creates the real HTTP client + sourceReader.setHttpClient(httpClientProvider); + + // Capture the rows collected + ArgumentCaptor rowCaptor = ArgumentCaptor.forClass(SeaTunnelRow.class); + + // Call the method that processes data + sourceReader.pollNext(collector); + + // Verify collector.collect was called 1 times (once for each JSON object) + verify(collector, times(1)).collect(rowCaptor.capture()); + + // Get the captured rows + try { + String result = (rowCaptor.getValue().getFields())[0].toString(); + ObjectMapper objectMapper = new ObjectMapper(); + List list = objectMapper.readValue(result, List.class); + + // Check the first row (has both fields) + Assertions.assertEquals("value1", ((Map) list.get(0)).get("key1")); + Assertions.assertNull( + ((Map) list.get(0)).get("key2"), "Field key2 should be a JSON null"); + + } catch (Exception e) { + throw new RuntimeException( + "set JsonFiledMissedReturnNull is True Unit Test is failed!", e); + } + } +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/JsonFieldMissedReturnNullTreeFeatureTest.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/JsonFieldMissedReturnNullTreeFeatureTest.java new file mode 100644 index 00000000000..0234ddade35 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/JsonFieldMissedReturnNullTreeFeatureTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.http; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider; +import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse; +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod; +import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField; +import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader; +import org.apache.seatunnel.connectors.seatunnel.http.source.SimpleTextDeserializationSchema; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class JsonFieldMissedReturnNullTreeFeatureTest { + + private HttpParameter httpParameter; + private JsonField jsonField; + private SimpleTextDeserializationSchema deserializationSchema; + + @Mock private SingleSplitReaderContext context; + + @Mock private Collector collector; + + @Mock private HttpClientProvider httpClientProvider; + + @Mock private HttpResponse httpResponse; + + @BeforeEach + public void setUp() { + MockitoAnnotations.openMocks(this); + + httpParameter = new HttpParameter(); + httpParameter.setUrl("http://test-url.com"); + httpParameter.setMethod(HttpRequestMethod.GET); + + Map fields = new HashMap<>(); + fields.put("author", "$.store['book'][*].author"); + fields.put("isbn", "$.store['book'][*].isbn"); + jsonField = JsonField.builder().fields(fields).build(); + + // Create the schema with two string fields + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"author", "isbn"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.STRING_TYPE}); + deserializationSchema = new SimpleTextDeserializationSchema(rowType); + + // Setup mocks + when(httpResponse.getCode()).thenReturn(200); + when(collector.getCheckpointLock()).thenReturn(new Object()); + } + + @Test + public void testJsonFieldMissedReturnNull() throws Exception { + // Test data with missing fields Array with common parent path + String testJsonData = + "{\n" + + " \"store\": {\n" + + " \"book\": [\n" + + " {\n" + + " \"category\": \"reference\",\n" + + " \"author\": \"Nigel Rees\",\n" + + " \"title\": \"Sayings of the Century\",\n" + + " \"price\": 8.95\n" + + " },\n" + + " {\n" + + " \"category\": \"fiction\",\n" + + " \"author\": \"Evelyn Waugh\",\n" + + " \"title\": \"Sword of Honour\",\n" + + " \"price\": 12.99\n" + + " },\n" + + " {\n" + + " \"category\": \"fiction\",\n" + + " \"author\": \"Herman Melville\",\n" + + " \"title\": \"Moby Dick\",\n" + + " \"isbn\": \"0-553-21311-3\",\n" + + " \"price\": 8.99\n" + + " },\n" + + " {\n" + + " \"category\": \"fiction\",\n" + + " \"author\": \"J. R. R. Tolkien\",\n" + + " \"title\": \"The Lord of the Rings\",\n" + + " \"isbn\": \"0-395-19395-8\",\n" + + " \"price\": 22.99\n" + + " }\n" + + " ],\n" + + " \"bicycle\": {\n" + + " \"color\": \"red\",\n" + + " \"price\": 19.95\n" + + " }\n" + + " },\n" + + " \"expensive\": 10\n" + + "}"; + + // Set json_filed_missed_return_null to true + httpParameter.setJsonFiledMissedReturnNull(true); + + // Setup HTTP response + when(httpResponse.getContent()).thenReturn(testJsonData); + when(httpClientProvider.execute( + anyString(), anyString(), any(), any(), any(), any(Boolean.class))) + .thenReturn(httpResponse); + + // Create HttpSourceReader + HttpSourceReader sourceReader = + new HttpSourceReader( + httpParameter, context, deserializationSchema, jsonField, null); + + // Use reflection to inject our mocked HTTP client + sourceReader.open(); // This creates the real HTTP client + sourceReader.setHttpClient(httpClientProvider); + + // Capture the rows collected + ArgumentCaptor rowCaptor = ArgumentCaptor.forClass(SeaTunnelRow.class); + + // Call the method that processes data + sourceReader.pollNext(collector); + + // Verify collector.collect was called 3 times (once for each JSON object) + verify(collector, times(1)).collect(rowCaptor.capture()); + + // Get the captured rows + try { + String result = (rowCaptor.getValue().getFields())[0].toString(); + ObjectMapper objectMapper = new ObjectMapper(); + List list = objectMapper.readValue(result, List.class); + + // Check the first row (has both fields) + Assertions.assertEquals("Nigel Rees", ((Map) list.get(0)).get("author")); + Assertions.assertNull( + ((Map) list.get(0)).get("isbn"), "Field key1 should be a JSON null"); + + Assertions.assertEquals("Evelyn Waugh", ((Map) list.get(1)).get("author")); + Assertions.assertNull( + ((Map) list.get(1)).get("isbn"), "Field key1 should be a JSON null"); + + Assertions.assertEquals("Herman Melville", ((Map) list.get(2)).get("author")); + Assertions.assertEquals("0-553-21311-3", ((Map) list.get(2)).get("isbn")); + + Assertions.assertEquals("J. R. R. Tolkien", ((Map) list.get(3)).get("author")); + Assertions.assertEquals("0-395-19395-8", ((Map) list.get(3)).get("isbn")); + + } catch (Exception e) { + throw new RuntimeException( + "set JsonFiledMissedReturnNull is True Unit Test is failed!", e); + } + } +}