diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index 60eb942e8b..af2f3d30d1 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -53,6 +53,17 @@ public class MetricNames { public static final String KV_ALL_SNAPSHOT_SIZE = "allKvSnapshotSize"; public static final String SERVER_PHYSICAL_STORAGE_REMOTE_KV_SIZE = "remoteKvSize"; + // for lake tiering metrics - global level + public static final String LAKE_TIERING_PENDING_TABLES_COUNT = "pendingTablesCount"; + public static final String LAKE_TIERING_RUNNING_TABLES_COUNT = "runningTablesCount"; + public static final String LAKE_TIERING_FAILURES_TOTAL = "failuresTotal"; + + // for lake tiering table-level metrics + public static final String LAKE_TIERING_LAST_SUCCESS_AGE_MS = "lastSuccessAgeMs"; + public static final String LAKE_TIERING_LAST_DURATION_MS = "lastDurationMs"; + public static final String LAKE_TIERING_TABLE_FAILURES_TOTAL = "failuresTotal"; + public static final String LAKE_TIERING_TABLE_STATE = "state"; + // -------------------------------------------------------------------------------------------- // metrics for tablet server // -------------------------------------------------------------------------------------------- diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java index d0b81b3be9..0165ba1eb0 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java @@ -22,13 +22,19 @@ import org.apache.fluss.lake.committer.CommitterInitContext; import 
org.apache.fluss.lake.committer.LakeCommitter; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.utils.types.Tuple2; import org.apache.paimon.CoreOptions; +import org.apache.paimon.FileStore; import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.manifest.SimpleFileEntry; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitCallback; @@ -92,6 +98,14 @@ public long commit(PaimonCommittable committable, Map snapshotPr tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER); tableCommit.commit(manifestCommittable); + Snapshot snapshot = fileStoreTable.snapshotManager().latestSnapshot(); + if (snapshot != null) { + Tuple2 info = manifestListInfo(fileStoreTable.store(), snapshot); + System.out.printf( + "Committed snapshot %d with %d files and %d bytes.%n", + snapshot.id(), info.f0, info.f1); + } + Long commitSnapshotId = currentCommitSnapshotId.get(); currentCommitSnapshotId.remove(); @@ -105,6 +119,27 @@ public long commit(PaimonCommittable committable, Map snapshotPr } } + private Tuple2 manifestListInfo(FileStore store, Snapshot snapshot) { + ManifestList manifestList = store.manifestListFactory().create(); + ManifestFile manifestFile = store.manifestFileFactory().create(); + List manifestFileMetas = manifestList.readDataManifests(snapshot); + int fileCount = 0; + long fileSize = 0; + for (ManifestFileMeta manifestFileMeta : manifestFileMetas) { + List manifestEntries = manifestFile.read(manifestFileMeta.fileName()); + for (ManifestEntry entry : manifestEntries) { + if (entry.kind() == FileKind.ADD) { + 
fileSize += entry.file().fileSize(); + fileCount++; + } else { + fileSize -= entry.file().fileSize(); + fileCount--; + } + } + } + return Tuple2.of(fileCount, fileSize); + } + @Override public void abort(PaimonCommittable committable) throws IOException { tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index 76a3fbd68c..c28a3683f2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -38,6 +38,7 @@ import org.apache.fluss.server.metadata.ServerMetadataCache; import org.apache.fluss.server.metrics.ServerMetricUtils; import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; +import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperUtils; import org.apache.fluss.server.zk.data.CoordinatorAddress; @@ -188,7 +189,9 @@ protected void startServices() throws Exception { authorizer.startup(); } - this.lakeTableTieringManager = new LakeTableTieringManager(); + this.lakeTableTieringManager = + new LakeTableTieringManager( + new LakeTieringMetricGroup(metricRegistry, serverMetricGroup)); MetadataManager metadataManager = new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java index c25c779691..dcc1eaf69a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java @@ -23,7 
+23,10 @@ import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.metrics.MetricNames; +import org.apache.fluss.metrics.groups.MetricGroup; import org.apache.fluss.server.entity.LakeTieringTableInfo; +import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup; import org.apache.fluss.server.utils.timer.DefaultTimer; import org.apache.fluss.server.utils.timer.Timer; import org.apache.fluss.server.utils.timer.TimerTask; @@ -126,6 +129,12 @@ public class LakeTableTieringManager implements AutoCloseable { // table_id -> the last timestamp of tiered lake snapshot private final Map tableLastTieredTime; + // table_id -> the last tiering duration in milliseconds + private final Map tableLastTieringDuration; + + // table_id -> the tiering failure count + private final Map tableTieringFailureCount; + // the live tables that are tiering, // from table_id -> last heartbeat time by the tiering service private final Map liveTieringTableIds; @@ -133,21 +142,28 @@ public class LakeTableTieringManager implements AutoCloseable { // table_id -> delayed tiering task private final Map delayedTieringByTableId; + // global tiering failure count; volatile because the metrics gauge reads it outside the lock + private volatile long globalTieringFailureCount = 0; + private final Lock lock = new ReentrantLock(); - public LakeTableTieringManager() { + private final LakeTieringMetricGroup lakeTieringMetricGroup; + + public LakeTableTieringManager(LakeTieringMetricGroup lakeTieringMetricGroup) { this( new DefaultTimer("delay lake tiering", 1_000, 20), Executors.newSingleThreadScheduledExecutor( new ExecutorThreadFactory("fluss-lake-tiering-timeout-checker")), - SystemClock.getInstance()); + SystemClock.getInstance(), + lakeTieringMetricGroup); } @VisibleForTesting protected LakeTableTieringManager( Timer lakeTieringScheduleTimer, ScheduledExecutorService lakeTieringServiceTimeoutChecker, - Clock clock) { + Clock clock, + LakeTieringMetricGroup
lakeTieringMetricGroup) { this.lakeTieringScheduleTimer = lakeTieringScheduleTimer; this.lakeTieringServiceTimeoutChecker = lakeTieringServiceTimeoutChecker; this.clock = clock; @@ -163,6 +179,19 @@ protected LakeTableTieringManager( this.tableTierEpoch = new HashMap<>(); this.tableLastTieredTime = new HashMap<>(); this.delayedTieringByTableId = new HashMap<>(); + this.tableLastTieringDuration = new HashMap<>(); + this.tableTieringFailureCount = new HashMap<>(); + this.lakeTieringMetricGroup = lakeTieringMetricGroup; + registerMetrics(); + } + + private void registerMetrics() { + lakeTieringMetricGroup.gauge( + MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT, pendingTieringTables::size); + lakeTieringMetricGroup.gauge( + MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, liveTieringTableIds::size); + lakeTieringMetricGroup.gauge( + MetricNames.LAKE_TIERING_FAILURES_TOTAL, () -> globalTieringFailureCount); } public void initWithLakeTables(List> tableInfoWithTieredTime) { @@ -200,6 +229,11 @@ private void registerLakeTable(TableInfo tableInfo, long lastTieredTime) { tableId, tableInfo.getTableConfig().getDataLakeFreshness().toMillis()); tableLastTieredTime.put(tableId, lastTieredTime); tableTierEpoch.put(tableId, 0L); + tableLastTieringDuration.put(tableId, 0L); + tableTieringFailureCount.put(tableId, 0L); + + // register table-level metrics + registerTableMetrics(tableId, tableInfo.getTablePath()); } private void scheduleTableTiering(long tableId) { @@ -222,6 +256,41 @@ private void scheduleTableTiering(long tableId) { lakeTieringScheduleTimer.add(delayedTiering); } + private void registerTableMetrics(long tableId, TablePath tablePath) { + // create table-level metric group + MetricGroup tableMetricGroup = + lakeTieringMetricGroup.addGroup( + "table", tablePath.getDatabaseName() + "." 
+ tablePath.getTableName()); + + // lastSuccessAgeMs: milliseconds since last successful tiering + tableMetricGroup.gauge( + MetricNames.LAKE_TIERING_LAST_SUCCESS_AGE_MS, + () -> { + Long lastTiered = tableLastTieredTime.get(tableId); + return lastTiered != null ? clock.milliseconds() - lastTiered : -1L; + }); + + // lastDurationMs: duration of last tiering job + tableMetricGroup.gauge( + MetricNames.LAKE_TIERING_LAST_DURATION_MS, + () -> tableLastTieringDuration.getOrDefault(tableId, 0L)); + + // failuresTotal: total failure count for this table + tableMetricGroup.gauge( + MetricNames.LAKE_TIERING_TABLE_FAILURES_TOTAL, + () -> tableTieringFailureCount.getOrDefault(tableId, 0L)); + + // state: current tiering state represented as ordinal value + // (0=New, 1=Initialized, 2=Scheduled, 3=Pending, 4=Tiering, 5=Tiered, 6=Failed, -1=table + // does not exist) + tableMetricGroup.gauge( + MetricNames.LAKE_TIERING_TABLE_STATE, + () -> { + TieringState state = tieringStates.get(tableId); + return state != null ? 
state.ordinal() : -1; + }); + } + public void removeLakeTable(long tableId) { inLock( lock, @@ -229,6 +298,8 @@ public void removeLakeTable(long tableId) { tablePaths.remove(tableId); tableLakeFreshness.remove(tableId); tableLastTieredTime.remove(tableId); + tableLastTieringDuration.remove(tableId); + tableTieringFailureCount.remove(tableId); tieringStates.remove(tableId); liveTieringTableIds.remove(tableId); tableTierEpoch.remove(tableId); @@ -450,9 +521,17 @@ private void doHandleStateChange(long tableId, TieringState targetState) { break; case Tiered: tableLastTieredTime.put(tableId, clock.milliseconds()); - liveTieringTableIds.remove(tableId); + // record tiering duration; NOTE(review): map value is last heartbeat time - confirm + Long startTime = liveTieringTableIds.remove(tableId); + if (startTime != null) { + long duration = clock.milliseconds() - startTime; + tableLastTieringDuration.put(tableId, duration); + } break; case Failed: + // increment failure counters + tableTieringFailureCount.computeIfPresent(tableId, (t, v) -> v + 1); + globalTieringFailureCount++; liveTieringTableIds.remove(tableId); // do nothing break; @@ -541,7 +620,8 @@ private void advanceClock() throws InterruptedException { } } - private enum TieringState { + @VisibleForTesting + enum TieringState { // When a new lake table is created, the state will be New New { @Override @@ -598,4 +678,43 @@ public Set validPreviousStates() { abstract Set validPreviousStates(); } + + // ------------------------------------------------------------------------ + // Test-only methods + // ------------------------------------------------------------------------ + + @VisibleForTesting + protected int getPendingTablesCount() { + return pendingTieringTables.size(); + } + + @VisibleForTesting + protected int getRunningTablesCount() { + return liveTieringTableIds.size(); + } + + @VisibleForTesting + protected long getGlobalFailureCount() { + return globalTieringFailureCount; + } + + @VisibleForTesting + protected Long getTableLastSuccessTime(long 
tableId) { + return tableLastTieredTime.get(tableId); + } + + @VisibleForTesting + protected Long getTableLastDuration(long tableId) { + return tableLastTieringDuration.get(tableId); + } + + @VisibleForTesting + protected Long getTableFailureCount(long tableId) { + return tableTieringFailureCount.get(tableId); + } + + @VisibleForTesting + protected TieringState getTableState(long tableId) { + return tieringStates.get(tableId); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java new file mode 100644 index 0000000000..7066db8ce9 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.metrics.group; + +import org.apache.fluss.metrics.CharacterFilter; +import org.apache.fluss.metrics.groups.AbstractMetricGroup; +import org.apache.fluss.metrics.registry.MetricRegistry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope; + +/** Metrics for lake tiering. */ +public class LakeTieringMetricGroup extends AbstractMetricGroup { + + private static final Logger LOG = LoggerFactory.getLogger(LakeTieringMetricGroup.class); + + public LakeTieringMetricGroup(MetricRegistry registry, CoordinatorMetricGroup parent) { + super(registry, makeScope(parent), parent); + } + + @Override + protected String getGroupName(CharacterFilter filter) { + return "lakeTiering"; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index 7a2003786c..9314901fea 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -535,7 +535,7 @@ private void registerLakeTieringMetrics() { MetricNames.LOG_LAKE_PENDING_RECORDS, () -> getLakeLogEndOffset() < 0L - ? -1 + ? 
getLogHighWatermark() - getLogStartOffset() : getLogHighWatermark() - getLakeLogEndOffset()); lakeTieringMetricGroup.gauge( MetricNames.LOG_LAKE_TIMESTAMP_LAG, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index dfb4088e49..457fd1cad3 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -190,7 +190,8 @@ void beforeEach() throws IOException { testCoordinatorChannelManager = new TestCoordinatorChannelManager(); autoPartitionManager = new AutoPartitionManager(serverMetadataCache, metadataManager, new Configuration()); - lakeTableTieringManager = new LakeTableTieringManager(); + lakeTableTieringManager = + new LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS); Configuration conf = new Configuration(); conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); eventProcessor = buildCoordinatorEventProcessor(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java index fb0b75b18a..e68e34413e 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java @@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.server.entity.LakeTieringTableInfo; +import org.apache.fluss.server.metrics.group.TestingMetricGroups; import org.apache.fluss.server.utils.timer.DefaultTimer; import org.apache.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService; 
import org.apache.fluss.types.DataTypes; @@ -62,7 +63,8 @@ private LakeTableTieringManager createLakeTableTieringManager() { return new LakeTableTieringManager( new DefaultTimer("delay lake tiering", 1_000, 20, manualClock), lakeTieringServiceTimeoutChecker, - manualClock); + manualClock, + TestingMetricGroups.LAKE_TIERING_METRICS); } @Test @@ -238,6 +240,101 @@ void testTieringFail() { assertRequestTable(tableId1, tablePath1, 2); } + @Test + void testGlobalMetrics() throws Exception { + // Initially no tables - verify counts are 0 + assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(0); + assertThat(tableTieringManager.getRunningTablesCount()).isEqualTo(0); + assertThat(tableTieringManager.getGlobalFailureCount()).isEqualTo(0); + + // Add a table + long tableId1 = 1L; + TablePath tablePath1 = TablePath.of("db", "table1"); + TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, Duration.ofSeconds(10)); + tableTieringManager.addNewLakeTable(tableInfo1); + + // Advance time to make it pending - need to wait for timer to trigger + manualClock.advanceTime(Duration.ofSeconds(10)); + + // Wait for the delayed task to execute and move to pending + waitValue( + () -> + tableTieringManager.getPendingTablesCount() == 1 + ? 
Optional.of(1) + : Optional.empty(), + Duration.ofSeconds(5), + "Table should be in pending state"); + + assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(1); + assertThat(tableTieringManager.getRunningTablesCount()).isEqualTo(0); + + // Request table - should transition to tiering + assertRequestTable(tableId1, tablePath1, 1); + assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(0); + assertThat(tableTieringManager.getRunningTablesCount()).isEqualTo(1); + + // Report failure + tableTieringManager.reportTieringFail(tableId1, 1); + assertThat(tableTieringManager.getGlobalFailureCount()).isEqualTo(1); + assertThat(tableTieringManager.getRunningTablesCount()).isEqualTo(0); + assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(1); // back to pending + } + + @Test + void testTableLevelMetrics() { + long tableId1 = 1L; + TablePath tablePath1 = TablePath.of("db", "table1"); + TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, Duration.ofSeconds(10)); + tableTieringManager.addNewLakeTable(tableInfo1); + + // Initially, table is just created, lastSuccessTime is the current time + Long initialTime = tableTieringManager.getTableLastSuccessTime(tableId1); + assertThat(initialTime).isNotNull(); + assertThat(initialTime).isEqualTo(manualClock.milliseconds()); + + // Advance time and request table + long startTime = manualClock.milliseconds(); + manualClock.advanceTime(Duration.ofSeconds(10)); + assertRequestTable(tableId1, tablePath1, 1); + + // State should be Tiering (4) + assertThat(tableTieringManager.getTableState(tableId1)) + .isEqualTo(LakeTableTieringManager.TieringState.Tiering); + + // Simulate tiering duration + manualClock.advanceTime(Duration.ofSeconds(5)); + tableTieringManager.finishTableTiering(tableId1, 1); + + // lastDurationMs should be around 5000ms + assertThat(tableTieringManager.getTableLastDuration(tableId1)).isEqualTo(5000L); + + // lastSuccessTime should be just now + 
assertThat(tableTieringManager.getTableLastSuccessTime(tableId1)) + .isEqualTo(manualClock.milliseconds()); + + // State should be Scheduled (2) after finish + assertThat(tableTieringManager.getTableState(tableId1)) + .isEqualTo(LakeTableTieringManager.TieringState.Scheduled); + + // Advance time to make lastSuccessAge increase + manualClock.advanceTime(Duration.ofSeconds(3)); + long lastSuccessAge = + manualClock.milliseconds() - tableTieringManager.getTableLastSuccessTime(tableId1); + assertThat(lastSuccessAge).isEqualTo(3000L); + + // Request again and report failure + manualClock.advanceTime(Duration.ofSeconds(7)); + assertRequestTable(tableId1, tablePath1, 2); + tableTieringManager.reportTieringFail(tableId1, 2); + + // Failures should increment + assertThat(tableTieringManager.getTableFailureCount(tableId1)).isEqualTo(1L); + + // State should be Pending (3) after failure + assertThat(tableTieringManager.getTableState(tableId1)) + .isEqualTo(LakeTableTieringManager.TieringState.Pending); + } + private TableInfo createTableInfo(long tableId, TablePath tablePath, Duration freshness) { TableDescriptor tableDescriptor = TableDescriptor.builder() diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java index fc36e3034f..5c2c58ee33 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java @@ -78,7 +78,8 @@ void beforeEach() { testCoordinatorChannelManager = new TestCoordinatorChannelManager(); autoPartitionManager = new AutoPartitionManager(serverMetadataCache, metadataManager, new Configuration()); - lakeTableTieringManager = new LakeTableTieringManager(); + lakeTableTieringManager = + new LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS); 
CoordinatorEventProcessor eventProcessor = buildCoordinatorEventProcessor(); rebalanceManager = new RebalanceManager(eventProcessor, zookeeperClient); rebalanceManager.startup(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java index bafb477c54..6d2d17b231 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java @@ -118,7 +118,8 @@ void beforeEach() throws IOException { new Configuration(), new LakeCatalogDynamicLoader(new Configuration(), null, true)), new Configuration()); - lakeTableTieringManager = new LakeTableTieringManager(); + lakeTableTieringManager = + new LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS); } @Test diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java index b2e93f2bbc..3bc6158df1 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java @@ -34,6 +34,9 @@ public class TestingMetricGroups { public static final CoordinatorMetricGroup COORDINATOR_METRICS = new CoordinatorMetricGroup(NOPMetricRegistry.INSTANCE, "cluster1", "host", "0"); + public static final LakeTieringMetricGroup LAKE_TIERING_METRICS = + new LakeTieringMetricGroup(NOPMetricRegistry.INSTANCE, COORDINATOR_METRICS); + public static final TableMetricGroup TABLE_METRICS = new TableMetricGroup( NOPMetricRegistry.INSTANCE, diff --git a/website/docs/maintenance/observability/monitor-metrics.md 
b/website/docs/maintenance/observability/monitor-metrics.md index 65109551c2..48e8ba62d8 100644 --- a/website/docs/maintenance/observability/monitor-metrics.md +++ b/website/docs/maintenance/observability/monitor-metrics.md @@ -294,7 +294,7 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM - coordinator + coordinator - activeCoordinatorCount The number of active CoordinatorServer in this cluster. @@ -363,6 +363,43 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM all kv snapshot size of each table bucket. Gauge + + lakeTiering + pendingTablesCount + The number of tables waiting to be tiered. + Gauge + + + runningTablesCount + The number of tables currently being tiered. + Gauge + + + failuresTotal + The total number of tiering failures across all tables. + Gauge + + + lakeTiering_table + lastSuccessAgeMs + Time in milliseconds since the last successful tiering operation for this table. For newly created tables that have never been tiered, this represents the time since table creation. + Gauge + + + lastDurationMs + Duration in milliseconds of the last tiering operation for this table. + Gauge + + + failuresTotal + The total number of tiering failures for this table. + Gauge + + + state + Current tiering state of the table: 0=New, 1=Initialized, 2=Scheduled, 3=Pending, 4=Tiering, 5=Tiered, 6=Failed. Returns -1 if the table is not in the tiering system. + Gauge +