Prototype for using routing path in logs mode #109398

Draft. Wants to merge 15 commits into base: main.
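This draft has no description. As orientation, the sketch below (not part of the diff; the class name is illustrative) shows the settings combination the change is meant to accept, based on the new LogsIndexingIT test and the updated YAML test further down: index.mode set to logs together with index.routing_path, which previously failed with "[index.routing_path] requires [index.mode=time_series]".

import org.elasticsearch.common.settings.Settings;

// Illustrative only: index settings that this prototype is meant to accept.
// Before this change, index.routing_path was rejected unless index.mode=time_series.
public class LogsRoutingPathExample {
    public static Settings logsModeWithRoutingPath() {
        return Settings.builder()
            .put("index.mode", "logs")
            .put("index.routing_path", "hostname,agent_id")
            .build();
    }
}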
(first changed file)
@@ -12,7 +12,7 @@
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
+import org.elasticsearch.index.mapper.DimensionHasher;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.AggregatorReducer;
import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -67,7 +67,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public Map<String, Object> getKey() {
-return TimeSeriesIdFieldMapper.decodeTsidAsMap(key);
+return DimensionHasher.decodeAsMap(key);
}

@Override
(next changed file)
@@ -12,6 +12,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
+import org.elasticsearch.index.mapper.RoutingDimensions;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
@@ -160,11 +161,11 @@ public void collect(int doc, long bucket) throws IOException {
if (currentTsidOrd == aggCtx.getTsidHashOrd()) {
tsid = currentTsid;
} else {
-TimeSeriesIdFieldMapper.TimeSeriesIdBuilder tsidBuilder = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null);
+RoutingDimensions routingDimensions = new RoutingDimensions(null);
for (TsidConsumer consumer : dimensionConsumers.values()) {
-consumer.accept(doc, tsidBuilder);
+consumer.accept(doc, routingDimensions);
}
-currentTsid = tsid = tsidBuilder.buildLegacyTsid().toBytesRef();
+currentTsid = tsid = TimeSeriesIdFieldMapper.buildLegacyTsid(routingDimensions).toBytesRef();
}
long bucketOrdinal = bucketOrds.add(bucket, tsid);
if (bucketOrdinal < 0) { // already seen
@@ -188,6 +189,6 @@ InternalTimeSeries buildResult(InternalTimeSeries.InternalBucket[] topBuckets) {

@FunctionalInterface
interface TsidConsumer {
-void accept(int docId, TimeSeriesIdFieldMapper.TimeSeriesIdBuilder tsidBuilder) throws IOException;
+void accept(int docId, RoutingDimensions tsidBuilder) throws IOException;
}
}
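Editor's sketch of the refactoring in the hunks above, assuming only the API visible in this diff (the RoutingDimensions constructor, addString/addLong, and the static TimeSeriesIdFieldMapper.buildLegacyTsid helper); the dimension names and the class name are made up:

import java.io.IOException;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.mapper.RoutingDimensions;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;

// Dimensions are now collected on a RoutingDimensions instance and the legacy _tsid is
// derived via a static helper, instead of on a TimeSeriesIdFieldMapper.TimeSeriesIdBuilder.
public class LegacyTsidSketch {
    static BytesRef legacyTsidFor(String metricset, long replica) throws IOException {
        RoutingDimensions dimensions = new RoutingDimensions(null); // null argument mirrors the call sites above
        dimensions.addString("metricset", metricset);
        dimensions.addLong("replica", replica);
        return TimeSeriesIdFieldMapper.buildLegacyTsid(dimensions).toBytesRef();
    }
}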
(next changed file)
@@ -14,6 +14,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
+import org.elasticsearch.index.mapper.RoutingDimensions;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
@@ -41,12 +42,12 @@ private List<InternalBucket> randomBuckets(boolean keyed, InternalAggregations a
List<Map<String, Object>> keys = randomKeys(bucketKeys(randomIntBetween(1, 4)), numberOfBuckets);
for (int j = 0; j < numberOfBuckets; j++) {
long docCount = randomLongBetween(0, Long.MAX_VALUE / (20L * numberOfBuckets));
-var builder = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null);
+var routingDimensions = new RoutingDimensions(null);
for (var entry : keys.get(j).entrySet()) {
-builder.addString(entry.getKey(), (String) entry.getValue());
+routingDimensions.addString(entry.getKey(), (String) entry.getValue());
}
try {
-var key = builder.buildLegacyTsid().toBytesRef();
+var key = TimeSeriesIdFieldMapper.buildLegacyTsid(routingDimensions).toBytesRef();
bucketList.add(new InternalBucket(key, docCount, aggregations, keyed));
} catch (IOException e) {
throw new UncheckedIOException(e);
(next changed file)
@@ -29,8 +29,8 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperBuilderContext;
import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.index.mapper.RoutingDimensions;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
-import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper.TimeSeriesIdBuilder;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
@@ -92,10 +92,10 @@ public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimens
final List<IndexableField> fields = new ArrayList<>();
fields.add(new SortedNumericDocValuesField(DataStreamTimestampFieldMapper.DEFAULT_PATH, timestamp));
fields.add(new LongPoint(DataStreamTimestampFieldMapper.DEFAULT_PATH, timestamp));
-final TimeSeriesIdBuilder builder = new TimeSeriesIdBuilder(null);
+final RoutingDimensions routingDimensions = new RoutingDimensions(null);
for (int i = 0; i < dimensions.length; i += 2) {
if (dimensions[i + 1] instanceof Number n) {
-builder.addLong(dimensions[i].toString(), n.longValue());
+routingDimensions.addLong(dimensions[i].toString(), n.longValue());
if (dimensions[i + 1] instanceof Integer || dimensions[i + 1] instanceof Long) {
fields.add(new NumericDocValuesField(dimensions[i].toString(), ((Number) dimensions[i + 1]).longValue()));
} else if (dimensions[i + 1] instanceof Float) {
@@ -104,7 +104,7 @@ public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimens
fields.add(new DoubleDocValuesField(dimensions[i].toString(), (double) dimensions[i + 1]));
}
} else {
-builder.addString(dimensions[i].toString(), dimensions[i + 1].toString());
+routingDimensions.addString(dimensions[i].toString(), dimensions[i + 1].toString());
fields.add(new SortedSetDocValuesField(dimensions[i].toString(), new BytesRef(dimensions[i + 1].toString())));
}
}
@@ -117,7 +117,9 @@ public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimens
fields.add(new DoubleDocValuesField(metrics[i].toString(), (double) metrics[i + 1]));
}
}
-fields.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, builder.buildLegacyTsid().toBytesRef()));
+fields.add(
+new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, TimeSeriesIdFieldMapper.buildLegacyTsid(routingDimensions).toBytesRef())
+);
iw.addDocument(fields);
}

(next changed file)
@@ -298,7 +298,7 @@ setup:
reason: "Error message changed in 8.15.0"

- do:
-catch: '/Indices with with index mode \[time_series\] only support synthetic source/'
+catch: '/Time series indices only support synthetic source/'
indices.create:
index: tsdb_error
body:
(next changed file, new)
@@ -0,0 +1,153 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.datastreams;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.xcontent.XContentType;

import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.UUID;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class LogsIndexingIT extends ESSingleNodeTestCase {

public static final String MAPPING_TEMPLATE = """
{
"_doc":{
"properties": {
"@timestamp" : {
"type": "date"
},
"metricset": {
"type": "keyword"
}
}
}
}""";

private static final String DOC = """
{
"@timestamp": "$time",
"metricset": "pod",
"k8s": {
"pod": {
"name": "dog",
"uid":"$uuid",
"ip": "10.10.55.3",
"network": {
"tx": 1434595272,
"rx": 530605511
}
}
}
}
""";

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(DataStreamsPlugin.class, InternalSettingsPlugin.class);
}

public void testIndexSearchAndRetrieval() throws Exception {
String[] uuids = {
UUID.randomUUID().toString(),
UUID.randomUUID().toString(),
UUID.randomUUID().toString(),
UUID.randomUUID().toString() };

String dataStreamName = "k8s";
var putTemplateRequest = new TransportPutComposableIndexTemplateAction.Request("id");
putTemplateRequest.indexTemplate(
ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStreamName + "*"))
.template(
new Template(
Settings.builder()
.put("index.mode", "logs")
.put("index.routing_path", "metricset,k8s.pod.uid")
.put("index.number_of_replicas", 0)
// Reduce sync interval to speed up this integration test,
// otherwise by default it will take 30 seconds before minimum retained seqno is updated:
.put("index.soft_deletes.retention_lease.sync_interval", "100ms")
.build(),
new CompressedXContent(MAPPING_TEMPLATE),
null
)
)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
.build()
);
client().execute(TransportPutComposableIndexTemplateAction.TYPE, putTemplateRequest).actionGet();

// index some data
int numBulkRequests = randomIntBetween(128, 1024);
int numDocsPerBulk = randomIntBetween(16, 256);
String indexName = null;
{
Instant time = Instant.now();
for (int i = 0; i < numBulkRequests; i++) {
BulkRequest bulkRequest = new BulkRequest(dataStreamName);
for (int j = 0; j < numDocsPerBulk; j++) {
var indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE);
indexRequest.source(
DOC.replace("$time", formatInstant(time)).replace("$uuid", uuis[j % uuis.length]),
XContentType.JSON
);
bulkRequest.add(indexRequest);
time = time.plusMillis(1);
}
var bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat(bulkResponse.hasFailures(), is(false));
indexName = bulkResponse.getItems()[0].getIndex();
}
client().admin().indices().refresh(new RefreshRequest(dataStreamName)).actionGet();
}

// Check the search api can synthesize _id
final String idxName = indexName;
var searchRequest = new SearchRequest(dataStreamName);
searchRequest.source().trackTotalHits(true);
assertResponse(client().search(searchRequest), searchResponse -> {
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) numBulkRequests * numDocsPerBulk));
String id = searchResponse.getHits().getHits()[0].getId();
assertThat(id, notNullValue());

// Check that the _id is gettable:
var getResponse = client().get(new GetRequest(idxName).id(id)).actionGet();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getId(), equalTo(id));
});
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

}
(next changed file)
@@ -54,15 +54,14 @@
import org.elasticsearch.geometry.Geometry;
import org.elasticsearch.geometry.Point;
import org.elasticsearch.index.Index;
-import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.DimensionRoutingHashFieldMapper;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.OnScriptError;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
-import org.elasticsearch.index.mapper.TimeSeriesRoutingHashFieldMapper;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SearchExecutionContext;
@@ -819,13 +818,13 @@ private static Response prepareRamIndex(
BytesReference document = request.contextSetup.document;
XContentType xContentType = request.contextSetup.xContentType;

-SourceToParse sourceToParse = (indexService.getIndexSettings().getMode() == IndexMode.TIME_SERIES)
+SourceToParse sourceToParse = (indexService.getIndexSettings().usesRoutingPath())
? new SourceToParse(
null,
document,
xContentType,
indexService.getIndexSettings().getIndexVersionCreated().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID)
-? TimeSeriesRoutingHashFieldMapper.DUMMY_ENCODED_VALUE
+? DimensionRoutingHashFieldMapper.DUMMY_ENCODED_VALUE
: null
)
: new SourceToParse("_id", document, xContentType);
(next changed file)
@@ -36,6 +36,7 @@
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.analysis.StandardTokenizerFactory;
import org.elasticsearch.index.analysis.TokenFilterFactory;
+import org.elasticsearch.index.mapper.DimensionRoutingHashFieldMapper;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperParsingException;
@@ -44,7 +45,6 @@
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.TextFieldFamilySyntheticSourceTestSetup;
import org.elasticsearch.index.mapper.TextFieldMapper;
-import org.elasticsearch.index.mapper.TimeSeriesRoutingHashFieldMapper;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
@@ -332,7 +332,7 @@ public void testStoreParameterDefaults() throws IOException {
});
DocumentMapper mapper = createMapperService(getVersion(), indexSettings, () -> true, mapping).documentMapper();

-var source = source(TimeSeriesRoutingHashFieldMapper.DUMMY_ENCODED_VALUE, b -> {
+var source = source(DimensionRoutingHashFieldMapper.DUMMY_ENCODED_VALUE, b -> {
b.field("field", "1234");
if (timeSeriesIndexMode) {
b.field("@timestamp", randomMillisUpToYear9999());
(next changed file)
@@ -149,7 +149,7 @@ missing hostname field:

- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "unknown index sort field:[hostname]" }
- match: { error.reason: "unknown index sort field:[hostname] required by [index.mode=logs]" }

---
missing sort field:
@@ -190,7 +190,7 @@ missing sort field:

- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "unknown index sort field:[host_name]" }
- match: { error.reason: "unknown index sort field:[host_name] required by [index.mode=logs]" }

---
non-default sort settings:
@@ -480,7 +480,7 @@ override sort field using nested field type:
- match: { error.reason: "cannot have nested fields when index sort is activated" }

---
-routing path not allowed in logs mode:
+routing path allowed in logs mode:
- requires:
test_runner_features: [ capabilities ]
capabilities:
@@ -490,7 +490,6 @@
reason: "Support for 'logs' index mode capability required"

- do:
-catch: bad_request
indices.create:
index: test
body:
@@ -515,9 +514,13 @@
message:
type: text

- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "[index.routing_path] requires [index.mode=time_series]" }
- do:
indices.get_settings:
index: test

- is_true: test
- match: { test.settings.index.mode: "logs" }
- match: { test.settings.index.routing_path: [ "hostname", "agent_id"] }

---
start time not allowed in logs mode:
(next changed file)
@@ -80,7 +80,7 @@ public static boolean isFailureStoreFeatureFlagEnabled() {
public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
public static final String TIMESTAMP_FIELD_NAME = "@timestamp";
// Timeseries indices' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
-public static Comparator<LeafReader> TIMESERIES_LEAF_READERS_SORTER = Comparator.comparingLong((LeafReader r) -> {
+public static Comparator<LeafReader> TIME_LEAF_READERS_SORTER = Comparator.comparingLong((LeafReader r) -> {
try {
PointValues points = r.getPointValues(TIMESTAMP_FIELD_NAME);
if (points != null) {