diff --git a/apps/optimizer/schedulerapp/build.gradle b/apps/optimizer/schedulerapp/build.gradle
new file mode 100644
index 000000000..28d93598c
--- /dev/null
+++ b/apps/optimizer/schedulerapp/build.gradle
@@ -0,0 +1,14 @@
+// Build script for the deployable scheduler application module.
+plugins {
+ id 'openhouse.springboot-ext-conventions'
+ id 'org.springframework.boot' version '2.7.8'
+}
+
+// Deployable Spring Boot wrapper around the scheduler library. Holds SchedulerApplication (the
+// @SpringBootApplication entry point) and application.properties; the scheduling logic lives in
+// :services:optimizer:scheduler.
+dependencies {
+ implementation project(':services:optimizer:scheduler')
+ implementation 'org.springframework.boot:spring-boot-starter:2.7.8'
+ implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
+ // NOTE(review): only the MySQL driver is declared for runtime here, while this module's
+ // application.properties falls back to an H2 in-memory JDBC URL when OPTIMIZER_DB_URL is unset.
+ // Confirm an H2 driver reaches the runtime classpath some other way (e.g. the conventions
+ // plugin), or that the H2 default is never relied on.
+ runtimeOnly 'mysql:mysql-connector-java:8.0.33'
+}
diff --git a/apps/optimizer/schedulerapp/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerApplication.java b/apps/optimizer/schedulerapp/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerApplication.java
new file mode 100644
index 000000000..d83db7524
--- /dev/null
+++ b/apps/optimizer/schedulerapp/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerApplication.java
@@ -0,0 +1,63 @@
+package com.linkedin.openhouse.optimizer.scheduler;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.ExitCodeGenerator;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.domain.EntityScan;
+import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
+
+/**
+ * Entry point for the Optimizer Scheduler application.
+ *
+ * <p>Spring Batch–style: implements {@link CommandLineRunner} so the work runs after context
+ * startup, and {@link ExitCodeGenerator} so the JVM exit code reflects batch outcome. {@code
+ * SpringApplication.exit(...)} closes the context (triggers {@code @PreDestroy} hooks, drains the
+ * JPA pool, etc.) so the k8s CronJob pod terminates cleanly with a status reflecting reality.
+ */
+@Slf4j
+@SpringBootApplication
+@EntityScan(basePackages = "com.linkedin.openhouse.optimizer.db")
+@EnableJpaRepositories(basePackages = "com.linkedin.openhouse.optimizer.repository")
+public class SchedulerApplication implements CommandLineRunner, ExitCodeGenerator {
+
+ private final SchedulerRunner runner;
+ private final Map binPackers;
+ private int exitCode = 0;
+
+ @Autowired
+ public SchedulerApplication(SchedulerRunner runner, Map binPackers) {
+ this.runner = runner;
+ this.binPackers = binPackers;
+ }
+
+ public static void main(String[] args) {
+ System.exit(SpringApplication.exit(SpringApplication.run(SchedulerApplication.class, args)));
+ }
+
+ /**
+ * Runs the scheduler once per registered {@link BinPacker} per process invocation. Each call is
+ * scoped to one operation type. Any thrown exception is logged and surfaces as a non-zero exit
+ * code via {@link #getExitCode()} after the context is shut down cleanly.
+ */
+ @Override
+ public void run(String... args) {
+ try {
+ log.info("Scheduler starting; operation types: {}", binPackers.keySet());
+ binPackers.keySet().forEach(runner::schedule);
+ log.info("Scheduler completed successfully");
+ } catch (Exception e) {
+ log.error("Scheduler failed", e);
+ exitCode = 1;
+ }
+ }
+
+ @Override
+ public int getExitCode() {
+ return exitCode;
+ }
+}
diff --git a/apps/optimizer/schedulerapp/src/main/resources/application.properties b/apps/optimizer/schedulerapp/src/main/resources/application.properties
new file mode 100644
index 000000000..5184cf1bc
--- /dev/null
+++ b/apps/optimizer/schedulerapp/src/main/resources/application.properties
@@ -0,0 +1,11 @@
+# Spring Boot settings for the scheduler application. Values written as ${VAR:default} resolve to
+# the named environment variable/property when it is set, otherwise to the text after the colon.
+spring.application.name=openhouse-optimizer-scheduler
+# Run as a plain (non-web) application: no embedded web server is started.
+spring.main.web-application-type=none
+spring.main.banner-mode=off
+# NOTE(review): the fallback URL targets an H2 in-memory database; confirm an H2 driver is on the
+# runtime classpath wherever this default is actually used.
+spring.datasource.url=${OPTIMIZER_DB_URL:jdbc:h2:mem:schedulerdb;DB_CLOSE_DELAY=-1;MODE=MySQL}
+spring.datasource.username=${OPTIMIZER_DB_USER:sa}
+spring.datasource.password=${OPTIMIZER_DB_PASSWORD:}
+# Hibernate performs no schema DDL at startup.
+spring.jpa.hibernate.ddl-auto=none
+# Scheduler settings injected via @Value in SchedulerConfig and SchedulerRunner.
+optimizer.scheduler.jobs.base-uri=${JOBS_BASE_URI:http://localhost:8002}
+optimizer.scheduler.ofd.max-files-per-bin=${SCHEDULER_OFD_MAX_FILES_PER_BIN:1000000}
+optimizer.scheduler.results-endpoint=${SCHEDULER_RESULTS_ENDPOINT:http://openhouse-optimizer:8080/v1/optimizer/operations}
+optimizer.scheduler.cluster-id=${SCHEDULER_CLUSTER_ID:LocalHadoopCluster}
diff --git a/services/optimizer/scheduler/build.gradle b/services/optimizer/scheduler/build.gradle
new file mode 100644
index 000000000..9294f64b8
--- /dev/null
+++ b/services/optimizer/scheduler/build.gradle
@@ -0,0 +1,33 @@
+// Build script for the scheduler library module.
+plugins {
+ id 'openhouse.springboot-ext-conventions'
+ id 'org.springframework.boot' version '2.7.8'
+}
+
+// Library jar — the @SpringBootApplication entry point lives in :apps:optimizer:schedulerapp.
+// Disable bootJar so we don't try to assemble a runnable jar from a library that has no main
+// class; keep jar enabled so consumers (the apps wrapper) get a normal library artifact.
+bootJar {
+ enabled = false
+}
+
+jar {
+ enabled = true
+ // With the Spring Boot plugin applied (2.5+), the plain jar gets a 'plain' classifier by
+ // default; clearing it keeps the conventional artifact name for consumers.
+ archiveClassifier = ''
+}
+
+dependencies {
+ // api: the scheduler's public types (e.g. BinPacker, OperationTypeDto) come from
+ // :services:optimizer, so consumers of this library see them on their compile classpath.
+ // NOTE(review): the `api` configuration is normally contributed by the java-library plugin —
+ // presumably applied by the conventions plugin; confirm.
+ api project(':services:optimizer')
+ implementation 'org.springframework.boot:spring-boot-starter:2.7.8'
+ // Provides WebClient, which JobsServiceClient uses to call the Jobs Service.
+ implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8'
+ implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
+ implementation 'org.springframework.boot:spring-boot-starter-aop:2.7.8'
+ runtimeOnly 'mysql:mysql-connector-java:8.0.33'
+ testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8'
+ // No explicit version: presumably resolved through dependency management — TODO confirm.
+ testRuntimeOnly 'com.h2database:h2'
+}
+
+test {
+ useJUnitPlatform()
+}
diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/Bin.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/Bin.java
new file mode 100644
index 000000000..082a3bbd7
--- /dev/null
+++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/Bin.java
@@ -0,0 +1,61 @@
+package com.linkedin.openhouse.optimizer.scheduler;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * A set of operations the scheduler will submit together as a single Spark job. A bin owns its own
+ * launch — callers ask it to schedule itself and react to the returned job id. The surrounding
+ * status-update machinery (claim, mark-scheduled, revert-to-pending) lives in the scheduler because
+ * it is shared across all bins regardless of operation type.
+ */
+@RequiredArgsConstructor
+public class Bin {
+
+  /** Operation type of this bin; its enum name is also sent as the job type on submission. */
+  @Getter private final OperationTypeDto operationType;
+
+  /** Operations submitted together. The list is stored as given (no defensive copy). */
+  @Getter private final List<TableOperationDto> operations;
+
+  /** Operation ids in this bin, in the same order as {@link #getTableNames()}. */
+  public List<String> getOperationIds() {
+    return operations.stream().map(TableOperationDto::getId).collect(Collectors.toList());
+  }
+
+  /** Fully-qualified {@code database.table} identifiers for the operations in this bin. */
+  public List<String> getTableNames() {
+    return operations.stream()
+        .map(op -> op.getDatabaseName() + "." + op.getTableName())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Return a new {@link Bin} containing only the operations whose IDs are in {@code keepIds}. Used
+   * by the scheduler to narrow the bin to the rows it actually claimed before launching the job.
+   *
+   * @param keepIds ids of the operations to retain; ids not present in this bin are ignored
+   * @return a new bin of the same operation type, preserving this bin's operation order
+   */
+  public Bin subset(Collection<String> keepIds) {
+    Set<String> keep = new HashSet<>(keepIds);
+    List<TableOperationDto> filtered =
+        operations.stream().filter(op -> keep.contains(op.getId())).collect(Collectors.toList());
+    return new Bin(operationType, filtered);
+  }
+
+  /**
+   * Submit this bin as a single Spark job. Returns the job id on success, or empty on submission
+   * failure — the caller is responsible for the surrounding status updates.
+   *
+   * @param client Jobs Service client used for the submission
+   * @param resultsEndpoint endpoint passed through to the job as its results target
+   * @return the job id reported by {@code client.launch(...)}, or empty if none was returned
+   */
+  public Optional<String> schedule(JobsServiceClient client, String resultsEndpoint) {
+    // Locale.ROOT keeps the lower-cased enum name stable regardless of the JVM default locale
+    // (e.g. Turkish locales lower-case 'I' to a dotless 'ı').
+    String jobName =
+        "batched-"
+            + operationType.name().toLowerCase(java.util.Locale.ROOT)
+            + "-"
+            + Instant.now().toEpochMilli();
+    return client.launch(
+        jobName, operationType.name(), getTableNames(), getOperationIds(), resultsEndpoint);
+  }
+}
diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/BinPacker.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/BinPacker.java
new file mode 100644
index 000000000..509c37b75
--- /dev/null
+++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/BinPacker.java
@@ -0,0 +1,24 @@
+package com.linkedin.openhouse.optimizer.scheduler;
+
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import java.util.List;
+
+/**
+ * Strategy for packing a set of operations into bins for batched job submission. Implementations
+ * encode the constraints of a particular packing dimension (file count, partition count, etc.);
+ * binding to an operation type is the responsibility of the scheduler configuration, not the
+ * strategy class.
+ *
+ * <p>{@link TableStatsDto} is the cost source at the interface boundary, carried alongside each
+ * operation in a {@link SchedulingCandidate}. Implementations project the stats down to the minimal
+ * data needed to make their packing decision (e.g. file count for OFD) and do not retain the full
+ * stats payload in the returned bins.
+ */
+public interface BinPacker {
+
+  /**
+   * Pack {@code pending} into one or more {@link Bin}s. Each returned bin is non-empty; the
+   * scheduler dispatches one Spark job per bin.
+   *
+   * @param pending candidates (operation plus stats) to distribute across bins
+   * @return the bins to submit, one job per bin
+   */
+  List<Bin> pack(List<SchedulingCandidate> pending);
+}
diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/FileCountBinPacker.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/FileCountBinPacker.java
new file mode 100644
index 000000000..b62e1bf9b
--- /dev/null
+++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/FileCountBinPacker.java
@@ -0,0 +1,84 @@
+package com.linkedin.openhouse.optimizer.scheduler;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Greedy first-fit-descending bin-packer keyed on per-table file count, projected from each
+ * candidate's {@link TableStatsDto}.
+ *
+ * <p>Candidates are sorted by descending file count, then assigned to the first bin whose running
+ * total stays at or below {@code maxFilesPerBin}. An operation larger than the limit gets its own
+ * bin (oversized bins are allowed — we never drop an operation).
+ */
+@RequiredArgsConstructor
+public class FileCountBinPacker implements BinPacker {
+
+  /** Operation type stamped on every bin this packer produces. */
+  private final OperationTypeDto operationType;
+
+  /** Target upper bound on the summed file count of a bin. */
+  private final long maxFilesPerBin;
+
+  /**
+   * Packs {@code pending} first-fit-descending by file count.
+   *
+   * @param pending candidates to pack; an empty list yields an empty result
+   * @return bins in creation order, each holding its operations in placement order
+   */
+  @Override
+  public List<Bin> pack(List<SchedulingCandidate> pending) {
+    if (pending.isEmpty()) {
+      return List.of();
+    }
+
+    // Stable sort, largest cost first; candidates with equal cost keep their input order. Sorting
+    // the candidates directly avoids an id-keyed cost map, which would fail on duplicate ids.
+    List<SchedulingCandidate> sorted =
+        pending.stream()
+            .sorted(
+                Comparator.comparingLong((SchedulingCandidate c) -> cost(c.getStats())).reversed())
+            .collect(Collectors.toList());
+
+    // First-fit-descending is inherently stateful — each placement depends on the running totals
+    // for bins assembled so far — so a plain loop is used. The two lists are index-aligned.
+    List<List<TableOperationDto>> binContents = new ArrayList<>();
+    List<Long> binTotals = new ArrayList<>();
+    for (SchedulingCandidate candidate : sorted) {
+      long c = cost(candidate.getStats());
+      int idx = firstFit(binTotals, c);
+      if (idx < 0) {
+        // Nothing fits: open a new bin. This is also how an oversized operation gets its own bin.
+        binContents.add(new ArrayList<>());
+        binTotals.add(0L);
+        idx = binContents.size() - 1;
+      }
+      binContents.get(idx).add(candidate.getOperation());
+      binTotals.set(idx, binTotals.get(idx) + c);
+    }
+
+    return binContents.stream()
+        .map(ops -> new Bin(operationType, ops))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Index of the first bin that can take an operation of cost {@code c}: either the new total stays
+   * within {@code maxFilesPerBin}, or the bin's running total is still zero. Returns -1 if none.
+   */
+  private int firstFit(List<Long> binTotals, long c) {
+    for (int i = 0; i < binTotals.size(); i++) {
+      long total = binTotals.get(i);
+      if (total + c <= maxFilesPerBin || total == 0) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  /** File count used as packing cost; missing stats, snapshot, or count are treated as zero. */
+  private static long cost(TableStatsDto stats) {
+    if (stats == null || stats.getSnapshot() == null) {
+      return 0L;
+    }
+    Long n = stats.getSnapshot().getNumCurrentFiles();
+    return n != null ? n : 0L;
+  }
+}
diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunner.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunner.java
new file mode 100644
index 000000000..7b4f7594b
--- /dev/null
+++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunner.java
@@ -0,0 +1,248 @@
+package com.linkedin.openhouse.optimizer.scheduler;
+
+import com.linkedin.openhouse.optimizer.db.OperationStatus;
+import com.linkedin.openhouse.optimizer.db.TableOperationsRow;
+import com.linkedin.openhouse.optimizer.db.TableStatsRow;
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
+import com.linkedin.openhouse.optimizer.repository.TableStatsRepository;
+import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.domain.Pageable;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * For one operation type per call, reads PENDING rows, looks up per-table stats, dispatches to the
+ * registered {@link BinPacker}, and submits one Spark job per returned {@link Bin}. The {@link
+ * com.linkedin.openhouse.optimizer.scheduler.SchedulerApplication}'s CommandLineRunner loops over
+ * the registered packers and invokes {@code schedule(opType)} for each.
+ */
+@Slf4j
+@Component
+public class SchedulerRunner {
+ // Operation rows: queried for PENDING work and updated for claim / scheduled / revert / cancel.
+ private final TableOperationsRepository operationsRepo;
+ // Per-table stats, looked up by table UUID to build packing candidates.
+ private final TableStatsRepository statsRepo;
+ // Client handed to each Bin to submit its job.
+ private final JobsServiceClient jobsClient;
+ // Packer lookup, indexed by operation type in schedule(...).
+ private final Map binPackers;
+ // Passed through to every submitted job (property optimizer.scheduler.results-endpoint).
+ private final String resultsEndpoint;
+
+ public SchedulerRunner(
+ TableOperationsRepository operationsRepo,
+ TableStatsRepository statsRepo,
+ JobsServiceClient jobsClient,
+ Map binPackers,
+ @Value("${optimizer.scheduler.results-endpoint}") String resultsEndpoint) {
+ this.operationsRepo = operationsRepo;
+ this.statsRepo = statsRepo;
+ this.jobsClient = jobsClient;
+ this.binPackers = binPackers;
+ this.resultsEndpoint = resultsEndpoint;
+ }
+
+ /** Schedule all PENDING operations of the given type across all databases. */
+ @Transactional
+ public void schedule(OperationTypeDto operationType) {
+ schedule(operationType, Optional.empty(), Optional.empty());
+ }
+
+ /**
+ * Schedule PENDING operations for {@code operationType}, optionally scoped to a single database
+ * or table name.
+ *
+ * <p>Steps: load PENDING rows, cancel duplicates per table, join with table stats (operations
+ * without a stats row are skipped), pack into bins, then claim and submit each bin.
+ *
+ * @throws IllegalStateException if no {@link BinPacker} is registered for {@code operationType}
+ */
+ // NOTE(review): this method is @Transactional and submitBin performs the remote job launch from
+ // inside it. If the repository updates join this transaction, claim and SCHEDULED writes would
+ // stay uncommitted until the whole cycle returns, and a later failure could roll back rows whose
+ // jobs were already launched — confirm the intended transaction boundaries.
+ @Transactional
+ public void schedule(
+ OperationTypeDto operationType, Optional databaseName, Optional tableName) {
+
+ BinPacker packer = binPackers.get(operationType);
+ if (packer == null) {
+ throw new IllegalStateException(
+ "No BinPacker registered for operation type " + operationType);
+ }
+
+ // Unpaged: a single-page truncation would silently drop work past page 0 (next cycle would
+ // re-load the same first page in MySQL row order, leaving the tail unscheduled until the
+ // ordering shifts). Correctness here requires the full PENDING set in one cycle; the working
+ // set is bounded by count(PENDING for this op type).
+ List pendingRows =
+ operationsRepo.find(
+ Optional.of(operationType.toDb()),
+ Optional.of(OperationStatus.PENDING),
+ Optional.empty(),
+ databaseName,
+ tableName,
+ Optional.empty(),
+ Optional.empty(),
+ Pageable.unpaged());
+ if (pendingRows.isEmpty()) {
+ log.info("No PENDING operations of type {}; nothing to schedule", operationType);
+ return;
+ }
+
+ // Deduplicate before claiming: if multiple PENDING rows exist for the same tableUuid, keep
+ // the oldest (lex-tiebreak on id) and cancel the rest. Per-cycle, not per-bin — running this
+ // inside the bin loop nuked rows belonging to other bins of the same cycle.
+ List survivors = cancelDuplicates(pendingRows);
+ if (survivors.isEmpty()) {
+ return;
+ }
+
+ List pending =
+ survivors.stream().map(TableOperationDto::fromRow).collect(Collectors.toList());
+
+ // Tradeoff: we fetch fresh table_stats per scheduling cycle (one batched query) rather than
+ // denormalizing the relevant fields onto TableOperationDto. The denormalized alternative would
+ // remove the per-cycle lookup but widen the TableOperationDto row and serve staler data; the
+ // current shape favors smaller operations + freshness over fewer queries.
+ Set uuids =
+ pending.stream().map(TableOperationDto::getTableUuid).collect(Collectors.toSet());
+ Map statsByUuid =
+ statsRepo.findAllById(uuids).stream()
+ .collect(Collectors.toMap(TableStatsRow::getTableUuid, TableStatsDto::fromRow));
+
+ // Filter at the boundary so SchedulingCandidate.stats is guaranteed non-null. A table without
+ // a stats row gets skipped this cycle and reconsidered after stats land.
+ List withStats =
+ pending.stream()
+ .filter(op -> statsByUuid.containsKey(op.getTableUuid()))
+ .collect(Collectors.toList());
+ if (withStats.size() < pending.size()) {
+ log.warn(
+ "Skipped {} {} operations with no table_stats row",
+ pending.size() - withStats.size(),
+ operationType);
+ }
+ if (withStats.isEmpty()) {
+ return;
+ }
+
+ List candidates =
+ withStats.stream()
+ .map(op -> new SchedulingCandidate(op, statsByUuid.get(op.getTableUuid())))
+ .collect(Collectors.toList());
+
+ List bins = packer.pack(candidates);
+ log.info(
+ "Packed {} PENDING {} operations into {} bins",
+ candidates.size(),
+ operationType,
+ bins.size());
+
+ // Bins are claimed and submitted one after another, in the order the packer returned them.
+ bins.forEach(this::submitBin);
+ }
+
+ /**
+ * Group {@code pendingRows} by {@code tableUuid}; for any group with more than one row, cancel
+ * all but the oldest (lex-tiebreak on id). Returns the survivors in input order. Deterministic.
+ */
+ private List cancelDuplicates(List pendingRows) {
+ Map> byTableUuid =
+ pendingRows.stream().collect(Collectors.groupingBy(TableOperationsRow::getTableUuid));
+
+ // Within each multi-row group, sort oldest-first and skip(1) so every row except the oldest is
+ // collected for cancellation.
+ // NOTE(review): Comparator.comparing throws NullPointerException on a null createdAt —
+ // presumably the column is always populated; verify.
+ List duplicateIds =
+ byTableUuid.values().stream()
+ .filter(rows -> rows.size() > 1)
+ .flatMap(
+ rows ->
+ rows.stream()
+ .sorted(
+ Comparator.comparing(TableOperationsRow::getCreatedAt)
+ .thenComparing(TableOperationsRow::getId))
+ .skip(1))
+ .map(TableOperationsRow::getId)
+ .collect(Collectors.toList());
+
+ if (duplicateIds.isEmpty()) {
+ return pendingRows;
+ }
+
+ int cancelled = operationsRepo.cancel(duplicateIds);
+ log.warn("Cancelled {} duplicate PENDING rows", cancelled);
+
+ // Survivors are derived from the ids selected above, not from the count cancel(...) returned.
+ Set cancelledIds = Set.copyOf(duplicateIds);
+ return pendingRows.stream()
+ .filter(r -> !cancelledIds.contains(r.getId()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Claim the bin's rows (PENDING → SCHEDULING), submit one job for the rows actually claimed, then
+ * mark them SCHEDULED on success or revert them to PENDING when no job id comes back.
+ */
+ private void submitBin(Bin bin) {
+ List ids = bin.getOperationIds();
+
+ // Claim the rows in one batched UPDATE: PENDING → SCHEDULING. The UPDATE's row count is just
+ // an aggregate — to know *which* rows we own, re-query for SCHEDULING rows tagged with our
+ // scheduledAt watermark. Anything not in that subset belongs to another instance or was
+ // canceled, and must not be submitted or marked SCHEDULED.
+ // NOTE(review): the re-query matches on the exact claimedAt value — assumes the stored
+ // timestamp keeps the full precision of Instant.now(); confirm against the column type.
+ Instant claimedAt = Instant.now();
+ operationsRepo.updateBatch(
+ ids,
+ OperationStatus.PENDING,
+ OperationStatus.SCHEDULING,
+ Optional.of(claimedAt),
+ Optional.empty());
+ // Unpaged: the result set is already bounded by ids.size() (the bin we just claimed); no
+ // need to cap it further.
+ List claimedIds =
+ operationsRepo
+ .find(
+ Optional.empty(),
+ Optional.of(OperationStatus.SCHEDULING),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.of(claimedAt),
+ Optional.of(ids),
+ Pageable.unpaged())
+ .stream()
+ .map(TableOperationsRow::getId)
+ .collect(Collectors.toList());
+ if (claimedIds.isEmpty()) {
+ log.info("All rows in bin already claimed by another scheduler instance; skipping");
+ return;
+ }
+ if (claimedIds.size() < ids.size()) {
+ log.info(
+ "Partial claim: {} of {} ops in bin claimed; launching job for claimed subset only",
+ claimedIds.size(),
+ ids.size());
+ }
+
+ // Narrow the bin to the claimed rows so the submitted job covers only operations we own.
+ Bin claimedBin = bin.subset(claimedIds);
+ Optional jobId = claimedBin.schedule(jobsClient, resultsEndpoint);
+ if (jobId.isPresent()) {
+ int updated =
+ operationsRepo.updateBatch(
+ claimedIds,
+ OperationStatus.SCHEDULING,
+ OperationStatus.SCHEDULED,
+ Optional.empty(),
+ Optional.of(jobId.get()));
+ log.info(
+ "Submitted job {} for {} tables ({} rows marked SCHEDULED)",
+ jobId.get(),
+ claimedBin.getOperations().size(),
+ updated);
+ } else {
+ // An empty job id is treated as a failed submission: release the claim for a later pass.
+ int reverted =
+ operationsRepo.updateBatch(
+ claimedIds,
+ OperationStatus.SCHEDULING,
+ OperationStatus.PENDING,
+ Optional.empty(),
+ Optional.empty());
+ log.warn(
+ "Job submission failed; reverted {} claimed rows back to PENDING for retry on the next"
+ + " pass",
+ reverted);
+ }
+ }
+}
diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulingCandidate.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulingCandidate.java
new file mode 100644
index 000000000..b031ae6b7
--- /dev/null
+++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulingCandidate.java
@@ -0,0 +1,19 @@
+package com.linkedin.openhouse.optimizer.scheduler;
+
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import lombok.NonNull;
+import lombok.Value;
+
+/**
+ * A pending operation paired with the stats the bin packer will use as its cost source. Built by
+ * the scheduler at scheduling time and handed to the {@link BinPacker} as the unit of packing.
+ *
+ * <p>Both fields are non-null. The scheduler filters out operations whose tables have no stats row
+ * before constructing candidates.
+ */
+@Value
+public class SchedulingCandidate {
+ // The pending operation to be placed into a bin.
+ @NonNull TableOperationDto operation;
+ // Stats for the operation's table, used by the packer as the cost source.
+ @NonNull TableStatsDto stats;
+}
diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/client/JobsServiceClient.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/client/JobsServiceClient.java
new file mode 100644
index 000000000..ee8fa38ee
--- /dev/null
+++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/client/JobsServiceClient.java
@@ -0,0 +1,80 @@
+package com.linkedin.openhouse.optimizer.scheduler.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.reactive.function.client.WebClient;
+
+/**
+ * Client for the OpenHouse Jobs Service.
+ *
+ * <p>Submits one {@code BatchedOrphanFilesDeletionSparkApp} job per bin via {@code POST /jobs}.
+ */
+@Slf4j
+public class JobsServiceClient {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final Duration TIMEOUT = Duration.ofSeconds(30);
+
+ private final WebClient webClient;
+ private final String clusterId;
+
+ public JobsServiceClient(WebClient webClient, String clusterId) {
+ this.webClient = webClient;
+ this.clusterId = clusterId;
+ }
+
+ /**
+ * Submit a batched Spark job for the given tables.
+ *
+ * @param jobName human-readable name for the job
+ * @param jobType operation type string (e.g. "ORPHAN_FILES_DELETION")
+ * @param tableNames fully-qualified table names (db.table)
+ * @param operationIds operation UUIDs — parallel to tableNames
+ * @param resultsEndpoint base URL the Spark app PATCHes results back to
+ * @return job ID if the submission succeeded, empty if an error occurred
+ */
+ public Optional launch(
+ String jobName,
+ String jobType,
+ List tableNames,
+ List operationIds,
+ String resultsEndpoint) {
+ try {
+ ObjectNode body = MAPPER.createObjectNode();
+ body.put("jobName", jobName);
+ body.put("clusterId", clusterId);
+
+ ObjectNode jobConf = body.putObject("jobConf");
+ jobConf.put("jobType", jobType);
+
+ ArrayNode args = jobConf.putArray("args");
+ args.add("--tableNames");
+ args.add(String.join(",", tableNames));
+ args.add("--operationIds");
+ args.add(String.join(",", operationIds));
+ args.add("--resultsEndpoint");
+ args.add(resultsEndpoint);
+
+ String responseBody =
+ webClient
+ .post()
+ .uri("/jobs")
+ .bodyValue(body)
+ .retrieve()
+ .bodyToMono(String.class)
+ .timeout(TIMEOUT)
+ .block();
+
+ String jobId = MAPPER.readTree(responseBody).path("jobId").asText(null);
+ return Optional.ofNullable(jobId);
+ } catch (Exception e) {
+ log.error("Failed to submit job '{}': {}", jobName, e.getMessage());
+ return Optional.empty();
+ }
+ }
+}
diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java
new file mode 100644
index 000000000..796e707f4
--- /dev/null
+++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java
@@ -0,0 +1,46 @@
+package com.linkedin.openhouse.optimizer.scheduler.config;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.scheduler.BinPacker;
+import com.linkedin.openhouse.optimizer.scheduler.FileCountBinPacker;
+import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient;
+import java.util.Map;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.reactive.function.client.WebClient;
+
+/**
+ * Spring wiring for the scheduler: the Jobs Service HTTP client and the registry of {@link
+ * BinPacker}s keyed by operation type. Settings come from {@code optimizer.scheduler.*}
+ * properties.
+ */
+@Configuration
+public class SchedulerConfig {
+
+  /** Base URL of the Jobs Service; request paths such as {@code /jobs} are resolved against it. */
+  @Value("${optimizer.scheduler.jobs.base-uri}")
+  private String jobsBaseUri;
+
+  /** Cluster id sent with every job submission. */
+  @Value("${optimizer.scheduler.cluster-id}")
+  private String clusterId;
+
+  /** File-count limit per bin for orphan-files-deletion packing. */
+  @Value("${optimizer.scheduler.ofd.max-files-per-bin}")
+  private long ofdMaxFilesPerBin;
+
+  /** {@link WebClient} bound to the Jobs Service base URL. */
+  @Bean
+  public WebClient jobsWebClient() {
+    return WebClient.builder().baseUrl(jobsBaseUri).build();
+  }
+
+  /** Jobs Service client built on {@code jobsWebClient} and the configured cluster id. */
+  @Bean
+  public JobsServiceClient jobsServiceClient(WebClient jobsWebClient) {
+    return new JobsServiceClient(jobsWebClient, clusterId);
+  }
+
+  /**
+   * Map of {@link OperationTypeDto} to the {@link BinPacker} strategy that handles it. Adding a new
+   * operation type means adding an entry here and configuring its packer; the strategy class itself
+   * stays generic.
+   */
+  @Bean
+  public Map<OperationTypeDto, BinPacker> binPackers() {
+    return Map.of(
+        OperationTypeDto.ORPHAN_FILES_DELETION,
+        new FileCountBinPacker(OperationTypeDto.ORPHAN_FILES_DELETION, ofdMaxFilesPerBin));
+  }
+}
diff --git a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/FileCountBinPackerTest.java b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/FileCountBinPackerTest.java
new file mode 100644
index 000000000..dc3b96b5c
--- /dev/null
+++ b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/FileCountBinPackerTest.java
@@ -0,0 +1,104 @@
+package com.linkedin.openhouse.optimizer.scheduler;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.Test;
+
+class FileCountBinPackerTest {
+
+ private static final long MAX = 1_000_000L;
+ private final FileCountBinPacker packer =
+ new FileCountBinPacker(OperationTypeDto.ORPHAN_FILES_DELETION, MAX);
+
+ private static TableOperationDto op(String uuid) {
+ return TableOperationDto.builder()
+ .id(UUID.randomUUID().toString())
+ .tableUuid(uuid)
+ .databaseName("db")
+ .tableName("tbl_" + uuid)
+ .operationType(OperationTypeDto.ORPHAN_FILES_DELETION)
+ .build();
+ }
+
+ private static TableStatsDto stats(Long fileCount) {
+ return TableStatsDto.builder()
+ .snapshot(TableStatsDto.SnapshotMetrics.builder().numCurrentFiles(fileCount).build())
+ .build();
+ }
+
+ private static SchedulingCandidate candidate(String uuid, Long fileCount) {
+ return new SchedulingCandidate(op(uuid), stats(fileCount));
+ }
+
+ @Test
+ void emptyInput_returnsEmptyBins() {
+ assertThat(packer.pack(List.of())).isEmpty();
+ }
+
+ @Test
+ void singleTable_oneBin() {
+ SchedulingCandidate c = candidate("uuid-1", 100L);
+ List bins = packer.pack(List.of(c));
+ assertThat(bins).hasSize(1);
+ assertThat(bins.get(0).getOperations()).containsExactly(c.getOperation());
+ }
+
+ @Test
+ void tablesUnderLimit_oneBin() {
+ List bins =
+ packer.pack(
+ List.of(candidate("a", 300_000L), candidate("b", 300_000L), candidate("c", 300_000L)));
+ assertThat(bins).hasSize(1);
+ assertThat(bins.get(0).getOperations()).hasSize(3);
+ }
+
+ @Test
+ void tablesOverLimit_twoBins() {
+ List bins =
+ packer.pack(
+ List.of(candidate("a", 600_000L), candidate("b", 600_000L), candidate("c", 400_000L)));
+ assertThat(bins).hasSize(2);
+ assertThat(bins.get(0).getOperations()).hasSize(2); // 600k + 400k
+ assertThat(bins.get(1).getOperations()).hasSize(1); // 600k alone
+ }
+
+ @Test
+ void largeTableAlone_exceedsLimitSingleBin() {
+ SchedulingCandidate big = candidate("big", 5_000_000L);
+ List bins = packer.pack(List.of(big));
+ assertThat(bins).hasSize(1);
+ assertThat(bins.get(0).getOperations()).containsExactly(big.getOperation());
+ }
+
+ @Test
+ void nullFileCount_treatedAsZero() {
+ List bins = packer.pack(List.of(candidate("x", null), candidate("y", null)));
+ assertThat(bins).hasSize(1);
+ assertThat(bins.get(0).getOperations()).hasSize(2);
+ }
+
+ @Test
+ void sortedDescending_largestFirst() {
+ SchedulingCandidate small = candidate("small", 100L);
+ SchedulingCandidate large = candidate("large", 900_000L);
+ List bins = packer.pack(List.of(small, large));
+ assertThat(bins).hasSize(1);
+ List ordered =
+ bins.get(0).getOperations().stream()
+ .map(TableOperationDto::getTableUuid)
+ .collect(Collectors.toList());
+ assertThat(ordered).containsExactly("large", "small");
+ }
+
+ @Test
+ void binCarriesOperationType() {
+ List bins = packer.pack(List.of(candidate("u", 1L)));
+ assertThat(bins.get(0).getOperationType()).isEqualTo(OperationTypeDto.ORPHAN_FILES_DELETION);
+ }
+}
diff --git a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java
new file mode 100644
index 000000000..aa4abce8f
--- /dev/null
+++ b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java
@@ -0,0 +1,364 @@
+package com.linkedin.openhouse.optimizer.scheduler;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.linkedin.openhouse.optimizer.db.OperationStatus;
+import com.linkedin.openhouse.optimizer.db.SnapshotMetrics;
+import com.linkedin.openhouse.optimizer.db.TableOperationsRow;
+import com.linkedin.openhouse.optimizer.db.TableStatsRow;
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
+import com.linkedin.openhouse.optimizer.repository.TableStatsRepository;
+import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SchedulerRunnerTest {
+
+ // Operation type exercised by every test, in the DTO form passed to runner.schedule(...).
+ private static final OperationTypeDto OFD = OperationTypeDto.ORPHAN_FILES_DELETION;
+ // The same operation type as the db-package enum; used when building rows and matching find(...).
+ private static final com.linkedin.openhouse.optimizer.db.OperationType OFD_DB =
+ com.linkedin.openhouse.optimizer.db.OperationType.ORPHAN_FILES_DELETION;
+ // Enum name of OFD; matched against the second argument of jobsClient.launch(...).
+ private static final String OFD_STR = OFD.name();
+ // Results endpoint value handed to the SchedulerRunner constructor in setUp().
+ private static final String RESULTS_ENDPOINT = "http://localhost:8080/v1/optimizer/operations";
+
+ // Collaborators of SchedulerRunner, supplied as Mockito mocks.
+ @Mock private TableOperationsRepository operationsRepo;
+ @Mock private TableStatsRepository statsRepo;
+ @Mock private JobsServiceClient jobsClient;
+ @Mock private BinPacker binPacker;
+
+ // Instance under test; rebuilt in setUp().
+ private SchedulerRunner runner;
+
+ /** Builds a fresh runner around the mocks, registering the mocked packer for {@code OFD} only. */
+ @BeforeEach
+ void setUp() {
+   Map<OperationTypeDto, BinPacker> packersByType = Map.of(OFD, binPacker);
+   runner =
+       new SchedulerRunner(operationsRepo, statsRepo, jobsClient, packersByType, RESULTS_ENDPOINT);
+ }
+
+ // ---- Stubbing helpers ----
+
+ /**
+  * Stubs the initial "find PENDING" call: a {@code find} for {@code OFD_DB} operations in
+  * {@code PENDING} status with the next five optional filters empty. The final argument is left
+  * unconstrained.
+  *
+  * @param rows rows the mocked repository returns for that query
+  */
+ private void stubFindPending(List<TableOperationsRow> rows) {
+   when(operationsRepo.find(
+           eq(Optional.of(OFD_DB)),
+           eq(Optional.of(OperationStatus.PENDING)),
+           eq(Optional.empty()),
+           eq(Optional.empty()),
+           eq(Optional.empty()),
+           eq(Optional.empty()),
+           eq(Optional.empty()),
+           any()))
+       .thenReturn(rows);
+ }
+
+ /**
+  * Stubs the post-claim "find SCHEDULING" call: a {@code find} with no operation-type filter,
+  * {@code SCHEDULING} status and the next three optional filters empty. The last three arguments
+  * are left unconstrained.
+  *
+  * @param rows rows the mocked repository reports as claimed
+  */
+ private void stubFindClaimed(List<TableOperationsRow> rows) {
+   when(operationsRepo.find(
+           eq(Optional.empty()),
+           eq(Optional.of(OperationStatus.SCHEDULING)),
+           eq(Optional.empty()),
+           eq(Optional.empty()),
+           eq(Optional.empty()),
+           any(),
+           any(),
+           any()))
+       .thenReturn(rows);
+ }
+
+ /**
+  * Stubs the bin packer to return one bin containing every candidate. The single {@link Bin} is
+  * tagged with {@code OFD} and holds the operation of each candidate passed to {@code pack}, in
+  * the order received.
+  */
+ private void stubOneBinForAllCandidates() {
+   when(binPacker.pack(anyList()))
+       .thenAnswer(
+           inv ->
+               List.of(
+                   new Bin(
+                       OFD,
+                       // Explicit type witness: getArgument is generic and cannot infer the
+                       // element type when a method call is chained onto its result.
+                       inv.<List<SchedulingCandidate>>getArgument(0).stream()
+                           .map(SchedulingCandidate::getOperation)
+                           .collect(Collectors.toList()))));
+ }
+
+ /**
+  * Creates a {@code PENDING} operation row of type {@code OFD_DB} with a random id and a creation
+  * time of "now".
+  *
+  * @param uuid table UUID stored on the row
+  * @param db database name stored on the row
+  * @param table table name stored on the row
+  * @return the newly built row
+  */
+ private TableOperationsRow pendingRow(String uuid, String db, String table) {
+   String operationId = UUID.randomUUID().toString();
+   Instant createdAt = Instant.now();
+   return TableOperationsRow.builder()
+       .id(operationId)
+       .operationType(OFD_DB)
+       .status(OperationStatus.PENDING)
+       .tableUuid(uuid)
+       .databaseName(db)
+       .tableName(table)
+       .createdAt(createdAt)
+       .build();
+ }
+
+ /**
+  * Returns a copy of {@code source} whose status is {@code SCHEDULING}; all other fields are
+  * carried over through {@code toBuilder()}.
+  */
+ private TableOperationsRow schedulingRow(TableOperationsRow source) {
+   return source
+       .toBuilder()
+       .status(OperationStatus.SCHEDULING)
+       .build();
+ }
+
+ /**
+  * Creates a stats row for a table whose snapshot metrics carry only the current file count.
+  *
+  * @param uuid table UUID stored on the row
+  * @param numCurrentFiles value set as the snapshot's {@code numCurrentFiles}
+  * @return the newly built stats row
+  */
+ private TableStatsRow statsRow(String uuid, long numCurrentFiles) {
+   SnapshotMetrics snapshot = SnapshotMetrics.builder().numCurrentFiles(numCurrentFiles).build();
+   return TableStatsRow.builder().tableUuid(uuid).snapshot(snapshot).build();
+ }
+
+ // ---- Tests ----
+
+ /** With no PENDING rows returned, the runner must neither pack bins nor launch a job. */
+ @Test
+ void schedule_noPendingOps_noJobSubmitted() {
+   stubFindPending(List.of());
+
+   runner.schedule(OFD);
+
+   verify(binPacker, never()).pack(anyList());
+   verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString());
+ }
+
+ /**
+  * A runner built with an empty packer map must reject {@code schedule(OFD)} with an
+  * {@link IllegalStateException} and must not query the repository or launch a job.
+  */
+ @Test
+ void schedule_unknownOperationType_throws() {
+   SchedulerRunner runnerWithoutPackers =
+       new SchedulerRunner(operationsRepo, statsRepo, jobsClient, Map.of(), RESULTS_ENDPOINT);
+
+   assertThatThrownBy(() -> runnerWithoutPackers.schedule(OFD))
+       .isInstanceOf(IllegalStateException.class)
+       .hasMessageContaining("No BinPacker registered");
+
+   // The failure must happen before any collaborator is used.
+   verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString());
+   verify(operationsRepo, never()).find(any(), any(), any(), any(), any(), any(), any(), any());
+ }
+
+ /**
+  * Happy path for one operation: the PENDING row is claimed (PENDING to SCHEDULING), a job is
+  * launched for table {@code db1.tbl1}, and the claimed row is moved SCHEDULING to SCHEDULED
+  * carrying the returned job id. No rollback to PENDING may occur.
+  */
+ @Test
+ void schedule_singleBin_claimsAndMarksScheduled() {
+   String uuid = UUID.randomUUID().toString();
+   TableOperationsRow row = pendingRow(uuid, "db1", "tbl1");
+
+   stubFindPending(List.of(row));
+   when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100_000L)));
+   stubOneBinForAllCandidates();
+   when(operationsRepo.updateBatch(
+           anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+       .thenReturn(1);
+   stubFindClaimed(List.of(schedulingRow(row)));
+   when(operationsRepo.updateBatch(
+           anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()))
+       .thenReturn(1);
+   when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+       .thenReturn(Optional.of("job-123"));
+
+   runner.schedule(OFD);
+
+   // The claimed row is marked SCHEDULED with the job id returned by the launch.
+   verify(operationsRepo)
+       .updateBatch(
+           eq(List.of(row.getId())),
+           eq(OperationStatus.SCHEDULING),
+           eq(OperationStatus.SCHEDULED),
+           eq(Optional.empty()),
+           eq(Optional.of("job-123")));
+   verify(operationsRepo, never())
+       .updateBatch(
+           anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.PENDING), any(), any());
+
+   // forClass can only take the raw List class token, hence the unchecked assignment.
+   @SuppressWarnings("unchecked")
+   ArgumentCaptor<List<String>> tableNames = ArgumentCaptor.forClass(List.class);
+   verify(jobsClient)
+       .launch(anyString(), eq(OFD_STR), tableNames.capture(), anyList(), anyString());
+   assertThat(tableNames.getValue()).containsExactly("db1.tbl1");
+ }
+
+ /**
+  * When the launch returns an empty {@link Optional} (this test's stand-in for a failed launch),
+  * the claimed row must be moved SCHEDULING back to PENDING with no job id, and never marked
+  * SCHEDULED.
+  */
+ @Test
+ void schedule_jobLaunchFails_marksPendingForRetry() {
+   String tableUuid = UUID.randomUUID().toString();
+   TableOperationsRow pending = pendingRow(tableUuid, "db1", "tbl1");
+   List<String> claimedIds = List.of(pending.getId());
+
+   // One pending row with stats, packed into a single bin and claimed successfully.
+   stubFindPending(List.of(pending));
+   when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(tableUuid, 100L)));
+   stubOneBinForAllCandidates();
+   when(operationsRepo.updateBatch(
+           anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+       .thenReturn(1);
+   stubFindClaimed(List.of(schedulingRow(pending)));
+
+   // The launch yields no job id, and the rollback update succeeds.
+   when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+       .thenReturn(Optional.empty());
+   when(operationsRepo.updateBatch(
+           anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.PENDING), any(), any()))
+       .thenReturn(1);
+
+   runner.schedule(OFD);
+
+   verify(operationsRepo, never())
+       .updateBatch(
+           anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any());
+   verify(operationsRepo)
+       .updateBatch(
+           eq(claimedIds),
+           eq(OperationStatus.SCHEDULING),
+           eq(OperationStatus.PENDING),
+           eq(Optional.empty()),
+           eq(Optional.empty()));
+ }
+
+ /**
+  * When the claim update touches zero rows and no SCHEDULING rows are found afterwards, the runner
+  * must not launch a job and must not perform any SCHEDULING to SCHEDULED or SCHEDULING to PENDING
+  * update.
+  */
+ @Test
+ void schedule_rowsAlreadyClaimed_skipsSubmit() {
+   String tableUuid = UUID.randomUUID().toString();
+   TableOperationsRow pending = pendingRow(tableUuid, "db1", "tbl1");
+
+   stubFindPending(List.of(pending));
+   when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(tableUuid, 100L)));
+   stubOneBinForAllCandidates();
+   // The claim affects nothing, and the follow-up query finds no rows owned by this run.
+   when(operationsRepo.updateBatch(
+           anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+       .thenReturn(0);
+   stubFindClaimed(List.of());
+
+   runner.schedule(OFD);
+
+   verify(operationsRepo, never())
+       .updateBatch(
+           anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.PENDING), any(), any());
+   verify(operationsRepo, never())
+       .updateBatch(
+           anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any());
+   verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString());
+ }
+
+ /**
+  * Two PENDING rows for the same table UUID are returned in one cycle. The test expects a single
+  * {@code cancel} call carrying exactly one id, i.e. one of the two rows is cancelled as a
+  * duplicate.
+  */
+ @Test
+ void schedule_cancelsDuplicatePendingPerCycle() {
+   String uuid = UUID.randomUUID().toString();
+   TableOperationsRow row1 = pendingRow(uuid, "db1", "tbl1");
+   TableOperationsRow row2 = pendingRow(uuid, "db1", "tbl1");
+
+   stubFindPending(List.of(row1, row2));
+   when(operationsRepo.cancel(anyList())).thenReturn(1);
+   when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100L)));
+   stubOneBinForAllCandidates();
+   when(operationsRepo.updateBatch(
+           anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+       .thenReturn(1);
+   // After dedup, the surviving row is the older one by createdAt. Both rows are stamped with
+   // Instant.now(), so equal timestamps are possible and are broken by the smaller id.
+   TableOperationsRow survivor;
+   if (row1.getCreatedAt().equals(row2.getCreatedAt())) {
+     survivor = row1.getId().compareTo(row2.getId()) <= 0 ? row1 : row2;
+   } else {
+     survivor = row1.getCreatedAt().isBefore(row2.getCreatedAt()) ? row1 : row2;
+   }
+   stubFindClaimed(List.of(schedulingRow(survivor)));
+   when(operationsRepo.updateBatch(
+           anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()))
+       .thenReturn(1);
+   when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+       .thenReturn(Optional.of("job-dedup"));
+
+   runner.schedule(OFD);
+
+   // Exactly one ID was cancelled (the duplicate).
+   // forClass can only take the raw List class token, hence the unchecked assignment.
+   @SuppressWarnings("unchecked")
+   ArgumentCaptor<List<String>> cancelled = ArgumentCaptor.forClass(List.class);
+   verify(operationsRepo).cancel(cancelled.capture());
+   assertThat(cancelled.getValue()).hasSize(1);
+ }
+
+ /**
+  * Two rows are packed into one bin, but only row A comes back as SCHEDULING after the claim. The
+  * job must be launched with A's table name and operation id only, and only A must be marked
+  * SCHEDULED with the returned job id.
+  */
+ @Test
+ void schedule_partialClaim_launchesAndMarksOnlyClaimedSubset() {
+   String uuidA = UUID.randomUUID().toString();
+   String uuidB = UUID.randomUUID().toString();
+   TableOperationsRow rowA = pendingRow(uuidA, "db1", "tblA");
+   TableOperationsRow rowB = pendingRow(uuidB, "db1", "tblB");
+
+   stubFindPending(List.of(rowA, rowB));
+   when(statsRepo.findAllById(any()))
+       .thenReturn(List.of(statsRow(uuidA, 100L), statsRow(uuidB, 100L)));
+   stubOneBinForAllCandidates();
+   when(operationsRepo.updateBatch(
+           anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+       .thenReturn(1);
+   // Only A actually claimed (B owned by another instance).
+   stubFindClaimed(List.of(schedulingRow(rowA)));
+   when(operationsRepo.updateBatch(
+           anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()))
+       .thenReturn(1);
+   when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+       .thenReturn(Optional.of("job-partial"));
+
+   runner.schedule(OFD);
+
+   // forClass can only take the raw List class token, hence the unchecked assignments.
+   @SuppressWarnings("unchecked")
+   ArgumentCaptor<List<String>> launchedTableNames = ArgumentCaptor.forClass(List.class);
+   @SuppressWarnings("unchecked")
+   ArgumentCaptor<List<String>> launchedOpIds = ArgumentCaptor.forClass(List.class);
+   verify(jobsClient)
+       .launch(
+           anyString(),
+           anyString(),
+           launchedTableNames.capture(),
+           launchedOpIds.capture(),
+           anyString());
+   assertThat(launchedTableNames.getValue()).containsExactly("db1.tblA");
+   assertThat(launchedOpIds.getValue()).containsExactly(rowA.getId());
+
+   verify(operationsRepo)
+       .updateBatch(
+           eq(List.of(rowA.getId())),
+           eq(OperationStatus.SCHEDULING),
+           eq(OperationStatus.SCHEDULED),
+           eq(Optional.empty()),
+           eq(Optional.of("job-partial")));
+ }
+
+ /**
+  * Of two PENDING rows, only one has a stats row. The claim update (PENDING to SCHEDULING) must be
+  * issued for that row's id alone, so the row without stats is left out of scheduling.
+  */
+ @Test
+ void schedule_opsWithoutStats_skipped() {
+   String withStats = UUID.randomUUID().toString();
+   String missing = UUID.randomUUID().toString();
+   TableOperationsRow withStatsRow = pendingRow(withStats, "db1", "tblA");
+   TableOperationsRow missingRow = pendingRow(missing, "db1", "tblB");
+
+   stubFindPending(List.of(withStatsRow, missingRow));
+   // Stats exist only for the first table.
+   when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(withStats, 50L)));
+   stubOneBinForAllCandidates();
+   when(operationsRepo.updateBatch(
+           anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+       .thenReturn(1);
+   stubFindClaimed(List.of(schedulingRow(withStatsRow)));
+   when(operationsRepo.updateBatch(
+           anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()))
+       .thenReturn(1);
+   when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+       .thenReturn(Optional.of("job-skip"));
+
+   runner.schedule(OFD);
+
+   // forClass can only take the raw List class token, hence the unchecked assignment.
+   @SuppressWarnings("unchecked")
+   ArgumentCaptor<List<String>> ids = ArgumentCaptor.forClass(List.class);
+   verify(operationsRepo)
+       .updateBatch(
+           ids.capture(),
+           eq(OperationStatus.PENDING),
+           eq(OperationStatus.SCHEDULING),
+           any(),
+           any());
+   assertThat(ids.getValue()).containsExactly(withStatsRow.getId());
+ }
+
+ /**
+  * When the stats repository returns nothing for the only PENDING row, the runner must neither
+  * invoke the bin packer nor launch a job.
+  */
+ @Test
+ void schedule_allOpsWithoutStats_noJobSubmitted() {
+   String tableUuid = UUID.randomUUID().toString();
+   TableOperationsRow orphanedOp = pendingRow(tableUuid, "db1", "tbl1");
+
+   stubFindPending(List.of(orphanedOp));
+   when(statsRepo.findAllById(any())).thenReturn(List.of());
+
+   runner.schedule(OFD);
+
+   verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString());
+   verify(binPacker, never()).pack(anyList());
+ }
+}
diff --git a/services/optimizer/scheduler/src/test/resources/application-test.properties b/services/optimizer/scheduler/src/test/resources/application-test.properties
new file mode 100644
index 000000000..b0609fa34
--- /dev/null
+++ b/services/optimizer/scheduler/src/test/resources/application-test.properties
@@ -0,0 +1,10 @@
+# Test datasource: in-memory H2 database in MySQL compatibility mode. DB_CLOSE_DELAY=-1 keeps the
+# database alive after the last connection closes.
+spring.datasource.url=jdbc:h2:mem:schedulertestdb;DB_CLOSE_DELAY=-1;MODE=MySQL
+spring.datasource.username=sa
+spring.datasource.password=
+# Hibernate does not create or alter tables; the schema comes from the SQL script below, which is
+# always run at startup.
+spring.jpa.hibernate.ddl-auto=none
+spring.sql.init.mode=always
+spring.sql.init.schema-locations=classpath:db/optimizer-schema.sql
+# Scheduler settings for tests. NOTE(review): presumably bound by the scheduler's Spring
+# configuration, which is not visible here; confirm the property names against it.
+optimizer.scheduler.jobs.base-uri=http://localhost:9999
+optimizer.scheduler.ofd.max-files-per-bin=1000000
+optimizer.scheduler.results-endpoint=http://localhost:8080/v1/optimizer/operations
+optimizer.scheduler.cluster-id=test-cluster
diff --git a/settings.gradle b/settings.gradle
index 810ecd643..b44481647 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -52,6 +52,8 @@ include ':services:jobs'
include ':services:optimizer'
include ':services:optimizer:analyzer'
include ':apps:optimizer:analyzerapp'
+include ':services:optimizer:scheduler'
+include ':apps:optimizer:schedulerapp'
include ':services:tables'
include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.2'
include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.5'