Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8aa0079
Allow to pro-actively copy shuffle data to fallback storage
EnricoMi Nov 3, 2025
a3be813
Use Scala futures
EnricoMi Nov 3, 2025
9c6b192
Don't report blocks on async copy, only on copy
EnricoMi Nov 3, 2025
7fdb702
Recover pro-actively copied shuffle data from fallback storage
EnricoMi Nov 3, 2025
86d8c91
Attempt to test recovery
EnricoMi Nov 3, 2025
147411a
Improve conf access, log async copy
EnricoMi Nov 14, 2025
ee580ea
Recover shuffle data of lost executors and hosts
EnricoMi Nov 18, 2025
7abea7c
Remove recovery, support sync and async pro-active copy
EnricoMi Nov 18, 2025
ccabc18
Rename async to reliable mode, use more conf helper
EnricoMi Nov 24, 2025
f018f53
Improve comment, fix imports
EnricoMi Nov 24, 2025
0ec2f1a
Update map outputs to Fallback Storage on executor loss
EnricoMi Nov 26, 2025
d3496c1
Rework FallbackStorage.exists used from tests only
EnricoMi Nov 26, 2025
b7f7aa2
Revert testing recovery from missing decommissioning
EnricoMi Dec 17, 2025
5693bf2
Fix isReliable logic, fix comment typo
EnricoMi Dec 17, 2025
2dd1887
Test DAGScheduler with executor loss and reliable FallbackStorage
EnricoMi Dec 17, 2025
b09dec2
Fix FallbackStorageSuite
EnricoMi Dec 17, 2025
05ba4fd
Remove recover copyAsync test from FallbackStorageSuite
EnricoMi Dec 17, 2025
af91c93
Remove BlockManager test
EnricoMi Dec 17, 2025
32f3b8f
Fix binary compatibility
EnricoMi Dec 17, 2025
4ed2527
Fixing style
EnricoMi Dec 17, 2025
2fe6e42
Attempt to fetch failed block from FallbackStorage
EnricoMi May 14, 2025
2f86402
Fix tests in ShuffleBlockFetcherIteratorSuite
EnricoMi Feb 5, 2026
1986d20
Suppress logging fetch failure in reliable mode
EnricoMi Feb 5, 2026
9d685a1
Disable peer decommissioning when shuffle max disk size is 0
EnricoMi May 2, 2025
ce60801
Add executor feature step to create executor service
EnricoMi May 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ public ShuffleExecutorComponents executor() {

@Override
public ShuffleDriverComponents driver() {
return new LocalDiskShuffleDriverComponents();
return new LocalDiskShuffleDriverComponents(sparkConf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,27 @@
import java.util.Collections;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.shuffle.api.ShuffleDriverComponents;
import org.apache.spark.storage.BlockManagerMaster;
import org.apache.spark.storage.FallbackStorage;

public class LocalDiskShuffleDriverComponents implements ShuffleDriverComponents {

private final SparkConf sparkConf;

private BlockManagerMaster blockManagerMaster;

/**
 * Deprecated no-arg constructor, kept for binary compatibility with callers that
 * predate the {@code SparkConf}-taking constructor.
 * NOTE(review): this leaves {@code sparkConf} null; {@code supportsReliableStorage()}
 * passes it to {@code FallbackStorage.isReliable} — confirm that method tolerates null.
 */
@Deprecated
public LocalDiskShuffleDriverComponents() {
  this.sparkConf = null;
}

/**
 * Creates driver-side shuffle components backed by the given configuration.
 *
 * @param sparkConf the application's SparkConf, used to decide fallback-storage behavior
 */
public LocalDiskShuffleDriverComponents(SparkConf sparkConf) {
  this.sparkConf = sparkConf;
}

@Override
public Map<String, String> initializeApplication() {
blockManagerMaster = SparkEnv.get().blockManager().master();
Expand All @@ -46,4 +59,8 @@ public void removeShuffle(int shuffleId, boolean blocking) {
}
blockManagerMaster.removeShuffle(shuffleId, blocking);
}

/**
 * Returns whether shuffle data is reliably replicated to fallback storage,
 * as determined by {@code FallbackStorage.isReliable} on this component's conf.
 * NOTE(review): when constructed via the deprecated no-arg constructor,
 * {@code sparkConf} is null — confirm {@code isReliable} handles a null conf.
 */
public boolean supportsReliableStorage() {
  return FallbackStorage.isReliable(this.sparkConf);
}
}
46 changes: 46 additions & 0 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,38 @@ private class ShuffleStatus(

// TODO support updateMergeResult for similar use cases as updateMapOutput

/**
 * Re-points every shuffle output currently located on the given host to the
 * block manager `bm`.
 *
 * @param host host whose map outputs should be updated
 * @param bm   block manager id that will serve those outputs from now on
 */
def updateOutputsOnHost(host: String, bm: BlockManagerId): Unit = withWriteLock {
  logDebug(s"Updating outputs for host ${host}")
  updateOutputsByFilter(_.host == host, bm)
}

/**
 * Re-points every map output currently located on the given executor to the
 * block manager `bm`.
 *
 * @param execId executor whose map outputs should be updated
 * @param bm     block manager id that will serve those outputs from now on
 */
def updateOutputsOnExecutor(execId: String, bm: BlockManagerId): Unit = withWriteLock {
  logDebug(s"Updating outputs for execId ${execId}")
  updateOutputsByFilter(_.executorId == execId, bm)
}

/**
 * Updates all shuffle outputs whose current location satisfies the filter `f`,
 * re-registering them at block manager `bm`.
 *
 * Entries that are null in `mapStatuses` fall back to `mapStatusesDeleted`, so
 * already-deleted map statuses can be recovered by re-pointing them to `bm`
 * via `updateMapOutput`.
 *
 * @param f  predicate on the output's current location
 * @param bm block manager id that will serve the matching outputs from now on
 */
def updateOutputsByFilter(
    f: BlockManagerId => Boolean,
    bm: BlockManagerId): Unit = withWriteLock {
  for (mapIndex <- mapStatuses.indices) {
    // get the map status from mapStatuses, or if deleted, from mapStatusesDeleted
    val currentMapStatus = Option(mapStatuses(mapIndex)).getOrElse(mapStatusesDeleted(mapIndex))
    // currentMapStatus may still be null if the status was never registered
    if (currentMapStatus != null && f(currentMapStatus.location)) {
      // use updateMapOutput so we can recover deleted map statuses
      updateMapOutput(currentMapStatus.mapId, bm)
    }
  }
}

/**
* Remove the merge result which was served by the specified block manager.
*/
Expand Down Expand Up @@ -950,6 +982,20 @@ private[spark] class MapOutputTrackerMaster(
}
}

/**
 * For every registered shuffle, re-points all map outputs located on `host`
 * to the block manager `bm`.
 */
def updateOutputsOnHost(host: String, bm: BlockManagerId): Unit = {
  shuffleStatuses.valuesIterator.foreach { status =>
    status.updateOutputsOnHost(host, bm)
  }
}

/**
 * For every registered shuffle, re-points all map outputs located on the
 * executor `execId` to the block manager `bm`.
 */
def updateOutputsOnExecutor(execId: String, bm: BlockManagerId): Unit = {
  shuffleStatuses.valuesIterator.foreach { status =>
    status.updateOutputsOnExecutor(execId, bm)
  }
}

/**
* Removes all shuffle outputs associated with this host. Note that this will also remove
* outputs which are served by an external shuffle server (if one exists).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,13 +625,34 @@ package object config {
.checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
.createOptional

// Proactive (eager, asynchronous) replication of shuffle blocks to fallback storage.
// Fix: user-facing doc text used the hyphenated noun form "speeds-up" as a verb;
// the verb is "speeds up".
private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PROACTIVE_ENABLED =
  ConfigBuilder("spark.storage.decommission.fallbackStorage.proactive.enabled")
    .doc("Enables proactive shuffle block replication for fallback storage. " +
      "If enabled, all shuffle blocks are copied asynchronously to the fallback storage. " +
      "This speeds up decommissioning as most shuffle data might have been replicated by then, " +
      "at the cost of extra network traffic and storage usage if no decommission occurs. " +
      "This does not migrate any shuffle data until decommission is started.")
    .version("4.2.0")
    .booleanConf
    .createWithDefault(false)

// Reliable mode: task success is tied to the shuffle data reaching fallback storage.
// Fix: user-facing doc text used the non-idiomatic "allows to recover";
// English requires a gerund or object here ("allows recovering").
private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PROACTIVE_RELIABLE =
  ConfigBuilder("spark.storage.decommission.fallbackStorage.proactive.reliable")
    .doc("Enables reliable shuffle data replication to fallback storage. " +
      "The task success depends on the successful transfer of shuffle data " +
      "to the fallback storage. This allows recovering from node failures.")
    .version("4.2.0")
    .booleanConf
    .createWithDefault(false)

// Disk-usage cap for receiving migrated shuffle blocks; 0 forces fallback-storage-only
// migration (see BlockManagerDecommissioner). Fix: the trailing literal used an `s`
// interpolator with nothing to interpolate — dropped it; the runtime string is unchanged.
private[spark] val STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE =
  ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxDiskSize")
    .doc("Maximum disk space to use to store shuffle blocks before rejecting remote " +
      "shuffle blocks. Rejecting remote shuffle blocks means that an executor will not receive " +
      "any shuffle migrations, and if there are no other executors available for migration " +
      "then shuffle blocks will be lost unless " +
      s"${STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH.key} is configured. " +
      "Set to 0 to migrate to fallback storage only.")
    .version("3.2.0")
    .bytesConf(ByteUnit.BYTE)
    .createOptional
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2842,6 +2842,21 @@ private[spark] class DAGScheduler(
mapOutputTracker.removeOutputsOnExecutor(execId)
}
}
} else {
// we only get here if fallback storage is reliable
// see LocalDiskShuffleDriverComponents.supportsReliableStorage
if (FallbackStorage.isConfigured(env.conf)) {
hostToUnregisterOutputs match {
case Some(host) =>
logInfo(log"Relocating shuffle files of host to Fallback Storage: ${MDC(HOST, host)} (epoch " +
log"${MDC(EPOCH, currentEpoch)}")
mapOutputTracker.updateOutputsOnHost(host, FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)
case None =>
logInfo(log"Relocating shuffle files of executor to Fallback Storage: ${MDC(EXECUTOR_ID, execId)} " +
log"(epoch ${MDC(EPOCH, currentEpoch)})")
mapOutputTracker.updateOutputsOnExecutor(execId, FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.{ShuffleDependency, SparkEnv, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{NUM_MERGER_LOCATIONS, SHUFFLE_ID, STAGE_ID}
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.FallbackStorage

/**
* The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor
Expand Down Expand Up @@ -84,6 +85,26 @@ private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
case _ =>
}
}
// copy block to fallback storage if pro-active replication to fallback storage is enabled
if (FallbackStorage.isProactive(SparkEnv.get.conf)) {
val shuffleBlockInfo = ShuffleBlockInfo(dep.shuffleId, mapId)
val fallbackStorage = FallbackStorage.getFallbackStorage(SparkEnv.get.conf)
val blockManager = SparkEnv.get.blockManager

if (FallbackStorage.isReliable(SparkEnv.get.conf)) {
// we are not catching exceptions here as we want them to fail the task to retry it,
// otherwise the next stage would see a fetch-failed exception when it has to read
// from the fallback storage, which will then retry this stage entirely
fallbackStorage.foreach(
_.copy(shuffleBlockInfo, blockManager, isAsyncCopy = false, reportBlockStatus = false)
)
} else {
// we ignore exceptions that occur asynchronously, this is best-effort replication
// we do not want to defer the task in any way
fallbackStorage
.foreach(_.copyAsync(shuffleBlockInfo, blockManager))
}
}
}
mapStatus.get
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ private[spark] class BlockManager(
shuffleManager.shuffleBlockResolver.getBlockData(blockId)
} catch {
case e: IOException =>
if (conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
if (FallbackStorage.isConfigured(conf)) {
FallbackStorage.read(conf, blockId)
} else {
throw e
Expand Down Expand Up @@ -1822,8 +1822,7 @@ private[spark] class BlockManager(
lastPeerFetchTimeNs = System.nanoTime()
logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
}
if (cachedPeers.isEmpty &&
conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
if (cachedPeers.isEmpty && FallbackStorage.isConfigured(conf)) {
Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)
} else {
cachedPeers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark._
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE
import org.apache.spark.shuffle.ShuffleBlockInfo
import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
import org.apache.spark.util.{ThreadUtils, Utils}
Expand Down Expand Up @@ -319,8 +320,15 @@ private[storage] class BlockManagerDecommissioner(
log"${MDC(TOTAL, localShuffles.size)} local shuffles are added. " +
log"In total, ${MDC(NUM_REMAINED, remainedShuffles)} shuffles are remained.")

// migrate to fallback storage only if
// STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH is set and
// STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE is 0
val fallbackOnly = conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined &&
conf.get(STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE).contains(0)

// Update the threads doing migrations
val livePeerSet = bm.getPeers(false).toSet
val livePeerSet = if (fallbackOnly) Set(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)
else bm.getPeers(false).toSet
val currentPeerSet = migrationPeers.keys.toSet
val deadPeers = currentPeerSet.diff(livePeerSet)
// Randomize the orders of the peers to avoid hotspot nodes.
Expand Down
Loading
Loading