Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ private static int calculatePermuteOffset(List<SqlNode> selectItems) {
return 0;
}

private SqlNode maybeCast(SqlNode node, RelDataType currentType, RelDataType desiredType) {
protected SqlNode maybeCast(SqlNode node, RelDataType currentType, RelDataType desiredType) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change has already been committed to Calcite and will come with future updates
apache/calcite@f6271b7

return SqlTypeUtil.equalSansNullability(typeFactory, currentType, desiredType)
? node
: SqlStdOperatorTable.CAST.createCall(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,11 @@ protected void addToSelectList(
return rewritten;
}

// Widens the visibility of maybeCast from protected (as declared in the
// superclass, see the protected signature change in this same PR) to public,
// delegating straight to the inherited implementation. This lets classes
// outside the validator hierarchy (e.g. MergeTableAsUtil and
// PreValidateReWriter, which call validator.maybeCast(...) elsewhere in this
// change) reuse the single Calcite implementation instead of keeping a copy.
// NOTE(review): presumably the superclass version comes from Calcite's
// SqlValidatorImpl per the apache/calcite@f6271b7 reference — confirm once
// that upstream change ships.
@Override
public SqlNode maybeCast(SqlNode node, RelDataType currentType, RelDataType desiredType) {
return super.maybeCast(node, currentType, desiredType);
}

// --------------------------------------------------------------------------------------------
// Column expansion
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.SqlToRexConverter;
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter;
Expand All @@ -36,7 +37,6 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlValidator;

import javax.annotation.Nullable;

Expand All @@ -63,7 +63,7 @@ public TableConfig getTableConfig() {
}

@Override
public SqlValidator getSqlValidator() {
public FlinkCalciteSqlValidator getSqlValidator() {
return flinkPlanner.getOrCreateSqlValidator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@

/** A utility class with logic for handling the {@code CREATE TABLE ... AS SELECT} clause. */
public class MergeTableAsUtil {
private final SqlValidator validator;
private final FlinkCalciteSqlValidator validator;
private final Function<SqlNode, String> escapeExpression;
private final DataTypeFactory dataTypeFactory;

public MergeTableAsUtil(
SqlValidator validator,
FlinkCalciteSqlValidator validator,
Function<SqlNode, String> escapeExpression,
DataTypeFactory dataTypeFactory) {
this.validator = validator;
Expand Down Expand Up @@ -135,11 +135,10 @@ public PlannerQueryOperation maybeRewriteQuery(

assignedFields.put(
pos,
rewriterUtils.maybeCast(
validator.maybeCast(
SqlLiteral.createNull(SqlParserPos.ZERO),
typeFactory.createUnknownType(),
typeFactory.createFieldTypeFromLogicalType(targetField.getType()),
typeFactory));
typeFactory.createFieldTypeFromLogicalType(targetField.getType())));
} else {
targetPositions.add(sourceFields.get(targetField.getName()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.utils.Expander;
import org.apache.flink.table.types.DataType;
Expand Down Expand Up @@ -79,7 +80,7 @@ interface ConvertContext {
TableConfig getTableConfig();

/** Returns the {@link SqlValidator} in the convert context. */
SqlValidator getSqlValidator();
FlinkCalciteSqlValidator getSqlValidator();

/** Returns the {@link CatalogManager} in the convert context. */
CatalogManager getCatalogManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,24 @@
*/
package org.apache.flink.table.planner.calcite

import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
import org.apache.flink.sql.parser.SqlProperty
import org.apache.flink.sql.parser.dml.RichSqlInsert
import org.apache.flink.sql.parser.dql.SqlRichExplain
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects, notSupported}
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, FlinkPreparingTableBase, LegacyCatalogSourceTable}
import org.apache.flink.util.Preconditions.checkArgument

import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeField}
import org.apache.calcite.runtime.{CalciteContextException, Resources}
import org.apache.calcite.sql.`type`.SqlTypeUtil
import org.apache.calcite.sql.{SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlTableRef, SqlUtil}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlLiteral, SqlNode, SqlNodeList, SqlTableRef, SqlUtil}
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.util.SqlBasicVisitor
import org.apache.calcite.sql.validate.{SqlValidatorException, SqlValidatorTable, SqlValidatorUtil}
import org.apache.calcite.util.Static.RESOURCE

import java.util
import java.util.Collections

import scala.collection.JavaConversions._

Expand Down Expand Up @@ -153,11 +147,7 @@ object PreValidateReWriter {
val value = sqlProperty.getValue.asInstanceOf[SqlLiteral]
assignedFields.put(
targetField.getIndex,
rewriterUtils.maybeCast(
value,
value.createSqlType(typeFactory),
targetField.getType,
typeFactory))
validator.maybeCast(value, value.createSqlType(typeFactory), targetField.getType))
}

// validate partial insert columns.
Expand Down Expand Up @@ -205,11 +195,11 @@ object PreValidateReWriter {
validateField(idx => !assignedFields.contains(idx), id, targetField)
assignedFields.put(
targetField.getIndex,
rewriterUtils.maybeCast(
validator.maybeCast(
SqlLiteral.createNull(SqlParserPos.ZERO),
typeFactory.createUnknownType(),
targetField.getType,
typeFactory)
targetField.getType
)
)
} else {
// handle reorder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@
*/
package org.apache.flink.table.planner.calcite

import org.apache.flink.sql.parser.`type`.SqlMapTypeNameSpec
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.ExplicitTableSqlSelect
import org.apache.flink.table.planner.calcite.SqlRewriterUtils.{rewriteSqlCall, rewriteSqlSelect, rewriteSqlValues, rewriteSqlWith}
import org.apache.flink.util.Preconditions.checkArgument

import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.runtime.{CalciteContextException, Resources}
import org.apache.calcite.sql.`type`.SqlTypeUtil
import org.apache.calcite.sql.{SqlBasicCall, SqlCall, SqlDataTypeSpec, SqlIdentifier, SqlKind, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil, SqlWith}
import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil, SqlWith}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.validate.SqlValidatorException
import org.apache.calcite.util.Static.RESOURCE

Expand Down Expand Up @@ -79,38 +76,6 @@ class SqlRewriterUtils(validator: FlinkCalciteSqlValidator) {
targetPosition,
unsupportedErrorMessage)
}

// This code snippet is copied from the SqlValidatorImpl.
def maybeCast(
Comment on lines -83 to -84
Copy link
Contributor Author

@snuyanzin snuyanzin Jan 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of having old copy reuse the actual version of that method

node: SqlNode,
currentType: RelDataType,
desiredType: RelDataType,
typeFactory: RelDataTypeFactory): SqlNode = {
if (
currentType == desiredType
|| (currentType.isNullable != desiredType.isNullable
&& typeFactory.createTypeWithNullability(currentType, desiredType.isNullable)
== desiredType)
) {
node
} else {
// See FLINK-26460 for more details
val sqlDataTypeSpec =
if (SqlTypeUtil.isNull(currentType) && SqlTypeUtil.isMap(desiredType)) {
val keyType = desiredType.getKeyType
val valueType = desiredType.getValueType
new SqlDataTypeSpec(
new SqlMapTypeNameSpec(
SqlTypeUtil.convertTypeToSpec(keyType).withNullable(keyType.isNullable),
SqlTypeUtil.convertTypeToSpec(valueType).withNullable(valueType.isNullable),
SqlParserPos.ZERO),
SqlParserPos.ZERO)
} else {
SqlTypeUtil.convertTypeToSpec(desiredType)
}
SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO, node, sqlDataTypeSpec)
}
}
}

object SqlRewriterUtils {
Expand Down