feat(optimizer): [5/n] integrate remaining optimizer work for skeleton#591
Draft
mkuchenbecker wants to merge 32 commits into
Draft
feat(optimizer): [5/n] integrate remaining optimizer work for skeleton#591mkuchenbecker wants to merge 32 commits into
mkuchenbecker wants to merge 32 commits into
Conversation
Multi-table OFD app that amortizes Spark-session startup across many tables in a single job, using a driver-side thread pool so each table's deletion runs concurrently. When --resultsEndpoint and --operationIds are supplied, each table's outcome (SUCCESS/FAILED with file count + bytes deleted, or errorMessage+errorType) is POSTed to the optimizer's complete-operation endpoint as it completes — letting the optimizer track per-table status independently of the overall job. Operations.java grows a deleteOrphanFilesWithMetrics() overload that delegates from the existing deleteOrphanFiles() and additionally tracks bytes deleted via an AtomicLong inside the deleteWith callback. The existing single-table OrphanFilesDeletionSparkApp is unchanged. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- table_operations_history grows four nullable columns:
orphan_files_deleted BIGINT
orphan_bytes_deleted BIGINT
error_message VARCHAR(1024)
error_type VARCHAR(256)
- Row, model, DTO, history-DTO all gain the four fields.
- CompleteOperationRequestDto drops the body-based operationId — id moves
to the URL path. New endpoint shape is POST /v1/optimizer/operations/{id}/complete.
- OptimizerDataService.completeOperation grows four trailing nullable
parameters carrying the metrics / error details.
- Scheduler config default + test fixture URLs migrate from the stale
/v1/table-operations to the correct /v1/optimizer/operations.
- BatchedOFD Spark app's OperationResult payload flattens the nested
ResultPayload into peer fields so the body matches the flat DTO.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
New OptimizerTableStatsClient PUTs /v1/optimizer/stats/{tableUuid} after
every successful Iceberg commit, carrying database/table names and the
current tableProperties (which the optimizer analyzer reads to decide
opt-in for OFD via maintenance.optimizer.ofd.enabled).
Activated by cluster.optimizer.base-uri. When the property is absent
the bean is not created and IcebergSnapshotsServiceImpl skips the call
(behavior unchanged for unit tests and dev clusters without an
optimizer).
The hook is best-effort: WebClient failures are caught and logged at
WARN; the commit itself succeeds either way. The analyzer re-reads
stats on every cycle so a transient post failure is recovered by the
next commit.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Three new repo-root Dockerfiles:
- optimizer-service.Dockerfile (Spring web app, like tables-service)
- optimizer-analyzer.Dockerfile (loops the CommandLineRunner on
ANALYZER_INTERVAL_SECONDS; default 30s)
- optimizer-scheduler.Dockerfile (loops on SCHEDULER_INTERVAL_SECONDS;
default 30s)
Compose wiring:
- common/oh-services.yml — three new service blocks pointing at the
new Dockerfiles; optimizer-service exposes 8005:8080
- oh-hadoop-spark/docker-compose.yml — includes the three services
with MySQL credentials and inter-service URLs (analyzer/scheduler
point at openhouse-optimizer; scheduler points at openhouse-jobs)
- oh-hadoop-spark/cluster.yaml — sets cluster.optimizer.base-uri so
the on-commit hook in tables service activates
Build wiring:
- build.gradle dockerPrereqs depends on the analyzer/scheduler
bootJar targets
Renamed OPTIMIZER_DB_USERNAME → OPTIMIZER_DB_USER in the optimizer
service application.properties so all three optimizer JVMs read the
same env var name.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
AnalyzerApplication and SchedulerApplication scanned com.linkedin.openhouse.optimizer.entity for JPA-managed entities, but the entities live in com.linkedin.openhouse.optimizer.db. Both apps crashed at startup against a real database (MySQL backend in docker) with "Not a managed type: class ...db.TableStatsRow". Unit tests didn't catch it because they configure JPA differently. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Discovered while wiring up the OFD flow on the live oh-hadoop-spark
stack. Each fix lives in a file owned by a lower PR; consolidating on
opt-5 per the delta-discovery framing, to be relocated in a follow-up.
1. FileCountBinPacker.cost() NPE on stats with null snapshot.
The analyzer can legitimately schedule OFD against a table whose
stats row has no snapshot (e.g. brand-new table). Treat that as
cost 0 instead of crashing.
2. SchedulerRunner.schedule(OperationType) was non-@transactional;
the 3-arg overload was annotated but self-invocation bypasses the
CGLIB proxy, so the @Modifying repository calls hit
TransactionRequiredException. Annotate the no-arg variant too.
3. TableOperation model dropped jobId entirely (db row had it; api
dto had a field but couldn't be populated). Wire jobId through
model.toRow/fromRow and dto.toModel/fromModel so the SCHEDULED
row exposes its Spark job id over the API.
4. jobs.yaml ORPHAN_FILES_DELETION mapping pointed at the single-table
OrphanFilesDeletionSparkApp, which doesn't recognize --tableNames
and crashed at arg parsing. Route to BatchedOrphanFilesDeletionSparkApp.
Verified end-to-end: stats PUT -> analyzer PENDING -> scheduler SCHEDULED
with jobId -> Livy -> BatchedOFD launches -> NoSuchTableException for
the stub table -> POST /operations/{id}/complete with FAILED + errorMessage
+ errorType -> history row written. The success path is structurally the
same; we'll exercise it once we wire up real Spark-table creation.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Cascades the api-spec strip-Dto + model add-Dto rename up from opt-0. Conflict resolution on six files where opt-5's Step 3 / Step 7 additions (CompleteOperationRequest fields, jobId plumbing, scheduler @transactional + null-guard) had to be reconciled with opt-4's renamed types. Result: opt-5 keeps its semantic additions and uses the new type names. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Cascades the CompleteOperation → UpdateOperation rename. Conflict
resolution on the six files where opt-5's earlier Step-3 / Step-7
additions had to be combined with the rename:
- Path-based id (opt-5) + /update path (opt-2 rename) → /{id}/update
- Expanded service signature (opt-5: +4 metric params) kept;
method name renamed completeOperation → updateOperation
- Test method names updated
- BatchedOFD Spark app's POST builder + log msg → /update
- DEMO.md and dev-loop.md curl examples → /{id}/update
Conflict on TableOperationsController.updateOperation: opt-4 added path/body operationId validation; opt-5 had already removed operationId from the body in favor of metrics + status only. Resolution keeps opt-5's shape — drop the operationId checks (no such field), keep the status null check, keep the metrics-carrying service call. ControllerErrorHandlingTest trimmed accordingly (5 cases instead of 7).
…FD demo
The SUCCESS-path demo on -5 exposed five separate gaps between the merged
component layers; each was a real bug that blocked the loop from running
green out-of-the-box.
1. build.gradle dockerPrereqs + Dockerfile BUILD_DIR pointed at the
pre-split paths (apps/optimizer-analyzer, apps/optimizer-scheduler).
After the analyzer/scheduler module split into
apps/optimizer/{analyzerapp,schedulerapp} and the corresponding
build outputs land at build/{analyzerapp,schedulerapp}/libs/,
dockerUp failed with "Task with path ':apps:optimizer-analyzer:bootJar'
not found".
2. tables-service env var CLUSTER_OPTIMIZER_BASE_URI. The
OptimizerTableStatsClient @ConditionalOnProperty(name=
"cluster.optimizer.base-uri") did not activate from the cluster.yaml-
loaded value alone in the running container — likely a property-source
ordering race between the tables scan-package and ClusterProperties'
@propertysource. Setting the env var lets Spring's relaxed binding
resolve the property deterministically before the conditional fires.
Verified via /actuator/beans: optimizerTableStatsClient now appears
in the bean list.
3. CadenceBasedOrphanFilesDeletionAnalyzer ambiguous-constructor crash.
The class has two constructors (a public @Value-injected one and a
package-private test one). Without @Autowired on either, Spring
couldn't pick and threw
NoSuchMethodException: ...<init>()
on every analyzer cron pass. The analyzer integration test wires the
policy directly via the test constructor, so the @Component-scan path
was never exercised — real bug only visible in the deployable app.
4. jobs.yaml --ttl 0 for ORPHAN_FILES_DELETION. The Spark app defaults to
a 7-day older-than filter on candidate files. Newly-expired snapshots
in the test stack are seconds old, so OFD reported 0 files deleted
every run. Adding --ttl 0 makes all orphans eligible in the test
recipe.
End-to-end SUCCESS verified on 2/3 tables in the docker stack:
demo_ofd_b (23 files / 101,503 bytes deleted) and demo_ofd_c (11 files /
47,385 bytes deleted) both reach status=SUCCESS in history with
per-table metrics. The 3rd table hits a docker-stack OOM
(executor SIGKILL/137) under three concurrent OFD jobs, not a code bug.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ion tip
Four scripts at the repo root that drive the optimizer loop against a
local oh-hadoop-spark docker stack, adapted for the -5 API surface:
- local-spark-sql.sh — Livy session-reuse shim for one-liner Spark SQL.
- demo-setup.sh — create N tables with maintenance.optimizer.ofd.enabled
=true, populate via INSERT OVERWRITE, expire snapshots (producing
orphan data files on HDFS), then wait for the continuous analyzer +
scheduler to land SCHEDULED rows.
- demo-check.sh — show per-table HDFS file counts and the most recent
operations-history entries per tableUuid.
- demo.sh — wraps the setup, polls for SUCCESS history per table, and
runs demo-check.sh at the end.
URLs adapted from the tip's pre-stack surface
(/v1/table-stats, /v1/table-operations, port 8003) to -5's stacked
surface (/v1/optimizer/stats, /v1/optimizer/operations,
/v1/optimizer/operations-history/{tableUuid}, port 8005, required
?limit on list endpoints, history lookup by tableUuid rather than by
databaseName).
The analyzer + scheduler run continuously on -5 (every 30s loop in the
analyzer/scheduler containers), so the manual "run-analyzer"/
"run-scheduler" docker-compose profiles the tip used are dropped — the
demo just waits for the next cron pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Cascades scheduler property prefix rename + SchedulerApplication ExitCodeGenerator pattern from -4 review feedback.
After a successful Iceberg snapshot commit, push the table's current
snapshot summary to the optimizer's PUT /v1/optimizer/stats/{tableUuid}
endpoint. Opt-in per table via the maintenance.optimizer.stats.enabled
table property; globally gated by optimizer.stats.enabled in
application.properties (default false).
Implementation notes:
- Stats sourced from the in-memory Snapshot.summary() map captured at
convertToTableDto(); no extra HDFS round-trip on the commit path.
- WebClient + Reactor pipeline runs fire-and-forget on the Netty event
loop. The hook on the commit thread is .subscribe() and returns
immediately, so a slow/down optimizer cannot impact commit latency.
- Per-attempt timeout 1000 ms, total budget 2000 ms, up to 3 attempts.
Retries only fire on retryable errors (network, 408, 429, 5xx,
TimeoutException). All terminal errors are swallowed; Micrometer
counters and a warn log preserve observability.
- Bean wiring (WebClient, client component) is @ConditionalOnProperty
on optimizer.stats.enabled=true. The service consumes the client as
an Optional, so disabled deployments construct nothing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Restructures the post-commit stats push around a small generic dispatcher so async/timeout/swallow plumbing lives in one place. Addresses the review comments on PR #608. Framework (new): - PostCommitOperation interface: String name() + Optional<Mono<Void>> prepare(TableDto). Implementations describe what to do; the dispatcher owns how it runs. - PostCommitDispatcher (@ConditionalOnProperty tables.postcommit.enabled): per-op wall-clock timeout, error swallow, metric emission, fire-and- forget subscribe. Exposes package-private decorate() for synchronous testing. - PostCommitProperties: enabled (default false), per-op-timeout-ms (default 3000). - Metric keys renamed: POSTCOMMIT_OP_DURATION / POSTCOMMIT_OP_SKIPPED / POSTCOMMIT_OP_FAILED, tagged op={name} and outcome={success, timeout, network_error, server_error, client_error, prepare_threw, unknown_error}. Why async: a synchronous post-commit push converts an optimizer outage into a tables write outage. Stats are a best-effort scheduling signal and must not block the write path. Crash-loss is bounded because state is cumulative: the next commit re-pushes the current state. Operation (refactor): - OptimizerStatsClient -> OptimizerStatsPostCommitOperation implementing PostCommitOperation. Drops outer timeout, onErrorResume, .subscribe() (dispatcher owns these). Keeps per-attempt timeout + bounded retry on retryable errors only. - Adds snapshotId to OptimizerStatsRequest.Snapshot; tracked for server-side idempotency wiring in BDP-102985. - Path constant comment now cites TableStatsController.TABLE_PATH_TEMPLATE as the canonical source. TableDto: - currentSnapshotSummary (Map) replaced by Optional<CurrentSnapshotInfo> (snapshotId + summary). Stored nullable internally; consumers read through getCurrentSnapshot() which returns Optional. Javadoc rewritten to describe the semantic condition (presence ~ table has a committed snapshot), not the private impl that populates it. - New CurrentSnapshotInfo value class. - InternalRepositoryUtils populates the new field with both snapshotId and summary; no extra I/O (still purely in-memory). Service: - IcebergSnapshotsServiceImpl: Optional<OptimizerStatsClient> replaced by Optional<PostCommitDispatcher>; on-commit hook is one line. Optimizer side: - TableStatsController publishes BASE_PATH and TABLE_PATH_TEMPLATE as public constants; @RequestMapping refactored to use BASE_PATH. Config: - application.properties: drop optimizer.stats.total-timeout-ms; add tables.postcommit.enabled + tables.postcommit.per-op-timeout-ms. Tests: - New PostCommitDispatcherTest (6 cases, no sleeps; uses decorate() to block synchronously). - OptimizerStatsClientTest renamed -> OptimizerStatsPostCommitOperationTest (6 cases adapted to the new prepare() contract; timeout case moved to PostCommitDispatcherTest). - IcebergSnapshotsServiceTest swaps @MockBean OptimizerStatsClient for @MockBean PostCommitDispatcher and verifies dispatch(savedDto) once. - TableDtoMappingTest updated for the renamed field. - All :services:tables and :services:optimizer tests pass; spotless clean. Filed: BDP-102985 (Optimizer stats: use snapshot ID as idempotency token on upsert) under epic BDP-102026. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per review feedback that the HTML tags made the diff ugly. Drop paragraph breaks where the comment can be one sentence/paragraph, drop bullet lists in favor of inline prose, drop <em>/<b> emphasis entirely. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Spotless's googlejavaformat rewrites multi-paragraph block javadoc to re-insert <p> tags. // comments are left untouched. Per review feedback that the source should read like prose with paragraph breaks, not like a webpage, swap all multi-paragraph descriptions on the new files to // blocks. Single-line descriptors that were javadoc become a single // line above the declaration. Complete sentences throughout. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Server-side idempotency wiring is a follow-up; the comment now states that without naming an internal tracker. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…mizer-5 Conflicts were the pre-existing tables.config.OptimizerTableStatsClient on optimizer-5 vs the new generic PostCommitDispatcher framework. The new framework supersedes it (same wire endpoint, plus per-table opt-in, async fire-and-forget, bounded timeout/retry, snapshot-summary payload). Resolution: - Replaced @Autowired OptimizerTableStatsClient with Optional<PostCommitDispatcher> in IcebergSnapshotsServiceImpl. - Removed services/tables/.../config/OptimizerTableStatsClient.java and the dead cluster.optimizer.base-uri config field on ClusterProperties.java (no other consumers). - Replaced the @MockBean OptimizerTableStatsClient + verify() in IcebergSnapshotsServiceTest.testTableCreated with the testPostCommitDispatcherInvokedAfterSuccessfulCommit case from postcommit-stats. Build + the 22 affected tests (PostCommitDispatcherTest, OptimizerStatsPostCommitOperationTest, IcebergSnapshotsServiceTest, TableDtoMappingTest) all pass on the merged tree. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The on-commit stats hook used to be gated by cluster.optimizer.base-uri
and pushed a minimal payload (databaseName/tableName/properties only).
The merge replaces it with the PostCommitOperation framework, which is
opt-in per table and pushes the full snapshot summary + snapshot id.
Update the demo to exercise the new path:
* docker-compose: set TABLES_POSTCOMMIT_ENABLED=true,
OPTIMIZER_STATS_ENABLED=true, OPTIMIZER_STATS_BASE_URI (replaces
CLUSTER_OPTIMIZER_BASE_URI, which is no longer read by anything).
* CREATE TABLE in demo-setup.sh now sets
maintenance.optimizer.stats.enabled=true alongside the OFD flag,
so each demo table is opted in to the stats push.
* Stats verification is now a bounded poll (30s) because the push is
async fire-and-forget. After settling, the check also asserts the
new payload shape: a sample table's stats row carries non-zero
numCurrentFiles, non-zero tableSizeBytes, and the snapshotId.
End result: the demo still passes if and only if every committed table
generated a stats row, and the row actually contains snapshot stats
(not just identity fields).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Scope
Single large PR (per @mkuchenbecker's direction; will be triaged into follow-on
PRs). Reimplements remaining
mkuchenb/optimizerintegration-branch work on topof opt-4's architecture (BinPacker / Bin / SchedulingCandidate; no REST between
analyzer and scheduler).
The integration branch's older Analyzer + Submitter + REST design is not
ported — opt-2/3/4 replaced it.
Roadmap
BatchedOrphanFilesDeletionSparkApp+Operations.deleteOrphanFilesWithMetrics()CompleteOperationRequestcarries per-table metrics(
orphanFilesDeleted,orphanBytesDeleted,errorMessage,errorType)table_statsschema + upsert/get endpoints(
OptimizerTableStatsClientinvoked fromIcebergSnapshotsServiceImpl)+ compose wiring (
oh-hadoop-sparkandoh-only)DEMO.mdOut of scope here
code/docs/table-optimizer/STATE.md/ARCHITECTURE.md/EXTENDING.mdto reflect the integrated state (follow-up after opt-5 takes shape).
Plan
code/docs/table-optimizer/STATE.mddocuments opt-0..opt-4 — opt-5 layers on top.Detailed plan lives in the agent session.