Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.springframework.boot.autoconfigure.r2dbc.R2dbcProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;

import java.util.Objects;

@ConfigurationProperties(prefix = "timescaledb")
@Getter
@Setter
Expand All @@ -39,6 +41,15 @@ public class TimescaleDBProperties {
//数据库的schema
private String schema = "public";

/**
* TimescaleDB超表函数所在的位置
*/
private String functionSchema = null;

public String getFunctionSchema() {
return functionSchema == null ? schema : functionSchema;
}

/**
* 写入缓冲区配置
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ public static String getTableName(String name) {
return ThingsDatabaseUtils.createTableName(name);
}

public static NativeSelectColumn createTimeGroupColumn(long startWith, Interval interval) {
public static NativeSelectColumn createTimeGroupColumn(long startWith, Interval interval, String functionSchema) {

String unit = interval.getNumber().intValue() + " " + interval
.getUnit()
.name()
.toLowerCase();

return NativeSelectColumn
.of("time_bucket('" + unit + "',timestamp)");
.of("\"" + functionSchema + "\"" + ".time_bucket('" + unit + "',timestamp)");
}

public static TimeSeriesData convertToTimeSeriesData(Record record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.jetlinks.community.timescaledb.configuration;

import org.jetlinks.community.timescaledb.TimescaleDBOperations;
import org.jetlinks.community.timescaledb.TimescaleDBProperties;
import org.jetlinks.community.timescaledb.timeseries.TimescaleDBTimeSeriesManager;
import org.jetlinks.community.timescaledb.timeseries.TimescaleDBTimeSeriesProperties;
import org.springframework.boot.autoconfigure.AutoConfiguration;
Expand All @@ -35,8 +36,9 @@ public class TimescaleDBTimeSeriesConfiguration {
@Bean
@Primary
public TimescaleDBTimeSeriesManager timescaleDBTimeSeriesManager(TimescaleDBOperations operations,
TimescaleDBTimeSeriesProperties properties) {
return new TimescaleDBTimeSeriesManager(properties, operations);
TimescaleDBTimeSeriesProperties properties,
TimescaleDBProperties timescaleDBProperties) {
return new TimescaleDBTimeSeriesManager(properties, operations,timescaleDBProperties);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.jetlinks.community.timescaledb.TimescaleDBOperations;
import org.jetlinks.community.timescaledb.TimescaleDBProperties;
import org.jetlinks.community.timescaledb.metadata.TimescaleDBDialectProvider;
import org.jetlinks.community.timescaledb.metadata.TimescaleDBPropertiesFeature;
import org.springframework.beans.BeansException;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
Expand Down Expand Up @@ -64,7 +65,7 @@ public void init() {
RDBDatabaseMetadata database = new RDBDatabaseMetadata(Dialect.POSTGRES);
database.addFeature(sqlExecutor);
database.addFeature(ReactiveSyncSqlExecutor.of(sqlExecutor));

database.addFeature(TimescaleDBPropertiesFeature.of(properties));
RDBSchemaMetadata schema = TimescaleDBDialectProvider.GLOBAL.createSchema(properties.getSchema());
database.addSchema(schema);
database.setCurrentSchema(schema);
Expand All @@ -91,6 +92,7 @@ public void init() {
.create("TimescaleDB", datasource);
disposable.add(dataSource);
database = dataSource.operator();
database.getMetadata().addFeature(TimescaleDBPropertiesFeature.of(properties));
}
writer = new DefaultTimescaleDBDataWriter(database, properties.getWriteBuffer());
writer.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@

public class TimescaleDBCreateTableSqlBuilder extends CommonCreateTableSqlBuilder {

private String schema;

public TimescaleDBCreateTableSqlBuilder(String schema) {
this.schema = schema;
}

@Override
public SqlRequest build(RDBTableMetadata table) {
DefaultBatchSqlRequest sqlRequest = (DefaultBatchSqlRequest) super.build(table);


table.getFeature(CreateHypertable.ID)
.ifPresent(createHypertable -> sqlRequest.addBatch(createCreateHypertableSQL(table, createHypertable)));

Expand All @@ -47,9 +42,12 @@ private SqlRequest createCreateRetentionPolicySQL(RDBTableMetadata table, Create

String interval = createHypertable.getInterval().getNumber().intValue() + " "
+ createHypertable.getInterval().getUnit().name().toLowerCase();
String functionSchema = table.findFeatureNow(TimescaleDBPropertiesFeature.ID)
.getProperties()
.getFunctionSchema();

return SqlRequests.of(
"SELECT "+ schema +".add_retention_policy( ? , INTERVAL '" + interval + "')",
"SELECT " + "\"" + functionSchema + "\"" + ".add_retention_policy( ? , INTERVAL '" + interval + "')",
table.getFullName()
);
}
Expand All @@ -58,9 +56,11 @@ private SqlRequest createCreateHypertableSQL(RDBTableMetadata table, CreateHyper

String interval = createHypertable.getChunkTimeInterval().getNumber().intValue() + " "
+ createHypertable.getChunkTimeInterval().getUnit().name().toLowerCase();

String functionSchema = table.findFeatureNow(TimescaleDBPropertiesFeature.ID)
.getProperties()
.getFunctionSchema();
return SqlRequests.of(
"SELECT "+ schema +".create_hypertable( ? , ? , chunk_time_interval => INTERVAL '" + interval + "')",
"SELECT " + "\"" + functionSchema + "\"" + ".create_hypertable( ? , ? , chunk_time_interval => INTERVAL '" + interval + "')",
table.getFullName(),
table.getColumnNow(createHypertable.getColumn()).getName()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public String getBindSymbol() {
@Override
public RDBSchemaMetadata createSchema(String name) {
PostgresqlSchemaMetadata schema = new PostgresqlSchemaMetadata(name);
schema.addFeature(new TimescaleDBCreateTableSqlBuilder(name));
schema.addFeature(new TimescaleDBCreateTableSqlBuilder());
schema.addFeature(new TimescaleDBAlterTableSqlBuilder());
DefaultValueCodecFactory codecFactory = new DefaultValueCodecFactory();
codecFactory
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.jetlinks.community.timescaledb.metadata;

import lombok.AllArgsConstructor;
import lombok.Getter;
import org.hswebframework.ezorm.core.FeatureId;
import org.hswebframework.ezorm.core.FeatureType;
import org.hswebframework.ezorm.core.meta.Feature;
import org.jetlinks.community.timescaledb.TimescaleDBProperties;

@AllArgsConstructor(staticName = "of")
@Getter
public class TimescaleDBPropertiesFeature implements Feature, FeatureType {

public static final FeatureId<TimescaleDBPropertiesFeature> ID = FeatureId.of("TimescaleDBPropertiesFeature");

private final TimescaleDBProperties properties;

@Override
public String getId() {
return ID.getId();
}

@Override
public String getName() {
return "TimescaleDBPropertiesFeature";
}

@Override
public FeatureType getType() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.hswebframework.web.crud.query.QueryHelper;
import org.jetlinks.core.metadata.EventMetadata;
import org.jetlinks.community.timescaledb.metadata.TimescaleDBPropertiesFeature;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.things.data.AggregationRequest;
import org.jetlinks.community.things.data.PropertyAggregation;
Expand All @@ -37,7 +37,6 @@
import org.jetlinks.community.things.data.operations.ColumnModeQueryOperationsBase;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.things.data.operations.RowModeQueryOperationsBase;
import org.jetlinks.community.timescaledb.TimescaleDBUtils;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.query.Aggregation;
Expand Down Expand Up @@ -122,7 +121,11 @@ static Flux<AggregationData> doAggregation0(DatabaseOperator database,
if (request.getInterval() != null) {
NativeSelectColumn column = createTimeGroupColumn(
request.getFrom().getTime(),
request.getInterval()
request.getInterval(),
database.getMetadata()
.getFeatureNow(TimescaleDBPropertiesFeature.ID)
.getProperties()
.getFunctionSchema()
);
query.groupBy(column);
query.select(column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.hswebframework.web.crud.query.QueryHelper;
import org.jetlinks.core.metadata.EventMetadata;
import org.jetlinks.community.timescaledb.metadata.TimescaleDBPropertiesFeature;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.things.data.AggregationRequest;
import org.jetlinks.community.things.data.PropertyAggregation;
Expand All @@ -49,8 +49,6 @@
import java.util.*;
import java.util.function.Function;

import static org.jetlinks.community.timescaledb.thing.TimescaleDBColumnModeQueryOperations.doAggregation0;

@Slf4j
public class TimescaleDBRowModeQueryOperations extends RowModeQueryOperationsBase {
private final DatabaseOperator database;
Expand Down Expand Up @@ -117,7 +115,11 @@ protected Flux<AggregationData> doAggregation(String metric,
if (request.getInterval() != null) {
NativeSelectColumn column = TimescaleDBUtils.createTimeGroupColumn(
request.getFrom().getTime(),
request.getInterval()
request.getInterval(),
database.getMetadata()
.getFeatureNow(TimescaleDBPropertiesFeature.ID)
.getProperties()
.getFunctionSchema()
);

query.groupBy(column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.hswebframework.ezorm.rdb.codec.DateTimeCodec;
import org.hswebframework.ezorm.rdb.metadata.RDBIndexMetadata;
import org.hswebframework.ezorm.rdb.operator.ddl.TableBuilder;
import org.jetlinks.community.timescaledb.TimescaleDBProperties;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.community.Interval;
import org.jetlinks.community.things.data.ThingsDataConstants;
Expand Down Expand Up @@ -48,6 +49,8 @@ public class TimescaleDBTimeSeriesManager implements TimeSeriesManager {

private final TimescaleDBOperations operations;

private final TimescaleDBProperties timescaleDBProperties;

@Override
public TimeSeriesService getService(TimeSeriesMetric metric) {
return getService(metric.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.hswebframework.ezorm.rdb.operator.dml.query.SelectColumn;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.timescaledb.metadata.TimescaleDBPropertiesFeature;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.community.timescaledb.TimescaleDBOperations;
Expand Down Expand Up @@ -113,7 +114,14 @@ public Flux<AggregationData> aggregation(AggregationQueryParam param) {
for (Group group : groups) {
if (group instanceof TimeGroup) {
_timeGroup = ((TimeGroup) group);
NativeSelectColumn column = TimescaleDBUtils.createTimeGroupColumn(startWith, _timeGroup.getInterval());
NativeSelectColumn column = TimescaleDBUtils.createTimeGroupColumn(startWith,
_timeGroup.getInterval(),
operations
.database()
.getMetadata()
.getFeatureNow(TimescaleDBPropertiesFeature.ID)
.getProperties()
.getFunctionSchema());
column.setColumn(ThingsDataConstants.COLUMN_TIMESTAMP);
column.setAlias(group.getAlias());
query.select(column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ REDIS_DATABASE: 0 # redis 数据库索引

# timescalbedb相关配置
TIMESCALEDB_SCHEMA: public # timescaledb 数据库schema,默认public
TIMESCALEDB_FUNCTION_SCHEMA: public # 时序数据所用到相关函数所在的schema


# elasticsearch相关配置
Expand Down
1 change: 1 addition & 0 deletions jetlinks-standalone/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ timescaledb:
# username: postgres
# password: p@ssw0rd
schema: ${TIMESCALEDB_SCHEMA:${easyorm.default-schema}} # timescaledb的schema,默认public
function-schema: ${TIMESCALEDB_FUNCTION_SCHEMA:${timescaledb.schema}} #时序数据所用到相关函数所在的schema
time-series:
enabled: true
retention-policies:
Expand Down