From 158d662aff95557ac8489b8deff1286673180e50 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Wed, 28 Jan 2026 14:37:13 +0800 Subject: [PATCH 1/3] [lake/paimon] Compare paimon schema and Fluss schema before alter table. # Conflicts: # fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java # fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java # Conflicts: # fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java --- .../fluss/lake/lakestorage/LakeCatalog.java | 9 + .../TestingLakeCatalogContext.java | 16 ++ .../fluss/lake/paimon/PaimonLakeCatalog.java | 180 +++++++++++++----- .../lake/paimon/PaimonLakeCatalogTest.java | 144 +++++++++++--- .../coordinator/CoordinatorService.java | 18 +- .../server/coordinator/MetadataManager.java | 16 +- 6 files changed, 294 insertions(+), 89 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java index 4cbccb6c1f..652bf56c6b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java @@ -20,6 +20,7 @@ import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; +import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; @@ -84,5 +85,13 @@ interface Context { /** Get the fluss principal currently accessing the catalog. */ FlussPrincipal getFlussPrincipal(); + + /** + * Get the current schema of fluss. + * + * @return the current schema of fluss. + * @since 0.10 + */ + Schema getFlussSchema(); } } diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java index 7406b13b6b..8fa580d3e4 100644 --- a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java +++ b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java @@ -17,11 +17,22 @@ package org.apache.fluss.lake.lakestorage; +import org.apache.fluss.metadata.Schema; import org.apache.fluss.security.acl.FlussPrincipal; /** A testing implementation of {@link LakeCatalog.Context}. */ public class TestingLakeCatalogContext implements LakeCatalog.Context { + private final Schema schema; + + public TestingLakeCatalogContext(Schema schema) { + this.schema = schema; + } + + public TestingLakeCatalogContext() { + this(null); + } + @Override public boolean isCreatingFlussTable() { return false; @@ -31,4 +42,9 @@ public boolean isCreatingFlussTable() { public FlussPrincipal getFlussPrincipal() { return null; } + + @Override + public Schema getFlussSchema() { + return schema; + } } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java index 500546e641..fbe41e3b82 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java @@ -37,6 +37,7 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.slf4j.Logger; @@ -44,6 +45,7 @@ import java.util.LinkedHashMap; import java.util.List; +import java.util.stream.Collectors; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema; @@ -113,9 +115,10 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont throws TableNotExistException { try { List paimonSchemaChanges = toPaimonSchemaChanges(tableChanges); - - // Compare current Paimon table schema with expected target schema before altering - if (shouldAlterTable(tablePath, tableChanges)) { + org.apache.fluss.metadata.Schema flussSchema = context.getFlussSchema(); + boolean needsApplyChanges = + needsApplyTableChanges(tablePath, tableChanges, flussSchema); + if (needsApplyChanges) { alterTable(tablePath, paimonSchemaChanges); } else { // If schemas already match, treat as idempotent success @@ -133,72 +136,149 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont } } - private boolean shouldAlterTable(TablePath tablePath, List tableChanges) + /** + * Check if table changes need to be applied to reconcile Paimon and Fluss schemas. + * + *

This method determines whether the provided table changes need to be applied based on the + * current state of Paimon and Fluss schemas. + * + * @param tablePath the table path + * @param tableChanges the proposed table changes + * @param flussSchema the current Fluss schema + * @return true if changes need to be applied (schemas are consistent), false if changes can be + * skipped (schemas already match after changes, idempotent) + * @throws TableNotExistException if the table does not exist + * @throws InvalidAlterTableException if schemas are inconsistent and cannot be reconciled + */ + private boolean needsApplyTableChanges( + TablePath tablePath, + List tableChanges, + org.apache.fluss.metadata.Schema flussSchema) throws TableNotExistException { + if (tableChanges.isEmpty()) { + return false; + } + try { + // Get current Paimon table schema Table table = paimonCatalog.getTable(toPaimon(tablePath)); FileStoreTable fileStoreTable = (FileStoreTable) table; - Schema currentSchema = fileStoreTable.schema().toSchema(); - - for (TableChange change : tableChanges) { - if (change instanceof TableChange.AddColumn) { - TableChange.AddColumn addColumn = (TableChange.AddColumn) change; - if (!isColumnAlreadyExists(currentSchema, addColumn)) { - return true; - } - } else { - return true; - } + List paimonFields = + fileStoreTable.schema().toSchema().fields().stream() + .filter(field -> !SYSTEM_COLUMNS.containsKey(field.name())) + .collect(Collectors.toList()); + List flussColumns = flussSchema.getColumns(); + + if (paimonFields.size() < flussColumns.size()) { + throw new InvalidAlterTableException( + String.format( + "Paimon table has less columns (%d) than Fluss schema (%d)", + paimonFields.size(), flussColumns.size())); + } + + // Validate schema compatibility + validateExistingColumns(paimonFields, flussColumns); + + // If schemas are same, can apply all table changes + if (paimonFields.size() == flussColumns.size()) { + return true; + } + + // If Paimon has more columns, check if table changes would reconcile schemas + if (canTableChangesReconcileSchemas(flussColumns, paimonFields, tableChanges)) { + // Schemas will match after applying changes, skip duplicate operations + return false; + } else { + throw new InvalidAlterTableException( + String.format( + "Paimon table has more columns (%d) than Fluss schema (%d); " + + "therefore you need to add the diff columns all at once, " + + "rather than applying other table changes: %s.", + paimonFields.size(), flussColumns.size(), tableChanges)); } - return false; } catch (Catalog.TableNotExistException e) { throw new TableNotExistException("Table " + tablePath + " does not exist."); } } - private boolean isColumnAlreadyExists(Schema currentSchema, TableChange.AddColumn addColumn) { + private void validateExistingColumns( + List paimonFields, + List flussColumns) { + for (int i = 0; i < flussColumns.size(); i++) { + if (!paimonFields.get(i).name().equals(flussColumns.get(i).getName())) { + throw new InvalidAlterTableException( + String.format( + "Column mismatch at position %d. Paimon: '%s', Fluss: '%s'", + i, paimonFields.get(i).name(), flussColumns.get(i).getName())); + } + } + } + + private boolean isColumnAlreadyExists( + org.apache.paimon.types.DataField field, TableChange.AddColumn addColumn) { String columnName = addColumn.getName(); - for (org.apache.paimon.types.DataField field : currentSchema.fields()) { - if (field.name().equals(columnName)) { - org.apache.paimon.types.DataType expectedType = - addColumn - .getDataType() - .accept( - org.apache.fluss.lake.paimon.utils - .FlussDataTypeToPaimonDataType.INSTANCE); - - if (!field.type().equals(expectedType)) { - throw new InvalidAlterTableException( - String.format( - "Column '%s' already exists but with different type. " - + "Existing: %s, Expected: %s", - columnName, field.type(), expectedType)); - } - String existingComment = field.description(); - String expectedComment = addColumn.getComment(); - - boolean commentsMatch = - (existingComment == null && expectedComment == null) - || (existingComment != null - && existingComment.equals(expectedComment)); - - if (!commentsMatch) { - throw new InvalidAlterTableException( - String.format( - "Column %s already exists but with different comment. " - + "Existing: %s, Expected: %s", - columnName, existingComment, expectedComment)); - } + if (field.name().equals(columnName)) { + org.apache.paimon.types.DataType expectedType = + addColumn + .getDataType() + .accept( + org.apache.fluss.lake.paimon.utils.FlussDataTypeToPaimonDataType + .INSTANCE); - return true; + if (!field.type().equals(expectedType)) { + throw new InvalidAlterTableException( + String.format( + "Column '%s' already exists but with different type. " + + "Existing: %s, Expected: %s", + columnName, field.type(), expectedType)); + } + String existingComment = field.description(); + String expectedComment = addColumn.getComment(); + + boolean commentsMatch = + (existingComment == null && expectedComment == null) + || (existingComment != null && existingComment.equals(expectedComment)); + + if (!commentsMatch) { + throw new InvalidAlterTableException( + String.format( + "Column %s already exists but with different comment. " + + "Existing: %s, Expected: %s", + columnName, existingComment, expectedComment)); } - } + return true; + } return false; } + /** Check if applying the table changes would reconcile Paimon and Fluss schemas. */ + private boolean canTableChangesReconcileSchemas( + List flussColumns, + List paimonFields, + List tableChanges) { + if (flussColumns.size() + tableChanges.size() != paimonFields.size()) { + return false; + } + + for (int i = 0; i < paimonFields.size() - flussColumns.size(); i++) { + DataField paimonDataField = paimonFields.get(i + flussColumns.size()); + TableChange tableChange = tableChanges.get(i); + if (!(tableChange instanceof TableChange.AddColumn + && ((TableChange.AddColumn) tableChange).getPosition() + == TableChange.ColumnPosition.last() + && isColumnAlreadyExists( + paimonDataField, (TableChange.AddColumn) tableChange))) { + return false; + } + } + + // All validations passed: applying these changes would reconcile the schemas + return true; + } + private void createTable(TablePath tablePath, Schema schema, boolean isCreatingFlussTable) throws Catalog.DatabaseNotExistException { Identifier paimonPath = toPaimon(tablePath); diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java index ea18b85f6a..405b3b0a5a 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java @@ -34,6 +34,7 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -42,6 +43,15 @@ /** Unit test for {@link PaimonLakeCatalog}. */ class PaimonLakeCatalogTest { + private static final Schema FLUSS_SCHEMA = + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("name", DataTypes.STRING()) + .column("amount", DataTypes.INT()) + .column("address", DataTypes.STRING()) + .build(); + private static final TestingLakeCatalogContext LAKE_CATALOG_CONTEXT = + new TestingLakeCatalogContext(FLUSS_SCHEMA); @TempDir private File tempWarehouseDir; @@ -70,7 +80,7 @@ void testAlterTableProperties() throws Exception { flussPaimonCatalog.alterTable( tablePath, Collections.singletonList(TableChange.set("key", "value")), - new TestingLakeCatalogContext()); + LAKE_CATALOG_CONTEXT); table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); // we have set the value for key @@ -80,7 +90,7 @@ void testAlterTableProperties() throws Exception { flussPaimonCatalog.alterTable( tablePath, Collections.singletonList(TableChange.reset("key")), - new TestingLakeCatalogContext()); + LAKE_CATALOG_CONTEXT); table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); // we have reset the value for key @@ -89,14 +99,13 @@ void testAlterTableProperties() throws Exception { @Test void alterTablePropertiesWithNonExistentTable() { - TestingLakeCatalogContext context = new TestingLakeCatalogContext(); // db & table don't exist assertThatThrownBy( () -> flussPaimonCatalog.alterTable( TablePath.of("non_existing_db", "non_existing_table"), Collections.singletonList(TableChange.set("key", "value")), - context)) + LAKE_CATALOG_CONTEXT)) .isInstanceOf(TableNotExistException.class) .hasMessage("Table non_existing_db.non_existing_table does not exist."); @@ -110,7 +119,7 @@ void alterTablePropertiesWithNonExistentTable() { flussPaimonCatalog.alterTable( TablePath.of(database, "non_existing_table"), Collections.singletonList(TableChange.set("key", "value")), - context)) + LAKE_CATALOG_CONTEXT)) .isInstanceOf(TableNotExistException.class) .hasMessage("Table alter_props_db.non_existing_table does not exist."); } @@ -131,7 +140,7 @@ void testAlterTableAddColumnLastNullable() throws Exception { "new_col comment", TableChange.ColumnPosition.last())); - flussPaimonCatalog.alterTable(tablePath, changes, new TestingLakeCatalogContext()); + flussPaimonCatalog.alterTable(tablePath, changes, LAKE_CATALOG_CONTEXT); Table table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); assertThat(table.rowType().getFieldNames()) @@ -164,7 +173,7 @@ void testAlterTableAddColumnNotLast() { assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes, new TestingLakeCatalogContext())) + tablePath, changes, LAKE_CATALOG_CONTEXT)) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Only support to add column at last for paimon table."); } @@ -187,13 +196,13 @@ void testAlterTableAddColumnNotNullable() { assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes, new TestingLakeCatalogContext())) + tablePath, changes, LAKE_CATALOG_CONTEXT)) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Only support to add nullable column for paimon table."); } @Test - void testAlterTableAddExistingColumn() { + void testAlterTableAddExistingColumns() { String database = "test_alter_table_add_existing_column_db"; String tableName = "test_alter_table_add_existing_column_table"; TablePath tablePath = TablePath.of(database, tableName); @@ -207,13 +216,38 @@ void testAlterTableAddExistingColumn() { null, TableChange.ColumnPosition.last())); - // no exception thrown when adding existing column - flussPaimonCatalog.alterTable(tablePath, changes, new TestingLakeCatalogContext()); + assertThatThrownBy( + () -> + flussPaimonCatalog.alterTable( + tablePath, changes, LAKE_CATALOG_CONTEXT)) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining( + "Column address already exists in the test_alter_table_add_existing_column_db.test_alter_table_add_existing_column_table table."); List changes2 = - Collections.singletonList( + Arrays.asList( TableChange.addColumn( - "address", + "new_column", + DataTypes.INT(), + null, + TableChange.ColumnPosition.last()), + TableChange.addColumn( + "new_column2", + DataTypes.STRING(), + null, + TableChange.ColumnPosition.last())); + + // mock add columns to paimon successfully but fail to add columns to fluss. + flussPaimonCatalog.alterTable(tablePath, changes2, LAKE_CATALOG_CONTEXT); + List changes3 = + Arrays.asList( + TableChange.addColumn( + "new_column", + DataTypes.INT(), + null, + TableChange.ColumnPosition.last()), + TableChange.addColumn( + "new_column2", DataTypes.INT(), null, TableChange.ColumnPosition.last())); @@ -221,15 +255,20 @@ void testAlterTableAddExistingColumn() { assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes2, new TestingLakeCatalogContext())) + tablePath, changes3, LAKE_CATALOG_CONTEXT)) .isInstanceOf(InvalidAlterTableException.class) .hasMessage( - "Column 'address' already exists but with different type. Existing: STRING, Expected: INT"); + "Column 'new_column2' already exists but with different type. Existing: STRING, Expected: INT"); - List changes3 = - Collections.singletonList( + List changes4 = + Arrays.asList( TableChange.addColumn( - "address", + "new_column", + DataTypes.INT(), + null, + TableChange.ColumnPosition.last()), + TableChange.addColumn( + "new_column2", DataTypes.STRING(), "the address comment", TableChange.ColumnPosition.last())); @@ -237,29 +276,74 @@ tablePath, changes2, new TestingLakeCatalogContext())) assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes3, new TestingLakeCatalogContext())) + tablePath, changes4, LAKE_CATALOG_CONTEXT)) .isInstanceOf(InvalidAlterTableException.class) .hasMessage( - "Column address already exists but with different comment. Existing: null, Expected: the address comment"); + "Column new_column2 already exists but with different comment. Existing: null, Expected: the address comment"); + + // no exception thrown only when adding existing column to match fluss and paimon. + flussPaimonCatalog.alterTable(tablePath, changes2, LAKE_CATALOG_CONTEXT); } - private void createTable(String database, String tableName) { - Schema flussSchema = - Schema.newBuilder() - .column("id", DataTypes.BIGINT()) - .column("name", DataTypes.STRING()) - .column("amount", DataTypes.INT()) - .column("address", DataTypes.STRING()) - .build(); + @Test + void testAlterTableAddColumnWhenPaimonSchemaNotMatch() { + // this rarely happens only when new fluss lake table with an existed paimon table or use + // alter table in paimon side directly. + String database = "test_alter_table_add_column_fluss_wider"; + String tableName = "test_alter_table_add_column_fluss_wider"; + createTable(database, tableName); + TablePath tablePath = TablePath.of(database, tableName); + + List changes = + Collections.singletonList( + TableChange.addColumn( + "new_col", + DataTypes.INT(), + "new_col comment", + TableChange.ColumnPosition.last())); + assertThatThrownBy( + () -> + flussPaimonCatalog.alterTable( + tablePath, + changes, + new TestingLakeCatalogContext( + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("name", DataTypes.STRING()) + .column("amount", DataTypes.INT()) + .column("address", DataTypes.STRING()) + .column("phone", DataTypes.INT()) + .build()))) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("Paimon table has less columns (4) than Fluss schema (5)"); + + assertThatThrownBy( + () -> + flussPaimonCatalog.alterTable( + tablePath, + changes, + new TestingLakeCatalogContext( + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("amount", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("address", DataTypes.STRING()) + .build()))) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining( + "Column mismatch at position 1. Paimon: 'name', Fluss: 'amount'"); + } + + private void createTable(String database, String tableName) { TableDescriptor td = TableDescriptor.builder() - .schema(flussSchema) + .schema(FLUSS_SCHEMA) .distributedBy(3) // no bucket key .build(); TablePath tablePath = TablePath.of(database, tableName); - flussPaimonCatalog.createTable(tablePath, td, new TestingLakeCatalogContext()); + flussPaimonCatalog.createTable(tablePath, td, LAKE_CATALOG_CONTEXT); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 6a846076ac..8fb1b47833 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -47,6 +47,7 @@ import org.apache.fluss.metadata.PartitionSpec; import org.apache.fluss.metadata.ResolvedPartitionSpec; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; @@ -389,7 +390,9 @@ public CompletableFuture createTable(CreateTableRequest req tablePath, tableDescriptor, new DefaultLakeCatalogContext( - true, currentSession().getPrincipal())); + true, + currentSession().getPrincipal(), + tableDescriptor.getSchema())); } catch (TableAlreadyExistException e) { throw new LakeTableAlreadyExistException(e.getMessage(), e); } @@ -428,7 +431,7 @@ public CompletableFuture alterTable(AlterTableRequest reques tablePath, alterSchemaChanges, request.isIgnoreIfNotExists(), - lakeCatalogContext); + currentSession().getPrincipal()); } if (!alterTableConfigChanges.isEmpty()) { @@ -437,7 +440,7 @@ public CompletableFuture alterTable(AlterTableRequest reques alterTableConfigChanges, tablePropertyChanges, request.isIgnoreIfNotExists(), - lakeCatalogContext); + currentSession().getPrincipal()); } return CompletableFuture.completedFuture(new AlterTableResponse()); @@ -1011,11 +1014,13 @@ static class DefaultLakeCatalogContext implements LakeCatalog.Context { private final boolean isCreatingFlussTable; private final FlussPrincipal flussPrincipal; + private final Schema schema; public DefaultLakeCatalogContext( - boolean isCreatingFlussTable, FlussPrincipal flussPrincipal) { + boolean isCreatingFlussTable, FlussPrincipal flussPrincipal, Schema schema) { this.isCreatingFlussTable = isCreatingFlussTable; this.flussPrincipal = flussPrincipal; + this.schema = schema; } @Override @@ -1027,6 +1032,11 @@ public boolean isCreatingFlussTable() { public FlussPrincipal getFlussPrincipal() { return flussPrincipal; } + + @Override + public Schema getFlussSchema() { + return schema; + } } // ================================================================================== diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 8d7b61a088..33dd82ec87 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -45,6 +45,7 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePartition; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.security.acl.FlussPrincipal; import org.apache.fluss.server.entity.TablePropertyChanges; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.DatabaseRegistration; @@ -325,7 +326,7 @@ public void alterTableSchema( TablePath tablePath, List schemaChanges, boolean ignoreIfNotExists, - LakeCatalog.Context lakeCatalogContext) + FlussPrincipal flussPrincipal) throws TableNotExistException, TableNotPartitionedException { try { @@ -334,7 +335,9 @@ public void alterTableSchema( // validate the table column changes if (!schemaChanges.isEmpty()) { Schema newSchema = SchemaUpdate.applySchemaChanges(table, schemaChanges); - + LakeCatalog.Context lakeCatalogContext = + new CoordinatorService.DefaultLakeCatalogContext( + false, flussPrincipal, table.getSchema()); // Lake First: sync to Lake before updating Fluss schema syncSchemaChangesToLake(tablePath, table, schemaChanges, lakeCatalogContext); @@ -396,7 +399,7 @@ public void alterTableProperties( List tableChanges, TablePropertyChanges tablePropertyChanges, boolean ignoreIfNotExists, - LakeCatalog.Context lakeCatalogContext) { + FlussPrincipal flussPrincipal) { try { // it throws TableNotExistException if the table or database not exists TableRegistration tableReg = getTableRegistration(tablePath); @@ -426,7 +429,7 @@ public void alterTableProperties( tableDescriptor, newDescriptor, tableChanges, - lakeCatalogContext); + flussPrincipal); // update the table to zk TableRegistration updatedTableRegistration = tableReg.newProperties( @@ -456,7 +459,10 @@ private void preAlterTableProperties( TableDescriptor tableDescriptor, TableDescriptor newDescriptor, List tableChanges, - LakeCatalog.Context lakeCatalogContext) { + FlussPrincipal flussPrincipal) { + LakeCatalog.Context lakeCatalogContext = + new CoordinatorService.DefaultLakeCatalogContext( + false, flussPrincipal, newDescriptor.getSchema()); LakeCatalog lakeCatalog = lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog(); From 18dedd2c2a0e701d918726e5303d045bacaa5ff7 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Wed, 21 Jan 2026 20:36:25 +0800 Subject: [PATCH 2/3] modified base on cr --- .../fluss/client/admin/FlussAdminITCase.java | 16 ++ .../fluss/lake/lakestorage/LakeCatalog.java | 10 +- .../TestingLakeCatalogContext.java | 23 +- .../fluss/lake/paimon/PaimonLakeCatalog.java | 232 +++++++----------- .../lake/paimon/utils/PaimonConversions.java | 3 +- .../paimon/utils/PaimonTableValidation.java | 41 ++-- .../lake/paimon/PaimonLakeCatalogTest.java | 123 +++++++--- .../coordinator/CoordinatorService.java | 27 +- .../server/coordinator/MetadataManager.java | 17 +- .../server/coordinator/SchemaUpdate.java | 22 +- 10 files changed, 278 insertions(+), 236 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 5560e14eea..5dff27b587 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -436,6 +436,22 @@ void testAlterTableColumn() throws Exception { DataTypeChecks.equalsWithFieldId( schemaInfo.getSchema().getRowType(), expectedSchema.getRowType())) .isTrue(); + + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.addColumn( + "nested_row", + DataTypes.ROW( + DataTypes.STRING(), + DataTypes.INT()), + "new nested column", + TableChange.ColumnPosition.last())), + false) + .get()) + .hasMessageContaining("Column nested_row already exists"); } @Test diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java index 652bf56c6b..8a05fd97f2 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java @@ -20,7 +20,6 @@ import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; -import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; @@ -87,11 +86,14 @@ interface Context { FlussPrincipal getFlussPrincipal(); /** - * Get the current schema of fluss. + * Get the current table info of fluss. * - * @return the current schema of fluss. + * @return the current table info of fluss. * @since 0.10 */ - Schema getFlussSchema(); + TableDescriptor getCurrentTable(); + + /** Get the expected table info of fluss. */ + TableDescriptor getExpectedTable(); } } diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java index 8fa580d3e4..d3efd712ea 100644 --- a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java +++ b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java @@ -17,16 +17,22 @@ package org.apache.fluss.lake.lakestorage; -import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.security.acl.FlussPrincipal; /** A testing implementation of {@link LakeCatalog.Context}. */ public class TestingLakeCatalogContext implements LakeCatalog.Context { - private final Schema schema; + private final TableDescriptor currentTable; + private final TableDescriptor expectedTable; - public TestingLakeCatalogContext(Schema schema) { - this.schema = schema; + public TestingLakeCatalogContext(TableDescriptor tableDescriptor) { + this(tableDescriptor, tableDescriptor); + } + + public TestingLakeCatalogContext(TableDescriptor currentTable, TableDescriptor expectedTable) { + this.currentTable = currentTable; + this.expectedTable = expectedTable; } public TestingLakeCatalogContext() { @@ -44,7 +50,12 @@ public FlussPrincipal getFlussPrincipal() { } @Override - public Schema getFlussSchema() { - return schema; + public TableDescriptor getCurrentTable() { + return currentTable; + } + + @Override + public TableDescriptor getExpectedTable() { + return expectedTable; } } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java index fbe41e3b82..0e662a5fea 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java @@ -37,7 +37,6 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; -import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.slf4j.Logger; @@ -51,7 +50,7 @@ import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges; import static org.apache.fluss.lake.paimon.utils.PaimonTableValidation.checkTableIsEmpty; -import static org.apache.fluss.lake.paimon.utils.PaimonTableValidation.validatePaimonSchemaCompatible; +import static org.apache.fluss.lake.paimon.utils.PaimonTableValidation.isPaimonSchemaCompatible; import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; @@ -114,18 +113,41 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co public void alterTable(TablePath tablePath, List tableChanges, Context context) throws TableNotExistException { try { - List paimonSchemaChanges = toPaimonSchemaChanges(tableChanges); - org.apache.fluss.metadata.Schema flussSchema = context.getFlussSchema(); - boolean needsApplyChanges = - needsApplyTableChanges(tablePath, tableChanges, flussSchema); - if (needsApplyChanges) { - alterTable(tablePath, paimonSchemaChanges); + Table table = paimonCatalog.getTable(toPaimon(tablePath)); + FileStoreTable fileStoreTable = (FileStoreTable) table; + Schema currentPaimonSchema = fileStoreTable.schema().toSchema(); + + List paimonSchemaChanges; + if (isPaimonSchemaCompatible( + currentPaimonSchema, toPaimonSchema(context.getCurrentTable()))) { + // if the paimon schema is same as current fluss schema, directly apply all the + // changes. + paimonSchemaChanges = toPaimonSchemaChanges(tableChanges); + } else if (isPaimonSchemaCompatible( + currentPaimonSchema, toPaimonSchema(context.getExpectedTable()))) { + // if the schema is same as applied fluss schema , skip adding columns. + paimonSchemaChanges = + toPaimonSchemaChanges( + tableChanges.stream() + .filter( + tableChange -> + !(tableChange + instanceof TableChange.AddColumn)) + .collect(Collectors.toList())); } else { - // If schemas already match, treat as idempotent success - LOG.info( - "Skipping schema evolution for Paimon table {} because the column(s) to add {} already exist.", - tablePath, - tableChanges); + throw new InvalidAlterTableException( + String.format( + "Paimon schema is not compatible with Fluss schema: " + + "Paimon schema: %s, Fluss schema: %s. " + + "therefore you need to add the diff columns all at once, " + + "rather than applying other table changes: %s.", + currentPaimonSchema, + context.getCurrentTable().getSchema(), + tableChanges)); + } + + if (!paimonSchemaChanges.isEmpty()) { + alterTable(tablePath, paimonSchemaChanges); } } catch (Catalog.ColumnAlreadyExistException e) { // This shouldn't happen if shouldAlterTable works correctly, but keep as safeguard @@ -133,150 +155,75 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont } catch (Catalog.ColumnNotExistException e) { // This shouldn't happen for AddColumn operations throw new InvalidAlterTableException(e.getMessage()); + } catch (Catalog.TableNotExistException e) { + throw new TableNotExistException("Table " + tablePath + " does not exist."); } } - /** - * Check if table changes need to be applied to reconcile Paimon and Fluss schemas. - * - *

This method determines whether the provided table changes need to be applied based on the - * current state of Paimon and Fluss schemas. - * - * @param tablePath the table path - * @param tableChanges the proposed table changes - * @param flussSchema the current Fluss schema - * @return true if changes need to be applied (schemas are consistent), false if changes can be - * skipped (schemas already match after changes, idempotent) - * @throws TableNotExistException if the table does not exist - * @throws InvalidAlterTableException if schemas are inconsistent and cannot be reconciled - */ - private boolean needsApplyTableChanges( - TablePath tablePath, - List tableChanges, - org.apache.fluss.metadata.Schema flussSchema) + private boolean shouldAlterTable(TablePath tablePath, List tableChanges) throws TableNotExistException { - if (tableChanges.isEmpty()) { - return false; - } - try { - // Get current Paimon table schema Table table = paimonCatalog.getTable(toPaimon(tablePath)); FileStoreTable fileStoreTable = (FileStoreTable) table; - List paimonFields = - fileStoreTable.schema().toSchema().fields().stream() - .filter(field -> !SYSTEM_COLUMNS.containsKey(field.name())) - .collect(Collectors.toList()); - List flussColumns = flussSchema.getColumns(); - - if (paimonFields.size() < flussColumns.size()) { - throw new InvalidAlterTableException( - String.format( - "Paimon table has less columns (%d) than Fluss schema (%d)", - paimonFields.size(), flussColumns.size())); - } - - // Validate schema compatibility - validateExistingColumns(paimonFields, flussColumns); - - // If schemas are same, can apply all table changes - if (paimonFields.size() == flussColumns.size()) { - return true; - } - - // If Paimon has more columns, check if table changes would reconcile schemas - if (canTableChangesReconcileSchemas(flussColumns, paimonFields, tableChanges)) { - // Schemas will match after applying changes, skip duplicate operations - return false; - } else { - throw new InvalidAlterTableException( - String.format( - "Paimon table has more columns (%d) than Fluss schema (%d); " - + "therefore you need to add the diff columns all at once, " - + "rather than applying other table changes: %s.", - paimonFields.size(), flussColumns.size(), tableChanges)); + Schema currentSchema = fileStoreTable.schema().toSchema(); + + for (TableChange change : tableChanges) { + if (change instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) change; + if (!isColumnAlreadyExists(currentSchema, addColumn)) { + return true; + } + } else { + return true; + } } + return false; } catch (Catalog.TableNotExistException e) { throw new TableNotExistException("Table " + tablePath + " does not exist."); } } - private void validateExistingColumns( - List paimonFields, - List flussColumns) { - for (int i = 0; i < flussColumns.size(); i++) { - if (!paimonFields.get(i).name().equals(flussColumns.get(i).getName())) { - throw new InvalidAlterTableException( - String.format( - "Column mismatch at position %d. Paimon: '%s', Fluss: '%s'", - i, paimonFields.get(i).name(), flussColumns.get(i).getName())); - } - } - } - - private boolean isColumnAlreadyExists( - org.apache.paimon.types.DataField field, TableChange.AddColumn addColumn) { + private boolean isColumnAlreadyExists(Schema currentSchema, TableChange.AddColumn addColumn) { String columnName = addColumn.getName(); - if (field.name().equals(columnName)) { - org.apache.paimon.types.DataType expectedType = - addColumn - .getDataType() - .accept( - org.apache.fluss.lake.paimon.utils.FlussDataTypeToPaimonDataType - .INSTANCE); - - if (!field.type().equals(expectedType)) { - throw new InvalidAlterTableException( - String.format( - "Column '%s' already exists but with different type. " - + "Existing: %s, Expected: %s", - columnName, field.type(), expectedType)); - } - String existingComment = field.description(); - String expectedComment = addColumn.getComment(); - - boolean commentsMatch = - (existingComment == null && expectedComment == null) - || (existingComment != null && existingComment.equals(expectedComment)); - - if (!commentsMatch) { - throw new InvalidAlterTableException( - String.format( - "Column %s already exists but with different comment. " - + "Existing: %s, Expected: %s", - columnName, existingComment, expectedComment)); - } - - return true; - } - return false; - } - - /** Check if applying the table changes would reconcile Paimon and Fluss schemas. */ - private boolean canTableChangesReconcileSchemas( - List flussColumns, - List paimonFields, - List tableChanges) { - if (flussColumns.size() + tableChanges.size() != paimonFields.size()) { - return false; - } + for (org.apache.paimon.types.DataField field : currentSchema.fields()) { + if (field.name().equals(columnName)) { + org.apache.paimon.types.DataType expectedType = + addColumn + .getDataType() + .accept( + org.apache.fluss.lake.paimon.utils + .FlussDataTypeToPaimonDataType.INSTANCE); + + if (!field.type().equals(expectedType)) { + throw new InvalidAlterTableException( + String.format( + "Column '%s' already exists but with different type. " + + "Existing: %s, Expected: %s", + columnName, field.type(), expectedType)); + } + String existingComment = field.description(); + String expectedComment = addColumn.getComment(); + + boolean commentsMatch = + (existingComment == null && expectedComment == null) + || (existingComment != null + && existingComment.equals(expectedComment)); + + if (!commentsMatch) { + throw new InvalidAlterTableException( + String.format( + "Column %s already exists but with different comment. " + + "Existing: %s, Expected: %s", + columnName, existingComment, expectedComment)); + } - for (int i = 0; i < paimonFields.size() - flussColumns.size(); i++) { - DataField paimonDataField = paimonFields.get(i + flussColumns.size()); - TableChange tableChange = tableChanges.get(i); - if (!(tableChange instanceof TableChange.AddColumn - && ((TableChange.AddColumn) tableChange).getPosition() - == TableChange.ColumnPosition.last() - && isColumnAlreadyExists( - paimonDataField, (TableChange.AddColumn) tableChange))) { - return false; + return true; } } - // All validations passed: applying these changes would reconcile the schemas - return true; + return false; } private void createTable(TablePath tablePath, Schema schema, boolean isCreatingFlussTable) @@ -289,8 +236,15 @@ private void createTable(TablePath tablePath, Schema schema, boolean isCreatingF try { Table table = paimonCatalog.getTable(paimonPath); FileStoreTable fileStoreTable = (FileStoreTable) table; - validatePaimonSchemaCompatible( - paimonPath, fileStoreTable.schema().toSchema(), schema); + Schema existingSchema = fileStoreTable.schema().toSchema(); + if (!isPaimonSchemaCompatible(existingSchema, schema)) { + throw new TableAlreadyExistException( + String.format( + "The table %s already exists in Paimon catalog, but the table schema is not compatible. " + + "Existing schema: %s, new schema: %s. " + + "Please first drop the table in Paimon catalog or use a new table name.", + paimonPath.getEscapedFullName(), existingSchema, schema)); + } // if creating a new fluss table, we should ensure the lake table is empty if (isCreatingFlussTable) { checkTableIsEmpty(tablePath, fileStoreTable); diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java index ded40ac59b..ee6d0f0d7c 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java @@ -57,6 +57,7 @@ public class PaimonConversions { // for fluss config public static final String FLUSS_CONF_PREFIX = "fluss."; + public static final String TABLE_DATALAKE_PAIMON_PREFIX = "table.datalake.paimon."; // for paimon config private static final String PAIMON_CONF_PREFIX = "paimon."; @@ -261,7 +262,7 @@ private static void setPaimonDefaultProperties(Options options) { private static void setFlussPropertyToPaimon(String key, String value, Options options) { if (key.startsWith(PAIMON_CONF_PREFIX)) { options.set(key.substring(PAIMON_CONF_PREFIX.length()), value); - } else { + } else if (!key.startsWith(TABLE_DATALAKE_PAIMON_PREFIX)) { options.set(FLUSS_CONF_PREFIX + key, value); } } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java index 94580df179..62ff19cc61 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java @@ -21,7 +21,6 @@ import org.apache.fluss.metadata.TablePath; import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.Identifier; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; @@ -34,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX; import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; @@ -43,8 +43,7 @@ public class PaimonTableValidation { private static final Map> PAIMON_CONFIGS = extractPaimonConfigs(); - public static void validatePaimonSchemaCompatible( - Identifier tablePath, Schema existingSchema, Schema newSchema) { + public static boolean isPaimonSchemaCompatible(Schema existingSchema, Schema newSchema) { // Adjust options for comparison Map existingOptions = existingSchema.options(); Map newOptions = newSchema.options(); @@ -66,21 +65,16 @@ public static void validatePaimonSchemaCompatible( // ignore the existing options that are not in new options existingOptions.entrySet().removeIf(entry -> !newOptions.containsKey(entry.getKey())); - if (!existingSchema.equals(newSchema)) { + // ignore the fields because newSchema is referred by fluss schema, whose field id maybe not + // same as existingSchema. + if (!equalIgnoreFieldId(existingSchema, newSchema)) { // Allow different precisions for __timestamp column for backward compatibility, // old cluster will use precision 6, but new cluster will use precision 3, // we allow such precision difference - if (equalIgnoreSystemColumnTimestampPrecision(existingSchema, newSchema)) { - return; - } - - throw new TableAlreadyExistException( - String.format( - "The table %s already exists in Paimon catalog, but the table schema is not compatible. " - + "Existing schema: %s, new schema: %s. " - + "Please first drop the table in Paimon catalog or use a new table name.", - tablePath.getEscapedFullName(), existingSchema, newSchema)); + return equalIgnoreSystemColumnTimestampPrecision(existingSchema, newSchema); } + + return true; } /** @@ -96,7 +90,7 @@ public static void validatePaimonSchemaCompatible( * @param newSchema the new schema descriptor generated by the current Fluss cluster * @return true if the schemas are identical, disregarding the precision of the system timestamp */ - private static boolean equalIgnoreSystemColumnTimestampPrecision( + public static boolean equalIgnoreSystemColumnTimestampPrecision( Schema existingSchema, Schema newSchema) { List existingFields = new ArrayList<>(existingSchema.fields()); DataField systemTimestampField = existingFields.get(existingFields.size() - 1); @@ -116,6 +110,23 @@ private static boolean equalIgnoreSystemColumnTimestampPrecision( return existingSchema.equals(newSchema); } + private static boolean equalIgnoreFieldId(Schema existingSchema, Schema newSchema) { + List existingFields = existingSchema.fields(); + List newFields = newSchema.fields(); + for (int i = 0; i < existingFields.size(); i++) { + DataField existingField = existingFields.get(i); + DataField newField = newFields.get(i); + if (!existingField.equalsIgnoreFieldId(newField)) { + return false; + } + } + + return Objects.equals(existingSchema.partitionKeys(), newSchema.partitionKeys()) + && Objects.equals(existingSchema.primaryKeys(), newSchema.primaryKeys()) + && Objects.equals(existingSchema.options(), newSchema.options()) + && Objects.equals(existingSchema.comment(), newSchema.comment()); + } + private static void removeChangeableOptions(Map options) { options.entrySet() .removeIf( diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java index 405b3b0a5a..94f51392cc 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java @@ -25,9 +25,11 @@ import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.coordinator.SchemaUpdate; import org.apache.fluss.types.DataTypes; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -38,6 +40,9 @@ import java.util.Collections; import java.util.List; +import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED; +import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT; +import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -51,7 +56,7 @@ class PaimonLakeCatalogTest { .column("address", DataTypes.STRING()) .build(); private static final TestingLakeCatalogContext LAKE_CATALOG_CONTEXT = - new TestingLakeCatalogContext(FLUSS_SCHEMA); + new TestingLakeCatalogContext(TableDescriptor.builder().schema(FLUSS_SCHEMA).build()); @TempDir private File tempWarehouseDir; @@ -202,12 +207,11 @@ void testAlterTableAddColumnNotNullable() { } @Test - void testAlterTableAddExistingColumns() { + void testAlterTableAddExistingColumns() throws Exception { String database = "test_alter_table_add_existing_column_db"; String tableName = "test_alter_table_add_existing_column_table"; TablePath tablePath = TablePath.of(database, tableName); createTable(database, tableName); - List changes = Collections.singletonList( TableChange.addColumn( @@ -219,10 +223,11 @@ void testAlterTableAddExistingColumns() { assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes, LAKE_CATALOG_CONTEXT)) + tablePath, + changes, + getLakeCatalogContext(FLUSS_SCHEMA, changes))) .isInstanceOf(InvalidAlterTableException.class) - .hasMessageContaining( - "Column address already exists in the test_alter_table_add_existing_column_db.test_alter_table_add_existing_column_table table."); + .hasMessageContaining("Column address already exists"); List changes2 = Arrays.asList( @@ -238,7 +243,8 @@ void testAlterTableAddExistingColumns() { TableChange.ColumnPosition.last())); // mock add columns to paimon successfully but fail to add columns to fluss. - flussPaimonCatalog.alterTable(tablePath, changes2, LAKE_CATALOG_CONTEXT); + flussPaimonCatalog.alterTable( + tablePath, changes2, getLakeCatalogContext(FLUSS_SCHEMA, changes2)); List changes3 = Arrays.asList( TableChange.addColumn( @@ -255,10 +261,15 @@ void testAlterTableAddExistingColumns() { assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes3, LAKE_CATALOG_CONTEXT)) + tablePath, + changes3, + getLakeCatalogContext(FLUSS_SCHEMA, changes3))) .isInstanceOf(InvalidAlterTableException.class) - .hasMessage( - "Column 'new_column2' already exists but with different type. Existing: STRING, Expected: INT"); + .hasMessageContaining("Paimon schema is not compatible with Fluss schema") + .hasMessageContaining( + String.format( + "therefore you need to add the diff columns all at once, rather than applying other table changes: %s.", + changes3)); List changes4 = Arrays.asList( @@ -276,23 +287,34 @@ void testAlterTableAddExistingColumns() { assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes4, LAKE_CATALOG_CONTEXT)) + tablePath, + changes4, + getLakeCatalogContext(FLUSS_SCHEMA, changes4))) .isInstanceOf(InvalidAlterTableException.class) - .hasMessage( - "Column new_column2 already exists but with different comment. Existing: null, Expected: the address comment"); + .hasMessageContaining("Paimon schema is not compatible with Fluss schema") + .hasMessageContaining( + String.format( + "therefore you need to add the diff columns all at once, rather than applying other table changes: %s.", + changes4)); // no exception thrown only when adding existing column to match fluss and paimon. - flussPaimonCatalog.alterTable(tablePath, changes2, LAKE_CATALOG_CONTEXT); + flussPaimonCatalog.alterTable( + tablePath, changes2, getLakeCatalogContext(FLUSS_SCHEMA, changes2)); } @Test - void testAlterTableAddColumnWhenPaimonSchemaNotMatch() { + void testAlterTableAddColumnWhenPaimonSchemaNotMatch() throws Exception { // this rarely happens only when new fluss lake table with an existed paimon table or use // alter table in paimon side directly. String database = "test_alter_table_add_column_fluss_wider"; String tableName = "test_alter_table_add_column_fluss_wider"; createTable(database, tableName); TablePath tablePath = TablePath.of(database, tableName); + org.apache.paimon.schema.Schema paimonSchema = + ((FileStoreTable) + flussPaimonCatalog.getPaimonCatalog().getTable(toPaimon(tablePath))) + .schema() + .toSchema(); List changes = Collections.singletonList( @@ -302,48 +324,71 @@ void testAlterTableAddColumnWhenPaimonSchemaNotMatch() { "new_col comment", TableChange.ColumnPosition.last())); + // test column number mismatch. + Schema widerFlussSchema = + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("name", DataTypes.STRING()) + .column("amount", DataTypes.INT()) + .column("address", DataTypes.STRING()) + .column("phone", DataTypes.INT()) + .build(); assertThatThrownBy( () -> flussPaimonCatalog.alterTable( tablePath, changes, - new TestingLakeCatalogContext( - Schema.newBuilder() - .column("id", DataTypes.BIGINT()) - .column("name", DataTypes.STRING()) - .column("amount", DataTypes.INT()) - .column("address", DataTypes.STRING()) - .column("phone", DataTypes.INT()) - .build()))) + getLakeCatalogContext(widerFlussSchema, changes))) .isInstanceOf(InvalidAlterTableException.class) - .hasMessageContaining("Paimon table has less columns (4) than Fluss schema (5)"); - + .hasMessageContaining("Paimon schema is not compatible with Fluss schema") + .hasMessageContaining( + String.format( + "therefore you need to add the diff columns all at once, rather than applying other table changes: %s.", + changes)); + + // test column order mismatch. + Schema disorderflussSchema = + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("amount", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("address", DataTypes.STRING()) + .build(); assertThatThrownBy( () -> flussPaimonCatalog.alterTable( tablePath, changes, - new TestingLakeCatalogContext( - Schema.newBuilder() - .column("id", DataTypes.BIGINT()) - .column("amount", DataTypes.INT()) - .column("name", DataTypes.STRING()) - .column("address", DataTypes.STRING()) - .build()))) + getLakeCatalogContext(disorderflussSchema, changes))) .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("Paimon schema is not compatible with Fluss schema") .hasMessageContaining( - "Column mismatch at position 1. Paimon: 'name', Fluss: 'amount'"); + String.format( + "therefore you need to add the diff columns all at once, rather than applying other table changes: %s.", + changes)); } private void createTable(String database, String tableName) { - TableDescriptor td = - TableDescriptor.builder() - .schema(FLUSS_SCHEMA) - .distributedBy(3) // no bucket key - .build(); - + TableDescriptor td = getTableDescriptor(FLUSS_SCHEMA); TablePath tablePath = TablePath.of(database, tableName); flussPaimonCatalog.createTable(tablePath, td, LAKE_CATALOG_CONTEXT); } + + private TestingLakeCatalogContext getLakeCatalogContext( + Schema schema, List schemaChanges) { + Schema expectedSchema = SchemaUpdate.applySchemaChanges(schema, schemaChanges); + return new TestingLakeCatalogContext( + getTableDescriptor(schema), getTableDescriptor(expectedSchema)); + } + + private TableDescriptor getTableDescriptor(Schema schema) { + return TableDescriptor.builder() + .schema(schema) + .property(TABLE_DATALAKE_ENABLED.key(), "true") + .property(TABLE_DATALAKE_FORMAT.key(), "paimon") + .property("table.datalake.paimon.warehouse", tempWarehouseDir.toURI().toString()) + .distributedBy(3) // no bucket key + .build(); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 8fb1b47833..2d32ea5771 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -47,7 +47,6 @@ import org.apache.fluss.metadata.PartitionSpec; import org.apache.fluss.metadata.ResolvedPartitionSpec; import org.apache.fluss.metadata.TableBucket; -import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; @@ -392,7 +391,8 @@ public CompletableFuture createTable(CreateTableRequest req new DefaultLakeCatalogContext( true, currentSession().getPrincipal(), - tableDescriptor.getSchema())); + null, + tableDescriptor)); } catch (TableAlreadyExistException e) { throw new LakeTableAlreadyExistException(e.getMessage(), e); } @@ -423,9 +423,6 @@ public CompletableFuture alterTable(AlterTableRequest reques + "table properties or table schema."); } - LakeCatalog.Context lakeCatalogContext = - new DefaultLakeCatalogContext(false, currentSession().getPrincipal()); - if (!alterSchemaChanges.isEmpty()) { metadataManager.alterTableSchema( tablePath, @@ -1014,13 +1011,18 @@ static class DefaultLakeCatalogContext implements LakeCatalog.Context { private final boolean isCreatingFlussTable; private final FlussPrincipal flussPrincipal; - private final Schema schema; + private final TableDescriptor currentTable; + private final TableDescriptor expectedTable; public DefaultLakeCatalogContext( - boolean isCreatingFlussTable, FlussPrincipal flussPrincipal, Schema schema) { + boolean isCreatingFlussTable, + FlussPrincipal flussPrincipal, + TableDescriptor currentTable, + TableDescriptor expectedTable) { this.isCreatingFlussTable = isCreatingFlussTable; this.flussPrincipal = flussPrincipal; - this.schema = schema; + this.currentTable = currentTable; + this.expectedTable = expectedTable; } @Override @@ -1034,8 +1036,13 @@ public FlussPrincipal getFlussPrincipal() { } @Override - public Schema getFlussSchema() { - return schema; + public TableDescriptor getCurrentTable() { + return currentTable; + } + + @Override + public TableDescriptor getExpectedTable() { + return expectedTable; } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 33dd82ec87..df411bb1e9 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -331,13 +331,18 @@ public void alterTableSchema( try { TableInfo table = getTable(tablePath); + TableDescriptor tableDescriptor = table.toTableDescriptor(); // validate the table column changes if (!schemaChanges.isEmpty()) { - Schema newSchema = SchemaUpdate.applySchemaChanges(table, schemaChanges); + Schema newSchema = + SchemaUpdate.applySchemaChanges(table.getSchema(), schemaChanges); LakeCatalog.Context lakeCatalogContext = new CoordinatorService.DefaultLakeCatalogContext( - false, flussPrincipal, table.getSchema()); + false, + flussPrincipal, + tableDescriptor, + TableDescriptor.builder(tableDescriptor).schema(newSchema).build()); // Lake First: sync to Lake before updating Fluss schema syncSchemaChangesToLake(tablePath, table, schemaChanges, lakeCatalogContext); @@ -425,11 +430,7 @@ public void alterTableProperties( // pre alter table properties, e.g. create lake table in lake storage if it's to // enable datalake for the table preAlterTableProperties( - tablePath, - tableDescriptor, - newDescriptor, - tableChanges, - flussPrincipal); + tablePath, tableDescriptor, newDescriptor, tableChanges, flussPrincipal); // update the table to zk TableRegistration updatedTableRegistration = tableReg.newProperties( @@ -462,7 +463,7 @@ private void preAlterTableProperties( FlussPrincipal flussPrincipal) { LakeCatalog.Context lakeCatalogContext = new CoordinatorService.DefaultLakeCatalogContext( - false, flussPrincipal, newDescriptor.getSchema()); + false, flussPrincipal, null, newDescriptor); LakeCatalog lakeCatalog = lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java index 011871a0ba..d1ad0c16c2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java @@ -17,20 +17,19 @@ package org.apache.fluss.server.coordinator; +import org.apache.fluss.exception.InvalidAlterTableException; import org.apache.fluss.exception.SchemaChangeException; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableChange; -import org.apache.fluss.metadata.TableInfo; import java.util.List; -import java.util.Objects; /** Schema update. */ public class SchemaUpdate { /** Apply schema changes to the given table info and return the updated schema. */ - public static Schema applySchemaChanges(TableInfo tableInfo, List changes) { - SchemaUpdate schemaUpdate = new SchemaUpdate(tableInfo); + public static Schema applySchemaChanges(Schema initialSchema, List changes) { + SchemaUpdate schemaUpdate = new SchemaUpdate(initialSchema); for (TableChange change : changes) { schemaUpdate = schemaUpdate.applySchemaChange(change); } @@ -40,9 +39,9 @@ public static Schema applySchemaChanges(TableInfo tableInfo, List c // Now we only maintain the Builder private final Schema.Builder builder; - public SchemaUpdate(TableInfo tableInfo) { + public SchemaUpdate(Schema initialSchema) { // Initialize builder from the current table schema - this.builder = Schema.newBuilder().fromSchema(tableInfo.getSchema()); + this.builder = Schema.newBuilder().fromSchema(initialSchema); } public Schema getSchema() { @@ -50,7 +49,7 @@ public Schema getSchema() { return builder.build(); } - public SchemaUpdate applySchemaChange(TableChange columnChange) { + private SchemaUpdate applySchemaChange(TableChange columnChange) { if (columnChange instanceof TableChange.AddColumn) { return addColumn((TableChange.AddColumn) columnChange); } else if (columnChange instanceof TableChange.ModifyColumn) { @@ -69,13 +68,8 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) { Schema.Column existingColumn = builder.getColumn(addColumn.getName()).orElse(null); if (existingColumn != null) { - if (!existingColumn.getDataType().equals(addColumn.getDataType()) - || !Objects.equals( - existingColumn.getComment().orElse(null), addColumn.getComment())) { - throw new IllegalArgumentException( - "Column " + addColumn.getName() + " already exists."); - } - return this; + throw new InvalidAlterTableException( + "Column " + addColumn.getName() + " already exists."); } if (addColumn.getPosition() != TableChange.ColumnPosition.last()) { From f437dd93d9d258305e5b2a679bb25ea60b330372 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Wed, 28 Jan 2026 17:14:16 +0800 Subject: [PATCH 3/3] fix test error --- .../org/apache/fluss/lake/lakestorage/LakeCatalog.java | 5 ++++- .../fluss/lake/paimon/utils/PaimonTableValidation.java | 2 +- .../fluss/server/coordinator/CoordinatorService.java | 9 +++++++-- .../apache/fluss/server/coordinator/MetadataManager.java | 2 +- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java index 8a05fd97f2..7fa2f3ea14 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java @@ -25,6 +25,8 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.security.acl.FlussPrincipal; +import javax.annotation.Nullable; + import java.util.List; /** @@ -88,9 +90,10 @@ interface Context { /** * Get the current table info of fluss. * - * @return the current table info of fluss. + * @return the current table info of fluss. Null if the table does not exist. * @since 0.10 */ + @Nullable TableDescriptor getCurrentTable(); /** Get the expected table info of fluss. */ diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java index 62ff19cc61..a15feefebc 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java @@ -107,7 +107,7 @@ public static boolean equalIgnoreSystemColumnTimestampPrecision( systemTimestampField.description())); } existingSchema = existingSchema.copy(RowType.of(existingFields.toArray(new DataField[0]))); - return existingSchema.equals(newSchema); + return equalIgnoreFieldId(existingSchema, newSchema); } private static boolean equalIgnoreFieldId(Schema existingSchema, Schema newSchema) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 2d32ea5771..88b1d5b088 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -1011,16 +1011,20 @@ static class DefaultLakeCatalogContext implements LakeCatalog.Context { private final boolean isCreatingFlussTable; private final FlussPrincipal flussPrincipal; - private final TableDescriptor currentTable; + @Nullable private final TableDescriptor currentTable; private final TableDescriptor expectedTable; public DefaultLakeCatalogContext( boolean isCreatingFlussTable, FlussPrincipal flussPrincipal, - TableDescriptor currentTable, + @Nullable TableDescriptor currentTable, TableDescriptor expectedTable) { this.isCreatingFlussTable = isCreatingFlussTable; this.flussPrincipal = flussPrincipal; + if (!isCreatingFlussTable) { + checkNotNull( + currentTable, "currentTable must be provided when altering a Fluss table."); + } this.currentTable = currentTable; this.expectedTable = expectedTable; } @@ -1035,6 +1039,7 @@ public FlussPrincipal getFlussPrincipal() { return flussPrincipal; } + @Nullable @Override public TableDescriptor getCurrentTable() { return currentTable; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index df411bb1e9..02a02cc44c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -463,7 +463,7 @@ private void preAlterTableProperties( FlussPrincipal flussPrincipal) { LakeCatalog.Context lakeCatalogContext = new CoordinatorService.DefaultLakeCatalogContext( - false, flussPrincipal, null, newDescriptor); + false, flussPrincipal, tableDescriptor, newDescriptor); LakeCatalog lakeCatalog = lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog();