Skip to content

Commit 4931d08

Browse files
authored
[FLINK-38229][runtime-web] Enhanced Job History Retention Policies for HistoryServer (apache#26902)
1 parent 8aaeafc commit 4931d08

File tree

7 files changed

+402
-28
lines changed

7 files changed

+402
-28
lines changed

docs/layouts/shortcodes/generated/history_server_configuration.html

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,13 @@
3030
<td><h5>historyserver.archive.retained-jobs</h5></td>
3131
<td style="word-wrap: break-word;">-1</td>
3232
<td>Integer</td>
33-
<td>The maximum number of jobs to retain in each archive directory defined by `historyserver.archive.fs.dir`. If set to `-1`(default), there is no limit to the number of archives. If set to `0` or less than `-1` HistoryServer will throw an <code class="highlighter-rouge">IllegalConfigurationException</code>. </td>
33+
<td>The maximum number of jobs to retain in each archive directory defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>. <ul><li>If this option is not set to a positive number and <code class="highlighter-rouge">historyserver.archive.retained-ttl</code> is not specified, all job archives will be retained. </li><li>If this option is set to a positive number and <code class="highlighter-rouge">historyserver.archive.retained-ttl</code> is not specified, only the job archives whose position, when ordered by modification time, is less than or equal to the value will be retained. </li><li>If this option is set to a positive number and <code class="highlighter-rouge">historyserver.archive.retained-ttl</code> is also specified, a job archive will be removed if its TTL has expired or the retained job count has been reached. </li></ul>If set to <code class="highlighter-rouge">0</code> or to a value less than <code class="highlighter-rouge">-1</code>, HistoryServer will throw an <code class="highlighter-rouge">IllegalConfigurationException</code>. <br />Note: when there are multiple HistoryServer instances, there are two recommended approaches for using this option: <ul><li>Specify the option in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files, </li><li>or keep the value of this option consistent across all instances. </li></ul></td>
34+
</tr>
35+
<tr>
36+
<td><h5>historyserver.archive.retained-ttl</h5></td>
37+
<td style="word-wrap: break-word;">(none)</td>
38+
<td>Duration</td>
39+
<td>The time-to-live duration for which to retain the job archives in each archive directory defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>. <ul><li>If neither this option nor <code class="highlighter-rouge">historyserver.archive.retained-jobs</code> is specified, all job archives will be retained. </li><li>If this option is specified and <code class="highlighter-rouge">historyserver.archive.retained-jobs</code> is not, only the job archives whose modification time falls within the time-to-live duration will be retained. </li><li>If this option is set to a positive duration and <code class="highlighter-rouge">historyserver.archive.retained-jobs</code> is also specified, a job archive will be removed if its TTL has expired or the retained job count has been reached. </li></ul>If set to a duration equal to or less than <code class="highlighter-rouge">0</code> milliseconds, HistoryServer will throw an <code class="highlighter-rouge">IllegalConfigurationException</code>. <br />Note: when there are multiple HistoryServer instances, there are two recommended approaches for using this option: <ul><li>Specify the option in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files, </li><li>or keep the value of this option consistent across all instances. </li></ul></td>
3440
</tr>
3541
<tr>
3642
<td><h5>historyserver.log.jobmanager.url-pattern</h5></td>

flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import static org.apache.flink.configuration.ConfigOptions.key;
2727
import static org.apache.flink.configuration.description.TextElement.code;
28+
import static org.apache.flink.configuration.description.TextElement.text;
2829

2930
/** The set of configuration options relating to the HistoryServer. */
3031
@PublicEvolving
@@ -126,21 +127,75 @@ public class HistoryServerOptions {
126127
"Enable HTTPs access to the HistoryServer web frontend. This is applicable only when the"
127128
+ " global SSL flag security.ssl.enabled is set to true.");
128129

130+
private static final String HISTORY_SERVER_RETAINED_JOBS_KEY =
131+
"historyserver.archive.retained-jobs";
132+
private static final String HISTORY_SERVER_RETAINED_TTL_KEY =
133+
"historyserver.archive.retained-ttl";
134+
private static final String NOTE_MESSAGE =
135+
"Note, when there are multiple history server instances, two recommended approaches when using this option are: ";
136+
private static final String CONFIGURE_SINGLE_INSTANCE =
137+
"Specify the option in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files, ";
138+
private static final String CONFIGURE_CONSISTENT =
139+
"Or you can keep the value of this configuration consistent across them. ";
140+
129141
public static final ConfigOption<Integer> HISTORY_SERVER_RETAINED_JOBS =
130-
key("historyserver.archive.retained-jobs")
142+
key(HISTORY_SERVER_RETAINED_JOBS_KEY)
131143
.intType()
132144
.defaultValue(-1)
133145
.withDescription(
134146
Description.builder()
135147
.text(
136-
String.format(
137-
"The maximum number of jobs to retain in each archive directory defined by `%s`. ",
138-
HISTORY_SERVER_ARCHIVE_DIRS.key()))
139-
.text(
140-
"If set to `-1`(default), there is no limit to the number of archives. ")
148+
"The maximum number of jobs to retain in each archive directory defined by %s. ",
149+
code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
150+
.list(
151+
text(
152+
"If the option is not specified as a positive number without specifying %s, all of the jobs archives will be retained. ",
153+
code(HISTORY_SERVER_RETAINED_TTL_KEY)),
154+
text(
155+
"If the option is specified as a positive number without specifying a value of %s, the jobs archive whose order index based modification time is equals to or less than the value will be retained. ",
156+
code(HISTORY_SERVER_RETAINED_TTL_KEY)),
157+
text(
158+
"If this option is specified as a positive number together with the specified %s option, the job archive will be removed if its TTL has expired or the retained job count has been reached. ",
159+
code(HISTORY_SERVER_RETAINED_TTL_KEY)))
141160
.text(
142-
"If set to `0` or less than `-1` HistoryServer will throw an %s. ",
161+
"If set to %s or less than %s, HistoryServer will throw an %s. ",
162+
code("0"),
163+
code("-1"),
143164
code("IllegalConfigurationException"))
165+
.linebreak()
166+
.text(NOTE_MESSAGE)
167+
.list(
168+
text(CONFIGURE_SINGLE_INSTANCE),
169+
text(CONFIGURE_CONSISTENT))
170+
.build());
171+
172+
public static final ConfigOption<Duration> HISTORY_SERVER_RETAINED_TTL =
173+
key(HISTORY_SERVER_RETAINED_TTL_KEY)
174+
.durationType()
175+
.noDefaultValue()
176+
.withDescription(
177+
Description.builder()
178+
.text(
179+
"The time-to-live duration to retain the jobs archived in each archive directory defined by %s. ",
180+
code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
181+
.list(
182+
text(
183+
"If the option is not specified without specifying %s, all of the jobs archives will be retained. ",
184+
code(HISTORY_SERVER_RETAINED_JOBS_KEY)),
185+
text(
186+
"If the option is specified without specifying %s, the jobs archive whose modification time in the time-to-live duration will be retained. ",
187+
code(HISTORY_SERVER_RETAINED_JOBS_KEY)),
188+
text(
189+
"If this option is specified as a positive time duration together with the %s option, the job archive will be removed if its TTL has expired or the retained job count has been reached. ",
190+
code(HISTORY_SERVER_RETAINED_JOBS_KEY)))
191+
.text(
192+
"If set to equal to or less than %s milliseconds, HistoryServer will throw an %s. ",
193+
code("0"), code("IllegalConfigurationException"))
194+
.linebreak()
195+
.text(NOTE_MESSAGE)
196+
.list(
197+
text(CONFIGURE_SINGLE_INSTANCE),
198+
text(CONFIGURE_CONSISTENT))
144199
.build());
145200

146201
private HistoryServerOptions() {}

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.flink.configuration.Configuration;
2323
import org.apache.flink.configuration.GlobalConfiguration;
2424
import org.apache.flink.configuration.HistoryServerOptions;
25-
import org.apache.flink.configuration.IllegalConfigurationException;
2625
import org.apache.flink.core.fs.FileSystem;
2726
import org.apache.flink.core.fs.Path;
2827
import org.apache.flink.core.plugin.PluginUtils;
@@ -38,6 +37,7 @@
3837
import org.apache.flink.runtime.security.SecurityUtils;
3938
import org.apache.flink.runtime.util.EnvironmentInformation;
4039
import org.apache.flink.runtime.util.Runnables;
40+
import org.apache.flink.runtime.webmonitor.history.retaining.CompositeJobRetainedStrategy;
4141
import org.apache.flink.runtime.webmonitor.utils.LogUrlUtil;
4242
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
4343
import org.apache.flink.util.ExceptionUtils;
@@ -230,19 +230,13 @@ public HistoryServer(
230230

231231
refreshIntervalMillis =
232232
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL).toMillis();
233-
int maxHistorySize = config.get(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS);
234-
if (maxHistorySize == 0 || maxHistorySize < -1) {
235-
throw new IllegalConfigurationException(
236-
"Cannot set %s to 0 or less than -1",
237-
HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.key());
238-
}
239233
archiveFetcher =
240234
new HistoryServerArchiveFetcher(
241235
refreshDirs,
242236
webDir,
243237
jobArchiveEventListener,
244238
cleanupExpiredArchives,
245-
maxHistorySize);
239+
CompositeJobRetainedStrategy.createFrom(config));
246240

247241
this.shutdownHook =
248242
ShutdownHookUtil.addShutdownHook(

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
3030
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
3131
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
32+
import org.apache.flink.runtime.webmonitor.history.retaining.JobRetainedStrategy;
3233
import org.apache.flink.util.FileUtils;
3334
import org.apache.flink.util.jackson.JacksonMapperFactory;
3435

@@ -112,8 +113,7 @@ public ArchiveEventType getType() {
112113
private final List<HistoryServer.RefreshLocation> refreshDirs;
113114
private final Consumer<ArchiveEvent> jobArchiveEventListener;
114115
private final boolean processExpiredArchiveDeletion;
115-
private final boolean processBeyondLimitArchiveDeletion;
116-
private final int maxHistorySize;
116+
private final JobRetainedStrategy jobRetainedStrategy;
117117

118118
/** Cache of all available jobs identified by their id. */
119119
private final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
@@ -127,13 +127,12 @@ public ArchiveEventType getType() {
127127
File webDir,
128128
Consumer<ArchiveEvent> jobArchiveEventListener,
129129
boolean cleanupExpiredArchives,
130-
int maxHistorySize)
130+
JobRetainedStrategy jobRetainedStrategy)
131131
throws IOException {
132132
this.refreshDirs = checkNotNull(refreshDirs);
133133
this.jobArchiveEventListener = jobArchiveEventListener;
134134
this.processExpiredArchiveDeletion = cleanupExpiredArchives;
135-
this.maxHistorySize = maxHistorySize;
136-
this.processBeyondLimitArchiveDeletion = this.maxHistorySize > 0;
135+
this.jobRetainedStrategy = checkNotNull(jobRetainedStrategy);
137136
this.cachedArchivesPerRefreshDirectory = new HashMap<>();
138137
for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
139138
cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new HashSet<>());
@@ -159,7 +158,7 @@ void fetchArchives() {
159158
Map<Path, Set<String>> jobsToRemove = new HashMap<>();
160159
cachedArchivesPerRefreshDirectory.forEach(
161160
(path, archives) -> jobsToRemove.put(path, new HashSet<>(archives)));
162-
Map<Path, Set<Path>> archivesBeyondSizeLimit = new HashMap<>();
161+
Map<Path, Set<Path>> archivesBeyondRetainedLimit = new HashMap<>();
163162
for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
164163
Path refreshDir = refreshLocation.getPath();
165164
LOG.debug("Checking archive directory {}.", refreshDir);
@@ -176,7 +175,7 @@ void fetchArchives() {
176175
continue;
177176
}
178177

179-
int historySize = 0;
178+
int fileOrderedIndexOnModifiedTime = 0;
180179
for (FileStatus jobArchive : jobArchives) {
181180
Path jobArchivePath = jobArchive.getPath();
182181
String jobID = jobArchivePath.getName();
@@ -186,9 +185,10 @@ void fetchArchives() {
186185

187186
jobsToRemove.get(refreshDir).remove(jobID);
188187

189-
historySize++;
190-
if (historySize > maxHistorySize && processBeyondLimitArchiveDeletion) {
191-
archivesBeyondSizeLimit
188+
fileOrderedIndexOnModifiedTime++;
189+
if (!jobRetainedStrategy.shouldRetain(
190+
jobArchive, fileOrderedIndexOnModifiedTime)) {
191+
archivesBeyondRetainedLimit
192192
.computeIfAbsent(refreshDir, ignored -> new HashSet<>())
193193
.add(jobArchivePath);
194194
continue;
@@ -220,8 +220,8 @@ void fetchArchives() {
220220
&& processExpiredArchiveDeletion) {
221221
events.addAll(cleanupExpiredJobs(jobsToRemove));
222222
}
223-
if (!archivesBeyondSizeLimit.isEmpty() && processBeyondLimitArchiveDeletion) {
224-
events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondSizeLimit));
223+
if (!archivesBeyondRetainedLimit.isEmpty()) {
224+
events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondRetainedLimit));
225225
}
226226
if (!events.isEmpty()) {
227227
updateJobOverview(webOverviewDir, webDir);

0 commit comments

Comments
 (0)