Skip to content

Fix multi-schema replication intermediates dropped on snapshots-endpoint commits#603

Open
dushyantk1509 wants to merge 1 commit into
linkedin:mainfrom
dushyantk1509:dushyantk1509/fix-snapshot-mapper-intermediate-schemas
Open

Fix multi-schema replication intermediates dropped on snapshots-endpoint commits#603
dushyantk1509 wants to merge 1 commit into
linkedin:mainfrom
dushyantk1509:dushyantk1509/fix-snapshot-mapper-intermediate-schemas

Conversation

@dushyantk1509
Copy link
Copy Markdown
Contributor

@dushyantk1509 dushyantk1509 commented May 27, 2026

Summary

Replication commits arriving through PUT /iceberg/v2/snapshots with multi-schema-jump intermediate schemas were silently losing them due to two issues that combine to leave the dest replica with a snapshot whose schema-id points at a schema not present in schemas[]. Trino strictly resolves snapshot schema-id against schemas[] and fails with "Cannot find schema with schema id N"; Spark falls back to current-schema-id and reads succeed.

Two fixes:

  1. TablesMapper#toTableDto(TableDto, IcebergSnapshotsRequestBody) was missing the @mapping for newIntermediateSchemas. The CreateUpdateTableRequestBody overload had it (added in Add multi schema update support for tables during replica table commits #407), but the IcebergSnapshotsRequestBody overload used for the snapshots endpoint did not — so intermediates packed correctly by the client got dropped at the server-side DTO mapping step.

  2. OpenHouseTableOperations#doCommit (after [RTAS] Add client side support (part 3) #524 introduced putSnapshotsForReplace) was routing cross-cluster REPLICA_TABLE replication commits through the RTAS replace path because they update both metadata and snapshots in one commit. The server's REPLACE branch treats the commit as a fresh table creation and uses CLIENT_TABLE_SCHEMA (bootstrap-era schema) as the final schema, not the request's actual target. This produces a different but related corruption pattern. Detect REPLICA commits (base REPLICA_TABLE + metadata PRIMARY_TABLE + different cluster IDs) and route them through the regular putSnapshots path so the intermediate-schemas plumbing fires.

Both fixes are needed in tandem: without (1), even the regular update branch drops intermediates; without (2), commits hitting the post-#524 client get routed around (1) entirely.

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.

…int commits

Replication commits arriving through PUT /iceberg/v2/snapshots with
multi-schema-jump intermediate schemas were silently losing them due to two
issues that combine to leave the dest replica with a snapshot whose schema-id
points at a schema not present in schemas[]. Trino strictly resolves snapshot
schema-id against schemas[] and fails with "Cannot find schema with schema id N";
Spark falls back to current-schema-id and reads succeed.

Two fixes:

1. TablesMapper#toTableDto(TableDto, IcebergSnapshotsRequestBody) was missing
   the @mapping for newIntermediateSchemas. The CreateUpdateTableRequestBody
   overload had it (added in linkedin#407), but the IcebergSnapshotsRequestBody overload
   used for the snapshots endpoint did not — so intermediates packed correctly
   by the client got dropped at the server-side DTO mapping step.

2. OpenHouseTableOperations#doCommit (after linkedin#524 introduced
   putSnapshotsForReplace) was routing cross-cluster REPLICA_TABLE replication
   commits through the RTAS replace path because they update both metadata and
   snapshots in one commit. The server's REPLACE branch treats the commit as a
   fresh table creation and uses CLIENT_TABLE_SCHEMA (bootstrap-era schema) as
   the final schema, not the request's actual target. This produces a different
   but related corruption pattern. Detect REPLICA commits (base REPLICA_TABLE +
   metadata PRIMARY_TABLE + different cluster IDs) and route them through the
   regular putSnapshots path so the intermediate-schemas plumbing fires.

Both fixes are needed in tandem: without (1), even the regular update branch
drops intermediates; without (2), commits hitting the post-linkedin#524 client get
routed around (1) entirely.

Unit tests:
- TablesMapperTest#testToTableDtoWithPutSnapshotsPreservesNewIntermediateSchemas
  pins the mapper extraction.
- OpenHouseTableOperationsTest#testDoCommitRoutesReplicaTableThroughPutSnapshotsNotReplace
  pins REPLICA dispatch goes to regular putSnapshots (replaceCommit=false).
- OpenHouseTableOperationsTest#testDoCommitRoutesPrimaryRtasThroughPutSnapshotsForReplace
  pins genuine RTAS on PRIMARY tables still uses the replace path.
@dushyantk1509 dushyantk1509 marked this pull request as ready for review May 27, 2026 17:02
Comment on lines +120 to +122
boolean isReplicaCommit =
getTableType(base, metadata) == CreateUpdateTableRequestBody.TableTypeEnum.REPLICA_TABLE;
if (metadataUpdated && snapshotsUpdated && base != null && !isReplicaCommit) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What makes a replica commit different that we need to special case?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that mean that this excludes RTAS commit for replica table?

// the replace path treats the commit as a fresh table creation and does not preserve
// multi-schema delta semantics. Reserve putSnapshotsForReplace for genuine CTAS/RTAS, where
// the metadata being committed is PRIMARY (not a REPLICA_TABLE update).
boolean isReplicaCommit =
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about if replica, then putsnapshots?

boolean isReplicaCommit =
getTableType(base, metadata) == CreateUpdateTableRequestBody.TableTypeEnum.REPLICA_TABLE;
if (metadataUpdated && snapshotsUpdated && base != null && !isReplicaCommit) {
// Only CTAS and RTAS can update both metadata and snapshots at the same time.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is no longer accurate.

.get(OPENHOUSE_CLUSTER_ID_KEY)
.equals(metadata.properties().get(OPENHOUSE_CLUSTER_ID_KEY))) {
return baseTableType;
String baseTypeStr = base.properties().get(OPENHOUSE_TABLE_TYPE_KEY);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why might this not exist?

@VisibleForTesting
CreateUpdateTableRequestBody.TableTypeEnum getTableType(
TableMetadata base, TableMetadata metadata) {
String metaTypeStr = metadata.properties().get(OPENHOUSE_TABLE_TYPE_KEY);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, I would move this to after the if so on the case where base is null its not evaluated.

Comment on lines +247 to +252
if (baseTableType == CreateUpdateTableRequestBody.TableTypeEnum.REPLICA_TABLE
&& metadataTableType == CreateUpdateTableRequestBody.TableTypeEnum.PRIMARY_TABLE
&& baseCluster != null
&& !baseCluster.equals(metaCluster)) {
return baseTableType;
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic does not makse sense to me. Why can we not trust the table type?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand the change here. Seems like this change is applicable only for replica table to determine table type?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants