Skip to content

Commit 6d20de1

Browse files
HADOOP-19233: ABFS: [FnsOverBlob] Implementing Rename and Delete APIs over Blob Endpoint (#7265)
Contributed by Manish Bhatt. Signed off by Anuj Modi, Anmol Asrani
1 parent 741bdd6 commit 6d20de1

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

45 files changed

+5035
-349
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

+67-12
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,24 @@
2121
import java.io.IOException;
2222
import java.lang.reflect.Field;
2323

24-
import org.apache.hadoop.classification.VisibleForTesting;
25-
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
26-
import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
27-
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
28-
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
29-
import org.apache.hadoop.util.Preconditions;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
3026

3127
import org.apache.commons.lang3.StringUtils;
3228
import org.apache.hadoop.classification.InterfaceAudience;
3329
import org.apache.hadoop.classification.InterfaceStability;
30+
import org.apache.hadoop.classification.VisibleForTesting;
3431
import org.apache.hadoop.conf.Configuration;
3532
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
33+
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
3634
import org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations;
35+
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
36+
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
37+
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation;
3738
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
3839
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerWithOutlierConfigurationValidatorAnnotation;
3940
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
4041
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
41-
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
42-
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation;
4342
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
4443
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
4544
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
@@ -65,16 +64,16 @@
6564
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
6665
import org.apache.hadoop.fs.azurebfs.services.AuthType;
6766
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
67+
import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
6868
import org.apache.hadoop.fs.azurebfs.services.KeyProvider;
6969
import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider;
70+
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
7071
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
71-
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
7272
import org.apache.hadoop.security.ProviderUtils;
73+
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
74+
import org.apache.hadoop.util.Preconditions;
7375
import org.apache.hadoop.util.ReflectionUtils;
7476

75-
import org.slf4j.Logger;
76-
import org.slf4j.LoggerFactory;
77-
7877
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
7978
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
8079
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
@@ -399,6 +398,34 @@ public class AbfsConfiguration{
399398
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
400399
private boolean isPaginatedDeleteEnabled;
401400

401+
@LongConfigurationValidatorAnnotation(ConfigurationKey =
402+
FS_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS, DefaultValue = DEFAULT_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS)
403+
private long blobCopyProgressPollWaitMillis;
404+
405+
@LongConfigurationValidatorAnnotation(ConfigurationKey =
406+
FS_AZURE_BLOB_COPY_MAX_WAIT_MILLIS, DefaultValue = DEFAULT_AZURE_BLOB_COPY_MAX_WAIT_MILLIS)
407+
private long blobCopyProgressMaxWaitMillis;
408+
409+
@LongConfigurationValidatorAnnotation(ConfigurationKey =
410+
FS_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION, DefaultValue = DEFAULT_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION)
411+
private long blobAtomicRenameLeaseRefreshDuration;
412+
413+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
414+
FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, DefaultValue = DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE)
415+
private int producerQueueMaxSize;
416+
417+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
418+
FS_AZURE_CONSUMER_MAX_LAG, DefaultValue = DEFAULT_FS_AZURE_CONSUMER_MAX_LAG)
419+
private int listingMaxConsumptionLag;
420+
421+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
422+
FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_RENAME_THREAD)
423+
private int blobRenameDirConsumptionParallelism;
424+
425+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
426+
FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_DELETE_THREAD)
427+
private int blobDeleteDirConsumptionParallelism;
428+
402429
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
403430
FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
404431
private int maxApacheHttpClientIoExceptionsRetries;
@@ -1522,4 +1549,32 @@ public boolean getIsChecksumValidationEnabled() {
15221549
public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled) {
15231550
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
15241551
}
1552+
1553+
public long getBlobCopyProgressPollWaitMillis() {
1554+
return blobCopyProgressPollWaitMillis;
1555+
}
1556+
1557+
public long getBlobCopyProgressMaxWaitMillis() {
1558+
return blobCopyProgressMaxWaitMillis;
1559+
}
1560+
1561+
public long getAtomicRenameLeaseRefreshDuration() {
1562+
return blobAtomicRenameLeaseRefreshDuration;
1563+
}
1564+
1565+
public int getProducerQueueMaxSize() {
1566+
return producerQueueMaxSize;
1567+
}
1568+
1569+
public int getListingMaxConsumptionLag() {
1570+
return listingMaxConsumptionLag;
1571+
}
1572+
1573+
public int getBlobRenameDirConsumptionParallelism() {
1574+
return blobRenameDirConsumptionParallelism;
1575+
}
1576+
1577+
public int getBlobDeleteDirConsumptionParallelism() {
1578+
return blobDeleteDirConsumptionParallelism;
1579+
}
15251580
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

+70-58
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
import java.net.URISyntaxException;
2929
import java.nio.file.AccessDeniedException;
3030
import java.time.Duration;
31-
import java.util.Hashtable;
32-
import java.util.List;
3331
import java.util.ArrayList;
3432
import java.util.EnumSet;
33+
import java.util.Hashtable;
34+
import java.util.List;
3535
import java.util.Map;
3636
import java.util.Optional;
3737
import java.util.UUID;
@@ -43,23 +43,15 @@
4343
import java.util.concurrent.Future;
4444
import javax.annotation.Nullable;
4545

46-
import org.apache.commons.lang3.StringUtils;
47-
import org.apache.hadoop.classification.VisibleForTesting;
48-
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
49-
import org.apache.hadoop.fs.azurebfs.services.AuthType;
50-
import org.apache.hadoop.fs.impl.BackReference;
51-
import org.apache.hadoop.security.ProviderUtils;
52-
import org.apache.hadoop.util.Preconditions;
5346
import org.slf4j.Logger;
5447
import org.slf4j.LoggerFactory;
48+
5549
import org.apache.commons.lang3.ArrayUtils;
50+
import org.apache.commons.lang3.StringUtils;
5651
import org.apache.commons.lang3.tuple.Pair;
5752
import org.apache.hadoop.classification.InterfaceAudience;
58-
import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
59-
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
60-
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
61-
import org.apache.hadoop.fs.RemoteIterator;
6253
import org.apache.hadoop.classification.InterfaceStability;
54+
import org.apache.hadoop.classification.VisibleForTesting;
6355
import org.apache.hadoop.conf.Configuration;
6456
import org.apache.hadoop.fs.BlockLocation;
6557
import org.apache.hadoop.fs.CommonPathCapabilities;
@@ -71,27 +63,34 @@
7163
import org.apache.hadoop.fs.FileSystem;
7264
import org.apache.hadoop.fs.LocatedFileStatus;
7365
import org.apache.hadoop.fs.Path;
74-
import org.apache.hadoop.fs.PathIOException;
7566
import org.apache.hadoop.fs.PathFilter;
67+
import org.apache.hadoop.fs.PathIOException;
68+
import org.apache.hadoop.fs.RemoteIterator;
7669
import org.apache.hadoop.fs.XAttrSetFlag;
70+
import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
7771
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
72+
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
7873
import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
7974
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
80-
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
8175
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
8276
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
8377
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
78+
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
8479
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
8580
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
8681
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
8782
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
8883
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
84+
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
8985
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
86+
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
9087
import org.apache.hadoop.fs.azurebfs.services.AbfsLocatedFileStatus;
88+
import org.apache.hadoop.fs.azurebfs.services.AuthType;
9189
import org.apache.hadoop.fs.azurebfs.utils.Listener;
9290
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
9391
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
9492
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
93+
import org.apache.hadoop.fs.impl.BackReference;
9594
import org.apache.hadoop.fs.impl.OpenFileParameters;
9695
import org.apache.hadoop.fs.permission.AclEntry;
9796
import org.apache.hadoop.fs.permission.AclStatus;
@@ -102,14 +101,16 @@
102101
import org.apache.hadoop.fs.store.DataBlocks;
103102
import org.apache.hadoop.io.IOUtils;
104103
import org.apache.hadoop.security.AccessControlException;
105-
import org.apache.hadoop.security.token.Token;
104+
import org.apache.hadoop.security.ProviderUtils;
106105
import org.apache.hadoop.security.UserGroupInformation;
107-
import org.apache.hadoop.util.RateLimiting;
108-
import org.apache.hadoop.util.RateLimitingFactory;
109-
import org.apache.hadoop.util.functional.RemoteIterators;
106+
import org.apache.hadoop.security.token.Token;
110107
import org.apache.hadoop.util.DurationInfo;
111108
import org.apache.hadoop.util.LambdaUtils;
109+
import org.apache.hadoop.util.Preconditions;
112110
import org.apache.hadoop.util.Progressable;
111+
import org.apache.hadoop.util.RateLimiting;
112+
import org.apache.hadoop.util.RateLimitingFactory;
113+
import org.apache.hadoop.util.functional.RemoteIterators;
113114

114115
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
115116
import static java.net.HttpURLConnection.HTTP_CONFLICT;
@@ -431,8 +432,12 @@ public FSDataOutputStream create(final Path f,
431432

432433
@Override
433434
@SuppressWarnings("deprecation")
434-
public FSDataOutputStream createNonRecursive(final Path f, final FsPermission permission,
435-
final boolean overwrite, final int bufferSize, final short replication, final long blockSize,
435+
public FSDataOutputStream createNonRecursive(final Path f,
436+
final FsPermission permission,
437+
final boolean overwrite,
438+
final int bufferSize,
439+
final short replication,
440+
final long blockSize,
436441
final Progressable progress) throws IOException {
437442

438443
statIncrement(CALL_CREATE_NON_RECURSIVE);
@@ -442,18 +447,21 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe
442447
ERR_CREATE_ON_ROOT,
443448
null);
444449
}
445-
final Path parent = f.getParent();
446-
TracingContext tracingContext = new TracingContext(clientCorrelationId,
447-
fileSystemId, FSOperationType.CREATE_NON_RECURSIVE, tracingHeaderFormat,
448-
listener);
449-
final FileStatus parentFileStatus = tryGetFileStatus(parent, tracingContext);
450-
451-
if (parentFileStatus == null) {
452-
throw new FileNotFoundException("Cannot create file "
453-
+ f.getName() + " because parent folder does not exist.");
450+
Path qualifiedPath = makeQualified(f);
451+
try {
452+
TracingContext tracingContext = new TracingContext(clientCorrelationId,
453+
fileSystemId, FSOperationType.CREATE_NON_RECURSIVE, tracingHeaderFormat,
454+
listener);
455+
OutputStream outputStream = getAbfsStore().createNonRecursive(qualifiedPath, statistics,
456+
overwrite,
457+
permission == null ? FsPermission.getFileDefault() : permission,
458+
FsPermission.getUMask(getConf()), tracingContext);
459+
statIncrement(FILES_CREATED);
460+
return new FSDataOutputStream(outputStream, statistics);
461+
} catch (AzureBlobFileSystemException ex) {
462+
checkException(f, ex);
463+
return null;
454464
}
455-
456-
return create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
457465
}
458466

459467
@Override
@@ -480,7 +488,10 @@ public FSDataOutputStream createNonRecursive(final Path f,
480488
@Override
481489
@SuppressWarnings("deprecation")
482490
public FSDataOutputStream createNonRecursive(final Path f,
483-
final boolean overwrite, final int bufferSize, final short replication, final long blockSize,
491+
final boolean overwrite,
492+
final int bufferSize,
493+
final short replication,
494+
final long blockSize,
484495
final Progressable progress) throws IOException {
485496
return this.createNonRecursive(f, FsPermission.getFileDefault(),
486497
overwrite, bufferSize, replication, blockSize, progress);
@@ -530,45 +541,41 @@ public boolean rename(final Path src, final Path dst) throws IOException {
530541
return tryGetFileStatus(qualifiedSrcPath, tracingContext) != null;
531542
}
532543

533-
FileStatus dstFileStatus = null;
544+
FileStatus dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext);
545+
Path adjustedDst = dst;
534546
if (qualifiedSrcPath.equals(qualifiedDstPath)) {
535547
// rename to itself
536548
// - if it doesn't exist, return false
537549
// - if it is file, return true
538550
// - if it is dir, return false.
539-
dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext);
540551
if (dstFileStatus == null) {
541552
return false;
542553
}
543-
return dstFileStatus.isDirectory() ? false : true;
554+
return !dstFileStatus.isDirectory();
544555
}
545-
546-
// Non-HNS account need to check dst status on driver side.
547-
if (!getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) {
548-
dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext);
556+
// adjust the destination path in case of FNS account.
557+
if (!getIsNamespaceEnabled(tracingContext) && dstFileStatus != null) {
558+
// return false if the destination is a file.
559+
if (!dstFileStatus.isDirectory()) {
560+
return false;
561+
}
562+
String sourceFileName = src.getName();
563+
adjustedDst = new Path(dst, sourceFileName);
549564
}
550565

551566
try {
552-
String sourceFileName = src.getName();
553-
Path adjustedDst = dst;
554-
555-
if (dstFileStatus != null) {
556-
if (!dstFileStatus.isDirectory()) {
557-
return qualifiedSrcPath.equals(qualifiedDstPath);
558-
}
559-
adjustedDst = new Path(dst, sourceFileName);
560-
}
561-
562567
qualifiedDstPath = makeQualified(adjustedDst);
563-
564-
abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null);
568+
getAbfsStore().rename(qualifiedSrcPath, qualifiedDstPath, tracingContext,
569+
null);
565570
return true;
566571
} catch (AzureBlobFileSystemException ex) {
567572
LOG.debug("Rename operation failed. ", ex);
568573
checkException(
569574
src,
570575
ex,
571576
AzureServiceErrorCode.PATH_ALREADY_EXISTS,
577+
AzureServiceErrorCode.BLOB_ALREADY_EXISTS,
578+
AzureServiceErrorCode.BLOB_PATH_NOT_FOUND,
572579
AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
573580
AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
574581
AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
@@ -641,7 +648,7 @@ public Pair<Boolean, Duration> commitSingleFileByRename(
641648
final Duration waitTime = rateLimiting.acquire(1);
642649

643650
try {
644-
final boolean recovered = abfsStore.rename(qualifiedSrcPath,
651+
final boolean recovered = getAbfsStore().rename(qualifiedSrcPath,
645652
qualifiedDstPath, tracingContext, sourceEtag);
646653
return Pair.of(recovered, waitTime);
647654
} catch (AzureBlobFileSystemException ex) {
@@ -655,9 +662,11 @@ public Pair<Boolean, Duration> commitSingleFileByRename(
655662
}
656663

657664
@Override
658-
public boolean delete(final Path f, final boolean recursive) throws IOException {
665+
public boolean delete(final Path f, final boolean recursive)
666+
throws IOException {
659667
LOG.debug(
660-
"AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(), recursive);
668+
"AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(),
669+
recursive);
661670
statIncrement(CALL_DELETE);
662671
Path qualifiedPath = makeQualified(f);
663672

@@ -673,10 +682,13 @@ public boolean delete(final Path f, final boolean recursive) throws IOException
673682
TracingContext tracingContext = new TracingContext(clientCorrelationId,
674683
fileSystemId, FSOperationType.DELETE, tracingHeaderFormat,
675684
listener);
676-
abfsStore.delete(qualifiedPath, recursive, tracingContext);
685+
getAbfsStore().delete(qualifiedPath, recursive, tracingContext);
677686
return true;
678687
} catch (AzureBlobFileSystemException ex) {
679-
checkException(f, ex, AzureServiceErrorCode.PATH_NOT_FOUND);
688+
checkException(f,
689+
ex,
690+
AzureServiceErrorCode.PATH_NOT_FOUND,
691+
AzureServiceErrorCode.BLOB_PATH_NOT_FOUND);
680692
return false;
681693
}
682694

@@ -693,7 +705,7 @@ public FileStatus[] listStatus(final Path f) throws IOException {
693705
TracingContext tracingContext = new TracingContext(clientCorrelationId,
694706
fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat,
695707
listener);
696-
FileStatus[] result = abfsStore.listStatus(qualifiedPath, tracingContext);
708+
FileStatus[] result = getAbfsStore().listStatus(qualifiedPath, tracingContext);
697709
return result;
698710
} catch (AzureBlobFileSystemException ex) {
699711
checkException(f, ex);

0 commit comments

Comments (0)