Skip to content

Commit 37578e1

Browse files
authored
[Fix][Kafka-Sink] fix kafka sink factory option rule (#6657)
1 parent ffae2f7 commit 37578e1

File tree

1 file changed

+2
-13
lines changed
  • seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink

1 file changed

+2
-13
lines changed

Diff for: seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java

+2-13
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,9 @@
2323
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
2424
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
2525
import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
26-
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
2726

2827
import com.google.auto.service.AutoService;
2928

30-
import java.util.Arrays;
31-
3229
@AutoService(Factory.class)
3330
public class KafkaSinkFactory implements TableSinkFactory {
3431
@Override
@@ -39,17 +36,9 @@ public String factoryIdentifier() {
3936
@Override
4037
public OptionRule optionRule() {
4138
return OptionRule.builder()
42-
.required(Config.FORMAT, Config.BOOTSTRAP_SERVERS)
43-
.conditional(
44-
Config.FORMAT,
45-
Arrays.asList(
46-
MessageFormat.JSON,
47-
MessageFormat.CANAL_JSON,
48-
MessageFormat.TEXT,
49-
MessageFormat.OGG_JSON,
50-
MessageFormat.AVRO),
51-
Config.TOPIC)
39+
.required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
5240
.optional(
41+
Config.FORMAT,
5342
Config.KAFKA_CONFIG,
5443
Config.ASSIGN_PARTITIONS,
5544
Config.TRANSACTION_PREFIX,

0 commit comments

Comments
 (0)