Skip to content

Commit 62dadee

Browse files
fix: Support abstract data types in OPC-UA adapter (#4178)
1 parent f6cb310 commit 62dadee

File tree

19 files changed

+171
-221
lines changed

19 files changed

+171
-221
lines changed

streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ private void prepareAdapter() throws AdapterException {
9999
this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
100100
OpcUaNodeBrowser browserClient =
101101
new OpcUaNodeBrowser(this.connectedClient.getClient(), this.opcUaAdapterConfig);
102-
this.nodeProvider = browserClient.makeNodeProvider(List.of());
102+
this.nodeProvider = browserClient.makeNodeProvider();
103103
this.allNodes = nodeProvider.getNodes();
104104

105105
if (opcUaAdapterConfig.inPullMode()) {

streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,10 @@ public OpcUaNodeBrowser(
6060
this.spOpcConfig = spOpcUaClientConfig;
6161
}
6262

63-
public OpcUaNodeProvider makeNodeProvider(List<String> runtimeNameFilters) throws UaException {
63+
public OpcUaNodeProvider makeNodeProvider() throws UaException {
6464
var opcNodes = new ArrayList<OpcUaNode>();
6565
for (String selectedNodeName : this.spOpcConfig.getSelectedNodeNames()) {
66-
opcNodes.add(toOpcNode(selectedNodeName, runtimeNameFilters));
66+
opcNodes.add(toOpcNode(selectedNodeName));
6767
}
6868

6969
return new OpcUaNodeProvider(opcNodes);
@@ -79,8 +79,7 @@ public List<TreeInputNode> buildNodeTreeFromOrigin(String nextBaseNodeToResolve)
7979
return findChildren(client, currentNodeId);
8080
}
8181

82-
private OpcUaNode toOpcNode(String nodeName,
83-
List<String> runtimeNamesToDelete) throws UaException {
82+
private OpcUaNode toOpcNode(String nodeName) throws UaException {
8483
AddressSpace addressSpace = getAddressSpace();
8584

8685
NodeId nodeId;
@@ -109,8 +108,9 @@ private OpcUaNode toOpcNode(String nodeName,
109108
);
110109

111110
if (node instanceof VariableNode) {
111+
var dataValue = ((VariableNode) node).getValue();
112112
var nodeInfo = new BasicVariableNodeInfo((VariableNode) node, spOpcConfig.getNamingStrategy());
113-
return OpcUaNodeFactory.createOpcUaNode(nodeInfo, runtimeNamesToDelete);
113+
return OpcUaNodeFactory.createOpcUaNode(nodeInfo, dataValue);
114114
}
115115

116116
LOG.warn("Node {} not of type VariableNode", node.getDisplayName());

streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.eclipse.milo.opcua.stack.core.UaException;
2626
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
2727
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
28+
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
2829
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
2930

3031
import java.util.HashMap;
@@ -57,10 +58,12 @@ public Map<String, Object> extract() {
5758
var dataTypeNode = client.getAddressSpace().getNode(dataTypeNodeId);
5859
var value = client.readValue(0, TimestampsToReturn.Both, node.getNodeId()).get();
5960

61+
extractNodeId((UaVariableNode) node);
6062
extractSourceTime(value);
6163
extractServerTime(value);
6264
extractStatusCode(value);
6365
extractDataType(dataTypeNode);
66+
extractDataTypeNodeId(dataTypeNodeId);
6467
} catch (UaException | ExecutionException | InterruptedException e) {
6568
throw new RuntimeException(e);
6669
}
@@ -69,20 +72,36 @@ public Map<String, Object> extract() {
6972
return metadata;
7073
}
7174

75+
public void extractNodeId(UaVariableNode node) {
76+
if (node != null && node.getNodeId() != null) {
77+
add("Node ID", node.getNodeId().toParseableString());
78+
} else {
79+
add("Node ID", "N/A");
80+
}
81+
}
82+
83+
public void extractDataTypeNodeId(NodeId dataTypeNodeId) {
84+
if (dataTypeNodeId != null) {
85+
add("Data Type Node ID", dataTypeNodeId.toParseableString());
86+
} else {
87+
add("Data Type Node ID", "N/A");
88+
}
89+
}
90+
7291

7392
public void extractDescription() {
7493
if (node.getDescription() != null) {
7594
add("Description", node.getDescription().getText());
7695
} else {
77-
add("Description", "");
96+
add("Description", "N/A");
7897
}
7998
}
8099

81100
public void extractNamespaceIndex() {
82101
if (node.getNodeId() != null) {
83102
add("NamespaceIndex", node.getNodeId().getNamespaceIndex().toString());
84103
} else {
85-
add("NamespaceIndex", "");
104+
add("NamespaceIndex", "N/A");
86105
}
87106
}
88107

@@ -98,15 +117,15 @@ public void extractBrowseName() {
98117
if (node.getBrowseName() != null) {
99118
add("BrowseName", node.getBrowseName().getName());
100119
} else {
101-
add("BrowseName", "");
120+
add("BrowseName", "N/A");
102121
}
103122
}
104123

105124
public void extractDisplayName() {
106125
if (node.getDisplayName() != null) {
107126
add("DisplayName", node.getDisplayName().getText());
108127
} else {
109-
add("DisplayName", "");
128+
add("DisplayName", "N/A");
110129
}
111130
}
112131

streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaSchemaProvider.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,13 @@
2727
import org.apache.streampipes.extensions.connectors.opcua.model.node.OpcUaNode;
2828
import org.apache.streampipes.model.connect.guess.FieldStatusInfo;
2929
import org.apache.streampipes.model.connect.guess.SampleData;
30-
import org.apache.streampipes.model.schema.EventProperty;
31-
import org.apache.streampipes.model.schema.EventSchema;
32-
import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
3330
import org.apache.streampipes.sdk.builder.adapter.SampleDataBuilder;
3431

3532
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
3633
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
3734
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
3835
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
3936

40-
import java.util.ArrayList;
4137
import java.util.HashMap;
4238
import java.util.List;
4339
import java.util.Map;
@@ -56,11 +52,8 @@ public SampleData getSampleData(OpcUaClientProvider clientProvider,
5652
IAdapterParameterExtractor extractor,
5753
IStreamPipesClient streamPipesClient)
5854
throws AdapterException, ParseException {
59-
var builder = GuessSchemaBuilder.create();
60-
EventSchema eventSchema = new EventSchema();
6155
Map<String, Object> eventPreview = new HashMap<>();
6256
Map<String, FieldStatusInfo> fieldStatusInfos = new HashMap<>();
63-
List<EventProperty> allProperties = new ArrayList<>();
6457

6558
var opcUaConfig = SpOpcUaConfigExtractor.extractAdapterConfig(
6659
extractor.getStaticPropertyExtractor(),
@@ -70,15 +63,9 @@ public SampleData getSampleData(OpcUaClientProvider clientProvider,
7063
var connectedClient = clientProvider.getClient(opcUaConfig);
7164
OpcUaNodeBrowser nodeBrowser =
7265
new OpcUaNodeBrowser(connectedClient.getClient(), opcUaConfig);
73-
var nodeProvider = nodeBrowser.makeNodeProvider(List.of());
66+
var nodeProvider = nodeBrowser.makeNodeProvider();
7467
var selectedNodes = nodeProvider.getNodes();
7568

76-
if (!selectedNodes.isEmpty()) {
77-
for (OpcUaNode opcNode : selectedNodes) {
78-
opcNode.addToSchema(connectedClient.getClient(), allProperties);
79-
}
80-
}
81-
8269
var nodeIds = selectedNodes.stream()
8370
.map(node -> node.nodeInfo().getNodeId())
8471
.collect(Collectors.toList());
@@ -96,12 +83,10 @@ public SampleData getSampleData(OpcUaClientProvider clientProvider,
9683
clientProvider.releaseClient(opcUaConfig);
9784
}
9885

99-
var sampleData = SampleDataBuilder.create()
86+
return SampleDataBuilder.create()
10087
.sample(eventPreview)
88+
.fieldStatusInfos(fieldStatusInfos)
10189
.build();
102-
103-
104-
return sampleData;
10590
}
10691

10792
private static void makeEventPreview(

streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/OpcUaNodeFactory.java

Lines changed: 79 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,89 @@
2222
import org.apache.streampipes.extensions.connectors.opcua.model.node.ExtensionObjectOpcUaNode;
2323
import org.apache.streampipes.extensions.connectors.opcua.model.node.OpcUaNode;
2424
import org.apache.streampipes.extensions.connectors.opcua.model.node.PrimitiveOpcUaNode;
25-
import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaTypes;
2625

27-
import java.util.List;
26+
import org.eclipse.milo.opcua.stack.core.BuiltinDataType;
27+
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
28+
import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
29+
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
30+
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
31+
32+
import java.lang.reflect.Array;
33+
import java.util.Objects;
2834

2935
public class OpcUaNodeFactory {
3036

31-
public static OpcUaNode createOpcUaNode(BasicVariableNodeInfo nodeInfo,
32-
List<String> runtimeNamesToDelete) {
33-
if (OpcUaTypes.isExtensionOrCustom(nodeInfo.getNode())) {
34-
return new ExtensionObjectOpcUaNode(nodeInfo, runtimeNamesToDelete);
35-
} else {
36-
return new PrimitiveOpcUaNode(nodeInfo, runtimeNamesToDelete);
37+
public static OpcUaNode createOpcUaNode(
38+
BasicVariableNodeInfo nodeInfo,
39+
DataValue dataValue
40+
) {
41+
var hasVariant = hasVariant(dataValue);
42+
if (hasVariant) {
43+
var byValue = isExtensionByValue(dataValue);
44+
return byValue
45+
? new ExtensionObjectOpcUaNode(nodeInfo)
46+
: new PrimitiveOpcUaNode(nodeInfo);
47+
}
48+
49+
return isExtensionByDataType(nodeInfo)
50+
? new ExtensionObjectOpcUaNode(nodeInfo)
51+
: new PrimitiveOpcUaNode(nodeInfo);
52+
}
53+
54+
private static boolean hasVariant(DataValue dataValue) {
55+
if (dataValue == null) {
56+
return false;
3757
}
58+
59+
StatusCode sc = dataValue.getStatusCode();
60+
if (sc == null || !sc.isGood()) {
61+
return false;
62+
}
63+
64+
return dataValue.getValue() != null;
65+
}
66+
67+
/**
68+
* @return TRUE -> value is (or contains) ExtensionObject
69+
* FALSE -> value is present and not ExtensionObject
70+
*/
71+
private static boolean isExtensionByValue(DataValue dv) {
72+
Object v = dv.getValue().getValue();
73+
if (v == null) {
74+
return false;
75+
}
76+
77+
if (v instanceof ExtensionObject) {
78+
return true;
79+
}
80+
81+
// Handle arrays of any kind (Object[] or primitive arrays)
82+
Class<?> c = v.getClass();
83+
if (c.isArray()) {
84+
int len = Array.getLength(v);
85+
for (int i = 0; i < len; i++) {
86+
Object el = Array.get(v, i);
87+
if (el instanceof ExtensionObject) {
88+
return true;
89+
}
90+
}
91+
return false;
92+
}
93+
94+
return false;
95+
}
96+
97+
/**
98+
* Conservative fallback based on declared DataType only.
99+
* Treat only ExtensionObject itself as "extension" here.
100+
*
101+
* Why so conservative? Because abstract standard types like Integer/Number
102+
* are NOT builtins but are still "primitive-ish" and should not be treated
103+
* as custom/extension.
104+
*/
105+
private static boolean isExtensionByDataType(BasicVariableNodeInfo nodeInfo) {
106+
NodeId dt = nodeInfo.getNode().getDataType();
107+
return Objects.equals(dt, BuiltinDataType.ExtensionObject.getNodeId());
38108
}
39109
}
110+

streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/node/ExtensionObjectOpcUaNode.java

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,22 @@
1919
package org.apache.streampipes.extensions.connectors.opcua.model.node;
2020

2121
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
22-
import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaTypes;
2322
import org.apache.streampipes.model.connect.guess.FieldStatus;
2423
import org.apache.streampipes.model.connect.guess.FieldStatusInfo;
25-
import org.apache.streampipes.model.schema.EventProperty;
26-
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
2724

2825
import org.eclipse.milo.opcua.binaryschema.Struct;
2926
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
3027
import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
3128
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
3229

33-
import java.util.List;
3430
import java.util.Map;
3531

3632
public class ExtensionObjectOpcUaNode implements OpcUaNode {
3733

3834
private final BasicVariableNodeInfo nodeInfo;
39-
private final List<String> runtimeNamesToDelete;
4035

41-
public ExtensionObjectOpcUaNode(BasicVariableNodeInfo nodeInfo,
42-
List<String> runtimeNamesToDelete) {
36+
public ExtensionObjectOpcUaNode(BasicVariableNodeInfo nodeInfo) {
4337
this.nodeInfo = nodeInfo;
44-
this.runtimeNamesToDelete = runtimeNamesToDelete;
4538
}
4639

4740
@Override
@@ -52,26 +45,7 @@ public BasicVariableNodeInfo nodeInfo() {
5245
@Override
5346
public int getNumberOfEventProperties(OpcUaClient client) {
5447
var struct = extractStruct(client, nodeInfo.getNode().getValue().getValue());
55-
return (int) struct.getMembers().entrySet().stream()
56-
.filter(entry -> {
57-
var nodeName = nodeInfo.getDesiredName(entry.getKey());
58-
return !runtimeNamesToDelete.contains(nodeName);
59-
})
60-
.count();
61-
}
62-
63-
@Override
64-
public void addToSchema(OpcUaClient client,
65-
List<EventProperty> eventProperties) {
66-
var struct = extractStruct(client, nodeInfo.getNode().getValue().getValue());
67-
struct.getMembers().forEach((key, member) -> {
68-
var nodeName = nodeInfo.getDesiredName(key);
69-
eventProperties.add(
70-
PrimitivePropertyBuilder.create(OpcUaTypes.getTypeFromValue(member.getValue()), nodeName)
71-
.label(nodeName)
72-
.build()
73-
);
74-
});
48+
return struct.getMembers().size();
7549
}
7650

7751
@Override
@@ -82,9 +56,7 @@ public void addToEvent(OpcUaClient client,
8256

8357
struct.getMembers().forEach((key, member) -> {
8458
var nodeName = nodeInfo.getDesiredName(key);
85-
if (!runtimeNamesToDelete.contains(nodeName)) {
86-
event.put(nodeName, member.getValue());
87-
}
59+
event.put(nodeName, member.getValue());
8860
});
8961
}
9062

streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/node/OpcUaNode.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919
package org.apache.streampipes.extensions.connectors.opcua.model.node;
2020

2121
import org.apache.streampipes.model.connect.guess.FieldStatusInfo;
22-
import org.apache.streampipes.model.schema.EventProperty;
2322

2423
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
2524
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
2625

27-
import java.util.List;
2826
import java.util.Map;
2927

3028
public interface OpcUaNode {
@@ -33,9 +31,6 @@ public interface OpcUaNode {
3331

3432
int getNumberOfEventProperties(OpcUaClient client);
3533

36-
void addToSchema(OpcUaClient client,
37-
List<EventProperty> eventProperties);
38-
3934
void addToEvent(OpcUaClient client,
4035
Map<String, Object> event,
4136
Variant variant);

0 commit comments

Comments
 (0)