diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java new file mode 100644 index 000000000..14fd88684 --- /dev/null +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/StaleBaseLostUpdateTest.java @@ -0,0 +1,315 @@ +package com.linkedin.openhouse.tables.e2e.h2; + +import static com.linkedin.openhouse.common.api.validator.ValidatorConstants.INITIAL_TABLE_VERSION; +import static com.linkedin.openhouse.tables.model.TableModelConstants.TABLE_DTO; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +import com.linkedin.openhouse.common.test.cluster.PropertyOverrideContextInitializer; +import com.linkedin.openhouse.internal.catalog.CatalogConstants; +import com.linkedin.openhouse.internal.catalog.OpenHouseInternalTableOperations; +import com.linkedin.openhouse.internal.catalog.SnapshotsUtil; +import com.linkedin.openhouse.tables.model.IcebergSnapshotsModelTestUtilities; +import com.linkedin.openhouse.tables.model.TableDto; +import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey; +import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository; +import java.lang.reflect.Field; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotParser; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; + +/** + * Concurrency tests for committing a snapshot set against a stale base version. + * + *

In each test the table is at version L1, a second writer commits a new snapshot (advancing the + * catalog to L2), and then a writer that still declares L1 as its base commits a snapshot set + * computed at L1 — which therefore omits the snapshot the second writer just added. The catalog + * must not drop that concurrently added snapshot: the stale commit is rejected and the + * concurrently added snapshot remains in the table. + */ +@SpringBootTest +@ContextConfiguration(initializers = PropertyOverrideContextInitializer.class) +@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS) +public class StaleBaseLostUpdateTest { + + @Autowired OpenHouseInternalRepository openHouseInternalRepository; + + @Autowired Catalog catalog; + + /** + * An expiration commit declaring a stale base, racing a concurrent insert. The expiring writer + * keeps only the current head and drops older snapshots; its kept set, computed at the stale + * base, does not include the concurrently inserted snapshot. The concurrent insert must remain + * and the expiration must be rejected, leaving the prior snapshots plus the concurrent insert. + */ + @Test + void testExpireSnapshotsDropsConcurrentDataCommit() throws Exception { + TableDto l1 = createTableWithCommittedDataSnapshots("expire_race", 2); + TableIdentifier id = TableIdentifier.of(l1.getDatabaseId(), l1.getTableId()); + + Table staleHandle = catalog.loadTable(id); + List base = Lists.newArrayList(staleHandle.snapshots()); + Snapshot head = staleHandle.currentSnapshot(); + List keepOnlyHead = Lists.newArrayList(head); + + assertRacingDataCommitSurvivesStaleCommit(l1, base, keepOnlyHead, head); + } + + /** + * Two writers each perform the first insert into a freshly created table. One commits first; the + * other then commits its own first snapshot still declaring the create version as its base. The + * first writer's snapshot must remain. + */ + @Test + void testStaleInsertDropsConcurrentDataCommitOnFreshTable() throws Exception { + TableDto l1 = createTableWithCommittedDataSnapshots("insert_race_fresh", 0); + TableIdentifier id = TableIdentifier.of(l1.getDatabaseId(), l1.getTableId()); + + Table staleHandle = catalog.loadTable(id); + List base = Lists.newArrayList(staleHandle.snapshots()); // empty + Snapshot staleInsert = staleHandle.newAppend().appendFile(dummyDataFile()).apply(); + List stalePayload = new ArrayList<>(base); + stalePayload.add(staleInsert); + + assertRacingDataCommitSurvivesStaleCommit(l1, base, stalePayload, staleInsert); + } + + /** + * Two writers insert into a table that already holds data, both based on the same version. The + * stale writer's appended snapshot shares a sequence number with the concurrent insert, so the + * catalog rejects the stale commit; the concurrent insert remains. + */ + @Test + void testConcurrentInsertOnPopulatedTableIsRejected() throws Exception { + TableDto l1 = createTableWithCommittedDataSnapshots("insert_race_populated", 2); + TableIdentifier id = TableIdentifier.of(l1.getDatabaseId(), l1.getTableId()); + + Table staleHandle = catalog.loadTable(id); + List base = Lists.newArrayList(staleHandle.snapshots()); + Snapshot staleInsert = staleHandle.newAppend().appendFile(dummyDataFile()).apply(); + List stalePayload = new ArrayList<>(base); + stalePayload.add(staleInsert); + + assertRacingDataCommitSurvivesStaleCommit(l1, base, stalePayload, staleInsert); + } + + /** + * A stale commit that changes a table property and adds no snapshot, racing a concurrent insert. + * Its declared snapshot set is its base view, which omits the concurrent insert, so the diff is a + * pure deletion of the concurrent snapshot. The concurrent insert must remain and the stale + * commit must be rejected. + */ + @Test + void testStaleMetadataUpdateDropsConcurrentDataCommit() throws Exception { + TableDto l1 = createTableWithCommittedDataSnapshots("metadata_update_race", 2); + TableIdentifier id = TableIdentifier.of(l1.getDatabaseId(), l1.getTableId()); + + Table staleHandle = catalog.loadTable(id); + List base = Lists.newArrayList(staleHandle.snapshots()); + Snapshot head = staleHandle.currentSnapshot(); + + // Stale writer: set a table property and carry the base snapshot view (no new snapshot), held. + Transaction staleTxn = staleHandle.newTransaction(); + staleTxn + .updateProperties() + .set("test.stale.property", "changed") + .set(CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(base)) + .set( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap( + IcebergSnapshotsModelTestUtilities.obtainSnapshotRefsFromSnapshot( + SnapshotParser.toJson(head)))) + .set(CatalogConstants.COMMIT_KEY, l1.getTableLocation()) + .commit(); + + // Second writer commits a fresh data snapshot, advancing the catalog. + Snapshot racing = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); + List snapshotsAfterRace = new ArrayList<>(base); + snapshotsAfterRace.add(racing); + openHouseInternalRepository.save( + l1.toBuilder() + .tableVersion(l1.getTableLocation()) + .jsonSnapshots( + snapshotsAfterRace.stream() + .map(SnapshotParser::toJson) + .collect(Collectors.toList())) + .snapshotRefs( + IcebergSnapshotsModelTestUtilities.obtainSnapshotRefsFromSnapshot( + SnapshotParser.toJson(racing))) + .build()); + + clearRetryCache(); + + Set expected = base.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + expected.add(racing.snapshotId()); + Assertions.assertThrows( + Exception.class, + staleTxn::commitTransaction, + "the stale property update must be rejected, not applied against the advanced catalog"); + Set actual = + Lists.newArrayList(catalog.loadTable(id).snapshots()).stream() + .map(Snapshot::snapshotId) + .collect(Collectors.toSet()); + Assertions.assertEquals( + expected, + actual, + "table must hold exactly the prior snapshots plus the concurrently committed snapshot " + + racing.snapshotId()); + + cleanup(id); + } + + /** + * Loads the table at version {@code l1}, has a second writer commit a fresh data snapshot + * (advancing the catalog), then commits a writer that still declares {@code l1} as its base with + * {@code stalePayload} (pointing main at {@code staleHead}) — a snapshot set that omits the second + * writer's snapshot. After the stale commit, the table must hold exactly the prior snapshots plus + * the second writer's snapshot: the stale commit applies none of its payload. + * + * @param base the snapshots present at {@code l1} + * @param stalePayload the snapshot set the stale writer commits (omits the concurrent snapshot) + * @param staleHead the snapshot the stale writer points main at + */ + private void assertRacingDataCommitSurvivesStaleCommit( + TableDto l1, List base, List stalePayload, Snapshot staleHead) + throws Exception { + TableIdentifier id = TableIdentifier.of(l1.getDatabaseId(), l1.getTableId()); + + // Stale writer holds a transaction at L1 staging its payload, left uncommitted. + Transaction staleTxn = catalog.loadTable(id).newTransaction(); + staleTxn + .updateProperties() + .set(CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(stalePayload)) + .set( + CatalogConstants.SNAPSHOTS_REFS_KEY, + SnapshotsUtil.serializeMap( + IcebergSnapshotsModelTestUtilities.obtainSnapshotRefsFromSnapshot( + SnapshotParser.toJson(staleHead)))) + .set(CatalogConstants.COMMIT_KEY, l1.getTableLocation()) + .commit(); + + // Second writer, also based on L1, commits a fresh data snapshot, advancing the catalog to L2. + Snapshot racing = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); + List snapshotsAfterRace = new ArrayList<>(base); + snapshotsAfterRace.add(racing); + openHouseInternalRepository.save( + l1.toBuilder() + .tableVersion(l1.getTableLocation()) + .jsonSnapshots( + snapshotsAfterRace.stream() + .map(SnapshotParser::toJson) + .collect(Collectors.toList())) + .snapshotRefs( + IcebergSnapshotsModelTestUtilities.obtainSnapshotRefsFromSnapshot( + SnapshotParser.toJson(racing))) + .build()); + + // Evaluate the stale commit on its base version rather than short-circuit it as a duplicate. + clearRetryCache(); + + // The stale commit declares a base that no longer matches the catalog, so it must be rejected + // (the rejection's exception type is not part of the contract). + Assertions.assertThrows( + Exception.class, + staleTxn::commitTransaction, + "the stale commit must be rejected, not applied against the advanced catalog"); + + // The table must then hold exactly the prior snapshots plus the concurrently committed snapshot. + Set expected = base.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + expected.add(racing.snapshotId()); + Set actual = + Lists.newArrayList(catalog.loadTable(id).snapshots()).stream() + .map(Snapshot::snapshotId) + .collect(Collectors.toSet()); + Assertions.assertEquals( + expected, + actual, + "table must hold exactly the prior snapshots plus the concurrently committed snapshot " + + racing.snapshotId()); + + cleanup(id); + } + + /** + * Creates a table and commits {@code count} data-bearing snapshots, returning the latest + * committed {@link TableDto} (the version both writers load as their base). + */ + private TableDto createTableWithCommittedDataSnapshots(String tableId, int count) + throws Exception { + Schema schema = + new Schema( + required(1, "id", Types.StringType.get()), optional(2, "data", Types.StringType.get())); + TableDto dto = + openHouseInternalRepository.save( + TABLE_DTO + .toBuilder() + .tableId(tableId) + .schema(SchemaParser.toJson(schema, false)) + .timePartitioning(null) + .clustering(null) + .tableVersion(INITIAL_TABLE_VERSION) + .build()); + TableIdentifier id = TableIdentifier.of(dto.getDatabaseId(), dto.getTableId()); + List committed = new ArrayList<>(); + for (int i = 0; i < count; i++) { + Snapshot snapshot = catalog.loadTable(id).newAppend().appendFile(dummyDataFile()).apply(); + committed.add(snapshot); + dto = + openHouseInternalRepository.save( + dto.toBuilder() + .tableVersion(dto.getTableLocation()) + .jsonSnapshots( + committed.stream().map(SnapshotParser::toJson).collect(Collectors.toList())) + .snapshotRefs( + IcebergSnapshotsModelTestUtilities.obtainSnapshotRefsFromSnapshot( + SnapshotParser.toJson(snapshot))) + .build()); + } + return dto; + } + + private DataFile dummyDataFile() throws Exception { + return IcebergSnapshotsModelTestUtilities.createDummyDataFile( + Files.createTempFile("stale-base-conflict-", ".orc").toString(), + PartitionSpec.unpartitioned()); + } + + private void cleanup(TableIdentifier id) { + openHouseInternalRepository.deleteById( + TableDtoPrimaryKey.builder() + .databaseId(id.namespace().toString()) + .tableId(id.name()) + .build()); + } + + /** + * Invalidates the per-JVM retry cache so the stale commit is evaluated against its base version + * rather than short-circuited as a duplicate retry of an already-seen commit. + */ + private static void clearRetryCache() throws Exception { + Field cacheField = OpenHouseInternalTableOperations.class.getDeclaredField("CACHE"); + cacheField.setAccessible(true); + ((com.google.common.cache.Cache) cacheField.get(null)).invalidateAll(); + } +}