Member

@AndreKurait AndreKurait commented Oct 30, 2024

Description

Implements RFS document transforms.

input/output spec:

{
   "index": {
       "_index": "your_index_name",
       "_type": "your_type", // only for ES <=6.
       "_id": "doc_1234"
   },
   // Document source
   "source": {}
}

Note: This changes the current default behavior for ES 6 regarding docIds. Previously we retained the Lucene id, which modified the docId as ES/OS sees it. Before: id = mytype#docId; now: id = docId.
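
To make the id change concrete, here is a minimal sketch in plain Java that builds a document in the input/output shape above as nested maps. It is not the PR's code: the names `TransformDocSketch`, `legacyLuceneId`, and `toTransformDoc` are hypothetical, and the `type#docId` form is taken only from the "Before" example in the note.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; class and method names are illustrative, not from the PR.
final class TransformDocSketch {

    // Old ES 6 behavior described in the note: the id kept the "mytype#docId" form.
    static String legacyLuceneId(String type, String docId) {
        return type + "#" + docId;
    }

    // Builds {"index": {"_index", "_type" (optional), "_id"}, "source": {...}} as nested maps.
    static Map<String, Object> toTransformDoc(String index, String type, String docId,
                                              Map<String, Object> source) {
        Map<String, Object> metadata = new LinkedHashMap<>();
        metadata.put("_index", index);
        if (type != null) {              // "_type" only applies to ES <= 6
            metadata.put("_type", type);
        }
        metadata.put("_id", docId);      // new behavior: plain docId, no type prefix

        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("index", metadata);
        doc.put("source", source);
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("title", "hello");
        System.out.println(legacyLuceneId("your_type", "doc_1234"));
        System.out.println(toTransformDoc("your_index_name", "your_type", "doc_1234", source));
    }
}
```

A transformer written against this shape can rewrite `_index`, `_id`, or the contents of `source` without needing to know about the Lucene-level id.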

Also moved transformationPlugins from TrafficCapture to the top-level transformer package.

  • Category: New feature
  • Why these changes are required? Flexibility during doc migrations, support for multi-type mappings
  • What is the old behavior before changes and new behavior after changes? No transformation support, added in this PR.

Issues Resolved

MIGRATIONS-2152

Is this a backport? If so, please add backport PR # and/or commits #

Testing

Added unit tests.

Ran performance tests in the cloud in a case where the workers were CPU bound. A 5 TB migration with 200 workers went from 35 to 49 minutes.

Check List

  • New functionality includes testing
    • All tests pass, including unit test, integration test and doctest
  • New functionality has been documented
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

codecov bot commented Oct 30, 2024

Codecov Report

Attention: Patch coverage is 54.91803% with 55 lines in your changes missing coverage. Please review.

Project coverage is 80.43%. Comparing base (dc326f0) to head (f62c670).
Report is 17 commits behind head on main.

Files with missing lines                               Patch %  Lines
...h/migrations/transform/TransformerConfigUtils.java  0.00%    20 Missing ⚠️
...rch/migrations/bulkload/common/BulkDocSection.java  73.84%   17 Missing ⚠️
...org/opensearch/migrations/RfsMigrateDocuments.java  0.00%    14 Missing ⚠️
.../opensearch/migrations/replay/TrafficReplayer.java  0.00%    2 Missing ⚠️
.../migrations/bulkload/common/DocumentReindexer.java  87.50%   1 Missing ⚠️
...h/migrations/bulkload/common/OpenSearchClient.java  50.00%   1 Missing ⚠️

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1110      +/-   ##
============================================
- Coverage     80.55%   80.43%   -0.13%     
- Complexity     2930     2947      +17     
============================================
  Files           390      393       +3     
  Lines         14462    14534      +72     
  Branches        998     1000       +2     
============================================
+ Hits          11650    11690      +40     
- Misses         2209     2243      +34     
+ Partials        603      601       -2     

Flag         Coverage Δ
gradle-test  78.50% <54.91%> (-0.14%) ⬇️
python-test  90.33% <ø> (ø)


@AndreKurait AndreKurait force-pushed the RFSDocumentTransformations branch from 641dbff to 4941c99 Compare October 30, 2024 02:13
@AndreKurait AndreKurait marked this pull request as ready for review October 30, 2024 15:29
Signed-off-by: Andre Kurait <[email protected]>
@AndreKurait AndreKurait force-pushed the RFSDocumentTransformations branch from 8beace1 to 94b5d94 Compare October 30, 2024 17:06
Member

@peternied peternied left a comment

Thanks for getting this out there. I've got a couple of forward-looking design questions and some follow-ups on what you've built here.

return (ObjectNode) OBJECT_MAPPER.readTree(doc);
}

public static String convertToBulkRequestBody(Collection<BulkDocSection> bulkSections) {
Collaborator

note to self - how does this relate to transformations

Comment on lines +249 to +251
@SuppressWarnings("unchecked")
var bulkRequestCaptor = ArgumentCaptor.forClass((Class<List<BulkDocSection>>)(Class<?>) List.class);
verify(mockClient, times(1)).sendBulkRequest(eq("test-index"), bulkRequestCaptor.capture(), any());
Collaborator

Please don't do times() checks. If it's valuable enough for a unit test, it's probably even more valuable as a metric. Please use in-memory otel instrumentation checks instead. Take a look at DocumentMigrationTestContext.

Also, full disclosure: I'm Mockito-illiterate. I still find the cognitive load extremely high to confirm completeness and relevance for tests that utilize it.

Member

The functionality Andre has added mirrors the existing test code, so I would hesitate to have him rewrite these tests.

While telemetry is another way to evaluate these test expectations, it isn't plumbed through and I don't think it's on the critical path for this PR. What do you think @gregschohn?

Collaborator

Telemetry has value in a production system. Mocks have none. We have had a history of not adopting and leveraging our telemetry tools enough. Doing it for tests too is a win-win.

String expectedSource = "{\"content\":\"This doc will not be changed\nIt has multiple lines of text\nIts source doc has extra newlines.\"}";
String actualSource = doc.source;
assertDocsEqual(expectedId, actualId, expectedSource, actualSource);
assertDocsEqual(expectedId, actualId, expectedType, actualType, expectedSource, actualSource);
Collaborator

Since Peter is looking at this PR too... it's code like this that makes me miss record types.

Member

FYI - We've got benchmark test cases that might be useful here if we have concerns about performance in these smaller paths https://github.com/opensearch-project/opensearch-migrations/blob/main/RFS/src/test/java/org/opensearch/migrations/parsing/JacksonParserVsReadObjectBenchmark.java

Member

@peternied peternied left a comment

Execution failed for task ':transformation:jacocoTestReport'.
Cannot convert the provided notation to a File or URI: {dir=/home/runner/work/opensearch-migrations/opensearch-migrations/transformation/build/classes/java/main}.

Do you know what is going on with the gradle build?

@SuppressWarnings("unchecked")
private static Map<String, Object> parseSource(final String doc) {
    try {
        return OBJECT_MAPPER.readValue(doc, Map.class);
Collaborator

Please add a comment as to why this won't ever be a list

Collaborator

Is there a reason that you aren't reading this directly into your strongly typed objects?

Member Author

This is the string sourceDoc from the lucene Document that we're parsing here.

Member Author

Added a comment on why it's not a list

return asStringBuilder().toString();
@SuppressWarnings("unchecked")
public Map<String, Object> toMap() {
return (Map<String, Object>) OBJECT_MAPPER.convertValue(bulkIndex, Map.class);
Collaborator

what is the intention of this method - won't convertValue here just try to force a string into a map?

Member Author

bulkIndex is the POJO; this converts it to a map according to its @JsonProperty annotations. It does not do any string->map conversions.

return new BulkDocSection(bulkIndex);
}

public long getSerializedLength() {
Collaborator

Don't you have a bound on this from the values that came in to the c'tor?

Running an entire serialization only to get a size is really wasteful and will be a big impact for everything else running on the system.

Member Author

@AndreKurait AndreKurait Nov 4, 2024

This is only called once on the output after transformations.
There, our object is constructed from a map, and not the string.

I'm not sure of a better way to identify the serialization size of a map than this.

As a future enhancement, we could cache the serialized string from this for use in the subsequent bulk serialization step. I played around with this but didn't see a performance increase.
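
For readers following along, the caching idea mentioned here could look roughly like the sketch below. It is only an illustration under stated assumptions: `CachedSerialization` is a hypothetical class, the `Supplier<String>` stands in for whatever JSON serializer the real class uses, and the sketch is not thread-safe. As noted above, the author did not measure a gain from this approach.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Hypothetical sketch: memoize the serialized form so that measuring the length
// and writing the bulk body share a single serialization pass.
final class CachedSerialization {
    private final Supplier<String> serializer; // stand-in for the real JSON serializer
    private byte[] cachedBytes;                // null until first use; not thread-safe

    CachedSerialization(Supplier<String> serializer) {
        this.serializer = serializer;
    }

    private byte[] bytes() {
        if (cachedBytes == null) {
            cachedBytes = serializer.get().getBytes(StandardCharsets.UTF_8);
        }
        return cachedBytes;
    }

    // Length in UTF-8 bytes; triggers serialization only on the first call.
    long getSerializedLength() {
        return bytes().length;
    }

    // Reuses the cached bytes instead of serializing again.
    String asString() {
        return new String(bytes(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        CachedSerialization section = new CachedSerialization(() -> {
            calls[0]++;
            return "{\"source\":{\"field\":\"value\"}}";
        });
        System.out.println(section.getSerializedLength());
        System.out.println(section.asString());
        System.out.println("serializer invoked " + calls[0] + " time(s)");
    }
}
```

The trade-off is memory: the cached bytes stay alive as long as the section does, which matters when many large documents are buffered at once.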

Comment on lines +108 to +112
@JsonProperty("index")
private final Metadata metadata;
@ToString.Exclude
@JsonProperty("source")
private final Map<String, Object> sourceDoc;
Collaborator

Should BulkIndex be a proxy, or longer term a flyweight, on the underlying map if you ever need to maintain a map version of the data (or, flipped around, make this a Map<>)? You shouldn't need to create copies of the fields eagerly.

Keeping sub-objects close to the usage site w/ a cheap c'tor will make it decently likely that objects stay in gen0 & can keep pressure off of the more expensive GCs, effectively, making them flyweights.

I'm more concerned about this not because of this example, but because this sets a precedent that will be copied elsewhere in code. If you want to just put a comment on this explaining alternates and tradeoffs, that works too.
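
One possible reading of the proxy/flyweight suggestion, sketched in plain Java: a thin view that keeps only a reference to the transformation map and reads through to it on each access. `BulkIndexView` and its accessors are hypothetical names, and the key names are taken from the input/output spec in the PR description; this is not what the PR implements.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a thin view over the transformation map: nothing is
// copied in the constructor, and each accessor reads through to the backing map.
final class BulkIndexView {
    private final Map<String, Object> backing;

    BulkIndexView(Map<String, Object> backing) {
        this.backing = backing; // cheap c'tor: stores a reference only
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> metadata() {
        return (Map<String, Object>) backing.get("index");
    }

    String getIndex() { return (String) metadata().get("_index"); }
    String getId()    { return (String) metadata().get("_id"); }
    String getType()  { return (String) metadata().get("_type"); } // null when absent

    @SuppressWarnings("unchecked")
    Map<String, Object> getSource() {
        return (Map<String, Object>) backing.get("source");
    }

    // The map form is the backing map itself, so no conversion is needed.
    Map<String, Object> toMap() {
        return backing;
    }

    public static void main(String[] args) {
        Map<String, Object> metadata = new LinkedHashMap<>();
        metadata.put("_index", "your_index_name");
        metadata.put("_id", "doc_1234");
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("index", metadata);
        doc.put("source", new LinkedHashMap<String, Object>());

        BulkIndexView view = new BulkIndexView(doc);
        System.out.println(view.getIndex() + " / " + view.getId());
    }
}
```

The cost of this design is that type errors surface at access time rather than at construction, which is the trade-off against the eagerly populated POJO in the PR.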

Member Author

Will follow up offline if we still need any action items based on this

@ToString
@EqualsAndHashCode
@JsonInclude(JsonInclude.Include.NON_NULL)
private static class BulkIndex {
Collaborator

Why is this called BulkIndex? From the doc (https://opensearch.org/docs/latest/api-reference/document-apis/bulk/, which you should reference) it still isn't readily apparent what this means. I can't figure out why there are the json property fields, given what a bulk request requires. It looks like this is a json model object even though there aren't any consumers of a json object with these fields.

Member Author

This is the format for transformations

}

@Test
void testFromMap() {
Collaborator

why do we need this test or the complementary one?

Member Author

While I was working on the class, it increased my development speed to have this test that broke immediately if I broke something

Comment on lines +143 to +146
while (docBuilder.length() < targetSize) {
    docBuilder.append("\"").append(key).append(i).append("\":\"").append(value).append(i).append("\",");
    i++;
}
Collaborator

Just FYI, take a look at GenerateRandomNestedJsonObject.makeRandomJsonObject for a way to make a random tree. I can't remember how easy it is to control size limits.
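
For context, the loop under review could be wrapped into a self-contained helper along these lines. This is a sketch, not the test's actual code: `LargeDocSketch` and `buildDoc` are hypothetical names, the opening brace and trailing-comma handling are assumptions about the surrounding lines that the excerpt does not show, and it assumes the key and value need no JSON escaping.

```java
// Hypothetical sketch of a size-targeted flat JSON document builder.
final class LargeDocSketch {

    // Appends "keyN":"valueN" pairs until the builder reaches targetSize characters.
    static String buildDoc(String key, String value, int targetSize) {
        StringBuilder docBuilder = new StringBuilder("{");
        int i = 0;
        while (docBuilder.length() < targetSize) {
            docBuilder.append("\"").append(key).append(i).append("\":\"")
                      .append(value).append(i).append("\",");
            i++;
        }
        // Drop the trailing comma left by the last appended pair, if any.
        int last = docBuilder.length() - 1;
        if (docBuilder.charAt(last) == ',') {
            docBuilder.setLength(last);
        }
        return docBuilder.append("}").toString();
    }

    public static void main(String[] args) {
        String doc = buildDoc("key", "value", 64);
        System.out.println(doc.length() + " chars: " + doc);
    }
}
```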

}

@Test
void testLargeSourceDoc() throws JsonProcessingException {
Collaborator

what are you trying to affirm w/ this test?

Member Author

The objectMapper is able to work with large docs

Signed-off-by: Andre Kurait <[email protected]>
@AndreKurait AndreKurait merged commit a176c9d into opensearch-project:main Nov 5, 2024
14 of 15 checks passed
@AndreKurait AndreKurait deleted the RFSDocumentTransformations branch November 5, 2024 17:08
@AndreKurait AndreKurait mentioned this pull request Nov 5, 2024
4 tasks
@AndreKurait AndreKurait changed the title Rfs document transformations RFS - Support Document Transformations Sep 4, 2025

3 participants