diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java index 2067fa014..361116288 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java @@ -583,6 +583,13 @@ protected static CommandLine parseArgs(String[] args) { .longOpt(OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS) .desc("Maximum compute cost budget in GB hours") .build()); + options.addOption( + Option.builder(null) + .required(false) + .hasArg() + .longOpt(OperationTasksBuilder.BATCH_MAX_ITEMS) + .desc("Max tables per batched OFD job (ORPHAN_FILES_DELETION_BATCH only)") + .build()); options.addOption( Option.builder(null) .required(false) @@ -744,6 +751,11 @@ protected static Properties getAdditionalProperties(CommandLine cmdLine) { OperationTasksBuilder.MAX_STRATEGIES_COUNT, cmdLine.getOptionValue(OperationTasksBuilder.MAX_STRATEGIES_COUNT)); } + if (cmdLine.hasOption(OperationTasksBuilder.BATCH_MAX_ITEMS)) { + result.setProperty( + OperationTasksBuilder.BATCH_MAX_ITEMS, + cmdLine.getOptionValue(OperationTasksBuilder.BATCH_MAX_ITEMS)); + } return result; } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java new file mode 100644 index 000000000..7c2e13c1b --- /dev/null +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java @@ -0,0 +1,79 @@ +package com.linkedin.openhouse.jobs.scheduler.tasks; + +import com.linkedin.openhouse.jobs.client.JobsClient; +import com.linkedin.openhouse.jobs.client.TablesClient; +import com.linkedin.openhouse.jobs.client.model.JobConf; +import com.linkedin.openhouse.jobs.util.TableMetadata; +import 
com.linkedin.openhouse.jobs.util.TableMetadataBatch; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * A task to remove orphan files from a batch of tables in a single Spark job. Pairs with {@code + * com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp} via the {@link + * JobConf.JobTypeEnum#ORPHAN_FILES_DELETION_BATCH} JobType. + * + *

The legacy {@link com.linkedin.openhouse.jobs.scheduler.JobsScheduler} pre-dates the optimizer + * service, so this task omits the optimizer-only CLI flags ({@code --resultsEndpoint}, {@code + * --operationIds}, {@code --tableUuids}). The Spark app treats them as optional and falls back to + * HTS-only lifecycle tracking when they are absent. + * + * @see Delete + * orphan files + */ +@Slf4j +@Getter +public class BatchedTableOrphanFilesDeletionTask extends OperationTask { + public static final JobConf.JobTypeEnum OPERATION_TYPE = + JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH; + + public BatchedTableOrphanFilesDeletionTask( + JobsClient jobsClient, + TablesClient tablesClient, + TableMetadataBatch metadata, + long pollIntervalMs, + long queuedTimeoutMs, + long taskTimeoutMs) { + super(jobsClient, tablesClient, metadata, pollIntervalMs, queuedTimeoutMs, taskTimeoutMs); + } + + public BatchedTableOrphanFilesDeletionTask( + JobsClient jobsClient, TablesClient tablesClient, TableMetadataBatch metadata) { + super(jobsClient, tablesClient, metadata); + } + + @Override + public JobConf.JobTypeEnum getType() { + return OPERATION_TYPE; + } + + @Override + protected List getArgs() { + String tableNames = + metadata.getTables().stream().map(TableMetadata::fqtn).collect(Collectors.joining(",")); + return Arrays.asList("--tableNames", tableNames); + } + + @Override + protected boolean shouldRun() { + return !metadata.getTables().isEmpty(); + } + + @Override + protected boolean launchJob() { + String jobName = + String.format("%s_%s_%d", getType(), metadata.getDbName(), metadata.getTables().size()); + Map executionProperties = Collections.emptyMap(); + String proxyUser = metadata.getTables().get(0).getCreator(); + jobId = + jobsClient + .launch(jobName, getType(), proxyUser, executionProperties, getArgs()) + .orElse(null); + return jobId != null; + } +} diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java 
b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java index 37e3df399..7de5bf926 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java @@ -9,6 +9,7 @@ import com.linkedin.openhouse.jobs.client.TablesClient; import com.linkedin.openhouse.jobs.client.model.JobConf; import com.linkedin.openhouse.jobs.scheduler.JobsScheduler; +import com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp; import com.linkedin.openhouse.jobs.util.AppConstants; import com.linkedin.openhouse.jobs.util.DataLayoutUtil; import com.linkedin.openhouse.jobs.util.DatabaseMetadata; @@ -16,10 +17,15 @@ import com.linkedin.openhouse.jobs.util.Metadata; import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata; import com.linkedin.openhouse.jobs.util.TableMetadata; +import com.linkedin.openhouse.jobs.util.TableMetadataBatch; +import com.linkedin.openhouse.jobs.util.binpack.Bin; +import com.linkedin.openhouse.jobs.util.binpack.BinItem; +import com.linkedin.openhouse.jobs.util.binpack.FirstFitDecreasingBinPacker; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.stream.Collectors; @@ -40,10 +46,12 @@ public class OperationTasksBuilder { public static final String MAX_COST_BUDGET_GB_HRS = "maxCostBudgetGbHrs"; public static final String MAX_STRATEGIES_COUNT = "maxStrategiesCount"; + public static final String BATCH_MAX_ITEMS = "batchMaxItems"; private static final double COMPUTE_COST_WEIGHT_DEFAULT = 0.3; private static final double COMPACTION_GAIN_WEIGHT_DEFAULT = 0.7; private static final double MAX_COST_BUDGET_GB_HRS_DEFAULT = 1000.0; private static final int MAX_STRATEGIES_COUNT_DEFAULT = 10; 
+ private static final int BATCH_MAX_ITEMS_DEFAULT = 25; private static final String METRICS_SCOPE = JobsScheduler.class.getName(); private final OperationTaskFactory> taskFactory; @@ -65,6 +73,80 @@ private List> prepareTableOperationTaskList( return processMetadataList(tableMetadataList, jobType, operationMode, otelEmitter); } + /** + * Builds one {@link BatchedTableOrphanFilesDeletionTask} per database-scoped bin. Groups eligible + * tables by database (batches never cross databases), then applies the first-fit-decreasing bin + * packer with a per-bin item cap from {@code properties} (defaults to {@value + * #BATCH_MAX_ITEMS_DEFAULT}). Tables with the maintenance op disabled are filtered out before + * grouping. + */ + private List> prepareBatchedOrphanFilesDeletionTaskList( + JobConf.JobTypeEnum jobType, + Properties properties, + OperationMode operationMode, + OtelEmitter otelEmitter) { + int maxItemsPerBin = + NumberUtils.toInt(properties.getProperty(BATCH_MAX_ITEMS), BATCH_MAX_ITEMS_DEFAULT); + if (maxItemsPerBin > BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE) { + throw new IllegalArgumentException( + String.format( + "--%s=%d exceeds Spark-app ceiling MAX_BATCH_SIZE=%d", + BATCH_MAX_ITEMS, maxItemsPerBin, BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE)); + } + List eligible = + tablesClient.getTableMetadataList().stream() + .filter(t -> !t.isMaintenanceJobDisabled(jobType)) + .collect(Collectors.toList()); + log.info( + "Fetched metadata for {} batched-OFD-eligible tables; binMaxItems={}", + eligible.size(), + maxItemsPerBin); + + FirstFitDecreasingBinPacker packer = + FirstFitDecreasingBinPacker.builder() + .maxItemsPerBin(maxItemsPerBin) + // Item-count cap only; weight/size dimensions disabled until table_stats is wired in. 
+ .maxWeightPerBin(0) + .maxSizeBytesPerBin(0) + .build(); + + Map> byDb = + eligible.stream().collect(Collectors.groupingBy(TableMetadata::getDbName)); + + List batches = new ArrayList<>(); + for (Map.Entry> dbGroup : byDb.entrySet()) { + String dbName = dbGroup.getKey(); + List items = + dbGroup.getValue().stream() + .map( + t -> + BinItem.builder() + .fqtn(t.fqtn()) + .operationId("") + .tableUuid("") + .databaseName(t.getDbName()) + .tableName(t.getTableName()) + .weight(1L) + .sizeBytes(0L) + .build()) + .collect(Collectors.toList()); + for (Bin bin : packer.pack(items)) { + List tablesForBin = + bin.items().stream() + .map( + item -> + dbGroup.getValue().stream() + .filter(t -> t.fqtn().equals(item.getFqtn())) + .findFirst() + .orElseThrow(() -> new IllegalStateException("missing table for bin"))) + .collect(Collectors.toList()); + batches.add(TableMetadataBatch.builder().dbName(dbName).tables(tablesForBin).build()); + } + } + log.info("Packed {} eligible tables into {} batches", eligible.size(), batches.size()); + return processMetadataList(batches, jobType, operationMode, otelEmitter); + } + private List> prepareReplicationOperationTaskList( JobConf.JobTypeEnum jobType, OperationMode operationMode, OtelEmitter otelEmitter) { List replicationSetupTableMetadataList = tablesClient.getTableMetadataList(); @@ -272,6 +354,9 @@ public List> buildOperationTaskList( case DATA_LAYOUT_STRATEGY_GENERATION: case SORT_STATS_COLLECTION: return prepareTableOperationTaskList(jobType, operationMode, otelEmitter); + case ORPHAN_FILES_DELETION_BATCH: + return prepareBatchedOrphanFilesDeletionTaskList( + jobType, properties, operationMode, otelEmitter); case REPLICATION: return prepareReplicationOperationTaskList(jobType, operationMode, otelEmitter); case DATA_LAYOUT_STRATEGY_EXECUTION: @@ -300,6 +385,22 @@ public void buildOperationTaskListInParallel( buildDataLayoutOperationTaskListInParallel(jobType, properties, operationMode, otelEmitter); } else if (jobType == 
JobConf.JobTypeEnum.TABLE_DIRECTORY_DELETION) { buildDatabaseLevelOperationTasksInParallel(jobType, operationMode, otelEmitter); + } else if (jobType == JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH) { + // Batched OFD needs the full table set in hand before it can group-by-db and bin-pack, + // so we use the synchronous fetch path then enqueue the tasks in bulk. + List> tasks = + prepareBatchedOrphanFilesDeletionTaskList( + jobType, properties, operationMode, otelEmitter); + for (OperationTask task : tasks) { + try { + operationTaskManager.addData(task); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("Interrupted while enqueueing batched OFD task", e); + } + } + operationTaskManager.updateDataGenerationCompletion(); + log.info("Enqueued {} batched OFD tasks for job type: {}", tasks.size(), jobType); } else { buildOperationTaskListInParallelInternal(jobType, operationMode, otelEmitter); } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java new file mode 100644 index 000000000..1484b86a5 --- /dev/null +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java @@ -0,0 +1,479 @@ +package com.linkedin.openhouse.jobs.spark; + +import com.google.common.collect.Iterables; +import com.linkedin.openhouse.common.metrics.DefaultOtelConfig; +import com.linkedin.openhouse.common.metrics.OtelEmitter; +import com.linkedin.openhouse.jobs.exception.TableValidationException; +import com.linkedin.openhouse.jobs.spark.optimizer.OperationUpdateRequest; +import com.linkedin.openhouse.jobs.spark.optimizer.OptimizerServiceClient; +import com.linkedin.openhouse.jobs.spark.state.StateManager; +import com.linkedin.openhouse.jobs.util.AppConstants; +import com.linkedin.openhouse.jobs.util.AppsOtelEmitter; +import 
com.linkedin.openhouse.jobs.util.TableStateValidator; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import java.io.IOException; +import java.time.Duration; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.DeleteOrphanFiles; + +/** + * Batched orphan-files-deletion Spark app. One Spark job processes a list of {@code (table, + * operationId)} pairs that the optimizer scheduler bin-packed into a single batch. Each table is + * handled by a worker thread; per-table failures are caught and reported back independently — the + * job continues for the remaining tables and exits 0 if at least one table succeeds. + * + *

<p>This is the multi-table counterpart of {@link OrphanFilesDeletionSparkApp}. The single-table + * app remains the deployment unit when bin size is 1, and stays the canonical reference for the + * actual deletion logic. + * + *

<p>Example invocation: + * + * <pre>{@code
+ * com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp \
+ *   --tableNames db.t1,db.t2,db.t3 \
+ *   --operationIds op-uuid-1,op-uuid-2,op-uuid-3 \
+ *   --tableUuids tab-uuid-1,tab-uuid-2,tab-uuid-3 \
+ *   --resultsEndpoint http://optimizer.svc:8080 \
+ *   --driverParallelism 4
+ * }</pre>
+ */ +@Slf4j +public class BatchedOrphanFilesDeletionSparkApp extends BaseSparkApp { + + private static final String OPERATION_TYPE = "ORPHAN_FILES_DELETION"; + private static final String STATUS_SUCCESS = "SUCCESS"; + private static final String STATUS_FAILED = "FAILED"; + private static final int DEFAULT_MAX_ORPHAN_FILE_SAMPLE_SIZE = 20000; + private static final int DEFAULT_MIN_OFD_TTL_IN_DAYS = 3; + + /** + * Hard ceiling on the number of tables a single batched job can carry. The wire path is parallel + * CSV CLI args (see {@link #buildEntries}); at ~120 chars per entry (36-char UUID × 3 lists) this + * gives ~24 KB on the command line, well under the typical Linux {@code ARG_MAX} of 128 KB but + * leaves headroom for the {@code spark-submit} envelope and JVM flags. The scheduler-driven path + * uses a smaller per-entry footprint but inherits the same cap for defense in depth. Operators + * tune the per-job batch size with {@code --batchMaxItems} (default {@code 25}); this constant is + * a footgun stop, not the operating point. 
+ */ + public static final int MAX_BATCH_SIZE = 200; + + private final List entries; + private final String resultsEndpoint; + private final int driverParallelism; + private final long ttlSeconds; + private final String backupDir; + private final int concurrentDeletes; + private final boolean streamResults; + private final int maxOrphanFileSampleSize; + + public BatchedOrphanFilesDeletionSparkApp( + String jobId, + StateManager stateManager, + OtelEmitter otelEmitter, + List entries, + String resultsEndpoint, + int driverParallelism, + long ttlSeconds, + String backupDir, + int concurrentDeletes, + boolean streamResults, + int maxOrphanFileSampleSize) { + super(jobId, stateManager, otelEmitter); + this.entries = entries; + this.resultsEndpoint = resultsEndpoint; + this.driverParallelism = Math.max(1, driverParallelism); + this.ttlSeconds = ttlSeconds; + this.backupDir = backupDir; + this.concurrentDeletes = concurrentDeletes; + this.streamResults = streamResults; + this.maxOrphanFileSampleSize = maxOrphanFileSampleSize; + } + + @Override + protected void runInner(Operations ops) { + log.info( + "Batched OFD start: entries={} driverParallelism={} resultsEndpoint={}", + entries.size(), + driverParallelism, + resultsEndpoint); + + if (entries.isEmpty()) { + log.warn("Batched OFD invoked with no entries; nothing to do"); + return; + } + + int successCount; + try (OptimizerServiceClient client = newOptimizerClient()) { + successCount = runBatch(ops, client); + } + + int failureCount = entries.size() - successCount; + log.info( + "Batched OFD finished: total={} success={} failed={}", + entries.size(), + successCount, + failureCount); + + if (successCount == 0) { + throw new RuntimeException( + String.format("All %d operations in batch failed", entries.size())); + } + } + + private int runBatch(Operations ops, OptimizerServiceClient client) { + ExecutorService pool = Executors.newFixedThreadPool(driverParallelism); + try { + // Two-phase pipeline: submit every worker 
first (so they run concurrently), then await each. + // Pairing each Future with its BatchEntry via AbstractMap.SimpleImmutableEntry. + List>> submissions = + entries.stream() + .map( + entry -> + new AbstractMap.SimpleImmutableEntry<>( + entry, pool.submit(new TableWorker(ops, entry, client)))) + .collect(Collectors.toList()); + return submissions.stream() + .mapToInt(submission -> awaitOne(submission.getKey(), submission.getValue(), client)) + .sum(); + } finally { + shutdownPool(pool); + } + } + + private int awaitOne(BatchEntry entry, Future future, OptimizerServiceClient client) { + try { + return Boolean.TRUE.equals(future.get()) ? 1 : 0; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Worker interrupted (likely job cancellation): fqtn={}", entry.getFqtn(), e); + otelEmitter.count( + METRICS_SCOPE, + "optimizer_batch_interrupted", + 1, + Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), entry.getFqtn())); + return 0; + } catch (ExecutionException e) { + // The worker catches Throwable internally and always reports its own result, so reaching + // here means the worker itself leaked an exception. Be defensive: post FAILED so the + // operation row doesn't sit SCHEDULED until the stale-timeout. 
+ log.error( + "Worker threw outside its own catch for fqtn={} — reporting FAILED", + entry.getFqtn(), + e.getCause()); + reportResult(entry, false, client); + return 0; + } + } + + private void shutdownPool(ExecutorService pool) { + pool.shutdown(); + try { + if (!pool.awaitTermination(30, TimeUnit.SECONDS)) { + pool.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + pool.shutdownNow(); + } + } + + /** + * Returns a client bound to {@link #resultsEndpoint}, or {@code null} when the endpoint was not + * configured — in that case the legacy {@link + * com.linkedin.openhouse.jobs.scheduler.JobsScheduler} is the caller and reports lifecycle via + * HTS; the per-operation optimizer callback is skipped. + */ + protected OptimizerServiceClient newOptimizerClient() { + return resultsEndpoint == null ? null : new OptimizerServiceClient(resultsEndpoint); + } + + /** + * POST the per-operation outcome to the Optimizer Service. No-op when {@code client} is null + * (scheduler-driven invocations); failure is logged + counted. + */ + private void reportResult(BatchEntry entry, boolean success, OptimizerServiceClient client) { + if (client == null) { + return; + } + OperationUpdateRequest body = + OperationUpdateRequest.builder() + .operationId(entry.getOperationId()) + .status(success ? STATUS_SUCCESS : STATUS_FAILED) + .tableUuid(entry.getTableUuid()) + .databaseName(entry.getDatabaseName()) + .tableName(entry.getTableName()) + .operationType(OPERATION_TYPE) + .build(); + try { + client.updateOperation(body); + } catch (IOException e) { + log.error( + "Failed to report operation result; row will stay SCHEDULED until stale-timeout: operationId={} fqtn={}", + entry.getOperationId(), + entry.getFqtn(), + e); + otelEmitter.count( + METRICS_SCOPE, + "optimizer_update_failed", + 1, + Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), entry.getFqtn())); + } + } + + /** One unit of work in a batched OFD job. 
*/ + private final class TableWorker implements Callable { + private final Operations ops; + private final BatchEntry entry; + private final OptimizerServiceClient client; + + TableWorker(Operations ops, BatchEntry entry, OptimizerServiceClient client) { + this.ops = ops; + this.entry = entry; + this.client = client; + } + + @Override + public Boolean call() { + String fqtn = entry.getFqtn(); + boolean success = false; + try { + log.info("OFD start: fqtn={} operationId={}", fqtn, entry.getOperationId()); + Table table = ops.getTable(fqtn); + long olderThanTimestampMillis = + System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(resolveTtlSeconds(table)); + DeleteOrphanFiles.Result result = + ops.deleteOrphanFiles( + table, + olderThanTimestampMillis, + Boolean.parseBoolean( + table.properties().getOrDefault(AppConstants.BACKUP_ENABLED_KEY, "false")), + backupDir, + concurrentDeletes, + streamResults, + maxOrphanFileSampleSize); + // Count via iteration rather than materializing the full path list: a table with millions + // of orphan files would otherwise OOM the driver, and that risk multiplies with + // driverParallelism workers running concurrently. + int orphanCount = Iterables.size(result.orphanFileLocations()); + otelEmitter.count( + METRICS_SCOPE, + AppConstants.ORPHAN_FILE_COUNT, + orphanCount, + Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), fqtn)); + validate(fqtn); + success = true; + log.info("OFD success: fqtn={} orphansDetected={}", fqtn, orphanCount); + } catch (Throwable t) { + log.error("OFD failed: fqtn={} operationId={}", fqtn, entry.getOperationId(), t); + } finally { + reportResult(entry, success, client); + } + return success; + } + + /** + * Re-runs {@link TableStateValidator} — the same post-job consistency check the single-table + * {@link OrphanFilesDeletionSparkApp} uses — to confirm the table's manifests and metadata are + * intact after deletion. 
A failure here is treated as a failed operation: it's logged, counted, + * and re-thrown so the outer {@link #call()} marks {@code success=false}. + */ + private void validate(String fqtn) { + try { + TableStateValidator.run(ops.spark(), fqtn); + } catch (TableValidationException e) { + log.error("Post-job validation failed: fqtn={}", fqtn, e); + otelEmitter.count( + METRICS_SCOPE, + "post_run_validation_error", + 1, + Attributes.of( + AttributeKey.stringKey(AppConstants.TABLE_NAME), + fqtn, + AttributeKey.stringKey(AppConstants.JOB_NAME), + BatchedOrphanFilesDeletionSparkApp.class.getSimpleName())); + throw e; + } + } + + private long resolveTtlSeconds(Table table) { + long resolved = ttlSeconds; + if (Boolean.parseBoolean( + table.properties().getOrDefault(AppConstants.OFD_ONE_DAY_TTL_ENABLED_KEY, "false"))) { + resolved = TimeUnit.DAYS.toSeconds(1); + } + String tableType = + table + .properties() + .getOrDefault(AppConstants.OPENHOUSE_TABLE_TYPE_KEY, AppConstants.TABLE_TYPE_PRIMARY); + if (AppConstants.TABLE_TYPE_REPLICA.equals(tableType)) { + long days = Duration.ofSeconds(resolved).toDays(); + if (days < DEFAULT_MIN_OFD_TTL_IN_DAYS) { + resolved = TimeUnit.DAYS.toSeconds(DEFAULT_MIN_OFD_TTL_IN_DAYS); + } + } + return resolved; + } + } + + /** Per-table inputs for one operation row inside a bin. */ + @lombok.AllArgsConstructor + @lombok.Builder + @lombok.Getter + @lombok.ToString + public static class BatchEntry { + private final String fqtn; + private final String operationId; + private final String tableUuid; + private final String databaseName; + private final String tableName; + } + + public static void main(String[] args) { + OtelEmitter otelEmitter = + new AppsOtelEmitter(Collections.singletonList(DefaultOtelConfig.getOpenTelemetry())); + createApp(args, otelEmitter).run(); + } + + public static BatchedOrphanFilesDeletionSparkApp createApp( + String[] args, OtelEmitter otelEmitter) { + List