Skip to content

Commit ca23df0

Browse files
committed
Add AOSS connection
Signed-off-by: Manasvini B S <[email protected]>
1 parent 83a9bc6 commit ca23df0

File tree

6 files changed

+62
-33
lines changed

6 files changed

+62
-33
lines changed

src/main/java/org/opensearch/jdbc/ConnectionImpl.java

+14-12
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package org.opensearch.jdbc;
88

9+
import org.opensearch.jdbc.auth.AuthenticationType;
910
import org.opensearch.jdbc.config.ConnectionConfig;
1011
import org.opensearch.jdbc.internal.JdbcWrapper;
1112
import org.opensearch.jdbc.internal.Version;
@@ -83,21 +84,22 @@ log, new SQLNonTransientException("Could not initialize transport for the connec
8384

8485
log.debug(() -> logMessage("Initialized Transport: %s, Protocol: %s", transport, protocol));
8586

86-
try {
87-
ConnectionResponse connectionResponse = this.protocol.connect(connectionConfig.getLoginTimeout() * 1000);
88-
this.clusterMetadata = connectionResponse.getClusterMetadata();
89-
this.open = true;
90-
} catch (HttpException ex) {
91-
if (ex.getStatusCode() == 401) {
92-
logAndThrowSQLException(log, new SQLException("Connection error " + ex.getMessage(),
93-
INCORRECT_CREDENTIALS_SQLSTATE, ex));
94-
} else {
87+
if (connectionConfig.getAuthenticationType() != AuthenticationType.AWS_SIGV4_SERVERLESS) {
88+
try {
89+
ConnectionResponse connectionResponse = this.protocol.connect(connectionConfig.getLoginTimeout() * 1000);
90+
this.clusterMetadata = connectionResponse.getClusterMetadata();
91+
this.open = true;
92+
} catch (HttpException ex) {
93+
if (ex.getStatusCode() == 401) {
94+
logAndThrowSQLException(log, new SQLException("Connection error " + ex.getMessage(),
95+
INCORRECT_CREDENTIALS_SQLSTATE, ex));
96+
} else {
97+
logAndThrowSQLException(log, new SQLException("Connection error " + ex.getMessage(), ex));
98+
}
99+
} catch (ResponseException | IOException ex) {
95100
logAndThrowSQLException(log, new SQLException("Connection error " + ex.getMessage(), ex));
96101
}
97-
} catch (ResponseException | IOException ex) {
98-
logAndThrowSQLException(log, new SQLException("Connection error " + ex.getMessage(), ex));
99102
}
100-
101103
}
102104

103105
public String getUser() {

src/main/java/org/opensearch/jdbc/auth/AuthenticationType.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,10 @@ public enum AuthenticationType {
2525
/**
2626
* AWS Signature V4
2727
*/
28-
AWS_SIGV4;
28+
AWS_SIGV4,
29+
30+
/**
31+
* AWS Signature V4 for AOSS Serverless collection
32+
*/
33+
AWS_SIGV4_SERVERLESS;
2934
}

src/main/java/org/opensearch/jdbc/config/ConnectionConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ private void validateConfig() throws ConnectionPropertyException {
521521
throw new ConnectionPropertyException(authConnectionProperty.getKey(),
522522
"Basic authentication requires a valid username but none was provided.");
523523

524-
} else if (authenticationType == AuthenticationType.AWS_SIGV4 &&
524+
} else if ((authenticationType == AuthenticationType.AWS_SIGV4 || authenticationType == AuthenticationType.AWS_SIGV4_SERVERLESS) &&
525525
regionConnectionProperty.getValue() == null) {
526526

527527
// aws sdk auto-detection does not work for AWS ES endpoints

src/main/java/org/opensearch/jdbc/protocol/http/JsonHttpProtocol.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ public JsonHttpResponseHandler getJsonHttpResponseHandler() {
6767
@Override
6868
public ConnectionResponse connect(int timeout) throws ResponseException, IOException {
6969
try (CloseableHttpResponse response = transport.doGet(
70-
"/",
71-
defaultEmptyRequestBodyJsonHeaders,
72-
null, timeout)) {
70+
"/",
71+
defaultEmptyRequestBodyJsonHeaders,
72+
null, timeout)) {
7373

7474
return jsonHttpResponseHandler.handleResponse(response, this::processConnectionResponse);
7575

@@ -79,10 +79,10 @@ public ConnectionResponse connect(int timeout) throws ResponseException, IOExcep
7979
@Override
8080
public QueryResponse execute(QueryRequest request) throws ResponseException, IOException {
8181
try (CloseableHttpResponse response = transport.doPost(
82-
sqlContextPath,
83-
defaultJsonHeaders,
84-
defaultJdbcParams,
85-
buildQueryRequestBody(request), 0)) {
82+
sqlContextPath,
83+
defaultJsonHeaders,
84+
defaultJdbcParams,
85+
buildQueryRequestBody(request), 0)) {
8686

8787
return jsonHttpResponseHandler.handleResponse(response, this::processQueryResponse);
8888

src/main/java/org/opensearch/jdbc/transport/http/ApacheHttpTransport.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66

77
package org.opensearch.jdbc.transport.http;
88

9+
import com.amazonaws.auth.AWS4Signer;
910
import org.opensearch.jdbc.auth.AuthenticationType;
1011
import org.opensearch.jdbc.config.ConnectionConfig;
1112
import org.opensearch.jdbc.logging.Logger;
1213
import org.opensearch.jdbc.logging.LoggingSource;
1314
import org.opensearch.jdbc.transport.TransportException;
1415
import org.opensearch.jdbc.transport.http.auth.aws.AWSRequestSigningApacheInterceptor;
15-
import com.amazonaws.auth.AWS4Signer;
16+
import com.amazonaws.auth.AWS4UnsignedPayloadSigner;
1617
import com.amazonaws.auth.AWSCredentialsProvider;
1718
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
1819
import org.apache.http.Header;
@@ -121,8 +122,21 @@ public ApacheHttpTransport(ConnectionConfig connectionConfig, Logger log, String
121122
signer,
122123
provider,
123124
connectionConfig.tunnelHost()));
124-
}
125+
} else if (connectionConfig.getAuthenticationType() == AuthenticationType.AWS_SIGV4_SERVERLESS) {
126+
AWS4UnsignedPayloadSigner signer = new AWS4UnsignedPayloadSigner();
127+
signer.setServiceName("aoss");
128+
signer.setRegionName(connectionConfig.getRegion());
129+
130+
AWSCredentialsProvider provider = connectionConfig.getAwsCredentialsProvider() != null ?
131+
connectionConfig.getAwsCredentialsProvider() : new DefaultAWSCredentialsProviderChain();
125132

133+
httpClientBuilder.addInterceptorLast(
134+
new AWSRequestSigningApacheInterceptor(
135+
"aoss",
136+
signer,
137+
provider,
138+
null));
139+
}
126140
// TODO - can apply settings retry & backoff
127141
this.httpClient = httpClientBuilder.build();
128142
}

src/main/java/org/opensearch/jdbc/transport/http/auth/aws/AWSRequestSigningApacheInterceptor.java

+18-10
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
package org.opensearch.jdbc.transport.http.auth.aws;
88

99
import com.amazonaws.DefaultRequest;
10-
import com.amazonaws.auth.AWSCredentials;
1110
import com.amazonaws.auth.AWSCredentialsProvider;
1211
import com.amazonaws.auth.Signer;
1312
import com.amazonaws.http.HttpMethodName;
@@ -23,13 +22,10 @@
2322
import org.apache.http.message.BasicHeader;
2423
import org.apache.http.protocol.HttpContext;
2524

26-
import java.io.IOException;
25+
import java.io.*;
2726
import java.net.URI;
2827
import java.net.URISyntaxException;
29-
import java.util.ArrayList;
30-
import java.util.List;
31-
import java.util.Map;
32-
import java.util.TreeMap;
28+
import java.util.*;
3329

3430
import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST;
3531

@@ -113,18 +109,31 @@ public void process(final HttpRequest request, final HttpContext context)
113109
if (request instanceof HttpEntityEnclosingRequest) {
114110
HttpEntityEnclosingRequest httpEntityEnclosingRequest =
115111
(HttpEntityEnclosingRequest) request;
116-
if (httpEntityEnclosingRequest.getEntity() != null) {
112+
113+
if (httpEntityEnclosingRequest.getEntity() == null) {
114+
signableRequest.setContent(new ByteArrayInputStream(new byte[0]));
115+
} else {
117116
signableRequest.setContent(httpEntityEnclosingRequest.getEntity().getContent());
118117
}
119118
}
119+
120120
signableRequest.setParameters(nvpToMapParams(uriBuilder.getQueryParams()));
121-
signableRequest.setHeaders(headerArrayToMap(request.getAllHeaders()));
121+
122+
Map<String, String> cleanedHeadersBeforeSign = headerArrayToMap(request.getAllHeaders());
123+
signableRequest.setHeaders(cleanedHeadersBeforeSign);
122124

123125
// Sign it
124126
signer.sign(signableRequest, awsCredentialsProvider.getCredentials());
125127

126128
// Now copy everything back
129+
Header[] headers = request.getHeaders("content-length");
127130
request.setHeaders(mapToHeaderArray(signableRequest.getHeaders()));
131+
if (headers != null) {
132+
Arrays.stream(headers)
133+
.filter(h -> !"0".equals(h.getValue()))
134+
.forEach(h -> request.addHeader(h));
135+
}
136+
128137
if (request instanceof HttpEntityEnclosingRequest) {
129138
HttpEntityEnclosingRequest httpEntityEnclosingRequest =
130139
(HttpEntityEnclosingRequest) request;
@@ -172,8 +181,7 @@ private static Map<String, String> headerArrayToMap(final Header[] headers) {
172181
*/
173182
private static boolean skipHeader(final Header header) {
174183
return ("content-length".equalsIgnoreCase(header.getName())
175-
&& "0".equals(header.getValue())) // Strip Content-Length: 0
176-
|| "host".equalsIgnoreCase(header.getName()); // Host comes from endpoint
184+
|| "host".equalsIgnoreCase(header.getName())); // Host comes from endpoint
177185
}
178186

179187
/**

0 commit comments

Comments
 (0)