Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,22 @@
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ObjectMapper BULK_INDEX_REQUEST_MAPPER = OBJECT_MAPPER.copy()
.registerModule(new SimpleModule()
.addSerializer(BulkIndex.class, new BulkIndex.BulkIndexRequestSerializer()));
.addSerializer(BulkIndexOperation.class, new BulkIndexOperation.BulkIndexRequestSerializer()));
private static final String NEWLINE = "\n";

@EqualsAndHashCode.Include
@Getter
private final String docId;
private final BulkIndex bulkIndex;
private final BulkIndexOperation bulkIndexOperation;

public BulkDocSection(String id, String indexName, String type, String docBody) {
this.docId = id;
this.bulkIndex = new BulkIndex(new BulkIndex.Metadata(id, type, indexName), parseSource(docBody));
this.bulkIndexOperation = new BulkIndexOperation(new BulkIndexOperation.Metadata(id, type, indexName), parseSource(docBody));
}

private BulkDocSection(BulkIndex bulkIndex) {
this.docId = bulkIndex.metadata.id;
this.bulkIndex = bulkIndex;
private BulkDocSection(BulkIndexOperation bulkIndexOperation) {
this.docId = bulkIndexOperation.metadata.id;
this.bulkIndexOperation = bulkIndexOperation;
}

@SuppressWarnings("unchecked")
Expand All @@ -62,7 +62,7 @@
// Using a single SegmentedStringWriter across all object serializations
try (SegmentedStringWriter writer = new SegmentedStringWriter(new BufferRecycler())) {
for (BulkDocSection section : bulkSections) {
BULK_INDEX_REQUEST_MAPPER.writeValue(writer, section.bulkIndex);
BULK_INDEX_REQUEST_MAPPER.writeValue(writer, section.bulkIndexOperation);
writer.append(NEWLINE);
}
return writer.getAndClear();
Expand All @@ -72,41 +72,42 @@
}

public static BulkDocSection fromMap(Map<String, Object> map) {
BulkIndex bulkIndex = OBJECT_MAPPER.convertValue(map, BulkIndex.class);
return new BulkDocSection(bulkIndex);
BulkIndexOperation bulkIndexOperation = OBJECT_MAPPER.convertValue(map, BulkIndexOperation.class);
return new BulkDocSection(bulkIndexOperation);
}

public long getSerializedLength() {
// Using a separate counting path to limit to a single string allocation per BulkDocSection collection
try (var stream = new CountingNullOutputStream()) {
BULK_INDEX_REQUEST_MAPPER.writeValue(stream, this.bulkIndex);
BULK_INDEX_REQUEST_MAPPER.writeValue(stream, this.bulkIndexOperation);
return stream.length;
} catch (IOException e) {
log.atError().setMessage("Failed to get bulk index length").setCause(e).log();
throw new SerializationException("Failed to get bulk index length " + this.bulkIndex +
throw new SerializationException("Failed to get bulk index length " + this.bulkIndexOperation +

Check warning on line 86 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/BulkDocSection.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/org/opensearch/migrations/bulkload/common/BulkDocSection.java#L86

Added line #L86 was not covered by tests
" from string: " + e.getMessage(), e);
}
}

public String asBulkIndexString() {
try {
return BULK_INDEX_REQUEST_MAPPER.writeValueAsString(this.bulkIndex);
return BULK_INDEX_REQUEST_MAPPER.writeValueAsString(this.bulkIndexOperation);
} catch (IOException e) {
throw new SerializationException("Failed to write bulk index " + this.bulkIndex
throw new SerializationException("Failed to write bulk index " + this.bulkIndexOperation

Check warning on line 95 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/BulkDocSection.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/org/opensearch/migrations/bulkload/common/BulkDocSection.java#L95

Added line #L95 was not covered by tests
+ " from string: " + e.getMessage(), e);
}
}

@SuppressWarnings("unchecked")
public Map<String, Object> toMap() {
return (Map<String, Object>) OBJECT_MAPPER.convertValue(bulkIndex, Map.class);
return (Map<String, Object>) OBJECT_MAPPER.convertValue(bulkIndexOperation, Map.class);
}

@NoArgsConstructor(force = true) // For Jackson
@AllArgsConstructor
@ToString
@EqualsAndHashCode
@JsonInclude(JsonInclude.Include.NON_NULL)
private static class BulkIndex {
private static class BulkIndexOperation {
@JsonProperty("index")
private final Metadata metadata;
@ToString.Exclude
Expand All @@ -119,17 +120,17 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
private static class Metadata {
@JsonProperty("_id")
private final String id;
private String id;
@JsonProperty("_type")
private final String type;
@JsonProperty("_index")
private final String index;
}

public static class BulkIndexRequestSerializer extends JsonSerializer<BulkIndex> {
public static class BulkIndexRequestSerializer extends JsonSerializer<BulkIndexOperation> {
public static final String BULK_INDEX_COMMAND = "index";
@Override
public void serialize(BulkIndex value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
public void serialize(BulkIndexOperation value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
gen.setRootValueSeparator(new SerializedString(NEWLINE));
gen.writeStartObject();
gen.writePOJOField(BULK_INDEX_COMMAND, value.metadata);
Expand Down
Loading