Skip to content

Support per-project s3 clients #127631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 32 commits into from
May 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
33dc439
WIP
ywangd May 2, 2025
a8dc278
add more tests
ywangd May 3, 2025
a318fc8
tweaks
ywangd May 3, 2025
c3b36e7
add jira issue
ywangd May 5, 2025
fdc9c1d
applier
ywangd May 9, 2025
acf6da4
Merge remote-tracking branch 'origin/main' into per-project-s3-clients
ywangd May 20, 2025
0f1bdd5
abstract runnable
ywangd May 20, 2025
9bc5d09
nodeSettings
ywangd May 20, 2025
d665303
Merge remote-tracking branch 'origin/main' into per-project-s3-clients
ywangd May 21, 2025
55c697c
generic
ywangd May 21, 2025
833e085
consolidate
ywangd May 21, 2025
4d7bd69
no wait for async close
ywangd May 21, 2025
6ce0828
update
ywangd May 21, 2025
616a770
tweak
ywangd May 21, 2025
39fa184
tweak
ywangd May 21, 2025
922327b
more assertion
ywangd May 22, 2025
f4537ef
no client creation after manager closing
ywangd May 28, 2025
cf7cf7f
Merge remote-tracking branch 'origin/main' into per-project-s3-clients
ywangd May 28, 2025
208a540
Merge branch 'main' into per-project-s3-clients
ywangd May 28, 2025
ab5f911
remove dead code
ywangd May 28, 2025
dc14ef7
[CI] Auto commit changes from spotless
elasticsearchmachine May 28, 2025
ff631de
separate field for cluster clients
ywangd May 29, 2025
3962106
assert no double close
ywangd May 29, 2025
6cc05c6
use lightweight thread pool
ywangd May 29, 2025
f17e3e9
Merge remote-tracking branch 'origin/main' into per-project-s3-clients
ywangd May 29, 2025
ef5631b
tweak
ywangd May 29, 2025
9066af3
comments
ywangd May 29, 2025
4048064
fix test
ywangd May 29, 2025
38ee3b0
Merge branch 'main' into per-project-s3-clients
elasticmachine May 29, 2025
e1862dd
comment
ywangd May 29, 2025
9ab977b
Merge branch 'main' into per-project-s3-clients
elasticmachine May 30, 2025
d3c0dbb
Merge branch 'main' into per-project-s3-clients
elasticmachine May 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
Expand Down Expand Up @@ -256,8 +258,13 @@ public ProxyS3RepositoryPlugin(Settings settings) {
}

@Override
S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
return new ProxyS3Service(environment, nodeSettings, resourceWatcherService);
S3Service s3Service(
Environment environment,
ClusterService clusterService,
ProjectResolver projectResolver,
ResourceWatcherService resourceWatcherService
) {
return new ProxyS3Service(environment, clusterService, projectResolver, resourceWatcherService);
}

/**
Expand Down Expand Up @@ -293,8 +300,13 @@ public static final class ProxyS3Service extends S3Service {

private static final Logger logger = LogManager.getLogger(ProxyS3Service.class);

ProxyS3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
super(environment, nodeSettings, resourceWatcherService, () -> null);
ProxyS3Service(
Environment environment,
ClusterService clusterService,
ProjectResolver projectResolver,
ResourceWatcherService resourceWatcherService
) {
super(environment, clusterService, projectResolver, resourceWatcherService, () -> null);
}

@Override
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -80,13 +81,20 @@ protected S3Repository createRepository(

@Override
public Collection<?> createComponents(PluginServices services) {
service.set(s3Service(services.environment(), services.clusterService().getSettings(), services.resourceWatcherService()));
service.set(
s3Service(services.environment(), services.clusterService(), services.projectResolver(), services.resourceWatcherService())
);
this.service.get().refreshAndClearCache(S3ClientSettings.load(settings));
return List.of(service.get());
}

S3Service s3Service(Environment environment, Settings nodeSettings, ResourceWatcherService resourceWatcherService) {
return new S3Service(environment, nodeSettings, resourceWatcherService, S3RepositoryPlugin::getDefaultRegion);
S3Service s3Service(
Environment environment,
ClusterService clusterService,
ProjectResolver projectResolver,
ResourceWatcherService resourceWatcherService
) {
return new S3Service(environment, clusterService, projectResolver, resourceWatcherService, S3RepositoryPlugin::getDefaultRegion);
}

private static Region getDefaultRegion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,19 @@
import org.apache.http.conn.DnsResolver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.coordination.stateless.StoreHeartbeatService;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
Expand All @@ -72,7 +74,6 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import static java.util.Collections.emptyMap;
import static software.amazon.awssdk.core.SdkSystemSetting.AWS_ROLE_ARN;
import static software.amazon.awssdk.core.SdkSystemSetting.AWS_ROLE_SESSION_NAME;
import static software.amazon.awssdk.core.SdkSystemSetting.AWS_WEB_IDENTITY_TOKEN_FILE;
Expand All @@ -93,21 +94,6 @@ class S3Service extends AbstractLifecycleComponent {
TimeValue.timeValueHours(24),
Setting.Property.NodeScope
);
private volatile Map<S3ClientSettings, AmazonS3Reference> clientsCache = emptyMap();

/**
* Client settings calculated from static configuration and settings in the keystore.
*/
private volatile Map<String, S3ClientSettings> staticClientSettings = Map.of(
"default",
S3ClientSettings.getClientSettings(Settings.EMPTY, "default")
);

/**
* Client settings derived from those in {@link #staticClientSettings} by combining them with settings
* in the {@link RepositoryMetadata}.
*/
private volatile Map<Settings, S3ClientSettings> derivedClientSettings = emptyMap();

private final Runnable defaultRegionSetter;
private volatile Region defaultRegion;
Expand All @@ -124,13 +110,16 @@ class S3Service extends AbstractLifecycleComponent {
final TimeValue compareAndExchangeTimeToLive;
final TimeValue compareAndExchangeAntiContentionDelay;
final boolean isStateless;
private final S3ClientsManager s3ClientsManager;

S3Service(
Environment environment,
Settings nodeSettings,
ClusterService clusterService,
ProjectResolver projectResolver,
ResourceWatcherService resourceWatcherService,
Supplier<Region> defaultRegionSupplier
) {
final Settings nodeSettings = clusterService.getSettings();
webIdentityTokenCredentialsProvider = new CustomWebIdentityTokenCredentialsProvider(
environment,
System::getenv,
Expand All @@ -142,6 +131,20 @@ class S3Service extends AbstractLifecycleComponent {
compareAndExchangeAntiContentionDelay = REPOSITORY_S3_CAS_ANTI_CONTENTION_DELAY_SETTING.get(nodeSettings);
isStateless = DiscoveryNode.isStateless(nodeSettings);
defaultRegionSetter = new RunOnce(() -> defaultRegion = defaultRegionSupplier.get());
s3ClientsManager = new S3ClientsManager(
nodeSettings,
this::buildClientReference,
clusterService.threadPool().generic(),
projectResolver.supportsMultipleProjects()
);
if (projectResolver.supportsMultipleProjects()) {
clusterService.addHighPriorityApplier(s3ClientsManager);
}
}

// visible to tests
S3ClientsManager getS3ClientsManager() {
return s3ClientsManager;
}

/**
Expand All @@ -151,86 +154,55 @@ class S3Service extends AbstractLifecycleComponent {
* of being returned to the cache.
*/
public synchronized void refreshAndClearCache(Map<String, S3ClientSettings> clientsSettings) {
// shutdown all unused clients
// others will shutdown on their respective release
releaseCachedClients();
this.staticClientSettings = Maps.ofEntries(clientsSettings.entrySet());
derivedClientSettings = emptyMap();
assert this.staticClientSettings.containsKey("default") : "always at least have 'default'";
/* clients are built lazily by {@link #client} */
s3ClientsManager.refreshAndClearCacheForClusterClients(clientsSettings);
}

/**
* Attempts to retrieve a client by its repository metadata and settings from the cache.
* If the client does not exist it will be created.
*/
@FixForMultiProject(description = "can be removed once blobstore is project aware")
public AmazonS3Reference client(RepositoryMetadata repositoryMetadata) {
final S3ClientSettings clientSettings = settings(repositoryMetadata);
{
final AmazonS3Reference clientReference = clientsCache.get(clientSettings);
if (clientReference != null && clientReference.tryIncRef()) {
return clientReference;
}
}
synchronized (this) {
final AmazonS3Reference existing = clientsCache.get(clientSettings);
if (existing != null && existing.tryIncRef()) {
return existing;
}

if (lifecycle.started() == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 this lifecycle check is now happening in S3ClientsManager

// doClose() calls releaseCachedClients() which is also synchronized (this) so if we're STARTED here then the client we
// create will definitely not leak on close.
throw new AlreadyClosedException("S3Service is in state [" + lifecycle + "]");
}
return client(ProjectId.DEFAULT, repositoryMetadata);
}

final SdkHttpClient httpClient = buildHttpClient(clientSettings, getCustomDnsResolver());
Releasable toRelease = httpClient::close;
try {
final AmazonS3Reference clientReference = new AmazonS3Reference(buildClient(clientSettings, httpClient), httpClient);
clientReference.mustIncRef();
clientsCache = Maps.copyMapWithAddedEntry(clientsCache, clientSettings, clientReference);
toRelease = null;
return clientReference;
} finally {
Releasables.close(toRelease);
}
}
/**
* Attempts to retrieve either a cluster or project client from the client manager. Throws if project-id or
* the client name does not exist. The client maybe initialized lazily.
* @param projectId The project associated with the client, or null if the client is cluster level
*/
public AmazonS3Reference client(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) {
return s3ClientsManager.client(effectiveProjectId(projectId), repositoryMetadata);
}

/**
* Either fetches {@link S3ClientSettings} for a given {@link RepositoryMetadata} from cached settings or creates them
* by overriding static client settings from {@link #staticClientSettings} with settings found in the repository metadata.
* @param repositoryMetadata Repository Metadata
* @return S3ClientSettings
* We use the default project-id for cluster level clients.
*/
S3ClientSettings settings(RepositoryMetadata repositoryMetadata) {
final Settings settings = repositoryMetadata.settings();
{
final S3ClientSettings existing = derivedClientSettings.get(settings);
if (existing != null) {
return existing;
}
}
final String clientName = S3Repository.CLIENT_NAME.get(settings);
final S3ClientSettings staticSettings = staticClientSettings.get(clientName);
if (staticSettings != null) {
synchronized (this) {
final S3ClientSettings existing = derivedClientSettings.get(settings);
if (existing != null) {
return existing;
}
final S3ClientSettings newSettings = staticSettings.refine(settings);
derivedClientSettings = Maps.copyMapWithAddedOrReplacedEntry(derivedClientSettings, settings, newSettings);
return newSettings;
}
ProjectId effectiveProjectId(@Nullable ProjectId projectId) {
return projectId == null ? ProjectId.DEFAULT : projectId;
}

// TODO: consider moving client building into S3ClientsManager
private AmazonS3Reference buildClientReference(final S3ClientSettings clientSettings) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT this method is only used as a method reference that's passed into S3ClientsManager. I don't see a great reason for keeping it within S3Service. Could we move this method, and its dependencies, into S3ClientsManager instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method calls into many other methods directly and indirectly. If we move all of them across, we basically no longer have a S3Service. It is also difficult to move all the methods since some of them are overridden in tests, e.g. getCustomDnsResolver, getConnectionAcquisitionTimeout. One alternative is to merge S3ClientsManager into S3Service. But I don't particularly like the idea either.

Also since this change will not be backported, I'd prefer to keep the change-set smaller so it does not cause trouble for any future backports. We can revisit the structure but outside of this PR in future. I can add a TODO for it. Does that sound OK?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, ok

final SdkHttpClient httpClient = buildHttpClient(clientSettings, getCustomDnsResolver());
Releasable toRelease = httpClient::close;
try {
final AmazonS3Reference clientReference = new AmazonS3Reference(buildClient(clientSettings, httpClient), httpClient);
clientReference.mustIncRef();
toRelease = null;
return clientReference;
} finally {
Releasables.close(toRelease);
}
throw new IllegalArgumentException(
"Unknown s3 client name ["
+ clientName
+ "]. Existing client configs: "
+ Strings.collectionToDelimitedString(staticClientSettings.keySet(), ",")
);
}

@FixForMultiProject(description = "can be removed once blobstore is project aware")
S3ClientSettings settings(RepositoryMetadata repositoryMetadata) {
return settings(ProjectId.DEFAULT, repositoryMetadata);
}

S3ClientSettings settings(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) {
return s3ClientsManager.settingsForClient(effectiveProjectId(projectId), repositoryMetadata);
}

// proxy for testing
Expand Down Expand Up @@ -448,18 +420,17 @@ static AwsCredentialsProvider buildCredentials(
}
}

private synchronized void releaseCachedClients() {
// the clients will shutdown when they will not be used anymore
for (final AmazonS3Reference clientReference : clientsCache.values()) {
clientReference.decRef();
}
// clear previously cached clients, they will be build lazily
clientsCache = emptyMap();
derivedClientSettings = emptyMap();
@FixForMultiProject(description = "can be removed once blobstore is project aware")
public void onBlobStoreClose() {
onBlobStoreClose(ProjectId.DEFAULT);
}

public void onBlobStoreClose() {
releaseCachedClients();
/**
* Release clients for the specified project.
* @param projectId The project associated with the client, or null if the client is cluster level
*/
public void onBlobStoreClose(@Nullable ProjectId projectId) {
s3ClientsManager.releaseCachedClients(effectiveProjectId(projectId));
}

@Override
Expand All @@ -472,7 +443,7 @@ protected void doStop() {}

@Override
public void doClose() throws IOException {
releaseCachedClients();
s3ClientsManager.close();
webIdentityTokenCredentialsProvider.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -234,7 +237,8 @@ public void testEndPointAndRegionOverrides() throws IOException {
try (
S3Service s3Service = new S3Service(
mock(Environment.class),
Settings.EMPTY,
ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()),
TestProjectResolvers.DEFAULT_PROJECT_ONLY,
mock(ResourceWatcherService.class),
() -> Region.of("es-test-region")
)
Expand All @@ -248,7 +252,6 @@ public void testEndPointAndRegionOverrides() throws IOException {
assertEquals("es-test-region", reference.client().serviceClientConfiguration().region().toString());

reference.close();
s3Service.doClose();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.Level;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.elasticsearch.telemetry.InstrumentType;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.RecordingMeterRegistry;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.watcher.ResourceWatcherService;
Expand Down Expand Up @@ -126,7 +128,13 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
@Before
public void setUp() throws Exception {
shouldErrorOnDns = false;
service = new S3Service(Mockito.mock(Environment.class), Settings.EMPTY, Mockito.mock(ResourceWatcherService.class), () -> null) {
service = new S3Service(
Mockito.mock(Environment.class),
ClusterServiceUtils.createClusterService(new DeterministicTaskQueue().getThreadPool()),
TestProjectResolvers.DEFAULT_PROJECT_ONLY,
Mockito.mock(ResourceWatcherService.class),
() -> null
) {
private InetAddress[] resolveHost(String host) throws UnknownHostException {
assertEquals("127.0.0.1", host);
if (shouldErrorOnDns && randomBoolean() && randomBoolean()) {
Expand Down Expand Up @@ -1308,7 +1316,11 @@ public void testRetryOn403InStateless() {

service = new S3Service(
Mockito.mock(Environment.class),
Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, "true").build(),
ClusterServiceUtils.createClusterService(
new DeterministicTaskQueue().getThreadPool(),
Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, "true").build()
),
TestProjectResolvers.DEFAULT_PROJECT_ONLY,
Mockito.mock(ResourceWatcherService.class),
() -> null
);
Expand Down
Loading