diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java
index 2067fa014..361116288 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java
@@ -583,6 +583,13 @@ protected static CommandLine parseArgs(String[] args) {
.longOpt(OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS)
.desc("Maximum compute cost budget in GB hours")
.build());
+ options.addOption(
+ Option.builder(null)
+ .required(false)
+ .hasArg()
+ .longOpt(OperationTasksBuilder.BATCH_MAX_ITEMS)
+ .desc("Max tables per batched OFD job (ORPHAN_FILES_DELETION_BATCH only)")
+ .build());
options.addOption(
Option.builder(null)
.required(false)
@@ -744,6 +751,11 @@ protected static Properties getAdditionalProperties(CommandLine cmdLine) {
OperationTasksBuilder.MAX_STRATEGIES_COUNT,
cmdLine.getOptionValue(OperationTasksBuilder.MAX_STRATEGIES_COUNT));
}
+ if (cmdLine.hasOption(OperationTasksBuilder.BATCH_MAX_ITEMS)) {
+ result.setProperty(
+ OperationTasksBuilder.BATCH_MAX_ITEMS,
+ cmdLine.getOptionValue(OperationTasksBuilder.BATCH_MAX_ITEMS));
+ }
return result;
}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java
new file mode 100644
index 000000000..7c2e13c1b
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java
@@ -0,0 +1,79 @@
+package com.linkedin.openhouse.jobs.scheduler.tasks;
+
+import com.linkedin.openhouse.jobs.client.JobsClient;
+import com.linkedin.openhouse.jobs.client.TablesClient;
+import com.linkedin.openhouse.jobs.client.model.JobConf;
+import com.linkedin.openhouse.jobs.util.TableMetadata;
+import com.linkedin.openhouse.jobs.util.TableMetadataBatch;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A task to remove orphan files from a batch of tables in a single Spark job. Pairs with {@code
+ * com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp} via the {@link
+ * JobConf.JobTypeEnum#ORPHAN_FILES_DELETION_BATCH} JobType.
+ *
+ * <p>The legacy {@link com.linkedin.openhouse.jobs.scheduler.JobsScheduler} pre-dates the optimizer
+ * service, so this task omits the optimizer-only CLI flags ({@code --resultsEndpoint}, {@code
+ * --operationIds}, {@code --tableUuids}). The Spark app treats them as optional and falls back to
+ * HTS-only lifecycle tracking when they are absent.
+ *
+ * @see <a href="https://iceberg.apache.org/docs/latest/maintenance/#delete-orphan-files">Delete
+ * orphan files</a>
+ */
+@Slf4j
+@Getter
+public class BatchedTableOrphanFilesDeletionTask extends OperationTask<TableMetadataBatch> {
+  /** Job type submitted by this task; also the value returned by {@link #getType()}. */
+  public static final JobConf.JobTypeEnum OPERATION_TYPE =
+      JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH;
+
+  /**
+   * Creates a task with explicit polling and timeout settings. All arguments are forwarded
+   * unchanged to the base task.
+   *
+   * @param jobsClient client used to launch the batched job
+   * @param tablesClient client handed to the base task
+   * @param metadata batch of tables this task covers
+   * @param pollIntervalMs poll interval passed to the base task, in milliseconds
+   * @param queuedTimeoutMs queued timeout passed to the base task, in milliseconds
+   * @param taskTimeoutMs task timeout passed to the base task, in milliseconds
+   */
+  public BatchedTableOrphanFilesDeletionTask(
+      JobsClient jobsClient,
+      TablesClient tablesClient,
+      TableMetadataBatch metadata,
+      long pollIntervalMs,
+      long queuedTimeoutMs,
+      long taskTimeoutMs) {
+    super(jobsClient, tablesClient, metadata, pollIntervalMs, queuedTimeoutMs, taskTimeoutMs);
+  }
+
+  /**
+   * Creates a task that uses the base task's default polling and timeout settings.
+   *
+   * @param jobsClient client used to launch the batched job
+   * @param tablesClient client handed to the base task
+   * @param metadata batch of tables this task covers
+   */
+  public BatchedTableOrphanFilesDeletionTask(
+      JobsClient jobsClient, TablesClient tablesClient, TableMetadataBatch metadata) {
+    super(jobsClient, tablesClient, metadata);
+  }
+
+  /** Returns {@link #OPERATION_TYPE}. */
+  @Override
+  public JobConf.JobTypeEnum getType() {
+    return OPERATION_TYPE;
+  }
+
+  /**
+   * Builds the Spark app arguments: {@code --tableNames} followed by the comma-joined
+   * fully-qualified names of every table in the batch. No optimizer-specific flags are emitted.
+   */
+  @Override
+  protected List<String> getArgs() {
+    String tableNames =
+        metadata.getTables().stream().map(TableMetadata::fqtn).collect(Collectors.joining(","));
+    return Arrays.asList("--tableNames", tableNames);
+  }
+
+  /** Returns {@code true} only when the batch contains at least one table. */
+  @Override
+  protected boolean shouldRun() {
+    return !metadata.getTables().isEmpty();
+  }
+
+  /**
+   * Launches one job for the whole batch and records its id in {@code jobId}.
+   *
+   * <p>The job name is {@code <type>_<dbName>_<tableCount>}. The proxy user is the creator of the
+   * first table in the batch.
+   *
+   * @return {@code true} if the jobs client returned a job id; {@code false} if it returned none
+   *     or the batch is empty
+   */
+  @Override
+  protected boolean launchJob() {
+    List<TableMetadata> tables = metadata.getTables();
+    if (tables.isEmpty()) {
+      // Guard the get(0) below: an empty batch has no table to take the proxy user from.
+      log.warn(
+          "Not launching {} for database {}: batch contains no tables",
+          getType(),
+          metadata.getDbName());
+      return false;
+    }
+    String jobName = String.format("%s_%s_%d", getType(), metadata.getDbName(), tables.size());
+    Map<String, String> executionProperties = Collections.emptyMap();
+    String proxyUser = tables.get(0).getCreator();
+    jobId =
+        jobsClient
+            .launch(jobName, getType(), proxyUser, executionProperties, getArgs())
+            .orElse(null);
+    return jobId != null;
+  }
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java
index 37e3df399..7de5bf926 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java
@@ -9,6 +9,7 @@
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.client.model.JobConf;
import com.linkedin.openhouse.jobs.scheduler.JobsScheduler;
+import com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp;
import com.linkedin.openhouse.jobs.util.AppConstants;
import com.linkedin.openhouse.jobs.util.DataLayoutUtil;
import com.linkedin.openhouse.jobs.util.DatabaseMetadata;
@@ -16,10 +17,15 @@
import com.linkedin.openhouse.jobs.util.Metadata;
import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata;
import com.linkedin.openhouse.jobs.util.TableMetadata;
+import com.linkedin.openhouse.jobs.util.TableMetadataBatch;
+import com.linkedin.openhouse.jobs.util.binpack.Bin;
+import com.linkedin.openhouse.jobs.util.binpack.BinItem;
+import com.linkedin.openhouse.jobs.util.binpack.FirstFitDecreasingBinPacker;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -40,10 +46,12 @@
public class OperationTasksBuilder {
public static final String MAX_COST_BUDGET_GB_HRS = "maxCostBudgetGbHrs";
public static final String MAX_STRATEGIES_COUNT = "maxStrategiesCount";
+ public static final String BATCH_MAX_ITEMS = "batchMaxItems";
private static final double COMPUTE_COST_WEIGHT_DEFAULT = 0.3;
private static final double COMPACTION_GAIN_WEIGHT_DEFAULT = 0.7;
private static final double MAX_COST_BUDGET_GB_HRS_DEFAULT = 1000.0;
private static final int MAX_STRATEGIES_COUNT_DEFAULT = 10;
+ private static final int BATCH_MAX_ITEMS_DEFAULT = 25;
private static final String METRICS_SCOPE = JobsScheduler.class.getName();
private final OperationTaskFactory extends OperationTask>> taskFactory;
@@ -65,6 +73,80 @@ private List> prepareTableOperationTaskList(
return processMetadataList(tableMetadataList, jobType, operationMode, otelEmitter);
}
+  /**
+   * Builds one {@link BatchedTableOrphanFilesDeletionTask} per database-scoped bin. Groups eligible
+   * tables by database (batches never cross databases), then applies the first-fit-decreasing bin
+   * packer with a per-bin item cap from {@code properties} (defaults to {@value
+   * #BATCH_MAX_ITEMS_DEFAULT}). Tables with the maintenance op disabled are filtered out before
+   * grouping.
+   *
+   * @param jobType job type used for the maintenance-disabled filter and for task creation
+   * @param properties scheduler properties; {@link #BATCH_MAX_ITEMS} is read from here, and a
+   *     missing or non-numeric value falls back to the default
+   * @param operationMode forwarded to {@code processMetadataList}
+   * @param otelEmitter forwarded to {@code processMetadataList}
+   * @return the tasks produced by {@code processMetadataList} for the packed batches
+   * @throws IllegalArgumentException if the configured cap is below 1 or above {@link
+   *     BatchedOrphanFilesDeletionSparkApp#MAX_BATCH_SIZE}
+   */
+  private List<OperationTask<?>> prepareBatchedOrphanFilesDeletionTaskList(
+      JobConf.JobTypeEnum jobType,
+      Properties properties,
+      OperationMode operationMode,
+      OtelEmitter otelEmitter) {
+    int maxItemsPerBin =
+        NumberUtils.toInt(properties.getProperty(BATCH_MAX_ITEMS), BATCH_MAX_ITEMS_DEFAULT);
+    if (maxItemsPerBin < 1) {
+      // Reject a non-positive cap up front. How the packer treats it is not visible here; if it
+      // means "no cap" (as 0 does for the weight/size dimensions below), a bin could exceed the
+      // Spark-app ceiling checked next.
+      throw new IllegalArgumentException(
+          String.format("--%s=%d must be at least 1", BATCH_MAX_ITEMS, maxItemsPerBin));
+    }
+    if (maxItemsPerBin > BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE) {
+      throw new IllegalArgumentException(
+          String.format(
+              "--%s=%d exceeds Spark-app ceiling MAX_BATCH_SIZE=%d",
+              BATCH_MAX_ITEMS, maxItemsPerBin, BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE));
+    }
+    List<TableMetadata> eligible =
+        tablesClient.getTableMetadataList().stream()
+            .filter(t -> !t.isMaintenanceJobDisabled(jobType))
+            .collect(Collectors.toList());
+    log.info(
+        "Fetched metadata for {} batched-OFD-eligible tables; binMaxItems={}",
+        eligible.size(),
+        maxItemsPerBin);
+
+    FirstFitDecreasingBinPacker packer =
+        FirstFitDecreasingBinPacker.builder()
+            .maxItemsPerBin(maxItemsPerBin)
+            // Item-count cap only; weight/size dimensions disabled until table_stats is wired in.
+            .maxWeightPerBin(0)
+            .maxSizeBytesPerBin(0)
+            .build();
+
+    Map<String, List<TableMetadata>> byDb =
+        eligible.stream().collect(Collectors.groupingBy(TableMetadata::getDbName));
+
+    List<TableMetadataBatch> batches = new ArrayList<>();
+    for (Map.Entry<String, List<TableMetadata>> dbGroup : byDb.entrySet()) {
+      String dbName = dbGroup.getKey();
+      List<TableMetadata> dbTables = dbGroup.getValue();
+      // Index once per database so mapping bin items back to tables is a hash lookup instead of
+      // a linear scan per item. On a duplicate fqtn the first table wins, matching the previous
+      // findFirst() behaviour.
+      Map<String, TableMetadata> tableByFqtn =
+          dbTables.stream()
+              .collect(Collectors.toMap(TableMetadata::fqtn, t -> t, (first, dup) -> first));
+      List<BinItem> items =
+          dbTables.stream()
+              .map(
+                  t ->
+                      BinItem.builder()
+                          .fqtn(t.fqtn())
+                          .operationId("")
+                          .tableUuid("")
+                          .databaseName(t.getDbName())
+                          .tableName(t.getTableName())
+                          .weight(1L)
+                          .sizeBytes(0L)
+                          .build())
+              .collect(Collectors.toList());
+      for (Bin bin : packer.pack(items)) {
+        List<TableMetadata> tablesForBin =
+            bin.items().stream()
+                .map(
+                    item -> {
+                      TableMetadata table = tableByFqtn.get(item.getFqtn());
+                      if (table == null) {
+                        throw new IllegalStateException("missing table for bin");
+                      }
+                      return table;
+                    })
+                .collect(Collectors.toList());
+        batches.add(TableMetadataBatch.builder().dbName(dbName).tables(tablesForBin).build());
+      }
+    }
+    log.info("Packed {} eligible tables into {} batches", eligible.size(), batches.size());
+    return processMetadataList(batches, jobType, operationMode, otelEmitter);
+  }
+
+
private List> prepareReplicationOperationTaskList(
JobConf.JobTypeEnum jobType, OperationMode operationMode, OtelEmitter otelEmitter) {
List replicationSetupTableMetadataList = tablesClient.getTableMetadataList();
@@ -272,6 +354,9 @@ public List> buildOperationTaskList(
case DATA_LAYOUT_STRATEGY_GENERATION:
case SORT_STATS_COLLECTION:
return prepareTableOperationTaskList(jobType, operationMode, otelEmitter);
+ case ORPHAN_FILES_DELETION_BATCH:
+ return prepareBatchedOrphanFilesDeletionTaskList(
+ jobType, properties, operationMode, otelEmitter);
case REPLICATION:
return prepareReplicationOperationTaskList(jobType, operationMode, otelEmitter);
case DATA_LAYOUT_STRATEGY_EXECUTION:
@@ -300,6 +385,22 @@ public void buildOperationTaskListInParallel(
buildDataLayoutOperationTaskListInParallel(jobType, properties, operationMode, otelEmitter);
} else if (jobType == JobConf.JobTypeEnum.TABLE_DIRECTORY_DELETION) {
buildDatabaseLevelOperationTasksInParallel(jobType, operationMode, otelEmitter);
+ } else if (jobType == JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH) {
+ // Batched OFD needs the full table set in hand before it can group-by-db and bin-pack,
+ // so we use the synchronous fetch path then enqueue the tasks in bulk.
+ List> tasks =
+ prepareBatchedOrphanFilesDeletionTaskList(
+ jobType, properties, operationMode, otelEmitter);
+ for (OperationTask> task : tasks) {
+ try {
+ operationTaskManager.addData(task);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Interrupted while enqueueing batched OFD task", e);
+ }
+ }
+ operationTaskManager.updateDataGenerationCompletion();
+ log.info("Enqueued {} batched OFD tasks for job type: {}", tasks.size(), jobType);
} else {
buildOperationTaskListInParallelInternal(jobType, operationMode, otelEmitter);
}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java
new file mode 100644
index 000000000..1484b86a5
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java
@@ -0,0 +1,479 @@
+package com.linkedin.openhouse.jobs.spark;
+
+import com.google.common.collect.Iterables;
+import com.linkedin.openhouse.common.metrics.DefaultOtelConfig;
+import com.linkedin.openhouse.common.metrics.OtelEmitter;
+import com.linkedin.openhouse.jobs.exception.TableValidationException;
+import com.linkedin.openhouse.jobs.spark.optimizer.OperationUpdateRequest;
+import com.linkedin.openhouse.jobs.spark.optimizer.OptimizerServiceClient;
+import com.linkedin.openhouse.jobs.spark.state.StateManager;
+import com.linkedin.openhouse.jobs.util.AppConstants;
+import com.linkedin.openhouse.jobs.util.AppsOtelEmitter;
+import com.linkedin.openhouse.jobs.util.TableStateValidator;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.DeleteOrphanFiles;
+
+/**
+ * Batched orphan-files-deletion Spark app. One Spark job processes a list of {@code (table,
+ * operationId)} pairs that the optimizer scheduler bin-packed into a single batch. Each table is
+ * handled by a worker thread; per-table failures are caught and reported back independently — the
+ * job continues for the remaining tables and exits 0 if at least one table succeeds.
+ *
+ * This is the multi-table counterpart of {@link OrphanFilesDeletionSparkApp}. The single-table
+ * app remains the deployment unit when bin size is 1, and stays the canonical reference for the
+ * actual deletion logic.
+ *
+ * <p>Example invocation:
+ *
+ * <pre>{@code
+ * com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp \
+ * --tableNames db.t1,db.t2,db.t3 \
+ * --operationIds op-uuid-1,op-uuid-2,op-uuid-3 \
+ * --tableUuids tab-uuid-1,tab-uuid-2,tab-uuid-3 \
+ * --resultsEndpoint http://optimizer.svc:8080 \
+ * --driverParallelism 4
+ * }</pre>
+ */
+@Slf4j
+public class BatchedOrphanFilesDeletionSparkApp extends BaseSparkApp {
+
+ private static final String OPERATION_TYPE = "ORPHAN_FILES_DELETION";
+ private static final String STATUS_SUCCESS = "SUCCESS";
+ private static final String STATUS_FAILED = "FAILED";
+ private static final int DEFAULT_MAX_ORPHAN_FILE_SAMPLE_SIZE = 20000;
+ private static final int DEFAULT_MIN_OFD_TTL_IN_DAYS = 3;
+
+ /**
+ * Hard ceiling on the number of tables a single batched job can carry. The wire path is parallel
+ * CSV CLI args (see {@link #buildEntries}); at ~120 chars per entry (36-char UUID × 3 lists) this
+ * gives ~24 KB on the command line, well under the typical Linux {@code ARG_MAX} of 128 KB but
+ * leaves headroom for the {@code spark-submit} envelope and JVM flags. The scheduler-driven path
+ * uses a smaller per-entry footprint but inherits the same cap for defense in depth. Operators
+ * tune the per-job batch size with {@code --batchMaxItems} (default {@code 25}); this constant is
+ * a footgun stop, not the operating point.
+ */
+ public static final int MAX_BATCH_SIZE = 200;
+
+ private final List entries;
+ private final String resultsEndpoint;
+ private final int driverParallelism;
+ private final long ttlSeconds;
+ private final String backupDir;
+ private final int concurrentDeletes;
+ private final boolean streamResults;
+ private final int maxOrphanFileSampleSize;
+
+ /**
+ * Creates the app for one batch of tables.
+ *
+ * @param jobId forwarded to the base app
+ * @param stateManager forwarded to the base app
+ * @param otelEmitter forwarded to the base app
+ * @param entries per-table work items; the list is stored as passed, without copying
+ * @param resultsEndpoint Optimizer Service base URL, or {@code null} to skip result callbacks
+ * @param driverParallelism requested worker-thread count; values below 1 are raised to 1
+ * @param ttlSeconds batch-level orphan-file age threshold in seconds (may be adjusted per table)
+ * @param backupDir backup directory passed to the deletion operation
+ * @param concurrentDeletes concurrent-deletes setting passed to the deletion operation
+ * @param streamResults stream-results flag passed to the deletion operation
+ * @param maxOrphanFileSampleSize sample-size limit passed to the deletion operation
+ */
+ public BatchedOrphanFilesDeletionSparkApp(
+ String jobId,
+ StateManager stateManager,
+ OtelEmitter otelEmitter,
+ List entries,
+ String resultsEndpoint,
+ int driverParallelism,
+ long ttlSeconds,
+ String backupDir,
+ int concurrentDeletes,
+ boolean streamResults,
+ int maxOrphanFileSampleSize) {
+ super(jobId, stateManager, otelEmitter);
+ this.entries = entries;
+ this.resultsEndpoint = resultsEndpoint;
+ // Clamp so the fixed thread pool is never created with a non-positive size.
+ this.driverParallelism = Math.max(1, driverParallelism);
+ this.ttlSeconds = ttlSeconds;
+ this.backupDir = backupDir;
+ this.concurrentDeletes = concurrentDeletes;
+ this.streamResults = streamResults;
+ this.maxOrphanFileSampleSize = maxOrphanFileSampleSize;
+ }
+
+  /**
+   * Runs the whole batch and logs a summary. Returns normally when there are no entries or when
+   * at least one entry succeeded.
+   *
+   * @throws RuntimeException if the batch is non-empty and every entry failed
+   */
+  @Override
+  protected void runInner(Operations ops) {
+    final int total = entries.size();
+    log.info(
+        "Batched OFD start: entries={} driverParallelism={} resultsEndpoint={}",
+        total,
+        driverParallelism,
+        resultsEndpoint);
+
+    if (total == 0) {
+      log.warn("Batched OFD invoked with no entries; nothing to do");
+      return;
+    }
+
+    final int succeeded;
+    // The client may be null (no results endpoint); try-with-resources skips closing a null
+    // resource.
+    try (OptimizerServiceClient optimizerClient = newOptimizerClient()) {
+      succeeded = runBatch(ops, optimizerClient);
+    }
+
+    log.info(
+        "Batched OFD finished: total={} success={} failed={}", total, succeeded, total - succeeded);
+
+    if (succeeded != 0) {
+      return;
+    }
+    throw new RuntimeException(String.format("All %d operations in batch failed", total));
+  }
+
+  /**
+   * Submits one {@link TableWorker} per entry to a fixed pool of {@code driverParallelism}
+   * threads, waits for each in submission order, and returns how many reported success. The pool
+   * is always shut down before returning.
+   */
+  private int runBatch(Operations ops, OptimizerServiceClient client) {
+    ExecutorService workers = Executors.newFixedThreadPool(driverParallelism);
+    try {
+      // Phase 1: submit everything first so the workers can run concurrently.
+      List<Future<Boolean>> pending = new ArrayList<>(entries.size());
+      for (BatchEntry entry : entries) {
+        pending.add(workers.submit(new TableWorker(ops, entry, client)));
+      }
+      // Phase 2: await each future, paired with its entry by position.
+      int succeeded = 0;
+      for (int i = 0; i < pending.size(); i++) {
+        succeeded += awaitOne(entries.get(i), pending.get(i), client);
+      }
+      return succeeded;
+    } finally {
+      shutdownPool(workers);
+    }
+  }
+
+  /**
+   * Waits for one worker and converts its outcome to a count.
+   *
+   * @return 1 if the worker returned {@code true}; 0 if it returned anything else, was
+   *     interrupted, or threw
+   */
+  private int awaitOne(BatchEntry entry, Future<Boolean> future, OptimizerServiceClient client) {
+    try {
+      Boolean outcome = future.get();
+      if (Boolean.TRUE.equals(outcome)) {
+        return 1;
+      }
+      return 0;
+    } catch (ExecutionException e) {
+      // The worker catches Throwable internally and always reports its own result, so reaching
+      // here means the worker itself leaked an exception. Be defensive: post FAILED so the
+      // operation row doesn't sit SCHEDULED until the stale-timeout.
+      log.error(
+          "Worker threw outside its own catch for fqtn={} — reporting FAILED",
+          entry.getFqtn(),
+          e.getCause());
+      reportResult(entry, false, client);
+      return 0;
+    } catch (InterruptedException e) {
+      // Restore the flag so callers further up can observe the interruption.
+      Thread.currentThread().interrupt();
+      log.error("Worker interrupted (likely job cancellation): fqtn={}", entry.getFqtn(), e);
+      otelEmitter.count(
+          METRICS_SCOPE,
+          "optimizer_batch_interrupted",
+          1,
+          Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), entry.getFqtn()));
+      return 0;
+    }
+  }
+
+  /**
+   * Requests an orderly shutdown and waits up to 30 seconds for running workers. Falls back to
+   * {@code shutdownNow()} if the wait times out or this thread is interrupted; in the latter case
+   * the interrupt flag is restored first.
+   */
+  private void shutdownPool(ExecutorService pool) {
+    pool.shutdown();
+    boolean drained;
+    try {
+      drained = pool.awaitTermination(30, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      drained = false;
+    }
+    if (!drained) {
+      pool.shutdownNow();
+    }
+  }
+
+  /**
+   * Creates the Optimizer Service client for this run.
+   *
+   * @return a client bound to {@code resultsEndpoint}, or {@code null} when no endpoint was
+   *     configured. A {@code null} client makes the per-operation optimizer callback a no-op,
+   *     which is the mode used when the legacy {@link
+   *     com.linkedin.openhouse.jobs.scheduler.JobsScheduler} launches the job.
+   */
+  protected OptimizerServiceClient newOptimizerClient() {
+    if (resultsEndpoint != null) {
+      return new OptimizerServiceClient(resultsEndpoint);
+    }
+    return null;
+  }
+
+  /**
+   * POSTs the per-operation outcome to the Optimizer Service. No-op when {@code client} is null
+   * (scheduler-driven invocations).
+   *
+   * <p>Reporting is best-effort. Any failure, checked or unchecked, is logged and counted and
+   * never propagates. This method runs inside the worker's {@code finally} block and inside the
+   * await path, so an escaping exception would mark a successful deletion as failed or abort the
+   * remaining entries of the batch.
+   *
+   * @param entry the table/operation the result belongs to
+   * @param success whether the operation succeeded
+   * @param client Optimizer Service client, or {@code null} to skip reporting
+   */
+  private void reportResult(BatchEntry entry, boolean success, OptimizerServiceClient client) {
+    if (client == null) {
+      return;
+    }
+    OperationUpdateRequest body =
+        OperationUpdateRequest.builder()
+            .operationId(entry.getOperationId())
+            .status(success ? STATUS_SUCCESS : STATUS_FAILED)
+            .tableUuid(entry.getTableUuid())
+            .databaseName(entry.getDatabaseName())
+            .tableName(entry.getTableName())
+            .operationType(OPERATION_TYPE)
+            .build();
+    try {
+      client.updateOperation(body);
+    } catch (IOException | RuntimeException e) {
+      // RuntimeException is included so that, for example, a malformed endpoint URL rejected by
+      // the HTTP layer is handled the same way as a transport error.
+      log.error(
+          "Failed to report operation result; row will stay SCHEDULED until stale-timeout: operationId={} fqtn={}",
+          entry.getOperationId(),
+          entry.getFqtn(),
+          e);
+      otelEmitter.count(
+          METRICS_SCOPE,
+          "optimizer_update_failed",
+          1,
+          Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), entry.getFqtn()));
+    }
+  }
+
+ /**
+ * One unit of work in a batched OFD job: runs orphan-file deletion and the post-run validation
+ * for a single {@link BatchEntry}, then reports the outcome via {@code reportResult}.
+ *
+ * <p>Non-static on purpose: it reads the enclosing app's settings ({@code ttlSeconds}, {@code
+ * backupDir}, {@code concurrentDeletes}, {@code streamResults}, {@code maxOrphanFileSampleSize})
+ * and its {@code otelEmitter}.
+ */
+ private final class TableWorker implements Callable {
+ private final Operations ops;
+ private final BatchEntry entry;
+ private final OptimizerServiceClient client;
+
+ TableWorker(Operations ops, BatchEntry entry, OptimizerServiceClient client) {
+ this.ops = ops;
+ this.entry = entry;
+ this.client = client;
+ }
+
+ /**
+ * Processes the entry's table.
+ *
+ * @return {@code true} only if deletion, metric emission and validation all completed without
+ * throwing; {@code false} otherwise
+ */
+ @Override
+ public Boolean call() {
+ String fqtn = entry.getFqtn();
+ boolean success = false;
+ try {
+ log.info("OFD start: fqtn={} operationId={}", fqtn, entry.getOperationId());
+ Table table = ops.getTable(fqtn);
+ // Files older than this cutoff are candidates; the TTL may be adjusted per table.
+ long olderThanTimestampMillis =
+ System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(resolveTtlSeconds(table));
+ DeleteOrphanFiles.Result result =
+ ops.deleteOrphanFiles(
+ table,
+ olderThanTimestampMillis,
+ Boolean.parseBoolean(
+ table.properties().getOrDefault(AppConstants.BACKUP_ENABLED_KEY, "false")),
+ backupDir,
+ concurrentDeletes,
+ streamResults,
+ maxOrphanFileSampleSize);
+ // Count via iteration rather than materializing the full path list: a table with millions
+ // of orphan files would otherwise OOM the driver, and that risk multiplies with
+ // driverParallelism workers running concurrently.
+ int orphanCount = Iterables.size(result.orphanFileLocations());
+ otelEmitter.count(
+ METRICS_SCOPE,
+ AppConstants.ORPHAN_FILE_COUNT,
+ orphanCount,
+ Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), fqtn));
+ validate(fqtn);
+ // Only reached when validate() did not throw.
+ success = true;
+ log.info("OFD success: fqtn={} orphansDetected={}", fqtn, orphanCount);
+ } catch (Throwable t) {
+ // Catch everything so one table's failure stays contained to this worker.
+ log.error("OFD failed: fqtn={} operationId={}", fqtn, entry.getOperationId(), t);
+ } finally {
+ // Report from finally so both the success and the failure path send a result.
+ reportResult(entry, success, client);
+ }
+ return success;
+ }
+
+ /**
+ * Re-runs {@link TableStateValidator} — the same post-job consistency check the single-table
+ * {@link OrphanFilesDeletionSparkApp} uses — to confirm the table's manifests and metadata are
+ * intact after deletion. A failure here is treated as a failed operation: it's logged, counted,
+ * and re-thrown so the outer {@link #call()} marks {@code success=false}.
+ */
+ private void validate(String fqtn) {
+ try {
+ TableStateValidator.run(ops.spark(), fqtn);
+ } catch (TableValidationException e) {
+ log.error("Post-job validation failed: fqtn={}", fqtn, e);
+ otelEmitter.count(
+ METRICS_SCOPE,
+ "post_run_validation_error",
+ 1,
+ Attributes.of(
+ AttributeKey.stringKey(AppConstants.TABLE_NAME),
+ fqtn,
+ AttributeKey.stringKey(AppConstants.JOB_NAME),
+ BatchedOrphanFilesDeletionSparkApp.class.getSimpleName()));
+ throw e;
+ }
+ }
+
+ /**
+ * Resolves the orphan-file TTL for one table, in seconds. Starts from the batch-level {@code
+ * ttlSeconds}; a table whose {@code OFD_ONE_DAY_TTL_ENABLED_KEY} property parses as true uses
+ * one day instead. For replica tables, a result shorter than {@code
+ * DEFAULT_MIN_OFD_TTL_IN_DAYS} whole days is raised to that minimum.
+ */
+ private long resolveTtlSeconds(Table table) {
+ long resolved = ttlSeconds;
+ if (Boolean.parseBoolean(
+ table.properties().getOrDefault(AppConstants.OFD_ONE_DAY_TTL_ENABLED_KEY, "false"))) {
+ resolved = TimeUnit.DAYS.toSeconds(1);
+ }
+ // A table without an explicit type property is treated as primary.
+ String tableType =
+ table
+ .properties()
+ .getOrDefault(AppConstants.OPENHOUSE_TABLE_TYPE_KEY, AppConstants.TABLE_TYPE_PRIMARY);
+ if (AppConstants.TABLE_TYPE_REPLICA.equals(tableType)) {
+ long days = Duration.ofSeconds(resolved).toDays();
+ if (days < DEFAULT_MIN_OFD_TTL_IN_DAYS) {
+ resolved = TimeUnit.DAYS.toSeconds(DEFAULT_MIN_OFD_TTL_IN_DAYS);
+ }
+ }
+ return resolved;
+ }
+ }
+
+ /**
+ * Per-table inputs for one operation row inside a bin. Immutable; constructor, builder, getters
+ * and {@code toString} are generated by Lombok.
+ */
+ @lombok.AllArgsConstructor
+ @lombok.Builder
+ @lombok.Getter
+ @lombok.ToString
+ public static class BatchEntry {
+ // Fully-qualified table name, "db.table".
+ private final String fqtn;
+ // Optimizer operation id; null when --operationIds was not supplied.
+ private final String operationId;
+ // Table UUID; null when --tableUuids was not supplied.
+ private final String tableUuid;
+ // Part of fqtn before the first dot.
+ private final String databaseName;
+ // Part of fqtn after the first dot.
+ private final String tableName;
+ }
+
+  /** Entry point: builds the app from the command line with a default OTel emitter and runs it. */
+  public static void main(String[] args) {
+    OtelEmitter emitter =
+        new AppsOtelEmitter(Collections.singletonList(DefaultOtelConfig.getOpenTelemetry()));
+    BatchedOrphanFilesDeletionSparkApp app = createApp(args, emitter);
+    app.run();
+  }
+
+ /**
+ * Parses the command line and constructs the app.
+ *
+ * <p>Defaults applied here: {@code driverParallelism}=1, {@code ttl}=7 days with a floor of 1
+ * day, {@code backupDir}=".backup", {@code concurrentDeletes}=10, {@code
+ * maxOrphanFileSampleSize}={@code DEFAULT_MAX_ORPHAN_FILE_SAMPLE_SIZE}. An unparsable {@code
+ * ttl} falls back to the default, whereas the {@code Integer.parseInt} options throw {@link
+ * NumberFormatException} on non-numeric input.
+ *
+ * @param args raw command-line arguments
+ * @param otelEmitter emitter passed to the state manager and the app
+ * @return the configured app
+ * @throws IllegalArgumentException if the table/operation lists are rejected by {@code
+ * buildEntries}
+ */
+ public static BatchedOrphanFilesDeletionSparkApp createApp(
+ String[] args, OtelEmitter otelEmitter) {
+ List extraOptions = new ArrayList<>();
+ extraOptions.add(
+ new Option(
+ null, "tableNames", true, "Comma-separated list of fully-qualified table names"));
+ extraOptions.add(
+ new Option(
+ null, "operationIds", true, "Comma-separated operation UUIDs, parallel to tableNames"));
+ extraOptions.add(
+ new Option(
+ null, "tableUuids", true, "Comma-separated table UUIDs, parallel to tableNames"));
+ extraOptions.add(
+ new Option(null, "resultsEndpoint", true, "Base URL of the Optimizer Service"));
+ extraOptions.add(
+ new Option(null, "driverParallelism", true, "Worker threads in this batch (default 1)"));
+ // NOTE(review): trashDir is accepted but its value is never read in this method.
+ extraOptions.add(
+ new Option("tr", "trashDir", true, "Orphan files staging dir before deletion"));
+ extraOptions.add(
+ new Option(
+ "r",
+ "ttl",
+ true,
+ "How old files should be to be considered orphaned in seconds, minimum 1d is enforced"));
+ extraOptions.add(new Option("b", "backupDir", true, "Backup directory for deleted data"));
+ extraOptions.add(
+ new Option("c", "concurrentDeletes", true, "Number of concurrent deletes per table"));
+ extraOptions.add(
+ new Option(
+ null, "streamResults", false, "Stream orphan file deletions instead of collecting"));
+ extraOptions.add(
+ new Option(null, "maxOrphanFileSampleSize", true, "Max orphan file sample paths returned"));
+
+ CommandLine cmdLine = createCommandLine(args, extraOptions);
+
+ List entries =
+ buildEntries(
+ cmdLine.getOptionValue("tableNames"),
+ cmdLine.getOptionValue("operationIds"),
+ cmdLine.getOptionValue("tableUuids"));
+
+ return new BatchedOrphanFilesDeletionSparkApp(
+ getJobId(cmdLine),
+ createStateManager(cmdLine, otelEmitter),
+ otelEmitter,
+ entries,
+ cmdLine.getOptionValue("resultsEndpoint"),
+ Integer.parseInt(cmdLine.getOptionValue("driverParallelism", "1")),
+ // Default 7 days when absent or unparsable; never below 1 day.
+ Math.max(
+ NumberUtils.toLong(cmdLine.getOptionValue("ttl"), TimeUnit.DAYS.toSeconds(7)),
+ TimeUnit.DAYS.toSeconds(1)),
+ cmdLine.getOptionValue("backupDir", ".backup"),
+ Integer.parseInt(cmdLine.getOptionValue("concurrentDeletes", "10")),
+ cmdLine.hasOption("streamResults"),
+ Integer.parseInt(
+ cmdLine.getOptionValue(
+ "maxOrphanFileSampleSize", String.valueOf(DEFAULT_MAX_ORPHAN_FILE_SAMPLE_SIZE))));
+ }
+
+  /**
+   * Turns the parallel comma-separated CLI values into one {@link BatchEntry} per table.
+   *
+   * <p>{@code tableNames} is mandatory. {@code operationIds} and {@code tableUuids} may each be
+   * omitted (the legacy {@link com.linkedin.openhouse.jobs.scheduler.JobsScheduler} caller has no
+   * such concepts); an omitted list leaves the matching {@link BatchEntry} field {@code null}. A
+   * supplied list must have exactly as many elements as {@code tableNames}.
+   *
+   * @throws IllegalArgumentException if {@code tableNames} is missing or empty, has more than
+   *     {@link #MAX_BATCH_SIZE} elements, contains a name that is not {@code db.table}, or a
+   *     supplied parallel list has a different length
+   */
+  static List<BatchEntry> buildEntries(String tableNames, String operationIds, String tableUuids) {
+    if (tableNames == null || tableNames.isEmpty()) {
+      throw new IllegalArgumentException("--tableNames is required and must be non-empty");
+    }
+    final String[] fqtns = tableNames.split(",");
+    final int count = fqtns.length;
+    if (count > MAX_BATCH_SIZE) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Batch size %d exceeds MAX_BATCH_SIZE=%d; reduce --batchMaxItems on the scheduler",
+              count, MAX_BATCH_SIZE));
+    }
+    final String[] operationIdValues = splitOptionalCsv(operationIds);
+    final String[] tableUuidValues = splitOptionalCsv(tableUuids);
+    requireParallel("operationIds", operationIdValues, count);
+    requireParallel("tableUuids", tableUuidValues, count);
+
+    List<BatchEntry> result = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      String fqtn = fqtns[i].trim();
+      // Split at the first dot only; both sides must be non-empty.
+      int dot = fqtn.indexOf('.');
+      if (dot <= 0 || dot == fqtn.length() - 1) {
+        throw new IllegalArgumentException(
+            "tableNames entries must be fully-qualified (db.table): " + fqtn);
+      }
+      String operationId = operationIdValues == null ? null : operationIdValues[i].trim();
+      String tableUuid = tableUuidValues == null ? null : tableUuidValues[i].trim();
+      result.add(
+          BatchEntry.builder()
+              .fqtn(fqtn)
+              .operationId(operationId)
+              .tableUuid(tableUuid)
+              .databaseName(fqtn.substring(0, dot))
+              .tableName(fqtn.substring(dot + 1))
+              .build());
+    }
+    return result;
+  }
+
+  /** Splits an optional CSV value on commas; returns {@code null} when the value was omitted. */
+  private static String[] splitOptionalCsv(String csv) {
+    return isBlank(csv) ? null : csv.split(",");
+  }
+
+  /** Rejects a supplied parallel list whose length differs from the number of table names. */
+  private static void requireParallel(String optionName, String[] values, int expected) {
+    if (values != null && values.length != expected) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Parallel-list length mismatch: tableNames=%d " + optionName + "=%d",
+              expected, values.length));
+    }
+  }
+
+  /**
+   * Returns {@code true} when {@code s} is {@code null}, empty, or whitespace only, so that a
+   * whitespace-only CLI value is treated the same as an omitted one.
+   */
+  private static boolean isBlank(String s) {
+    return s == null || s.trim().isEmpty();
+  }
+
+ /** Visible for tests. Returns a read-only view of the entries this app will process. */
+ List getEntries() {
+ return Collections.unmodifiableList(entries);
+ }
+
+ /** Visible for tests. Returns the worker-thread count after the constructor's clamp to >= 1. */
+ int getDriverParallelism() {
+ return driverParallelism;
+ }
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OperationUpdateRequest.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OperationUpdateRequest.java
new file mode 100644
index 000000000..715873aaa
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OperationUpdateRequest.java
@@ -0,0 +1,26 @@
+package com.linkedin.openhouse.jobs.spark.optimizer;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Wire-compatible body for {@code POST /v1/optimizer/operations/update} on the Optimizer Service.
+ *
+ * Mirrors {@code com.linkedin.openhouse.optimizer.api.spec.UpdateOperationRequest} from the
+ * optimizer service module so this app can be built before that module merges. Keep the two in
+ * sync.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class OperationUpdateRequest {
+ // Id of the operation row being updated.
+ private String operationId;
+ // Outcome; the batched OFD app sends "SUCCESS" or "FAILED".
+ private String status;
+ // UUID of the table the operation ran against.
+ private String tableUuid;
+ // Database part of the table's fully-qualified name.
+ private String databaseName;
+ // Table part of the table's fully-qualified name.
+ private String tableName;
+ // Operation type; the batched OFD app sends "ORPHAN_FILES_DELETION".
+ private String operationType;
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OptimizerServiceClient.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OptimizerServiceClient.java
new file mode 100644
index 000000000..50cc20b02
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OptimizerServiceClient.java
@@ -0,0 +1,96 @@
+package com.linkedin.openhouse.jobs.spark.optimizer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
+/**
+ * Thin OkHttp client for the Optimizer Service. The batched Spark app calls {@link
+ * #updateOperation(OperationUpdateRequest)} once per finished operation to record SUCCESS or
+ * FAILED.
+ *
+ *
Errors are surfaced as {@link IOException}; the caller decides whether to retry. Per the
+ * design, a missed update is recoverable — the operation row stays SCHEDULED and the Analyzer's
+ * stale-timeout will re-queue it.
+ *
+ *
Construct with the {@link Config} builder to override the default timeouts.
+ */
+@Slf4j
+public class OptimizerServiceClient implements AutoCloseable {
+
+ private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
+ private static final String UPDATE_PATH = "/v1/optimizer/operations/update";
+
+ private final String baseUrl;
+ private final OkHttpClient httpClient;
+ private final ObjectMapper objectMapper;
+
+ public OptimizerServiceClient(String baseUrl) {
+ this(Config.builder().baseUrl(baseUrl).build());
+ }
+
+ public OptimizerServiceClient(Config config) {
+ this(config.getBaseUrl(), clientFor(config), new ObjectMapper());
+ }
+
+ OptimizerServiceClient(String baseUrl, OkHttpClient httpClient, ObjectMapper objectMapper) {
+ this.baseUrl = stripTrailingSlash(Objects.requireNonNull(baseUrl, "baseUrl"));
+ this.httpClient = httpClient;
+ this.objectMapper = objectMapper;
+ }
+
+ public void updateOperation(OperationUpdateRequest body) throws IOException {
+ String url = baseUrl + UPDATE_PATH;
+ String json = objectMapper.writeValueAsString(body);
+ Request request = new Request.Builder().url(url).post(RequestBody.create(json, JSON)).build();
+ try (Response response = httpClient.newCall(request).execute()) {
+ if (!response.isSuccessful()) {
+ throw new IOException(
+ String.format(
+ "Optimizer Service update failed: url=%s status=%d operationId=%s",
+ url, response.code(), body.getOperationId()));
+ }
+ log.info(
+ "Reported operation update: operationId={} status={} httpStatus={}",
+ body.getOperationId(),
+ body.getStatus(),
+ response.code());
+ }
+ }
+
+ @Override
+ public void close() {
+ httpClient.dispatcher().executorService().shutdown();
+ httpClient.connectionPool().evictAll();
+ }
+
+ private static OkHttpClient clientFor(Config config) {
+ return new OkHttpClient.Builder()
+ .connectTimeout(config.getConnectTimeoutSeconds(), TimeUnit.SECONDS)
+ .readTimeout(config.getReadTimeoutSeconds(), TimeUnit.SECONDS)
+ .writeTimeout(config.getWriteTimeoutSeconds(), TimeUnit.SECONDS)
+ .build();
+ }
+
+ private static String stripTrailingSlash(String url) {
+ return url.endsWith("/") ? url.substring(0, url.length() - 1) : url;
+ }
+
+ /** Tunable transport settings. Defaults match the previous hardcoded values. */
+ @lombok.Getter
+ @Builder
+ public static class Config {
+ private final String baseUrl;
+ @Builder.Default private final long connectTimeoutSeconds = 10L;
+ @Builder.Default private final long readTimeoutSeconds = 30L;
+ @Builder.Default private final long writeTimeoutSeconds = 30L;
+ }
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java
new file mode 100644
index 000000000..5eac1da9b
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java
@@ -0,0 +1,42 @@
+package com.linkedin.openhouse.jobs.util;
+
+import java.util.Collections;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * A group of {@link TableMetadata} that share a database and will be processed in a single batched
+ * Spark job (e.g. {@code BatchedOrphanFilesDeletionSparkApp}).
+ *
+ *
By design the scheduler never crosses database boundaries when bin-packing — every table in
+ * {@link #tables} has the same {@link #dbName}. The {@link
+ * com.linkedin.openhouse.jobs.util.binpack.FirstFitDecreasingBinPacker} is invoked per-database;
+ * each emitted bin becomes one {@code TableMetadataBatch}.
+ */
+@Getter
+@SuperBuilder
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+public class TableMetadataBatch extends Metadata {
+ @NonNull protected String dbName;
+ @NonNull protected List tables;
+
+ /**
+ * Identifier used in metrics and the Jobs Service {@code jobName} — combines the database with
+ * the bin size so logs/dashboards distinguish bins of different fan-outs without exposing every
+ * fqtn.
+ */
+ @Override
+ public String getEntityName() {
+ return String.format("%s[%d]", dbName, tables.size());
+ }
+
+ /** Unmodifiable view of the underlying tables list. */
+ public List getTables() {
+ return Collections.unmodifiableList(tables);
+ }
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/Bin.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/Bin.java
new file mode 100644
index 000000000..0b40b4958
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/Bin.java
@@ -0,0 +1,49 @@
+package com.linkedin.openhouse.jobs.util.binpack;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * Mutable accumulator used by {@link FirstFitDecreasingBinPacker}. After packing completes the
+ * caller treats the returned bins as immutable; {@link #items()} returns an unmodifiable view.
+ */
+@ToString
+public class Bin {
+ private final List items = new ArrayList<>();
+ @Getter private long totalWeight;
+ @Getter private long totalSizeBytes;
+
+ /**
+ * Returns true iff adding {@code item} would keep this bin at or below all three caps. A cap of
+ * {@code <= 0} disables that dimension.
+ */
+ boolean fits(BinItem item, long maxWeight, long maxSizeBytes, int maxItems) {
+ if (maxItems > 0 && items.size() >= maxItems) {
+ return false;
+ }
+ if (maxWeight > 0 && totalWeight + item.getWeight() > maxWeight) {
+ return false;
+ }
+ if (maxSizeBytes > 0 && totalSizeBytes + item.getSizeBytes() > maxSizeBytes) {
+ return false;
+ }
+ return true;
+ }
+
+ void add(BinItem item) {
+ items.add(item);
+ totalWeight += item.getWeight();
+ totalSizeBytes += item.getSizeBytes();
+ }
+
+ public List items() {
+ return Collections.unmodifiableList(items);
+ }
+
+ public int size() {
+ return items.size();
+ }
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/BinItem.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/BinItem.java
new file mode 100644
index 000000000..68bcb16e2
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/BinItem.java
@@ -0,0 +1,29 @@
+package com.linkedin.openhouse.jobs.util.binpack;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+
+/**
+ * A single packable unit for {@link FirstFitDecreasingBinPacker}. Carries everything the batched
+ * Spark app needs both to do the work ({@link #fqtn}) and to report the result back to the
+ * Optimizer Service ({@link #operationId}, {@link #tableUuid}, {@link #databaseName}, {@link
+ * #tableName}).
+ *
+ * {@link #weight} is the bin-packing dimension (for OFD: number of current files in the table).
+ * {@link #sizeBytes} is a secondary capacity dimension that lets the packer cap the total on-disk
+ * footprint of a bin independently of file count.
+ */
+@Getter
+@Builder
+@ToString
+public class BinItem {
+ @NonNull private final String fqtn;
+ @NonNull private final String operationId;
+ @NonNull private final String tableUuid;
+ @NonNull private final String databaseName;
+ @NonNull private final String tableName;
+ private final long weight;
+ private final long sizeBytes;
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/FirstFitDecreasingBinPacker.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/FirstFitDecreasingBinPacker.java
new file mode 100644
index 000000000..71009d3ff
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/FirstFitDecreasingBinPacker.java
@@ -0,0 +1,70 @@
+package com.linkedin.openhouse.jobs.util.binpack;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * First-fit-decreasing bin packer used by the optimizer scheduler to group table operations into
+ * batches before launching a single Spark job per batch.
+ *
+ *
Each bin has three independent caps:
+ *
+ *
+ * {@code maxWeightPerBin} — total {@link BinItem#getWeight()} (for OFD: number of files)
+ * {@code maxSizeBytesPerBin} — total on-disk size of all tables in the bin
+ * {@code maxItemsPerBin} — number of tables per bin
+ *
+ *
+ * An item that exceeds any single cap on its own is placed into a bin by itself rather than
+ * dropped — we never silently skip maintenance work for an oversized table.
+ *
+ *
Pass {@code 0} or a negative value for any cap to disable that dimension.
+ */
+@Slf4j
+@Builder
+public class FirstFitDecreasingBinPacker {
+
+ @Builder.Default private final long maxWeightPerBin = 1_000_000L;
+ @Builder.Default private final long maxSizeBytesPerBin = 5L * 1024L * 1024L * 1024L * 1024L;
+ @Builder.Default private final int maxItemsPerBin = 50;
+
+ public List pack(List items) {
+ if (items == null || items.isEmpty()) {
+ return new ArrayList<>();
+ }
+
+ List sorted =
+ items.stream()
+ .sorted(Comparator.comparingLong(BinItem::getWeight).reversed())
+ .collect(Collectors.toList());
+
+ List bins = new ArrayList<>();
+ for (BinItem item : sorted) {
+ Bin target = null;
+ for (Bin bin : bins) {
+ if (bin.fits(item, maxWeightPerBin, maxSizeBytesPerBin, maxItemsPerBin)) {
+ target = bin;
+ break;
+ }
+ }
+ if (target == null) {
+ target = new Bin();
+ bins.add(target);
+ if (!target.fits(item, maxWeightPerBin, maxSizeBytesPerBin, maxItemsPerBin)) {
+ log.warn(
+ "Item exceeds per-bin caps on its own; placing in dedicated bin: fqtn={} weight={} sizeBytes={}",
+ item.getFqtn(),
+ item.getWeight(),
+ item.getSizeBytes());
+ }
+ }
+ target.add(item);
+ }
+ log.info("Packed {} items into {} bins", items.size(), bins.size());
+ return bins;
+ }
+}
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTaskTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTaskTest.java
new file mode 100644
index 000000000..134720c49
--- /dev/null
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTaskTest.java
@@ -0,0 +1,91 @@
+package com.linkedin.openhouse.jobs.scheduler.tasks;
+
+import com.linkedin.openhouse.jobs.client.JobsClient;
+import com.linkedin.openhouse.jobs.client.TablesClient;
+import com.linkedin.openhouse.jobs.client.model.JobConf;
+import com.linkedin.openhouse.jobs.util.TableMetadata;
+import com.linkedin.openhouse.jobs.util.TableMetadataBatch;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+ /** Unit tests for {@link BatchedTableOrphanFilesDeletionTask}; all collaborators are mocked. */
+ public class BatchedTableOrphanFilesDeletionTaskTest {
+
+   @Test
+   public void operationTypeIsBatchedOfd() {
+     Assertions.assertEquals(
+         JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH,
+         BatchedTableOrphanFilesDeletionTask.OPERATION_TYPE);
+   }
+
+   @Test
+   public void getArgsBuildsCsvOfFqtns() {
+     TableMetadataBatch batch =
+         TableMetadataBatch.builder()
+             .dbName("db")
+             .tables(Arrays.asList(table("db", "t1"), table("db", "t2"), table("db", "t3")))
+             .build();
+     BatchedTableOrphanFilesDeletionTask task =
+         new BatchedTableOrphanFilesDeletionTask(
+             Mockito.mock(JobsClient.class), Mockito.mock(TablesClient.class), batch);
+
+     Assertions.assertEquals(Arrays.asList("--tableNames", "db.t1,db.t2,db.t3"), task.getArgs());
+   }
+
+   @Test
+   public void shouldRunFalseForEmptyBatch() {
+     TableMetadataBatch batch =
+         TableMetadataBatch.builder().dbName("db").tables(Collections.emptyList()).build();
+     BatchedTableOrphanFilesDeletionTask task =
+         new BatchedTableOrphanFilesDeletionTask(
+             Mockito.mock(JobsClient.class), Mockito.mock(TablesClient.class), batch);
+
+     Assertions.assertFalse(task.shouldRun());
+   }
+
+   @Test
+   @SuppressWarnings("unchecked")
+   public void launchJobUsesBatchScopedJobName() {
+     JobsClient jobsClient = Mockito.mock(JobsClient.class);
+     Mockito.when(
+             jobsClient.launch(
+                 Mockito.anyString(),
+                 Mockito.eq(JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH),
+                 Mockito.anyString(),
+                 Mockito.anyMap(),
+                 Mockito.anyList()))
+         .thenReturn(Optional.of("job-42"));
+
+     TableMetadataBatch batch =
+         TableMetadataBatch.builder()
+             .dbName("warehouse")
+             .tables(Arrays.asList(table("warehouse", "a"), table("warehouse", "b")))
+             .build();
+     BatchedTableOrphanFilesDeletionTask task =
+         new BatchedTableOrphanFilesDeletionTask(
+             jobsClient, Mockito.mock(TablesClient.class), batch);
+
+     boolean launched = task.launchJob();
+
+     Assertions.assertTrue(launched);
+     Assertions.assertEquals("job-42", task.getJobId());
+     // Typed captor: capture() must yield a String to match the first launch(...) parameter.
+     ArgumentCaptor<String> jobNameCaptor = ArgumentCaptor.forClass(String.class);
+     Mockito.verify(jobsClient)
+         .launch(
+             jobNameCaptor.capture(),
+             Mockito.eq(JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH),
+             Mockito.anyString(),
+             Mockito.anyMap(),
+             Mockito.anyList());
+     // Job name carries db + batch size; nothing per-table is embedded so the string stays bounded.
+     Assertions.assertEquals("ORPHAN_FILES_DELETION_BATCH_warehouse_2", jobNameCaptor.getValue());
+   }
+
+   /** Builds a minimal table fixture in {@code db} with a fixed creator. */
+   private static TableMetadata table(String db, String name) {
+     return TableMetadata.builder().dbName(db).tableName(name).creator("test-user").build();
+   }
+ }
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java
new file mode 100644
index 000000000..73a02017f
--- /dev/null
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java
@@ -0,0 +1,113 @@
+package com.linkedin.openhouse.jobs.spark;
+
+import java.util.List;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Pure-Java unit tests for {@link BatchedOrphanFilesDeletionSparkApp#buildEntries}. No Spark
+ * session, no HTTP — exercises the CLI-parsing edges that decide whether the app can even start.
+ */
+public class BatchedOrphanFilesDeletionSparkAppArgsTest {
+
+ @Test
+ public void buildEntriesParsesParallelLists() {
+ List entries =
+ BatchedOrphanFilesDeletionSparkApp.buildEntries(
+ "db1.t1,db2.t2", "op-1,op-2", "uuid-1,uuid-2");
+
+ Assertions.assertEquals(2, entries.size());
+ Assertions.assertEquals("db1.t1", entries.get(0).getFqtn());
+ Assertions.assertEquals("db1", entries.get(0).getDatabaseName());
+ Assertions.assertEquals("t1", entries.get(0).getTableName());
+ Assertions.assertEquals("op-1", entries.get(0).getOperationId());
+ Assertions.assertEquals("uuid-1", entries.get(0).getTableUuid());
+ Assertions.assertEquals("db2.t2", entries.get(1).getFqtn());
+ Assertions.assertEquals("op-2", entries.get(1).getOperationId());
+ }
+
+ @Test
+ public void buildEntriesTrimsWhitespaceInEachEntry() {
+ List entries =
+ BatchedOrphanFilesDeletionSparkApp.buildEntries(
+ " db1.t1 , db2.t2 ", " op-1 , op-2 ", " uuid-1 , uuid-2 ");
+
+ Assertions.assertEquals("db1.t1", entries.get(0).getFqtn());
+ Assertions.assertEquals("op-1", entries.get(0).getOperationId());
+ Assertions.assertEquals("uuid-1", entries.get(0).getTableUuid());
+ }
+
+ @Test
+ public void buildEntriesRejectsMismatchedLengths() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a,db.b", "op-1", "uuid-1,uuid-2"));
+ }
+
+ @Test
+ public void buildEntriesRejectsNullOrEmptyTableNames() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries(null, "op-1", "uuid-1"));
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("", "op-1", "uuid-1"));
+ }
+
+ @Test
+ public void buildEntriesAllowsAbsentOperationIdsAndTableUuids() {
+ // The legacy JobsScheduler path doesn't know about optimizer-service operationIds or table
+ // UUIDs — it just passes the tables. Both null and empty should produce entries with null
+ // optional fields, no exception.
+ List entriesNull =
+ BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a,db.b", null, null);
+ List entriesEmpty =
+ BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a,db.b", "", "");
+
+ for (List entries :
+ java.util.Arrays.asList(entriesNull, entriesEmpty)) {
+ Assertions.assertEquals(2, entries.size());
+ Assertions.assertEquals("db.a", entries.get(0).getFqtn());
+ Assertions.assertNull(entries.get(0).getOperationId());
+ Assertions.assertNull(entries.get(0).getTableUuid());
+ Assertions.assertEquals("db.b", entries.get(1).getFqtn());
+ Assertions.assertNull(entries.get(1).getOperationId());
+ }
+ }
+
+ @Test
+ public void buildEntriesRejectsNonFqtn() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("just_a_table", "op-1", "uuid-1"));
+ }
+
+ @Test
+ public void buildEntriesAcceptsAtMaxBatchSize() {
+ String tableNames = generateFqtnCsv(BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE);
+ List entries =
+ BatchedOrphanFilesDeletionSparkApp.buildEntries(tableNames, null, null);
+ Assertions.assertEquals(BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE, entries.size());
+ }
+
+ @Test
+ public void buildEntriesRejectsAboveMaxBatchSize() {
+ String tableNames = generateFqtnCsv(BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE + 1);
+ IllegalArgumentException ex =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries(tableNames, null, null));
+ Assertions.assertTrue(
+ ex.getMessage().contains("MAX_BATCH_SIZE"), "error should reference the constant name");
+ }
+
+ private static String generateFqtnCsv(int n) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < n; i++) {
+ if (i > 0) sb.append(',');
+ sb.append("db.t").append(i);
+ }
+ return sb.toString();
+ }
+}
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/util/binpack/FirstFitDecreasingBinPackerTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/util/binpack/FirstFitDecreasingBinPackerTest.java
new file mode 100644
index 000000000..d77944772
--- /dev/null
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/util/binpack/FirstFitDecreasingBinPackerTest.java
@@ -0,0 +1,150 @@
+package com.linkedin.openhouse.jobs.util.binpack;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+ /** Unit tests for {@link FirstFitDecreasingBinPacker}: ordering, each cap, and oversized items. */
+ public class FirstFitDecreasingBinPackerTest {
+
+   @Test
+   public void emptyInputProducesEmptyOutput() {
+     List<Bin> bins = packer(100, 0, 50).pack(Collections.emptyList());
+     Assertions.assertTrue(bins.isEmpty());
+   }
+
+   @Test
+   public void nullInputProducesEmptyOutput() {
+     List<Bin> bins = packer(100, 0, 50).pack(null);
+     Assertions.assertTrue(bins.isEmpty());
+   }
+
+   @Test
+   public void itemsSortDescendingByWeightBeforePacking() {
+     List<BinItem> items =
+         Arrays.asList(item("db.t_small", 10), item("db.t_big", 100), item("db.t_mid", 50));
+
+     List<Bin> bins = packer(1000, 0, 50).pack(items);
+
+     // Everything fits in one bin since capacity is huge; order inside the bin must be descending.
+     Assertions.assertEquals(1, bins.size());
+     Bin only = bins.get(0);
+     Assertions.assertEquals(3, only.size());
+     Assertions.assertEquals("db.t_big", only.items().get(0).getFqtn());
+     Assertions.assertEquals("db.t_mid", only.items().get(1).getFqtn());
+     Assertions.assertEquals("db.t_small", only.items().get(2).getFqtn());
+     Assertions.assertEquals(160, only.getTotalWeight());
+   }
+
+   @Test
+   public void weightCapForcesMultipleBins() {
+     List<BinItem> items =
+         Arrays.asList(item("db.a", 60), item("db.b", 50), item("db.c", 40), item("db.d", 30));
+
+     List<Bin> bins = packer(100, 0, 50).pack(items);
+
+     // FFD on [60, 50, 40, 30] with cap 100:
+     // bin0: 60 -> remaining 40
+     // bin0 tries 50 -> doesn't fit, new bin1: 50
+     // bin0 tries 40 -> fits, bin0: 60+40=100
+     // bin1 tries 30 -> fits, bin1: 50+30=80
+     Assertions.assertEquals(2, bins.size());
+     Assertions.assertEquals(100, bins.get(0).getTotalWeight());
+     Assertions.assertEquals(80, bins.get(1).getTotalWeight());
+   }
+
+   @Test
+   public void maxItemsPerBinCapHonored() {
+     List<BinItem> items =
+         IntStream.range(0, 5).mapToObj(i -> item("db.t" + i, 1)).collect(Collectors.toList());
+
+     List<Bin> bins = packer(1000, 0, 2).pack(items);
+
+     Assertions.assertEquals(3, bins.size());
+     Assertions.assertEquals(2, bins.get(0).size());
+     Assertions.assertEquals(2, bins.get(1).size());
+     Assertions.assertEquals(1, bins.get(2).size());
+   }
+
+   @Test
+   public void maxSizeBytesCapHonored() {
+     List<BinItem> items = Arrays.asList(sizedItem("db.a", 1, 800L), sizedItem("db.b", 1, 800L));
+
+     List<Bin> bins = packer(1000, 1000L, 50).pack(items);
+
+     Assertions.assertEquals(2, bins.size());
+     Assertions.assertEquals(800L, bins.get(0).getTotalSizeBytes());
+     Assertions.assertEquals(800L, bins.get(1).getTotalSizeBytes());
+   }
+
+   @Test
+   public void oversizedItemGetsItsOwnBinRatherThanBeingDropped() {
+     List<BinItem> items =
+         Arrays.asList(item("db.tiny1", 10), item("db.giant", 500), item("db.tiny2", 10));
+
+     List<Bin> bins = packer(100, 0, 50).pack(items);
+
+     // Giant exceeds the cap on its own — must still appear in some bin.
+     long total = bins.stream().mapToLong(Bin::getTotalWeight).sum();
+     Assertions.assertEquals(520, total);
+     List<Bin> giantBins =
+         bins.stream()
+             .filter(b -> b.items().stream().anyMatch(i -> i.getFqtn().equals("db.giant")))
+             .collect(Collectors.toList());
+     Assertions.assertEquals(1, giantBins.size(), "oversized item must not be dropped");
+     // The bin holding the giant is already over the weight cap, so nothing else may join it.
+     Assertions.assertEquals(1, giantBins.get(0).size(), "oversized item must sit alone in its bin");
+   }
+
+   @Test
+   public void disabledCapsLetEverythingShareOneBin() {
+     List<BinItem> items =
+         IntStream.range(0, 20).mapToObj(i -> item("db.t" + i, 100)).collect(Collectors.toList());
+
+     List<Bin> bins = packer(0, 0, 0).pack(items);
+
+     Assertions.assertEquals(1, bins.size());
+     Assertions.assertEquals(20, bins.get(0).size());
+   }
+
+   /** Builds a packer with explicit caps; a cap of {@code 0} disables that dimension. */
+   private static FirstFitDecreasingBinPacker packer(long maxWeight, long maxSize, int maxItems) {
+     return FirstFitDecreasingBinPacker.builder()
+         .maxWeightPerBin(maxWeight)
+         .maxSizeBytesPerBin(maxSize)
+         .maxItemsPerBin(maxItems)
+         .build();
+   }
+
+   /** Item fixture with zero size, for tests that only exercise the weight or count caps. */
+   private static BinItem item(String fqtn, long weight) {
+     return sizedItem(fqtn, weight, 0L);
+   }
+
+   /** Item fixture; database, table, operation id and uuid are all derived from {@code fqtn}. */
+   private static BinItem sizedItem(String fqtn, long weight, long sizeBytes) {
+     String[] parts = fqtn.split("\\.", 2);
+     return BinItem.builder()
+         .fqtn(fqtn)
+         .operationId("op-" + parts[1])
+         .tableUuid("uuid-" + parts[1])
+         .databaseName(parts[0])
+         .tableName(parts[1])
+         .weight(weight)
+         .sizeBytes(sizeBytes)
+         .build();
+   }
+ }
diff --git a/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml b/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml
index 3e795d4db..58cce65ce 100644
--- a/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml
+++ b/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml
@@ -58,6 +58,14 @@ jobs:
<<: *spark-defaults
"spark.driver.memory": "2g"
<< : *livy-engine
+ # Multi-table orphan-files deletion, run by BatchedOrphanFilesDeletionSparkApp.
+ - type: ORPHAN_FILES_DELETION_BATCH
+ class-name: com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp
+ args: ["--backupDir", ".backup"]
+ <<: *apps-defaults
+ spark-properties:
+ <<: *spark-defaults
+ "spark.driver.memory": "2g"
+ << : *livy-engine
- type: STAGED_FILES_DELETION
class-name: com.linkedin.openhouse.jobs.spark.StagedFilesDeletionSparkApp
args: ["--trashDir", ".trash", "--daysOld", "10", "--recursive", "true"]
diff --git a/services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java b/services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java
index 4c2d2b3da..36764bb84 100644
--- a/services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java
+++ b/services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java
@@ -27,6 +27,12 @@ public enum JobType {
SQL_TEST,
RETENTION,
ORPHAN_FILES_DELETION,
+ /**
+ * Multi-table orphan-files-deletion. One Spark job processes a list of tables grouped by
+ * database — bin-packing happens scheduler-side. See {@code
+ * BatchedOrphanFilesDeletionSparkApp}.
+ */
+ ORPHAN_FILES_DELETION_BATCH,
SNAPSHOTS_EXPIRATION,
STAGED_FILES_DELETION,
DATA_COMPACTION,