+ */
+ public static HttpHost validateAndParseHostsString(String host) {
+ try {
+ HttpHost httpHost = HttpHost.create(host);
+ if (httpHost.getPort() < 0) {
+ throw new ValidationException(
+ String.format(
+ "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
+ host, HOSTS_OPTION.key()));
+ }
+
+ if (httpHost.getSchemeName() == null) {
+ throw new ValidationException(
+ String.format(
+ "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
+ host, HOSTS_OPTION.key()));
+ }
+ return httpHost;
+ } catch (Exception e) {
+ throw new ValidationException(
+ String.format(
+ "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
+ host, HOSTS_OPTION.key()),
+ e);
+ }
+ }
+}
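A minimal sketch of how the validator above behaves; the inputs are invented for illustration:

    // A fully qualified host parses into an HttpHost with scheme and port.
    HttpHost host = validateAndParseHostsString("http://localhost:9200");
    // host.getSchemeName() -> "http", host.getPort() -> 9200

    // A host without an explicit port fails fast.
    // validateAndParseHostsString("http://localhost");
    //   -> ValidationException: "... Missing port."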
diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java
new file mode 100644
index 00000000..56c778a7
--- /dev/null
+++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java
@@ -0,0 +1,131 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * Base options for the Elasticsearch connector. Needs to be public so that the {@link
+ * org.apache.flink.table.api.TableDescriptor} can access it.
+ */
+@PublicEvolving
+public class Elasticsearch8ConnectorOptions {
+
+ Elasticsearch8ConnectorOptions() {}
+
+ public static final ConfigOption<List<String>> HOSTS_OPTION =
+ ConfigOptions.key("hosts")
+ .stringType()
+ .asList()
+ .noDefaultValue()
+ .withDescription("Elasticsearch hosts to connect to.");
+
+ public static final ConfigOption<String> INDEX_OPTION =
+ ConfigOptions.key("index")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Elasticsearch index for every record.");
+
+ public static final ConfigOption<String> PASSWORD_OPTION =
+ ConfigOptions.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Password used to connect to Elasticsearch instance.");
+
+ public static final ConfigOption<String> USERNAME_OPTION =
+ ConfigOptions.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Username used to connect to Elasticsearch instance.");
+
+ public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+ ConfigOptions.key("document-id.key-delimiter")
+ .stringType()
+ .defaultValue("_")
+ .withDescription(
+ "Delimiter for composite keys, e.g. \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+
+ public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+ ConfigOptions.key("sink.bulk-flush.max-actions")
+ .intType()
+ .defaultValue(1000)
+ .withDescription("Maximum number of actions for each bulk request.");
+
+ public static final ConfigOption<Integer> BULK_FLUSH_MAX_BUFFERED_ACTIONS_OPTION =
+ ConfigOptions.key("sink.bulk-flush.max-buffered-actions")
+ .intType()
+ .defaultValue(10000)
+ .withDescription("Maximum number of buffered actions.");
+
+ public static final ConfigOption<Integer> BULK_FLUSH_MAX_IN_FLIGHT_ACTIONS_OPTION =
+ ConfigOptions.key("sink.bulk-flush.max-in-flight-actions")
+ .intType()
+ .defaultValue(50)
+ .withDescription(
+ "Threshold for uncompleted actions before blocking new write actions.");
+
+ public static final ConfigOption<MemorySize> BULK_FLUSH_MAX_SIZE_OPTION =
+ ConfigOptions.key("sink.bulk-flush.max-size")
+ .memoryType()
+ .defaultValue(MemorySize.parse("2mb"))
+ .withDescription("Maximum size of buffered actions per bulk request.");
+
+ public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+ ConfigOptions.key("sink.bulk-flush.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription("Bulk flush interval.");
+
+ public static final ConfigOption<String> CONNECTION_PATH_PREFIX_OPTION =
+ ConfigOptions.key("connection.path-prefix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Prefix string to be added to every REST communication.");
+
+ public static final ConfigOption<Duration> CONNECTION_REQUEST_TIMEOUT =
+ ConfigOptions.key("connection.request-timeout")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "The timeout for requesting a connection from the connection manager.");
+
+ public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
+ ConfigOptions.key("connection.timeout")
+ .durationType()
+ .noDefaultValue()
+ .withDescription("The timeout for establishing a connection.");
+
+ public static final ConfigOption<Duration> SOCKET_TIMEOUT =
+ ConfigOptions.key("socket.timeout")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "The socket timeout (SO_TIMEOUT) for waiting for data or, put differently, "
+ + "the maximum period of inactivity between two consecutive data packets.");
+
+ public static final ConfigOption<String> SSL_CERTIFICATE_FINGERPRINT =
+ ConfigOptions.key("ssl.certificate-fingerprint")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The HTTP CA certificate SHA-256 fingerprint used to verify the HTTPS connection.");
+
+ public static final ConfigOption<String> FORMAT_OPTION =
+ ConfigOptions.key("format")
+ .stringType()
+ .defaultValue("json")
+ .withDescription(
+ "The format must produce a valid JSON document. "
+ + "Please refer to the documentation on formats for more details.");
+
+ public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE_OPTION =
+ ConfigOptions.key("sink.delivery-guarantee")
+ .enumType(DeliveryGuarantee.class)
+ .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
+ .withDescription("Optional delivery guarantee when committing.");
+}
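As a rough sketch of how these options could surface through the Table API; the connector identifier "elasticsearch-8" and the schema below are assumptions, not confirmed by this patch:

    TableDescriptor descriptor =
            TableDescriptor.forConnector("elasticsearch-8") // identifier assumed
                    .schema(
                            Schema.newBuilder()
                                    .column("id", DataTypes.STRING().notNull())
                                    .column("name", DataTypes.STRING())
                                    .primaryKey("id")
                                    .build())
                    // option keys taken from Elasticsearch8ConnectorOptions above
                    .option(
                            Elasticsearch8ConnectorOptions.HOSTS_OPTION,
                            Collections.singletonList("http://localhost:9200"))
                    .option(Elasticsearch8ConnectorOptions.INDEX_OPTION, "users")
                    .build();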
diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchValidationUtils.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchValidationUtils.java
new file mode 100644
index 00000000..b431fec8
--- /dev/null
+++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchValidationUtils.java
@@ -0,0 +1,74 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Utility methods for validating Elasticsearch properties. */
+@Internal
+class ElasticsearchValidationUtils {
+ private static final Set<LogicalTypeRoot> ALLOWED_PRIMARY_KEY_TYPES = new LinkedHashSet<>();
+
+ static {
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.CHAR);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.VARCHAR);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BOOLEAN);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.DECIMAL);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.TINYINT);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.SMALLINT);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.INTEGER);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BIGINT);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.FLOAT);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.DOUBLE);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.DATE);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.INTERVAL_YEAR_MONTH);
+ ALLOWED_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.INTERVAL_DAY_TIME);
+ }
+
+ /**
+ * Checks that the table does not have a primary key defined on illegal types. In Elasticsearch
+ * the primary key is used to calculate the Elasticsearch document id, which is a string of up
+ * to 512 bytes. It cannot have whitespaces. As of now it is calculated by concatenating the
+ * fields. Certain types do not have a good string representation to be used in this scenario.
+ * The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and {@link
+ * LogicalTypeRoot#RAW} type.
+ */
+ public static void validatePrimaryKey(DataType primaryKeyDataType) {
+ List<DataType> fieldDataTypes = DataType.getFieldDataTypes(primaryKeyDataType);
+ List<LogicalTypeRoot> illegalTypes =
+ fieldDataTypes.stream()
+ .map(DataType::getLogicalType)
+ .map(
+ logicalType -> {
+ if (logicalType.is(LogicalTypeRoot.DISTINCT_TYPE)) {
+ return ((DistinctType) logicalType)
+ .getSourceType()
+ .getTypeRoot();
+ } else {
+ return logicalType.getTypeRoot();
+ }
+ })
+ .filter(t -> !ALLOWED_PRIMARY_KEY_TYPES.contains(t))
+ .collect(Collectors.toList());
+ if (!illegalTypes.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "The table has a primary key on columns of illegal types: %s.",
+ illegalTypes));
+ }
+ }
+
+ private ElasticsearchValidationUtils() {}
+}
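A quick sketch of the check above on an invented key schema; the primary key is passed in as a ROW of its field types:

    // BIGINT and VARCHAR are in the allowed set, so this passes.
    DataType pk =
            DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.BIGINT()),
                    DataTypes.FIELD("name", DataTypes.STRING()));
    ElasticsearchValidationUtils.validatePrimaryKey(pk);

    // An ARRAY column would be rejected:
    // "The table has a primary key on columns of illegal types: [ARRAY]."
    DataType badPk = DataTypes.ROW(DataTypes.FIELD("tags", DataTypes.ARRAY(DataTypes.STRING())));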
diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGenerator.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGenerator.java
new file mode 100644
index 00000000..12718f55
--- /dev/null
+++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGenerator.java
@@ -0,0 +1,21 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+
+/** This interface is responsible for generating the index name from a given {@link Row} record. */
+@Internal
+interface IndexGenerator extends Serializable {
+
+ /**
+ * Initializes the index generator; this will be called only once before {@link
+ * #generate(RowData)} is called.
+ */
+ default void open() {}
+
+ /** Generates the index name according to the given row. */
+ String generate(RowData row);
+}
diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorBase.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorBase.java
new file mode 100644
index 00000000..9af36ee6
--- /dev/null
+++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorBase.java
@@ -0,0 +1,34 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Objects;
+
+/** Base class for {@link IndexGenerator}. */
+@Internal
+public abstract class IndexGeneratorBase implements IndexGenerator {
+
+ private static final long serialVersionUID = 1L;
+ protected final String index;
+
+ public IndexGeneratorBase(String index) {
+ this.index = index;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IndexGeneratorBase)) {
+ return false;
+ }
+ IndexGeneratorBase that = (IndexGeneratorBase) o;
+ return index.equals(that.index);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(index);
+ }
+}
diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
new file mode 100644
index 00000000..2bf651ee
--- /dev/null
+++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
@@ -0,0 +1,298 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Factory of {@link IndexGenerator}.
+ *
+ * <p>Flink supports both static index and dynamic index.
+ *
+ * <p>If you want to have a static index, this option value should be a plain string, e.g.
+ * 'myusers', all the records will be consistently written into "myusers" index.
+ *
+ * <p>If you want to have a dynamic index, you can use '{field_name}' to reference a field value in
+ * the record to dynamically generate a target index. You can also use
+ * '{field_name|date_format_string}' to convert a field value of TIMESTAMP/DATE/TIME type into the
+ * format specified by date_format_string. The date_format_string is compatible with {@link
+ * java.text.SimpleDateFormat}. For example, if the option value is 'myusers_{log_ts|yyyy-MM-dd}',
+ * then a record with log_ts field value 2020-03-27 12:25:55 will be written into
+ * "myusers_2020-03-27" index.
+ */
+@Internal
+final class IndexGeneratorFactory {
+
+ private IndexGeneratorFactory() {}
+
+ public static IndexGenerator createIndexGenerator(
+ String index,
+ List<String> fieldNames,
+ List<DataType> dataTypes,
+ ZoneId localTimeZoneId) {
+ final IndexHelper indexHelper = new IndexHelper();
+ if (indexHelper.checkIsDynamicIndex(index)) {
+ return createRuntimeIndexGenerator(
+ index,
+ fieldNames.toArray(new String[0]),
+ dataTypes.toArray(new DataType[0]),
+ indexHelper,
+ localTimeZoneId);
+ } else {
+ return new StaticIndexGenerator(index);
+ }
+ }
+
+ public static IndexGenerator createIndexGenerator(
+ String index, List<String> fieldNames, List<DataType> dataTypes) {
+ return createIndexGenerator(index, fieldNames, dataTypes, ZoneId.systemDefault());
+ }
+
+ interface DynamicFormatter extends Serializable {
+ String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
+ }
+
+ private static IndexGenerator createRuntimeIndexGenerator(
+ String index,
+ String[] fieldNames,
+ DataType[] fieldTypes,
+ IndexHelper indexHelper,
+ ZoneId localTimeZoneId) {
+ final String dynamicIndexPatternStr = indexHelper.extractDynamicIndexPatternStr(index);
+ final String indexPrefix = index.substring(0, index.indexOf(dynamicIndexPatternStr));
+ final String indexSuffix =
+ index.substring(indexPrefix.length() + dynamicIndexPatternStr.length());
+
+ if (indexHelper.checkIsDynamicIndexWithSystemTimeFormat(index)) {
+ final String dateTimeFormat =
+ indexHelper.extractDateFormat(
+ index, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+ return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+ @Override
+ public String generate(RowData row) {
+ return indexPrefix
+ .concat(LocalDateTime.now(localTimeZoneId).format(dateTimeFormatter))
+ .concat(indexSuffix);
+ }
+ };
+ }
+
+ final boolean isDynamicIndexWithFormat = indexHelper.checkIsDynamicIndexWithFormat(index);
+ final int indexFieldPos =
+ indexHelper.extractIndexFieldPos(index, fieldNames, isDynamicIndexWithFormat);
+ final LogicalType indexFieldType = fieldTypes[indexFieldPos].getLogicalType();
+ final LogicalTypeRoot indexFieldLogicalTypeRoot = indexFieldType.getTypeRoot();
+
+ // validate index field type
+ indexHelper.validateIndexFieldType(indexFieldLogicalTypeRoot);
+
+ // create a field getter to extract the dynamic index field value from each row
+ final RowData.FieldGetter fieldGetter =
+ RowData.createFieldGetter(indexFieldType, indexFieldPos);
+
+ if (isDynamicIndexWithFormat) {
+ final String dateTimeFormat =
+ indexHelper.extractDateFormat(index, indexFieldLogicalTypeRoot);
+ DynamicFormatter formatFunction =
+ createFormatFunction(
+ indexFieldType, indexFieldLogicalTypeRoot, localTimeZoneId);
+
+ return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+ @Override
+ public String generate(RowData row) {
+ Object fieldOrNull = fieldGetter.getFieldOrNull(row);
+ final String formattedField;
+ if (fieldOrNull != null) {
+ formattedField = formatFunction.format(fieldOrNull, dateTimeFormatter);
+ } else {
+ formattedField = "null";
+ }
+ return indexPrefix.concat(formattedField).concat(indexSuffix);
+ }
+ };
+ }
+ // general dynamic index pattern
+ return new IndexGeneratorBase(index) {
+ @Override
+ public String generate(RowData row) {
+ Object indexField = fieldGetter.getFieldOrNull(row);
+ return indexPrefix
+ .concat(indexField == null ? "null" : indexField.toString())
+ .concat(indexSuffix);
+ }
+ };
+ }
+
+ private static DynamicFormatter createFormatFunction(
+ LogicalType indexFieldType,
+ LogicalTypeRoot indexFieldLogicalTypeRoot,
+ ZoneId localTimeZoneId) {
+ switch (indexFieldLogicalTypeRoot) {
+ case DATE:
+ return (value, dateTimeFormatter) -> {
+ Integer indexField = (Integer) value;
+ return LocalDate.ofEpochDay(indexField).format(dateTimeFormatter);
+ };
+ case TIME_WITHOUT_TIME_ZONE:
+ return (value, dateTimeFormatter) -> {
+ Integer indexField = (Integer) value;
+ return LocalTime.ofNanoOfDay(indexField * 1_000_000L).format(dateTimeFormatter);
+ };
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return (value, dateTimeFormatter) -> {
+ TimestampData indexField = (TimestampData) value;
+ return indexField.toLocalDateTime().format(dateTimeFormatter);
+ };
+ case TIMESTAMP_WITH_TIME_ZONE:
+ throw new UnsupportedOperationException(
+ "TIMESTAMP_WITH_TIME_ZONE is not supported yet");
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return (value, dateTimeFormatter) -> {
+ TimestampData indexField = (TimestampData) value;
+ return indexField.toInstant().atZone(localTimeZoneId).format(dateTimeFormatter);
+ };
+ default:
+ throw new TableException(
+ String.format(
+ "Unsupported type '%s' found in Elasticsearch dynamic index field; "
+ + "time-related patterns only support the types DATE, TIME, and TIMESTAMP.",
+ indexFieldType));
+ }
+ }
+
+ /**
+ * Helper class for {@link IndexGeneratorFactory}. This helper can be used to validate the index
+ * field type and to parse the index format from the pattern.
+ */
+ static class IndexHelper {
+ private static final Pattern dynamicIndexPattern = Pattern.compile("\\{[^\\{\\}]+\\}?");
+ private static final Pattern dynamicIndexTimeExtractPattern =
+ Pattern.compile(".*\\{.+\\|.*\\}.*");
+ private static final Pattern dynamicIndexSystemTimeExtractPattern =
+ Pattern.compile(
+ ".*\\{\\s*(now\\(\\s*\\)|NOW\\(\\s*\\)|current_timestamp|CURRENT_TIMESTAMP)\\s*\\|.*\\}.*");
+ private static final List<LogicalTypeRoot> supportedTypes = new ArrayList<>();
+ private static final Map<LogicalTypeRoot, String> defaultFormats = new HashMap<>();
+
+ static {
+ // time related types
+ supportedTypes.add(LogicalTypeRoot.DATE);
+ supportedTypes.add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+ supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+ supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
+ supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+ // general types
+ supportedTypes.add(LogicalTypeRoot.VARCHAR);
+ supportedTypes.add(LogicalTypeRoot.CHAR);
+ supportedTypes.add(LogicalTypeRoot.TINYINT);
+ supportedTypes.add(LogicalTypeRoot.INTEGER);
+ supportedTypes.add(LogicalTypeRoot.BIGINT);
+ }
+
+ static {
+ defaultFormats.put(LogicalTypeRoot.DATE, "yyyy_MM_dd");
+ defaultFormats.put(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, "HH_mm_ss");
+ defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+ defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+ defaultFormats.put(
+ LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, "yyyy_MM_dd_HH_mm_ssX");
+ }
+
+ /** Validates the index field type. */
+ void validateIndexFieldType(LogicalTypeRoot logicalType) {
+ if (!supportedTypes.contains(logicalType)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unsupported type %s of index field. Supported types are: %s",
+ logicalType, supportedTypes));
+ }
+ }
+
+ /** Get the default date format. */
+ String getDefaultFormat(LogicalTypeRoot logicalType) {
+ return defaultFormats.get(logicalType);
+ }
+
+ /** Checks whether the index pattern uses a general dynamic index. */
+ boolean checkIsDynamicIndex(String index) {
+ final Matcher matcher = dynamicIndexPattern.matcher(index);
+ int count = 0;
+ while (matcher.find()) {
+ count++;
+ }
+ if (count > 1) {
+ throw new TableException(
+ String.format(
+ "Chaining dynamic index pattern %s is not supported;"
+ + " only a single dynamic index pattern is supported.",
+ index));
+ }
+ return count == 1;
+ }
+
+ /** Checks whether the index pattern uses a time-extracting dynamic index. */
+ boolean checkIsDynamicIndexWithFormat(String index) {
+ return dynamicIndexTimeExtractPattern.matcher(index).matches();
+ }
+
+ /** Checks whether the dynamic index is generated from the system time. */
+ boolean checkIsDynamicIndexWithSystemTimeFormat(String index) {
+ return dynamicIndexSystemTimeExtractPattern.matcher(index).matches();
+ }
+
+ /** Extract dynamic index pattern string from index pattern string. */
+ String extractDynamicIndexPatternStr(String index) {
+ int start = index.indexOf("{");
+ int end = index.lastIndexOf("}");
+ return index.substring(start, end + 1);
+ }
+
+ /** Extracts the position of the index field within the given field names. */
+ int extractIndexFieldPos(
+ String index, String[] fieldNames, boolean isDynamicIndexWithFormat) {
+ List<String> fieldList = Arrays.asList(fieldNames);
+ String indexFieldName;
+ if (isDynamicIndexWithFormat) {
+ indexFieldName = index.substring(index.indexOf("{") + 1, index.indexOf("|"));
+ } else {
+ indexFieldName = index.substring(index.indexOf("{") + 1, index.indexOf("}"));
+ }
+ if (!fieldList.contains(indexFieldName)) {
+ throw new TableException(
+ String.format(
+ "Unknown field '%s' in index pattern '%s', please check the field name.",
+ indexFieldName, index));
+ }
+ return fieldList.indexOf(indexFieldName);
+ }
+
+ /** Extracts the date/time format from the index pattern string, falling back to the default format. */
+ private String extractDateFormat(String index, LogicalTypeRoot logicalType) {
+ String format = index.substring(index.indexOf("|") + 1, index.indexOf("}"));
+ if ("".equals(format)) {
+ format = getDefaultFormat(logicalType);
+ }
+ return format;
+ }
+ }
+}
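To make the pattern handling concrete, a sketch mirroring the Javadoc example (field names and types invented):

    IndexGenerator generator =
            IndexGeneratorFactory.createIndexGenerator(
                    "myusers_{log_ts|yyyy-MM-dd}",
                    Arrays.asList("id", "log_ts"),
                    Arrays.asList(DataTypes.BIGINT(), DataTypes.TIMESTAMP(3)));
    generator.open();

    RowData row =
            GenericRowData.of(
                    1L,
                    TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-27T12:25:55")));
    generator.generate(row); // -> "myusers_2020-03-27"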
diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/KeyExtractor.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/KeyExtractor.java
new file mode 100644
index 00000000..2dda12af
--- /dev/null
+++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/KeyExtractor.java
@@ -0,0 +1,79 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.function.SerializableFunction;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.List;
+
+/** An extractor for an Elasticsearch key from a {@link RowData}. */
+@Internal
+class KeyExtractor implements SerializableFunction<RowData, String> {
+ private final FieldFormatter[] fieldFormatters;
+ private final String keyDelimiter;
+
+ private interface FieldFormatter extends Serializable {
+ String format(RowData rowData);
+ }
+
+ private KeyExtractor(FieldFormatter[] fieldFormatters, String keyDelimiter) {
+ this.fieldFormatters = fieldFormatters;
+ this.keyDelimiter = keyDelimiter;
+ }
+
+ @Override
+ public String apply(RowData rowData) {
+ final StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < fieldFormatters.length; i++) {
+ if (i > 0) {
+ builder.append(keyDelimiter);
+ }
+ final String value = fieldFormatters[i].format(rowData);
+ builder.append(value);
+ }
+ return builder.toString();
+ }
+
+ public static SerializableFunction<RowData, String> createKeyExtractor(
+ List<LogicalTypeWithIndex> primaryKeyTypesWithIndex, String keyDelimiter) {
+ if (!primaryKeyTypesWithIndex.isEmpty()) {
+ FieldFormatter[] formatters =
+ primaryKeyTypesWithIndex.stream()
+ .map(
+ logicalTypeWithIndex ->
+ toFormatter(
+ logicalTypeWithIndex.index,
+ logicalTypeWithIndex.logicalType))
+ .toArray(FieldFormatter[]::new);
+ return new KeyExtractor(formatters, keyDelimiter);
+ } else {
+ return (row) -> null;
+ }
+ }
+
+ private static FieldFormatter toFormatter(int index, LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case DATE:
+ return (row) -> LocalDate.ofEpochDay(row.getInt(index)).toString();
+ case TIME_WITHOUT_TIME_ZONE:
+ return (row) ->
+ LocalTime.ofNanoOfDay((long) row.getInt(index) * 1_000_000L).toString();
+ case INTERVAL_YEAR_MONTH:
+ return (row) -> Period.ofDays(row.getInt(index)).toString();
+ case INTERVAL_DAY_TIME:
+ return (row) -> Duration.ofMillis(row.getLong(index)).toString();
+ case DISTINCT_TYPE:
+ return toFormatter(index, ((DistinctType) type).getSourceType());
+ default:
+ RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, index);
+ return row -> fieldGetter.getFieldOrNull(row).toString();
+ }
+ }
+}
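A short sketch of key extraction with invented key columns (BIGINT at position 0, DATE at position 1):

    List<LogicalTypeWithIndex> primaryKey =
            Arrays.asList(
                    new LogicalTypeWithIndex(0, new BigIntType()),
                    new LogicalTypeWithIndex(1, new DateType()));
    SerializableFunction<RowData, String> extractor =
            KeyExtractor.createKeyExtractor(primaryKey, "_");

    // DATE is stored internally as days since epoch.
    RowData row = GenericRowData.of(42L, (int) LocalDate.parse("2020-03-27").toEpochDay());
    extractor.apply(row); // -> "42_2020-03-27"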
diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/LogicalTypeWithIndex.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/LogicalTypeWithIndex.java
new file mode 100644
index 00000000..7891e50a
--- /dev/null
+++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/LogicalTypeWithIndex.java
@@ -0,0 +1,13 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+class LogicalTypeWithIndex {
+ public final int index;
+ public final LogicalType logicalType;
+
+ LogicalTypeWithIndex(int index, LogicalType logicalType) {
+ this.index = index;
+ this.logicalType = logicalType;
+ }
+}
diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/RowDataElementConverter.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/RowDataElementConverter.java
new file mode 100644
index 00000000..a1e499ed
--- /dev/null
+++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/RowDataElementConverter.java
@@ -0,0 +1,81 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
+import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation;
+import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
+import co.elastic.clients.elasticsearch.core.bulk.UpdateOperation;
+
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Implementation of an {@link ElementConverter} for the Elasticsearch table sink. The element
+ * converter maps the Flink internal type {@link RowData} to a {@link BulkOperationVariant} to be
+ * used by the Elasticsearch Java API.
+ */
+@Internal
+public class RowDataElementConverter implements ElementConverter<RowData, BulkOperationVariant> {
+ private final IndexGenerator indexGenerator;
+ private final Function<RowData, String> keyExtractor;
+ private final RowDataToMapConverter rowDataToMapConverter;
+
+ public RowDataElementConverter(
+ DataType physicalDataType,
+ IndexGenerator indexGenerator,
+ Function<RowData, String> keyExtractor) {
+ this.rowDataToMapConverter = new RowDataToMapConverter(physicalDataType);
+ this.indexGenerator = indexGenerator;
+ this.keyExtractor = keyExtractor;
+ }
+
+ @Override
+ public void open(WriterInitContext context) {
+ indexGenerator.open();
+ }
+
+ @Override
+ public BulkOperationVariant apply(RowData rowData, SinkWriter.Context context) {
+ Map<String, Object> dataMap = rowDataToMapConverter.toMap(rowData);
+
+ BulkOperationVariant operation;
+
+ switch (rowData.getRowKind()) {
+ case INSERT:
+ operation =
+ new IndexOperation.Builder<>()
+ .index(indexGenerator.generate(rowData))
+ .id(keyExtractor.apply(rowData))
+ .document(dataMap)
+ .build();
+ break;
+ case UPDATE_AFTER:
+ operation =
+ new UpdateOperation.Builder<>()
+ .index(indexGenerator.generate(rowData))
+ .id(keyExtractor.apply(rowData))
+ .action(a -> a.doc(dataMap).docAsUpsert(true))
+ .build();
+ break;
+ case UPDATE_BEFORE:
+ case DELETE:
+ operation =
+ new DeleteOperation.Builder()
+ .index(indexGenerator.generate(rowData))
+ .id(keyExtractor.apply(rowData))
+ .build();
+ break;
+ default:
+ throw new TableException("Unsupported message kind: " + rowData.getRowKind());
+ }
+
+ return operation;
+ }
+}
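A sketch of the changelog mapping above; the schema, index name, and key function are placeholders:

    DataType physicalType =
            DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.BIGINT()),
                    DataTypes.FIELD("name", DataTypes.STRING()));
    RowDataElementConverter converter =
            new RowDataElementConverter(
                    physicalType,
                    new StaticIndexGenerator("users"),
                    row -> String.valueOf(row.getLong(0)));

    GenericRowData row = GenericRowData.of(1L, StringData.fromString("alice"));
    row.setRowKind(RowKind.DELETE);
    // A -D row becomes a DeleteOperation targeting index "users" with id "1".
    BulkOperationVariant op = converter.apply(row, null);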
diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/RowDataToMapConverter.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/RowDataToMapConverter.java
new file mode 100644
index 00000000..aa5fc098
--- /dev/null
+++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/RowDataToMapConverter.java
@@ -0,0 +1,44 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Tool class used to convert from {@link RowData} to {@link Map}. */
+@Internal
+public class RowDataToMapConverter implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final DataType physicalDataType;
+
+ public RowDataToMapConverter(DataType physicalDataType) {
+ this.physicalDataType = physicalDataType;
+ }
+
+ public Map<String, Object> toMap(RowData rowData) {
+ List<DataTypes.Field> fields = DataType.getFields(physicalDataType);
+
+ Map<String, Object> map = new HashMap<>(fields.size());
+ for (int i = 0; i < fields.size(); i++) {
+ DataTypes.Field field = fields.get(i);
+ RowData.FieldGetter fieldGetter =
+ RowData.createFieldGetter(field.getDataType().getLogicalType(), i);
+
+ String key = field.getName();
+ Object value =
+ DataStructureConverters.getConverter(field.getDataType())
+ .toExternalOrNull(fieldGetter.getFieldOrNull(rowData));
+
+ map.put(key, value);
+ }
+ return map;
+ }
+}
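For example, a sketch of the conversion with an invented two-column row:

    DataType physicalType =
            DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.BIGINT()),
                    DataTypes.FIELD("name", DataTypes.STRING()));
    RowDataToMapConverter converter = new RowDataToMapConverter(physicalType);

    Map<String, Object> document =
            converter.toMap(GenericRowData.of(1L, StringData.fromString("alice")));
    // document -> {id=1, name=alice}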
diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/StaticIndexGenerator.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/StaticIndexGenerator.java
new file mode 100644
index 00000000..470ef6c1
--- /dev/null
+++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/StaticIndexGenerator.java
@@ -0,0 +1,17 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+
+/** A static {@link IndexGenerator} which generates a fixed index name. */
+@Internal
+final class StaticIndexGenerator extends IndexGeneratorBase {
+
+ public StaticIndexGenerator(String index) {
+ super(index);
+ }
+
+ @Override
+ public String generate(RowData row) {
+ return index;
+ }
+}
diff --git a/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000..de87735b
--- /dev/null
+++ b/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.elasticsearch.table.ElasticSearch8AsyncDynamicTableFactory
+
diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java
index c401e761..d65ae612 100644
--- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java
+++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java
@@ -216,9 +216,13 @@ private NetworkConfig createNetworkConfig() {
ES_CLUSTER_USERNAME,
ES_CLUSTER_PASSWORD,
null,
+ null,
+ null,
+ null,
+ null,
() -> ES_CONTAINER_SECURE.createSslContextFromCa(),
null)
- : new NetworkConfig(esHost, null, null, null, null, null);
+ : new NetworkConfig(esHost, null, null, null, null, null, null, null, null, null);
}
private Elasticsearch8AsyncWriter createWriter(
diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkBaseITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkBaseITCase.java
new file mode 100644
index 00000000..7ecde01b
--- /dev/null
+++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkBaseITCase.java
@@ -0,0 +1,304 @@
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.elasticsearch.sink.NetworkConfig;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
+import co.elastic.clients.elasticsearch.core.GetRequest;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.transport.TransportUtils;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.http.HttpHost;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.commons.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.elasticsearch.table.TestContext.context;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@link Elasticsearch8DynamicSinkBaseITCase} is the base class for integration tests.
+ *
+ * <p>It is extended with the {@link ParameterizedTestExtension} for parameterized testing against
+ * secure and non-secure Elasticsearch clusters. Tests must be annotated by {@link TestTemplate} in
+ * order to be parameterized.
+ *
+ * <p>The cluster is running via test containers. In order to reuse the singleton containers by all
+ * inheriting test classes, we manage their lifecycle. The two containers are started only once when
+ * this class is loaded. At the end of the test suite the Ryuk container that is started by
+ * Testcontainers core will take care of stopping the singleton container.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+abstract class Elasticsearch8DynamicSinkBaseITCase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(Elasticsearch8DynamicSinkBaseITCase.class);
+
+ public static final String ELASTICSEARCH_VERSION = "8.12.1";
+ public static final DockerImageName ELASTICSEARCH_IMAGE =
+ DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch")
+ .withTag(ELASTICSEARCH_VERSION);
+ protected static final String ES_CLUSTER_USERNAME = "elastic";
+ protected static final String ES_CLUSTER_PASSWORD = "s3cret";
+ protected static final ElasticsearchContainer ES_CONTAINER = createElasticsearchContainer();
+ protected static final ElasticsearchContainer ES_CONTAINER_SECURE =
+ createSecureElasticsearchContainer();
+
+ protected static String certificateFingerprint = null;
+
+ // Use singleton test containers which are only started once for several test classes.
+ // There is no special support for this use case provided by the Testcontainers
+ // extension @Testcontainers.
+ static {
+ ES_CONTAINER.start();
+ ES_CONTAINER_SECURE.start();
+ }
+
+ @Parameter public boolean secure;
+
+ protected ElasticsearchAsyncClient client;
+
+ @Parameters(name = "ES secured = {0}")
+ public static List secureEnabled() {
+ return Arrays.asList(false, true);
+ }
+
+ @BeforeEach
+ public void setUpBase() {
+ LOG.info("Setting up elasticsearch client, host: {}, secure: {}", getHost(), secure);
+ certificateFingerprint = secure ? getEsCertFingerprint() : null;
+ assertThat(secure).isEqualTo(StringUtils.isNotBlank(certificateFingerprint));
+ client = secure ? createSecureElasticsearchClient() : createElasticsearchClient();
+ }
+
+ @AfterEach
+ public void shutdownBase() throws IOException {
+ client.shutdown();
+ }
+
+ private String getEsCertFingerprint() {
+ if (!ES_CONTAINER_SECURE.caCertAsBytes().isPresent()) {
+ LOG.error("cannot get the CA cert from the docker container.");
+ return null;
+ }
+
+ byte[] caCertBytes = ES_CONTAINER_SECURE.caCertAsBytes().get();
+
+ CertificateFactory cf;
+ byte[] fingerprintBytes = new byte[0];
+ try {
+ cf = CertificateFactory.getInstance("X.509");
+ X509Certificate caCert =
+ (X509Certificate)
+ cf.generateCertificate(new java.io.ByteArrayInputStream(caCertBytes));
+
+ MessageDigest md = MessageDigest.getInstance("SHA-256");
+ fingerprintBytes = md.digest(caCert.getEncoded());
+ } catch (CertificateException | NoSuchAlgorithmException e) {
+ LOG.error("failed to compute certificate fingerprint: ", e);
+ }
+
+ return Hex.encodeHexString(fingerprintBytes);
+ }
+
+ TestContext getPrefilledTestContext(String index) {
+ TestContext testContext =
+ context()
+ .withOption(Elasticsearch8ConnectorOptions.INDEX_OPTION.key(), index)
+ .withOption(
+ Elasticsearch8ConnectorOptions.HOSTS_OPTION.key(),
+ secure
+ ? "https://" + ES_CONTAINER_SECURE.getHttpHostAddress()
+ : "http://" + ES_CONTAINER.getHttpHostAddress());
+ if (secure) {
+ testContext
+ .withOption(
+ Elasticsearch8ConnectorOptions.USERNAME_OPTION.key(),
+ ES_CLUSTER_USERNAME)
+ .withOption(
+ Elasticsearch8ConnectorOptions.PASSWORD_OPTION.key(),
+ ES_CLUSTER_PASSWORD)
+ .withOption(
+ Elasticsearch8ConnectorOptions.SSL_CERTIFICATE_FINGERPRINT.key(),
+ certificateFingerprint);
+ }
+ return testContext;
+ }
+
+ @SuppressWarnings({"unchecked"})
+ Map<String, Object> makeGetRequest(String index, String id)
+ throws ExecutionException, InterruptedException {
+ return (Map<String, Object>)
+ client.get(new GetRequest.Builder().index(index).id(id).build(), Map.class)
+ .get()
+ .source();
+ }
+
+ @SuppressWarnings({"unchecked"})
+ List