Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19478. S3A: pull out new configuration load/probes under S3AStore #7463

Draft
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,31 @@ private Constants() {
*/
public static final String FS_S3A_PERFORMANCE_FLAGS =
"fs.s3a.performance.flags";


/**
* Is the conditional create feature enabled or not?
* A configuration option and a path status probe.
* Value {@value}.
*/
public static final String FS_S3A_CONDITIONAL_CREATE_ENABLED = "fs.s3a.conditional.create.enabled";

/**
* If conditional create is available, should it be used in
* createFile() operations to check for file existence?
* If set, this disables probes for directories.
* Value {@value}.
*/
public static final String FS_S3A_CONDITIONAL_CREATE_FILES = "fs.s3a.conditional.create.files";

/**
* createFile() boolean option to create a multipart file, always: {@value}.
* <p>
* This is inefficient and will not work on a store which doesn't support that feature,
* so is primarily for testing.
*/
public static final String FS_S3A_CREATE_MULTIPART = "fs.s3a.create.multipart";

/**
* Prefix for adding a header to the object when created.
* The actual value must have a "." suffix and then the actual header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@
import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
import org.apache.hadoop.fs.s3a.impl.CSEUtils;
import org.apache.hadoop.fs.s3a.impl.store.StoreConfiguration;
import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService;
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks;
Expand Down Expand Up @@ -258,6 +260,9 @@
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.isS3ExpressStore;
import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.ConditionalCreateAvailable;
import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.ConditionalCreateForFiles;
import static org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationFlags.DowngradeSyncableExceptions;
import static org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests;
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.checkNoS3Guard;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
Expand Down Expand Up @@ -611,6 +616,9 @@ public void initialize(URI name, Configuration originalConf)
setUri(name, delegationTokensEnabled);
super.initialize(uri, conf);
setConf(conf);
// init store configuration service.
StoreConfigurationService storeConfiguration = new StoreConfigurationService();
storeConfiguration.init(conf);

// initialize statistics, after which statistics
// can be collected.
Expand Down Expand Up @@ -794,7 +802,9 @@ public void initialize(URI name, Configuration originalConf)
int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);

// now create and initialize the store
store = createS3AStore(clientManager, rateLimitCapacity);
store = createS3AStore(clientManager,
rateLimitCapacity,
storeConfiguration);
// the s3 client is created through the store, rather than
// directly through the client manager.
// this is to aid mocking.
Expand Down Expand Up @@ -864,24 +874,28 @@ private S3AFileSystemOperations createFileSystemHandler() {
* This is protected so that tests can override it.
* @param clientManager client manager
* @param rateLimitCapacity rate limit
* @param storeConfiguration the store configuration.
* @return a new store instance
*/
@VisibleForTesting
protected S3AStore createS3AStore(final ClientManager clientManager,
final int rateLimitCapacity) {
final int rateLimitCapacity,
final StoreConfigurationService storeConfiguration) {

final S3AStore st = new S3AStoreBuilder()
.withAuditSpanSource(getAuditManager())
.withClientManager(clientManager)
.withDurationTrackerFactory(getDurationTrackerFactory())
.withFsStatistics(getFsStatistics())
.withInstrumentation(getInstrumentation())
.withStatisticsContext(statisticsContext)
.withStoreConfigurationService(storeConfiguration)
.withStoreContextFactory(this)
.withStorageStatistics(getStorageStatistics())
.withReadRateLimiter(unlimitedRate())
.withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
.build();
st.init(getConf());
st.init(storeConfiguration.getConfig());
st.start();
return st;
}
Expand Down Expand Up @@ -2123,28 +2137,28 @@ private FSDataOutputStream innerCreateFile(

final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
S3ABlockOutputStream.builder()
.withKey(destKey)
.withBlockFactory(blockFactory)
.withBlockSize(partSize)
.withStatistics(outputStreamStatistics)
.withProgress(progress)
.withPutTracker(putTracker)
.withWriteOperations(
createWriteOperationHelper(auditSpan))
.withExecutorService(
new SemaphoredDelegatingExecutor(
boundedThreadPool,
blockOutputActiveBlocks,
true,
outputStreamStatistics))
.withDowngradeSyncableExceptions(
.withKey(destKey)
.withBlockFactory(blockFactory)
.withBlockSize(partSize)
.withStatistics(outputStreamStatistics)
.withProgress(progress)
.withPutTracker(putTracker)
.withWriteOperations(
createWriteOperationHelper(auditSpan))
.withExecutorService(
new SemaphoredDelegatingExecutor(
boundedThreadPool,
blockOutputActiveBlocks,
true,
outputStreamStatistics))
.withDowngradeSyncableExceptions(
getConf().getBoolean(
DOWNGRADE_SYNCABLE_EXCEPTIONS,
DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT))
.withCSEEnabled(isCSEEnabled)
.withPutOptions(putOptions)
.withIOStatisticsAggregator(
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
.withCSEEnabled(isCSEEnabled)
.withPutOptions(putOptions)
.withIOStatisticsAggregator(
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
.withMultipartEnabled(isMultipartUploadEnabled);
return new FSDataOutputStream(
new S3ABlockOutputStream(builder),
Expand Down Expand Up @@ -5299,6 +5313,7 @@ public CommitterStatistics newCommitterStatistics() {
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
final Path p = makeQualified(path);
final S3AStore store = getStore();
String cap = validatePathCapabilityArgs(p, capability);
switch (cap) {

Expand Down Expand Up @@ -5365,6 +5380,11 @@ public boolean hasPathCapability(final Path path, final String capability)
case FS_S3A_CREATE_HEADER:
return true;

case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE:
case FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG:
// conditional create requires it to be enabled in the FS.
return store.getStoreConfiguration().isFlagSet(ConditionalCreateAvailable);

// is the FS configured for create file performance
case FS_S3A_CREATE_PERFORMANCE_ENABLED:
return performanceFlags.enabled(PerformanceFlagEnum.Create);
Expand All @@ -5388,8 +5408,8 @@ public boolean hasPathCapability(final Path path, final String capability)
}

// ask the store for what capabilities it offers
// this may include input and output capabilites -and more
if (getStore() != null && getStore().hasPathCapability(path, capability)) {
// this includes store configuration flags, IO capabilities, etc.
if (store.hasPathCapability(path, capability)) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.store.StoreConfiguration;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
Expand Down Expand Up @@ -366,4 +367,12 @@ default boolean hasCapability(String capability) {
/*
=============== END ObjectInputStreamFactory ===============
*/


/**
* Get the store configuration.
* @return the store configuration.
*/
StoreConfiguration getStoreConfiguration();

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
import org.apache.hadoop.fs.s3a.S3AStore;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
Expand Down Expand Up @@ -51,6 +52,8 @@ public class S3AStoreBuilder {

private AuditSpanSource<AuditSpanS3A> auditSpanSource;

private StoreConfigurationService storeConfigurationService;

/**
* The original file system statistics: fairly minimal but broadly
* collected so it is important to pick up.
Expand Down Expand Up @@ -117,6 +120,17 @@ public S3AStoreBuilder withFsStatistics(final FileSystem.Statistics value) {
return this;
}

/**
 * Set the store configuration service to pass down to the store.
 * @param value new value
 * @return the builder
 */
public S3AStoreBuilder withStoreConfigurationService(
    final StoreConfigurationService value) {
  this.storeConfigurationService = value;
  return this;
}

public S3AStore build() {
return new S3AStoreImpl(storeContextFactory,
clientManager,
Expand All @@ -127,6 +141,7 @@ public S3AStore build() {
readRateLimiter,
writeRateLimiter,
auditSpanSource,
fsStatistics);
fsStatistics,
storeConfigurationService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,14 @@
import org.apache.hadoop.fs.s3a.UploadInfo;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.s3a.impl.store.StoreConfiguration;
import org.apache.hadoop.fs.s3a.impl.streams.FactoryBindingParameters;
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
import org.apache.hadoop.fs.s3a.impl.store.StoreConfigurationService;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
Expand Down Expand Up @@ -193,10 +195,15 @@ public class S3AStoreImpl
*/
private ObjectInputStreamFactory objectInputStreamFactory;

/**
* Store Configuration.
*/
private final StoreConfigurationService storeConfiguration;

/**
* Constructor to create S3A store.
* Package private, as {@link S3AStoreBuilder} creates them.
* */
*/
S3AStoreImpl(StoreContextFactory storeContextFactory,
ClientManager clientManager,
DurationTrackerFactory durationTrackerFactory,
Expand All @@ -206,7 +213,8 @@ public class S3AStoreImpl
RateLimiting readRateLimiter,
RateLimiting writeRateLimiter,
AuditSpanSource<AuditSpanS3A> auditSpanSource,
@Nullable FileSystem.Statistics fsStatistics) {
@Nullable FileSystem.Statistics fsStatistics,
StoreConfigurationService storeConfiguration) {
super("S3AStore");
this.auditSpanSource = requireNonNull(auditSpanSource);
this.clientManager = requireNonNull(clientManager);
Expand All @@ -223,7 +231,9 @@ public class S3AStoreImpl
this.invoker = requireNonNull(storeContext.getInvoker());
this.bucket = requireNonNull(storeContext.getBucket());
this.requestFactory = requireNonNull(storeContext.getRequestFactory());
this.storeConfiguration = requireNonNull(storeConfiguration);
addService(clientManager);
addService(storeConfiguration);
}

/**
Expand Down Expand Up @@ -253,20 +263,26 @@ protected void serviceStart() throws Exception {

/**
* Return the store path capabilities.
* If the object stream factory is non-null, hands off the
* query to that factory if not handled here.
* This may hand off the probe to assistant classes/services.
* @param path path to query the capability of.
* @param capability non-null, non-empty string to query the path for support.
* @return known capabilities
* @return true if the capability is known and enabled.
*/
@Override
public boolean hasPathCapability(final Path path, final String capability) {
switch (toLowerCase(capability)) {
case StreamCapabilities.IOSTATISTICS:

// only support this once started; avoids worrying about
// state of services which assist in this calculation.
if (!isInState(STATE.STARTED)) {
return false;
}
final String cap = toLowerCase(capability);
if (cap.equals(StreamCapabilities.IOSTATISTICS)) {
return true;
default:
return inputStreamHasCapability(capability);
}
// probe store configuration and the input stream for
// the capability.
return storeConfiguration.hasPathCapability(path, cap)|| inputStreamHasCapability(cap);
}

/**
Expand Down Expand Up @@ -1001,4 +1017,19 @@ public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOE
/*
=============== END ObjectInputStreamFactory ===============
*/


/*
=============== BEGIN StoreConfigurationService ===============
*/

/**
 * Get the store configuration service bound to this store.
 * @return the store configuration.
 */
@Override
public StoreConfiguration getStoreConfiguration() {
  return storeConfiguration;
}

/*
=============== END StoreConfigurationService ===============
*/

}
Loading