From 6e2669ba5ca18ab88d8d5972ff7ce16240a7b8ab Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Wed, 27 May 2026 13:26:06 -0700 Subject: [PATCH 1/5] feat(tables): post-commit stats push to optimizer After a successful Iceberg snapshot commit, push the table's current snapshot summary to the optimizer's PUT /v1/optimizer/stats/{tableUuid} endpoint. Opt-in per table via the maintenance.optimizer.stats.enabled table property; globally gated by optimizer.stats.enabled in application.properties (default false). Implementation notes: - Stats sourced from the in-memory Snapshot.summary() map captured at convertToTableDto(); no extra HDFS round-trip on the commit path. - WebClient + Reactor pipeline runs fire-and-forget on the Netty event loop. The hook on the commit thread is .subscribe() and returns immediately, so a slow/down optimizer cannot impact commit latency. - Per-attempt timeout 1000 ms, total budget 2000 ms, up to 3 attempts. Retries only fire on retryable errors (network, 408, 429, 5xx, TimeoutException). All terminal errors are swallowed; Micrometer counters and a warn log preserve observability. - Bean wiring (WebClient, client component) is @ConditionalOnProperty on optimizer.stats.enabled=true. The service consumes the client as an Optional, so disabled deployments construct nothing. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../common/metrics/MetricsConstant.java | 6 + services/tables/build.gradle | 1 + .../openhouse/tables/model/TableDto.java | 13 + .../impl/InternalRepositoryUtils.java | 2 + .../services/IcebergSnapshotsServiceImpl.java | 17 +- .../optimizer/OptimizerStatsClient.java | 245 ++++++++++++++++++ .../optimizer/OptimizerStatsConfig.java | 35 +++ .../optimizer/OptimizerStatsProperties.java | 43 +++ .../optimizer/OptimizerStatsRequest.java | 70 +++++ .../src/main/resources/application.properties | 10 +- .../service/IcebergSnapshotsServiceTest.java | 37 +++ .../tables/model/TableDtoMappingTest.java | 3 +- .../optimizer/OptimizerStatsClientTest.java | 242 +++++++++++++++++ 13 files changed, 721 insertions(+), 3 deletions(-) create mode 100644 services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClient.java create mode 100644 services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java create mode 100644 services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java create mode 100644 services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java create mode 100644 services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClientTest.java diff --git a/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java b/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java index baf65c627..7a47b790e 100644 --- a/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java +++ b/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java @@ -63,4 +63,10 @@ private MetricsConstant() {} public static final String HTS_LIST_TABLES_REQUEST = "hts_list_tables_request"; public static final String HTS_LIST_TABLES_TIME = "hts_list_tables_time"; public static final String HTS_SEARCH_TABLES_TIME = "hts_search_tables_time"; + + // Optimizer post-commit stats push (Tables → Optimizer) + public static final String OPTIMIZER_STATS_DURATION = "optimizer_stats_duration"; + public static final String OPTIMIZER_STATS_ATTEMPTS = "optimizer_stats_attempts"; + public static final String OPTIMIZER_STATS_SKIPPED = "optimizer_stats_skipped"; + public static final String OPTIMIZER_STATS_FAILED_FINAL = "optimizer_stats_failed_final"; } diff --git a/services/tables/build.gradle b/services/tables/build.gradle index c85a57131..da42c32ef 100644 --- a/services/tables/build.gradle +++ b/services/tables/build.gradle @@ -44,6 +44,7 @@ dependencies { testImplementation 'org.junit.jupiter:junit-jupiter-engine:' + junit_version testImplementation 'org.springframework.security:spring-security-test:5.7.3' testImplementation 'org.springframework:spring-context-support:5.3.18' + testImplementation 'com.squareup.okhttp3:mockwebserver:4.10.0' testImplementation(testFixtures(project(':services:common'))) testImplementation (project(':tables-test-fixtures:tables-test-fixtures_2.12')) { exclude group: 'com.linkedin.iceberg' diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java index 917c632d8..24373f4a1 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java @@ -16,6 +16,7 @@ import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.IdClass; +import javax.persistence.Transient; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -83,6 +84,18 @@ public class TableDto { private boolean replaceCommit; + /** + * Iceberg {@code Snapshot.summary()} for the current snapshot at the time the {@code TableDto} + * was constructed (post-commit on save; post-load on read). Populated only when an Iceberg {@code + * Table} is available — i.e. by {@code InternalRepositoryUtils.convertToTableDto}. Not persisted, + * not part of equality. + * + *

Used downstream by the optimizer post-commit stats push (see {@code + * services.optimizer.OptimizerStatsClient}) so that the service layer can read snapshot stats + * without a separate HDFS round-trip. + */ + @Transient @EqualsAndHashCode.Exclude private Map currentSnapshotSummary; + /** * Bundling eligible string type field into a map as {@link org.mapstruct.Mapper} doesn't provide * easy interface to achieve so. diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java index 0dde039c0..9ba40c2d8 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java @@ -135,6 +135,8 @@ static TableDto convertToTableDto( .jsonSnapshots(null) .tableProperties(megaProps) .sortOrder(SortOrderParser.toJson(table.sortOrder())) + .currentSnapshotSummary( + table.currentSnapshot() == null ? null : table.currentSnapshot().summary()) .build(); return tableDto; diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java index af43a169e..bb2cbde4c 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java @@ -10,6 +10,7 @@ import com.linkedin.openhouse.tables.model.TableDto; import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey; import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository; +import com.linkedin.openhouse.tables.services.optimizer.OptimizerStatsClient; import com.linkedin.openhouse.tables.utils.AuthorizationUtils; import com.linkedin.openhouse.tables.utils.TableUUIDGenerator; import java.util.Optional; @@ -32,6 +33,13 @@ public class IcebergSnapshotsServiceImpl implements IcebergSnapshotsService { @Autowired AuthorizationUtils authorizationUtils; + /** + * Present only when {@code optimizer.stats.enabled=true}. When absent, no post-commit push is + * attempted. + */ + @Autowired(required = false) + Optional optimizerStatsClient = Optional.empty(); + @Override public Pair putIcebergSnapshots( String databaseId, @@ -83,7 +91,14 @@ public Pair putIcebergSnapshots( databaseId, tableCreatorUpdater, Privileges.CREATE_TABLE); } try { - return Pair.of(openHouseInternalRepository.save(tableDtoToSave), !tableDto.isPresent()); + TableDto savedDto = openHouseInternalRepository.save(tableDtoToSave); + // Fire-and-forget push of the post-commit snapshot summary to the optimizer. Returns + // immediately; failures are swallowed inside the client. Skipped at the client when the + // table is not opted in via maintenance.optimizer.stats.enabled or when there is no current + // snapshot (e.g. CREATE TABLE without a data commit). + optimizerStatsClient.ifPresent( + client -> client.report(savedDto, savedDto.getCurrentSnapshotSummary())); + return Pair.of(savedDto, !tableDto.isPresent()); } catch (BadRequestException e) { throw new RequestValidationFailureException(e.getMessage(), e); } catch (CommitFailedException ce) { diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClient.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClient.java new file mode 100644 index 000000000..23c69082e --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClient.java @@ -0,0 +1,245 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import com.linkedin.openhouse.common.metrics.MetricsConstant; +import com.linkedin.openhouse.tables.model.TableDto; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientRequestException; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import reactor.util.retry.RetrySpec; + +/** + * Pushes a stats record to the optimizer's {@code PUT /v1/optimizer/stats/{tableUuid}} endpoint + * after a successful Iceberg snapshot commit. Fire-and-forget — the {@link #report} call returns + * immediately on the commit thread; the HTTP exchange runs on the WebClient's Netty event loop. + * + *

Errors are recorded as Micrometer metrics and logged at {@code warn} level, but never + * propagated. A push failure must not break the commit. + * + *

Timeout / retry shape (from {@link OptimizerStatsProperties}): + * + *

+ * + *

The bean is only wired when {@code optimizer.stats.enabled=true}. + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true") +public class OptimizerStatsClient { + + /** Per-call URL path. */ + static final String PATH_TEMPLATE = "/v1/optimizer/stats/{tableUuid}"; + + /** Table-property key that opts a table in for the post-commit stats push. */ + static final String OPT_IN_PROPERTY = "maintenance.optimizer.stats.enabled"; + + /** Iceberg snapshot-summary keys we read. All values are decimal-string longs. */ + static final String SUMMARY_TOTAL_DATA_FILES = "total-data-files"; + + static final String SUMMARY_TOTAL_FILES_SIZE = "total-files-size"; + static final String SUMMARY_ADDED_DATA_FILES = "added-data-files"; + static final String SUMMARY_DELETED_DATA_FILES = "deleted-data-files"; + static final String SUMMARY_ADDED_FILES_SIZE = "added-files-size"; + static final String SUMMARY_REMOVED_FILES_SIZE = "removed-files-size"; + + private final WebClient webClient; + private final MeterRegistry meterRegistry; + private final OptimizerStatsProperties properties; + + public OptimizerStatsClient( + @Qualifier("optimizerStatsWebClient") WebClient webClient, + MeterRegistry meterRegistry, + OptimizerStatsProperties properties) { + this.webClient = webClient; + this.meterRegistry = meterRegistry; + this.properties = properties; + } + + /** + * Build and dispatch a stats push for the just-committed table. Returns immediately; the HTTP + * call runs in the background. {@code snapshotSummary} is the in-memory {@code + * Snapshot.summary()} map from the latest snapshot (no HDFS round-trip). + * + *

The call is skipped (and a {@code skipped} counter is emitted) when: + * + *

+ */ + public void report(TableDto saved, Map snapshotSummary) { + reportAsync(saved, snapshotSummary).subscribe(); + } + + /** + * Same as {@link #report} but returns the underlying {@link Mono} so callers (notably tests) can + * block on completion. Production callers use {@link #report}. + */ + Mono reportAsync(TableDto saved, Map snapshotSummary) { + if (!isOptedIn(saved)) { + meterRegistry + .counter(MetricsConstant.OPTIMIZER_STATS_SKIPPED, "reason", "opt_out") + .increment(); + return Mono.empty(); + } + if (snapshotSummary == null || snapshotSummary.isEmpty()) { + meterRegistry + .counter(MetricsConstant.OPTIMIZER_STATS_SKIPPED, "reason", "no_snapshot") + .increment(); + return Mono.empty(); + } + + OptimizerStatsRequest body = buildRequest(saved, snapshotSummary); + String tableUuid = saved.getTableUUID(); + Timer.Sample sample = Timer.start(meterRegistry); + + RetrySpec retrySpec = + Retry.max(Math.max(0, properties.getMaxAttempts() - 1)) + .filter(this::isRetryable) + .onRetryExhaustedThrow((spec, signal) -> signal.failure()); + + return webClient + .put() + .uri(PATH_TEMPLATE, tableUuid) + .bodyValue(body) + .retrieve() + .toBodilessEntity() + .timeout(Duration.ofMillis(properties.getPerAttemptTimeoutMs())) + .doOnError( + e -> + meterRegistry + .counter( + MetricsConstant.OPTIMIZER_STATS_ATTEMPTS, + "outcome", + isRetryable(e) ? "retryable_failure" : "non_retryable_failure") + .increment()) + .doOnSuccess( + ignored -> + meterRegistry + .counter(MetricsConstant.OPTIMIZER_STATS_ATTEMPTS, "outcome", "success") + .increment()) + .retryWhen(retrySpec) + .timeout(Duration.ofMillis(properties.getTotalTimeoutMs())) + .doOnSuccess( + ignored -> + sample.stop( + meterRegistry.timer( + MetricsConstant.OPTIMIZER_STATS_DURATION, "outcome", "success"))) + .onErrorResume( + e -> { + String outcome = classifyOutcome(e); + sample.stop( + meterRegistry.timer( + MetricsConstant.OPTIMIZER_STATS_DURATION, "outcome", outcome)); + meterRegistry + .counter(MetricsConstant.OPTIMIZER_STATS_FAILED_FINAL, "outcome", outcome) + .increment(); + log.warn( + "Optimizer stats push failed for table {} ({}): {}", + tableUuid, + outcome, + e.toString()); + return Mono.empty(); + }) + .then(); + } + + /** {@code true} iff the table's properties contain the literal opt-in value {@code "true"}. */ + private boolean isOptedIn(TableDto saved) { + Map props = saved.getTableProperties(); + return props != null && "true".equals(props.get(OPT_IN_PROPERTY)); + } + + /** Build the wire body. Missing summary keys default to 0L. */ + OptimizerStatsRequest buildRequest(TableDto saved, Map summary) { + OptimizerStatsRequest.Snapshot snapshot = + OptimizerStatsRequest.Snapshot.builder() + .tableVersion(saved.getTableVersion()) + .tableLocation(saved.getTableLocation()) + .tableSizeBytes(longOrZero(summary, SUMMARY_TOTAL_FILES_SIZE)) + .numCurrentFiles(longOrZero(summary, SUMMARY_TOTAL_DATA_FILES)) + .build(); + OptimizerStatsRequest.Delta delta = + OptimizerStatsRequest.Delta.builder() + .numFilesAdded(longOrZero(summary, SUMMARY_ADDED_DATA_FILES)) + .numFilesDeleted(longOrZero(summary, SUMMARY_DELETED_DATA_FILES)) + .addedSizeBytes(longOrZero(summary, SUMMARY_ADDED_FILES_SIZE)) + .deletedSizeBytes(longOrZero(summary, SUMMARY_REMOVED_FILES_SIZE)) + .build(); + return OptimizerStatsRequest.builder() + .databaseName(saved.getDatabaseId()) + .tableName(saved.getTableId()) + .stats(OptimizerStatsRequest.Stats.builder().snapshot(snapshot).delta(delta).build()) + .tableProperties(saved.getTableProperties()) + .build(); + } + + /** + * Retryable errors: per-attempt timeout, network-level failures, 5xx, 408, 429. Other 4xx are + * client errors — retrying won't fix them. + */ + boolean isRetryable(Throwable e) { + if (e instanceof TimeoutException) { + return true; + } + if (e instanceof WebClientRequestException) { + return true; + } + if (e instanceof WebClientResponseException) { + int code = ((WebClientResponseException) e).getStatusCode().value(); + return code == 408 || code == 429 || (code >= 500 && code < 600); + } + return false; + } + + /** + * Map a final-stage error to one of {@code success, timeout, network_error, server_error, + * client_error, unknown_error} for the {@code outcome} tag on duration / failed_final metrics. + */ + private String classifyOutcome(Throwable e) { + if (e instanceof TimeoutException) { + return "timeout"; + } + if (e instanceof WebClientRequestException) { + return "network_error"; + } + if (e instanceof WebClientResponseException) { + int code = ((WebClientResponseException) e).getStatusCode().value(); + if (code >= 500 && code < 600) { + return "server_error"; + } + if (code >= 400 && code < 500) { + return "client_error"; + } + } + return "unknown_error"; + } + + private static long longOrZero(Map m, String key) { + String v = m.get(key); + if (v == null) { + return 0L; + } + try { + return Long.parseLong(v); + } catch (NumberFormatException nfe) { + return 0L; + } + } +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java new file mode 100644 index 000000000..bbddbce6b --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java @@ -0,0 +1,35 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; + +/** + * Wiring for the post-commit Tables → Optimizer stats push. Only active when {@code + * optimizer.stats.enabled=true}; otherwise no WebClient or client bean is constructed and the + * on-commit hook in {@code IcebergSnapshotsServiceImpl} is a no-op. + */ +@Configuration +@EnableConfigurationProperties(OptimizerStatsProperties.class) +@ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true") +public class OptimizerStatsConfig { + + /** + * Dedicated WebClient for the optimizer stats endpoint. Per-attempt and outer timeouts are + * applied at the call site on the Reactor chain in {@link OptimizerStatsClient} — they are not + * configured on the underlying Netty client so that the timeout always emerges as a standard + * {@link java.util.concurrent.TimeoutException} (not a Netty {@code ReadTimeoutException}), which + * keeps the client's outcome classification simple. + */ + @Bean("optimizerStatsWebClient") + public WebClient optimizerStatsWebClient(OptimizerStatsProperties properties) { + return WebClient.builder() + .baseUrl(properties.getBaseUri()) + .clientConnector(new ReactorClientHttpConnector(HttpClient.create())) + .build(); + } +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java new file mode 100644 index 000000000..7d746ee93 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java @@ -0,0 +1,43 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Configuration for the post-commit stats push from the Tables Service to the Optimizer's stats + * endpoint. Property prefix: {@code optimizer.stats}. + * + *

When {@code enabled} is false (the default), no client bean is constructed and the push is a + * no-op on every commit. Environments that want the feed turn it on and set {@link #baseUri}. + */ +@ConfigurationProperties("optimizer.stats") +@Getter +@Setter +public class OptimizerStatsProperties { + + /** Master switch. When {@code false}, no HTTP push is attempted and no client bean is wired. */ + private boolean enabled = false; + + /** + * Base URI of the Optimizer service. Path {@code /v1/optimizer/stats/{tableUuid}} is appended at + * call time. No default — required when {@link #enabled} is {@code true}. + */ + private String baseUri; + + /** Per-attempt request timeout in milliseconds. Default 1000. */ + private long perAttemptTimeoutMs = 1000L; + + /** + * Hard ceiling on total wall-clock time for the entire call (including retries) in milliseconds. + * Default 2000. When the outer timeout fires, the chain is cancelled and the error is swallowed. + */ + private long totalTimeoutMs = 2000L; + + /** + * Total attempt count (initial try plus retries). Default 3. With 1000 ms per attempt and a 2000 + * ms outer ceiling, only about two attempts realistically fit on a slow path; the third is + * available if attempts return quickly. + */ + private int maxAttempts = 3; +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java new file mode 100644 index 000000000..68a8646da --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java @@ -0,0 +1,70 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import com.fasterxml.jackson.annotation.JsonInclude; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Wire body for {@code PUT /v1/optimizer/stats/{tableUuid}}. Mirrors the optimizer's {@code + * UpsertTableStatsRequest} field-for-field — see {@code + * services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpsertTableStatsRequest.java}. + * + *

Tables service owns its own copy so that the wire contract is explicit at the call site and + * the optimizer client jar is not a compile-time dependency. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class OptimizerStatsRequest { + + private String databaseName; + private String tableName; + private Stats stats; + private Map tableProperties; + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class Stats { + private Snapshot snapshot; + private Delta delta; + } + + /** + * Point-in-time snapshot metrics. Map to optimizer's {@code + * TableStatsPayload.SnapshotMetricsDto}. + */ + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class Snapshot { + private String tableVersion; + private String tableLocation; + private Long tableSizeBytes; + private Long numCurrentFiles; + } + + /** + * Per-commit incremental counters. Map to optimizer's {@code TableStatsPayload.CommitDeltaDto}. + */ + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class Delta { + private Long numFilesAdded; + private Long numFilesDeleted; + private Long addedSizeBytes; + private Long deletedSizeBytes; + } +} diff --git a/services/tables/src/main/resources/application.properties b/services/tables/src/main/resources/application.properties index 4a6c82a89..ddf4c3022 100644 --- a/services/tables/src/main/resources/application.properties +++ b/services/tables/src/main/resources/application.properties @@ -23,4 +23,12 @@ management.metrics.distribution.maximum-expected-value.catalog_metadata_retrieva management.metrics.distribution.maximum-expected-value.catalog_metadata_update_latency=600s management.metrics.distribution.maximum-expected-value.http.server.requests=600s server.shutdown=graceful -spring.lifecycle.timeout-per-shutdown-phase=60s \ No newline at end of file +spring.lifecycle.timeout-per-shutdown-phase=60s + +# Post-commit Tables -> Optimizer stats push. +# Disabled by default; environments that run the optimizer opt in via env vars. +optimizer.stats.enabled=${OPTIMIZER_STATS_ENABLED:false} +optimizer.stats.base-uri=${OPTIMIZER_STATS_BASE_URI:} +optimizer.stats.per-attempt-timeout-ms=${OPTIMIZER_STATS_PER_ATTEMPT_TIMEOUT_MS:1000} +optimizer.stats.total-timeout-ms=${OPTIMIZER_STATS_TOTAL_TIMEOUT_MS:2000} +optimizer.stats.max-attempts=${OPTIMIZER_STATS_MAX_ATTEMPTS:3} \ No newline at end of file diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java index 44467b705..80f9233af 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java @@ -15,7 +15,10 @@ import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey; import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository; import com.linkedin.openhouse.tables.services.IcebergSnapshotsService; +import com.linkedin.openhouse.tables.services.optimizer.OptimizerStatsClient; import com.linkedin.openhouse.tables.utils.TableUUIDGenerator; +import java.util.Collections; +import java.util.Map; import java.util.Optional; import java.util.UUID; import org.apache.iceberg.exceptions.BadRequestException; @@ -45,6 +48,14 @@ public class IcebergSnapshotsServiceTest { @MockBean private TableUUIDGenerator tableUUIDGenerator; + /** + * Forces the optional optimizer-stats client into the service so that we can verify the + * post-commit push is invoked. Production wiring is conditional on {@code + * optimizer.stats.enabled=true}; this {@code @MockBean} bypasses that condition for the one test + * that exercises the hook. + */ + @MockBean private OptimizerStatsClient optimizerStatsClient; + private OpenHouseInternalRepository mockRepository; @Captor ArgumentCaptor tableDtoArgumentCaptor; @@ -90,6 +101,32 @@ public void testTableCreated() { verifyCalls(key, TEST_TABLE_CREATOR, requestBody.getCreateUpdateTableRequestBody()); } + @Test + public void testOptimizerStatsClientInvokedAfterSuccessfulCommit() { + final IcebergSnapshotsRequestBody requestBody = + TEST_ICEBERG_SNAPSHOTS_INITIAL_VERSION_REQUEST_BODY; + final String dbId = requestBody.getCreateUpdateTableRequestBody().getDatabaseId(); + final String tableId = requestBody.getCreateUpdateTableRequestBody().getTableId(); + final TableDtoPrimaryKey key = + TableDtoPrimaryKey.builder().databaseId(dbId).tableId(tableId).build(); + final Map summary = Collections.singletonMap("total-data-files", "7"); + final TableDto savedDto = + TableDto.builder() + .databaseId(dbId) + .tableId(tableId) + .currentSnapshotSummary(summary) + .build(); + + Mockito.when(tableUUIDGenerator.generateUUID(Mockito.any(IcebergSnapshotsRequestBody.class))) + .thenReturn(UUID.randomUUID()); + Mockito.when(mockRepository.findById(key)).thenReturn(Optional.empty()); + Mockito.when(mockRepository.save(Mockito.any(TableDto.class))).thenReturn(savedDto); + + service.putIcebergSnapshots(dbId, tableId, requestBody, TEST_TABLE_CREATOR); + + Mockito.verify(optimizerStatsClient, Mockito.times(1)).report(savedDto, summary); + } + @Test public void testPutTableExceptionHandling() { final IcebergSnapshotsRequestBody requestBody = diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java index e3e19a7d8..64aefd19b 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java @@ -33,7 +33,8 @@ public class TableDtoMappingTest { "jsonSnapshots", "snapshotRefs", "policies", - "tableType"); + "tableType", + "currentSnapshotSummary"); /** Making all fields making it to map is expected, and all expected field are making it there. */ @Test diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClientTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClientTest.java new file mode 100644 index 000000000..33ea0d197 --- /dev/null +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClientTest.java @@ -0,0 +1,242 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.openhouse.common.metrics.MetricsConstant; +import com.linkedin.openhouse.tables.model.TableDto; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import okhttp3.mockwebserver.SocketPolicy; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; + +class OptimizerStatsClientTest { + + private static final String TABLE_UUID = "uuid-1"; + private static final String DB = "db1"; + private static final String TABLE = "tbl1"; + private static final Duration BLOCK_MAX = Duration.ofSeconds(5); + + private MockWebServer server; + private MeterRegistry meterRegistry; + private OptimizerStatsProperties properties; + private OptimizerStatsClient client; + private final ObjectMapper mapper = new ObjectMapper(); + + @BeforeEach + void setUp() throws IOException { + server = new MockWebServer(); + server.start(); + meterRegistry = new SimpleMeterRegistry(); + properties = new OptimizerStatsProperties(); + properties.setEnabled(true); + properties.setBaseUri(server.url("/").toString()); + properties.setPerAttemptTimeoutMs(1000L); + properties.setTotalTimeoutMs(2000L); + properties.setMaxAttempts(3); + client = new OptimizerStatsClient(buildWebClient(), meterRegistry, properties); + } + + @AfterEach + void tearDown() throws IOException { + server.shutdown(); + } + + private WebClient buildWebClient() { + HttpClient httpClient = HttpClient.create(); + return WebClient.builder() + .baseUrl(properties.getBaseUri()) + .clientConnector(new ReactorClientHttpConnector(httpClient)) + .build(); + } + + @Test + void report_optedIn_sendsRequestWithPayloadFieldsFromSummary() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200)); + + TableDto saved = + optedInBuilder() + .tableVersion("v3") + .tableLocation("/data/tables/db1/tbl1") + .currentSnapshotSummary(summary()) + .build(); + + client.reportAsync(saved, saved.getCurrentSnapshotSummary()).block(BLOCK_MAX); + + RecordedRequest req = server.takeRequest(2, TimeUnit.SECONDS); + assertThat(req).isNotNull(); + assertThat(req.getMethod()).isEqualTo("PUT"); + assertThat(req.getPath()).isEqualTo("/v1/optimizer/stats/" + TABLE_UUID); + + @SuppressWarnings("unchecked") + Map body = mapper.readValue(req.getBody().readUtf8(), Map.class); + assertThat(body).containsEntry("databaseName", DB).containsEntry("tableName", TABLE); + @SuppressWarnings("unchecked") + Map stats = (Map) body.get("stats"); + @SuppressWarnings("unchecked") + Map snapshot = (Map) stats.get("snapshot"); + @SuppressWarnings("unchecked") + Map delta = (Map) stats.get("delta"); + assertThat(snapshot) + .containsEntry("tableVersion", "v3") + .containsEntry("tableLocation", "/data/tables/db1/tbl1") + .containsEntry("tableSizeBytes", 4096) + .containsEntry("numCurrentFiles", 12); + assertThat(delta) + .containsEntry("numFilesAdded", 5) + .containsEntry("numFilesDeleted", 2) + .containsEntry("addedSizeBytes", 2048) + .containsEntry("deletedSizeBytes", 1024); + + assertThat( + meterRegistry + .counter(MetricsConstant.OPTIMIZER_STATS_ATTEMPTS, "outcome", "success") + .count()) + .isEqualTo(1.0); + } + + @Test + void report_notOptedIn_skipsCallAndIncrementsSkippedCounter() { + TableDto saved = builderWithoutOptIn().currentSnapshotSummary(summary()).build(); + + client.reportAsync(saved, saved.getCurrentSnapshotSummary()).block(BLOCK_MAX); + + assertThat(server.getRequestCount()).isZero(); + assertThat( + meterRegistry + .counter(MetricsConstant.OPTIMIZER_STATS_SKIPPED, "reason", "opt_out") + .count()) + .isEqualTo(1.0); + } + + @Test + void report_noSnapshot_skipsCallAndIncrementsSkippedCounter() { + TableDto saved = optedInBuilder().currentSnapshotSummary(null).build(); + + client.reportAsync(saved, null).block(BLOCK_MAX); + + assertThat(server.getRequestCount()).isZero(); + assertThat( + meterRegistry + .counter(MetricsConstant.OPTIMIZER_STATS_SKIPPED, "reason", "no_snapshot") + .count()) + .isEqualTo(1.0); + } + + @Test + void report_5xxThenSuccess_retriesAndSucceeds() { + server.enqueue(new MockResponse().setResponseCode(503)); + server.enqueue(new MockResponse().setResponseCode(200)); + + TableDto saved = optedInBuilder().currentSnapshotSummary(summary()).build(); + client.reportAsync(saved, saved.getCurrentSnapshotSummary()).block(BLOCK_MAX); + + assertThat(server.getRequestCount()).isEqualTo(2); + assertThat( + meterRegistry + .counter(MetricsConstant.OPTIMIZER_STATS_ATTEMPTS, "outcome", "success") + .count()) + .isEqualTo(1.0); + assertThat( + meterRegistry + .counter(MetricsConstant.OPTIMIZER_STATS_ATTEMPTS, "outcome", "retryable_failure") + .count()) + .isEqualTo(1.0); + } + + @Test + void report_allAttemptsFail_swallowsAndIncrementsFailedFinal() { + properties.setMaxAttempts(2); + server.enqueue(new MockResponse().setResponseCode(503)); + server.enqueue(new MockResponse().setResponseCode(503)); + + TableDto saved = optedInBuilder().currentSnapshotSummary(summary()).build(); + client.reportAsync(saved, saved.getCurrentSnapshotSummary()).block(BLOCK_MAX); + + assertThat(server.getRequestCount()).isEqualTo(2); + assertThat( + meterRegistry + .counter(MetricsConstant.OPTIMIZER_STATS_FAILED_FINAL, "outcome", "server_error") + .count()) + .isEqualTo(1.0); + } + + @Test + void report_4xxNonRetryable_doesNotRetry() { + server.enqueue(new MockResponse().setResponseCode(400)); + + TableDto saved = optedInBuilder().currentSnapshotSummary(summary()).build(); + client.reportAsync(saved, saved.getCurrentSnapshotSummary()).block(BLOCK_MAX); + + assertThat(server.getRequestCount()).isEqualTo(1); + assertThat( + meterRegistry + .counter(MetricsConstant.OPTIMIZER_STATS_FAILED_FINAL, "outcome", "client_error") + .count()) + .isEqualTo(1.0); + } + + @Test + void report_perAttemptTimeoutTrips_recordsTimeout() { + properties.setPerAttemptTimeoutMs(150L); + properties.setTotalTimeoutMs(600L); + properties.setMaxAttempts(1); + client = new OptimizerStatsClient(buildWebClient(), meterRegistry, properties); + // Connection opens but the server never sends any response — forces Reactor's per-attempt + // .timeout(150ms) to fire with TimeoutException. + server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.NO_RESPONSE)); + + TableDto saved = optedInBuilder().currentSnapshotSummary(summary()).build(); + client.reportAsync(saved, saved.getCurrentSnapshotSummary()).block(BLOCK_MAX); + + assertThat( + meterRegistry + .counter(MetricsConstant.OPTIMIZER_STATS_FAILED_FINAL, "outcome", "timeout") + .count()) + .isEqualTo(1.0); + } + + // ---- helpers ---- + + private static Map summary() { + Map s = new HashMap<>(); + s.put("total-data-files", "12"); + s.put("total-files-size", "4096"); + s.put("added-data-files", "5"); + s.put("deleted-data-files", "2"); + s.put("added-files-size", "2048"); + s.put("removed-files-size", "1024"); + return s; + } + + private static TableDto.TableDtoBuilder optedInBuilder() { + Map props = new HashMap<>(); + props.put(OptimizerStatsClient.OPT_IN_PROPERTY, "true"); + return TableDto.builder() + .tableUUID(TABLE_UUID) + .databaseId(DB) + .tableId(TABLE) + .tableProperties(props); + } + + private static TableDto.TableDtoBuilder builderWithoutOptIn() { + return TableDto.builder() + .tableUUID(TABLE_UUID) + .databaseId(DB) + .tableId(TABLE) + .tableProperties(new HashMap<>()); + } +} From af025aef96153de3852408e78bc0f2d8a43c5aef Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Wed, 27 May 2026 14:16:30 -0700 Subject: [PATCH 2/5] refactor(tables): generic PostCommitOperation framework; address review Restructures the post-commit stats push around a small generic dispatcher so async/timeout/swallow plumbing lives in one place. Addresses the review comments on PR #608. Framework (new): - PostCommitOperation interface: String name() + Optional> prepare(TableDto). Implementations describe what to do; the dispatcher owns how it runs. - PostCommitDispatcher (@ConditionalOnProperty tables.postcommit.enabled): per-op wall-clock timeout, error swallow, metric emission, fire-and- forget subscribe. Exposes package-private decorate() for synchronous testing. - PostCommitProperties: enabled (default false), per-op-timeout-ms (default 3000). - Metric keys renamed: POSTCOMMIT_OP_DURATION / POSTCOMMIT_OP_SKIPPED / POSTCOMMIT_OP_FAILED, tagged op={name} and outcome={success, timeout, network_error, server_error, client_error, prepare_threw, unknown_error}. Why async: a synchronous post-commit push converts an optimizer outage into a tables write outage. Stats are a best-effort scheduling signal and must not block the write path. Crash-loss is bounded because state is cumulative: the next commit re-pushes the current state. Operation (refactor): - OptimizerStatsClient -> OptimizerStatsPostCommitOperation implementing PostCommitOperation. Drops outer timeout, onErrorResume, .subscribe() (dispatcher owns these). Keeps per-attempt timeout + bounded retry on retryable errors only. - Adds snapshotId to OptimizerStatsRequest.Snapshot; tracked for server-side idempotency wiring in BDP-102985. - Path constant comment now cites TableStatsController.TABLE_PATH_TEMPLATE as the canonical source. TableDto: - currentSnapshotSummary (Map) replaced by Optional (snapshotId + summary). Stored nullable internally; consumers read through getCurrentSnapshot() which returns Optional. Javadoc rewritten to describe the semantic condition (presence ~ table has a committed snapshot), not the private impl that populates it. - New CurrentSnapshotInfo value class. - InternalRepositoryUtils populates the new field with both snapshotId and summary; no extra I/O (still purely in-memory). Service: - IcebergSnapshotsServiceImpl: Optional replaced by Optional; on-commit hook is one line. Optimizer side: - TableStatsController publishes BASE_PATH and TABLE_PATH_TEMPLATE as public constants; @RequestMapping refactored to use BASE_PATH. Config: - application.properties: drop optimizer.stats.total-timeout-ms; add tables.postcommit.enabled + tables.postcommit.per-op-timeout-ms. Tests: - New PostCommitDispatcherTest (6 cases, no sleeps; uses decorate() to block synchronously). - OptimizerStatsClientTest renamed -> OptimizerStatsPostCommitOperationTest (6 cases adapted to the new prepare() contract; timeout case moved to PostCommitDispatcherTest). - IcebergSnapshotsServiceTest swaps @MockBean OptimizerStatsClient for @MockBean PostCommitDispatcher and verifies dispatch(savedDto) once. - TableDtoMappingTest updated for the renamed field. - All :services:tables and :services:optimizer tests pass; spotless clean. Filed: BDP-102985 (Optimizer stats: use snapshot ID as idempotency token on upsert) under epic BDP-102026. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../common/metrics/MetricsConstant.java | 12 +- .../api/controller/TableStatsController.java | 14 +- .../tables/dto/mapper/TablesMapper.java | 4 +- .../tables/model/CurrentSnapshotInfo.java | 33 +++ .../openhouse/tables/model/TableDto.java | 24 +- .../impl/InternalRepositoryUtils.java | 10 +- .../services/IcebergSnapshotsServiceImpl.java | 15 +- .../optimizer/OptimizerStatsClient.java | 245 ------------------ .../optimizer/OptimizerStatsConfig.java | 9 +- .../OptimizerStatsPostCommitOperation.java | 173 +++++++++++++ .../optimizer/OptimizerStatsProperties.java | 15 +- .../optimizer/OptimizerStatsRequest.java | 6 + .../postcommit/PostCommitDispatcher.java | 147 +++++++++++ .../postcommit/PostCommitOperation.java | 39 +++ .../postcommit/PostCommitProperties.java | 31 +++ .../src/main/resources/application.properties | 10 +- .../service/IcebergSnapshotsServiceTest.java | 29 +-- .../tables/model/TableDtoMappingTest.java | 2 +- ...ptimizerStatsPostCommitOperationTest.java} | 145 ++++------- .../postcommit/PostCommitDispatcherTest.java | 177 +++++++++++++ 20 files changed, 747 insertions(+), 393 deletions(-) create mode 100644 services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java delete mode 100644 services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClient.java create mode 100644 services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java create mode 100644 services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java create mode 100644 services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java create mode 100644 services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java rename services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/{OptimizerStatsClientTest.java => OptimizerStatsPostCommitOperationTest.java} (53%) create mode 100644 services/tables/src/test/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcherTest.java diff --git a/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java b/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java index 7a47b790e..1c7945f9c 100644 --- a/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java +++ b/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java @@ -64,9 +64,11 @@ private MetricsConstant() {} public static final String HTS_LIST_TABLES_TIME = "hts_list_tables_time"; public static final String HTS_SEARCH_TABLES_TIME = "hts_search_tables_time"; - // Optimizer post-commit stats push (Tables → Optimizer) - public static final String OPTIMIZER_STATS_DURATION = "optimizer_stats_duration"; - public static final String OPTIMIZER_STATS_ATTEMPTS = "optimizer_stats_attempts"; - public static final String OPTIMIZER_STATS_SKIPPED = "optimizer_stats_skipped"; - public static final String OPTIMIZER_STATS_FAILED_FINAL = "optimizer_stats_failed_final"; + // Tables post-commit operation framework (bounded, best-effort actions after commit). + // Tagged with op={operation name}; on failure additionally tagged with outcome={timeout, + // network_error, server_error, client_error, prepare_threw, unknown_error}. + // The duration timer is bounded by tables.postcommit.per-op-timeout-ms. + public static final String POSTCOMMIT_OP_DURATION = "postcommit_op_duration"; + public static final String POSTCOMMIT_OP_SKIPPED = "postcommit_op_skipped"; + public static final String POSTCOMMIT_OP_FAILED = "postcommit_op_failed"; } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java index b119dd1c7..87fbfa709 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java @@ -24,10 +24,22 @@ /** REST controller for managing per-table stats in the optimizer DB. */ @RestController -@RequestMapping("/v1/optimizer/stats") +@RequestMapping(TableStatsController.BASE_PATH) @RequiredArgsConstructor public class TableStatsController { + /** + * Base path for the stats endpoints. Single source of truth — external callers must reference. + */ + public static final String BASE_PATH = "/v1/optimizer/stats"; + + /** + * URI template (with {@code {tableUuid}} placeholder) for the per-table stats upsert/fetch + * endpoint. Use this from any client that calls the optimizer over HTTP, rather than rebuilding + * the path inline. + */ + public static final String TABLE_PATH_TEMPLATE = BASE_PATH + "/{tableUuid}"; + private final OptimizerDataService service; /** diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java index 08111cd82..9315df9d8 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java @@ -73,6 +73,7 @@ public interface TablesMapper { @Mapping(source = "requestBody.sortOrder", target = "sortOrder"), @Mapping(target = "lastModifiedTime", ignore = true), @Mapping(target = "creationTime", ignore = true), + @Mapping(target = "currentSnapshot", ignore = true) }) TableDto toTableDto(TableDto tableDto, CreateUpdateTableRequestBody requestBody); @@ -120,7 +121,8 @@ public interface TablesMapper { defaultExpression = "java(TableType.PRIMARY_TABLE)"), @Mapping(source = "requestBody.createUpdateTableRequestBody.sortOrder", target = "sortOrder"), @Mapping(target = "lastModifiedTime", ignore = true), - @Mapping(target = "creationTime", ignore = true) + @Mapping(target = "creationTime", ignore = true), + @Mapping(target = "currentSnapshot", ignore = true) }) TableDto toTableDto(TableDto tableDto, IcebergSnapshotsRequestBody requestBody); diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java new file mode 100644 index 000000000..9f363b4ec --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java @@ -0,0 +1,33 @@ +package com.linkedin.openhouse.tables.model; + +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; + +/** + * In-memory snapshot of the Iceberg current-snapshot metadata that was loaded alongside a {@link + * TableDto}. Carries only fields already materialized by the catalog client — never triggers a + * separate HDFS / object-store read. + * + *

Present whenever the table has at least one committed snapshot at construction time; absent + * (modeled as {@link java.util.Optional#empty()} on {@link TableDto}) when the table has no + * committed data — e.g. a {@code CREATE TABLE} with no rows yet. + */ +@Getter +@Builder +@AllArgsConstructor +@EqualsAndHashCode +public class CurrentSnapshotInfo { + + /** Iceberg snapshot ID (decimal long). Stable per commit; usable as an idempotency token. */ + private final long snapshotId; + + /** + * Iceberg {@code Snapshot.summary()} map, unmodified. Keys include {@code total-data-files}, + * {@code total-files-size}, {@code added-data-files}, {@code deleted-data-files}, {@code + * added-files-size}, {@code removed-files-size}. + */ + private final Map summary; +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java index 24373f4a1..16608f286 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java @@ -11,6 +11,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import javax.persistence.Convert; import javax.persistence.ElementCollection; import javax.persistence.Entity; @@ -85,16 +86,21 @@ public class TableDto { private boolean replaceCommit; /** - * Iceberg {@code Snapshot.summary()} for the current snapshot at the time the {@code TableDto} - * was constructed (post-commit on save; post-load on read). Populated only when an Iceberg {@code - * Table} is available — i.e. by {@code InternalRepositoryUtils.convertToTableDto}. Not persisted, - * not part of equality. - * - *

Used downstream by the optimizer post-commit stats push (see {@code - * services.optimizer.OptimizerStatsClient}) so that the service layer can read snapshot stats - * without a separate HDFS round-trip. + * In-memory current-snapshot metadata captured when this {@code TableDto} was built from an + * Iceberg {@code Table}. Present whenever the underlying table has at least one committed + * snapshot at that point; absent for tables with no committed data (e.g. {@code CREATE TABLE} + * with no rows). Not persisted, not part of equality. Stored nullable internally; consumers must + * read through {@link #getCurrentSnapshot()} to get the {@link Optional}. */ - @Transient @EqualsAndHashCode.Exclude private Map currentSnapshotSummary; + @Getter(AccessLevel.NONE) + @Transient + @EqualsAndHashCode.Exclude + private CurrentSnapshotInfo currentSnapshot; + + /** Returns the current-snapshot metadata if any, else {@link Optional#empty()}. */ + public Optional getCurrentSnapshot() { + return Optional.ofNullable(currentSnapshot); + } /** * Bundling eligible string type field into a map as {@link org.mapstruct.Mapper} doesn't provide diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java index 9ba40c2d8..d3ced5458 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java @@ -9,6 +9,7 @@ import com.linkedin.openhouse.tables.dto.mapper.iceberg.PartitionSpecMapper; import com.linkedin.openhouse.tables.dto.mapper.iceberg.PoliciesSpecMapper; import com.linkedin.openhouse.tables.dto.mapper.iceberg.TableTypeMapper; +import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo; import com.linkedin.openhouse.tables.model.TableDto; import com.linkedin.openhouse.tables.repository.PreservedKeyChecker; import java.net.URI; @@ -135,8 +136,13 @@ static TableDto convertToTableDto( .jsonSnapshots(null) .tableProperties(megaProps) .sortOrder(SortOrderParser.toJson(table.sortOrder())) - .currentSnapshotSummary( - table.currentSnapshot() == null ? null : table.currentSnapshot().summary()) + .currentSnapshot( + table.currentSnapshot() == null + ? null + : CurrentSnapshotInfo.builder() + .snapshotId(table.currentSnapshot().snapshotId()) + .summary(table.currentSnapshot().summary()) + .build()) .build(); return tableDto; diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java index bb2cbde4c..6623b5f37 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java @@ -10,7 +10,7 @@ import com.linkedin.openhouse.tables.model.TableDto; import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey; import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository; -import com.linkedin.openhouse.tables.services.optimizer.OptimizerStatsClient; +import com.linkedin.openhouse.tables.services.postcommit.PostCommitDispatcher; import com.linkedin.openhouse.tables.utils.AuthorizationUtils; import com.linkedin.openhouse.tables.utils.TableUUIDGenerator; import java.util.Optional; @@ -34,11 +34,11 @@ public class IcebergSnapshotsServiceImpl implements IcebergSnapshotsService { @Autowired AuthorizationUtils authorizationUtils; /** - * Present only when {@code optimizer.stats.enabled=true}. When absent, no post-commit push is - * attempted. + * Present only when {@code tables.postcommit.enabled=true}. When absent, the on-commit hook is a + * literal no-op and no post-commit operations run. */ @Autowired(required = false) - Optional optimizerStatsClient = Optional.empty(); + Optional postCommitDispatcher = Optional.empty(); @Override public Pair putIcebergSnapshots( @@ -92,12 +92,7 @@ public Pair putIcebergSnapshots( } try { TableDto savedDto = openHouseInternalRepository.save(tableDtoToSave); - // Fire-and-forget push of the post-commit snapshot summary to the optimizer. Returns - // immediately; failures are swallowed inside the client. Skipped at the client when the - // table is not opted in via maintenance.optimizer.stats.enabled or when there is no current - // snapshot (e.g. CREATE TABLE without a data commit). - optimizerStatsClient.ifPresent( - client -> client.report(savedDto, savedDto.getCurrentSnapshotSummary())); + postCommitDispatcher.ifPresent(d -> d.dispatch(savedDto)); return Pair.of(savedDto, !tableDto.isPresent()); } catch (BadRequestException e) { throw new RequestValidationFailureException(e.getMessage(), e); diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClient.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClient.java deleted file mode 100644 index 23c69082e..000000000 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClient.java +++ /dev/null @@ -1,245 +0,0 @@ -package com.linkedin.openhouse.tables.services.optimizer; - -import com.linkedin.openhouse.common.metrics.MetricsConstant; -import com.linkedin.openhouse.tables.model.TableDto; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Timer; -import java.time.Duration; -import java.util.Map; -import java.util.concurrent.TimeoutException; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClientRequestException; -import org.springframework.web.reactive.function.client.WebClientResponseException; -import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; -import reactor.util.retry.RetrySpec; - -/** - * Pushes a stats record to the optimizer's {@code PUT /v1/optimizer/stats/{tableUuid}} endpoint - * after a successful Iceberg snapshot commit. Fire-and-forget — the {@link #report} call returns - * immediately on the commit thread; the HTTP exchange runs on the WebClient's Netty event loop. - * - *

Errors are recorded as Micrometer metrics and logged at {@code warn} level, but never - * propagated. A push failure must not break the commit. - * - *

Timeout / retry shape (from {@link OptimizerStatsProperties}): - * - *

    - *
  • {@code perAttemptTimeoutMs} bounds each HTTP attempt (1000 ms default). - *
  • {@code maxAttempts} caps the total number of attempts (3 default — one initial + two - * retries). Retries only fire on retryable errors (network, 5xx, 408, 429, timeout). - *
  • {@code totalTimeoutMs} is the outer wall-clock ceiling (2000 ms default). Hard cancel. - *
- * - *

The bean is only wired when {@code optimizer.stats.enabled=true}. - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true") -public class OptimizerStatsClient { - - /** Per-call URL path. */ - static final String PATH_TEMPLATE = "/v1/optimizer/stats/{tableUuid}"; - - /** Table-property key that opts a table in for the post-commit stats push. */ - static final String OPT_IN_PROPERTY = "maintenance.optimizer.stats.enabled"; - - /** Iceberg snapshot-summary keys we read. All values are decimal-string longs. */ - static final String SUMMARY_TOTAL_DATA_FILES = "total-data-files"; - - static final String SUMMARY_TOTAL_FILES_SIZE = "total-files-size"; - static final String SUMMARY_ADDED_DATA_FILES = "added-data-files"; - static final String SUMMARY_DELETED_DATA_FILES = "deleted-data-files"; - static final String SUMMARY_ADDED_FILES_SIZE = "added-files-size"; - static final String SUMMARY_REMOVED_FILES_SIZE = "removed-files-size"; - - private final WebClient webClient; - private final MeterRegistry meterRegistry; - private final OptimizerStatsProperties properties; - - public OptimizerStatsClient( - @Qualifier("optimizerStatsWebClient") WebClient webClient, - MeterRegistry meterRegistry, - OptimizerStatsProperties properties) { - this.webClient = webClient; - this.meterRegistry = meterRegistry; - this.properties = properties; - } - - /** - * Build and dispatch a stats push for the just-committed table. Returns immediately; the HTTP - * call runs in the background. {@code snapshotSummary} is the in-memory {@code - * Snapshot.summary()} map from the latest snapshot (no HDFS round-trip). - * - *

The call is skipped (and a {@code skipped} counter is emitted) when: - * - *

    - *
  • The table is not opted in via {@link #OPT_IN_PROPERTY}. - *
  • {@code snapshotSummary} is null or empty (no committed snapshot yet — e.g. CREATE TABLE - * with no rows). - *
- */ - public void report(TableDto saved, Map snapshotSummary) { - reportAsync(saved, snapshotSummary).subscribe(); - } - - /** - * Same as {@link #report} but returns the underlying {@link Mono} so callers (notably tests) can - * block on completion. Production callers use {@link #report}. - */ - Mono reportAsync(TableDto saved, Map snapshotSummary) { - if (!isOptedIn(saved)) { - meterRegistry - .counter(MetricsConstant.OPTIMIZER_STATS_SKIPPED, "reason", "opt_out") - .increment(); - return Mono.empty(); - } - if (snapshotSummary == null || snapshotSummary.isEmpty()) { - meterRegistry - .counter(MetricsConstant.OPTIMIZER_STATS_SKIPPED, "reason", "no_snapshot") - .increment(); - return Mono.empty(); - } - - OptimizerStatsRequest body = buildRequest(saved, snapshotSummary); - String tableUuid = saved.getTableUUID(); - Timer.Sample sample = Timer.start(meterRegistry); - - RetrySpec retrySpec = - Retry.max(Math.max(0, properties.getMaxAttempts() - 1)) - .filter(this::isRetryable) - .onRetryExhaustedThrow((spec, signal) -> signal.failure()); - - return webClient - .put() - .uri(PATH_TEMPLATE, tableUuid) - .bodyValue(body) - .retrieve() - .toBodilessEntity() - .timeout(Duration.ofMillis(properties.getPerAttemptTimeoutMs())) - .doOnError( - e -> - meterRegistry - .counter( - MetricsConstant.OPTIMIZER_STATS_ATTEMPTS, - "outcome", - isRetryable(e) ? "retryable_failure" : "non_retryable_failure") - .increment()) - .doOnSuccess( - ignored -> - meterRegistry - .counter(MetricsConstant.OPTIMIZER_STATS_ATTEMPTS, "outcome", "success") - .increment()) - .retryWhen(retrySpec) - .timeout(Duration.ofMillis(properties.getTotalTimeoutMs())) - .doOnSuccess( - ignored -> - sample.stop( - meterRegistry.timer( - MetricsConstant.OPTIMIZER_STATS_DURATION, "outcome", "success"))) - .onErrorResume( - e -> { - String outcome = classifyOutcome(e); - sample.stop( - meterRegistry.timer( - MetricsConstant.OPTIMIZER_STATS_DURATION, "outcome", outcome)); - meterRegistry - .counter(MetricsConstant.OPTIMIZER_STATS_FAILED_FINAL, "outcome", outcome) - .increment(); - log.warn( - "Optimizer stats push failed for table {} ({}): {}", - tableUuid, - outcome, - e.toString()); - return Mono.empty(); - }) - .then(); - } - - /** {@code true} iff the table's properties contain the literal opt-in value {@code "true"}. */ - private boolean isOptedIn(TableDto saved) { - Map props = saved.getTableProperties(); - return props != null && "true".equals(props.get(OPT_IN_PROPERTY)); - } - - /** Build the wire body. Missing summary keys default to 0L. */ - OptimizerStatsRequest buildRequest(TableDto saved, Map summary) { - OptimizerStatsRequest.Snapshot snapshot = - OptimizerStatsRequest.Snapshot.builder() - .tableVersion(saved.getTableVersion()) - .tableLocation(saved.getTableLocation()) - .tableSizeBytes(longOrZero(summary, SUMMARY_TOTAL_FILES_SIZE)) - .numCurrentFiles(longOrZero(summary, SUMMARY_TOTAL_DATA_FILES)) - .build(); - OptimizerStatsRequest.Delta delta = - OptimizerStatsRequest.Delta.builder() - .numFilesAdded(longOrZero(summary, SUMMARY_ADDED_DATA_FILES)) - .numFilesDeleted(longOrZero(summary, SUMMARY_DELETED_DATA_FILES)) - .addedSizeBytes(longOrZero(summary, SUMMARY_ADDED_FILES_SIZE)) - .deletedSizeBytes(longOrZero(summary, SUMMARY_REMOVED_FILES_SIZE)) - .build(); - return OptimizerStatsRequest.builder() - .databaseName(saved.getDatabaseId()) - .tableName(saved.getTableId()) - .stats(OptimizerStatsRequest.Stats.builder().snapshot(snapshot).delta(delta).build()) - .tableProperties(saved.getTableProperties()) - .build(); - } - - /** - * Retryable errors: per-attempt timeout, network-level failures, 5xx, 408, 429. Other 4xx are - * client errors — retrying won't fix them. - */ - boolean isRetryable(Throwable e) { - if (e instanceof TimeoutException) { - return true; - } - if (e instanceof WebClientRequestException) { - return true; - } - if (e instanceof WebClientResponseException) { - int code = ((WebClientResponseException) e).getStatusCode().value(); - return code == 408 || code == 429 || (code >= 500 && code < 600); - } - return false; - } - - /** - * Map a final-stage error to one of {@code success, timeout, network_error, server_error, - * client_error, unknown_error} for the {@code outcome} tag on duration / failed_final metrics. - */ - private String classifyOutcome(Throwable e) { - if (e instanceof TimeoutException) { - return "timeout"; - } - if (e instanceof WebClientRequestException) { - return "network_error"; - } - if (e instanceof WebClientResponseException) { - int code = ((WebClientResponseException) e).getStatusCode().value(); - if (code >= 500 && code < 600) { - return "server_error"; - } - if (code >= 400 && code < 500) { - return "client_error"; - } - } - return "unknown_error"; - } - - private static long longOrZero(Map m, String key) { - String v = m.get(key); - if (v == null) { - return 0L; - } - try { - return Long.parseLong(v); - } catch (NumberFormatException nfe) { - return 0L; - } - } -} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java index bbddbce6b..8061fb644 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java @@ -20,10 +20,11 @@ public class OptimizerStatsConfig { /** * Dedicated WebClient for the optimizer stats endpoint. Per-attempt and outer timeouts are - * applied at the call site on the Reactor chain in {@link OptimizerStatsClient} — they are not - * configured on the underlying Netty client so that the timeout always emerges as a standard - * {@link java.util.concurrent.TimeoutException} (not a Netty {@code ReadTimeoutException}), which - * keeps the client's outcome classification simple. + * applied at the call site on the Reactor chain (per-attempt in {@link + * OptimizerStatsPostCommitOperation}, outer per-op in the {@code PostCommitDispatcher}) — they + * are not configured on the underlying Netty client so that the timeout always emerges as a + * standard {@link java.util.concurrent.TimeoutException} (not a Netty {@code + * ReadTimeoutException}), which keeps the dispatcher's outcome classification simple. */ @Bean("optimizerStatsWebClient") public WebClient optimizerStatsWebClient(OptimizerStatsProperties properties) { diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java new file mode 100644 index 000000000..0a82d8598 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java @@ -0,0 +1,173 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo; +import com.linkedin.openhouse.tables.model.TableDto; +import com.linkedin.openhouse.tables.services.postcommit.PostCommitOperation; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientRequestException; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import reactor.util.retry.RetrySpec; + +/** + * {@link PostCommitOperation} that PUTs a snapshot-stats record to the optimizer's per-table stats + * endpoint. {@link #prepare(TableDto)} returns a {@link Mono} that completes on HTTP 2xx and + * signals an error otherwise; the dispatcher owns timeout, subscription, error swallowing, and + * metric emission. + * + *

Skipped (operation returns {@link Optional#empty()}) when the table is not opted in via the + * {@link #OPT_IN_PROPERTY} table property, or when no current snapshot is present (e.g. {@code + * CREATE TABLE} with no rows yet). + * + *

Internal retry: bounded by {@link OptimizerStatsProperties#getMaxAttempts()}, fires only on + * retryable errors (network, {@link TimeoutException}, HTTP 408 / 429 / 5xx). The dispatcher's + * outer per-op timeout is the hard ceiling on the whole chain. + * + *

Bean is only wired when {@code optimizer.stats.enabled=true}. The path constant is + * intentionally duplicated from {@code TableStatsController.TABLE_PATH_TEMPLATE}; keep in sync. + * Tables service does not take a compile-time dependency on the optimizer service jar. + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true") +public class OptimizerStatsPostCommitOperation implements PostCommitOperation { + + /** Metric tag value for {@code op}. */ + static final String OP_NAME = "optimizer_stats"; + + /** + * Per-call URL path. Intentionally duplicated from {@code + * TableStatsController.TABLE_PATH_TEMPLATE}; keep in sync. + */ + static final String PATH_TEMPLATE = "/v1/optimizer/stats/{tableUuid}"; + + /** Table-property key that opts a table in for the post-commit stats push. */ + static final String OPT_IN_PROPERTY = "maintenance.optimizer.stats.enabled"; + + /** Iceberg snapshot-summary keys we read. All values are decimal-string longs. */ + static final String SUMMARY_TOTAL_DATA_FILES = "total-data-files"; + + static final String SUMMARY_TOTAL_FILES_SIZE = "total-files-size"; + static final String SUMMARY_ADDED_DATA_FILES = "added-data-files"; + static final String SUMMARY_DELETED_DATA_FILES = "deleted-data-files"; + static final String SUMMARY_ADDED_FILES_SIZE = "added-files-size"; + static final String SUMMARY_REMOVED_FILES_SIZE = "removed-files-size"; + + private final WebClient webClient; + private final OptimizerStatsProperties properties; + + public OptimizerStatsPostCommitOperation( + @Qualifier("optimizerStatsWebClient") WebClient webClient, + OptimizerStatsProperties properties) { + this.webClient = webClient; + this.properties = properties; + } + + @Override + public String name() { + return OP_NAME; + } + + @Override + public Optional> prepare(TableDto savedDto) { + if (!isOptedIn(savedDto)) { + return Optional.empty(); + } + Optional snapshot = savedDto.getCurrentSnapshot(); + if (!snapshot.isPresent()) { + return Optional.empty(); + } + + OptimizerStatsRequest body = buildRequest(savedDto, snapshot.get()); + String tableUuid = savedDto.getTableUUID(); + + RetrySpec retrySpec = + Retry.max(Math.max(0, properties.getMaxAttempts() - 1)).filter(this::isRetryable); + + Mono chain = + webClient + .put() + .uri(PATH_TEMPLATE, tableUuid) + .bodyValue(body) + .retrieve() + .toBodilessEntity() + .timeout(Duration.ofMillis(properties.getPerAttemptTimeoutMs())) + .retryWhen(retrySpec.onRetryExhaustedThrow((spec, signal) -> signal.failure())) + .then(); + return Optional.of(chain); + } + + /** {@code true} iff the table's properties contain the literal opt-in value {@code "true"}. */ + private boolean isOptedIn(TableDto saved) { + Map props = saved.getTableProperties(); + return props != null && "true".equals(props.get(OPT_IN_PROPERTY)); + } + + /** Build the wire body. Missing summary keys default to 0L. */ + OptimizerStatsRequest buildRequest(TableDto saved, CurrentSnapshotInfo snapshot) { + Map summary = snapshot.getSummary(); + OptimizerStatsRequest.Snapshot snapshotPayload = + OptimizerStatsRequest.Snapshot.builder() + .snapshotId(snapshot.getSnapshotId()) + .tableVersion(saved.getTableVersion()) + .tableLocation(saved.getTableLocation()) + .tableSizeBytes(longOrZero(summary, SUMMARY_TOTAL_FILES_SIZE)) + .numCurrentFiles(longOrZero(summary, SUMMARY_TOTAL_DATA_FILES)) + .build(); + OptimizerStatsRequest.Delta delta = + OptimizerStatsRequest.Delta.builder() + .numFilesAdded(longOrZero(summary, SUMMARY_ADDED_DATA_FILES)) + .numFilesDeleted(longOrZero(summary, SUMMARY_DELETED_DATA_FILES)) + .addedSizeBytes(longOrZero(summary, SUMMARY_ADDED_FILES_SIZE)) + .deletedSizeBytes(longOrZero(summary, SUMMARY_REMOVED_FILES_SIZE)) + .build(); + return OptimizerStatsRequest.builder() + .databaseName(saved.getDatabaseId()) + .tableName(saved.getTableId()) + .stats(OptimizerStatsRequest.Stats.builder().snapshot(snapshotPayload).delta(delta).build()) + .tableProperties(saved.getTableProperties()) + .build(); + } + + /** + * Retryable errors: per-attempt timeout, network-level failures, 5xx, 408, 429. Other 4xx are + * client errors — retrying won't fix them. + */ + boolean isRetryable(Throwable e) { + if (e instanceof TimeoutException) { + return true; + } + if (e instanceof WebClientRequestException) { + return true; + } + if (e instanceof WebClientResponseException) { + int code = ((WebClientResponseException) e).getStatusCode().value(); + return code == 408 || code == 429 || (code >= 500 && code < 600); + } + return false; + } + + private static long longOrZero(Map m, String key) { + if (m == null) { + return 0L; + } + String v = m.get(key); + if (v == null) { + return 0L; + } + try { + return Long.parseLong(v); + } catch (NumberFormatException nfe) { + return 0L; + } + } +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java index 7d746ee93..53b1844be 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java @@ -25,19 +25,16 @@ public class OptimizerStatsProperties { */ private String baseUri; - /** Per-attempt request timeout in milliseconds. Default 1000. */ - private long perAttemptTimeoutMs = 1000L; - /** - * Hard ceiling on total wall-clock time for the entire call (including retries) in milliseconds. - * Default 2000. When the outer timeout fires, the chain is cancelled and the error is swallowed. + * Per-attempt HTTP timeout in milliseconds. Bounds each individual attempt so retries can fit + * inside the dispatcher's outer per-op budget ({@code tables.postcommit.per-op-timeout-ms}, + * default 3000 ms). Default 1000 ms. */ - private long totalTimeoutMs = 2000L; + private long perAttemptTimeoutMs = 1000L; /** - * Total attempt count (initial try plus retries). Default 3. With 1000 ms per attempt and a 2000 - * ms outer ceiling, only about two attempts realistically fit on a slow path; the third is - * available if attempts return quickly. + * Total attempt count (initial try plus retries). Default 3. Retries fire only on retryable + * errors (network, timeout, 408/429/5xx); other 4xx fail fast without retry. */ private int maxAttempts = 3; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java index 68a8646da..df929f666 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java @@ -47,6 +47,12 @@ public static class Stats { @AllArgsConstructor @JsonInclude(JsonInclude.Include.NON_NULL) public static class Snapshot { + /** + * Iceberg snapshot ID. Sent so the optimizer can use it as an idempotency token on upsert and + * reject out-of-order replays — server-side wiring tracked in BDP-102985. + */ + private Long snapshotId; + private String tableVersion; private String tableLocation; private Long tableSizeBytes; diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java new file mode 100644 index 000000000..d205aee12 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java @@ -0,0 +1,147 @@ +package com.linkedin.openhouse.tables.services.postcommit; + +import com.linkedin.openhouse.common.metrics.MetricsConstant; +import com.linkedin.openhouse.tables.model.TableDto; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClientRequestException; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; + +/** + * Runs {@link PostCommitOperation}s after a successful Iceberg commit. Best-effort and bounded: + * each operation gets a wall-clock timeout, errors are swallowed after metric/log, and dispatch + * itself is fire-and-forget so the commit thread is never blocked on operation work. + * + *

Why async. A synchronous post-commit hook converts a downstream outage (the optimizer + * being slow or unavailable, a network glitch) into a Tables-Service write outage — the post-commit + * push is a best-effort scheduling signal, not a write-correctness step, and its blast radius must + * not include the write path. The crash-loss window (a JVM dying after commit and before the HTTP + * push completes) is acceptable because operations are designed to be cumulative: the next commit + * carries the same state forward and the consumer self-corrects from missing data. + * + *

Why the dispatcher owns timeouts. Operations only describe payload + endpoint. The + * timeout/swallow/subscribe machinery lives here once so individual operations stay small and so + * the contract across operations is uniform (one knob: {@code + * tables.postcommit.per-op-timeout-ms}). + * + *

Bean is only wired when {@code tables.postcommit.enabled=true}. + */ +@Slf4j +@Component +@EnableConfigurationProperties(PostCommitProperties.class) +@ConditionalOnProperty(prefix = "tables.postcommit", name = "enabled", havingValue = "true") +public class PostCommitDispatcher { + + private final List operations; + private final PostCommitProperties properties; + private final MeterRegistry meterRegistry; + + public PostCommitDispatcher( + List operations, + PostCommitProperties properties, + MeterRegistry meterRegistry) { + this.operations = operations; + this.properties = properties; + this.meterRegistry = meterRegistry; + } + + /** + * Dispatch all registered operations for {@code savedDto}. Returns immediately on the calling + * thread; each operation runs on its underlying reactive scheduler. Never throws. + */ + public void dispatch(TableDto savedDto) { + for (PostCommitOperation op : operations) { + decorate(op, savedDto).ifPresent(Mono::subscribe); + } + } + + /** + * Returns the fully-decorated {@link Mono} for {@code op} (per-op timeout, success / error metric + * emission, error swallow) without subscribing. Emits the {@code skipped} or {@code + * prepare_threw} metric synchronously and returns {@link Optional#empty()} in those cases. + * + *

Package-private so tests can {@code .block()} on the decorated chain rather than polling for + * metric emission after a fire-and-forget {@code subscribe()}. + */ + Optional> decorate(PostCommitOperation op, TableDto savedDto) { + Optional> work; + try { + work = op.prepare(savedDto); + } catch (RuntimeException e) { + // Defensive: a prepare() that throws synchronously must not break dispatch of later ops. + meterRegistry + .counter( + MetricsConstant.POSTCOMMIT_OP_FAILED, "op", op.name(), "outcome", "prepare_threw") + .increment(); + log.warn("Post-commit op {} prepare() threw {}", op.name(), e.toString()); + return Optional.empty(); + } + if (!work.isPresent()) { + meterRegistry.counter(MetricsConstant.POSTCOMMIT_OP_SKIPPED, "op", op.name()).increment(); + return Optional.empty(); + } + Timer.Sample sample = Timer.start(meterRegistry); + Mono decorated = + work.get() + .timeout(Duration.ofMillis(properties.getPerOpTimeoutMs())) + .doOnSuccess( + ignored -> + sample.stop( + meterRegistry.timer( + MetricsConstant.POSTCOMMIT_OP_DURATION, + "op", + op.name(), + "outcome", + "success"))) + .onErrorResume( + e -> { + String outcome = classifyOutcome(e); + sample.stop( + meterRegistry.timer( + MetricsConstant.POSTCOMMIT_OP_DURATION, + "op", + op.name(), + "outcome", + outcome)); + meterRegistry + .counter( + MetricsConstant.POSTCOMMIT_OP_FAILED, "op", op.name(), "outcome", outcome) + .increment(); + log.warn("Post-commit op {} failed ({}): {}", op.name(), outcome, e.toString()); + return Mono.empty(); + }); + return Optional.of(decorated); + } + + /** + * Map a terminal error to a small set of outcome tags. Kept here so all operations share the same + * taxonomy. + */ + private static String classifyOutcome(Throwable e) { + if (e instanceof TimeoutException) { + return "timeout"; + } + if (e instanceof WebClientRequestException) { + return "network_error"; + } + if (e instanceof WebClientResponseException) { + int code = ((WebClientResponseException) e).getStatusCode().value(); + if (code >= 500 && code < 600) { + return "server_error"; + } + if (code >= 400 && code < 500) { + return "client_error"; + } + } + return "unknown_error"; + } +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java new file mode 100644 index 000000000..5ba49b371 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java @@ -0,0 +1,39 @@ +package com.linkedin.openhouse.tables.services.postcommit; + +import com.linkedin.openhouse.tables.model.TableDto; +import java.util.Optional; +import reactor.core.publisher.Mono; + +/** + * A best-effort, bounded, asynchronous action run by the Tables Service after a successful Iceberg + * commit. Operations are invoked by {@link PostCommitDispatcher} and are subject to a single per-op + * wall-clock timeout owned by the dispatcher. Operations never affect commit correctness: any error + * they signal is recorded as a metric and a log line, then swallowed. + * + *

Implementations describe what to do; the dispatcher owns how it runs + * (subscription, timeout, error handling, metric emission). Each operation contributes: + * + *

    + *
  • {@link #name()} — short identifier used as a metric tag. + *
  • {@link #prepare(TableDto)} — returns the work to perform, or {@link Optional#empty()} when + * the operation does not apply to this commit (e.g. table not opted in, no committed + * snapshot). Returning empty is a normal outcome; the dispatcher emits a {@code skipped} + * metric and moves on. + *
+ * + *

The returned {@link Mono} must not subscribe itself, apply its own outer timeout, or swallow + * errors — those concerns belong to the dispatcher. Implementations may apply internal retries with + * bounded budgets. + */ +public interface PostCommitOperation { + + /** Short, stable identifier for this operation. Used as the {@code op} metric tag. */ + String name(); + + /** + * Build the work to run for {@code savedDto}, or return {@link Optional#empty()} if this + * operation does not apply. The dispatcher only invokes the returned {@link Mono} after applying + * its own timeout / error-swallow plumbing. + */ + Optional> prepare(TableDto savedDto); +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java new file mode 100644 index 000000000..8a153cd79 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java @@ -0,0 +1,31 @@ +package com.linkedin.openhouse.tables.services.postcommit; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Configuration for the Tables Service post-commit operation framework. Property prefix: {@code + * tables.postcommit}. + * + *

When {@code enabled} is {@code false} (the default), no dispatcher bean is constructed and + * registered {@link PostCommitOperation} beans, if any, are never invoked. Per-OH-instance roll-out + * flips a single flag. + */ +@ConfigurationProperties("tables.postcommit") +@Getter +@Setter +public class PostCommitProperties { + + /** Master switch. When {@code false}, no operations are dispatched on commit. */ + private boolean enabled = false; + + /** + * Wall-clock ceiling applied by the dispatcher to each operation's {@link + * PostCommitOperation#prepare(com.linkedin.openhouse.tables.model.TableDto)} {@code Mono}. Bounds + * how long a misbehaving operation can keep resources (connections, threads) tied up. Does not + * bound commit-thread latency — operations run on the executing reactive scheduler, not on the + * commit thread. Default 3000 ms. + */ + private long perOpTimeoutMs = 3000L; +} diff --git a/services/tables/src/main/resources/application.properties b/services/tables/src/main/resources/application.properties index ddf4c3022..24f5554c5 100644 --- a/services/tables/src/main/resources/application.properties +++ b/services/tables/src/main/resources/application.properties @@ -25,10 +25,14 @@ management.metrics.distribution.maximum-expected-value.http.server.requests=600s server.shutdown=graceful spring.lifecycle.timeout-per-shutdown-phase=60s -# Post-commit Tables -> Optimizer stats push. -# Disabled by default; environments that run the optimizer opt in via env vars. +# Post-commit operation framework. Disabled by default; flip per OH instance. +# Outer per-op wall-clock budget is enforced by PostCommitDispatcher. +tables.postcommit.enabled=${TABLES_POSTCOMMIT_ENABLED:false} +tables.postcommit.per-op-timeout-ms=${TABLES_POSTCOMMIT_PER_OP_TIMEOUT_MS:3000} + +# Optimizer stats post-commit operation (one impl of PostCommitOperation). +# Both flags must be true for the push to be wired and dispatched. optimizer.stats.enabled=${OPTIMIZER_STATS_ENABLED:false} optimizer.stats.base-uri=${OPTIMIZER_STATS_BASE_URI:} optimizer.stats.per-attempt-timeout-ms=${OPTIMIZER_STATS_PER_ATTEMPT_TIMEOUT_MS:1000} -optimizer.stats.total-timeout-ms=${OPTIMIZER_STATS_TOTAL_TIMEOUT_MS:2000} optimizer.stats.max-attempts=${OPTIMIZER_STATS_MAX_ATTEMPTS:3} \ No newline at end of file diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java index 80f9233af..648ae1871 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java @@ -11,14 +11,14 @@ import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies; import com.linkedin.openhouse.tables.dto.mapper.TablesMapper; import com.linkedin.openhouse.tables.dto.mapper.TablesMapperImpl; +import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo; import com.linkedin.openhouse.tables.model.TableDto; import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey; import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository; import com.linkedin.openhouse.tables.services.IcebergSnapshotsService; -import com.linkedin.openhouse.tables.services.optimizer.OptimizerStatsClient; +import com.linkedin.openhouse.tables.services.postcommit.PostCommitDispatcher; import com.linkedin.openhouse.tables.utils.TableUUIDGenerator; import java.util.Collections; -import java.util.Map; import java.util.Optional; import java.util.UUID; import org.apache.iceberg.exceptions.BadRequestException; @@ -49,12 +49,11 @@ public class IcebergSnapshotsServiceTest { @MockBean private TableUUIDGenerator tableUUIDGenerator; /** - * Forces the optional optimizer-stats client into the service so that we can verify the - * post-commit push is invoked. Production wiring is conditional on {@code - * optimizer.stats.enabled=true}; this {@code @MockBean} bypasses that condition for the one test - * that exercises the hook. + * Forces the optional post-commit dispatcher into the service so that we can verify it is invoked + * after a successful save. Production wiring is conditional on {@code + * tables.postcommit.enabled=true}; this {@code @MockBean} bypasses that condition. */ - @MockBean private OptimizerStatsClient optimizerStatsClient; + @MockBean private PostCommitDispatcher postCommitDispatcher; private OpenHouseInternalRepository mockRepository; @@ -102,20 +101,20 @@ public void testTableCreated() { } @Test - public void testOptimizerStatsClientInvokedAfterSuccessfulCommit() { + public void testPostCommitDispatcherInvokedAfterSuccessfulCommit() { final IcebergSnapshotsRequestBody requestBody = TEST_ICEBERG_SNAPSHOTS_INITIAL_VERSION_REQUEST_BODY; final String dbId = requestBody.getCreateUpdateTableRequestBody().getDatabaseId(); final String tableId = requestBody.getCreateUpdateTableRequestBody().getTableId(); final TableDtoPrimaryKey key = TableDtoPrimaryKey.builder().databaseId(dbId).tableId(tableId).build(); - final Map summary = Collections.singletonMap("total-data-files", "7"); - final TableDto savedDto = - TableDto.builder() - .databaseId(dbId) - .tableId(tableId) - .currentSnapshotSummary(summary) + final CurrentSnapshotInfo snapshot = + CurrentSnapshotInfo.builder() + .snapshotId(42L) + .summary(Collections.singletonMap("total-data-files", "7")) .build(); + final TableDto savedDto = + TableDto.builder().databaseId(dbId).tableId(tableId).currentSnapshot(snapshot).build(); Mockito.when(tableUUIDGenerator.generateUUID(Mockito.any(IcebergSnapshotsRequestBody.class))) .thenReturn(UUID.randomUUID()); @@ -124,7 +123,7 @@ public void testOptimizerStatsClientInvokedAfterSuccessfulCommit() { service.putIcebergSnapshots(dbId, tableId, requestBody, TEST_TABLE_CREATOR); - Mockito.verify(optimizerStatsClient, Mockito.times(1)).report(savedDto, summary); + Mockito.verify(postCommitDispatcher, Mockito.times(1)).dispatch(savedDto); } @Test diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java index 64aefd19b..53fb633e3 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java @@ -34,7 +34,7 @@ public class TableDtoMappingTest { "snapshotRefs", "policies", "tableType", - "currentSnapshotSummary"); + "currentSnapshot"); /** Making all fields making it to map is expected, and all expected field are making it there. */ @Test diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClientTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperationTest.java similarity index 53% rename from services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClientTest.java rename to services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperationTest.java index 33ea0d197..417db2abf 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsClientTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperationTest.java @@ -3,51 +3,49 @@ import static org.assertj.core.api.Assertions.assertThat; import com.fasterxml.jackson.databind.ObjectMapper; -import com.linkedin.openhouse.common.metrics.MetricsConstant; +import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo; import com.linkedin.openhouse.tables.model.TableDto; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.IOException; import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; -import okhttp3.mockwebserver.SocketPolicy; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; -class OptimizerStatsClientTest { +class OptimizerStatsPostCommitOperationTest { private static final String TABLE_UUID = "uuid-1"; private static final String DB = "db1"; private static final String TABLE = "tbl1"; + private static final long SNAPSHOT_ID = 9876543210L; private static final Duration BLOCK_MAX = Duration.ofSeconds(5); private MockWebServer server; - private MeterRegistry meterRegistry; private OptimizerStatsProperties properties; - private OptimizerStatsClient client; + private OptimizerStatsPostCommitOperation op; private final ObjectMapper mapper = new ObjectMapper(); @BeforeEach void setUp() throws IOException { server = new MockWebServer(); server.start(); - meterRegistry = new SimpleMeterRegistry(); properties = new OptimizerStatsProperties(); properties.setEnabled(true); properties.setBaseUri(server.url("/").toString()); properties.setPerAttemptTimeoutMs(1000L); - properties.setTotalTimeoutMs(2000L); properties.setMaxAttempts(3); - client = new OptimizerStatsClient(buildWebClient(), meterRegistry, properties); + op = new OptimizerStatsPostCommitOperation(buildWebClient(), properties); } @AfterEach @@ -56,25 +54,31 @@ void tearDown() throws IOException { } private WebClient buildWebClient() { - HttpClient httpClient = HttpClient.create(); return WebClient.builder() .baseUrl(properties.getBaseUri()) - .clientConnector(new ReactorClientHttpConnector(httpClient)) + .clientConnector(new ReactorClientHttpConnector(HttpClient.create())) .build(); } @Test - void report_optedIn_sendsRequestWithPayloadFieldsFromSummary() throws Exception { + void name_isStableTagValue() { + assertThat(op.name()).isEqualTo("optimizer_stats"); + } + + @Test + void prepare_optedIn_emitsRequestWithFullPayload() throws Exception { server.enqueue(new MockResponse().setResponseCode(200)); TableDto saved = optedInBuilder() .tableVersion("v3") .tableLocation("/data/tables/db1/tbl1") - .currentSnapshotSummary(summary()) + .currentSnapshot(snapshot()) .build(); - client.reportAsync(saved, saved.getCurrentSnapshotSummary()).block(BLOCK_MAX); + Optional> work = op.prepare(saved); + assertThat(work).isPresent(); + work.get().block(BLOCK_MAX); RecordedRequest req = server.takeRequest(2, TimeUnit.SECONDS); assertThat(req).isNotNull(); @@ -91,6 +95,7 @@ void report_optedIn_sendsRequestWithPayloadFieldsFromSummary() throws Exception @SuppressWarnings("unchecked") Map delta = (Map) stats.get("delta"); assertThat(snapshot) + .containsEntry("snapshotId", SNAPSHOT_ID) .containsEntry("tableVersion", "v3") .containsEntry("tableLocation", "/data/tables/db1/tbl1") .containsEntry("tableSizeBytes", 4096) @@ -100,116 +105,80 @@ void report_optedIn_sendsRequestWithPayloadFieldsFromSummary() throws Exception .containsEntry("numFilesDeleted", 2) .containsEntry("addedSizeBytes", 2048) .containsEntry("deletedSizeBytes", 1024); - - assertThat( - meterRegistry - .counter(MetricsConstant.OPTIMIZER_STATS_ATTEMPTS, "outcome", "success") - .count()) - .isEqualTo(1.0); } @Test - void report_notOptedIn_skipsCallAndIncrementsSkippedCounter() { - TableDto saved = builderWithoutOptIn().currentSnapshotSummary(summary()).build(); - - client.reportAsync(saved, saved.getCurrentSnapshotSummary()).block(BLOCK_MAX); - + void prepare_notOptedIn_returnsEmpty() { + TableDto saved = builderWithoutOptIn().currentSnapshot(snapshot()).build(); + assertThat(op.prepare(saved)).isEmpty(); assertThat(server.getRequestCount()).isZero(); - assertThat( - meterRegistry - .counter(MetricsConstant.OPTIMIZER_STATS_SKIPPED, "reason", "opt_out") - .count()) - .isEqualTo(1.0); } @Test - void report_noSnapshot_skipsCallAndIncrementsSkippedCounter() { - TableDto saved = optedInBuilder().currentSnapshotSummary(null).build(); - - client.reportAsync(saved, null).block(BLOCK_MAX); - + void prepare_noSnapshot_returnsEmpty() { + TableDto saved = optedInBuilder().currentSnapshot(null).build(); + assertThat(op.prepare(saved)).isEmpty(); assertThat(server.getRequestCount()).isZero(); - assertThat( - meterRegistry - .counter(MetricsConstant.OPTIMIZER_STATS_SKIPPED, "reason", "no_snapshot") - .count()) - .isEqualTo(1.0); } @Test - void report_5xxThenSuccess_retriesAndSucceeds() { + void prepare_5xxThenSuccess_retriesAndCompletes() { server.enqueue(new MockResponse().setResponseCode(503)); server.enqueue(new MockResponse().setResponseCode(200)); - TableDto saved = optedInBuilder().currentSnapshotSummary(summary()).build(); - client.reportAsync(saved, saved.getCurrentSnapshotSummary()).block(BLOCK_MAX); + TableDto saved = optedInBuilder().currentSnapshot(snapshot()).build(); + op.prepare(saved).orElseThrow(AssertionError::new).block(BLOCK_MAX); assertThat(server.getRequestCount()).isEqualTo(2); - assertThat( - meterRegistry - .counter(MetricsConstant.OPTIMIZER_STATS_ATTEMPTS, "outcome", "success") - .count()) - .isEqualTo(1.0); - assertThat( - meterRegistry - .counter(MetricsConstant.OPTIMIZER_STATS_ATTEMPTS, "outcome", "retryable_failure") - .count()) - .isEqualTo(1.0); } @Test - void report_allAttemptsFail_swallowsAndIncrementsFailedFinal() { + void prepare_allAttemptsFail_propagatesUnderlyingError() { properties.setMaxAttempts(2); server.enqueue(new MockResponse().setResponseCode(503)); server.enqueue(new MockResponse().setResponseCode(503)); - TableDto saved = optedInBuilder().currentSnapshotSummary(summary()).build(); - client.reportAsync(saved, saved.getCurrentSnapshotSummary()).block(BLOCK_MAX); + TableDto saved = optedInBuilder().currentSnapshot(snapshot()).build(); + Mono chain = op.prepare(saved).orElseThrow(AssertionError::new); + assertThatChainErrors(chain, WebClientResponseException.class); assertThat(server.getRequestCount()).isEqualTo(2); - assertThat( - meterRegistry - .counter(MetricsConstant.OPTIMIZER_STATS_FAILED_FINAL, "outcome", "server_error") - .count()) - .isEqualTo(1.0); } @Test - void report_4xxNonRetryable_doesNotRetry() { + void prepare_4xxNonRetryable_doesNotRetry() { server.enqueue(new MockResponse().setResponseCode(400)); - TableDto saved = optedInBuilder().currentSnapshotSummary(summary()).build(); - client.reportAsync(saved, saved.getCurrentSnapshotSummary()).block(BLOCK_MAX); + TableDto saved = optedInBuilder().currentSnapshot(snapshot()).build(); + Mono chain = op.prepare(saved).orElseThrow(AssertionError::new); + assertThatChainErrors(chain, WebClientResponseException.class); assertThat(server.getRequestCount()).isEqualTo(1); - assertThat( - meterRegistry - .counter(MetricsConstant.OPTIMIZER_STATS_FAILED_FINAL, "outcome", "client_error") - .count()) - .isEqualTo(1.0); } - @Test - void report_perAttemptTimeoutTrips_recordsTimeout() { - properties.setPerAttemptTimeoutMs(150L); - properties.setTotalTimeoutMs(600L); - properties.setMaxAttempts(1); - client = new OptimizerStatsClient(buildWebClient(), meterRegistry, properties); - // Connection opens but the server never sends any response — forces Reactor's per-attempt - // .timeout(150ms) to fire with TimeoutException. - server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.NO_RESPONSE)); + // ---- helpers ---- - TableDto saved = optedInBuilder().currentSnapshotSummary(summary()).build(); - client.reportAsync(saved, saved.getCurrentSnapshotSummary()).block(BLOCK_MAX); + private void assertThatChainErrors(Mono chain, Class errorType) { + try { + chain.block(BLOCK_MAX); + throw new AssertionError("expected chain to signal an error of type " + errorType); + } catch (RuntimeException e) { + Throwable cause = unwrap(e); + assertThat(cause).isInstanceOf(errorType); + } + } - assertThat( - meterRegistry - .counter(MetricsConstant.OPTIMIZER_STATS_FAILED_FINAL, "outcome", "timeout") - .count()) - .isEqualTo(1.0); + private static Throwable unwrap(Throwable t) { + Throwable cur = t; + while (cur.getCause() != null && cur.getCause() != cur) { + cur = cur.getCause(); + } + return cur; } - // ---- helpers ---- + private static CurrentSnapshotInfo snapshot() { + return CurrentSnapshotInfo.builder().snapshotId(SNAPSHOT_ID).summary(summary()).build(); + } private static Map summary() { Map s = new HashMap<>(); @@ -224,7 +193,7 @@ private static Map summary() { private static TableDto.TableDtoBuilder optedInBuilder() { Map props = new HashMap<>(); - props.put(OptimizerStatsClient.OPT_IN_PROPERTY, "true"); + props.put(OptimizerStatsPostCommitOperation.OPT_IN_PROPERTY, "true"); return TableDto.builder() .tableUUID(TABLE_UUID) .databaseId(DB) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcherTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcherTest.java new file mode 100644 index 000000000..14b6f2dde --- /dev/null +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcherTest.java @@ -0,0 +1,177 @@ +package com.linkedin.openhouse.tables.services.postcommit; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.linkedin.openhouse.common.metrics.MetricsConstant; +import com.linkedin.openhouse.tables.model.TableDto; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.time.Duration; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +class PostCommitDispatcherTest { + + private static final Duration BLOCK_MAX = Duration.ofSeconds(5); + + private MeterRegistry meterRegistry; + private PostCommitProperties properties; + private final TableDto savedDto = TableDto.builder().databaseId("db").tableId("t").build(); + + @BeforeEach + void setUp() { + meterRegistry = new SimpleMeterRegistry(); + properties = new PostCommitProperties(); + properties.setEnabled(true); + properties.setPerOpTimeoutMs(500L); + } + + private PostCommitDispatcher dispatcherWith(PostCommitOperation... ops) { + return new PostCommitDispatcher(Arrays.asList(ops), properties, meterRegistry); + } + + @Test + void dispatch_invokesEveryRegisteredOperationsPrepare() { + AtomicInteger aCount = new AtomicInteger(); + AtomicInteger bCount = new AtomicInteger(); + dispatcherWith(new EmptyOp("a", aCount), new EmptyOp("b", bCount)).dispatch(savedDto); + + assertThat(aCount.get()).isEqualTo(1); + assertThat(bCount.get()).isEqualTo(1); + } + + @Test + void decorate_emptyPrepare_incrementsSkippedAndReturnsEmpty() { + PostCommitOperation op = new EmptyOp("opx", new AtomicInteger()); + + Optional> decorated = dispatcherWith(op).decorate(op, savedDto); + + assertThat(decorated).isEmpty(); + assertThat(meterRegistry.counter(MetricsConstant.POSTCOMMIT_OP_SKIPPED, "op", "opx").count()) + .isEqualTo(1.0); + } + + @Test + void decorate_successfulWork_recordsDurationWithSuccessOutcome() { + PostCommitOperation op = new SimpleOp("ok", Mono::empty); + + dispatcherWith(op).decorate(op, savedDto).orElseThrow(AssertionError::new).block(BLOCK_MAX); + + assertThat( + meterRegistry + .timer(MetricsConstant.POSTCOMMIT_OP_DURATION, "op", "ok", "outcome", "success") + .count()) + .isEqualTo(1L); + } + + @Test + void decorate_workSignalsError_incrementsFailedWithClassifiedOutcomeAndSwallowsError() { + RuntimeException boom = new RuntimeException("boom"); + PostCommitOperation op = new SimpleOp("broke", () -> Mono.error(boom)); + + // Dispatcher swallows the error — block() returns normally. + dispatcherWith(op).decorate(op, savedDto).orElseThrow(AssertionError::new).block(BLOCK_MAX); + + assertThat( + meterRegistry + .counter( + MetricsConstant.POSTCOMMIT_OP_FAILED, "op", "broke", "outcome", "unknown_error") + .count()) + .isEqualTo(1.0); + } + + @Test + void decorate_prepareThrowsSynchronously_incrementsFailedAndDispatchContinuesToLaterOps() { + AtomicInteger laterCount = new AtomicInteger(); + PostCommitOperation thrower = + new PostCommitOperation() { + @Override + public String name() { + return "thrower"; + } + + @Override + public Optional> prepare(TableDto dto) { + throw new IllegalStateException("nope"); + } + }; + PostCommitOperation later = new EmptyOp("later", laterCount); + + dispatcherWith(thrower, later).dispatch(savedDto); + + assertThat( + meterRegistry + .counter( + MetricsConstant.POSTCOMMIT_OP_FAILED, + "op", + "thrower", + "outcome", + "prepare_threw") + .count()) + .isEqualTo(1.0); + assertThat(laterCount.get()) + .as("later operations must still run after a synchronous prepare() throw") + .isEqualTo(1); + } + + @Test + void decorate_workExceedsPerOpTimeout_incrementsFailedWithTimeoutOutcome() { + properties.setPerOpTimeoutMs(50L); + PostCommitOperation op = new SimpleOp("slow", Mono::never); + + dispatcherWith(op).decorate(op, savedDto).orElseThrow(AssertionError::new).block(BLOCK_MAX); + + assertThat( + meterRegistry + .counter(MetricsConstant.POSTCOMMIT_OP_FAILED, "op", "slow", "outcome", "timeout") + .count()) + .isEqualTo(1.0); + } + + // ---- helpers ---- + + private static class EmptyOp implements PostCommitOperation { + private final String name; + private final AtomicInteger calls; + + EmptyOp(String name, AtomicInteger calls) { + this.name = name; + this.calls = calls; + } + + @Override + public String name() { + return name; + } + + @Override + public Optional> prepare(TableDto dto) { + calls.incrementAndGet(); + return Optional.empty(); + } + } + + private static class SimpleOp implements PostCommitOperation { + private final String name; + private final java.util.function.Supplier> work; + + SimpleOp(String name, java.util.function.Supplier> work) { + this.name = name; + this.work = work; + } + + @Override + public String name() { + return name; + } + + @Override + public Optional> prepare(TableDto dto) { + return Optional.of(work.get()); + } + } +} From 119c609f9cd36a07bdcd8a2be2992d03e3e332dc Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Wed, 27 May 2026 14:49:25 -0700 Subject: [PATCH 3/5] docs: collapse

/

    // noise in postcommit javadoc Per review feedback that the HTML tags made the diff ugly. Drop paragraph breaks where the comment can be one sentence/paragraph, drop bullet lists in favor of inline prose, drop / emphasis entirely. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../tables/model/CurrentSnapshotInfo.java | 12 +++---- .../openhouse/tables/model/TableDto.java | 3 +- .../optimizer/OptimizerStatsConfig.java | 17 +++++----- .../OptimizerStatsPostCommitOperation.java | 20 ++++------- .../optimizer/OptimizerStatsProperties.java | 7 ++-- .../optimizer/OptimizerStatsRequest.java | 7 ++-- .../postcommit/PostCommitDispatcher.java | 33 ++++++------------- .../postcommit/PostCommitOperation.java | 28 ++++------------ .../postcommit/PostCommitProperties.java | 19 ++++------- 9 files changed, 50 insertions(+), 96 deletions(-) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java index 9f363b4ec..74644ac3f 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java @@ -7,13 +7,11 @@ import lombok.Getter; /** - * In-memory snapshot of the Iceberg current-snapshot metadata that was loaded alongside a {@link - * TableDto}. Carries only fields already materialized by the catalog client — never triggers a - * separate HDFS / object-store read. - * - *

    Present whenever the table has at least one committed snapshot at construction time; absent - * (modeled as {@link java.util.Optional#empty()} on {@link TableDto}) when the table has no - * committed data — e.g. a {@code CREATE TABLE} with no rows yet. + * In-memory snapshot of the Iceberg current-snapshot metadata loaded alongside a {@link TableDto}. + * Carries only fields already materialized by the catalog client — never triggers a separate HDFS + * or object-store read. Present whenever the table has at least one committed snapshot at + * construction time; absent (modeled as {@link java.util.Optional#empty()} on {@link TableDto}) for + * tables with no committed data, e.g. a {@code CREATE TABLE} with no rows yet. */ @Getter @Builder diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java index 16608f286..bd44653e7 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java @@ -89,8 +89,7 @@ public class TableDto { * In-memory current-snapshot metadata captured when this {@code TableDto} was built from an * Iceberg {@code Table}. Present whenever the underlying table has at least one committed * snapshot at that point; absent for tables with no committed data (e.g. {@code CREATE TABLE} - * with no rows). Not persisted, not part of equality. Stored nullable internally; consumers must - * read through {@link #getCurrentSnapshot()} to get the {@link Optional}. + * with no rows). Not persisted, not part of equality. Read through {@link #getCurrentSnapshot()}. */ @Getter(AccessLevel.NONE) @Transient diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java index 8061fb644..351ac604d 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java @@ -9,9 +9,9 @@ import reactor.netty.http.client.HttpClient; /** - * Wiring for the post-commit Tables → Optimizer stats push. Only active when {@code - * optimizer.stats.enabled=true}; otherwise no WebClient or client bean is constructed and the - * on-commit hook in {@code IcebergSnapshotsServiceImpl} is a no-op. + * Wiring for the post-commit Tables → Optimizer stats push. Active only when {@code + * optimizer.stats.enabled=true}; otherwise no WebClient bean is constructed and the dispatcher sees + * no optimizer-stats operation. */ @Configuration @EnableConfigurationProperties(OptimizerStatsProperties.class) @@ -19,12 +19,11 @@ public class OptimizerStatsConfig { /** - * Dedicated WebClient for the optimizer stats endpoint. Per-attempt and outer timeouts are - * applied at the call site on the Reactor chain (per-attempt in {@link - * OptimizerStatsPostCommitOperation}, outer per-op in the {@code PostCommitDispatcher}) — they - * are not configured on the underlying Netty client so that the timeout always emerges as a - * standard {@link java.util.concurrent.TimeoutException} (not a Netty {@code - * ReadTimeoutException}), which keeps the dispatcher's outcome classification simple. + * Dedicated WebClient for the optimizer stats endpoint. Per-attempt timeout is applied on the + * Reactor chain in {@link OptimizerStatsPostCommitOperation} and the outer per-op timeout in + * {@code PostCommitDispatcher}; neither is configured on the Netty client so the timeout always + * emerges as a standard {@link java.util.concurrent.TimeoutException} rather than a Netty {@code + * ReadTimeoutException}, keeping the dispatcher's outcome classification simple. */ @Bean("optimizerStatsWebClient") public WebClient optimizerStatsWebClient(OptimizerStatsProperties properties) { diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java index 0a82d8598..eb858dece 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java @@ -22,19 +22,13 @@ * {@link PostCommitOperation} that PUTs a snapshot-stats record to the optimizer's per-table stats * endpoint. {@link #prepare(TableDto)} returns a {@link Mono} that completes on HTTP 2xx and * signals an error otherwise; the dispatcher owns timeout, subscription, error swallowing, and - * metric emission. - * - *

    Skipped (operation returns {@link Optional#empty()}) when the table is not opted in via the - * {@link #OPT_IN_PROPERTY} table property, or when no current snapshot is present (e.g. {@code - * CREATE TABLE} with no rows yet). - * - *

    Internal retry: bounded by {@link OptimizerStatsProperties#getMaxAttempts()}, fires only on - * retryable errors (network, {@link TimeoutException}, HTTP 408 / 429 / 5xx). The dispatcher's - * outer per-op timeout is the hard ceiling on the whole chain. - * - *

    Bean is only wired when {@code optimizer.stats.enabled=true}. The path constant is - * intentionally duplicated from {@code TableStatsController.TABLE_PATH_TEMPLATE}; keep in sync. - * Tables service does not take a compile-time dependency on the optimizer service jar. + * metric emission. Returns {@link Optional#empty()} when the table is not opted in via {@link + * #OPT_IN_PROPERTY} or has no current snapshot. Internal retry is bounded by {@link + * OptimizerStatsProperties#getMaxAttempts()} and fires only on retryable errors (network, {@link + * TimeoutException}, HTTP 408 / 429 / 5xx); the dispatcher's per-op timeout is the hard ceiling. + * Bean wired only when {@code optimizer.stats.enabled=true}. Path constant is intentionally + * duplicated from {@code TableStatsController.TABLE_PATH_TEMPLATE} (keep in sync) so the tables + * service does not take a compile-time dependency on the optimizer jar. */ @Slf4j @Component diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java index 53b1844be..ca2b78a2d 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java @@ -6,10 +6,9 @@ /** * Configuration for the post-commit stats push from the Tables Service to the Optimizer's stats - * endpoint. Property prefix: {@code optimizer.stats}. - * - *

    When {@code enabled} is false (the default), no client bean is constructed and the push is a - * no-op on every commit. Environments that want the feed turn it on and set {@link #baseUri}. + * endpoint. Property prefix {@code optimizer.stats}. When {@code enabled} is false (the default), + * no client bean is constructed and the operation is absent from the dispatcher. Environments that + * want the feed turn it on and set {@link #baseUri}. */ @ConfigurationProperties("optimizer.stats") @Getter diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java index df929f666..2496436d3 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java @@ -9,11 +9,8 @@ /** * Wire body for {@code PUT /v1/optimizer/stats/{tableUuid}}. Mirrors the optimizer's {@code - * UpsertTableStatsRequest} field-for-field — see {@code - * services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpsertTableStatsRequest.java}. - * - *

    Tables service owns its own copy so that the wire contract is explicit at the call site and - * the optimizer client jar is not a compile-time dependency. + * UpsertTableStatsRequest} field-for-field. Duplicated here so the tables service does not take a + * compile-time dependency on the optimizer jar; keep in sync with that type. */ @Data @Builder diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java index d205aee12..44c8a166b 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java @@ -17,23 +17,12 @@ import reactor.core.publisher.Mono; /** - * Runs {@link PostCommitOperation}s after a successful Iceberg commit. Best-effort and bounded: - * each operation gets a wall-clock timeout, errors are swallowed after metric/log, and dispatch - * itself is fire-and-forget so the commit thread is never blocked on operation work. - * - *

    Why async. A synchronous post-commit hook converts a downstream outage (the optimizer - * being slow or unavailable, a network glitch) into a Tables-Service write outage — the post-commit - * push is a best-effort scheduling signal, not a write-correctness step, and its blast radius must - * not include the write path. The crash-loss window (a JVM dying after commit and before the HTTP - * push completes) is acceptable because operations are designed to be cumulative: the next commit - * carries the same state forward and the consumer self-corrects from missing data. - * - *

    Why the dispatcher owns timeouts. Operations only describe payload + endpoint. The - * timeout/swallow/subscribe machinery lives here once so individual operations stay small and so - * the contract across operations is uniform (one knob: {@code - * tables.postcommit.per-op-timeout-ms}). - * - *

    Bean is only wired when {@code tables.postcommit.enabled=true}. + * Runs {@link PostCommitOperation}s after a successful Iceberg commit. Each operation gets a + * wall-clock timeout ({@code tables.postcommit.per-op-timeout-ms}), errors are swallowed after + * metric/log, and dispatch is fire-and-forget so the commit thread is never blocked. Operations + * describe payload and endpoint only — the timeout/swallow/subscribe machinery lives here so the + * contract across operations is uniform. Bean wired only when {@code + * tables.postcommit.enabled=true}. */ @Slf4j @Component @@ -65,12 +54,10 @@ public void dispatch(TableDto savedDto) { } /** - * Returns the fully-decorated {@link Mono} for {@code op} (per-op timeout, success / error metric - * emission, error swallow) without subscribing. Emits the {@code skipped} or {@code - * prepare_threw} metric synchronously and returns {@link Optional#empty()} in those cases. - * - *

    Package-private so tests can {@code .block()} on the decorated chain rather than polling for - * metric emission after a fire-and-forget {@code subscribe()}. + * Returns the fully-decorated {@link Mono} for {@code op} (per-op timeout, success/error metric + * emission, error swallow) without subscribing. Emits {@code skipped} or {@code prepare_threw} + * synchronously and returns {@link Optional#empty()} in those cases. Package-private so tests can + * {@code .block()} on the chain rather than poll for metric emission. */ Optional> decorate(PostCommitOperation op, TableDto savedDto) { Optional> work; diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java index 5ba49b371..b94804b34 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java @@ -6,24 +6,10 @@ /** * A best-effort, bounded, asynchronous action run by the Tables Service after a successful Iceberg - * commit. Operations are invoked by {@link PostCommitDispatcher} and are subject to a single per-op - * wall-clock timeout owned by the dispatcher. Operations never affect commit correctness: any error - * they signal is recorded as a metric and a log line, then swallowed. - * - *

    Implementations describe what to do; the dispatcher owns how it runs - * (subscription, timeout, error handling, metric emission). Each operation contributes: - * - *

      - *
    • {@link #name()} — short identifier used as a metric tag. - *
    • {@link #prepare(TableDto)} — returns the work to perform, or {@link Optional#empty()} when - * the operation does not apply to this commit (e.g. table not opted in, no committed - * snapshot). Returning empty is a normal outcome; the dispatcher emits a {@code skipped} - * metric and moves on. - *
    - * - *

    The returned {@link Mono} must not subscribe itself, apply its own outer timeout, or swallow - * errors — those concerns belong to the dispatcher. Implementations may apply internal retries with - * bounded budgets. + * commit. Operations are invoked by {@link PostCommitDispatcher}, which owns the per-op timeout, + * subscription, error swallowing, and metric emission. An error from prepare() is recorded and + * dropped — it never affects commit correctness. Implementations may apply internal retries with + * bounded budgets but must not subscribe themselves or apply an outer timeout. */ public interface PostCommitOperation { @@ -31,9 +17,9 @@ public interface PostCommitOperation { String name(); /** - * Build the work to run for {@code savedDto}, or return {@link Optional#empty()} if this - * operation does not apply. The dispatcher only invokes the returned {@link Mono} after applying - * its own timeout / error-swallow plumbing. + * Build the work to run for {@code savedDto}, or {@link Optional#empty()} when the operation does + * not apply (e.g. table not opted in, no committed snapshot). The dispatcher records empty as a + * {@code skipped} metric. */ Optional> prepare(TableDto savedDto); } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java index 8a153cd79..bc757d37f 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java @@ -5,27 +5,22 @@ import org.springframework.boot.context.properties.ConfigurationProperties; /** - * Configuration for the Tables Service post-commit operation framework. Property prefix: {@code - * tables.postcommit}. - * - *

    When {@code enabled} is {@code false} (the default), no dispatcher bean is constructed and - * registered {@link PostCommitOperation} beans, if any, are never invoked. Per-OH-instance roll-out - * flips a single flag. + * Configuration for the Tables Service post-commit operation framework. Property prefix {@code + * tables.postcommit}. When {@code enabled} is false (the default), no dispatcher bean is + * constructed and registered {@link PostCommitOperation} beans are never invoked. */ @ConfigurationProperties("tables.postcommit") @Getter @Setter public class PostCommitProperties { - /** Master switch. When {@code false}, no operations are dispatched on commit. */ + /** Master switch. When false, no operations are dispatched on commit. */ private boolean enabled = false; /** - * Wall-clock ceiling applied by the dispatcher to each operation's {@link - * PostCommitOperation#prepare(com.linkedin.openhouse.tables.model.TableDto)} {@code Mono}. Bounds - * how long a misbehaving operation can keep resources (connections, threads) tied up. Does not - * bound commit-thread latency — operations run on the executing reactive scheduler, not on the - * commit thread. Default 3000 ms. + * Wall-clock ceiling applied by the dispatcher to each operation's prepared {@code Mono}. Bounds + * resource occupancy (connections, threads) but does not block the commit thread. Default 3000 + * ms. */ private long perOpTimeoutMs = 3000L; } From 1c7fbd79cdf28c511ac90d8122f7a78ba3241725 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Wed, 27 May 2026 14:57:49 -0700 Subject: [PATCH 4/5] docs: switch to // comments with line breaks, drop block javadoc Spotless's googlejavaformat rewrites multi-paragraph block javadoc to re-insert

    tags. // comments are left untouched. Per review feedback that the source should read like prose with paragraph breaks, not like a webpage, swap all multi-paragraph descriptions on the new files to // blocks. Single-line descriptors that were javadoc become a single // line above the declaration. Complete sentences throughout. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../tables/model/CurrentSnapshotInfo.java | 25 +++++---- .../openhouse/tables/model/TableDto.java | 15 +++--- .../optimizer/OptimizerStatsConfig.java | 25 ++++----- .../OptimizerStatsPostCommitOperation.java | 53 ++++++++++--------- .../optimizer/OptimizerStatsProperties.java | 36 ++++++------- .../optimizer/OptimizerStatsRequest.java | 26 ++++----- .../postcommit/PostCommitDispatcher.java | 46 ++++++++-------- .../postcommit/PostCommitOperation.java | 27 +++++----- .../postcommit/PostCommitProperties.java | 23 ++++---- 9 files changed, 136 insertions(+), 140 deletions(-) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java index 74644ac3f..90437d418 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java @@ -6,26 +6,25 @@ import lombok.EqualsAndHashCode; import lombok.Getter; -/** - * In-memory snapshot of the Iceberg current-snapshot metadata loaded alongside a {@link TableDto}. - * Carries only fields already materialized by the catalog client — never triggers a separate HDFS - * or object-store read. Present whenever the table has at least one committed snapshot at - * construction time; absent (modeled as {@link java.util.Optional#empty()} on {@link TableDto}) for - * tables with no committed data, e.g. a {@code CREATE TABLE} with no rows yet. - */ +// In-memory snapshot of the Iceberg current-snapshot metadata that was loaded alongside a +// TableDto. +// +// This carries only fields already materialized by the catalog client. Constructing it never +// triggers a separate HDFS or object-store read. +// +// The value is present whenever the underlying table has at least one committed snapshot at +// construction time. It is absent (modeled as Optional.empty() on TableDto) for tables with no +// committed data, such as a CREATE TABLE with no rows yet. @Getter @Builder @AllArgsConstructor @EqualsAndHashCode public class CurrentSnapshotInfo { - /** Iceberg snapshot ID (decimal long). Stable per commit; usable as an idempotency token. */ + // Iceberg snapshot ID. Stable per commit; usable as an idempotency token. private final long snapshotId; - /** - * Iceberg {@code Snapshot.summary()} map, unmodified. Keys include {@code total-data-files}, - * {@code total-files-size}, {@code added-data-files}, {@code deleted-data-files}, {@code - * added-files-size}, {@code removed-files-size}. - */ + // Iceberg Snapshot.summary() map, unmodified. Keys include total-data-files, total-files-size, + // added-data-files, deleted-data-files, added-files-size, removed-files-size. private final Map summary; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java index bd44653e7..59dc2e80f 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java @@ -85,18 +85,19 @@ public class TableDto { private boolean replaceCommit; - /** - * In-memory current-snapshot metadata captured when this {@code TableDto} was built from an - * Iceberg {@code Table}. Present whenever the underlying table has at least one committed - * snapshot at that point; absent for tables with no committed data (e.g. {@code CREATE TABLE} - * with no rows). Not persisted, not part of equality. Read through {@link #getCurrentSnapshot()}. - */ + // In-memory current-snapshot metadata captured when this TableDto was built from an Iceberg + // Table. + // + // Present whenever the underlying table has at least one committed snapshot at that point. + // Absent for tables with no committed data, such as a CREATE TABLE with no rows yet. + // + // Not persisted and not part of equality. Read through getCurrentSnapshot(). @Getter(AccessLevel.NONE) @Transient @EqualsAndHashCode.Exclude private CurrentSnapshotInfo currentSnapshot; - /** Returns the current-snapshot metadata if any, else {@link Optional#empty()}. */ + // Returns the current-snapshot metadata if any, else Optional.empty(). public Optional getCurrentSnapshot() { return Optional.ofNullable(currentSnapshot); } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java index 351ac604d..47613f98c 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java @@ -8,23 +8,24 @@ import org.springframework.web.reactive.function.client.WebClient; import reactor.netty.http.client.HttpClient; -/** - * Wiring for the post-commit Tables → Optimizer stats push. Active only when {@code - * optimizer.stats.enabled=true}; otherwise no WebClient bean is constructed and the dispatcher sees - * no optimizer-stats operation. - */ +// Wiring for the post-commit Tables-to-Optimizer stats push. +// +// This configuration is active only when optimizer.stats.enabled=true. When the flag is false, +// no WebClient bean is constructed, and the dispatcher sees no optimizer-stats operation. @Configuration @EnableConfigurationProperties(OptimizerStatsProperties.class) @ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true") public class OptimizerStatsConfig { - /** - * Dedicated WebClient for the optimizer stats endpoint. Per-attempt timeout is applied on the - * Reactor chain in {@link OptimizerStatsPostCommitOperation} and the outer per-op timeout in - * {@code PostCommitDispatcher}; neither is configured on the Netty client so the timeout always - * emerges as a standard {@link java.util.concurrent.TimeoutException} rather than a Netty {@code - * ReadTimeoutException}, keeping the dispatcher's outcome classification simple. - */ + // Dedicated WebClient for the optimizer stats endpoint. + // + // The per-attempt timeout is applied on the Reactor chain in OptimizerStatsPostCommitOperation, + // and the outer per-op timeout is applied by PostCommitDispatcher. Neither timeout is + // configured on the underlying Netty client. + // + // This arrangement ensures that any timeout always emerges as a standard + // java.util.concurrent.TimeoutException rather than a Netty ReadTimeoutException, which keeps + // the dispatcher's outcome classification simple. @Bean("optimizerStatsWebClient") public WebClient optimizerStatsWebClient(OptimizerStatsProperties properties) { return WebClient.builder() diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java index eb858dece..756da2d10 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java @@ -18,38 +18,38 @@ import reactor.util.retry.Retry; import reactor.util.retry.RetrySpec; -/** - * {@link PostCommitOperation} that PUTs a snapshot-stats record to the optimizer's per-table stats - * endpoint. {@link #prepare(TableDto)} returns a {@link Mono} that completes on HTTP 2xx and - * signals an error otherwise; the dispatcher owns timeout, subscription, error swallowing, and - * metric emission. Returns {@link Optional#empty()} when the table is not opted in via {@link - * #OPT_IN_PROPERTY} or has no current snapshot. Internal retry is bounded by {@link - * OptimizerStatsProperties#getMaxAttempts()} and fires only on retryable errors (network, {@link - * TimeoutException}, HTTP 408 / 429 / 5xx); the dispatcher's per-op timeout is the hard ceiling. - * Bean wired only when {@code optimizer.stats.enabled=true}. Path constant is intentionally - * duplicated from {@code TableStatsController.TABLE_PATH_TEMPLATE} (keep in sync) so the tables - * service does not take a compile-time dependency on the optimizer jar. - */ +// A PostCommitOperation that PUTs a snapshot-stats record to the optimizer's per-table stats +// endpoint. +// +// prepare() returns a Mono that completes on HTTP 2xx and signals an error otherwise. The +// dispatcher owns the timeout, the subscription, error swallowing, and metric emission. The +// returned value is Optional.empty() when the table is not opted in via OPT_IN_PROPERTY or has +// no current snapshot. +// +// Internal retry is bounded by OptimizerStatsProperties.maxAttempts and fires only on retryable +// errors: network failures, a TimeoutException, or HTTP 408 / 429 / 5xx responses. The +// dispatcher's per-op timeout is the hard ceiling on the whole chain. +// +// The bean is wired only when optimizer.stats.enabled=true. PATH_TEMPLATE is intentionally +// duplicated from TableStatsController.TABLE_PATH_TEMPLATE so the tables service does not take a +// compile-time dependency on the optimizer jar. Keep both copies in sync. @Slf4j @Component @ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true") public class OptimizerStatsPostCommitOperation implements PostCommitOperation { - /** Metric tag value for {@code op}. */ + // Metric tag value for the "op" tag. static final String OP_NAME = "optimizer_stats"; - /** - * Per-call URL path. Intentionally duplicated from {@code - * TableStatsController.TABLE_PATH_TEMPLATE}; keep in sync. - */ + // Per-call URL path. Intentionally duplicated from TableStatsController.TABLE_PATH_TEMPLATE so + // that we avoid a compile-time dependency on the optimizer jar. Keep both copies in sync. static final String PATH_TEMPLATE = "/v1/optimizer/stats/{tableUuid}"; - /** Table-property key that opts a table in for the post-commit stats push. */ + // Table-property key that opts a table in for the post-commit stats push. static final String OPT_IN_PROPERTY = "maintenance.optimizer.stats.enabled"; - /** Iceberg snapshot-summary keys we read. All values are decimal-string longs. */ + // Iceberg snapshot-summary keys we read. All values are decimal-string longs. static final String SUMMARY_TOTAL_DATA_FILES = "total-data-files"; - static final String SUMMARY_TOTAL_FILES_SIZE = "total-files-size"; static final String SUMMARY_ADDED_DATA_FILES = "added-data-files"; static final String SUMMARY_DELETED_DATA_FILES = "deleted-data-files"; @@ -100,13 +100,14 @@ public Optional> prepare(TableDto savedDto) { return Optional.of(chain); } - /** {@code true} iff the table's properties contain the literal opt-in value {@code "true"}. */ + // Returns true when the table's properties contain the literal opt-in value "true" for the + // configured OPT_IN_PROPERTY. private boolean isOptedIn(TableDto saved) { Map props = saved.getTableProperties(); return props != null && "true".equals(props.get(OPT_IN_PROPERTY)); } - /** Build the wire body. Missing summary keys default to 0L. */ + // Builds the wire body. Missing summary keys default to 0L. OptimizerStatsRequest buildRequest(TableDto saved, CurrentSnapshotInfo snapshot) { Map summary = snapshot.getSummary(); OptimizerStatsRequest.Snapshot snapshotPayload = @@ -132,10 +133,10 @@ OptimizerStatsRequest buildRequest(TableDto saved, CurrentSnapshotInfo snapshot) .build(); } - /** - * Retryable errors: per-attempt timeout, network-level failures, 5xx, 408, 429. Other 4xx are - * client errors — retrying won't fix them. - */ + // Returns true for errors that a later attempt could plausibly succeed against: a per-attempt + // timeout, a network-level failure, or an HTTP 5xx / 408 / 429 response. + // + // Other 4xx responses are client errors that retries cannot fix, so we fail fast on them. boolean isRetryable(Throwable e) { if (e instanceof TimeoutException) { return true; diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java index ca2b78a2d..877ce3423 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java @@ -4,36 +4,32 @@ import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; -/** - * Configuration for the post-commit stats push from the Tables Service to the Optimizer's stats - * endpoint. Property prefix {@code optimizer.stats}. When {@code enabled} is false (the default), - * no client bean is constructed and the operation is absent from the dispatcher. Environments that - * want the feed turn it on and set {@link #baseUri}. - */ +// Configuration for the post-commit stats push from the Tables Service to the Optimizer's stats +// endpoint. Property prefix is optimizer.stats. +// +// When enabled is false (the default), no client bean is constructed, and the operation is +// absent from the dispatcher. Environments that want the feed turn it on and set baseUri. @ConfigurationProperties("optimizer.stats") @Getter @Setter public class OptimizerStatsProperties { - /** Master switch. When {@code false}, no HTTP push is attempted and no client bean is wired. */ + // Master switch. When false, no HTTP push is attempted and no client bean is wired. private boolean enabled = false; - /** - * Base URI of the Optimizer service. Path {@code /v1/optimizer/stats/{tableUuid}} is appended at - * call time. No default — required when {@link #enabled} is {@code true}. - */ + // Base URI of the Optimizer service. The path /v1/optimizer/stats/{tableUuid} is appended at + // call time. No default; required when enabled is true. private String baseUri; - /** - * Per-attempt HTTP timeout in milliseconds. Bounds each individual attempt so retries can fit - * inside the dispatcher's outer per-op budget ({@code tables.postcommit.per-op-timeout-ms}, - * default 3000 ms). Default 1000 ms. - */ + // Per-attempt HTTP timeout in milliseconds. + // + // Bounds each individual attempt so retries can fit inside the dispatcher's outer per-op budget + // (tables.postcommit.per-op-timeout-ms, default 3000 ms). Default is 1000 ms. private long perAttemptTimeoutMs = 1000L; - /** - * Total attempt count (initial try plus retries). Default 3. Retries fire only on retryable - * errors (network, timeout, 408/429/5xx); other 4xx fail fast without retry. - */ + // Total attempt count (the initial try plus retries). Default is 3. + // + // Retries fire only on retryable errors: network failures, a timeout, or an HTTP 408 / 429 / + // 5xx response. Other 4xx responses fail fast without retry. private int maxAttempts = 3; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java index 2496436d3..8239184ec 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java @@ -7,11 +7,11 @@ import lombok.Data; import lombok.NoArgsConstructor; -/** - * Wire body for {@code PUT /v1/optimizer/stats/{tableUuid}}. Mirrors the optimizer's {@code - * UpsertTableStatsRequest} field-for-field. Duplicated here so the tables service does not take a - * compile-time dependency on the optimizer jar; keep in sync with that type. - */ +// Wire body for PUT /v1/optimizer/stats/{tableUuid}. +// +// This mirrors the optimizer's UpsertTableStatsRequest field-for-field. The type is duplicated +// here so that the tables service does not take a compile-time dependency on the optimizer jar. +// Keep both copies in sync. @Data @Builder @NoArgsConstructor @@ -34,20 +34,16 @@ public static class Stats { private Delta delta; } - /** - * Point-in-time snapshot metrics. Map to optimizer's {@code - * TableStatsPayload.SnapshotMetricsDto}. - */ + // Point-in-time snapshot metrics. Maps to the optimizer's + // TableStatsPayload.SnapshotMetricsDto. @Data @Builder @NoArgsConstructor @AllArgsConstructor @JsonInclude(JsonInclude.Include.NON_NULL) public static class Snapshot { - /** - * Iceberg snapshot ID. Sent so the optimizer can use it as an idempotency token on upsert and - * reject out-of-order replays — server-side wiring tracked in BDP-102985. - */ + // Iceberg snapshot ID. Sent so the optimizer can use it as an idempotency token on upsert and + // reject out-of-order replays. Server-side wiring is tracked in BDP-102985. private Long snapshotId; private String tableVersion; @@ -56,9 +52,7 @@ public static class Snapshot { private Long numCurrentFiles; } - /** - * Per-commit incremental counters. Map to optimizer's {@code TableStatsPayload.CommitDeltaDto}. - */ + // Per-commit incremental counters. Maps to the optimizer's TableStatsPayload.CommitDeltaDto. @Data @Builder @NoArgsConstructor diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java index 44c8a166b..5a04e2df7 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java @@ -16,14 +16,16 @@ import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; -/** - * Runs {@link PostCommitOperation}s after a successful Iceberg commit. Each operation gets a - * wall-clock timeout ({@code tables.postcommit.per-op-timeout-ms}), errors are swallowed after - * metric/log, and dispatch is fire-and-forget so the commit thread is never blocked. Operations - * describe payload and endpoint only — the timeout/swallow/subscribe machinery lives here so the - * contract across operations is uniform. Bean wired only when {@code - * tables.postcommit.enabled=true}. - */ +// Runs PostCommitOperations after a successful Iceberg commit. +// +// Each operation receives a wall-clock timeout from tables.postcommit.per-op-timeout-ms. Any +// error the operation signals is recorded as a metric and a log line, then swallowed. Dispatch +// is fire-and-forget, so the commit thread is never blocked on operation work. +// +// Operations describe payload and endpoint only. The timeout, error swallowing, subscription, +// and metric emission all live here, so the contract across operations stays uniform. +// +// The bean is constructed only when tables.postcommit.enabled=true. @Slf4j @Component @EnableConfigurationProperties(PostCommitProperties.class) @@ -43,22 +45,24 @@ public PostCommitDispatcher( this.meterRegistry = meterRegistry; } - /** - * Dispatch all registered operations for {@code savedDto}. Returns immediately on the calling - * thread; each operation runs on its underlying reactive scheduler. Never throws. - */ + // Dispatches all registered operations for savedDto. + // + // Returns immediately on the calling thread. Each operation runs on its underlying reactive + // scheduler. This method never throws. public void dispatch(TableDto savedDto) { for (PostCommitOperation op : operations) { decorate(op, savedDto).ifPresent(Mono::subscribe); } } - /** - * Returns the fully-decorated {@link Mono} for {@code op} (per-op timeout, success/error metric - * emission, error swallow) without subscribing. Emits {@code skipped} or {@code prepare_threw} - * synchronously and returns {@link Optional#empty()} in those cases. Package-private so tests can - * {@code .block()} on the chain rather than poll for metric emission. - */ + // Returns the fully-decorated Mono for op without subscribing to it. + // + // The decoration applies the per-op timeout, records the success or error metric, and swallows + // any error. When the operation does not apply (or its prepare() throws synchronously), this + // method emits the "skipped" or "prepare_threw" metric and returns Optional.empty(). + // + // Package-private so that tests can .block() on the chain rather than poll for metric emission + // after a fire-and-forget subscription. Optional> decorate(PostCommitOperation op, TableDto savedDto) { Optional> work; try { @@ -109,10 +113,8 @@ Optional> decorate(PostCommitOperation op, TableDto savedDto) { return Optional.of(decorated); } - /** - * Map a terminal error to a small set of outcome tags. Kept here so all operations share the same - * taxonomy. - */ + // Maps a terminal error to a small set of outcome tags. The classifier lives here so that all + // operations share the same taxonomy. private static String classifyOutcome(Throwable e) { if (e instanceof TimeoutException) { return "timeout"; diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java index b94804b34..6cf99cd7e 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java @@ -4,22 +4,23 @@ import java.util.Optional; import reactor.core.publisher.Mono; -/** - * A best-effort, bounded, asynchronous action run by the Tables Service after a successful Iceberg - * commit. Operations are invoked by {@link PostCommitDispatcher}, which owns the per-op timeout, - * subscription, error swallowing, and metric emission. An error from prepare() is recorded and - * dropped — it never affects commit correctness. Implementations may apply internal retries with - * bounded budgets but must not subscribe themselves or apply an outer timeout. - */ +// A best-effort, bounded, asynchronous action that the Tables Service runs after a successful +// Iceberg commit. +// +// Operations are invoked by PostCommitDispatcher, which owns the per-op timeout, the +// subscription, error swallowing, and metric emission. An error signaled from prepare() is +// recorded and dropped, so it never affects commit correctness. +// +// Implementations describe what to push and where. They may apply internal retries within a +// bounded budget. They must not subscribe themselves or apply an outer timeout, because the +// dispatcher already owns both concerns. public interface PostCommitOperation { - /** Short, stable identifier for this operation. Used as the {@code op} metric tag. */ + // Returns a short, stable identifier for this operation. Used as the "op" metric tag. String name(); - /** - * Build the work to run for {@code savedDto}, or {@link Optional#empty()} when the operation does - * not apply (e.g. table not opted in, no committed snapshot). The dispatcher records empty as a - * {@code skipped} metric. - */ + // Builds the work to run for savedDto, or returns Optional.empty() when the operation does not + // apply to this commit (for example, when the table is not opted in or has no committed + // snapshot). The dispatcher records an empty return as a "skipped" metric. Optional> prepare(TableDto savedDto); } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java index bc757d37f..98e8b02eb 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java @@ -4,23 +4,24 @@ import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; -/** - * Configuration for the Tables Service post-commit operation framework. Property prefix {@code - * tables.postcommit}. When {@code enabled} is false (the default), no dispatcher bean is - * constructed and registered {@link PostCommitOperation} beans are never invoked. - */ +// Configuration for the Tables Service post-commit operation framework. Property prefix is +// tables.postcommit. +// +// When enabled is false (the default), the dispatcher bean is not constructed, and any +// registered PostCommitOperation beans are never invoked. @ConfigurationProperties("tables.postcommit") @Getter @Setter public class PostCommitProperties { - /** Master switch. When false, no operations are dispatched on commit. */ + // Master switch. When false, no operations are dispatched on commit. private boolean enabled = false; - /** - * Wall-clock ceiling applied by the dispatcher to each operation's prepared {@code Mono}. Bounds - * resource occupancy (connections, threads) but does not block the commit thread. Default 3000 - * ms. - */ + // Wall-clock ceiling that the dispatcher applies to each operation's prepared Mono. + // + // This bounds resource occupancy (connections, threads) for a misbehaving operation. It does + // not block the commit thread, because operations run on the underlying reactive scheduler. + // + // Default is 3000 ms. private long perOpTimeoutMs = 3000L; } From ecc65f8ff9bed21165f9f10002138af752bcee7e Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Wed, 27 May 2026 15:43:21 -0700 Subject: [PATCH 5/5] docs: drop internal ticket reference from OptimizerStatsRequest comment Server-side idempotency wiring is a follow-up; the comment now states that without naming an internal tracker. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../tables/services/optimizer/OptimizerStatsRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java index 8239184ec..2cb2b2873 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java @@ -43,7 +43,7 @@ public static class Stats { @JsonInclude(JsonInclude.Include.NON_NULL) public static class Snapshot { // Iceberg snapshot ID. Sent so the optimizer can use it as an idempotency token on upsert and - // reject out-of-order replays. Server-side wiring is tracked in BDP-102985. + // reject out-of-order replays. private Long snapshotId; private String tableVersion;