diff --git a/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java b/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java index baf65c627..1c7945f9c 100644 --- a/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java +++ b/services/common/src/main/java/com/linkedin/openhouse/common/metrics/MetricsConstant.java @@ -63,4 +63,12 @@ private MetricsConstant() {} public static final String HTS_LIST_TABLES_REQUEST = "hts_list_tables_request"; public static final String HTS_LIST_TABLES_TIME = "hts_list_tables_time"; public static final String HTS_SEARCH_TABLES_TIME = "hts_search_tables_time"; + + // Tables post-commit operation framework (bounded, best-effort actions after commit). + // Tagged with op={operation name}; on failure additionally tagged with outcome={timeout, + // network_error, server_error, client_error, prepare_threw, unknown_error}. + // The duration timer is bounded by tables.postcommit.per-op-timeout-ms. + public static final String POSTCOMMIT_OP_DURATION = "postcommit_op_duration"; + public static final String POSTCOMMIT_OP_SKIPPED = "postcommit_op_skipped"; + public static final String POSTCOMMIT_OP_FAILED = "postcommit_op_failed"; } diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java index b119dd1c7..87fbfa709 100644 --- a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java @@ -24,10 +24,22 @@ /** REST controller for managing per-table stats in the optimizer DB. */ @RestController -@RequestMapping("/v1/optimizer/stats") +@RequestMapping(TableStatsController.BASE_PATH) @RequiredArgsConstructor public class TableStatsController { + /** + * Base path for the stats endpoints. Single source of truth — external callers must reference. + */ + public static final String BASE_PATH = "/v1/optimizer/stats"; + + /** + * URI template (with {@code {tableUuid}} placeholder) for the per-table stats upsert/fetch + * endpoint. Use this from any client that calls the optimizer over HTTP, rather than rebuilding + * the path inline. + */ + public static final String TABLE_PATH_TEMPLATE = BASE_PATH + "/{tableUuid}"; + private final OptimizerDataService service; /** diff --git a/services/tables/build.gradle b/services/tables/build.gradle index c85a57131..da42c32ef 100644 --- a/services/tables/build.gradle +++ b/services/tables/build.gradle @@ -44,6 +44,7 @@ dependencies { testImplementation 'org.junit.jupiter:junit-jupiter-engine:' + junit_version testImplementation 'org.springframework.security:spring-security-test:5.7.3' testImplementation 'org.springframework:spring-context-support:5.3.18' + testImplementation 'com.squareup.okhttp3:mockwebserver:4.10.0' testImplementation(testFixtures(project(':services:common'))) testImplementation (project(':tables-test-fixtures:tables-test-fixtures_2.12')) { exclude group: 'com.linkedin.iceberg' diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java index 08111cd82..9315df9d8 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/TablesMapper.java @@ -73,6 +73,7 @@ public interface TablesMapper { @Mapping(source = "requestBody.sortOrder", target = "sortOrder"), @Mapping(target = "lastModifiedTime", ignore = true), @Mapping(target = "creationTime", ignore = true), + @Mapping(target = "currentSnapshot", ignore = true) }) TableDto toTableDto(TableDto tableDto, CreateUpdateTableRequestBody requestBody); @@ -120,7 +121,8 @@ public interface TablesMapper { defaultExpression = "java(TableType.PRIMARY_TABLE)"), @Mapping(source = "requestBody.createUpdateTableRequestBody.sortOrder", target = "sortOrder"), @Mapping(target = "lastModifiedTime", ignore = true), - @Mapping(target = "creationTime", ignore = true) + @Mapping(target = "creationTime", ignore = true), + @Mapping(target = "currentSnapshot", ignore = true) }) TableDto toTableDto(TableDto tableDto, IcebergSnapshotsRequestBody requestBody); diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java new file mode 100644 index 000000000..90437d418 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java @@ -0,0 +1,30 @@ +package com.linkedin.openhouse.tables.model; + +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; + +// In-memory snapshot of the Iceberg current-snapshot metadata that was loaded alongside a +// TableDto. +// +// This carries only fields already materialized by the catalog client. Constructing it never +// triggers a separate HDFS or object-store read. +// +// The value is present whenever the underlying table has at least one committed snapshot at +// construction time. It is absent (modeled as Optional.empty() on TableDto) for tables with no +// committed data, such as a CREATE TABLE with no rows yet. +@Getter +@Builder +@AllArgsConstructor +@EqualsAndHashCode +public class CurrentSnapshotInfo { + + // Iceberg snapshot ID. Stable per commit; usable as an idempotency token. + private final long snapshotId; + + // Iceberg Snapshot.summary() map, unmodified. Keys include total-data-files, total-files-size, + // added-data-files, deleted-data-files, added-files-size, removed-files-size. + private final Map summary; +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java index 917c632d8..59dc2e80f 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/model/TableDto.java @@ -11,11 +11,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import javax.persistence.Convert; import javax.persistence.ElementCollection; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.IdClass; +import javax.persistence.Transient; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -83,6 +85,23 @@ public class TableDto { private boolean replaceCommit; + // In-memory current-snapshot metadata captured when this TableDto was built from an Iceberg + // Table. + // + // Present whenever the underlying table has at least one committed snapshot at that point. + // Absent for tables with no committed data, such as a CREATE TABLE with no rows yet. + // + // Not persisted and not part of equality. Read through getCurrentSnapshot(). + @Getter(AccessLevel.NONE) + @Transient + @EqualsAndHashCode.Exclude + private CurrentSnapshotInfo currentSnapshot; + + // Returns the current-snapshot metadata if any, else Optional.empty(). + public Optional getCurrentSnapshot() { + return Optional.ofNullable(currentSnapshot); + } + /** * Bundling eligible string type field into a map as {@link org.mapstruct.Mapper} doesn't provide * easy interface to achieve so. diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java index 0dde039c0..d3ced5458 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java @@ -9,6 +9,7 @@ import com.linkedin.openhouse.tables.dto.mapper.iceberg.PartitionSpecMapper; import com.linkedin.openhouse.tables.dto.mapper.iceberg.PoliciesSpecMapper; import com.linkedin.openhouse.tables.dto.mapper.iceberg.TableTypeMapper; +import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo; import com.linkedin.openhouse.tables.model.TableDto; import com.linkedin.openhouse.tables.repository.PreservedKeyChecker; import java.net.URI; @@ -135,6 +136,13 @@ static TableDto convertToTableDto( .jsonSnapshots(null) .tableProperties(megaProps) .sortOrder(SortOrderParser.toJson(table.sortOrder())) + .currentSnapshot( + table.currentSnapshot() == null + ? null + : CurrentSnapshotInfo.builder() + .snapshotId(table.currentSnapshot().snapshotId()) + .summary(table.currentSnapshot().summary()) + .build()) .build(); return tableDto; diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java index af43a169e..6623b5f37 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/IcebergSnapshotsServiceImpl.java @@ -10,6 +10,7 @@ import com.linkedin.openhouse.tables.model.TableDto; import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey; import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository; +import com.linkedin.openhouse.tables.services.postcommit.PostCommitDispatcher; import com.linkedin.openhouse.tables.utils.AuthorizationUtils; import com.linkedin.openhouse.tables.utils.TableUUIDGenerator; import java.util.Optional; @@ -32,6 +33,13 @@ public class IcebergSnapshotsServiceImpl implements IcebergSnapshotsService { @Autowired AuthorizationUtils authorizationUtils; + /** + * Present only when {@code tables.postcommit.enabled=true}. When absent, the on-commit hook is a + * literal no-op and no post-commit operations run. + */ + @Autowired(required = false) + Optional postCommitDispatcher = Optional.empty(); + @Override public Pair putIcebergSnapshots( String databaseId, @@ -83,7 +91,9 @@ public Pair putIcebergSnapshots( databaseId, tableCreatorUpdater, Privileges.CREATE_TABLE); } try { - return Pair.of(openHouseInternalRepository.save(tableDtoToSave), !tableDto.isPresent()); + TableDto savedDto = openHouseInternalRepository.save(tableDtoToSave); + postCommitDispatcher.ifPresent(d -> d.dispatch(savedDto)); + return Pair.of(savedDto, !tableDto.isPresent()); } catch (BadRequestException e) { throw new RequestValidationFailureException(e.getMessage(), e); } catch (CommitFailedException ce) { diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java new file mode 100644 index 000000000..47613f98c --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java @@ -0,0 +1,36 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; + +// Wiring for the post-commit Tables-to-Optimizer stats push. +// +// This configuration is active only when optimizer.stats.enabled=true. When the flag is false, +// no WebClient bean is constructed, and the dispatcher sees no optimizer-stats operation. +@Configuration +@EnableConfigurationProperties(OptimizerStatsProperties.class) +@ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true") +public class OptimizerStatsConfig { + + // Dedicated WebClient for the optimizer stats endpoint. + // + // The per-attempt timeout is applied on the Reactor chain in OptimizerStatsPostCommitOperation, + // and the outer per-op timeout is applied by PostCommitDispatcher. Neither timeout is + // configured on the underlying Netty client. + // + // This arrangement ensures that any timeout always emerges as a standard + // java.util.concurrent.TimeoutException rather than a Netty ReadTimeoutException, which keeps + // the dispatcher's outcome classification simple. + @Bean("optimizerStatsWebClient") + public WebClient optimizerStatsWebClient(OptimizerStatsProperties properties) { + return WebClient.builder() + .baseUrl(properties.getBaseUri()) + .clientConnector(new ReactorClientHttpConnector(HttpClient.create())) + .build(); + } +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java new file mode 100644 index 000000000..756da2d10 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java @@ -0,0 +1,168 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo; +import com.linkedin.openhouse.tables.model.TableDto; +import com.linkedin.openhouse.tables.services.postcommit.PostCommitOperation; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientRequestException; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import reactor.util.retry.RetrySpec; + +// A PostCommitOperation that PUTs a snapshot-stats record to the optimizer's per-table stats +// endpoint. +// +// prepare() returns a Mono that completes on HTTP 2xx and signals an error otherwise. The +// dispatcher owns the timeout, the subscription, error swallowing, and metric emission. The +// returned value is Optional.empty() when the table is not opted in via OPT_IN_PROPERTY or has +// no current snapshot. +// +// Internal retry is bounded by OptimizerStatsProperties.maxAttempts and fires only on retryable +// errors: network failures, a TimeoutException, or HTTP 408 / 429 / 5xx responses. The +// dispatcher's per-op timeout is the hard ceiling on the whole chain. +// +// The bean is wired only when optimizer.stats.enabled=true. PATH_TEMPLATE is intentionally +// duplicated from TableStatsController.TABLE_PATH_TEMPLATE so the tables service does not take a +// compile-time dependency on the optimizer jar. Keep both copies in sync. +@Slf4j +@Component +@ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true") +public class OptimizerStatsPostCommitOperation implements PostCommitOperation { + + // Metric tag value for the "op" tag. + static final String OP_NAME = "optimizer_stats"; + + // Per-call URL path. Intentionally duplicated from TableStatsController.TABLE_PATH_TEMPLATE so + // that we avoid a compile-time dependency on the optimizer jar. Keep both copies in sync. + static final String PATH_TEMPLATE = "/v1/optimizer/stats/{tableUuid}"; + + // Table-property key that opts a table in for the post-commit stats push. + static final String OPT_IN_PROPERTY = "maintenance.optimizer.stats.enabled"; + + // Iceberg snapshot-summary keys we read. All values are decimal-string longs. + static final String SUMMARY_TOTAL_DATA_FILES = "total-data-files"; + static final String SUMMARY_TOTAL_FILES_SIZE = "total-files-size"; + static final String SUMMARY_ADDED_DATA_FILES = "added-data-files"; + static final String SUMMARY_DELETED_DATA_FILES = "deleted-data-files"; + static final String SUMMARY_ADDED_FILES_SIZE = "added-files-size"; + static final String SUMMARY_REMOVED_FILES_SIZE = "removed-files-size"; + + private final WebClient webClient; + private final OptimizerStatsProperties properties; + + public OptimizerStatsPostCommitOperation( + @Qualifier("optimizerStatsWebClient") WebClient webClient, + OptimizerStatsProperties properties) { + this.webClient = webClient; + this.properties = properties; + } + + @Override + public String name() { + return OP_NAME; + } + + @Override + public Optional> prepare(TableDto savedDto) { + if (!isOptedIn(savedDto)) { + return Optional.empty(); + } + Optional snapshot = savedDto.getCurrentSnapshot(); + if (!snapshot.isPresent()) { + return Optional.empty(); + } + + OptimizerStatsRequest body = buildRequest(savedDto, snapshot.get()); + String tableUuid = savedDto.getTableUUID(); + + RetrySpec retrySpec = + Retry.max(Math.max(0, properties.getMaxAttempts() - 1)).filter(this::isRetryable); + + Mono chain = + webClient + .put() + .uri(PATH_TEMPLATE, tableUuid) + .bodyValue(body) + .retrieve() + .toBodilessEntity() + .timeout(Duration.ofMillis(properties.getPerAttemptTimeoutMs())) + .retryWhen(retrySpec.onRetryExhaustedThrow((spec, signal) -> signal.failure())) + .then(); + return Optional.of(chain); + } + + // Returns true when the table's properties contain the literal opt-in value "true" for the + // configured OPT_IN_PROPERTY. + private boolean isOptedIn(TableDto saved) { + Map props = saved.getTableProperties(); + return props != null && "true".equals(props.get(OPT_IN_PROPERTY)); + } + + // Builds the wire body. Missing summary keys default to 0L. + OptimizerStatsRequest buildRequest(TableDto saved, CurrentSnapshotInfo snapshot) { + Map summary = snapshot.getSummary(); + OptimizerStatsRequest.Snapshot snapshotPayload = + OptimizerStatsRequest.Snapshot.builder() + .snapshotId(snapshot.getSnapshotId()) + .tableVersion(saved.getTableVersion()) + .tableLocation(saved.getTableLocation()) + .tableSizeBytes(longOrZero(summary, SUMMARY_TOTAL_FILES_SIZE)) + .numCurrentFiles(longOrZero(summary, SUMMARY_TOTAL_DATA_FILES)) + .build(); + OptimizerStatsRequest.Delta delta = + OptimizerStatsRequest.Delta.builder() + .numFilesAdded(longOrZero(summary, SUMMARY_ADDED_DATA_FILES)) + .numFilesDeleted(longOrZero(summary, SUMMARY_DELETED_DATA_FILES)) + .addedSizeBytes(longOrZero(summary, SUMMARY_ADDED_FILES_SIZE)) + .deletedSizeBytes(longOrZero(summary, SUMMARY_REMOVED_FILES_SIZE)) + .build(); + return OptimizerStatsRequest.builder() + .databaseName(saved.getDatabaseId()) + .tableName(saved.getTableId()) + .stats(OptimizerStatsRequest.Stats.builder().snapshot(snapshotPayload).delta(delta).build()) + .tableProperties(saved.getTableProperties()) + .build(); + } + + // Returns true for errors that a later attempt could plausibly succeed against: a per-attempt + // timeout, a network-level failure, or an HTTP 5xx / 408 / 429 response. + // + // Other 4xx responses are client errors that retries cannot fix, so we fail fast on them. + boolean isRetryable(Throwable e) { + if (e instanceof TimeoutException) { + return true; + } + if (e instanceof WebClientRequestException) { + return true; + } + if (e instanceof WebClientResponseException) { + int code = ((WebClientResponseException) e).getStatusCode().value(); + return code == 408 || code == 429 || (code >= 500 && code < 600); + } + return false; + } + + private static long longOrZero(Map m, String key) { + if (m == null) { + return 0L; + } + String v = m.get(key); + if (v == null) { + return 0L; + } + try { + return Long.parseLong(v); + } catch (NumberFormatException nfe) { + return 0L; + } + } +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java new file mode 100644 index 000000000..877ce3423 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsProperties.java @@ -0,0 +1,35 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + +// Configuration for the post-commit stats push from the Tables Service to the Optimizer's stats +// endpoint. Property prefix is optimizer.stats. +// +// When enabled is false (the default), no client bean is constructed, and the operation is +// absent from the dispatcher. Environments that want the feed turn it on and set baseUri. +@ConfigurationProperties("optimizer.stats") +@Getter +@Setter +public class OptimizerStatsProperties { + + // Master switch. When false, no HTTP push is attempted and no client bean is wired. + private boolean enabled = false; + + // Base URI of the Optimizer service. The path /v1/optimizer/stats/{tableUuid} is appended at + // call time. No default; required when enabled is true. + private String baseUri; + + // Per-attempt HTTP timeout in milliseconds. + // + // Bounds each individual attempt so retries can fit inside the dispatcher's outer per-op budget + // (tables.postcommit.per-op-timeout-ms, default 3000 ms). Default is 1000 ms. + private long perAttemptTimeoutMs = 1000L; + + // Total attempt count (the initial try plus retries). Default is 3. + // + // Retries fire only on retryable errors: network failures, a timeout, or an HTTP 408 / 429 / + // 5xx response. Other 4xx responses fail fast without retry. + private int maxAttempts = 3; +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java new file mode 100644 index 000000000..2cb2b2873 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsRequest.java @@ -0,0 +1,67 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import com.fasterxml.jackson.annotation.JsonInclude; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +// Wire body for PUT /v1/optimizer/stats/{tableUuid}. +// +// This mirrors the optimizer's UpsertTableStatsRequest field-for-field. The type is duplicated +// here so that the tables service does not take a compile-time dependency on the optimizer jar. +// Keep both copies in sync. +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class OptimizerStatsRequest { + + private String databaseName; + private String tableName; + private Stats stats; + private Map tableProperties; + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class Stats { + private Snapshot snapshot; + private Delta delta; + } + + // Point-in-time snapshot metrics. Maps to the optimizer's + // TableStatsPayload.SnapshotMetricsDto. + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class Snapshot { + // Iceberg snapshot ID. Sent so the optimizer can use it as an idempotency token on upsert and + // reject out-of-order replays. + private Long snapshotId; + + private String tableVersion; + private String tableLocation; + private Long tableSizeBytes; + private Long numCurrentFiles; + } + + // Per-commit incremental counters. Maps to the optimizer's TableStatsPayload.CommitDeltaDto. + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class Delta { + private Long numFilesAdded; + private Long numFilesDeleted; + private Long addedSizeBytes; + private Long deletedSizeBytes; + } +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java new file mode 100644 index 000000000..5a04e2df7 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcher.java @@ -0,0 +1,136 @@ +package com.linkedin.openhouse.tables.services.postcommit; + +import com.linkedin.openhouse.common.metrics.MetricsConstant; +import com.linkedin.openhouse.tables.model.TableDto; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClientRequestException; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; + +// Runs PostCommitOperations after a successful Iceberg commit. +// +// Each operation receives a wall-clock timeout from tables.postcommit.per-op-timeout-ms. Any +// error the operation signals is recorded as a metric and a log line, then swallowed. Dispatch +// is fire-and-forget, so the commit thread is never blocked on operation work. +// +// Operations describe payload and endpoint only. The timeout, error swallowing, subscription, +// and metric emission all live here, so the contract across operations stays uniform. +// +// The bean is constructed only when tables.postcommit.enabled=true. +@Slf4j +@Component +@EnableConfigurationProperties(PostCommitProperties.class) +@ConditionalOnProperty(prefix = "tables.postcommit", name = "enabled", havingValue = "true") +public class PostCommitDispatcher { + + private final List operations; + private final PostCommitProperties properties; + private final MeterRegistry meterRegistry; + + public PostCommitDispatcher( + List operations, + PostCommitProperties properties, + MeterRegistry meterRegistry) { + this.operations = operations; + this.properties = properties; + this.meterRegistry = meterRegistry; + } + + // Dispatches all registered operations for savedDto. + // + // Returns immediately on the calling thread. Each operation runs on its underlying reactive + // scheduler. This method never throws. + public void dispatch(TableDto savedDto) { + for (PostCommitOperation op : operations) { + decorate(op, savedDto).ifPresent(Mono::subscribe); + } + } + + // Returns the fully-decorated Mono for op without subscribing to it. + // + // The decoration applies the per-op timeout, records the success or error metric, and swallows + // any error. When the operation does not apply (or its prepare() throws synchronously), this + // method emits the "skipped" or "prepare_threw" metric and returns Optional.empty(). + // + // Package-private so that tests can .block() on the chain rather than poll for metric emission + // after a fire-and-forget subscription. + Optional> decorate(PostCommitOperation op, TableDto savedDto) { + Optional> work; + try { + work = op.prepare(savedDto); + } catch (RuntimeException e) { + // Defensive: a prepare() that throws synchronously must not break dispatch of later ops. + meterRegistry + .counter( + MetricsConstant.POSTCOMMIT_OP_FAILED, "op", op.name(), "outcome", "prepare_threw") + .increment(); + log.warn("Post-commit op {} prepare() threw {}", op.name(), e.toString()); + return Optional.empty(); + } + if (!work.isPresent()) { + meterRegistry.counter(MetricsConstant.POSTCOMMIT_OP_SKIPPED, "op", op.name()).increment(); + return Optional.empty(); + } + Timer.Sample sample = Timer.start(meterRegistry); + Mono decorated = + work.get() + .timeout(Duration.ofMillis(properties.getPerOpTimeoutMs())) + .doOnSuccess( + ignored -> + sample.stop( + meterRegistry.timer( + MetricsConstant.POSTCOMMIT_OP_DURATION, + "op", + op.name(), + "outcome", + "success"))) + .onErrorResume( + e -> { + String outcome = classifyOutcome(e); + sample.stop( + meterRegistry.timer( + MetricsConstant.POSTCOMMIT_OP_DURATION, + "op", + op.name(), + "outcome", + outcome)); + meterRegistry + .counter( + MetricsConstant.POSTCOMMIT_OP_FAILED, "op", op.name(), "outcome", outcome) + .increment(); + log.warn("Post-commit op {} failed ({}): {}", op.name(), outcome, e.toString()); + return Mono.empty(); + }); + return Optional.of(decorated); + } + + // Maps a terminal error to a small set of outcome tags. The classifier lives here so that all + // operations share the same taxonomy. + private static String classifyOutcome(Throwable e) { + if (e instanceof TimeoutException) { + return "timeout"; + } + if (e instanceof WebClientRequestException) { + return "network_error"; + } + if (e instanceof WebClientResponseException) { + int code = ((WebClientResponseException) e).getStatusCode().value(); + if (code >= 500 && code < 600) { + return "server_error"; + } + if (code >= 400 && code < 500) { + return "client_error"; + } + } + return "unknown_error"; + } +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java new file mode 100644 index 000000000..6cf99cd7e --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitOperation.java @@ -0,0 +1,26 @@ +package com.linkedin.openhouse.tables.services.postcommit; + +import com.linkedin.openhouse.tables.model.TableDto; +import java.util.Optional; +import reactor.core.publisher.Mono; + +// A best-effort, bounded, asynchronous action that the Tables Service runs after a successful +// Iceberg commit. +// +// Operations are invoked by PostCommitDispatcher, which owns the per-op timeout, the +// subscription, error swallowing, and metric emission. An error signaled from prepare() is +// recorded and dropped, so it never affects commit correctness. +// +// Implementations describe what to push and where. They may apply internal retries within a +// bounded budget. They must not subscribe themselves or apply an outer timeout, because the +// dispatcher already owns both concerns. +public interface PostCommitOperation { + + // Returns a short, stable identifier for this operation. Used as the "op" metric tag. + String name(); + + // Builds the work to run for savedDto, or returns Optional.empty() when the operation does not + // apply to this commit (for example, when the table is not opted in or has no committed + // snapshot). The dispatcher records an empty return as a "skipped" metric. + Optional> prepare(TableDto savedDto); +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java new file mode 100644 index 000000000..98e8b02eb --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitProperties.java @@ -0,0 +1,27 @@ +package com.linkedin.openhouse.tables.services.postcommit; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + +// Configuration for the Tables Service post-commit operation framework. Property prefix is +// tables.postcommit. +// +// When enabled is false (the default), the dispatcher bean is not constructed, and any +// registered PostCommitOperation beans are never invoked. +@ConfigurationProperties("tables.postcommit") +@Getter +@Setter +public class PostCommitProperties { + + // Master switch. When false, no operations are dispatched on commit. + private boolean enabled = false; + + // Wall-clock ceiling that the dispatcher applies to each operation's prepared Mono. + // + // This bounds resource occupancy (connections, threads) for a misbehaving operation. It does + // not block the commit thread, because operations run on the underlying reactive scheduler. + // + // Default is 3000 ms. + private long perOpTimeoutMs = 3000L; +} diff --git a/services/tables/src/main/resources/application.properties b/services/tables/src/main/resources/application.properties index 4a6c82a89..24f5554c5 100644 --- a/services/tables/src/main/resources/application.properties +++ b/services/tables/src/main/resources/application.properties @@ -23,4 +23,16 @@ management.metrics.distribution.maximum-expected-value.catalog_metadata_retrieva management.metrics.distribution.maximum-expected-value.catalog_metadata_update_latency=600s management.metrics.distribution.maximum-expected-value.http.server.requests=600s server.shutdown=graceful -spring.lifecycle.timeout-per-shutdown-phase=60s \ No newline at end of file +spring.lifecycle.timeout-per-shutdown-phase=60s + +# Post-commit operation framework. Disabled by default; flip per OH instance. +# Outer per-op wall-clock budget is enforced by PostCommitDispatcher. +tables.postcommit.enabled=${TABLES_POSTCOMMIT_ENABLED:false} +tables.postcommit.per-op-timeout-ms=${TABLES_POSTCOMMIT_PER_OP_TIMEOUT_MS:3000} + +# Optimizer stats post-commit operation (one impl of PostCommitOperation). +# Both flags must be true for the push to be wired and dispatched. +optimizer.stats.enabled=${OPTIMIZER_STATS_ENABLED:false} +optimizer.stats.base-uri=${OPTIMIZER_STATS_BASE_URI:} +optimizer.stats.per-attempt-timeout-ms=${OPTIMIZER_STATS_PER_ATTEMPT_TIMEOUT_MS:1000} +optimizer.stats.max-attempts=${OPTIMIZER_STATS_MAX_ATTEMPTS:3} \ No newline at end of file diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java index 44467b705..648ae1871 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/service/IcebergSnapshotsServiceTest.java @@ -11,11 +11,14 @@ import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies; import com.linkedin.openhouse.tables.dto.mapper.TablesMapper; import com.linkedin.openhouse.tables.dto.mapper.TablesMapperImpl; +import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo; import com.linkedin.openhouse.tables.model.TableDto; import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey; import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository; import com.linkedin.openhouse.tables.services.IcebergSnapshotsService; +import com.linkedin.openhouse.tables.services.postcommit.PostCommitDispatcher; import com.linkedin.openhouse.tables.utils.TableUUIDGenerator; +import java.util.Collections; import java.util.Optional; import java.util.UUID; import org.apache.iceberg.exceptions.BadRequestException; @@ -45,6 +48,13 @@ public class IcebergSnapshotsServiceTest { @MockBean private TableUUIDGenerator tableUUIDGenerator; + /** + * Forces the optional post-commit dispatcher into the service so that we can verify it is invoked + * after a successful save. Production wiring is conditional on {@code + * tables.postcommit.enabled=true}; this {@code @MockBean} bypasses that condition. + */ + @MockBean private PostCommitDispatcher postCommitDispatcher; + private OpenHouseInternalRepository mockRepository; @Captor ArgumentCaptor tableDtoArgumentCaptor; @@ -90,6 +100,32 @@ public void testTableCreated() { verifyCalls(key, TEST_TABLE_CREATOR, requestBody.getCreateUpdateTableRequestBody()); } + @Test + public void testPostCommitDispatcherInvokedAfterSuccessfulCommit() { + final IcebergSnapshotsRequestBody requestBody = + TEST_ICEBERG_SNAPSHOTS_INITIAL_VERSION_REQUEST_BODY; + final String dbId = requestBody.getCreateUpdateTableRequestBody().getDatabaseId(); + final String tableId = requestBody.getCreateUpdateTableRequestBody().getTableId(); + final TableDtoPrimaryKey key = + TableDtoPrimaryKey.builder().databaseId(dbId).tableId(tableId).build(); + final CurrentSnapshotInfo snapshot = + CurrentSnapshotInfo.builder() + .snapshotId(42L) + .summary(Collections.singletonMap("total-data-files", "7")) + .build(); + final TableDto savedDto = + TableDto.builder().databaseId(dbId).tableId(tableId).currentSnapshot(snapshot).build(); + + Mockito.when(tableUUIDGenerator.generateUUID(Mockito.any(IcebergSnapshotsRequestBody.class))) + .thenReturn(UUID.randomUUID()); + Mockito.when(mockRepository.findById(key)).thenReturn(Optional.empty()); + Mockito.when(mockRepository.save(Mockito.any(TableDto.class))).thenReturn(savedDto); + + service.putIcebergSnapshots(dbId, tableId, requestBody, TEST_TABLE_CREATOR); + + Mockito.verify(postCommitDispatcher, Mockito.times(1)).dispatch(savedDto); + } + @Test public void testPutTableExceptionHandling() { final IcebergSnapshotsRequestBody requestBody = diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java index e3e19a7d8..53fb633e3 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableDtoMappingTest.java @@ -33,7 +33,8 @@ public class TableDtoMappingTest { "jsonSnapshots", "snapshotRefs", "policies", - "tableType"); + "tableType", + "currentSnapshot"); /** Making all fields making it to map is expected, and all expected field are making it there. */ @Test diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperationTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperationTest.java new file mode 100644 index 000000000..417db2abf --- /dev/null +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperationTest.java @@ -0,0 +1,211 @@ +package com.linkedin.openhouse.tables.services.optimizer; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo; +import com.linkedin.openhouse.tables.model.TableDto; +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; + +class OptimizerStatsPostCommitOperationTest { + + private static final String TABLE_UUID = "uuid-1"; + private static final String DB = "db1"; + private static final String TABLE = "tbl1"; + private static final long SNAPSHOT_ID = 9876543210L; + private static final Duration BLOCK_MAX = Duration.ofSeconds(5); + + private MockWebServer server; + private OptimizerStatsProperties properties; + private OptimizerStatsPostCommitOperation op; + private final ObjectMapper mapper = new ObjectMapper(); + + @BeforeEach + void setUp() throws IOException { + server = new MockWebServer(); + server.start(); + properties = new OptimizerStatsProperties(); + properties.setEnabled(true); + properties.setBaseUri(server.url("/").toString()); + properties.setPerAttemptTimeoutMs(1000L); + properties.setMaxAttempts(3); + op = new OptimizerStatsPostCommitOperation(buildWebClient(), properties); + } + + @AfterEach + void tearDown() throws IOException { + server.shutdown(); + } + + private WebClient buildWebClient() { + return WebClient.builder() + .baseUrl(properties.getBaseUri()) + .clientConnector(new ReactorClientHttpConnector(HttpClient.create())) + .build(); + } + + @Test + void name_isStableTagValue() { + assertThat(op.name()).isEqualTo("optimizer_stats"); + } + + @Test + void prepare_optedIn_emitsRequestWithFullPayload() throws Exception { + server.enqueue(new MockResponse().setResponseCode(200)); + + TableDto saved = + optedInBuilder() + .tableVersion("v3") + .tableLocation("/data/tables/db1/tbl1") + .currentSnapshot(snapshot()) + .build(); + + Optional> work = op.prepare(saved); + assertThat(work).isPresent(); + work.get().block(BLOCK_MAX); + + RecordedRequest req = server.takeRequest(2, TimeUnit.SECONDS); + assertThat(req).isNotNull(); + assertThat(req.getMethod()).isEqualTo("PUT"); + assertThat(req.getPath()).isEqualTo("/v1/optimizer/stats/" + TABLE_UUID); + + @SuppressWarnings("unchecked") + Map body = mapper.readValue(req.getBody().readUtf8(), Map.class); + assertThat(body).containsEntry("databaseName", DB).containsEntry("tableName", TABLE); + @SuppressWarnings("unchecked") + Map stats = (Map) body.get("stats"); + @SuppressWarnings("unchecked") + Map snapshot = (Map) stats.get("snapshot"); + @SuppressWarnings("unchecked") + Map delta = (Map) stats.get("delta"); + assertThat(snapshot) + .containsEntry("snapshotId", SNAPSHOT_ID) + .containsEntry("tableVersion", "v3") + .containsEntry("tableLocation", "/data/tables/db1/tbl1") + .containsEntry("tableSizeBytes", 4096) + .containsEntry("numCurrentFiles", 12); + assertThat(delta) + .containsEntry("numFilesAdded", 5) + .containsEntry("numFilesDeleted", 2) + .containsEntry("addedSizeBytes", 2048) + .containsEntry("deletedSizeBytes", 1024); + } + + @Test + void prepare_notOptedIn_returnsEmpty() { + TableDto saved = builderWithoutOptIn().currentSnapshot(snapshot()).build(); + assertThat(op.prepare(saved)).isEmpty(); + assertThat(server.getRequestCount()).isZero(); + } + + @Test + void prepare_noSnapshot_returnsEmpty() { + TableDto saved = optedInBuilder().currentSnapshot(null).build(); + assertThat(op.prepare(saved)).isEmpty(); + assertThat(server.getRequestCount()).isZero(); + } + + @Test + void prepare_5xxThenSuccess_retriesAndCompletes() { + server.enqueue(new MockResponse().setResponseCode(503)); + server.enqueue(new MockResponse().setResponseCode(200)); + + TableDto saved = optedInBuilder().currentSnapshot(snapshot()).build(); + op.prepare(saved).orElseThrow(AssertionError::new).block(BLOCK_MAX); + + assertThat(server.getRequestCount()).isEqualTo(2); + } + + @Test + void prepare_allAttemptsFail_propagatesUnderlyingError() { + properties.setMaxAttempts(2); + server.enqueue(new MockResponse().setResponseCode(503)); + server.enqueue(new MockResponse().setResponseCode(503)); + + TableDto saved = optedInBuilder().currentSnapshot(snapshot()).build(); + Mono chain = op.prepare(saved).orElseThrow(AssertionError::new); + + assertThatChainErrors(chain, WebClientResponseException.class); + assertThat(server.getRequestCount()).isEqualTo(2); + } + + @Test + void prepare_4xxNonRetryable_doesNotRetry() { + server.enqueue(new MockResponse().setResponseCode(400)); + + TableDto saved = optedInBuilder().currentSnapshot(snapshot()).build(); + Mono chain = op.prepare(saved).orElseThrow(AssertionError::new); + + assertThatChainErrors(chain, WebClientResponseException.class); + assertThat(server.getRequestCount()).isEqualTo(1); + } + + // ---- helpers ---- + + private void assertThatChainErrors(Mono chain, Class errorType) { + try { + chain.block(BLOCK_MAX); + throw new AssertionError("expected chain to signal an error of type " + errorType); + } catch (RuntimeException e) { + Throwable cause = unwrap(e); + assertThat(cause).isInstanceOf(errorType); + } + } + + private static Throwable unwrap(Throwable t) { + Throwable cur = t; + while (cur.getCause() != null && cur.getCause() != cur) { + cur = cur.getCause(); + } + return cur; + } + + private static CurrentSnapshotInfo snapshot() { + return CurrentSnapshotInfo.builder().snapshotId(SNAPSHOT_ID).summary(summary()).build(); + } + + private static Map summary() { + Map s = new HashMap<>(); + s.put("total-data-files", "12"); + s.put("total-files-size", "4096"); + s.put("added-data-files", "5"); + s.put("deleted-data-files", "2"); + s.put("added-files-size", "2048"); + s.put("removed-files-size", "1024"); + return s; + } + + private static TableDto.TableDtoBuilder optedInBuilder() { + Map props = new HashMap<>(); + props.put(OptimizerStatsPostCommitOperation.OPT_IN_PROPERTY, "true"); + return TableDto.builder() + .tableUUID(TABLE_UUID) + .databaseId(DB) + .tableId(TABLE) + .tableProperties(props); + } + + private static TableDto.TableDtoBuilder builderWithoutOptIn() { + return TableDto.builder() + .tableUUID(TABLE_UUID) + .databaseId(DB) + .tableId(TABLE) + .tableProperties(new HashMap<>()); + } +} diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcherTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcherTest.java new file mode 100644 index 000000000..14b6f2dde --- /dev/null +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/services/postcommit/PostCommitDispatcherTest.java @@ -0,0 +1,177 @@ +package com.linkedin.openhouse.tables.services.postcommit; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.linkedin.openhouse.common.metrics.MetricsConstant; +import com.linkedin.openhouse.tables.model.TableDto; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.time.Duration; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +class PostCommitDispatcherTest { + + private static final Duration BLOCK_MAX = Duration.ofSeconds(5); + + private MeterRegistry meterRegistry; + private PostCommitProperties properties; + private final TableDto savedDto = TableDto.builder().databaseId("db").tableId("t").build(); + + @BeforeEach + void setUp() { + meterRegistry = new SimpleMeterRegistry(); + properties = new PostCommitProperties(); + properties.setEnabled(true); + properties.setPerOpTimeoutMs(500L); + } + + private PostCommitDispatcher dispatcherWith(PostCommitOperation... ops) { + return new PostCommitDispatcher(Arrays.asList(ops), properties, meterRegistry); + } + + @Test + void dispatch_invokesEveryRegisteredOperationsPrepare() { + AtomicInteger aCount = new AtomicInteger(); + AtomicInteger bCount = new AtomicInteger(); + dispatcherWith(new EmptyOp("a", aCount), new EmptyOp("b", bCount)).dispatch(savedDto); + + assertThat(aCount.get()).isEqualTo(1); + assertThat(bCount.get()).isEqualTo(1); + } + + @Test + void decorate_emptyPrepare_incrementsSkippedAndReturnsEmpty() { + PostCommitOperation op = new EmptyOp("opx", new AtomicInteger()); + + Optional> decorated = dispatcherWith(op).decorate(op, savedDto); + + assertThat(decorated).isEmpty(); + assertThat(meterRegistry.counter(MetricsConstant.POSTCOMMIT_OP_SKIPPED, "op", "opx").count()) + .isEqualTo(1.0); + } + + @Test + void decorate_successfulWork_recordsDurationWithSuccessOutcome() { + PostCommitOperation op = new SimpleOp("ok", Mono::empty); + + dispatcherWith(op).decorate(op, savedDto).orElseThrow(AssertionError::new).block(BLOCK_MAX); + + assertThat( + meterRegistry + .timer(MetricsConstant.POSTCOMMIT_OP_DURATION, "op", "ok", "outcome", "success") + .count()) + .isEqualTo(1L); + } + + @Test + void decorate_workSignalsError_incrementsFailedWithClassifiedOutcomeAndSwallowsError() { + RuntimeException boom = new RuntimeException("boom"); + PostCommitOperation op = new SimpleOp("broke", () -> Mono.error(boom)); + + // Dispatcher swallows the error — block() returns normally. + dispatcherWith(op).decorate(op, savedDto).orElseThrow(AssertionError::new).block(BLOCK_MAX); + + assertThat( + meterRegistry + .counter( + MetricsConstant.POSTCOMMIT_OP_FAILED, "op", "broke", "outcome", "unknown_error") + .count()) + .isEqualTo(1.0); + } + + @Test + void decorate_prepareThrowsSynchronously_incrementsFailedAndDispatchContinuesToLaterOps() { + AtomicInteger laterCount = new AtomicInteger(); + PostCommitOperation thrower = + new PostCommitOperation() { + @Override + public String name() { + return "thrower"; + } + + @Override + public Optional> prepare(TableDto dto) { + throw new IllegalStateException("nope"); + } + }; + PostCommitOperation later = new EmptyOp("later", laterCount); + + dispatcherWith(thrower, later).dispatch(savedDto); + + assertThat( + meterRegistry + .counter( + MetricsConstant.POSTCOMMIT_OP_FAILED, + "op", + "thrower", + "outcome", + "prepare_threw") + .count()) + .isEqualTo(1.0); + assertThat(laterCount.get()) + .as("later operations must still run after a synchronous prepare() throw") + .isEqualTo(1); + } + + @Test + void decorate_workExceedsPerOpTimeout_incrementsFailedWithTimeoutOutcome() { + properties.setPerOpTimeoutMs(50L); + PostCommitOperation op = new SimpleOp("slow", Mono::never); + + dispatcherWith(op).decorate(op, savedDto).orElseThrow(AssertionError::new).block(BLOCK_MAX); + + assertThat( + meterRegistry + .counter(MetricsConstant.POSTCOMMIT_OP_FAILED, "op", "slow", "outcome", "timeout") + .count()) + .isEqualTo(1.0); + } + + // ---- helpers ---- + + private static class EmptyOp implements PostCommitOperation { + private final String name; + private final AtomicInteger calls; + + EmptyOp(String name, AtomicInteger calls) { + this.name = name; + this.calls = calls; + } + + @Override + public String name() { + return name; + } + + @Override + public Optional> prepare(TableDto dto) { + calls.incrementAndGet(); + return Optional.empty(); + } + } + + private static class SimpleOp implements PostCommitOperation { + private final String name; + private final java.util.function.Supplier> work; + + SimpleOp(String name, java.util.function.Supplier> work) { + this.name = name; + this.work = work; + } + + @Override + public String name() { + return name; + } + + @Override + public Optional> prepare(TableDto dto) { + return Optional.of(work.get()); + } + } +}