diff --git a/docs/layouts/shortcodes/generated/history_server_configuration.html b/docs/layouts/shortcodes/generated/history_server_configuration.html index 8b171ba627c41..db696545a1809 100644 --- a/docs/layouts/shortcodes/generated/history_server_configuration.html +++ b/docs/layouts/shortcodes/generated/history_server_configuration.html @@ -8,11 +8,17 @@ + +
historyserver.archive.clean-expired-applications
+ false + Boolean + Whether HistoryServer should cleanup applications that are no longer present in the archive directory defined by historyserver.archive.fs.dir. +
historyserver.archive.clean-expired-jobs
false Boolean - Whether HistoryServer should cleanup jobs that are no longer present `historyserver.archive.fs.dir`. + Whether HistoryServer should cleanup jobs that are no longer present in the archive directory defined by historyserver.archive.fs.dir.
Note: This option applies only to legacy job archives created before the introduction of application archiving (FLINK-38761).
historyserver.archive.fs.dir
@@ -26,17 +32,23 @@ Duration Interval for refreshing the archived job directories. + +
historyserver.archive.retained-applications
+ -1 + Integer + The maximum number of applications to retain in each archive directory defined by historyserver.archive.fs.dir. This option works together with the TTL (see historyserver.archive.retained-ttl). Archived entities will be removed if their TTL has expired or the retention count limit has been reached.
If set to -1 (default), there is no limit to the number of archives. If set to 0 or less than -1, HistoryServer will throw an IllegalConfigurationException.
Note, when there are multiple history server instances, two recommended approaches when using this option are: Specify the option in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files, or keep the value of this configuration consistent across them. +
historyserver.archive.retained-jobs
-1 Integer - The maximum number of jobs to retain in each archive directory defined by historyserver.archive.fs.dir. If set to 0 or less than -1, HistoryServer will throw an IllegalConfigurationException.
Note, when there are multiple history server instances, two recommended approaches when using this option are: Specify the option in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files, or keep the value of this configuration consistent across them. + The maximum number of jobs to retain in each archive directory defined by historyserver.archive.fs.dir. If set to 0 or less than -1, HistoryServer will throw an IllegalConfigurationException.
Note, when there are multiple history server instances, two recommended approaches when using this option are: Specify the option in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files, or keep the value of this configuration consistent across them.
Note: This option applies only to legacy job archives created before the introduction of application archiving (FLINK-38761).
historyserver.archive.retained-ttl
(none) Duration - The time-to-live duration to retain the jobs archived in each archive directory defined by historyserver.archive.fs.dir. If set to equal to or less than 0 milliseconds, HistoryServer will throw an IllegalConfigurationException.
Note, when there are multiple history server instances, two recommended approaches when using this option are: Specify the option in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files, or keep the value of this configuration consistent across them. + The time-to-live duration to retain the archived entities (jobs and applications) in each archive directory defined by historyserver.archive.fs.dir. This option works together with the retention count limits (see historyserver.archive.retained-applications and historyserver.archive.retained-jobs). Archived entities will be removed if their TTL has expired or the retention count limit has been reached.
If set to equal to or less than 0 milliseconds, HistoryServer will throw an IllegalConfigurationException.
Note, when there are multiple history server instances, two recommended approaches when using this option are: Specify the option in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files, or keep the value of this configuration consistent across them.
historyserver.log.jobmanager.url-pattern
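To make the interplay of the retention options above concrete, here is a minimal sketch (not part of this patch) that wires them into a HistoryServer configuration through the options this change introduces or touches; the archive path and the concrete limits (50 entries, 7 days) are illustrative assumptions:

```java
import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;

public class HistoryServerRetentionExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Directory that the HistoryServer polls for application and job archives
        // (illustrative path).
        config.set(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, "hdfs:///completed-jobs/");
        // Also drop cached application archives once they disappear from that directory.
        config.set(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS, true);
        // Keep at most 50 applications per archive directory; -1 (the default) means
        // unlimited, while 0 or values below -1 raise an IllegalConfigurationException.
        config.set(HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS, 50);
        // Separate count limit for legacy job archives created before FLINK-38761.
        config.set(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS, 50);
        // TTL shared by job and application archives; combined with the count limits,
        // an archive is removed once either bound is hit.
        config.set(HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL, Duration.ofDays(7));
    }
}
```

With such a setup an archive survives a fetch cycle only while both limits agree to retain it: per the composite strategy added below, an entry is removed as soon as its TTL has expired or it falls beyond the retained count.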
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java index 1628d266f3516..1677dc2e1a035 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java @@ -51,17 +51,6 @@ public class HistoryServerOptions { + " monitor these directories for archived jobs. You can configure the JobManager to archive jobs to a" + " directory via `jobmanager.archive.fs.dir`."); - /** If this option is enabled then deleted job archives are also deleted from HistoryServer. */ - public static final ConfigOption HISTORY_SERVER_CLEANUP_EXPIRED_JOBS = - key("historyserver.archive.clean-expired-jobs") - .booleanType() - .defaultValue(false) - .withDescription( - String.format( - "Whether HistoryServer should cleanup jobs" - + " that are no longer present `%s`.", - HISTORY_SERVER_ARCHIVE_DIRS.key())); - /** * Pattern of the log URL of TaskManager. The HistoryServer will generate actual URLs from it. */ @@ -137,6 +126,24 @@ public class HistoryServerOptions { "Specify the option in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files, "; private static final String CONFIGURE_CONSISTENT = "Or you can keep the value of this configuration consistent across them. "; + private static final String LEGACY_NOTE_MESSAGE = + "Note: This option applies only to legacy job archives created before the introduction of application archiving (FLINK-38761)."; + private static final String RETAINED_STRATEGY_MESSAGE = + "Archived entities will be removed if their TTL has expired or the retention count limit has been reached. "; + + /** If this option is enabled then deleted job archives are also deleted from HistoryServer. */ + public static final ConfigOption HISTORY_SERVER_CLEANUP_EXPIRED_JOBS = + key("historyserver.archive.clean-expired-jobs") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text( + "Whether HistoryServer should cleanup jobs that are no longer present in the archive directory defined by %s. ", + code(HISTORY_SERVER_ARCHIVE_DIRS.key())) + .linebreak() + .text(LEGACY_NOTE_MESSAGE) + .build()); public static final ConfigOption HISTORY_SERVER_RETAINED_JOBS = key(HISTORY_SERVER_RETAINED_JOBS_KEY) @@ -164,6 +171,52 @@ public class HistoryServerOptions { code("IllegalConfigurationException")) .linebreak() .text(NOTE_MESSAGE) + .list( + text(CONFIGURE_SINGLE_INSTANCE), + text(CONFIGURE_CONSISTENT)) + .linebreak() + .text(LEGACY_NOTE_MESSAGE) + .build()); + + /** + * If this option is enabled then deleted application archives are also deleted from + * HistoryServer. + */ + public static final ConfigOption HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS = + key("historyserver.archive.clean-expired-applications") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text( + "Whether HistoryServer should cleanup applications that are no longer present in the archive directory defined by %s. ", + code(HISTORY_SERVER_ARCHIVE_DIRS.key())) + .build()); + + public static final ConfigOption HISTORY_SERVER_RETAINED_APPLICATIONS = + key("historyserver.archive.retained-applications") + .intType() + .defaultValue(-1) + .withDescription( + Description.builder() + .text( + "The maximum number of applications to retain in each archive directory defined by %s. 
", + code(HISTORY_SERVER_ARCHIVE_DIRS.key())) + .text( + "This option works together with the TTL (see %s). ", + code(HISTORY_SERVER_RETAINED_TTL_KEY)) + .text(RETAINED_STRATEGY_MESSAGE) + .linebreak() + .text( + "If set to %s (default), there is no limit to the number of archives. ", + code("-1")) + .text( + "If set to %s or less than %s, HistoryServer will throw an %s. ", + code("0"), + code("-1"), + code("IllegalConfigurationException")) + .linebreak() + .text(NOTE_MESSAGE) .list( text(CONFIGURE_SINGLE_INSTANCE), text(CONFIGURE_CONSISTENT)) @@ -176,18 +229,14 @@ public class HistoryServerOptions { .withDescription( Description.builder() .text( - "The time-to-live duration to retain the jobs archived in each archive directory defined by %s. ", + "The time-to-live duration to retain the archived entities (jobs and applications) in each archive directory defined by %s. ", code(HISTORY_SERVER_ARCHIVE_DIRS.key())) - .list( - text( - "If the option is not specified without specifying %s, all of the jobs archives will be retained. ", - code(HISTORY_SERVER_RETAINED_JOBS_KEY)), - text( - "If the option is specified without specifying %s, the jobs archive whose modification time in the time-to-live duration will be retained. ", - code(HISTORY_SERVER_RETAINED_JOBS_KEY)), - text( - "If this option is specified as a positive time duration together with the %s option, the job archive will be removed if its TTL has expired or the retained job count has been reached. ", - code(HISTORY_SERVER_RETAINED_JOBS_KEY))) + .text( + "This option works together with the retention count limits (see %s and %s). ", + code(HISTORY_SERVER_RETAINED_APPLICATIONS.key()), + code(HISTORY_SERVER_RETAINED_JOBS_KEY)) + .text(RETAINED_STRATEGY_MESSAGE) + .linebreak() .text( "If set to equal to or less than %s milliseconds, HistoryServer will throw an %s. ", code("0"), code("IllegalConfigurationException")) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 69d14e7fecbdf..9564815609359 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -37,7 +37,7 @@ import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.Runnables; -import org.apache.flink.runtime.webmonitor.history.retaining.CompositeJobRetainedStrategy; +import org.apache.flink.runtime.webmonitor.history.retaining.CompositeArchiveRetainedStrategy; import org.apache.flink.runtime.webmonitor.utils.LogUrlUtil; import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; import org.apache.flink.util.ExceptionUtils; @@ -93,6 +93,8 @@ *
  • /config *
  • /joboverview *
  • /jobs/:jobid/* + *
  • /applications/overview + *
  • /applications/:applicationid/* * * *

    and relies on static files that are served by the {@link @@ -110,8 +112,18 @@ public class HistoryServer { private final long webRefreshIntervalMillis; private final File webDir; + /** + * The archive fetcher is responsible for fetching job archives that are not part of an + * application (legacy jobs created before application archiving was introduced in FLINK-38761). + */ private final HistoryServerArchiveFetcher archiveFetcher; + /** + * The archive fetcher is responsible for fetching application archives and their associated job + * archives. + */ + private final HistoryServerApplicationArchiveFetcher applicationArchiveFetcher; + @Nullable private final SSLHandlerFactory serverSSLFactory; private WebFrontendBootstrap netty; @@ -161,7 +173,7 @@ public Integer call() throws Exception { } public HistoryServer(Configuration config) throws IOException, FlinkException { - this(config, (event) -> {}); + this(config, (event) -> {}, (event) -> {}); } /** @@ -175,7 +187,9 @@ public HistoryServer(Configuration config) throws IOException, FlinkException { */ public HistoryServer( Configuration config, - Consumer jobArchiveEventListener) + Consumer jobArchiveEventListener, + Consumer + applicationArchiveEventListener) throws IOException, FlinkException { Preconditions.checkNotNull(config); Preconditions.checkNotNull(jobArchiveEventListener); @@ -199,8 +213,10 @@ public HistoryServer( webDir = clearWebDir(config); - boolean cleanupExpiredArchives = + boolean cleanupExpiredJobs = config.get(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS); + boolean cleanupExpiredApplications = + config.get(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS); String refreshDirectories = config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS); if (refreshDirectories == null) { @@ -235,8 +251,15 @@ public HistoryServer( refreshDirs, webDir, jobArchiveEventListener, - cleanupExpiredArchives, - CompositeJobRetainedStrategy.createFrom(config)); + cleanupExpiredJobs, + CompositeArchiveRetainedStrategy.createForJobFromConfig(config)); + applicationArchiveFetcher = + new HistoryServerApplicationArchiveFetcher( + refreshDirs, + webDir, + applicationArchiveEventListener, + cleanupExpiredApplications, + CompositeArchiveRetainedStrategy.createForApplicationFromConfig(config)); this.shutdownHook = ShutdownHookUtil.addShutdownHook( @@ -339,7 +362,11 @@ void start() throws IOException, InterruptedException { private Runnable getArchiveFetchingRunnable() { return Runnables.withUncaughtExceptionHandler( - () -> archiveFetcher.fetchArchives(), FatalExitExceptionHandler.INSTANCE); + () -> { + archiveFetcher.fetchArchives(); + applicationArchiveFetcher.fetchArchives(); + }, + FatalExitExceptionHandler.INSTANCE); } void stop() { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java new file mode 100644 index 0000000000000..310e91b77538e --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.history.ArchivePathUtils; +import org.apache.flink.runtime.history.FsJsonArchivist; +import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails; +import org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails; +import org.apache.flink.runtime.rest.messages.ApplicationsOverviewHeaders; +import org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy; +import org.apache.flink.util.FileUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +/** + * This class is used by the {@link HistoryServer} to fetch the application and job archives that + * are located at {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories are + * polled in regular intervals, defined by {@link + * HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}. + * + *

The archives are downloaded and expanded into a file structure analogous to the REST API. + * + *

    Removes existing archives from these directories and the cache according to {@link + * ArchiveRetainedStrategy} and {@link + * HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS}. + */ +public class HistoryServerApplicationArchiveFetcher extends HistoryServerArchiveFetcher { + + private static final Logger LOG = + LoggerFactory.getLogger(HistoryServerApplicationArchiveFetcher.class); + + private static final String APPLICATIONS_SUBDIR = "applications"; + private static final String APPLICATION_OVERVIEWS_SUBDIR = "application-overviews"; + + private final Map>> cachedApplicationIdsToJobIds = + new HashMap<>(); + + private final File webApplicationDir; + private final File webApplicationsOverviewDir; + + HistoryServerApplicationArchiveFetcher( + List refreshDirs, + File webDir, + Consumer archiveEventListener, + boolean cleanupExpiredArchives, + ArchiveRetainedStrategy retainedStrategy) + throws IOException { + super(refreshDirs, webDir, archiveEventListener, cleanupExpiredArchives, retainedStrategy); + + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + cachedApplicationIdsToJobIds.put(refreshDir.getPath(), new HashMap<>()); + } + this.webApplicationDir = new File(webDir, APPLICATIONS_SUBDIR); + Files.createDirectories(webApplicationDir.toPath()); + this.webApplicationsOverviewDir = new File(webDir, APPLICATION_OVERVIEWS_SUBDIR); + Files.createDirectories(webApplicationsOverviewDir.toPath()); + updateApplicationOverview(); + } + + @Override + List listValidArchives(FileSystem refreshFS, Path refreshDir) throws IOException { + List applicationArchiveDirs = new ArrayList<>(); + FileStatus[] clusterDirs = refreshFS.listStatus(refreshDir); + if (clusterDirs == null) { + // the entire refreshDirectory was removed + return applicationArchiveDirs; + } + + // Check for application archive directories in the cluster directories and named according + // to the application ID format + for (FileStatus clusterDir : clusterDirs) { + if (clusterDir.isDir() && isValidId(clusterDir.getPath().getName(), refreshDir)) { + Path applicationsDir = + new Path(clusterDir.getPath(), ArchivePathUtils.APPLICATIONS_DIR); + FileStatus[] applicationDirs = refreshFS.listStatus(applicationsDir); + if (applicationDirs == null) { + // the entire applicationsDirectory was removed + return applicationArchiveDirs; + } + + for (FileStatus applicationDir : applicationDirs) { + if (applicationDir.isDir() + && isValidId(applicationDir.getPath().getName(), refreshDir)) { + applicationArchiveDirs.add(applicationDir); + } + } + } + } + + return applicationArchiveDirs; + } + + private boolean isValidId(String id, Path refreshDir) { + try { + ApplicationID.fromHexString(id); + return true; + } catch (IllegalArgumentException iae) { + LOG.debug( + "Archive directory {} contained file with unexpected name {}. 
Ignoring file.", + refreshDir, + id, + iae); + return false; + } + } + + @Override + List processArchive(String archiveId, Path archivePath, Path refreshDir) + throws IOException { + FileSystem fs = archivePath.getFileSystem(); + Path applicationArchive = new Path(archivePath, ArchivePathUtils.APPLICATION_ARCHIVE_NAME); + if (!fs.exists(applicationArchive)) { + throw new IOException("Application archive " + applicationArchive + " does not exist."); + } + + List events = new ArrayList<>(); + events.add(processApplicationArchive(archiveId, applicationArchive)); + + Path jobArchivesDir = new Path(archivePath, ArchivePathUtils.JOBS_DIR); + + List jobArchives = listValidJobArchives(fs, jobArchivesDir); + for (FileStatus jobArchive : jobArchives) { + String jobId = jobArchive.getPath().getName(); + cachedApplicationIdsToJobIds + .get(refreshDir) + .computeIfAbsent(archiveId, k -> new HashSet<>()) + .add(jobId); + events.add(processJobArchive(jobId, jobArchive.getPath())); + } + + return events; + } + + private ArchiveEvent processApplicationArchive(String applicationId, Path applicationArchive) + throws IOException { + for (ArchivedJson archive : FsJsonArchivist.readArchivedJsons(applicationArchive)) { + String path = archive.getPath(); + String json = archive.getJson(); + + File target; + if (path.equals(ApplicationsOverviewHeaders.URL)) { + target = new File(webApplicationsOverviewDir, applicationId + JSON_FILE_ENDING); + } else { + // this implicitly writes into webApplicationDir + target = new File(webDir, path + JSON_FILE_ENDING); + } + + writeTargetFile(target, json); + } + + return new ArchiveEvent(applicationId, ArchiveEventType.CREATED); + } + + @Override + void deleteFromRemote(Path archive) throws IOException { + // delete application archive directory recursively (including all its job archives) + archive.getFileSystem().delete(archive, true); + } + + @Override + List deleteCachedArchives(String archiveId, Path refreshDir) { + LOG.info("Archive directories for application {} is deleted", archiveId); + List deleteLog = new ArrayList<>(); + + deleteLog.add(deleteApplicationFiles(archiveId)); + + Set jobIds = cachedApplicationIdsToJobIds.get(refreshDir).remove(archiveId); + if (jobIds != null) { + jobIds.forEach(jobId -> deleteLog.add(deleteJobFiles(jobId))); + } + + return deleteLog; + } + + private ArchiveEvent deleteApplicationFiles(String applicationId) { + // Make sure we do not include this application in the overview + try { + Files.deleteIfExists( + new File(webApplicationsOverviewDir, applicationId + JSON_FILE_ENDING) + .toPath()); + } catch (IOException ioe) { + LOG.warn("Could not delete file from overview directory.", ioe); + } + + // Clean up application files we may have created + File applicationDirectory = new File(webApplicationDir, applicationId); + try { + FileUtils.deleteDirectory(applicationDirectory); + } catch (IOException ioe) { + LOG.warn("Could not clean up application directory.", ioe); + } + + try { + Files.deleteIfExists( + new File(webApplicationDir, applicationId + JSON_FILE_ENDING).toPath()); + } catch (IOException ioe) { + LOG.warn("Could not delete file from application directory.", ioe); + } + + return new ArchiveEvent(applicationId, ArchiveEventType.DELETED); + } + + @Override + void updateOverview() { + updateApplicationOverview(); + updateJobOverview(); + } + + /** + * This method replicates the JSON response that would be given by the + * ApplicationsOverviewHandler when listing applications. + * + *

Every application archive contains an overview entry with the same structure. Since + * applications are archived on their own, however, each such overview contains only a + * single application. + * + *

    For the display in the HistoryServer WebFrontend we have to combine these overviews. + */ + private void updateApplicationOverview() { + try (JsonGenerator gen = + jacksonFactory.createGenerator( + HistoryServer.createOrGetFile(webDir, ApplicationsOverviewHeaders.URL))) { + File[] overviews = new File(webApplicationsOverviewDir.getPath()).listFiles(); + if (overviews != null) { + Collection allApplications = new ArrayList<>(overviews.length); + for (File overview : overviews) { + MultipleApplicationsDetails subApplications = + mapper.readValue(overview, MultipleApplicationsDetails.class); + allApplications.addAll(subApplications.getApplications()); + } + mapper.writeValue(gen, new MultipleApplicationsDetails(allApplications)); + } + } catch (IOException ioe) { + LOG.error("Failed to update application overview.", ioe); + } + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java index 4fe8bd58d5b7f..59eb258b3ed1e 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; -import org.apache.flink.runtime.webmonitor.history.retaining.JobRetainedStrategy; +import org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy; import org.apache.flink.util.FileUtils; import org.apache.flink.util.jackson.JacksonMapperFactory; @@ -48,7 +48,6 @@ import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -68,34 +67,31 @@ * *

    The archives are downloaded and expanded into a file structure analog to the REST API. * - *

    Removes existing archives from these directories and the cache if configured by {@link - * HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_JOBS} or {@link - * HistoryServerOptions#HISTORY_SERVER_RETAINED_JOBS}. + *

    Removes existing archives from these directories and the cache according to {@link + * ArchiveRetainedStrategy} and {@link HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_JOBS}. */ class HistoryServerArchiveFetcher { - /** Possible job archive operations in history-server. */ + /** Possible archive operations in history-server. */ public enum ArchiveEventType { - /** Job archive was found in one refresh location and created in history server. */ + /** Archive was found in one refresh location and created in history server. */ CREATED, - /** - * Job archive was deleted from one of refresh locations and deleted from history server. - */ + /** Archive was deleted from one of refresh locations and deleted from history server. */ DELETED } - /** Representation of job archive event. */ + /** Representation of archive event. */ public static class ArchiveEvent { - private final String jobID; + private final String id; private final ArchiveEventType operation; - ArchiveEvent(String jobID, ArchiveEventType operation) { - this.jobID = jobID; + ArchiveEvent(String id, ArchiveEventType operation) { + this.id = id; this.operation = operation; } - public String getJobID() { - return jobID; + public String getId() { + return id; } public ArchiveEventType getType() { @@ -105,48 +101,50 @@ public ArchiveEventType getType() { private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); - private static final JsonFactory jacksonFactory = new JsonFactory(); - private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); + protected static final String JSON_FILE_ENDING = ".json"; + protected static final String JOBS_SUBDIR = "jobs"; + protected static final String JOB_OVERVIEWS_SUBDIR = "overviews"; - private static final String JSON_FILE_ENDING = ".json"; + protected final JsonFactory jacksonFactory = new JsonFactory(); + protected final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); - private final List refreshDirs; - private final Consumer jobArchiveEventListener; - private final boolean processExpiredArchiveDeletion; - private final JobRetainedStrategy jobRetainedStrategy; + protected final List refreshDirs; + protected final Consumer archiveEventListener; + protected final boolean processExpiredArchiveDeletion; + protected final ArchiveRetainedStrategy retainedStrategy; - /** Cache of all available jobs identified by their id. */ - private final Map> cachedArchivesPerRefreshDirectory; + /** Cache of all available archives identified by their id. 
*/ + protected final Map> cachedArchivesPerRefreshDirectory; - private final File webDir; - private final File webJobDir; - private final File webOverviewDir; + protected final File webDir; + protected final File webJobDir; + protected final File webOverviewDir; HistoryServerArchiveFetcher( List refreshDirs, File webDir, - Consumer jobArchiveEventListener, + Consumer archiveEventListener, boolean cleanupExpiredArchives, - JobRetainedStrategy jobRetainedStrategy) + ArchiveRetainedStrategy retainedStrategy) throws IOException { this.refreshDirs = checkNotNull(refreshDirs); - this.jobArchiveEventListener = jobArchiveEventListener; + this.archiveEventListener = archiveEventListener; this.processExpiredArchiveDeletion = cleanupExpiredArchives; - this.jobRetainedStrategy = checkNotNull(jobRetainedStrategy); + this.retainedStrategy = checkNotNull(retainedStrategy); this.cachedArchivesPerRefreshDirectory = new HashMap<>(); for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new HashSet<>()); } this.webDir = checkNotNull(webDir); - this.webJobDir = new File(webDir, "jobs"); + this.webJobDir = new File(webDir, JOBS_SUBDIR); Files.createDirectories(webJobDir.toPath()); - this.webOverviewDir = new File(webDir, "overviews"); + this.webOverviewDir = new File(webDir, JOB_OVERVIEWS_SUBDIR); Files.createDirectories(webOverviewDir.toPath()); - updateJobOverview(webOverviewDir, webDir); + updateJobOverview(); if (LOG.isInfoEnabled()) { for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { - LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + LOG.info("Monitoring directory {} for archives.", refreshDir.getPath()); } } } @@ -155,100 +153,102 @@ void fetchArchives() { try { LOG.debug("Starting archive fetching."); List events = new ArrayList<>(); - Map> jobsToRemove = new HashMap<>(); + Map> archivesToRemove = new HashMap<>(); cachedArchivesPerRefreshDirectory.forEach( - (path, archives) -> jobsToRemove.put(path, new HashSet<>(archives))); + (path, archives) -> archivesToRemove.put(path, new HashSet<>(archives))); Map> archivesBeyondRetainedLimit = new HashMap<>(); for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) { Path refreshDir = refreshLocation.getPath(); LOG.debug("Checking archive directory {}.", refreshDir); - // contents of /:refreshDir - FileStatus[] jobArchives; + List archives; try { - jobArchives = listArchives(refreshLocation.getFs(), refreshDir); + archives = listValidArchives(refreshLocation.getFs(), refreshDir); + archives.sort( + Comparator.comparingLong(FileStatus::getModificationTime).reversed()); } catch (IOException e) { - LOG.error("Failed to access job archive location for path {}.", refreshDir, e); + LOG.error("Failed to access archive location for path {}.", refreshDir, e); // something went wrong, potentially due to a concurrent deletion - // do not remove any jobs now; we will retry later - jobsToRemove.remove(refreshDir); + // do not remove any archives now; we will retry later + archivesToRemove.remove(refreshDir); continue; } int fileOrderedIndexOnModifiedTime = 0; - for (FileStatus jobArchive : jobArchives) { - Path jobArchivePath = jobArchive.getPath(); - String jobID = jobArchivePath.getName(); - if (!isValidJobID(jobID, refreshDir)) { - continue; - } - - jobsToRemove.get(refreshDir).remove(jobID); + for (FileStatus archive : archives) { + Path archivePath = archive.getPath(); + String archiveId = archivePath.getName(); + 
archivesToRemove.get(refreshDir).remove(archiveId); fileOrderedIndexOnModifiedTime++; - if (!jobRetainedStrategy.shouldRetain( - jobArchive, fileOrderedIndexOnModifiedTime)) { + if (!retainedStrategy.shouldRetain(archive, fileOrderedIndexOnModifiedTime)) { archivesBeyondRetainedLimit .computeIfAbsent(refreshDir, ignored -> new HashSet<>()) - .add(jobArchivePath); + .add(archivePath); continue; } - if (cachedArchivesPerRefreshDirectory.get(refreshDir).contains(jobID)) { + if (cachedArchivesPerRefreshDirectory.get(refreshDir).contains(archiveId)) { LOG.trace( - "Ignoring archive {} because it was already fetched.", - jobArchivePath); + "Ignoring archive {} because it was already fetched.", archivePath); } else { - LOG.info("Processing archive {}.", jobArchivePath); + LOG.info("Processing archive {}.", archivePath); try { - processArchive(jobID, jobArchivePath); - events.add(new ArchiveEvent(jobID, ArchiveEventType.CREATED)); - cachedArchivesPerRefreshDirectory.get(refreshDir).add(jobID); - LOG.info("Processing archive {} finished.", jobArchivePath); + events.addAll(processArchive(archiveId, archivePath, refreshDir)); + cachedArchivesPerRefreshDirectory.get(refreshDir).add(archiveId); + LOG.info("Processing archive {} finished.", archivePath); } catch (IOException e) { LOG.error( - "Failure while fetching/processing job archive for job {}.", - jobID, - e); - deleteJobFiles(jobID); + "Failure while fetching/processing archive {}.", archiveId, e); + deleteCachedArchives(archiveId, refreshDir); } } } } - if (jobsToRemove.values().stream().flatMap(Set::stream).findAny().isPresent() + if (archivesToRemove.values().stream().flatMap(Set::stream).findAny().isPresent() && processExpiredArchiveDeletion) { - events.addAll(cleanupExpiredJobs(jobsToRemove)); + events.addAll(cleanupExpiredArchives(archivesToRemove)); } if (!archivesBeyondRetainedLimit.isEmpty()) { - events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondRetainedLimit)); + events.addAll(cleanupArchivesBeyondRetainedLimit(archivesBeyondRetainedLimit)); } if (!events.isEmpty()) { - updateJobOverview(webOverviewDir, webDir); + updateOverview(); } - events.forEach(jobArchiveEventListener::accept); + events.forEach(archiveEventListener); LOG.debug("Finished archive fetching."); } catch (Exception e) { - LOG.error("Critical failure while fetching/processing job archives.", e); + LOG.error("Critical failure while fetching/processing archives.", e); } } - private static FileStatus[] listArchives(FileSystem refreshFS, Path refreshDir) + List listValidArchives(FileSystem refreshFS, Path refreshDir) throws IOException { + return listValidJobArchives(refreshFS, refreshDir); + } + + List listValidJobArchives(FileSystem refreshFS, Path refreshDir) throws IOException { + List jobArchives = new ArrayList<>(); // contents of /:refreshDir - FileStatus[] jobArchives = refreshFS.listStatus(refreshDir); - if (jobArchives == null) { + FileStatus[] archives = refreshFS.listStatus(refreshDir); + if (archives == null) { // the entire refreshDirectory was removed - return new FileStatus[0]; + return jobArchives; } - Arrays.sort( - jobArchives, Comparator.comparingLong(FileStatus::getModificationTime).reversed()); + // Check for job archive files located directly in the refresh directory and named according + // to the job ID format + for (FileStatus archive : archives) { + if (!archive.isDir() && isValidJobId(archive.getPath().getName(), refreshDir)) { + jobArchives.add(archive); + } + } return jobArchives; } - private static boolean isValidJobID(String jobId, Path 
refreshDir) { + boolean isValidJobId(String jobId, Path refreshDir) { try { JobID.fromHexString(jobId); return true; @@ -262,98 +262,110 @@ private static boolean isValidJobID(String jobId, Path refreshDir) { } } - private void processArchive(String jobID, Path jobArchive) throws IOException { + List processArchive(String archiveId, Path archivePath, Path refreshDir) + throws IOException { + return Collections.singletonList(processJobArchive(archiveId, archivePath)); + } + + ArchiveEvent processJobArchive(String jobId, Path jobArchive) throws IOException { for (ArchivedJson archive : FsJsonArchivist.readArchivedJsons(jobArchive)) { String path = archive.getPath(); String json = archive.getJson(); File target; if (path.equals(JobsOverviewHeaders.URL)) { - target = new File(webOverviewDir, jobID + JSON_FILE_ENDING); + target = new File(webOverviewDir, jobId + JSON_FILE_ENDING); } else if (path.equals("/joboverview")) { // legacy path LOG.debug("Migrating legacy archive {}", jobArchive); json = convertLegacyJobOverview(json); - target = new File(webOverviewDir, jobID + JSON_FILE_ENDING); + target = new File(webOverviewDir, jobId + JSON_FILE_ENDING); } else { // this implicitly writes into webJobDir target = new File(webDir, path + JSON_FILE_ENDING); } - java.nio.file.Path parent = target.getParentFile().toPath(); + writeTargetFile(target, json); + } - try { - Files.createDirectories(parent); - } catch (FileAlreadyExistsException ignored) { - // there may be left-over directories from the previous - // attempt - } + return new ArchiveEvent(jobId, ArchiveEventType.CREATED); + } + + void writeTargetFile(File target, String json) throws IOException { + java.nio.file.Path parent = target.getParentFile().toPath(); + + try { + Files.createDirectories(parent); + } catch (FileAlreadyExistsException ignored) { + // there may be left-over directories from the previous attempt + } - java.nio.file.Path targetPath = target.toPath(); + java.nio.file.Path targetPath = target.toPath(); - // We overwrite existing files since this may be another attempt - // at fetching this archive. - // Existing files may be incomplete/corrupt. - Files.deleteIfExists(targetPath); + // We overwrite existing files since this may be another attempt + // at fetching this archive. + // Existing files may be incomplete/corrupt. 
+ Files.deleteIfExists(targetPath); - Files.createFile(target.toPath()); - try (FileWriter fw = new FileWriter(target)) { - fw.write(json); - fw.flush(); - } + Files.createFile(target.toPath()); + try (FileWriter fw = new FileWriter(target)) { + fw.write(json); + fw.flush(); } } - private List cleanupJobsBeyondSizeLimit( - Map> jobArchivesToRemove) { - Map> allJobIdsToRemoveFromOverview = new HashMap<>(); + List cleanupArchivesBeyondRetainedLimit(Map> archivesToRemove) { + Map> allArchiveIdsToRemove = new HashMap<>(); - for (Map.Entry> pathSetEntry : jobArchivesToRemove.entrySet()) { - HashSet jobIdsToRemoveFromOverview = new HashSet<>(); + for (Map.Entry> pathSetEntry : archivesToRemove.entrySet()) { + HashSet archiveIdsToRemove = new HashSet<>(); for (Path archive : pathSetEntry.getValue()) { - jobIdsToRemoveFromOverview.add(archive.getName()); + archiveIdsToRemove.add(archive.getName()); try { - archive.getFileSystem().delete(archive, false); + deleteFromRemote(archive); } catch (IOException ioe) { - LOG.warn("Could not delete old archive " + archive, ioe); + LOG.warn("Could not delete old archive {}", archive, ioe); } } - allJobIdsToRemoveFromOverview.put(pathSetEntry.getKey(), jobIdsToRemoveFromOverview); + allArchiveIdsToRemove.put(pathSetEntry.getKey(), archiveIdsToRemove); } - return cleanupExpiredJobs(allJobIdsToRemoveFromOverview); + return cleanupExpiredArchives(allArchiveIdsToRemove); } - private List cleanupExpiredJobs(Map> jobsToRemove) { + void deleteFromRemote(Path archive) throws IOException { + archive.getFileSystem().delete(archive, false); + } + List cleanupExpiredArchives(Map> archivesToRemove) { List deleteLog = new ArrayList<>(); - LOG.info("Archive directories for jobs {} were deleted.", jobsToRemove); - jobsToRemove.forEach( - (refreshDir, archivesToRemove) -> { - cachedArchivesPerRefreshDirectory.get(refreshDir).removeAll(archivesToRemove); + archivesToRemove.forEach( + (refreshDir, archives) -> { + cachedArchivesPerRefreshDirectory.get(refreshDir).removeAll(archives); + archives.forEach( + archiveId -> + deleteLog.addAll(deleteCachedArchives(archiveId, refreshDir))); }); - jobsToRemove.values().stream() - .flatMap(Set::stream) - .forEach( - removedJobID -> { - deleteJobFiles(removedJobID); - deleteLog.add(new ArchiveEvent(removedJobID, ArchiveEventType.DELETED)); - }); return deleteLog; } - private void deleteJobFiles(String jobID) { + List deleteCachedArchives(String archiveId, Path refreshDir) { + LOG.info("Archive directories for job {} is deleted", archiveId); + return Collections.singletonList(deleteJobFiles(archiveId)); + } + + ArchiveEvent deleteJobFiles(String jobId) { // Make sure we do not include this job in the overview try { - Files.deleteIfExists(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath()); + Files.deleteIfExists(new File(webOverviewDir, jobId + JSON_FILE_ENDING).toPath()); } catch (IOException ioe) { LOG.warn("Could not delete file from overview directory.", ioe); } // Clean up job files we may have created - File jobDirectory = new File(webJobDir, jobID); + File jobDirectory = new File(webJobDir, jobId); try { FileUtils.deleteDirectory(jobDirectory); } catch (IOException ioe) { @@ -361,13 +373,15 @@ private void deleteJobFiles(String jobID) { } try { - Files.deleteIfExists(new File(webJobDir, jobID + JSON_FILE_ENDING).toPath()); + Files.deleteIfExists(new File(webJobDir, jobId + JSON_FILE_ENDING).toPath()); } catch (IOException ioe) { LOG.warn("Could not delete file from job directory.", ioe); } + + return new ArchiveEvent(jobId, 
ArchiveEventType.DELETED); } - private static String convertLegacyJobOverview(String legacyOverview) throws IOException { + private String convertLegacyJobOverview(String legacyOverview) throws IOException { JsonNode root = mapper.readTree(legacyOverview); JsonNode finishedJobs = root.get("finished"); JsonNode job = finishedJobs.get(0); @@ -436,6 +450,10 @@ private static String convertLegacyJobOverview(String legacyOverview) throws IOE return sw.toString(); } + void updateOverview() { + updateJobOverview(); + } + /** * This method replicates the JSON response that would be given by the JobsOverviewHandler when * listing both running and finished jobs. @@ -445,7 +463,7 @@ private static String convertLegacyJobOverview(String legacyOverview) throws IOE * *

    For the display in the HistoryServer WebFrontend we have to combine these overviews. */ - private static void updateJobOverview(File webOverviewDir, File webDir) { + void updateJobOverview() { try (JsonGenerator gen = jacksonFactory.createGenerator( HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java similarity index 96% rename from flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java rename to flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java index 2ef991698bf71..f2e50dd73c551 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java @@ -21,7 +21,7 @@ import org.apache.flink.core.fs.FileStatus; /** To define the strategy interface to judge whether the file should be retained. */ -public interface JobRetainedStrategy { +public interface ArchiveRetainedStrategy { /** * Judge whether the file should be retained. diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java similarity index 63% rename from flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java rename to flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java index acd35c93f7903..2a38dfaeff773 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java @@ -31,23 +31,42 @@ import java.util.List; import java.util.Optional; +import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS; import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS; import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL; +import static org.apache.flink.util.Preconditions.checkArgument; /** The retained strategy. 
*/ -public class CompositeJobRetainedStrategy implements JobRetainedStrategy { +public class CompositeArchiveRetainedStrategy implements ArchiveRetainedStrategy { - public static JobRetainedStrategy createFrom(ReadableConfig config) { + public static ArchiveRetainedStrategy createForJobFromConfig(ReadableConfig config) { int maxHistorySizeByOldKey = config.get(HISTORY_SERVER_RETAINED_JOBS); + if (maxHistorySizeByOldKey == 0 || maxHistorySizeByOldKey < -1) { + throw new IllegalConfigurationException( + "Cannot set %s to 0 or less than -1", HISTORY_SERVER_RETAINED_JOBS.key()); + } Optional retainedTtlOpt = config.getOptional(HISTORY_SERVER_RETAINED_TTL); - return new CompositeJobRetainedStrategy( - new QuantityJobRetainedStrategy(maxHistorySizeByOldKey), - new TimeToLiveJobRetainedStrategy(retainedTtlOpt.orElse(null))); + return new CompositeArchiveRetainedStrategy( + new QuantityArchiveRetainedStrategy(maxHistorySizeByOldKey), + new TimeToLiveArchiveRetainedStrategy(retainedTtlOpt.orElse(null))); } - private final List strategies; + public static ArchiveRetainedStrategy createForApplicationFromConfig(ReadableConfig config) { + int maxHistorySize = config.get(HISTORY_SERVER_RETAINED_APPLICATIONS); + if (maxHistorySize == 0 || maxHistorySize < -1) { + throw new IllegalConfigurationException( + "Cannot set %s to 0 or less than -1", + HISTORY_SERVER_RETAINED_APPLICATIONS.key()); + } + Optional retainedTtlOpt = config.getOptional(HISTORY_SERVER_RETAINED_TTL); + return new CompositeArchiveRetainedStrategy( + new QuantityArchiveRetainedStrategy(maxHistorySize), + new TimeToLiveArchiveRetainedStrategy(retainedTtlOpt.orElse(null))); + } + + private final List strategies; - CompositeJobRetainedStrategy(JobRetainedStrategy... strategies) { + CompositeArchiveRetainedStrategy(ArchiveRetainedStrategy... strategies) { this.strategies = strategies == null || strategies.length == 0 ? Collections.emptyList() @@ -64,11 +83,11 @@ public boolean shouldRetain(FileStatus file, int fileOrderedIndex) { } /** The time to live based retained strategy. */ -class TimeToLiveJobRetainedStrategy implements JobRetainedStrategy { +class TimeToLiveArchiveRetainedStrategy implements ArchiveRetainedStrategy { @Nullable private final Duration ttlThreshold; - TimeToLiveJobRetainedStrategy(Duration ttlThreshold) { + TimeToLiveArchiveRetainedStrategy(@Nullable Duration ttlThreshold) { if (ttlThreshold != null && ttlThreshold.toMillis() <= 0) { throw new IllegalConfigurationException( "Cannot set %s to 0 or less than 0 milliseconds", @@ -86,16 +105,13 @@ public boolean shouldRetain(FileStatus file, int fileOrderedIndex) { } } -/** The job quantity based retained strategy. */ -class QuantityJobRetainedStrategy implements JobRetainedStrategy { +/** The quantity based retained strategy. 
*/ +class QuantityArchiveRetainedStrategy implements ArchiveRetainedStrategy { private final int quantityThreshold; - QuantityJobRetainedStrategy(int quantityThreshold) { - if (quantityThreshold == 0 || quantityThreshold < -1) { - throw new IllegalConfigurationException( - "Cannot set %s to 0 or less than -1", HISTORY_SERVER_RETAINED_JOBS.key()); - } + QuantityArchiveRetainedStrategy(int quantityThreshold) { + checkArgument(quantityThreshold == -1 || quantityThreshold > 0); this.quantityThreshold = quantityThreshold; } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java index 537d58fae8e77..c67a3147628b5 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -18,19 +18,32 @@ package org.apache.flink.runtime.webmonitor.history; +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HistoryServerOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.application.ArchivedApplication; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.history.ArchivePathUtils; import org.apache.flink.runtime.history.FsJsonArchivist; +import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails; +import org.apache.flink.runtime.messages.webmonitor.ApplicationDetailsInfo; import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter; +import org.apache.flink.runtime.rest.messages.ApplicationsOverviewHeaders; import org.apache.flink.runtime.rest.messages.DashboardConfiguration; import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; +import org.apache.flink.runtime.rest.messages.application.ApplicationDetailsHeaders; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.runtime.webmonitor.testutils.HttpUtils; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -51,6 +64,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.io.StringWriter; @@ -58,11 +73,14 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import 
java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -86,13 +104,14 @@ class HistoryServerTest { private MiniClusterWithClientResource cluster; private File jmDirectory; private File hsDirectory; + private Configuration clusterConfig; @BeforeEach void setUp(@TempDir File jmDirectory, @TempDir File hsDirectory) throws Exception { this.jmDirectory = jmDirectory; this.hsDirectory = hsDirectory; - Configuration clusterConfig = new Configuration(); + clusterConfig = new Configuration(); clusterConfig.set(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString()); cluster = @@ -130,6 +149,9 @@ void testHistoryServerIntegration(final boolean versionLessThan14) throws Except == HistoryServerArchiveFetcher.ArchiveEventType.CREATED) { numExpectedArchivedJobs.countDown(); } + }, + (event) -> { + throw new RuntimeException("Should not call"); }); try { @@ -164,10 +186,10 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio final int numArchivesToRemoveUponHsStart = numArchivesBeforeHsStarted - numArchivesToKeepInHistory; final long oneMinuteSinceEpoch = 1000L * 60L; - List expectedJobIdsToKeep = new LinkedList<>(); + List expectedJobIdsToKeep = new LinkedList<>(); for (int j = 0; j < numArchivesBeforeHsStarted; j++) { - String jobId = + JobID jobId = createLegacyArchive( jmDirectory.toPath(), j * oneMinuteSinceEpoch, versionLessThan14); if (j >= numArchivesToRemoveUponHsStart) { @@ -205,6 +227,9 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio numArchivesDeletedTotal.countDown(); break; } + }, + (event) -> { + throw new RuntimeException("Should not call"); }); try { @@ -232,10 +257,9 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio } } - private Set getIdsFromJobOverview(String baseUrl) throws Exception { + private Set getIdsFromJobOverview(String baseUrl) throws Exception { return getJobsOverview(baseUrl).getJobs().stream() .map(JobDetails::getJobId) - .map(JobID::toString) .collect(Collectors.toSet()); } @@ -288,15 +312,26 @@ void testClearWebDir() throws Exception { new File(hsDirectory.toURI() + "/overviews/dirtyEmptySubFile.json").createNewFile(); new File(hsDirectory.toURI() + "/jobs/dirtyEmptySubDir").mkdir(); new File(hsDirectory.toURI() + "/jobs/dirtyEmptySubFile.json").createNewFile(); + new File(hsDirectory.toURI() + "/application-overviews/dirtyEmptySubDir").mkdir(); + new File(hsDirectory.toURI() + "/application-overviews/dirtyEmptySubFile.json") + .createNewFile(); + new File(hsDirectory.toURI() + "/applications/dirtyEmptySubDir").mkdir(); + new File(hsDirectory.toURI() + "/applications/dirtyEmptySubFile.json").createNewFile(); hs = new HistoryServer(historyServerConfig); assertInitializedHistoryServerWebDir(hs.getWebDir()); } private void assertInitializedHistoryServerWebDir(File historyWebDir) { - - assertThat(historyWebDir.list()).containsExactlyInAnyOrder("overviews", "jobs"); + assertThat(historyWebDir.list()) + .containsExactlyInAnyOrder( + "overviews", "jobs", "application-overviews", "applications"); assertThat(new File(historyWebDir, "overviews")).exists().isDirectory().isEmptyDirectory(); assertThat(new File(historyWebDir, "jobs").list()).containsExactly("overview.json"); + assertThat(new File(historyWebDir, "application-overviews")) + .exists() + .isDirectory() + .isEmptyDirectory(); + assertThat(new File(historyWebDir, "applications").list()).containsExactly("overview.json"); } private void runArchiveExpirationTest(boolean cleanupExpiredJobs) 
throws Exception { @@ -327,6 +362,9 @@ private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Excepti allArchivesExpiredLatch.countDown(); break; } + }, + (event) -> { + throw new RuntimeException("Should not call"); }); try { @@ -378,27 +416,33 @@ private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Excepti assertThat(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue(); - assertJobFilesCleanedUp(cleanupExpiredJobs); + assertFilesCleanedUp(cleanupExpiredJobs); } finally { hs.stop(); } } - private void assertJobFilesCleanedUp(boolean jobFilesShouldBeDeleted) throws IOException { + private void assertFilesCleanedUp(boolean filesShouldBeDeleted) throws IOException { try (Stream paths = Files.walk(hsDirectory.toPath())) { - final List jobFiles = + final List applicationOrJobFiles = paths.filter(path -> !path.equals(hsDirectory.toPath())) .map(path -> hsDirectory.toPath().relativize(path)) .filter(path -> !path.equals(Paths.get("config.json"))) .filter(path -> !path.equals(Paths.get("jobs"))) .filter(path -> !path.equals(Paths.get("jobs", "overview.json"))) .filter(path -> !path.equals(Paths.get("overviews"))) + .filter(path -> !path.equals(Paths.get("applications"))) + .filter( + path -> + !path.equals( + Paths.get("applications", "overview.json"))) + .filter(path -> !path.equals(Paths.get("application-overviews"))) .collect(Collectors.toList()); - if (jobFilesShouldBeDeleted) { - assertThat(jobFiles).isEmpty(); + if (filesShouldBeDeleted) { + assertThat(applicationOrJobFiles).isEmpty(); } else { - assertThat(jobFiles).isNotEmpty(); + assertThat(applicationOrJobFiles).isNotEmpty(); } } } @@ -413,6 +457,13 @@ private void waitForArchivesCreation(int numJobs) throws InterruptedException { } private Configuration createTestConfiguration(boolean cleanupExpiredJobs) { + return createTestConfiguration( + cleanupExpiredJobs, + HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS.defaultValue()); + } + + private Configuration createTestConfiguration( + boolean cleanupExpiredJobs, boolean cleanupExpiredApplications) { Configuration historyServerConfig = new Configuration(); historyServerConfig.set( HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString()); @@ -424,6 +475,9 @@ private Configuration createTestConfiguration(boolean cleanupExpiredJobs) { historyServerConfig.set( HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS, cleanupExpiredJobs); + historyServerConfig.set( + HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS, + cleanupExpiredApplications); historyServerConfig.set(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0); return historyServerConfig; @@ -451,15 +505,15 @@ private static void runJob() throws Exception { env.execute(); } - private static String createLegacyArchive( + private static JobID createLegacyArchive( Path directory, long fileModifiedDate, boolean versionLessThan14) throws IOException { - String jobId = createLegacyArchive(directory, versionLessThan14); - File jobArchive = directory.resolve(jobId).toFile(); + JobID jobId = createLegacyArchive(directory, versionLessThan14); + File jobArchive = directory.resolve(jobId.toString()).toFile(); jobArchive.setLastModified(fileModifiedDate); return jobId; } - private static String createLegacyArchive(Path directory, boolean versionLessThan14) + private static JobID createLegacyArchive(Path directory, boolean versionLessThan14) throws IOException { JobID jobId = JobID.generate(); @@ -504,7 +558,405 @@ private static String 
+    private Configuration createTestConfiguration(
+            boolean cleanupExpiredJobs, boolean cleanupExpiredApplications) {
         Configuration historyServerConfig = new Configuration();
         historyServerConfig.set(
                 HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
@@ -424,6 +475,9 @@ private Configuration createTestConfiguration(boolean cleanupExpiredJobs) {
         historyServerConfig.set(
                 HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS, cleanupExpiredJobs);
+        historyServerConfig.set(
+                HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS,
+                cleanupExpiredApplications);
         historyServerConfig.set(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
 
         return historyServerConfig;
@@ -451,15 +505,15 @@ private static void runJob() throws Exception {
         env.execute();
     }
 
-    private static String createLegacyArchive(
+    private static JobID createLegacyArchive(
             Path directory, long fileModifiedDate, boolean versionLessThan14) throws IOException {
-        String jobId = createLegacyArchive(directory, versionLessThan14);
-        File jobArchive = directory.resolve(jobId).toFile();
+        JobID jobId = createLegacyArchive(directory, versionLessThan14);
+        File jobArchive = directory.resolve(jobId.toString()).toFile();
         jobArchive.setLastModified(fileModifiedDate);
         return jobId;
     }
 
-    private static String createLegacyArchive(Path directory, boolean versionLessThan14)
+    private static JobID createLegacyArchive(Path directory, boolean versionLessThan14)
             throws IOException {
         JobID jobId = JobID.generate();
@@ -504,7 +558,405 @@ private static String createLegacyArchive(Path directory, boolean versionLessTha
                         directory.toAbsolutePath().toString(), jobId.toString()),
                 Collections.singleton(archivedJson));
 
-        return jobId.toString();
+        return jobId;
+    }
+
+    @Test
+    void testApplicationAndJobArchives() throws Exception {
+        int numApplications = 2;
+        int numJobsPerApplication = 2;
+        // jobs that are not part of an application
+        int numJobsOutsideApplication = 1;
+
+        Map<ApplicationID, Set<JobID>> expectedApplicationAndJobIds =
+                new HashMap<>(numApplications);
+        for (int i = 0; i < numApplications; i++) {
+            ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication);
+            ApplicationID applicationId = archivedApplication.getApplicationId();
+            List<JobID> jobIds =
+                    archivedApplication.getJobs().values().stream()
+                            .map(ExecutionGraphInfo::getJobId)
+                            .collect(Collectors.toList());
+            expectedApplicationAndJobIds.put(applicationId, new HashSet<>(jobIds));
+        }
+        Set<JobID> expectedJobIdsOutsideApplication = new HashSet<>(numJobsOutsideApplication);
+        for (int i = 0; i < numJobsOutsideApplication; i++) {
+            ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo();
+            mockJobArchive(executionGraphInfo, null);
+            expectedJobIdsOutsideApplication.add(executionGraphInfo.getJobId());
+        }
+
+        int numTotalJobs = numApplications * numJobsPerApplication + numJobsOutsideApplication;
+        int numTotal = numApplications + numTotalJobs;
+        CountDownLatch numExpectedArchives = new CountDownLatch(numTotal);
+        Configuration historyServerConfig = createTestConfiguration(false);
+        HistoryServer hs =
+                new HistoryServer(
+                        historyServerConfig,
+                        (event) -> {
+                            if (event.getType()
+                                    == HistoryServerArchiveFetcher.ArchiveEventType.CREATED) {
+                                numExpectedArchives.countDown();
+                            }
+                        },
+                        (event) -> {
+                            if (event.getType()
+                                    == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+                                            .CREATED) {
+                                numExpectedArchives.countDown();
+                            }
+                        });
+        try {
+            hs.start();
+            String baseUrl = "http://localhost:" + hs.getWebPort();
+            assertThat(numExpectedArchives.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+                    .isEqualTo(expectedApplicationAndJobIds);
+            Set<JobID> expectedJobIds =
+                    Stream.concat(
+                                    expectedApplicationAndJobIds.values().stream()
+                                            .flatMap(Set::stream),
+                                    expectedJobIdsOutsideApplication.stream())
+                            .collect(Collectors.toSet());
+            assertThat(getIdsFromJobOverview(baseUrl)).isEqualTo(expectedJobIds);
+            // checks whether the dashboard configuration contains all expected fields
+            getDashboardConfiguration(baseUrl);
+        } finally {
+            hs.stop();
+        }
+    }
+
+    @Test
+    void testRemoveApplicationArchivesBeyondHistorySizeLimit() throws Exception {
+        int numJobsPerApplication = 1;
+        int numApplicationsToKeepInHistory = 2;
+        int numApplicationsBeforeHsStarted = 4;
+        int numApplicationsAfterHsStarted = 2;
+        int numApplicationsToRemoveUponHsStart =
+                numApplicationsBeforeHsStarted - numApplicationsToKeepInHistory;
+        List<Tuple2<ApplicationID, Set<JobID>>> expectedApplicationAndJobIdsToKeep =
+                new LinkedList<>();
+        for (int i = 0; i < numApplicationsBeforeHsStarted; i++) {
+            ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication);
+            ApplicationID applicationId = archivedApplication.getApplicationId();
+            List<JobID> jobIds =
+                    archivedApplication.getJobs().values().stream()
+                            .map(ExecutionGraphInfo::getJobId)
+                            .collect(Collectors.toList());
+            if (i >= numApplicationsToRemoveUponHsStart) {
+                expectedApplicationAndJobIdsToKeep.add(
+                        new Tuple2<>(applicationId, new HashSet<>(jobIds)));
+            }
+        }
+
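+        // Four latches track the fetcher events: the "Initially" latches cover archives
+        // processed when the HistoryServer starts, while the "Total" latches additionally
+        // cover applications archived after startup.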
+        // one for application itself, numJobsPerApplication for jobs
+        int numArchivesRatio = 1 + numJobsPerApplication;
+        CountDownLatch numArchivesCreatedInitially =
+                new CountDownLatch(numApplicationsToKeepInHistory * numArchivesRatio);
+        // jobs in applications that exceed the size limit are not read by the fetcher at all,
+        // so there is no need to delete these jobs.
+        CountDownLatch numArchivesDeletedInitially =
+                new CountDownLatch(numApplicationsToRemoveUponHsStart);
+        CountDownLatch numArchivesCreatedTotal =
+                new CountDownLatch(
+                        (numApplicationsBeforeHsStarted
+                                        - numApplicationsToRemoveUponHsStart
+                                        + numApplicationsAfterHsStarted)
+                                * numArchivesRatio);
+        CountDownLatch numArchivesDeletedTotal =
+                new CountDownLatch(
+                        numApplicationsToRemoveUponHsStart
+                                + numApplicationsAfterHsStarted * numArchivesRatio);
+        Configuration historyServerConfig =
+                createTestConfiguration(
+                        HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue());
+        historyServerConfig.set(
+                HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS,
+                numApplicationsToKeepInHistory);
+        HistoryServer hs =
+                new HistoryServer(
+                        historyServerConfig,
+                        (event) -> {
+                            throw new RuntimeException("Should not call");
+                        },
+                        (event) -> {
+                            if (event.getType()
+                                    == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+                                            .CREATED) {
+                                numArchivesCreatedInitially.countDown();
+                                numArchivesCreatedTotal.countDown();
+                            } else if (event.getType()
+                                    == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+                                            .DELETED) {
+                                numArchivesDeletedInitially.countDown();
+                                numArchivesDeletedTotal.countDown();
+                            }
+                        });
+        try {
+            hs.start();
+            String baseUrl = "http://localhost:" + hs.getWebPort();
+            assertThat(numArchivesCreatedInitially.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(numArchivesDeletedInitially.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+                    .isEqualTo(
+                            expectedApplicationAndJobIdsToKeep.stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    tuple -> tuple.f0, tuple -> tuple.f1)));
+            for (int i = numApplicationsBeforeHsStarted;
+                    i < numApplicationsBeforeHsStarted + numApplicationsAfterHsStarted;
+                    i++) {
+                expectedApplicationAndJobIdsToKeep.remove(0);
+                ArchivedApplication archivedApplication =
+                        mockApplicationArchive(numJobsPerApplication);
+                ApplicationID applicationId = archivedApplication.getApplicationId();
+                List<JobID> jobIds =
+                        archivedApplication.getJobs().values().stream()
+                                .map(ExecutionGraphInfo::getJobId)
+                                .collect(Collectors.toList());
+                expectedApplicationAndJobIdsToKeep.add(
+                        new Tuple2<>(applicationId, new HashSet<>(jobIds)));
+                // avoid running too fast, which would give archive files identical creation times
+                Thread.sleep(50);
+            }
+
+            assertThat(numArchivesCreatedTotal.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(numArchivesDeletedTotal.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+                    .isEqualTo(
+                            expectedApplicationAndJobIdsToKeep.stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    tuple -> tuple.f0, tuple -> tuple.f1)));
+        } finally {
+            hs.stop();
+        }
+    }
+
+    @Test
+    void testFailIfApplicationHistorySizeLimitIsZero() {
+        assertThatThrownBy(() -> startHistoryServerWithApplicationSizeLimit(0))
+                .isInstanceOf(IllegalConfigurationException.class);
+    }
+
+    @Test
+    void testFailIfApplicationHistorySizeLimitIsLessThanMinusOne() {
+        assertThatThrownBy(() -> startHistoryServerWithApplicationSizeLimit(-2))
+                .isInstanceOf(IllegalConfigurationException.class);
+    }
+
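+    // Helper that starts a HistoryServer with the given value for
+    // historyserver.archive.retained-applications, so the two tests above can verify
+    // that invalid limits are rejected with an IllegalConfigurationException.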
+    private void startHistoryServerWithApplicationSizeLimit(int maxHistorySize)
+            throws IOException, FlinkException, InterruptedException {
+        Configuration historyServerConfig =
+                createTestConfiguration(
+                        HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS
+                                .defaultValue());
+        historyServerConfig.set(
+                HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS, maxHistorySize);
+        new HistoryServer(historyServerConfig).start();
+    }
+
+    @Test
+    void testCleanExpiredApplication() throws Exception {
+        runApplicationArchiveExpirationTest(true);
+    }
+
+    @Test
+    void testRemainExpiredApplication() throws Exception {
+        runApplicationArchiveExpirationTest(false);
+    }
+
+    private void runApplicationArchiveExpirationTest(boolean cleanupExpiredApplications)
+            throws Exception {
+        int numExpiredApplications = cleanupExpiredApplications ? 1 : 0;
+        int numApplications = 3;
+        int numJobsPerApplication = 1;
+
+        Map<ApplicationID, Set<JobID>> expectedApplicationAndJobIds =
+                new HashMap<>(numApplications);
+        for (int i = 0; i < numApplications; i++) {
+            ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication);
+            ApplicationID applicationId = archivedApplication.getApplicationId();
+            List<JobID> jobIds =
+                    archivedApplication.getJobs().values().stream()
+                            .map(ExecutionGraphInfo::getJobId)
+                            .collect(Collectors.toList());
+            expectedApplicationAndJobIds.put(applicationId, new HashSet<>(jobIds));
+        }
+
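+        // With cleanup enabled, a DELETED event is expected for the application archive and
+        // for each of its job archives; with cleanup disabled, the expiration latches start
+        // at zero and must never be counted down.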
+        // one for application itself, numJobsPerApplication for jobs
+        int numArchivesRatio = 1 + numJobsPerApplication;
+        CountDownLatch numExpectedArchives = new CountDownLatch(numApplications * numArchivesRatio);
+        CountDownLatch firstArchiveExpiredLatch =
+                new CountDownLatch(numExpiredApplications * numArchivesRatio);
+        CountDownLatch allArchivesExpiredLatch =
+                new CountDownLatch(
+                        cleanupExpiredApplications ? numApplications * numArchivesRatio : 0);
+
+        Configuration historyServerConfig =
+                createTestConfiguration(
+                        HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue(),
+                        cleanupExpiredApplications);
+        HistoryServer hs =
+                new HistoryServer(
+                        historyServerConfig,
+                        (event) -> {
+                            throw new RuntimeException("Should not call");
+                        },
+                        (event) -> {
+                            if (event.getType()
+                                    == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+                                            .CREATED) {
+                                numExpectedArchives.countDown();
+                            } else if (event.getType()
+                                    == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+                                            .DELETED) {
+                                firstArchiveExpiredLatch.countDown();
+                                allArchivesExpiredLatch.countDown();
+                            }
+                        });
+        try {
+            hs.start();
+            String baseUrl = "http://localhost:" + hs.getWebPort();
+            assertThat(numExpectedArchives.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+                    .isEqualTo(expectedApplicationAndJobIds);
+            ApplicationID applicationIdToDelete =
+                    expectedApplicationAndJobIds.keySet().stream()
+                            .findFirst()
+                            .orElseThrow(
+                                    () ->
+                                            new IllegalStateException(
+                                                    "Expected at least one application"));
+            if (cleanupExpiredApplications) {
+                expectedApplicationAndJobIds.remove(applicationIdToDelete);
+            }
+            // trigger another fetch and delete one archive from jm
+            // we fetch again to probabilistically cause a concurrent deletion
+            hs.fetchArchives();
+            deleteApplicationArchiveDir(applicationIdToDelete);
+
+            assertThat(firstArchiveExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
+            // check that archive is still/no longer present in hs
+            assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+                    .isEqualTo(expectedApplicationAndJobIds);
+            for (ApplicationID remainingApplicationId : expectedApplicationAndJobIds.keySet()) {
+                deleteApplicationArchiveDir(remainingApplicationId);
+            }
+            assertThat(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertFilesCleanedUp(cleanupExpiredApplications);
+        } finally {
+            hs.stop();
+        }
+    }
+
+    private Map<ApplicationID, Set<JobID>> getApplicationAndJobIdsFromApplicationOverview(
+            String baseUrl) throws Exception {
+        Set<ApplicationID> applicationIds =
+                getApplicationsOverview(baseUrl).getApplications().stream()
+                        .map(ApplicationDetails::getApplicationId)
+                        .collect(Collectors.toSet());
+        Map<ApplicationID, Set<JobID>> applicationAndJobIds = new HashMap<>(applicationIds.size());
+        for (ApplicationID applicationId : applicationIds) {
+            Set<JobID> jobIds =
+                    getApplicationDetails(baseUrl, applicationId).getJobs().stream()
+                            .map(JobDetails::getJobId)
+                            .collect(Collectors.toSet());
+            applicationAndJobIds.put(applicationId, jobIds);
+        }
+        return applicationAndJobIds;
+    }
+
+    private static MultipleApplicationsDetails getApplicationsOverview(String baseUrl)
+            throws Exception {
+        Tuple2<Integer, String> response =
+                HttpUtils.getFromHTTP(baseUrl + ApplicationsOverviewHeaders.URL);
+        return OBJECT_MAPPER.readValue(response.f1, MultipleApplicationsDetails.class);
+    }
+
+    private static ApplicationDetailsInfo getApplicationDetails(
+            String baseUrl, ApplicationID applicationId) throws Exception {
+        Tuple2<Integer, String> response =
+                HttpUtils.getFromHTTP(
+                        baseUrl
+                                + ApplicationDetailsHeaders.URL.replace(
+                                        ':' + ApplicationIDPathParameter.KEY,
+                                        applicationId.toString()));
+        return OBJECT_MAPPER.readValue(response.f1, ApplicationDetailsInfo.class);
+    }
+
+    private ArchivedApplication mockApplicationArchive(int numJobs) throws IOException {
+        ArchivedApplication archivedApplication = createArchivedApplication(numJobs);
+        ApplicationID applicationId = archivedApplication.getApplicationId();
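+        // Write the application overview and details JSONs the way a real cluster's
+        // archivist would, then archive each job that belongs to the application.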
+        ArchivedJson archivedApplicationsOverview =
+                new ArchivedJson(
+                        ApplicationsOverviewHeaders.URL,
+                        new MultipleApplicationsDetails(
+                                Collections.singleton(
+                                        ApplicationDetails.fromArchivedApplication(
+                                                archivedApplication))));
+        ArchivedJson archivedApplicationDetails =
+                new ArchivedJson(
+                        ApplicationDetailsHeaders.URL.replace(
+                                ':' + ApplicationIDPathParameter.KEY, applicationId.toString()),
+                        ApplicationDetailsInfo.fromArchivedApplication(archivedApplication));
+        // set cluster id to application id to simplify the test
+        clusterConfig.set(ClusterOptions.CLUSTER_ID, applicationId.toString());
+        FsJsonArchivist.writeArchivedJsons(
+                ArchivePathUtils.getApplicationArchivePath(clusterConfig, applicationId),
+                Arrays.asList(archivedApplicationsOverview, archivedApplicationDetails));
+
+        Map<JobID, ExecutionGraphInfo> jobs = archivedApplication.getJobs();
+        for (Map.Entry<JobID, ExecutionGraphInfo> jobEntry : jobs.entrySet()) {
+            mockJobArchive(jobEntry.getValue(), applicationId);
+        }
+        return archivedApplication;
+    }
+
+    private void mockJobArchive(
+            ExecutionGraphInfo executionGraphInfo, @Nullable ApplicationID applicationId)
+            throws IOException {
+        JobID jobId = executionGraphInfo.getJobId();
+        ArchivedJson archivedJobsOverview =
+                new ArchivedJson(
+                        JobsOverviewHeaders.URL,
+                        new MultipleJobsDetails(
+                                Collections.singleton(
+                                        JobDetails.createDetailsForJob(
+                                                executionGraphInfo.getArchivedExecutionGraph()))));
+        FsJsonArchivist.writeArchivedJsons(
+                ArchivePathUtils.getJobArchivePath(clusterConfig, jobId, applicationId),
+                Collections.singletonList(archivedJobsOverview));
+    }
+
+    private ArchivedApplication createArchivedApplication(int numJobs) {
+        ApplicationID applicationId = ApplicationID.generate();
+        Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>(numJobs);
+        for (int i = 0; i < numJobs; i++) {
+            ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo();
+            jobs.put(executionGraphInfo.getJobId(), executionGraphInfo);
+        }
+        return new ArchivedApplication(
+                applicationId,
+                "test-application",
+                ApplicationState.FINISHED,
+                new long[ApplicationState.values().length],
+                jobs);
+    }
+
+    private ExecutionGraphInfo createExecutionGraphInfo() {
+        return new ExecutionGraphInfo(
+                ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
+                        JobID.generate(), "test-job", JobStatus.FINISHED, null, null, null, 0));
+    }
+
+    private void deleteApplicationArchiveDir(ApplicationID applicationId) throws IOException {
+        // set cluster id to application id to simplify the test
+        clusterConfig.set(ClusterOptions.CLUSTER_ID, applicationId.toString());
+        org.apache.flink.core.fs.Path applicationArchiveDir =
+                ArchivePathUtils.getApplicationArchivePath(clusterConfig, applicationId)
+                        .getParent();
+        applicationArchiveDir.getFileSystem().delete(applicationArchiveDir, true);
     }
 
     private static final class JsonObject implements AutoCloseable {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
similarity index 64%
rename from flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
rename to flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
index e8983967df537..d1d6124b3570f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
@@ -18,31 +18,49 @@
 
 package org.apache.flink.runtime.webmonitor.history.retaining;
 
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.Path;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.function.Function;
+import java.util.stream.Stream;
 
+import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS;
 import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
 import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Testing for {@link CompositeJobRetainedStrategy}. */
-class CompositeJobRetainedStrategyTest {
+/** Testing for {@link CompositeArchiveRetainedStrategy}. */
+class CompositeArchiveRetainedStrategyTest {
+
+    private static Stream<TestCase> getTestCases() {
+        return Stream.of(
+                new TestCase(
+                        "Legacy Jobs",
+                        HISTORY_SERVER_RETAINED_JOBS,
+                        CompositeArchiveRetainedStrategy::createForJobFromConfig),
+                new TestCase(
+                        "Applications",
+                        HISTORY_SERVER_RETAINED_APPLICATIONS,
+                        CompositeArchiveRetainedStrategy::createForApplicationFromConfig));
+    }
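+
+    // Each TestCase pairs a quantity option with the factory that builds the corresponding
+    // CompositeArchiveRetainedStrategy, so every test below runs once for legacy job
+    // archives and once for application archives.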
 
-    @Test
-    void testTimeToLiveBasedJobRetainedStrategy() {
+    @ParameterizedTest(name = "{index}: {0}")
+    @MethodSource("getTestCases")
+    void testTimeToLiveBasedArchiveRetainedStrategy(TestCase testCase) {
         final Configuration conf = new Configuration();
 
         // Test for invalid option value.
         conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ZERO);
-        assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+        assertThatThrownBy(() -> testCase.createStrategy(conf))
                 .isInstanceOf(IllegalConfigurationException.class);
         // Skipped for option value that is less than 0 milliseconds, which will throw a
         // java.lang.NumberFormatException caused by TimeUtils.
@@ -51,7 +69,7 @@ void testTimeToLiveBasedJobRetainedStrategy() {
 
         // Test the case where no specific retention policy is configured, i.e., all archived files
         // are retained.
-        JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
         assertThat(
                         strategy.shouldRetain(
@@ -63,7 +81,7 @@ void testTimeToLiveBasedJobRetainedStrategy() {
 
         // Test the case where only the TTL-based retention policy is specified.
         conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1L));
-        strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
         assertThat(
                         strategy.shouldRetain(
@@ -74,35 +92,37 @@ void testTimeToLiveBasedJobRetainedStrategy() {
                 .isFalse();
     }
 
-    @Test
-    void testQuantityBasedJobRetainedStrategy() {
+    @ParameterizedTest(name = "{index}: {0}")
+    @MethodSource("getTestCases")
+    void testQuantityBasedArchiveRetainedStrategy(TestCase testCase) {
         final Configuration conf = new Configuration();
 
         // Test for invalid option value.
-        conf.set(HISTORY_SERVER_RETAINED_JOBS, 0);
-        assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+        conf.set(testCase.getQuantityConfigOption(), 0);
+        assertThatThrownBy(() -> testCase.createStrategy(conf))
                 .isInstanceOf(IllegalConfigurationException.class);
-        conf.set(HISTORY_SERVER_RETAINED_JOBS, -2);
-        assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+        conf.set(testCase.getQuantityConfigOption(), -2);
+        assertThatThrownBy(() -> testCase.createStrategy(conf))
                 .isInstanceOf(IllegalConfigurationException.class);
-        conf.removeConfig(HISTORY_SERVER_RETAINED_JOBS);
+        conf.removeConfig(testCase.getQuantityConfigOption());
 
         // Test the case where no specific retention policy is configured, i.e., all archived files
         // are retained.
-        JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isTrue();
 
         // Test the case where only the quantity-based retention policy is specified.
-        conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
-        strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        conf.set(testCase.getQuantityConfigOption(), 2);
+        strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isFalse();
     }
 
-    @Test
-    void testCompositeBasedJobRetainedStrategy() {
+    @ParameterizedTest(name = "{index}: {0}")
+    @MethodSource("getTestCases")
+    void testCompositeBasedArchiveRetainedStrategy(TestCase testCase) {
         final long outOfTtlMillis =
                 Instant.now().toEpochMilli() - Duration.ofMinutes(2L).toMillis();
 
@@ -110,7 +130,7 @@ void testCompositeBasedJobRetainedStrategy() {
         // Test the case where no specific retention policy is configured, i.e., all archived files
         // are retained.
         final Configuration conf = new Configuration();
-        JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 1)).isTrue();
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 10)).isTrue();
         assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 3)).isTrue();
@@ -118,8 +138,8 @@ void testCompositeBasedJobRetainedStrategy() {
 
         // Test the case where both retention policies are specified.
         conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1));
-        conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
-        strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        conf.set(testCase.getQuantityConfigOption(), 2);
+        strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 1)).isFalse();
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 10)).isFalse();
         assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 3)).isFalse();
@@ -173,4 +193,32 @@ public Path getPath() {
             return null;
         }
     }
+
+    private static final class TestCase {
+        private final String testName;
+        private final ConfigOption<Integer> quantityConfigOption;
+        private final Function<Configuration, ArchiveRetainedStrategy> strategyFunction;
+
+        TestCase(
+                String testName,
+                ConfigOption<Integer> quantityConfigOption,
+                Function<Configuration, ArchiveRetainedStrategy> strategyFunction) {
+            this.testName = testName;
+            this.quantityConfigOption = quantityConfigOption;
+            this.strategyFunction = strategyFunction;
+        }
+
+        ArchiveRetainedStrategy createStrategy(Configuration conf) {
+            return strategyFunction.apply(conf);
+        }
+
+        ConfigOption<Integer> getQuantityConfigOption() {
+            return quantityConfigOption;
+        }
+
+        @Override
+        public String toString() {
+            return testName;
+        }
+    }
 }