diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 5560e14eea..5dff27b587 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -436,6 +436,22 @@ void testAlterTableColumn() throws Exception { DataTypeChecks.equalsWithFieldId( schemaInfo.getSchema().getRowType(), expectedSchema.getRowType())) .isTrue(); + + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.addColumn( + "nested_row", + DataTypes.ROW( + DataTypes.STRING(), + DataTypes.INT()), + "new nested column", + TableChange.ColumnPosition.last())), + false) + .get()) + .hasMessageContaining("Column nested_row already exists"); } @Test diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java index 4cbccb6c1f..7fa2f3ea14 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java @@ -25,6 +25,8 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.security.acl.FlussPrincipal; +import javax.annotation.Nullable; + import java.util.List; /** @@ -84,5 +86,17 @@ interface Context { /** Get the fluss principal currently accessing the catalog. */ FlussPrincipal getFlussPrincipal(); + + /** + * Get the current table info of fluss. + * + * @return the current table info of fluss. Null if the table does not exist. + * @since 0.10 + */ + @Nullable + TableDescriptor getCurrentTable(); + + /** Get the expected table info of fluss. */ + TableDescriptor getExpectedTable(); } } diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java index 7406b13b6b..d3efd712ea 100644 --- a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java +++ b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java @@ -17,11 +17,28 @@ package org.apache.fluss.lake.lakestorage; +import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.security.acl.FlussPrincipal; /** A testing implementation of {@link LakeCatalog.Context}. */ public class TestingLakeCatalogContext implements LakeCatalog.Context { + private final TableDescriptor currentTable; + private final TableDescriptor expectedTable; + + public TestingLakeCatalogContext(TableDescriptor tableDescriptor) { + this(tableDescriptor, tableDescriptor); + } + + public TestingLakeCatalogContext(TableDescriptor currentTable, TableDescriptor expectedTable) { + this.currentTable = currentTable; + this.expectedTable = expectedTable; + } + + public TestingLakeCatalogContext() { + this(null); + } + @Override public boolean isCreatingFlussTable() { return false; @@ -31,4 +48,14 @@ public boolean isCreatingFlussTable() { public FlussPrincipal getFlussPrincipal() { return null; } + + @Override + public TableDescriptor getCurrentTable() { + return currentTable; + } + + @Override + public TableDescriptor getExpectedTable() { + return expectedTable; + } } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java index 500546e641..0e662a5fea 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java @@ -44,12 +44,13 @@ import java.util.LinkedHashMap; import java.util.List; +import java.util.stream.Collectors; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges; import static org.apache.fluss.lake.paimon.utils.PaimonTableValidation.checkTableIsEmpty; -import static org.apache.fluss.lake.paimon.utils.PaimonTableValidation.validatePaimonSchemaCompatible; +import static org.apache.fluss.lake.paimon.utils.PaimonTableValidation.isPaimonSchemaCompatible; import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; @@ -112,17 +113,41 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co public void alterTable(TablePath tablePath, List tableChanges, Context context) throws TableNotExistException { try { - List paimonSchemaChanges = toPaimonSchemaChanges(tableChanges); + Table table = paimonCatalog.getTable(toPaimon(tablePath)); + FileStoreTable fileStoreTable = (FileStoreTable) table; + Schema currentPaimonSchema = fileStoreTable.schema().toSchema(); - // Compare current Paimon table schema with expected target schema before altering - if (shouldAlterTable(tablePath, tableChanges)) { - alterTable(tablePath, paimonSchemaChanges); + List paimonSchemaChanges; + if (isPaimonSchemaCompatible( + currentPaimonSchema, toPaimonSchema(context.getCurrentTable()))) { + // if the paimon schema is same as current fluss schema, directly apply all the + // changes. + paimonSchemaChanges = toPaimonSchemaChanges(tableChanges); + } else if (isPaimonSchemaCompatible( + currentPaimonSchema, toPaimonSchema(context.getExpectedTable()))) { + // if the schema is same as applied fluss schema , skip adding columns. + paimonSchemaChanges = + toPaimonSchemaChanges( + tableChanges.stream() + .filter( + tableChange -> + !(tableChange + instanceof TableChange.AddColumn)) + .collect(Collectors.toList())); } else { - // If schemas already match, treat as idempotent success - LOG.info( - "Skipping schema evolution for Paimon table {} because the column(s) to add {} already exist.", - tablePath, - tableChanges); + throw new InvalidAlterTableException( + String.format( + "Paimon schema is not compatible with Fluss schema: " + + "Paimon schema: %s, Fluss schema: %s. " + + "therefore you need to add the diff columns all at once, " + + "rather than applying other table changes: %s.", + currentPaimonSchema, + context.getCurrentTable().getSchema(), + tableChanges)); + } + + if (!paimonSchemaChanges.isEmpty()) { + alterTable(tablePath, paimonSchemaChanges); } } catch (Catalog.ColumnAlreadyExistException e) { // This shouldn't happen if shouldAlterTable works correctly, but keep as safeguard @@ -130,6 +155,8 @@ public void alterTable(TablePath tablePath, List tableChanges, Cont } catch (Catalog.ColumnNotExistException e) { // This shouldn't happen for AddColumn operations throw new InvalidAlterTableException(e.getMessage()); + } catch (Catalog.TableNotExistException e) { + throw new TableNotExistException("Table " + tablePath + " does not exist."); } } @@ -209,8 +236,15 @@ private void createTable(TablePath tablePath, Schema schema, boolean isCreatingF try { Table table = paimonCatalog.getTable(paimonPath); FileStoreTable fileStoreTable = (FileStoreTable) table; - validatePaimonSchemaCompatible( - paimonPath, fileStoreTable.schema().toSchema(), schema); + Schema existingSchema = fileStoreTable.schema().toSchema(); + if (!isPaimonSchemaCompatible(existingSchema, schema)) { + throw new TableAlreadyExistException( + String.format( + "The table %s already exists in Paimon catalog, but the table schema is not compatible. " + + "Existing schema: %s, new schema: %s. " + + "Please first drop the table in Paimon catalog or use a new table name.", + paimonPath.getEscapedFullName(), existingSchema, schema)); + } // if creating a new fluss table, we should ensure the lake table is empty if (isCreatingFlussTable) { checkTableIsEmpty(tablePath, fileStoreTable); diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java index ded40ac59b..ee6d0f0d7c 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java @@ -57,6 +57,7 @@ public class PaimonConversions { // for fluss config public static final String FLUSS_CONF_PREFIX = "fluss."; + public static final String TABLE_DATALAKE_PAIMON_PREFIX = "table.datalake.paimon."; // for paimon config private static final String PAIMON_CONF_PREFIX = "paimon."; @@ -261,7 +262,7 @@ private static void setPaimonDefaultProperties(Options options) { private static void setFlussPropertyToPaimon(String key, String value, Options options) { if (key.startsWith(PAIMON_CONF_PREFIX)) { options.set(key.substring(PAIMON_CONF_PREFIX.length()), value); - } else { + } else if (!key.startsWith(TABLE_DATALAKE_PAIMON_PREFIX)) { options.set(FLUSS_CONF_PREFIX + key, value); } } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java index 94580df179..a15feefebc 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java @@ -21,7 +21,6 @@ import org.apache.fluss.metadata.TablePath; import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.Identifier; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; @@ -34,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX; import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; @@ -43,8 +43,7 @@ public class PaimonTableValidation { private static final Map> PAIMON_CONFIGS = extractPaimonConfigs(); - public static void validatePaimonSchemaCompatible( - Identifier tablePath, Schema existingSchema, Schema newSchema) { + public static boolean isPaimonSchemaCompatible(Schema existingSchema, Schema newSchema) { // Adjust options for comparison Map existingOptions = existingSchema.options(); Map newOptions = newSchema.options(); @@ -66,21 +65,16 @@ public static void validatePaimonSchemaCompatible( // ignore the existing options that are not in new options existingOptions.entrySet().removeIf(entry -> !newOptions.containsKey(entry.getKey())); - if (!existingSchema.equals(newSchema)) { + // ignore the fields because newSchema is referred by fluss schema, whose field id maybe not + // same as existingSchema. + if (!equalIgnoreFieldId(existingSchema, newSchema)) { // Allow different precisions for __timestamp column for backward compatibility, // old cluster will use precision 6, but new cluster will use precision 3, // we allow such precision difference - if (equalIgnoreSystemColumnTimestampPrecision(existingSchema, newSchema)) { - return; - } - - throw new TableAlreadyExistException( - String.format( - "The table %s already exists in Paimon catalog, but the table schema is not compatible. " - + "Existing schema: %s, new schema: %s. " - + "Please first drop the table in Paimon catalog or use a new table name.", - tablePath.getEscapedFullName(), existingSchema, newSchema)); + return equalIgnoreSystemColumnTimestampPrecision(existingSchema, newSchema); } + + return true; } /** @@ -96,7 +90,7 @@ public static void validatePaimonSchemaCompatible( * @param newSchema the new schema descriptor generated by the current Fluss cluster * @return true if the schemas are identical, disregarding the precision of the system timestamp */ - private static boolean equalIgnoreSystemColumnTimestampPrecision( + public static boolean equalIgnoreSystemColumnTimestampPrecision( Schema existingSchema, Schema newSchema) { List existingFields = new ArrayList<>(existingSchema.fields()); DataField systemTimestampField = existingFields.get(existingFields.size() - 1); @@ -113,7 +107,24 @@ private static boolean equalIgnoreSystemColumnTimestampPrecision( systemTimestampField.description())); } existingSchema = existingSchema.copy(RowType.of(existingFields.toArray(new DataField[0]))); - return existingSchema.equals(newSchema); + return equalIgnoreFieldId(existingSchema, newSchema); + } + + private static boolean equalIgnoreFieldId(Schema existingSchema, Schema newSchema) { + List existingFields = existingSchema.fields(); + List newFields = newSchema.fields(); + for (int i = 0; i < existingFields.size(); i++) { + DataField existingField = existingFields.get(i); + DataField newField = newFields.get(i); + if (!existingField.equalsIgnoreFieldId(newField)) { + return false; + } + } + + return Objects.equals(existingSchema.partitionKeys(), newSchema.partitionKeys()) + && Objects.equals(existingSchema.primaryKeys(), newSchema.primaryKeys()) + && Objects.equals(existingSchema.options(), newSchema.options()) + && Objects.equals(existingSchema.comment(), newSchema.comment()); } private static void removeChangeableOptions(Map options) { diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java index ea18b85f6a..94f51392cc 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java @@ -25,23 +25,38 @@ import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.coordinator.SchemaUpdate; import org.apache.fluss.types.DataTypes; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED; +import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT; +import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Unit test for {@link PaimonLakeCatalog}. */ class PaimonLakeCatalogTest { + private static final Schema FLUSS_SCHEMA = + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("name", DataTypes.STRING()) + .column("amount", DataTypes.INT()) + .column("address", DataTypes.STRING()) + .build(); + private static final TestingLakeCatalogContext LAKE_CATALOG_CONTEXT = + new TestingLakeCatalogContext(TableDescriptor.builder().schema(FLUSS_SCHEMA).build()); @TempDir private File tempWarehouseDir; @@ -70,7 +85,7 @@ void testAlterTableProperties() throws Exception { flussPaimonCatalog.alterTable( tablePath, Collections.singletonList(TableChange.set("key", "value")), - new TestingLakeCatalogContext()); + LAKE_CATALOG_CONTEXT); table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); // we have set the value for key @@ -80,7 +95,7 @@ void testAlterTableProperties() throws Exception { flussPaimonCatalog.alterTable( tablePath, Collections.singletonList(TableChange.reset("key")), - new TestingLakeCatalogContext()); + LAKE_CATALOG_CONTEXT); table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); // we have reset the value for key @@ -89,14 +104,13 @@ void testAlterTableProperties() throws Exception { @Test void alterTablePropertiesWithNonExistentTable() { - TestingLakeCatalogContext context = new TestingLakeCatalogContext(); // db & table don't exist assertThatThrownBy( () -> flussPaimonCatalog.alterTable( TablePath.of("non_existing_db", "non_existing_table"), Collections.singletonList(TableChange.set("key", "value")), - context)) + LAKE_CATALOG_CONTEXT)) .isInstanceOf(TableNotExistException.class) .hasMessage("Table non_existing_db.non_existing_table does not exist."); @@ -110,7 +124,7 @@ void alterTablePropertiesWithNonExistentTable() { flussPaimonCatalog.alterTable( TablePath.of(database, "non_existing_table"), Collections.singletonList(TableChange.set("key", "value")), - context)) + LAKE_CATALOG_CONTEXT)) .isInstanceOf(TableNotExistException.class) .hasMessage("Table alter_props_db.non_existing_table does not exist."); } @@ -131,7 +145,7 @@ void testAlterTableAddColumnLastNullable() throws Exception { "new_col comment", TableChange.ColumnPosition.last())); - flussPaimonCatalog.alterTable(tablePath, changes, new TestingLakeCatalogContext()); + flussPaimonCatalog.alterTable(tablePath, changes, LAKE_CATALOG_CONTEXT); Table table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); assertThat(table.rowType().getFieldNames()) @@ -164,7 +178,7 @@ void testAlterTableAddColumnNotLast() { assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes, new TestingLakeCatalogContext())) + tablePath, changes, LAKE_CATALOG_CONTEXT)) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Only support to add column at last for paimon table."); } @@ -187,18 +201,17 @@ void testAlterTableAddColumnNotNullable() { assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes, new TestingLakeCatalogContext())) + tablePath, changes, LAKE_CATALOG_CONTEXT)) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Only support to add nullable column for paimon table."); } @Test - void testAlterTableAddExistingColumn() { + void testAlterTableAddExistingColumns() throws Exception { String database = "test_alter_table_add_existing_column_db"; String tableName = "test_alter_table_add_existing_column_table"; TablePath tablePath = TablePath.of(database, tableName); createTable(database, tableName); - List changes = Collections.singletonList( TableChange.addColumn( @@ -207,13 +220,40 @@ void testAlterTableAddExistingColumn() { null, TableChange.ColumnPosition.last())); - // no exception thrown when adding existing column - flussPaimonCatalog.alterTable(tablePath, changes, new TestingLakeCatalogContext()); + assertThatThrownBy( + () -> + flussPaimonCatalog.alterTable( + tablePath, + changes, + getLakeCatalogContext(FLUSS_SCHEMA, changes))) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("Column address already exists"); List changes2 = - Collections.singletonList( + Arrays.asList( TableChange.addColumn( - "address", + "new_column", + DataTypes.INT(), + null, + TableChange.ColumnPosition.last()), + TableChange.addColumn( + "new_column2", + DataTypes.STRING(), + null, + TableChange.ColumnPosition.last())); + + // mock add columns to paimon successfully but fail to add columns to fluss. + flussPaimonCatalog.alterTable( + tablePath, changes2, getLakeCatalogContext(FLUSS_SCHEMA, changes2)); + List changes3 = + Arrays.asList( + TableChange.addColumn( + "new_column", + DataTypes.INT(), + null, + TableChange.ColumnPosition.last()), + TableChange.addColumn( + "new_column2", DataTypes.INT(), null, TableChange.ColumnPosition.last())); @@ -221,15 +261,25 @@ void testAlterTableAddExistingColumn() { assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes2, new TestingLakeCatalogContext())) + tablePath, + changes3, + getLakeCatalogContext(FLUSS_SCHEMA, changes3))) .isInstanceOf(InvalidAlterTableException.class) - .hasMessage( - "Column 'address' already exists but with different type. Existing: STRING, Expected: INT"); - - List changes3 = - Collections.singletonList( + .hasMessageContaining("Paimon schema is not compatible with Fluss schema") + .hasMessageContaining( + String.format( + "therefore you need to add the diff columns all at once, rather than applying other table changes: %s.", + changes3)); + + List changes4 = + Arrays.asList( TableChange.addColumn( - "address", + "new_column", + DataTypes.INT(), + null, + TableChange.ColumnPosition.last()), + TableChange.addColumn( + "new_column2", DataTypes.STRING(), "the address comment", TableChange.ColumnPosition.last())); @@ -237,29 +287,108 @@ tablePath, changes2, new TestingLakeCatalogContext())) assertThatThrownBy( () -> flussPaimonCatalog.alterTable( - tablePath, changes3, new TestingLakeCatalogContext())) + tablePath, + changes4, + getLakeCatalogContext(FLUSS_SCHEMA, changes4))) .isInstanceOf(InvalidAlterTableException.class) - .hasMessage( - "Column address already exists but with different comment. Existing: null, Expected: the address comment"); + .hasMessageContaining("Paimon schema is not compatible with Fluss schema") + .hasMessageContaining( + String.format( + "therefore you need to add the diff columns all at once, rather than applying other table changes: %s.", + changes4)); + + // no exception thrown only when adding existing column to match fluss and paimon. + flussPaimonCatalog.alterTable( + tablePath, changes2, getLakeCatalogContext(FLUSS_SCHEMA, changes2)); } - private void createTable(String database, String tableName) { - Schema flussSchema = + @Test + void testAlterTableAddColumnWhenPaimonSchemaNotMatch() throws Exception { + // this rarely happens only when new fluss lake table with an existed paimon table or use + // alter table in paimon side directly. + String database = "test_alter_table_add_column_fluss_wider"; + String tableName = "test_alter_table_add_column_fluss_wider"; + createTable(database, tableName); + TablePath tablePath = TablePath.of(database, tableName); + org.apache.paimon.schema.Schema paimonSchema = + ((FileStoreTable) + flussPaimonCatalog.getPaimonCatalog().getTable(toPaimon(tablePath))) + .schema() + .toSchema(); + + List changes = + Collections.singletonList( + TableChange.addColumn( + "new_col", + DataTypes.INT(), + "new_col comment", + TableChange.ColumnPosition.last())); + + // test column number mismatch. + Schema widerFlussSchema = Schema.newBuilder() .column("id", DataTypes.BIGINT()) .column("name", DataTypes.STRING()) .column("amount", DataTypes.INT()) .column("address", DataTypes.STRING()) + .column("phone", DataTypes.INT()) .build(); - - TableDescriptor td = - TableDescriptor.builder() - .schema(flussSchema) - .distributedBy(3) // no bucket key + assertThatThrownBy( + () -> + flussPaimonCatalog.alterTable( + tablePath, + changes, + getLakeCatalogContext(widerFlussSchema, changes))) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("Paimon schema is not compatible with Fluss schema") + .hasMessageContaining( + String.format( + "therefore you need to add the diff columns all at once, rather than applying other table changes: %s.", + changes)); + + // test column order mismatch. + Schema disorderflussSchema = + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("amount", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("address", DataTypes.STRING()) .build(); + assertThatThrownBy( + () -> + flussPaimonCatalog.alterTable( + tablePath, + changes, + getLakeCatalogContext(disorderflussSchema, changes))) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("Paimon schema is not compatible with Fluss schema") + .hasMessageContaining( + String.format( + "therefore you need to add the diff columns all at once, rather than applying other table changes: %s.", + changes)); + } + private void createTable(String database, String tableName) { + TableDescriptor td = getTableDescriptor(FLUSS_SCHEMA); TablePath tablePath = TablePath.of(database, tableName); - flussPaimonCatalog.createTable(tablePath, td, new TestingLakeCatalogContext()); + flussPaimonCatalog.createTable(tablePath, td, LAKE_CATALOG_CONTEXT); + } + + private TestingLakeCatalogContext getLakeCatalogContext( + Schema schema, List schemaChanges) { + Schema expectedSchema = SchemaUpdate.applySchemaChanges(schema, schemaChanges); + return new TestingLakeCatalogContext( + getTableDescriptor(schema), getTableDescriptor(expectedSchema)); + } + + private TableDescriptor getTableDescriptor(Schema schema) { + return TableDescriptor.builder() + .schema(schema) + .property(TABLE_DATALAKE_ENABLED.key(), "true") + .property(TABLE_DATALAKE_FORMAT.key(), "paimon") + .property("table.datalake.paimon.warehouse", tempWarehouseDir.toURI().toString()) + .distributedBy(3) // no bucket key + .build(); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 6a846076ac..88b1d5b088 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -389,7 +389,10 @@ public CompletableFuture createTable(CreateTableRequest req tablePath, tableDescriptor, new DefaultLakeCatalogContext( - true, currentSession().getPrincipal())); + true, + currentSession().getPrincipal(), + null, + tableDescriptor)); } catch (TableAlreadyExistException e) { throw new LakeTableAlreadyExistException(e.getMessage(), e); } @@ -420,15 +423,12 @@ public CompletableFuture alterTable(AlterTableRequest reques + "table properties or table schema."); } - LakeCatalog.Context lakeCatalogContext = - new DefaultLakeCatalogContext(false, currentSession().getPrincipal()); - if (!alterSchemaChanges.isEmpty()) { metadataManager.alterTableSchema( tablePath, alterSchemaChanges, request.isIgnoreIfNotExists(), - lakeCatalogContext); + currentSession().getPrincipal()); } if (!alterTableConfigChanges.isEmpty()) { @@ -437,7 +437,7 @@ public CompletableFuture alterTable(AlterTableRequest reques alterTableConfigChanges, tablePropertyChanges, request.isIgnoreIfNotExists(), - lakeCatalogContext); + currentSession().getPrincipal()); } return CompletableFuture.completedFuture(new AlterTableResponse()); @@ -1011,11 +1011,22 @@ static class DefaultLakeCatalogContext implements LakeCatalog.Context { private final boolean isCreatingFlussTable; private final FlussPrincipal flussPrincipal; + @Nullable private final TableDescriptor currentTable; + private final TableDescriptor expectedTable; public DefaultLakeCatalogContext( - boolean isCreatingFlussTable, FlussPrincipal flussPrincipal) { + boolean isCreatingFlussTable, + FlussPrincipal flussPrincipal, + @Nullable TableDescriptor currentTable, + TableDescriptor expectedTable) { this.isCreatingFlussTable = isCreatingFlussTable; this.flussPrincipal = flussPrincipal; + if (!isCreatingFlussTable) { + checkNotNull( + currentTable, "currentTable must be provided when altering a Fluss table."); + } + this.currentTable = currentTable; + this.expectedTable = expectedTable; } @Override @@ -1027,6 +1038,17 @@ public boolean isCreatingFlussTable() { public FlussPrincipal getFlussPrincipal() { return flussPrincipal; } + + @Nullable + @Override + public TableDescriptor getCurrentTable() { + return currentTable; + } + + @Override + public TableDescriptor getExpectedTable() { + return expectedTable; + } } // ================================================================================== diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 8d7b61a088..02a02cc44c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -45,6 +45,7 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePartition; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.security.acl.FlussPrincipal; import org.apache.fluss.server.entity.TablePropertyChanges; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.DatabaseRegistration; @@ -325,16 +326,23 @@ public void alterTableSchema( TablePath tablePath, List schemaChanges, boolean ignoreIfNotExists, - LakeCatalog.Context lakeCatalogContext) + FlussPrincipal flussPrincipal) throws TableNotExistException, TableNotPartitionedException { try { TableInfo table = getTable(tablePath); + TableDescriptor tableDescriptor = table.toTableDescriptor(); // validate the table column changes if (!schemaChanges.isEmpty()) { - Schema newSchema = SchemaUpdate.applySchemaChanges(table, schemaChanges); - + Schema newSchema = + SchemaUpdate.applySchemaChanges(table.getSchema(), schemaChanges); + LakeCatalog.Context lakeCatalogContext = + new CoordinatorService.DefaultLakeCatalogContext( + false, + flussPrincipal, + tableDescriptor, + TableDescriptor.builder(tableDescriptor).schema(newSchema).build()); // Lake First: sync to Lake before updating Fluss schema syncSchemaChangesToLake(tablePath, table, schemaChanges, lakeCatalogContext); @@ -396,7 +404,7 @@ public void alterTableProperties( List tableChanges, TablePropertyChanges tablePropertyChanges, boolean ignoreIfNotExists, - LakeCatalog.Context lakeCatalogContext) { + FlussPrincipal flussPrincipal) { try { // it throws TableNotExistException if the table or database not exists TableRegistration tableReg = getTableRegistration(tablePath); @@ -422,11 +430,7 @@ public void alterTableProperties( // pre alter table properties, e.g. create lake table in lake storage if it's to // enable datalake for the table preAlterTableProperties( - tablePath, - tableDescriptor, - newDescriptor, - tableChanges, - lakeCatalogContext); + tablePath, tableDescriptor, newDescriptor, tableChanges, flussPrincipal); // update the table to zk TableRegistration updatedTableRegistration = tableReg.newProperties( @@ -456,7 +460,10 @@ private void preAlterTableProperties( TableDescriptor tableDescriptor, TableDescriptor newDescriptor, List tableChanges, - LakeCatalog.Context lakeCatalogContext) { + FlussPrincipal flussPrincipal) { + LakeCatalog.Context lakeCatalogContext = + new CoordinatorService.DefaultLakeCatalogContext( + false, flussPrincipal, tableDescriptor, newDescriptor); LakeCatalog lakeCatalog = lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java index 011871a0ba..d1ad0c16c2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java @@ -17,20 +17,19 @@ package org.apache.fluss.server.coordinator; +import org.apache.fluss.exception.InvalidAlterTableException; import org.apache.fluss.exception.SchemaChangeException; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableChange; -import org.apache.fluss.metadata.TableInfo; import java.util.List; -import java.util.Objects; /** Schema update. */ public class SchemaUpdate { /** Apply schema changes to the given table info and return the updated schema. */ - public static Schema applySchemaChanges(TableInfo tableInfo, List changes) { - SchemaUpdate schemaUpdate = new SchemaUpdate(tableInfo); + public static Schema applySchemaChanges(Schema initialSchema, List changes) { + SchemaUpdate schemaUpdate = new SchemaUpdate(initialSchema); for (TableChange change : changes) { schemaUpdate = schemaUpdate.applySchemaChange(change); } @@ -40,9 +39,9 @@ public static Schema applySchemaChanges(TableInfo tableInfo, List c // Now we only maintain the Builder private final Schema.Builder builder; - public SchemaUpdate(TableInfo tableInfo) { + public SchemaUpdate(Schema initialSchema) { // Initialize builder from the current table schema - this.builder = Schema.newBuilder().fromSchema(tableInfo.getSchema()); + this.builder = Schema.newBuilder().fromSchema(initialSchema); } public Schema getSchema() { @@ -50,7 +49,7 @@ public Schema getSchema() { return builder.build(); } - public SchemaUpdate applySchemaChange(TableChange columnChange) { + private SchemaUpdate applySchemaChange(TableChange columnChange) { if (columnChange instanceof TableChange.AddColumn) { return addColumn((TableChange.AddColumn) columnChange); } else if (columnChange instanceof TableChange.ModifyColumn) { @@ -69,13 +68,8 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) { Schema.Column existingColumn = builder.getColumn(addColumn.getName()).orElse(null); if (existingColumn != null) { - if (!existingColumn.getDataType().equals(addColumn.getDataType()) - || !Objects.equals( - existingColumn.getComment().orElse(null), addColumn.getComment())) { - throw new IllegalArgumentException( - "Column " + addColumn.getName() + " already exists."); - } - return this; + throw new InvalidAlterTableException( + "Column " + addColumn.getName() + " already exists."); } if (addColumn.getPosition() != TableChange.ColumnPosition.last()) {