Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
* from a logical description.
*/
public class ElasticsearchDynamicSource implements LookupTableSource, SupportsProjectionPushDown {
private final DecodingFormat<DeserializationSchema<RowData>> format;
private final ElasticsearchConfiguration config;
protected final DecodingFormat<DeserializationSchema<RowData>> format;
protected final ElasticsearchConfiguration config;
private final int lookupMaxRetryTimes;
private final LookupCache lookupCache;
private final String docType;
private final String summaryString;
private final ElasticsearchApiCallBridge<?> apiCallBridge;
private DataType physicalRowDataType;
protected final ElasticsearchApiCallBridge<?> apiCallBridge;
protected DataType physicalRowDataType;

public ElasticsearchDynamicSource(
DecodingFormat<DeserializationSchema<RowData>> format,
Expand Down Expand Up @@ -84,7 +84,7 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
}
}

private NetworkClientConfig buildNetworkClientConfig() {
protected NetworkClientConfig buildNetworkClientConfig() {
NetworkClientConfig.Builder builder = new NetworkClientConfig.Builder();
if (config.getUsername().isPresent()
&& !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ ElasticsearchConfiguration getConfiguration(FactoryUtil.TableFactoryHelper helpe
}

@Nullable
private LookupCache getLookupCache(ReadableConfig tableOptions) {
protected LookupCache getLookupCache(ReadableConfig tableOptions) {
LookupCache cache = null;
if (tableOptions
.get(LookupOptions.CACHE_TYPE)
Expand Down
8 changes: 8 additions & 0 deletions flink-connector-elasticsearch7/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.apache.flink.connector.elasticsearch.table;

import org.apache.flink.configuration.ReadableConfig;

import static org.apache.flink.connector.elasticsearch.table.Elasticsearch7ConnectorOptions.VECTOR_SEARCH_MAX_RETRIES;

/** Elasticsearch 7 specific configuration. */
public class Elasticsearch7Configuration extends ElasticsearchConfiguration {
Elasticsearch7Configuration(ReadableConfig config) {
super(config);
}

public int getVectorSearchMaxRetries() {
return config.get(VECTOR_SEARCH_MAX_RETRIES);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.apache.flink.connector.elasticsearch.table;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/**
* Options specific for the Elasticsearch 7 connector. Public so that the {@link
* org.apache.flink.table.api.TableDescriptor} can access it.
*/
public class Elasticsearch7ConnectorOptions extends ElasticsearchConnectorOptions {
private Elasticsearch7ConnectorOptions() {}

public static final ConfigOption<Integer> VECTOR_SEARCH_MAX_RETRIES =
ConfigOptions.key("vector-search.max-retries")
.intType()
.defaultValue(3)
.withDescription("The max retry times for vector searching Elasticsearch.");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package org.apache.flink.connector.elasticsearch.table;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
import org.apache.flink.connector.elasticsearch.table.search.ElasticsearchRowDataVectorSearchFunction;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.VectorSearchTableSource;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import org.elasticsearch.client.RestHighLevelClient;

import javax.annotation.Nullable;

/**
* A {@link DynamicTableSource} that describes how to create a {@link Elasticsearch7DynamicSource}
* from a logical description.
*/
public class Elasticsearch7DynamicSource extends ElasticsearchDynamicSource
implements VectorSearchTableSource {

public Elasticsearch7DynamicSource(
DecodingFormat<DeserializationSchema<RowData>> format,
ElasticsearchConfiguration config,
DataType physicalRowDataType,
int lookupMaxRetryTimes,
String summaryString,
ElasticsearchApiCallBridge<RestHighLevelClient> apiCallBridge,
@Nullable LookupCache lookupCache,
@Nullable String docType) {
super(
format,
config,
physicalRowDataType,
lookupMaxRetryTimes,
summaryString,
apiCallBridge,
lookupCache,
docType);
}

@Override
public VectorSearchRuntimeProvider getSearchRuntimeProvider(
VectorSearchContext vectorSearchContext) {

NetworkClientConfig networkClientConfig = buildNetworkClientConfig();

ElasticsearchRowDataVectorSearchFunction vectorSearchFunction =
new ElasticsearchRowDataVectorSearchFunction(
this.format.createRuntimeDecoder(vectorSearchContext, physicalRowDataType),
((Elasticsearch7Configuration) config).getVectorSearchMaxRetries(),
config.getIndex(),
getSearchColumn(vectorSearchContext),
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
config.getHosts(),
networkClientConfig,
(ElasticsearchApiCallBridge<RestHighLevelClient>) apiCallBridge);

return VectorSearchFunctionProvider.of(vectorSearchFunction);
}

private String getSearchColumn(VectorSearchContext vectorSearchContext) {
int[][] searchColumns = vectorSearchContext.getSearchColumns();

if (searchColumns.length != 1) {
throw new IllegalArgumentException(
String.format(
"Elasticsearch only supports one search columns now, but input search columns size is %d.",
searchColumns.length));
}
int[] searchColumn = searchColumns[0];
if (searchColumn.length != 1) {
throw new IllegalArgumentException(
"Elasticsearch doesn't support to search data using nested columns.");
}
int searchColumnIndex = searchColumn[0];

if (searchColumnIndex < 0
|| searchColumnIndex >= physicalRowDataType.getChildren().size()) {
throw new ValidationException(
String.format(
"The specified search column with index %d doesn't exist in schema.",
searchColumnIndex));
}

DataType searchColumnType = physicalRowDataType.getChildren().get(searchColumnIndex);
if (!searchColumnType.getLogicalType().is(LogicalTypeRoot.ARRAY)
|| !((ArrayType) searchColumnType.getLogicalType())
.getElementType()
.is(LogicalTypeRoot.FLOAT)) {
throw new UnsupportedOperationException(
String.format(
"Elasticsearch only supports search data using float vector now, but input search column type is %s.",
searchColumnType));
}

return ((RowType) (physicalRowDataType.getLogicalType()))
.getFieldNames()
.get(searchColumnIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,22 @@
package org.apache.flink.connector.elasticsearch.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.elasticsearch.Elasticsearch7ApiCallBridge;
import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;

import org.elasticsearch.client.RestHighLevelClient;

import static org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES;
import static org.elasticsearch.common.Strings.capitalize;

/** A {@link DynamicTableSinkFactory} for discovering {@link ElasticsearchDynamicSink}. */
@Internal
Expand All @@ -34,7 +46,38 @@ public Elasticsearch7DynamicTableFactory() {
}

@Override
ElasticsearchApiCallBridge<?> getElasticsearchApiCallBridge() {
ElasticsearchConfiguration getConfiguration(FactoryUtil.TableFactoryHelper helper) {
return new Elasticsearch7Configuration(helper.getOptions());
}

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig options = helper.getOptions();
final DecodingFormat<DeserializationSchema<RowData>> format =
helper.discoverDecodingFormat(
DeserializationFormatFactory.class,
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions
.FORMAT_OPTION);

ElasticsearchConfiguration config = getConfiguration(helper);
helper.validate();
validateConfiguration(config);

return new Elasticsearch7DynamicSource(
format,
config,
context.getPhysicalRowDataType(),
options.get(MAX_RETRIES),
capitalize(FACTORY_IDENTIFIER),
getElasticsearchApiCallBridge(),
getLookupCache(options),
getDocumentType(config));
}

@Override
ElasticsearchApiCallBridge<RestHighLevelClient> getElasticsearchApiCallBridge() {
return new Elasticsearch7ApiCallBridge();
}
}
Loading