Skip to content

Commit b530128

Browse files
committed
HADOOP-19801. Optimize non-empty directory deletes with generic request headers.
1 parent caa518e commit b530128

8 files changed

Lines changed: 398 additions & 45 deletions

File tree

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1353,16 +1353,21 @@ private Constants() {
13531353
*/
13541354
public static final String CUSTOM_HEADERS_POSTFIX = ".custom.headers";
13551355

1356+
/** Custom per-request headers postfix.
1357+
* value: {@value}
1358+
*/
1359+
public static final String CUSTOM_PER_REQUEST_HEADERS_POSTFIX = ".request";
1360+
13561361
/**
13571362
* List of custom headers to be set on the service client.
13581363
* Multiple parameters can be used to specify custom headers.
13591364
* <pre>
13601365
* Usage:
1361-
* fs.s3a.client.s3.custom.headers - Headers to add on all the S3 requests.
1362-
* fs.s3a.client.sts.custom.headers - Headers to add on all the STS requests.
1366+
* fs.s3a.client.s3.custom.headers - Headers to add to all S3 requests.
1367+
* fs.s3a.client.sts.custom.headers - Headers to add to all STS requests.
13631368
*
13641369
* Examples:
1365-
* CustomHeader {@literal ->} 'Header1:Value1'
1370+
* CustomHeader {@literal ->} 'Header1=Value1'
13661371
* CustomHeaders {@literal ->} 'Header1=Value1;Value2,Header2=Value1'
13671372
* </pre>
13681373
*/
@@ -1374,6 +1379,34 @@ private Constants() {
13741379
FS_S3A_CLIENT_PREFIX + AWS_SERVICE_IDENTIFIER_S3.toLowerCase(Locale.ROOT)
13751380
+ CUSTOM_HEADERS_POSTFIX;
13761381

1382+
/**
1383+
* List of custom per-request-type headers to be set on the service client.
1384+
* Multiple parameters can be used to specify custom headers.
1385+
* <pre>
1386+
* Usage:
1387+
* fs.s3a.client.s3.custom.headers.request.REQUEST - Headers to add to all
1388+
* S3 REQUEST requests.
1389+
* fs.s3a.client.sts.custom.headers.request.REQUEST - Headers to add to all
1390+
* STS REQUEST requests.
1391+
*
1392+
* Note: REQUEST refers to the AWS request name.
1393+
*
1394+
* Examples:
1395+
* fs.s3a.client.s3.custom.headers.request.DeleteObjectRequest
1396+
* CustomHeader {@literal ->} 'Header1=Value1'
1397+
* CustomHeaders {@literal ->} 'Header1=Value1;Value2,Header2=Value1'
1398+
* </pre>
1399+
*/
1400+
public static final String CUSTOM_REQUEST_HEADERS_STS_PREFIX =
1401+
CUSTOM_HEADERS_STS + CUSTOM_PER_REQUEST_HEADERS_POSTFIX + ".";
1402+
1403+
/**
1404+
* Prefix for custom S3 request-type headers.
1405+
* Value: {@value}.
1406+
*/
1407+
public static final String CUSTOM_REQUEST_HEADERS_S3_PREFIX =
1408+
CUSTOM_HEADERS_S3 + CUSTOM_PER_REQUEST_HEADERS_POSTFIX + ".";
1409+
13771410
/**
13781411
* How long to wait for the thread pool to terminate when cleaning up.
13791412
* Value: {@value} seconds.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2625,7 +2625,8 @@ public void deleteObjectAtPath(final Path path,
26252625
throws IOException {
26262626
auditSpan.activate();
26272627
once("delete", path.toString(), () ->
2628-
S3AFileSystem.this.deleteObjectAtPath(path, key, isFile));
2628+
S3AFileSystem.this.deleteObjectAtPath(
2629+
path, key, isFile));
26292630
}
26302631

26312632
@Override
@@ -3232,7 +3233,10 @@ void deleteObjectAtPath(Path f,
32323233
} else {
32333234
instrumentation.directoryDeleted();
32343235
}
3235-
deleteObject(key);
3236+
incrementWriteOperations();
3237+
getStore().deleteObject(getRequestFactory()
3238+
.newDeleteObjectRequestBuilder(key)
3239+
.build());
32363240
}
32373241

32383242
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
import java.net.URISyntaxException;
2424
import java.time.Duration;
2525
import java.util.Arrays;
26+
import java.util.HashMap;
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.concurrent.TimeUnit;
30+
import java.util.function.BiConsumer;
31+
import java.util.function.Consumer;
2932
import java.util.stream.Collectors;
3033

3134
import org.slf4j.Logger;
@@ -82,6 +85,8 @@
8285
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
8386
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3;
8487
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS;
88+
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_REQUEST_HEADERS_S3_PREFIX;
89+
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_REQUEST_HEADERS_STS_PREFIX;
8590
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
8691
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.getDuration;
8792
import static org.apache.hadoop.util.Preconditions.checkArgument;
@@ -429,35 +434,74 @@ private static void initSigner(Configuration conf,
429434
private static void initRequestHeaders(Configuration conf,
430435
ClientOverrideConfiguration.Builder clientConfig, String awsServiceIdentifier) {
431436
String configKey = null;
437+
String configKeyPrefix = null;
432438
switch (awsServiceIdentifier) {
433439
case AWS_SERVICE_IDENTIFIER_S3:
434440
configKey = CUSTOM_HEADERS_S3;
441+
configKeyPrefix = CUSTOM_REQUEST_HEADERS_S3_PREFIX;
435442
break;
436443
case AWS_SERVICE_IDENTIFIER_STS:
437444
configKey = CUSTOM_HEADERS_STS;
445+
configKeyPrefix = CUSTOM_REQUEST_HEADERS_STS_PREFIX;
438446
break;
439447
default:
440448
// No known service.
441449
}
442450
if (configKey != null) {
443-
Map<String, String> awsClientCustomHeadersMap =
444-
S3AUtils.getTrimmedStringCollectionSplitByEquals(conf, configKey);
445-
awsClientCustomHeadersMap.forEach((header, valueString) -> {
446-
List<String> headerValues = Arrays.stream(valueString.split(";"))
447-
.map(String::trim)
448-
.filter(v -> !v.isEmpty())
449-
.collect(Collectors.toList());
450-
if (!headerValues.isEmpty()) {
451-
clientConfig.putHeader(header, headerValues);
452-
} else {
453-
LOG.warn("Ignoring header '{}' for {} client because no values were provided",
454-
header, awsServiceIdentifier);
455-
}
456-
});
451+
applyHeaders(conf, configKey, clientConfig::putHeader,
452+
(header) -> LOG.warn(
453+
"Ignoring header '{}' for {} client because no values were provided",
454+
header, awsServiceIdentifier));
457455
LOG.debug("headers for {} client = {}", awsServiceIdentifier, clientConfig.headers());
456+
457+
String keyPrefix = configKeyPrefix;
458+
Map<String, Map<String, List<String>>> requestHeaders = new HashMap<>();
459+
Map<String, String> requestHeaderConfs = conf.getPropsWithPrefix(keyPrefix);
460+
requestHeaderConfs.keySet().forEach((request) ->
461+
applyHeaders(conf, keyPrefix + request,
462+
(header, headerValues) ->
463+
requestHeaders.computeIfAbsent(request, ignored -> new HashMap<>())
464+
.put(header, headerValues),
465+
(header) -> LOG.warn(
466+
"Ignoring {} request header '{}' for {} client because "
467+
+ "no values were provided",
468+
request, header, awsServiceIdentifier)));
469+
if (!requestHeaders.isEmpty()) {
470+
clientConfig.addExecutionInterceptor(
471+
new AddRequestHeaderInterceptor(requestHeaders));
472+
requestHeaders.forEach((request, headers) ->
473+
LOG.debug("{} request headers for {} client = {}",
474+
request, awsServiceIdentifier, headers));
475+
}
458476
}
459477
}
460478

479+
/**
480+
* Parse the header configuration at the given key.
481+
* @param conf hadoop configuration
482+
* @param configKey configuration key
483+
* @param apply callback for non-empty header values
484+
* @param ignore callback when no values are supplied
485+
*/
486+
private static void applyHeaders(Configuration conf,
487+
String configKey,
488+
BiConsumer<String, List<String>> apply,
489+
Consumer<String> ignore) {
490+
Map<String, String> awsClientCustomHeadersMap =
491+
S3AUtils.getTrimmedStringCollectionSplitByEquals(conf, configKey);
492+
awsClientCustomHeadersMap.forEach((header, valueString) -> {
493+
List<String> headerValues = Arrays.stream(valueString.split(";"))
494+
.map(String::trim)
495+
.filter(v -> !v.isEmpty())
496+
.collect(Collectors.toList());
497+
if (!headerValues.isEmpty()) {
498+
apply.accept(header, headerValues);
499+
} else {
500+
ignore.accept(header);
501+
}
502+
});
503+
}
504+
461505
/**
462506
* Configures request timeout in the client configuration.
463507
* This is independent of the timeouts set in the sync and async HTTP clients;
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.impl;
20+
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Locale;
24+
import java.util.Map;
25+
import java.util.function.Consumer;
26+
27+
import software.amazon.awssdk.awscore.AwsRequest;
28+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
29+
import software.amazon.awssdk.core.SdkRequest;
30+
import software.amazon.awssdk.core.interceptor.Context;
31+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
32+
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
33+
34+
/**
35+
* Add request-specific headers to AWS requests before execution.
36+
*/
37+
public class AddRequestHeaderInterceptor implements ExecutionInterceptor {
38+
39+
private final Map<String, Consumer<AwsRequestOverrideConfiguration.Builder>>
40+
appliers = new HashMap<>();
41+
42+
/**
43+
* Create an interceptor which adds headers for specific request types.
44+
* @param requestHeaders request-type to header mappings
45+
*/
46+
public AddRequestHeaderInterceptor(
47+
Map<String, Map<String, List<String>>> requestHeaders) {
48+
requestHeaders.forEach((request, headers) ->
49+
appliers.put(request.toLowerCase(Locale.ROOT),
50+
builder -> headers.forEach(builder::putHeader)));
51+
}
52+
53+
@Override
54+
public SdkRequest modifyRequest(
55+
Context.ModifyRequest context,
56+
ExecutionAttributes executionAttributes) {
57+
if (!(context.request() instanceof AwsRequest)) {
58+
return context.request();
59+
}
60+
61+
AwsRequest request = (AwsRequest) context.request();
62+
String requestName =
63+
request.getClass().getSimpleName().toLowerCase(Locale.ROOT);
64+
Consumer<AwsRequestOverrideConfiguration.Builder> applier =
65+
appliers.get(requestName);
66+
67+
if (applier == null) {
68+
return request;
69+
}
70+
71+
AwsRequestOverrideConfiguration overrideConfiguration =
72+
request.overrideConfiguration()
73+
.map(AwsRequestOverrideConfiguration::toBuilder)
74+
.orElseGet(AwsRequestOverrideConfiguration::builder)
75+
.applyMutation(applier)
76+
.build();
77+
return request.toBuilder()
78+
.overrideConfiguration(overrideConfiguration)
79+
.build();
80+
}
81+
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,8 @@ public Boolean execute() throws IOException {
254254
LOG.debug("deleting empty directory {}", path);
255255
deleteObjectAtPath(path, key, false);
256256
} else if (deleteNonEmptyDirectoryEnabled) {
257-
LOG.debug("deleting non-empty directory {} with single request (endpoint supports it)", path);
257+
LOG.debug("deleting non-empty directory {} with single request "
258+
+ "(endpoint supports it)", path);
258259
deleteObjectAtPath(path, key, false);
259260
} else {
260261
deleteDirectoryTree(path, key);
@@ -363,7 +364,8 @@ private void queueForDeletion(
363364
*/
364365
private void queueForDeletion(final String key,
365366
boolean isDirMarker) throws IOException {
366-
LOG.debug("Adding object to delete: \"{}\"", key);
367+
LOG.debug("Adding {} to delete: \"{}\"",
368+
isDirMarker ? "dir marker" : "file", key);
367369
keys.add(new DeleteEntry(key, isDirMarker));
368370
if (keys.size() == pageSize) {
369371
submitNextBatch();

hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,9 @@ Here are some the S3A properties for use in production.
579579
<description>When true, recursive deletion of a non-empty directory uses a single delete
580580
request for the directory key (prefix) instead of listing and deleting contained
581581
objects first. Only enable this for S3-compatible endpoints that support
582-
deleting non-empty directories (path prefix) in one request (e.g. VAST).
582+
deleting non-empty directories (path prefix) in one request. If an endpoint
583+
requires additional request headers for that delete request, configure them
584+
with request-type specific S3 client headers.
583585
</description>
584586
</property>
585587

@@ -961,10 +963,15 @@ The switch to turn S3A auditing on or off.
961963

962964
### Configuring Custom Headers for AWS Service Clients
963965

964-
You can set custom headers for S3 and STS requests. These headers are set on client level, and will be sent for all requests made to these services.
966+
You can set custom headers for S3 and STS requests. Headers can be set on
967+
client level and request-type level. Client-level headers are sent for all
968+
requests made through the client. Request-type headers are sent only for
969+
requests of the named type.
970+
971+
#### Client Level Headers
965972

966973
**Configuration Properties:**
967-
- `fs.s3a.client.s3.custom.headers`: Custom headers for S3 service requests.
974+
- `fs.s3a.client.s3.custom.headers`: Sets custom headers for S3 service requests.
968975
- `fs.s3a.client.sts.custom.headers`: Sets custom headers for all requests to AWS STS.
969976

970977
**Header Format:**
@@ -983,6 +990,33 @@ Custom headers should be specified as key-value pairs, separated by `=`. Multipl
983990
</property>
984991
```
985992

993+
#### Request-type Level Headers
994+
995+
**Configuration Properties:**
996+
- `fs.s3a.client.s3.custom.headers.request.REQUEST`: Sets custom headers for
997+
S3 requests of type `REQUEST`.
998+
- `fs.s3a.client.sts.custom.headers.request.REQUEST`: Sets custom headers for
999+
STS requests of type `REQUEST`.
1000+
1001+
Note: `REQUEST` refers to the AWS S3 or STS request name and is case-insensitive.
1002+
1003+
**Header Format:**
1004+
Custom headers should be specified as key-value pairs, separated by `=`.
1005+
Multiple values for a single header can be separated by `;`. Multiple headers
1006+
can be separated by `,`.
1007+
1008+
```xml
1009+
<property>
1010+
<name>fs.s3a.client.s3.custom.headers.request.DeleteObjectRequest</name>
1011+
<value>x-amz-delete-contents=true</value>
1012+
</property>
1013+
1014+
<property>
1015+
<name>fs.s3a.client.s3.custom.headers.request.ListObjectsV2Request</name>
1016+
<value>Header1=Value1;Value2,Header2=Value1</value>
1017+
</property>
1018+
```
1019+
9861020
## <a name="retry_and_recovery"></a>Retry and Recovery
9871021

9881022
The S3A client makes a best-effort attempt at recovering from network failures;
@@ -1731,10 +1765,14 @@ to the number of files (objects) in the directory.
17311765

17321766
When `fs.s3a.delete.non-empty-directory.enabled=true`, only one delete request is sent for
17331767
the directory (prefix). The S3 endpoint has to support this feature. Depending on the
1734-
S3 endpoint implementation of this feature, deletes might be synchronous or asynchronous.
1768+
S3 endpoint implementation of this feature, deletes might be synchronous or
1769+
asynchronous. If the endpoint requires additional headers on that delete
1770+
request, configure them with
1771+
`fs.s3a.client.s3.custom.headers.request.DeleteObjectRequest` so they are
1772+
scoped to that request type instead of being sent on every S3 call.
17351773

17361774
The [VAST S3 endpoint](https://kb.vastdata.com/documentation/docs/using-trash-folder-for-s3-objects-6)
1737-
supports such deletes.
1775+
supports such deletes and may require request-specific headers.
17381776

17391777
## <a name="metrics"></a>Metrics
17401778

0 commit comments

Comments
 (0)