Skip to content

Commit b182811

Browse files
authored
[BitSail][Connector] migrate kafka sink to v1 interface (#395)
1 parent 878f724 commit b182811

File tree

13 files changed

+946
-0
lines changed

13 files changed

+946
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.component.format.json;
18+
19+
import java.io.Serializable;
20+
21+
/**
22+
* Convert BitSail Row data structure into Json.
23+
*/
24+
public class RowToJsonConverter implements Serializable {
25+
private static final long serialVersionUID = 1L;
26+
27+
28+
29+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
4+
~
5+
~ Licensed under the Apache License, Version 2.0 (the "License");
6+
~ you may not use this file except in compliance with the License.
7+
~ You may obtain a copy of the License at
8+
~
9+
~ http://www.apache.org/licenses/LICENSE-2.0
10+
~
11+
~ Unless required by applicable law or agreed to in writing, software
12+
~ distributed under the License is distributed on an "AS IS" BASIS,
13+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
~ See the License for the specific language governing permissions and
15+
~ limitations under the License.
16+
-->
17+
18+
<project xmlns="http://maven.apache.org/POM/4.0.0"
19+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
20+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
21+
<modelVersion>4.0.0</modelVersion>
22+
<parent>
23+
<groupId>com.bytedance.bitsail</groupId>
24+
<artifactId>bitsail-connectors</artifactId>
25+
<version>${revision}</version>
26+
</parent>
27+
28+
<artifactId>connector-kafka</artifactId>
29+
30+
<properties>
31+
<maven.compiler.source>8</maven.compiler.source>
32+
<maven.compiler.target>8</maven.compiler.target>
33+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
34+
<kafka.clients.version>1.0.1</kafka.clients.version>
35+
</properties>
36+
37+
<dependencies>
38+
<!-- Kafka -->
39+
<dependency>
40+
<groupId>org.apache.kafka</groupId>
41+
<artifactId>kafka-clients</artifactId>
42+
<version>${kafka.clients.version}</version>
43+
</dependency>
44+
45+
<!-- test -->
46+
<dependency>
47+
<groupId>com.bytedance.bitsail</groupId>
48+
<artifactId>bitsail-connector-test</artifactId>
49+
<version>${revision}</version>
50+
<scope>test</scope>
51+
</dependency>
52+
53+
<dependency>
54+
<groupId>com.bytedance.bitsail</groupId>
55+
<artifactId>bitsail-connector-fake</artifactId>
56+
<version>${revision}</version>
57+
<scope>test</scope>
58+
</dependency>
59+
60+
<dependency>
61+
<groupId>com.bytedance.bitsail</groupId>
62+
<artifactId>bitsail-connector-print</artifactId>
63+
<version>${revision}</version>
64+
<scope>test</scope>
65+
</dependency>
66+
</dependencies>
67+
68+
</project>
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.kafka.common;
18+
19+
import com.bytedance.bitsail.common.exception.ErrorCode;
20+
21+
public enum KafkaErrorCode implements ErrorCode {
22+
23+
REQUIRED_VALUE("Kafka-01", "You missed parameter which is required, please check your configuration."),
24+
UNSUPPORTED_ENCODING("Kafka-02", "Unsupported Encoding."),
25+
UNSUPPORTED_COLUMN_TYPE("Kafka-03", "Unsupported column type.");
26+
27+
private final String code;
28+
private final String description;
29+
30+
KafkaErrorCode(String code, String description) {
31+
this.code = code;
32+
this.description = description;
33+
}
34+
35+
@Override
36+
public String getCode() {
37+
return code;
38+
}
39+
40+
@Override
41+
public String getDescription() {
42+
return description;
43+
}
44+
45+
@Override
46+
public String toString() {
47+
return String.format("Code:[%s], Description:[%s].", this.code,
48+
this.description);
49+
}
50+
}
51+
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.kafka.constants;
18+
19+
public class KafkaConstants {
20+
public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
21+
public static final String CONNECTOR_TYPE_VALUE_KAFKA010X = "kafka010x";
22+
public static final String CONNECTOR_TYPE_VALUE_KAFKA220 = "kafka220";
23+
public static final String CONNECTOR_NAME_VALUE_KAFKA_SOURCE = "kafka_source";
24+
public static final String CONNECTOR_NAME_VALUE_KAFKA_SINK = "kafka-sink";
25+
public static final String CONNECTOR_VERSION_VALUE_08 = "0.8";
26+
public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
27+
public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
28+
public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
29+
public static final String CONNECTOR_VERSION_VALUE_UNIVERSAL = "universal";
30+
public static final String CONNECTOR_TOPIC = "connector.topic";
31+
public static final String CONNECTOR_CLUSTER = "connector.cluster";
32+
public static final String CONNECTOR_SERVERS = "connector.bootstrap.servers";
33+
public static final String CONNECTOR_TEAM = "connector.team";
34+
public static final String CONNECTOR_PSM = "connector.psm";
35+
public static final String CONNECTOR_OWNER = "connector.owner";
36+
public static final String CONNECTOR_GROUP_ID = "connector.group.id";
37+
public static final String CONNECTOR_SOURCE_INDEX = "connector.source.index";
38+
public static final String CONNECTOR_LOCAL_MODE = "connector.local-mode";
39+
public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode";
40+
public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
41+
public static final String CONNECTOR_STARTUP_MODE_VALUE_LATEST = "latest-offset";
42+
public static final String CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
43+
public static final String CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
44+
public static final String CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_TIMESTAMP = "specific-timestamp";
45+
public static final String CONNECTOR_SPECIFIC_OFFSETS = "connector.specific-offsets";
46+
public static final String CONNECTOR_SPECIFIC_OFFSETS_PARTITION = "partition";
47+
public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset";
48+
public static final String CONNECTOR_SPECIFIC_TIMESTAMP = "connector.specific-timestamp";
49+
public static final String CONNECTOR_RELATIVE_OFFSET = "connector.relative-offset";
50+
public static final String CONNECTOR_RESET_TO_EARLIEST_FOR_NEW_PARTITION =
51+
"connector.reset-to-earliest-for-new-partition";
52+
public static final String CONNECTOR_KAFKA_PROPERTIES = "connector.kafka.properties";
53+
public static final String CONNECTOR_KAFKA_PROPERTIES_PARTITIONER_CLASS =
54+
"connector.kafka.properties.partitioner.class";
55+
public static final String CONNECTOR_PROPERTIES = "connector.properties";
56+
public static final String CONNECTOR_PROPERTIES_KEY = "key";
57+
public static final String CONNECTOR_PROPERTIES_VALUE = "value";
58+
public static final String CONNECTOR_SECURITY_PROTOCOL = "connector.security-protocol";
59+
public static final String CONNECTOR_SASL_MECHANISM = "connector.sasl_mechanism";
60+
public static final String CONNECTOR_SASL_PLAIN_USERNAME = "connector.sasl-plain-username";
61+
public static final String CONNECTOR_SASL_PLAIN_PASSWORD = "connector.sasl-plain-password";
62+
public static final String CONNECTOR_SSL_TRUSTSTORE_LOCATION = "connector.ssl_truststore_location";
63+
public static final String CONNECTOR_SSL_TRUSTSTORE_PASSWORD = "connector.ssl_truststore_password";
64+
public static final String CONNECTOR_SSL_IDENTIFICATION_ALGORITHM = "connector.ssl_endpoint_identification_algorithm";
65+
public static final String CONNECTOR_SINK_SEMANTIC = "connector.sink-semantic";
66+
public static final String CONNECTOR_SINK_PARTITIONER = "connector.sink-partitioner";
67+
public static final String CONNECTOR_SINK_PARTITIONER_VALUE_FIXED = "fixed";
68+
public static final String CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
69+
public static final String CONNECTOR_SINK_PARTITIONER_VALUE_ROW_FIELDS_HASH = "row-fields-hash";
70+
public static final String CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM = "custom";
71+
public static final String CONNECTOR_SINK_PARTITIONER_CLASS = "connector.sink-partitioner-class";
72+
public static final String CONNECTOR_SINK_IGNORE_TRANSACTION_TIMEOUT =
73+
"connector.sink-ignore-transaction-timeout-error";
74+
75+
// Rate limiting configurations
76+
public static final String CONNECTOR_RATE_LIMITING_NUM = "connector.rate-limiting-num";
77+
public static final String CONNECTOR_RATE_LIMITING_UNIT = "connector.rate-limiting-unit";
78+
79+
// Partition range to consume
80+
public static final String CONNECTOR_SOURCE_PARTITION_RANGE = "connector.source-partition-range";
81+
82+
// Source sampling
83+
public static final String CONNECTOR_SOURCE_SAMPLE_INTERVAL = "connector.source-sample-interval";
84+
public static final String CONNECTOR_SOURCE_SAMPLE_NUM = "connector.source-sample-num";
85+
86+
// Disable currentOffsetsRate metrics
87+
public static final String DISABLE_CURRENT_OFFSET_RATE_METRICS = "disableCurrentOffsetsRateMetrics";
88+
public static final int MAX_PARALLELISM = 5;
89+
public static final int REQUEST_TIMEOUT_MS_CONFIG = 1200 * 1000;
90+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.kafka.model;
18+
19+
import lombok.AllArgsConstructor;
20+
import lombok.Builder;
21+
import lombok.Getter;
22+
import lombok.NoArgsConstructor;
23+
import lombok.NonNull;
24+
import lombok.Setter;
25+
import lombok.ToString;
26+
27+
import java.util.Map;
28+
29+
@Getter
30+
@Setter
31+
@AllArgsConstructor
32+
@Builder
33+
@NoArgsConstructor
34+
@ToString(of = {"key", "value", "partitionId"})
35+
public class KafkaRecord {
36+
private String key;
37+
@NonNull
38+
private String value;
39+
40+
private Integer partitionId;
41+
42+
private Long timestamp;
43+
44+
private Map<String, String> headers;
45+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.kafka.option;
18+
19+
import com.bytedance.bitsail.common.annotation.Essential;
20+
import com.bytedance.bitsail.common.option.ConfigOption;
21+
import com.bytedance.bitsail.common.option.WriterOptions;
22+
23+
import static com.bytedance.bitsail.common.option.ConfigOptions.key;
24+
import static com.bytedance.bitsail.common.option.WriterOptions.WRITER_PREFIX;
25+
26+
public interface KafkaWriterOptions extends WriterOptions.BaseWriterOptions {
27+
ConfigOption<String> KAFKA_SERVERS =
28+
key(WRITER_PREFIX + "kafka_servers")
29+
.noDefaultValue(String.class);
30+
31+
@Essential
32+
ConfigOption<String> TOPIC_NAME =
33+
key(WRITER_PREFIX + "topic_name")
34+
.noDefaultValue(String.class);
35+
36+
ConfigOption<String> PARTITION_FIELD =
37+
key(WRITER_PREFIX + "partition_field")
38+
.noDefaultValue(String.class);
39+
40+
ConfigOption<Boolean> LOG_FAILURES_ONLY =
41+
key(WRITER_PREFIX + "log_failures_only")
42+
.defaultValue(false);
43+
44+
ConfigOption<Integer> RETRIES =
45+
key(WRITER_PREFIX + "retries")
46+
.defaultValue(10);
47+
48+
/**
49+
* retry.backoff.ms
50+
*/
51+
ConfigOption<Long> RETRY_BACKOFF_MS =
52+
key(WRITER_PREFIX + "retry_backoff_ms")
53+
.defaultValue(1000L);
54+
55+
/**
56+
* linger.ms
57+
*/
58+
ConfigOption<Long> LINGER_MS =
59+
key(WRITER_PREFIX + "linger_ms")
60+
.defaultValue(5000L);
61+
}

0 commit comments

Comments
 (0)