From e023ca4d7229d44a1f462aa22366e80e39a17997 Mon Sep 17 00:00:00 2001 From: "nhuan.bc" Date: Sun, 25 Jan 2026 23:58:59 +0700 Subject: [PATCH] Add a test for scan.startup.mode='latest' --- .../source/ChangelogVirtualTableITCase.java | 19 +++++++++++++++++++ .../testutils/FlinkRowAssertionsUtils.java | 5 +++++ 2 files changed, 24 insertions(+) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java index 2b9111af89..20788ce7a8 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java @@ -377,6 +377,25 @@ public void testChangelogWithScanStartupMode() throws Exception { .containsExactlyInAnyOrder( "+I[+I, 3, 1970-01-01T00:00:00.200Z, 4, v4]", "+I[+I, 4, 1970-01-01T00:00:00.200Z, 5, v5]"); + + // 3. Test scan.startup.mode='latest' - should only read new records after subscription + String optionsLatest = " /*+ OPTIONS('scan.startup.mode' = 'latest') */"; + String queryLatest = "SELECT id FROM startup_mode_test$changelog" + optionsLatest; + CloseableIterator rowIterLatest = tEnv.executeSql(queryLatest).collect(); + List latestResults = new ArrayList<>(); + for (int attempt = 0; attempt < 10; attempt++) { + // Write a new record (with id larger than 5) + int rowId = 6 + attempt; + writeRows(conn, tablePath, Arrays.asList(row(rowId, "v" + rowId)), false); + + // Try to fetch one record with a 5-second timeout + latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(5)); + if (!latestResults.isEmpty()) { + break; + } + } + int id = Integer.parseInt(latestResults.getFirst().replaceAll(".*\\[(\\d+)]", "$1")); + assertThat(id).isGreaterThan(5); } @Test diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java index 00807497d6..dcabc8e143 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java @@ -93,6 +93,11 @@ public static List collectRowsWithTimeout( return collectRowsWithTimeout(iterator, expectedCount, true); } + public static List collectRowsWithTimeout( + CloseableIterator iterator, int expectedCount, Duration maxWaitTime) { + return collectRowsWithTimeout(iterator, expectedCount, true, maxWaitTime); + } + public static List collectRowsWithTimeout( CloseableIterator iterator, int expectedCount, boolean closeIterator) { if (expectedCount < 0) {