From 881353edd69002a23bcd2b3bc5ab9c484458ef98 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 3 Mar 2025 12:04:01 +0000 Subject: [PATCH] HADOOP-19478. S3A: pull out new configuration load/probes under S3AStore New service under S3AStoreImpl: StoreConfigurationService This is just the draft design; it is intended to be a place to move most of our configuration options: flags, durations etc, ideally lifting some of the work from org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations, * putting it into common * adding duration and size attributes * move reflection code from AbfsConfiguration#AbfsConfiguration into there too, and apply Change-Id: I99c8305574492c05170274dcc363bfba4857981b --- .../org/apache/hadoop/fs/s3a/Constants.java | 25 ++++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 68 +++++---- .../org/apache/hadoop/fs/s3a/S3AStore.java | 9 ++ .../hadoop/fs/s3a/impl/S3AStoreBuilder.java | 17 ++- .../hadoop/fs/s3a/impl/S3AStoreImpl.java | 49 +++++-- .../fs/s3a/impl/store/StoreConfiguration.java | 56 ++++++++ .../impl/store/StoreConfigurationFlags.java | 121 ++++++++++++++++ .../impl/store/StoreConfigurationService.java | 130 ++++++++++++++++++ .../fs/s3a/impl/store/package-info.java | 22 +++ .../hadoop/fs/s3a/MockS3AFileSystem.java | 5 +- 10 files changed, 466 insertions(+), 36 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfiguration.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationFlags.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationService.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/package-info.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 2b019e1fe4caa..1e5096fbdd0ac 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1522,6 +1522,31 @@ private Constants() { */ public static final String FS_S3A_PERFORMANCE_FLAGS = "fs.s3a.performance.flags"; + + + /** + * Is the create overwrite feature enabled or not? + * A configuration option and a path status probe. + * Value {@value}. + */ + public static final String FS_S3A_CONDITIONAL_CREATE_ENABLED = "fs.s3a.conditional.create.enabled"; + + /** + * If conditional create is available, should it be used in + * createFile() operations to check for file existence? + * If set, this disables probes for directories. + * Value {@value}. + */ + public static final String FS_S3A_CONDITIONAL_CREATE_FILES = "fs.s3a.conditional.create.files"; + + /** + * createFile() boolean option toreate a multipart file, always: {@value}. + *

+ * This is inefficient and will not work on a store which doesn't support that feature, + * so is primarily for testing. + */ + public static final String FS_S3A_CREATE_MULTIPART = "fs.s3a.create.multipart"; + /** * Prefix for adding a header to the object when created. * The actual value must have a "." suffix and then the actual header. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 2e4475063dfd8..32071f0dbb858 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -147,6 +147,8 @@ import org.apache.hadoop.fs.s3a.impl.StoreContextFactory; import org.apache.hadoop.fs.s3a.impl.UploadContentProviders; import org.apache.hadoop.fs.s3a.impl.CSEUtils; +import org.apache.hadoop.fs.s3a.impl.store.StoreConfiguration; +import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService; import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType; import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters; import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks; @@ -258,6 +260,9 @@ import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup; import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE; import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.isS3ExpressStore; +import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.ConditionalCreateAvailable; +import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.ConditionalCreateForFiles; +import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.DowngradeSyncableExceptions; import static org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests; import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.checkNoS3Guard; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel; @@ -611,6 +616,9 @@ public void initialize(URI name, Configuration originalConf) setUri(name, delegationTokensEnabled); super.initialize(uri, conf); setConf(conf); + // init store configuration service. + StoreConfigurationService storeConfiguration = new StoreConfigurationService(); + storeConfiguration.init(conf); // initialize statistics, after which statistics // can be collected. @@ -794,7 +802,9 @@ public void initialize(URI name, Configuration originalConf) int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0); // now create and initialize the store - store = createS3AStore(clientManager, rateLimitCapacity); + store = createS3AStore(clientManager, + rateLimitCapacity, + storeConfiguration); // the s3 client is created through the store, rather than // directly through the client manager. // this is to aid mocking. @@ -864,11 +874,14 @@ private S3AFileSystemOperations createFileSystemHandler() { * This is protected so that tests can override it. * @param clientManager client manager * @param rateLimitCapacity rate limit + * @param storeConfiguration the store configuration. * @return a new store instance */ @VisibleForTesting protected S3AStore createS3AStore(final ClientManager clientManager, - final int rateLimitCapacity) { + final int rateLimitCapacity, + final StoreConfigurationService storeConfiguration) { + final S3AStore st = new S3AStoreBuilder() .withAuditSpanSource(getAuditManager()) .withClientManager(clientManager) @@ -876,12 +889,13 @@ protected S3AStore createS3AStore(final ClientManager clientManager, .withFsStatistics(getFsStatistics()) .withInstrumentation(getInstrumentation()) .withStatisticsContext(statisticsContext) + .withStoreConfigurationService(storeConfiguration) .withStoreContextFactory(this) .withStorageStatistics(getStorageStatistics()) .withReadRateLimiter(unlimitedRate()) .withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity)) .build(); - st.init(getConf()); + st.init(storeConfiguration.getConfig()); st.start(); return st; } @@ -2123,28 +2137,28 @@ private FSDataOutputStream innerCreateFile( final S3ABlockOutputStream.BlockOutputStreamBuilder builder = S3ABlockOutputStream.builder() - .withKey(destKey) - .withBlockFactory(blockFactory) - .withBlockSize(partSize) - .withStatistics(outputStreamStatistics) - .withProgress(progress) - .withPutTracker(putTracker) - .withWriteOperations( - createWriteOperationHelper(auditSpan)) - .withExecutorService( - new SemaphoredDelegatingExecutor( - boundedThreadPool, - blockOutputActiveBlocks, - true, - outputStreamStatistics)) - .withDowngradeSyncableExceptions( + .withKey(destKey) + .withBlockFactory(blockFactory) + .withBlockSize(partSize) + .withStatistics(outputStreamStatistics) + .withProgress(progress) + .withPutTracker(putTracker) + .withWriteOperations( + createWriteOperationHelper(auditSpan)) + .withExecutorService( + new SemaphoredDelegatingExecutor( + boundedThreadPool, + blockOutputActiveBlocks, + true, + outputStreamStatistics)) + .withDowngradeSyncableExceptions( getConf().getBoolean( DOWNGRADE_SYNCABLE_EXCEPTIONS, DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT)) - .withCSEEnabled(isCSEEnabled) - .withPutOptions(putOptions) - .withIOStatisticsAggregator( - IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) + .withCSEEnabled(isCSEEnabled) + .withPutOptions(putOptions) + .withIOStatisticsAggregator( + IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) .withMultipartEnabled(isMultipartUploadEnabled); return new FSDataOutputStream( new S3ABlockOutputStream(builder), @@ -5299,6 +5313,7 @@ public CommitterStatistics newCommitterStatistics() { public boolean hasPathCapability(final Path path, final String capability) throws IOException { final Path p = makeQualified(path); + final S3AStore store = getStore(); String cap = validatePathCapabilityArgs(p, capability); switch (cap) { @@ -5365,6 +5380,11 @@ public boolean hasPathCapability(final Path path, final String capability) case FS_S3A_CREATE_HEADER: return true; + case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE: + case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG: + // conditional create requires it to be enabled in the FS. + return store.getStoreConfiguration().isFlagSet(ConditionalCreateAvailable); + // is the FS configured for create file performance case FS_S3A_CREATE_PERFORMANCE_ENABLED: return performanceFlags.enabled(PerformanceFlagEnum.Create); @@ -5388,8 +5408,8 @@ public boolean hasPathCapability(final Path path, final String capability) } // ask the store for what capabilities it offers - // this may include input and output capabilites -and more - if (getStore() != null && getStore().hasPathCapability(path, capability)) { + // this includes, store configuration flags, IO capabilites...etc. + if (store.hasPathCapability(path, capability)) { return true; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java index 95019807b383b..83bab82f43541 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java @@ -54,6 +54,7 @@ import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException; import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations; import org.apache.hadoop.fs.s3a.impl.StoreContext; +import org.apache.hadoop.fs.s3a.impl.store.StoreConfiguration; import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; @@ -366,4 +367,12 @@ default boolean hasCapability(String capability) { /* =============== END ObjectInputStreamFactory =============== */ + + + /** + * Get the store configuration. + * @return the store configuration. + */ + StoreConfiguration getStoreConfiguration(); + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java index a7565fe046e3e..2aaeac8d8410a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.s3a.S3AStorageStatistics; import org.apache.hadoop.fs.s3a.S3AStore; import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; +import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.store.audit.AuditSpanSource; @@ -51,6 +52,8 @@ public class S3AStoreBuilder { private AuditSpanSource auditSpanSource; + private StoreConfigurationService storeConfigurationService; + /** * The original file system statistics: fairly minimal but broadly * collected so it is important to pick up. @@ -117,6 +120,17 @@ public S3AStoreBuilder withFsStatistics(final FileSystem.Statistics value) { return this; } + /** + * Set the store configuration service. + * @param value new value + * @return the builder + */ + public S3AStoreBuilder withStoreConfigurationService( + final StoreConfigurationService value) { + storeConfigurationService = value; + return this; + } + public S3AStore build() { return new S3AStoreImpl(storeContextFactory, clientManager, @@ -127,6 +141,7 @@ public S3AStore build() { readRateLimiter, writeRateLimiter, auditSpanSource, - fsStatistics); + fsStatistics, + storeConfigurationService); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java index 1a7868dd044df..fb2d198e3bf00 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java @@ -73,12 +73,14 @@ import org.apache.hadoop.fs.s3a.UploadInfo; import org.apache.hadoop.fs.s3a.api.RequestFactory; import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; +import org.apache.hadoop.fs.s3a.impl.store.StoreConfiguration; import org.apache.hadoop.fs.s3a.impl.streams.FactoryBindingParameters; import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType; import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream; import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory; import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters; import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements; +import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; @@ -193,10 +195,15 @@ public class S3AStoreImpl */ private ObjectInputStreamFactory objectInputStreamFactory; + /** + * Store Configuration. + */ + private final StoreConfigurationService storeConfiguration; + /** * Constructor to create S3A store. * Package private, as {@link S3AStoreBuilder} creates them. - * */ + */ S3AStoreImpl(StoreContextFactory storeContextFactory, ClientManager clientManager, DurationTrackerFactory durationTrackerFactory, @@ -206,7 +213,8 @@ public class S3AStoreImpl RateLimiting readRateLimiter, RateLimiting writeRateLimiter, AuditSpanSource auditSpanSource, - @Nullable FileSystem.Statistics fsStatistics) { + @Nullable FileSystem.Statistics fsStatistics, + StoreConfigurationService storeConfiguration) { super("S3AStore"); this.auditSpanSource = requireNonNull(auditSpanSource); this.clientManager = requireNonNull(clientManager); @@ -223,7 +231,9 @@ public class S3AStoreImpl this.invoker = requireNonNull(storeContext.getInvoker()); this.bucket = requireNonNull(storeContext.getBucket()); this.requestFactory = requireNonNull(storeContext.getRequestFactory()); + this.storeConfiguration = requireNonNull(storeConfiguration); addService(clientManager); + addService(storeConfiguration); } /** @@ -253,20 +263,26 @@ protected void serviceStart() throws Exception { /** * Return the store path capabilities. - * If the object stream factory is non-null, hands off the - * query to that factory if not handled here. + * This may hand off the probe to assistant classes/services. * @param path path to query the capability of. * @param capability non-null, non-empty string to query the path for support. - * @return known capabilities + * @return true if the capability is known and enabled. */ @Override public boolean hasPathCapability(final Path path, final String capability) { - switch (toLowerCase(capability)) { - case StreamCapabilities.IOSTATISTICS: + + // only support this once started; avoids worrying about + // state of services which assist in this calculation. + if (!isInState(STATE.STARTED)) { + return false; + } + final String cap = toLowerCase(capability); + if (cap.equals(StreamCapabilities.IOSTATISTICS)) { return true; - default: - return inputStreamHasCapability(capability); } + // probe store configuration and the input stream for + // the capability. + return storeConfiguration.hasPathCapability(path, cap)|| inputStreamHasCapability(cap); } /** @@ -1001,4 +1017,19 @@ public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOE /* =============== END ObjectInputStreamFactory =============== */ + + + /* + =============== BEGIN StoreConfigurationService =============== + */ + + @Override + public StoreConfiguration getStoreConfiguration() { + return storeConfiguration; + } + + /* + =============== END StoreConfigurationService =============== + */ + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfiguration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfiguration.java new file mode 100644 index 0000000000000..29b718a54ddcb --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfiguration.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl.store; + +import java.util.EnumSet; + +import org.apache.hadoop.fs.PathCapabilities; +import org.apache.hadoop.fs.StreamCapabilities; + +public interface StoreConfiguration extends PathCapabilities { + + /** + * Is a configuration flag set? + * @param flag flag to probe for. + * @return true iff the flag is set + */ + boolean isFlagSet(StoreConfigurationFlags flag); + + /** + * Get a clone of the flags. + * @return a copy of the flags. + */ + EnumSet getStoreFlags(); + + /** + * Set a flag. + * This is NOT thread safe. + * @param flag flag to set + * @return true if the flag enumset changed state. + */ + boolean setFlag(StoreConfigurationFlags flag); + + /** + * Clear a flag. + * This is NOT thread safe. + * @param flag flag to clear + * @return true if the flag enumset changed state. + */ + boolean clearFlag(StoreConfigurationFlags flag); +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationFlags.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationFlags.java new file mode 100644 index 0000000000000..b2a701362cbcb --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationFlags.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl.store; + +import org.apache.hadoop.conf.Configuration; + +import static org.apache.hadoop.fs.s3a.Constants.DOWNGRADE_SYNCABLE_EXCEPTIONS; +import static org.apache.hadoop.fs.s3a.Constants.DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_CREATE_FILES; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_CREATE_ENABLED; + +/** + * Store configuration flags. + */ +public enum StoreConfigurationFlags { + + /* When adding new flags, insert in alphabetical order */ + + /** + * Is Conditional Create available? + */ + ConditionalCreateAvailable(FS_S3A_CONDITIONAL_CREATE_ENABLED, + true), + + /** + * Should Conditional Create be used + * as the file overwrite check? + */ + ConditionalCreateForFiles(FS_S3A_CONDITIONAL_CREATE_FILES, + false), + + /** + * Downgrade exception raising on syncable API use when writing a file. + */ + DowngradeSyncableExceptions( + DOWNGRADE_SYNCABLE_EXCEPTIONS, + DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT); + /** + * Key name; read from the configuration, and + * for the capability probe unless the arity 3 + * constructor is used. + */ + private final String key; + + /** + * Capability to probe for in {@link #hasCapability(String)}. + */ + private final String capability; + + /** + * Default value when reading from the configuration. + */ + private final boolean defaultValue; + + StoreConfigurationFlags(String key, boolean defaultValue) { + this(key, "", defaultValue); + } + + StoreConfigurationFlags(String key, + String capability, + boolean defaultValue) { + this.key = key; + this.capability = capability; + this.defaultValue = defaultValue; + } + + public String getKey() { + return key; + } + + public String getCapability() { + return capability; + } + + /** + * Read from the the configuration, falling + * back to the default value. + * @param conf configuration. + * @return the evaluated value. + */ + public boolean evaluate(Configuration conf) { + return conf.getBoolean(key, defaultValue); + } + + /** + * Does this enum's key match the supplied key. + * @param k key to probe for + * @return true if there is a match. + */ + public boolean keyMatches(String k) { + return key.equals(k); + } + + /** + * Does this enum's capability match the supplied key? + * @param k key to probe for + * @return true if there is a match. + */ + public boolean hasCapability(String k) { + return !capability.isEmpty() && + capability.equals(k); + } + + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationService.java new file mode 100644 index 0000000000000..2e846da9fd600 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/StoreConfigurationService.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl.store; + +import java.util.Arrays; +import java.util.EnumSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.store.LogExactlyOnce; +import org.apache.hadoop.service.AbstractService; + +import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.*; + +/** + * A service which handles store configurations. + * New configuration options should be added here. + *

+ * The goal is to pull configuration flags and variables + * out of S3AFileSystem but not reimplement the + * same structure in S3AStore. + * Instead, configuration flags, numbers etc can + * be managed here. + * Maybe in future reflection could be used to + * build up the config, as done in ABFS. + *

+ * Usage. + *

    + *
  1. Instantiate.
  2. + *
  3. Call {@link #init(Configuration)} to trigger config reading + *
  4. Read loaded options.
  5. + *
+ * The start and close operations are (currently) no-ops. + */ +public class StoreConfigurationService extends AbstractService + implements StoreConfiguration { + + private static final Logger LOG = LoggerFactory.getLogger(StoreConfigurationService.class); + + private static final LogExactlyOnce LOG_CREATE_DOWNGRADE = new LogExactlyOnce(LOG); + + /** Store configuration flags. */ + private final EnumSet storeFlags = + EnumSet.noneOf(StoreConfigurationFlags.class);; + + + public StoreConfigurationService(final String name) { + super(name); + } + + public StoreConfigurationService() { + this("StoreConfigurationService"); + } + + /** + * Initialize the service by reading in configuration settings. + * @param conf configuration + * @throws Exception parser failures. + */ + @Override + protected void serviceInit(final Configuration conf) throws Exception { + super.serviceInit(conf); + // build up the store flag enumset. + storeFlags.clear(); + Arrays.stream(StoreConfigurationFlags.values()) + .filter(v -> v.evaluate(conf)) + .forEach(storeFlags::add); + + // tune some flags based on the state of others + if (!isFlagSet(ConditionalCreateAvailable) && isFlagSet(ConditionalCreateForFiles)) { + // only use the conditional create for files option if conditional + // create is actually available. + LOG_CREATE_DOWNGRADE.debug("Ignoring ConditionalCreateForFiles option"); + clearFlag(ConditionalCreateForFiles); + } + } + + @Override + public boolean isFlagSet(StoreConfigurationFlags flag) { + return storeFlags.contains(flag); + } + + @Override + public EnumSet getStoreFlags() { + return storeFlags.clone(); + } + + @Override + public boolean hasPathCapability(final Path path, final String capability) { + + // check the configuration flags. + if (storeFlags.stream() + .anyMatch(f -> f.keyMatches(capability))) { + return true; + } + + // no match + return false; + } + + @Override + public boolean setFlag(StoreConfigurationFlags flag) { + return storeFlags.add(flag); + } + + @Override + public boolean clearFlag(StoreConfigurationFlags flag) { + return storeFlags.remove(flag); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/package-info.java new file mode 100644 index 0000000000000..3b1bbab7f41aa --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/store/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * S3A store related classes. + */ +package org.apache.hadoop.fs.s3a.impl.store; \ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java index f938494eef0b5..1b196a5771687 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; import org.apache.hadoop.fs.s3a.impl.StubContextAccessor; +import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService; import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; import org.apache.hadoop.fs.s3a.test.MinimalWriteOperationHelperCallbacks; @@ -124,8 +125,8 @@ private static void prepareRequest(SdkRequest.Builder t) {} @Override protected S3AStore createS3AStore(final ClientManager clientManager, - final int rateLimitCapacity) { - return super.createS3AStore(clientManager, rateLimitCapacity); + final int rateLimitCapacity, final StoreConfigurationService storeConfiguration) { + return super.createS3AStore(clientManager, rateLimitCapacity, storeConfiguration); } @Override