diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java b/fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java index d04268fbaa..f158b52b02 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java @@ -405,6 +405,10 @@ public static UUID uuidFromRemoteIndexCacheFileName(String fileName) { // Remote Log Paths // ---------------------------------------------------------------------------------------- + public static FsPath remoteDataDir(Configuration conf) { + return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR)); + } + /** * Returns the remote root directory path for storing log files. * @@ -584,6 +588,10 @@ public static FsPath remoteKvDir(Configuration conf) { return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR) + "/" + REMOTE_KV_DIR_NAME); } + public static FsPath remoteKvDir(FsPath remoteDataDir) { + return new FsPath(remoteDataDir, REMOTE_KV_DIR_NAME); + } + /** * Returns the remote directory path for storing kv snapshot files for a kv tablet. * diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java index b7c92289bc..376ebc9232 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java @@ -18,8 +18,9 @@ package org.apache.fluss.server.coordinator; import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; -import org.apache.fluss.metadata.TablePath; import org.apache.fluss.metrics.MetricNames; import org.apache.fluss.metrics.groups.MetricGroup; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; @@ -30,6 +31,7 @@ import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore; import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.utils.FlussPaths; import org.apache.fluss.utils.MapUtils; import org.slf4j.Logger; @@ -66,17 +68,21 @@ public class CompletedSnapshotStoreManager { makeZookeeperCompletedSnapshotHandleStore; private final CoordinatorMetricGroup coordinatorMetricGroup; + private final FsPath remoteKvDir; + public CompletedSnapshotStoreManager( int maxNumberOfSnapshotsToRetain, Executor ioExecutor, ZooKeeperClient zooKeeperClient, - CoordinatorMetricGroup coordinatorMetricGroup) { + CoordinatorMetricGroup coordinatorMetricGroup, + FsPath remoteKvDir) { this( maxNumberOfSnapshotsToRetain, ioExecutor, zooKeeperClient, ZooKeeperCompletedSnapshotHandleStore::new, - coordinatorMetricGroup); + coordinatorMetricGroup, + remoteKvDir); } @VisibleForTesting @@ -86,7 +92,8 @@ public CompletedSnapshotStoreManager( ZooKeeperClient zooKeeperClient, Function makeZookeeperCompletedSnapshotHandleStore, - CoordinatorMetricGroup coordinatorMetricGroup) { + CoordinatorMetricGroup coordinatorMetricGroup, + FsPath remoteKvDir) { checkArgument( maxNumberOfSnapshotsToRetain > 0, "maxNumberOfSnapshotsToRetain must be positive"); this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain; @@ -95,6 +102,7 @@ public CompletedSnapshotStoreManager( this.ioExecutor = ioExecutor; this.makeZookeeperCompletedSnapshotHandleStore = makeZookeeperCompletedSnapshotHandleStore; this.coordinatorMetricGroup = coordinatorMetricGroup; + this.remoteKvDir = remoteKvDir; registerMetrics(); } @@ -121,7 +129,7 @@ private long getAllSnapshotSize(TableBucket tableBucket) { } public CompletedSnapshotStore getOrCreateCompletedSnapshotStore( - TablePath tablePath, TableBucket tableBucket) { + PhysicalTablePath tablePath, TableBucket tableBucket) { return bucketCompletedSnapshotStores.computeIfAbsent( tableBucket, (bucket) -> { @@ -129,7 +137,7 @@ public CompletedSnapshotStore getOrCreateCompletedSnapshotStore( LOG.info("Creating snapshot store for table bucket {}.", bucket); long start = System.currentTimeMillis(); CompletedSnapshotStore snapshotStore = - createCompletedSnapshotStore(tableBucket, ioExecutor); + createCompletedSnapshotStore(tablePath, tableBucket, ioExecutor); long end = System.currentTimeMillis(); LOG.info( "Created snapshot store for table bucket {} in {} ms.", @@ -138,7 +146,7 @@ public CompletedSnapshotStore getOrCreateCompletedSnapshotStore( MetricGroup bucketMetricGroup = coordinatorMetricGroup.getTableBucketMetricGroup( - tablePath, tableBucket); + tablePath.getTablePath(), tableBucket); if (bucketMetricGroup != null) { LOG.info("Add bucketMetricGroup for tableBucket {}.", bucket); bucketMetricGroup.gauge( @@ -168,7 +176,8 @@ public void removeCompletedSnapshotStoreByTableBuckets(Set tableBuc } private CompletedSnapshotStore createCompletedSnapshotStore( - TableBucket tableBucket, Executor ioExecutor) throws Exception { + PhysicalTablePath tablePath, TableBucket tableBucket, Executor ioExecutor) + throws Exception { final CompletedSnapshotHandleStore completedSnapshotHandleStore = this.makeZookeeperCompletedSnapshotHandleStore.apply(zooKeeperClient); @@ -179,14 +188,18 @@ private CompletedSnapshotStore createCompletedSnapshotStore( final int numberOfInitialSnapshots = initialSnapshots.size(); LOG.info( - "Found {} snapshots in {}.", + "Found {} snapshots in {} of table bucket {}.", numberOfInitialSnapshots, - completedSnapshotHandleStore.getClass().getSimpleName()); + completedSnapshotHandleStore.getClass().getSimpleName(), + tableBucket); final List retrievedSnapshots = new ArrayList<>(numberOfInitialSnapshots); - LOG.info("Trying to fetch {} snapshots from storage.", numberOfInitialSnapshots); + LOG.info( + "Trying to fetch {} snapshots from storage of table bucket {}.", + numberOfInitialSnapshots, + tableBucket); for (CompletedSnapshotHandle snapshotStateHandle : initialSnapshots) { try { @@ -205,28 +218,36 @@ private CompletedSnapshotStore createCompletedSnapshotStore( tableBucket, snapshotStateHandle.getSnapshotId()); } catch (Exception t) { LOG.error( - "Failed to remove snapshotStateHandle {}.", snapshotStateHandle, t); + "Failed to remove snapshotStateHandle {} of table bucket {}.", + snapshotStateHandle, + tableBucket, + t); throw t; } } else { LOG.error( - "Failed to retrieveCompleteSnapshot for snapshotStateHandle {}.", + "Failed to retrieveCompleteSnapshot for snapshotStateHandle {} of table bucket {}.", snapshotStateHandle, + tableBucket, e); throw e; } } } + FsPath remoteKvTabletDir = + FlussPaths.remoteKvTabletDir(remoteKvDir, tablePath, tableBucket); // register all the files to shared kv file registry - SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(ioExecutor); + SharedKvFileRegistry sharedKvFileRegistry = + new SharedKvFileRegistry(remoteKvTabletDir, ioExecutor); for (CompletedSnapshot completedSnapshot : retrievedSnapshots) { try { sharedKvFileRegistry.registerAllAfterRestored(completedSnapshot); } catch (Exception e) { LOG.error( - "Failed to registerAllAfterRestored for completedSnapshot {}.", + "Failed to registerAllAfterRestored for completedSnapshot {} of table bucket {}.", completedSnapshot, + tableBucket, e); throw e; } @@ -237,7 +258,8 @@ private CompletedSnapshotStore createCompletedSnapshotStore( sharedKvFileRegistry, retrievedSnapshots, completedSnapshotHandleStore, - ioExecutor); + ioExecutor, + remoteKvTabletDir); } @VisibleForTesting diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index aa0afbbcd2..962fa2d40f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -32,6 +32,7 @@ import org.apache.fluss.exception.IneligibleReplicaException; import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.exception.InvalidUpdateVersionException; +import org.apache.fluss.exception.PartitionNotExistException; import org.apache.fluss.exception.RebalanceFailureException; import org.apache.fluss.exception.ServerNotExistException; import org.apache.fluss.exception.ServerTagAlreadyExistException; @@ -118,6 +119,7 @@ import org.apache.fluss.server.zk.data.lake.LakeTable; import org.apache.fluss.server.zk.data.lake.LakeTableHelper; import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot; +import org.apache.fluss.utils.FlussPaths; import org.apache.fluss.utils.types.Tuple2; import org.slf4j.Logger; @@ -234,7 +236,8 @@ public CoordinatorEventProcessor( conf.getInt(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS), ioExecutor, zooKeeperClient, - coordinatorMetricGroup); + coordinatorMetricGroup, + FlussPaths.remoteKvDir(conf)); this.autoPartitionManager = autoPartitionManager; this.lakeTableTieringManager = lakeTableTieringManager; this.coordinatorMetricGroup = coordinatorMetricGroup; @@ -1738,9 +1741,29 @@ private void tryProcessCommitKvSnapshot( callback.completeExceptionally(e); return; } - // commit the kv snapshot asynchronously + TableBucket tb = event.getTableBucket(); - TablePath tablePath = coordinatorContext.getTablePathById(tb.getTableId()); + + // get physical table path + PhysicalTablePath physicalTablePath; + if (tb.getPartitionId() == null) { + TablePath tablePath = coordinatorContext.getTablePathById(tb.getTableId()); + physicalTablePath = PhysicalTablePath.of(tablePath); + } else { + Optional physicalTablePathOp = + coordinatorContext.getPhysicalTablePath(tb.getPartitionId()); + if (!physicalTablePathOp.isPresent()) { + callback.completeExceptionally( + new PartitionNotExistException( + "PhysicalTablePath not found for partition " + + tb.getPartitionId())); + return; + } else { + physicalTablePath = physicalTablePathOp.get(); + } + } + + // commit the kv snapshot asynchronously ioExecutor.execute( () -> { try { @@ -1749,7 +1772,7 @@ private void tryProcessCommitKvSnapshot( // add completed snapshot CompletedSnapshotStore completedSnapshotStore = completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore( - tablePath, tb); + physicalTablePath, tb); // this involves IO operation (ZK), so we do it in ioExecutor completedSnapshotStore.add(completedSnapshot); coordinatorEventManager.put( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java index b58e1728c5..c4b7a5dc3e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshot.java @@ -21,6 +21,7 @@ import org.apache.fluss.fs.FileSystem; import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.utils.FlussPaths; import org.apache.fluss.utils.concurrent.FutureUtils; import javax.annotation.concurrent.NotThreadSafe; @@ -68,31 +69,22 @@ public class CompletedSnapshot { /** The next log offset when the snapshot is triggered. */ private final long logOffset; - /** The location where the snapshot is stored. */ - private final FsPath snapshotLocation; - public static final String SNAPSHOT_DATA_NOT_EXISTS_ERROR_MESSAGE = "No such file or directory"; public CompletedSnapshot( TableBucket tableBucket, long snapshotID, - FsPath snapshotLocation, KvSnapshotHandle kvSnapshotHandle, long logOffset) { this.tableBucket = tableBucket; this.snapshotID = snapshotID; - this.snapshotLocation = snapshotLocation; this.kvSnapshotHandle = kvSnapshotHandle; this.logOffset = logOffset; } @VisibleForTesting - CompletedSnapshot( - TableBucket tableBucket, - long snapshotID, - FsPath snapshotLocation, - KvSnapshotHandle kvSnapshotHandle) { - this(tableBucket, snapshotID, snapshotLocation, kvSnapshotHandle, 0); + CompletedSnapshot(TableBucket tableBucket, long snapshotID, KvSnapshotHandle kvSnapshotHandle) { + this(tableBucket, snapshotID, kvSnapshotHandle, 0); } public long getSnapshotID() { @@ -125,54 +117,43 @@ public void registerSharedKvFilesAfterRestored(SharedKvFileRegistry sharedKvFile sharedKvFileRegistry.registerAllAfterRestored(this); } - public CompletableFuture discardAsync(Executor ioExecutor) { + public CompletableFuture discardAsync(FsPath remoteKvTabletDir, Executor ioExecutor) { // it'll discard the snapshot files for kv, it'll always discard // the private files; for shared files, only if they're not be registered in // SharedKvRegistry, can the files be deleted. CompletableFuture discardKvFuture = - FutureUtils.runAsync(kvSnapshotHandle::discard, ioExecutor); + FutureUtils.runAsync( + () -> kvSnapshotHandle.discard(remoteKvTabletDir, snapshotID), ioExecutor); + FsPath snapshotLocation = FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotID); CompletableFuture discardMetaFileFuture = - FutureUtils.runAsync(this::disposeMetadata, ioExecutor); + FutureUtils.runAsync(() -> disposeMetadata(snapshotLocation), ioExecutor); return FutureUtils.runAfterwards( FutureUtils.completeAll(Arrays.asList(discardKvFuture, discardMetaFileFuture)), - this::disposeSnapshotStorage); + () -> disposeSnapshotStorage(snapshotLocation)); } - private void disposeSnapshotStorage() throws IOException { + private void disposeSnapshotStorage(FsPath snapshotLocation) throws IOException { if (snapshotLocation != null) { FileSystem fileSystem = snapshotLocation.getFileSystem(); fileSystem.delete(snapshotLocation, false); } } - /** - * Return the metadata file path that stores all the information that describes the snapshot. - */ - public FsPath getMetadataFilePath() { - return new FsPath(snapshotLocation, SNAPSHOT_METADATA_FILE_NAME); - } - public static FsPath getMetadataFilePath(FsPath snapshotLocation) { return new FsPath(snapshotLocation, SNAPSHOT_METADATA_FILE_NAME); } - private void disposeMetadata() throws IOException { - FsPath metadataFilePath = getMetadataFilePath(); + private void disposeMetadata(FsPath snapshotLocation) throws IOException { + FsPath metadataFilePath = getMetadataFilePath(snapshotLocation); FileSystem fileSystem = metadataFilePath.getFileSystem(); fileSystem.delete(metadataFilePath, false); } - public FsPath getSnapshotLocation() { - return snapshotLocation; - } - @Override public String toString() { - return String.format( - "CompletedSnapshot %d for %s located at %s", - snapshotID, tableBucket, snapshotLocation); + return String.format("CompletedSnapshot %d for %s", snapshotID, tableBucket); } @Override @@ -187,12 +168,11 @@ public boolean equals(Object o) { return snapshotID == that.snapshotID && logOffset == that.logOffset && Objects.equals(tableBucket, that.tableBucket) - && Objects.equals(kvSnapshotHandle, that.kvSnapshotHandle) - && Objects.equals(snapshotLocation, that.snapshotLocation); + && Objects.equals(kvSnapshotHandle, that.kvSnapshotHandle); } @Override public int hashCode() { - return Objects.hash(tableBucket, snapshotID, kvSnapshotHandle, logOffset, snapshotLocation); + return Objects.hash(tableBucket, snapshotID, kvSnapshotHandle, logOffset); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerde.java index 6b79966dae..54dd87240c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerde.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerde.java @@ -35,7 +35,8 @@ public class CompletedSnapshotJsonSerde public static final CompletedSnapshotJsonSerde INSTANCE = new CompletedSnapshotJsonSerde(); - private static final int VERSION = 1; + // Version 2: use relative paths + private static final int VERSION = 2; private static final String VERSION_KEY = "version"; // for table bucket the snapshot belongs to private static final String TABLE_ID = "table_id"; @@ -43,7 +44,6 @@ public class CompletedSnapshotJsonSerde private static final String BUCKET_ID = "bucket_id"; private static final String SNAPSHOT_ID = "snapshot_id"; - private static final String SNAPSHOT_LOCATION = "snapshot_location"; // for kv snapshot's files private static final String KV_SNAPSHOT_HANDLE = "kv_snapshot_handle"; @@ -77,10 +77,6 @@ public void serialize(CompletedSnapshot completedSnapshot, JsonGenerator generat // serialize snapshot id generator.writeNumberField(SNAPSHOT_ID, completedSnapshot.getSnapshotID()); - // serialize snapshot location - generator.writeStringField( - SNAPSHOT_LOCATION, completedSnapshot.getSnapshotLocation().toString()); - // serialize kv snapshot handle generator.writeObjectFieldStart(KV_SNAPSHOT_HANDLE); KvSnapshotHandle kvSnapshotHandle = completedSnapshot.getKvSnapshotHandle(); @@ -128,6 +124,10 @@ private void serializeKvFileHandles( @Override public CompletedSnapshot deserialize(JsonNode node) { + // read version (backward compatible) + int version = node.has(VERSION_KEY) ? node.get(VERSION_KEY).asInt() : 1; + + // Deserialize table bucket JsonNode partitionIdNode = node.get(PARTITION_ID); Long partitionId = partitionIdNode == null ? null : partitionIdNode.asLong(); // deserialize table bucket @@ -138,19 +138,28 @@ public CompletedSnapshot deserialize(JsonNode node) { // deserialize snapshot id long snapshotId = node.get(SNAPSHOT_ID).asLong(); - // deserialize snapshot location - String snapshotLocation = node.get(SNAPSHOT_LOCATION).asText(); - // deserialize kv snapshot file handle JsonNode kvSnapshotFileHandleNode = node.get(KV_SNAPSHOT_HANDLE); // deserialize shared file handles - List sharedFileHandles = - deserializeKvFileHandles(kvSnapshotFileHandleNode, KV_SHARED_FILES_HANDLE); + List sharedFileHandles; + if (version == 1) { + sharedFileHandles = + deserializeKvFileHandlesV1(kvSnapshotFileHandleNode, KV_SHARED_FILES_HANDLE); + } else { + sharedFileHandles = + deserializeKvFileHandles(kvSnapshotFileHandleNode, KV_SHARED_FILES_HANDLE); + } // deserialize private file handles - List privateFileHandles = - deserializeKvFileHandles(kvSnapshotFileHandleNode, KV_PRIVATE_FILES_HANDLE); + List privateFileHandles; + if (version == 1) { + privateFileHandles = + deserializeKvFileHandlesV1(kvSnapshotFileHandleNode, KV_PRIVATE_FILES_HANDLE); + } else { + privateFileHandles = + deserializeKvFileHandles(kvSnapshotFileHandleNode, KV_PRIVATE_FILES_HANDLE); + } // deserialize snapshot incremental size long incrementalSize = kvSnapshotFileHandleNode.get(SNAPSHOT_INCREMENTAL_SIZE).asLong(); @@ -161,8 +170,7 @@ public CompletedSnapshot deserialize(JsonNode node) { // construct CompletedSnapshot KvSnapshotHandle kvSnapshotHandle = new KvSnapshotHandle(sharedFileHandles, privateFileHandles, incrementalSize); - return new CompletedSnapshot( - tableBucket, snapshotId, new FsPath(snapshotLocation), kvSnapshotHandle, logOffset); + return new CompletedSnapshot(tableBucket, snapshotId, kvSnapshotHandle, logOffset); } private List deserializeKvFileHandles( @@ -184,6 +192,26 @@ private List deserializeKvFileHandles( return kvFileHandleAndLocalPaths; } + private List deserializeKvFileHandlesV1( + JsonNode node, String kvHandleType) { + List kvFileHandleAndLocalPaths = new ArrayList<>(); + for (JsonNode kvFileHandleAndLocalPathNode : node.get(kvHandleType)) { + // deserialize kv file handle + JsonNode kvFileHandleNode = kvFileHandleAndLocalPathNode.get(KV_FILE_HANDLE); + String filePath = kvFileHandleNode.get(KV_FILE_PATH).asText(); + long fileSize = kvFileHandleNode.get(KV_FILE_SIZE).asLong(); + FsPath fsPath = new FsPath(filePath); + KvFileHandle kvFileHandle = new KvFileHandle(fsPath.getName(), fileSize); + + // deserialize kv file local path + String localPath = kvFileHandleAndLocalPathNode.get(KV_FILE_LOCAL_PATH).asText(); + KvFileHandleAndLocalPath kvFileHandleAndLocalPath = + KvFileHandleAndLocalPath.of(kvFileHandle, localPath); + kvFileHandleAndLocalPaths.add(kvFileHandleAndLocalPath); + } + return kvFileHandleAndLocalPaths; + } + /** Serialize the {@link CompletedSnapshot} to json bytes. */ public static byte[] toJson(CompletedSnapshot completedSnapshot) { return JsonSerdeUtils.writeValueAsBytes(completedSnapshot, INSTANCE); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java index b67f984668..fe94f6e471 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java @@ -22,6 +22,7 @@ import org.apache.fluss.fs.FileSystem; import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.utils.FlussPaths; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,12 +76,15 @@ public class CompletedSnapshotStore { */ private final ArrayDeque completedSnapshots; + private final FsPath remoteKvTabletDir; + public CompletedSnapshotStore( int maxNumberOfSnapshotsToRetain, SharedKvFileRegistry sharedKvFileRegistry, Collection completedSnapshots, CompletedSnapshotHandleStore completedSnapshotHandleStore, - Executor executor) { + Executor executor, + FsPath remoteKvTabletDir) { this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain; this.sharedKvFileRegistry = sharedKvFileRegistry; this.completedSnapshots = new ArrayDeque<>(); @@ -88,6 +92,7 @@ public CompletedSnapshotStore( this.completedSnapshotHandleStore = completedSnapshotHandleStore; this.ioExecutor = executor; this.snapshotsCleaner = new SnapshotsCleaner(); + this.remoteKvTabletDir = remoteKvTabletDir; } public void add(final CompletedSnapshot completedSnapshot) throws Exception { @@ -103,7 +108,7 @@ public long getPhysicalStorageRemoteKvSize() { } public long getNumSnapshots() { - return inLock(lock, () -> completedSnapshots.size()); + return inLock(lock, completedSnapshots::size); } /** @@ -154,6 +159,7 @@ CompletedSnapshot addSnapshotAndSubsumeOldestOne( // deletion sharedKvFileRegistry.unregisterUnusedKvFile(id); snapshotsCleaner.cleanSubsumedSnapshots( + remoteKvTabletDir, id, Collections.emptySet(), postCleanup, @@ -234,6 +240,12 @@ public Optional getLatestSnapshot() { return inLock(lock, () -> Optional.ofNullable(completedSnapshots.peekLast())); } + public FsPath getSnapshotMetadataFilePath(CompletedSnapshot snapshot) { + FsPath snapshotLocation = + FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshot.getSnapshotID()); + return CompletedSnapshot.getMetadataFilePath(snapshotLocation); + } + /** * Serialize the completed snapshot to a metadata file, and return the handle wrapping the * metadata file path. @@ -244,7 +256,7 @@ private CompletedSnapshotHandle store(CompletedSnapshot snapshot) throws Excepti // we just reuse the snapshot dir to store the snapshot info to avoid another path // config Exception latestException = null; - FsPath filePath = snapshot.getMetadataFilePath(); + FsPath filePath = getSnapshotMetadataFilePath(snapshot); FileSystem fs = filePath.getFileSystem(); byte[] jsonBytes = CompletedSnapshotJsonSerde.toJson(snapshot); for (int attempt = 0; attempt < 10; attempt++) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java index a6894851bf..41a9792e0e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java @@ -157,8 +157,9 @@ public int maxFetchLogSizeInRecoverKv() { } @Override - public void handleSnapshotBroken(CompletedSnapshot snapshot) throws Exception { + public void handleSnapshotBroken(FsPath remoteKvTabletDir, CompletedSnapshot snapshot) + throws Exception { completedSnapshotHandleStore.remove(snapshot.getTableBucket(), snapshot.getSnapshotID()); - snapshot.discardAsync(asyncOperationsThreadPool); + snapshot.discardAsync(remoteKvTabletDir, asyncOperationsThreadPool); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvFileHandle.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvFileHandle.java index 7f537815a6..5dca2b5339 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvFileHandle.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvFileHandle.java @@ -75,8 +75,8 @@ public String toString() { * * @throws Exception Thrown, if the file deletion (not the directory deletion) fails. */ - public void discard() throws Exception { - FsPath fsPath = new FsPath(filePath); + public void discard(FsPath remoteDir) throws Exception { + FsPath fsPath = new FsPath(remoteDir, filePath); final FileSystem fs = fsPath.getFileSystem(); IOException actualException = null; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataDownloader.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataDownloader.java index 48b235300a..dfd18e348b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataDownloader.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataDownloader.java @@ -66,16 +66,28 @@ void transferAllDataToDirectory( KvSnapshotHandle kvSnapshotHandle = kvSnapshotDownloadSpec.getKvSnapshotHandle(); List fsPathAndFileNames = Stream.concat( - kvSnapshotHandle.getSharedKvFileHandles().stream(), - kvSnapshotHandle.getPrivateFileHandles().stream()) - .map( - kvFileHandleAndLocalPath -> - new FsPathAndFileName( - new FsPath( - kvFileHandleAndLocalPath - .getKvFileHandle() - .getFilePath()), - kvFileHandleAndLocalPath.getLocalPath())) + kvSnapshotHandle.getSharedKvFileHandles().stream() + .map( + sharedHandle -> + new FsPathAndFileName( + new FsPath( + kvSnapshotDownloadSpec + .getSharedSnapshotDirectory(), + sharedHandle + .getKvFileHandle() + .getFilePath()), + sharedHandle.getLocalPath())), + kvSnapshotHandle.getPrivateFileHandles().stream() + .map( + privateHandle -> + new FsPathAndFileName( + new FsPath( + kvSnapshotDownloadSpec + .getSnapshotDirectory(), + privateHandle + .getKvFileHandle() + .getFilePath()), + privateHandle.getLocalPath()))) .collect(Collectors.toList()); fileDownloadSpecs.add( new FileDownloadSpec( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataUploader.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataUploader.java index ef4d3b6eee..d0c1899957 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataUploader.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataUploader.java @@ -149,7 +149,10 @@ private KvFileHandleAndLocalPath uploadLocalFileToSnapshotLocation( } // tmp resource registry will be closed when the snapshot is not completed, // which will then discard the uploaded files - tmpResourcesRegistry.registerCloseable(() -> SnapshotUtil.discardKvFileQuietly(result)); + tmpResourcesRegistry.registerCloseable( + () -> + SnapshotUtil.discardKvFileQuietly( + snapshotLocation.getSnapshotDirectory(), result)); return KvFileHandleAndLocalPath.of(result, filePath.getFileName().toString()); } finally { if (closeableRegistry.unregisterCloseable(inputStream)) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDownloadSpec.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDownloadSpec.java index 83aae52d1f..a08e21788f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDownloadSpec.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDownloadSpec.java @@ -17,6 +17,8 @@ package org.apache.fluss.server.kv.snapshot; +import org.apache.fluss.fs.FsPath; + import java.nio.file.Path; /** @@ -25,17 +27,37 @@ */ public class KvSnapshotDownloadSpec { + /** The directory for exclusive snapshot data. */ + private final FsPath snapshotDirectory; + + /** The directory for shared snapshot data. */ + private final FsPath sharedSnapshotDirectory; + /** The handle to download . */ private final KvSnapshotHandle kvSnapshotHandle; /** The path to which the content of the snapshot handle shall be downloaded. */ private final Path downloadDestination; - public KvSnapshotDownloadSpec(KvSnapshotHandle kvSnapshotHandle, Path downloadDestination) { + public KvSnapshotDownloadSpec( + FsPath snapshotDirectory, + FsPath sharedSnapshotDirectory, + KvSnapshotHandle kvSnapshotHandle, + Path downloadDestination) { + this.snapshotDirectory = snapshotDirectory; + this.sharedSnapshotDirectory = sharedSnapshotDirectory; this.kvSnapshotHandle = kvSnapshotHandle; this.downloadDestination = downloadDestination; } + public FsPath getSnapshotDirectory() { + return snapshotDirectory; + } + + public FsPath getSharedSnapshotDirectory() { + return sharedSnapshotDirectory; + } + public KvSnapshotHandle getKvSnapshotHandle() { return kvSnapshotHandle; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotHandle.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotHandle.java index a016f98755..2d09241bfb 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotHandle.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotHandle.java @@ -17,7 +17,9 @@ package org.apache.fluss.server.kv.snapshot; +import org.apache.fluss.fs.FsPath; import org.apache.fluss.server.utils.SnapshotUtil; +import org.apache.fluss.utils.FlussPaths; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,12 +99,13 @@ public long getSnapshotSize() { return snapshotSize; } - public void discard() { + public void discard(FsPath remoteKvTabletDir, long snapshotId) { SharedKvFileRegistry registry = this.sharedKvFileRegistry; final boolean isRegistered = (registry != null); try { SnapshotUtil.bestEffortDiscardAllKvFiles( + FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId), privateFileHandles.stream() .map(KvFileHandleAndLocalPath::getKvFileHandle) .collect(Collectors.toList())); @@ -113,6 +116,7 @@ public void discard() { if (!isRegistered) { try { SnapshotUtil.bestEffortDiscardAllKvFiles( + FlussPaths.remoteKvSharedDir(remoteKvTabletDir), sharedFileHandles.stream() .map(KvFileHandleAndLocalPath::getKvFileHandle) .collect(Collectors.toSet())); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java index e61a4999ad..007844c2e5 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTarget.java @@ -224,7 +224,6 @@ public void handleSnapshotResult( new CompletedSnapshot( tableBucket, snapshotId, - snapshotResult.getSnapshotPath(), snapshotResult.getKvSnapshotHandle(), snapshotResult.getLogOffset()); try { @@ -323,7 +322,11 @@ private void handleSnapshotCommitException( + "The commit truly failed, proceeding with cleanup.", snapshotId, tableBucket); - snapshotsCleaner.cleanSnapshot(completedSnapshot, () -> {}, ioExecutor); + snapshotsCleaner.cleanSnapshot( + snapshotResult.getSnapshotPath().getParent(), + completedSnapshot, + () -> {}, + ioExecutor); handleSnapshotFailure(snapshotId, snapshotLocation, t); } } catch (Exception zkException) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java index c6b42f0efc..4ea8155496 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java @@ -307,7 +307,9 @@ private void discardFailedUploads(RunnableFuture snapshotedRunna try { SnapshotResult snapshotResult = snapshotedRunnableFuture.get(); if (snapshotResult != null) { - snapshotResult.getKvSnapshotHandle().discard(); + snapshotResult + .getKvSnapshotHandle() + .discard(snapshotResult.getSnapshotPath(), 1); FsPath remoteSnapshotPath = snapshotResult.getSnapshotPath(); remoteSnapshotPath.getFileSystem().delete(remoteSnapshotPath, true); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PlaceholderKvFileHandler.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PlaceholderKvFileHandler.java index 0ad884ea66..b2029e344c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PlaceholderKvFileHandler.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PlaceholderKvFileHandler.java @@ -17,6 +17,8 @@ package org.apache.fluss.server.kv.snapshot; +import org.apache.fluss.fs.FsPath; + /** * A placeholder handle for shared kv files that will be replaced by an original that was created in * a previous snapshot. This class is used in the referenced kv files of {@link KvSnapshotHandle}. @@ -30,7 +32,7 @@ public PlaceholderKvFileHandler(KvFileHandle kvFileHandle) { } @Override - public void discard() throws Exception { + public void discard(FsPath remoteDir) throws Exception { // do nothing } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java index 9d13d3e0e9..73422e4b61 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java @@ -17,6 +17,7 @@ package org.apache.fluss.server.kv.snapshot; +import org.apache.fluss.fs.FsPath; import org.apache.fluss.utils.concurrent.Executors; import org.slf4j.Logger; @@ -59,12 +60,15 @@ public class SharedKvFileRegistry implements AutoCloseable { /** Executor for async kv deletion. */ private final Executor asyncDisposalExecutor; - public SharedKvFileRegistry() { - this(Executors.directExecutor()); + private final FsPath remoteKvSnapshotSharedDir; + + public SharedKvFileRegistry(FsPath remoteKvSnapshotSharedDir) { + this(remoteKvSnapshotSharedDir, Executors.directExecutor()); } - public SharedKvFileRegistry(Executor asyncDisposalExecutor) { + public SharedKvFileRegistry(FsPath remoteKvSnapshotSharedDir, Executor asyncDisposalExecutor) { this.registeredKvEntries = new HashMap<>(); + this.remoteKvSnapshotSharedDir = checkNotNull(remoteKvSnapshotSharedDir); this.asyncDisposalExecutor = checkNotNull(asyncDisposalExecutor); this.open = true; this.fileSize = 0L; @@ -171,7 +175,8 @@ private void scheduleAsyncDelete(KvFileHandle kvFileHandle) { // We do the small optimization to not issue discards for placeholders, which are NOPs. if (kvFileHandle != null && !isPlaceholder(kvFileHandle)) { LOG.debug("Scheduled delete of kv handle {}.", kvFileHandle); - AsyncDisposalRunnable asyncDisposalRunnable = new AsyncDisposalRunnable(kvFileHandle); + AsyncDisposalRunnable asyncDisposalRunnable = + new AsyncDisposalRunnable(remoteKvSnapshotSharedDir, kvFileHandle); asyncDisposalExecutor.execute(asyncDisposalRunnable); } } @@ -190,16 +195,18 @@ public void close() throws Exception { /** Encapsulates the operation the delete state handles asynchronously. */ private static final class AsyncDisposalRunnable implements Runnable { + private final FsPath remoteKvSnapshotSharedDir; private final KvFileHandle toDispose; - public AsyncDisposalRunnable(KvFileHandle toDispose) { + public AsyncDisposalRunnable(FsPath remoteKvSnapshotSharedDir, KvFileHandle toDispose) { + this.remoteKvSnapshotSharedDir = checkNotNull(remoteKvSnapshotSharedDir); this.toDispose = checkNotNull(toDispose); } @Override public void run() { try { - toDispose.discard(); + toDispose.discard(remoteKvSnapshotSharedDir); } catch (Exception e) { LOG.warn( "A problem occurred during asynchronous disposal of a shared kv object: {}", diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotContext.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotContext.java index 8f528bc473..884472f96c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotContext.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotContext.java @@ -75,7 +75,8 @@ public interface SnapshotContext { * @param snapshot The broken snapshot to handle * @throws Exception if recovery handling fails */ - void handleSnapshotBroken(CompletedSnapshot snapshot) throws Exception; + void handleSnapshotBroken(FsPath remoteKvTabletDir, CompletedSnapshot snapshot) + throws Exception; /** * Get the max fetch size for fetching log to apply kv during recovering kv. The kv may apply diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotLocation.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotLocation.java index fe517f2682..dd498c1f1a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotLocation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotLocation.java @@ -87,6 +87,10 @@ public FsPath getSnapshotDirectory() { return snapshotDirectory; } + public FsPath getSharedSnapshotDirectory() { + return sharedSnapshotDirectory; + } + // ------------------------------------------------------------------------ /** @@ -289,7 +293,10 @@ public KvFileHandle closeAndGetHandle() throws IOException { outStream.close(); - return new KvFileHandle(kvFilePath.toString(), size); + // Only return the file name, not the full path + // The full path will be constructed by the caller based on scope + String fileName = kvFilePath.getName(); + return new KvFileHandle(fileName, size); } catch (Exception exception) { try { if (kvFilePath != null) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotsCleaner.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotsCleaner.java index dafabb1ded..687be78005 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotsCleaner.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotsCleaner.java @@ -17,6 +17,8 @@ package org.apache.fluss.server.kv.snapshot; +import org.apache.fluss.fs.FsPath; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +47,7 @@ public class SnapshotsCleaner { /** * Add one subsumed snapshot to SnapshotsCleaner, the subsumed snapshot would be discarded at - * {@link #cleanSubsumedSnapshots(long, Set, Runnable, Executor)}. + * {@link #cleanSubsumedSnapshots(FsPath, long, Set, Runnable, Executor)}. * * @param completedSnapshot which is subsumed. */ @@ -64,7 +66,11 @@ public void addSubsumedSnapshot(CompletedSnapshot completedSnapshot) { * @param executor is used to perform the cleanup logic. */ public void cleanSubsumedSnapshots( - long upTo, Set stillInUse, Runnable postCleanAction, Executor executor) { + FsPath remoteKvTabletDir, + long upTo, + Set stillInUse, + Runnable postCleanAction, + Executor executor) { synchronized (lock) { Iterator iterator = subsumedSnapshots.iterator(); while (iterator.hasNext()) { @@ -73,7 +79,7 @@ public void cleanSubsumedSnapshots( && !stillInUse.contains(snapshot.getSnapshotID())) { try { LOG.debug("Try to discard snapshot {}.", snapshot.getSnapshotID()); - cleanSnapshot(snapshot, postCleanAction, executor); + cleanSnapshot(remoteKvTabletDir, snapshot, postCleanAction, executor); iterator.remove(); } catch (Exception e) { LOG.warn("Fail to discard the old snapshot {}.", snapshot, e); @@ -84,9 +90,12 @@ public void cleanSubsumedSnapshots( } public void cleanSnapshot( - CompletedSnapshot snapshot, Runnable postCleanAction, Executor executor) { + FsPath remoteKvTabletDir, + CompletedSnapshot snapshot, + Runnable postCleanAction, + Executor executor) { LOG.debug("Clean snapshot {}.", snapshot.getSnapshotID()); - CompletableFuture discardFuture = snapshot.discardAsync(executor); + CompletableFuture discardFuture = snapshot.discardAsync(remoteKvTabletDir, executor); discardFuture.handle( (Object outerIgnored, Throwable outerThrowable) -> { if (outerThrowable != null) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index ee158b7a4b..1fdc1db9d2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -144,6 +144,7 @@ public final class Replica { private final PhysicalTablePath physicalPath; private final TableBucket tableBucket; + private final FsPath remoteDataDir; private final LogManager logManager; private final LogTablet logTablet; @@ -219,6 +220,7 @@ public Replica( FatalErrorHandler fatalErrorHandler, BucketMetricGroup bucketMetricGroup, TableInfo tableInfo, + FsPath remoteDataDir, Clock clock) throws Exception { this.physicalPath = physicalPath; @@ -249,6 +251,7 @@ public Replica( this.logTablet = createLog(lazyHighWatermarkCheckpoint); this.logTablet.updateIsDataLakeEnabled(tableConfig.isDataLakeEnabled()); + this.remoteDataDir = remoteDataDir; this.clock = clock; registerMetrics(); } @@ -732,8 +735,20 @@ private Optional initKvTablet() { private void downloadKvSnapshots(CompletedSnapshot completedSnapshot, Path kvTabletDir) throws IOException { Path kvDbPath = kvTabletDir.resolve(RocksDBKvBuilder.DB_INSTANCE_DIR_STRING); + + FsPath remoteKvTabletDir = + FlussPaths.remoteKvTabletDir( + FlussPaths.remoteKvDir(remoteDataDir), physicalPath, tableBucket); + FsPath snapshotDirectory = + FlussPaths.remoteKvSnapshotDir( + remoteKvTabletDir, completedSnapshot.getSnapshotID()); + FsPath sharedSnapshotDirectory = FlussPaths.remoteKvSharedDir(remoteKvTabletDir); KvSnapshotDownloadSpec downloadSpec = - new KvSnapshotDownloadSpec(completedSnapshot.getKvSnapshotHandle(), kvDbPath); + new KvSnapshotDownloadSpec( + snapshotDirectory, + sharedSnapshotDirectory, + completedSnapshot.getKvSnapshotHandle(), + kvDbPath); long start = clock.milliseconds(); LOG.info("Start to download kv snapshot {} to directory {}.", completedSnapshot, kvDbPath); KvSnapshotDataDownloader kvSnapshotDataDownloader = @@ -743,7 +758,7 @@ private void downloadKvSnapshots(CompletedSnapshot completedSnapshot, Path kvTab } catch (Exception e) { if (e.getMessage().contains(CompletedSnapshot.SNAPSHOT_DATA_NOT_EXISTS_ERROR_MESSAGE)) { try { - snapshotContext.handleSnapshotBroken(completedSnapshot); + snapshotContext.handleSnapshotBroken(remoteKvTabletDir, completedSnapshot); } catch (Exception t) { LOG.error("Handle broken snapshot {} failed.", completedSnapshot, t); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 76e3ea1867..4707055e24 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -1648,6 +1648,7 @@ protected Optional maybeCreateReplica(NotifyLeaderAndIsrData data) { fatalErrorHandler, bucketMetricGroup, tableInfo, + FlussPaths.remoteDataDir(conf), clock); allReplicas.put(tb, new OnlineReplica(replica)); replicaOpt = Optional.of(replica); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/SnapshotUtil.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/SnapshotUtil.java index 32e679ae11..955154348d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/SnapshotUtil.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/SnapshotUtil.java @@ -17,6 +17,7 @@ package org.apache.fluss.server.utils; +import org.apache.fluss.fs.FsPath; import org.apache.fluss.server.kv.snapshot.KvFileHandle; import org.apache.fluss.utils.ExceptionUtils; import org.apache.fluss.utils.function.ThrowingConsumer; @@ -40,16 +41,17 @@ public class SnapshotUtil { * caught during iteration */ public static void bestEffortDiscardAllKvFiles( - Iterable handlesToDiscard) throws Exception { - applyToAllWhileSuppressingExceptions(handlesToDiscard, KvFileHandle::discard); + FsPath remoteDir, Iterable handlesToDiscard) throws Exception { + applyToAllWhileSuppressingExceptions( + handlesToDiscard, kvFileHandle -> kvFileHandle.discard(remoteDir)); } - public static void discardKvFileQuietly(KvFileHandle kvFileHandle) { + public static void discardKvFileQuietly(FsPath remoteDir, KvFileHandle kvFileHandle) { if (kvFileHandle == null) { return; } try { - kvFileHandle.discard(); + kvFileHandle.discard(remoteDir); } catch (Exception exception) { LOG.warn("Discard {} exception.", kvFileHandle, exception); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java index b8fec59cab..82676d0f02 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java @@ -54,7 +54,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH; +import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link CompletedSnapshotStoreManager}. */ @@ -120,7 +120,7 @@ void testCompletedSnapshotStoreManage(int maxNumberOfSnapshotsToRetain) throws E assertThat( completedSnapshotStoreManager .getOrCreateCompletedSnapshotStore( - DATA1_TABLE_PATH, tableBucket) + DATA1_PHYSICAL_TABLE_PATH, tableBucket) .getAllSnapshots()) .hasSize(maxNumberOfSnapshotsToRetain); } @@ -148,7 +148,7 @@ void testCompletedSnapshotStoreManage(int maxNumberOfSnapshotsToRetain) throws E assertThat( completedSnapshotStoreManager .getOrCreateCompletedSnapshotStore( - DATA1_TABLE_PATH, tableBucket) + DATA1_PHYSICAL_TABLE_PATH, tableBucket) .getAllSnapshots()) .hasSize(maxNumberOfSnapshotsToRetain); } @@ -157,7 +157,8 @@ void testCompletedSnapshotStoreManage(int maxNumberOfSnapshotsToRetain) throws E TableBucket nonExistBucket = new TableBucket(10, 100); assertThat( completedSnapshotStoreManager - .getOrCreateCompletedSnapshotStore(DATA1_TABLE_PATH, nonExistBucket) + .getOrCreateCompletedSnapshotStore( + DATA1_PHYSICAL_TABLE_PATH, nonExistBucket) .getAllSnapshots()) .hasSize(0); } @@ -191,7 +192,7 @@ void testMetadataInconsistencyWithMetadataNotExistsException() throws Exception CompletedSnapshot validSnapshot = KvTestUtils.mockCompletedSnapshot(tempDir, tableBucket, 1L); TestingCompletedSnapshotHandle validSnapshotHandle = - new TestingCompletedSnapshotHandle(validSnapshot); + new TestingCompletedSnapshotHandle(validSnapshot, null); CompletedSnapshot invalidSnapshot = KvTestUtils.mockCompletedSnapshot(tempDir, tableBucket, 2L); @@ -211,12 +212,13 @@ void testMetadataInconsistencyWithMetadataNotExistsException() throws Exception ioExecutor, zookeeperClient, zooKeeperClient -> completedSnapshotHandleStore, - TestingMetricGroups.COORDINATOR_METRICS); + TestingMetricGroups.COORDINATOR_METRICS, + null); // Verify that only the valid snapshot remains CompletedSnapshotStore completedSnapshotStore = completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore( - DATA1_TABLE_PATH, tableBucket); + DATA1_PHYSICAL_TABLE_PATH, tableBucket); assertThat(completedSnapshotStore.getAllSnapshots()).hasSize(1); assertThat(completedSnapshotStore.getAllSnapshots().get(0).getSnapshotID()).isEqualTo(1L); } @@ -227,14 +229,15 @@ private CompletedSnapshotStoreManager createCompletedSnapshotStoreManager( maxNumberOfSnapshotsToRetain, ioExecutor, zookeeperClient, - TestingMetricGroups.COORDINATOR_METRICS); + TestingMetricGroups.COORDINATOR_METRICS, + null); } private CompletedSnapshot getLatestCompletedSnapshot( CompletedSnapshotStoreManager completedSnapshotStoreManager, TableBucket tableBucket) { CompletedSnapshotStore completedSnapshotStore = completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore( - DATA1_TABLE_PATH, tableBucket); + DATA1_PHYSICAL_TABLE_PATH, tableBucket); return completedSnapshotStore.getLatestSnapshot().get(); } @@ -245,7 +248,7 @@ private void addCompletedSnapshot( TableBucket tableBucket = completedSnapshot.getTableBucket(); CompletedSnapshotStore completedSnapshotStore = completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore( - DATA1_TABLE_PATH, tableBucket); + DATA1_PHYSICAL_TABLE_PATH, tableBucket); completedSnapshotStore.add(completedSnapshot); } @@ -283,7 +286,7 @@ private static class TestingCompletedSnapshotHandleWithFileNotFound extends TestingCompletedSnapshotHandle { public TestingCompletedSnapshotHandleWithFileNotFound(CompletedSnapshot snapshot) { - super(snapshot, false); + super(snapshot, null, false); } @Override diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index d4ec8f8db2..d0cb724c98 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -238,7 +238,7 @@ void testCreateAndDropTable() throws Exception { // mock CompletedSnapshotStore for (TableBucket tableBucket : allTableBuckets(t1Id, nBuckets)) { completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore( - t1, new TableBucket(tableBucket.getTableId(), tableBucket.getBucket())); + null, new TableBucket(tableBucket.getTableId(), tableBucket.getBucket())); } assertThat(completedSnapshotStoreManager.getBucketCompletedSnapshotStores()).isNotEmpty(); @@ -632,7 +632,7 @@ void testCreateAndDropPartition() throws Exception { // mock CompletedSnapshotStore for partition1 for (TableBucket tableBucket : allTableBuckets(tableId, partition1Id, nBuckets)) { completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore( - tablePath, + null, new TableBucket( tableBucket.getTableId(), tableBucket.getPartitionId(), diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerdeTest.java index d89087ab26..517b410f2d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerdeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotJsonSerdeTest.java @@ -17,14 +17,24 @@ package org.apache.fluss.server.kv.snapshot; -import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.utils.json.JsonSerdeTestBase; +import org.apache.fluss.utils.json.JsonSerdeUtils; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; -/** Test for {@link org.apache.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde}. */ +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test for {@link CompletedSnapshotJsonSerde}. + * + *

This test verifies JSON serialization/deserialization and backward compatibility between V1 + * (absolute paths) and V2 (relative paths) formats. + */ class CompletedSnapshotJsonSerdeTest extends JsonSerdeTestBase { protected CompletedSnapshotJsonSerdeTest() { @@ -35,32 +45,22 @@ protected CompletedSnapshotJsonSerdeTest() { protected CompletedSnapshot[] createObjects() { List sharedFileHandles = Arrays.asList( - KvFileHandleAndLocalPath.of( - new KvFileHandle("oss://bucket/snapshot/shared/t1.sst", 1), - "localPath1"), - KvFileHandleAndLocalPath.of( - new KvFileHandle("oss://bucket/snapshot/shared/t2.sst", 2), - "localPath2")); + KvFileHandleAndLocalPath.of(new KvFileHandle("t1.sst", 1), "localPath1"), + KvFileHandleAndLocalPath.of(new KvFileHandle("t2.sst", 2), "localPath2")); List privateFileHandles = Arrays.asList( - KvFileHandleAndLocalPath.of( - new KvFileHandle("oss://bucket/snapshot/snapshot1/t3", 3), - "localPath3"), - KvFileHandleAndLocalPath.of( - new KvFileHandle("oss://bucket/snapshot/snapshot1/t4", 4), - "localPath4")); + KvFileHandleAndLocalPath.of(new KvFileHandle("t3", 3), "localPath3"), + KvFileHandleAndLocalPath.of(new KvFileHandle("t4", 4), "localPath4")); CompletedSnapshot completedSnapshot1 = new CompletedSnapshot( new TableBucket(1, 1), 1, - new FsPath("oss://bucket/snapshot"), new KvSnapshotHandle(sharedFileHandles, privateFileHandles, 5), 10); CompletedSnapshot completedSnapshot2 = new CompletedSnapshot( new TableBucket(1, 10L, 1), 1, - new FsPath("oss://bucket/snapshot"), new KvSnapshotHandle(sharedFileHandles, privateFileHandles, 5), 10); return new CompletedSnapshot[] {completedSnapshot1, completedSnapshot2}; @@ -68,27 +68,90 @@ protected CompletedSnapshot[] createObjects() { @Override protected String[] expectedJsons() { + // V2 format: version=2, uses relative paths return new String[] { - "{\"version\":1," + "{\"version\":2," + "\"table_id\":1,\"bucket_id\":1," + "\"snapshot_id\":1," - + "\"snapshot_location\":\"oss://bucket/snapshot\"," + "\"kv_snapshot_handle\":{" - + "\"shared_file_handles\":[{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/shared/t1.sst\",\"size\":1},\"local_path\":\"localPath1\"}," - + "{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/shared/t2.sst\",\"size\":2},\"local_path\":\"localPath2\"}]," - + "\"private_file_handles\":[{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/snapshot1/t3\",\"size\":3},\"local_path\":\"localPath3\"}," - + "{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/snapshot1/t4\",\"size\":4},\"local_path\":\"localPath4\"}]," + + "\"shared_file_handles\":[{\"kv_file_handle\":{\"path\":\"t1.sst\",\"size\":1},\"local_path\":\"localPath1\"}," + + "{\"kv_file_handle\":{\"path\":\"t2.sst\",\"size\":2},\"local_path\":\"localPath2\"}]," + + "\"private_file_handles\":[{\"kv_file_handle\":{\"path\":\"t3\",\"size\":3},\"local_path\":\"localPath3\"}," + + "{\"kv_file_handle\":{\"path\":\"t4\",\"size\":4},\"local_path\":\"localPath4\"}]," + "\"snapshot_incremental_size\":5},\"log_offset\":10}", - "{\"version\":1," + "{\"version\":2," + "\"table_id\":1,\"partition_id\":10,\"bucket_id\":1," + "\"snapshot_id\":1," - + "\"snapshot_location\":\"oss://bucket/snapshot\"," + "\"kv_snapshot_handle\":{" - + "\"shared_file_handles\":[{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/shared/t1.sst\",\"size\":1},\"local_path\":\"localPath1\"}," - + "{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/shared/t2.sst\",\"size\":2},\"local_path\":\"localPath2\"}]," - + "\"private_file_handles\":[{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/snapshot1/t3\",\"size\":3},\"local_path\":\"localPath3\"}," - + "{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/snapshot1/t4\",\"size\":4},\"local_path\":\"localPath4\"}]," + + "\"shared_file_handles\":[{\"kv_file_handle\":{\"path\":\"t1.sst\",\"size\":1},\"local_path\":\"localPath1\"}," + + "{\"kv_file_handle\":{\"path\":\"t2.sst\",\"size\":2},\"local_path\":\"localPath2\"}]," + + "\"private_file_handles\":[{\"kv_file_handle\":{\"path\":\"t3\",\"size\":3},\"local_path\":\"localPath3\"}," + + "{\"kv_file_handle\":{\"path\":\"t4\",\"size\":4},\"local_path\":\"localPath4\"}]," + "\"snapshot_incremental_size\":5},\"log_offset\":10}" }; } + + /** Test backward compatibility with V1 format (absolute paths). */ + @Test + void testV1Compatibility() { + // V1 format: no version field or version=1, uses absolute paths + // Test case 1: Non-partitioned table without version field (implicit V1) + String v1JsonWithoutVersion = + "{\"table_id\":1,\"bucket_id\":1," + + "\"snapshot_id\":1," + + "\"kv_snapshot_handle\":{" + + "\"shared_file_handles\":[{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/shared/t1.sst\",\"size\":1},\"local_path\":\"localPath1\"}," + + "{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/shared/t2.sst\",\"size\":2},\"local_path\":\"localPath2\"}]," + + "\"private_file_handles\":[{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/snapshot1/t3\",\"size\":3},\"local_path\":\"localPath3\"}," + + "{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/snapshot1/t4\",\"size\":4},\"local_path\":\"localPath4\"}]," + + "\"snapshot_incremental_size\":5},\"log_offset\":10}"; + + CompletedSnapshot snapshot1 = + JsonSerdeUtils.readValue( + v1JsonWithoutVersion.getBytes(StandardCharsets.UTF_8), + CompletedSnapshotJsonSerde.INSTANCE); + + // Verify the file paths are extracted correctly from absolute paths + assertThat(snapshot1.getTableBucket()).isEqualTo(new TableBucket(1, 1)); + assertThat(snapshot1.getSnapshotID()).isEqualTo(1); + assertThat(snapshot1.getLogOffset()).isEqualTo(10); + KvSnapshotHandle handle1 = snapshot1.getKvSnapshotHandle(); + assertThat(handle1.getSharedKvFileHandles()).hasSize(2); + assertThat(handle1.getSharedKvFileHandles().get(0).getKvFileHandle().getFilePath()) + .isEqualTo("t1.sst"); + assertThat(handle1.getSharedKvFileHandles().get(1).getKvFileHandle().getFilePath()) + .isEqualTo("t2.sst"); + assertThat(handle1.getPrivateFileHandles()).hasSize(2); + assertThat(handle1.getPrivateFileHandles().get(0).getKvFileHandle().getFilePath()) + .isEqualTo("t3"); + assertThat(handle1.getPrivateFileHandles().get(1).getKvFileHandle().getFilePath()) + .isEqualTo("t4"); + assertThat(handle1.getIncrementalSize()).isEqualTo(5); + + // Test case 2: Partitioned table with explicit version=1 + String v1JsonWithVersion = + "{\"version\":1," + + "\"table_id\":1,\"partition_id\":10,\"bucket_id\":1," + + "\"snapshot_id\":1," + + "\"kv_snapshot_handle\":{" + + "\"shared_file_handles\":[{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/shared/t1.sst\",\"size\":1},\"local_path\":\"localPath1\"}]," + + "\"private_file_handles\":[{\"kv_file_handle\":{\"path\":\"oss://bucket/snapshot/snapshot1/t3\",\"size\":3},\"local_path\":\"localPath3\"}]," + + "\"snapshot_incremental_size\":5},\"log_offset\":10}"; + + CompletedSnapshot snapshot2 = + JsonSerdeUtils.readValue( + v1JsonWithVersion.getBytes(StandardCharsets.UTF_8), + CompletedSnapshotJsonSerde.INSTANCE); + + assertThat(snapshot2.getTableBucket()).isEqualTo(new TableBucket(1, 10L, 1)); + assertThat(snapshot2.getSnapshotID()).isEqualTo(1); + assertThat(snapshot2.getLogOffset()).isEqualTo(10); + KvSnapshotHandle handle2 = snapshot2.getKvSnapshotHandle(); + assertThat(handle2.getSharedKvFileHandles()).hasSize(1); + assertThat(handle2.getSharedKvFileHandles().get(0).getKvFileHandle().getFilePath()) + .isEqualTo("t1.sst"); + assertThat(handle2.getPrivateFileHandles()).hasSize(1); + assertThat(handle2.getPrivateFileHandles().get(0).getKvFileHandle().getFilePath()) + .isEqualTo("t3"); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java index 1ab73cfee4..81e516f34c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java @@ -59,12 +59,14 @@ class CompletedSnapshotStoreTest { private TestCompletedSnapshotHandleStore.Builder builder; private TestCompletedSnapshotHandleStore defaultHandleStore; private @TempDir Path tempDir; + private FsPath remoteKvTabletDir; @BeforeEach void setup() { builder = TestCompletedSnapshotHandleStore.newBuilder(); defaultHandleStore = builder.build(); executorService = Executors.newFixedThreadPool(2, new ExecutorThreadFactory("IO-Executor")); + remoteKvTabletDir = FsPath.fromLocalFile(tempDir.toFile()); } @AfterEach @@ -525,7 +527,6 @@ private CompletedSnapshot getSnapshot(long id) { return new CompletedSnapshot( tableBucket, id, - new FsPath(tempDir.toString(), "test_snapshot"), new KvSnapshotHandle(Collections.emptyList(), Collections.emptyList(), 0)); } @@ -551,13 +552,14 @@ private CompletedSnapshotStore createCompletedSnapshotStore( CompletedSnapshotHandleStore snapshotHandleStore, Collection completedSnapshots) { - SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(); + SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(remoteKvTabletDir); return new CompletedSnapshotStore( numToRetain, sharedKvFileRegistry, completedSnapshots, snapshotHandleStore, - executorService); + executorService, + remoteKvTabletDir); } private List> createSnapshotHandles(int num) { @@ -572,12 +574,13 @@ private List> createSnapshotHandles( new CompletedSnapshot( new TableBucket(1, 1), i, - new FsPath("test_snapshot"), new KvSnapshotHandle( Collections.emptyList(), Collections.emptyList(), -1)); final CompletedSnapshotHandle snapshotStateHandle = new TestingCompletedSnapshotHandle( - completedSnapshot, failSnapshots.contains(num)); + completedSnapshot, + new FsPath("test_snapshot"), + failSnapshots.contains(num)); stateHandles.add(new Tuple2<>(snapshotStateHandle, String.valueOf(i))); } return stateHandles; diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotTest.java index 44b6d4f3b6..0b698dc550 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotTest.java @@ -19,6 +19,7 @@ import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.utils.FlussPaths; import org.apache.fluss.utils.concurrent.Executors; import org.junit.jupiter.api.Test; @@ -44,42 +45,42 @@ void testCleanup(@TempDir Path tempDir) throws Exception { Path localFileDir = makeDir(tempDir, "local"); Path snapshotBaseLocation = makeDir(tempDir, "snapshot"); // create a share directory - Path shareDir = makeDir(snapshotBaseLocation, "share"); + Path shareDir = makeDir(snapshotBaseLocation, FlussPaths.REMOTE_KV_SNAPSHOT_SHARED_DIR); // snapshot 1 long snapshotId = 1; - Path snapshotPath = makeDir(snapshotBaseLocation, "snapshot-" + snapshotId); + Path snapshotPath = + makeDir( + snapshotBaseLocation, + FlussPaths.REMOTE_KV_SNAPSHOT_DIR_PREFIX + snapshotId); KvSnapshotHandle kvSnapshotHandle = makeSnapshotHandle(localFileDir, snapshotPath, shareDir, 100); CompletedSnapshot snapshot = - new CompletedSnapshot( - tableBucket, - snapshotId, - FsPath.fromLocalFile(snapshotPath.toFile()), - kvSnapshotHandle); + new CompletedSnapshot(tableBucket, snapshotId, kvSnapshotHandle); - SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(); + SharedKvFileRegistry sharedKvFileRegistry = + new SharedKvFileRegistry(FsPath.fromLocalFile(shareDir.toFile())); // register the snapshot to a registry snapshot.registerSharedKvFilesAfterRestored(sharedKvFileRegistry); Executor ioExecutor = Executors.directExecutor(); - snapshot.discardAsync(ioExecutor).get(); + snapshot.discardAsync(FsPath.fromLocalFile(snapshotBaseLocation.toFile()), ioExecutor) + .get(); // share files shouldn't be deleted - checkCompletedSnapshotCleanUp(snapshotPath, kvSnapshotHandle, false); + checkCompletedSnapshotCleanUp(shareDir, snapshotPath, kvSnapshotHandle, false); // snapshot 2 snapshotId = 2; - snapshotPath = makeDir(snapshotBaseLocation, "snapshot-" + snapshotId); + snapshotPath = + makeDir( + snapshotBaseLocation, + FlussPaths.REMOTE_KV_SNAPSHOT_DIR_PREFIX + snapshotId); kvSnapshotHandle = makeSnapshotHandle(localFileDir, snapshotPath, shareDir, 100); - snapshot = - new CompletedSnapshot( - tableBucket, - snapshotId, - FsPath.fromLocalFile(snapshotPath.toFile()), - kvSnapshotHandle); - snapshot.discardAsync(ioExecutor).get(); + snapshot = new CompletedSnapshot(tableBucket, snapshotId, kvSnapshotHandle); + snapshot.discardAsync(FsPath.fromLocalFile(snapshotBaseLocation.toFile()), ioExecutor) + .get(); // share files should be deleted since it has not been registered - checkCompletedSnapshotCleanUp(snapshotPath, kvSnapshotHandle, true); + checkCompletedSnapshotCleanUp(shareDir, snapshotPath, kvSnapshotHandle, true); } @Test @@ -97,20 +98,26 @@ void testKvSnapshotSize(@TempDir Path tempDir) throws Exception { KvSnapshotHandle kvSnapshotHandle = makeSnapshotHandle(localFileDir, snapshotPath, shareDir, 100); CompletedSnapshot snapshot = - new CompletedSnapshot( - tableBucket, - snapshotId, - FsPath.fromLocalFile(snapshotPath.toFile()), - kvSnapshotHandle); + new CompletedSnapshot(tableBucket, snapshotId, kvSnapshotHandle); assertThat(snapshot.getKvSnapshotHandle().getSnapshotSize()).isEqualTo(400L); } private void checkCompletedSnapshotCleanUp( - Path snapshotPath, KvSnapshotHandle kvSnapshotHandle, boolean isShareFileShouldDelete) { + Path shareDir, + Path snapshotPath, + KvSnapshotHandle kvSnapshotHandle, + boolean isShareFileShouldDelete) { // private should be deleted, but the local file should still remain for (KvFileHandleAndLocalPath kvFileHandleAndLocalPath : kvSnapshotHandle.getPrivateFileHandles()) { - assertThat(new File(kvFileHandleAndLocalPath.getKvFileHandle().getFilePath())) + assertThat( + new File( + snapshotPath + .resolve( + kvFileHandleAndLocalPath + .getKvFileHandle() + .getFilePath()) + .toString())) .doesNotExist(); assertThat(new File(kvFileHandleAndLocalPath.getLocalPath())).exists(); } @@ -118,13 +125,14 @@ private void checkCompletedSnapshotCleanUp( // check the share files is as expected, and the local file should still remain for (KvFileHandleAndLocalPath kvFileHandleAndLocalPath : kvSnapshotHandle.getSharedKvFileHandles()) { + String sharedFie = + shareDir.resolve(kvFileHandleAndLocalPath.getKvFileHandle().getFilePath()) + .toString(); // share files should also be deleted, but the local file should still remain if (isShareFileShouldDelete) { - assertThat(new File(kvFileHandleAndLocalPath.getKvFileHandle().getFilePath())) - .doesNotExist(); + assertThat(new File(sharedFie)).doesNotExist(); } else { - assertThat(new File(kvFileHandleAndLocalPath.getKvFileHandle().getFilePath())) - .exists(); + assertThat(new File(sharedFie)).exists(); } assertThat(new File(kvFileHandleAndLocalPath.getLocalPath())).exists(); } @@ -149,7 +157,7 @@ KvSnapshotHandle makeSnapshotHandle( writeRandomDataToFile(shareFile, perFileSize); sharedFileHandles.add( KvFileHandleAndLocalPath.of( - new KvFileHandle(shareFile.getPath(), shareFile.length()), + new KvFileHandle(shareFile.getName(), shareFile.length()), localFile.getPath())); } @@ -157,15 +165,15 @@ KvSnapshotHandle makeSnapshotHandle( // private files dir List privateFileHandles = new ArrayList<>(); for (int i = 0; i < 2; i++) { - File localFile = new File(localPath.toFile(), "local_private_i"); + File localFile = new File(localPath.toFile(), "local_private_" + i); localFile.createNewFile(); writeRandomDataToFile(localFile, perFileSize); - File privateFile = new File(baseSnapshotDir.toFile(), "remote_i"); + File privateFile = new File(baseSnapshotDir.toFile(), "remote_" + i); privateFile.createNewFile(); writeRandomDataToFile(privateFile, perFileSize); privateFileHandles.add( KvFileHandleAndLocalPath.of( - new KvFileHandle(privateFile.getPath(), privateFile.length()), + new KvFileHandle(privateFile.getName(), privateFile.length()), localFile.getPath())); } return new KvSnapshotHandle(sharedFileHandles, privateFileHandles, 10); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataDownloaderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataDownloaderTest.java index a4b9494a5d..3d8bdd31e4 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataDownloaderTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataDownloaderTest.java @@ -164,6 +164,6 @@ private KvSnapshotDownloadSpec createDownloadRequestForContent( KvSnapshotHandle kvSnapshotHandle = new KvSnapshotHandle(sharedStates, privateStates, -1); - return new KvSnapshotDownloadSpec(kvSnapshotHandle, dstPath); + return new KvSnapshotDownloadSpec(null, null, kvSnapshotHandle, dstPath); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataUploaderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataUploaderTest.java index ef50485937..931f785ea6 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataUploaderTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvSnapshotDataUploaderTest.java @@ -96,9 +96,7 @@ void testMultiThreadUploadCorrectly() throws Exception { .findFirst() .get() .getKvFileHandle(); - assertThat(kvFileHandle.getFilePath()) - .startsWith(LocalFileSystem.getLocalFsURI().getScheme()); - FsPath fsPath = new FsPath(kvFileHandle.getFilePath()); + FsPath fsPath = new FsPath(snapshotSharedDirectory, kvFileHandle.getFilePath()); FSDataInputStream inputStream = fsPath.getFileSystem().open(fsPath); assertContentEqual(path, inputStream); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java index 4077e0e65e..dabcde4e1a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java @@ -22,6 +22,7 @@ import org.apache.fluss.exception.FlussException; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.fs.FsPath; +import org.apache.fluss.fs.local.LocalFileSystem; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.server.SequenceIDCounter; import org.apache.fluss.server.kv.rocksdb.RocksDBExtension; @@ -157,13 +158,18 @@ void testSnapshot(@TempDir Path kvTabletDir, @TempDir Path temoRebuildPath) thro completedSnapshotHandleStore.get(tableBucket, snapshotId1).get(); CompletedSnapshot snapshot = completedSnapshotHandle.retrieveCompleteSnapshot(); assertThat(snapshot.getSnapshotID()).isEqualTo(snapshotId1); - // verify the metadata file path - assertThat(snapshot.getMetadataFilePath()) - .isEqualTo(CompletedSnapshot.getMetadataFilePath(snapshot.getSnapshotLocation())); + SnapshotLocation snapshotLocation = + new SnapshotLocation( + LocalFileSystem.getSharedInstance(), + FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId1), + FlussPaths.remoteKvSharedDir(remoteKvTabletDir), + 1024); // rebuild from snapshot, and the check the rebuilt rocksdb try (RocksDBKv rocksDBKv = KvTestUtils.buildFromSnapshotHandle( - snapshot.getKvSnapshotHandle(), temoRebuildPath.resolve("restore1"))) { + snapshotLocation, + snapshot.getKvSnapshotHandle(), + temoRebuildPath.resolve("restore1"))) { assertThat(rocksDBKv.get("key1".getBytes())).isEqualTo("val1".getBytes()); } @@ -181,14 +187,19 @@ void testSnapshot(@TempDir Path kvTabletDir, @TempDir Path temoRebuildPath) thro completedSnapshotHandle = completedSnapshotHandleStore.get(tableBucket, snapshotId2).get(); snapshot = completedSnapshotHandle.retrieveCompleteSnapshot(); // rebuild from snapshot, and the check the rebuilt rocksdb - // verify the metadata file path - assertThat(snapshot.getMetadataFilePath()) - .isEqualTo(CompletedSnapshot.getMetadataFilePath(snapshot.getSnapshotLocation())); assertThat(snapshot.getSnapshotID()).isEqualTo(snapshotId2); assertThat(updateMinRetainOffsetConsumer.get()).isEqualTo(5); + snapshotLocation = + new SnapshotLocation( + LocalFileSystem.getSharedInstance(), + FlussPaths.remoteKvSnapshotDir(remoteKvTabletDir, snapshotId2), + FlussPaths.remoteKvSharedDir(remoteKvTabletDir), + 1024); try (RocksDBKv rocksDBKv = KvTestUtils.buildFromSnapshotHandle( - snapshot.getKvSnapshotHandle(), temoRebuildPath.resolve("restore2"))) { + snapshotLocation, + snapshot.getKvSnapshotHandle(), + temoRebuildPath.resolve("restore2"))) { assertThat(rocksDBKv.get("key1".getBytes())).isEqualTo("val1".getBytes()); assertThat(rocksDBKv.get("key2".getBytes())).isEqualTo("val2".getBytes()); } @@ -320,7 +331,7 @@ void testIdempotentCheckWhenSnapshotExistsInZK(@TempDir Path kvTabletDir) throws CompletedSnapshotHandle handle = new CompletedSnapshotHandle( snapshot.getSnapshotID(), - snapshot.getSnapshotLocation(), + remoteKvTabletDir, snapshot.getLogOffset()); completedSnapshotHandleStore.add( snapshot.getTableBucket(), snapshot.getSnapshotID(), handle); @@ -471,7 +482,7 @@ private KvTabletSnapshotTarget createSnapshotTarget( SnapshotFailType snapshotFailType) throws IOException { TableBucket tableBucket = new TableBucket(1, 1); - SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(); + SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(remoteKvTabletDir); Executor executor = Executors.directExecutor(); CompletedSnapshotStore completedSnapshotStore = @@ -480,7 +491,8 @@ private KvTabletSnapshotTarget createSnapshotTarget( sharedKvFileRegistry, Collections.emptyList(), snapshotHandleStore, - executor); + executor, + remoteKvTabletDir); RocksIncrementalSnapshot rocksIncrementalSnapshot = createIncrementalSnapshot(snapshotFailType); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshotTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshotTest.java index 2e4b4fbca0..1416e21205 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshotTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/RocksIncrementalSnapshotTest.java @@ -116,7 +116,8 @@ void testIncrementalSnapshot(@TempDir Path snapshotBaseDir, @TempDir Path snapsh // test restore from cp2 Path dest1 = snapshotDownDir.resolve("restore1"); try (RocksDBKv rocksDBKv = - KvTestUtils.buildFromSnapshotHandle(kvSnapshotHandle2, dest1)) { + KvTestUtils.buildFromSnapshotHandle( + snapshotLocation, kvSnapshotHandle2, dest1)) { assertThat(rocksDBKv.get("key1".getBytes())).isEqualTo("val1".getBytes()); assertThat(rocksDBKv.get("key2".getBytes())).isNull(); assertThat(rocksDBKv.get("key3".getBytes())).isNull(); @@ -124,7 +125,8 @@ void testIncrementalSnapshot(@TempDir Path snapshotBaseDir, @TempDir Path snapsh Path dest2 = snapshotDownDir.resolve("restore2"); // test restore from cp4 try (RocksDBKv rocksDBKv = - KvTestUtils.buildFromSnapshotHandle(kvSnapshotHandle4, dest2)) { + KvTestUtils.buildFromSnapshotHandle( + snapshotLocation, kvSnapshotHandle4, dest2)) { assertThat(rocksDBKv.get("key1".getBytes())).isEqualTo("val1".getBytes()); assertThat(rocksDBKv.get("key2".getBytes())).isEqualTo("val2".getBytes()); assertThat(rocksDBKv.get("key3".getBytes())).isEqualTo("val3".getBytes()); @@ -135,12 +137,13 @@ void testIncrementalSnapshot(@TempDir Path snapshotBaseDir, @TempDir Path snapsh KvSnapshotHandle kvSnapshotHandle5 = snapshot(5L, incrementalSnapshot, snapshotLocation, closeableRegistry); // discard the snapshot handle - kvSnapshotHandle5.discard(); + kvSnapshotHandle5.discard(testingTabletDir, 5L); // we can still restore from cp4 Path dest3 = snapshotDownDir.resolve("restore3"); try (RocksDBKv rocksDBKv = - KvTestUtils.buildFromSnapshotHandle(kvSnapshotHandle4, dest3)) { + KvTestUtils.buildFromSnapshotHandle( + snapshotLocation, kvSnapshotHandle4, dest3)) { assertThat(rocksDBKv.get("key1".getBytes())).isEqualTo("val1".getBytes()); assertThat(rocksDBKv.get("key2".getBytes())).isEqualTo("val2".getBytes()); assertThat(rocksDBKv.get("key3".getBytes())).isEqualTo("val3".getBytes()); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistryTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistryTest.java index 6a178b40c2..ed9e3eb784 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistryTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistryTest.java @@ -17,7 +17,12 @@ package org.apache.fluss.server.kv.snapshot; +import org.apache.fluss.fs.FsPath; + import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; import static org.assertj.core.api.Assertions.assertThat; @@ -25,8 +30,10 @@ class SharedKvFileRegistryTest { @Test - void testRegistryNormal() throws Exception { - SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(); + void testRegistryNormal(@TempDir Path tmpDir) throws Exception { + FsPath snapshotPath = FsPath.fromLocalFile(tmpDir.toFile()); + + SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(snapshotPath); TestKvHandle firstHandle = new TestKvHandle("first"); @@ -51,7 +58,7 @@ void testRegistryNormal() throws Exception { // now, we test the case register a handle again with a placeholder sharedKvFileRegistry.close(); - sharedKvFileRegistry = new SharedKvFileRegistry(); + sharedKvFileRegistry = new SharedKvFileRegistry(snapshotPath); TestKvHandle testKvHandle = new TestKvHandle("test"); KvFileHandle handle = sharedKvFileRegistry.registerReference( @@ -71,8 +78,10 @@ void testRegistryNormal() throws Exception { /** Validate that unregister a nonexistent snapshot will not throw exception. */ @Test - void testUnregisterWithUnexistedKey() { - SharedKvFileRegistry sharedStateRegistry = new SharedKvFileRegistry(); + void testUnregisterWithUnexistedKey(@TempDir Path tmpDir) { + FsPath snapshotPath = FsPath.fromLocalFile(tmpDir.toFile()); + + SharedKvFileRegistry sharedStateRegistry = new SharedKvFileRegistry(snapshotPath); sharedStateRegistry.unregisterUnusedKvFile(-1); sharedStateRegistry.unregisterUnusedKvFile(Long.MAX_VALUE); } @@ -88,7 +97,7 @@ public TestKvHandle(String path) { } @Override - public void discard() throws Exception { + public void discard(FsPath remoteDir) throws Exception { this.discarded = true; } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/SnapshotsCleanerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/SnapshotsCleanerTest.java index 2e66bd945c..3cbf1a8c00 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/SnapshotsCleanerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/SnapshotsCleanerTest.java @@ -24,8 +24,6 @@ import org.junit.jupiter.api.Test; -import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -37,6 +35,8 @@ /** Test for {@link org.apache.fluss.server.kv.snapshot.SnapshotsCleaner}. */ class SnapshotsCleanerTest { + private static final FsPath REMOTE_KV_TABLET_DIR = new FsPath("testpath"); + @Test void testNotCleanSnapshotInUse() { TableBucket tableBucket = new TableBucket(1, 1); @@ -49,7 +49,11 @@ void testNotCleanSnapshotInUse() { snapshotsCleaner.addSubsumedSnapshot(cp3); snapshotsCleaner.cleanSubsumedSnapshots( - 3, Collections.singleton(1L), () -> {}, Executors.directExecutor()); + REMOTE_KV_TABLET_DIR, + 3, + Collections.singleton(1L), + () -> {}, + Executors.directExecutor()); // cp 1 is in use, shouldn't discard. assertThat(cp1.isDiscarded()).isFalse(); assertThat(cp2.isDiscarded()).isTrue(); @@ -67,7 +71,11 @@ void testNotCleanHigherSnapshot() { TestCompletedSnapshot cp3 = createSnapshot(tableBucket, 3); snapshotsCleaner.addSubsumedSnapshot(cp3); snapshotsCleaner.cleanSubsumedSnapshots( - 2, Collections.emptySet(), () -> {}, Executors.directExecutor()); + REMOTE_KV_TABLET_DIR, + 2, + Collections.emptySet(), + () -> {}, + Executors.directExecutor()); assertThat(cp1.isDiscarded()).isTrue(); // cp2 is the lowest snapshot that is still valid, shouldn't discard. @@ -81,16 +89,13 @@ public static class TestCompletedSnapshot extends CompletedSnapshot { private boolean isDiscarded; public TestCompletedSnapshot( - TableBucket tableBucket, - long snapshotID, - FsPath snapshotLocation, - TestKvSnapshotHandle kvSnapshotHandle) { - super(tableBucket, snapshotID, snapshotLocation, kvSnapshotHandle); + TableBucket tableBucket, long snapshotID, TestKvSnapshotHandle kvSnapshotHandle) { + super(tableBucket, snapshotID, kvSnapshotHandle); } - public CompletableFuture discardAsync(Executor ioExecutor) { + public CompletableFuture discardAsync(FsPath remoteKvTabletDir, Executor ioExecutor) { CompletableFuture resultFuture = new CompletableFuture<>(); - super.discardAsync(ioExecutor) + super.discardAsync(remoteKvTabletDir, ioExecutor) .whenComplete( (ignore, throwable) -> { if (throwable != null) { @@ -122,7 +127,7 @@ public TestKvSnapshotHandle( } @Override - public void discard() { + public void discard(FsPath remoteKvTabletDir, long snapshotId) { isDiscarded = true; } @@ -141,11 +146,6 @@ public static void verifySnapshotDiscarded(TestKvSnapshotHandle testKvSnapshotHa } public static TestCompletedSnapshot createSnapshot(TableBucket tableBucket, long snapshotId) { - return createSnapshot(tableBucket, snapshotId, new FsPath("testpath")); - } - - public static TestCompletedSnapshot createSnapshot( - TableBucket tableBucket, long snapshotId, @Nullable FsPath snapshotPath) { List sharedFileHandles = new ArrayList<>(); List privateFileHandles = new ArrayList<>(); @@ -160,6 +160,6 @@ public static TestCompletedSnapshot createSnapshot( TestKvSnapshotHandle kvSnapshotHandle = new TestKvSnapshotHandle(sharedFileHandles, privateFileHandles); - return new TestCompletedSnapshot(tableBucket, snapshotId, snapshotPath, kvSnapshotHandle); + return new TestCompletedSnapshot(tableBucket, snapshotId, kvSnapshotHandle); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedSnapshotHandle.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedSnapshotHandle.java index d8dbc8f763..620f50affc 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedSnapshotHandle.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedSnapshotHandle.java @@ -17,6 +17,8 @@ package org.apache.fluss.server.kv.snapshot; +import org.apache.fluss.fs.FsPath; + import java.io.IOException; /** @@ -29,13 +31,13 @@ public class TestingCompletedSnapshotHandle extends CompletedSnapshotHandle { private final boolean shouldFailWhenRetrieve; - public TestingCompletedSnapshotHandle(CompletedSnapshot snapshot) { - this(snapshot, false); + public TestingCompletedSnapshotHandle(CompletedSnapshot snapshot, FsPath metaDataFilePath) { + this(snapshot, metaDataFilePath, false); } public TestingCompletedSnapshotHandle( - CompletedSnapshot snapshot, boolean shouldFailWhenRetrieve) { - super(snapshot.getSnapshotID(), snapshot.getSnapshotLocation(), snapshot.getLogOffset()); + CompletedSnapshot snapshot, FsPath metaDataFilePath, boolean shouldFailWhenRetrieve) { + super(snapshot.getSnapshotID(), metaDataFilePath, snapshot.getLogOffset()); this.snapshot = snapshot; this.shouldFailWhenRetrieve = shouldFailWhenRetrieve; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/ZooKeeperCompletedSnapshotStoreTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/ZooKeeperCompletedSnapshotStoreTest.java index c6e90ff630..d43a3841fd 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/ZooKeeperCompletedSnapshotStoreTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/ZooKeeperCompletedSnapshotStoreTest.java @@ -72,19 +72,20 @@ static void afterAll() { /** Tests that subsumed snapshots are discarded. */ @Test void testDiscardingSubsumedSnapshots(@TempDir Path tmpDir) throws Exception { - SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(); + FsPath snapshotPath = FsPath.fromLocalFile(tmpDir.toFile()); + + SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(snapshotPath); final CompletedSnapshotStore snapshotStore = - createZooKeeperSnapshotStore(zooKeeperClient, sharedKvFileRegistry); + createZooKeeperSnapshotStore(snapshotPath, zooKeeperClient, sharedKvFileRegistry); TableBucket tableBucket = new TableBucket(1, 1); - FsPath snapshotPath = FsPath.fromLocalFile(tmpDir.toFile()); SnapshotsCleanerTest.TestCompletedSnapshot snapshot1 = - SnapshotsCleanerTest.createSnapshot(tableBucket, 0, snapshotPath); + SnapshotsCleanerTest.createSnapshot(tableBucket, 0); snapshotStore.add(snapshot1); assertThat(snapshotStore.getAllSnapshots()).containsExactly(snapshot1); final SnapshotsCleanerTest.TestCompletedSnapshot snapshot2 = - SnapshotsCleanerTest.createSnapshot(tableBucket, 1, snapshotPath); + SnapshotsCleanerTest.createSnapshot(tableBucket, 1); snapshotStore.add(snapshot2); final List allSnapshots = snapshotStore.getAllSnapshots(); assertThat(allSnapshots).containsExactly(snapshot2); @@ -104,18 +105,19 @@ void testAddSnapshotWithFailedRemove(@TempDir Path tmpDir) throws Exception { ConfigOptions.ZOOKEEPER_ADDRESS, zooKeeperExtensionWrapper.getCustomExtension().getConnectString()); - SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(); + FsPath snapshotPath = FsPath.fromLocalFile(tmpDir.toFile()); + SharedKvFileRegistry sharedKvFileRegistry = new SharedKvFileRegistry(snapshotPath); TableBucket tableBucket = new TableBucket(1, 2); try (ZooKeeperClient zooKeeperClient = ZooKeeperUtils.startZookeeperClient(configuration, NOPErrorHandler.INSTANCE)) { final CompletedSnapshotStore store = - createZooKeeperSnapshotStore(zooKeeperClient, sharedKvFileRegistry); + createZooKeeperSnapshotStore( + snapshotPath, zooKeeperClient, sharedKvFileRegistry); CountDownLatch discardAttempted = new CountDownLatch(1); for (long i = 0; i < 2; ++i) { - FsPath snapshotPath = FsPath.fromLocalFile(tmpDir.toFile()); CompletedSnapshot snapshotToAdd = - SnapshotsCleanerTest.createSnapshot(tableBucket, i, snapshotPath); + SnapshotsCleanerTest.createSnapshot(tableBucket, i); // shouldn't fail despite the exception store.addSnapshotAndSubsumeOldestOne( snapshotToAdd, @@ -131,7 +133,9 @@ void testAddSnapshotWithFailedRemove(@TempDir Path tmpDir) throws Exception { @Nonnull private CompletedSnapshotStore createZooKeeperSnapshotStore( - ZooKeeperClient zooKeeperClient, SharedKvFileRegistry sharedKvFileRegistry) { + FsPath snapshotPath, + ZooKeeperClient zooKeeperClient, + SharedKvFileRegistry sharedKvFileRegistry) { ZooKeeperCompletedSnapshotHandleStore snapshotsInZooKeeper = new ZooKeeperCompletedSnapshotHandleStore(zooKeeperClient); return new CompletedSnapshotStore( @@ -139,6 +143,7 @@ private CompletedSnapshotStore createZooKeeperSnapshotStore( sharedKvFileRegistry, Collections.emptyList(), snapshotsInZooKeeper, - Executors.directExecutor()); + Executors.directExecutor(), + snapshotPath); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvSnapshotITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvSnapshotITCase.java index bf178eb2a2..02bc8cbb84 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvSnapshotITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvSnapshotITCase.java @@ -138,8 +138,9 @@ void testKvSnapshotAndDelete() throws Exception { Tuple2.of("k1", new Object[] {1, "k1"}), Tuple2.of("k2", new Object[] {2, "k2"}))); KvTestUtils.checkSnapshot(completedSnapshot, expectedKeyValues, 2); - bucketKvSnapshotDirs.add( - new File(completedSnapshot.getSnapshotLocation().getParent().getPath())); + // bucketKvSnapshotDirs.add( + // new + // File(completedSnapshot.getSnapshotLocation().getParent().getPath())); // put kv batch again kvRecordBatch = diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java index 8bdd46db66..4573f320c2 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java @@ -616,7 +616,7 @@ void testBrokenSnapshotRecovery(@TempDir File snapshotKvTabletDir) throws Except // now simulate the latest snapshot (snapshot2) being broken by // deleting its metadata files and unshared SST files // This simulates file corruption while ZK metadata remains intact - snapshot2.getKvSnapshotHandle().discard(); + snapshot2.getKvSnapshotHandle().discard(null, 1); // ZK metadata should still show snapshot2 as latest (file corruption hasn't been detected // yet) @@ -655,7 +655,7 @@ void testBrokenSnapshotRecovery(@TempDir File snapshotKvTabletDir) throws Except // The key test is that the system can handle broken snapshots and recover correctly // Verify that we successfully simulated the broken snapshot condition - File metadataFile = new File(snapshot2.getMetadataFilePath().getPath()); + File metadataFile = new File(snapshot2.getMetadataFilePath(null).getPath()); assertThat(metadataFile.exists()).isFalse(); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java index 559353df63..6d56c5a341 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java @@ -61,6 +61,7 @@ import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.apache.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService; import org.apache.fluss.utils.CloseableRegistry; +import org.apache.fluss.utils.FlussPaths; import org.apache.fluss.utils.clock.ManualClock; import org.apache.fluss.utils.concurrent.FlussScheduler; import org.apache.fluss.utils.function.FunctionWithException; @@ -481,6 +482,7 @@ private Replica makeReplica( NOPErrorHandler.INSTANCE, metricGroup, DATA1_TABLE_INFO, + FlussPaths.remoteDataDir(conf), manualClock); } @@ -664,11 +666,12 @@ private void unchecked(ThrowingRunnable throwingRunnable) { } @Override - public void handleSnapshotBroken(CompletedSnapshot snapshot) throws Exception { + public void handleSnapshotBroken(FsPath remoteKvTabletDir, CompletedSnapshot snapshot) + throws Exception { // Remove the broken snapshot from the snapshot store (simulating ZK metadata removal) testKvSnapshotStore.removeSnapshot(snapshot.getTableBucket(), snapshot.getSnapshotID()); // Discard the snapshot files async (similar to DefaultSnapshotContext implementation) - snapshot.discardAsync(executorService); + snapshot.discardAsync(remoteKvTabletDir, executorService); } } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/KvTestUtils.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/KvTestUtils.java index 273858d311..c2e359e9c5 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/KvTestUtils.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/KvTestUtils.java @@ -18,7 +18,6 @@ package org.apache.fluss.server.testutils; import org.apache.fluss.config.Configuration; -import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.rpc.messages.LookupResponse; import org.apache.fluss.rpc.messages.PbLookupRespForBucket; @@ -35,6 +34,7 @@ import org.apache.fluss.server.kv.snapshot.KvSnapshotDownloadSpec; import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle; import org.apache.fluss.server.kv.snapshot.PlaceholderKvFileHandler; +import org.apache.fluss.server.kv.snapshot.SnapshotLocation; import org.apache.fluss.utils.CloseableRegistry; import org.apache.fluss.utils.FileUtils; import org.apache.fluss.utils.types.Tuple2; @@ -68,7 +68,7 @@ public static void checkSnapshot( assertThat(completedSnapshot.getLogOffset()).isEqualTo(expectLogOffset); try (RocksDBKv rocksDBKv = buildFromSnapshotHandle( - completedSnapshot.getKvSnapshotHandle(), temRebuildPath)) { + null, completedSnapshot.getKvSnapshotHandle(), temRebuildPath)) { // check the key counts int keyCounts = getKeyCounts(rocksDBKv.getDb()); assertThat(keyCounts).isEqualTo(expectedKeyValues.size()); @@ -121,13 +121,13 @@ public static CompletedSnapshot mockCompletedSnapshot( return new CompletedSnapshot( tableBucket, snapshotId, - new FsPath( - snapshotRootDir - + String.format( - "/tableBucket-%d-bucket-%d-snapshot-%d", - tableBucket.getTableId(), - tableBucket.getBucket(), - snapshotId)), + // new FsPath( + // snapshotRootDir + // + String.format( + // "/tableBucket-%d-bucket-%d-snapshot-%d", + // tableBucket.getTableId(), + // tableBucket.getBucket(), + // snapshotId)), new KvSnapshotHandle(Collections.emptyList(), Collections.emptyList(), 0), 0); } @@ -143,13 +143,16 @@ public static int getKeyCounts(RocksDB rocksDB) { } public static RocksDBKv buildFromSnapshotHandle( - KvSnapshotHandle kvSnapshotHandle, Path destPath) throws Exception { + SnapshotLocation snapshotLocation, KvSnapshotHandle kvSnapshotHandle, Path destPath) + throws Exception { ExecutorService downloadThreadPool = Executors.newSingleThreadExecutor(); try (CloseableRegistry closeableRegistry = new CloseableRegistry()) { KvSnapshotDataDownloader dbDataDownloader = new KvSnapshotDataDownloader(downloadThreadPool); KvSnapshotDownloadSpec downloadSpec1 = new KvSnapshotDownloadSpec( + snapshotLocation.getSnapshotDirectory(), + snapshotLocation.getSharedSnapshotDirectory(), kvSnapshotHandle, destPath.resolve(RocksDBKvBuilder.DB_INSTANCE_DIR_STRING));