Skip to content

Commit 83e8015

Browse files
authored
Reduce licence checks in LicensedWriteLoadForecaster (#123369) (#123409)
Rather than checking the license (updating the usage map) on every single shard, just do it once at the start of a computation that needs to forecast write loads. Backport of #123346 to 8.x Closes #123247
1 parent 2083ef8 commit 83e8015

File tree

10 files changed

+139
-8
lines changed

10 files changed

+139
-8
lines changed

docs/changelog/123346.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 123346
2+
summary: Reduce license checks in `LicensedWriteLoadForecaster`
3+
area: CRUD
4+
type: bug
5+
issues:
6+
- 123247

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceAction.java

+1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ protected void masterOperation(
9595
return;
9696
}
9797
var clusterInfo = clusterInfoService.getClusterInfo();
98+
writeLoadForecaster.refreshLicense();
9899
listener.onResponse(
99100
new DesiredBalanceResponse(
100101
desiredBalanceShardsAllocator.getStats(),

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

+1
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,7 @@ yield new DataStreamAutoShardingEvent(
430430
);
431431
}
432432

433+
writeLoadForecaster.refreshLicense();
433434
metadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(dataStreamName, metadataBuilder);
434435
metadataBuilder = withShardSizeForecastForWriteIndex(dataStreamName, metadataBuilder);
435436

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java

+2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public AllocationStatsService(
4444
public Map<String, NodeAllocationStats> stats() {
4545
assert Transports.assertNotTransportThread("too expensive for a transport worker");
4646

47+
writeLoadForecaster.refreshLicense();
48+
4749
var state = clusterService.state();
4850
var info = clusterInfoService.getClusterInfo();
4951
var desiredBalance = desiredBalanceShardsAllocator != null ? desiredBalanceShardsAllocator.getDesiredBalance() : null;

server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadForecaster.java

+5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ public interface WriteLoadForecaster {
2121

2222
OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata);
2323

24+
void refreshLicense();
25+
2426
class DefaultWriteLoadForecaster implements WriteLoadForecaster {
2527
@Override
2628
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
@@ -31,5 +33,8 @@ public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName
3133
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
3234
return OptionalDouble.empty();
3335
}
36+
37+
@Override
38+
public void refreshLicense() {}
3439
}
3540
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

+5
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,11 @@ private static float ensureValidThreshold(float threshold) {
170170

171171
@Override
172172
public void allocate(RoutingAllocation allocation) {
173+
if (allocation.metadata().indices().isEmpty() == false) {
174+
// must not use licensed features when just starting up
175+
writeLoadForecaster.refreshLicense();
176+
}
177+
173178
assert allocation.ignoreDisable() == false;
174179

175180
if (allocation.routingNodes().size() == 0) {

test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java

+3
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName
8686
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
8787
return indexMetadata.getForecastedWriteLoad();
8888
}
89+
90+
@Override
91+
public void refreshLicense() {}
8992
};
9093

9194
public static MockAllocationService createAllocationService() {

x-pack/plugin/write-load-forecaster/src/internalClusterTest/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterIT.java

+2
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public void testWriteLoadForecastGetsPopulatedDuringRollovers() throws Exception
8484
assertAllPreviousForecastsAreClearedAfterRollover(dataStream, metadata);
8585

8686
setHasValidLicense(false);
87+
writeLoadForecaster.refreshLicense();
8788

8889
final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata);
8990
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false)));
@@ -131,6 +132,7 @@ public void testWriteLoadForecastIsOverriddenBySetting() throws Exception {
131132
assertAllPreviousForecastsAreClearedAfterRollover(dataStream, metadata);
132133

133134
setHasValidLicense(false);
135+
writeLoadForecaster.refreshLicense();
134136

135137
final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata);
136138
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false)));

x-pack/plugin/write-load-forecaster/src/main/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecaster.java

+42-7
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@
1919
import org.elasticsearch.core.SuppressForbidden;
2020
import org.elasticsearch.core.TimeValue;
2121
import org.elasticsearch.index.Index;
22+
import org.elasticsearch.logging.LogManager;
23+
import org.elasticsearch.logging.Logger;
2224
import org.elasticsearch.threadpool.ThreadPool;
2325

26+
import java.lang.invoke.MethodHandles;
27+
import java.lang.invoke.VarHandle;
2428
import java.util.List;
2529
import java.util.Objects;
2630
import java.util.OptionalDouble;
@@ -30,30 +34,36 @@
3034
import static org.elasticsearch.xpack.writeloadforecaster.WriteLoadForecasterPlugin.OVERRIDE_WRITE_LOAD_FORECAST_SETTING;
3135

3236
class LicensedWriteLoadForecaster implements WriteLoadForecaster {
37+
38+
private static final Logger logger = LogManager.getLogger(LicensedWriteLoadForecaster.class);
39+
3340
public static final Setting<TimeValue> MAX_INDEX_AGE_SETTING = Setting.timeSetting(
3441
"write_load_forecaster.max_index_age",
3542
TimeValue.timeValueDays(7),
3643
TimeValue.timeValueHours(1),
3744
Setting.Property.NodeScope,
3845
Setting.Property.Dynamic
3946
);
40-
private final BooleanSupplier hasValidLicense;
47+
private final BooleanSupplier hasValidLicenseSupplier;
4148
private final ThreadPool threadPool;
4249
private volatile TimeValue maxIndexAge;
4350

51+
@SuppressWarnings("unused") // modified via VH_HAS_VALID_LICENSE_FIELD
52+
private volatile boolean hasValidLicense;
53+
4454
LicensedWriteLoadForecaster(
45-
BooleanSupplier hasValidLicense,
55+
BooleanSupplier hasValidLicenseSupplier,
4656
ThreadPool threadPool,
4757
Settings settings,
4858
ClusterSettings clusterSettings
4959
) {
50-
this(hasValidLicense, threadPool, MAX_INDEX_AGE_SETTING.get(settings));
60+
this(hasValidLicenseSupplier, threadPool, MAX_INDEX_AGE_SETTING.get(settings));
5161
clusterSettings.addSettingsUpdateConsumer(MAX_INDEX_AGE_SETTING, this::setMaxIndexAgeSetting);
5262
}
5363

5464
// exposed for tests only
55-
LicensedWriteLoadForecaster(BooleanSupplier hasValidLicense, ThreadPool threadPool, TimeValue maxIndexAge) {
56-
this.hasValidLicense = hasValidLicense;
65+
LicensedWriteLoadForecaster(BooleanSupplier hasValidLicenseSupplier, ThreadPool threadPool, TimeValue maxIndexAge) {
66+
this.hasValidLicenseSupplier = hasValidLicenseSupplier;
5767
this.threadPool = threadPool;
5868
this.maxIndexAge = maxIndexAge;
5969
}
@@ -64,7 +74,7 @@ private void setMaxIndexAgeSetting(TimeValue updatedMaxIndexAge) {
6474

6575
@Override
6676
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
67-
if (hasValidLicense.getAsBoolean() == false) {
77+
if (hasValidLicense == false) {
6878
return metadata;
6979
}
7080

@@ -143,7 +153,7 @@ static OptionalDouble forecastIndexWriteLoad(List<IndexWriteLoad> indicesWriteLo
143153
@Override
144154
@SuppressForbidden(reason = "This is the only place where IndexMetadata#getForecastedWriteLoad is allowed to be used")
145155
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
146-
if (hasValidLicense.getAsBoolean() == false) {
156+
if (hasValidLicense == false) {
147157
return OptionalDouble.empty();
148158
}
149159

@@ -154,4 +164,29 @@ public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
154164

155165
return indexMetadata.getForecastedWriteLoad();
156166
}
167+
168+
/**
169+
* Used to atomically {@code getAndSet()} the {@link #hasValidLicense} field. This is better than an
170+
* {@link java.util.concurrent.atomic.AtomicBoolean} because it takes one less pointer dereference on each read.
171+
*/
172+
private static final VarHandle VH_HAS_VALID_LICENSE_FIELD;
173+
174+
static {
175+
try {
176+
VH_HAS_VALID_LICENSE_FIELD = MethodHandles.lookup()
177+
.in(LicensedWriteLoadForecaster.class)
178+
.findVarHandle(LicensedWriteLoadForecaster.class, "hasValidLicense", boolean.class);
179+
} catch (NoSuchFieldException | IllegalAccessException e) {
180+
throw new RuntimeException(e);
181+
}
182+
}
183+
184+
@Override
185+
public void refreshLicense() {
186+
final var newValue = hasValidLicenseSupplier.getAsBoolean();
187+
final var oldValue = (boolean) VH_HAS_VALID_LICENSE_FIELD.getAndSet(this, newValue);
188+
if (newValue != oldValue) {
189+
logger.info("license state changed, now [{}]", newValue ? "valid" : "not valid");
190+
}
191+
}
157192
}

x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java

+72-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.writeloadforecaster;
99

10+
import org.apache.logging.log4j.Level;
11+
import org.apache.logging.log4j.core.LogEvent;
1012
import org.elasticsearch.cluster.metadata.DataStream;
1113
import org.elasticsearch.cluster.metadata.IndexMetadata;
1214
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
@@ -19,6 +21,7 @@
1921
import org.elasticsearch.index.IndexMode;
2022
import org.elasticsearch.index.IndexVersion;
2123
import org.elasticsearch.test.ESTestCase;
24+
import org.elasticsearch.test.MockLog;
2225
import org.elasticsearch.threadpool.TestThreadPool;
2326
import org.elasticsearch.threadpool.ThreadPool;
2427
import org.junit.After;
@@ -30,9 +33,12 @@
3033
import java.util.OptionalDouble;
3134
import java.util.concurrent.TimeUnit;
3235
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.concurrent.atomic.AtomicInteger;
3337

3438
import static org.elasticsearch.xpack.writeloadforecaster.LicensedWriteLoadForecaster.forecastIndexWriteLoad;
3539
import static org.hamcrest.Matchers.closeTo;
40+
import static org.hamcrest.Matchers.contains;
41+
import static org.hamcrest.Matchers.empty;
3642
import static org.hamcrest.Matchers.equalTo;
3743
import static org.hamcrest.Matchers.greaterThan;
3844
import static org.hamcrest.Matchers.is;
@@ -53,7 +59,13 @@ public void tearDownThreadPool() {
5359
public void testWriteLoadForecastIsAddedToWriteIndex() {
5460
final TimeValue maxIndexAge = TimeValue.timeValueDays(7);
5561
final AtomicBoolean hasValidLicense = new AtomicBoolean(true);
56-
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge);
62+
final AtomicInteger licenseCheckCount = new AtomicInteger();
63+
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> {
64+
licenseCheckCount.incrementAndGet();
65+
return hasValidLicense.get();
66+
}, threadPool, maxIndexAge);
67+
68+
writeLoadForecaster.refreshLicense();
5769

5870
final Metadata.Builder metadataBuilder = Metadata.builder();
5971
final String dataStreamName = "logs-es";
@@ -95,8 +107,12 @@ public void testWriteLoadForecastIsAddedToWriteIndex() {
95107
assertThat(forecastedWriteLoad.isPresent(), is(true));
96108
assertThat(forecastedWriteLoad.getAsDouble(), is(greaterThan(0.0)));
97109

110+
assertThat(licenseCheckCount.get(), equalTo(1));
98111
hasValidLicense.set(false);
99112

113+
writeLoadForecaster.refreshLicense();
114+
assertThat(licenseCheckCount.get(), equalTo(2));
115+
100116
final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex);
101117
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false));
102118
}
@@ -136,6 +152,7 @@ public void testUptimeIsUsedToWeightWriteLoad() {
136152
metadataBuilder.put(dataStream);
137153

138154
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> true, threadPool, maxIndexAge);
155+
writeLoadForecaster.refreshLicense();
139156

140157
final Metadata.Builder updatedMetadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(
141158
dataStream.getName(),
@@ -154,6 +171,7 @@ public void testForecastedWriteLoadIsOverriddenBySetting() {
154171
final TimeValue maxIndexAge = TimeValue.timeValueDays(7);
155172
final AtomicBoolean hasValidLicense = new AtomicBoolean(true);
156173
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge);
174+
writeLoadForecaster.refreshLicense();
157175

158176
final Metadata.Builder metadataBuilder = Metadata.builder();
159177
final String dataStreamName = "logs-es";
@@ -197,6 +215,7 @@ public void testForecastedWriteLoadIsOverriddenBySetting() {
197215
assertThat(forecastedWriteLoad.getAsDouble(), is(equalTo(0.6)));
198216

199217
hasValidLicense.set(false);
218+
writeLoadForecaster.refreshLicense();
200219

201220
final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex);
202221
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false));
@@ -327,4 +346,56 @@ private DataStream createDataStream(String name, List<Index> backingIndices) {
327346
.setIndexMode(IndexMode.STANDARD)
328347
.build();
329348
}
349+
350+
public void testLicenseStateLogging() {
351+
352+
final var seenMessages = new ArrayList<String>();
353+
354+
final var collectingLoggingAssertion = new MockLog.SeenEventExpectation(
355+
"seen event",
356+
LicensedWriteLoadForecaster.class.getCanonicalName(),
357+
Level.INFO,
358+
"*"
359+
) {
360+
@Override
361+
public boolean innerMatch(LogEvent event) {
362+
final var message = event.getMessage().getFormattedMessage();
363+
if (message.startsWith("license state changed, now [")) {
364+
seenMessages.add(message);
365+
return true;
366+
}
367+
368+
return false;
369+
}
370+
};
371+
372+
MockLog.assertThatLogger(() -> {
373+
final var hasValidLicense = new AtomicBoolean();
374+
final var writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, randomTimeValue());
375+
assertThat(seenMessages, empty());
376+
writeLoadForecaster.refreshLicense();
377+
assertThat(seenMessages, empty());
378+
379+
hasValidLicense.set(true);
380+
writeLoadForecaster.refreshLicense();
381+
assertThat(seenMessages, contains("license state changed, now [valid]"));
382+
writeLoadForecaster.refreshLicense();
383+
assertThat(seenMessages, contains("license state changed, now [valid]"));
384+
385+
hasValidLicense.set(false);
386+
writeLoadForecaster.refreshLicense();
387+
assertThat(seenMessages, contains("license state changed, now [valid]", "license state changed, now [not valid]"));
388+
389+
hasValidLicense.set(true);
390+
ESTestCase.startInParallel(between(1, 10), ignored -> writeLoadForecaster.refreshLicense());
391+
assertThat(
392+
seenMessages,
393+
contains(
394+
"license state changed, now [valid]",
395+
"license state changed, now [not valid]",
396+
"license state changed, now [valid]"
397+
)
398+
);
399+
}, LicensedWriteLoadForecaster.class, collectingLoggingAssertion);
400+
}
330401
}

0 commit comments

Comments
 (0)