Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -518,62 +518,6 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
private int maxApacheHttpClientIoExceptionsRetries;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT,
DefaultValue = DEFAULT_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT)
private boolean dynamicWriteThreadPoolEnablement;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS,
DefaultValue = DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS)
private int writeThreadPoolKeepAliveTime;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
MinValue = MIN_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
MaxValue = MAX_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
DefaultValue = DEFAULT_WRITE_CPU_MONITORING_INTERVAL_MILLIS)
private int writeCpuMonitoringInterval;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
MinValue = MIN_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
MaxValue = MAX_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
DefaultValue = DEFAULT_WRITE_HIGH_CPU_THRESHOLD_PERCENT)
private int writeHighCpuThreshold;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
MinValue = MIN_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
MaxValue = MAX_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
DefaultValue = DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT)
private int writeMediumCpuThreshold;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT,
MaxValue = MAX_WRITE_LOW_CPU_THRESHOLD_PERCENT,
DefaultValue = DEFAULT_WRITE_LOW_CPU_THRESHOLD_PERCENT)
private int writeLowCpuThreshold;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER,
MinValue = MIN_WRITE_LOW_TIER_MEMORY_MULTIPLIER,
DefaultValue = DEFAULT_WRITE_LOW_TIER_MEMORY_MULTIPLIER)
private int lowTierMemoryMultiplier;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER,
MinValue = MIN_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER,
DefaultValue = DEFAULT_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER)
private int mediumTierMemoryMultiplier;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER,
MinValue = MIN_WRITE_HIGH_TIER_MEMORY_MULTIPLIER,
DefaultValue = DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER)
private int highTierMemoryMultiplier;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT,
DefaultValue = DEFAULT_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT)
private int writeHighMemoryUsageThresholdPercent;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT,
DefaultValue = DEFAULT_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT)
private int writeLowMemoryUsageThresholdPercent;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE,
MinValue = MIN_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, MaxValue = MAX_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE)
Expand Down Expand Up @@ -1883,60 +1827,16 @@ public ExponentialRetryPolicy getOauthTokenFetchRetryPolicy() {
oauthTokenFetchRetryDeltaBackoff);
}

public int getWriteConcurrentRequestCount() {
public int getWriteMaxConcurrentRequestCount() {
if (this.writeMaxConcurrentRequestCount < 1) {
return 4 * Runtime.getRuntime().availableProcessors();
}
return this.writeMaxConcurrentRequestCount;
}

public int getWriteThreadPoolKeepAliveTime() {
return writeThreadPoolKeepAliveTime;
}

public int getWriteCpuMonitoringInterval() {
return writeCpuMonitoringInterval;
}

public boolean isDynamicWriteThreadPoolEnablement() {
return dynamicWriteThreadPoolEnablement;
}

public int getWriteLowCpuThreshold() {
return writeLowCpuThreshold;
}

public int getWriteMediumCpuThreshold() {
return writeMediumCpuThreshold;
}

public int getWriteHighCpuThreshold() {
return writeHighCpuThreshold;
}

public int getLowTierMemoryMultiplier() {
return lowTierMemoryMultiplier;
}

public int getMediumTierMemoryMultiplier() {
return mediumTierMemoryMultiplier;
}

public int getHighTierMemoryMultiplier() {
return highTierMemoryMultiplier;
}

public int getWriteHighMemoryUsageThresholdPercent() {
return writeHighMemoryUsageThresholdPercent;
}

public int getWriteLowMemoryUsageThresholdPercent() {
return writeLowMemoryUsageThresholdPercent;
}

public int getMaxWriteRequestsToQueue() {
if (this.maxWriteRequestsToQueue < 1) {
return 2 * getWriteConcurrentRequestCount();
return 2 * getWriteMaxConcurrentRequestCount();
}
return this.maxWriteRequestsToQueue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
import org.apache.hadoop.fs.azurebfs.services.AbfsReadResourceUtilizationMetrics;
import org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
Expand Down Expand Up @@ -108,8 +107,6 @@ public class AbfsCountersImpl implements AbfsCounters {

private AbfsReadFooterMetrics abfsReadFooterMetrics = null;

private AbfsWriteResourceUtilizationMetrics abfsWriteResourceUtilizationMetrics = null;

private AbfsReadResourceUtilizationMetrics abfsReadResourceUtilizationMetrics = null;

private AtomicLong lastExecutionTime = null;
Expand Down Expand Up @@ -187,18 +184,6 @@ public void initializeReadResourceUtilizationMetrics() {
abfsReadResourceUtilizationMetrics = new AbfsReadResourceUtilizationMetrics();
}

/**
* Initializes the metrics collector for the write thread pool.
* <p>
* This method creates a new instance of {@link AbfsWriteResourceUtilizationMetrics}
* to track performance statistics and operational metrics related to
* write operations executed by the thread pool.
* </p>
*/
public void initializeWriteResourceUtilizationMetrics() {
abfsWriteResourceUtilizationMetrics = new AbfsWriteResourceUtilizationMetrics();
}


@Override
public void initializeMetrics(MetricFormat metricFormat) {
Expand Down Expand Up @@ -298,14 +283,6 @@ public AbfsReadFooterMetrics getAbfsReadFooterMetrics() {
return abfsReadFooterMetrics != null ? abfsReadFooterMetrics : null;
}

/**
* Returns the write thread pool metrics instance, or {@code null} if uninitialized.
*/
@Override
public AbfsWriteResourceUtilizationMetrics getAbfsWriteResourceUtilizationMetrics() {
return abfsWriteResourceUtilizationMetrics != null ? abfsWriteResourceUtilizationMetrics : null;
}

/**
* Returns the read thread pool metrics instance, or {@code null} if uninitialized.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -205,7 +204,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private int blockOutputActiveBlocks;
/** Bounded ThreadPool for this instance. */
private ExecutorService boundedThreadPool;
private WriteThreadPoolSizeManager writeThreadPoolSizeManager;

/** ABFS instance reference to be held by the store to avoid GC close. */
private BackReference fsBackRef;
Expand Down Expand Up @@ -280,18 +278,11 @@ public AzureBlobFileSystemStore(
}
this.blockFactory = abfsStoreBuilder.blockFactory;
this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
this.writeThreadPoolSizeManager = WriteThreadPoolSizeManager.getInstance(
getClient().getFileSystem() + "-" + UUID.randomUUID(),
abfsConfiguration, getClient().getAbfsCounters());
this.boundedThreadPool = writeThreadPoolSizeManager.getExecutorService();
} else {
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
abfsConfiguration.getWriteConcurrentRequestCount(),
abfsConfiguration.getMaxWriteRequestsToQueue(),
10L, TimeUnit.SECONDS,
"abfs-bounded");
}
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
abfsConfiguration.getWriteMaxConcurrentRequestCount(),
abfsConfiguration.getMaxWriteRequestsToQueue(),
10L, TimeUnit.SECONDS,
"abfs-bounded");
}

/**
Expand Down Expand Up @@ -330,19 +321,17 @@ public void close() throws IOException {
}
try {
Futures.allAsList(futures).get();
if (!abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
// shutdown the threadPool and set it to null.
HadoopExecutors.shutdown(boundedThreadPool, LOG,
30, TimeUnit.SECONDS);
boundedThreadPool = null;
}
// shutdown the threadPool and set it to null.
HadoopExecutors.shutdown(boundedThreadPool, LOG,
30, TimeUnit.SECONDS);
boundedThreadPool = null;
} catch (InterruptedException e) {
LOG.error("Interrupted freeing leases", e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
LOG.error("Error freeing leases", e);
} finally {
IOUtils.cleanupWithLogger(LOG, writeThreadPoolSizeManager, getClientHandler());
IOUtils.cleanupWithLogger(LOG, getClientHandler());
}
}

Expand Down Expand Up @@ -809,7 +798,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.withAppendBlob(isAppendBlob)
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteConcurrentRequestCount())
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.withLease(lease)
.withEncryptionAdapter(contextEncryptionAdapter)
Expand All @@ -821,7 +810,6 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withPath(path)
.withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true))
.withWriteThreadPoolManager(writeThreadPoolSizeManager)
.withTracingContext(tracingContext)
.withAbfsBackRef(fsBackRef)
.withIngressServiceType(abfsConfiguration.getIngressServiceType())
Expand Down
Loading