Skip to content

Commit bc6b0a3

Browse files
author
ocean-zhc
committed
Solving the problem of data disorganization
1 parent 3779e9b commit bc6b0a3

File tree

3 files changed

+127
-79
lines changed

3 files changed

+127
-79
lines changed

Diff for: seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java

+112-20
Original file line numberDiff line numberDiff line change
@@ -257,31 +257,75 @@ private List<Map<String, String>> parseToMap(List<List<String>> datas, JsonField
257257

258258
private List<List<String>> decodeJSON(String data) {
259259
ReadContext jsonReadContext = JsonPath.using(jsonConfiguration).parse(data);
260-
List<List<String>> results = new ArrayList<>(jsonPaths.length);
261-
for (JsonPath path : jsonPaths) {
262-
List<String> result = jsonReadContext.read(path);
263-
results.add(result);
264-
}
260+
265261
if (httpParameter.isJsonFiledMissedReturnNull()) {
266-
int maxLength = 0;
267-
for (List<?> result : results) {
268-
maxLength = Math.max(maxLength, result.size());
262+
// Extract the common parent path from all configured paths
263+
String commonParentPath = extractCommonParentPath(jsonPaths);
264+
if (commonParentPath == null) {
265+
throw new HttpConnectorException(
266+
HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,
267+
"Could not find common parent path in JsonPaths. All paths must share a common array parent.");
268+
}
269+
270+
// Get all objects under the common parent path
271+
List<Map<String, Object>> objects;
272+
try {
273+
objects = jsonReadContext.read(commonParentPath);
274+
} catch (Exception e) {
275+
throw new HttpConnectorException(
276+
HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,
277+
"Failed to read data from JSON using path "
278+
+ commonParentPath
279+
+ ": "
280+
+ e.getMessage());
269281
}
270-
for (int i = 0; i < results.size(); i++) {
271-
List<String> result = results.get(i);
272-
if (result.size() < maxLength) {
273-
log.warn(
274-
"Field [{}] with size ({}) is less than max size ({}), will be padded with null values. "
275-
+ "This may happen when JSON paths return different numbers of elements.",
276-
jsonPaths[i].getPath(),
277-
result.size(),
278-
maxLength);
279-
for (int j = result.size(); j < maxLength; j++) {
280-
result.add(null);
282+
283+
// Create results list for each field
284+
List<List<String>> results = new ArrayList<>(jsonPaths.length);
285+
for (int i = 0; i < jsonPaths.length; i++) {
286+
results.add(new ArrayList<>(objects.size()));
287+
}
288+
289+
// For each object in the array
290+
for (Map<String, Object> obj : objects) {
291+
// For each configured field
292+
for (int i = 0; i < jsonPaths.length; i++) {
293+
String fieldPath = jsonPaths[i].getPath();
294+
// Get the relative path from the common parent to the field
295+
String relativePath = getRelativePath(commonParentPath, fieldPath);
296+
297+
try {
298+
// Create a new context for this object
299+
ReadContext objContext = JsonPath.using(jsonConfiguration).parse(obj);
300+
Object value = objContext.read(relativePath);
301+
String stringValue = null;
302+
if (value != null) {
303+
if (value instanceof List) {
304+
// If it's a list with one element, take that element
305+
List<?> list = (List<?>) value;
306+
if (!list.isEmpty()) {
307+
stringValue = list.get(0).toString();
308+
}
309+
} else {
310+
stringValue = value.toString();
311+
}
312+
}
313+
results.get(i).add(stringValue);
314+
} catch (Exception e) {
315+
// If field doesn't exist in this object, add null
316+
results.get(i).add(null);
281317
}
282318
}
283319
}
320+
321+
return dataFlip(results);
284322
} else {
323+
// Original implementation for when isJsonFiledMissedReturnNull is false
324+
List<List<String>> results = new ArrayList<>(jsonPaths.length);
325+
for (JsonPath path : jsonPaths) {
326+
List<String> result = jsonReadContext.read(path);
327+
results.add(result);
328+
}
285329
for (int i = 1; i < results.size(); i++) {
286330
List<?> result0 = results.get(0);
287331
List<?> result = results.get(i);
@@ -296,8 +340,56 @@ private List<List<String>> decodeJSON(String data) {
296340
result.size()));
297341
}
298342
}
343+
344+
return dataFlip(results);
345+
}
346+
}
347+
348+
private String extractCommonParentPath(JsonPath[] paths) {
349+
if (paths == null || paths.length == 0) {
350+
return null;
351+
}
352+
353+
// Get all paths as strings
354+
String[] pathStrings = new String[paths.length];
355+
for (int i = 0; i < paths.length; i++) {
356+
pathStrings[i] = paths[i].getPath();
357+
}
358+
359+
// Find the array notation position in the first path
360+
String firstPath = pathStrings[0];
361+
int arrayPos = firstPath.indexOf("[*]");
362+
if (arrayPos == -1) {
363+
throw new HttpConnectorException(
364+
HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,
365+
"No array notation [*] found in path: " + firstPath);
366+
}
367+
368+
// Get the parent path up to and including [*]
369+
String parentPath = firstPath.substring(0, arrayPos + 3);
370+
371+
// Verify all other paths have the same parent
372+
for (int i = 1; i < pathStrings.length; i++) {
373+
if (!pathStrings[i].startsWith(parentPath)) {
374+
throw new HttpConnectorException(
375+
HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,
376+
String.format(
377+
"Paths have different array parents. Expected '%s' but found path starting with '%s'",
378+
parentPath, pathStrings[i]));
379+
}
380+
}
381+
382+
return parentPath;
383+
}
384+
385+
private String getRelativePath(String parentPath, String fullPath) {
386+
// Remove the parent path (including [*]) and keep everything after
387+
String relativePart = fullPath.substring(parentPath.length());
388+
// If the relative part starts with a dot, remove it
389+
if (relativePart.startsWith(".")) {
390+
relativePart = relativePart.substring(1);
299391
}
300-
return dataFlip(results);
392+
return "$." + relativePart;
301393
}
302394

303395
private String getPartOfJson(String data) {

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_to_console.conf

+5-4
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,18 @@ env {
2525
source{
2626
Http {
2727
url = "http://mockserver:1080/example/httpFixParsingJson"
28+
#json_filed_missed_return_null = true
2829
method = "POST"
2930
format = "json"
3031
json_field = {
31-
barcode = "$.result.rows[*].item.barcode"
32-
amount = "$.result.rows[*].delivery_commission.amount"
32+
key1 = "$[*].key1"
33+
key2 = "$[*].key2"
3334
}
3435

3536
schema = {
3637
fields {
37-
barcode = string
38-
amount= string
38+
key1 = string
39+
key2= string
3940
}
4041
}
4142
}

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json

+10-55
Original file line numberDiff line numberDiff line change
@@ -4780,62 +4780,17 @@
47804780
"method": "POST"
47814781
},
47824782
"httpResponse": {
4783-
"body": [
4783+
"body": [{
4784+
"key1":"value11",
4785+
"key2":"value22"
4786+
},
47844787
{
4785-
"result": {
4786-
"rows": [
4787-
{
4788-
"rowNumber": 1,
4789-
"item": {
4790-
"name": "Джинсы Feimailis",
4791-
"offer_id": "AB926-52E-7-30",
4792-
"barcode": "OZN201543969",
4793-
"sku": 201543969
4794-
},
4795-
"seller_price_per_instance": 1028,
4796-
"delivery_commission": {
4797-
"price_per_instance": 782,
4798-
"quantity": 1,
4799-
"amount": 782,
4800-
"compensation": 0,
4801-
"commission": 0,
4802-
"bonus": 244.04,
4803-
"standard_fee": 128.5,
4804-
"total": 899.5,
4805-
"stars": 0,
4806-
"bank_coinvestment": 1.96,
4807-
"pick_up_point_coinvestment": 0
4808-
},
4809-
"return_commission": null,
4810-
"commission_ratio": 0.125
4811-
},
4812-
{
4813-
"rowNumber": 2,
4814-
"item": {
4815-
"name": "Джинсы Feimailis",
4816-
"offer_id": "AF979-4-25",
4817-
"barcode": "OZN201550625",
4818-
"sku": 201550625
4819-
},
4820-
"seller_price_per_instance": 737,
4821-
"delivery_commission": {
4822-
"price_per_instance": 521,
4823-
"quantity": 1,
4824-
"amount": 521,
4825-
"compensation": 0,
4826-
"commission": 0,
4827-
"bonus": 214.7,
4828-
"standard_fee": 92.13,
4829-
"total": 644.87,
4830-
"stars": 0,
4831-
"bank_coinvestment": 1.3,
4832-
"pick_up_point_coinvestment": 0
4833-
},
4834-
"return_commission": null,
4835-
"commission_ratio": 0.125
4836-
}
4837-
]
4838-
}
4788+
"key2":"value33",
4789+
"key3":"value44"
4790+
},
4791+
{
4792+
"key1":"value55",
4793+
"key3":"value66"
48394794
}
48404795
],
48414796
"headers": {

0 commit comments

Comments
 (0)