/applications/:applicationid/*
*
*
* and relies on static files that are served by the {@link
@@ -110,8 +112,18 @@ public class HistoryServer {
private final long webRefreshIntervalMillis;
private final File webDir;
+ /**
+ * The archive fetcher is responsible for fetching job archives that are not part of an
+ * application (legacy jobs created before application archiving was introduced in FLINK-38761).
+ */
private final HistoryServerArchiveFetcher archiveFetcher;
+ /**
+ * The application archive fetcher is responsible for fetching application archives and their
+ * associated job archives.
+ */
+ private final HistoryServerApplicationArchiveFetcher applicationArchiveFetcher;
+
@Nullable private final SSLHandlerFactory serverSSLFactory;
private WebFrontendBootstrap netty;
@@ -161,7 +173,7 @@ public Integer call() throws Exception {
}
public HistoryServer(Configuration config) throws IOException, FlinkException {
- this(config, (event) -> {});
+ this(config, (event) -> {}, (event) -> {});
}
/**
@@ -175,7 +187,9 @@ public HistoryServer(Configuration config) throws IOException, FlinkException {
*/
public HistoryServer(
Configuration config,
- Consumer<ArchiveEvent> jobArchiveEventListener)
+ Consumer<ArchiveEvent> jobArchiveEventListener,
+ Consumer<HistoryServerApplicationArchiveFetcher.ArchiveEvent>
+     applicationArchiveEventListener)
throws IOException, FlinkException {
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(jobArchiveEventListener);
@@ -199,8 +213,10 @@ public HistoryServer(
webDir = clearWebDir(config);
- boolean cleanupExpiredArchives =
+ boolean cleanupExpiredJobs =
config.get(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS);
+ boolean cleanupExpiredApplications =
+ config.get(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS);
String refreshDirectories = config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS);
if (refreshDirectories == null) {
@@ -235,8 +251,15 @@ public HistoryServer(
refreshDirs,
webDir,
jobArchiveEventListener,
- cleanupExpiredArchives,
- CompositeJobRetainedStrategy.createFrom(config));
+ cleanupExpiredJobs,
+ CompositeArchiveRetainedStrategy.createForJobFromConfig(config));
+ applicationArchiveFetcher =
+ new HistoryServerApplicationArchiveFetcher(
+ refreshDirs,
+ webDir,
+ applicationArchiveEventListener,
+ cleanupExpiredApplications,
+ CompositeArchiveRetainedStrategy.createForApplicationFromConfig(config));
this.shutdownHook =
ShutdownHookUtil.addShutdownHook(
@@ -339,7 +362,11 @@ void start() throws IOException, InterruptedException {
private Runnable getArchiveFetchingRunnable() {
return Runnables.withUncaughtExceptionHandler(
- () -> archiveFetcher.fetchArchives(), FatalExitExceptionHandler.INSTANCE);
+ () -> {
+ archiveFetcher.fetchArchives();
+ applicationArchiveFetcher.fetchArchives();
+ },
+ FatalExitExceptionHandler.INSTANCE);
}
void stop() {
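Aside, not part of the patch: a minimal sketch of how the new two-listener constructor could be wired up. The class name, the archive path, and the println bodies are illustrative only; the sketch sits in the webmonitor.history package because start() is package-private.

package org.apache.flink.runtime.webmonitor.history;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;

// Hypothetical wiring sketch; one listener receives events for plain job archives,
// the other for application archives, both with CREATED/DELETED event types.
public class HistoryServerWiringSketch {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, "file:///tmp/flink-archives");
        config.set(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);

        HistoryServer historyServer =
                new HistoryServer(
                        config,
                        jobEvent ->
                                System.out.println("job archive " + jobEvent.getId() + ": " + jobEvent.getType()),
                        appEvent ->
                                System.out.println("application archive " + appEvent.getId() + ": " + appEvent.getType()));
        historyServer.start();
    }
}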
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java
new file mode 100644
index 0000000000000..310e91b77538e
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.ArchivePathUtils;
+import org.apache.flink.runtime.history.FsJsonArchivist;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails;
+import org.apache.flink.runtime.rest.messages.ApplicationsOverviewHeaders;
+import org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * This class is used by the {@link HistoryServer} to fetch the application and job archives that
+ * are located at {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories are
+ * polled in regular intervals, defined by {@link
+ * HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}.
+ *
+ * <p>The archives are downloaded and expanded into a file structure analog to the REST API.
+ *
+ * <p>Removes existing archives from these directories and the cache according to {@link
+ * ArchiveRetainedStrategy} and {@link
+ * HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS}.
+ */
+public class HistoryServerApplicationArchiveFetcher extends HistoryServerArchiveFetcher {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(HistoryServerApplicationArchiveFetcher.class);
+
+ private static final String APPLICATIONS_SUBDIR = "applications";
+ private static final String APPLICATION_OVERVIEWS_SUBDIR = "application-overviews";
+
+ private final Map<Path, Map<String, Set<String>>> cachedApplicationIdsToJobIds =
+ new HashMap<>();
+
+ private final File webApplicationDir;
+ private final File webApplicationsOverviewDir;
+
+ HistoryServerApplicationArchiveFetcher(
+ List<HistoryServer.RefreshLocation> refreshDirs,
+ File webDir,
+ Consumer<ArchiveEvent> archiveEventListener,
+ boolean cleanupExpiredArchives,
+ ArchiveRetainedStrategy retainedStrategy)
+ throws IOException {
+ super(refreshDirs, webDir, archiveEventListener, cleanupExpiredArchives, retainedStrategy);
+
+ for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
+ cachedApplicationIdsToJobIds.put(refreshDir.getPath(), new HashMap<>());
+ }
+ this.webApplicationDir = new File(webDir, APPLICATIONS_SUBDIR);
+ Files.createDirectories(webApplicationDir.toPath());
+ this.webApplicationsOverviewDir = new File(webDir, APPLICATION_OVERVIEWS_SUBDIR);
+ Files.createDirectories(webApplicationsOverviewDir.toPath());
+ updateApplicationOverview();
+ }
+
+ @Override
+ List<FileStatus> listValidArchives(FileSystem refreshFS, Path refreshDir) throws IOException {
+ List<FileStatus> applicationArchiveDirs = new ArrayList<>();
+ FileStatus[] clusterDirs = refreshFS.listStatus(refreshDir);
+ if (clusterDirs == null) {
+ // the entire refreshDirectory was removed
+ return applicationArchiveDirs;
+ }
+
+ // Check for application archive directories in the cluster directories and named according
+ // to the application ID format
+ for (FileStatus clusterDir : clusterDirs) {
+ if (clusterDir.isDir() && isValidId(clusterDir.getPath().getName(), refreshDir)) {
+ Path applicationsDir =
+ new Path(clusterDir.getPath(), ArchivePathUtils.APPLICATIONS_DIR);
+ FileStatus[] applicationDirs = refreshFS.listStatus(applicationsDir);
+ if (applicationDirs == null) {
+ // the entire applicationsDirectory was removed
+ return applicationArchiveDirs;
+ }
+
+ for (FileStatus applicationDir : applicationDirs) {
+ if (applicationDir.isDir()
+ && isValidId(applicationDir.getPath().getName(), refreshDir)) {
+ applicationArchiveDirs.add(applicationDir);
+ }
+ }
+ }
+ }
+
+ return applicationArchiveDirs;
+ }
+
+ private boolean isValidId(String id, Path refreshDir) {
+ try {
+ ApplicationID.fromHexString(id);
+ return true;
+ } catch (IllegalArgumentException iae) {
+ LOG.debug(
+ "Archive directory {} contained file with unexpected name {}. Ignoring file.",
+ refreshDir,
+ id,
+ iae);
+ return false;
+ }
+ }
+
+ @Override
+ List<ArchiveEvent> processArchive(String archiveId, Path archivePath, Path refreshDir)
+ throws IOException {
+ FileSystem fs = archivePath.getFileSystem();
+ Path applicationArchive = new Path(archivePath, ArchivePathUtils.APPLICATION_ARCHIVE_NAME);
+ if (!fs.exists(applicationArchive)) {
+ throw new IOException("Application archive " + applicationArchive + " does not exist.");
+ }
+
+ List<ArchiveEvent> events = new ArrayList<>();
+ events.add(processApplicationArchive(archiveId, applicationArchive));
+
+ Path jobArchivesDir = new Path(archivePath, ArchivePathUtils.JOBS_DIR);
+
+ List<FileStatus> jobArchives = listValidJobArchives(fs, jobArchivesDir);
+ for (FileStatus jobArchive : jobArchives) {
+ String jobId = jobArchive.getPath().getName();
+ cachedApplicationIdsToJobIds
+ .get(refreshDir)
+ .computeIfAbsent(archiveId, k -> new HashSet<>())
+ .add(jobId);
+ events.add(processJobArchive(jobId, jobArchive.getPath()));
+ }
+
+ return events;
+ }
+
+ private ArchiveEvent processApplicationArchive(String applicationId, Path applicationArchive)
+ throws IOException {
+ for (ArchivedJson archive : FsJsonArchivist.readArchivedJsons(applicationArchive)) {
+ String path = archive.getPath();
+ String json = archive.getJson();
+
+ File target;
+ if (path.equals(ApplicationsOverviewHeaders.URL)) {
+ target = new File(webApplicationsOverviewDir, applicationId + JSON_FILE_ENDING);
+ } else {
+ // this implicitly writes into webApplicationDir
+ target = new File(webDir, path + JSON_FILE_ENDING);
+ }
+
+ writeTargetFile(target, json);
+ }
+
+ return new ArchiveEvent(applicationId, ArchiveEventType.CREATED);
+ }
+
+ @Override
+ void deleteFromRemote(Path archive) throws IOException {
+ // delete application archive directory recursively (including all its job archives)
+ archive.getFileSystem().delete(archive, true);
+ }
+
+ @Override
+ List<ArchiveEvent> deleteCachedArchives(String archiveId, Path refreshDir) {
+ LOG.info("Archive directory for application {} was deleted.", archiveId);
+ List<ArchiveEvent> deleteLog = new ArrayList<>();
+
+ deleteLog.add(deleteApplicationFiles(archiveId));
+
+ Set<String> jobIds = cachedApplicationIdsToJobIds.get(refreshDir).remove(archiveId);
+ if (jobIds != null) {
+ jobIds.forEach(jobId -> deleteLog.add(deleteJobFiles(jobId)));
+ }
+
+ return deleteLog;
+ }
+
+ private ArchiveEvent deleteApplicationFiles(String applicationId) {
+ // Make sure we do not include this application in the overview
+ try {
+ Files.deleteIfExists(
+ new File(webApplicationsOverviewDir, applicationId + JSON_FILE_ENDING)
+ .toPath());
+ } catch (IOException ioe) {
+ LOG.warn("Could not delete file from overview directory.", ioe);
+ }
+
+ // Clean up application files we may have created
+ File applicationDirectory = new File(webApplicationDir, applicationId);
+ try {
+ FileUtils.deleteDirectory(applicationDirectory);
+ } catch (IOException ioe) {
+ LOG.warn("Could not clean up application directory.", ioe);
+ }
+
+ try {
+ Files.deleteIfExists(
+ new File(webApplicationDir, applicationId + JSON_FILE_ENDING).toPath());
+ } catch (IOException ioe) {
+ LOG.warn("Could not delete file from application directory.", ioe);
+ }
+
+ return new ArchiveEvent(applicationId, ArchiveEventType.DELETED);
+ }
+
+ @Override
+ void updateOverview() {
+ updateApplicationOverview();
+ updateJobOverview();
+ }
+
+ /**
+ * This method replicates the JSON response that would be given by the
+ * ApplicationsOverviewHandler when listing applications.
+ *
+ * <p>Every application archive contains an overview entry with the same structure. However,
+ * since applications are archived on their own, the list of applications only contains a
+ * single application.
+ *
+ * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews.
+ */
+ private void updateApplicationOverview() {
+ try (JsonGenerator gen =
+ jacksonFactory.createGenerator(
+ HistoryServer.createOrGetFile(webDir, ApplicationsOverviewHeaders.URL))) {
+ File[] overviews = new File(webApplicationsOverviewDir.getPath()).listFiles();
+ if (overviews != null) {
+ Collection<ApplicationDetails> allApplications = new ArrayList<>(overviews.length);
+ for (File overview : overviews) {
+ MultipleApplicationsDetails subApplications =
+ mapper.readValue(overview, MultipleApplicationsDetails.class);
+ allApplications.addAll(subApplications.getApplications());
+ }
+ mapper.writeValue(gen, new MultipleApplicationsDetails(allApplications));
+ }
+ } catch (IOException ioe) {
+ LOG.error("Failed to update application overview.", ioe);
+ }
+ }
+}
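Aside, not part of the patch: the remote layout that listValidArchives/processArchive above expect under each configured refresh directory. The literal directory and file names come from ArchivePathUtils constants whose values are not shown in this diff, so the names below are placeholders:

<refresh-dir>/
    <cluster-dir>/                      directory name must parse as an ApplicationID hex string
        <APPLICATIONS_DIR>/             ArchivePathUtils.APPLICATIONS_DIR
            <application-id>/           directory name must parse as an ApplicationID hex string
                <APPLICATION_ARCHIVE_NAME>    JSON archive read by processApplicationArchive
                <JOBS_DIR>/             ArchivePathUtils.JOBS_DIR
                    <job-id>            one JSON job archive per job, named by its JobID

Legacy job archives remain flat files directly under <refresh-dir> and are still picked up by the base HistoryServerArchiveFetcher via listValidJobArchives.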
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 4fe8bd58d5b7f..59eb258b3ed1e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -29,7 +29,7 @@
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
-import org.apache.flink.runtime.webmonitor.history.retaining.JobRetainedStrategy;
+import org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.jackson.JacksonMapperFactory;
@@ -48,7 +48,6 @@
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -68,34 +67,31 @@
*
* <p>The archives are downloaded and expanded into a file structure analog to the REST API.
*
- * <p>Removes existing archives from these directories and the cache if configured by {@link
- * HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_JOBS} or {@link
- * HistoryServerOptions#HISTORY_SERVER_RETAINED_JOBS}.
+ * <p>Removes existing archives from these directories and the cache according to {@link
+ * ArchiveRetainedStrategy} and {@link HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_JOBS}.
*/
class HistoryServerArchiveFetcher {
- /** Possible job archive operations in history-server. */
+ /** Possible archive operations in history-server. */
public enum ArchiveEventType {
- /** Job archive was found in one refresh location and created in history server. */
+ /** Archive was found in one refresh location and created in history server. */
CREATED,
- /**
- * Job archive was deleted from one of refresh locations and deleted from history server.
- */
+ /** Archive was deleted from one of refresh locations and deleted from history server. */
DELETED
}
- /** Representation of job archive event. */
+ /** Representation of archive event. */
public static class ArchiveEvent {
- private final String jobID;
+ private final String id;
private final ArchiveEventType operation;
- ArchiveEvent(String jobID, ArchiveEventType operation) {
- this.jobID = jobID;
+ ArchiveEvent(String id, ArchiveEventType operation) {
+ this.id = id;
this.operation = operation;
}
- public String getJobID() {
- return jobID;
+ public String getId() {
+ return id;
}
public ArchiveEventType getType() {
@@ -105,48 +101,50 @@ public ArchiveEventType getType() {
private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
- private static final JsonFactory jacksonFactory = new JsonFactory();
- private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
+ protected static final String JSON_FILE_ENDING = ".json";
+ protected static final String JOBS_SUBDIR = "jobs";
+ protected static final String JOB_OVERVIEWS_SUBDIR = "overviews";
- private static final String JSON_FILE_ENDING = ".json";
+ protected final JsonFactory jacksonFactory = new JsonFactory();
+ protected final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
- private final List<HistoryServer.RefreshLocation> refreshDirs;
- private final Consumer<ArchiveEvent> jobArchiveEventListener;
- private final boolean processExpiredArchiveDeletion;
- private final JobRetainedStrategy jobRetainedStrategy;
+ protected final List<HistoryServer.RefreshLocation> refreshDirs;
+ protected final Consumer<ArchiveEvent> archiveEventListener;
+ protected final boolean processExpiredArchiveDeletion;
+ protected final ArchiveRetainedStrategy retainedStrategy;
- /** Cache of all available jobs identified by their id. */
- private final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
+ /** Cache of all available archives identified by their id. */
+ protected final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
- private final File webDir;
- private final File webJobDir;
- private final File webOverviewDir;
+ protected final File webDir;
+ protected final File webJobDir;
+ protected final File webOverviewDir;
HistoryServerArchiveFetcher(
List<HistoryServer.RefreshLocation> refreshDirs,
File webDir,
- Consumer<ArchiveEvent> jobArchiveEventListener,
+ Consumer<ArchiveEvent> archiveEventListener,
boolean cleanupExpiredArchives,
- JobRetainedStrategy jobRetainedStrategy)
+ ArchiveRetainedStrategy retainedStrategy)
throws IOException {
this.refreshDirs = checkNotNull(refreshDirs);
- this.jobArchiveEventListener = jobArchiveEventListener;
+ this.archiveEventListener = archiveEventListener;
this.processExpiredArchiveDeletion = cleanupExpiredArchives;
- this.jobRetainedStrategy = checkNotNull(jobRetainedStrategy);
+ this.retainedStrategy = checkNotNull(retainedStrategy);
this.cachedArchivesPerRefreshDirectory = new HashMap<>();
for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new HashSet<>());
}
this.webDir = checkNotNull(webDir);
- this.webJobDir = new File(webDir, "jobs");
+ this.webJobDir = new File(webDir, JOBS_SUBDIR);
Files.createDirectories(webJobDir.toPath());
- this.webOverviewDir = new File(webDir, "overviews");
+ this.webOverviewDir = new File(webDir, JOB_OVERVIEWS_SUBDIR);
Files.createDirectories(webOverviewDir.toPath());
- updateJobOverview(webOverviewDir, webDir);
+ updateJobOverview();
if (LOG.isInfoEnabled()) {
for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
- LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath());
+ LOG.info("Monitoring directory {} for archives.", refreshDir.getPath());
}
}
}
@@ -155,100 +153,102 @@ void fetchArchives() {
try {
LOG.debug("Starting archive fetching.");
List<ArchiveEvent> events = new ArrayList<>();
- Map<Path, Set<String>> jobsToRemove = new HashMap<>();
+ Map<Path, Set<String>> archivesToRemove = new HashMap<>();
cachedArchivesPerRefreshDirectory.forEach(
- (path, archives) -> jobsToRemove.put(path, new HashSet<>(archives)));
+ (path, archives) -> archivesToRemove.put(path, new HashSet<>(archives)));
Map<Path, Set<Path>> archivesBeyondRetainedLimit = new HashMap<>();
for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
Path refreshDir = refreshLocation.getPath();
LOG.debug("Checking archive directory {}.", refreshDir);
- // contents of /:refreshDir
- FileStatus[] jobArchives;
+ List<FileStatus> archives;
try {
- jobArchives = listArchives(refreshLocation.getFs(), refreshDir);
+ archives = listValidArchives(refreshLocation.getFs(), refreshDir);
+ archives.sort(
+ Comparator.comparingLong(FileStatus::getModificationTime).reversed());
} catch (IOException e) {
- LOG.error("Failed to access job archive location for path {}.", refreshDir, e);
+ LOG.error("Failed to access archive location for path {}.", refreshDir, e);
// something went wrong, potentially due to a concurrent deletion
- // do not remove any jobs now; we will retry later
- jobsToRemove.remove(refreshDir);
+ // do not remove any archives now; we will retry later
+ archivesToRemove.remove(refreshDir);
continue;
}
int fileOrderedIndexOnModifiedTime = 0;
- for (FileStatus jobArchive : jobArchives) {
- Path jobArchivePath = jobArchive.getPath();
- String jobID = jobArchivePath.getName();
- if (!isValidJobID(jobID, refreshDir)) {
- continue;
- }
-
- jobsToRemove.get(refreshDir).remove(jobID);
+ for (FileStatus archive : archives) {
+ Path archivePath = archive.getPath();
+ String archiveId = archivePath.getName();
+ archivesToRemove.get(refreshDir).remove(archiveId);
fileOrderedIndexOnModifiedTime++;
- if (!jobRetainedStrategy.shouldRetain(
- jobArchive, fileOrderedIndexOnModifiedTime)) {
+ if (!retainedStrategy.shouldRetain(archive, fileOrderedIndexOnModifiedTime)) {
archivesBeyondRetainedLimit
.computeIfAbsent(refreshDir, ignored -> new HashSet<>())
- .add(jobArchivePath);
+ .add(archivePath);
continue;
}
- if (cachedArchivesPerRefreshDirectory.get(refreshDir).contains(jobID)) {
+ if (cachedArchivesPerRefreshDirectory.get(refreshDir).contains(archiveId)) {
LOG.trace(
- "Ignoring archive {} because it was already fetched.",
- jobArchivePath);
+ "Ignoring archive {} because it was already fetched.", archivePath);
} else {
- LOG.info("Processing archive {}.", jobArchivePath);
+ LOG.info("Processing archive {}.", archivePath);
try {
- processArchive(jobID, jobArchivePath);
- events.add(new ArchiveEvent(jobID, ArchiveEventType.CREATED));
- cachedArchivesPerRefreshDirectory.get(refreshDir).add(jobID);
- LOG.info("Processing archive {} finished.", jobArchivePath);
+ events.addAll(processArchive(archiveId, archivePath, refreshDir));
+ cachedArchivesPerRefreshDirectory.get(refreshDir).add(archiveId);
+ LOG.info("Processing archive {} finished.", archivePath);
} catch (IOException e) {
LOG.error(
- "Failure while fetching/processing job archive for job {}.",
- jobID,
- e);
- deleteJobFiles(jobID);
+ "Failure while fetching/processing archive {}.", archiveId, e);
+ deleteCachedArchives(archiveId, refreshDir);
}
}
}
}
- if (jobsToRemove.values().stream().flatMap(Set::stream).findAny().isPresent()
+ if (archivesToRemove.values().stream().flatMap(Set::stream).findAny().isPresent()
&& processExpiredArchiveDeletion) {
- events.addAll(cleanupExpiredJobs(jobsToRemove));
+ events.addAll(cleanupExpiredArchives(archivesToRemove));
}
if (!archivesBeyondRetainedLimit.isEmpty()) {
- events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondRetainedLimit));
+ events.addAll(cleanupArchivesBeyondRetainedLimit(archivesBeyondRetainedLimit));
}
if (!events.isEmpty()) {
- updateJobOverview(webOverviewDir, webDir);
+ updateOverview();
}
- events.forEach(jobArchiveEventListener::accept);
+ events.forEach(archiveEventListener);
LOG.debug("Finished archive fetching.");
} catch (Exception e) {
- LOG.error("Critical failure while fetching/processing job archives.", e);
+ LOG.error("Critical failure while fetching/processing archives.", e);
}
}
- private static FileStatus[] listArchives(FileSystem refreshFS, Path refreshDir)
+ List<FileStatus> listValidArchives(FileSystem refreshFS, Path refreshDir) throws IOException {
+ return listValidJobArchives(refreshFS, refreshDir);
+ }
+
+ List<FileStatus> listValidJobArchives(FileSystem refreshFS, Path refreshDir)
throws IOException {
+ List<FileStatus> jobArchives = new ArrayList<>();
// contents of /:refreshDir
- FileStatus[] jobArchives = refreshFS.listStatus(refreshDir);
- if (jobArchives == null) {
+ FileStatus[] archives = refreshFS.listStatus(refreshDir);
+ if (archives == null) {
// the entire refreshDirectory was removed
- return new FileStatus[0];
+ return jobArchives;
}
- Arrays.sort(
- jobArchives, Comparator.comparingLong(FileStatus::getModificationTime).reversed());
+ // Check for job archive files located directly in the refresh directory and named according
+ // to the job ID format
+ for (FileStatus archive : archives) {
+ if (!archive.isDir() && isValidJobId(archive.getPath().getName(), refreshDir)) {
+ jobArchives.add(archive);
+ }
+ }
return jobArchives;
}
- private static boolean isValidJobID(String jobId, Path refreshDir) {
+ boolean isValidJobId(String jobId, Path refreshDir) {
try {
JobID.fromHexString(jobId);
return true;
@@ -262,98 +262,110 @@ private static boolean isValidJobID(String jobId, Path refreshDir) {
}
}
- private void processArchive(String jobID, Path jobArchive) throws IOException {
+ List<ArchiveEvent> processArchive(String archiveId, Path archivePath, Path refreshDir)
+ throws IOException {
+ return Collections.singletonList(processJobArchive(archiveId, archivePath));
+ }
+
+ ArchiveEvent processJobArchive(String jobId, Path jobArchive) throws IOException {
for (ArchivedJson archive : FsJsonArchivist.readArchivedJsons(jobArchive)) {
String path = archive.getPath();
String json = archive.getJson();
File target;
if (path.equals(JobsOverviewHeaders.URL)) {
- target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
+ target = new File(webOverviewDir, jobId + JSON_FILE_ENDING);
} else if (path.equals("/joboverview")) { // legacy path
LOG.debug("Migrating legacy archive {}", jobArchive);
json = convertLegacyJobOverview(json);
- target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
+ target = new File(webOverviewDir, jobId + JSON_FILE_ENDING);
} else {
// this implicitly writes into webJobDir
target = new File(webDir, path + JSON_FILE_ENDING);
}
- java.nio.file.Path parent = target.getParentFile().toPath();
+ writeTargetFile(target, json);
+ }
- try {
- Files.createDirectories(parent);
- } catch (FileAlreadyExistsException ignored) {
- // there may be left-over directories from the previous
- // attempt
- }
+ return new ArchiveEvent(jobId, ArchiveEventType.CREATED);
+ }
+
+ void writeTargetFile(File target, String json) throws IOException {
+ java.nio.file.Path parent = target.getParentFile().toPath();
+
+ try {
+ Files.createDirectories(parent);
+ } catch (FileAlreadyExistsException ignored) {
+ // there may be left-over directories from the previous attempt
+ }
- java.nio.file.Path targetPath = target.toPath();
+ java.nio.file.Path targetPath = target.toPath();
- // We overwrite existing files since this may be another attempt
- // at fetching this archive.
- // Existing files may be incomplete/corrupt.
- Files.deleteIfExists(targetPath);
+ // We overwrite existing files since this may be another attempt
+ // at fetching this archive.
+ // Existing files may be incomplete/corrupt.
+ Files.deleteIfExists(targetPath);
- Files.createFile(target.toPath());
- try (FileWriter fw = new FileWriter(target)) {
- fw.write(json);
- fw.flush();
- }
+ Files.createFile(target.toPath());
+ try (FileWriter fw = new FileWriter(target)) {
+ fw.write(json);
+ fw.flush();
}
}
- private List<ArchiveEvent> cleanupJobsBeyondSizeLimit(
-     Map<Path, Set<Path>> jobArchivesToRemove) {
- Map<Path, Set<String>> allJobIdsToRemoveFromOverview = new HashMap<>();
+ List<ArchiveEvent> cleanupArchivesBeyondRetainedLimit(Map<Path, Set<Path>> archivesToRemove) {
+ Map<Path, Set<String>> allArchiveIdsToRemove = new HashMap<>();
- for (Map.Entry<Path, Set<Path>> pathSetEntry : jobArchivesToRemove.entrySet()) {
- HashSet<String> jobIdsToRemoveFromOverview = new HashSet<>();
+ for (Map.Entry<Path, Set<Path>> pathSetEntry : archivesToRemove.entrySet()) {
+ HashSet<String> archiveIdsToRemove = new HashSet<>();
for (Path archive : pathSetEntry.getValue()) {
- jobIdsToRemoveFromOverview.add(archive.getName());
+ archiveIdsToRemove.add(archive.getName());
try {
- archive.getFileSystem().delete(archive, false);
+ deleteFromRemote(archive);
} catch (IOException ioe) {
- LOG.warn("Could not delete old archive " + archive, ioe);
+ LOG.warn("Could not delete old archive {}", archive, ioe);
}
}
- allJobIdsToRemoveFromOverview.put(pathSetEntry.getKey(), jobIdsToRemoveFromOverview);
+ allArchiveIdsToRemove.put(pathSetEntry.getKey(), archiveIdsToRemove);
}
- return cleanupExpiredJobs(allJobIdsToRemoveFromOverview);
+ return cleanupExpiredArchives(allArchiveIdsToRemove);
}
- private List<ArchiveEvent> cleanupExpiredJobs(Map<Path, Set<String>> jobsToRemove) {
+ void deleteFromRemote(Path archive) throws IOException {
+ archive.getFileSystem().delete(archive, false);
+ }
+ List<ArchiveEvent> cleanupExpiredArchives(Map<Path, Set<String>> archivesToRemove) {
List<ArchiveEvent> deleteLog = new ArrayList<>();
- LOG.info("Archive directories for jobs {} were deleted.", jobsToRemove);
- jobsToRemove.forEach(
- (refreshDir, archivesToRemove) -> {
- cachedArchivesPerRefreshDirectory.get(refreshDir).removeAll(archivesToRemove);
+ archivesToRemove.forEach(
+ (refreshDir, archives) -> {
+ cachedArchivesPerRefreshDirectory.get(refreshDir).removeAll(archives);
+ archives.forEach(
+ archiveId ->
+ deleteLog.addAll(deleteCachedArchives(archiveId, refreshDir)));
});
- jobsToRemove.values().stream()
- .flatMap(Set::stream)
- .forEach(
- removedJobID -> {
- deleteJobFiles(removedJobID);
- deleteLog.add(new ArchiveEvent(removedJobID, ArchiveEventType.DELETED));
- });
return deleteLog;
}
- private void deleteJobFiles(String jobID) {
+ List<ArchiveEvent> deleteCachedArchives(String archiveId, Path refreshDir) {
+ LOG.info("Archive directory for job {} was deleted.", archiveId);
+ return Collections.singletonList(deleteJobFiles(archiveId));
+ }
+
+ ArchiveEvent deleteJobFiles(String jobId) {
// Make sure we do not include this job in the overview
try {
- Files.deleteIfExists(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
+ Files.deleteIfExists(new File(webOverviewDir, jobId + JSON_FILE_ENDING).toPath());
} catch (IOException ioe) {
LOG.warn("Could not delete file from overview directory.", ioe);
}
// Clean up job files we may have created
- File jobDirectory = new File(webJobDir, jobID);
+ File jobDirectory = new File(webJobDir, jobId);
try {
FileUtils.deleteDirectory(jobDirectory);
} catch (IOException ioe) {
@@ -361,13 +373,15 @@ private void deleteJobFiles(String jobID) {
}
try {
- Files.deleteIfExists(new File(webJobDir, jobID + JSON_FILE_ENDING).toPath());
+ Files.deleteIfExists(new File(webJobDir, jobId + JSON_FILE_ENDING).toPath());
} catch (IOException ioe) {
LOG.warn("Could not delete file from job directory.", ioe);
}
+
+ return new ArchiveEvent(jobId, ArchiveEventType.DELETED);
}
- private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
+ private String convertLegacyJobOverview(String legacyOverview) throws IOException {
JsonNode root = mapper.readTree(legacyOverview);
JsonNode finishedJobs = root.get("finished");
JsonNode job = finishedJobs.get(0);
@@ -436,6 +450,10 @@ private static String convertLegacyJobOverview(String legacyOverview) throws IOE
return sw.toString();
}
+ void updateOverview() {
+ updateJobOverview();
+ }
+
/**
* This method replicates the JSON response that would be given by the JobsOverviewHandler when
* listing both running and finished jobs.
@@ -445,7 +463,7 @@ private static String convertLegacyJobOverview(String legacyOverview) throws IOE
*
* <p>For the display in the HistoryServer WebFrontend we have to combine these overviews.
*/
- private static void updateJobOverview(File webOverviewDir, File webDir) {
+ void updateJobOverview() {
try (JsonGenerator gen =
jacksonFactory.createGenerator(
HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) {
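Aside, not part of the patch: the refactoring above turns HistoryServerArchiveFetcher into a small template, with listValidArchives, processArchive, deleteCachedArchives, deleteFromRemote and updateOverview as the package-private extension points. A hypothetical subclass (all names below are illustrative) only needs to override the step it cares about, for example:

package org.apache.flink.runtime.webmonitor.history;

import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical subclass for illustration: it adds a counter around the default
// job-archive processing and otherwise reuses the base class behaviour.
class CountingArchiveFetcher extends HistoryServerArchiveFetcher {

    private int processedArchives;

    CountingArchiveFetcher(
            List<HistoryServer.RefreshLocation> refreshDirs,
            File webDir,
            Consumer<ArchiveEvent> archiveEventListener,
            boolean cleanupExpiredArchives,
            ArchiveRetainedStrategy retainedStrategy)
            throws IOException {
        super(refreshDirs, webDir, archiveEventListener, cleanupExpiredArchives, retainedStrategy);
    }

    @Override
    List<ArchiveEvent> processArchive(String archiveId, Path archivePath, Path refreshDir)
            throws IOException {
        processedArchives++; // extra bookkeeping before the regular expansion into webDir
        return super.processArchive(archiveId, archivePath, refreshDir);
    }
}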
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java
similarity index 96%
rename from flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
rename to flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java
index 2ef991698bf71..f2e50dd73c551 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java
@@ -21,7 +21,7 @@
import org.apache.flink.core.fs.FileStatus;
/** To define the strategy interface to judge whether the file should be retained. */
-public interface JobRetainedStrategy {
+public interface ArchiveRetainedStrategy {
/**
* Judge whether the file should be retained.
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java
similarity index 63%
rename from flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
rename to flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java
index acd35c93f7903..2a38dfaeff773 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java
@@ -31,23 +31,42 @@
import java.util.List;
import java.util.Optional;
+import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS;
import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
+import static org.apache.flink.util.Preconditions.checkArgument;
/** The retained strategy. */
-public class CompositeJobRetainedStrategy implements JobRetainedStrategy {
+public class CompositeArchiveRetainedStrategy implements ArchiveRetainedStrategy {
- public static JobRetainedStrategy createFrom(ReadableConfig config) {
+ public static ArchiveRetainedStrategy createForJobFromConfig(ReadableConfig config) {
int maxHistorySizeByOldKey = config.get(HISTORY_SERVER_RETAINED_JOBS);
+ if (maxHistorySizeByOldKey == 0 || maxHistorySizeByOldKey < -1) {
+ throw new IllegalConfigurationException(
+ "Cannot set %s to 0 or less than -1", HISTORY_SERVER_RETAINED_JOBS.key());
+ }
Optional<Duration> retainedTtlOpt = config.getOptional(HISTORY_SERVER_RETAINED_TTL);
- return new CompositeJobRetainedStrategy(
- new QuantityJobRetainedStrategy(maxHistorySizeByOldKey),
- new TimeToLiveJobRetainedStrategy(retainedTtlOpt.orElse(null)));
+ return new CompositeArchiveRetainedStrategy(
+ new QuantityArchiveRetainedStrategy(maxHistorySizeByOldKey),
+ new TimeToLiveArchiveRetainedStrategy(retainedTtlOpt.orElse(null)));
}
- private final List<JobRetainedStrategy> strategies;
+ public static ArchiveRetainedStrategy createForApplicationFromConfig(ReadableConfig config) {
+ int maxHistorySize = config.get(HISTORY_SERVER_RETAINED_APPLICATIONS);
+ if (maxHistorySize == 0 || maxHistorySize < -1) {
+ throw new IllegalConfigurationException(
+ "Cannot set %s to 0 or less than -1",
+ HISTORY_SERVER_RETAINED_APPLICATIONS.key());
+ }
+ Optional<Duration> retainedTtlOpt = config.getOptional(HISTORY_SERVER_RETAINED_TTL);
+ return new CompositeArchiveRetainedStrategy(
+ new QuantityArchiveRetainedStrategy(maxHistorySize),
+ new TimeToLiveArchiveRetainedStrategy(retainedTtlOpt.orElse(null)));
+ }
+
+ private final List<ArchiveRetainedStrategy> strategies;
- CompositeJobRetainedStrategy(JobRetainedStrategy... strategies) {
+ CompositeArchiveRetainedStrategy(ArchiveRetainedStrategy... strategies) {
this.strategies =
strategies == null || strategies.length == 0
? Collections.emptyList()
@@ -64,11 +83,11 @@ public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
}
/** The time to live based retained strategy. */
-class TimeToLiveJobRetainedStrategy implements JobRetainedStrategy {
+class TimeToLiveArchiveRetainedStrategy implements ArchiveRetainedStrategy {
@Nullable private final Duration ttlThreshold;
- TimeToLiveJobRetainedStrategy(Duration ttlThreshold) {
+ TimeToLiveArchiveRetainedStrategy(@Nullable Duration ttlThreshold) {
if (ttlThreshold != null && ttlThreshold.toMillis() <= 0) {
throw new IllegalConfigurationException(
"Cannot set %s to 0 or less than 0 milliseconds",
@@ -86,16 +105,13 @@ public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
}
}
-/** The job quantity based retained strategy. */
-class QuantityJobRetainedStrategy implements JobRetainedStrategy {
+/** The quantity based retained strategy. */
+class QuantityArchiveRetainedStrategy implements ArchiveRetainedStrategy {
private final int quantityThreshold;
- QuantityJobRetainedStrategy(int quantityThreshold) {
- if (quantityThreshold == 0 || quantityThreshold < -1) {
- throw new IllegalConfigurationException(
- "Cannot set %s to 0 or less than -1", HISTORY_SERVER_RETAINED_JOBS.key());
- }
+ QuantityArchiveRetainedStrategy(int quantityThreshold) {
+ checkArgument(quantityThreshold == -1 || quantityThreshold > 0);
this.quantityThreshold = quantityThreshold;
}
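Aside, not part of the patch: a sketch of the caller-side contract of ArchiveRetainedStrategy. fetchArchives() sorts the archives newest-first and passes the 1-based position to shouldRetain, so the composite can enforce both the quantity limit and the TTL. The class name and the configured value below are illustrative only.

package org.apache.flink.runtime.webmonitor.history.retaining;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.core.fs.FileStatus;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustration of how a caller like fetchArchives() applies the composite strategy.
class RetentionSketch {

    static List<FileStatus> selectForDeletion(List<FileStatus> archives, Configuration config) {
        // e.g. keep at most 5 application archives per refresh directory
        config.set(HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS, 5);
        ArchiveRetainedStrategy strategy =
                CompositeArchiveRetainedStrategy.createForApplicationFromConfig(config);

        // newest first, as done by the fetcher before applying the strategy
        archives.sort(Comparator.comparingLong(FileStatus::getModificationTime).reversed());

        List<FileStatus> beyondLimit = new ArrayList<>();
        int orderedIndex = 0;
        for (FileStatus archive : archives) {
            orderedIndex++;
            if (!strategy.shouldRetain(archive, orderedIndex)) {
                beyondLimit.add(archive); // older than the TTL or beyond the retained count
            }
        }
        return beyondLimit;
    }
}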
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 537d58fae8e77..c67a3147628b5 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -18,19 +18,32 @@
package org.apache.flink.runtime.webmonitor.history;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.history.ArchivePathUtils;
import org.apache.flink.runtime.history.FsJsonArchivist;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetailsInfo;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
+import org.apache.flink.runtime.rest.messages.ApplicationsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.application.ApplicationDetailsHeaders;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.webmonitor.testutils.HttpUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -51,6 +64,8 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
@@ -58,11 +73,14 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -86,13 +104,14 @@ class HistoryServerTest {
private MiniClusterWithClientResource cluster;
private File jmDirectory;
private File hsDirectory;
+ private Configuration clusterConfig;
@BeforeEach
void setUp(@TempDir File jmDirectory, @TempDir File hsDirectory) throws Exception {
this.jmDirectory = jmDirectory;
this.hsDirectory = hsDirectory;
- Configuration clusterConfig = new Configuration();
+ clusterConfig = new Configuration();
clusterConfig.set(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
cluster =
@@ -130,6 +149,9 @@ void testHistoryServerIntegration(final boolean versionLessThan14) throws Except
== HistoryServerArchiveFetcher.ArchiveEventType.CREATED) {
numExpectedArchivedJobs.countDown();
}
+ },
+ (event) -> {
+ throw new RuntimeException("Should not call");
});
try {
@@ -164,10 +186,10 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio
final int numArchivesToRemoveUponHsStart =
numArchivesBeforeHsStarted - numArchivesToKeepInHistory;
final long oneMinuteSinceEpoch = 1000L * 60L;
- List<String> expectedJobIdsToKeep = new LinkedList<>();
+ List<JobID> expectedJobIdsToKeep = new LinkedList<>();
for (int j = 0; j < numArchivesBeforeHsStarted; j++) {
- String jobId =
+ JobID jobId =
createLegacyArchive(
jmDirectory.toPath(), j * oneMinuteSinceEpoch, versionLessThan14);
if (j >= numArchivesToRemoveUponHsStart) {
@@ -205,6 +227,9 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio
numArchivesDeletedTotal.countDown();
break;
}
+ },
+ (event) -> {
+ throw new RuntimeException("Should not call");
});
try {
@@ -232,10 +257,9 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio
}
}
- private Set<String> getIdsFromJobOverview(String baseUrl) throws Exception {
+ private Set<JobID> getIdsFromJobOverview(String baseUrl) throws Exception {
return getJobsOverview(baseUrl).getJobs().stream()
.map(JobDetails::getJobId)
- .map(JobID::toString)
.collect(Collectors.toSet());
}
@@ -288,15 +312,26 @@ void testClearWebDir() throws Exception {
new File(hsDirectory.toURI() + "/overviews/dirtyEmptySubFile.json").createNewFile();
new File(hsDirectory.toURI() + "/jobs/dirtyEmptySubDir").mkdir();
new File(hsDirectory.toURI() + "/jobs/dirtyEmptySubFile.json").createNewFile();
+ new File(hsDirectory.toURI() + "/application-overviews/dirtyEmptySubDir").mkdir();
+ new File(hsDirectory.toURI() + "/application-overviews/dirtyEmptySubFile.json")
+ .createNewFile();
+ new File(hsDirectory.toURI() + "/applications/dirtyEmptySubDir").mkdir();
+ new File(hsDirectory.toURI() + "/applications/dirtyEmptySubFile.json").createNewFile();
hs = new HistoryServer(historyServerConfig);
assertInitializedHistoryServerWebDir(hs.getWebDir());
}
private void assertInitializedHistoryServerWebDir(File historyWebDir) {
-
- assertThat(historyWebDir.list()).containsExactlyInAnyOrder("overviews", "jobs");
+ assertThat(historyWebDir.list())
+ .containsExactlyInAnyOrder(
+ "overviews", "jobs", "application-overviews", "applications");
assertThat(new File(historyWebDir, "overviews")).exists().isDirectory().isEmptyDirectory();
assertThat(new File(historyWebDir, "jobs").list()).containsExactly("overview.json");
+ assertThat(new File(historyWebDir, "application-overviews"))
+ .exists()
+ .isDirectory()
+ .isEmptyDirectory();
+ assertThat(new File(historyWebDir, "applications").list()).containsExactly("overview.json");
}
private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Exception {
@@ -327,6 +362,9 @@ private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Excepti
allArchivesExpiredLatch.countDown();
break;
}
+ },
+ (event) -> {
+ throw new RuntimeException("Should not call");
});
try {
@@ -378,27 +416,33 @@ private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Excepti
assertThat(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
- assertJobFilesCleanedUp(cleanupExpiredJobs);
+ assertFilesCleanedUp(cleanupExpiredJobs);
} finally {
hs.stop();
}
}
- private void assertJobFilesCleanedUp(boolean jobFilesShouldBeDeleted) throws IOException {
+ private void assertFilesCleanedUp(boolean filesShouldBeDeleted) throws IOException {
try (Stream<Path> paths = Files.walk(hsDirectory.toPath())) {
- final List<Path> jobFiles =
+ final List<Path> applicationOrJobFiles =
paths.filter(path -> !path.equals(hsDirectory.toPath()))
.map(path -> hsDirectory.toPath().relativize(path))
.filter(path -> !path.equals(Paths.get("config.json")))
.filter(path -> !path.equals(Paths.get("jobs")))
.filter(path -> !path.equals(Paths.get("jobs", "overview.json")))
.filter(path -> !path.equals(Paths.get("overviews")))
+ .filter(path -> !path.equals(Paths.get("applications")))
+ .filter(
+ path ->
+ !path.equals(
+ Paths.get("applications", "overview.json")))
+ .filter(path -> !path.equals(Paths.get("application-overviews")))
.collect(Collectors.toList());
- if (jobFilesShouldBeDeleted) {
- assertThat(jobFiles).isEmpty();
+ if (filesShouldBeDeleted) {
+ assertThat(applicationOrJobFiles).isEmpty();
} else {
- assertThat(jobFiles).isNotEmpty();
+ assertThat(applicationOrJobFiles).isNotEmpty();
}
}
}
@@ -413,6 +457,13 @@ private void waitForArchivesCreation(int numJobs) throws InterruptedException {
}
private Configuration createTestConfiguration(boolean cleanupExpiredJobs) {
+ return createTestConfiguration(
+ cleanupExpiredJobs,
+ HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS.defaultValue());
+ }
+
+ private Configuration createTestConfiguration(
+ boolean cleanupExpiredJobs, boolean cleanupExpiredApplications) {
Configuration historyServerConfig = new Configuration();
historyServerConfig.set(
HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
@@ -424,6 +475,9 @@ private Configuration createTestConfiguration(boolean cleanupExpiredJobs) {
historyServerConfig.set(
HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS, cleanupExpiredJobs);
+ historyServerConfig.set(
+ HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS,
+ cleanupExpiredApplications);
historyServerConfig.set(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
return historyServerConfig;
@@ -451,15 +505,15 @@ private static void runJob() throws Exception {
env.execute();
}
- private static String createLegacyArchive(
+ private static JobID createLegacyArchive(
Path directory, long fileModifiedDate, boolean versionLessThan14) throws IOException {
- String jobId = createLegacyArchive(directory, versionLessThan14);
- File jobArchive = directory.resolve(jobId).toFile();
+ JobID jobId = createLegacyArchive(directory, versionLessThan14);
+ File jobArchive = directory.resolve(jobId.toString()).toFile();
jobArchive.setLastModified(fileModifiedDate);
return jobId;
}
- private static String createLegacyArchive(Path directory, boolean versionLessThan14)
+ private static JobID createLegacyArchive(Path directory, boolean versionLessThan14)
throws IOException {
JobID jobId = JobID.generate();
@@ -504,7 +558,405 @@ private static String createLegacyArchive(Path directory, boolean versionLessTha
directory.toAbsolutePath().toString(), jobId.toString()),
Collections.singleton(archivedJson));
- return jobId.toString();
+ return jobId;
+ }
+
+ @Test
+ void testApplicationAndJobArchives() throws Exception {
+ int numApplications = 2;
+ int numJobsPerApplication = 2;
+ // jobs that are not part of an application
+ int numJobsOutsideApplication = 1;
+
+ Map<ApplicationID, Set<JobID>> expectedApplicationAndJobIds =
+ new HashMap<>(numApplications);
+ for (int i = 0; i < numApplications; i++) {
+ ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication);
+ ApplicationID applicationId = archivedApplication.getApplicationId();
+ List<JobID> jobIds =
+ archivedApplication.getJobs().values().stream()
+ .map(ExecutionGraphInfo::getJobId)
+ .collect(Collectors.toList());
+ expectedApplicationAndJobIds.put(applicationId, new HashSet<>(jobIds));
+ }
+ Set<JobID> expectedJobIdsOutsideApplication = new HashSet<>(numJobsOutsideApplication);
+ for (int i = 0; i < numJobsOutsideApplication; i++) {
+ ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo();
+ mockJobArchive(executionGraphInfo, null);
+ expectedJobIdsOutsideApplication.add(executionGraphInfo.getJobId());
+ }
+
+ int numTotalJobs = numApplications * numJobsPerApplication + numJobsOutsideApplication;
+ int numTotal = numApplications + numTotalJobs;
+ CountDownLatch numExpectedArchives = new CountDownLatch(numTotal);
+ Configuration historyServerConfig = createTestConfiguration(false);
+ HistoryServer hs =
+ new HistoryServer(
+ historyServerConfig,
+ (event) -> {
+ if (event.getType()
+ == HistoryServerArchiveFetcher.ArchiveEventType.CREATED) {
+ numExpectedArchives.countDown();
+ }
+ },
+ (event) -> {
+ if (event.getType()
+ == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .CREATED) {
+ numExpectedArchives.countDown();
+ }
+ });
+ try {
+ hs.start();
+ String baseUrl = "http://localhost:" + hs.getWebPort();
+ assertThat(numExpectedArchives.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(expectedApplicationAndJobIds);
+ Set<JobID> expectedJobIds =
+ Stream.concat(
+ expectedApplicationAndJobIds.values().stream()
+ .flatMap(Set::stream),
+ expectedJobIdsOutsideApplication.stream())
+ .collect(Collectors.toSet());
+ assertThat(getIdsFromJobOverview(baseUrl)).isEqualTo(expectedJobIds);
+ // checks whether the dashboard configuration contains all expected fields
+ getDashboardConfiguration(baseUrl);
+ } finally {
+ hs.stop();
+ }
+ }
+
+ @Test
+ void testRemoveApplicationArchivesBeyondHistorySizeLimit() throws Exception {
+ int numJobsPerApplication = 1;
+ int numApplicationsToKeepInHistory = 2;
+ int numApplicationsBeforeHsStarted = 4;
+ int numApplicationsAfterHsStarted = 2;
+ int numApplicationsToRemoveUponHsStart =
+ numApplicationsBeforeHsStarted - numApplicationsToKeepInHistory;
+ List<Tuple2<ApplicationID, Set<JobID>>> expectedApplicationAndJobIdsToKeep =
+ new LinkedList<>();
+ for (int i = 0; i < numApplicationsBeforeHsStarted; i++) {
+ ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication);
+ ApplicationID applicationId = archivedApplication.getApplicationId();
+ List<JobID> jobIds =
+ archivedApplication.getJobs().values().stream()
+ .map(ExecutionGraphInfo::getJobId)
+ .collect(Collectors.toList());
+ if (i >= numApplicationsToRemoveUponHsStart) {
+ expectedApplicationAndJobIdsToKeep.add(
+ new Tuple2<>(applicationId, new HashSet<>(jobIds)));
+ }
+ }
+
+ // one for application itself, numJobsPerApplication for jobs
+ int numArchivesRatio = 1 + numJobsPerApplication;
+ CountDownLatch numArchivesCreatedInitially =
+ new CountDownLatch(numApplicationsToKeepInHistory * numArchivesRatio);
+ // jobs in applications that exceed the size limit are not read by the fetcher at all,
+ // so there is no need to delete these jobs.
+ CountDownLatch numArchivesDeletedInitially =
+ new CountDownLatch(numApplicationsToRemoveUponHsStart);
+ CountDownLatch numArchivesCreatedTotal =
+ new CountDownLatch(
+ (numApplicationsBeforeHsStarted
+ - numApplicationsToRemoveUponHsStart
+ + numApplicationsAfterHsStarted)
+ * numArchivesRatio);
+ CountDownLatch numArchivesDeletedTotal =
+ new CountDownLatch(
+ numApplicationsToRemoveUponHsStart
+ + numApplicationsAfterHsStarted * numArchivesRatio);
+ Configuration historyServerConfig =
+ createTestConfiguration(
+ HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue());
+ historyServerConfig.set(
+ HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS,
+ numApplicationsToKeepInHistory);
+ HistoryServer hs =
+ new HistoryServer(
+ historyServerConfig,
+ (event) -> {
+ throw new RuntimeException("Should not call");
+ },
+ (event) -> {
+ if (event.getType()
+ == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .CREATED) {
+ numArchivesCreatedInitially.countDown();
+ numArchivesCreatedTotal.countDown();
+ } else if (event.getType()
+ == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .DELETED) {
+ numArchivesDeletedInitially.countDown();
+ numArchivesDeletedTotal.countDown();
+ }
+ });
+ try {
+ hs.start();
+ String baseUrl = "http://localhost:" + hs.getWebPort();
+ assertThat(numArchivesCreatedInitially.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertThat(numArchivesDeletedInitially.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(
+ expectedApplicationAndJobIdsToKeep.stream()
+ .collect(
+ Collectors.toMap(
+ tuple -> tuple.f0, tuple -> tuple.f1)));
+ for (int i = numApplicationsBeforeHsStarted;
+ i < numApplicationsBeforeHsStarted + numApplicationsAfterHsStarted;
+ i++) {
+ expectedApplicationAndJobIdsToKeep.remove(0);
+ ArchivedApplication archivedApplication =
+ mockApplicationArchive(numJobsPerApplication);
+ ApplicationID applicationId = archivedApplication.getApplicationId();
+ List<JobID> jobIds =
+ archivedApplication.getJobs().values().stream()
+ .map(ExecutionGraphInfo::getJobId)
+ .collect(Collectors.toList());
+ expectedApplicationAndJobIdsToKeep.add(
+ new Tuple2<>(applicationId, new HashSet<>(jobIds)));
+ // avoid executing too fast, which could give the archive files identical modification times
+ Thread.sleep(50);
+ }
+
+ assertThat(numArchivesCreatedTotal.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertThat(numArchivesDeletedTotal.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(
+ expectedApplicationAndJobIdsToKeep.stream()
+ .collect(
+ Collectors.toMap(
+ tuple -> tuple.f0, tuple -> tuple.f1)));
+ } finally {
+ hs.stop();
+ }
+ }
+
+ @Test
+ void testFailIfApplicationHistorySizeLimitIsZero() {
+ assertThatThrownBy(() -> startHistoryServerWithApplicationSizeLimit(0))
+ .isInstanceOf(IllegalConfigurationException.class);
+ }
+
+ @Test
+ void testFailIfApplicationHistorySizeLimitIsLessThanMinusOne() {
+ assertThatThrownBy(() -> startHistoryServerWithApplicationSizeLimit(-2))
+ .isInstanceOf(IllegalConfigurationException.class);
+ }
+
+ private void startHistoryServerWithApplicationSizeLimit(int maxHistorySize)
+ throws IOException, FlinkException, InterruptedException {
+ Configuration historyServerConfig =
+ createTestConfiguration(
+ HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS
+ .defaultValue());
+ historyServerConfig.set(
+ HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS, maxHistorySize);
+ new HistoryServer(historyServerConfig).start();
+ }
+
+ @Test
+ void testCleanExpiredApplication() throws Exception {
+ runApplicationArchiveExpirationTest(true);
+ }
+
+ @Test
+ void testRemainExpiredApplication() throws Exception {
+ runApplicationArchiveExpirationTest(false);
+ }
+
+ private void runApplicationArchiveExpirationTest(boolean cleanupExpiredApplications)
+ throws Exception {
+ int numExpiredApplications = cleanupExpiredApplications ? 1 : 0;
+ int numApplications = 3;
+ int numJobsPerApplication = 1;
+
+ Map<ApplicationID, Set<JobID>> expectedApplicationAndJobIds =
+ new HashMap<>(numApplications);
+ for (int i = 0; i < numApplications; i++) {
+ ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication);
+ ApplicationID applicationId = archivedApplication.getApplicationId();
+ List<JobID> jobIds =
+ archivedApplication.getJobs().values().stream()
+ .map(ExecutionGraphInfo::getJobId)
+ .collect(Collectors.toList());
+ expectedApplicationAndJobIds.put(applicationId, new HashSet<>(jobIds));
+ }
+
+ // one for application itself, numJobsPerApplication for jobs
+ int numArchivesRatio = 1 + numJobsPerApplication;
+ CountDownLatch numExpectedArchives = new CountDownLatch(numApplications * numArchivesRatio);
+ CountDownLatch firstArchiveExpiredLatch =
+ new CountDownLatch(numExpiredApplications * numArchivesRatio);
+ CountDownLatch allArchivesExpiredLatch =
+ new CountDownLatch(
+ cleanupExpiredApplications ? numApplications * numArchivesRatio : 0);
+
+ Configuration historyServerConfig =
+ createTestConfiguration(
+ HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue(),
+ cleanupExpiredApplications);
+ HistoryServer hs =
+ new HistoryServer(
+ historyServerConfig,
+ (event) -> {
+ throw new RuntimeException("Should not call");
+ },
+ (event) -> {
+ if (event.getType()
+ == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .CREATED) {
+ numExpectedArchives.countDown();
+ } else if (event.getType()
+ == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .DELETED) {
+ firstArchiveExpiredLatch.countDown();
+ allArchivesExpiredLatch.countDown();
+ }
+ });
+ try {
+ hs.start();
+ String baseUrl = "http://localhost:" + hs.getWebPort();
+ assertThat(numExpectedArchives.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(expectedApplicationAndJobIds);
+ ApplicationID applicationIdToDelete =
+ expectedApplicationAndJobIds.keySet().stream()
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Expected at least one application"));
+ if (cleanupExpiredApplications) {
+ expectedApplicationAndJobIds.remove(applicationIdToDelete);
+ }
+ // trigger another fetch and delete one application archive on the JobManager side;
+ // fetching again makes it more likely that the fetch races with the deletion
+ hs.fetchArchives();
+ deleteApplicationArchiveDir(applicationIdToDelete);
+
+ assertThat(firstArchiveExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
+ // check that the archive is either still present or removed from the HistoryServer, depending on the cleanup setting
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(expectedApplicationAndJobIds);
+ for (ApplicationID remainingApplicationId : expectedApplicationAndJobIds.keySet()) {
+ deleteApplicationArchiveDir(remainingApplicationId);
+ }
+ assertThat(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertFilesCleanedUp(cleanupExpiredApplications);
+ } finally {
+ hs.stop();
+ }
+ }
+
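+ /** Queries the HistoryServer REST API and maps each application ID to the job IDs reported in its details. */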
+ private Map<ApplicationID, Set<JobID>> getApplicationAndJobIdsFromApplicationOverview(
+ String baseUrl) throws Exception {
+ Set<ApplicationID> applicationIds =
+ getApplicationsOverview(baseUrl).getApplications().stream()
+ .map(ApplicationDetails::getApplicationId)
+ .collect(Collectors.toSet());
+ Map<ApplicationID, Set<JobID>> applicationAndJobIds = new HashMap<>(applicationIds.size());
+ for (ApplicationID applicationId : applicationIds) {
+ Set<JobID> jobIds =
+ getApplicationDetails(baseUrl, applicationId).getJobs().stream()
+ .map(JobDetails::getJobId)
+ .collect(Collectors.toSet());
+ applicationAndJobIds.put(applicationId, jobIds);
+ }
+ return applicationAndJobIds;
+ }
+
+ private static MultipleApplicationsDetails getApplicationsOverview(String baseUrl)
+ throws Exception {
+ Tuple2<Integer, String> response =
+ HttpUtils.getFromHTTP(baseUrl + ApplicationsOverviewHeaders.URL);
+ return OBJECT_MAPPER.readValue(response.f1, MultipleApplicationsDetails.class);
+ }
+
+ private static ApplicationDetailsInfo getApplicationDetails(
+ String baseUrl, ApplicationID applicationId) throws Exception {
+ Tuple2<Integer, String> response =
+ HttpUtils.getFromHTTP(
+ baseUrl
+ + ApplicationDetailsHeaders.URL.replace(
+ ':' + ApplicationIDPathParameter.KEY,
+ applicationId.toString()));
+ return OBJECT_MAPPER.readValue(response.f1, ApplicationDetailsInfo.class);
+ }
+
+ private ArchivedApplication mockApplicationArchive(int numJobs) throws IOException {
+ ArchivedApplication archivedApplication = createArchivedApplication(numJobs);
+ ApplicationID applicationId = archivedApplication.getApplicationId();
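+ // write the archived REST responses (applications overview + application details) that the HistoryServer will serve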
+ ArchivedJson archivedApplicationsOverview =
+ new ArchivedJson(
+ ApplicationsOverviewHeaders.URL,
+ new MultipleApplicationsDetails(
+ Collections.singleton(
+ ApplicationDetails.fromArchivedApplication(
+ archivedApplication))));
+ ArchivedJson archivedApplicationDetails =
+ new ArchivedJson(
+ ApplicationDetailsHeaders.URL.replace(
+ ':' + ApplicationIDPathParameter.KEY, applicationId.toString()),
+ ApplicationDetailsInfo.fromArchivedApplication(archivedApplication));
+ // set cluster id to application id to simplify the test
+ clusterConfig.set(ClusterOptions.CLUSTER_ID, applicationId.toString());
+ FsJsonArchivist.writeArchivedJsons(
+ ArchivePathUtils.getApplicationArchivePath(clusterConfig, applicationId),
+ Arrays.asList(archivedApplicationsOverview, archivedApplicationDetails));
+
+ Map<JobID, ExecutionGraphInfo> jobs = archivedApplication.getJobs();
+ for (Map.Entry<JobID, ExecutionGraphInfo> jobEntry : jobs.entrySet()) {
+ mockJobArchive(jobEntry.getValue(), applicationId);
+ }
+ return archivedApplication;
+ }
+
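+ /** Writes a job archive containing the jobs-overview JSON; the archive path also depends on the (nullable) application ID. */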
+ private void mockJobArchive(
+ ExecutionGraphInfo executionGraphInfo, @Nullable ApplicationID applicationId)
+ throws IOException {
+ JobID jobId = executionGraphInfo.getJobId();
+ ArchivedJson archivedJobsOverview =
+ new ArchivedJson(
+ JobsOverviewHeaders.URL,
+ new MultipleJobsDetails(
+ Collections.singleton(
+ JobDetails.createDetailsForJob(
+ executionGraphInfo.getArchivedExecutionGraph()))));
+ FsJsonArchivist.writeArchivedJsons(
+ ArchivePathUtils.getJobArchivePath(clusterConfig, jobId, applicationId),
+ Collections.singletonList(archivedJobsOverview));
+ }
+
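+ /** Builds a FINISHED ArchivedApplication holding the requested number of finished jobs. */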
+ private ArchivedApplication createArchivedApplication(int numJobs) {
+ ApplicationID applicationId = ApplicationID.generate();
+ Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>(numJobs);
+ for (int i = 0; i < numJobs; i++) {
+ ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo();
+ jobs.put(executionGraphInfo.getJobId(), executionGraphInfo);
+ }
+ return new ArchivedApplication(
+ applicationId,
+ "test-application",
+ ApplicationState.FINISHED,
+ new long[ApplicationState.values().length],
+ jobs);
+ }
+
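+ /** Creates a sparse archived execution graph for a FINISHED job with a random job ID. */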
+ private ExecutionGraphInfo createExecutionGraphInfo() {
+ return new ExecutionGraphInfo(
+ ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
+ JobID.generate(), "test-job", JobStatus.FINISHED, null, null, null, 0));
+ }
+
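+ /** Deletes the application's archive directory, simulating removal by the JobManager. */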
+ private void deleteApplicationArchiveDir(ApplicationID applicationId) throws IOException {
+ // set cluster id to application id to simplify the test
+ clusterConfig.set(ClusterOptions.CLUSTER_ID, applicationId.toString());
+ org.apache.flink.core.fs.Path applicationArchiveDir =
+ ArchivePathUtils.getApplicationArchivePath(clusterConfig, applicationId)
+ .getParent();
+ applicationArchiveDir.getFileSystem().delete(applicationArchiveDir, true);
}
private static final class JsonObject implements AutoCloseable {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
similarity index 64%
rename from flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
rename to flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
index e8983967df537..d1d6124b3570f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
@@ -18,31 +18,49 @@
package org.apache.flink.runtime.webmonitor.history.retaining;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
import java.time.Instant;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS;
import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Testing for {@link CompositeJobRetainedStrategy}. */
-class CompositeJobRetainedStrategyTest {
+/** Tests for {@link CompositeArchiveRetainedStrategy}. */
+class CompositeArchiveRetainedStrategyTest {
+
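+ /** Provides the two parameterizations: legacy job options and application options. */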
+ private static Stream<TestCase> getTestCases() {
+ return Stream.of(
+ new TestCase(
+ "Legacy Jobs",
+ HISTORY_SERVER_RETAINED_JOBS,
+ CompositeArchiveRetainedStrategy::createForJobFromConfig),
+ new TestCase(
+ "Applications",
+ HISTORY_SERVER_RETAINED_APPLICATIONS,
+ CompositeArchiveRetainedStrategy::createForApplicationFromConfig));
+ }
- @Test
- void testTimeToLiveBasedJobRetainedStrategy() {
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("getTestCases")
+ void testTimeToLiveBasedArchiveRetainedStrategy(TestCase testCase) {
final Configuration conf = new Configuration();
// Test for invalid option value.
conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ZERO);
- assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+ assertThatThrownBy(() -> testCase.createStrategy(conf))
.isInstanceOf(IllegalConfigurationException.class);
// Skipped for option value that is less than 0 milliseconds, which will throw a
// java.lang.NumberFormatException caused by TimeUtils.
@@ -51,7 +69,7 @@ void testTimeToLiveBasedJobRetainedStrategy() {
// Test the case where no specific retention policy is configured, i.e., all archived files
// are retained.
- JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
assertThat(
strategy.shouldRetain(
@@ -63,7 +81,7 @@ void testTimeToLiveBasedJobRetainedStrategy() {
// Test the case where TTL-based retention policies is specified only.
conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1L));
- strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
assertThat(
strategy.shouldRetain(
@@ -74,35 +92,37 @@ void testTimeToLiveBasedJobRetainedStrategy() {
.isFalse();
}
- @Test
- void testQuantityBasedJobRetainedStrategy() {
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("getTestCases")
+ void testQuantityBasedArchiveRetainedStrategy(TestCase testCase) {
final Configuration conf = new Configuration();
// Test for invalid option value.
- conf.set(HISTORY_SERVER_RETAINED_JOBS, 0);
- assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+ conf.set(testCase.getQuantityConfigOption(), 0);
+ assertThatThrownBy(() -> testCase.createStrategy(conf))
.isInstanceOf(IllegalConfigurationException.class);
- conf.set(HISTORY_SERVER_RETAINED_JOBS, -2);
- assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+ conf.set(testCase.getQuantityConfigOption(), -2);
+ assertThatThrownBy(() -> testCase.createStrategy(conf))
.isInstanceOf(IllegalConfigurationException.class);
- conf.removeConfig(HISTORY_SERVER_RETAINED_JOBS);
+ conf.removeConfig(testCase.getQuantityConfigOption());
// Test the case where no specific retention policy is configured, i.e., all archived files
// are retained.
- JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isTrue();
// Test the case where QUANTITY-based retention policies is specified only.
- conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
- strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ conf.set(testCase.getQuantityConfigOption(), 2);
+ strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isFalse();
}
- @Test
- void testCompositeBasedJobRetainedStrategy() {
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("getTestCases")
+ void testCompositeBasedArchiveRetainedStrategy(TestCase testCase) {
final long outOfTtlMillis =
Instant.now().toEpochMilli() - Duration.ofMinutes(2L).toMillis();
@@ -110,7 +130,7 @@ void testCompositeBasedJobRetainedStrategy() {
// Test the case where no specific retention policy is configured, i.e., all archived files
// are retained.
final Configuration conf = new Configuration();
- JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 1)).isTrue();
assertThat(strategy.shouldRetain(new TestingFileStatus(), 10)).isTrue();
assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 3)).isTrue();
@@ -118,8 +138,8 @@ void testCompositeBasedJobRetainedStrategy() {
// Test the case where both retention policies are specified.
conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1));
- conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
- strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ conf.set(testCase.getQuantityConfigOption(), 2);
+ strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 1)).isFalse();
assertThat(strategy.shouldRetain(new TestingFileStatus(), 10)).isFalse();
assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 3)).isFalse();
@@ -173,4 +193,32 @@ public Path getPath() {
return null;
}
}
+
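+ /** Couples the retained-quantity option with the factory that creates the strategy under test. */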
+ private static final class TestCase {
+ private final String testName;
+ private final ConfigOption<Integer> quantityConfigOption;
+ private final Function<Configuration, ArchiveRetainedStrategy> strategyFunction;
+
+ TestCase(
+ String testName,
+ ConfigOption<Integer> quantityConfigOption,
+ Function<Configuration, ArchiveRetainedStrategy> strategyFunction) {
+ this.testName = testName;
+ this.quantityConfigOption = quantityConfigOption;
+ this.strategyFunction = strategyFunction;
+ }
+
+ ArchiveRetainedStrategy createStrategy(Configuration conf) {
+ return strategyFunction.apply(conf);
+ }
+
+ ConfigOption<Integer> getQuantityConfigOption() {
+ return quantityConfigOption;
+ }
+
+ @Override
+ public String toString() {
+ return testName;
+ }
+ }
}