Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/main/java/io/kestra/plugin/influxdb/AbstractLoad.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@
)
public abstract class AbstractLoad extends AbstractTask implements RunnableTask<AbstractLoad.Output> {
@Schema(
title = "The source file URI",
description = "URI of the file containing data to be loaded into InfluxDB"
title = "Source file URI",
description = "URI in internal storage (e.g., `kestra://...`) containing the data to load"
)
@NotNull
@PluginProperty(internalStorageURI = true)
private Property<String> from;

@Schema(
title = "Chunk size for each bulk write request",
description = "Number of points to include in each write batch"
title = "Chunk size per write",
description = "Number of points per batch when writing to InfluxDB; defaults to 1000"
)
@Builder.Default
private Property<Integer> chunk = Property.ofValue(1000);
Expand Down Expand Up @@ -116,4 +116,4 @@ public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(title = "Number of records written to InfluxDB")
private final Integer recordCount;
}
}
}
29 changes: 13 additions & 16 deletions src/main/java/io/kestra/plugin/influxdb/AbstractQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,15 @@
@NoArgsConstructor
public abstract class AbstractQuery extends AbstractTask {
@Schema(
title = "Query to execute",
description = "Query to run against InfluxDB"
title = "Query string",
description = "Flux or InfluxQL statement to run against InfluxDB"
)
@NotNull
protected Property<String> query;

@Schema(
title = "The way you want to store the data.",
description = "FETCH_ONE output the first row, "
+ "FETCH output all the rows, "
+ "STORE store all rows in a file, "
+ "NONE do nothing."
title = "Fetch behavior",
description = "`FETCH_ONE` returns the first row, `FETCH` returns all rows, `STORE` writes rows to an ION file, `NONE` only records metrics; default is `NONE`"
)
@Builder.Default
protected Property<FetchType> fetchType = Property.ofValue(FetchType.NONE);
Expand Down Expand Up @@ -90,31 +87,31 @@ protected Output handleFetchType(RunContext runContext, List<Map<String, Object>
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The number of rows fetched."
title = "Number of rows returned"
)
private Integer size;

@Schema(
title = "The total number of the rows fetched without pagination."
title = "Total rows without pagination"
)
private Long total;

@Schema(
title = "List containing the fetched data.",
description = "Only populated if using `fetchType=FETCH`."
title = "Fetched rows",
description = "Only populated when `fetchType=FETCH`"
)
private List<Map<String, Object>> rows;

@Schema(
title = "Map containing the first row of fetched data.",
description = "Only populated if using `fetchType=FETCH_ONE`."
title = "First fetched row",
description = "Only populated when `fetchType=FETCH_ONE`"
)
private Map<String, Object> row;

@Schema(
title = "The URI of the stored data.",
description = "Only populated if using `fetchType=STORE`."
title = "URI of stored data",
description = "Only populated when `fetchType=STORE`"
)
private URI uri;
}
}
}
13 changes: 7 additions & 6 deletions src/main/java/io/kestra/plugin/influxdb/AbstractTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@
@NoArgsConstructor
public abstract class AbstractTask extends Task {
@Schema(
title = "InfluxDB connection properties."
title = "InfluxDB connection",
description = "Connection settings (URL, token, optional options) reused across tasks"
)
@NotNull
protected InfluxDBConnection connection;

@Schema(
title = "InfluxDB bucket.",
description = "The bucket to use for operations."
title = "Bucket name",
description = "Target bucket for write/query operations when required; ignored by Flux queries"
)
protected Property<String> bucket;

@Schema(
title = "InfluxDB organization.",
description = "The organization to use for operations."
title = "Organization name",
description = "Organization scope for queries and writes"
)
@NotNull
protected Property<String> org;
Expand All @@ -48,4 +49,4 @@ public abstract class AbstractTask extends Task {
protected InfluxDBClient client(RunContext runContext) throws IllegalVariableEvaluationException {
return connection.client(runContext);
}
}
}
6 changes: 3 additions & 3 deletions src/main/java/io/kestra/plugin/influxdb/FluxQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Query InfluxDB using Flux language.",
description = "Execute a Flux query against InfluxDB."
title = "Run Flux query against InfluxDB",
description = "Executes a Flux query for a given organization. The bucket parameter is ignored because the query embeds it; `fetchType` controls whether rows are returned, stored as ION, or only counted."
)
@Plugin(
examples = {
Expand Down Expand Up @@ -149,4 +149,4 @@ public Output run(RunContext runContext) throws Exception {
return handleFetchType(runContext, results);
}
}
}
}
5 changes: 3 additions & 2 deletions src/main/java/io/kestra/plugin/influxdb/FluxTrigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Trigger a flow on a Flux query of InfluxDB that returns results."
title = "Trigger flow when Flux query returns rows",
description = "Polls a Flux query at a fixed interval (default 60 seconds) and starts a flow when at least one row is returned; `fetchType` mirrors FluxQuery output handling."
)
@Plugin(
examples = {
Expand Down Expand Up @@ -98,4 +99,4 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
TriggerService.generateExecution(this, conditionContext, context, output)
);
}
}
}
6 changes: 3 additions & 3 deletions src/main/java/io/kestra/plugin/influxdb/InfluxQLQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Query InfluxDB using InfluxQL.",
description = "Execute an InfluxQL query against InfluxDB."
title = "Run InfluxQL query against InfluxDB",
description = "Executes an InfluxQL query for the specified bucket and organization. `fetchType` controls whether rows are returned inline, stored, or just counted."
)
@Plugin(
examples = {
Expand Down Expand Up @@ -142,4 +142,4 @@ public AbstractQuery.Output run(RunContext runContext) throws Exception {
return handleFetchType(runContext, results);
}
}
}
}
18 changes: 9 additions & 9 deletions src/main/java/io/kestra/plugin/influxdb/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Load data points to InfluxDB from a file.",
description = "Load data points to InfluxDB from an ION file where each record becomes a data point."
title = "Load ION records into InfluxDB",
description = "Reads an ION file from internal storage and writes each object as a point to InfluxDB with nanosecond precision. Uses the provided `measurement`, batches writes in chunks of 1000 by default, and skips a `time` field unless `timeField` is set so InfluxDB assigns the server timestamp."
)
@Plugin(
examples = {
Expand Down Expand Up @@ -65,21 +65,21 @@
)
public class Load extends AbstractLoad {
@Schema(
title = "Measurement name",
description = "The measurement name to be used for all points from the ION file"
title = "Measurement for all points",
description = "Measurement applied to every point from the ION file"
)
@NotNull
private Property<String> measurement;

@Schema(
title = "List of field names to use as tags",
description = "Fields listed here will be added as tags; all others will be added as fields"
title = "Fields treated as tags",
description = "Fields listed here are stored as tags; remaining keys become fields"
)
private Property<List<String>> tags;

@Schema(
title = "Field name to use as timestamp",
description = "The field containing timestamp values. If null, InfluxDB will use the current time."
title = "Timestamp field name",
description = "Field whose value becomes the point timestamp with nanosecond precision; when unset a `time` key is skipped and InfluxDB assigns the server time"
)
private Property<String> timeField;

Expand Down Expand Up @@ -132,4 +132,4 @@ protected Flux<Point> source(RunContext runContext, BufferedReader inputStream)
return point;
}));
}
}
}
12 changes: 6 additions & 6 deletions src/main/java/io/kestra/plugin/influxdb/Write.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Write data to InfluxDB using line protocol.",
description = "Write data to InfluxDB using InfluxDB line protocol format."
title = "Write line protocol to InfluxDB",
description = "Sends raw InfluxDB line protocol to a bucket/org with configurable timestamp precision (default nanoseconds). Counts and reports written lines."
)
@Plugin(
examples = {
Expand Down Expand Up @@ -64,15 +64,15 @@
)
public class Write extends AbstractTask implements RunnableTask<Write.Output> {
@Schema(
title = "Data in InfluxDB line protocol format",
description = "Multiline string in InfluxDB wire format (line protocol)"
title = "Line protocol payload",
description = "Multiline string in InfluxDB line protocol"
)
@NotNull
private Property<String> source;

@Schema(
title = "Write precision",
description = "The precision for the unix timestamps within the body line-protocol"
title = "Timestamp precision",
description = "Precision applied to unix timestamps in the payload; defaults to nanoseconds"
)
@Builder.Default
private Property<WritePrecision> precision = Property.ofValue(WritePrecision.NS);
Expand Down
Loading