-
Notifications
You must be signed in to change notification settings - Fork 78
(wip | no review) feat(tables): post-commit stats push to optimizer #608
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
mkuchenbecker
wants to merge
5
commits into
mkuchenb/optimizer-4
Choose a base branch
from
mkuchenb/optimizer-postcommit-stats
base: mkuchenb/optimizer-4
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
6e2669b
feat(tables): post-commit stats push to optimizer
mkuchenbecker af025ae
refactor(tables): generic PostCommitOperation framework; address review
mkuchenbecker 119c609
docs: collapse <p>/<ul>/<em>/<b> noise in postcommit javadoc
mkuchenbecker 1c7fbd7
docs: switch to // comments with line breaks, drop block javadoc
mkuchenbecker ecc65f8
docs: drop internal ticket reference from OptimizerStatsRequest comment
mkuchenbecker File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
30 changes: 30 additions & 0 deletions
30
services/tables/src/main/java/com/linkedin/openhouse/tables/model/CurrentSnapshotInfo.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| package com.linkedin.openhouse.tables.model; | ||
|
|
||
| import java.util.Map; | ||
| import lombok.AllArgsConstructor; | ||
| import lombok.Builder; | ||
| import lombok.EqualsAndHashCode; | ||
| import lombok.Getter; | ||
|
|
||
| // In-memory snapshot of the Iceberg current-snapshot metadata that was loaded alongside a | ||
| // TableDto. | ||
| // | ||
| // This carries only fields already materialized by the catalog client. Constructing it never | ||
| // triggers a separate HDFS or object-store read. | ||
| // | ||
| // The value is present whenever the underlying table has at least one committed snapshot at | ||
| // construction time. It is absent (modeled as Optional.empty() on TableDto) for tables with no | ||
| // committed data, such as a CREATE TABLE with no rows yet. | ||
| @Getter | ||
| @Builder | ||
| @AllArgsConstructor | ||
| @EqualsAndHashCode | ||
| public class CurrentSnapshotInfo { | ||
|
|
||
| // Iceberg snapshot ID. Stable per commit; usable as an idempotency token. | ||
| private final long snapshotId; | ||
|
|
||
| // Iceberg Snapshot.summary() map, unmodified. Keys include total-data-files, total-files-size, | ||
| // added-data-files, deleted-data-files, added-files-size, removed-files-size. | ||
| private final Map<String, String> summary; | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
36 changes: 36 additions & 0 deletions
36
.../src/main/java/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsConfig.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| package com.linkedin.openhouse.tables.services.optimizer; | ||
|
|
||
| import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | ||
| import org.springframework.boot.context.properties.EnableConfigurationProperties; | ||
| import org.springframework.context.annotation.Bean; | ||
| import org.springframework.context.annotation.Configuration; | ||
| import org.springframework.http.client.reactive.ReactorClientHttpConnector; | ||
| import org.springframework.web.reactive.function.client.WebClient; | ||
| import reactor.netty.http.client.HttpClient; | ||
|
|
||
| // Wiring for the post-commit Tables-to-Optimizer stats push. | ||
| // | ||
| // This configuration is active only when optimizer.stats.enabled=true. When the flag is false, | ||
| // no WebClient bean is constructed, and the dispatcher sees no optimizer-stats operation. | ||
| @Configuration | ||
| @EnableConfigurationProperties(OptimizerStatsProperties.class) | ||
| @ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true") | ||
| public class OptimizerStatsConfig { | ||
|
|
||
| // Dedicated WebClient for the optimizer stats endpoint. | ||
| // | ||
| // The per-attempt timeout is applied on the Reactor chain in OptimizerStatsPostCommitOperation, | ||
| // and the outer per-op timeout is applied by PostCommitDispatcher. Neither timeout is | ||
| // configured on the underlying Netty client. | ||
| // | ||
| // This arrangement ensures that any timeout always emerges as a standard | ||
| // java.util.concurrent.TimeoutException rather than a Netty ReadTimeoutException, which keeps | ||
| // the dispatcher's outcome classification simple. | ||
| @Bean("optimizerStatsWebClient") | ||
| public WebClient optimizerStatsWebClient(OptimizerStatsProperties properties) { | ||
| return WebClient.builder() | ||
| .baseUrl(properties.getBaseUri()) | ||
| .clientConnector(new ReactorClientHttpConnector(HttpClient.create())) | ||
| .build(); | ||
| } | ||
| } |
168 changes: 168 additions & 0 deletions
168
...a/com/linkedin/openhouse/tables/services/optimizer/OptimizerStatsPostCommitOperation.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| package com.linkedin.openhouse.tables.services.optimizer; | ||
|
|
||
| import com.linkedin.openhouse.tables.model.CurrentSnapshotInfo; | ||
| import com.linkedin.openhouse.tables.model.TableDto; | ||
| import com.linkedin.openhouse.tables.services.postcommit.PostCommitOperation; | ||
| import java.time.Duration; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.TimeoutException; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.beans.factory.annotation.Qualifier; | ||
| import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | ||
| import org.springframework.stereotype.Component; | ||
| import org.springframework.web.reactive.function.client.WebClient; | ||
| import org.springframework.web.reactive.function.client.WebClientRequestException; | ||
| import org.springframework.web.reactive.function.client.WebClientResponseException; | ||
| import reactor.core.publisher.Mono; | ||
| import reactor.util.retry.Retry; | ||
| import reactor.util.retry.RetrySpec; | ||
|
|
||
| // A PostCommitOperation that PUTs a snapshot-stats record to the optimizer's per-table stats | ||
| // endpoint. | ||
| // | ||
| // prepare() returns a Mono that completes on HTTP 2xx and signals an error otherwise. The | ||
| // dispatcher owns the timeout, the subscription, error swallowing, and metric emission. The | ||
| // returned value is Optional.empty() when the table is not opted in via OPT_IN_PROPERTY or has | ||
| // no current snapshot. | ||
| // | ||
| // Internal retry is bounded by OptimizerStatsProperties.maxAttempts and fires only on retryable | ||
| // errors: network failures, a TimeoutException, or HTTP 408 / 429 / 5xx responses. The | ||
| // dispatcher's per-op timeout is the hard ceiling on the whole chain. | ||
| // | ||
| // The bean is wired only when optimizer.stats.enabled=true. PATH_TEMPLATE is intentionally | ||
| // duplicated from TableStatsController.TABLE_PATH_TEMPLATE so the tables service does not take a | ||
| // compile-time dependency on the optimizer jar. Keep both copies in sync. | ||
| @Slf4j | ||
| @Component | ||
| @ConditionalOnProperty(prefix = "optimizer.stats", name = "enabled", havingValue = "true") | ||
| public class OptimizerStatsPostCommitOperation implements PostCommitOperation { | ||
|
|
||
| // Metric tag value for the "op" tag. | ||
| static final String OP_NAME = "optimizer_stats"; | ||
|
|
||
| // Per-call URL path. Intentionally duplicated from TableStatsController.TABLE_PATH_TEMPLATE so | ||
| // that we avoid a compile-time dependency on the optimizer jar. Keep both copies in sync. | ||
| static final String PATH_TEMPLATE = "/v1/optimizer/stats/{tableUuid}"; | ||
|
|
||
| // Table-property key that opts a table in for the post-commit stats push. | ||
| static final String OPT_IN_PROPERTY = "maintenance.optimizer.stats.enabled"; | ||
|
|
||
| // Iceberg snapshot-summary keys we read. All values are decimal-string longs. | ||
| static final String SUMMARY_TOTAL_DATA_FILES = "total-data-files"; | ||
| static final String SUMMARY_TOTAL_FILES_SIZE = "total-files-size"; | ||
| static final String SUMMARY_ADDED_DATA_FILES = "added-data-files"; | ||
| static final String SUMMARY_DELETED_DATA_FILES = "deleted-data-files"; | ||
| static final String SUMMARY_ADDED_FILES_SIZE = "added-files-size"; | ||
| static final String SUMMARY_REMOVED_FILES_SIZE = "removed-files-size"; | ||
|
|
||
| private final WebClient webClient; | ||
| private final OptimizerStatsProperties properties; | ||
|
|
||
| public OptimizerStatsPostCommitOperation( | ||
| @Qualifier("optimizerStatsWebClient") WebClient webClient, | ||
| OptimizerStatsProperties properties) { | ||
| this.webClient = webClient; | ||
| this.properties = properties; | ||
| } | ||
|
|
||
| @Override | ||
| public String name() { | ||
| return OP_NAME; | ||
| } | ||
|
|
||
| @Override | ||
| public Optional<Mono<Void>> prepare(TableDto savedDto) { | ||
| if (!isOptedIn(savedDto)) { | ||
| return Optional.empty(); | ||
| } | ||
| Optional<CurrentSnapshotInfo> snapshot = savedDto.getCurrentSnapshot(); | ||
| if (!snapshot.isPresent()) { | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| OptimizerStatsRequest body = buildRequest(savedDto, snapshot.get()); | ||
| String tableUuid = savedDto.getTableUUID(); | ||
|
|
||
| RetrySpec retrySpec = | ||
| Retry.max(Math.max(0, properties.getMaxAttempts() - 1)).filter(this::isRetryable); | ||
|
|
||
| Mono<Void> chain = | ||
| webClient | ||
| .put() | ||
| .uri(PATH_TEMPLATE, tableUuid) | ||
| .bodyValue(body) | ||
| .retrieve() | ||
| .toBodilessEntity() | ||
| .timeout(Duration.ofMillis(properties.getPerAttemptTimeoutMs())) | ||
| .retryWhen(retrySpec.onRetryExhaustedThrow((spec, signal) -> signal.failure())) | ||
| .then(); | ||
| return Optional.of(chain); | ||
| } | ||
|
|
||
| // Returns true when the table's properties contain the literal opt-in value "true" for the | ||
| // configured OPT_IN_PROPERTY. | ||
| private boolean isOptedIn(TableDto saved) { | ||
| Map<String, String> props = saved.getTableProperties(); | ||
| return props != null && "true".equals(props.get(OPT_IN_PROPERTY)); | ||
| } | ||
|
|
||
| // Builds the wire body. Missing summary keys default to 0L. | ||
| OptimizerStatsRequest buildRequest(TableDto saved, CurrentSnapshotInfo snapshot) { | ||
| Map<String, String> summary = snapshot.getSummary(); | ||
| OptimizerStatsRequest.Snapshot snapshotPayload = | ||
| OptimizerStatsRequest.Snapshot.builder() | ||
| .snapshotId(snapshot.getSnapshotId()) | ||
| .tableVersion(saved.getTableVersion()) | ||
| .tableLocation(saved.getTableLocation()) | ||
| .tableSizeBytes(longOrZero(summary, SUMMARY_TOTAL_FILES_SIZE)) | ||
| .numCurrentFiles(longOrZero(summary, SUMMARY_TOTAL_DATA_FILES)) | ||
| .build(); | ||
| OptimizerStatsRequest.Delta delta = | ||
| OptimizerStatsRequest.Delta.builder() | ||
| .numFilesAdded(longOrZero(summary, SUMMARY_ADDED_DATA_FILES)) | ||
| .numFilesDeleted(longOrZero(summary, SUMMARY_DELETED_DATA_FILES)) | ||
| .addedSizeBytes(longOrZero(summary, SUMMARY_ADDED_FILES_SIZE)) | ||
| .deletedSizeBytes(longOrZero(summary, SUMMARY_REMOVED_FILES_SIZE)) | ||
| .build(); | ||
| return OptimizerStatsRequest.builder() | ||
| .databaseName(saved.getDatabaseId()) | ||
| .tableName(saved.getTableId()) | ||
| .stats(OptimizerStatsRequest.Stats.builder().snapshot(snapshotPayload).delta(delta).build()) | ||
| .tableProperties(saved.getTableProperties()) | ||
| .build(); | ||
| } | ||
|
|
||
| // Returns true for errors that a later attempt could plausibly succeed against: a per-attempt | ||
| // timeout, a network-level failure, or an HTTP 5xx / 408 / 429 response. | ||
| // | ||
| // Other 4xx responses are client errors that retries cannot fix, so we fail fast on them. | ||
| boolean isRetryable(Throwable e) { | ||
| if (e instanceof TimeoutException) { | ||
| return true; | ||
| } | ||
| if (e instanceof WebClientRequestException) { | ||
| return true; | ||
| } | ||
| if (e instanceof WebClientResponseException) { | ||
| int code = ((WebClientResponseException) e).getStatusCode().value(); | ||
| return code == 408 || code == 429 || (code >= 500 && code < 600); | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| private static long longOrZero(Map<String, String> m, String key) { | ||
| if (m == null) { | ||
| return 0L; | ||
| } | ||
| String v = m.get(key); | ||
| if (v == null) { | ||
| return 0L; | ||
| } | ||
| try { | ||
| return Long.parseLong(v); | ||
| } catch (NumberFormatException nfe) { | ||
| return 0L; | ||
| } | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be modeled as a postcommit operation. Postcommit operations must be bounded and async and are considered best-effort. We ahve one postcommit operation registered to the service which is optimizer stats. That way we have a generic behaviour and a specific impl.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. New
services/tables/.../services/postcommit/package:PostCommitOperationinterface (name()+Optional<Mono<Void>> prepare(TableDto)),PostCommitProperties(enabled,perOpTimeoutMs),PostCommitDispatcher(per-op timeout, error swallow, metric emission, fire-and-forget subscribe).OptimizerStatsPostCommitOperationis the first registered impl.IcebergSnapshotsServiceImplnow has one line:postCommitDispatcher.ifPresent(d -> d.dispatch(savedDto)). Adding a future operation = implement the interface and let@ConditionalOnPropertywire it; no service-layer change.