Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>historyserver.archive.cached-retained-jobs</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The maximum number of n latest jobs to retain in the local directory defined by `historyserver.web.tmpdir`. If this configuration is provided, the remote and local storage of job archives will be decoupled.If set to `0` or less than `-1` HistoryServer will throw an <code class="highlighter-rouge">IllegalConfigurationException</code>. </td>
</tr>
<tr>
<td><h5>historyserver.archive.clean-expired-jobs</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand All @@ -26,6 +32,12 @@
<td>Duration</td>
<td>Interval for refreshing the archived job directories.</td>
</tr>
<tr>
<td><h5>historyserver.archive.num-cached-most-recently-viewed-jobs</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>The maximum number of jobs to retain in the local cache defined by `historyserver.web.tmpdir` which stores the job archives that are fetched from the remote storage. This limit is distinct from the number of most recent jobs which will in the cache.The total cache size is a combination of the number of remote cache jobs and the number of remote fetch cached jobs and retained cache jobs.If set to less than `0` HistoryServer will throw an <code class="highlighter-rouge">IllegalConfigurationException</code>. </td>
</tr>
<tr>
<td><h5>historyserver.archive.retained-jobs</h5></td>
<td style="word-wrap: break-word;">-1</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,25 @@ public class HistoryServerOptions {
"Enable HTTPs access to the HistoryServer web frontend. This is applicable only when the"
+ " global SSL flag security.ssl.enabled is set to true.");

public static final ConfigOption<Integer> HISTORY_SERVER_CACHED_JOBS =
key("historyserver.archive.cached-retained-jobs")
.intType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
String.format(
"The maximum number of n latest jobs to retain in the local directory defined by `%s`. ",
HISTORY_SERVER_WEB_DIR.key()))
.text(
"If this configuration is provided, the remote and local storage of job archives will be decoupled.")
.text(
"If set to `0` or less than `-1` HistoryServer will throw an %s. ",
code(
IllegalConfigurationException.class
.getSimpleName()))
.build());

public static final ConfigOption<Integer> HISTORY_SERVER_RETAINED_JOBS =
key("historyserver.archive.retained-jobs")
.intType()
Expand All @@ -140,7 +159,30 @@ public class HistoryServerOptions {
"If set to `-1`(default), there is no limit to the number of archives. ")
.text(
"If set to `0` or less than `-1` HistoryServer will throw an %s. ",
code("IllegalConfigurationException"))
code(
IllegalConfigurationException.class
.getSimpleName()))
.build());

public static final ConfigOption<Integer> HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS =
key("historyserver.archive.num-cached-most-recently-viewed-jobs")
.intType()
.defaultValue(1)
.withDescription(
Description.builder()
.text(
String.format(
"The maximum number of jobs to retain in the local cache defined by `%s` which "
+ "stores the job archives that are fetched from the remote storage. This "
+ "limit is distinct from the number of most recent jobs which will in the cache."
+ "The total cache size is a combination of the number of remote cache jobs and "
+ "the number of remote fetch cached jobs and retained cache jobs.",
HISTORY_SERVER_WEB_DIR.key()))
.text(
"If set to less than `0` HistoryServer will throw an %s. ",
code(
IllegalConfigurationException.class
.getSimpleName()))
.build());

private HistoryServerOptions() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public class HistoryServer {

private final HistoryServerArchiveFetcher archiveFetcher;

private final HistoryServerStaticFileServerHandler staticFileServerHandler;

@Nullable private final SSLHandlerFactory serverSSLFactory;
private WebFrontendBootstrap netty;

Expand Down Expand Up @@ -215,12 +217,12 @@ public HistoryServer(
throw new FlinkException(
HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS + " was not configured.");
}
List<RefreshLocation> refreshDirs = new ArrayList<>();
List<HistoryServer.RefreshLocation> refreshDirs = new ArrayList<>();
for (String refreshDirectory : refreshDirectories.split(",")) {
try {
Path refreshPath = new Path(refreshDirectory);
FileSystem refreshFS = refreshPath.getFileSystem();
refreshDirs.add(new RefreshLocation(refreshPath, refreshFS));
refreshDirs.add(new HistoryServer.RefreshLocation(refreshPath, refreshFS));
} catch (Exception e) {
// there's most likely something wrong with the path itself, so we ignore it from
// here on
Expand All @@ -238,19 +240,27 @@ public HistoryServer(

refreshIntervalMillis =
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL).toMillis();
int maxHistorySize = config.get(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS);
if (maxHistorySize == 0 || maxHistorySize < -1) {
throw new IllegalConfigurationException(
"Cannot set %s to 0 or less than -1",
HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.key());
}
int maxHistorySize = validateRetainedJobsConfig(config);
boolean remoteFetchEnabled =
config.contains(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS);

int generalCachedJobSize = validateCachedJobsConfig(config, remoteFetchEnabled);
int numCachedMostRecentlyViewedJobs = validateNumCachedMostRecentlyViewedJobsConfig(config);

archiveFetcher =
new HistoryServerArchiveFetcher(
refreshDirs,
webDir,
jobArchiveEventListener,
cleanupExpiredArchives,
maxHistorySize);
maxHistorySize,
generalCachedJobSize,
remoteFetchEnabled,
numCachedMostRecentlyViewedJobs);

staticFileServerHandler =
new HistoryServerStaticFileServerHandler(
webDir, remoteFetchEnabled, archiveFetcher);

this.shutdownHook =
ShutdownHookUtil.addShutdownHook(
Expand Down Expand Up @@ -310,7 +320,7 @@ void start() throws IOException, InterruptedException {
new GeneratedLogUrlHandler(
CompletableFuture.completedFuture(pattern))));

router.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));
router.addGet("/:*", staticFileServerHandler);

createDashboardConfigFile();

Expand Down Expand Up @@ -392,12 +402,69 @@ private static String createConfigJson(DashboardConfiguration dashboardConfigura
return OBJECT_MAPPER.writeValueAsString(dashboardConfiguration);
}

/**
* Validates and retrieves the retained jobs configuration.
*
* @param config the configuration
* @return the maximum history size
* @throws IllegalConfigurationException if the configuration is invalid
*/
private int validateRetainedJobsConfig(Configuration config) {
int maxHistorySize = config.get(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS);
if (maxHistorySize == 0 || maxHistorySize < -1) {
throw new IllegalConfigurationException(
"Cannot set %s to 0 or less than -1",
HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.key());
}
return maxHistorySize;
}

/**
* Validates and retrieves the cached jobs configuration if remote fetch is enabled.
*
* @param config the configuration
* @param remoteFetchEnabled whether remote fetch is enabled
* @return the general cached job size
* @throws IllegalConfigurationException if the configuration is invalid
*/
private int validateCachedJobsConfig(Configuration config, boolean remoteFetchEnabled) {
int generalCachedJobSize = -1;
if (remoteFetchEnabled) {
generalCachedJobSize = config.get(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS);
if (generalCachedJobSize == 0 || generalCachedJobSize < -1) {
throw new IllegalConfigurationException(
"Cannot set %s to 0 or less than -1",
HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS.key());
}
}
return generalCachedJobSize;
}

/**
* Validates and retrieves the number of cached most recently viewed jobs configuration.
*
* @param config the configuration
* @return the number of cached most recently viewed jobs
* @throws IllegalConfigurationException if the configuration is invalid
*/
private int validateNumCachedMostRecentlyViewedJobsConfig(Configuration config) {
int numCachedMostRecentlyViewedJobs =
config.get(
HistoryServerOptions.HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS);
if (numCachedMostRecentlyViewedJobs <= 0) {
throw new IllegalConfigurationException(
"Cannot set %s to less than 0",
HistoryServerOptions.HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS.key());
}
return numCachedMostRecentlyViewedJobs;
}

/** Container for the {@link Path} and {@link FileSystem} of a refresh directory. */
static class RefreshLocation {
public static class RefreshLocation {
private final Path path;
private final FileSystem fs;

private RefreshLocation(Path path, FileSystem fs) {
public RefreshLocation(Path path, FileSystem fs) {
this.path = path;
this.fs = fs;
}
Expand Down
Loading