Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,10 @@ public static UUID uuidFromRemoteIndexCacheFileName(String fileName) {
// Remote Log Paths
// ----------------------------------------------------------------------------------------

/**
* Returns the remote data directory as an {@link FsPath}, built from the value configured under
* {@code ConfigOptions.REMOTE_DATA_DIR}.
*
* @param conf the configuration to read {@code ConfigOptions.REMOTE_DATA_DIR} from
* @return the path constructed from the configured value
*/
public static FsPath remoteDataDir(Configuration conf) {
return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
}

/**
* Returns the remote root directory path for storing log files.
*
Expand Down Expand Up @@ -584,6 +588,10 @@ public static FsPath remoteKvDir(Configuration conf) {
return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR) + "/" + REMOTE_KV_DIR_NAME);
}

/**
* Returns the remote kv directory located under the given remote data directory, i.e. the
* child path named {@code REMOTE_KV_DIR_NAME} of {@code remoteDataDir}.
*
* @param remoteDataDir the parent path the kv directory is resolved against
* @return the path {@code remoteDataDir/REMOTE_KV_DIR_NAME}
*/
public static FsPath remoteKvDir(FsPath remoteDataDir) {
return new FsPath(remoteDataDir, REMOTE_KV_DIR_NAME);
}

/**
* Returns the remote directory path for storing kv snapshot files for a kv tablet.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
package org.apache.fluss.server.coordinator;

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
Expand All @@ -30,6 +31,7 @@
import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.utils.FlussPaths;
import org.apache.fluss.utils.MapUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -66,17 +68,21 @@ public class CompletedSnapshotStoreManager {
makeZookeeperCompletedSnapshotHandleStore;
private final CoordinatorMetricGroup coordinatorMetricGroup;

private final FsPath remoteKvDir;

public CompletedSnapshotStoreManager(
int maxNumberOfSnapshotsToRetain,
Executor ioExecutor,
ZooKeeperClient zooKeeperClient,
CoordinatorMetricGroup coordinatorMetricGroup) {
CoordinatorMetricGroup coordinatorMetricGroup,
FsPath remoteKvDir) {
this(
maxNumberOfSnapshotsToRetain,
ioExecutor,
zooKeeperClient,
ZooKeeperCompletedSnapshotHandleStore::new,
coordinatorMetricGroup);
coordinatorMetricGroup,
remoteKvDir);
}

@VisibleForTesting
Expand All @@ -86,7 +92,8 @@ public CompletedSnapshotStoreManager(
ZooKeeperClient zooKeeperClient,
Function<ZooKeeperClient, CompletedSnapshotHandleStore>
makeZookeeperCompletedSnapshotHandleStore,
CoordinatorMetricGroup coordinatorMetricGroup) {
CoordinatorMetricGroup coordinatorMetricGroup,
FsPath remoteKvDir) {
checkArgument(
maxNumberOfSnapshotsToRetain > 0, "maxNumberOfSnapshotsToRetain must be positive");
this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain;
Expand All @@ -95,6 +102,7 @@ public CompletedSnapshotStoreManager(
this.ioExecutor = ioExecutor;
this.makeZookeeperCompletedSnapshotHandleStore = makeZookeeperCompletedSnapshotHandleStore;
this.coordinatorMetricGroup = coordinatorMetricGroup;
this.remoteKvDir = remoteKvDir;

registerMetrics();
}
Expand All @@ -121,15 +129,15 @@ private long getAllSnapshotSize(TableBucket tableBucket) {
}

public CompletedSnapshotStore getOrCreateCompletedSnapshotStore(
TablePath tablePath, TableBucket tableBucket) {
PhysicalTablePath tablePath, TableBucket tableBucket) {
return bucketCompletedSnapshotStores.computeIfAbsent(
tableBucket,
(bucket) -> {
try {
LOG.info("Creating snapshot store for table bucket {}.", bucket);
long start = System.currentTimeMillis();
CompletedSnapshotStore snapshotStore =
createCompletedSnapshotStore(tableBucket, ioExecutor);
createCompletedSnapshotStore(tablePath, tableBucket, ioExecutor);
long end = System.currentTimeMillis();
LOG.info(
"Created snapshot store for table bucket {} in {} ms.",
Expand All @@ -138,7 +146,7 @@ public CompletedSnapshotStore getOrCreateCompletedSnapshotStore(

MetricGroup bucketMetricGroup =
coordinatorMetricGroup.getTableBucketMetricGroup(
tablePath, tableBucket);
tablePath.getTablePath(), tableBucket);
if (bucketMetricGroup != null) {
LOG.info("Add bucketMetricGroup for tableBucket {}.", bucket);
bucketMetricGroup.gauge(
Expand Down Expand Up @@ -168,7 +176,8 @@ public void removeCompletedSnapshotStoreByTableBuckets(Set<TableBucket> tableBuc
}

private CompletedSnapshotStore createCompletedSnapshotStore(
TableBucket tableBucket, Executor ioExecutor) throws Exception {
PhysicalTablePath tablePath, TableBucket tableBucket, Executor ioExecutor)
throws Exception {
final CompletedSnapshotHandleStore completedSnapshotHandleStore =
this.makeZookeeperCompletedSnapshotHandleStore.apply(zooKeeperClient);

Expand All @@ -179,14 +188,18 @@ private CompletedSnapshotStore createCompletedSnapshotStore(
final int numberOfInitialSnapshots = initialSnapshots.size();

LOG.info(
"Found {} snapshots in {}.",
"Found {} snapshots in {} of table bucket {}.",
numberOfInitialSnapshots,
completedSnapshotHandleStore.getClass().getSimpleName());
completedSnapshotHandleStore.getClass().getSimpleName(),
tableBucket);

final List<CompletedSnapshot> retrievedSnapshots =
new ArrayList<>(numberOfInitialSnapshots);

LOG.info("Trying to fetch {} snapshots from storage.", numberOfInitialSnapshots);
LOG.info(
"Trying to fetch {} snapshots from storage of table bucket {}.",
numberOfInitialSnapshots,
tableBucket);

for (CompletedSnapshotHandle snapshotStateHandle : initialSnapshots) {
try {
Expand All @@ -205,28 +218,36 @@ private CompletedSnapshotStore createCompletedSnapshotStore(
tableBucket, snapshotStateHandle.getSnapshotId());
} catch (Exception t) {
LOG.error(
"Failed to remove snapshotStateHandle {}.", snapshotStateHandle, t);
"Failed to remove snapshotStateHandle {} of table bucket {}.",
snapshotStateHandle,
tableBucket,
t);
throw t;
}
} else {
LOG.error(
"Failed to retrieveCompleteSnapshot for snapshotStateHandle {}.",
"Failed to retrieveCompleteSnapshot for snapshotStateHandle {} of table bucket {}.",
snapshotStateHandle,
tableBucket,
e);
throw e;
}
}
}

FsPath remoteKvTabletDir =
FlussPaths.remoteKvTabletDir(remoteKvDir, tablePath, tableBucket);
// register all the files to shared kv file registry
SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(ioExecutor);
SharedKvFileRegistry sharedKvFileRegistry =
new SharedKvFileRegistry(remoteKvTabletDir, ioExecutor);
for (CompletedSnapshot completedSnapshot : retrievedSnapshots) {
try {
sharedKvFileRegistry.registerAllAfterRestored(completedSnapshot);
} catch (Exception e) {
LOG.error(
"Failed to registerAllAfterRestored for completedSnapshot {}.",
"Failed to registerAllAfterRestored for completedSnapshot {} of table bucket {}.",
completedSnapshot,
tableBucket,
e);
throw e;
}
Expand All @@ -237,7 +258,8 @@ private CompletedSnapshotStore createCompletedSnapshotStore(
sharedKvFileRegistry,
retrievedSnapshots,
completedSnapshotHandleStore,
ioExecutor);
ioExecutor,
remoteKvTabletDir);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.fluss.exception.IneligibleReplicaException;
import org.apache.fluss.exception.InvalidCoordinatorException;
import org.apache.fluss.exception.InvalidUpdateVersionException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.RebalanceFailureException;
import org.apache.fluss.exception.ServerNotExistException;
import org.apache.fluss.exception.ServerTagAlreadyExistException;
Expand Down Expand Up @@ -118,6 +119,7 @@
import org.apache.fluss.server.zk.data.lake.LakeTable;
import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
import org.apache.fluss.utils.FlussPaths;
import org.apache.fluss.utils.types.Tuple2;

import org.slf4j.Logger;
Expand Down Expand Up @@ -234,7 +236,8 @@ public CoordinatorEventProcessor(
conf.getInt(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS),
ioExecutor,
zooKeeperClient,
coordinatorMetricGroup);
coordinatorMetricGroup,
FlussPaths.remoteKvDir(conf));
this.autoPartitionManager = autoPartitionManager;
this.lakeTableTieringManager = lakeTableTieringManager;
this.coordinatorMetricGroup = coordinatorMetricGroup;
Expand Down Expand Up @@ -1738,9 +1741,29 @@ private void tryProcessCommitKvSnapshot(
callback.completeExceptionally(e);
return;
}
// commit the kv snapshot asynchronously

TableBucket tb = event.getTableBucket();
TablePath tablePath = coordinatorContext.getTablePathById(tb.getTableId());

// get physical table path
PhysicalTablePath physicalTablePath;
if (tb.getPartitionId() == null) {
TablePath tablePath = coordinatorContext.getTablePathById(tb.getTableId());
physicalTablePath = PhysicalTablePath.of(tablePath);
} else {
Optional<PhysicalTablePath> physicalTablePathOp =
coordinatorContext.getPhysicalTablePath(tb.getPartitionId());
if (!physicalTablePathOp.isPresent()) {
callback.completeExceptionally(
new PartitionNotExistException(
"PhysicalTablePath not found for partition "
+ tb.getPartitionId()));
return;
} else {
physicalTablePath = physicalTablePathOp.get();
}
}

// commit the kv snapshot asynchronously
ioExecutor.execute(
() -> {
try {
Expand All @@ -1749,7 +1772,7 @@ private void tryProcessCommitKvSnapshot(
// add completed snapshot
CompletedSnapshotStore completedSnapshotStore =
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
tablePath, tb);
physicalTablePath, tb);
// this involves IO operation (ZK), so we do it in ioExecutor
completedSnapshotStore.add(completedSnapshot);
coordinatorEventManager.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.utils.FlussPaths;
import org.apache.fluss.utils.concurrent.FutureUtils;

import javax.annotation.concurrent.NotThreadSafe;
Expand Down Expand Up @@ -68,31 +69,22 @@ public class CompletedSnapshot {
/** The next log offset when the snapshot is triggered. */
private final long logOffset;

/** The location where the snapshot is stored. */
private final FsPath snapshotLocation;

public static final String SNAPSHOT_DATA_NOT_EXISTS_ERROR_MESSAGE = "No such file or directory";

public CompletedSnapshot(
TableBucket tableBucket,
long snapshotID,
FsPath snapshotLocation,
KvSnapshotHandle kvSnapshotHandle,
long logOffset) {
this.tableBucket = tableBucket;
this.snapshotID = snapshotID;
this.snapshotLocation = snapshotLocation;
this.kvSnapshotHandle = kvSnapshotHandle;
this.logOffset = logOffset;
}

@VisibleForTesting
CompletedSnapshot(
TableBucket tableBucket,
long snapshotID,
FsPath snapshotLocation,
KvSnapshotHandle kvSnapshotHandle) {
this(tableBucket, snapshotID, snapshotLocation, kvSnapshotHandle, 0);
CompletedSnapshot(TableBucket tableBucket, long snapshotID, KvSnapshotHandle kvSnapshotHandle) {
this(tableBucket, snapshotID, kvSnapshotHandle, 0);
}

public long getSnapshotID() {
Expand Down Expand Up @@ -125,54 +117,43 @@ public void registerSharedKvFilesAfterRestored(SharedKvFileRegistry sharedKvFile
sharedKvFileRegistry.registerAllAfterRestored(this);
}

public CompletableFuture<Void> discardAsync(Executor ioExecutor) {
public CompletableFuture<Void> discardAsync(FsPath remoteKvTabletDir, Executor ioExecutor) {
// Discards the kv snapshot files: the private files are always discarded, while
// shared files are deleted only if they are not registered in the
// SharedKvFileRegistry.
CompletableFuture<Void> discardKvFuture =
FutureUtils.runAsync(kvSnapshotHandle::discard, ioExecutor);
FutureUtils.runAsync(
() -> kvSnapshotHandle.discard(remoteKvTabletDir, snapshotID), ioExecutor);

FsPath snapshotLocation = FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotID);
CompletableFuture<Void> discardMetaFileFuture =
FutureUtils.runAsync(this::disposeMetadata, ioExecutor);
FutureUtils.runAsync(() -> disposeMetadata(snapshotLocation), ioExecutor);

return FutureUtils.runAfterwards(
FutureUtils.completeAll(Arrays.asList(discardKvFuture, discardMetaFileFuture)),
this::disposeSnapshotStorage);
() -> disposeSnapshotStorage(snapshotLocation));
}

private void disposeSnapshotStorage() throws IOException {
private void disposeSnapshotStorage(FsPath snapshotLocation) throws IOException {
if (snapshotLocation != null) {
FileSystem fileSystem = snapshotLocation.getFileSystem();
fileSystem.delete(snapshotLocation, false);
}
}

/**
* Return the metadata file path that stores all the information that describes the snapshot.
*/
public FsPath getMetadataFilePath() {
return new FsPath(snapshotLocation, SNAPSHOT_METADATA_FILE_NAME);
}

/**
* Returns the path of the snapshot metadata file, i.e. the child path named {@code
* SNAPSHOT_METADATA_FILE_NAME} of the given snapshot location.
*
* @param snapshotLocation the directory of the snapshot
* @return the path {@code snapshotLocation/SNAPSHOT_METADATA_FILE_NAME}
*/
public static FsPath getMetadataFilePath(FsPath snapshotLocation) {
return new FsPath(snapshotLocation, SNAPSHOT_METADATA_FILE_NAME);
}

private void disposeMetadata() throws IOException {
FsPath metadataFilePath = getMetadataFilePath();
private void disposeMetadata(FsPath snapshotLocation) throws IOException {
FsPath metadataFilePath = getMetadataFilePath(snapshotLocation);
FileSystem fileSystem = metadataFilePath.getFileSystem();
fileSystem.delete(metadataFilePath, false);
}

public FsPath getSnapshotLocation() {
return snapshotLocation;
}

@Override
public String toString() {
return String.format(
"CompletedSnapshot %d for %s located at %s",
snapshotID, tableBucket, snapshotLocation);
return String.format("CompletedSnapshot %d for %s", snapshotID, tableBucket);
}

@Override
Expand All @@ -187,12 +168,11 @@ public boolean equals(Object o) {
return snapshotID == that.snapshotID
&& logOffset == that.logOffset
&& Objects.equals(tableBucket, that.tableBucket)
&& Objects.equals(kvSnapshotHandle, that.kvSnapshotHandle)
&& Objects.equals(snapshotLocation, that.snapshotLocation);
&& Objects.equals(kvSnapshotHandle, that.kvSnapshotHandle);
}

@Override
public int hashCode() {
return Objects.hash(tableBucket, snapshotID, kvSnapshotHandle, logOffset, snapshotLocation);
return Objects.hash(tableBucket, snapshotID, kvSnapshotHandle, logOffset);
}
}
Loading
Loading