Skip to content

feat(optimizer): [5/n] integrate remaining optimizer work for skeleton#591

Draft
mkuchenbecker wants to merge 32 commits into
mkuchenb/optimizer-4from
mkuchenb/optimizer-5
Draft

feat(optimizer): [5/n] integrate remaining optimizer work for skeleton#591
mkuchenbecker wants to merge 32 commits into
mkuchenb/optimizer-4from
mkuchenb/optimizer-5

Conversation

@mkuchenbecker
Copy link
Copy Markdown
Collaborator

@mkuchenbecker mkuchenbecker commented May 20, 2026

Scope

Single large PR (per @mkuchenbecker's direction; will be triaged into follow-on
PRs). Reimplements remaining mkuchenb/optimizer integration-branch work on top
of opt-4's architecture (BinPacker / Bin / SchedulingCandidate; no REST between
analyzer and scheduler).

The integration branch's older Analyzer + Submitter + REST design is not
ported — opt-2/3/4 replaced it.

Roadmap

  • Step 2BatchedOrphanFilesDeletionSparkApp + Operations.deleteOrphanFilesWithMetrics()
  • Step 3CompleteOperationRequest carries per-table metrics
    (orphanFilesDeleted, orphanBytesDeleted, errorMessage, errorType)
  • Step 4 — HouseTables table_stats schema + upsert/get endpoints
  • Step 5 — Tables service on-commit metrics hook
    (OptimizerTableStatsClient invoked from IcebergSnapshotsServiceImpl)
  • Step 6 — Dockerfiles for optimizer-service / -analyzer / -scheduler
    + compose wiring (oh-hadoop-spark and oh-only)
  • Step 7 — End-to-end docker integration test + DEMO.md

Out of scope here

  • Splitting into smaller PRs (deferred until review).
  • Closing/merging opt-0..opt-4.
  • Updating code/docs/table-optimizer/STATE.md / ARCHITECTURE.md / EXTENDING.md
    to reflect the integrated state (follow-up after opt-5 takes shape).

Plan

code/docs/table-optimizer/STATE.md documents opt-0..opt-4 — opt-5 layers on top.
Detailed plan lives in the agent session.

mkuchenbecker and others added 5 commits May 20, 2026 00:01
Multi-table OFD app that amortizes Spark-session startup across many
tables in a single job, using a driver-side thread pool so each table's
deletion runs concurrently.

When --resultsEndpoint and --operationIds are supplied, each table's
outcome (SUCCESS/FAILED with file count + bytes deleted, or
errorMessage+errorType) is POSTed to the optimizer's complete-operation
endpoint as it completes — letting the optimizer track per-table status
independently of the overall job.

Operations.java grows a deleteOrphanFilesWithMetrics() overload that
delegates from the existing deleteOrphanFiles() and additionally tracks
bytes deleted via an AtomicLong inside the deleteWith callback. The
existing single-table OrphanFilesDeletionSparkApp is unchanged.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- table_operations_history grows four nullable columns:
    orphan_files_deleted BIGINT
    orphan_bytes_deleted BIGINT
    error_message        VARCHAR(1024)
    error_type           VARCHAR(256)
- Row, model, DTO, history-DTO all gain the four fields.
- CompleteOperationRequestDto drops the body-based operationId — id moves
  to the URL path. New endpoint shape is POST /v1/optimizer/operations/{id}/complete.
- OptimizerDataService.completeOperation grows four trailing nullable
  parameters carrying the metrics / error details.
- Scheduler config default + test fixture URLs migrate from the stale
  /v1/table-operations to the correct /v1/optimizer/operations.
- BatchedOFD Spark app's OperationResult payload flattens the nested
  ResultPayload into peer fields so the body matches the flat DTO.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
New OptimizerTableStatsClient PUTs /v1/optimizer/stats/{tableUuid} after
every successful Iceberg commit, carrying database/table names and the
current tableProperties (which the optimizer analyzer reads to decide
opt-in for OFD via maintenance.optimizer.ofd.enabled).

Activated by cluster.optimizer.base-uri. When the property is absent
the bean is not created and IcebergSnapshotsServiceImpl skips the call
(behavior unchanged for unit tests and dev clusters without an
optimizer).

The hook is best-effort: WebClient failures are caught and logged at
WARN; the commit itself succeeds either way. The analyzer re-reads
stats on every cycle so a transient post failure is recovered by the
next commit.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Three new repo-root Dockerfiles:
  - optimizer-service.Dockerfile (Spring web app, like tables-service)
  - optimizer-analyzer.Dockerfile (loops the CommandLineRunner on
    ANALYZER_INTERVAL_SECONDS; default 30s)
  - optimizer-scheduler.Dockerfile (loops on SCHEDULER_INTERVAL_SECONDS;
    default 30s)

Compose wiring:
  - common/oh-services.yml — three new service blocks pointing at the
    new Dockerfiles; optimizer-service exposes 8005:8080
  - oh-hadoop-spark/docker-compose.yml — includes the three services
    with MySQL credentials and inter-service URLs (analyzer/scheduler
    point at openhouse-optimizer; scheduler points at openhouse-jobs)
  - oh-hadoop-spark/cluster.yaml — sets cluster.optimizer.base-uri so
    the on-commit hook in tables service activates

Build wiring:
  - build.gradle dockerPrereqs depends on the analyzer/scheduler
    bootJar targets

Renamed OPTIMIZER_DB_USERNAME → OPTIMIZER_DB_USER in the optimizer
service application.properties so all three optimizer JVMs read the
same env var name.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
AnalyzerApplication and SchedulerApplication scanned
com.linkedin.openhouse.optimizer.entity for JPA-managed entities, but
the entities live in com.linkedin.openhouse.optimizer.db. Both apps
crashed at startup against a real database (MySQL backend in docker)
with "Not a managed type: class ...db.TableStatsRow". Unit tests
didn't catch it because they configure JPA differently.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@mkuchenbecker mkuchenbecker changed the title [BDP-102028] feat(optimizer): [5/?] integrate remaining optimizer work [BDP-102028] feat(optimizer): [5/n] integrate remaining optimizer work May 20, 2026
mkuchenbecker and others added 8 commits May 20, 2026 09:40
Discovered while wiring up the OFD flow on the live oh-hadoop-spark
stack. Each fix lives in a file owned by a lower PR; consolidating on
opt-5 per the delta-discovery framing, to be relocated in a follow-up.

  1. FileCountBinPacker.cost() NPE on stats with null snapshot.
     The analyzer can legitimately schedule OFD against a table whose
     stats row has no snapshot (e.g. brand-new table). Treat that as
     cost 0 instead of crashing.

  2. SchedulerRunner.schedule(OperationType) was non-@transactional;
     the 3-arg overload was annotated but self-invocation bypasses the
     CGLIB proxy, so the @Modifying repository calls hit
     TransactionRequiredException. Annotate the no-arg variant too.

  3. TableOperation model dropped jobId entirely (db row had it; api
     dto had a field but couldn't be populated). Wire jobId through
     model.toRow/fromRow and dto.toModel/fromModel so the SCHEDULED
     row exposes its Spark job id over the API.

  4. jobs.yaml ORPHAN_FILES_DELETION mapping pointed at the single-table
     OrphanFilesDeletionSparkApp, which doesn't recognize --tableNames
     and crashed at arg parsing. Route to BatchedOrphanFilesDeletionSparkApp.

Verified end-to-end: stats PUT -> analyzer PENDING -> scheduler SCHEDULED
with jobId -> Livy -> BatchedOFD launches -> NoSuchTableException for
the stub table -> POST /operations/{id}/complete with FAILED + errorMessage
+ errorType -> history row written. The success path is structurally the
same; we'll exercise it once we wire up real Spark-table creation.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Cascades the api-spec strip-Dto + model add-Dto rename up from
opt-0. Conflict resolution on six files where opt-5's Step 3 / Step 7
additions (CompleteOperationRequest fields, jobId plumbing, scheduler
@transactional + null-guard) had to be reconciled with opt-4's renamed
types. Result: opt-5 keeps its semantic additions and uses the new
type names.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Cascades the CompleteOperation → UpdateOperation rename. Conflict
resolution on the six files where opt-5's earlier Step-3 / Step-7
additions had to be combined with the rename:

  - Path-based id (opt-5) + /update path (opt-2 rename) → /{id}/update
  - Expanded service signature (opt-5: +4 metric params) kept;
    method name renamed completeOperation → updateOperation
  - Test method names updated
  - BatchedOFD Spark app's POST builder + log msg → /update
  - DEMO.md and dev-loop.md curl examples → /{id}/update
@mkuchenbecker mkuchenbecker changed the title [BDP-102028] feat(optimizer): [5/n] integrate remaining optimizer work feat(optimizer): [5/n] integrate remaining optimizer work May 22, 2026
@mkuchenbecker mkuchenbecker changed the title feat(optimizer): [5/n] integrate remaining optimizer work feat(optimizer): [5/n] integrate remaining optimizer work for skeleton May 22, 2026
mkuchenbecker and others added 14 commits May 22, 2026 12:28
Conflict on TableOperationsController.updateOperation: opt-4 added
path/body operationId validation; opt-5 had already removed operationId
from the body in favor of metrics + status only. Resolution keeps opt-5's
shape — drop the operationId checks (no such field), keep the status
null check, keep the metrics-carrying service call. ControllerErrorHandlingTest
trimmed accordingly (5 cases instead of 7).
…FD demo

The SUCCESS-path demo on -5 exposed five separate gaps between the merged
component layers; each was a real bug that blocked the loop from running
green out-of-the-box.

1. build.gradle dockerPrereqs + Dockerfile BUILD_DIR pointed at the
   pre-split paths (apps/optimizer-analyzer, apps/optimizer-scheduler).
   After the analyzer/scheduler module split into
   apps/optimizer/{analyzerapp,schedulerapp} and the corresponding
   build outputs land at build/{analyzerapp,schedulerapp}/libs/,
   dockerUp failed with "Task with path ':apps:optimizer-analyzer:bootJar'
   not found".

2. tables-service env var CLUSTER_OPTIMIZER_BASE_URI. The
   OptimizerTableStatsClient @ConditionalOnProperty(name=
   "cluster.optimizer.base-uri") did not activate from the cluster.yaml-
   loaded value alone in the running container — likely a property-source
   ordering race between the tables scan-package and ClusterProperties'
   @propertysource. Setting the env var lets Spring's relaxed binding
   resolve the property deterministically before the conditional fires.
   Verified via /actuator/beans: optimizerTableStatsClient now appears
   in the bean list.

3. CadenceBasedOrphanFilesDeletionAnalyzer ambiguous-constructor crash.
   The class has two constructors (a public @Value-injected one and a
   package-private test one). Without @Autowired on either, Spring
   couldn't pick and threw
   NoSuchMethodException: ...<init>()
   on every analyzer cron pass. The analyzer integration test wires the
   policy directly via the test constructor, so the @Component-scan path
   was never exercised — real bug only visible in the deployable app.

4. jobs.yaml --ttl 0 for ORPHAN_FILES_DELETION. The Spark app defaults to
   a 7-day older-than filter on candidate files. Newly-expired snapshots
   in the test stack are seconds old, so OFD reported 0 files deleted
   every run. Adding --ttl 0 makes all orphans eligible in the test
   recipe.

End-to-end SUCCESS verified on 2/3 tables in the docker stack:
demo_ofd_b (23 files / 101,503 bytes deleted) and demo_ofd_c (11 files /
47,385 bytes deleted) both reach status=SUCCESS in history with
per-table metrics. The 3rd table hits a docker-stack OOM
(executor SIGKILL/137) under three concurrent OFD jobs, not a code bug.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ion tip

Four scripts at the repo root that drive the optimizer loop against a
local oh-hadoop-spark docker stack, adapted for the -5 API surface:

- local-spark-sql.sh — Livy session-reuse shim for one-liner Spark SQL.
- demo-setup.sh — create N tables with maintenance.optimizer.ofd.enabled
  =true, populate via INSERT OVERWRITE, expire snapshots (producing
  orphan data files on HDFS), then wait for the continuous analyzer +
  scheduler to land SCHEDULED rows.
- demo-check.sh — show per-table HDFS file counts and the most recent
  operations-history entries per tableUuid.
- demo.sh — wraps the setup, polls for SUCCESS history per table, and
  runs demo-check.sh at the end.

URLs adapted from the tip's pre-stack surface
(/v1/table-stats, /v1/table-operations, port 8003) to -5's stacked
surface (/v1/optimizer/stats, /v1/optimizer/operations,
/v1/optimizer/operations-history/{tableUuid}, port 8005, required
?limit on list endpoints, history lookup by tableUuid rather than by
databaseName).

The analyzer + scheduler run continuously on -5 (every 30s loop in the
analyzer/scheduler containers), so the manual "run-analyzer"/
"run-scheduler" docker-compose profiles the tip used are dropped — the
demo just waits for the next cron pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Cascades scheduler property prefix rename + SchedulerApplication
ExitCodeGenerator pattern from -4 review feedback.
After a successful Iceberg snapshot commit, push the table's current
snapshot summary to the optimizer's PUT /v1/optimizer/stats/{tableUuid}
endpoint. Opt-in per table via the maintenance.optimizer.stats.enabled
table property; globally gated by optimizer.stats.enabled in
application.properties (default false).

Implementation notes:
- Stats sourced from the in-memory Snapshot.summary() map captured at
  convertToTableDto(); no extra HDFS round-trip on the commit path.
- WebClient + Reactor pipeline runs fire-and-forget on the Netty event
  loop. The hook on the commit thread is .subscribe() and returns
  immediately, so a slow/down optimizer cannot impact commit latency.
- Per-attempt timeout 1000 ms, total budget 2000 ms, up to 3 attempts.
  Retries only fire on retryable errors (network, 408, 429, 5xx,
  TimeoutException). All terminal errors are swallowed; Micrometer
  counters and a warn log preserve observability.
- Bean wiring (WebClient, client component) is @ConditionalOnProperty
  on optimizer.stats.enabled=true. The service consumes the client as
  an Optional, so disabled deployments construct nothing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Restructures the post-commit stats push around a small generic dispatcher
so async/timeout/swallow plumbing lives in one place. Addresses the
review comments on PR #608.

Framework (new):
- PostCommitOperation interface: String name() + Optional<Mono<Void>>
  prepare(TableDto). Implementations describe what to do; the dispatcher
  owns how it runs.
- PostCommitDispatcher (@ConditionalOnProperty tables.postcommit.enabled):
  per-op wall-clock timeout, error swallow, metric emission, fire-and-
  forget subscribe. Exposes package-private decorate() for synchronous
  testing.
- PostCommitProperties: enabled (default false), per-op-timeout-ms
  (default 3000).
- Metric keys renamed: POSTCOMMIT_OP_DURATION / POSTCOMMIT_OP_SKIPPED /
  POSTCOMMIT_OP_FAILED, tagged op={name} and outcome={success, timeout,
  network_error, server_error, client_error, prepare_threw,
  unknown_error}.

Why async: a synchronous post-commit push converts an optimizer outage
into a tables write outage. Stats are a best-effort scheduling signal
and must not block the write path. Crash-loss is bounded because state
is cumulative: the next commit re-pushes the current state.

Operation (refactor):
- OptimizerStatsClient -> OptimizerStatsPostCommitOperation implementing
  PostCommitOperation. Drops outer timeout, onErrorResume, .subscribe()
  (dispatcher owns these). Keeps per-attempt timeout + bounded retry on
  retryable errors only.
- Adds snapshotId to OptimizerStatsRequest.Snapshot; tracked for
  server-side idempotency wiring in BDP-102985.
- Path constant comment now cites TableStatsController.TABLE_PATH_TEMPLATE
  as the canonical source.

TableDto:
- currentSnapshotSummary (Map) replaced by Optional<CurrentSnapshotInfo>
  (snapshotId + summary). Stored nullable internally; consumers read
  through getCurrentSnapshot() which returns Optional. Javadoc rewritten
  to describe the semantic condition (presence ~ table has a committed
  snapshot), not the private impl that populates it.
- New CurrentSnapshotInfo value class.
- InternalRepositoryUtils populates the new field with both snapshotId
  and summary; no extra I/O (still purely in-memory).

Service:
- IcebergSnapshotsServiceImpl: Optional<OptimizerStatsClient> replaced
  by Optional<PostCommitDispatcher>; on-commit hook is one line.

Optimizer side:
- TableStatsController publishes BASE_PATH and TABLE_PATH_TEMPLATE as
  public constants; @RequestMapping refactored to use BASE_PATH.

Config:
- application.properties: drop optimizer.stats.total-timeout-ms; add
  tables.postcommit.enabled + tables.postcommit.per-op-timeout-ms.

Tests:
- New PostCommitDispatcherTest (6 cases, no sleeps; uses decorate() to
  block synchronously).
- OptimizerStatsClientTest renamed -> OptimizerStatsPostCommitOperationTest
  (6 cases adapted to the new prepare() contract; timeout case moved to
  PostCommitDispatcherTest).
- IcebergSnapshotsServiceTest swaps @MockBean OptimizerStatsClient for
  @MockBean PostCommitDispatcher and verifies dispatch(savedDto) once.
- TableDtoMappingTest updated for the renamed field.
- All :services:tables and :services:optimizer tests pass; spotless
  clean.

Filed: BDP-102985 (Optimizer stats: use snapshot ID as idempotency
token on upsert) under epic BDP-102026.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
mkuchenbecker and others added 5 commits May 27, 2026 17:39
Per review feedback that the HTML tags made the diff ugly. Drop
paragraph breaks where the comment can be one sentence/paragraph,
drop bullet lists in favor of inline prose, drop <em>/<b> emphasis
entirely. No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Spotless's googlejavaformat rewrites multi-paragraph block javadoc to
re-insert <p> tags. // comments are left untouched. Per review
feedback that the source should read like prose with paragraph
breaks, not like a webpage, swap all multi-paragraph descriptions on
the new files to // blocks. Single-line descriptors that were javadoc
become a single // line above the declaration. Complete sentences
throughout. No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Server-side idempotency wiring is a follow-up; the comment now states
that without naming an internal tracker.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…mizer-5

Conflicts were the pre-existing tables.config.OptimizerTableStatsClient
on optimizer-5 vs the new generic PostCommitDispatcher framework. The
new framework supersedes it (same wire endpoint, plus per-table opt-in,
async fire-and-forget, bounded timeout/retry, snapshot-summary payload).
Resolution:
  - Replaced @Autowired OptimizerTableStatsClient with
    Optional<PostCommitDispatcher> in IcebergSnapshotsServiceImpl.
  - Removed services/tables/.../config/OptimizerTableStatsClient.java
    and the dead cluster.optimizer.base-uri config field on
    ClusterProperties.java (no other consumers).
  - Replaced the @MockBean OptimizerTableStatsClient + verify()
    in IcebergSnapshotsServiceTest.testTableCreated with the
    testPostCommitDispatcherInvokedAfterSuccessfulCommit case from
    postcommit-stats.

Build + the 22 affected tests (PostCommitDispatcherTest,
OptimizerStatsPostCommitOperationTest, IcebergSnapshotsServiceTest,
TableDtoMappingTest) all pass on the merged tree.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The on-commit stats hook used to be gated by cluster.optimizer.base-uri
and pushed a minimal payload (databaseName/tableName/properties only).
The merge replaces it with the PostCommitOperation framework, which is
opt-in per table and pushes the full snapshot summary + snapshot id.

Update the demo to exercise the new path:

  * docker-compose: set TABLES_POSTCOMMIT_ENABLED=true,
    OPTIMIZER_STATS_ENABLED=true, OPTIMIZER_STATS_BASE_URI (replaces
    CLUSTER_OPTIMIZER_BASE_URI, which is no longer read by anything).
  * CREATE TABLE in demo-setup.sh now sets
    maintenance.optimizer.stats.enabled=true alongside the OFD flag,
    so each demo table is opted in to the stats push.
  * Stats verification is now a bounded poll (30s) because the push is
    async fire-and-forget. After settling, the check also asserts the
    new payload shape: a sample table's stats row carries non-zero
    numCurrentFiles, non-zero tableSizeBytes, and the snapshotId.

End result: the demo still passes if and only if every committed table
generated a stats row, and the row actually contains snapshot stats
(not just identity fields).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant