From ebd723018f33df6934252ca8902a0e4d16754e62 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Fri, 29 May 2026 00:28:22 -0700 Subject: [PATCH] fix(catalog): abort doCommit on stale-base divergence Aborts doCommit when the writer's declared base (COMMIT_KEY) diverges from the catalog's actual persisted base, preventing the BaseTransaction.applyUpdates silent-rebase lost-update (incident-12185), where applyUpdates re-stamps the writer's original non-null COMMIT_KEY on top of a concurrently-advanced base. Commits that leave COMMIT_KEY unset (wholesale replace/create) are authoritative over the snapshot set and are intentionally not defended. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../OpenHouseInternalTableOperations.java | 52 +++++++++ .../OpenHouseInternalTableOperationsTest.java | 102 ++++++++++++++++++ 2 files changed, 154 insertions(+) diff --git a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java index d1636bda9..f3140bed8 100644 --- a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java +++ b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java @@ -259,6 +259,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { try { // Now that we have metadataLocation we stamp it in metadata property. Map properties = new HashMap<>(metadata.properties()); + + abortIfWriterBaseDivergedFromCatalog(base, metadata); + failIfRetryUpdate(properties); restoreOverriddenProperties(properties); @@ -575,6 +578,55 @@ static SortOrder rebuildSortOrder(SortOrder originalSortOrder, Schema newSchema) return builder.build(); } + /** + * Catalog-level CAS. Aborts the commit if the writer's declared base ({@code COMMIT_KEY}) does + * not match the catalog's current persisted base. Closes the BaseTransaction.applyUpdates + * silent-rebase variant of the stale-base bug class, where {@code applyUpdates} re-stamps the + * writer's original (non-null) {@code COMMIT_KEY} on top of a concurrently-advanced base. + * + *

Commits that leave {@code COMMIT_KEY} unset (wholesale replace / create — replaceTable, + * stage-create, stage-replace) are authoritative over the snapshot set and are intentionally not + * defended: there is no stale base to compare against. + * + *

Must run before failIfRetryUpdate, which strips COMMIT_KEY from the doCommit-local + * properties copy. + * + * @throws CommitFailedException when writer and catalog disagree on the base — retriable by + * Iceberg's commit loop after the writer refreshes + */ + private void abortIfWriterBaseDivergedFromCatalog(TableMetadata base, TableMetadata metadata) { + if (base == null || base.metadataFileLocation() == null) { + // No persisted catalog state to defend (initial CREATE, or mid-CREATE constructed + // metadata before any metadata.json has been written). + return; + } + if (!metadata.properties().containsKey(CatalogConstants.SNAPSHOTS_JSON_KEY)) { + // Not a snapshot-bearing writer commit (e.g. rename, property-only update, internal + // metadata-field write). These paths legitimately have no COMMIT_KEY because they don't + // go through OpenHouseInternalRepositoryImpl.save:187, and they don't carry the + // stale-snapshot-list payload that the subtractive merge would silently expire. + return; + } + String actualBase = base.metadataFileLocation(); + String writerClaimedBase = metadata.properties().get(CatalogConstants.COMMIT_KEY); + + if (writerClaimedBase == null) { + return; + } + + if (CatalogConstants.INITIAL_VERSION.equals(writerClaimedBase) + || !new org.apache.hadoop.fs.Path(writerClaimedBase) + .toUri() + .getPath() + .equals(new org.apache.hadoop.fs.Path(actualBase).toUri().getPath())) { + throw new CommitFailedException( + "Cannot commit: writer's declared base [%s] does not match the catalog's current " + + "base [%s] for table %s. A concurrent commit landed between the writer's " + + "loadTable and commit. Refresh and retry.", + writerClaimedBase, actualBase, tableIdentifier); + } + } + /** * If this commit comes from Iceberg built-in retry in * org.apache.iceberg.PropertiesUpdate#commit() Then throw fatal {@link CommitFailedException} to diff --git a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java index 57aef978c..14c586fc2 100644 --- a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java +++ b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -213,6 +214,107 @@ void testDoCommitAppendSnapshotsExistingVersion() throws IOException { } } + /** + * Deterministic local reproduction of incident-12185 -- the {@code BaseTransaction.applyUpdates} + * silent-rebase variant of the stale-base lost-update class (2026-05-25 WAR fingerprint, where + * snapshot 3635817277608242413 was silently rebased out by a concurrent commit). + * + *

Reconstructed at the {@code doCommit} boundary (no threads, no deploy): + * + *

    + *
  • A writer loaded the table at base T_X and staged a commit: {@code COMMIT_KEY=T_X} and a + * {@code SNAPSHOTS_JSON} payload computed against T_X (does NOT contain the racing + * snapshot). + *
  • A concurrent commit landed, advancing the catalog to T_Y, which DOES contain the racing + * snapshot. + *
  • {@code BaseTransaction.applyUpdates} silently refreshed the in-flight base T_X -> T_Y + * and re-applied the staged {@code PropertiesUpdate}: {@code COMMIT_KEY} is re-stamped as + * T_X on top of T_Y, while {@code SNAPSHOTS_JSON} stays the writer's stale + * (racing-snapshot-free) list. + *
  • {@code doCommit} then runs with {@code base=T_Y} (metadataFileLocation non-null) but + * {@code COMMIT_KEY=T_X}. + *
+ * + *

On the unfixed catalog, {@code doCommit}'s subtractive snapshot merge computes {@code + * toRemove = T_Y.snapshots() - stalePayload = {racingSnapshot}} and SILENTLY drops it (no + * exception, {@code save()} invoked) -- the lost update. {@code doCommit} MUST instead detect + * that the writer's declared base ({@code COMMIT_KEY=T_X}) diverges from the actual base (T_Y) + * and abort with {@link CommitFailedException} so Iceberg refreshes and retries against T_Y + * (where a recomputed expire/append keeps the racing snapshot). + * + *

This test asserts the required abort, so it FAILS on the current checkout (the racing + * snapshot is silently dropped instead) and PASSES once the {@code doCommit} stale-base CAS (OSS + * PR #612) is applied. + */ + @Test + void testDoCommitMustAbortStaleBaseRebaseToPreventSnapshotLoss() throws IOException { + List testSnapshots = IcebergTestUtil.getSnapshots(); + Snapshot writerKnown1 = testSnapshots.get(0); + Snapshot writerKnown2 = testSnapshots.get(1); + // Added concurrently (lands in T_Y); absent from the writer's stale payload. + Snapshot racingSnapshot = testSnapshots.get(2); + + // The base the writer THOUGHT it was committing against (T_X) -- different from the catalog + // head. + String writerClaimedBaseLocation = + "/test/openhouse/test_db/test_table/00001-writer-claimed-base.metadata.json"; + + // Build T_Y (post-refresh base) with all three snapshots and round-trip it through + // TableMetadataParser so metadataFileLocation() is non-null, matching what Iceberg passes into + // doCommit for a base loaded from disk after applyUpdates' silent refresh. + java.nio.file.Path tmpDir = Files.createTempDirectory("oh-incident-12185-repro"); + String postRefreshBasePath = tmpDir.resolve("00010-post-refresh-base.metadata.json").toString(); + TableMetadata buildable = + TableMetadata.buildFrom(BASE_TABLE_METADATA) + .setBranchSnapshot(writerKnown1, SnapshotRef.MAIN_BRANCH) + .setBranchSnapshot(writerKnown2, SnapshotRef.MAIN_BRANCH) + .setBranchSnapshot(racingSnapshot, SnapshotRef.MAIN_BRANCH) + .build(); + Path postRefreshBaseFsPath = new Path(postRefreshBasePath); + FileSystem fs = postRefreshBaseFsPath.getFileSystem(new Configuration()); + try (FSDataOutputStream out = fs.create(postRefreshBaseFsPath, true)) { + out.write(TableMetadataParser.toJson(buildable).getBytes()); + } + TableMetadata postRefreshBase = + TableMetadataParser.read(new HadoopFileIO(new Configuration()), postRefreshBasePath); + Assertions.assertNotNull(postRefreshBase.metadataFileLocation()); + + // The writer's stale payload: only the snapshots it knew about at T_X (racing snapshot + // omitted). + List staleWriterPayload = Arrays.asList(writerKnown1, writerKnown2); + Map properties = new HashMap<>(); + properties.put( + CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(staleWriterPayload)); + properties.put( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap(IcebergTestUtil.createMainBranchRefPointingTo(writerKnown2))); + properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION); + // applyUpdates re-stamped the writer's ORIGINAL claimed base here, even though base is now T_Y. + properties.put(CatalogConstants.COMMIT_KEY, writerClaimedBaseLocation); + + TableMetadata metadata = postRefreshBase.replaceProperties(properties); + + try (MockedStatic ignoreWriteMock = + Mockito.mockStatic(TableMetadataParser.class)) { + CommitFailedException thrown = + Assertions.assertThrows( + CommitFailedException.class, + () -> openHouseInternalTableOperations.doCommit(postRefreshBase, metadata), + "REPRO incident-12185: doCommit must abort when the writer's declared base (COMMIT_KEY) " + + "diverges from the catalog's actual base. On the unfixed catalog no exception is " + + "thrown and the subtractive merge silently expires the racing snapshot " + + racingSnapshot.snapshotId() + + " -- reproducing the lost update."); + + Assertions.assertTrue( + thrown.getMessage().contains("Cannot commit"), + "Expected stale-base abort message, got: " + thrown.getMessage()); + + // The racing snapshot must never have been persisted out of existence. + Mockito.verify(mockHouseTableRepository, Mockito.never()).save(Mockito.any()); + } + } + /** * Tests committing changes that both append new snapshots and delete existing ones. Verifies that * both appended and deleted snapshots are correctly reflected in table metadata.