From cdc73026eea8157b1c200f6023d09d27787696f8 Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Thu, 22 Jan 2026 23:19:47 +0100 Subject: [PATCH] [FLINK-38964][table] Reuse Calcite's `SqlValidatorImpl#maybeCast` --- .../sql/validate/SqlValidatorImpl.java | 2 +- .../calcite/FlinkCalciteSqlValidator.java | 5 +++ .../operations/SqlNodeConvertContext.java | 4 +- .../converters/MergeTableAsUtil.java | 9 ++--- .../converters/SqlNodeConverter.java | 3 +- .../planner/calcite/PreValidateReWriter.scala | 20 +++------- .../planner/calcite/SqlRewriterUtils.scala | 39 +------------------ 7 files changed, 21 insertions(+), 61 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java index 57dc5e740f1df..6635637888aed 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java @@ -776,7 +776,7 @@ private static int calculatePermuteOffset(List selectItems) { return 0; } - private SqlNode maybeCast(SqlNode node, RelDataType currentType, RelDataType desiredType) { + protected SqlNode maybeCast(SqlNode node, RelDataType currentType, RelDataType desiredType) { return SqlTypeUtil.equalSansNullability(typeFactory, currentType, desiredType) ? node : SqlStdOperatorTable.CAST.createCall( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java index 648c951ebef97..a65cb882a4325 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java @@ -431,6 +431,11 @@ protected void addToSelectList( return rewritten; } + @Override + public SqlNode maybeCast(SqlNode node, RelDataType currentType, RelDataType desiredType) { + return super.maybeCast(node, currentType, desiredType); + } + // -------------------------------------------------------------------------------------------- // Column expansion // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java index 687a8ce483ae0..05364388fba41 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeConvertContext.java @@ -20,6 +20,7 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.calcite.SqlToRexConverter; import org.apache.flink.table.planner.operations.converters.SqlNodeConverter; @@ -36,7 +37,6 @@ import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.dialect.AnsiSqlDialect; import org.apache.calcite.sql.parser.SqlParser; -import org.apache.calcite.sql.validate.SqlValidator; import javax.annotation.Nullable; @@ -63,7 +63,7 @@ public TableConfig getTableConfig() { } @Override - public SqlValidator getSqlValidator() { + public FlinkCalciteSqlValidator getSqlValidator() { return flinkPlanner.getOrCreateSqlValidator(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java index 3264e3c392a23..2b26585d6f411 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java @@ -67,12 +67,12 @@ /** A utility class with logic for handling the {@code CREATE TABLE ... AS SELECT} clause. */ public class MergeTableAsUtil { - private final SqlValidator validator; + private final FlinkCalciteSqlValidator validator; private final Function escapeExpression; private final DataTypeFactory dataTypeFactory; public MergeTableAsUtil( - SqlValidator validator, + FlinkCalciteSqlValidator validator, Function escapeExpression, DataTypeFactory dataTypeFactory) { this.validator = validator; @@ -135,11 +135,10 @@ public PlannerQueryOperation maybeRewriteQuery( assignedFields.put( pos, - rewriterUtils.maybeCast( + validator.maybeCast( SqlLiteral.createNull(SqlParserPos.ZERO), typeFactory.createUnknownType(), - typeFactory.createFieldTypeFromLogicalType(targetField.getType()), - typeFactory)); + typeFactory.createFieldTypeFromLogicalType(targetField.getType()))); } else { targetPositions.add(sourceFields.get(targetField.getName())); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java index 2c54d78c51648..23f8f1716b220 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverter.java @@ -22,6 +22,7 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.utils.Expander; import org.apache.flink.table.types.DataType; @@ -79,7 +80,7 @@ interface ConvertContext { TableConfig getTableConfig(); /** Returns the {@link SqlValidator} in the convert context. */ - SqlValidator getSqlValidator(); + FlinkCalciteSqlValidator getSqlValidator(); /** Returns the {@link CatalogManager} in the convert context. */ CatalogManager getCatalogManager(); diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala index 8be16db924f01..b6e61385d36c4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala @@ -17,30 +17,24 @@ */ package org.apache.flink.table.planner.calcite -import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec import org.apache.flink.sql.parser.SqlProperty import org.apache.flink.sql.parser.dml.RichSqlInsert import org.apache.flink.sql.parser.dql.SqlRichExplain import org.apache.flink.table.api.ValidationException import org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects, notSupported} -import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, FlinkPreparingTableBase, LegacyCatalogSourceTable} -import org.apache.flink.util.Preconditions.checkArgument import org.apache.calcite.plan.RelOptTable import org.apache.calcite.prepare.CalciteCatalogReader import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeField} import org.apache.calcite.runtime.{CalciteContextException, Resources} -import org.apache.calcite.sql.`type`.SqlTypeUtil -import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlTableRef, SqlUtil} -import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlLiteral, SqlNode, SqlNodeList, SqlTableRef, SqlUtil} import org.apache.calcite.sql.parser.SqlParserPos import org.apache.calcite.sql.util.SqlBasicVisitor import org.apache.calcite.sql.validate.{SqlValidatorException, SqlValidatorTable, SqlValidatorUtil} import org.apache.calcite.util.Static.RESOURCE import java.util -import java.util.Collections import scala.collection.JavaConversions._ @@ -153,11 +147,7 @@ object PreValidateReWriter { val value = sqlProperty.getValue.asInstanceOf[SqlLiteral] assignedFields.put( targetField.getIndex, - rewriterUtils.maybeCast( - value, - value.createSqlType(typeFactory), - targetField.getType, - typeFactory)) + validator.maybeCast(value, value.createSqlType(typeFactory), targetField.getType)) } // validate partial insert columns. @@ -205,11 +195,11 @@ object PreValidateReWriter { validateField(idx => !assignedFields.contains(idx), id, targetField) assignedFields.put( targetField.getIndex, - rewriterUtils.maybeCast( + validator.maybeCast( SqlLiteral.createNull(SqlParserPos.ZERO), typeFactory.createUnknownType(), - targetField.getType, - typeFactory) + targetField.getType + ) ) } else { // handle reorder diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala index 9bcc9361f1479..7326268e75e86 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala @@ -17,18 +17,15 @@ */ package org.apache.flink.table.planner.calcite -import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec import org.apache.flink.table.api.ValidationException import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.ExplicitTableSqlSelect import org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlCall, rewriteSqlSelect, rewriteSqlValues, rewriteSqlWith} import org.apache.flink.util.Preconditions.checkArgument -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.runtime.{CalciteContextException, Resources} -import org.apache.calcite.sql.`type`.SqlTypeUtil -import org.apache.calcite.sql.{SqlBasicCall, SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil, SqlWith} +import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil, SqlWith} import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.calcite.sql.parser.SqlParserPos import org.apache.calcite.sql.validate.SqlValidatorException import org.apache.calcite.util.Static.RESOURCE @@ -79,38 +76,6 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) { targetPosition, unsupportedErrorMessage) } - - // This code snippet is copied from the SqlValidatorImpl. - def maybeCast( - node: SqlNode, - currentType: RelDataType, - desiredType: RelDataType, - typeFactory: RelDataTypeFactory): SqlNode = { - if ( - currentType == desiredType - || (currentType.isNullable != desiredType.isNullable - && typeFactory.createTypeWithNullability(currentType, desiredType.isNullable) - == desiredType) - ) { - node - } else { - // See FLINK-26460 for more details - val sqlDataTypeSpec = - if (SqlTypeUtil.isNull(currentType) && SqlTypeUtil.isMap(desiredType)) { - val keyType = desiredType.getKeyType - val valueType = desiredType.getValueType - new SqlDataTypeSpec( - new SqlMapTypeNameSpec( - SqlTypeUtil.convertTypeToSpec(keyType).withNullable(keyType.isNullable), - SqlTypeUtil.convertTypeToSpec(valueType).withNullable(valueType.isNullable), - SqlParserPos.ZERO), - SqlParserPos.ZERO) - } else { - SqlTypeUtil.convertTypeToSpec(desiredType) - } - SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO, node, sqlDataTypeSpec) - } - } } object SqlRewriterUtils {