(wip | no review) feat(tables): post-commit stats push to optimizer #608

Draft
mkuchenbecker wants to merge 5 commits into mkuchenb/optimizer-4 from mkuchenb/optimizer-postcommit-stats

Conversation

mkuchenbecker (Collaborator) commented May 27, 2026

Summary

After a successful Iceberg snapshot commit, the tables service runs registered
post-commit operations against the freshly-saved TableDto. The first operation is
the optimizer stats push (PUT /v1/optimizer/stats/{tableUuid}), opting in per table
via maintenance.optimizer.stats.enabled=true and gated globally by
tables.postcommit.enabled (default false).

The framework is one generic dispatcher; individual operations describe payload +
endpoint and let the dispatcher own async / timeout / error swallow / metrics.

  • Per-op wall-clock budget enforced by PostCommitDispatcher
    (tables.postcommit.per-op-timeout-ms, default 3000 ms).
  • Best-effort: any operation error is recorded as a Micrometer counter + warn log,
    never propagated to the commit caller.
  • No HDFS round-trip — stats sourced from Snapshot.summary() already in memory at
    load time, captured on TableDto.currentSnapshot as Optional<CurrentSnapshotInfo>
    (snapshot id + summary).
  • The OptimizerStatsPostCommitOperation retains its own per-attempt timeout +
    bounded retry on retryable errors (network, 408, 429, 5xx, TimeoutException).
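
The dispatcher contract described above (per-op timeout, error swallowing, skip and failure counters) can be sketched with JDK types only. This is a minimal illustration, not the PR's code: `SketchDispatcher` and `SketchOperation` are hypothetical names, `CompletableFuture` stands in for Reactor's `Mono`, a plain map stands in for Micrometer meters, and a `String` id stands in for `TableDto`. It assumes Java 9+ for `orTimeout`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

final class SketchDispatcher {
  private final List<SketchOperation> operations;
  private final long perOpTimeoutMs;
  // Stand-in for Micrometer meters, keyed as "metric{tags}".
  final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

  SketchDispatcher(List<SketchOperation> operations, long perOpTimeoutMs) {
    this.operations = operations;
    this.perOpTimeoutMs = perOpTimeoutMs;
  }

  // Fire-and-forget: starts every operation and returns without waiting.
  void dispatch(String tableUuid) {
    for (SketchOperation op : operations) {
      decorate(op, tableUuid);
    }
  }

  // Wraps one operation with timeout, counters, and error swallowing. The
  // returned future never completes exceptionally, so tests can join() on it.
  CompletableFuture<Void> decorate(SketchOperation op, String tableUuid) {
    Optional<CompletableFuture<Void>> work;
    try {
      work = op.prepare(tableUuid);
    } catch (RuntimeException e) {
      count("postcommit_op_failed{op=" + op.name() + ",outcome=prepare_threw}");
      return CompletableFuture.completedFuture(null);
    }
    if (!work.isPresent()) {
      count("postcommit_op_skipped{op=" + op.name() + "}");
      return CompletableFuture.completedFuture(null);
    }
    return work.get()
        .orTimeout(perOpTimeoutMs, TimeUnit.MILLISECONDS)
        .<Void>handle((ignored, err) -> {
          if (err == null) {
            count("postcommit_op_duration{op=" + op.name() + ",outcome=success}");
          } else {
            Throwable cause =
                err instanceof CompletionException && err.getCause() != null ? err.getCause() : err;
            String outcome = cause instanceof TimeoutException ? "timeout" : "unknown_error";
            count("postcommit_op_failed{op=" + op.name() + ",outcome=" + outcome + "}");
          }
          return null; // the error is swallowed here and never reaches the caller
        });
  }

  private void count(String key) {
    counters.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
  }

  public static void main(String[] args) {
    SketchOperation ok = new SketchOperation() {
      public String name() { return "optimizer_stats"; }
      public Optional<CompletableFuture<Void>> prepare(String tableUuid) {
        return Optional.of(CompletableFuture.<Void>completedFuture(null));
      }
    };
    SketchDispatcher dispatcher = new SketchDispatcher(Collections.singletonList(ok), 3000);
    dispatcher.decorate(ok, "table-uuid-1").join();
    System.out.println(dispatcher.counters.keySet());
  }
}

// Hypothetical stand-in for PostCommitOperation; the PR's interface returns
// Optional<Mono<Void>> and takes a TableDto.
interface SketchOperation {
  String name();

  Optional<CompletableFuture<Void>> prepare(String tableUuid);
}
```

Keeping `decorate` separate from `dispatch` mirrors the testing approach the PR describes: a test can wait on the decorated future, while production code calls `dispatch` and moves on.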

Why async (the explicit justification)

A synchronous post-commit hook converts an optimizer outage into a tables-service
write outage. The stats push is a best-effort scheduling signal, not a
write-correctness step — its blast radius must not include the write path. The
crash-loss window (JVM dies between commit and HTTP push completion) is acceptable
because operations are cumulative: the next commit re-pushes the current state, and
the consumer self-corrects from missing data.

The complexity stays bounded by living in one place (PostCommitDispatcher). New
operations register without re-deriving async plumbing; reviewers reason about
async semantics once.

Configuration

| Property | Default | Notes |
| --- | --- | --- |
| tables.postcommit.enabled | false | Master switch for the dispatcher; per-instance rollout. |
| tables.postcommit.per-op-timeout-ms | 3000 | Outer wall-clock ceiling per operation. |
| optimizer.stats.enabled | false | Master switch for the optimizer-stats operation bean. |
| optimizer.stats.base-uri | (unset) | Required when both tables.postcommit.enabled and optimizer.stats.enabled are true. |
| optimizer.stats.per-attempt-timeout-ms | 1000 | Bounds each HTTP attempt inside the operation. |
| optimizer.stats.max-attempts | 3 | One initial + up to two retries. |
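
As a rough illustration of how these defaults combine, the sketch below reads the six properties with `java.util.Properties`. `PostCommitSettings` is a hypothetical class, not the PR's `PostCommitProperties`, and the check on `optimizer.stats.base-uri` encodes one reading of the "required" note (both enable flags true); the PR may validate it differently.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder for the properties in the table above.
final class PostCommitSettings {
  final boolean dispatcherEnabled;
  final long perOpTimeoutMs;
  final boolean statsEnabled;
  final String statsBaseUri; // null when unset
  final long perAttemptTimeoutMs;
  final int maxAttempts;

  PostCommitSettings(Properties p) {
    dispatcherEnabled = Boolean.parseBoolean(p.getProperty("tables.postcommit.enabled", "false"));
    perOpTimeoutMs = Long.parseLong(p.getProperty("tables.postcommit.per-op-timeout-ms", "3000"));
    statsEnabled = Boolean.parseBoolean(p.getProperty("optimizer.stats.enabled", "false"));
    statsBaseUri = p.getProperty("optimizer.stats.base-uri");
    perAttemptTimeoutMs =
        Long.parseLong(p.getProperty("optimizer.stats.per-attempt-timeout-ms", "1000"));
    maxAttempts = Integer.parseInt(p.getProperty("optimizer.stats.max-attempts", "3"));
    // Assumption: the base URI only matters once both switches are on.
    if (dispatcherEnabled && statsEnabled && statsBaseUri == null) {
      throw new IllegalStateException("optimizer.stats.base-uri is required when both flags are true");
    }
  }

  // Parses .properties-style text, falling back to the documented defaults.
  static PostCommitSettings parse(String text) {
    Properties p = new Properties();
    try {
      p.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return new PostCommitSettings(p);
  }

  public static void main(String[] args) {
    PostCommitSettings s = parse("tables.postcommit.enabled=true\n");
    System.out.println(s.dispatcherEnabled + " " + s.perOpTimeoutMs + " " + s.maxAttempts);
  }
}
```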

Metrics (Micrometer, tagged op={operation name})

| Key | Type | Tags |
| --- | --- | --- |
| postcommit_op_duration | timer | op, outcome={success, timeout, network_error, server_error, client_error, unknown_error} |
| postcommit_op_skipped | counter | op |
| postcommit_op_failed | counter | op, outcome={timeout, network_error, server_error, client_error, prepare_threw, unknown_error} |
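
One way the outcome tag values could be derived from a terminal error is sketched below. The exception types are assumptions: `HttpStatusFailure` is a hypothetical carrier for an HTTP status code, since the source does not show which exception types the WebClient pipeline surfaces, and `prepare_threw` is omitted because it is decided at the call site rather than from the error type.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

final class OutcomeTags {
  // Maps a terminal error to one of the outcome tag values listed above.
  static String classify(Throwable e) {
    if (e instanceof TimeoutException) {
      return "timeout";
    }
    if (e instanceof IOException) {
      return "network_error";
    }
    if (e instanceof HttpStatusFailure) {
      int status = ((HttpStatusFailure) e).status;
      if (status >= 500 && status <= 599) {
        return "server_error";
      }
      if (status >= 400 && status <= 499) {
        return "client_error";
      }
    }
    return "unknown_error";
  }

  public static void main(String[] args) {
    System.out.println(classify(new HttpStatusFailure(503)));
    System.out.println(classify(new TimeoutException("per-op budget exceeded")));
  }
}

// Hypothetical exception that carries the HTTP status of a failed call.
final class HttpStatusFailure extends RuntimeException {
  final int status;

  HttpStatusFailure(int status) {
    super("HTTP " + status);
    this.status = status;
  }
}
```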

Wiring

PostCommitDispatcher, OptimizerStatsConfig (WebClient bean), and
OptimizerStatsPostCommitOperation are each @ConditionalOnProperty — when their
flag is false no bean is constructed. IcebergSnapshotsServiceImpl consumes the
dispatcher as Optional<PostCommitDispatcher> and the on-commit hook is a literal
no-op when absent.
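
The "no-op when absent" wiring can be illustrated without Spring. In this sketch `SnapshotServiceSketch` and `CommitHook` are hypothetical stand-ins for `IcebergSnapshotsServiceImpl` and `PostCommitDispatcher`; the empty `Optional` plays the role of a bean that was never constructed because its flag was false.

```java
import java.util.Optional;

final class SnapshotServiceSketch {
  private final Optional<CommitHook> hook;

  SnapshotServiceSketch(Optional<CommitHook> hook) {
    this.hook = hook;
  }

  String commit(String tableUuid) {
    // The real service would save the table here before running the hook.
    hook.ifPresent(h -> h.dispatch(tableUuid)); // does nothing when the hook is absent
    return tableUuid;
  }

  public static void main(String[] args) {
    // Flag off: no hook exists, so commit() only performs the save step.
    new SnapshotServiceSketch(Optional.empty()).commit("t1");

    // Flag on: the hook runs once per successful commit.
    CommitHook printer = id -> System.out.println("dispatched " + id);
    new SnapshotServiceSketch(Optional.of(printer)).commit("t2");
  }
}

// Hypothetical stand-in for the dispatcher's single entry point.
interface CommitHook {
  void dispatch(String tableUuid);
}
```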

Follow-up

The wire body already carries snapshotId. Server-side dedupe (use the snapshot ID
as an idempotency token on upsert and reject out-of-order replays) is intentionally
deferred to a follow-up change so this PR stays focused on the framework + the
first operation.
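
For orientation only, the deferred idempotency idea might look like the sketch below. `StatsStoreSketch` is hypothetical and in-memory; it only drops an immediate repeat of the same snapshot ID. Rejecting out-of-order replays is left out, because this sketch makes no assumption that snapshot IDs are ordered.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class StatsStoreSketch {
  // Last snapshot ID accepted per table UUID.
  private final Map<String, Long> lastSnapshotByTable = new ConcurrentHashMap<>();

  // Returns true if the record was applied, false if it repeated the last snapshot ID.
  boolean upsert(String tableUuid, long snapshotId) {
    Long previous = lastSnapshotByTable.put(tableUuid, snapshotId);
    return previous == null || previous != snapshotId;
  }

  public static void main(String[] args) {
    StatsStoreSketch store = new StatsStoreSketch();
    System.out.println(store.upsert("t1", 100L)); // first push for the table
    System.out.println(store.upsert("t1", 100L)); // repeated push of the same snapshot
  }
}
```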

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

Internal API: new services/tables/.../services/postcommit/ package
(PostCommitOperation interface, PostCommitDispatcher, PostCommitProperties);
new services/tables/.../model/CurrentSnapshotInfo; TableDto.currentSnapshot
replaces currentSnapshotSummary; new metric keys in MetricsConstant;
TableStatsController publishes BASE_PATH / TABLE_PATH_TEMPLATE constants.

New features: opt-in post-commit operation framework + the first operation
(optimizer stats push), each independently gated by config.

Refactor: optimizer-stats client recast as a PostCommitOperation impl; outer
timeout, onErrorResume, and .subscribe() removed from the operation and moved
into the dispatcher.

Tests: new PostCommitDispatcherTest (6 cases); OptimizerStatsClientTest renamed
to OptimizerStatsPostCommitOperationTest (6 cases adapted to the prepare()
contract); IcebergSnapshotsServiceTest updated; TableDtoMappingTest updated.

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

PostCommitDispatcherTest — 6 cases, all synchronous via decorate(...).block(BLOCK_MAX):

  • dispatch_invokesEveryRegisteredOperationsPrepare
  • decorate_emptyPrepare_incrementsSkippedAndReturnsEmpty
  • decorate_successfulWork_recordsDurationWithSuccessOutcome
  • decorate_workSignalsError_incrementsFailedWithClassifiedOutcomeAndSwallowsError
  • decorate_prepareThrowsSynchronously_incrementsFailedAndDispatchContinuesToLaterOps
  • decorate_workExceedsPerOpTimeout_incrementsFailedWithTimeoutOutcome

OptimizerStatsPostCommitOperationTest — 7 cases (1 stable-name + 6 behavior):

  • name_isStableTagValue
  • prepare_optedIn_emitsRequestWithFullPayload (asserts wire body field-for-field, including new snapshotId)
  • prepare_notOptedIn_returnsEmpty
  • prepare_noSnapshot_returnsEmpty
  • prepare_5xxThenSuccess_retriesAndCompletes
  • prepare_allAttemptsFail_propagatesUnderlyingError
  • prepare_4xxNonRetryable_doesNotRetry

IcebergSnapshotsServiceTest.testPostCommitDispatcherInvokedAfterSuccessfulCommit
verifies dispatcher.dispatch(savedDto) is invoked exactly once after a successful
save (replaces the prior OptimizerStatsClient.report assertion).

TableDtoMappingTest: currentSnapshot added to FIELDS_UNMAPPABLE.

Local build: :services:tables:test + :services:optimizer:test BUILD SUCCESSFUL;
:services:tables:spotlessCheck, :services:common:spotlessCheck,
:services:optimizer:spotlessCheck clean.

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

Both tables.postcommit.enabled and optimizer.stats.enabled default to false,
so existing deployments see no behavioural change. The feature only activates when
the global dispatcher flag, the operation's flag, and the per-table opt-in property
are all set.
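
The three-gate condition reads as a single conjunction. The sketch below is illustrative: `ActivationGate` is a hypothetical helper, and only the property name `maintenance.optimizer.stats.enabled` comes from the PR.

```java
import java.util.Collections;
import java.util.Map;

final class ActivationGate {
  static final String OPT_IN_PROPERTY = "maintenance.optimizer.stats.enabled";

  // True only when the dispatcher flag, the operation flag, and the per-table opt-in all hold.
  static boolean isActive(
      boolean dispatcherEnabled, boolean operationEnabled, Map<String, String> tableProperties) {
    return dispatcherEnabled
        && operationEnabled
        && Boolean.parseBoolean(tableProperties.get(OPT_IN_PROPERTY)); // missing key parses as false
  }

  public static void main(String[] args) {
    Map<String, String> optedIn = Collections.singletonMap(OPT_IN_PROPERTY, "true");
    System.out.println(isActive(true, true, optedIn));
    System.out.println(isActive(false, true, optedIn));
  }
}
```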

public static final String HTS_SEARCH_TABLES_TIME = "hts_search_tables_time";

// Optimizer post-commit stats push (Tables → Optimizer)
public static final String OPTIMIZER_STATS_DURATION = "optimizer_stats_duration";

mkuchenbecker (Collaborator, Author):

max duration timeout

mkuchenbecker (Collaborator, Author):

Renamed to POSTCOMMIT_OP_DURATION (tagged op + outcome). The timer is bounded by the dispatcher's outer tables.postcommit.per-op-timeout-ms (default 3000 ms) — comment on the constant calls that out explicitly. Latest commit.

public static final String OPTIMIZER_STATS_DURATION = "optimizer_stats_duration";
public static final String OPTIMIZER_STATS_ATTEMPTS = "optimizer_stats_attempts";
public static final String OPTIMIZER_STATS_SKIPPED = "optimizer_stats_skipped";
public static final String OPTIMIZER_STATS_FAILED_FINAL = "optimizer_stats_failed_final";

mkuchenbecker (Collaborator, Author):

Failed vs failed_final.

mkuchenbecker (Collaborator, Author):

Dropped _FINAL. The dispatcher now emits a single POSTCOMMIT_OP_FAILED counter tagged with the classified outcome. There is no per-attempt failure counter at the dispatcher; per-attempt retry behavior lives inside each op and is observable through that op's own logging if it needs it.


/**
* Pushes a stats record to the optimizer's {@code PUT /v1/optimizer/stats/{tableUuid}} endpoint
* after a successful Iceberg snapshot commit. Fire-and-forget — the {@link #report} call returns

mkuchenbecker (Collaborator, Author):

We should see if we get an error. This wording is ambiguous and not useful.

mkuchenbecker (Collaborator, Author):

Rewrote the javadoc on OptimizerStatsPostCommitOperation to state the contract: prepare() returns a Mono that completes on HTTP 2xx and signals an error otherwise; the dispatcher owns timeout, subscription, error swallowing, and metric emission. Latest commit.

public class OptimizerStatsClient {

/** Per-call URL path. */
static final String PATH_TEMPLATE = "/v1/optimizer/stats/{tableUuid}";

mkuchenbecker (Collaborator, Author):

is this the canonical way to do this?

mkuchenbecker (Collaborator, Author):

There wasn't one. Added TableStatsController.BASE_PATH and TableStatsController.TABLE_PATH_TEMPLATE as public constants on the optimizer controller (latest commit) and refactored its own @RequestMapping to use BASE_PATH. The tables-side keeps its own literal because we don't want a tables → optimizer compile dep (optimizer pulls MySQL/JPA); the comment on the tables-side constant now names TableStatsController.TABLE_PATH_TEMPLATE as the canonical source.

static final String OPT_IN_PROPERTY = "maintenance.optimizer.stats.enabled";

/** Iceberg snapshot-summary keys we read. All values are decimal-string longs. */
static final String SUMMARY_TOTAL_DATA_FILES = "total-data-files";

mkuchenbecker (Collaborator, Author), May 27, 2026:

Done. TableDto.currentSnapshot now carries a CurrentSnapshotInfo(snapshotId, summary) populated from Snapshot.snapshotId() and Snapshot.summary() at convertToTableDto. OptimizerStatsRequest.Snapshot carries snapshotId. The server-side dedupe wiring (use snapshot ID as an idempotency token on upsert and reject out-of-order replays) is intentionally deferred to a follow-up change so this PR stays focused on the framework + first operation.

mkuchenbecker (Collaborator, Author), May 27, 2026:

Done. TableDto.currentSnapshot now carries a CurrentSnapshotInfo(snapshotId, summary) populated from Snapshot.snapshotId() and Snapshot.summary() at convertToTableDto. OptimizerStatsRequest.Snapshot carries snapshotId. The server-side dedupe wiring is intentionally deferred to a follow-up change so this PR stays focused on the framework + first operation.

* </ul>
*/
public void report(TableDto saved, Map<String, String> snapshotSummary) {
reportAsync(saved, snapshotSummary).subscribe();

mkuchenbecker (Collaborator, Author):

why async? If the thread dies we lose the data, no? There will never be a timeout ever with this impl as we don't actually.

Help me justify this complexity given we subscribe to the result.

mkuchenbecker (Collaborator, Author):

Justification, on record: a synchronous push converts an optimizer outage into a tables-service write outage. The push is a best-effort scheduling signal, not a write-correctness step — its blast radius must not include the write path. Crash-loss between commit and HTTP-completion is acceptable because operations are cumulative: the next commit re-pushes the current state and the consumer self-corrects from missing data.

On complexity: the async machinery now lives in one place (PostCommitDispatcher — timeout, swallow, subscribe). Operations describe payload + endpoint. Future reviewers reason about async semantics once. The optimizer-stats op itself dropped to prepare() + buildRequest() + a couple of static helpers — no .subscribe(), no outer timeout, no onErrorResume.

"There will never be a timeout" — the per-op timeout still fires; it bounds Reactor pipeline runtime (resource freeing, connection pool occupancy, the duration timer) even though the commit thread is no longer watching for it. The dispatcher's onErrorResume records outcome=timeout deterministically (covered by decorate_workExceedsPerOpTimeout_incrementsFailedWithTimeoutOutcome).

.increment();
return Mono.empty();
}
if (snapshotSummary == null || snapshotSummary.isEmpty()) {

mkuchenbecker (Collaborator, Author):

Do not use null guards if this will never be null. If it is empty, we should not send null to this function; detect that and immediately convert to an Optional.

If it may be null, use optional instead. Apply this across the pr.

mkuchenbecker (Collaborator, Author):

Applied. TableDto.currentSnapshot is now read through getCurrentSnapshot(): Optional<CurrentSnapshotInfo> and InternalRepositoryUtils converts the nullable table.currentSnapshot() at the producer. The operation's prepare() no longer has any null/empty-map guards on the summary — it does savedDto.getCurrentSnapshot().isPresent() and returns Optional.empty() if not, which the dispatcher records as postcommit_op_skipped{op=optimizer_stats}.
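
The pattern in this reply (wrap the nullable value once at the producer, then let consumers work with `Optional`) can be sketched as follows. `SnapshotInfo` is a hypothetical stand-in for `CurrentSnapshotInfo`, and the returned string stands in for the prepared request.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

final class OptionalSnapshotSketch {
  // Producer side: the nullable value is wrapped exactly once, at the boundary.
  static Optional<SnapshotInfo> fromNullable(SnapshotInfo maybeNull) {
    return Optional.ofNullable(maybeNull);
  }

  // Consumer side: no null or empty-map guards. An absent snapshot yields an
  // empty result, which a dispatcher would record as skipped.
  static Optional<String> prepare(Optional<SnapshotInfo> currentSnapshot) {
    return currentSnapshot.map(s -> "snapshotId=" + s.snapshotId + " keys=" + s.summary.size());
  }

  public static void main(String[] args) {
    SnapshotInfo info = new SnapshotInfo(42L, Collections.singletonMap("total-data-files", "7"));
    System.out.println(prepare(fromNullable(info)).isPresent());
    System.out.println(prepare(fromNullable(null)).isPresent());
  }
}

// Hypothetical value class holding a snapshot ID and its summary map.
final class SnapshotInfo {
  final long snapshotId;
  final Map<String, String> summary;

  SnapshotInfo(long snapshotId, Map<String, String> summary) {
    this.snapshotId = snapshotId;
    this.summary = summary;
  }
}
```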

* </ul>
*/
public void report(TableDto saved, Map<String, String> snapshotSummary) {
reportAsync(saved, snapshotSummary).subscribe();

mkuchenbecker (Collaborator, Author):

If this is fully async, timeouts can be substantially relaxed. We can then have an overall timeout on the order of 3 s.

mkuchenbecker (Collaborator, Author):

Relaxed. Dropped optimizer.stats.total-timeout-ms. The dispatcher now owns the outer ceiling via tables.postcommit.per-op-timeout-ms (default 3000 ms). The op keeps its per-attempt timeout (default 1000 ms) so retries can fit inside the 3 s budget.
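
The budget arithmetic is worth spelling out: with the defaults, 3 attempts at 1000 ms each come to 3000 ms in the worst case, which equals the outer ceiling and leaves no headroom for backoff between attempts. The sketch below is a simplified, blocking illustration with hypothetical names; it retries on status codes only (408, 429, 5xx) and leaves out the network-error and `TimeoutException` cases the PR also treats as retryable.

```java
import java.util.function.IntSupplier;

final class RetrySketch {
  // Status codes the PR description lists as retryable.
  static boolean isRetryableStatus(int status) {
    return status == 408 || status == 429 || (status >= 500 && status <= 599);
  }

  // Worst-case time spent in attempts if every attempt runs to its timeout.
  static long worstCaseMs(int maxAttempts, long perAttemptTimeoutMs) {
    return maxAttempts * perAttemptTimeoutMs;
  }

  // Runs the attempt up to maxAttempts times, stopping at the first
  // non-retryable status. Returns {finalStatus, attemptsUsed}.
  static int[] run(IntSupplier attempt, int maxAttempts) {
    int status = -1;
    int used = 0;
    while (used < maxAttempts) {
      status = attempt.getAsInt();
      used++;
      if (!isRetryableStatus(status)) {
        break;
      }
    }
    return new int[] {status, used};
  }

  public static void main(String[] args) {
    int[] result = run(() -> 503, 3);
    System.out.println("status=" + result[0] + " attempts=" + result[1]);
    System.out.println("worstCaseMs=" + worstCaseMs(3, 1000));
  }
}
```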


OptimizerStatsRequest body = buildRequest(saved, snapshotSummary);
String tableUuid = saved.getTableUUID();
Timer.Sample sample = Timer.start(meterRegistry);

mkuchenbecker (Collaborator, Author):

Does the closure capture this variable correctly?

mkuchenbecker (Collaborator, Author):

Yes — sample is effectively final and captured by both lambdas; doOnSuccess and onErrorResume are mutually exclusive on a Mono, so there's no double-stop. After the refactor the Timer.Sample lives in PostCommitDispatcher.decorate(...) instead, and the same reasoning applies there. Covered by decorate_successfulWork_recordsDurationWithSuccessOutcome and decorate_workSignalsError_incrementsFailedWithClassifiedOutcomeAndSwallowsError.

/**
* Iceberg {@code Snapshot.summary()} for the current snapshot at the time the {@code TableDto}
* was constructed (post-commit on save; post-load on read). Populated only when an Iceberg {@code
* Table} is available — i.e. by {@code InternalRepositoryUtils.convertToTableDto}. Not persisted,

mkuchenbecker (Collaborator, Author):

don't reference private impls. Reference the conditions this is or is not present.

mkuchenbecker (Collaborator, Author):

Rewrote. The javadoc now describes the semantic condition: "Present whenever the underlying table has at least one committed snapshot at construction time; absent for tables with no committed data (e.g. CREATE TABLE with no rows yet)." No mention of InternalRepositoryUtils or any other private impl.

Comment on lines +94 to +100
TableDto savedDto = openHouseInternalRepository.save(tableDtoToSave);
// Fire-and-forget push of the post-commit snapshot summary to the optimizer. Returns
// immediately; failures are swallowed inside the client. Skipped at the client when the
// table is not opted in via maintenance.optimizer.stats.enabled or when there is no current
// snapshot (e.g. CREATE TABLE without a data commit).
optimizerStatsClient.ifPresent(
client -> client.report(savedDto, savedDto.getCurrentSnapshotSummary()));

mkuchenbecker (Collaborator, Author):

Gate this block with a config for the tables service so we can enable the check one openhouse instance at a time and not cause a problem.

mkuchenbecker (Collaborator, Author):

Done — single gate at the tables-service level: tables.postcommit.enabled (default false). When false the PostCommitDispatcher bean isn't constructed; the service consumes Optional<PostCommitDispatcher> and the on-commit hook is a literal no-op. Per-OH-instance rollout is one config flip. The op also keeps its own optimizer.stats.enabled switch so we can disable one operation while leaving the framework on.

Comment on lines +36 to +42
/**
* Present only when {@code optimizer.stats.enabled=true}. When absent, no post-commit push is
* attempted.
*/
@Autowired(required = false)
Optional<OptimizerStatsClient> optimizerStatsClient = Optional.empty();

mkuchenbecker (Collaborator, Author):

This should be modeled as a postcommit operation. Postcommit operations must be bounded and async, and are considered best-effort. We have one postcommit operation registered to the service, which is optimizer stats. That way we have a generic behaviour and a specific impl.

mkuchenbecker (Collaborator, Author):

Done. New services/tables/.../services/postcommit/ package: PostCommitOperation interface (name() + Optional<Mono<Void>> prepare(TableDto)), PostCommitProperties (enabled, perOpTimeoutMs), PostCommitDispatcher (per-op timeout, error swallow, metric emission, fire-and-forget subscribe). OptimizerStatsPostCommitOperation is the first registered impl. IcebergSnapshotsServiceImpl now has one line: postCommitDispatcher.ifPresent(d -> d.dispatch(savedDto)). Adding a future operation = implement the interface and let @ConditionalOnProperty wire it; no service-layer change.

mkuchenbecker added a commit that referenced this pull request May 27, 2026
Restructures the post-commit stats push around a small generic dispatcher
so async/timeout/swallow plumbing lives in one place. Addresses the
review comments on PR #608.

Framework (new):
- PostCommitOperation interface: String name() + Optional<Mono<Void>>
  prepare(TableDto). Implementations describe what to do; the dispatcher
  owns how it runs.
- PostCommitDispatcher (@ConditionalOnProperty tables.postcommit.enabled):
  per-op wall-clock timeout, error swallow, metric emission, fire-and-
  forget subscribe. Exposes package-private decorate() for synchronous
  testing.
- PostCommitProperties: enabled (default false), per-op-timeout-ms
  (default 3000).
- Metric keys renamed: POSTCOMMIT_OP_DURATION / POSTCOMMIT_OP_SKIPPED /
  POSTCOMMIT_OP_FAILED, tagged op={name} and outcome={success, timeout,
  network_error, server_error, client_error, prepare_threw,
  unknown_error}.

Why async: a synchronous post-commit push converts an optimizer outage
into a tables write outage. Stats are a best-effort scheduling signal
and must not block the write path. Crash-loss is bounded because state
is cumulative: the next commit re-pushes the current state.

Operation (refactor):
- OptimizerStatsClient -> OptimizerStatsPostCommitOperation implementing
  PostCommitOperation. Drops outer timeout, onErrorResume, .subscribe()
  (dispatcher owns these). Keeps per-attempt timeout + bounded retry on
  retryable errors only.
- Adds snapshotId to OptimizerStatsRequest.Snapshot; tracked for
  server-side idempotency wiring in BDP-102985.
- Path constant comment now cites TableStatsController.TABLE_PATH_TEMPLATE
  as the canonical source.

TableDto:
- currentSnapshotSummary (Map) replaced by Optional<CurrentSnapshotInfo>
  (snapshotId + summary). Stored nullable internally; consumers read
  through getCurrentSnapshot() which returns Optional. Javadoc rewritten
  to describe the semantic condition (presence ~ table has a committed
  snapshot), not the private impl that populates it.
- New CurrentSnapshotInfo value class.
- InternalRepositoryUtils populates the new field with both snapshotId
  and summary; no extra I/O (still purely in-memory).

Service:
- IcebergSnapshotsServiceImpl: Optional<OptimizerStatsClient> replaced
  by Optional<PostCommitDispatcher>; on-commit hook is one line.

Optimizer side:
- TableStatsController publishes BASE_PATH and TABLE_PATH_TEMPLATE as
  public constants; @RequestMapping refactored to use BASE_PATH.

Config:
- application.properties: drop optimizer.stats.total-timeout-ms; add
  tables.postcommit.enabled + tables.postcommit.per-op-timeout-ms.

Tests:
- New PostCommitDispatcherTest (6 cases, no sleeps; uses decorate() to
  block synchronously).
- OptimizerStatsClientTest renamed -> OptimizerStatsPostCommitOperationTest
  (6 cases adapted to the new prepare() contract; timeout case moved to
  PostCommitDispatcherTest).
- IcebergSnapshotsServiceTest swaps @MockBean OptimizerStatsClient for
  @MockBean PostCommitDispatcher and verifies dispatch(savedDto) once.
- TableDtoMappingTest updated for the renamed field.
- All :services:tables and :services:optimizer tests pass; spotless
  clean.

Filed: BDP-102985 (Optimizer stats: use snapshot ID as idempotency
token on upsert) under epic BDP-102026.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Map a terminal error to a small set of outcome tags. Kept here so all operations share the same
* taxonomy.
*/
private static String classifyOutcome(Throwable e) {

mkuchenbecker (Collaborator, Author), May 27, 2026:

This entire thing is stupid; remove it and report the error type / code. Everyone knows 5xx are system errors and 4xx are user errors.

cbb330 previously requested changes May 27, 2026

cbb330 (Collaborator) left a comment:

Let's pull from the Kafka events emitted from the tableauditevent or CDC stream, or poll the database periodically for last-updated.

Otherwise we're editing core openhouse commit() logic to plug in the table optimizer. This doesn't look right.

@mkuchenbecker mkuchenbecker requested a review from cbb330 May 27, 2026 22:32
mkuchenbecker (Collaborator, Author):

@cbb330 this is for testing. NRT stats is the official design. I put up draft PRs for review and tell claude what to fix.

@mkuchenbecker mkuchenbecker dismissed cbb330’s stale review May 27, 2026 22:35

This PR is not in-review, is for testing, and there is no design for this.

@mkuchenbecker mkuchenbecker changed the title from "feat(tables): post-commit stats push to optimizer" to "(wip) feat(tables): post-commit stats push to optimizer" May 27, 2026
@mkuchenbecker mkuchenbecker changed the title from "(wip) feat(tables): post-commit stats push to optimizer" to "(wip | no review) feat(tables): post-commit stats push to optimizer" May 27, 2026
mkuchenbecker and others added 5 commits May 27, 2026 17:39
After a successful Iceberg snapshot commit, push the table's current
snapshot summary to the optimizer's PUT /v1/optimizer/stats/{tableUuid}
endpoint. Opt-in per table via the maintenance.optimizer.stats.enabled
table property; globally gated by optimizer.stats.enabled in
application.properties (default false).

Implementation notes:
- Stats sourced from the in-memory Snapshot.summary() map captured at
  convertToTableDto(); no extra HDFS round-trip on the commit path.
- WebClient + Reactor pipeline runs fire-and-forget on the Netty event
  loop. The hook on the commit thread is .subscribe() and returns
  immediately, so a slow/down optimizer cannot impact commit latency.
- Per-attempt timeout 1000 ms, total budget 2000 ms, up to 3 attempts.
  Retries only fire on retryable errors (network, 408, 429, 5xx,
  TimeoutException). All terminal errors are swallowed; Micrometer
  counters and a warn log preserve observability.
- Bean wiring (WebClient, client component) is @ConditionalOnProperty
  on optimizer.stats.enabled=true. The service consumes the client as
  an Optional, so disabled deployments construct nothing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Per review feedback that the HTML tags made the diff ugly. Drop
paragraph breaks where the comment can be one sentence/paragraph,
drop bullet lists in favor of inline prose, drop <em>/<b> emphasis
entirely. No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Spotless's googlejavaformat rewrites multi-paragraph block javadoc to
re-insert <p> tags. // comments are left untouched. Per review
feedback that the source should read like prose with paragraph
breaks, not like a webpage, swap all multi-paragraph descriptions on
the new files to // blocks. Single-line descriptors that were javadoc
become a single // line above the declaration. Complete sentences
throughout. No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Server-side idempotency wiring is a follow-up; the comment now states
that without naming an internal tracker.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@mkuchenbecker mkuchenbecker force-pushed the mkuchenb/optimizer-postcommit-stats branch from 13f1140 to ecc65f8 Compare May 28, 2026 00:40
@mkuchenbecker mkuchenbecker changed the base branch from main to mkuchenb/optimizer-4 May 28, 2026 00:40