Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
private ScheduledFuture<?> scheduleRepeatedly(
ProcessingTimeCallback callback, long initialDelay, long period, boolean fixedDelay) {
final long nextTimestamp = getCurrentProcessingTime() + initialDelay;
final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, period);
final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, period, fixedDelay);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please could you add a test for this change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am checking the logic on the first time - I would expect the initial delay to take effect, then subsequent time the fixed delay. As the code is written the first time we get the initial plus the fixed delay, can you confirm this is what you want?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial delay is accounted when calling scheduleRepeatedly, where nextTimestamp is initialized to getCurrentProcessingTime() + initialDelay

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi David, I've added UTs to check the timestamps for both scheduleAtFixedRate& scheduleWithFixedDelay.


// we directly try to register the timer and only react to the status on exception
// that way we save unnecessary volatile accesses for each timer
Expand Down Expand Up @@ -281,12 +281,13 @@ interface ExceptionHandler {
}

private Runnable wrapOnTimerCallback(ProcessingTimeCallback callback, long timestamp) {
return new ScheduledTask(status, exceptionHandler, callback, timestamp, 0);
return new ScheduledTask(status, exceptionHandler, callback, timestamp, 0, false);
}

private Runnable wrapOnTimerCallback(
ProcessingTimeCallback callback, long nextTimestamp, long period) {
return new ScheduledTask(status, exceptionHandler, callback, nextTimestamp, period);
ProcessingTimeCallback callback, long nextTimestamp, long period, boolean fixedDelay) {
return new ScheduledTask(
status, exceptionHandler, callback, nextTimestamp, period, fixedDelay);
}

private static final class ScheduledTask implements Runnable {
Expand All @@ -296,18 +297,21 @@ private static final class ScheduledTask implements Runnable {

private long nextTimestamp;
private final long period;
private final boolean fixedDelay;

ScheduledTask(
AtomicInteger serviceStatus,
ExceptionHandler exceptionHandler,
ProcessingTimeCallback callback,
long timestamp,
long period) {
long period,
boolean fixedDelay) {
this.serviceStatus = serviceStatus;
this.exceptionHandler = exceptionHandler;
this.callback = callback;
this.nextTimestamp = timestamp;
this.period = period;
this.fixedDelay = fixedDelay;
}

@Override
Expand All @@ -320,7 +324,8 @@ public void run() {
} catch (Exception ex) {
exceptionHandler.handleException(ex);
}
nextTimestamp += period;
nextTimestamp =
fixedDelay ? System.currentTimeMillis() + period : nextTimestamp + period;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.util.Preconditions;

import org.assertj.core.data.Offset;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

Expand All @@ -43,21 +44,92 @@ class SystemProcessingTimeServiceTest {

/**
* Tests that SystemProcessingTimeService#scheduleAtFixedRate is actually triggered multiple
* times.
* times with the expected scheduled timestamps.
*/
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
@Test
void testScheduleAtFixedRate() throws Exception {
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
final long initialDelay = 10L;
final long period = 10L;
final long executionDelay = 10L;
final int countDown = 3;

final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);

final CountDownLatch countDownLatch = new CountDownLatch(countDown);

try {
timer.scheduleAtFixedRate(timestamp -> countDownLatch.countDown(), 0L, period);
final long initialTimestamp = timer.getCurrentProcessingTime() + initialDelay;
timer.scheduleAtFixedRate(
timestamp -> {
try {
long executionTimes = countDown - countDownLatch.getCount();
assertThat(timestamp)
.isCloseTo(
initialTimestamp + executionTimes * period,
Offset.offset(period));
Thread.sleep(executionDelay);
} catch (Error e) {
System.out.println(e.getMessage());
throw new Error(e);
}
countDownLatch.countDown();
},
initialDelay,
period);

countDownLatch.await();

assertThat(errorRef.get()).isNull();
} finally {
timer.shutdownService();
}
}

/**
* Tests that SystemProcessingTimeService#testScheduleAtFixedDelay is actually triggered
* multiple times with the expected scheduled timestamps.
*/
@Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
@Test
void testScheduleAtFixedDelay() throws Exception {
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
final long initialDelay = 10L;
final long period = 10L;
final long executionDelay = 10L;
final int countDown = 3;

final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);

final CountDownLatch countDownLatch = new CountDownLatch(countDown);

final LastExecutionTimeWrapper lastExecutionTimeWrapper = new LastExecutionTimeWrapper();

try {
final long initialTimestamp = timer.getCurrentProcessingTime() + initialDelay;
timer.scheduleWithFixedDelay(
timestamp -> {
try {
if (countDownLatch.getCount() == countDown) {
assertThat(timestamp)
.isCloseTo(initialTimestamp, Offset.offset(period));
} else {
assertThat(timestamp)
.isCloseTo(
lastExecutionTimeWrapper.ts + period,
Offset.offset(period));
}
Thread.sleep(executionDelay);
lastExecutionTimeWrapper.ts = timer.getCurrentProcessingTime();
} catch (Error e) {
System.out.println(e.getMessage());
throw new Error(e);
}
countDownLatch.countDown();
},
initialDelay,
period);

countDownLatch.await();

Expand All @@ -67,6 +139,10 @@ void testScheduleAtFixedRate() throws Exception {
}
}

private static class LastExecutionTimeWrapper {
private long ts;
}

/**
* Tests that shutting down the SystemProcessingTimeService will also cancel the scheduled at
* fix rate future.
Expand Down