Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Harden the circuit breaker and failure handle logic in query result consumer ([#19396](https://github.com/opensearch-project/OpenSearch/pull/19396))
- Add streaming cardinality aggregator ([#19484](https://github.com/opensearch-project/OpenSearch/pull/19484))
- Disable request cache for streaming aggregation queries ([#19520](https://github.com/opensearch-project/OpenSearch/pull/19520))
- Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568))
- Implement IndexDocument, UpdateDocument, GetDocument, DeleteDocument GRPC APIs ([#19590](https://github.com/opensearch-project/OpenSearch/pull/19590))

### Changed
- Refactor `if-else` chains to use `Java 17 pattern matching switch expressions`(([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.listeners;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.protobufs.DeleteDocumentResponse;
import org.opensearch.transport.grpc.proto.response.document.DeleteDocumentResponseProtoUtils;
import org.opensearch.transport.grpc.util.GrpcErrorHandler;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
* Listener for delete document request execution completion, handling successful and failure scenarios.
*/
public class DeleteDocumentActionListener implements ActionListener<DeleteResponse> {
private static final Logger logger = LogManager.getLogger(DeleteDocumentActionListener.class);

private final StreamObserver<DeleteDocumentResponse> responseObserver;

/**
* Constructs a new DeleteDocumentActionListener.
*
* @param responseObserver the gRPC stream observer to send the delete response to
*/
public DeleteDocumentActionListener(StreamObserver<DeleteDocumentResponse> responseObserver) {
this.responseObserver = responseObserver;
}

@Override
public void onResponse(DeleteResponse response) {
try {
DeleteDocumentResponse protoResponse = DeleteDocumentResponseProtoUtils.toProto(response);
responseObserver.onNext(protoResponse);
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Failed to convert delete response to protobuf: " + e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}

@Override
public void onFailure(Exception e) {
logger.warn("DeleteDocumentActionListener failed: {} - {}", e.getClass().getSimpleName(), e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.listeners;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.get.GetResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.protobufs.GetDocumentResponse;
import org.opensearch.transport.grpc.proto.response.document.GetDocumentResponseProtoUtils;
import org.opensearch.transport.grpc.util.GrpcErrorHandler;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
* Listener for get document request execution completion, handling successful and failure scenarios.
*/
public class GetDocumentActionListener implements ActionListener<GetResponse> {
private static final Logger logger = LogManager.getLogger(GetDocumentActionListener.class);

private final StreamObserver<GetDocumentResponse> responseObserver;

/**
* Constructs a new GetDocumentActionListener.
*
* @param responseObserver the gRPC stream observer to send the get response to
*/
public GetDocumentActionListener(StreamObserver<GetDocumentResponse> responseObserver) {
this.responseObserver = responseObserver;
}

@Override
public void onResponse(GetResponse response) {
try {
GetDocumentResponse protoResponse = GetDocumentResponseProtoUtils.toProto(response);
responseObserver.onNext(protoResponse);
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Failed to convert get response to protobuf: " + e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}

@Override
public void onFailure(Exception e) {
logger.warn("GetDocumentActionListener failed: {} - {}", e.getClass().getSimpleName(), e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.listeners;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.protobufs.IndexDocumentResponse;
import org.opensearch.transport.grpc.proto.response.document.IndexDocumentResponseProtoUtils;
import org.opensearch.transport.grpc.util.GrpcErrorHandler;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
* Listener for index document request execution completion, handling successful and failure scenarios.
*/
public class IndexDocumentActionListener implements ActionListener<IndexResponse> {
private static final Logger logger = LogManager.getLogger(IndexDocumentActionListener.class);

private final StreamObserver<IndexDocumentResponse> responseObserver;

/**
* Constructs a new IndexDocumentActionListener.
*
* @param responseObserver the gRPC stream observer to send the index response to
*/
public IndexDocumentActionListener(StreamObserver<IndexDocumentResponse> responseObserver) {
this.responseObserver = responseObserver;
}

@Override
public void onResponse(IndexResponse response) {
try {
IndexDocumentResponse protoResponse = IndexDocumentResponseProtoUtils.toProto(response);
responseObserver.onNext(protoResponse);
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Failed to convert index response to protobuf: " + e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}

@Override
public void onFailure(Exception e) {
logger.warn("IndexDocumentActionListener failed: {} - {}", e.getClass().getSimpleName(), e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void onResponse(SearchResponse response) {

@Override
public void onFailure(Exception e) {
logger.error("SearchRequestActionListener failed to process search request: " + e.getMessage());
logger.debug("SearchRequestActionListener failed to process search request: " + e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.listeners;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.protobufs.UpdateDocumentResponse;
import org.opensearch.transport.grpc.proto.response.document.UpdateDocumentResponseProtoUtils;
import org.opensearch.transport.grpc.util.GrpcErrorHandler;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
* Listener for update document request execution completion, handling successful and failure scenarios.
*/
public class UpdateDocumentActionListener implements ActionListener<UpdateResponse> {
private static final Logger logger = LogManager.getLogger(UpdateDocumentActionListener.class);

private final StreamObserver<UpdateDocumentResponse> responseObserver;

/**
* Constructs a new UpdateDocumentActionListener.
*
* @param responseObserver the gRPC stream observer to send the update response to
*/
public UpdateDocumentActionListener(StreamObserver<UpdateDocumentResponse> responseObserver) {
this.responseObserver = responseObserver;
}

@Override
public void onResponse(UpdateResponse response) {
try {
UpdateDocumentResponse protoResponse = UpdateDocumentResponseProtoUtils.toProto(response);
responseObserver.onNext(protoResponse);
responseObserver.onCompleted();
} catch (Exception e) {
logger.error("Failed to convert update response to protobuf: " + e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}

@Override
public void onFailure(Exception e) {
logger.warn("UpdateDocumentActionListener failed: {} - {}", e.getClass().getSimpleName(), e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.proto.request.document;

import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.protobufs.DeleteDocumentRequest;

/**
* Utility class for converting protobuf DeleteDocumentRequest to OpenSearch DeleteRequest.
* <p>
* This class provides functionality similar to the REST layer's delete document request processing.
* The parameter mapping and processing logic should be kept consistent with the corresponding
* REST implementation to ensure feature parity between gRPC and HTTP APIs.
*
* @see org.opensearch.rest.action.document.RestDeleteAction#prepareRequest(RestRequest, NodeClient) REST equivalent for parameter processing
* @see org.opensearch.action.delete.DeleteRequest OpenSearch internal delete request representation
* @see org.opensearch.protobufs.DeleteDocumentRequest Protobuf definition for gRPC delete requests
*/
public class DeleteDocumentRequestProtoUtils {

private DeleteDocumentRequestProtoUtils() {
// Utility class, no instances
}

/**
* Converts a protobuf DeleteDocumentRequest to an OpenSearch DeleteRequest.
* <p>
* This method processes delete document request parameters similar to how
* {@link org.opensearch.rest.action.document.RestDeleteAction#prepareRequest(RestRequest, NodeClient)}
* processes REST requests. Parameter mapping includes index name, document ID, routing,
* timeout, refresh policy, versioning, and wait for active shards.
*
* @param protoRequest The protobuf DeleteDocumentRequest to convert
* @return The corresponding OpenSearch DeleteRequest
* @throws IllegalArgumentException if required fields are missing or invalid
* @see org.opensearch.rest.action.document.RestDeleteAction#prepareRequest(RestRequest, NodeClient) REST equivalent
*/
public static DeleteRequest fromProto(DeleteDocumentRequest protoRequest) {
if (protoRequest.getIndex().isEmpty()) {
throw new IllegalArgumentException("Index name is required");
}

if (protoRequest.getId().isEmpty()) {
throw new IllegalArgumentException("Document ID is required");
}

DeleteRequest deleteRequest = new DeleteRequest(protoRequest.getIndex(), protoRequest.getId());

// Set routing if provided
if (protoRequest.hasRouting() && !protoRequest.getRouting().isEmpty()) {
deleteRequest.routing(protoRequest.getRouting());
}

// Set refresh policy if provided
if (protoRequest.hasRefresh()) {
deleteRequest.setRefreshPolicy(convertRefresh(protoRequest.getRefresh()));
}

// Set version constraints if provided
if (protoRequest.hasIfSeqNo()) {
deleteRequest.setIfSeqNo(protoRequest.getIfSeqNo());
}

if (protoRequest.hasIfPrimaryTerm()) {
deleteRequest.setIfPrimaryTerm(protoRequest.getIfPrimaryTerm());
}

// Set timeout if provided
if (protoRequest.hasTimeout() && !protoRequest.getTimeout().isEmpty()) {
deleteRequest.timeout(protoRequest.getTimeout());
}

return deleteRequest;
}

/**
* Convert protobuf Refresh to WriteRequest.RefreshPolicy.
*/
private static WriteRequest.RefreshPolicy convertRefresh(org.opensearch.protobufs.Refresh refresh) {
switch (refresh) {
case REFRESH_TRUE:
return WriteRequest.RefreshPolicy.IMMEDIATE;
case REFRESH_WAIT_FOR:
return WriteRequest.RefreshPolicy.WAIT_UNTIL;
case REFRESH_FALSE:
default:
return WriteRequest.RefreshPolicy.NONE;
}
}
}
Loading
Loading