OAK-11568 Elastic: improved compatibility for aggregation definitions #2193

Open · wants to merge 5 commits into base: trunk · Changes from all commits
@@ -16,9 +16,6 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic;

import org.apache.commons.io.FilenameUtils;
import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService;
@@ -50,13 +47,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;

import static org.apache.commons.io.FileUtils.ONE_MB;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleWithFixedDelay;

@@ -130,8 +125,6 @@ public class ElasticIndexProviderService {

private static final Logger LOG = LoggerFactory.getLogger(ElasticIndexProviderService.class);

private static final String REPOSITORY_HOME = "repository.home";

@Reference
private StatisticsProvider statisticsProvider;

@@ -149,11 +142,10 @@ public class ElasticIndexProviderService {

private ExtractedTextCache extractedTextCache;

private final List<ServiceRegistration> regs = new ArrayList<>();
private final List<ServiceRegistration<?>> regs = new ArrayList<>();
private final List<Registration> oakRegs = new ArrayList<>();

private Whiteboard whiteboard;
private File textExtractionDir;

private ElasticConnection elasticConnection;
private ElasticMetricHandler metricHandler;
@@ -200,7 +192,7 @@ private void activate(BundleContext bundleContext, Config config) {

@Deactivate
private void deactivate() {
for (ServiceRegistration reg : regs) {
for (ServiceRegistration<?> reg : regs) {
reg.unregister();
}

@@ -242,63 +234,6 @@ private void registerIndexEditor(BundleContext bundleContext) {
Dictionary<String, Object> props = new Hashtable<>();
props.put("type", ElasticIndexDefinition.TYPE_ELASTICSEARCH);
regs.add(bundleContext.registerService(IndexEditorProvider.class.getName(), editorProvider, props));
// oakRegs.add(registerMBean(whiteboard,
// TextExtractionStatsMBean.class,
// editorProvider.getExtractedTextCache().getStatsMBean(),
// TextExtractionStatsMBean.TYPE,
// "TextExtraction statistics"));
}

private void initializeExtractedTextCache(final Config config, StatisticsProvider statisticsProvider) {

extractedTextCache = new ExtractedTextCache(
config.extractedTextCacheSizeInMB() * ONE_MB,
config.extractedTextCacheExpiryInSecs(),
config.alwaysUsePreExtractedCache(),
textExtractionDir,
statisticsProvider);
if (extractedTextProvider != null) {
registerExtractedTextProvider(extractedTextProvider);
}
CacheStats stats = extractedTextCache.getCacheStats();
if (stats != null) {
oakRegs.add(registerMBean(whiteboard,
CacheStatsMBean.class, stats,
CacheStatsMBean.TYPE, stats.getName()));
LOG.info("Extracted text caching enabled with maxSize {} MB, expiry time {} secs",
config.extractedTextCacheSizeInMB(), config.extractedTextCacheExpiryInSecs());
}
}

private void initializeTextExtractionDir(BundleContext bundleContext, Config config) {
String textExtractionDir = config.localTextExtractionDir();
if (textExtractionDir.trim().isEmpty()) {
String repoHome = bundleContext.getProperty(REPOSITORY_HOME);
if (repoHome != null) {
textExtractionDir = FilenameUtils.concat(repoHome, "index");
}
}

if (textExtractionDir == null) {
throw new IllegalStateException(String.format("Text extraction directory cannot be determined as neither " +
"directory path [%s] nor repository home [%s] defined", PROP_LOCAL_TEXT_EXTRACTION_DIR, REPOSITORY_HOME));
}

this.textExtractionDir = new File(textExtractionDir);
}

private void registerExtractedTextProvider(PreExtractedTextProvider provider) {
if (extractedTextCache != null) {
if (provider != null) {
String usage = extractedTextCache.isAlwaysUsePreExtractedCache() ?
"always" : "only during reindexing phase";
LOG.info("Registering PreExtractedTextProvider {} with extracted text cache. " +
"It would be used {}", provider, usage);
} else {
LOG.info("Unregistering PreExtractedTextProvider with extracted text cache");
}
extractedTextCache.setExtractedTextProvider(provider);
}
}

private ElasticConnection getElasticConnection(Config contextConfig) {
@@ -158,7 +158,7 @@ private BulkIngester<String> initBulkIngester() {

private void checkFailures() throws IOException {
if (!suppressedErrorCauses.isEmpty()) {
IOException ioe = new IOException("Exception while indexing. See suppressed for details");
IOException ioe = new IOException("Exception while indexing " + indexName + ". See suppressed for details");
suppressedErrorCauses.stream().map(ec -> new IllegalStateException(ec.reason())).forEach(ioe::addSuppressed);
throw ioe;
}
@@ -19,6 +19,7 @@
import co.elastic.clients.elasticsearch._types.analysis.Analyzer;
import co.elastic.clients.elasticsearch._types.analysis.CharFilterDefinition;
import co.elastic.clients.elasticsearch._types.analysis.CustomAnalyzer;
import co.elastic.clients.elasticsearch._types.analysis.NGramTokenizer;
import co.elastic.clients.elasticsearch._types.analysis.TokenFilterDefinition;
import co.elastic.clients.elasticsearch._types.analysis.TokenizerDefinition;
import co.elastic.clients.elasticsearch.indices.IndexSettingsAnalysis;
@@ -40,6 +41,7 @@
import org.apache.lucene.analysis.AbstractAnalysisFactory;
import org.apache.lucene.analysis.CharFilterFactory;
import org.apache.lucene.analysis.TokenFilterFactory;
import org.apache.lucene.analysis.charfilter.MappingCharFilterFactory;
import org.apache.lucene.analysis.en.AbstractWordsFileFilterFactory;
import org.apache.lucene.util.ResourceLoader;
import org.jetbrains.annotations.NotNull;
@@ -55,6 +57,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -97,7 +100,13 @@ public static IndexSettingsAnalysis.Builder buildCustomAnalyzers(NodeState state
NodeState defaultAnalyzer = state.getChildNode(FulltextIndexConstants.ANL_DEFAULT);
if (defaultAnalyzer.exists()) {
IndexSettingsAnalysis.Builder builder = new IndexSettingsAnalysis.Builder();
Map<String, Object> analyzer = convertNodeState(defaultAnalyzer);
Map<String, Object> analyzer;
try {
analyzer = convertNodeState(defaultAnalyzer);
} catch (IOException e) {
LOG.warn("Can not load analyzer; using an empty configuration", e);
analyzer = Map.of();
}
String builtIn = defaultAnalyzer.getString(FulltextIndexConstants.ANL_CLASS);
if (builtIn == null) {
builtIn = defaultAnalyzer.getString(FulltextIndexConstants.ANL_NAME);
@@ -107,11 +116,14 @@ public static IndexSettingsAnalysis.Builder buildCustomAnalyzers(NodeState state

// content params, usually stop words
for (ChildNodeEntry nodeEntry : defaultAnalyzer.getChildNodeEntries()) {
List<String> list;
try {
analyzer.put(normalize(nodeEntry.getName()), loadContent(nodeEntry.getNodeState(), nodeEntry.getName(), NOOP_TRANSFORMATION));
list = loadContent(nodeEntry.getNodeState(), nodeEntry.getName(), NOOP_TRANSFORMATION);
} catch (IOException e) {
throw new IllegalStateException("Unable to load content for node entry " + nodeEntry.getName(), e);
LOG.warn("Unable to load analyzer content for entry '" + nodeEntry.getName() + "'; using empty list", e);
list = List.of();
}
analyzer.put(normalize(nodeEntry.getName()), list);
}

builder.analyzer(analyzerName, new Analyzer(null, JsonData.of(analyzer)));
@@ -145,25 +157,59 @@ public static IndexSettingsAnalysis.Builder buildCustomAnalyzers(NodeState state

@NotNull
private static TokenizerDefinition loadTokenizer(NodeState state) {
String name = normalize(Objects.requireNonNull(state.getString(FulltextIndexConstants.ANL_NAME)));
Map<String, Object> args = convertNodeState(state);
String name;
Map<String, Object> args;
if (!state.exists()) {
LOG.warn("No tokenizer specified; the standard with an empty configuration");
name = "Standard";
args = new HashMap<String, Object>();
} else {
name = Objects.requireNonNull(state.getString(FulltextIndexConstants.ANL_NAME));
try {
args = convertNodeState(state);
} catch (IOException e) {
LOG.warn("Can not load tokenizer; using an empty configuration", e);
args = new HashMap<String, Object>();
}
}
name = normalize(name);
if ("n_gram".equals(name)) {
// OAK-11568
// https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-ngram-tokenizer.html
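// Lucene's NGramTokenizerFactory exposes minGramSize/maxGramSize, while the Elasticsearch
// tokenizer expects min_gram/max_gram, so the two arguments are mapped explicitly (defaults: 2 and 3)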
Integer minGramSize = getIntegerSetting(args, "minGramSize", 2);
Integer maxGramSize = getIntegerSetting(args, "maxGramSize", 3);
TokenizerDefinition ngram = TokenizerDefinition.of(t -> t.ngram(
NGramTokenizer.of(n -> n.minGram(minGramSize).maxGram(maxGramSize))));
return ngram;
}
args.put(ANALYZER_TYPE, name);
return new TokenizerDefinition(name, JsonData.of(args));
}

private static Integer getIntegerSetting(Map<String, Object> args, String name, Integer defaultValue) {
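// returns the named argument as an Integer; when the value is missing or not an Integer,
// a warning is logged and the default is used instead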
Object value = args.getOrDefault(name, defaultValue);
if (!(value instanceof Integer)) {
LOG.warn("Setting {} value {} is not an integer; using default: {}", name, value, defaultValue);
return defaultValue;
}
return (Integer) value;
}

private static <FD> LinkedHashMap<String, FD> loadFilters(NodeState state,
Function<String, Class<? extends AbstractAnalysisFactory>> lookup,
BiFunction<String, JsonData, FD> factory) {
LinkedHashMap<String, FD> filters = new LinkedHashMap<>();
int i = 0;
//Need to read children in order
Tree tree = TreeFactory.createReadOnlyTree(state);
String wordDelimiterFilterKey = null;
for (Tree t : tree.getChildren()) {
NodeState child = state.getChildNode(t.getName());

String name;
List<String> content = null;
List<ParameterTransformer> transformers;
boolean skipEntry = false;
try {
Class<? extends AbstractAnalysisFactory> tff = lookup.apply(t.getName());

Expand All @@ -184,6 +230,13 @@ private static <FD> LinkedHashMap<String, FD> loadFilters(NodeState state,
wordsFF.inform(new NodeStateResourceLoader(child));
content = wordsFF.getWords().stream().map(w -> new String(((char[]) w))).collect(Collectors.toList());
}
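// a char filter mapping with no entries defines no rules (and would presumably be rejected
// by Elasticsearch), so mark the entry to be skipped below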
if (luceneFactory instanceof MappingCharFilterFactory) {
MappingCharFilterFactory map = (MappingCharFilterFactory) luceneFactory;
if (map.getOriginalArgs().isEmpty()) {
skipEntry = true;
LOG.warn("Empty CharFilter mapping: ignoring");
}
}

name = normalize((String) tff.getField("NAME").get(null));
transformers = LUCENE_ELASTIC_TRANSFORMERS.entrySet().stream()
Expand Down Expand Up @@ -221,14 +274,35 @@ private static <FD> LinkedHashMap<String, FD> loadFilters(NodeState state,
}
args.put(ANALYZER_TYPE, name);

filters.put(name + "_" + i, factory.apply(name, JsonData.of(args)));
if (skipEntry) {
continue;
}
String key = name + "_" + i;
filters.put(key, factory.apply(name, JsonData.of(args)));
if (name.equals("word_delimiter")) {
wordDelimiterFilterKey = key;
} else if (name.equals("synonym")) {
if (wordDelimiterFilterKey != null) {
// re-order the synonyms filter _before_ the word delimiter, to avoid
// "Token filter [word_delimiter_1] cannot be used to parse synonyms"
i++;
String newKey = "word_delimiter_" + i;
filters.put(newKey, filters.remove(wordDelimiterFilterKey));
wordDelimiterFilterKey = newKey;
}
}
i++;
}
return filters;
}

private static List<String> loadContent(NodeState file, String name, ContentTransformer transformer) throws IOException {
Blob blob = ConfigUtil.getBlob(file, name);
Blob blob;
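// getBlob may fail with unchecked exceptions; wrap them in IOException so callers can
// decide how to handle missing or malformed content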
try {
blob = ConfigUtil.getBlob(file, name);
} catch (IllegalArgumentException | IllegalStateException e) {
throw new IOException("Could not load " + name, e);
}
try (Reader content = new InputStreamReader(Objects.requireNonNull(blob).getNewStream(), StandardCharsets.UTF_8)) {
try (BufferedReader br = new BufferedReader(content)) {
return br.lines()
@@ -264,11 +338,25 @@ private static String normalize(String value) {
return name;
}

private static Map<String, Object> convertNodeState(NodeState state) {
return convertNodeState(state, List.of(), List.of());
private static Map<String, Object> convertNodeState(NodeState state) throws IOException {
try {
return convertNodeState(state, List.of(), List.of());
} catch (IllegalStateException e) {
// convert runtime exception back to checked exception
throw new IOException("Can not convert", e);
}
}

private static Map<String, Object> convertNodeState(NodeState state, List<ParameterTransformer> transformers, List<String> preloadedContent) {
/**
* Read an analyzer configuration from a node state.
*
* @param state the analyzer configuration node state
* @param transformers transformations applied to the Lucene parameters before they are mapped to Elasticsearch
* @param preloadedContent content that has already been loaded, if any
* @return the analyzer configuration parameters
* @throws IllegalStateException if content referenced by the configuration cannot be loaded
*/
private static Map<String, Object> convertNodeState(NodeState state, List<ParameterTransformer> transformers, List<String> preloadedContent) throws IllegalStateException {
Map<String, Object> luceneParams = StreamSupport.stream(Spliterators.spliteratorUnknownSize(state.getProperties().iterator(), Spliterator.ORDERED), false)
.filter(ElasticCustomAnalyzer::isPropertySupported)
.collect(Collectors.toMap(PropertyState::getName, ps -> {
@@ -280,6 +368,8 @@ private static Map<String, Object> convertNodeState(NodeState state, List<Parame
return loadContent(state.getChildNode(v.trim()), v.trim(),
CONTENT_TRANSFORMERS.getOrDefault(ps.getName(), NOOP_TRANSFORMATION)).stream();
} catch (IOException e) {
// convert the checked exception to a runtime exception,
// because the stream API doesn't support checked exceptions
throw new IllegalStateException(e);
}
}).collect(Collectors.toList()));
@@ -97,6 +97,7 @@ void addFulltextRelative(String path, String value) {
map -> {
Object existingValue = map.get(ElasticIndexHelper.DYNAMIC_PROPERTY_VALUE);
if (existingValue instanceof Set) {
@SuppressWarnings("unchecked")
Set<Object> existingSet = (Set<Object>) existingValue;
existingSet.add(value);
} else {
@@ -134,6 +135,7 @@ void addProperty(String fieldName, Object value) {
if (existingValue == null) {
finalValue = value;
} else if (existingValue instanceof Set) {
@SuppressWarnings("unchecked")
Set<Object> existingSet = (Set<Object>) existingValue;
existingSet.add(value);
finalValue = existingSet;
@@ -40,7 +40,7 @@ class ElasticIndexEditorContext extends FulltextIndexEditorContext<ElasticDocume
}

@Override
public IndexDefinition.Builder newDefinitionBuilder() {
public ElasticIndexDefinition.Builder newDefinitionBuilder() {
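// covariant return type: callers obtain the Elastic-specific builder without casting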
return new ElasticIndexDefinition.Builder(((ElasticIndexDefinition) definition).getIndexPrefix());
}

@@ -145,12 +145,13 @@ public void hybridSearch() throws Exception {
for (String path : paths) {
URL json = this.getClass().getResource("/inference" + path + ".json");
if (json != null) {
Map<String, Object> map = mapper.readValue(json, Map.class);
@SuppressWarnings("unchecked")
Map<String, Collection<Double>> map = mapper.readValue(json, Map.class);
ObjectNode updateDoc = mapper.createObjectNode();
ObjectNode inferenceNode = updateDoc.putObject(ElasticIndexDefinition.INFERENCE);
ArrayNode embeddingsNode = inferenceNode.putObject("embeddings").putArray("value");
inferenceNode.putObject("metadata").put("updatedAt", Instant.now().toEpochMilli());
for (Double d : (Collection<Double>) map.get("embedding")) {
for (Double d : map.get("embedding")) {
embeddingsNode.add(d);
}
updateDocument(index, path, updateDoc);
@@ -167,12 +167,11 @@ private void createTestData(Supplier<String> extraContentSupplier) throws Except

private void testQuery(String query, String language) throws Exception {
Result result = executeQuery(query, language, NO_BINDINGS);
Iterable<ResultRow> it = (Iterable<ResultRow>) result.getRows();
Iterator<ResultRow> iterator = it.iterator();
Iterator<? extends ResultRow> iterator = result.getRows().iterator();
long start = LOG_PERF.startForInfoLog("Getting result rows");
int i = 0;
while (iterator.hasNext()) {
ResultRow row = iterator.next();
iterator.next();
i++;
}
LOG_PERF.end(start, -1,-1, "{} Results fetched", i);
@@ -62,6 +62,7 @@ public static synchronized ElasticsearchContainer getESTestServer() {
return CONTAINER;
}

@SuppressWarnings("resource")
private synchronized void setup() {
String esDockerImageVersion = ELASTIC_DOCKER_IMAGE_VERSION != null ? ELASTIC_DOCKER_IMAGE_VERSION : Version.VERSION.toString();
LOG.info("Elasticsearch test Docker image version: {}.", esDockerImageVersion);