-
Notifications
You must be signed in to change notification settings - Fork 78
feat(optimizer): [4/N] Scheduler app #534
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d4b9bca
79753f1
ae610ae
85a432f
dceef97
7170599
3e03fdb
bf04488
f2ac002
5f6fa3b
6e44c50
8054586
a9104ae
5af5f14
cbddc5d
62f426a
e13a31b
442f1e4
c4f194a
790994e
6da624a
9913b77
52ba858
beedad8
f63a952
3483b25
9748548
e4a1ad1
f663537
8ad9cac
d7e3a65
0069b92
0293009
0efab45
6fa885d
a5585f4
dd4faf2
0668486
91ba362
b315c11
eba1392
10ed1bb
35bcd38
0dbe3d9
e80f9ce
ded97d0
e576593
9f88a4a
6f98e1a
d44783f
ef78c17
d90c26f
62f33b7
17e280f
a5df7e4
b0898e3
7a0d6d2
8142af3
9a129a8
a8978a0
dfb9102
e3bf9e1
fb71bd9
681407e
2005bca
b689969
7ea8868
d7767e8
3ec0b0a
e3fb777
f89889d
beaaf88
4a8796c
949e814
d3e1726
db9513a
1d469a7
eee8eca
0567753
328e5b9
f7a5d20
2a532b5
2e3a231
db5eb29
bbcf84a
ac3abc0
e79eec7
f955ded
13987c1
969949d
861b584
41d4c6d
69d9e8f
b60a3bf
eb6e3be
b80b2e5
ef453ca
ad11533
0c36ff2
25d98aa
31fac5b
51dab67
3a4f0a8
3fee7ab
188713d
f060b5e
1119699
619df83
a04cad6
8d64273
ee7bcab
c1ad246
72b431c
0b30130
e183906
b2fd321
c72aae8
1fca287
8ae8777
b3aacff
bb8aa4d
6a23755
95456be
b9620e3
de9b0e1
af23d5e
3864e42
0a1125b
a6045b5
4427de0
db5921e
d82c17f
f74c6f9
3aebf64
bf30f86
faba6d7
1d56fa6
d0ec73e
b6c7f42
177af95
487ac56
6ffc703
7f51360
2903e0b
9ad8861
a325684
9e5fbae
7dd1e97
2b06c92
5b5aae2
c862777
7935f43
c73d8cb
ad1bf4c
d9ffe48
ad88893
ac7f84d
437a0ed
aabb51c
928d537
eedf6d0
1166efb
e492f7c
4f98c22
2c26872
b849b7d
231efde
a2580b1
9a0ab09
f1c500b
d315227
f788ab6
bb52e7a
0b87381
b31decf
caf3294
c6a64bf
91e89ef
8a4251d
c305aa9
428cb17
f6c4674
4e86569
cc8aa80
efcceea
f85edd5
c00f201
6998a0f
5cbdbcb
a89a600
1fe71f0
fb5e726
ad0c0f1
947bedf
ce5745b
c99bc3a
b96c388
d65b511
78de390
49e43bc
210d5f0
040046e
6dd07bd
66aa3e7
b69e09a
a028a98
2b6f67c
231320c
6eb6a1e
de39a78
3309983
a89e037
1e361af
a37169d
6416c9d
bbef386
02bbc5c
266e2a7
e8d2c7a
144da72
c74ad5f
1866033
6ef7964
357d5c2
020e94a
08bebcb
d7dead0
9abde3b
e7321d0
eae3a4a
3ef5eaf
a638541
2477530
f529a5c
8e5e97c
87b9a75
4b05cb2
21dbf34
adbff95
eccd2bc
b183f9c
663564c
cfe5509
f3fe80a
8cb1a37
9630e57
84a3532
2cb7c54
3f1173b
10a5dda
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| plugins { | ||
| id 'openhouse.springboot-ext-conventions' | ||
| id 'org.springframework.boot' version '2.7.8' | ||
| } | ||
|
|
||
| // Deployable Spring Boot wrapper around the scheduler library. Holds SchedulerApplication (the | ||
| // @SpringBootApplication entry point) and application.properties; the scheduling logic lives in | ||
| // :services:optimizer:scheduler. | ||
| dependencies { | ||
| implementation project(':services:optimizer:scheduler') | ||
| implementation 'org.springframework.boot:spring-boot-starter:2.7.8' | ||
| implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8' | ||
| runtimeOnly 'mysql:mysql-connector-java:8.0.33' | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| package com.linkedin.openhouse.optimizer.scheduler; | ||
|
|
||
| import com.linkedin.openhouse.optimizer.model.OperationTypeDto; | ||
| import java.util.Map; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.beans.factory.annotation.Autowired; | ||
| import org.springframework.boot.CommandLineRunner; | ||
| import org.springframework.boot.ExitCodeGenerator; | ||
| import org.springframework.boot.SpringApplication; | ||
| import org.springframework.boot.autoconfigure.SpringBootApplication; | ||
| import org.springframework.boot.autoconfigure.domain.EntityScan; | ||
| import org.springframework.data.jpa.repository.config.EnableJpaRepositories; | ||
|
|
||
| /** | ||
| * Entry point for the Optimizer Scheduler application. | ||
| * | ||
| * <p>Spring Batch–style: implements {@link CommandLineRunner} so the work runs after context | ||
| * startup, and {@link ExitCodeGenerator} so the JVM exit code reflects batch outcome. {@code | ||
| * SpringApplication.exit(...)} closes the context (triggers {@code @PreDestroy} hooks, drains the | ||
| * JPA pool, etc.) so the k8s CronJob pod terminates cleanly with a status reflecting reality. | ||
| */ | ||
| @Slf4j | ||
| @SpringBootApplication | ||
| @EntityScan(basePackages = "com.linkedin.openhouse.optimizer.db") | ||
| @EnableJpaRepositories(basePackages = "com.linkedin.openhouse.optimizer.repository") | ||
| public class SchedulerApplication implements CommandLineRunner, ExitCodeGenerator { | ||
|
|
||
| private final SchedulerRunner runner; | ||
| private final Map<OperationTypeDto, BinPacker> binPackers; | ||
| private int exitCode = 0; | ||
|
|
||
| @Autowired | ||
| public SchedulerApplication(SchedulerRunner runner, Map<OperationTypeDto, BinPacker> binPackers) { | ||
| this.runner = runner; | ||
| this.binPackers = binPackers; | ||
| } | ||
|
|
||
| public static void main(String[] args) { | ||
| System.exit(SpringApplication.exit(SpringApplication.run(SchedulerApplication.class, args))); | ||
| } | ||
|
|
||
| /** | ||
| * Runs the scheduler once per registered {@link BinPacker} per process invocation. Each call is | ||
| * scoped to one operation type. Any thrown exception is logged and surfaces as a non-zero exit | ||
| * code via {@link #getExitCode()} after the context is shut down cleanly. | ||
| */ | ||
| @Override | ||
| public void run(String... args) { | ||
| try { | ||
| log.info("Scheduler starting; operation types: {}", binPackers.keySet()); | ||
| binPackers.keySet().forEach(runner::schedule); | ||
| log.info("Scheduler completed successfully"); | ||
| } catch (Exception e) { | ||
| log.error("Scheduler failed", e); | ||
| exitCode = 1; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public int getExitCode() { | ||
| return exitCode; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| spring.application.name=openhouse-optimizer-scheduler | ||
| spring.main.web-application-type=none | ||
| spring.main.banner-mode=off | ||
| spring.datasource.url=${OPTIMIZER_DB_URL:jdbc:h2:mem:schedulerdb;DB_CLOSE_DELAY=-1;MODE=MySQL} | ||
| spring.datasource.username=${OPTIMIZER_DB_USER:sa} | ||
| spring.datasource.password=${OPTIMIZER_DB_PASSWORD:} | ||
| spring.jpa.hibernate.ddl-auto=none | ||
| optimizer.scheduler.jobs.base-uri=${JOBS_BASE_URI:http://localhost:8002} | ||
| optimizer.scheduler.ofd.max-files-per-bin=${SCHEDULER_OFD_MAX_FILES_PER_BIN:1000000} | ||
| optimizer.scheduler.results-endpoint=${SCHEDULER_RESULTS_ENDPOINT:http://openhouse-optimizer:8080/v1/optimizer/operations} | ||
| optimizer.scheduler.cluster-id=${SCHEDULER_CLUSTER_ID:LocalHadoopCluster} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| plugins { | ||
| id 'openhouse.springboot-ext-conventions' | ||
| id 'org.springframework.boot' version '2.7.8' | ||
| } | ||
|
|
||
| // Library jar — the @SpringBootApplication entry point lives in :apps:optimizer:schedulerapp. | ||
| // Disable bootJar so we don't try to assemble a runnable jar from a library that has no main | ||
| // class; keep jar enabled so consumers (the apps wrapper) get a normal library artifact. | ||
| bootJar { | ||
| enabled = false | ||
| } | ||
|
|
||
| jar { | ||
| enabled = true | ||
| archiveClassifier = '' | ||
| } | ||
|
|
||
| dependencies { | ||
| // api: the scheduler's public types (e.g. BinPacker, OperationTypeDto) come from | ||
| // :services:optimizer, so consumers of this library see them on their compile classpath. | ||
| api project(':services:optimizer') | ||
| implementation 'org.springframework.boot:spring-boot-starter:2.7.8' | ||
| implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8' | ||
| implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8' | ||
| implementation 'org.springframework.boot:spring-boot-starter-aop:2.7.8' | ||
| runtimeOnly 'mysql:mysql-connector-java:8.0.33' | ||
| testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8' | ||
| testRuntimeOnly 'com.h2database:h2' | ||
| } | ||
|
|
||
| test { | ||
| useJUnitPlatform() | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| package com.linkedin.openhouse.optimizer.scheduler; | ||
|
|
||
| import com.linkedin.openhouse.optimizer.model.OperationTypeDto; | ||
| import com.linkedin.openhouse.optimizer.model.TableOperationDto; | ||
| import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient; | ||
| import java.time.Instant; | ||
| import java.util.Collection; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
| import lombok.Getter; | ||
| import lombok.RequiredArgsConstructor; | ||
|
|
||
| /** | ||
| * A set of operations the scheduler will submit together as a single Spark job. A bin owns its own | ||
| * launch — callers ask it to schedule itself and react to the returned job id. The surrounding | ||
| * status-update machinery (claim, mark-scheduled, revert-to-pending) lives in the scheduler because | ||
| * it is shared across all bins regardless of operation type. | ||
| */ | ||
| @RequiredArgsConstructor | ||
| public class Bin { | ||
|
|
||
| @Getter private final OperationTypeDto operationType; | ||
| @Getter private final List<TableOperationDto> operations; | ||
|
Comment on lines
+25
to
+26
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we keep bin packing generic as common utility instead of referencing internal models and operations?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That should give more flexibility and we should be able to integrate well with the optimizer flow as well existing scheduler flow. I am planning to leverage as a common lib as used in this PR - https://github.com/linkedin/openhouse/pull/604/changes#diff-bd8bddafa29e6a0d0dcc04642cf89b969c4890f53efa9828826e51c25f970a7d. |
||
|
|
||
| /** Operation UUIDs in this bin, parallel to {@link #getTableNames()}. */ | ||
| public List<String> getOperationIds() { | ||
| return operations.stream().map(TableOperationDto::getId).collect(Collectors.toList()); | ||
| } | ||
|
|
||
| /** Fully-qualified {@code database.table} identifiers for the operations in this bin. */ | ||
| public List<String> getTableNames() { | ||
| return operations.stream() | ||
| .map(op -> op.getDatabaseName() + "." + op.getTableName()) | ||
| .collect(Collectors.toList()); | ||
| } | ||
|
|
||
| /** | ||
| * Return a new {@link Bin} containing only the operations whose IDs are in {@code keepIds}. Used | ||
| * by the scheduler to narrow the bin to the rows it actually claimed before launching the job. | ||
| */ | ||
| public Bin subset(Collection<String> keepIds) { | ||
| Set<String> keep = new HashSet<>(keepIds); | ||
| List<TableOperationDto> filtered = | ||
| operations.stream().filter(op -> keep.contains(op.getId())).collect(Collectors.toList()); | ||
| return new Bin(operationType, filtered); | ||
| } | ||
|
|
||
| /** | ||
| * Submit this bin as a single Spark job. Returns the job id on success, or empty on submission | ||
| * failure — the caller is responsible for the surrounding status updates. | ||
| */ | ||
| public Optional<String> schedule(JobsServiceClient client, String resultsEndpoint) { | ||
| String jobName = | ||
| "batched-" + operationType.name().toLowerCase() + "-" + Instant.now().toEpochMilli(); | ||
| return client.launch( | ||
| jobName, operationType.name(), getTableNames(), getOperationIds(), resultsEndpoint); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| package com.linkedin.openhouse.optimizer.scheduler; | ||
|
|
||
| import com.linkedin.openhouse.optimizer.model.TableStatsDto; | ||
| import java.util.List; | ||
|
|
||
| /** | ||
| * Strategy for packing a set of operations into bins for batched job submission. Implementations | ||
| * encode the constraints of a particular packing dimension (file count, partition count, etc.); | ||
| * binding to an operation type is the responsibility of the scheduler configuration, not the | ||
| * strategy class. | ||
| * | ||
| * <p>{@link TableStatsDto} is the cost source at the interface boundary, carried alongside each | ||
| * operation in a {@link SchedulingCandidate}. Implementations project the stats down to the minimal | ||
| * data needed to make their packing decision (e.g. file count for OFD) and do not retain the full | ||
| * stats payload in the returned bins. | ||
| */ | ||
| public interface BinPacker { | ||
|
|
||
| /** | ||
| * Pack {@code pending} into one or more {@link Bin}s. Each returned bin is non-empty; the | ||
| * scheduler dispatches one Spark job per bin. | ||
| */ | ||
| List<Bin> pack(List<SchedulingCandidate> pending); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| package com.linkedin.openhouse.optimizer.scheduler; | ||
|
|
||
| import com.linkedin.openhouse.optimizer.model.OperationTypeDto; | ||
| import com.linkedin.openhouse.optimizer.model.TableOperationDto; | ||
| import com.linkedin.openhouse.optimizer.model.TableStatsDto; | ||
| import java.util.ArrayList; | ||
| import java.util.Comparator; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.OptionalInt; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.IntStream; | ||
| import lombok.RequiredArgsConstructor; | ||
|
|
||
| /** | ||
| * Greedy first-fit-descending bin-packer keyed on per-table file count, projected from each | ||
| * candidate's {@link TableStatsDto}. | ||
| * | ||
| * <p>Candidates are sorted by descending file count, then assigned to the first bin whose running | ||
| * total stays at or below {@code maxFilesPerBin}. An operation larger than the limit gets its own | ||
| * bin (oversized bins are allowed — we never drop an operation). | ||
| */ | ||
| @RequiredArgsConstructor | ||
| public class FileCountBinPacker implements BinPacker { | ||
|
|
||
| private final OperationTypeDto operationType; | ||
| private final long maxFilesPerBin; | ||
|
|
||
| @Override | ||
| public List<Bin> pack(List<SchedulingCandidate> pending) { | ||
| if (pending.isEmpty()) { | ||
| return List.of(); | ||
| } | ||
|
|
||
| // Project once: each candidate's packing cost is just a long, keyed by operation id. | ||
| Map<String, Long> costByOperationId = | ||
| pending.stream() | ||
| .collect(Collectors.toMap(c -> c.getOperation().getId(), c -> cost(c.getStats()))); | ||
|
|
||
| List<TableOperationDto> sorted = | ||
| pending.stream() | ||
| .map(SchedulingCandidate::getOperation) | ||
| .sorted( | ||
| Comparator.comparingLong( | ||
| (TableOperationDto op) -> costByOperationId.get(op.getId())) | ||
| .reversed()) | ||
| .collect(Collectors.toList()); | ||
|
|
||
| // First-fit-descending is inherently stateful — each placement depends on the running totals | ||
| // for bins assembled so far. | ||
| List<List<TableOperationDto>> binContents = new ArrayList<>(); | ||
| List<Long> binTotals = new ArrayList<>(); | ||
| sorted.forEach( | ||
| op -> { | ||
| long c = costByOperationId.get(op.getId()); | ||
| OptionalInt placed = | ||
| IntStream.range(0, binContents.size()) | ||
| .filter(i -> binTotals.get(i) + c <= maxFilesPerBin || binTotals.get(i) == 0) | ||
| .findFirst(); | ||
| if (placed.isPresent()) { | ||
| int idx = placed.getAsInt(); | ||
| binContents.get(idx).add(op); | ||
| binTotals.set(idx, binTotals.get(idx) + c); | ||
| } else { | ||
| List<TableOperationDto> newBin = new ArrayList<>(); | ||
| newBin.add(op); | ||
| binContents.add(newBin); | ||
| binTotals.add(c); | ||
| } | ||
| }); | ||
|
|
||
| return binContents.stream() | ||
| .map(ops -> new Bin(operationType, ops)) | ||
| .collect(Collectors.toList()); | ||
| } | ||
|
|
||
| private static long cost(TableStatsDto stats) { | ||
| if (stats == null || stats.getSnapshot() == null) { | ||
| return 0L; | ||
| } | ||
| Long n = stats.getSnapshot().getNumCurrentFiles(); | ||
| return n != null ? n : 0L; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am introducing BinItem as part of this PR - https://github.com/linkedin/openhouse/pull/599/changes#diff-5e026d8449953ed5f2853964c9fc6427827dac24fa3b9dba10318fb6618fb703 to represent data a granular level. I will rebase my PR once this PR is merged.