From 985fb57067ced1f262bdc8e2fa7e76867e3004bd Mon Sep 17 00:00:00 2001 From: yongliu Date: Mon, 19 Jan 2026 11:12:17 +0800 Subject: [PATCH] [FLINK-36986][table] Migrate SplitRemoteConditionFromJoinRule to java --- .../rules/logical/AsyncCalcSplitRule.java | 9 +- .../SplitPythonConditionFromJoinRule.java | 30 ++++ .../SplitRemoteConditionFromJoinRule.java | 169 ++++++++++++++++++ .../SplitPythonConditionFromJoinRule.scala | 25 --- .../SplitRemoteConditionFromJoinRule.scala | 102 ----------- 5 files changed, 204 insertions(+), 131 deletions(-) create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromJoinRule.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SplitRemoteConditionFromJoinRule.java delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromJoinRule.scala delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitRemoteConditionFromJoinRule.scala diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java index 82ac5519affee..23c6bb7e907de 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java @@ -62,11 +62,12 @@ public class AsyncCalcSplitRule { public static final RelOptRule ONE_PER_CALC_SPLIT = new AsyncCalcSplitOnePerCalcRule(ASYNC_CALL_FINDER); public static final RelOptRule NO_ASYNC_JOIN_CONDITIONS = - new SplitRemoteConditionFromJoinRule( - ASYNC_CALL_FINDER, - 
JavaScalaConversionUtil.toScala( + SplitRemoteConditionFromJoinRule.SplitRemoteConditionFromJoinRuleConfig.DEFAULT + .withRemoteCallFinder(ASYNC_CALL_FINDER) + .withErrorOnUnsplittableRemoteCall( Optional.of( - "AsyncScalarFunction not supported for non inner join condition"))); + "AsyncScalarFunction not supported for non inner join condition")) + .toRule(); private static boolean hasNestedCalls(List projects) { return projects.stream() diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromJoinRule.java new file mode 100644 index 0000000000000..d6c23c2b28786 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromJoinRule.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+
+/**
+ * Holder of the {@link SplitRemoteConditionFromJoinRule} instance that uses a {@link
+ * PythonRemoteCallFinder} to detect remote calls in join conditions.
+ */
+public final class SplitPythonConditionFromJoinRule {
+
+    private static final RemoteCallFinder CALL_FINDER = new PythonRemoteCallFinder();
+
+    /** Rule built from the default rule config with the Python call finder applied. */
+    public static final RelOptRule INSTANCE =
+            SplitRemoteConditionFromJoinRule.SplitRemoteConditionFromJoinRuleConfig.DEFAULT
+                    .withRemoteCallFinder(CALL_FINDER)
+                    .toRule();
+
+    private SplitPythonConditionFromJoinRule() {
+        // Constant holder only; not meant to be instantiated.
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SplitRemoteConditionFromJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SplitRemoteConditionFromJoinRule.java
new file mode 100644
index 0000000000000..c8524118b6fd4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SplitRemoteConditionFromJoinRule.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.rex.RexUtil;
+import org.immutables.value.Value;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Rule will split the {@link FlinkLogicalJoin} which contains Remote Functions in join condition
+ * into a {@link FlinkLogicalJoin} and a {@link FlinkLogicalCalc} with Remote Functions. Currently,
+ * only inner join is supported.
+ *
+ * <p>After this rule is applied, there will be no Remote Functions in the condition of the {@link
+ * FlinkLogicalJoin}.
+ */
+@Value.Enclosing
+public class SplitRemoteConditionFromJoinRule
+        extends RelRule<SplitRemoteConditionFromJoinRule.SplitRemoteConditionFromJoinRuleConfig> {
+
+    /**
+     * Creates the rule from its configuration.
+     *
+     * @param config supplies the operand, the {@link RemoteCallFinder} and the optional error
+     *     message used for remote calls that cannot be split out of the join
+     */
+    protected SplitRemoteConditionFromJoinRule(
+            SplitRemoteConditionFromJoinRule.SplitRemoteConditionFromJoinRuleConfig config) {
+        super(config);
+    }
+
+    /**
+     * Matches inner joins whose condition contains a remote call.
+     *
+     * @param call the rule call whose first operand is the {@link FlinkLogicalJoin}
+     * @return {@code true} only for an inner join with a remote call in its condition
+     * @throws TableException if a non-inner join has a remote call in its condition and {@link
+     *     SplitRemoteConditionFromJoinRuleConfig#errorOnUnsplittableRemoteCall()} is present
+     */
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        FlinkLogicalJoin join = call.rel(0);
+        RexNode condition = join.getCondition();
+        if (condition == null || !config.remoteCallFinder().containsRemoteCall(condition)) {
+            return false;
+        }
+        if (join.getJoinType() == JoinRelType.INNER) {
+            return true;
+        }
+        // Only inner joins are split. For any other join type either fail with the configured
+        // message or, when none is configured, leave the join untouched.
+        Optional<String> error = config.errorOnUnsplittableRemoteCall();
+        if (error.isPresent()) {
+            throw new TableException(error.get());
+        }
+        return false;
+    }
+
+    /**
+     * Replaces the matched join with a join that keeps only the conjuncts without remote calls,
+     * topped by a {@link FlinkLogicalCalc} whose condition is the conjunction of the conjuncts
+     * that contain remote calls.
+     *
+     * @param call the rule call whose first operand is the {@link FlinkLogicalJoin}
+     */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        FlinkLogicalJoin join = call.rel(0);
+        RexBuilder rexBuilder = join.getCluster().getRexBuilder();
+        RemoteCallFinder callFinder = config.remoteCallFinder();
+
+        List<RexNode> joinFilters = RelOptUtil.conjunctions(join.getCondition());
+        List<RexNode> remoteFilters =
+                joinFilters.stream()
+                        .filter(callFinder::containsRemoteCall)
+                        .collect(Collectors.toList());
+        List<RexNode> remainingFilters =
+                joinFilters.stream()
+                        .filter(f -> !callFinder.containsRemoteCall(f))
+                        .collect(Collectors.toList());
+
+        RexNode newJoinCondition = RexUtil.composeConjunction(rexBuilder, remainingFilters);
+        FlinkLogicalJoin bottomJoin =
+                new FlinkLogicalJoin(
+                        join.getCluster(),
+                        join.getTraitSet(),
+                        join.getLeft(),
+                        join.getRight(),
+                        newJoinCondition,
+                        join.getHints(),
+                        join.getJoinType());
+
+        // The calc uses the join's row type as both input and output type; its expression list
+        // comes from a program builder created over that row type, and the remote-call conjuncts
+        // become its condition.
+        RexProgram rexProgram =
+                new RexProgramBuilder(bottomJoin.getRowType(), rexBuilder).getProgram();
+        RexNode topCalcCondition = RexUtil.composeConjunction(rexBuilder, remoteFilters);
+
+        FlinkLogicalCalc topCalc =
+                new FlinkLogicalCalc(
+                        join.getCluster(),
+                        join.getTraitSet(),
+                        bottomJoin,
+                        RexProgram.create(
+                                bottomJoin.getRowType(),
+                                rexProgram.getExprList(),
+                                topCalcCondition,
+                                bottomJoin.getRowType(),
+                                rexBuilder));
+
+        call.transformTo(topCalc);
+    }
+
+    // Consider the rules to be equal if they are the same class and their call finders are the same
+    // class.
+    // hashCode() is not overridden: this equals() is strictly narrower than super.equals(), so
+    // the inherited hash stays consistent with it as long as the superclass honours the
+    // equals/hashCode contract.
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof SplitRemoteConditionFromJoinRule)) {
+            return false;
+        }
+
+        SplitRemoteConditionFromJoinRule rule = (SplitRemoteConditionFromJoinRule) obj;
+        return super.equals(rule)
+                && config.remoteCallFinder()
+                        .getClass()
+                        .equals(rule.config.remoteCallFinder().getClass())
+                && config.errorOnUnsplittableRemoteCall()
+                        .equals(rule.config.errorOnUnsplittableRemoteCall());
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    public interface SplitRemoteConditionFromJoinRuleConfig extends RelRule.Config {
+        /** Base config matching any {@link FlinkLogicalJoin}; other attributes use defaults. */
+        SplitRemoteConditionFromJoinRule.SplitRemoteConditionFromJoinRuleConfig DEFAULT =
+                ImmutableSplitRemoteConditionFromJoinRule.SplitRemoteConditionFromJoinRuleConfig
+                        .builder()
+                        .operandSupplier(b0 -> b0.operand(FlinkLogicalJoin.class).anyInputs())
+                        .description("SplitRemoteConditionFromJoinRule")
+                        .build();
+
+        /**
+         * Finder used to detect remote calls in join conditions; defaults to a {@link
+         * PythonRemoteCallFinder}.
+         */
+        @Value.Default
+        default RemoteCallFinder remoteCallFinder() {
+            return new PythonRemoteCallFinder();
+        }
+
+        /** Sets {@link #remoteCallFinder()}. */
+        SplitRemoteConditionFromJoinRuleConfig withRemoteCallFinder(RemoteCallFinder callFinder);
+
+        /**
+         * Message of the {@link TableException} thrown when a non-inner join condition contains
+         * a remote call. Empty by default, in which case such joins are simply not matched.
+         */
+        @Value.Default
+        default Optional<String> errorOnUnsplittableRemoteCall() {
+            return Optional.empty();
+        }
+
+        /** Sets {@link #errorOnUnsplittableRemoteCall()}. */
+        SplitRemoteConditionFromJoinRuleConfig withErrorOnUnsplittableRemoteCall(
+                Optional<String> errorOnUnsplittableRemoteCall);
+
+        @Override
+        default SplitRemoteConditionFromJoinRule toRule() {
+            return new SplitRemoteConditionFromJoinRule(this);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromJoinRule.scala
deleted file mode 100644
index 10ced294caa82..0000000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromJoinRule.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.flink.table.planner.plan.rules.logical - -import org.apache.calcite.plan.RelOptRule - -object SplitPythonConditionFromJoinRule { - private val callFinder = new PythonRemoteCallFinder() - val INSTANCE: RelOptRule = new SplitRemoteConditionFromJoinRule(callFinder, Option.empty) -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitRemoteConditionFromJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitRemoteConditionFromJoinRule.scala deleted file mode 100644 index 0a77c137b1b78..0000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitRemoteConditionFromJoinRule.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.table.planner.plan.rules.logical - -import org.apache.flink.table.api.TableException -import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalJoin} - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil} -import org.apache.calcite.plan.RelOptRule.{none, operand} -import org.apache.calcite.rel.core.JoinRelType -import org.apache.calcite.rex.{RexProgram, RexProgramBuilder, RexUtil} - -import scala.collection.JavaConversions._ - -/** - * Rule will splits the [[FlinkLogicalJoin]] which contains Remote Functions in join condition into - * a [[FlinkLogicalJoin]] and a [[FlinkLogicalCalc]] with Remote Functions. Currently, only inner - * join is supported. - * - * After this rule is applied, there will be no Remote Functions in the condition of the - * [[FlinkLogicalJoin]]. - */ -class SplitRemoteConditionFromJoinRule( - protected val callFinder: RemoteCallFinder, - protected val errorOnUnsplittableRemoteCall: Option[String]) - extends RelOptRule(operand(classOf[FlinkLogicalJoin], none), "SplitRemoteConditionFromJoinRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] - val joinType: JoinRelType = join.getJoinType - // matches if it is inner join and it contains Remote functions in condition - if (Option(join.getCondition).exists(callFinder.containsRemoteCall)) { - if (joinType == JoinRelType.INNER) { - return true - } else if (errorOnUnsplittableRemoteCall.nonEmpty) { - throw new TableException(errorOnUnsplittableRemoteCall.get) - } - } - false - } - - override def onMatch(call: RelOptRuleCall): Unit = { - val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin] - val rexBuilder = join.getCluster.getRexBuilder - - val joinFilters = RelOptUtil.conjunctions(join.getCondition) - val remoteFilters = joinFilters.filter(callFinder.containsRemoteCall) - val remainingFilters = 
joinFilters.filter(!callFinder.containsRemoteCall(_)) - - val newJoinCondition = RexUtil.composeConjunction(rexBuilder, remainingFilters) - val bottomJoin = new FlinkLogicalJoin( - join.getCluster, - join.getTraitSet, - join.getLeft, - join.getRight, - newJoinCondition, - join.getHints, - join.getJoinType) - - val rexProgram = new RexProgramBuilder(bottomJoin.getRowType, rexBuilder).getProgram - val topCalcCondition = RexUtil.composeConjunction(rexBuilder, remoteFilters) - - val topCalc = new FlinkLogicalCalc( - join.getCluster, - join.getTraitSet, - bottomJoin, - RexProgram.create( - bottomJoin.getRowType, - rexProgram.getExprList, - topCalcCondition, - bottomJoin.getRowType, - rexBuilder)) - - call.transformTo(topCalc) - } - - // Consider the rules to be equal if they are the same class and their call finders are the same - // class. - override def equals(obj: Any): Boolean = { - obj match { - case rule: SplitRemoteConditionFromJoinRule => - super.equals(rule) && callFinder.getClass.equals(rule.callFinder.getClass) && - errorOnUnsplittableRemoteCall.equals(rule.errorOnUnsplittableRemoteCall) - case _ => false - } - } -}