Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,22 @@ void testAlterTableColumn() throws Exception {
DataTypeChecks.equalsWithFieldId(
schemaInfo.getSchema().getRowType(), expectedSchema.getRowType()))
.isTrue();

assertThatThrownBy(
() ->
admin.alterTable(
tablePath,
Collections.singletonList(
TableChange.addColumn(
"nested_row",
DataTypes.ROW(
DataTypes.STRING(),
DataTypes.INT()),
"new nested column",
TableChange.ColumnPosition.last())),
false)
.get())
.hasMessageContaining("Column nested_row already exists");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,16 @@ interface Context {

/** Get the fluss principal currently accessing the catalog. */
FlussPrincipal getFlussPrincipal();

/**
 * Get the current table info of fluss, i.e. the table descriptor as it exists before the
 * pending table change is applied.
 *
 * @return the current table info of fluss.
 * @since 0.10
 */
TableDescriptor getCurrentTable();

/**
 * Get the expected table info of fluss, i.e. the table descriptor as it should look after
 * the pending table change has been applied.
 *
 * @return the expected table info of fluss.
 * @since 0.10
 */
TableDescriptor getExpectedTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,28 @@

package org.apache.fluss.lake.lakestorage;

import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.security.acl.FlussPrincipal;

/** A testing implementation of {@link LakeCatalog.Context}. */
public class TestingLakeCatalogContext implements LakeCatalog.Context {

private final TableDescriptor currentTable;
private final TableDescriptor expectedTable;

/**
 * Creates a context in which the current and the expected table are the same descriptor,
 * i.e. no schema change is pending.
 *
 * @param tableDescriptor used as both the current and the expected table descriptor
 */
public TestingLakeCatalogContext(TableDescriptor tableDescriptor) {
this(tableDescriptor, tableDescriptor);
}

/**
 * Creates a context with distinct current and expected table descriptors.
 *
 * @param currentTable the current table info of fluss
 * @param expectedTable the expected table info of fluss
 */
public TestingLakeCatalogContext(TableDescriptor currentTable, TableDescriptor expectedTable) {
this.currentTable = currentTable;
this.expectedTable = expectedTable;
}

/** Creates a context without table descriptors: both current and expected table are null. */
public TestingLakeCatalogContext() {
this(null);
}

@Override
public boolean isCreatingFlussTable() {
return false;
Expand All @@ -31,4 +48,14 @@ public boolean isCreatingFlussTable() {
// Testing stub: no authentication is exercised in tests, so no principal is available.
public FlussPrincipal getFlussPrincipal() {
return null;
}

// Returns the current table descriptor handed to the constructor (may be null for the
// no-arg constructor).
@Override
public TableDescriptor getCurrentTable() {
return currentTable;
}

// Returns the expected table descriptor handed to the constructor (may be null for the
// no-arg constructor).
@Override
public TableDescriptor getExpectedTable() {
return expectedTable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@

import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
import static org.apache.fluss.lake.paimon.utils.PaimonTableValidation.checkTableIsEmpty;
import static org.apache.fluss.lake.paimon.utils.PaimonTableValidation.validatePaimonSchemaCompatible;
import static org.apache.fluss.lake.paimon.utils.PaimonTableValidation.isPaimonSchemaCompatible;
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
Expand Down Expand Up @@ -112,24 +113,50 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
throws TableNotExistException {
try {
List<SchemaChange> paimonSchemaChanges = toPaimonSchemaChanges(tableChanges);
Table table = paimonCatalog.getTable(toPaimon(tablePath));
FileStoreTable fileStoreTable = (FileStoreTable) table;
Schema currentPaimonSchema = fileStoreTable.schema().toSchema();

// Compare current Paimon table schema with expected target schema before altering
if (shouldAlterTable(tablePath, tableChanges)) {
alterTable(tablePath, paimonSchemaChanges);
List<SchemaChange> paimonSchemaChanges;
if (isPaimonSchemaCompatible(
currentPaimonSchema, toPaimonSchema(context.getCurrentTable()))) {
// if the paimon schema is same as current fluss schema, directly apply all the
// changes.
paimonSchemaChanges = toPaimonSchemaChanges(tableChanges);
} else if (isPaimonSchemaCompatible(
currentPaimonSchema, toPaimonSchema(context.getExpectedTable()))) {
// if the schema is same as applied fluss schema , skip adding columns.
paimonSchemaChanges =
toPaimonSchemaChanges(
tableChanges.stream()
.filter(
tableChange ->
!(tableChange
instanceof TableChange.AddColumn))
.collect(Collectors.toList()));
} else {
// If schemas already match, treat as idempotent success
LOG.info(
"Skipping schema evolution for Paimon table {} because the column(s) to add {} already exist.",
tablePath,
tableChanges);
throw new InvalidAlterTableException(
String.format(
"Paimon schema is not compatible with Fluss schema: "
+ "Paimon schema: %s, Fluss schema: %s. "
+ "therefore you need to add the diff columns all at once, "
+ "rather than applying other table changes: %s.",
currentPaimonSchema,
context.getCurrentTable().getSchema(),
tableChanges));
}

if (!paimonSchemaChanges.isEmpty()) {
alterTable(tablePath, paimonSchemaChanges);
}
} catch (Catalog.ColumnAlreadyExistException e) {
// This shouldn't happen if shouldAlterTable works correctly, but keep as safeguard
throw new InvalidAlterTableException(e.getMessage());
} catch (Catalog.ColumnNotExistException e) {
// This shouldn't happen for AddColumn operations
throw new InvalidAlterTableException(e.getMessage());
} catch (Catalog.TableNotExistException e) {
throw new TableNotExistException("Table " + tablePath + " does not exist.");
}
}

Expand Down Expand Up @@ -209,8 +236,15 @@ private void createTable(TablePath tablePath, Schema schema, boolean isCreatingF
try {
Table table = paimonCatalog.getTable(paimonPath);
FileStoreTable fileStoreTable = (FileStoreTable) table;
validatePaimonSchemaCompatible(
paimonPath, fileStoreTable.schema().toSchema(), schema);
Schema existingSchema = fileStoreTable.schema().toSchema();
if (!isPaimonSchemaCompatible(existingSchema, schema)) {
throw new TableAlreadyExistException(
String.format(
"The table %s already exists in Paimon catalog, but the table schema is not compatible. "
+ "Existing schema: %s, new schema: %s. "
+ "Please first drop the table in Paimon catalog or use a new table name.",
paimonPath.getEscapedFullName(), existingSchema, schema));
}
// if creating a new fluss table, we should ensure the lake table is empty
if (isCreatingFlussTable) {
checkTableIsEmpty(tablePath, fileStoreTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class PaimonConversions {

// for fluss config
public static final String FLUSS_CONF_PREFIX = "fluss.";
public static final String TABLE_DATALAKE_PAIMON_PREFIX = "table.datalake.paimon.";
// for paimon config
private static final String PAIMON_CONF_PREFIX = "paimon.";

Expand Down Expand Up @@ -261,7 +262,7 @@ private static void setPaimonDefaultProperties(Options options) {
private static void setFlussPropertyToPaimon(String key, String value, Options options) {
if (key.startsWith(PAIMON_CONF_PREFIX)) {
options.set(key.substring(PAIMON_CONF_PREFIX.length()), value);
} else {
} else if (!key.startsWith(TABLE_DATALAKE_PAIMON_PREFIX)) {
options.set(FLUSS_CONF_PREFIX + key, value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.fluss.metadata.TablePath;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -34,6 +33,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
Expand All @@ -43,8 +43,7 @@ public class PaimonTableValidation {

private static final Map<String, ConfigOption<?>> PAIMON_CONFIGS = extractPaimonConfigs();

public static void validatePaimonSchemaCompatible(
Identifier tablePath, Schema existingSchema, Schema newSchema) {
public static boolean isPaimonSchemaCompatible(Schema existingSchema, Schema newSchema) {
// Adjust options for comparison
Map<String, String> existingOptions = existingSchema.options();
Map<String, String> newOptions = newSchema.options();
Expand All @@ -66,21 +65,16 @@ public static void validatePaimonSchemaCompatible(
// ignore the existing options that are not in new options
existingOptions.entrySet().removeIf(entry -> !newOptions.containsKey(entry.getKey()));

if (!existingSchema.equals(newSchema)) {
// ignore the fields because newSchema is referred by fluss schema, whose field id maybe not
// same as existingSchema.
if (!equalIgnoreFieldId(existingSchema, newSchema)) {
// Allow different precisions for __timestamp column for backward compatibility,
// old cluster will use precision 6, but new cluster will use precision 3,
// we allow such precision difference
if (equalIgnoreSystemColumnTimestampPrecision(existingSchema, newSchema)) {
return;
}

throw new TableAlreadyExistException(
String.format(
"The table %s already exists in Paimon catalog, but the table schema is not compatible. "
+ "Existing schema: %s, new schema: %s. "
+ "Please first drop the table in Paimon catalog or use a new table name.",
tablePath.getEscapedFullName(), existingSchema, newSchema));
return equalIgnoreSystemColumnTimestampPrecision(existingSchema, newSchema);
}

return true;
}

/**
Expand All @@ -96,7 +90,7 @@ public static void validatePaimonSchemaCompatible(
* @param newSchema the new schema descriptor generated by the current Fluss cluster
* @return true if the schemas are identical, disregarding the precision of the system timestamp
*/
private static boolean equalIgnoreSystemColumnTimestampPrecision(
public static boolean equalIgnoreSystemColumnTimestampPrecision(
Schema existingSchema, Schema newSchema) {
List<DataField> existingFields = new ArrayList<>(existingSchema.fields());
DataField systemTimestampField = existingFields.get(existingFields.size() - 1);
Expand All @@ -116,6 +110,23 @@ private static boolean equalIgnoreSystemColumnTimestampPrecision(
return existingSchema.equals(newSchema);
}

/**
 * Compares two Paimon schemas for equality while ignoring the field ids of their columns.
 *
 * <p>The new schema is derived from the Fluss table descriptor, whose field ids may not match
 * the ids assigned by the Paimon catalog, so a plain {@code Schema#equals} comparison would
 * report a spurious difference for otherwise identical schemas.
 *
 * @param existingSchema the schema currently stored in the Paimon catalog
 * @param newSchema the schema derived from the Fluss table descriptor
 * @return true if the schemas are equal apart from column field ids
 */
private static boolean equalIgnoreFieldId(Schema existingSchema, Schema newSchema) {
    List<DataField> existingFields = existingSchema.fields();
    List<DataField> newFields = newSchema.fields();

    // A differing column count means the schemas cannot match. Checking it up front also
    // guards against an IndexOutOfBoundsException when newFields is shorter, and against
    // wrongly reporting equality when existingFields is a strict prefix of newFields.
    if (existingFields.size() != newFields.size()) {
        return false;
    }

    for (int i = 0; i < existingFields.size(); i++) {
        if (!existingFields.get(i).equalsIgnoreFieldId(newFields.get(i))) {
            return false;
        }
    }

    return Objects.equals(existingSchema.partitionKeys(), newSchema.partitionKeys())
            && Objects.equals(existingSchema.primaryKeys(), newSchema.primaryKeys())
            && Objects.equals(existingSchema.options(), newSchema.options())
            && Objects.equals(existingSchema.comment(), newSchema.comment());
}

private static void removeChangeableOptions(Map<String, String> options) {
options.entrySet()
.removeIf(
Expand Down
Loading
Loading