From 35220bb7caaf6e1ad05ce106e23cac1551ad3654 Mon Sep 17 00:00:00 2001 From: Allison Chang Date: Tue, 19 Sep 2023 17:43:57 +0000 Subject: [PATCH 1/2] [FLINK-37155] [Runtime/Coordination] Implementing FLIP-505 for Flink History Server scalability improvements --- .../history_server_configuration.html | 12 + .../configuration/HistoryServerOptions.java | 36 ++ .../webmonitor/history/HistoryServer.java | 42 +- .../history/HistoryServerArchiveFetcher.java | 246 ++++++--- .../HistoryServerStaticFileServerHandler.java | 56 +- .../HistoryServerArchiveFetcherTest.java | 485 ++++++++++++++++++ ...toryServerStaticFileServerHandlerTest.java | 106 +++- .../webmonitor/history/HistoryServerTest.java | 245 +++++++-- .../utils/WebFrontendBootstrapTest.java | 23 +- .../job-list/job-list.component.html | 2 +- .../components/job-list/job-list.component.ts | 1 + .../legacy/files/StaticFileServerHandler.java | 2 +- 12 files changed, 1142 insertions(+), 114 deletions(-) create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcherTest.java diff --git a/docs/layouts/shortcodes/generated/history_server_configuration.html b/docs/layouts/shortcodes/generated/history_server_configuration.html index 3f80eb1fbdf1b..6395f7a5b998c 100644 --- a/docs/layouts/shortcodes/generated/history_server_configuration.html +++ b/docs/layouts/shortcodes/generated/history_server_configuration.html @@ -8,6 +8,12 @@ + +
historyserver.archive.cached-retained-jobs
+ (none) + Integer + The maximum number of the latest jobs to retain in the local directory defined by `historyserver.web.tmpdir`. If this configuration is provided, the remote and local storage of job archives will be decoupled. If set to `0` or less than `-1`, HistoryServer will throw an IllegalConfigurationException. +
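For reference, a minimal sketch of the validation this option implies, mirroring the startup check HistoryServer performs further down in this patch (the helper name validateCachedRetainedJobs is hypothetical, and the snippet assumes the option was explicitly set, since it has no default):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.HistoryServerOptions;
    import org.apache.flink.configuration.IllegalConfigurationException;

    static void validateCachedRetainedJobs(Configuration config) {
        // -1 (no local limit) and positive values pass; 0 and anything below -1 are rejected.
        int cachedJobs = config.get(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS);
        if (cachedJobs == 0 || cachedJobs < -1) {
            throw new IllegalConfigurationException(
                    "Cannot set %s to 0 or less than -1",
                    HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS.key());
        }
    }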
historyserver.archive.clean-expired-jobs
false @@ -26,6 +32,12 @@ Duration Interval for refreshing the archived job directories. + +
historyserver.archive.num-cached-most-recently-viewed-jobs
+ 1 + Integer + The maximum number of jobs to retain in the local cache defined by `historyserver.web.tmpdir`, which stores the job archives fetched on demand from the remote storage. This limit is distinct from the number of most recent jobs that the periodic fetch keeps in the cache; the total cache size is the sum of this value and the number of retained cached jobs. If set to `0` or less, HistoryServer will throw an IllegalConfigurationException. +
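As a usage sketch (not part of the patch; the sizes are placeholder values and checked exceptions are elided), both caches can be enabled programmatically; HistoryServer refuses to start unless the archive location is configured as well:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.HistoryServerOptions;
    import org.apache.flink.runtime.webmonitor.history.HistoryServer;

    Configuration config = new Configuration();
    // Remote location of the completed-job archives (any supported FileSystem scheme).
    config.set(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, "s3://bucket/completed-jobs");
    // Decouple local and remote storage: keep the 50 most recent jobs locally...
    config.set(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS, 50);
    // ...plus up to 5 extra jobs fetched on demand because a user viewed them.
    config.set(HistoryServerOptions.HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS, 5);
    new HistoryServer(config).start();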
historyserver.archive.retained-jobs
-1 diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java index 0ead9cdeed1a6..235424c6dff1a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java @@ -126,6 +126,23 @@ public class HistoryServerOptions { "Enable HTTPs access to the HistoryServer web frontend. This is applicable only when the" + " global SSL flag security.ssl.enabled is set to true."); + public static final ConfigOption<Integer> HISTORY_SERVER_CACHED_JOBS = + key("historyserver.archive.cached-retained-jobs") + .intType() + .noDefaultValue() + .withDescription( + Description.builder() + .text( + String.format( + "The maximum number of the latest jobs to retain in the local directory defined by `%s`. ", + HISTORY_SERVER_WEB_DIR.key())) + .text( + "If this configuration is provided, the remote and local storage of job archives will be decoupled. ") + .text( + "If set to `0` or less than `-1`, HistoryServer will throw an %s. ", + code("IllegalConfigurationException")) + .build()); + public static final ConfigOption<Integer> HISTORY_SERVER_RETAINED_JOBS = key("historyserver.archive.retained-jobs") .intType() @@ -143,5 +160,24 @@ public class HistoryServerOptions { code("IllegalConfigurationException")) .build()); + public static final ConfigOption<Integer> HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS = + key("historyserver.archive.num-cached-most-recently-viewed-jobs") + .intType() + .defaultValue(1) + .withDescription( + Description.builder() + .text( + String.format( + "The maximum number of jobs to retain in the local cache defined by `%s`, which " + + "stores the job archives fetched on demand from the remote storage. This " + + "limit is distinct from the number of most recent jobs that the periodic " + + "fetch keeps in the cache; the total cache size is the sum of this value " + + "and the number of retained cached jobs.", + HISTORY_SERVER_WEB_DIR.key())) + .text( + "If set to `0` or less, HistoryServer will throw an %s. 
", + code("IllegalConfigurationException")) + .build()); + private HistoryServerOptions() {} } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 00b96e0bbf3d2..c1845dd9cbc3c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -112,6 +112,8 @@ public class HistoryServer { private final HistoryServerArchiveFetcher archiveFetcher; + private final HistoryServerStaticFileServerHandler staticFileServerHandler; + @Nullable private final SSLHandlerFactory serverSSLFactory; private WebFrontendBootstrap netty; @@ -215,12 +217,12 @@ public HistoryServer( throw new FlinkException( HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS + " was not configured."); } - List refreshDirs = new ArrayList<>(); + List refreshDirs = new ArrayList<>(); for (String refreshDirectory : refreshDirectories.split(",")) { try { Path refreshPath = new Path(refreshDirectory); FileSystem refreshFS = refreshPath.getFileSystem(); - refreshDirs.add(new RefreshLocation(refreshPath, refreshFS)); + refreshDirs.add(new HistoryServer.RefreshLocation(refreshPath, refreshFS)); } catch (Exception e) { // there's most likely something wrong with the path itself, so we ignore it from // here on @@ -244,13 +246,41 @@ public HistoryServer( "Cannot set %s to 0 or less than -1", HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.key()); } + boolean remoteFetchEnabled = + config.contains(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS); + + int generalCachedJobSize = -1; + if (remoteFetchEnabled) { + generalCachedJobSize = config.get(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS); + if (generalCachedJobSize == 0 || generalCachedJobSize < -1) { + throw new IllegalConfigurationException( + "Cannot set %s to 0 or less than -1", + HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS.key()); + } + } + int numCachedMostRecentlyViewedJobs = + config.get( + HistoryServerOptions.HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS); + if (numCachedMostRecentlyViewedJobs <= 0) { + throw new IllegalConfigurationException( + "Cannot set %s to less than 0", + HistoryServerOptions.HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS.key()); + } + archiveFetcher = new HistoryServerArchiveFetcher( refreshDirs, webDir, jobArchiveEventListener, cleanupExpiredArchives, - maxHistorySize); + maxHistorySize, + generalCachedJobSize, + remoteFetchEnabled, + numCachedMostRecentlyViewedJobs); + + staticFileServerHandler = + new HistoryServerStaticFileServerHandler( + webDir, remoteFetchEnabled, archiveFetcher); this.shutdownHook = ShutdownHookUtil.addShutdownHook( @@ -310,7 +340,7 @@ void start() throws IOException, InterruptedException { new GeneratedLogUrlHandler( CompletableFuture.completedFuture(pattern)))); - router.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir)); + router.addGet("/:*", staticFileServerHandler); createDashboardConfigFile(); @@ -393,11 +423,11 @@ private static String createConfigJson(DashboardConfiguration dashboardConfigura } /** Container for the {@link Path} and {@link FileSystem} of a refresh directory. 
*/ - static class RefreshLocation { + public static class RefreshLocation { private final Path path; private final FileSystem fs; - private RefreshLocation(Path path, FileSystem fs) { + public RefreshLocation(Path path, FileSystem fs) { this.path = path; this.fs = fs; } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java index a8d782354a05d..8def33a716f3e 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java @@ -53,6 +53,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -71,16 +72,16 @@ * HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_JOBS} or {@link * HistoryServerOptions#HISTORY_SERVER_RETAINED_JOBS}. */ -class HistoryServerArchiveFetcher { +public class HistoryServerArchiveFetcher { /** Possible job archive operations in history-server. */ public enum ArchiveEventType { /** Job archive was found in one refresh location and created in history server. */ CREATED, - /** - * Job archive was deleted from one of refresh locations and deleted from history server. - */ - DELETED + /** Job archive was deleted from one of cache locations. */ + DELETED, + /** Job archive was deleted from the remote location. */ + DELETED_FROM_REMOTE } /** Representation of job archive event. */ @@ -102,6 +103,21 @@ public ArchiveEventType getType() { } } + /** Maintains a LRU cache of a given capacity. */ + public class MostRecentlyViewedCache extends LinkedHashMap { + private int maxSize; + + public MostRecentlyViewedCache(int capacity) { + super(capacity, 0.75f, true); + this.maxSize = capacity; + } + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return this.size() > maxSize; + } + } + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); private static final JsonFactory jacksonFactory = new JsonFactory(); @@ -113,31 +129,58 @@ public ArchiveEventType getType() { private final Consumer jobArchiveEventListener; private final boolean processExpiredArchiveDeletion; private final boolean processBeyondLimitArchiveDeletion; + private final boolean processBeyondLimitLocalCacheDeletion; private final int maxHistorySize; - /** Cache of all available jobs identified by their id. */ + /** + * Number of retained jobs in the local cache based on the asynchronous fetch, which is based on + * most recently run jobs. + */ + private final int generalCachedJobSize; + + private final boolean remoteFetchEnabled; + + /** Cache of all available jobs identified by their id and refresh directory. */ private final Map> cachedArchivesPerRefreshDirectory; + /** Cache of all available jobs by id. 
*/ + private final Set cachedArchives; + + private final MostRecentlyViewedCache mostRecentlyViewedCache; + private final File webDir; private final File webJobDir; private final File webOverviewDir; - HistoryServerArchiveFetcher( + public HistoryServerArchiveFetcher( List refreshDirs, File webDir, Consumer jobArchiveEventListener, boolean cleanupExpiredArchives, - int maxHistorySize) + int maxHistorySize, + int generalCachedJobSize, + boolean remoteFetchEnabled, + int numCachedMostRecentlyViewedJobs) throws IOException { this.refreshDirs = checkNotNull(refreshDirs); this.jobArchiveEventListener = jobArchiveEventListener; this.processExpiredArchiveDeletion = cleanupExpiredArchives; this.maxHistorySize = maxHistorySize; + this.remoteFetchEnabled = remoteFetchEnabled; + if (this.remoteFetchEnabled) { + this.mostRecentlyViewedCache = + new MostRecentlyViewedCache(numCachedMostRecentlyViewedJobs); + } else { + this.mostRecentlyViewedCache = null; + } + this.generalCachedJobSize = generalCachedJobSize; this.processBeyondLimitArchiveDeletion = this.maxHistorySize > 0; + this.processBeyondLimitLocalCacheDeletion = generalCachedJobSize > 0; this.cachedArchivesPerRefreshDirectory = new HashMap<>(); for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new HashSet<>()); } + this.cachedArchives = new HashSet<>(); this.webDir = checkNotNull(webDir); this.webJobDir = new File(webDir, "jobs"); Files.createDirectories(webJobDir.toPath()); @@ -152,14 +195,59 @@ public ArchiveEventType getType() { } } + void fetchArchiveByJobId(String jobID) throws IOException { + if (remoteFetchEnabled) { + boolean jobIdFound = false; + if (cachedArchives.contains(jobID)) { + mostRecentlyViewedCache.put(jobID, jobID); + } else { + for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) { + Path refreshDir = refreshLocation.getPath(); + Path jobArchivePath = new Path(refreshDir, jobID); + LOG.info( + String.format( + "Fetching JobId archive %s from %s.", jobID, jobArchivePath)); + if (jobArchivePath.getFileSystem().exists(jobArchivePath)) { + LOG.info("Processing archive {}.", jobArchivePath); + try { + processArchive(jobID, jobArchivePath); + cachedArchives.add(jobID); + cachedArchivesPerRefreshDirectory.get(refreshDir).add(jobID); + LOG.info("Processing archive {} finished.", jobArchivePath); + } catch (IOException e) { + LOG.error( + "Failure while fetching/processing job archive for job {}.", + jobID, + e); + deleteJobFiles(jobID); + throw e; + } + jobIdFound = true; + mostRecentlyViewedCache.put(jobID, jobID); + break; + } + } + if (!jobIdFound) { + LOG.warn("Unable to find archive in remote job archives for job {}.", jobID); + } + } + } else { + LOG.warn("Unable to fetch jobs from remote archives as this feature is not enabled"); + } + } + void fetchArchives() { try { LOG.debug("Starting archive fetching."); List events = new ArrayList<>(); - Map> jobsToRemove = new HashMap<>(); + Map> expiredJobsToRemove = new HashMap<>(); cachedArchivesPerRefreshDirectory.forEach( - (path, archives) -> jobsToRemove.put(path, new HashSet<>(archives))); - Map> archivesBeyondSizeLimit = new HashMap<>(); + (path, archives) -> expiredJobsToRemove.put(path, new HashSet<>(archives))); + + Set remoteJobArchivesToRemove = new HashSet<>(); + Map> cachedJobArchivesToRemove = new HashMap<>(); + + int generalCachedJobCount = 0; for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) { Path refreshDir = refreshLocation.getPath(); LOG.debug("Checking archive 
directory {}.", refreshDir); @@ -172,7 +260,7 @@ void fetchArchives() { LOG.error("Failed to access job archive location for path {}.", refreshDir, e); // something went wrong, potentially due to a concurrent deletion // do not remove any jobs now; we will retry later - jobsToRemove.remove(refreshDir); + expiredJobsToRemove.remove(refreshDir); continue; } @@ -184,44 +272,87 @@ void fetchArchives() { continue; } - jobsToRemove.get(refreshDir).remove(jobID); + expiredJobsToRemove.get(refreshDir).remove(jobID); historySize++; - if (historySize > maxHistorySize && processBeyondLimitArchiveDeletion) { - archivesBeyondSizeLimit - .computeIfAbsent(refreshDir, ignored -> new HashSet<>()) - .add(jobArchivePath); - continue; + + if (!remoteFetchEnabled + || (remoteFetchEnabled + && !mostRecentlyViewedCache.containsKey(jobID))) { + generalCachedJobCount++; } - if (cachedArchivesPerRefreshDirectory.get(refreshDir).contains(jobID)) { - LOG.trace( - "Ignoring archive {} because it was already fetched.", - jobArchivePath); - } else { - LOG.info("Processing archive {}.", jobArchivePath); - try { - processArchive(jobID, jobArchivePath); - events.add(new ArchiveEvent(jobID, ArchiveEventType.CREATED)); - cachedArchivesPerRefreshDirectory.get(refreshDir).add(jobID); - LOG.info("Processing archive {} finished.", jobArchivePath); - } catch (IOException e) { - LOG.error( - "Failure while fetching/processing job archive for job {}.", - jobID, - e); - deleteJobFiles(jobID); + boolean processJob = true; + boolean remoteArchiveDeletion = + historySize > maxHistorySize && processBeyondLimitArchiveDeletion; + boolean localCacheDeletion = + generalCachedJobCount > generalCachedJobSize + && processBeyondLimitLocalCacheDeletion; + + if (remoteArchiveDeletion) { + remoteJobArchivesToRemove.add(jobArchivePath); + processJob = false; + } + + if (remoteArchiveDeletion || localCacheDeletion) { + if ((!remoteFetchEnabled + || (remoteFetchEnabled + && !mostRecentlyViewedCache.containsKey(jobID))) + && cachedArchives.contains(jobID)) { + cachedJobArchivesToRemove + .computeIfAbsent(refreshDir, ignored -> new HashSet<>()) + .add(jobID); + } + processJob = false; + } + + if (processJob) { + if (cachedArchives.contains(jobID)) { + LOG.trace( + "Ignoring archive {} because it was already fetched.", + jobArchivePath); + } else { + LOG.info("Processing archive {}.", jobArchivePath); + try { + processArchive(jobID, jobArchivePath); + events.add(new ArchiveEvent(jobID, ArchiveEventType.CREATED)); + cachedArchives.add(jobID); + cachedArchivesPerRefreshDirectory.get(refreshDir).add(jobID); + LOG.info("Processing archive {} finished.", jobArchivePath); + } catch (IOException e) { + LOG.error( + "Failure while fetching/processing job archive for job {}.", + jobID, + e); + deleteJobFiles(jobID); + } } } } } - if (jobsToRemove.values().stream().flatMap(Set::stream).findAny().isPresent() + // Remove any expired jobs before cleaning up any other local cached jobs + if (expiredJobsToRemove.values().stream().flatMap(Set::stream).findAny().isPresent() && processExpiredArchiveDeletion) { - events.addAll(cleanupExpiredJobs(jobsToRemove)); + events.addAll(cleanupCachedJobs(expiredJobsToRemove)); + } + + int numCurrentCachedMostRecentlyViewedJobs = + remoteFetchEnabled ? 
mostRecentlyViewedCache.size() : 0; + + boolean localCacheDeletion = + !cachedJobArchivesToRemove.isEmpty() + && cachedArchives.size() - numCurrentCachedMostRecentlyViewedJobs + > generalCachedJobSize + && processBeyondLimitLocalCacheDeletion; + boolean remoteArchiveDeletion = + !remoteJobArchivesToRemove.isEmpty() && processBeyondLimitArchiveDeletion; + + if (localCacheDeletion || remoteArchiveDeletion) { + events.addAll(cleanupCachedJobs(cachedJobArchivesToRemove)); } - if (!archivesBeyondSizeLimit.isEmpty() && processBeyondLimitArchiveDeletion) { - events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondSizeLimit)); + if (remoteArchiveDeletion) { + events.addAll(cleanupRemoteJobs(remoteJobArchivesToRemove)); } if (!events.isEmpty()) { updateJobOverview(webOverviewDir, webDir); @@ -233,8 +364,7 @@ void fetchArchives() { } } - private static FileStatus[] listArchives(FileSystem refreshFS, Path refreshDir) - throws IOException { + static FileStatus[] listArchives(FileSystem refreshFS, Path refreshDir) throws IOException { // contents of /:refreshDir FileStatus[] jobArchives = refreshFS.listStatus(refreshDir); if (jobArchives == null) { @@ -303,35 +433,31 @@ private void processArchive(String jobID, Path jobArchive) throws IOException { } } - private List cleanupJobsBeyondSizeLimit( - Map> jobArchivesToRemove) { - Map> allJobIdsToRemoveFromOverview = new HashMap<>(); + private List cleanupRemoteJobs(Set jobArchivesToRemove) { - for (Map.Entry> pathSetEntry : jobArchivesToRemove.entrySet()) { - HashSet jobIdsToRemoveFromOverview = new HashSet<>(); - - for (Path archive : pathSetEntry.getValue()) { - jobIdsToRemoveFromOverview.add(archive.getName()); - try { - archive.getFileSystem().delete(archive, false); - } catch (IOException ioe) { - LOG.warn("Could not delete old archive " + archive, ioe); - } + List deleteFromRemoteLog = new ArrayList<>(); + for (Path archive : jobArchivesToRemove) { + try { + archive.getFileSystem().delete(archive, false); + deleteFromRemoteLog.add( + new ArchiveEvent(archive.getName(), ArchiveEventType.DELETED_FROM_REMOTE)); + } catch (IOException ioe) { + LOG.warn("Could not delete old archive " + archive, ioe); } - allJobIdsToRemoveFromOverview.put(pathSetEntry.getKey(), jobIdsToRemoveFromOverview); } - - return cleanupExpiredJobs(allJobIdsToRemoveFromOverview); + return deleteFromRemoteLog; } - private List cleanupExpiredJobs(Map> jobsToRemove) { + private List cleanupCachedJobs(Map> jobsToRemove) { List deleteLog = new ArrayList<>(); - LOG.info("Archive directories for jobs {} were deleted.", jobsToRemove); jobsToRemove.forEach( (refreshDir, archivesToRemove) -> { - cachedArchivesPerRefreshDirectory.get(refreshDir).removeAll(archivesToRemove); + for (String jobID : archivesToRemove) { + cachedArchivesPerRefreshDirectory.get(refreshDir).remove(jobID); + cachedArchives.remove(jobID); + } }); jobsToRemove.values().stream() .flatMap(Set::stream) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java index 40e5cb379d8d0..759574062b2f4 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java @@ -49,6 +49,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; 
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,8 +100,23 @@ public class HistoryServerStaticFileServerHandler /** The path in which the static documents are. */ private final File rootPath; - public HistoryServerStaticFileServerHandler(File rootPath) throws IOException { + private final HistoryServerArchiveFetcher archiveFetcher; + + private final boolean enableRemoteArchiveRefresh; + + public HistoryServerStaticFileServerHandler( + File rootPath, + boolean enableRemoteArchiveFetch, + HistoryServerArchiveFetcher historyServerArchiveFetcher) + throws IOException { this.rootPath = checkNotNull(rootPath).getCanonicalFile(); + this.enableRemoteArchiveRefresh = enableRemoteArchiveFetch; + + if (enableRemoteArchiveFetch) { + this.archiveFetcher = checkNotNull(historyServerArchiveFetcher); + } else { + this.archiveFetcher = null; + } } // ------------------------------------------------------------------------ @@ -138,7 +154,38 @@ private void respondWithFile(ChannelHandlerContext ctx, HttpRequest request, Str } // convert to absolute path - final File file = new File(rootPath, requestPath); + File file = new File(rootPath, requestPath); + + if (enableRemoteArchiveRefresh) { + // Attempt to fetch files from the remote file system or the local cache of archives + // that were retrieved from the remote file system. + if (!file.exists()) { + String jobID = retrieveJobId(requestPath); + String jobArchiveDirectoryPath = "/jobs/" + jobID; + + File localCacheJobArchivePath = new File(rootPath, jobArchiveDirectoryPath); + boolean fileExistsInCache = localCacheJobArchivePath.exists(); + boolean remoteFileFetched = false; + try { + // We only attempt to retrieve from the remote job archives if the + // jobID directory doesn't exist in the local job archive cache, which would + // indicate the job has already been fetched before. + if (!fileExistsInCache) { + archiveFetcher.fetchArchiveByJobId(jobID); + file = new File(rootPath, requestPath); + remoteFileFetched = true; + } + } catch (Throwable t) { + LOG.error("Error while responding.", t); + } finally { + if (!remoteFileFetched && !fileExistsInCache) { + LOG.debug( + "Unable to load requested file {} from remote directory", + requestPath); + } + } + } + } if (!file.exists()) { // file does not exist. Try to load it with the classloader @@ -279,4 +326,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { Collections.emptyMap()); } } + + private String retrieveJobId(String requestPath) { + String[] jobPath = requestPath.split("[\\./]"); + return jobPath.length >= 2 ? jobPath[2] : StringUtils.EMPTY; + } } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcherTest.java new file mode 100644 index 0000000000000..d91b62562ae00 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcherTest.java @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; + +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the HistoryServerArchiveFetcher. */ +public class HistoryServerArchiveFetcherTest { + + @Test + void testMostRecentlyViewedCacheEvictedWhenFull(@TempDir Path tmpDir) throws IOException { + final Path webDir = Files.createDirectory(tmpDir.resolve("webDir")); + final Path tempDir = Files.createDirectory(tmpDir.resolve("tmpDir")); + final org.apache.flink.core.fs.Path tempDirFs = + new org.apache.flink.core.fs.Path(tempDir.toUri()); + + List refreshDirs = new ArrayList<>(); + FileSystem refreshFS = tempDirFs.getFileSystem(); + refreshDirs.add(new HistoryServer.RefreshLocation(tempDirFs, refreshFS)); + + int numJobs = 5; + int localCacheSize = 3; + int mostRecentlyViewedCacheSize = 1; + + HistoryServerArchiveFetcher archiveFetcher = + new HistoryServerArchiveFetcher( + refreshDirs, + webDir.toFile(), + (event) -> {}, + true, + -1, + localCacheSize, + true, + mostRecentlyViewedCacheSize); + + for (int i = 0; i < numJobs; i++) { + String jobID = JobID.generate().toString(); + buildRemoteStorageJobDirectory(tempDir, jobID); + } + + Map jobIdByCreation = + retrieveRemoteArchivesByModificationTime(refreshFS, tempDirFs); + + // Fetching the remote archives and storing them in the local cache asynchronously + archiveFetcher.fetchArchives(); + + // Confirming that the size of the local cache is based on the asynchronous fetch cache + // limit + assertThat(getNumJobsInGeneralCache(webDir)).isEqualTo(localCacheSize); + for (int i = 0; i < localCacheSize; i++) { + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(i))).isTrue(); + } + + // Fetching the second to last job which was created. It should not be present in the + // local cache. 
+ archiveFetcher.fetchArchiveByJobId(jobIdByCreation.get(3)); + + // Validating that the job has been fetched from the remote job archive and that the + // cache is still within the right size of localCacheSize + remoteFetchCacheSize; + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(3))).isTrue(); + assertThat(getNumJobsInGeneralCache(webDir)) + .isEqualTo(localCacheSize + mostRecentlyViewedCacheSize); + + // Fetching another job via remote fetch + archiveFetcher.fetchArchiveByJobId(jobIdByCreation.get(4)); + + // Forcing a background refresh to evict the previously remotely fetched job + archiveFetcher.fetchArchives(); + + // Validating that the previously remotely fetched job has been evicted and the new + // job has taken its place + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(4))).isTrue(); + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(3))).isFalse(); + assertThat(getNumJobsInGeneralCache(webDir)) + .isEqualTo(localCacheSize + mostRecentlyViewedCacheSize); + } + + @Test + void testGeneralCacheEviction(@TempDir Path tmpDir) throws IOException, InterruptedException { + final Path webDir = Files.createDirectory(tmpDir.resolve("webDir")); + final Path tempDir = Files.createDirectory(tmpDir.resolve("tmpDir")); + final org.apache.flink.core.fs.Path tempDirFs = + new org.apache.flink.core.fs.Path(tempDir.toUri()); + + List refreshDirs = new ArrayList<>(); + FileSystem refreshFS = tempDirFs.getFileSystem(); + refreshDirs.add(new HistoryServer.RefreshLocation(tempDirFs, refreshFS)); + + int numJobs = 5; + int generalCacheSize = 3; + int mostRecentlyViewedCacheSize = 1; + + HistoryServerArchiveFetcher archiveFetcher = + new HistoryServerArchiveFetcher( + refreshDirs, + webDir.toFile(), + (event) -> {}, + true, + -1, + generalCacheSize, + true, + mostRecentlyViewedCacheSize); + + for (int i = 0; i < numJobs; i++) { + String jobID = JobID.generate().toString(); + buildRemoteStorageJobDirectory(tempDir, jobID); + } + + Map jobIdByCreation = + retrieveRemoteArchivesByModificationTime(refreshFS, tempDirFs); + + // Fetching the remote archives and storing them in the local cache + archiveFetcher.fetchArchives(); + + // Confirming that the size of the local cache is based on the asynchronous fetch cache + // limit + assertThat(getNumJobsInGeneralCache(webDir)).isEqualTo(generalCacheSize); + for (int i = 0; i < generalCacheSize; i++) { + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(i))).isTrue(); + } + + // Adding another remote directory to indicate a newer job, we sleep for a short period + // of time to ensure a more recent timestamp + Thread.sleep(1000); + String latestJobID = JobID.generate().toString(); + buildRemoteStorageJobDirectory(tempDir, latestJobID); + + archiveFetcher.fetchArchives(); + + // Validating that the cache has not exceeded the async refresh cache limit and that + // the new job is cached + assertThat(getNumJobsInGeneralCache(webDir)).isEqualTo(generalCacheSize); + assertThat(findJobIdInGeneralCache(webDir, latestJobID)).isTrue(); + + // Validating that the oldest record was evicted from the cache + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(2))).isFalse(); + } + + @Test + void testMostRecentlyViewedCacheNotEvictedByPeriodicFetch(@TempDir Path tmpDir) + throws IOException, InterruptedException { + final Path webDir = Files.createDirectory(tmpDir.resolve("webDir")); + final Path tempDir = Files.createDirectory(tmpDir.resolve("tmpDir")); + final org.apache.flink.core.fs.Path tempDirFs = + 
new org.apache.flink.core.fs.Path(tempDir.toUri()); + + List refreshDirs = new ArrayList<>(); + FileSystem refreshFS = tempDirFs.getFileSystem(); + refreshDirs.add(new HistoryServer.RefreshLocation(tempDirFs, refreshFS)); + + int numJobs = 5; + int generalCacheSize = 3; + int mostRecentlyViewedCacheSize = 1; + + HistoryServerArchiveFetcher archiveFetcher = + new HistoryServerArchiveFetcher( + refreshDirs, + webDir.toFile(), + (event) -> {}, + true, + -1, + generalCacheSize, + true, + mostRecentlyViewedCacheSize); + + for (int i = 0; i < numJobs; i++) { + String jobID = JobID.generate().toString(); + buildRemoteStorageJobDirectory(tempDir, jobID); + } + + Map jobIdByCreation = + retrieveRemoteArchivesByModificationTime(refreshFS, tempDirFs); + + // Fetching the remote archives and storing them in the local cache + archiveFetcher.fetchArchives(); + + // Confirming that the size of the local cache is based on the asynchronous fetch cache + // limit + assertThat(getNumJobsInGeneralCache(webDir)).isEqualTo(generalCacheSize); + for (int i = 0; i < generalCacheSize; i++) { + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(i))).isTrue(); + } + + // Adding another remote directory to indicate a newer job, we sleep for a short period + // of time to ensure a more recent timestamp + Thread.sleep(1000); + String latestJobID = JobID.generate().toString(); + buildRemoteStorageJobDirectory(tempDir, latestJobID); + + // Fetching a job stored in the remote archive + archiveFetcher.fetchArchiveByJobId(jobIdByCreation.get(3)); + + // triggering an asynchronous fetch to update the cache + archiveFetcher.fetchArchives(); + + // Validating that the cache has not exceeded the async refresh cache limit and that + // the new job is cached and that the most recently viewed job is still present + assertThat(getNumJobsInGeneralCache(webDir)) + .isEqualTo(generalCacheSize + mostRecentlyViewedCacheSize); + assertThat(findJobIdInGeneralCache(webDir, latestJobID)).isTrue(); + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(3))).isTrue(); + + // Validating that the oldest record was evicted from the cache + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(2))).isFalse(); + } + + @Test + void testGeneralAndMostRecentlyViewedCacheEviction(@TempDir Path tmpDir) + throws IOException, InterruptedException { + final Path webDir = Files.createDirectory(tmpDir.resolve("webDir")); + final Path tempDir = Files.createDirectory(tmpDir.resolve("tmpDir")); + final org.apache.flink.core.fs.Path tempDirFs = + new org.apache.flink.core.fs.Path(tempDir.toUri()); + + List refreshDirs = new ArrayList<>(); + FileSystem refreshFS = tempDirFs.getFileSystem(); + refreshDirs.add(new HistoryServer.RefreshLocation(tempDirFs, refreshFS)); + + int numJobs = 5; + int generalCacheSize = 3; + int mostRecentlyViewedCacheSize = 1; + + HistoryServerArchiveFetcher archiveFetcher = + new HistoryServerArchiveFetcher( + refreshDirs, + webDir.toFile(), + (event) -> {}, + true, + -1, + generalCacheSize, + true, + mostRecentlyViewedCacheSize); + + for (int i = 0; i < numJobs; i++) { + String jobID = JobID.generate().toString(); + buildRemoteStorageJobDirectory(tempDir, jobID); + } + + Map jobIdByCreation = + retrieveRemoteArchivesByModificationTime(refreshFS, tempDirFs); + + // Fetching the remote archives and store them in the local cache + archiveFetcher.fetchArchives(); + + // Confirming that the size of the local cache is based on the asynchronous fetch cache + // limit + 
assertThat(getNumJobsInGeneralCache(webDir)).isEqualTo(generalCacheSize); + for (int i = 0; i < generalCacheSize; i++) { + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(i))).isTrue(); + } + + // Fetching a job stored in the remote archive + archiveFetcher.fetchArchiveByJobId(jobIdByCreation.get(3)); + + // Validating that the cache size contains both the asynchronous and remote user fetch + // records + assertThat(getNumJobsInGeneralCache(webDir)) + .isEqualTo(generalCacheSize + mostRecentlyViewedCacheSize); + for (int i = 0; i < generalCacheSize + mostRecentlyViewedCacheSize; i++) { + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(i))).isTrue(); + } + + // Adding another remote directory to indicate a newer job, we sleep for a short period + // of time to ensure a more recent timestamp + Thread.sleep(1000); + String latestJobID = JobID.generate().toString(); + buildRemoteStorageJobDirectory(tempDir, latestJobID); + + // We remotely fetch another older archive which will mark the last remote fetch archive + // for eviction and trigger an asynchronous fetch which will evict both the out of date + // remote fetch job and the oldest asynchronous fetch job + archiveFetcher.fetchArchiveByJobId(jobIdByCreation.get(4)); + archiveFetcher.fetchArchives(); + + // Validating that the correct remotely fetched job has been evicted + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(4))).isTrue(); + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(3))).isFalse(); + + // Validating that the cache does not exceed the total cache limit + assertThat(getNumJobsInGeneralCache(webDir)) + .isEqualTo(generalCacheSize + mostRecentlyViewedCacheSize); + assertThat(findJobIdInGeneralCache(webDir, latestJobID)).isTrue(); + + // Validating that the oldest record which was not remotely fetched was evicted from the + // cache + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(2))).isFalse(); + } + + @Test + void testMostRecentlyViewedCacheUpdatesBasedOnFetchingByIdFromLocalCache(@TempDir Path tmpDir) + throws IOException, InterruptedException { + final Path webDir = Files.createDirectory(tmpDir.resolve("webDir")); + final Path tempDir = Files.createDirectory(tmpDir.resolve("tmpDir")); + final org.apache.flink.core.fs.Path tempDirFs = + new org.apache.flink.core.fs.Path(tempDir.toUri()); + + List refreshDirs = new ArrayList<>(); + FileSystem refreshFS = tempDirFs.getFileSystem(); + refreshDirs.add(new HistoryServer.RefreshLocation(tempDirFs, refreshFS)); + + int numJobs = 5; + int generalCacheSize = 3; + int mostRecentlyViewedCacheSize = 2; + + HistoryServerArchiveFetcher archiveFetcher = + new HistoryServerArchiveFetcher( + refreshDirs, + webDir.toFile(), + (event) -> {}, + true, + -1, + generalCacheSize, + true, + mostRecentlyViewedCacheSize); + + for (int i = 0; i < numJobs; i++) { + String jobID = JobID.generate().toString(); + buildRemoteStorageJobDirectory(tempDir, jobID); + } + + Map jobIdByCreation = + retrieveRemoteArchivesByModificationTime(refreshFS, tempDirFs); + + // Fetching the remote archives and store them in the local cache + archiveFetcher.fetchArchives(); + + // Confirming that the size of the local cache is based on the asynchronous fetch cache + // limit + assertThat(getNumJobsInGeneralCache(webDir)).isEqualTo(generalCacheSize); + for (int i = 0; i < generalCacheSize; i++) { + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(i))).isTrue(); + } + + // Fetching a job stored in the remote archive + 
archiveFetcher.fetchArchiveByJobId(jobIdByCreation.get(3)); + + // Fetching a job that is in the local cache via direct request + archiveFetcher.fetchArchiveByJobId(jobIdByCreation.get(2)); + + // Validating that the cache size contains both the asynchronous and remote user fetch + // records + assertThat(getNumJobsInGeneralCache(webDir)).isEqualTo(generalCacheSize + 1); + for (int i = 0; i < generalCacheSize + 1; i++) { + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(i))).isTrue(); + } + + // Adding another remote directory to indicate a newer job, we sleep for a short period + // of time to ensure a more recent timestamp + Thread.sleep(1000); + String latestJobID = JobID.generate().toString(); + buildRemoteStorageJobDirectory(tempDir, latestJobID); + + // Doing asynchronous to retrieve the latest job. Because the job that would have + // otherwise would have been evicted was fetched via a user request, it should not + // be evicted from the archives + archiveFetcher.fetchArchives(); + + // Validating that the cache does not exceed the total cache limit and that the + // correct jobs are stored in the cache + assertThat(getNumJobsInGeneralCache(webDir)) + .isEqualTo(generalCacheSize + mostRecentlyViewedCacheSize); + assertThat(findJobIdInGeneralCache(webDir, latestJobID)).isTrue(); + for (int i = 0; i < generalCacheSize + 1; i++) { + assertThat(findJobIdInGeneralCache(webDir, jobIdByCreation.get(i))).isTrue(); + } + } + + private int getNumJobsInGeneralCache(Path cacheDir) { + File f = new File(cacheDir.toFile(), "/jobs"); + return f.listFiles().length - 1; + } + + private boolean findJobIdInGeneralCache(Path cacheDir, String jobID) throws IOException { + File jobPath = new File(cacheDir.toFile(), "/jobs/" + jobID + ".json"); + return jobPath.exists(); + } + + private void buildRemoteStorageJobDirectory(Path refreshDir, String jobID) throws IOException { + String json = + "{\n" + + " \"archive\": [\n" + + " {\n" + + " \"path\": \"/jobs/" + + jobID + + "\",\n" + + " \"json\": \"{\\\"jid\\\":\\\"" + + jobID + + "\\\",\\\"name\\\":\\\"WordCount\\\",\\\"isStoppable\\\":false," + + "\\\"state\\\":\\\"FINISHED\\\",\\\"start-time\\\":1705527079656,\\\"end-time\\\":1705527080059," + + "\\\"duration\\\":403,\\\"maxParallelism\\\":-1,\\\"now\\\":1705527080104,\\\"timestamps\\\":{\\\"FAILED\\\":" + + "0,\\\"FINISHED\\\":1705527080059,\\\"CANCELLING\\\":0,\\\"CANCELED\\\":0,\\\"INITIALIZING\\\":" + + "1705527079656,\\\"CREATED\\\":1705527079708,\\\"RUNNING\\\":1705527079763,\\\"RESTARTING\\\":" + + "0,\\\"SUSPENDED\\\":0,\\\"FAILING\\\":0,\\\"RECONCILING\\\":0},\\\"vertices\\\":[{\\\"id\\\":" + + "\\\"cbc357ccb763df2852fee8c4fc7d55f2\\\",\\\"name\\\":\\\"Source: in-memory-input -> tokenizer\\\"," + + "\\\"maxParallelism\\\":128,\\\"parallelism\\\":1,\\\"status\\\":\\\"FINISHED\\\"," + + "\\\"start-time\\\":1705527079881,\\\"end-time\\\":1705527080046,\\\"duration\\\":165," + + "\\\"tasks\\\":{\\\"CANCELED\\\":0,\\\"FAILED\\\":0,\\\"FINISHED\\\":1,\\\"DEPLOYING\\\":" + + "0,\\\"RUNNING\\\":0,\\\"INITIALIZING\\\":0,\\\"SCHEDULED\\\":0,\\\"CANCELING\\\":0," + + "\\\"RECONCILING\\\":0,\\\"CREATED\\\":0},\\\"metrics\\\":{\\\"read-bytes\\\":0," + + "\\\"read-bytes-complete\\\":true,\\\"write-bytes\\\":4047,\\\"write-bytes-complete\\\":true," + + "\\\"read-records\\\":0,\\\"read-records-complete\\\":true,\\\"write-records\\\":287," + + "\\\"write-records-complete\\\":true,\\\"accumulated-backpressured-time\\\":0," + + 
"\\\"accumulated-idle-time\\\":0,\\\"accumulated-busy-time\\\":\\\"NaN\\\"}},{\\\"id\\\":" + + "\\\"" + + jobID + + "\\\",\\\"name\\\":\\\"counter -> Sink: print-sink\\\",\\\"maxParallelism\\\":" + + "128,\\\"parallelism\\\":1,\\\"status\\\":\\\"FINISHED\\\",\\\"start-time\\\":" + + "1705527079885,\\\"end-time\\\":1705527080057,\\\"duration\\\":172,\\\"tasks\\\":{\\\"CANCELED\\\":" + + "0,\\\"FAILED\\\":0,\\\"FINISHED\\\":1,\\\"DEPLOYING\\\":0,\\\"RUNNING\\\":0,\\\"INITIALIZING\\\":" + + "0,\\\"SCHEDULED\\\":0,\\\"CANCELING\\\":0,\\\"RECONCILING\\\":0,\\\"CREATED\\\":0},\\\"metrics\\\":" + + "{\\\"read-bytes\\\":4060,\\\"read-bytes-complete\\\":true,\\\"write-bytes\\\":0," + + "\\\"write-bytes-complete\\\":true,\\\"read-records\\\":287,\\\"read-records-complete\\\":true," + + "\\\"write-records\\\":0,\\\"write-records-complete\\\":true,\\\"accumulated-backpressured-time\\\":" + + "0,\\\"accumulated-idle-time\\\":1,\\\"accumulated-busy-time\\\":15.0}}],\\\"status-counts\\\":{\\\"CANCELED\\\":" + + "0,\\\"FAILED\\\":0,\\\"FINISHED\\\":2,\\\"DEPLOYING\\\":0,\\\"RUNNING\\\":0,\\\"INITIALIZING\\\":0," + + "\\\"SCHEDULED\\\":0,\\\"CANCELING\\\":0,\\\"RECONCILING\\\":0,\\\"CREATED\\\":0},\\\"plan\\\":{\\\"jid\\\":" + + "\\\"" + + jobID + + "\\\",\\\"name\\\":\\\"WordCount\\\",\\\"type\\\":\\\"STREAMING\\\",\\\"nodes\\\":[{\\\"id\\\":" + + "\\\"" + + jobID + + "\\\",\\\"parallelism\\\":1,\\\"operator\\\":\\\"\\\",\\\"operator_strategy\\\":\\\"\\\"," + + "\\\"description\\\":\\\"counter
+- Sink: print-sink
\\\",\\\"inputs\\\":[{\\\"num\\\":0," + + "\\\"id\\\":\\\"cbc357ccb763df2852fee8c4fc7d55f2\\\",\\\"ship_strategy\\\":\\\"HASH\\\"," + + "\\\"exchange\\\":\\\"pipelined_bounded\\\"}],\\\"optimizer_properties\\\":{}},{\\\"id\\\":" + + "\\\"cbc357ccb763df2852fee8c4fc7d55f2\\\",\\\"parallelism\\\":1,\\\"operator\\\":\\\"\\\"," + + "\\\"operator_strategy\\\":\\\"\\\",\\\"description\\\":\\\"Source: in-memory-input
+- tokenizer" + + "
\\\",\\\"optimizer_properties\\\":{}}]}\"\n" + + " }\n" + + " ]\n" + + "}"; + Path remoteJobArchive = Files.createFile(refreshDir.resolve(jobID)); + remoteJobArchive.toFile().setLastModified(System.currentTimeMillis()); + FileUtils.writeStringToFile(remoteJobArchive.toFile(), json); + } + + private Map retrieveRemoteArchivesByModificationTime( + FileSystem refreshFS, org.apache.flink.core.fs.Path tempDirFs) throws IOException { + FileStatus[] remoteFiles = HistoryServerArchiveFetcher.listArchives(refreshFS, tempDirFs); + + return IntStream.range(0, remoteFiles.length) + .boxed() + .collect( + Collectors.toMap( + i -> i, + i -> remoteFiles[i].getPath().getName(), + (a, b) -> b, + LinkedHashMap::new)); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java index d908cf633b8b1..bc7a0f1d2163e 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java @@ -20,16 +20,22 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.rest.handler.router.Router; import org.apache.flink.runtime.webmonitor.testutils.HttpUtils; import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; +import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; @@ -40,10 +46,27 @@ class HistoryServerStaticFileServerHandlerTest { void testRespondWithFile(@TempDir Path tmpDir) throws Exception { final Path webDir = Files.createDirectory(tmpDir.resolve("webDir")); final Path uploadDir = Files.createDirectory(tmpDir.resolve("uploadDir")); + final Path tempDir = Files.createDirectory(tmpDir.resolve("tmpDir")); + final org.apache.flink.core.fs.Path tempDirFs = + new org.apache.flink.core.fs.Path(tempDir.toUri()); + + buildLocalCacheDirectory(webDir, "job123"); + buildRemoteStorageJobDirectory(tempDir, "jobabc"); + + List refreshDirs = new ArrayList<>(); + FileSystem refreshFS = tempDirFs.getFileSystem(); + refreshDirs.add(new HistoryServer.RefreshLocation(tempDirFs, refreshFS)); + + HistoryServerArchiveFetcher archiveFetcher = + new HistoryServerArchiveFetcher( + refreshDirs, webDir.toFile(), (event) -> {}, true, 10, 5, true, 3); Router router = new Router() - .addGet("/:*", new HistoryServerStaticFileServerHandler(webDir.toFile())); + .addGet( + "/:*", + new HistoryServerStaticFileServerHandler( + webDir.toFile(), true, archiveFetcher)); WebFrontendBootstrap webUI = new WebFrontendBootstrap( router, @@ -62,6 +85,23 @@ void testRespondWithFile(@TempDir Path tmpDir) throws Exception { assertThat(notFound404.f0).isEqualTo(404); assertThat(notFound404.f1).contains("not found"); + // verify that we are able to fetch job files from the local cache + Tuple2 foundFromCache = + HttpUtils.getFromHTTP("http://localhost:" + port + "/jobs/job123"); + assertThat(foundFromCache.f0).isEqualTo(200); + + // 
verify that we are able to fetch job files from the remote file system when the + // file is not present in the local cache + Tuple2 foundFromRemoteCache = + HttpUtils.getFromHTTP("http://localhost:" + port + "/jobs/jobabc"); + assertThat(foundFromRemoteCache.f0).isEqualTo(200); + + // verify that if a job does not exist either in the remote or local cache + // that a 404 message is returned + Tuple2 jobNotFound404 = + HttpUtils.getFromHTTP("http://localhost:" + port + "/jobs/jobfoo"); + assertThat(jobNotFound404.f0).isEqualTo(404); + // verify that a) a file can be loaded using the ClassLoader and b) that the // HistoryServer // index_hs.html is injected @@ -92,4 +132,68 @@ void testRespondWithFile(@TempDir Path tmpDir) throws Exception { webUI.shutdown(); } } + + private void buildLocalCacheDirectory(Path cacheDir, String jobID) throws IOException { + File localCachePath = new File(cacheDir.toFile(), "jobs/" + jobID + ".json"); + localCachePath.getParentFile().mkdirs(); + Files.createFile(localCachePath.toPath()); + } + + private void buildRemoteStorageJobDirectory(Path refreshDir, String jobID) throws IOException { + String json = + "{\n" + + " \"archive\": [\n" + + " {\n" + + " \"path\": \"/jobs/" + + jobID + + "\",\n" + + " \"json\": \"{\\\"jid\\\":\\\"" + + jobID + + "\\\",\\\"name\\\":\\\"WordCount\\\",\\\"isStoppable\\\":false," + + "\\\"state\\\":\\\"FINISHED\\\",\\\"start-time\\\":1705527079656,\\\"end-time\\\":1705527080059," + + "\\\"duration\\\":403,\\\"maxParallelism\\\":-1,\\\"now\\\":1705527080104,\\\"timestamps\\\":{\\\"FAILED\\\":" + + "0,\\\"FINISHED\\\":1705527080059,\\\"CANCELLING\\\":0,\\\"CANCELED\\\":0,\\\"INITIALIZING\\\":" + + "1705527079656,\\\"CREATED\\\":1705527079708,\\\"RUNNING\\\":1705527079763,\\\"RESTARTING\\\":" + + "0,\\\"SUSPENDED\\\":0,\\\"FAILING\\\":0,\\\"RECONCILING\\\":0},\\\"vertices\\\":[{\\\"id\\\":" + + "\\\"cbc357ccb763df2852fee8c4fc7d55f2\\\",\\\"name\\\":\\\"Source: in-memory-input -> tokenizer\\\"," + + "\\\"maxParallelism\\\":128,\\\"parallelism\\\":1,\\\"status\\\":\\\"FINISHED\\\"," + + "\\\"start-time\\\":1705527079881,\\\"end-time\\\":1705527080046,\\\"duration\\\":165," + + "\\\"tasks\\\":{\\\"CANCELED\\\":0,\\\"FAILED\\\":0,\\\"FINISHED\\\":1,\\\"DEPLOYING\\\":" + + "0,\\\"RUNNING\\\":0,\\\"INITIALIZING\\\":0,\\\"SCHEDULED\\\":0,\\\"CANCELING\\\":0," + + "\\\"RECONCILING\\\":0,\\\"CREATED\\\":0},\\\"metrics\\\":{\\\"read-bytes\\\":0," + + "\\\"read-bytes-complete\\\":true,\\\"write-bytes\\\":4047,\\\"write-bytes-complete\\\":true," + + "\\\"read-records\\\":0,\\\"read-records-complete\\\":true,\\\"write-records\\\":287," + + "\\\"write-records-complete\\\":true,\\\"accumulated-backpressured-time\\\":0," + + "\\\"accumulated-idle-time\\\":0,\\\"accumulated-busy-time\\\":\\\"NaN\\\"}},{\\\"id\\\":" + + "\\\"" + + jobID + + "\\\",\\\"name\\\":\\\"counter -> Sink: print-sink\\\",\\\"maxParallelism\\\":" + + "128,\\\"parallelism\\\":1,\\\"status\\\":\\\"FINISHED\\\",\\\"start-time\\\":" + + "1705527079885,\\\"end-time\\\":1705527080057,\\\"duration\\\":172,\\\"tasks\\\":{\\\"CANCELED\\\":" + + "0,\\\"FAILED\\\":0,\\\"FINISHED\\\":1,\\\"DEPLOYING\\\":0,\\\"RUNNING\\\":0,\\\"INITIALIZING\\\":" + + "0,\\\"SCHEDULED\\\":0,\\\"CANCELING\\\":0,\\\"RECONCILING\\\":0,\\\"CREATED\\\":0},\\\"metrics\\\":" + + "{\\\"read-bytes\\\":4060,\\\"read-bytes-complete\\\":true,\\\"write-bytes\\\":0," + + "\\\"write-bytes-complete\\\":true,\\\"read-records\\\":287,\\\"read-records-complete\\\":true," + + 
"\\\"write-records\\\":0,\\\"write-records-complete\\\":true,\\\"accumulated-backpressured-time\\\":" + + "0,\\\"accumulated-idle-time\\\":1,\\\"accumulated-busy-time\\\":15.0}}],\\\"status-counts\\\":{\\\"CANCELED\\\":" + + "0,\\\"FAILED\\\":0,\\\"FINISHED\\\":2,\\\"DEPLOYING\\\":0,\\\"RUNNING\\\":0,\\\"INITIALIZING\\\":0," + + "\\\"SCHEDULED\\\":0,\\\"CANCELING\\\":0,\\\"RECONCILING\\\":0,\\\"CREATED\\\":0},\\\"plan\\\":{\\\"jid\\\":" + + "\\\"" + + jobID + + "\\\",\\\"name\\\":\\\"WordCount\\\",\\\"type\\\":\\\"STREAMING\\\",\\\"nodes\\\":[{\\\"id\\\":" + + "\\\"" + + jobID + + "\\\",\\\"parallelism\\\":1,\\\"operator\\\":\\\"\\\",\\\"operator_strategy\\\":\\\"\\\"," + + "\\\"description\\\":\\\"counter
+- Sink: print-sink
\\\",\\\"inputs\\\":[{\\\"num\\\":0," + + "\\\"id\\\":\\\"cbc357ccb763df2852fee8c4fc7d55f2\\\",\\\"ship_strategy\\\":\\\"HASH\\\"," + + "\\\"exchange\\\":\\\"pipelined_bounded\\\"}],\\\"optimizer_properties\\\":{}},{\\\"id\\\":" + + "\\\"cbc357ccb763df2852fee8c4fc7d55f2\\\",\\\"parallelism\\\":1,\\\"operator\\\":\\\"\\\"," + + "\\\"operator_strategy\\\":\\\"\\\",\\\"description\\\":\\\"Source: in-memory-input
+- tokenizer" + + "
\\\",\\\"optimizer_properties\\\":{}}]}\"\n" + + " }\n" + + " ]\n" + + "}"; + Path remoteJobArchive = Files.createFile(refreshDir.resolve(jobID)); + FileUtils.writeStringToFile(remoteJobArchive.toFile(), json); + } } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java index 82441b24007ac..3cf1aaa023893 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -44,11 +44,14 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.File; @@ -74,7 +77,6 @@ /** Tests for the HistoryServer. */ class HistoryServerTest { - private static final JsonFactory JACKSON_FACTORY = new JsonFactory() .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET) @@ -176,15 +178,14 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio } CountDownLatch numArchivesCreatedInitially = new CountDownLatch(numArchivesToKeepInHistory); - CountDownLatch numArchivesDeletedInitially = - new CountDownLatch(numArchivesToRemoveUponHsStart); CountDownLatch numArchivesCreatedTotal = new CountDownLatch( numArchivesBeforeHsStarted - numArchivesToRemoveUponHsStart + numArchivesAfterHsStarted); - CountDownLatch numArchivesDeletedTotal = - new CountDownLatch(numArchivesToRemoveUponHsStart + numArchivesAfterHsStarted); + CountDownLatch numArchivesDeletedTotal = new CountDownLatch(numArchivesAfterHsStarted); + CountDownLatch numArchivesRemovedInitially = + new CountDownLatch(numArchivesToRemoveUponHsStart); Configuration historyServerConfig = createTestConfiguration( @@ -201,9 +202,11 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio numArchivesCreatedTotal.countDown(); break; case DELETED: - numArchivesDeletedInitially.countDown(); numArchivesDeletedTotal.countDown(); break; + case DELETED_FROM_REMOTE: + numArchivesRemovedInitially.countDown(); + break; } }); @@ -211,7 +214,7 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio hs.start(); String baseUrl = "http://localhost:" + hs.getWebPort(); assertThat(numArchivesCreatedInitially.await(10L, TimeUnit.SECONDS)).isTrue(); - assertThat(numArchivesDeletedInitially.await(10L, TimeUnit.SECONDS)).isTrue(); + assertThat(numArchivesRemovedInitially.await(10L, TimeUnit.SECONDS)).isTrue(); assertThat(getIdsFromJobOverview(baseUrl)) .isEqualTo(new HashSet<>(expectedJobIdsToKeep)); @@ -251,6 +254,24 @@ void testFailIfHistorySizeLimitIsLessThanMinusOne() throws Exception { .isInstanceOf(IllegalConfigurationException.class); } + @Test + void testFailIfCacheSizeLimitIsZero() throws Exception { + assertThatThrownBy(() -> startHistoryServerWithCacheSizeLimit(0)) + .isInstanceOf(IllegalConfigurationException.class); + } + + @Test + void 
testFailIfCacheSizeLimitIsLessThanMinusOne() throws Exception { + assertThatThrownBy(() -> startHistoryServerWithCacheSizeLimit(-2)) + .isInstanceOf(IllegalConfigurationException.class); + } + + @Test + void testFailIfRemoteCacheSizeLimitIsLessThanOrEqualToZero() throws Exception { + assertThatThrownBy(() -> startHistoryServerWithRemoteFetchCacheSizeLimit(0)) + .isInstanceOf(IllegalConfigurationException.class); + } + private void startHistoryServerWithSizeLimit(int maxHistorySize) throws IOException, FlinkException, InterruptedException { Configuration historyServerConfig = @@ -260,30 +281,73 @@ private void startHistoryServerWithSizeLimit(int maxHistorySize) new HistoryServer(historyServerConfig).start(); } - @Test - void testCleanExpiredJob() throws Exception { - runArchiveExpirationTest(true); + private void startHistoryServerWithCacheSizeLimit(int maxCacheSize) + throws IOException, FlinkException, InterruptedException { + Configuration historyServerConfig = + createTestConfiguration( + HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue()); + historyServerConfig.set(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS, maxCacheSize); + new HistoryServer(historyServerConfig).start(); } - @Test - void testRemainExpiredJob() throws Exception { - runArchiveExpirationTest(false); + private void startHistoryServerWithRemoteFetchCacheSizeLimit( + int numCachedMostRecentlyViewedJobs) + throws IOException, FlinkException, InterruptedException { + Configuration historyServerConfig = + createTestConfiguration( + HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue()); + historyServerConfig.set(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS, 3); + historyServerConfig.set( + HistoryServerOptions.HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS, + numCachedMostRecentlyViewedJobs); + new HistoryServer(historyServerConfig).start(); } - private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Exception { + private static Stream archiveExpirationTestSource() { + return Stream.of( + Arguments.arguments(true, 3, 5, 5, 3), + Arguments.arguments(false, 3, 5, 5, 3), + Arguments.arguments(true, 3, 5, 1, 1), + Arguments.arguments(false, 3, 5, 1, 1), + Arguments.arguments(true, 5, 4, -1, 4), + Arguments.arguments(false, 3, 1, 5, 1), + Arguments.arguments(true, 5, 1, -1, 1), + Arguments.arguments(true, 3, 5, 2, 2), + Arguments.arguments(true, 4, -1, 3, 3), + Arguments.arguments(true, 3, -1, -1, 3), + Arguments.arguments(false, 3, -1, -1, 3)); + } + + @ParameterizedTest + @MethodSource("archiveExpirationTestSource") + void runArchiveExpirationTest( + boolean cleanupExpiredJobs, + final int numJobs, + final int maxCache, + final int maxHistory, + final int limitingHistory) + throws Exception { int numExpiredJobs = cleanupExpiredJobs ? 1 : 0; - int numJobs = 3; for (int x = 0; x < numJobs; x++) { runJob(); } waitForArchivesCreation(numJobs); - CountDownLatch numExpectedArchivedJobs = new CountDownLatch(numJobs); - CountDownLatch firstArchiveExpiredLatch = new CountDownLatch(numExpiredJobs); - CountDownLatch allArchivesExpiredLatch = - new CountDownLatch(cleanupExpiredJobs ? numJobs : 0); + CountDownLatch numExpectedArchivedJobs = new CountDownLatch(limitingHistory); + int initialDeletedArchives = + (maxHistory > 0 && maxHistory < numJobs) ? 
numJobs - maxHistory : 0;
+        CountDownLatch numInitialDeletedArchives = new CountDownLatch(initialDeletedArchives);
+
+        CountDownLatch numDeletedArchivesInCache = new CountDownLatch(numExpiredJobs);
+
+        int allDeletedArchives =
+                (maxCache > 0 && maxCache == limitingHistory) ? maxCache : limitingHistory;
+        CountDownLatch allArchivesExpiredLatch;
+        allArchivesExpiredLatch = new CountDownLatch(cleanupExpiredJobs ? allDeletedArchives : 0);
 
         Configuration historyServerConfig = createTestConfiguration(cleanupExpiredJobs);
+        historyServerConfig.set(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS, maxCache);
+        historyServerConfig.set(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS, maxHistory);
 
         HistoryServer hs =
                 new HistoryServer(
@@ -294,9 +358,12 @@ private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Excepti
                                     numExpectedArchivedJobs.countDown();
                                     break;
                                 case DELETED:
-                                    firstArchiveExpiredLatch.countDown();
+                                    numDeletedArchivesInCache.countDown();
                                     allArchivesExpiredLatch.countDown();
                                     break;
+                                case DELETED_FROM_REMOTE:
+                                    numInitialDeletedArchives.countDown();
+                                    break;
                             }
                         });
 
@@ -304,12 +371,15 @@
             hs.start();
             String baseUrl = "http://localhost:" + hs.getWebPort();
             assertThat(numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(numInitialDeletedArchives.await(10L, TimeUnit.SECONDS)).isTrue();
 
             Collection<JobDetails> jobs = getJobsOverview(baseUrl).getJobs();
-            assertThat(jobs).hasSize(numJobs);
+            assertThat(jobs).hasSize(Math.min(numJobs, limitingHistory));
 
+            int jobCount = jobs.size();
             String jobIdToDelete =
                     jobs.stream()
+                            .skip(jobCount - 1)
                             .findFirst()
                             .map(JobDetails::getJobId)
                             .map(JobID::toString)
@@ -322,34 +392,125 @@ private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Excepti
             // we fetch again to probabilistically cause a concurrent deletion
             hs.fetchArchives();
             Files.deleteIfExists(jmDirectory.toPath().resolve(jobIdToDelete));
-
-            assertThat(firstArchiveExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
-
+            assertThat(numDeletedArchivesInCache.await(10L, TimeUnit.SECONDS)).isTrue();
             // check that archive is still/no longer present in hs
             Collection<JobDetails> jobsAfterDeletion = getJobsOverview(baseUrl).getJobs();
-            assertThat(jobsAfterDeletion).hasSize(numJobs - numExpiredJobs);
-            assertThat(
-                            jobsAfterDeletion.stream()
-                                    .map(JobDetails::getJobId)
-                                    .map(JobID::toString)
-                                    .filter(jobId -> jobId.equals(jobIdToDelete))
-                                    .count())
-                    .isEqualTo(1 - numExpiredJobs);
+
+            if (numExpiredJobs != 0) {
+                assertThat(
+                                jobsAfterDeletion.stream()
+                                        .map(JobDetails::getJobId)
+                                        .map(JobID::toString)
+                                        .filter(jobId -> jobId.equals(jobIdToDelete))
+                                        .count())
+                        .isEqualTo(0);
+            }
 
             // delete remaining archives from jm and ensure files are cleaned up
-            List<String> remainingJobIds =
-                    jobsAfterDeletion.stream()
-                            .map(JobDetails::getJobId)
-                            .map(JobID::toString)
-                            .collect(Collectors.toList());
+            FileUtils.cleanDirectory(jmDirectory);
+            assertThat(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
+            Thread.sleep(3000);
+            assertJobFilesCleanedUp(cleanupExpiredJobs);
+        } finally {
+            hs.stop();
+        }
+    }
+
+    private static Stream<Arguments> remoteAndLocalCacheSource() {
+        return Stream.of(
+                Arguments.arguments(true, 1, 3, 1),
+                Arguments.arguments(false, 1, 3, 1),
+                Arguments.arguments(true, 3, 1, 1),
+                Arguments.arguments(false, 3, 1, 1),
+                Arguments.arguments(
+                        true,
+                        2,
+                        HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.defaultValue(),
+                        2),
+                Arguments.arguments(
+                        false,
+                        2,
+                        
HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.defaultValue(),
+                        2),
+                Arguments.arguments(true, -1, 2, 2),
+                Arguments.arguments(false, -1, 2, 2));
+    }
+
+    @ParameterizedTest
+    @MethodSource("remoteAndLocalCacheSource")
+    void testRemoveLocalAndRemoteArchiveLimits(
+            final boolean versionLessThan14,
+            final int cacheLimit,
+            final int remoteHistoryLimit,
+            final int limitingStorage)
+            throws Exception {
+
+        final int numRemoteArchivesBeforeHsStarted = 4;
+        final int numRemoteArchivesAfterHsStarted = 2;
 
-            for (String remainingJobId : remainingJobIds) {
-                Files.deleteIfExists(jmDirectory.toPath().resolve(remainingJobId));
+        final int numArchivesDeletedAfterHsStarted =
+                numRemoteArchivesAfterHsStarted - limitingStorage;
+
+        final long oneMinuteSinceEpoch = 1000L * 60L;
+        List<String> expectedJobIdsToKeep = new LinkedList<>();
+
+        for (int j = 0; j < numRemoteArchivesBeforeHsStarted; j++) {
+            String jobId =
+                    createLegacyArchive(
+                            jmDirectory.toPath(), j * oneMinuteSinceEpoch, versionLessThan14);
+            if (j >= numRemoteArchivesBeforeHsStarted - limitingStorage) {
+                expectedJobIdsToKeep.add(jobId);
             }
+        }
 
-            assertThat(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
+        CountDownLatch numArchivesCreatedInitially = new CountDownLatch(limitingStorage);
+        CountDownLatch numArchivesCreatedTotal = new CountDownLatch(limitingStorage * 2);
+        CountDownLatch numArchivesDeletedTotal =
+                new CountDownLatch(numArchivesDeletedAfterHsStarted);
 
-            assertJobFilesCleanedUp(cleanupExpiredJobs);
+        Configuration historyServerConfig =
+                createTestConfiguration(
+                        HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue());
+        if (cacheLimit > 0) {
+            historyServerConfig.set(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS, cacheLimit);
+        }
+        historyServerConfig.set(
+                HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS, remoteHistoryLimit);
+
+        HistoryServer hs =
+                new HistoryServer(
+                        historyServerConfig,
+                        (event) -> {
+                            switch (event.getType()) {
+                                case CREATED:
+                                    numArchivesCreatedInitially.countDown();
+                                    numArchivesCreatedTotal.countDown();
+                                    break;
+                                case DELETED:
+                                    numArchivesDeletedTotal.countDown();
+                                    break;
+                            }
+                        });
+
+        try {
+            hs.start();
+            String baseUrl = "http://localhost:" + hs.getWebPort();
+            assertThat(numArchivesCreatedInitially.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(getIdsFromJobOverview(baseUrl))
+                    .isEqualTo(new HashSet<>(expectedJobIdsToKeep));
+
+            for (int j = numRemoteArchivesBeforeHsStarted;
+                    j < numRemoteArchivesBeforeHsStarted + numRemoteArchivesAfterHsStarted;
+                    j++) {
+                expectedJobIdsToKeep.remove(0);
+                expectedJobIdsToKeep.add(
+                        createLegacyArchive(
+                                jmDirectory.toPath(), j * oneMinuteSinceEpoch, versionLessThan14));
+            }
+            assertThat(numArchivesCreatedTotal.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(numArchivesDeletedTotal.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(getIdsFromJobOverview(baseUrl))
+                    .isEqualTo(new HashSet<>(expectedJobIdsToKeep));
         } finally {
             hs.stop();
         }
@@ -391,7 +552,7 @@ private Configuration createTestConfiguration(boolean cleanupExpiredJobs) {
                 HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
         historyServerConfig.set(
                 HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL,
-                Duration.ofMillis(100L));
+                Duration.ofMillis(1000L));
         historyServerConfig.set(
                 HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS, cleanupExpiredJobs);
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrapTest.java 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrapTest.java
index 968dcf73a7d17..e2995105dc639 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrapTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrapTest.java
@@ -20,10 +20,13 @@
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory;
 import org.apache.flink.runtime.io.network.netty.Prio0InboundChannelHandlerFactory;
 import org.apache.flink.runtime.io.network.netty.Prio1InboundChannelHandlerFactory;
 import org.apache.flink.runtime.rest.handler.router.Router;
+import org.apache.flink.runtime.webmonitor.history.HistoryServer;
+import org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher;
 import org.apache.flink.runtime.webmonitor.history.HistoryServerStaticFileServerHandler;
 import org.apache.flink.runtime.webmonitor.testutils.HttpUtils;
 import org.apache.flink.testutils.junit.extensions.ContextClassLoaderExtension;
@@ -36,6 +39,8 @@
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -57,12 +62,28 @@ class WebFrontendBootstrapTest {
     @Test
     void testHandlersMustBeLoaded() throws Exception {
         Path webDir = Files.createDirectories(tmp.resolve("webDir"));
+
+        Path refreshDir = Files.createDirectories(tmp.resolve("refreshDir"));
+        final org.apache.flink.core.fs.Path tempDirFs =
+                new org.apache.flink.core.fs.Path(refreshDir.toUri());
+
+        List<HistoryServer.RefreshLocation> refreshDirs = new ArrayList<>();
+        FileSystem refreshFS = tempDirFs.getFileSystem();
+        refreshDirs.add(new HistoryServer.RefreshLocation(tempDirFs, refreshFS));
+
         Configuration configuration = new Configuration();
         configuration.set(Prio0InboundChannelHandlerFactory.REDIRECT_FROM_URL, "/nonExisting");
         configuration.set(Prio0InboundChannelHandlerFactory.REDIRECT_TO_URL, "/index.html");
+
+        HistoryServerArchiveFetcher archiveFetcher =
+                new HistoryServerArchiveFetcher(
+                        refreshDirs, webDir.toFile(), (event) -> {}, true, 10, 5, true, 3);
         Router router =
                 new Router<>()
-                        .addGet("/:*", new HistoryServerStaticFileServerHandler(webDir.toFile()));
+                        .addGet(
+                                "/:*",
+                                new HistoryServerStaticFileServerHandler(
+                                        webDir.toFile(), false, archiveFetcher));
         WebFrontendBootstrap webUI =
                 new WebFrontendBootstrap(
                         router,
diff --git a/flink-runtime-web/web-dashboard/src/app/components/job-list/job-list.component.html b/flink-runtime-web/web-dashboard/src/app/components/job-list/job-list.component.html
index 257159ba9e7a9..a08245f42d6d6 100644
--- a/flink-runtime-web/web-dashboard/src/app/components/job-list/job-list.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/components/job-list/job-list.component.html
@@ -30,7 +30,7 @@
           Job Name
           Start Time
           Duration
-          End Time
+          End Time
           Tasks
           Status
diff --git a/flink-runtime-web/web-dashboard/src/app/components/job-list/job-list.component.ts b/flink-runtime-web/web-dashboard/src/app/components/job-list/job-list.component.ts
index db30339688c1e..660ae0ea6d833 100644
--- a/flink-runtime-web/web-dashboard/src/app/components/job-list/job-list.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/components/job-list/job-list.component.ts
@@ -61,6 +61,7 @@ export 
class JobListComponent implements OnInit, OnDestroy, OnChanges {
   listOfJob: JobsItem[] = [];
   isLoading = true;
   destroy$ = new Subject<void>();
+  sortOrder = 'descend';
   @Input() completed = false;
   @Input() title: string;
   @Input() jobData$: Observable<JobsItem[]>;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
index 4454101d17cba..447a279311ecd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
@@ -368,7 +368,7 @@ public static void setContentTypeHeader(HttpResponse response, File file) {
     public static void checkFileValidity(File file, File rootPath, Logger logger)
             throws IOException, RestHandlerException {
         // this check must be done first to prevent probing for arbitrary files
-        if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
+        if (!file.getCanonicalFile().toPath().startsWith(rootPath.getCanonicalFile().toPath())) {
             if (logger.isDebugEnabled()) {
                 logger.debug(
                         "Requested path {} points outside the root directory.",

From 82d1c50c6d9212183ff33f0956db0678513861aa Mon Sep 17 00:00:00 2001
From: mattcuento
Date: Wed, 14 Jan 2026 13:31:30 -0800
Subject: [PATCH 2/2] [FLINK-37155][flink-runtime-web][flink-core] Implementing
 FLIP-505 with PR feedback incorporated
---
 .../configuration/HistoryServerOptions.java   | 12 ++-
 .../webmonitor/history/HistoryServer.java     | 83 ++++++++++++++-----
 .../history/HistoryServerArchiveFetcher.java  |  8 +-
 .../HistoryServerStaticFileServerHandler.java |  4 +-
 4 files changed, 74 insertions(+), 33 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
index 235424c6dff1a..c891df1a88314 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
@@ -140,7 +140,9 @@ public class HistoryServerOptions {
                                         "If this configuration is provided, the remote and local storage of job archives will be decoupled.")
                                 .text(
                                         "If set to `0` or less than `-1` HistoryServer will throw an %s. ",
-                                        code("IllegalConfigurationException"))
+                                        code(
+                                                IllegalConfigurationException.class
+                                                        .getSimpleName()))
                                 .build());
 
     public static final ConfigOption<Integer> HISTORY_SERVER_RETAINED_JOBS =
@@ -157,7 +159,9 @@ public class HistoryServerOptions {
                                         "If set to `-1`(default), there is no limit to the number of archives. ")
                                 .text(
                                         "If set to `0` or less than `-1` HistoryServer will throw an %s. ",
-                                        code("IllegalConfigurationException"))
+                                        code(
+                                                IllegalConfigurationException.class
+                                                        .getSimpleName()))
                                 .build());
 
     public static final ConfigOption<Integer> HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS =
@@ -176,7 +180,9 @@ public class HistoryServerOptions {
                                                 HISTORY_SERVER_WEB_DIR.key()))
                                 .text(
                                         "If set to less than `0` HistoryServer will throw an %s. 
", - code("IllegalConfigurationException")) + code( + IllegalConfigurationException.class + .getSimpleName())) .build()); private HistoryServerOptions() {} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index c1845dd9cbc3c..bcce1a8a237dc 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -240,32 +240,12 @@ public HistoryServer( refreshIntervalMillis = config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL).toMillis(); - int maxHistorySize = config.get(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS); - if (maxHistorySize == 0 || maxHistorySize < -1) { - throw new IllegalConfigurationException( - "Cannot set %s to 0 or less than -1", - HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.key()); - } + int maxHistorySize = validateRetainedJobsConfig(config); boolean remoteFetchEnabled = config.contains(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS); - int generalCachedJobSize = -1; - if (remoteFetchEnabled) { - generalCachedJobSize = config.get(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS); - if (generalCachedJobSize == 0 || generalCachedJobSize < -1) { - throw new IllegalConfigurationException( - "Cannot set %s to 0 or less than -1", - HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS.key()); - } - } - int numCachedMostRecentlyViewedJobs = - config.get( - HistoryServerOptions.HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS); - if (numCachedMostRecentlyViewedJobs <= 0) { - throw new IllegalConfigurationException( - "Cannot set %s to less than 0", - HistoryServerOptions.HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS.key()); - } + int generalCachedJobSize = validateCachedJobsConfig(config, remoteFetchEnabled); + int numCachedMostRecentlyViewedJobs = validateNumCachedMostRecentlyViewedJobsConfig(config); archiveFetcher = new HistoryServerArchiveFetcher( @@ -422,6 +402,63 @@ private static String createConfigJson(DashboardConfiguration dashboardConfigura return OBJECT_MAPPER.writeValueAsString(dashboardConfiguration); } + /** + * Validates and retrieves the retained jobs configuration. + * + * @param config the configuration + * @return the maximum history size + * @throws IllegalConfigurationException if the configuration is invalid + */ + private int validateRetainedJobsConfig(Configuration config) { + int maxHistorySize = config.get(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS); + if (maxHistorySize == 0 || maxHistorySize < -1) { + throw new IllegalConfigurationException( + "Cannot set %s to 0 or less than -1", + HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.key()); + } + return maxHistorySize; + } + + /** + * Validates and retrieves the cached jobs configuration if remote fetch is enabled. 
+     *
+     * @param config the configuration
+     * @param remoteFetchEnabled whether remote fetch is enabled
+     * @return the general cached job size
+     * @throws IllegalConfigurationException if the configuration is invalid
+     */
+    private int validateCachedJobsConfig(Configuration config, boolean remoteFetchEnabled) {
+        int generalCachedJobSize = -1;
+        if (remoteFetchEnabled) {
+            generalCachedJobSize = config.get(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS);
+            if (generalCachedJobSize == 0 || generalCachedJobSize < -1) {
+                throw new IllegalConfigurationException(
+                        "Cannot set %s to 0 or less than -1",
+                        HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS.key());
+            }
+        }
+        return generalCachedJobSize;
+    }
+
+    /**
+     * Validates and retrieves the number of cached most recently viewed jobs configuration.
+     *
+     * @param config the configuration
+     * @return the number of cached most recently viewed jobs
+     * @throws IllegalConfigurationException if the configuration is invalid
+     */
+    private int validateNumCachedMostRecentlyViewedJobsConfig(Configuration config) {
+        int numCachedMostRecentlyViewedJobs =
+                config.get(
+                        HistoryServerOptions.HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS);
+        if (numCachedMostRecentlyViewedJobs <= 0) {
+            throw new IllegalConfigurationException(
+                    "Cannot set %s to less than 1",
+                    HistoryServerOptions.HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS.key());
+        }
+        return numCachedMostRecentlyViewedJobs;
+    }
+
     /** Container for the {@link Path} and {@link FileSystem} of a refresh directory. */
     public static class RefreshLocation {
         private final Path path;
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 8def33a716f3e..9148f0112e3b0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -276,9 +276,7 @@ void fetchArchives() {
 
                             historySize++;
 
-                            if (!remoteFetchEnabled
-                                    || (remoteFetchEnabled
-                                            && !mostRecentlyViewedCache.containsKey(jobID))) {
+                            if (!remoteFetchEnabled || !mostRecentlyViewedCache.containsKey(jobID)) {
                                 generalCachedJobCount++;
                             }
 
@@ -295,9 +293,7 @@ void fetchArchives() {
                             }
 
                             if (remoteArchiveDeletion || localCacheDeletion) {
-                                if ((!remoteFetchEnabled
-                                        || (remoteFetchEnabled
-                                                && !mostRecentlyViewedCache.containsKey(jobID)))
+                                if ((!remoteFetchEnabled || !mostRecentlyViewedCache.containsKey(jobID))
                                         && cachedArchives.contains(jobID)) {
                                     cachedJobArchivesToRemove
                                             .computeIfAbsent(refreshDir, ignored -> new HashSet<>())
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index 759574062b2f4..963bdcd4ca4d8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -53,6 +53,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -100,7 +102,7 @@ public class 
HistoryServerStaticFileServerHandler /** The path in which the static documents are. */ private final File rootPath; - private final HistoryServerArchiveFetcher archiveFetcher; + @Nullable private final HistoryServerArchiveFetcher archiveFetcher; private final boolean enableRemoteArchiveRefresh;
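
Usage sketch (reviewer aid, not part of the patch): the snippet below shows how the two new options introduced here, `historyserver.archive.cached-retained-jobs` and `historyserver.archive.num-cached-most-recently-viewed-jobs`, compose with the pre-existing retained-jobs limit, mirroring the way the tests above construct a HistoryServer. The archive URI and the numeric limits are illustrative placeholders, and it assumes the pre-existing `HISTORY_SERVER_ARCHIVE_DIRS` option (`historyserver.archive.fs.dir`) as the remote archive location.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.runtime.webmonitor.history.HistoryServer;

public class HistoryServerCacheExample {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Remote storage scanned for completed-job archives (placeholder URI).
        config.set(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, "s3://my-bucket/completed-jobs");
        // Keep at most 50 archives in remote storage; -1 (the default) keeps all of them.
        config.set(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS, 50);
        // Decouple local from remote storage: mirror only the 10 latest jobs
        // into the local directory (historyserver.web.tmpdir).
        config.set(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS, 10);
        // Additionally keep the 3 most recently viewed jobs cached locally.
        config.set(HistoryServerOptions.HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS, 3);
        // Construction validates the limits via the validate* helpers above,
        // e.g. cached-retained-jobs = 0 throws IllegalConfigurationException.
        new HistoryServer(config).start();
    }
}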