Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
*/
package org.apache.zeppelin.influxdb;

import com.influxdb.query.FluxRecord;
import java.util.Properties;
import java.util.StringJoiner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -24,6 +24,7 @@
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.InfluxDBClientOptions;
import com.influxdb.client.QueryApi;
import java.util.stream.Collectors;
import org.apache.zeppelin.interpreter.AbstractInterpreter;
import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.slf4j.Logger;
Expand Down Expand Up @@ -83,71 +84,83 @@ protected InterpreterResult internalInterpret(String query, InterpreterContext c
LOGGER.debug("Run Flux command '{}'", query);
query = query.trim();

QueryApi queryService = getInfluxDBClient(context);
QueryApi queryService = getQueryApi();

final int[] actualIndex = {-1};
final int[] currentTableIndex = {-1};

AtomicReference<InterpreterResult> resultRef = new AtomicReference<>();
CountDownLatch countDownLatch = new CountDownLatch(1);

StringBuilder result = new StringBuilder();
StringBuilder resultBuilder = new StringBuilder();
queryService.query(
query,

//process record
(cancellable, fluxRecord) -> {

Integer tableIndex = fluxRecord.getTable();
if (actualIndex[0] != tableIndex) {
result.append(NEWLINE);
result.append(TABLE_MAGIC_TAG);
actualIndex[0] = tableIndex;

//add column names to table header
StringJoiner joiner = new StringJoiner(TAB);
fluxRecord.getValues().keySet().forEach(c -> joiner.add(replaceReservedChars(c)));
result.append(joiner.toString());
result.append(NEWLINE);
}

StringJoiner rowsJoiner = new StringJoiner(TAB);
for (Object value : fluxRecord.getValues().values()) {
if (value == null) {
value = EMPTY_COLUMN_VALUE;
}
rowsJoiner.add(replaceReservedChars(value.toString()));
}
result.append(rowsJoiner.toString());
result.append(NEWLINE);
},

throwable -> {

LOGGER.error(throwable.getMessage(), throwable);
resultRef.set(new InterpreterResult(InterpreterResult.Code.ERROR,
throwable.getMessage()));

countDownLatch.countDown();

}, () -> {
//on complete
InterpreterResult intpResult = new InterpreterResult(InterpreterResult.Code.SUCCESS);
intpResult.add(result.toString());
resultRef.set(intpResult);
countDownLatch.countDown();
}
(cancellable, fluxRecord) -> handleRecord(fluxRecord, currentTableIndex, resultBuilder),
throwable -> handleError(throwable, resultRef, countDownLatch),
() -> handleComplete(resultBuilder, resultRef, countDownLatch)
);

awaitLatch(countDownLatch);

return resultRef.get();
}

private void handleRecord(FluxRecord fluxRecord, int[] currentTableIndex,
StringBuilder resultBuilder) {
Integer tableIndex = fluxRecord.getTable();
if (currentTableIndex[0] != tableIndex) {
appendTableHeader(fluxRecord, resultBuilder);
currentTableIndex[0] = tableIndex;
}

appendTableRow(fluxRecord, resultBuilder);
}

private void appendTableHeader(FluxRecord fluxRecord, StringBuilder resultBuilder) {
resultBuilder.append(NEWLINE).append(TABLE_MAGIC_TAG);
String headerLine = fluxRecord.getValues().keySet().stream()
.map(this::replaceReservedChars)
.collect(Collectors.joining(TAB));
resultBuilder.append(headerLine).append(NEWLINE);
}

private void appendTableRow(FluxRecord fluxRecord, StringBuilder resultBuilder) {
String rowLine = fluxRecord.getValues().values().stream()
.map(v -> v == null ? EMPTY_COLUMN_VALUE : v.toString())
.map(this::replaceReservedChars)
.collect(Collectors.joining(TAB));
resultBuilder.append(rowLine).append(NEWLINE);
}

private static void handleError(Throwable throwable, AtomicReference<InterpreterResult> resultRef,
CountDownLatch countDownLatch) {
LOGGER.error(throwable.getMessage(), throwable);
resultRef.set(new InterpreterResult(InterpreterResult.Code.ERROR,
throwable.getMessage()));

countDownLatch.countDown();
}

private static void handleComplete(StringBuilder resultBuilder,
AtomicReference<InterpreterResult> resultRef,
CountDownLatch countDownLatch) {
InterpreterResult intpResult = new InterpreterResult(InterpreterResult.Code.SUCCESS);
intpResult.add(resultBuilder.toString());
resultRef.set(intpResult);
countDownLatch.countDown();
}

private static void awaitLatch(CountDownLatch countDownLatch) throws InterpreterException {
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new InterpreterException(e);
}

return resultRef.get();
}


private QueryApi getInfluxDBClient(InterpreterContext context) {
private QueryApi getQueryApi() {
if (queryApi == null) {
queryApi = this.client.getQueryApi();
}
Expand All @@ -156,7 +169,7 @@ private QueryApi getInfluxDBClient(InterpreterContext context) {


@Override
public void open() throws InterpreterException {
public void open() {

if (this.client == null) {
InfluxDBClientOptions opt = InfluxDBClientOptions.builder()
Expand All @@ -172,25 +185,25 @@ public void open() throws InterpreterException {
}

@Override
public void close() throws InterpreterException {
public void close() {
if (this.client != null) {
this.client.close();
this.client = null;
}
}

@Override
public void cancel(InterpreterContext context) throws InterpreterException {
public void cancel(InterpreterContext context) {

}

@Override
public FormType getFormType() throws InterpreterException {
public FormType getFormType() {
return FormType.SIMPLE;
}

@Override
public int getProgress(InterpreterContext context) throws InterpreterException {
public int getProgress(InterpreterContext context) {
return 0;
}

Expand Down
Loading