Skip to content

POC Reverse otel normalization #126100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1991,7 +1991,6 @@ private void recordTelemetry() {

/**
* Extract telemetry data from the search response.
* @param searchResponse The final response from the search.
*/
private void extractCCSTelemetry(SearchResponse searchResponse) {
usageBuilder.took(searchResponse.getTookInMillis());
Expand Down
5 changes: 4 additions & 1 deletion server/src/main/java/org/elasticsearch/search/SearchHit.java
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,10 @@ public XContentBuilder toInnerXContent(XContentBuilder builder, Params params) t
}
}
if (source != null) {
XContentHelper.writeRawField(SourceFieldMapper.NAME, source, builder, params);
// don't write source as raw, but turn into map which will apply the MapFlattener
// this is most likely Very Bad for performance, but it will do for a POC
builder.field(SourceFieldMapper.NAME, getSourceAsMap());
// XContentHelper.writeRawField(SourceFieldMapper.NAME, source, builder, params);
}
if (documentFields.isEmpty() == false &&
// ignore fields all together if they are all empty
Expand Down
66 changes: 65 additions & 1 deletion server/src/main/java/org/elasticsearch/search/lookup/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

Expand Down Expand Up @@ -119,6 +120,7 @@ public Map<String, Object> source() {
if (asMap == null) {
parseBytes();
}
asMap = MapFlattener.flattenMap(asMap);
return asMap;
}

Expand All @@ -139,6 +141,67 @@ public Source filter(SourceFilter sourceFilter) {
};
}

class MapFlattener {
public static Map<String, Object> flattenMap(Map<String, Object> input) {
Map<String, Object> result = new LinkedHashMap<>();
flattenHelper("", input, result);
applyCustomRules(result);
return result;
}

private static void flattenHelper(String prefix, Map<String, Object> current, Map<String, Object> result) {
for (Map.Entry<String, Object> entry : current.entrySet()) {
String newKey = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
Object value = entry.getValue();

if (value instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> nestedMap = (Map<String, Object>) value;
flattenHelper(newKey, nestedMap, result);
} else {
result.put(newKey, value); // Store all values
}
}
}

private static void applyCustomRules(Map<String, Object> result) {
Map<String, Object> additionalEntries = new LinkedHashMap<>();

for (Map.Entry<String, Object> entry : new LinkedHashMap<>(result).entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();

if (key.startsWith("resource.attributes.")) {
String newKey = key.substring("resource.attributes.".length());
if (result.containsKey(newKey) == false) {
additionalEntries.put(newKey, value);
}
}
if (key.startsWith("attributes.")) {
String newKey = key.substring("attributes.".length());
if (result.containsKey(newKey) == false) {
additionalEntries.put(newKey, value);
}
}
}

if (result.containsKey("body.text") && result.containsKey("message") == false) {
additionalEntries.put("message", result.get("body.text"));
}
if (result.containsKey("severity_text") && result.containsKey("log.level") == false) {
additionalEntries.put("log.level", result.get("severity_text"));
}
if (result.containsKey("span_id") && result.containsKey("span.id") == false) {
additionalEntries.put("span.id", result.get("span_id"));
}
if (result.containsKey("trace_id") && result.containsKey("trace.id") == false) {
additionalEntries.put("trace.id", result.get("trace_id"));
}

result.putAll(additionalEntries);
}
}

/**
* Build a Source from a Map representation.
*
Expand Down Expand Up @@ -174,10 +237,11 @@ public Source filter(SourceFilter sourceFilter) {
}

private static BytesReference mapToBytes(Map<String, Object> value, XContentType xContentType) {
Map<String, Object> flattenedValue = MapFlattener.flattenMap(value);
BytesStreamOutput streamOutput = new BytesStreamOutput(1024);
try {
XContentBuilder builder = new XContentBuilder(xContentType.xContent(), streamOutput);
builder.value(value);
builder.value(flattenedValue);
return BytesReference.bytes(builder);
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down