Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ public class MetricNames {
public static final String KV_ALL_SNAPSHOT_SIZE = "allKvSnapshotSize";
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_KV_SIZE = "remoteKvSize";

// for lake tiering metrics - global level
public static final String LAKE_TIERING_PENDING_TABLES_COUNT = "pendingTablesCount";
public static final String LAKE_TIERING_RUNNING_TABLES_COUNT = "runningTablesCount";
public static final String LAKE_TIERING_FAILURES_TOTAL = "failuresTotal";

// for lake tiering table-level metrics
public static final String LAKE_TIERING_LAST_SUCCESS_AGE_MS = "lastSuccessAgeMs";
public static final String LAKE_TIERING_LAST_DURATION_MS = "lastDurationMs";
public static final String LAKE_TIERING_TABLE_FAILURES_TOTAL = "failuresTotal";
public static final String LAKE_TIERING_TABLE_STATE = "state";

// --------------------------------------------------------------------------------------------
// metrics for tablet server
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@
import org.apache.fluss.lake.committer.CommitterInitContext;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.utils.types.Tuple2;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitCallback;
Expand Down Expand Up @@ -92,6 +98,14 @@ public long commit(PaimonCommittable committable, Map<String, String> snapshotPr
tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
tableCommit.commit(manifestCommittable);

Snapshot snapshot = fileStoreTable.snapshotManager().latestSnapshot();
if (snapshot != null) {
Tuple2<Integer, Long> info = manifestListInfo(fileStoreTable.store(), snapshot);
System.out.printf(
"Committed snapshot %d with %d files and %d bytes.%n",
snapshot.id(), info.f0, info.f1);
}

Long commitSnapshotId = currentCommitSnapshotId.get();
currentCommitSnapshotId.remove();

Expand All @@ -105,6 +119,27 @@ public long commit(PaimonCommittable committable, Map<String, String> snapshotPr
}
}

private Tuple2<Integer, Long> manifestListInfo(FileStore<?> store, Snapshot snapshot) {
ManifestList manifestList = store.manifestListFactory().create();
ManifestFile manifestFile = store.manifestFileFactory().create();
List<ManifestFileMeta> manifestFileMetas = manifestList.readDataManifests(snapshot);
int fileCount = 0;
long fileSize = 0;
for (ManifestFileMeta manifestFileMeta : manifestFileMetas) {
List<ManifestEntry> manifestEntries = manifestFile.read(manifestFileMeta.fileName());
for (ManifestEntry entry : manifestEntries) {
if (entry.kind() == FileKind.ADD) {
fileSize += entry.file().fileSize();
fileCount++;
} else {
fileSize -= entry.file().fileSize();
fileCount--;
}
}
}
return Tuple2.of(fileCount, fileSize);
}

@Override
public void abort(PaimonCommittable committable) throws IOException {
tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.fluss.server.metadata.ServerMetadataCache;
import org.apache.fluss.server.metrics.ServerMetricUtils;
import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.ZooKeeperUtils;
import org.apache.fluss.server.zk.data.CoordinatorAddress;
Expand Down Expand Up @@ -188,7 +189,9 @@ protected void startServices() throws Exception {
authorizer.startup();
}

this.lakeTableTieringManager = new LakeTableTieringManager();
this.lakeTableTieringManager =
new LakeTableTieringManager(
new LakeTieringMetricGroup(metricRegistry, serverMetricGroup));

MetadataManager metadataManager =
new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.server.entity.LakeTieringTableInfo;
import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup;
import org.apache.fluss.server.utils.timer.DefaultTimer;
import org.apache.fluss.server.utils.timer.Timer;
import org.apache.fluss.server.utils.timer.TimerTask;
Expand Down Expand Up @@ -126,28 +129,41 @@ public class LakeTableTieringManager implements AutoCloseable {
// table_id -> the last timestamp of tiered lake snapshot
private final Map<Long, Long> tableLastTieredTime;

// table_id -> the last tiering duration in milliseconds
private final Map<Long, Long> tableLastTieringDuration;

// table_id -> the tiering failure count
private final Map<Long, Long> tableTieringFailureCount;

// the live tables that are tiering,
// from table_id -> last heartbeat time by the tiering service
private final Map<Long, Long> liveTieringTableIds;

// table_id -> delayed tiering task
private final Map<Long, DelayedTiering> delayedTieringByTableId;

// global tiering failure count
private long globalTieringFailureCount = 0;

private final Lock lock = new ReentrantLock();

public LakeTableTieringManager() {
private final LakeTieringMetricGroup lakeTieringMetricGroup;

public LakeTableTieringManager(LakeTieringMetricGroup lakeTieringMetricGroup) {
this(
new DefaultTimer("delay lake tiering", 1_000, 20),
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory("fluss-lake-tiering-timeout-checker")),
SystemClock.getInstance());
SystemClock.getInstance(),
lakeTieringMetricGroup);
}

@VisibleForTesting
protected LakeTableTieringManager(
Timer lakeTieringScheduleTimer,
ScheduledExecutorService lakeTieringServiceTimeoutChecker,
Clock clock) {
Clock clock,
LakeTieringMetricGroup lakeTieringMetricGroup) {
this.lakeTieringScheduleTimer = lakeTieringScheduleTimer;
this.lakeTieringServiceTimeoutChecker = lakeTieringServiceTimeoutChecker;
this.clock = clock;
Expand All @@ -163,6 +179,19 @@ protected LakeTableTieringManager(
this.tableTierEpoch = new HashMap<>();
this.tableLastTieredTime = new HashMap<>();
this.delayedTieringByTableId = new HashMap<>();
this.tableLastTieringDuration = new HashMap<>();
this.tableTieringFailureCount = new HashMap<>();
this.lakeTieringMetricGroup = lakeTieringMetricGroup;
registerMetrics();
}

private void registerMetrics() {
lakeTieringMetricGroup.gauge(
MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT, pendingTieringTables::size);
lakeTieringMetricGroup.gauge(
MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, liveTieringTableIds::size);
lakeTieringMetricGroup.gauge(
MetricNames.LAKE_TIERING_FAILURES_TOTAL, () -> globalTieringFailureCount);
}

public void initWithLakeTables(List<Tuple2<TableInfo, Long>> tableInfoWithTieredTime) {
Expand Down Expand Up @@ -200,6 +229,11 @@ private void registerLakeTable(TableInfo tableInfo, long lastTieredTime) {
tableId, tableInfo.getTableConfig().getDataLakeFreshness().toMillis());
tableLastTieredTime.put(tableId, lastTieredTime);
tableTierEpoch.put(tableId, 0L);
tableLastTieringDuration.put(tableId, 0L);
tableTieringFailureCount.put(tableId, 0L);

// register table-level metrics
registerTableMetrics(tableId, tableInfo.getTablePath());
}

private void scheduleTableTiering(long tableId) {
Expand All @@ -222,13 +256,50 @@ private void scheduleTableTiering(long tableId) {
lakeTieringScheduleTimer.add(delayedTiering);
}

private void registerTableMetrics(long tableId, TablePath tablePath) {
// create table-level metric group
MetricGroup tableMetricGroup =
lakeTieringMetricGroup.addGroup(
"table", tablePath.getDatabaseName() + "." + tablePath.getTableName());

// lastSuccessAgeMs: milliseconds since last successful tiering
tableMetricGroup.gauge(
MetricNames.LAKE_TIERING_LAST_SUCCESS_AGE_MS,
() -> {
Long lastTiered = tableLastTieredTime.get(tableId);
return lastTiered != null ? clock.milliseconds() - lastTiered : -1L;
});

// lastDurationMs: duration of last tiering job
tableMetricGroup.gauge(
MetricNames.LAKE_TIERING_LAST_DURATION_MS,
() -> tableLastTieringDuration.getOrDefault(tableId, 0L));

// failuresTotal: total failure count for this table
tableMetricGroup.gauge(
MetricNames.LAKE_TIERING_TABLE_FAILURES_TOTAL,
() -> tableTieringFailureCount.getOrDefault(tableId, 0L));

// state: current tiering state represented as ordinal value
// (0=New, 1=Initialized, 2=Scheduled, 3=Pending, 4=Tiering, 5=Tiered, 6=Failed, -1=table
// not exist)
tableMetricGroup.gauge(
MetricNames.LAKE_TIERING_TABLE_STATE,
() -> {
TieringState state = tieringStates.get(tableId);
return state != null ? state.ordinal() : -1;
});
}

public void removeLakeTable(long tableId) {
inLock(
lock,
() -> {
tablePaths.remove(tableId);
tableLakeFreshness.remove(tableId);
tableLastTieredTime.remove(tableId);
tableLastTieringDuration.remove(tableId);
tableTieringFailureCount.remove(tableId);
tieringStates.remove(tableId);
liveTieringTableIds.remove(tableId);
tableTierEpoch.remove(tableId);
Expand Down Expand Up @@ -450,9 +521,17 @@ private void doHandleStateChange(long tableId, TieringState targetState) {
break;
case Tiered:
tableLastTieredTime.put(tableId, clock.milliseconds());
liveTieringTableIds.remove(tableId);
// calculate and record tiering duration
Long startTime = liveTieringTableIds.remove(tableId);
if (startTime != null) {
long duration = clock.milliseconds() - startTime;
tableLastTieringDuration.put(tableId, duration);
}
break;
case Failed:
// increment failure counters
tableTieringFailureCount.computeIfPresent(tableId, (t, v) -> v + 1);
globalTieringFailureCount++;
liveTieringTableIds.remove(tableId);
// do nothing
break;
Expand Down Expand Up @@ -541,7 +620,8 @@ private void advanceClock() throws InterruptedException {
}
}

private enum TieringState {
@VisibleForTesting
enum TieringState {
// When a new lake table is created, the state will be New
New {
@Override
Expand Down Expand Up @@ -598,4 +678,43 @@ public Set<TieringState> validPreviousStates() {

abstract Set<TieringState> validPreviousStates();
}

// ------------------------------------------------------------------------
// Test-only methods
// ------------------------------------------------------------------------

@VisibleForTesting
protected int getPendingTablesCount() {
return pendingTieringTables.size();
}

@VisibleForTesting
protected int getRunningTablesCount() {
return liveTieringTableIds.size();
}

@VisibleForTesting
protected long getGlobalFailureCount() {
return globalTieringFailureCount;
}

@VisibleForTesting
protected Long getTableLastSuccessTime(long tableId) {
return tableLastTieredTime.get(tableId);
}

@VisibleForTesting
protected Long getTableLastDuration(long tableId) {
return tableLastTieringDuration.get(tableId);
}

@VisibleForTesting
protected Long getTableFailureCount(long tableId) {
return tableTieringFailureCount.get(tableId);
}

@VisibleForTesting
protected TieringState getTableState(long tableId) {
return tieringStates.get(tableId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.metrics.group;

import org.apache.fluss.metrics.CharacterFilter;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
import org.apache.fluss.metrics.registry.MetricRegistry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;

/** Metrics for lake tiering. */
public class LakeTieringMetricGroup extends AbstractMetricGroup {

private static final Logger LOG = LoggerFactory.getLogger(LakeTieringMetricGroup.class);

public LakeTieringMetricGroup(MetricRegistry registry, CoordinatorMetricGroup parent) {
super(registry, makeScope(parent), parent);
}

@Override
protected String getGroupName(CharacterFilter filter) {
return "lakeTiering";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ private void registerLakeTieringMetrics() {
MetricNames.LOG_LAKE_PENDING_RECORDS,
() ->
getLakeLogEndOffset() < 0L
? -1
? getLogHighWatermark() - getLogStartOffset()
: getLogHighWatermark() - getLakeLogEndOffset());
lakeTieringMetricGroup.gauge(
MetricNames.LOG_LAKE_TIMESTAMP_LAG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ void beforeEach() throws IOException {
testCoordinatorChannelManager = new TestCoordinatorChannelManager();
autoPartitionManager =
new AutoPartitionManager(serverMetadataCache, metadataManager, new Configuration());
lakeTableTieringManager = new LakeTableTieringManager();
lakeTableTieringManager =
new LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS);
Configuration conf = new Configuration();
conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data");
eventProcessor = buildCoordinatorEventProcessor();
Expand Down
Loading
Loading