Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 118 additions & 68 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::expression::function::scala::{ArcScalarFunctionImpl, ScalarFunction};
use crate::expression::function::table::{ArcTableFunctionImpl, TableFunction};
use crate::expression::function::FunctionSummary;
use crate::expression::{AliasType, ScalarExpression};
use crate::planner::operator::mark_apply::MarkApplyQuantifier;
use crate::planner::operator::scalar_subquery::ScalarSubqueryOperator;
use crate::planner::{LogicalPlan, SchemaOutput};
use crate::storage::Transaction;
Expand Down Expand Up @@ -245,7 +246,7 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
})
}
Expr::Exists { subquery, negated } => {
let (sub_query, _column, correlated) = self.bind_subquery(None, subquery)?;
let (sub_query, correlated) = self.bind_subquery(subquery)?;
let (_, marker_ref) = self
.bind_temp_table_alias(ScalarExpression::Constant(DataValue::Boolean(true)), 0);
self.context.sub_query(SubQueryType::ExistsSubQuery {
Expand All @@ -265,7 +266,8 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
}
}
Expr::Subquery(subquery) => {
let (sub_query, column, correlated) = self.bind_subquery(None, subquery)?;
let (sub_query, column, correlated) =
self.bind_subquery_with_output(None, subquery)?;
let sub_query = ScalarSubqueryOperator::build(sub_query);
let (expr, sub_query) = if !self.context.is_step(&QueryBindStep::Where) {
self.bind_temp_table(column, sub_query)?
Expand All @@ -282,46 +284,13 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
expr,
subquery,
negated,
} => {
let left_expr = self.bind_expr(expr)?;
let (sub_query, column, correlated) =
self.bind_subquery(Some(left_expr.return_type().as_ref()), subquery)?;

if !self.context.is_step(&QueryBindStep::Where) {
return Err(DatabaseError::UnsupportedStmt(
"'IN (SUBQUERY)' can only appear in `WHERE`".to_string(),
));
}

let (alias_expr, sub_query) = self.bind_temp_table(column, sub_query)?;
let predicate = ScalarExpression::Binary {
op: expression::BinaryOperator::Eq,
left_expr: Box::new(left_expr),
right_expr: Box::new(alias_expr),
evaluator: None,
ty: LogicalType::Boolean,
};
let (_, marker_ref) = self
.bind_temp_table_alias(ScalarExpression::Constant(DataValue::Boolean(true)), 0);
self.context.sub_query(SubQueryType::InSubQuery {
negated: *negated,
plan: sub_query,
correlated,
output_column: marker_ref.output_column(),
predicate,
});

if *negated {
Ok(ScalarExpression::Unary {
op: expression::UnaryOperator::Not,
expr: Box::new(marker_ref),
evaluator: None,
ty: LogicalType::Boolean,
})
} else {
Ok(marker_ref)
}
}
} => self.bind_quantified_subquery(
MarkApplyQuantifier::Any,
*negated,
expr,
&BinaryOperator::Eq,
subquery,
),
Expr::Tuple(exprs) => {
let mut bond_exprs = Vec::with_capacity(exprs.len());

Expand Down Expand Up @@ -377,10 +346,86 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
ty,
})
}
Expr::AnyOp {
left,
compare_op,
right,
..
} => self.bind_quantified_op(MarkApplyQuantifier::Any, left, compare_op, right),
Expr::AllOp {
left,
compare_op,
right,
} => self.bind_quantified_op(MarkApplyQuantifier::All, left, compare_op, right),
expr => Err(DatabaseError::UnsupportedStmt(expr.to_string())),
}
}

fn bind_quantified_op(
&mut self,
quantifier: MarkApplyQuantifier,
left: &Expr,
compare_op: &BinaryOperator,
right: &Expr,
) -> Result<ScalarExpression, DatabaseError> {
let Expr::Subquery(subquery) = right else {
return Err(DatabaseError::UnsupportedStmt(format!(
"{quantifier:?} only supports subquery operands"
)));
};

self.bind_quantified_subquery(quantifier, false, left, compare_op, subquery)
}

fn bind_quantified_subquery(
&mut self,
quantifier: MarkApplyQuantifier,
negated: bool,
expr: &Expr,
compare_op: &BinaryOperator,
subquery: &Query,
) -> Result<ScalarExpression, DatabaseError> {
let left_expr = self.bind_expr(expr)?;
let (sub_query, column, correlated) =
self.bind_subquery_with_output(Some(left_expr.return_type().as_ref()), subquery)?;

if !self.context.is_step(&QueryBindStep::Where) {
return Err(DatabaseError::UnsupportedStmt(
"quantified subqueries can only appear in `WHERE`".to_string(),
));
}

let (alias_expr, sub_query) = self.bind_temp_table(column, sub_query)?;
let predicate = ScalarExpression::Binary {
op: (*compare_op).clone().try_into()?,
left_expr: Box::new(left_expr),
right_expr: Box::new(alias_expr),
evaluator: None,
ty: LogicalType::Boolean,
};
let (_, marker_ref) =
self.bind_temp_table_alias(ScalarExpression::Constant(DataValue::Boolean(true)), 0);
self.context.sub_query(SubQueryType::QuantifiedSubQuery {
quantifier,
negated,
plan: sub_query,
correlated,
output_column: marker_ref.output_column(),
predicate,
});

if negated {
Ok(ScalarExpression::Unary {
op: expression::UnaryOperator::Not,
expr: Box::new(marker_ref),
evaluator: None,
ty: LogicalType::Boolean,
})
} else {
Ok(marker_ref)
}
}

fn bind_temp_table(
&mut self,
expr: ScalarExpression,
Expand Down Expand Up @@ -425,34 +470,12 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
)
}

fn bind_subquery(
fn bind_subquery_with_output(
&mut self,
in_ty: Option<&LogicalType>,
value_ty: Option<&LogicalType>,
subquery: &Query,
) -> Result<(LogicalPlan, ScalarExpression, bool), DatabaseError> {
let BinderContext {
table_cache,
view_cache,
transaction,
scala_functions,
table_functions,
temp_table_id,
..
} = &self.context;
let mut binder = Binder::new(
BinderContext::new(
table_cache,
view_cache,
*transaction,
scala_functions,
table_functions,
temp_table_id.clone(),
),
self.args,
Some(self),
);
let mut sub_query = binder.bind_query(subquery)?;
let correlated = binder.context.has_outer_refs();
let (mut sub_query, correlated) = self.bind_subquery(subquery)?;
let sub_query_schema = sub_query.output_schema();

let fn_check = |len: usize| {
Expand All @@ -465,7 +488,7 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
Ok(())
};

let expr = if let Some(LogicalType::Tuple(tys)) = in_ty {
let expr = if let Some(LogicalType::Tuple(tys)) = value_ty {
fn_check(tys.len())?;

let columns = sub_query_schema
Expand All @@ -482,6 +505,33 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T
Ok((sub_query, expr, correlated))
}

fn bind_subquery(&mut self, subquery: &Query) -> Result<(LogicalPlan, bool), DatabaseError> {
let BinderContext {
table_cache,
view_cache,
transaction,
scala_functions,
table_functions,
temp_table_id,
..
} = &self.context;
let mut binder = Binder::new(
BinderContext::new(
table_cache,
view_cache,
*transaction,
scala_functions,
table_functions,
temp_table_id.clone(),
),
self.args,
Some(self),
);
let sub_query = binder.bind_query(subquery)?;
let correlated = binder.context.has_outer_refs();
Ok((sub_query, correlated))
}

pub fn bind_like(
&mut self,
negated: bool,
Expand Down
4 changes: 3 additions & 1 deletion src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::db::{ScalaFunctions, TableFunctions};
use crate::errors::{DatabaseError, SqlErrorSpan};
use crate::expression::ScalarExpression;
use crate::planner::operator::join::JoinType;
use crate::planner::operator::mark_apply::MarkApplyQuantifier;
use crate::planner::{LogicalPlan, SchemaOutput};
use crate::storage::{TableCache, Transaction, ViewCache};
use crate::types::tuple::SchemaRef;
Expand Down Expand Up @@ -156,7 +157,8 @@ pub enum SubQueryType {
correlated: bool,
output_column: ColumnRef,
},
InSubQuery {
QuantifiedSubQuery {
quantifier: MarkApplyQuantifier,
negated: bool,
plan: LogicalPlan,
correlated: bool,
Expand Down
Loading
Loading