Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ If `unsafe_txdb_checkpoint` is not enabled, `build_rocksdb()` returns an explici
- [x] Describe
- [x] Union
- [x] EXCEPT
- [x] INTERSECT

### DML
- [x] Insert
Expand Down
34 changes: 17 additions & 17 deletions src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ use crate::execution::dql::join::joins_nullable;
use crate::expression::simplify::ConstantCalculator;
use crate::expression::visitor_mut::{walk_mut_expr, PositionShift, VisitorMut};
use crate::expression::{AliasType, BinaryOperator};
use crate::planner::operator::except::ExceptOperator;
use crate::planner::operator::function_scan::FunctionScanOperator;
use crate::planner::operator::insert::InsertOperator;
use crate::planner::operator::join::JoinCondition;
use crate::planner::operator::set_membership::{SetMembershipKind, SetMembershipOperator};
use crate::planner::operator::sort::{SortField, SortOperator};
use crate::planner::operator::union::UnionOperator;
use crate::planner::{Childrens, LogicalPlan, SchemaOutput};
Expand Down Expand Up @@ -756,15 +756,14 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
)?)
}
}
SetOperator::Except => {
if is_all {
Ok(ExceptOperator::build(
left_schema.clone(),
right_schema.clone(),
left_plan,
right_plan,
))
} else {
SetOperator::Except | SetOperator::Intersect => {
let kind = match op {
SetOperator::Except => SetMembershipKind::Except,
SetOperator::Intersect => SetMembershipKind::Intersect,
_ => unreachable!(),
};

if !is_all {
let left_distinct_exprs = left_schema
.iter()
.cloned()
Expand All @@ -782,14 +781,15 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
right_plan = self.bind_distinct(right_plan, right_distinct_exprs)?;
left_schema = left_plan.output_schema();
right_schema = right_plan.output_schema();

Ok(ExceptOperator::build(
left_schema.clone(),
right_schema.clone(),
left_plan,
right_plan,
))
}

Ok(SetMembershipOperator::build(
kind,
left_schema.clone(),
right_schema.clone(),
left_plan,
right_plan,
))
}
set_operator => Err(DatabaseError::UnsupportedStmt(format!(
"set operator: {set_operator:?}"
Expand Down
2 changes: 1 addition & 1 deletion src/execution/dql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
pub(crate) mod aggregate;
pub(crate) mod describe;
pub(crate) mod dummy;
pub(crate) mod except;
pub(crate) mod explain;
pub(crate) mod filter;
pub(crate) mod function_scan;
Expand All @@ -27,6 +26,7 @@ pub(crate) mod projection;
pub(crate) mod scalar_apply;
pub(crate) mod scalar_subquery;
pub(crate) mod seq_scan;
pub(crate) mod set_membership;
pub(crate) mod show_table;
pub(crate) mod show_view;
pub(crate) mod sort;
Expand Down
59 changes: 38 additions & 21 deletions src/execution/dql/except.rs → src/execution/dql/set_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,39 @@ use crate::errors::DatabaseError;
use crate::execution::{
build_read, ExecArena, ExecId, ExecNode, ExecutionCaches, ExecutorNode, ReadExecutor,
};
use crate::planner::operator::set_membership::SetMembershipKind;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use ahash::{HashMap, HashMapExt};
pub struct Except {

pub struct SetMembership {
kind: SetMembershipKind,
left_plan: LogicalPlan,
right_plan: LogicalPlan,
left_input: ExecId,
right_input: ExecId,
except_col: HashMap<Tuple, usize>,
right_counts: HashMap<Tuple, usize>,
built: bool,
}

impl From<(LogicalPlan, LogicalPlan)> for Except {
fn from((left_input, right_input): (LogicalPlan, LogicalPlan)) -> Self {
Except {
impl From<(SetMembershipKind, LogicalPlan, LogicalPlan)> for SetMembership {
fn from(
(kind, left_input, right_input): (SetMembershipKind, LogicalPlan, LogicalPlan),
) -> Self {
SetMembership {
kind,
left_plan: left_input,
right_plan: right_input,
left_input: 0,
right_input: 0,
except_col: HashMap::new(),
right_counts: HashMap::new(),
built: false,
}
}
}

impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Except {
impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SetMembership {
fn into_executor(
mut self,
arena: &mut ExecArena<'a, T>,
Expand All @@ -51,12 +57,12 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Except {
) -> ExecId {
self.left_input = build_read(arena, self.left_plan.take(), cache, transaction);
self.right_input = build_read(arena, self.right_plan.take(), cache, transaction);
arena.push(ExecNode::Except(self))
arena.push(ExecNode::SetMembership(self))
}
}

impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Except {
type Input = (LogicalPlan, LogicalPlan);
impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for SetMembership {
type Input = (SetMembershipKind, LogicalPlan, LogicalPlan);

fn into_executor(
input: Self::Input,
Expand All @@ -68,19 +74,19 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Except {
}

fn next_tuple(&mut self, arena: &mut ExecArena<'a, T>) -> Result<(), DatabaseError> {
Except::next_tuple(self, arena)
SetMembership::next_tuple(self, arena)
}
}

impl Except {
impl SetMembership {
pub(crate) fn next_tuple<'a, T: Transaction + 'a>(
&mut self,
arena: &mut ExecArena<'a, T>,
) -> Result<(), DatabaseError> {
if !self.built {
while arena.next_tuple(self.right_input)? {
*self
.except_col
.right_counts
.entry(arena.result_tuple().clone())
.or_insert(0) += 1;
}
Expand All @@ -92,17 +98,28 @@ impl Except {
arena.finish();
return Ok(());
}
let tuple = arena.result_tuple();

if let Some(count) = self.except_col.get_mut(tuple) {
if *count > 0 {
*count -= 1;
continue;
}
let matched = self.consume_right_match(arena.result_tuple());
let should_emit = match self.kind {
SetMembershipKind::Except => !matched,
SetMembershipKind::Intersect => matched,
};

if should_emit {
arena.resume();
return Ok(());
}
}
}

arena.resume();
return Ok(());
/// Tries to pair `tuple` with one not-yet-consumed occurrence recorded in
/// `right_counts`.
///
/// Returns `true` and decrements the stored count when the tuple has a
/// positive remaining count. Returns `false`, leaving the map untouched,
/// when the tuple is absent or its count has already reached zero.
fn consume_right_match(&mut self, tuple: &Tuple) -> bool {
    match self.right_counts.get_mut(tuple) {
        // One occurrence is still available: use it up.
        Some(remaining) if *remaining > 0 => {
            *remaining -= 1;
            true
        }
        // Either never seen, or every occurrence was already consumed.
        _ => false,
    }
}
}
23 changes: 14 additions & 9 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use crate::execution::dql::aggregate::simple_agg::SimpleAggExecutor;
use crate::execution::dql::aggregate::stream_distinct::StreamDistinctExecutor;
use crate::execution::dql::describe::Describe;
use crate::execution::dql::dummy::Dummy;
use crate::execution::dql::except::Except;
use crate::execution::dql::explain::Explain;
use crate::execution::dql::filter::Filter;
use crate::execution::dql::function_scan::FunctionScan;
Expand All @@ -51,6 +50,7 @@ use crate::execution::dql::limit::Limit;
use crate::execution::dql::projection::Projection;
use crate::execution::dql::scalar_subquery::ScalarSubquery;
use crate::execution::dql::seq_scan::SeqScan;
use crate::execution::dql::set_membership::SetMembership;
use crate::execution::dql::show_table::ShowTables;
use crate::execution::dql::show_view::ShowViews;
use crate::execution::dql::sort::Sort;
Expand Down Expand Up @@ -127,7 +127,6 @@ pub(crate) enum ExecNode<'a, T: Transaction + 'a> {
DropTable(DropTable),
DropView(DropView),
Dummy(Dummy),
Except(Except),
Explain(Explain),
Filter(Filter),
FunctionScan(FunctionScan),
Expand All @@ -141,6 +140,7 @@ pub(crate) enum ExecNode<'a, T: Transaction + 'a> {
Projection(Projection),
ScalarApply(ScalarApply),
ScalarSubquery(ScalarSubquery),
SetMembership(SetMembership),
SeqScan(SeqScan<'a, T>),
ShowTables(ShowTables),
ShowViews(ShowViews),
Expand Down Expand Up @@ -206,7 +206,6 @@ impl<'a, T: Transaction + 'a> ExecNode<'a, T> {
}
ExecNode::DropView(exec) => <DropView as ExecutorNode<'a, T>>::next_tuple(exec, arena),
ExecNode::Dummy(exec) => <Dummy as ExecutorNode<'a, T>>::next_tuple(exec, arena),
ExecNode::Except(exec) => <Except as ExecutorNode<'a, T>>::next_tuple(exec, arena),
ExecNode::Explain(exec) => <Explain as ExecutorNode<'a, T>>::next_tuple(exec, arena),
ExecNode::Filter(exec) => <Filter as ExecutorNode<'a, T>>::next_tuple(exec, arena),
ExecNode::FunctionScan(exec) => {
Expand Down Expand Up @@ -236,6 +235,9 @@ impl<'a, T: Transaction + 'a> ExecNode<'a, T> {
ExecNode::ScalarSubquery(exec) => {
<ScalarSubquery as ExecutorNode<'a, T>>::next_tuple(exec, arena)
}
ExecNode::SetMembership(exec) => {
<SetMembership as ExecutorNode<'a, T>>::next_tuple(exec, arena)
}
ExecNode::SeqScan(exec) => {
<SeqScan<'a, T> as ExecutorNode<'a, T>>::next_tuple(exec, arena)
}
Expand Down Expand Up @@ -783,12 +785,15 @@ pub(crate) fn build_read<'a, T: Transaction + 'a>(
cache,
transaction,
),
Operator::Except(_) => <Except as ExecutorNode<'a, T>>::into_executor(
childrens.pop_twins(),
arena,
cache,
transaction,
),
Operator::SetMembership(op) => {
let (left, right) = childrens.pop_twins();
<SetMembership as ExecutorNode<'a, T>>::into_executor(
(op.kind, left, right),
arena,
cache,
transaction,
)
}
_ => unreachable!(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/rule/implementation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl ImplementationRuleRootTag {
| Operator::ShowView
| Operator::Explain
| Operator::Describe(_)
| Operator::Except(_)
| Operator::SetMembership(_)
| Operator::Union(_)
| Operator::CreateIndex(_)
| Operator::CreateView(_)
Expand Down
8 changes: 4 additions & 4 deletions src/optimizer/rule/normalization/column_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl ColumnPruning {
.map(|column| column.summary()),
);
}
Operator::Except(op) => {
Operator::SetMembership(op) => {
referenced_columns.extend(
op.left_schema_ref
.iter()
Expand Down Expand Up @@ -281,7 +281,7 @@ impl ColumnPruning {
| Operator::ShowView
| Operator::Describe(_)
| Operator::Union(_)
| Operator::Except(_)
| Operator::SetMembership(_)
| Operator::AddColumn(_)
| Operator::ChangeColumn(_)
| Operator::DropColumn(_)
Expand Down Expand Up @@ -503,7 +503,7 @@ impl ColumnPruning {
| Operator::Join(_)
| Operator::Filter(_)
| Operator::Union(_)
| Operator::Except(_)
| Operator::SetMembership(_)
| Operator::TopK(_) => {
if matches!(
operator,
Expand Down Expand Up @@ -582,7 +582,7 @@ impl ColumnPruning {
}
changed = true;
}
} else if matches!(operator, Operator::Union(_) | Operator::Except(_)) {
} else if matches!(operator, Operator::Union(_) | Operator::SetMembership(_)) {
let mut child_required = required_columns;
Self::extend_operator_referenced_columns(operator, &mut child_required);
changed |= Self::apply_twins(child_required, all_referenced, childrens, arena)?;
Expand Down
4 changes: 2 additions & 2 deletions src/optimizer/rule/normalization/compilation_in_advance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub(crate) fn evaluator_bind_current(plan: &mut LogicalPlan) -> Result<(), Datab
| Operator::CopyFromFile(_)
| Operator::CopyToFile(_)
| Operator::Union(_)
| Operator::Except(_) => (),
| Operator::SetMembership(_) => (),
}

Ok(())
Expand All @@ -125,7 +125,7 @@ impl EvaluatorBind {
| Operator::MarkApply(_)
| Operator::Join(_)
| Operator::Union(_)
| Operator::Except(_)
| Operator::SetMembership(_)
) {
Self::_apply(right)?;
}
Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/rule/normalization/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl NormalizationRuleRootTag {
| Operator::FunctionScan(_)
| Operator::Update(_)
| Operator::Union(_)
| Operator::Except(_) => None,
| Operator::SetMembership(_) => None,
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/orm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ The usual flow is:
- keep full-model output, or switch into `project::<P>()`,
`project_value(...)`, or `project_tuple(...)`
- once the output shape is fixed, compose set queries with `union(...)`,
`except(...)`, and optional `.all()`
`except(...)`, `intersect(...)`, and optional `.all()`

If you need an explicit relation alias, call `.alias("name")` on a source or
pending join, and re-qualify fields with `Field::qualify("name")` where
Expand Down Expand Up @@ -143,9 +143,10 @@ Set operations are available after the output shape is fixed:
- model rows: `from::<User>().union(...)`
- single values: `project_value(...).union(...)`
- tuples: `project_tuple(...).except(...)`
- intersections: `project_value(...).intersect(...)`
- struct projections: `project::<P>().union(...)`

Call `.all()` after `union(...)` or `except(...)` when you want multiset
Call `.all()` after `union(...)`, `except(...)`, or `intersect(...)` when you want multiset
semantics instead of the default distinct result.

After a set query is formed, you can still apply result-level methods such as
Expand Down
Loading
Loading