Skip to content

Add last refresh times and unwritten changes state to RefreshStats #111220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/reference/cluster/nodes-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,17 @@ A value of `-1` indicates that this is not available.
`listeners`::
(integer) Number of refresh listeners.

`last_refresh_timestamp`::
(integer) Time of the last internal refresh.
Recorded in milliseconds since the {wikipedia}/Unix_time[Unix Epoch].

`last_external_refresh_timestamp`::
(integer) Time of the last external refresh.
Recorded in milliseconds since the {wikipedia}/Unix_time[Unix Epoch].

`has_unwritten_changes`::
(Boolean) Whether there are changes that require a refresh to be written to disk.

=======

`flush`::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ENRICH_CACHE_STATS_SIZE_ADDED = def(8_707_00_0);
public static final TransportVersion ENTERPRISE_GEOIP_DOWNLOADER = def(8_708_00_0);
public static final TransportVersion NODES_STATS_ENUM_SET = def(8_709_00_0);
public static final TransportVersion LAST_REFRESH_TIME_STATS = def(8_710_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1199,14 +1199,18 @@ private void fillSegmentInfo(
public abstract List<Segment> segments(boolean includeVectorFormatsInfo);

/**
 * Checks whether a refresh is needed, using {@code "refresh_needed"} as the searcher source.
 *
 * @return the result of {@link #refreshNeeded(String)} for the default source
 */
public boolean refreshNeeded() {
return refreshNeeded("refresh_needed");
}

public boolean refreshNeeded(String source) {
if (store.tryIncRef()) {
/*
we need to inc the store here since we acquire a searcher and that might keep a file open on the
store. this violates the assumption that all files are closed when
the store is closed so we need to make sure we increment it here
*/
try {
try (Searcher searcher = acquireSearcher("refresh_needed", SearcherScope.EXTERNAL)) {
try (Searcher searcher = acquireSearcher(source, SearcherScope.EXTERNAL)) {
return searcher.getDirectoryReader().isCurrent() == false;
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,10 @@ final boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) {
if (scope == SearcherScope.EXTERNAL) {
switch (source) {
// we can access segment_stats while a shard is still in the recovering state.
// we may also access refresh state before the searcher is warmed up.
case "segments":
case "segments_stats":
case "refresh_needed_stats":
break;
default:
assert externalReaderManager.isWarmedUp : "searcher was not warmed up yet for source[" + source + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ public class RefreshStats implements Writeable, ToXContentFragment {
*/
private int listeners;

private long lastRefreshTime;

private long lastExternalRefreshTime;

private boolean hasUnwrittenChanges;

public RefreshStats() {}

public RefreshStats(StreamInput in) throws IOException {
Expand All @@ -44,6 +50,11 @@ public RefreshStats(StreamInput in) throws IOException {
externalTotalTimeInMillis = in.readVLong();
}
listeners = in.readVInt();
if (in.getTransportVersion().onOrAfter(TransportVersions.LAST_REFRESH_TIME_STATS)) {
lastRefreshTime = in.readVLong();
lastExternalRefreshTime = in.readVLong();
hasUnwrittenChanges = in.readBoolean();
}
}

@Override
Expand All @@ -55,14 +66,31 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(externalTotalTimeInMillis);
}
out.writeVInt(listeners);
if (out.getTransportVersion().onOrAfter(TransportVersions.LAST_REFRESH_TIME_STATS)) {
out.writeVLong(lastRefreshTime);
out.writeVLong(lastExternalRefreshTime);
out.writeBoolean(hasUnwrittenChanges);
}
}

public RefreshStats(long total, long totalTimeInMillis, long externalTotal, long externalTotalTimeInMillis, int listeners) {
public RefreshStats(
long total,
long totalTimeInMillis,
long externalTotal,
long externalTotalTimeInMillis,
int listeners,
long lastRefreshTime,
long lastExternalRefreshTime,
boolean hasUnwrittenChanges
) {
this.total = total;
this.totalTimeInMillis = totalTimeInMillis;
this.externalTotal = externalTotal;
this.externalTotalTimeInMillis = externalTotalTimeInMillis;
this.listeners = listeners;
this.lastRefreshTime = lastRefreshTime;
this.lastExternalRefreshTime = lastExternalRefreshTime;
this.hasUnwrittenChanges = hasUnwrittenChanges;
}

public void add(RefreshStats refreshStats) {
Expand All @@ -78,6 +106,9 @@ public void addTotals(RefreshStats refreshStats) {
this.externalTotal += refreshStats.externalTotal;
this.externalTotalTimeInMillis += refreshStats.externalTotalTimeInMillis;
this.listeners += refreshStats.listeners;
this.lastRefreshTime = Math.max(this.lastRefreshTime, refreshStats.lastRefreshTime);
this.lastExternalRefreshTime = Math.max(this.lastExternalRefreshTime, refreshStats.lastExternalRefreshTime);
this.hasUnwrittenChanges |= refreshStats.hasUnwrittenChanges;
}

/**
Expand Down Expand Up @@ -129,6 +160,27 @@ public int getListeners() {
return listeners;
}

/**
* Timestamp of the last refresh.
*/
public long getLastRefreshTime() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to make these values timestamps rather than a duration (like timeSinceLastRefresh) because from what I could tell timestamps are more widely used in the stats, but also because it was proving to be very difficult to write solid tests when the stat grows with time.

return lastRefreshTime;
}

/**
 * Timestamp of the last external refresh.
 *
 * @return the time of the last external refresh, in milliseconds since the Unix epoch
 */
public long getLastExternalRefreshTime() {
return lastExternalRefreshTime;
}

/**
* Whether there are changes that need to be written to disk or not.
*/
public boolean getHasUnwrittenChanges() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went back and forth between unwritten and unrefreshed and settled on unwritten, but I'm happy to change it to unrefreshed..!

return hasUnwrittenChanges;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("refresh");
Expand All @@ -137,6 +189,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("external_total", externalTotal);
builder.humanReadableField("external_total_time_in_millis", "external_total_time", getExternalTotalTime());
builder.field("listeners", listeners);
builder.field("last_refresh_timestamp", lastRefreshTime);
builder.field("last_external_refresh_timestamp", lastExternalRefreshTime);
builder.field("has_unwritten_changes", hasUnwrittenChanges);
builder.endObject();
return builder;
}
Expand All @@ -151,11 +206,23 @@ public boolean equals(Object obj) {
&& totalTimeInMillis == rhs.totalTimeInMillis
&& externalTotal == rhs.externalTotal
&& externalTotalTimeInMillis == rhs.externalTotalTimeInMillis
&& listeners == rhs.listeners;
&& listeners == rhs.listeners
&& lastRefreshTime == rhs.lastRefreshTime
&& lastExternalRefreshTime == rhs.lastExternalRefreshTime
&& hasUnwrittenChanges == rhs.hasUnwrittenChanges;
}

@Override
public int hashCode() {
return Objects.hash(total, totalTimeInMillis, externalTotal, externalTotalTimeInMillis, listeners);
return Objects.hash(
total,
totalTimeInMillis,
externalTotal,
externalTotalTimeInMillis,
listeners,
lastRefreshTime,
lastExternalRefreshTime,
hasUnwrittenChanges
);
}
}
19 changes: 15 additions & 4 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final MeanMetric externalRefreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();
private final CounterMetric periodicFlushMetric = new CounterMetric();
private final AtomicLong lastRefreshTime = new AtomicLong();
private final AtomicLong lastExternalRefreshTime = new AtomicLong();

private final ShardEventListener shardEventListener = new ShardEventListener();

Expand Down Expand Up @@ -398,7 +400,8 @@ public IndexShard(
() -> refresh("too_many_listeners"),
logger,
threadPool.getThreadContext(),
externalRefreshMetric
externalRefreshMetric,
lastExternalRefreshTime
);
lastSearcherAccess.set(threadPool.relativeTimeInMillis());
persistMetadata(path, indexSettings, shardRouting, null, logger);
Expand Down Expand Up @@ -1304,12 +1307,17 @@ public long getWritingBytes() {

public RefreshStats refreshStats() {
int listeners = refreshListeners.pendingCount();
Engine engine = getEngineOrNull();
boolean hasUnwrittenChanges = engine != null && engine.refreshNeeded("refresh_needed_stats");
return new RefreshStats(
refreshMetric.count(),
TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum()),
externalRefreshMetric.count(),
TimeUnit.NANOSECONDS.toMillis(externalRefreshMetric.sum()),
listeners
listeners,
lastRefreshTime.get(),
lastExternalRefreshTime.get(),
hasUnwrittenChanges
);
}

Expand Down Expand Up @@ -3490,7 +3498,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
List.of(refreshListeners, refreshPendingLocationListener, refreshFieldHasValueListener),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric, lastRefreshTime)),
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
Expand Down Expand Up @@ -4162,9 +4170,11 @@ private static class RefreshMetricUpdater implements ReferenceManager.RefreshLis
private final MeanMetric refreshMetric;
private long currentRefreshStartTime;
private Thread callingThread = null;
// Receives the wall-clock time of each completed refresh; assigned once in the constructor.
private final AtomicLong lastRefreshTime;

private RefreshMetricUpdater(MeanMetric refreshMetric) {
private RefreshMetricUpdater(MeanMetric refreshMetric, AtomicLong lastRefreshTime) {
this.refreshMetric = refreshMetric;
this.lastRefreshTime = lastRefreshTime;
}

@Override
Expand All @@ -4190,6 +4200,7 @@ public void afterRefresh(boolean didRefresh) {
callingThread = null;
}
refreshMetric.inc(System.nanoTime() - currentRefreshStartTime);
lastRefreshTime.set(System.currentTimeMillis());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;
Expand All @@ -44,6 +45,7 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
private final Logger logger;
private final ThreadContext threadContext;
private final MeanMetric refreshMetric;
private final AtomicLong lastRefreshTime;

/**
* Time in nanosecond when beforeRefresh() is called. Used for calculating refresh metrics.
Expand Down Expand Up @@ -83,13 +85,15 @@ public RefreshListeners(
final Runnable forceRefresh,
final Logger logger,
final ThreadContext threadContext,
final MeanMetric refreshMetric
final MeanMetric refreshMetric,
final AtomicLong lastRefreshTime
) {
this.getMaxRefreshListeners = getMaxRefreshListeners;
this.forceRefresh = forceRefresh;
this.logger = logger;
this.threadContext = threadContext;
this.refreshMetric = refreshMetric;
this.lastRefreshTime = lastRefreshTime;
}

/**
Expand Down Expand Up @@ -308,6 +312,7 @@ public void beforeRefresh() throws IOException {
public void afterRefresh(boolean didRefresh) throws IOException {
// Increment refresh metric before communicating to listeners.
refreshMetric.inc(System.nanoTime() - currentRefreshStartTime);
lastRefreshTime.set(System.currentTimeMillis());

/* Set the lastRefreshedLocation so listeners that come in for locations before that will just execute inline without messing
* around with refreshListeners or synchronizing at all. Note that it is not safe for us to abort early if we haven't advanced the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ private static CommonStats createShardLevelCommonStats() {
mergeStats.add(++iota, ++iota, ++iota, ++iota, ++iota, ++iota, ++iota, ++iota, ++iota, 1.0 * ++iota);

indicesCommonStats.getMerge().add(mergeStats);
indicesCommonStats.getRefresh().add(new RefreshStats(++iota, ++iota, ++iota, ++iota, ++iota));
indicesCommonStats.getRefresh().add(new RefreshStats(++iota, ++iota, ++iota, ++iota, ++iota, ++iota, ++iota, false));
indicesCommonStats.getFlush().add(new FlushStats(++iota, ++iota, ++iota, ++iota));
indicesCommonStats.getWarmer().add(new WarmerStats(++iota, ++iota, ++iota));
indicesCommonStats.getCompletion().add(new CompletionStats(++iota, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ public void testSerialize() throws IOException {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
between(0, Integer.MAX_VALUE)
between(0, Integer.MAX_VALUE),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomBoolean()
);
BytesStreamOutput out = new BytesStreamOutput();
stats.writeTo(out);
Expand All @@ -34,5 +37,8 @@ public void testSerialize() throws IOException {
assertEquals(stats.getListeners(), read.getListeners());
assertEquals(stats.getTotalTimeInMillis(), read.getTotalTimeInMillis());
assertEquals(stats.getExternalTotalTimeInMillis(), read.getExternalTotalTimeInMillis());
assertEquals(stats.getLastRefreshTime(), read.getLastRefreshTime());
assertEquals(stats.getLastExternalRefreshTime(), read.getLastExternalRefreshTime());
assertEquals(stats.getHasUnwrittenChanges(), read.getHasUnwrittenChanges());
}
}
Loading