Skip to content

Commit

Permalink
Pipe: Implemented OPC DA Sink for local COM & Fixed the newest value …
Browse files Browse the repository at this point in the history
…of OPC UA Sink (#14964)

Co-authored-by: Steve Yurong Su <[email protected]>
  • Loading branch information
Caideyipi and SteveYurongSu authored Feb 27, 2025
1 parent f1a224e commit ede623a
Show file tree
Hide file tree
Showing 12 changed files with 826 additions and 15 deletions.
1 change: 1 addition & 0 deletions dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
"jakarta.ws.rs:jakarta.ws.rs-api",
"jakarta.xml.bind:jakarta.xml.bind-api",
"net.java.dev.jna:jna",
"net.java.dev.jna:jna-platform",
"net.minidev:accessors-smart",
"net.minidev:json-smart",
"org.antlr:antlr4-runtime",
Expand Down
8 changes: 8 additions & 0 deletions iotdb-core/datanode/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna-platform</artifactId>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBDataRegionAirGapConnector;
import org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector;
import org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaConnector;
import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
Expand Down Expand Up @@ -65,6 +66,8 @@ protected void initConstructors() {
BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), WebSocketConnector::new);
pluginConstructors.put(
BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), OpcUaConnector::new);
pluginConstructors.put(
BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName(), OpcDaConnector::new);
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingConnector::new);
pluginConstructors.put(
Expand All @@ -91,6 +94,7 @@ protected void initConstructors() {
pluginConstructors.put(
BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(), WebSocketConnector::new);
pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), OpcUaConnector::new);
pluginConstructors.put(BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(), OpcDaConnector::new);
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingConnector::new);
pluginConstructors.put(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.connector.protocol.opcda;

import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;

import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_DA_CLSID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_DA_PROGID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_DA_CLSID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_DA_PROGID_KEY;

/**
* Send data in IoTDB based on Opc Da protocol, using JNA. All data are converted into tablets, and
* then push the newest value to the <b>local COM</b> server in another process.
*/
@TreeModel
public class OpcDaConnector implements PipeConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(OpcDaConnector.class);
private static final Map<String, Pair<AtomicInteger, OpcDaServerHandle>>
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP = new ConcurrentHashMap<>();
private String clsID;
private OpcDaServerHandle handle;

@Override
public void validate(final PipeParameterValidator validator) throws Exception {
// TODO: upgrade this logic after "1 in 2" logic is supported
validator.validate(
args ->
(((boolean) args[1] || (boolean) args[2] || (boolean) args[3] || (boolean) args[4])),
String.format(
"One of '%s', '%s', '%s' and '%s' must be specified",
SINK_OPC_DA_CLSID_KEY,
CONNECTOR_OPC_DA_CLSID_KEY,
SINK_OPC_DA_PROGID_KEY,
CONNECTOR_OPC_DA_PROGID_KEY),
validator.getParameters().hasAttribute(SINK_OPC_DA_CLSID_KEY),
validator.getParameters().hasAttribute(CONNECTOR_OPC_DA_CLSID_KEY),
validator.getParameters().hasAttribute(SINK_OPC_DA_PROGID_KEY),
validator.getParameters().hasAttribute(CONNECTOR_OPC_DA_PROGID_KEY));
}

@Override
public void customize(
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
throws Exception {
synchronized (CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP) {
clsID = parameters.getStringByKeys(CONNECTOR_OPC_DA_CLSID_KEY, SINK_OPC_DA_CLSID_KEY);
if (Objects.isNull(clsID)) {
clsID =
OpcDaServerHandle.getClsIDFromProgID(
parameters.getStringByKeys(CONNECTOR_OPC_DA_PROGID_KEY, SINK_OPC_DA_PROGID_KEY));
}
handle =
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP
.computeIfAbsent(
clsID, key -> new Pair<>(new AtomicInteger(0), new OpcDaServerHandle(clsID)))
.getRight();
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(clsID).getLeft().incrementAndGet();
}
}

@Override
public void handshake() throws Exception {
// Do nothing
}

@Override
public void heartbeat() throws Exception {
// Do nothing
}

@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
OpcUaConnector.transferByTablet(
tabletInsertionEvent, LOGGER, (tablet, isTableModel) -> handle.transfer(tablet));
}

@Override
public void transfer(final Event event) throws Exception {
// Do nothing
}

@Override
public void close() throws Exception {
if (Objects.isNull(clsID)) {
return;
}

synchronized (CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP) {
final Pair<AtomicInteger, OpcDaServerHandle> pair =
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(clsID);
if (pair == null) {
return;
}

if (pair.getLeft().decrementAndGet() <= 0) {
try {
pair.getRight().close();
} finally {
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.remove(clsID);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.connector.protocol.opcda;

import com.sun.jna.Pointer;
import com.sun.jna.Structure;
import com.sun.jna.WString;
import com.sun.jna.platform.win32.COM.Unknown;
import com.sun.jna.platform.win32.Guid;
import com.sun.jna.platform.win32.Variant;
import com.sun.jna.ptr.IntByReference;
import com.sun.jna.ptr.PointerByReference;

import java.util.Arrays;
import java.util.List;

/** We define the OPC DA Classes and interfaces here like C's .h file. */
public class OpcDaHeader {
// IOPCServer
static final Guid.IID IID_IOPCServer = new Guid.IID("39C13A4D-011E-11D0-9675-0020AFD8ADB3");

// IOPCItemMgt
static final Guid.IID IID_IOPCItemMgt = new Guid.IID("39C13A54-011E-11D0-9675-0020AFD8ADB3");

// IOPCSyncIO
static final Guid.IID IID_IOPCSyncIO = new Guid.IID("39C13A52-011E-11D0-9675-0020AFD8ADB3");

// IUnknown
static final Guid.IID IID_IUNKNOWN = new Guid.IID("00000000-0000-0000-C000-000000000046");

public static class IOPCServer extends Unknown {
public IOPCServer(final Pointer p) {
super(p);
}

// /* [string][in] */ LPCWSTR szName,
// /* [in] */ BOOL bActive,
// /* [in] */ DWORD dwRequestedUpdateRate,
// /* [in] */ OPCHANDLE hClientGroup,
// /* [in][unique] */ LONG *pTimeBias,
// /* [in][unique] */ FLOAT *pPercentDeadband,
// /* [in] */ DWORD dwLCID,
// /* [out] */ OPCHANDLE *phServerGroup,
// /* [out] */ DWORD *pRevisedUpdateRate,
// /* [in] */ REFIID riid,
// /* [iid_is][out] */ LPUNKNOWN *ppUnk) = 0;
public int addGroup(
final String szName, // Group name ("" means auto)
final boolean bActive, // Whether to activate the group
final int dwRequestedUpdateRate, // The update rate of request (ms)
final int hClientGroup, // The handle of client group
final Pointer pTimeBias, // Time zone bias
final Pointer pPercentDeadband, // Dead band
final int dwLCID, // Region ID
final PointerByReference phServerGroup, // Server group handler
final IntByReference pRevisedUpdateRate, // Real update rate
final Guid.GUID.ByReference riid, // Interface IID
final PointerByReference ppUnk // The OPC Group pointer returned
) {
// Convert Java string into COM "bstr"
final WString wName = new WString(szName);

return this._invokeNativeInt(
3,
new Object[] {
this.getPointer(),
wName,
bActive ? 1 : 0,
dwRequestedUpdateRate,
hClientGroup,
pTimeBias,
pPercentDeadband,
dwLCID,
phServerGroup,
pRevisedUpdateRate,
riid != null ? riid.getPointer() : null,
ppUnk
});
}
}

// IOPCItemMgt(
// /* [in] */ DWORD dwCount,
// /* [in] */ OPCITEMDEF *pItemArray,
// /* [out] */ OPCITEMRESULT **ppAddResults,
// /* [out] */ HRESULT **ppErrors) = 0;
public static class IOPCItemMgt extends Unknown {
public IOPCItemMgt(final Pointer p) {
super(p);
}

public int addItems(
final int dwCount, // Data count
final OPCITEMDEF[] pItemArray, // Items array to create
final PointerByReference pResults, // Results' handles
final PointerByReference pErrors // Error's pointers
) {
return this._invokeNativeInt(
3, new Object[] {this.getPointer(), dwCount, pItemArray, pResults, pErrors});
}
}

public static class IOPCSyncIO extends Unknown {
public IOPCSyncIO(final Pointer p) {
super(p);
}

// /* [in] */ DWORD dwCount,
// /* [size_is][in] */ OPCHANDLE *phServer,
// /* [size_is][in] */ VARIANT *pItemValues,
// /* [size_is][size_is][out] */ HRESULT **ppErrors) = 0;
public int write(
final int dwCount, // Data count
final Pointer phServer, // Server handles of items
final Pointer pItemValues, // Values of items
final PointerByReference pErrors // Error codes
) {
return this._invokeNativeInt(
4,
new Object[] { // Write is the 4th method in vtable
this.getPointer(), dwCount, phServer, pItemValues, pErrors
});
}
}

// /* [string] */ LPWSTR szAccessPath;
// /* [string] */ LPWSTR szItemID;
// BOOL bActive;
// OPCHANDLE hClient;
// DWORD dwBlobSize;
// /* [size_is] */ BYTE *pBlob;
// VARTYPE vtRequestedDataType;
// WORD wReserved;
public static class OPCITEMDEF extends Structure {
public WString szAccessPath = new WString(""); // Access path (Usually empty)
public WString szItemID; // Item ID(Like "Channel1.Device1.Tag1")
public int bActive; // Whether to activate this item(TRUE=1, FALSE=0)
public int hClient; // Client handle, Used in async callback and remove item
public int dwBlobSize; // BLOB size
public Pointer pBlob; // BLOB's pointer
public short vtRequestedDataType = Variant.VT_UNKNOWN; // Requested datatype
public short wReserved; // Reserved

// As C structure
@Override
protected List<String> getFieldOrder() {
return Arrays.asList(
"szAccessPath",
"szItemID",
"bActive",
"hClient",
"dwBlobSize",
"pBlob",
"vtRequestedDataType",
"wReserved");
}
}

// OPCHANDLE hServer;
// VARTYPE vtCanonicalDataType;
// WORD wReserved;
// DWORD dwAccessRights;
// DWORD dwBlobSize;
// /* [size_is] */ BYTE *pBlob;
public static class OPCITEMRESULT extends Structure {
public int hServer; // Server handle, Used to write
public short vtCanonicalDataType; // Data type (like Variant.VT_R8)
public short wReserved; // Reserved word
public int dwAccessRights; // Access right
public int dwBlobSize; // BLOB size
public Pointer pBlob; // BLOB pointer

public OPCITEMRESULT(final Pointer pointer) {
super(pointer);
}

@Override
protected List<String> getFieldOrder() {
return Arrays.asList(
"hServer", "vtCanonicalDataType", "wReserved", "dwAccessRights", "dwBlobSize", "pBlob");
}
}
}
Loading

0 comments on commit ede623a

Please sign in to comment.