Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,12 @@ private MetricsConstant() {}
public static final String HTS_LIST_TABLES_REQUEST = "hts_list_tables_request";
public static final String HTS_LIST_TABLES_TIME = "hts_list_tables_time";
public static final String HTS_SEARCH_TABLES_TIME = "hts_search_tables_time";

// Tables post-commit operation framework (bounded, best-effort actions after commit).
// Tagged with op={operation name}; on failure additionally tagged with outcome={timeout,
// network_error, server_error, client_error, prepare_threw, unknown_error}.
// The duration timer is bounded by tables.postcommit.per-op-timeout-ms.
public static final String POSTCOMMIT_OP_DURATION = "postcommit_op_duration";
public static final String POSTCOMMIT_OP_SKIPPED = "postcommit_op_skipped";
public static final String POSTCOMMIT_OP_FAILED = "postcommit_op_failed";
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,22 @@

/** REST controller for managing per-table stats in the optimizer DB. */
@RestController
@RequestMapping("/v1/optimizer/stats")
@RequestMapping(TableStatsController.BASE_PATH)
@RequiredArgsConstructor
public class TableStatsController {

/**
* Base path for the stats endpoints. Single source of truth — external callers must reference.
*/
public static final String BASE_PATH = "/v1/optimizer/stats";

/**
* URI template (with {@code {tableUuid}} placeholder) for the per-table stats upsert/fetch
* endpoint. Use this from any client that calls the optimizer over HTTP, rather than rebuilding
* the path inline.
*/
public static final String TABLE_PATH_TEMPLATE = BASE_PATH + "/{tableUuid}";

private final OptimizerDataService service;

/**
Expand Down
1 change: 1 addition & 0 deletions services/tables/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-engine:' + junit_version
testImplementation 'org.springframework.security:spring-security-test:5.7.3'
testImplementation 'org.springframework:spring-context-support:5.3.18'
testImplementation 'com.squareup.okhttp3:mockwebserver:4.10.0'
testImplementation(testFixtures(project(':services:common')))
testImplementation (project(':tables-test-fixtures:tables-test-fixtures_2.12')) {
exclude group: 'com.linkedin.iceberg'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public interface TablesMapper {
@Mapping(source = "requestBody.sortOrder", target = "sortOrder"),
@Mapping(target = "lastModifiedTime", ignore = true),
@Mapping(target = "creationTime", ignore = true),
@Mapping(target = "currentSnapshot", ignore = true)
})
TableDto toTableDto(TableDto tableDto, CreateUpdateTableRequestBody requestBody);

Expand Down Expand Up @@ -120,7 +121,8 @@ public interface TablesMapper {
defaultExpression = "java(TableType.PRIMARY_TABLE)"),
@Mapping(source = "requestBody.createUpdateTableRequestBody.sortOrder", target = "sortOrder"),
@Mapping(target = "lastModifiedTime", ignore = true),
@Mapping(target = "creationTime", ignore = true)
@Mapping(target = "creationTime", ignore = true),
@Mapping(target = "currentSnapshot", ignore = true)
})
TableDto toTableDto(TableDto tableDto, IcebergSnapshotsRequestBody requestBody);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.linkedin.openhouse.tables.model;

import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;

// In-memory snapshot of the Iceberg current-snapshot metadata that was loaded alongside a
// TableDto.
//
// This carries only fields already materialized by the catalog client. Constructing it never
// triggers a separate HDFS or object-store read.
//
// The value is present whenever the underlying table has at least one committed snapshot at
// construction time. It is absent (modeled as Optional.empty() on TableDto) for tables with no
// committed data, such as a CREATE TABLE with no rows yet.
@Getter
@Builder
@AllArgsConstructor
@EqualsAndHashCode
public class CurrentSnapshotInfo {

// Iceberg snapshot ID. Stable per commit; usable as an idempotency token.
private final long snapshotId;

// Iceberg Snapshot.summary() map, unmodified. Keys include total-data-files, total-files-size,
// added-data-files, deleted-data-files, added-files-size, removed-files-size.
private final Map<String, String> summary;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.persistence.Convert;
import javax.persistence.ElementCollection;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.IdClass;
import javax.persistence.Transient;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
Expand Down Expand Up @@ -83,6 +85,23 @@ public class TableDto {

private boolean replaceCommit;

// In-memory current-snapshot metadata captured when this TableDto was built from an Iceberg
// Table.
//
// Present whenever the underlying table has at least one committed snapshot at that point.
// Absent for tables with no committed data, such as a CREATE TABLE with no rows yet.
//
// Not persisted and not part of equality. Read through getCurrentSnapshot().
@Getter(AccessLevel.NONE)
@Transient
@EqualsAndHashCode.Exclude
private CurrentSnapshotInfo currentSnapshot;

// Returns the current-snapshot metadata if any, else Optional.empty().
public Optional<CurrentSnapshotInfo> getCurrentSnapshot() {
return Optional.ofNullable(currentSnapshot);
}

/**
* Bundling eligible string type field into a map as {@link org.mapstruct.Mapper} doesn't provide
* easy interface to achieve so.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.openhouse.tables.dto.mapper.iceberg.PartitionSpecMapper;
import com.linkedin.openhouse.tables.dto.mapper.iceberg.PoliciesSpecMapper;
import com.linkedin.openhouse.tables.dto.mapper.iceberg.TableTypeMapper;
import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo;
import com.linkedin.openhouse.tables.model.TableDto;
import com.linkedin.openhouse.tables.repository.PreservedKeyChecker;
import java.net.URI;
Expand Down Expand Up @@ -135,6 +136,13 @@ static TableDto convertToTableDto(
.jsonSnapshots(null)
.tableProperties(megaProps)
.sortOrder(SortOrderParser.toJson(table.sortOrder()))
.currentSnapshot(
table.currentSnapshot() == null
? null
: CurrentSnapshotInfo.builder()
.snapshotId(table.currentSnapshot().snapshotId())
.summary(table.currentSnapshot().summary())
.build())
.build();

return tableDto;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.linkedin.openhouse.tables.model.TableDto;
import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey;
import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository;
import com.linkedin.openhouse.tables.services.postcommit.PostCommitDispatcher;
import com.linkedin.openhouse.tables.utils.AuthorizationUtils;
import com.linkedin.openhouse.tables.utils.TableUUIDGenerator;
import java.util.Optional;
Expand All @@ -32,6 +33,13 @@ public class IcebergSnapshotsServiceImpl implements IcebergSnapshotsService {

@Autowired AuthorizationUtils authorizationUtils;

/**
* Present only when {@code tables.postcommit.enabled=true}. When absent, the on-commit hook is a
* literal no-op and no post-commit operations run.
*/
@Autowired(required = false)
Optional<PostCommitDispatcher> postCommitDispatcher = Optional.empty();

Comment on lines +36 to +42
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be modeled as a postcommit operation. Postcommit operations must be bounded and async and are considered best-effort. We ahve one postcommit operation registered to the service which is optimizer stats. That way we have a generic behaviour and a specific impl.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. New services/tables/.../services/postcommit/ package: PostCommitOperation interface (name() + Optional<Mono<Void>> prepare(TableDto)), PostCommitProperties (enabled, perOpTimeoutMs), PostCommitDispatcher (per-op timeout, error swallow, metric emission, fire-and-forget subscribe). OptimizerStatsPostCommitOperation is the first registered impl. IcebergSnapshotsServiceImpl now has one line: postCommitDispatcher.ifPresent(d -> d.dispatch(savedDto)). Adding a future operation = implement the interface and let @ConditionalOnProperty wire it; no service-layer change.

@Override
public Pair<TableDto, Boolean> putIcebergSnapshots(
String databaseId,
Expand Down Expand Up @@ -83,7 +91,9 @@ public Pair<TableDto, Boolean> putIcebergSnapshots(
databaseId, tableCreatorUpdater, Privileges.CREATE_TABLE);
}
try {
return Pair.of(openHouseInternalRepository.save(tableDtoToSave), !tableDto.isPresent());
TableDto savedDto = openHouseInternalRepository.save(tableDtoToSave);
postCommitDispatcher.ifPresent(d -> d.dispatch(savedDto));
return Pair.of(savedDto, !tableDto.isPresent());
} catch (BadRequestException e) {
throw new RequestValidationFailureException(e.getMessage(), e);
} catch (CommitFailedException ce) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.linkedin.openhouse.tables.services.optimizer;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;

// Wiring for the post-commit Tables-to-Optimizer stats push.
//
// This configuration is active only when optimizer.stats.enabled=true. When the flag is false,
// no WebClient bean is constructed, and the dispatcher sees no optimizer-stats operation.
@Configuration
@EnableConfigurationProperties(OptimizerStatsProperties.class)
@ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true")
public class OptimizerStatsConfig {

// Dedicated WebClient for the optimizer stats endpoint.
//
// The per-attempt timeout is applied on the Reactor chain in OptimizerStatsPostCommitOperation,
// and the outer per-op timeout is applied by PostCommitDispatcher. Neither timeout is
// configured on the underlying Netty client.
//
// This arrangement ensures that any timeout always emerges as a standard
// java.util.concurrent.TimeoutException rather than a Netty ReadTimeoutException, which keeps
// the dispatcher's outcome classification simple.
@Bean("optimizerStatsWebClient")
public WebClient optimizerStatsWebClient(OptimizerStatsProperties properties) {
return WebClient.builder()
.baseUrl(properties.getBaseUri())
.clientConnector(new ReactorClientHttpConnector(HttpClient.create()))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package com.linkedin.openhouse.tables.services.optimizer;

import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo;
import com.linkedin.openhouse.tables.model.TableDto;
import com.linkedin.openhouse.tables.services.postcommit.PostCommitOperation;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import reactor.util.retry.RetrySpec;

// A PostCommitOperation that PUTs a snapshot-stats record to the optimizer's per-table stats
// endpoint.
//
// prepare() returns a Mono that completes on HTTP 2xx and signals an error otherwise. The
// dispatcher owns the timeout, the subscription, error swallowing, and metric emission. The
// returned value is Optional.empty() when the table is not opted in via OPT_IN_PROPERTY or has
// no current snapshot.
//
// Internal retry is bounded by OptimizerStatsProperties.maxAttempts and fires only on retryable
// errors: network failures, a TimeoutException, or HTTP 408 / 429 / 5xx responses. The
// dispatcher's per-op timeout is the hard ceiling on the whole chain.
//
// The bean is wired only when optimizer.stats.enabled=true. PATH_TEMPLATE is intentionally
// duplicated from TableStatsController.TABLE_PATH_TEMPLATE so the tables service does not take a
// compile-time dependency on the optimizer jar. Keep both copies in sync.
@Slf4j
@Component
@ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true")
public class OptimizerStatsPostCommitOperation implements PostCommitOperation {

// Metric tag value for the "op" tag.
static final String OP_NAME = "optimizer_stats";

// Per-call URL path. Intentionally duplicated from TableStatsController.TABLE_PATH_TEMPLATE so
// that we avoid a compile-time dependency on the optimizer jar. Keep both copies in sync.
static final String PATH_TEMPLATE = "/v1/optimizer/stats/{tableUuid}";

// Table-property key that opts a table in for the post-commit stats push.
static final String OPT_IN_PROPERTY = "maintenance.optimizer.stats.enabled";

// Iceberg snapshot-summary keys we read. All values are decimal-string longs.
static final String SUMMARY_TOTAL_DATA_FILES = "total-data-files";
static final String SUMMARY_TOTAL_FILES_SIZE = "total-files-size";
static final String SUMMARY_ADDED_DATA_FILES = "added-data-files";
static final String SUMMARY_DELETED_DATA_FILES = "deleted-data-files";
static final String SUMMARY_ADDED_FILES_SIZE = "added-files-size";
static final String SUMMARY_REMOVED_FILES_SIZE = "removed-files-size";

private final WebClient webClient;
private final OptimizerStatsProperties properties;

public OptimizerStatsPostCommitOperation(
@Qualifier("optimizerStatsWebClient") WebClient webClient,
OptimizerStatsProperties properties) {
this.webClient = webClient;
this.properties = properties;
}

@Override
public String name() {
return OP_NAME;
}

@Override
public Optional<Mono<Void>> prepare(TableDto savedDto) {
if (!isOptedIn(savedDto)) {
return Optional.empty();
}
Optional<CurrentSnapshotInfo> snapshot = savedDto.getCurrentSnapshot();
if (!snapshot.isPresent()) {
return Optional.empty();
}

OptimizerStatsRequest body = buildRequest(savedDto, snapshot.get());
String tableUuid = savedDto.getTableUUID();

RetrySpec retrySpec =
Retry.max(Math.max(0, properties.getMaxAttempts() - 1)).filter(this::isRetryable);

Mono<Void> chain =
webClient
.put()
.uri(PATH_TEMPLATE, tableUuid)
.bodyValue(body)
.retrieve()
.toBodilessEntity()
.timeout(Duration.ofMillis(properties.getPerAttemptTimeoutMs()))
.retryWhen(retrySpec.onRetryExhaustedThrow((spec, signal) -> signal.failure()))
.then();
return Optional.of(chain);
}

// Returns true when the table's properties contain the literal opt-in value "true" for the
// configured OPT_IN_PROPERTY.
private boolean isOptedIn(TableDto saved) {
Map<String, String> props = saved.getTableProperties();
return props != null && "true".equals(props.get(OPT_IN_PROPERTY));
}

// Builds the wire body. Missing summary keys default to 0L.
OptimizerStatsRequest buildRequest(TableDto saved, CurrentSnapshotInfo snapshot) {
Map<String, String> summary = snapshot.getSummary();
OptimizerStatsRequest.Snapshot snapshotPayload =
OptimizerStatsRequest.Snapshot.builder()
.snapshotId(snapshot.getSnapshotId())
.tableVersion(saved.getTableVersion())
.tableLocation(saved.getTableLocation())
.tableSizeBytes(longOrZero(summary, SUMMARY_TOTAL_FILES_SIZE))
.numCurrentFiles(longOrZero(summary, SUMMARY_TOTAL_DATA_FILES))
.build();
OptimizerStatsRequest.Delta delta =
OptimizerStatsRequest.Delta.builder()
.numFilesAdded(longOrZero(summary, SUMMARY_ADDED_DATA_FILES))
.numFilesDeleted(longOrZero(summary, SUMMARY_DELETED_DATA_FILES))
.addedSizeBytes(longOrZero(summary, SUMMARY_ADDED_FILES_SIZE))
.deletedSizeBytes(longOrZero(summary, SUMMARY_REMOVED_FILES_SIZE))
.build();
return OptimizerStatsRequest.builder()
.databaseName(saved.getDatabaseId())
.tableName(saved.getTableId())
.stats(OptimizerStatsRequest.Stats.builder().snapshot(snapshotPayload).delta(delta).build())
.tableProperties(saved.getTableProperties())
.build();
}

// Returns true for errors that a later attempt could plausibly succeed against: a per-attempt
// timeout, a network-level failure, or an HTTP 5xx / 408 / 429 response.
//
// Other 4xx responses are client errors that retries cannot fix, so we fail fast on them.
boolean isRetryable(Throwable e) {
if (e instanceof TimeoutException) {
return true;
}
if (e instanceof WebClientRequestException) {
return true;
}
if (e instanceof WebClientResponseException) {
int code = ((WebClientResponseException) e).getStatusCode().value();
return code == 408 || code == 429 || (code >= 500 && code < 600);
}
return false;
}

private static long longOrZero(Map<String, String> m, String key) {
if (m == null) {
return 0L;
}
String v = m.get(key);
if (v == null) {
return 0L;
}
try {
return Long.parseLong(v);
} catch (NumberFormatException nfe) {
return 0L;
}
}
}
Loading