Skip to content

Commit 37a82be

Browse files
committed
Mapping JSON arrays as attributes #241
1 parent 842ddef commit 37a82be

File tree

5 files changed

+222
-113
lines changed

5 files changed

+222
-113
lines changed

RELEASE-NOTES.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
* #239 Remove APIs deprecated in 3.0.0
44

5-
## Release 3.0.0
5+
## Release 3.0.0-RC2
66

77
* #240 Deprecate StageListener and TaskStageType
8+
* #241 Mapping JSON arrays as attributes
89

910
## Release 3.0.0-RC1
1011

link-move-json/src/main/java/com/nhl/link/move/runtime/json/JsonRowReader.java

Lines changed: 102 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -12,117 +12,107 @@
1212
class JsonRowReader implements RowReader {
1313

1414
private final JsonRowAttribute[] header;
15-
private final JsonNode rootNode;
16-
private final List<JsonNodeWrapper> items;
17-
18-
public JsonRowReader(JsonRowAttribute[] header, JsonNode rootNode, List<JsonNodeWrapper> items) {
19-
this.header = header;
20-
this.rootNode = rootNode;
21-
this.items = items;
22-
}
23-
24-
@Override
25-
public void close() {
26-
// no need to close anything
27-
}
28-
29-
@Override
30-
public JsonRowAttribute[] getHeader() {
31-
return header;
32-
}
33-
34-
@Override
35-
public Iterator<Object[]> iterator() {
36-
return new Iterator<>() {
37-
private int i = 0;
38-
39-
@Override
40-
public boolean hasNext() {
41-
return i < items.size();
42-
}
43-
44-
@Override
45-
public Object[] next() {
46-
return fromNode(items.get(i++));
47-
}
48-
49-
@Override
50-
public void remove() {
51-
throw new UnsupportedOperationException();
52-
}
53-
};
54-
}
55-
56-
private Object[] fromNode(JsonNodeWrapper node) {
57-
58-
Object[] row = new Object[header.length];
59-
60-
for (int i = 0; i < header.length; i++) {
61-
row[i] = valueFromNode(header[i], node);
62-
}
63-
64-
return row;
65-
}
66-
67-
private Object valueFromNode(JsonRowAttribute attribute, JsonNodeWrapper node) {
68-
69-
List<JsonNodeWrapper> result = attribute.getSourceQuery().execute(rootNode, node);
70-
if (result == null || result.isEmpty()) {
71-
return null;
72-
}
73-
if (result.size() > 1) {
74-
throw new LmRuntimeException("Attribute query yielded a list of values (total: " + result.size() +
75-
"). A single value is expected.");
76-
}
77-
return extractValue(result.get(0).getNode());
78-
}
79-
80-
private Object extractValue(JsonNode node) {
81-
switch (node.getNodeType()) {
82-
case ARRAY:
83-
case POJO:
84-
case OBJECT: {
85-
throw new LmRuntimeException("Expected a value node");
86-
}
87-
case MISSING:
88-
case NULL: {
89-
return null;
90-
}
91-
case BINARY:
92-
case STRING: {
93-
return node.asText();
94-
}
95-
case BOOLEAN: {
96-
return node.asBoolean();
97-
}
98-
case NUMBER: {
99-
NumericNode numericNode = (NumericNode) node;
100-
switch (numericNode.numberType()) {
101-
case INT: {
102-
return numericNode.intValue();
103-
}
104-
case LONG: {
105-
return numericNode.longValue();
106-
}
107-
case FLOAT: {
108-
return numericNode.floatValue();
109-
}
110-
case DOUBLE: {
111-
return numericNode.doubleValue();
112-
}
113-
case BIG_INTEGER: {
114-
return numericNode.bigIntegerValue();
115-
}
116-
case BIG_DECIMAL: {
117-
return numericNode.decimalValue();
118-
}
119-
// intentionally fall through
120-
}
121-
}
122-
default: {
123-
throw new LmRuntimeException("Unexpected JSON node type: " + node.getNodeType());
124-
}
125-
}
126-
}
15+
private final JsonNode rootNode;
16+
private final List<JsonNodeWrapper> items;
17+
18+
public JsonRowReader(JsonRowAttribute[] header, JsonNode rootNode, List<JsonNodeWrapper> items) {
19+
this.header = header;
20+
this.rootNode = rootNode;
21+
this.items = items;
22+
}
23+
24+
@Override
25+
public void close() {
26+
// no need to close anything
27+
}
28+
29+
@Override
30+
public JsonRowAttribute[] getHeader() {
31+
return header;
32+
}
33+
34+
@Override
35+
public Iterator<Object[]> iterator() {
36+
return new Iterator<>() {
37+
private int i = 0;
38+
39+
@Override
40+
public boolean hasNext() {
41+
return i < items.size();
42+
}
43+
44+
@Override
45+
public Object[] next() {
46+
return fromNode(items.get(i++));
47+
}
48+
49+
@Override
50+
public void remove() {
51+
throw new UnsupportedOperationException();
52+
}
53+
};
54+
}
55+
56+
private Object[] fromNode(JsonNodeWrapper node) {
57+
58+
Object[] row = new Object[header.length];
59+
60+
for (int i = 0; i < header.length; i++) {
61+
row[i] = valueFromNode(header[i], node);
62+
}
63+
64+
return row;
65+
}
66+
67+
private Object valueFromNode(JsonRowAttribute attribute, JsonNodeWrapper node) {
68+
69+
List<JsonNodeWrapper> result = attribute.getSourceQuery().execute(rootNode, node);
70+
if (result == null || result.isEmpty()) {
71+
return null;
72+
}
73+
if (result.size() > 1) {
74+
throw new LmRuntimeException("Attribute query yielded a list of values (total: " + result.size() +
75+
"). A single value is expected.");
76+
}
77+
return extractValue(result.get(0).getNode());
78+
}
79+
80+
private Object extractValue(JsonNode node) {
81+
82+
// extract values where we can; for the rest just return the unchanged JSON node
83+
84+
switch (node.getNodeType()) {
85+
case STRING:
86+
return node.asText();
87+
case BOOLEAN:
88+
return node.asBoolean();
89+
case NUMBER: {
90+
NumericNode numericNode = (NumericNode) node;
91+
switch (numericNode.numberType()) {
92+
case INT: {
93+
return numericNode.intValue();
94+
}
95+
case LONG: {
96+
return numericNode.longValue();
97+
}
98+
case FLOAT: {
99+
return numericNode.floatValue();
100+
}
101+
case DOUBLE: {
102+
return numericNode.doubleValue();
103+
}
104+
case BIG_INTEGER: {
105+
return numericNode.bigIntegerValue();
106+
}
107+
case BIG_DECIMAL: {
108+
return numericNode.decimalValue();
109+
}
110+
// intentionally fall through
111+
}
112+
}
113+
default:
114+
return node;
115+
}
116+
}
127117

128118
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.nhl.link.move.runtime.json;
2+
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import com.nhl.link.move.Execution;
5+
import com.nhl.link.move.LmTask;
6+
import com.nhl.link.move.connect.StreamConnector;
7+
import com.nhl.link.move.connect.URLConnector;
8+
import com.nhl.link.move.json.unit.LmJsonIntegrationTest;
9+
import com.nhl.link.move.json.unit.cayenne.t.Etlt1;
10+
import com.nhl.link.move.runtime.LmRuntimeBuilder;
11+
import com.nhl.link.move.runtime.task.createorupdate.CreateOrUpdateStage;
12+
import org.dflib.DataFrame;
13+
import org.dflib.Exp;
14+
import org.junit.jupiter.api.Test;
15+
16+
import java.net.URL;
17+
18+
public class CreateOrUpdate_ArrayAttributeIT extends LmJsonIntegrationTest {
19+
20+
@Override
21+
protected LmRuntimeBuilder testRuntimeBuilder() {
22+
URL url = getClass()
23+
.getClassLoader()
24+
.getResource("com/nhl/link/move/json/src_json/etl1_src_array_attribute.json");
25+
26+
return super
27+
.testRuntimeBuilder()
28+
.connector(StreamConnector.class, "etl_src_id", URLConnector.of(url));
29+
}
30+
31+
@Test
32+
public void testCreateOrUpdate() {
33+
34+
etlt1().insertColumns("id", "num_int", "string")
35+
.values(15, 12, null)
36+
.values(16, 1, "z")
37+
.exec();
38+
39+
LmTask task = lmRuntime.getTaskService()
40+
.createOrUpdate(Etlt1.class)
41+
.sourceExtractor("com/nhl/link/move/json/extractor/etl1_src_to_etl1t_array_attribute.xml")
42+
.stage(CreateOrUpdateStage.EXTRACT_SOURCE_ROWS, (e, s) -> {
43+
DataFrame df = s.getSourceRows().cols("strings").merge(Exp.$col("strings", JsonNode.class).mapVal(n -> {
44+
if (n == null || n.size() == 0) {
45+
return null;
46+
}
47+
48+
StringBuilder out = new StringBuilder();
49+
for (JsonNode cn : n) {
50+
if (out.length() > 0) {
51+
out.append(",");
52+
}
53+
out.append(cn.asText());
54+
}
55+
56+
return out.toString();
57+
}));
58+
59+
s.setSourceRows(df);
60+
})
61+
.task();
62+
63+
Execution e1 = task.run();
64+
assertExec(3, 2, 1, 0, e1);
65+
etlt1().matcher().assertMatches(4);
66+
etlt1().matcher().eq("id", 12).andEq("num_int", 24).andEq("string", "s,t").assertOneMatch();
67+
etlt1().matcher().eq("id", 15).andEq("num_int", 11).andEq("string", null).assertOneMatch();
68+
etlt1().matcher().eq("id", -1).andEq("num_int", 4).andEq("string", null).assertOneMatch();
69+
}
70+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="https://nhl.github.io/link-move/xsd/extractor_config_3.xsd https://nhl.github.io/link-move/xsd/extractor_config_3.xsd"
4+
xmlns="https://nhl.github.io/link-move/xsd/extractor_config_3.xsd">
5+
6+
<type>json</type>
7+
<connectorId>etl_src_id</connectorId>
8+
<extractor>
9+
<attributes>
10+
<attribute>
11+
<type>java.lang.Integer</type>
12+
<source>id</source>
13+
<target>db:id</target>
14+
</attribute>
15+
<attribute>
16+
<type>java.lang.Integer</type>
17+
<source>numInt</source>
18+
<target>numInt</target>
19+
</attribute>
20+
<attribute>
21+
<type>java.lang.String</type>
22+
<source>strings</source>
23+
<target>string</target>
24+
</attribute>
25+
</attributes>
26+
<properties>
27+
<extractor.json.path>$.root[*]</extractor.json.path>
28+
</properties>
29+
</extractor>
30+
</config>
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"root": [
3+
{
4+
"id": 12,
5+
"numInt": 24,
6+
"strings": ["s","t"]
7+
},
8+
{
9+
"id": 15,
10+
"numInt": 11,
11+
"strings": []
12+
},
13+
{
14+
"id": -1,
15+
"numInt": 4
16+
}
17+
]
18+
}

0 commit comments

Comments
 (0)