-
Notifications
You must be signed in to change notification settings - Fork 190
Expand file tree
/
Copy pathDefaultResultStreamProvider.java
More file actions
174 lines (152 loc) · 6.31 KB
/
DefaultResultStreamProvider.java
File metadata and controls
174 lines (152 loc) · 6.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package net.snowflake.client.jdbc;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import net.snowflake.client.core.ExecTimeTelemetryData;
import net.snowflake.client.core.HttpUtil;
import net.snowflake.client.core.SFBaseSession;
import net.snowflake.client.core.SFSession;
import net.snowflake.client.log.ArgSupplier;
import net.snowflake.client.log.SFLogger;
import net.snowflake.client.log.SFLoggerFactory;
import net.snowflake.client.util.SecretDetector;
import net.snowflake.common.core.SqlState;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
public class DefaultResultStreamProvider implements ResultStreamProvider {
private static final SFLogger logger =
SFLoggerFactory.getLogger(DefaultResultStreamProvider.class);
// SSE-C algorithm header
private static final String SSE_C_ALGORITHM = "x-amz-server-side-encryption-customer-algorithm";
// SSE-C customer key header
private static final String SSE_C_KEY = "x-amz-server-side-encryption-customer-key";
// SSE-C algorithm value
private static final String SSE_C_AES = "AES256";
private CompressedStreamFactory compressedStreamFactory;
public DefaultResultStreamProvider() {
this.compressedStreamFactory = new CompressedStreamFactory();
}
@Override
public InputStream getInputStream(ChunkDownloadContext context) throws Exception {
HttpResponse response;
try {
response = getResultChunk(context);
} catch (URISyntaxException | IOException ex) {
throw new SnowflakeSQLLoggedException(
context.getSession(),
ErrorCode.NETWORK_ERROR.getMessageCode(),
SqlState.IO_ERROR,
"Error encountered when request a result chunk URL: "
+ context.getResultChunk().getUrl()
+ " "
+ ex.getLocalizedMessage());
}
/*
* return error if we don't get a response or the response code
* means failure.
*/
if (response == null || response.getStatusLine().getStatusCode() != 200) {
logger.error("Error fetching chunk from: {}", context.getResultChunk().getScrubbedUrl());
SnowflakeUtil.logResponseDetails(response, logger);
throw new SnowflakeSQLException(
SqlState.IO_ERROR,
ErrorCode.NETWORK_ERROR.getMessageCode(),
"Error encountered when downloading a result chunk: HTTP "
+ "status: "
+ ((response != null) ? response.getStatusLine().getStatusCode() : "null response"));
}
InputStream inputStream;
final HttpEntity entity = response.getEntity();
Header encoding = response.getFirstHeader("Content-Encoding");
try {
// create stream based on compression type
inputStream =
compressedStreamFactory.createBasedOnEncodingHeader(entity.getContent(), encoding);
} catch (Exception ex) {
logger.error("Failed to decompress data: {}", response);
throw new SnowflakeSQLLoggedException(
context.getSession(),
ErrorCode.INTERNAL_ERROR.getMessageCode(),
SqlState.INTERNAL_ERROR,
"Failed to decompress data: " + response.toString());
}
// trace the response if requested
logger.debug("Json response: {}", response);
return inputStream;
}
private HttpResponse getResultChunk(ChunkDownloadContext context) throws Exception {
URIBuilder uriBuilder = new URIBuilder(context.getResultChunk().getUrl());
HttpGet httpRequest = new HttpGet(uriBuilder.build());
if (context.getChunkHeadersMap() != null && context.getChunkHeadersMap().size() != 0) {
for (Map.Entry<String, String> entry : context.getChunkHeadersMap().entrySet()) {
logger.debug("Adding header key: {}", entry.getKey());
httpRequest.addHeader(entry.getKey(), entry.getValue());
}
}
// Add SSE-C headers
else if (context.getQrmk() != null) {
httpRequest.addHeader(SSE_C_ALGORITHM, SSE_C_AES);
httpRequest.addHeader(SSE_C_KEY, context.getQrmk());
logger.debug("Adding SSE-C headers", false);
}
logger.debug(
"Thread {} Fetching result chunk#{}: {}",
Thread.currentThread().getId(),
context.getChunkIndex(),
context.getResultChunk().getScrubbedUrl());
SFBaseSession session = context.getSession();
List<HttpHeadersCustomizer> headersCustomizers = null;
if (session instanceof SFSession) {
headersCustomizers = ((SFSession) session).getHttpHeadersCustomizers();
}
CloseableHttpClient httpClient =
HttpUtil.getHttpClient(
context.getChunkDownloader().getHttpClientSettingsKey(), headersCustomizers);
// fetch the result chunk
HttpResponse response =
RestRequest.executeWithRetries(
httpClient,
httpRequest,
context.getNetworkTimeoutInMilli() / 1000, // retry timeout
0,
context.getSocketTimeout(),
0,
0, // no socket timeout injection
null, // no canceling
false, // no cookie
false, // no retry parameters in url
false, // no request_guid
true, // retry on HTTP403 for AWS S3
true, // no retry on http request
false,
new ExecTimeTelemetryData())
.getHttpResponse();
logger.debug(
"Thread {} Call chunk#{} returned for URL: {}, response: {}",
Thread.currentThread().getId(),
context.getChunkIndex(),
(ArgSupplier) () -> SecretDetector.maskSASToken(context.getResultChunk().getUrl()),
response);
return response;
}
public static InputStream detectGzipAndGetStream(InputStream is) throws IOException {
PushbackInputStream pb = new PushbackInputStream(is, 2);
byte[] signature = new byte[2];
int len = pb.read(signature);
pb.unread(signature, 0, len);
// https://tools.ietf.org/html/rfc1952
if (signature[0] == (byte) 0x1f && signature[1] == (byte) 0x8b) {
return new GZIPInputStream(pb);
} else {
return pb;
}
}
}