Skip to content

Commit 69b614d

Browse files
authored
Fix some more rest api inconsistencies (#1210)
Fix some more rest api inconsistencies - Use the name sourceDescription(s) for fields that contain a SourceDescription or List<SourceDescription> - Use the name queryDescription(s) for fields that contain a QueryDescription or List<QueryDescription> - List sinks in the RunningQuery object (instead of a topic name) - Return a simple string for query ids - Include sinks in QueryDescriptions. Previously we had a bug where they were not being listed. - Display read queries in SourceDescriptions. Previously we had a bug where readQueries actually listed queries writing into a source - Return RunningQuery objects in SourceDescription Fixes #1199 Fixes #1206
1 parent 0b59057 commit 69b614d

File tree

27 files changed

+173
-216
lines changed

27 files changed

+173
-216
lines changed

ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.confluent.ksql.rest.entity.QueryDescription;
2626
import io.confluent.ksql.rest.entity.QueryDescriptionEntity;
2727
import io.confluent.ksql.rest.entity.QueryDescriptionList;
28+
import io.confluent.ksql.rest.entity.RunningQuery;
2829
import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
2930
import io.confluent.ksql.rest.entity.SourceDescriptionList;
3031
import org.apache.commons.lang3.StringUtils;
@@ -385,10 +386,11 @@ private void printAsTable(KsqlEntity ksqlEntity) {
385386
.withColumnHeaders(PROPERTIES_COLUMN_HEADERS)
386387
.withRows(propertiesRowValues(properties));
387388
} else if (ksqlEntity instanceof Queries) {
388-
List<Queries.RunningQuery> runningQueries = ((Queries) ksqlEntity).getQueries();
389+
List<RunningQuery> runningQueries = ((Queries) ksqlEntity).getQueries();
389390
tableBuilder.withColumnHeaders("Query ID", "Kafka Topic", "Query String");
390391
runningQueries.forEach(
391-
r -> tableBuilder.withRow(r.getId().toString(), r.getKafkaTopic(), r.getQueryString()));
392+
r -> tableBuilder.withRow(
393+
r.getId(), String.join(",", r.getSinks()), r.getQueryString()));
392394
tableBuilder.withFooterLine("For detailed information on a Query run: EXPLAIN <Query ID>;");
393395
} else if (ksqlEntity instanceof SourceDescriptionEntity) {
394396
SourceDescriptionEntity sourceDescriptionEntity = (SourceDescriptionEntity) ksqlEntity;
@@ -502,7 +504,7 @@ private void printTopicInfo(SourceDescription source) {
502504
writer().println(String.format("%-20s : %s", "Key field", source.getKey()));
503505
writer().println(String.format("%-20s : %s", "Key format", "STRING"));
504506
writer().println(String.format("%-20s : %s", "Timestamp field", timestamp));
505-
writer().println(String.format("%-20s : %s", "Value format", source.getSerdes()));
507+
writer().println(String.format("%-20s : %s", "Value format", source.getFormat()));
506508

507509
if (!source.getTopic().isEmpty()) {
508510
writer().println(String.format(
@@ -522,8 +524,8 @@ private void printWriteQueries(SourceDescription source) {
522524
"Queries that write into this " + source.getType(),
523525
"-----------------------------------"
524526
));
525-
for (String writeQuery : source.getWriteQueries()) {
526-
writer().println(writeQuery);
527+
for (RunningQuery writeQuery : source.getWriteQueries()) {
528+
writer().println(writeQuery.getId() + " : " + writeQuery.getQueryString());
527529
}
528530
writer().println("\nFor query topology and execution plan please run: EXPLAIN <QueryId>");
529531
}
@@ -633,7 +635,7 @@ private void printQuerySinks(QueryDescription query) {
633635
}
634636

635637
private void printQueryDescription(QueryDescription query) {
636-
writer().println(String.format("%-20s : %s", "ID", query.getId().getId()));
638+
writer().println(String.format("%-20s : %s", "ID", query.getId()));
637639
if (query.getStatementText().length() > 0) {
638640
writer().println(String.format("%-20s : %s", "SQL", query.getStatementText()));
639641
}

ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.common.collect.ImmutableList;
2020

21+
import io.confluent.ksql.rest.entity.RunningQuery;
2122
import org.apache.kafka.connect.data.Field;
2223
import org.apache.kafka.connect.data.SchemaBuilder;
2324
import org.junit.After;
@@ -36,7 +37,6 @@
3637
import io.confluent.ksql.FakeException;
3738
import io.confluent.ksql.GenericRow;
3839
import io.confluent.ksql.TestTerminal;
39-
import io.confluent.ksql.query.QueryId;
4040
import io.confluent.ksql.rest.client.KsqlRestClient;
4141
import io.confluent.ksql.rest.entity.CommandStatusEntity;
4242
import io.confluent.ksql.rest.entity.ExecutionPlan;
@@ -102,8 +102,8 @@ public void testPrintKSqlEntityList() throws IOException {
102102
properties.put("k2", "v2");
103103
properties.put("k3", true);
104104

105-
List<Queries.RunningQuery> queries = new ArrayList<>();
106-
queries.add(new Queries.RunningQuery("select * from t1", "TestTopic", new QueryId("0")));
105+
List<RunningQuery> queries = new ArrayList<>();
106+
queries.add(new RunningQuery("select * from t1", Collections.singleton("Test"), "0"));
107107

108108
for (int i = 0; i < 5; i++) {
109109
KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(

ksql-common/src/main/java/io/confluent/ksql/query/QueryId.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@
1717

1818
import com.fasterxml.jackson.annotation.JsonCreator;
1919
import com.fasterxml.jackson.annotation.JsonProperty;
20-
import com.fasterxml.jackson.annotation.JsonSubTypes;
2120

2221
import java.util.Objects;
2322

24-
@JsonSubTypes({})
2523
public class QueryId {
2624
private final String id;
2725

ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,6 @@ public void sql(String sql) throws Exception {
9090
if (queryMetadata instanceof PersistentQueryMetadata) {
9191
PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;
9292
persistentQueryMetadata.getKafkaStreams().start();
93-
ksqlEngine.getPersistentQueries()
94-
.put(persistentQueryMetadata.getQueryId(), persistentQueryMetadata);
9593
} else {
9694
System.err.println("Ignoring statemenst: " + sql);
9795
System.err.println("Only CREATE statements can run in KSQL embedded mode.");

ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import java.io.Closeable;
2828
import java.util.ArrayList;
29+
import java.util.Collection;
2930
import java.util.Collections;
3031
import java.util.HashMap;
3132
import java.util.HashSet;
@@ -478,8 +479,14 @@ public boolean terminateQuery(final QueryId queryId, final boolean closeStreams)
478479
return true;
479480
}
480481

481-
public Map<QueryId, PersistentQueryMetadata> getPersistentQueries() {
482-
return new HashMap<>(persistentQueries);
482+
public PersistentQueryMetadata getPersistentQuery(QueryId queryId) {
483+
return persistentQueries.get(queryId);
484+
}
485+
486+
public Collection<PersistentQueryMetadata> getPersistentQueries() {
487+
return Collections.unmodifiableList(
488+
new ArrayList<>(
489+
persistentQueries.values()));
483490
}
484491

485492
public static List<String> getImmutableProperties() {

ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ public void shouldRunSimpleStatements() throws Exception {
5656
(Collections.emptyList());
5757
expect(ksqlEngine.buildMultipleQueries(statement2, Collections.emptyMap()))
5858
.andReturn(getQueryMetadata(new QueryId("CSAS_BIGORDERS"), DataSource.DataSourceType.KSTREAM));
59-
expect(ksqlEngine.getPersistentQueries()).andReturn(new HashMap<>());
6059
replay(ksqlEngine);
6160

6261
KsqlContext ksqlContext = new KsqlContext(ksqlEngine);

ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/CommandStatusEntity.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,12 @@
1717
package io.confluent.ksql.rest.entity;
1818

1919
import com.fasterxml.jackson.annotation.JsonCreator;
20-
import com.fasterxml.jackson.annotation.JsonSubTypes;
2120
import com.fasterxml.jackson.annotation.JsonTypeInfo;
22-
import com.fasterxml.jackson.annotation.JsonTypeName;
2321
import io.confluent.ksql.rest.server.computation.CommandId;
2422

2523
import java.util.Map;
2624
import java.util.Objects;
2725

28-
@JsonTypeName("currentStatus")
29-
@JsonSubTypes({})
3026
public class CommandStatusEntity extends KsqlEntity {
3127
private final CommandId commandId;
3228
private final CommandStatus commandStatus;

ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/ExecutionPlan.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,9 @@
1818

1919
import com.fasterxml.jackson.annotation.JsonCreator;
2020
import com.fasterxml.jackson.annotation.JsonProperty;
21-
import com.fasterxml.jackson.annotation.JsonSubTypes;
22-
import com.fasterxml.jackson.annotation.JsonTypeName;
2321

2422
import java.util.Objects;
2523

26-
@JsonTypeName("executionPlan")
27-
@JsonSubTypes({})
2824
public class ExecutionPlan extends KsqlEntity {
2925

3026
private final String executionPlan;

ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KafkaTopicsList.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020

2121
import com.fasterxml.jackson.annotation.JsonCreator;
2222
import com.fasterxml.jackson.annotation.JsonProperty;
23-
import com.fasterxml.jackson.annotation.JsonSubTypes;
24-
import com.fasterxml.jackson.annotation.JsonTypeName;
2523

2624
import org.apache.kafka.clients.admin.TopicDescription;
2725
import org.apache.kafka.common.TopicPartition;
@@ -45,8 +43,6 @@
4543
import io.confluent.ksql.util.KsqlConfig;
4644
import io.confluent.ksql.util.KsqlConstants;
4745

48-
@JsonTypeName("kafka_topics")
49-
@JsonSubTypes({})
5046
public class KafkaTopicsList extends KsqlEntity {
5147

5248
private final Collection<KafkaTopicInfo> topics;

ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
@JsonSubTypes.Type(value = CommandStatusEntity.class, name = "currentStatus"),
2828
@JsonSubTypes.Type(value = PropertiesList.class, name = "properties"),
2929
@JsonSubTypes.Type(value = Queries.class, name = "queries"),
30-
@JsonSubTypes.Type(value = SourceDescriptionEntity.class, name = "description"),
31-
@JsonSubTypes.Type(value = QueryDescriptionEntity.class, name = "query_description"),
32-
@JsonSubTypes.Type(value = TopicDescription.class, name = "topic_description"),
30+
@JsonSubTypes.Type(value = SourceDescriptionEntity.class, name = "sourceDescription"),
31+
@JsonSubTypes.Type(value = QueryDescriptionEntity.class, name = "queryDescription"),
32+
@JsonSubTypes.Type(value = TopicDescription.class, name = "topicDescription"),
3333
@JsonSubTypes.Type(value = StreamsList.class, name = "streams"),
3434
@JsonSubTypes.Type(value = TablesList.class, name = "tables"),
3535
@JsonSubTypes.Type(value = KsqlTopicsList.class, name = "ksql_topics"),

0 commit comments

Comments
 (0)