Skip to content

Commit d1249da

Browse files
Pipe: Implemented OPC DA Sink for local COM & Fixed the newest value of OPC UA Sink (#14964) (#14978)
Co-authored-by: Steve Yurong Su <[email protected]>
1 parent 2fc1f06 commit d1249da

File tree

11 files changed

+815
-15
lines changed

11 files changed

+815
-15
lines changed

iotdb-core/datanode/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,14 @@
165165
<groupId>org.slf4j</groupId>
166166
<artifactId>slf4j-api</artifactId>
167167
</dependency>
168+
<dependency>
169+
<groupId>net.java.dev.jna</groupId>
170+
<artifactId>jna</artifactId>
171+
</dependency>
172+
<dependency>
173+
<groupId>net.java.dev.jna</groupId>
174+
<artifactId>jna-platform</artifactId>
175+
</dependency>
168176
<dependency>
169177
<groupId>io.jsonwebtoken</groupId>
170178
<artifactId>jjwt-api</artifactId>

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
2626
import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBDataRegionAirGapConnector;
2727
import org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector;
28+
import org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaConnector;
2829
import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
2930
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
3031
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
@@ -65,6 +66,8 @@ protected void initConstructors() {
6566
BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), WebSocketConnector::new);
6667
pluginConstructors.put(
6768
BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), OpcUaConnector::new);
69+
pluginConstructors.put(
70+
BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName(), OpcDaConnector::new);
6871
pluginConstructors.put(
6972
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingConnector::new);
7073
pluginConstructors.put(
@@ -91,6 +94,7 @@ protected void initConstructors() {
9194
pluginConstructors.put(
9295
BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(), WebSocketConnector::new);
9396
pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), OpcUaConnector::new);
97+
pluginConstructors.put(BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(), OpcDaConnector::new);
9498
pluginConstructors.put(
9599
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingConnector::new);
96100
pluginConstructors.put(
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.connector.protocol.opcda;
21+
22+
import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
23+
import org.apache.iotdb.pipe.api.PipeConnector;
24+
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
25+
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
26+
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
27+
import org.apache.iotdb.pipe.api.event.Event;
28+
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
29+
30+
import org.apache.tsfile.utils.Pair;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import java.util.Map;
35+
import java.util.Objects;
36+
import java.util.concurrent.ConcurrentHashMap;
37+
import java.util.concurrent.atomic.AtomicInteger;
38+
39+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_DA_CLSID_KEY;
40+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_DA_PROGID_KEY;
41+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_DA_CLSID_KEY;
42+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_DA_PROGID_KEY;
43+
44+
/**
45+
* Send data in IoTDB based on Opc Da protocol, using JNA. All data are converted into tablets, and
46+
* then push the newest value to the <b>local COM</b> server in another process.
47+
*/
48+
public class OpcDaConnector implements PipeConnector {
49+
private static final Logger LOGGER = LoggerFactory.getLogger(OpcDaConnector.class);
50+
private static final Map<String, Pair<AtomicInteger, OpcDaServerHandle>>
51+
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP = new ConcurrentHashMap<>();
52+
private String clsID;
53+
private OpcDaServerHandle handle;
54+
55+
@Override
56+
public void validate(final PipeParameterValidator validator) throws Exception {
57+
// TODO: upgrade this logic after "1 in 2" logic is supported
58+
validator.validate(
59+
args ->
60+
(((boolean) args[1] || (boolean) args[2] || (boolean) args[3] || (boolean) args[4])),
61+
String.format(
62+
"One of '%s', '%s', '%s' and '%s' must be specified",
63+
SINK_OPC_DA_CLSID_KEY,
64+
CONNECTOR_OPC_DA_CLSID_KEY,
65+
SINK_OPC_DA_PROGID_KEY,
66+
CONNECTOR_OPC_DA_PROGID_KEY),
67+
validator.getParameters().hasAttribute(SINK_OPC_DA_CLSID_KEY),
68+
validator.getParameters().hasAttribute(CONNECTOR_OPC_DA_CLSID_KEY),
69+
validator.getParameters().hasAttribute(SINK_OPC_DA_PROGID_KEY),
70+
validator.getParameters().hasAttribute(CONNECTOR_OPC_DA_PROGID_KEY));
71+
}
72+
73+
@Override
74+
public void customize(
75+
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
76+
throws Exception {
77+
synchronized (CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP) {
78+
clsID = parameters.getStringByKeys(CONNECTOR_OPC_DA_CLSID_KEY, SINK_OPC_DA_CLSID_KEY);
79+
if (Objects.isNull(clsID)) {
80+
clsID =
81+
OpcDaServerHandle.getClsIDFromProgID(
82+
parameters.getStringByKeys(CONNECTOR_OPC_DA_PROGID_KEY, SINK_OPC_DA_PROGID_KEY));
83+
}
84+
handle =
85+
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP
86+
.computeIfAbsent(
87+
clsID, key -> new Pair<>(new AtomicInteger(0), new OpcDaServerHandle(clsID)))
88+
.getRight();
89+
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(clsID).getLeft().incrementAndGet();
90+
}
91+
}
92+
93+
@Override
94+
public void handshake() throws Exception {
95+
// Do nothing
96+
}
97+
98+
@Override
99+
public void heartbeat() throws Exception {
100+
// Do nothing
101+
}
102+
103+
@Override
104+
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
105+
OpcUaConnector.transferByTablet(
106+
tabletInsertionEvent, LOGGER, tablet -> handle.transfer(tablet));
107+
}
108+
109+
@Override
110+
public void transfer(final Event event) throws Exception {
111+
// Do nothing
112+
}
113+
114+
@Override
115+
public void close() throws Exception {
116+
if (Objects.isNull(clsID)) {
117+
return;
118+
}
119+
120+
synchronized (CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP) {
121+
final Pair<AtomicInteger, OpcDaServerHandle> pair =
122+
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(clsID);
123+
if (pair == null) {
124+
return;
125+
}
126+
127+
if (pair.getLeft().decrementAndGet() <= 0) {
128+
try {
129+
pair.getRight().close();
130+
} finally {
131+
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.remove(clsID);
132+
}
133+
}
134+
}
135+
}
136+
}
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.connector.protocol.opcda;
21+
22+
import com.sun.jna.Pointer;
23+
import com.sun.jna.Structure;
24+
import com.sun.jna.WString;
25+
import com.sun.jna.platform.win32.COM.Unknown;
26+
import com.sun.jna.platform.win32.Guid;
27+
import com.sun.jna.platform.win32.Variant;
28+
import com.sun.jna.ptr.IntByReference;
29+
import com.sun.jna.ptr.PointerByReference;
30+
31+
import java.util.Arrays;
32+
import java.util.List;
33+
34+
/** We define the OPC DA Classes and interfaces here like C's .h file. */
35+
public class OpcDaHeader {
36+
// IOPCServer
37+
static final Guid.IID IID_IOPCServer = new Guid.IID("39C13A4D-011E-11D0-9675-0020AFD8ADB3");
38+
39+
// IOPCItemMgt
40+
static final Guid.IID IID_IOPCItemMgt = new Guid.IID("39C13A54-011E-11D0-9675-0020AFD8ADB3");
41+
42+
// IOPCSyncIO
43+
static final Guid.IID IID_IOPCSyncIO = new Guid.IID("39C13A52-011E-11D0-9675-0020AFD8ADB3");
44+
45+
// IUnknown
46+
static final Guid.IID IID_IUNKNOWN = new Guid.IID("00000000-0000-0000-C000-000000000046");
47+
48+
public static class IOPCServer extends Unknown {
49+
public IOPCServer(final Pointer p) {
50+
super(p);
51+
}
52+
53+
// /* [string][in] */ LPCWSTR szName,
54+
// /* [in] */ BOOL bActive,
55+
// /* [in] */ DWORD dwRequestedUpdateRate,
56+
// /* [in] */ OPCHANDLE hClientGroup,
57+
// /* [in][unique] */ LONG *pTimeBias,
58+
// /* [in][unique] */ FLOAT *pPercentDeadband,
59+
// /* [in] */ DWORD dwLCID,
60+
// /* [out] */ OPCHANDLE *phServerGroup,
61+
// /* [out] */ DWORD *pRevisedUpdateRate,
62+
// /* [in] */ REFIID riid,
63+
// /* [iid_is][out] */ LPUNKNOWN *ppUnk) = 0;
64+
public int addGroup(
65+
final String szName, // Group name ("" means auto)
66+
final boolean bActive, // Whether to activate the group
67+
final int dwRequestedUpdateRate, // The update rate of request (ms)
68+
final int hClientGroup, // The handle of client group
69+
final Pointer pTimeBias, // Time zone bias
70+
final Pointer pPercentDeadband, // Dead band
71+
final int dwLCID, // Region ID
72+
final PointerByReference phServerGroup, // Server group handler
73+
final IntByReference pRevisedUpdateRate, // Real update rate
74+
final Guid.GUID.ByReference riid, // Interface IID
75+
final PointerByReference ppUnk // The OPC Group pointer returned
76+
) {
77+
// Convert Java string into COM "bstr"
78+
final WString wName = new WString(szName);
79+
80+
return this._invokeNativeInt(
81+
3,
82+
new Object[] {
83+
this.getPointer(),
84+
wName,
85+
bActive ? 1 : 0,
86+
dwRequestedUpdateRate,
87+
hClientGroup,
88+
pTimeBias,
89+
pPercentDeadband,
90+
dwLCID,
91+
phServerGroup,
92+
pRevisedUpdateRate,
93+
riid != null ? riid.getPointer() : null,
94+
ppUnk
95+
});
96+
}
97+
}
98+
99+
// IOPCItemMgt(
100+
// /* [in] */ DWORD dwCount,
101+
// /* [in] */ OPCITEMDEF *pItemArray,
102+
// /* [out] */ OPCITEMRESULT **ppAddResults,
103+
// /* [out] */ HRESULT **ppErrors) = 0;
104+
public static class IOPCItemMgt extends Unknown {
105+
public IOPCItemMgt(final Pointer p) {
106+
super(p);
107+
}
108+
109+
public int addItems(
110+
final int dwCount, // Data count
111+
final OPCITEMDEF[] pItemArray, // Items array to create
112+
final PointerByReference pResults, // Results' handles
113+
final PointerByReference pErrors // Error's pointers
114+
) {
115+
return this._invokeNativeInt(
116+
3, new Object[] {this.getPointer(), dwCount, pItemArray, pResults, pErrors});
117+
}
118+
}
119+
120+
public static class IOPCSyncIO extends Unknown {
121+
public IOPCSyncIO(final Pointer p) {
122+
super(p);
123+
}
124+
125+
// /* [in] */ DWORD dwCount,
126+
// /* [size_is][in] */ OPCHANDLE *phServer,
127+
// /* [size_is][in] */ VARIANT *pItemValues,
128+
// /* [size_is][size_is][out] */ HRESULT **ppErrors) = 0;
129+
public int write(
130+
final int dwCount, // Data count
131+
final Pointer phServer, // Server handles of items
132+
final Pointer pItemValues, // Values of items
133+
final PointerByReference pErrors // Error codes
134+
) {
135+
return this._invokeNativeInt(
136+
4,
137+
new Object[] { // Write is the 4th method in vtable
138+
this.getPointer(), dwCount, phServer, pItemValues, pErrors
139+
});
140+
}
141+
}
142+
143+
// /* [string] */ LPWSTR szAccessPath;
144+
// /* [string] */ LPWSTR szItemID;
145+
// BOOL bActive;
146+
// OPCHANDLE hClient;
147+
// DWORD dwBlobSize;
148+
// /* [size_is] */ BYTE *pBlob;
149+
// VARTYPE vtRequestedDataType;
150+
// WORD wReserved;
151+
public static class OPCITEMDEF extends Structure {
152+
public WString szAccessPath = new WString(""); // Access path (Usually empty)
153+
public WString szItemID; // Item ID(Like "Channel1.Device1.Tag1")
154+
public int bActive; // Whether to activate this item(TRUE=1, FALSE=0)
155+
public int hClient; // Client handle, Used in async callback and remove item
156+
public int dwBlobSize; // BLOB size
157+
public Pointer pBlob; // BLOB's pointer
158+
public short vtRequestedDataType = Variant.VT_UNKNOWN; // Requested datatype
159+
public short wReserved; // Reserved
160+
161+
// As C structure
162+
@Override
163+
protected List<String> getFieldOrder() {
164+
return Arrays.asList(
165+
"szAccessPath",
166+
"szItemID",
167+
"bActive",
168+
"hClient",
169+
"dwBlobSize",
170+
"pBlob",
171+
"vtRequestedDataType",
172+
"wReserved");
173+
}
174+
}
175+
176+
// OPCHANDLE hServer;
177+
// VARTYPE vtCanonicalDataType;
178+
// WORD wReserved;
179+
// DWORD dwAccessRights;
180+
// DWORD dwBlobSize;
181+
// /* [size_is] */ BYTE *pBlob;
182+
public static class OPCITEMRESULT extends Structure {
183+
public int hServer; // Server handle, Used to write
184+
public short vtCanonicalDataType; // Data type (like Variant.VT_R8)
185+
public short wReserved; // Reserved word
186+
public int dwAccessRights; // Access right
187+
public int dwBlobSize; // BLOB size
188+
public Pointer pBlob; // BLOB pointer
189+
190+
public OPCITEMRESULT(final Pointer pointer) {
191+
super(pointer);
192+
}
193+
194+
@Override
195+
protected List<String> getFieldOrder() {
196+
return Arrays.asList(
197+
"hServer", "vtCanonicalDataType", "wReserved", "dwAccessRights", "dwBlobSize", "pBlob");
198+
}
199+
}
200+
}

0 commit comments

Comments
 (0)