diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java index 737222493403d..4d9490b0e0df8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java @@ -141,7 +141,7 @@ public ScheduledFuture scheduleWithFixedDelay( private ScheduledFuture scheduleRepeatedly( ProcessingTimeCallback callback, long initialDelay, long period, boolean fixedDelay) { final long nextTimestamp = getCurrentProcessingTime() + initialDelay; - final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, period); + final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, period, fixedDelay); // we directly try to register the timer and only react to the status on exception // that way we save unnecessary volatile accesses for each timer @@ -281,12 +281,13 @@ interface ExceptionHandler { } private Runnable wrapOnTimerCallback(ProcessingTimeCallback callback, long timestamp) { - return new ScheduledTask(status, exceptionHandler, callback, timestamp, 0); + return new ScheduledTask(status, exceptionHandler, callback, timestamp, 0, false); } private Runnable wrapOnTimerCallback( - ProcessingTimeCallback callback, long nextTimestamp, long period) { - return new ScheduledTask(status, exceptionHandler, callback, nextTimestamp, period); + ProcessingTimeCallback callback, long nextTimestamp, long period, boolean fixedDelay) { + return new ScheduledTask( + status, exceptionHandler, callback, nextTimestamp, period, fixedDelay); } private static final class ScheduledTask implements Runnable { @@ -296,18 +297,21 @@ private static final class ScheduledTask implements Runnable { private long nextTimestamp; private final long period; + private final boolean fixedDelay; ScheduledTask( AtomicInteger serviceStatus, ExceptionHandler exceptionHandler, ProcessingTimeCallback callback, long timestamp, - long period) { + long period, + boolean fixedDelay) { this.serviceStatus = serviceStatus; this.exceptionHandler = exceptionHandler; this.callback = callback; this.nextTimestamp = timestamp; this.period = period; + this.fixedDelay = fixedDelay; } @Override @@ -320,7 +324,8 @@ public void run() { } catch (Exception ex) { exceptionHandler.handleException(ex); } - nextTimestamp += period; + nextTimestamp = + fixedDelay ? System.currentTimeMillis() + period : nextTimestamp + period; } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java index 097b6863dbac6..f11ffa65bc931 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -21,6 +21,7 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.util.Preconditions; +import org.assertj.core.data.Offset; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -43,13 +44,15 @@ class SystemProcessingTimeServiceTest { /** * Tests that SystemProcessingTimeService#scheduleAtFixedRate is actually triggered multiple - * times. + * times with the expected scheduled timestamps. */ @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS) @Test void testScheduleAtFixedRate() throws Exception { final AtomicReference errorRef = new AtomicReference<>(); + final long initialDelay = 10L; final long period = 10L; + final long executionDelay = 10L; final int countDown = 3; final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef); @@ -57,7 +60,76 @@ void testScheduleAtFixedRate() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(countDown); try { - timer.scheduleAtFixedRate(timestamp -> countDownLatch.countDown(), 0L, period); + final long initialTimestamp = timer.getCurrentProcessingTime() + initialDelay; + timer.scheduleAtFixedRate( + timestamp -> { + try { + long executionTimes = countDown - countDownLatch.getCount(); + assertThat(timestamp) + .isCloseTo( + initialTimestamp + executionTimes * period, + Offset.offset(period)); + Thread.sleep(executionDelay); + } catch (Error e) { + System.out.println(e.getMessage()); + throw new Error(e); + } + countDownLatch.countDown(); + }, + initialDelay, + period); + + countDownLatch.await(); + + assertThat(errorRef.get()).isNull(); + } finally { + timer.shutdownService(); + } + } + + /** + * Tests that SystemProcessingTimeService#testScheduleAtFixedDelay is actually triggered + * multiple times with the expected scheduled timestamps. + */ + @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS) + @Test + void testScheduleAtFixedDelay() throws Exception { + final AtomicReference errorRef = new AtomicReference<>(); + final long initialDelay = 10L; + final long period = 10L; + final long executionDelay = 10L; + final int countDown = 3; + + final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef); + + final CountDownLatch countDownLatch = new CountDownLatch(countDown); + + final LastExecutionTimeWrapper lastExecutionTimeWrapper = new LastExecutionTimeWrapper(); + + try { + final long initialTimestamp = timer.getCurrentProcessingTime() + initialDelay; + timer.scheduleWithFixedDelay( + timestamp -> { + try { + if (countDownLatch.getCount() == countDown) { + assertThat(timestamp) + .isCloseTo(initialTimestamp, Offset.offset(period)); + } else { + assertThat(timestamp) + .isCloseTo( + lastExecutionTimeWrapper.ts + period, + Offset.offset(period)); + } + Thread.sleep(executionDelay); + lastExecutionTimeWrapper.ts = timer.getCurrentProcessingTime(); + } catch (Error e) { + System.out.println(e.getMessage()); + throw new Error(e); + } + countDownLatch.countDown(); + }, + initialDelay, + period); countDownLatch.await(); @@ -67,6 +139,10 @@ void testScheduleAtFixedRate() throws Exception { } } + private static class LastExecutionTimeWrapper { + private long ts; + } + /** * Tests that shutting down the SystemProcessingTimeService will also cancel the scheduled at * fix rate future.