Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
try {
// Now that we have metadataLocation we stamp it in metadata property.
Map<String, String> properties = new HashMap<>(metadata.properties());

abortIfWriterBaseDivergedFromCatalog(base, metadata);

failIfRetryUpdate(properties);
restoreOverriddenProperties(properties);

Expand Down Expand Up @@ -575,6 +578,55 @@ static SortOrder rebuildSortOrder(SortOrder originalSortOrder, Schema newSchema)
return builder.build();
}

/**
* Catalog-level CAS. Aborts the commit if the writer's declared base ({@code COMMIT_KEY}) does
* not match the catalog's current persisted base. Closes the BaseTransaction.applyUpdates
* silent-rebase variant of the stale-base bug class, where {@code applyUpdates} re-stamps the
* writer's original (non-null) {@code COMMIT_KEY} on top of a concurrently-advanced base.
*
* <p>Commits that leave {@code COMMIT_KEY} unset (wholesale replace / create — replaceTable,
* stage-create, stage-replace) are authoritative over the snapshot set and are intentionally not
* defended: there is no stale base to compare against.
*
* <p>Must run before failIfRetryUpdate, which strips COMMIT_KEY from the doCommit-local
* properties copy.
*
* @throws CommitFailedException when writer and catalog disagree on the base — retriable by
* Iceberg's commit loop after the writer refreshes
*/
private void abortIfWriterBaseDivergedFromCatalog(TableMetadata base, TableMetadata metadata) {
if (base == null || base.metadataFileLocation() == null) {
// No persisted catalog state to defend (initial CREATE, or mid-CREATE constructed
// metadata before any metadata.json has been written).
return;
}
if (!metadata.properties().containsKey(CatalogConstants.SNAPSHOTS_JSON_KEY)) {
// Not a snapshot-bearing writer commit (e.g. rename, property-only update, internal
// metadata-field write). These paths legitimately have no COMMIT_KEY because they don't
// go through OpenHouseInternalRepositoryImpl.save:187, and they don't carry the
// stale-snapshot-list payload that the subtractive merge would silently expire.
return;
}
String actualBase = base.metadataFileLocation();
String writerClaimedBase = metadata.properties().get(CatalogConstants.COMMIT_KEY);

if (writerClaimedBase == null) {
return;
}

if (CatalogConstants.INITIAL_VERSION.equals(writerClaimedBase)
|| !new org.apache.hadoop.fs.Path(writerClaimedBase)
.toUri()
.getPath()
.equals(new org.apache.hadoop.fs.Path(actualBase).toUri().getPath())) {
throw new CommitFailedException(
"Cannot commit: writer's declared base [%s] does not match the catalog's current "
+ "base [%s] for table %s. A concurrent commit landed between the writer's "
+ "loadTable and commit. Refresh and retry.",
writerClaimedBase, actualBase, tableIdentifier);
}
}

/**
* If this commit comes from Iceberg built-in retry in
* org.apache.iceberg.PropertiesUpdate#commit() Then throw fatal {@link CommitFailedException} to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -213,6 +214,107 @@ void testDoCommitAppendSnapshotsExistingVersion() throws IOException {
}
}

/**
* Deterministic local reproduction of incident-12185 -- the {@code BaseTransaction.applyUpdates}
* silent-rebase variant of the stale-base lost-update class (2026-05-25 WAR fingerprint, where
* snapshot 3635817277608242413 was silently rebased out by a concurrent commit).
*
* <p>Reconstructed at the {@code doCommit} boundary (no threads, no deploy):
*
* <ul>
* <li>A writer loaded the table at base T_X and staged a commit: {@code COMMIT_KEY=T_X} and a
* {@code SNAPSHOTS_JSON} payload computed against T_X (does NOT contain the racing
* snapshot).
* <li>A concurrent commit landed, advancing the catalog to T_Y, which DOES contain the racing
* snapshot.
* <li>{@code BaseTransaction.applyUpdates} silently refreshed the in-flight base T_X -&gt; T_Y
* and re-applied the staged {@code PropertiesUpdate}: {@code COMMIT_KEY} is re-stamped as
* T_X on top of T_Y, while {@code SNAPSHOTS_JSON} stays the writer's stale
* (racing-snapshot-free) list.
* <li>{@code doCommit} then runs with {@code base=T_Y} (metadataFileLocation non-null) but
* {@code COMMIT_KEY=T_X}.
* </ul>
*
* <p>On the unfixed catalog, {@code doCommit}'s subtractive snapshot merge computes {@code
* toRemove = T_Y.snapshots() - stalePayload = {racingSnapshot}} and SILENTLY drops it (no
* exception, {@code save()} invoked) -- the lost update. {@code doCommit} MUST instead detect
* that the writer's declared base ({@code COMMIT_KEY=T_X}) diverges from the actual base (T_Y)
* and abort with {@link CommitFailedException} so Iceberg refreshes and retries against T_Y
* (where a recomputed expire/append keeps the racing snapshot).
*
* <p>This test asserts the required abort, so it FAILS on the current checkout (the racing
* snapshot is silently dropped instead) and PASSES once the {@code doCommit} stale-base CAS (OSS
* PR #612) is applied.
*/
@Test
void testDoCommitMustAbortStaleBaseRebaseToPreventSnapshotLoss() throws IOException {
List<Snapshot> testSnapshots = IcebergTestUtil.getSnapshots();
Snapshot writerKnown1 = testSnapshots.get(0);
Snapshot writerKnown2 = testSnapshots.get(1);
// Added concurrently (lands in T_Y); absent from the writer's stale payload.
Snapshot racingSnapshot = testSnapshots.get(2);

// The base the writer THOUGHT it was committing against (T_X) -- different from the catalog
// head.
String writerClaimedBaseLocation =
"/test/openhouse/test_db/test_table/00001-writer-claimed-base.metadata.json";

// Build T_Y (post-refresh base) with all three snapshots and round-trip it through
// TableMetadataParser so metadataFileLocation() is non-null, matching what Iceberg passes into
// doCommit for a base loaded from disk after applyUpdates' silent refresh.
java.nio.file.Path tmpDir = Files.createTempDirectory("oh-incident-12185-repro");
String postRefreshBasePath = tmpDir.resolve("00010-post-refresh-base.metadata.json").toString();
TableMetadata buildable =
TableMetadata.buildFrom(BASE_TABLE_METADATA)
.setBranchSnapshot(writerKnown1, SnapshotRef.MAIN_BRANCH)
.setBranchSnapshot(writerKnown2, SnapshotRef.MAIN_BRANCH)
.setBranchSnapshot(racingSnapshot, SnapshotRef.MAIN_BRANCH)
.build();
Path postRefreshBaseFsPath = new Path(postRefreshBasePath);
FileSystem fs = postRefreshBaseFsPath.getFileSystem(new Configuration());
try (FSDataOutputStream out = fs.create(postRefreshBaseFsPath, true)) {
out.write(TableMetadataParser.toJson(buildable).getBytes());
}
TableMetadata postRefreshBase =
TableMetadataParser.read(new HadoopFileIO(new Configuration()), postRefreshBasePath);
Assertions.assertNotNull(postRefreshBase.metadataFileLocation());

// The writer's stale payload: only the snapshots it knew about at T_X (racing snapshot
// omitted).
List<Snapshot> staleWriterPayload = Arrays.asList(writerKnown1, writerKnown2);
Map<String, String> properties = new HashMap<>();
properties.put(
CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(staleWriterPayload));
properties.put(
CatalogConstants.SNAPSHOTS_REFS_KEY,
SnapshotsUtil.serializeMap(IcebergTestUtil.createMainBranchRefPointingTo(writerKnown2)));
properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION);
// applyUpdates re-stamped the writer's ORIGINAL claimed base here, even though base is now T_Y.
properties.put(CatalogConstants.COMMIT_KEY, writerClaimedBaseLocation);

TableMetadata metadata = postRefreshBase.replaceProperties(properties);

try (MockedStatic<TableMetadataParser> ignoreWriteMock =
Mockito.mockStatic(TableMetadataParser.class)) {
CommitFailedException thrown =
Assertions.assertThrows(
CommitFailedException.class,
() -> openHouseInternalTableOperations.doCommit(postRefreshBase, metadata),
"REPRO incident-12185: doCommit must abort when the writer's declared base (COMMIT_KEY) "
+ "diverges from the catalog's actual base. On the unfixed catalog no exception is "
+ "thrown and the subtractive merge silently expires the racing snapshot "
+ racingSnapshot.snapshotId()
+ " -- reproducing the lost update.");

Assertions.assertTrue(
thrown.getMessage().contains("Cannot commit"),
"Expected stale-base abort message, got: " + thrown.getMessage());

// The racing snapshot must never have been persisted out of existence.
Mockito.verify(mockHouseTableRepository, Mockito.never()).save(Mockito.any());
}
}

/**
* Tests committing changes that both append new snapshots and delete existing ones. Verifies that
* both appended and deleted snapshots are correctly reflected in table metadata.
Expand Down