diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java index 2b9111af89..20788ce7a8 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java @@ -377,6 +377,25 @@ public void testChangelogWithScanStartupMode() throws Exception { .containsExactlyInAnyOrder( "+I[+I, 3, 1970-01-01T00:00:00.200Z, 4, v4]", "+I[+I, 4, 1970-01-01T00:00:00.200Z, 5, v5]"); + + // 3. Test scan.startup.mode='latest' - should only read new records after subscription + String optionsLatest = " /*+ OPTIONS('scan.startup.mode' = 'latest') */"; + String queryLatest = "SELECT id FROM startup_mode_test$changelog" + optionsLatest; + CloseableIterator rowIterLatest = tEnv.executeSql(queryLatest).collect(); + List latestResults = new ArrayList<>(); + for (int attempt = 0; attempt < 10; attempt++) { + // Write a new record (with id larger than 5) + int rowId = 6 + attempt; + writeRows(conn, tablePath, Arrays.asList(row(rowId, "v" + rowId)), false); + + // Try to fetch one record with a 5-second timeout + latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(5)); + if (!latestResults.isEmpty()) { + break; + } + } + int id = Integer.parseInt(latestResults.getFirst().replaceAll(".*\\[(\\d+)]", "$1")); + assertThat(id).isGreaterThan(5); } @Test diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java index 00807497d6..dcabc8e143 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java @@ -93,6 +93,11 @@ public static List collectRowsWithTimeout( return collectRowsWithTimeout(iterator, expectedCount, true); } + public static List collectRowsWithTimeout( + CloseableIterator iterator, int expectedCount, Duration maxWaitTime) { + return collectRowsWithTimeout(iterator, expectedCount, true, maxWaitTime); + } + public static List collectRowsWithTimeout( CloseableIterator iterator, int expectedCount, boolean closeIterator) { if (expectedCount < 0) {