Skip to content

Commit 7286705

Browse files
author
qiuyu.yu
committed
2024/11/30: calcite-kafka: search by sql
1 parent bfb5c1e commit 7286705

File tree

3 files changed

+93
-4
lines changed

3 files changed

+93
-4
lines changed

kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ public class KafkaRowConverterImpl implements KafkaRowConverter<byte[], byte[]>
4343
fieldInfo.add("MSG_PARTITION", typeFactory.createSqlType(SqlTypeName.INTEGER)).nullable(false);
4444
fieldInfo.add("MSG_TIMESTAMP", typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
4545
fieldInfo.add("MSG_OFFSET", typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
46-
fieldInfo.add("MSG_KEY_BYTES", typeFactory.createSqlType(SqlTypeName.VARBINARY)).nullable(true);
47-
fieldInfo.add("MSG_VALUE_BYTES", typeFactory.createSqlType(SqlTypeName.VARBINARY))
46+
fieldInfo.add("MSG_KEY_BYTES", typeFactory.createSqlType(SqlTypeName.STRING_TYPES.get(0))).nullable(true);
47+
fieldInfo.add("MSG_VALUE_BYTES", typeFactory.createSqlType(SqlTypeName.STRING_TYPES.get(0)))
4848
.nullable(false);
4949

5050
return fieldInfo.build();

kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaStreamTable.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.calcite.adapter.kafka;
1818

19+
import com.fasterxml.jackson.databind.deser.std.StringDeserializer;
20+
1921
import org.apache.calcite.DataContext;
2022
import org.apache.calcite.config.CalciteConnectionConfig;
2123
import org.apache.calcite.linq4j.AbstractEnumerable;
@@ -75,9 +77,9 @@ public class KafkaStreamTable implements ScannableTable, StreamableTable {
7577
tableOptions.getBootstrapServers());
7678
// by default it's <byte[], byte[]>
7779
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
78-
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
80+
"org.apache.kafka.common.serialization.StringSerializer");
7981
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
80-
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
82+
"org.apache.kafka.common.serialization.StringSerializer");
8183

8284
if (tableOptions.getConsumerParams() != null) {
8385
consumerConfig.putAll(tableOptions.getConsumerParams());
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.calcite.adapter.kafka;
19+
20+
import java.sql.*;
21+
import java.util.Properties;
22+
23+
public class KafkaSelfTest {
24+
public static void main(String[] args) throws ClassNotFoundException, SQLException {
25+
// KafkaRowConverter<byte[], byte[]> rowConverter = new KafkaRowConverterImpl();
26+
// String bootstrapServers = "kafka9001.eniot.io:9092";
27+
// String topicNAme = "topic-fishyu";
28+
// Map<String, String> consumerParams = new HashMap<>();
29+
// consumerParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
30+
// StringDeserializer.class.getName());
31+
// consumerParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
32+
// StringDeserializer.class.getName());
33+
// consumerParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
34+
// consumerParams.put(ConsumerConfig.GROUP_ID_CONFIG, "test-calcite-kafka");
35+
// consumerParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
36+
//
37+
// KafkaTableOptions kafkaTableOptions = new KafkaTableOptions();
38+
// kafkaTableOptions.setBootstrapServers(bootstrapServers);
39+
// kafkaTableOptions.setConsumerParams(consumerParams);
40+
// kafkaTableOptions.setRowConverter(rowConverter);
41+
// kafkaTableOptions.setTopicName(topicNAme);
42+
//
43+
// KafkaStreamTable kafkaStreamTable = new KafkaStreamTable(kafkaTableOptions);
44+
45+
Properties info = new Properties();
46+
info.put("model",
47+
"inline:"
48+
+ "{\n" +
49+
" \"version\": \"1.0\",\n" +
50+
" \"defaultSchema\": \"KAFKA\",\n" +
51+
" \"schemas\": [\n" +
52+
" {\n" +
53+
" \"name\": \"KAFKA\",\n" +
54+
" \"tables\": [\n" +
55+
" {\n" +
56+
" \"name\": \"ZK_ERRORLOG\",\n" +
57+
" \"type\": \"custom\",\n" +
58+
" \"factory\": \"org.apache.calcite.adapter.kafka.KafkaTableFactory\",\n" +
59+
" \"operand\": {\n" +
60+
"\"bootstrap.servers\":\"kafka9001.eniot.io:9092\",\n" +
61+
"\"topic.name\":\"zk_errorlog\",\n" +
62+
" \"consumer.params\": {\n" +
63+
" \"key.deserializer\": \"org.apache.kafka.common.serialization" +
64+
".StringDeserializer\",\n" +
65+
" \"value.deserializer\": \"org.apache.kafka.common.serialization" +
66+
".StringDeserializer\",\n" +
67+
" \"group.id\":\"testcalcite\""+
68+
" } \n"+
69+
" }\n" +
70+
" }\n" +
71+
" ]\n" +
72+
" }\n" +
73+
" ]\n" +
74+
"}");
75+
76+
Connection connection =
77+
DriverManager.getConnection("jdbc:calcite:", info);
78+
Statement statement = connection.createStatement();
79+
ResultSet resultSet = statement.executeQuery("select STREAM * from KAFKA.zk_errorlog");
80+
81+
while(resultSet.next()){
82+
System.out.println(resultSet.getString("MSG_KEY_BYTES"));
83+
System.out.println(resultSet.getString("MSG_VALUE_BYTES"));
84+
}
85+
86+
}
87+
}

0 commit comments

Comments
 (0)