diff --git a/cygnus-common/pom.xml b/cygnus-common/pom.xml index a870b0b98..dc5d1c52e 100644 --- a/cygnus-common/pom.xml +++ b/cygnus-common/pom.xml @@ -157,6 +157,11 @@ jetty-util 6.1.25 + + com.openlinksw + virtjdbc4_3 + 3.123 + diff --git a/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImpl.java b/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImpl.java index 461b6e422..af3de27f3 100644 --- a/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImpl.java +++ b/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImpl.java @@ -668,7 +668,6 @@ public void upsertTransaction (LinkedHashMap> agg Connection connection = null; String upsertQuerys = new String(); String currentUpsertQuery = new String(); - int insertedRows[]; try { diff --git a/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLQueryUtils.java b/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLQueryUtils.java index ce18d587e..76b65a3ed 100644 --- a/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLQueryUtils.java +++ b/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLQueryUtils.java @@ -558,7 +558,7 @@ protected static int collectionSizeOnLinkedHashMap(LinkedHashMap> aggregation, + LinkedHashMap> lastData, + LinkedHashMap> lastDataDelete, + String dataBase, + String schema, + String tableName, + String tableSuffix, + String uniqueKey, + String timestampKey, + String timestampFormat, + boolean attrNativeTypes) + throws CygnusPersistenceError, CygnusBadContextData, CygnusRuntimeError { + + Connection connection = null; + String upsertQuerys = new String(); + String currentUpsertQuery = new String(); + + try { + + connection = driver.getConnection(dataBase); + connection.setAutoCommit(false); + + ArrayList upsertQuerysList = VirtuosoQueryUtils.virtuosoUpsertQuery(aggregation, + lastData, + lastDataDelete, + tableName, + tableSuffix, + uniqueKey, + timestampKey, + timestampFormat, + dataBase, + schema, + attrNativeTypes); + + // Ordering queries to avoid deadlocks. See issue #2197 for more detail + upsertQuerysList.sort(Comparator.comparing(buff -> buff.toString())); + + for (StringBuffer query : upsertQuerysList) { + PreparedStatement upsertStatement; + currentUpsertQuery = query.toString(); + upsertStatement = connection.prepareStatement(currentUpsertQuery); + // FIXME https://github.com/telefonicaid/fiware-cygnus/issues/1959 + upsertStatement.executeUpdate(); + upsertQuerys = upsertQuerys + " " + query; + } + + connection.commit(); + LOGGER.info(" Finished transactions into database: " + + dataBase + " \n upsertQuerys: " + upsertQuerys); + + } catch (SQLTimeoutException e) { + cygnusSQLRollback(connection); + if (upsertQuerys.isEmpty() && currentUpsertQuery.isEmpty()) { + throw new CygnusPersistenceError(" " + e.getNextException() + + " Data insertion error. database: " + dataBase + + " connection: " + connection, + " SQLTimeoutException", e.getMessage()); + } else { + throw new CygnusPersistenceError(" " + e.getNextException() + + " Data insertion error. database: " + dataBase + + " upsertQuerys: " + upsertQuerys + + " currentUpsertQuery: " + currentUpsertQuery, + " SQLTimeoutException", e.getMessage()); + } + } catch (SQLException e) { + cygnusSQLRollback(connection); + if (upsertQuerys.isEmpty() && currentUpsertQuery.isEmpty()) { + throw new CygnusBadContextData(" " + e.getNextException() + + " Data insertion error. 
database: " + dataBase + + " connection: " + connection, + " SQLException", e.getMessage()); + + } else { + String allQueries = " upsertQuerys: " + upsertQuerys + + " currentUpsertQuery: " + currentUpsertQuery; + throw new CygnusBadContextData(" " + e.getNextException() + + " Data insertion error. database: " + dataBase + allQueries, + " SQLException", e.getMessage()); + } + } finally { + closeConnection(connection); + } // try catch + tableName = schema + "." + tableName; + + LOGGER.debug(" Trying to add '" + dataBase + "' and '" + tableName + "' to the cache after upsertion"); + cache.addDataBase(dataBase); + cache.addTable(dataBase, tableName); + } + + + private void cygnusSQLRollback (Connection connection) { + try { + connection.rollback(); + } catch (SQLException e) { + LOGGER.error("Error when rollingback transaction " + e.getMessage()); + } + } + + + public class VirtuosoDriver { + + private final HashMap datasources; + private final HashMap pools; + private final String virtuosoHost; + private final String virtuosoPort; + private final String virtuosoUsername; + private final String virtuosoPassword; + private final int maxPoolSize; + private final int maxPoolIdle; + private final int minPoolIdle; + private final int minPoolIdleTimeMillis; + private final String virtuosoOptions; + + /** + * Constructor. + * + * @param virtuosoHost + * @param virtuosoPort + * @param virtuosoUsername + * @param virtuosoPassword + * @param maxPoolSize + * @param maxPoolIdle + * @param minPoolIdle + * @param minPoolIdleTimeMillis + * @param virtuosoOptions + */ + public VirtuosoDriver(String virtuosoHost, String virtuosoPort, String virtuosoUsername, String virtuosoPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, String virtuosoOptions) { + datasources = new HashMap<>(); + pools = new HashMap<>(); + this.virtuosoHost = virtuosoHost; + this.virtuosoPort = virtuosoPort; + this.virtuosoUsername = virtuosoUsername; + this.virtuosoPassword = virtuosoPassword; + this.maxPoolSize = maxPoolSize; + this.maxPoolIdle = maxPoolIdle; + this.minPoolIdle = minPoolIdle; + this.minPoolIdleTimeMillis = minPoolIdleTimeMillis; + this.virtuosoOptions = virtuosoOptions; + } // VirtuosoDriver + + /** + * Gets a connection to the SQL server. 
+ * + * @param destination + * @return + * @throws CygnusRuntimeError + * @throws CygnusPersistenceError + */ + public Connection getConnection(String destination) throws CygnusRuntimeError, CygnusPersistenceError { + try { + // FIXME: the number of cached connections should be limited to + // a certain number; with such a limit + // number, if a new connection is needed, the oldest one is closed + Connection connection = null; + + if (datasources.containsKey(destination)) { + connection = datasources.get(destination).getConnection(); + LOGGER.debug(" Recovered destination connection from cache (" + destination + ")"); + } + + if (connection == null || !connection.isValid(0)) { + if (connection != null) { + LOGGER.debug(" Closing invalid sql connection for destination " + destination); + try { + connection.close(); + } catch (SQLException e) { + LOGGER.warn(" error closing invalid connection: " + e.getMessage()); + } + } // if + + DataSource datasource = createConnectionPool(destination); + datasources.put(destination, datasource); + connection = datasource.getConnection(); + } // if + + // Check Pool cache and log status + if (pools.containsKey(destination)){ + GenericObjectPool pool = pools.get(destination); + LOGGER.debug(" Pool status (" + destination + ") Max.: " + pool.getMaxActive() + "; Active: " + + pool.getNumActive() + "; Idle: " + pool.getNumIdle()); + }else{ + LOGGER.error(" Can't find dabase in pool cache (" + destination + ")"); + } + return connection; + } catch (ClassNotFoundException e) { + throw new CygnusRuntimeError(" Connection error", "ClassNotFoundException", e.getMessage()); + } catch (SQLException e) { + throw new CygnusPersistenceError(" Connection error", "SQLException", e.getMessage()); + } catch (Exception e) { + throw new CygnusRuntimeError(" Connection error creating new Pool", "Exception", e.getMessage()); + } // try catch + } // getConnection + + /** + * Gets if a connection is created for the given destination. It is + * protected since it is only used in the tests. + * + * @param destination + * @return True if the connection exists, false other wise + */ + protected boolean isConnectionCreated(String destination) { + return datasources.containsKey(destination); + } // isConnectionCreated + + /** + * Returns the actual number of active connections + * @return + */ + protected int activePoolConnections() { + int connectionCount = 0; + for ( String destination : pools.keySet()){ + GenericObjectPool pool = pools.get(destination); + connectionCount += pool.getNumActive(); + LOGGER.debug(" Pool status (" + destination + ") Max.: " + pool.getMaxActive() + "; Active: " + + pool.getNumActive() + "; Idle: " + pool.getNumIdle()); + } + LOGGER.debug(" Total pool's active connections: " + connectionCount); + return connectionCount; + } // activePoolConnections + + /** + * Returns the Maximum number of connections + * @return + */ + protected int maxPoolConnections() { + int connectionCount = 0; + for ( String destination : pools.keySet()){ + GenericObjectPool pool = pools.get(destination); + connectionCount += pool.getMaxActive(); + LOGGER.debug(" Pool status (" + destination + ") Max.: " + pool.getMaxActive() + "; Active: " + + pool.getNumActive() + "; Idle: " + pool.getNumIdle()); + } + LOGGER.debug(" Max pool connections: " + connectionCount); + return connectionCount; + } // maxPoolConnections + + /** + * Gets the number of connections created. 
+ * + * @return The number of connections created + */ + protected int numConnectionsCreated() { + return activePoolConnections(); + } // numConnectionsCreated + + /** + * Create a connection pool for destination. + * + * @param destination + * @return PoolingDataSource + * @throws Exception + */ + @SuppressWarnings("unused") + private DataSource createConnectionPool(String destination) throws Exception { + GenericObjectPool gPool = null; + if (pools.containsKey(destination)){ + LOGGER.debug(" Pool recovered from Cache (" + destination + ")"); + gPool = pools.get(destination); + }else{ + String jdbcUrl = generateJDBCUrl(destination); + Class.forName("virtuoso.jdbc4.Driver"); + + // Creates an Instance of GenericObjectPool That Holds Our Pool of Connections Object! + gPool = new GenericObjectPool(); + // Tune from https://javadoc.io/static/commons-pool/commons-pool/1.6/org/apache/commons/pool/impl/GenericObjectPool.html + // Sets the cap on the number of objects that can be allocated by the pool (checked out to clients, or idle awaiting checkout) at a given time. + gPool.setMaxActive(this.maxPoolSize); + // Sets the cap on the number of "idle" instances in the pool. + gPool.setMaxIdle(this.maxPoolIdle); + // Sets the minimum number of objects allowed in the pool before the evictor thread (if active) spawns new objects. + gPool.setMinIdle(this.minPoolIdle); + // Sets the minimum amount of time an object may sit idle in the pool before it is eligible for eviction by the idle object evictor (if any) + gPool.setMinEvictableIdleTimeMillis(this.minPoolIdleTimeMillis); + // Sets the number of milliseconds to sleep between runs of the idle object evictor thread + gPool.setTimeBetweenEvictionRunsMillis(this.minPoolIdleTimeMillis*3); + pools.put(destination, gPool); + + // Creates a ConnectionFactory Object Which Will Be Used by the Pool to Create the Connection Object! + String sep = (virtuosoOptions != null && !virtuosoOptions.trim().isEmpty()) ? "&" : "?"; + String logJdbc = jdbcUrl + sep + "user=" + virtuosoUsername + "&password=XXXXXXXXXX"; + + LOGGER.debug(" Creating connection pool jdbc: " + logJdbc); + ConnectionFactory cf = new DriverManagerConnectionFactory(jdbcUrl, virtuosoUsername, virtuosoPassword); + + // Creates a PoolableConnectionFactory That Will Wraps the Connection Object Created by + // the ConnectionFactory to Add Object Pooling Functionality! + PoolableConnectionFactory pcf = new PoolableConnectionFactory(cf, gPool, null, null, false, true); + } //else + return new PoolingDataSource(gPool); + } // createConnectionPool + + /** + * Generate the JDBC Url. This method is portected since this is called from test class. + * + * @param destination + * @return jdbcurl + */ + protected String generateJDBCUrl(String destination) { + String jdbcUrl = ""; + + jdbcUrl = "jdbc:" + "virtuoso" + "://" + virtuosoHost + ":" + virtuosoPort + "/" + destination; + + if (virtuosoOptions != null && !virtuosoOptions.trim().isEmpty()) { + jdbcUrl += "?" 
+ virtuosoOptions; + } + + return jdbcUrl; + } // generateJDBCUrl + + /** + * Closes the Driver releasing resources + * @return + */ + public void close() { + int poolCount = 0; + int poolsSize = pools.size(); + + for ( String destination : pools.keySet()){ + GenericObjectPool pool = pools.get(destination); + try { + pool.close(); + pools.remove(destination); + poolCount ++; + LOGGER.debug(" Pool closed: (" + destination + ")"); + } catch (Exception e) { + LOGGER.error(" Error closing SQL pool " + destination +": " + e.getMessage()); + } + } + LOGGER.debug(" Number of Pools closed: " + poolCount + "/" + poolsSize); + } // close + + /** + * Last resort releasing resources + */ + public void Finally(){ + this.close(); + } + + } // VirtuosoDriver +} diff --git a/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/virtuoso/VirtuosoQueryUtils.java b/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/virtuoso/VirtuosoQueryUtils.java new file mode 100644 index 000000000..e0b2ce8cc --- /dev/null +++ b/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/virtuoso/VirtuosoQueryUtils.java @@ -0,0 +1,135 @@ +/** + * Copyright 2014-2020 Telefonica Investigación y Desarrollo, S.A.U + * + * This file is part of fiware-cygnus (FIWARE project). + * + * fiware-cygnus is free software: you can redistribute it and/or modify it under the terms of the GNU Affero + * General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your + * option) any later version. + * fiware-cygnus is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the + * implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License + * for more details. + * + * You should have received a copy of the GNU Affero General Public License along with fiware-cygnus. If not, see + * http://www.gnu.org/licenses/. + * + * For those usages not covered by the GNU Affero General Public License please contact with iot_support at tid dot es + */ + +package com.telefonica.iot.cygnus.backends.virtuoso; + +import com.google.gson.JsonElement; +import com.telefonica.iot.cygnus.log.CygnusLogger; +import com.telefonica.iot.cygnus.backends.sql.SQLQueryUtils; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Set; +import java.util.Arrays; +import java.util.List; + + +/** + * The type Ngsivirtuoso utils. + */ +public class VirtuosoQueryUtils { + + private static final CygnusLogger LOGGER = new CygnusLogger(VirtuosoQueryUtils.class); + + public static final String VIRTUOSO_FIELDS_MARK = ""; + public static final String SEPARATION_MARK = ","; + + /** + * Virtuoso upsert query for Virtuoso string buffer. 
+ * + * @param aggregation the aggregation + * @param lastData the last data + * @param lastDataDelete the last data delete + * @param tableName the table name + * @param tableSuffix the table suffix + * @param uniqueKey the unique key + * @param timestampKey the timestamp key + * @param timestampFormat the timestamp format + * @param schema the destination + * @return the string buffer + */ + protected static ArrayList virtuosoUpsertQuery(LinkedHashMap> aggregation, + LinkedHashMap> lastData, + LinkedHashMap> lastDataDelete, + String tableName, + String tableSuffix, + String uniqueKey, + String timestampKey, + String timestampFormat, + String dataBase, + String schema, + boolean attrNativeTypes) { + + ArrayList upsertList = new ArrayList<>(); + + LOGGER.debug("[VirtuosoQueryUtils.virtuosoUpsertQuery] tableName: " + tableName + " tableSuffix " + tableSuffix + " uniqueKey " + uniqueKey + " dataBase " + dataBase + " schema " + schema); + + for (int i = 0 ; i < collectionSizeOnLinkedHashMap(lastData) ; i++) { + StringBuffer query = new StringBuffer(); + // StringBuffer values = new StringBuffer("("); + // StringBuffer fields = new StringBuffer("("); + // StringBuffer updateSet = new StringBuffer(); + // String valuesSeparator = ""; + // String fieldsSeparator = ""; + // String updateSetSeparator = ""; + ArrayList keys = new ArrayList<>(aggregation.keySet()); + for (int j = 0 ; j < keys.size() ; j++) { + // values + JsonElement value = lastData.get(keys.get(j)).get(i); + String valueToAppend = value == null ? "null" : SQLQueryUtils.getStringValueFromJsonElement(value, "'", attrNativeTypes); + LOGGER.debug("[VirtuosoQueryUtils.virtuosoUpsertQuery] key: " + keys.get(j) + " value: " + value + " valueToAppend " + valueToAppend); + // values.append(valuesSeparator).append(valueToAppend); + // valuesSeparator = ","; + + // // fields + // fields.append(fieldsSeparator).append(keys.get(j)); + // fieldsSeparator = ","; + + // // updateSet + // if (!Arrays.asList(uniqueKey.split("\\s*,\\s*")).contains(keys.get(j))) { + // updateSet.append(updateSetSeparator).append(keys.get(j)).append("=").append(postgisTempReference).append(".").append(keys.get(j)); + // updateSetSeparator = ","; + // } + + } + + String graphUri = "http://example.org/graph"; + String triple = " \"Objeto\" ."; + // Insertar el triple usando DB.DBA.TTLP + query.append("DB.DBA.TTLP('").append(triple).append("', '', '").append(graphUri).append("')"); + + upsertList.add(query); + } + // for (int i = 0 ; i < collectionSizeOnLinkedHashMap(lastDataDelete) ; i++) { + // StringBuffer query = new StringBuffer(); + // // TBD + // } + + + LOGGER.debug("[VirtuosoQueryUtils.virtuosoUpsertQuery] Preparing Upsert querys: " + upsertList.toString()); + return upsertList; + } + + /** + * Collection size on linked hash map int. + * + * @param aggregation the aggregation + * @return the number of attributes contained on the aggregation object. 
+ */ + protected static int collectionSizeOnLinkedHashMap(LinkedHashMap> aggregation) { + ArrayList> list = new ArrayList<>(aggregation.values()); + if (list.size() > 0) + return list.get(0).size(); + else + return 0; + } + +} diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIVirtuosoSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIVirtuosoSink.java new file mode 100644 index 000000000..f4b75cb38 --- /dev/null +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIVirtuosoSink.java @@ -0,0 +1,634 @@ +/** + * Copyright 2025 Telefonica Espana + * + * This file is part of fiware-cygnus (FIWARE project). + * + * fiware-cygnus is free software: you can redistribute it and/or modify it under the terms of the GNU Affero + * General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your + * option) any later version. + * fiware-cygnus is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the + * implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License + * for more details. + * + * You should have received a copy of the GNU Affero General Public License along with fiware-cygnus. If not, see + * http://www.gnu.org/licenses/. + * + * For those usages not covered by the GNU Affero General Public License please contact with iot_support at tid dot es + */ + +package com.telefonica.iot.cygnus.sinks; + +import com.telefonica.iot.cygnus.aggregation.NGSIGenericAggregator; +import com.telefonica.iot.cygnus.aggregation.NGSIGenericColumnAggregator; +import com.telefonica.iot.cygnus.aggregation.NGSIGenericRowAggregator; +import com.telefonica.iot.cygnus.backends.virtuoso.VirtuosoBackendImpl; +import com.telefonica.iot.cygnus.errors.CygnusBadConfiguration; +import com.telefonica.iot.cygnus.errors.CygnusBadContextData; +import com.telefonica.iot.cygnus.errors.CygnusCappingError; +import com.telefonica.iot.cygnus.errors.CygnusExpiratingError; +import com.telefonica.iot.cygnus.errors.CygnusPersistenceError; +import com.telefonica.iot.cygnus.errors.CygnusRuntimeError; +import com.telefonica.iot.cygnus.interceptors.NGSIEvent; +import com.telefonica.iot.cygnus.log.CygnusLogger; +import com.telefonica.iot.cygnus.utils.*; +import java.util.ArrayList; +import java.util.Arrays; +import org.apache.flume.Context; + +/** + * + * @author smartcities + + Detailed documentation can be found at: + *** TDB *** + */ +public class NGSIVirtuosoSink extends NGSISink { + + //private static final String DEFAULT_ROW_ATTR_PERSISTENCE = "row"; + private static final String DEFAULT_PASSWORD = "dba"; + private static final String DEFAULT_PORT = "1111"; + private static final String DEFAULT_HOST = "localhost"; + private static final String DEFAULT_USER_NAME = "dba"; + private static final String DEFAULT_DATABASE = "vdb"; + private static final String DEFAULT_ENABLE_CACHE = "false"; + private static final int DEFAULT_MAX_POOL_SIZE = 3; + private static final int DEFAULT_MAX_POOL_IDLE = 2; + private static final int DEFAULT_MIN_POOL_IDLE = 0; + private static final int DEFAULT_MIN_POOL_IDLE_TIME_MILLIS = 10000; + private static final String DEFAULT_POSTGIS_TYPE = "geometry"; + private static final String DEFAULT_ATTR_NATIVE_TYPES = "false"; + private static final String DEFAULT_FIWARE_SERVICE = "default"; + private static final String ESCAPED_DEFAULT_FIWARE_SERVICE = "default_service"; + private static final String DEFAULT_LAST_DATA_MODE = 
"insert"; + private static final String DEFAULT_LAST_DATA_TABLE_SUFFIX = "_last_data"; + private static final String DEFAULT_LAST_DATA_UNIQUE_KEY = NGSIConstants.ENTITY_ID; + private static final String DEFAULT_LAST_DATA_TIMESTAMP_KEY = NGSIConstants.RECV_TIME; + private static final String DEFAULT_LAST_DATA_SQL_TS_FORMAT = "YYYY-MM-DD HH24:MI:SS.MS"; + + private static final CygnusLogger LOGGER = new CygnusLogger(NGSIVirtuosoSink.class); + private String virtuosoHost; + private String virtuosoPort; + private String virtuosoDatabase; + private String virtuosoUsername; + private String virtuosoPassword; + //private boolean rowAttrPersistence; + private int maxPoolSize; + private int maxPoolIdle; + private int minPoolIdle; + private int minPoolIdleTimeMillis; + private VirtuosoBackendImpl virtuosoPersistenceBackend; + private boolean enableCache; + private boolean swapCoordinates; + private boolean attrNativeTypes; + private boolean attrMetadataStore; + private String virtuosoOptions; + //private boolean persistErrors; + private String lastDataMode; + private String lastDataTableSuffix; + private String lastDataUniqueKey; + private String lastDataTimeStampKey; + private String lastDataSQLTimestampFormat; + + /** + * Constructor. + */ + public NGSIVirtuosoSink() { + super(); + } // NGSIVirtuosoSink + + /** + * Gets the Virtuoso host. It is protected due to it is only required for testing purposes. + * @return The Virtuoso host + */ + protected String getVirtuosoHost() { + return virtuosoHost; + } // getVirtuosoHost + + /** + * Gets the Virtuoso cache. It is protected due to it is only required for testing purposes. + * @return The Virtuoso cache state + */ + protected boolean getEnableCache() { + return enableCache; + } // getEnableCache + + /** + * Gets the Virtuoso port. It is protected due to it is only required for testing purposes. + * @return The Virtuoso port + */ + protected String getVirtuosoPort() { + return virtuosoPort; + } // getVirtuosoPort + + /** + * Gets the Virtuoso database. It is protected due to it is only required for testing purposes. + * @return The Virtuoso database + */ + protected String getVirtuosoDatabase() { + return virtuosoDatabase; + } // getVirtuosoDatabase + + /** + * Gets the Virtuoso username. It is protected due to it is only required for testing purposes. + * @return The Virtuoso username + */ + protected String getVirtuosoUsername() { + return virtuosoUsername; + } // getVirtuosoUsername + + /** + * Gets the Virtuoso password. It is protected due to it is only required for testing purposes. + * @return The Virtuoso password + */ + protected String getVirtuosoPassword() { + return virtuosoPassword; + } // getVirtuosoPassword + + // /** + // * Returns if the attribute persistence is row-based. It is protected due to it is only required for testing + // * purposes. + // * @return True if the attribute persistence is row-based, false otherwise + // */ + // protected boolean getRowAttrPersistence() { + // return rowAttrPersistence; + // } // getRowAttrPersistence + + /** + * Gets the Virtuoso options. It is protected due to it is only required for testing purposes. + * @return The Virtuoso options + */ + protected String getVirtuosoOptions() { + return virtuosoOptions; + } // getVirtuosoOptions + + /** + * Returns if the attribute value will be native or stringfy. It will be stringfy due to backward compatibility + * purposes. 
+ * @return True if the attribute value will be native, false otherwise + */ + protected boolean getNativeAttrTypes() { + return attrNativeTypes; + } // attrNativeTypes + + /** + * Returns the persistence backend. It is protected due to it is only required for testing purposes. + * @return The persistence backend + */ + protected VirtuosoBackendImpl getPersistenceBackend() { + return virtuosoPersistenceBackend; + } // getPersistenceBackend + + /** + * Sets the persistence backend. It is protected due to it is only required for testing purposes. + * @param virtuosoPersistenceBackend + */ + protected void setPersistenceBackend(VirtuosoBackendImpl virtuosoPersistenceBackend) { + this.virtuosoPersistenceBackend = virtuosoPersistenceBackend; + } // setPersistenceBackend + + @Override + public void configure(Context context) { + // Read NGSISink general configuration + super.configure(context); + + // Impose enable lower case, since Virtuoso only accepts lower case + enableLowercase = true; + + virtuosoHost = context.getString("virtuoso_host", DEFAULT_HOST); + LOGGER.debug("[" + this.getName() + "] Reading configuration (virtuoso_host=" + virtuosoHost + ")"); + virtuosoPort = context.getString("virtuoso_port", DEFAULT_PORT); + int intPort = Integer.parseInt(virtuosoPort); + + if ((intPort <= 0) || (intPort > 65535)) { + invalidConfiguration = true; + LOGGER.warn("[" + this.getName() + "] Invalid configuration (virtuoso_port=" + virtuosoPort + ")" + + " -- Must be between 0 and 65535"); + } else { + LOGGER.debug("[" + this.getName() + "] Reading configuration (virtuoso_port=" + virtuosoPort + ")"); + } // if else + + virtuosoDatabase = context.getString("virtuoso_database", DEFAULT_DATABASE); + LOGGER.debug("[" + this.getName() + "] Reading configuration (virtuoso_database=" + virtuosoDatabase + ")"); + virtuosoUsername = context.getString("virtuoso_username", DEFAULT_USER_NAME); + LOGGER.debug("[" + this.getName() + "] Reading configuration (virtuoso_username=" + virtuosoUsername + ")"); + // FIXME: virtuosoPassword should be read as a SHA1 and decoded here + virtuosoPassword = context.getString("virtuoso_password", DEFAULT_PASSWORD); + LOGGER.debug("[" + this.getName() + "] Reading configuration (virtuoso_password=" + virtuosoPassword + ")"); + + maxPoolSize = context.getInteger("virtuoso_maxPoolSize", DEFAULT_MAX_POOL_SIZE); + LOGGER.debug("[" + this.getName() + "] Reading configuration (virtuoso_maxPoolSize=" + maxPoolSize + ")"); + + maxPoolIdle = context.getInteger("virtuoso_maxPoolIdle", DEFAULT_MAX_POOL_IDLE); + LOGGER.debug("[" + this.getName() + "] Reading configuration (virtuoso_maxPoolIdle=" + maxPoolIdle + ")"); + + minPoolIdle = context.getInteger("virtuoso_minPoolIdle", DEFAULT_MIN_POOL_IDLE); + LOGGER.debug("[" + this.getName() + "] Reading configuration (virtuoso_minPoolIdle=" + minPoolIdle + ")"); + + minPoolIdleTimeMillis = context.getInteger("virtuoso_minPoolIdleTimeMillis", DEFAULT_MIN_POOL_IDLE_TIME_MILLIS); + LOGGER.debug("[" + this.getName() + "] Reading configuration (virtuoso_minPoolIdleTimeMillis=" + minPoolIdleTimeMillis + ")"); + + // rowAttrPersistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE).equals("row"); + // String persistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE); + + // if (persistence.equals("row") || persistence.equals("column")) { + // LOGGER.debug("[" + this.getName() + "] Reading configuration (attr_persistence=" + // + persistence + ")"); + // } else { + // invalidConfiguration = true; + // 
LOGGER.warn("[" + this.getName() + "] Invalid configuration (attr_persistence=" + // + persistence + ") -- Must be 'row' or 'column'"); + // } // if else + + String enableCacheStr = context.getString("backend.enable_cache", DEFAULT_ENABLE_CACHE); + + if (enableCacheStr.equals("true") || enableCacheStr.equals("false")) { + enableCache = Boolean.valueOf(enableCacheStr); + LOGGER.debug("[" + this.getName() + "] Reading configuration (backend.enable_cache=" + enableCache + ")"); + } else { + invalidConfiguration = true; + LOGGER.warn("[" + this.getName() + "] Invalid configuration (backend.enable_cache=" + + enableCacheStr + ") -- Must be 'true' or 'false'"); + } // if else + + // TBD: possible option for virtuosoSink + swapCoordinates = false; + + + String attrNativeTypesStr = context.getString("attr_native_types", DEFAULT_ATTR_NATIVE_TYPES); + if (attrNativeTypesStr.equals("true") || attrNativeTypesStr.equals("false")) { + attrNativeTypes = Boolean.valueOf(attrNativeTypesStr); + LOGGER.debug("[" + this.getName() + "] Reading configuration (attr_native_types=" + attrNativeTypesStr + ")"); + } else { + invalidConfiguration = true; + LOGGER.warn("[" + this.getName() + "] Invalid configuration (attr_native_types=" + + attrNativeTypesStr + ") -- Must be 'true' or 'false'"); + } // if else + + String attrMetadataStoreStr = context.getString("attr_metadata_store", "true"); + + if (attrMetadataStoreStr.equals("true") || attrMetadataStoreStr.equals("false")) { + attrMetadataStore = Boolean.parseBoolean(attrMetadataStoreStr); + LOGGER.debug("[" + this.getName() + "] Reading configuration (attr_metadata_store=" + + attrMetadataStore + ")"); + } else { + invalidConfiguration = true; + LOGGER.debug("[" + this.getName() + "] Invalid configuration (attr_metadata_store=" + + attrMetadataStoreStr + ") -- Must be 'true' or 'false'"); + } // if else + + virtuosoOptions = context.getString("virtuoso_options", null); + LOGGER.debug("[" + this.getName() + "] Reading configuration (virtuoso_options=" + virtuosoOptions + ")"); + + // String persistErrorsStr = context.getString("persist_errors", "true"); + + // if (persistErrorsStr.equals("true") || persistErrorsStr.equals("false")) { + // persistErrors = Boolean.parseBoolean(persistErrorsStr); + // LOGGER.debug("[" + this.getName() + "] Reading configuration (persist_errors=" + // + persistErrors + ")"); + // } else { + // invalidConfiguration = true; + // LOGGER.debug("[" + this.getName() + "] Invalid configuration (persist_errors=" + // + persistErrorsStr + ") -- Must be 'true' or 'false'"); + // } // if else + + lastDataMode = context.getString("last_data_mode", DEFAULT_LAST_DATA_MODE); + + if (lastDataMode.equals("upsert") || lastDataMode.equals("insert") || lastDataMode.equals("both")) { + LOGGER.debug("[" + this.getName() + "] Reading configuration (last_data_mode=" + + lastDataMode + ")"); + } else { + invalidConfiguration = true; + LOGGER.debug("[" + this.getName() + "] Invalid configuration (last_data_mode=" + + lastDataMode + ") -- Must be 'upsert', 'insert' or 'both'"); + } // if else + + lastDataTableSuffix = context.getString("last_data_table_suffix", DEFAULT_LAST_DATA_TABLE_SUFFIX); + LOGGER.debug("[" + this.getName() + "] Reading configuration (last_data_table_suffix=" + + lastDataTableSuffix + ")"); + + lastDataUniqueKey = context.getString("last_data_unique_key", DEFAULT_LAST_DATA_UNIQUE_KEY); + LOGGER.debug("[" + this.getName() + "] Reading configuration (last_data_unique_key=" + + lastDataUniqueKey + ")"); + + lastDataTimeStampKey = 
context.getString("last_data_timestamp_key", DEFAULT_LAST_DATA_TIMESTAMP_KEY); + LOGGER.debug("[" + this.getName() + "] Reading configuration (last_data_timestamp_key=" + + lastDataTimeStampKey + ")"); + + lastDataSQLTimestampFormat = context.getString("last_data_sql_timestamp_format", DEFAULT_LAST_DATA_SQL_TS_FORMAT); + LOGGER.debug("[" + this.getName() + "] Reading configuration (last_data_sql_timestamp_format=" + + lastDataSQLTimestampFormat + ")"); + + } // configure + + @Override + public void stop() { + super.stop(); + if (virtuosoPersistenceBackend != null) virtuosoPersistenceBackend.close(); + } // stop + + @Override + public void start() { + try { + createPersistenceBackend(virtuosoHost, virtuosoPort, virtuosoUsername, virtuosoPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, virtuosoOptions); + LOGGER.debug("[" + this.getName() + "] VIRTUOSO persistence backend created"); + } catch (Exception e) { + String configParams = " virtuosoHost " + virtuosoHost + " virtuosoPort " + virtuosoPort + " virtuosoUsername " + + virtuosoUsername + " virtuosoPassword " + virtuosoPassword + " maxPoolSize " + maxPoolSize + " maxPoolIdle " + maxPoolIdle + " minPoolIdle " + minPoolIdle + " minPoolIdleTimeMillis " + minPoolIdleTimeMillis + " virtuosoOptions " + + virtuosoOptions; + LOGGER.error("Error while creating the Virtuoso persistence backend. " + + "Config params= " + configParams + + "Details=" + e.getMessage() + + "Stack trace: " + Arrays.toString(e.getStackTrace())); + } // try catch + + super.start(); + LOGGER.info("[" + this.getName() + "] Startup completed"); + } // start + + /** + * Initialices a lazy singleton to share among instances on JVM + */ + private void createPersistenceBackend(String virtuosoHost, String virtuosoPort, String virtuosoUsername, String virtuosoPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, String virtuosoOptions) { + if (virtuosoPersistenceBackend == null) { + virtuosoPersistenceBackend = new VirtuosoBackendImpl(virtuosoHost, virtuosoPort, virtuosoUsername, virtuosoPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, virtuosoOptions); + } + } + + @Override + void persistBatch(NGSIBatch batch) + throws CygnusBadConfiguration, CygnusPersistenceError, CygnusRuntimeError, CygnusBadContextData { + if (batch == null) { + LOGGER.debug("[" + this.getName() + "] Null batch, nothing to do"); + return; + } // if + + // Iterate on the destinations + batch.startIterator(); + + while (batch.hasNext()) { + String destination = batch.getNextDestination(); + LOGGER.debug("[" + this.getName() + "] Processing sub-batch regarding the " + + destination + " destination"); + + // get the sub-batch for this destination + ArrayList events = batch.getNextEvents(); + + // get an aggregator for this destination and initialize it + NGSIGenericAggregator aggregator = getAggregator(/*rowAttrPersistence*/); + aggregator.setService(events.get(0).getServiceForNaming(enableNameMappings)); + aggregator.setServicePathForData(events.get(0).getServicePathForData()); + aggregator.setServicePathForNaming(events.get(0).getServicePathForNaming(enableNameMappings)); + aggregator.setEntityForNaming(events.get(0).getEntityForNaming(enableNameMappings, enableEncoding)); + aggregator.setEntityType(events.get(0).getEntityTypeForNaming(enableNameMappings)); + aggregator.setAttribute(events.get(0).getAttributeForNaming(enableNameMappings)); + aggregator.setSchemeName(buildSchemaName(aggregator.getService(), 
aggregator.getServicePathForNaming())); + aggregator.setDbName(buildDBName(events.get(0).getServiceForNaming(enableNameMappings))); + aggregator.setTableName(buildTableName(aggregator.getServicePathForNaming(), aggregator.getEntityForNaming(), aggregator.getEntityType(), aggregator.getAttribute())); + aggregator.setAttrNativeTypes(attrNativeTypes); + aggregator.setAttrMetadataStore(attrMetadataStore); + aggregator.setEnableGeoParse(true); + aggregator.setEnableNameMappings(enableNameMappings); + aggregator.setLastDataMode(lastDataMode); + aggregator.setLastDataTimestampKey(lastDataTimeStampKey); + aggregator.setLastDataUniqueKey(lastDataUniqueKey); + aggregator.initialize(events.get(0)); + for (NGSIEvent event : events) { + aggregator.aggregate(event); + } // for + // LOGGER.debug("[" + getName() + "] adding event to aggregator object (name=" + + // SQLQueryUtils.getFieldsForInsert(aggregator.getAggregation().keySet(), SQLQueryUtils.POSTGRES_FIELDS_MARK) + ", values=" + + // SQLQueryUtils.getValuesForInsert(aggregator.getAggregation(), attrNativeTypes) + ")"); + // persist the fieldValues + persistAggregation(aggregator); + batch.setNextPersisted(true); + } // for + } // persistBatch + + protected NGSIGenericAggregator getAggregator(/*boolean rowAttrPersistence*/) { + // if (rowAttrPersistence) { + // return new NGSIGenericRowAggregator(); + // } else { + return new NGSIGenericColumnAggregator(); + // } // if else + } // getAggregator + + @Override + public void capRecords(NGSIBatch batch, long maxRecords) throws CygnusCappingError { + } // capRecords + + @Override + public void expirateRecords(long expirationTime) throws CygnusExpiratingError { + } // expirateRecords + + + private void persistAggregation(NGSIGenericAggregator aggregator) throws CygnusPersistenceError, CygnusRuntimeError, CygnusBadContextData { + + String dataBaseName = aggregator.getDbName(enableLowercase); + String schemaName = aggregator.getSchemeName(enableLowercase); + String tableName = aggregator.getTableName(enableLowercase); + + // Escape a syntax error in SQL + if (schemaName.equals(DEFAULT_FIWARE_SERVICE)) { + schemaName = ESCAPED_DEFAULT_FIWARE_SERVICE; + } + + if (lastDataMode.equals("upsert")) { + // if (rowAttrPersistence) { + // LOGGER.warn("[" + this.getName() + "] no upsert due to row mode"); + // } else { + virtuosoPersistenceBackend.upsertTransaction(aggregator.getAggregationToPersist(), + aggregator.getLastDataToPersist(), + aggregator.getLastDataDeleteToPersist(), + dataBaseName, + schemaName, + tableName, + lastDataTableSuffix, + lastDataUniqueKey, + lastDataTimeStampKey, + lastDataSQLTimestampFormat, + attrNativeTypes); + //} + } + } // persistAggregation + + /** + * Creates a Virtuoso DB name given the FIWARE service. 
+ * @param service + * @return The Virtuoso DB name + * @throws CygnusBadConfiguration + */ + public String buildDBName(String service) throws CygnusBadConfiguration { + String name = null; + + if (enableEncoding) { + switch(dataModel) { + case DMBYENTITYDATABASE: + case DMBYENTITYDATABASESCHEMA: + case DMBYENTITYTYPEDATABASE: + case DMBYENTITYTYPEDATABASESCHEMA: + case DMBYFIXEDENTITYTYPEDATABASE: + case DMBYFIXEDENTITYTYPEDATABASESCHEMA: + if (service != null) + name = NGSICharsets.encodePostgreSQL(service); + break; + default: + name = virtuosoDatabase; + } + } else { + switch(dataModel) { + case DMBYENTITYDATABASE: + case DMBYENTITYDATABASESCHEMA: + case DMBYENTITYTYPEDATABASE: + case DMBYENTITYTYPEDATABASESCHEMA: + case DMBYFIXEDENTITYTYPEDATABASE: + case DMBYFIXEDENTITYTYPEDATABASESCHEMA: + if (service != null) + name = NGSIUtils.encode(service, false, true); + break; + default: + name = virtuosoDatabase; + } + } // if else + if (name.length() > NGSIConstants.POSTGRESQL_MAX_NAME_LEN) { + throw new CygnusBadConfiguration("Building DB name '" + name + + "' and its length is greater than " + NGSIConstants.POSTGRESQL_MAX_NAME_LEN); + } // if + + return name; + } // buildSchemaName + + /** + * Creates a Virtuoso scheme name given the FIWARE service. + * @param service + * @return The Virtuoso scheme name + * @throws CygnusBadConfiguration + */ + public String buildSchemaName(String service, String subService) throws CygnusBadConfiguration { + String name; + + if (enableEncoding) { + switch(dataModel) { + case DMBYENTITYDATABASESCHEMA: + case DMBYENTITYTYPEDATABASESCHEMA: + case DMBYFIXEDENTITYTYPEDATABASESCHEMA: + name = NGSICharsets.encodePostgreSQL(subService); + break; + default: + name = NGSICharsets.encodePostgreSQL(service); + } + } else { + switch(dataModel) { + case DMBYENTITYDATABASESCHEMA: + case DMBYENTITYTYPEDATABASESCHEMA: + case DMBYFIXEDENTITYTYPEDATABASESCHEMA: + name = NGSIUtils.encode(subService, true, false); + break; + default: + name = NGSIUtils.encode(service, false, true); + } + } // if else + + if (name.length() > NGSIConstants.POSTGRESQL_MAX_NAME_LEN) { + throw new CygnusBadConfiguration("Building schema name '" + name + + "' and its length is greater than " + NGSIConstants.POSTGRESQL_MAX_NAME_LEN); + } // if + + return name; + } // buildSchemaName + + /** + * Creates a Virtuoso table name given the FIWARE service path, the entity and the attribute. 
+ * @param servicePath + * @param entity + * @param attribute + * @return The Virtuoso table name + * @throws CygnusBadConfiguration + */ + public String buildTableName(String servicePath, String entity, String entityType, String attribute) throws CygnusBadConfiguration { + String name; + + if (enableEncoding) { + switch(dataModel) { + case DMBYSERVICEPATH: + name = NGSICharsets.encodePostgreSQL(servicePath); + break; + case DMBYENTITYDATABASE: + case DMBYENTITYDATABASESCHEMA: + case DMBYENTITY: + name = NGSICharsets.encodePostgreSQL(servicePath) + + CommonConstants.CONCATENATOR + + NGSICharsets.encodePostgreSQL(entity); + break; + case DMBYENTITYTYPEDATABASE: + case DMBYENTITYTYPEDATABASESCHEMA: + case DMBYENTITYTYPE: + name = NGSICharsets.encodePostgreSQL(servicePath) + + CommonConstants.CONCATENATOR + + NGSICharsets.encodePostgreSQL(entityType); + break; + case DMBYATTRIBUTE: + name = NGSICharsets.encodePostgreSQL(servicePath) + + CommonConstants.CONCATENATOR + + NGSICharsets.encodePostgreSQL(entity) + + CommonConstants.CONCATENATOR + + NGSICharsets.encodePostgreSQL(attribute); + break; + case DMBYFIXEDENTITYTYPE: + case DMBYFIXEDENTITYTYPEDATABASE: + case DMBYFIXEDENTITYTYPEDATABASESCHEMA: + name = NGSICharsets.encodePostgreSQL(entityType); + break; + default: + throw new CygnusBadConfiguration("Unknown data model '" + dataModel.toString() + + "'. Please, use dm-by-service-path, dm-by-entity, dm-by-entity-database, dm-by-entity-database-schema, dm-by-entity-type, dm-by-entity-type-database, dm-by-entity-type-database-schema, dm-by-fixed-entity-type-database, dm-by-fixed-entity-type-database-schema or dm-by-attribute"); + } // switch + } else { + switch(dataModel) { + case DMBYSERVICEPATH: + if (servicePath.equals("/")) { + throw new CygnusBadConfiguration("Default service path '/' cannot be used with " + + "dm-by-service-path data model"); + } // if + + name = NGSIUtils.encode(servicePath, true, false); + break; + case DMBYENTITYDATABASE: + case DMBYENTITYDATABASESCHEMA: + case DMBYENTITY: + String truncatedServicePath = NGSIUtils.encode(servicePath, true, false); + name = (truncatedServicePath.isEmpty() ? "" : truncatedServicePath + '_') + + NGSIUtils.encode(entity, false, true); + break; + case DMBYENTITYTYPEDATABASE: + case DMBYENTITYTYPEDATABASESCHEMA: + case DMBYENTITYTYPE: + truncatedServicePath = NGSIUtils.encode(servicePath, true, false); + name = (truncatedServicePath.isEmpty() ? "" : truncatedServicePath + '_') + + NGSIUtils.encode(entityType, false, true); + break; + case DMBYATTRIBUTE: + truncatedServicePath = NGSIUtils.encode(servicePath, true, false); + name = (truncatedServicePath.isEmpty() ? "" : truncatedServicePath + '_') + + NGSIUtils.encode(entity, false, true) + + '_' + NGSIUtils.encode(attribute, false, true); + break; + case DMBYFIXEDENTITYTYPEDATABASE: + case DMBYFIXEDENTITYTYPEDATABASESCHEMA: + case DMBYFIXEDENTITYTYPE: + name = NGSIUtils.encode(entityType, false, true); + break; + default: + throw new CygnusBadConfiguration("Unknown data model '" + dataModel.toString() + + "'. 
Please, use DMBYSERVICEPATH, DMBYENTITYDATABASE, DMBYENTITYDATABASESCHEMA, DMBYENTITY, DMBYENTITYTYPEDATABASE, DMBYENTITYTYPEDATABASESCHEMA, DMBYENTITYTYPE, DMBYFIXEDENTITYTYPE, DMBYFIXEDENTITYTYPEDATABASE, DMBYFIXEDENTITYTYPEDATABASESCHEMA or DMBYATTRIBUTE"); + } // switch + } // if else + + if (name.length() > NGSIConstants.POSTGRESQL_MAX_NAME_LEN) { + throw new CygnusBadConfiguration("Building table name '" + name + + "' and its length is greater than " + NGSIConstants.POSTGRESQL_MAX_NAME_LEN); + } // if + + return name; + } // buildTableName + +} // NGSIVirtuosoSink diff --git a/doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_virtuoso_sink.md b/doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_virtuoso_sink.md new file mode 100644 index 000000000..26ee98c5b --- /dev/null +++ b/doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_virtuoso_sink.md @@ -0,0 +1,89 @@ +# NGSIVirtuosoSink +Content: + +* [Functionality](#section1) +* [Administration guide](#section2) + * [Configuration](#section2.1) + + +## Functionality +`com.iot.telefonica.cygnus.sinks.NGSIVirtuosoSink`, or simply `NGSIVirtuosoSink` is a sink designed to persist NGSI-like context data events within a [Virtuoso server](https://virtuoso.openlinksw.com/). Usually, such a context data is notified by a [Orion Context Broker](https://github.com/telefonicaid/fiware-orion) instance, but could be any other system speaking the NGSI language. + +Independently of the data generator, NGSI context data is always transformed into internal `NGSIEvent` objects at Cygnus sources. In the end, the information within these events must be mapped into specific Virtuoso data structures (like RDFs). + +Next sections will explain this in detail. + +[Top](#top) + + +## Administration guide +### Configuration +`NGSIVirtuosoSink` is configured through the following parameters: + + +| Parameter | Mandatory | Default value | Comments | +|---|---|---|---| +| type | yes | N/A | Must be com.telefonica.iot.cygnus.sinks.NGSIVirtuosoSink | +| channel | yes | N/A || +| enable\_encoding | no | false | true or false, true applies the new encoding, false applies the old encoding. | +| enable\_name\_mappings | no | false | true or false. Check this [link](./ngsi_name_mappings_interceptor.md) for more details. | +| enable\_lowercase | no | false | true or false. | +| last\_data\_mode | no | upsert | upsert, to set last data mode. Check this [link](./last_data_function.md) for more details. | +| last\_data\_table\_suffix | no | false | This suffix will be added to the table name in order to know where Cygnus will store the last record of an entity. Check this [link](./last_data_function.md) for more details. | +| last\_data\_unique\_key | no | entityId | This must be a unique key on the database to find when a previous record exists. Check this [link](./last_data_function.md) for more details. | +| last\_data\_timestamp\_key | no | recvTime | This must be a timestamp key on the aggregation to know which record is older. Check this [link](./last_data_function.md) for more details. | +| last\_data\_sql_timestamp\_format | no | YYYY-MM-DD HH24:MI:SS.MS | This must be a timestamp format to cast [SQL Text to timestamp](https://www.postgresql.org/docs/9.1/functions-formatting.html). Check this [link](./last_data_function.md) for more details. | +| data\_model | no | dm-by-entity | dm-by-service-path or dm-by-entity or dm-by-entity-type or dm-by-entity-database or dm-by-entity-database-schema or dm-by-entity-type-database or dm-by-entity-type-database-schema. 
dm-by-service is not currently supported. |
+| virtuoso\_host | no | localhost | FQDN/IP address where the Virtuoso server runs. |
+| virtuoso\_port | no | 1111 ||
+| virtuoso\_database | no | vdb | `vdb` is the default database, created automatically at installation. |
+| virtuoso\_username | no | dba | `dba` is the default username, created automatically at installation. |
+| virtuoso\_password | no | N/A | Empty value by default (no password is created at installation). |
+| virtuoso\_maxPoolSize | no | 3 | Max number of connections per database pool. |
+| virtuoso\_maxPoolIdle | no | 2 | Max number of idle connections per database pool. |
+| virtuoso\_minPoolIdle | no | 0 | Min number of idle connections per database pool. |
+| virtuoso\_minPoolIdleTimeMillis | no | 10000 | Minimum amount of time a connection may sit idle before it is eligible for eviction. |
+| virtuoso\_options | no | N/A | Optional connection parameter(s) concatenated to the JDBC URL if necessary. |
+| attr\_persistence | no | row | row or column. |
+| attr\_metadata\_store | no | false | true or false. |
+| attr\_native\_types | no | false | true or false. If true, the attribute value is stored using its native type; if false, it is stringified. |
+| batch\_size | no | 1 | Number of events accumulated before persistence. |
+| batch\_timeout | no | 30 | Number of seconds the batch will be building before it is persisted as it is. |
+| batch\_ttl | no | 10 | Number of retries when a batch cannot be persisted. Use `0` for no retries, `-1` for infinite retries. Please consider that an infinite TTL (even a very large one) may consume all the sink's channel capacity very quickly. |
+| batch\_retry\_intervals | no | 5000 | Comma-separated list of intervals (in milliseconds) at which the retries of not persisted batches will be done. The first retry will be done as many milliseconds after as the first value, then the second retry will be done as many milliseconds after as the second value, and so on. If the batch\_ttl is greater than the number of intervals, the last interval is repeated. |
+| backend.enable\_cache | no | false | true or false, true enables the creation of a cache, false disables it. |
+| persist\_errors | no | true | If there is an exception when trying to persist data into storage, the error is persisted into a table. |
+
+A configuration example could be:
+
+    cygnus-ngsi.sinks = virtuoso-sink
+    cygnus-ngsi.channels = virtuoso-channel
+    ...
+    cygnus-ngsi.sinks.virtuoso-sink.type = com.telefonica.iot.cygnus.sinks.NGSIVirtuosoSink
+    cygnus-ngsi.sinks.virtuoso-sink.channel = virtuoso-channel
+
+    cygnus-ngsi.sinks.virtuoso-sink.enable_grouping = false
+    cygnus-ngsi.sinks.virtuoso-sink.enable_name_mappings = true
+    cygnus-ngsi.sinks.virtuoso-sink.data_model = dm-by-entity
+    cygnus-ngsi.sinks.virtuoso-sink.virtuoso_host = 172.17.0.1
+    cygnus-ngsi.sinks.virtuoso-sink.virtuoso_port = 1111
+    cygnus-ngsi.sinks.virtuoso-sink.virtuoso_username = dba
+    cygnus-ngsi.sinks.virtuoso-sink.virtuoso_password = dba
+    cygnus-ngsi.sinks.virtuoso-sink.virtuoso_database = vdb
+    cygnus-ngsi.sinks.virtuoso-sink.attr_persistence = column
+    cygnus-ngsi.sinks.virtuoso-sink.attr_native_types = true
+    cygnus-ngsi.sinks.virtuoso-sink.attr_metadata_store = false
+    cygnus-ngsi.sinks.virtuoso-sink.batch_size = 2
+    cygnus-ngsi.sinks.virtuoso-sink.batch_timeout = 10
+    cygnus-ngsi.sinks.virtuoso-sink.batch_ttl = -1
+    cygnus-ngsi.sinks.virtuoso-sink.last_data_mode = upsert
+    cygnus-ngsi.sinks.virtuoso-sink.last_data_table_suffix = _lastdata
+    cygnus-ngsi.sinks.virtuoso-sink.last_data_unique_key = entityId
+    cygnus-ngsi.sinks.virtuoso-sink.last_data_timestamp_key = recvTime
+    cygnus-ngsi.sinks.virtuoso-sink.last_data_sql_timestamp_format = YYYY-MM-DD HH24:MI:SS.MS
+
+[Top](#top)
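+
+As an illustration of how the sink talks to Virtuoso, the following minimal JDBC sketch (not part of the Cygnus code base; host, port, credentials, the triple and the graph URI are placeholders) mirrors what `NGSIVirtuosoSink` does internally through `VirtuosoBackendImpl`: it loads the `virtuoso.jdbc4.Driver` (Maven artifact `com.openlinksw:virtjdbc4_3`), opens a `jdbc:virtuoso://<host>:<port>/<database>` connection and loads one Turtle triple into a named graph with `DB.DBA.TTLP`:
+
+    import java.sql.Connection;
+    import java.sql.DriverManager;
+    import java.sql.PreparedStatement;
+
+    public class VirtuosoTtlpSketch {
+        public static void main(String[] args) throws Exception {
+            // Same driver and URL shape used by VirtuosoDriver.generateJDBCUrl()
+            Class.forName("virtuoso.jdbc4.Driver");
+            String jdbcUrl = "jdbc:virtuoso://localhost:1111/vdb";
+            try (Connection conn = DriverManager.getConnection(jdbcUrl, "dba", "dba")) {
+                conn.setAutoCommit(false);
+                // Same statement shape built by VirtuosoQueryUtils.virtuosoUpsertQuery():
+                // DB.DBA.TTLP('<turtle content>', '<base uri>', '<graph uri>')
+                String triple = "<http://example.org/entity/Room1> "
+                        + "<http://example.org/attr/temperature> \"26.5\" .";
+                String graphUri = "http://example.org/graph";
+                String query = "DB.DBA.TTLP('" + triple + "', '', '" + graphUri + "')";
+                try (PreparedStatement stmt = conn.prepareStatement(query)) {
+                    stmt.executeUpdate();
+                }
+                conn.commit();
+            }
+        }
+    }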