From 8c00bfd64086829881bce97b060a060335f9749b Mon Sep 17 00:00:00 2001 From: kould Date: Wed, 16 Jul 2025 01:12:58 +0800 Subject: [PATCH 1/2] refactor: Refactor Tuple's id to make Update support complex queries --- Cargo.lock | 85 +++-- .../src/reference_serialization.rs | 24 +- src/binder/alter_table.rs | 4 +- src/binder/analyze.rs | 2 +- src/binder/copy.rs | 2 +- src/binder/create_index.rs | 2 +- src/binder/delete.rs | 2 +- src/binder/expr.rs | 3 +- src/binder/mod.rs | 13 + src/binder/select.rs | 3 +- src/binder/update.rs | 1 + src/execution/ddl/add_column.rs | 6 +- src/execution/ddl/create_index.rs | 4 +- src/execution/dml/copy_to_file.rs | 2 +- src/execution/dml/delete.rs | 4 +- src/execution/dml/insert.rs | 10 +- src/execution/dml/update.rs | 17 +- src/execution/dql/aggregate/avg.rs | 26 +- src/execution/dql/aggregate/mod.rs | 2 +- src/execution/dql/index_scan.rs | 2 + src/execution/dql/join/hash_join.rs | 6 +- src/execution/dql/join/nested_loop_join.rs | 12 +- src/execution/dql/projection.rs | 11 +- src/execution/dql/seq_scan.rs | 4 +- src/expression/mod.rs | 2 +- .../rule/normalization/column_pruning.rs | 2 +- src/planner/mod.rs | 9 +- src/planner/operator/mod.rs | 12 +- src/planner/operator/table_scan.rs | 17 +- src/serdes/btree_map.rs | 40 +++ src/serdes/mod.rs | 1 + src/storage/mod.rs | 176 ++++++---- src/storage/rocksdb.rs | 27 +- src/storage/table_codec.rs | 22 +- src/types/tuple.rs | 57 ++- src/types/tuple_builder.rs | 6 +- tests/slt/basic_test.slt | 15 +- tests/slt/crdb/update.slt | 324 ++++++++++++++++++ 38 files changed, 723 insertions(+), 234 deletions(-) create mode 100644 src/serdes/btree_map.rs create mode 100644 tests/slt/crdb/update.slt diff --git a/Cargo.lock b/Cargo.lock index 9a818231..d6d8b7f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -158,9 +158,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-lc-rs" -version = "1.13.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fcc8f365936c834db5514fc45aee5b1202d677e6b40e48468aaaa8183ca8c7" +checksum = "08b5d4e069cbc868041a64bd68dc8cb39a0d79585cd6c5a24caa8c2d622121be" dependencies = [ "aws-lc-sys", "untrusted 0.7.1", @@ -169,9 +169,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.29.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61b1d86e7705efe1be1b569bab41d4fa1e14e220b60a160f78de2db687add079" +checksum = "dbfd150b5dbdb988bcc8fb1fe787eb6b7ee6180ca24da683b61ea5405f3d43ff" dependencies = [ "bindgen 0.69.5", "cc", @@ -309,9 +309,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.18.1" +version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793db76d6187cd04dff33004d8e6c9cc4e05cd330500379d2394209271b4aeee" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" dependencies = [ "allocator-api2", ] @@ -374,9 +374,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.27" +version = "1.2.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d487aa071b5f64da6f19a3e848e3578944b726ee5a4854b82172f02aa876bfdc" +checksum = "5c1599538de2394445747c8cf7935946e3cc27e9625f889d979bfb2aaf569362" dependencies = [ "jobserver", "libc", @@ -458,9 +458,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.40" +version = "4.5.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40b6887a1d8685cebccf115538db5c0efe625ccac9696ad45c409d96566e910f" +checksum = "be92d32e80243a54711e5d7ce823c35c41c9d929dc4ab58e1276f625841aadf9" dependencies = [ "clap_builder", "clap_derive", @@ -468,9 +468,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.40" +version = "4.5.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0c66c08ce9f0c698cbce5c0279d0bb6ac936d8674174fe48f736533b964f59e" +checksum = "707eab41e9622f9139419d573eca0900137718000c517d47da73045f54331c3d" dependencies = [ "anstream", "anstyle", @@ -480,9 +480,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.40" +version = "4.5.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2c7947ae4cc3d851207c1adb5b5e260ff0cca11446b1d6d1423788e442257ce" +checksum = "ef4f52386a59ca4c860f7393bcf8abd8dfd91ecccc0f774635ff68e92eeef491" dependencies = [ "heck", "proc-macro2", @@ -644,9 +644,9 @@ dependencies = [ [[package]] name = "crunchy" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" [[package]] name = "crypto-common" @@ -1148,9 +1148,9 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "indexmap" -version = "2.9.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" +checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" dependencies = [ "equivalent", "hashbrown 0.15.4", @@ -1196,6 +1196,17 @@ dependencies = [ "rustversion", ] +[[package]] +name = "io-uring" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013" +dependencies = [ + "bitflags 2.9.1", + "cfg-if", + "libc", +] + [[package]] name = "is-terminal" version = "0.4.16" @@ -1395,9 +1406,9 @@ dependencies = [ [[package]] name = "libredox" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +checksum = "1580801010e535496706ba011c15f8532df6b42297d2e471fec38ceadd8c0638" dependencies = [ "bitflags 2.9.1", "libc", @@ -1514,9 +1525,9 @@ checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" [[package]] name = "memmap2" -version = "0.9.5" +version = "0.9.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd3f7eed9d3848f8b98834af67102b720745c4ec028fcd0aa0239277e7de374f" +checksum = "483758ad303d734cec05e5c12b41d7e93e6a6390c5e9dae6bdeb7c1259012d28" dependencies = [ "libc", ] @@ -2083,9 +2094,9 @@ dependencies = [ [[package]] name = "rgb" -version = "0.8.50" +version = "0.8.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57397d16646700483b67d2dd6511d79318f9d057fdbd21a4066aeac8b41d310a" +checksum = "0c6a884d2998352bb4daf0183589aec883f16a6da1f4dde84d8e2e9a5409a1ce" dependencies = [ "bytemuck", ] @@ -2193,22 +2204,22 @@ dependencies = [ [[package]] name = "rustix" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" +checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" dependencies = [ "bitflags 2.9.1", "errno", "libc", "linux-raw-sys 0.9.4", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] name = "rustls" -version = "0.23.28" +version = "0.23.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7160e3e10bf4535308537f3c4e1641468cd0e485175d6163087c0393c7d46643" +checksum = "2491382039b29b9b11ff08b76ff6c97cf287671dbb74f0be44bda389fffe9bd1" dependencies = [ "aws-lc-rs", "log", @@ -2230,9 +2241,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.3" +version = "0.103.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435" +checksum = "0a17884ae0c1b773f1ccd2bd4a8c72f16da897310a98b0e84bf349ad5ead92fc" dependencies = [ "aws-lc-rs", "ring", @@ -2543,7 +2554,7 @@ dependencies = [ "fastrand", "getrandom 0.3.3", "once_cell", - "rustix 1.0.7", + "rustix 1.0.8", "windows-sys 0.59.0", ] @@ -2632,17 +2643,19 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.45.1" +version = "1.46.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" +checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio", "parking_lot", "pin-project-lite", "signal-hook-registry", + "slab", "socket2", "tokio-macros", "windows-sys 0.52.0", @@ -3298,9 +3311,9 @@ checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" [[package]] name = "winnow" -version = "0.7.11" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74c7b26e3480b707944fc872477815d29a8e429d2f93a1ce000f5fa84a15cbcd" +checksum = "f3edebf492c8125044983378ecb5766203ad3b4c2f7a922bd7dd207f6d443e95" dependencies = [ "memchr", ] diff --git a/kite_sql_serde_macros/src/reference_serialization.rs b/kite_sql_serde_macros/src/reference_serialization.rs index 2e873582..a016c0e3 100644 --- a/kite_sql_serde_macros/src/reference_serialization.rs +++ b/kite_sql_serde_macros/src/reference_serialization.rs @@ -47,6 +47,26 @@ fn process_type(ty: &Type) -> TokenStream { } } } + "BTreeMap" => { + if let PathArguments::AngleBracketed(AngleBracketedGenericArguments { + args, .. + }) = &path.segments.last().unwrap().arguments + { + let mut iter = args.iter(); + if let ( + Some(GenericArgument::Type(inner_ty_0)), + Some(GenericArgument::Type(inner_ty_1)), + ) = (iter.next(), iter.next()) + { + let inner_processed_0 = process_type(inner_ty_0); + let inner_processed_1 = process_type(inner_ty_1); + + return quote! { + #ident::<#inner_processed_0, #inner_processed_1> + }; + } + } + } _ => {} } @@ -72,7 +92,7 @@ pub(crate) fn handle(ast: DeriveInput) -> Result { let field_name = field_opts .ident - .unwrap_or_else(|| Ident::new(&format!("filed_{}", i), Span::call_site())); + .unwrap_or_else(|| Ident::new(&format!("field_{}", i), Span::call_site())); let ty = process_type(&field_opts.ty); encode_fields.push(quote! { @@ -135,7 +155,7 @@ pub(crate) fn handle(ast: DeriveInput) -> Result { let field_name = field_opts .ident - .unwrap_or_else(|| Ident::new(&format!("filed_{}", i), Span::call_site())); + .unwrap_or_else(|| Ident::new(&format!("field_{}", i), Span::call_site())); let ty = process_type(&field_opts.ty); encode_fields.push(quote! { diff --git a/src/binder/alter_table.rs b/src/binder/alter_table.rs index f2f4d013..ff615e3c 100644 --- a/src/binder/alter_table.rs +++ b/src/binder/alter_table.rs @@ -30,7 +30,7 @@ impl> Binder<'_, '_, T, A> if_not_exists, column_def, } => { - let plan = TableScanOperator::build(table_name.clone(), table); + let plan = TableScanOperator::build(table_name.clone(), table, true); let column = self.bind_column(column_def, None)?; if !is_valid_identifier(column.name()) { @@ -52,7 +52,7 @@ impl> Binder<'_, '_, T, A> if_exists, .. } => { - let plan = TableScanOperator::build(table_name.clone(), table); + let plan = TableScanOperator::build(table_name.clone(), table, true); let column_name = column_name.value.clone(); LogicalPlan::new( diff --git a/src/binder/analyze.rs b/src/binder/analyze.rs index 6aaafe78..6ade3864 100644 --- a/src/binder/analyze.rs +++ b/src/binder/analyze.rs @@ -26,7 +26,7 @@ impl> Binder<'_, '_, T, A> .ok_or(DatabaseError::TableNotFound)?; let index_metas = table.indexes.clone(); - let scan_op = TableScanOperator::build(table_name.clone(), table); + let scan_op = TableScanOperator::build(table_name.clone(), table, false); Ok(LogicalPlan::new( Operator::Analyze(AnalyzeOperator { table_name, diff --git a/src/binder/copy.rs b/src/binder/copy.rs index e0b0efe6..4cd29392 100644 --- a/src/binder/copy.rs +++ b/src/binder/copy.rs @@ -105,7 +105,7 @@ impl> Binder<'_, '_, T, A> target: ext_source, schema_ref, }), - Childrens::Only(TableScanOperator::build(table_name, table)), + Childrens::Only(TableScanOperator::build(table_name, table, false)), )) } else { // COPY FROM diff --git a/src/binder/create_index.rs b/src/binder/create_index.rs index 69a0b103..4b5332ba 100644 --- a/src/binder/create_index.rs +++ b/src/binder/create_index.rs @@ -35,7 +35,7 @@ impl> Binder<'_, '_, T, A> .source_and_bind(table_name.clone(), None, None, false)? .ok_or(DatabaseError::SourceNotFound)?; let plan = match source { - Source::Table(table) => TableScanOperator::build(table_name.clone(), table), + Source::Table(table) => TableScanOperator::build(table_name.clone(), table, true), Source::View(view) => LogicalPlan::clone(&view.plan), }; let mut columns = Vec::with_capacity(exprs.len()); diff --git a/src/binder/delete.rs b/src/binder/delete.rs index 96971e3b..94fc8118 100644 --- a/src/binder/delete.rs +++ b/src/binder/delete.rs @@ -37,7 +37,7 @@ impl> Binder<'_, '_, T, A> .iter() .map(|(_, column)| column.clone()) .collect_vec(); - let mut plan = TableScanOperator::build(table_name.clone(), table); + let mut plan = TableScanOperator::build(table_name.clone(), table, true); if let Some(alias_idents) = alias_idents { plan = diff --git a/src/binder/expr.rs b/src/binder/expr.rs index 548b7028..d8edc85b 100644 --- a/src/binder/expr.rs +++ b/src/binder/expr.rs @@ -538,13 +538,12 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T if args.len() != 1 { return Err(DatabaseError::MisMatch("number of avg() parameters", "1")); } - let ty = args[0].return_type(); return Ok(ScalarExpression::AggCall { distinct: func.distinct, kind: AggKind::Avg, args, - ty, + ty: LogicalType::Double, }); } "if" => { diff --git a/src/binder/mod.rs b/src/binder/mod.rs index e4329da4..ae353c2c 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -319,6 +319,7 @@ pub struct Binder<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> context: BinderContext<'a, T>, table_schema_buf: HashMap>, args: &'a A, + with_pk: Option, pub(crate) parent: Option<&'b Binder<'a, 'b, T, A>>, } @@ -332,10 +333,22 @@ impl<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, ' context, table_schema_buf: Default::default(), args, + with_pk: None, parent, } } + pub fn with_pk(&mut self, table_name: TableName) { + self.with_pk = Some(table_name); + } + + pub fn is_scan_with_pk(&self, table_name: &TableName) -> bool { + if let Some(with_pk_table) = self.with_pk.as_ref() { + return with_pk_table == table_name; + } + false + } + pub fn bind(&mut self, stmt: &Statement) -> Result { let plan = match stmt { Statement::Query(query) => self.bind_query(query)?, diff --git a/src/binder/select.rs b/src/binder/select.rs index 76605b42..b212bee3 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -386,12 +386,13 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<' alias_idents = Some(columns); } + let with_pk = self.is_scan_with_pk(&table_name); let source = self .context .source_and_bind(table_name.clone(), table_alias.as_ref(), join_type, false)? .ok_or(DatabaseError::SourceNotFound)?; let mut plan = match source { - Source::Table(table) => TableScanOperator::build(table_name.clone(), table), + Source::Table(table) => TableScanOperator::build(table_name.clone(), table, with_pk), Source::View(view) => LogicalPlan::clone(&view.plan), }; diff --git a/src/binder/update.rs b/src/binder/update.rs index dd134726..a160a24a 100644 --- a/src/binder/update.rs +++ b/src/binder/update.rs @@ -21,6 +21,7 @@ impl> Binder<'_, '_, T, A> self.context.allow_default = true; if let TableFactor::Table { name, .. } = &to.relation { let table_name = Arc::new(lower_case_name(name)?); + self.with_pk(table_name.clone()); let mut plan = self.bind_table_ref(to)?; diff --git a/src/execution/ddl/add_column.rs b/src/execution/ddl/add_column.rs index e395b426..f91ffe3a 100644 --- a/src/execution/ddl/add_column.rs +++ b/src/execution/ddl/add_column.rs @@ -1,3 +1,4 @@ +use crate::errors::DatabaseError; use crate::execution::{build_read, Executor, WriteExecutor}; use crate::planner::LogicalPlan; use crate::storage::{StatisticsMetaCache, TableCache, ViewCache}; @@ -55,7 +56,10 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for AddColumn { if let Some(value) = throw!(column.default_value()) { if let Some(unique_values) = &mut unique_values { - unique_values.push((tuple.id().unwrap().clone(), value.clone())); + unique_values.push(( + throw!(tuple.pk.clone().ok_or(DatabaseError::PrimaryKeyNotFound)), + value.clone(), + )); } tuple.values.push(value); } else { diff --git a/src/execution/ddl/create_index.rs b/src/execution/ddl/create_index.rs index d19c9c9c..51c0dfa0 100644 --- a/src/execution/ddl/create_index.rs +++ b/src/execution/ddl/create_index.rs @@ -72,7 +72,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex { let mut coroutine = build_read(self.input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { - let mut tuple: Tuple = throw!(tuple); + let tuple: Tuple = throw!(tuple); let Some(value) = DataValue::values_to_tuple(throw!(Projection::projection( &tuple, @@ -81,7 +81,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for CreateIndex { ))) else { continue; }; - let tuple_id = if let Some(tuple_id) = tuple.id().take() { + let tuple_id = if let Some(tuple_id) = tuple.pk.as_ref() { tuple_id } else { continue; diff --git a/src/execution/dml/copy_to_file.rs b/src/execution/dml/copy_to_file.rs index d1c94e2b..562c810a 100644 --- a/src/execution/dml/copy_to_file.rs +++ b/src/execution/dml/copy_to_file.rs @@ -184,7 +184,7 @@ mod tests { let executor = CopyToFile { op: op.clone(), - input: TableScanOperator::build(Arc::new("t1".to_string()), table), + input: TableScanOperator::build(Arc::new("t1".to_string()), table, true), }; let mut coroutine = executor.execute( ( diff --git a/src/execution/dml/delete.rs b/src/execution/dml/delete.rs index e1d76db9..5df5958a 100644 --- a/src/execution/dml/delete.rs +++ b/src/execution/dml/delete.rs @@ -51,7 +51,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete { let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { - let mut tuple: Tuple = throw!(tuple); + let tuple: Tuple = throw!(tuple); for index_meta in table.indexes() { if let Some(Value { exprs, values, .. }) = indexes.get_mut(&index_meta.id) { @@ -81,7 +81,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Delete { ); } } - if let Some(tuple_id) = tuple.id() { + if let Some(tuple_id) = &tuple.pk { for ( index_id, Value { diff --git a/src/execution/dml/insert.rs b/src/execution/dml/insert.rs index 9c65c5d3..ac1e8e77 100644 --- a/src/execution/dml/insert.rs +++ b/src/execution/dml/insert.rs @@ -99,7 +99,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert { } let types = table_catalog.types(); - let indices = table_catalog.primary_keys_indices(); + let pk_indices = table_catalog.primary_keys_indices(); let mut coroutine = build_read(input, cache, transaction); while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { @@ -126,16 +126,16 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Insert { } values.push(value) } - let mut tuple = Tuple::new(Some(indices.clone()), values); + let pk = Tuple::primary_projection(pk_indices, &values); + let tuple = Tuple::new(Some(pk), values); for (index_meta, exprs) in index_metas.iter() { let values = throw!(Projection::projection(&tuple, exprs, &schema)); let Some(value) = DataValue::values_to_tuple(values) else { continue; }; - let Some(tuple_id) = tuple.id() else { - unreachable!() - }; + let tuple_id = + throw!(tuple.pk.as_ref().ok_or(DatabaseError::PrimaryKeyNotFound)); let index = Index::new(index_meta.id, &value, index_meta.ty); throw!(unsafe { &mut (*transaction) }.add_index( &table_name, diff --git a/src/execution/dml/update.rs b/src/execution/dml/update.rs index c028aead..e4f1138d 100644 --- a/src/execution/dml/update.rs +++ b/src/execution/dml/update.rs @@ -1,4 +1,5 @@ use crate::catalog::{ColumnRef, TableName}; +use crate::errors::DatabaseError; use crate::execution::dql::projection::Projection; use crate::execution::{build_read, Executor, WriteExecutor}; use crate::expression::ScalarExpression; @@ -80,7 +81,8 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update { let mut is_overwrite = true; - let old_pk = tuple.id().cloned().unwrap(); + let old_pk = + throw!(tuple.pk.clone().ok_or(DatabaseError::PrimaryKeyNotFound)); for (index_meta, exprs) in index_metas.iter() { let values = throw!(Projection::projection(&tuple, exprs, &input_schema)); @@ -99,10 +101,15 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update { tuple.values[i] = throw!(expr.eval(Some((&tuple, &input_schema)))); } } - tuple.clear_id(); - let new_pk = tuple.id().unwrap().clone(); - if new_pk != old_pk { + tuple.pk = Some(Tuple::primary_projection( + table_catalog.primary_keys_indices(), + &tuple.values, + )); + let new_pk = + throw!(tuple.pk.as_ref().ok_or(DatabaseError::PrimaryKeyNotFound)); + + if new_pk != &old_pk { throw!( unsafe { &mut (*transaction) }.remove_tuple(&table_name, &old_pk) ); @@ -118,7 +125,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update { throw!(unsafe { &mut (*transaction) }.add_index( &table_name, index, - &new_pk + new_pk )); } diff --git a/src/execution/dql/aggregate/avg.rs b/src/execution/dql/aggregate/avg.rs index ce19658d..9e1a9080 100644 --- a/src/execution/dql/aggregate/avg.rs +++ b/src/execution/dql/aggregate/avg.rs @@ -4,26 +4,31 @@ use crate::execution::dql::aggregate::Accumulator; use crate::expression::BinaryOperator; use crate::types::evaluator::EvaluatorFactory; use crate::types::value::DataValue; -use crate::types::LogicalType; pub struct AvgAccumulator { - inner: SumAccumulator, + inner: Option, count: usize, } impl AvgAccumulator { - pub fn new(ty: &LogicalType) -> Result { - Ok(Self { - inner: SumAccumulator::new(ty)?, + pub fn new() -> Self { + Self { + inner: None, count: 0, - }) + } } } impl Accumulator for AvgAccumulator { fn update_value(&mut self, value: &DataValue) -> Result<(), DatabaseError> { if !value.is_null() { - self.inner.update_value(value)?; + let acc = if let Some(ref mut inner) = self.inner { + inner + } else { + self.inner + .get_or_insert(SumAccumulator::new(&value.logical_type())?) + }; + acc.update_value(value)?; self.count += 1; } @@ -31,11 +36,14 @@ impl Accumulator for AvgAccumulator { } fn evaluate(&self) -> Result { - let mut value = self.inner.evaluate()?; + let Some(acc) = &self.inner else { + return Ok(DataValue::Null); + }; + let mut value = acc.evaluate()?; let value_ty = value.logical_type(); if self.count == 0 { - return Ok(DataValue::init(&value_ty)); + return Ok(DataValue::Null); } let quantity = if value_ty.is_signed_numeric() { DataValue::Int64(self.count as i64) diff --git a/src/execution/dql/aggregate/mod.rs b/src/execution/dql/aggregate/mod.rs index 2afd64c5..5a3e64ce 100644 --- a/src/execution/dql/aggregate/mod.rs +++ b/src/execution/dql/aggregate/mod.rs @@ -38,7 +38,7 @@ fn create_accumulator(expr: &ScalarExpression) -> Result, D (AggKind::Sum, true) => Box::new(DistinctSumAccumulator::new(ty)?), (AggKind::Min, _) => Box::new(MinMaxAccumulator::new(false)), (AggKind::Max, _) => Box::new(MinMaxAccumulator::new(true)), - (AggKind::Avg, _) => Box::new(AvgAccumulator::new(ty)?), + (AggKind::Avg, _) => Box::new(AvgAccumulator::new()), }) } else { unreachable!( diff --git a/src/execution/dql/index_scan.rs b/src/execution/dql/index_scan.rs index bf3794ba..c218aac8 100644 --- a/src/execution/dql/index_scan.rs +++ b/src/execution/dql/index_scan.rs @@ -39,6 +39,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for IndexScan { table_name, columns, limit, + with_pk, .. } = self.op; @@ -50,6 +51,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for IndexScan { columns, self.index_by, self.ranges, + with_pk, ) .unwrap(); diff --git a/src/execution/dql/join/hash_join.rs b/src/execution/dql/join/hash_join.rs index 88ffe81f..f7cdce8e 100644 --- a/src/execution/dql/join/hash_join.rs +++ b/src/execution/dql/join/hash_join.rs @@ -199,13 +199,13 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin { JoinType::LeftAnti => continue, _ => (), } - for (i, Tuple { values, .. }) in tuples.iter().enumerate() { + for (i, Tuple { values, pk }) in tuples.iter().enumerate() { let full_values = values .iter() .chain(tuple.values.iter()) .cloned() .collect_vec(); - let tuple = Tuple::new(None, full_values); + let tuple = Tuple::new(pk.clone(), full_values); if let Some(tuple) = throw!(Self::filter( tuple, &full_schema_ref, @@ -235,7 +235,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin { .map(|_| NULL_VALUE.clone()) .chain(tuple.values) .collect_vec(); - let tuple = Tuple::new(None, values); + let tuple = Tuple::new(tuple.pk, values); if let Some(tuple) = throw!(Self::filter( tuple, &full_schema_ref, diff --git a/src/execution/dql/join/nested_loop_join.rs b/src/execution/dql/join/nested_loop_join.rs index b578f9e3..e79dca4a 100644 --- a/src/execution/dql/join/nested_loop_join.rs +++ b/src/execution/dql/join/nested_loop_join.rs @@ -270,7 +270,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin { let mut values = vec![NULL_VALUE.clone(); right_schema_len]; values.append(&mut right_tuple.values); - yield Ok(Tuple::new(None, values)) + yield Ok(Tuple::new(right_tuple.pk, values)) } idx += 1; } @@ -328,16 +328,20 @@ impl NestedLoopJoin { return None; } - Some(Tuple::new(None, values)) + Some(Tuple::new( + left_tuple.pk.as_ref().or(right_tuple.pk.as_ref()).cloned(), + values, + )) } /// Merge the two tuples. /// `left_tuple` must be from the `NestedLoopJoin.left_input` /// `right_tuple` must be from the `NestedLoopJoin.right_input` fn merge_tuple(left_tuple: &Tuple, right_tuple: &Tuple, ty: &JoinType) -> Tuple { + let pk = left_tuple.pk.as_ref().or(right_tuple.pk.as_ref()).cloned(); match ty { JoinType::RightOuter => Tuple::new( - None, + pk, right_tuple .values .iter() @@ -346,7 +350,7 @@ impl NestedLoopJoin { .collect_vec(), ), _ => Tuple::new( - None, + pk, left_tuple .values .iter() diff --git a/src/execution/dql/projection.rs b/src/execution/dql/projection.rs index 3584912f..4f11e60e 100644 --- a/src/execution/dql/projection.rs +++ b/src/execution/dql/projection.rs @@ -38,11 +38,8 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Projection { while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) { let tuple = throw!(tuple); - - yield Ok(Tuple::new( - None, - throw!(Self::projection(&tuple, &exprs, &schema)), - )); + let values = throw!(Self::projection(&tuple, &exprs, &schema)); + yield Ok(Tuple::new(tuple.pk, values)); } }, ) @@ -53,12 +50,12 @@ impl Projection { pub fn projection( tuple: &Tuple, exprs: &[ScalarExpression], - schmea: &[ColumnRef], + schema: &[ColumnRef], ) -> Result, DatabaseError> { let mut values = Vec::with_capacity(exprs.len()); for expr in exprs.iter() { - values.push(expr.eval(Some((tuple, schmea)))?); + values.push(expr.eval(Some((tuple, schema)))?); } Ok(values) } diff --git a/src/execution/dql/seq_scan.rs b/src/execution/dql/seq_scan.rs index dc56e89f..cf675e4f 100644 --- a/src/execution/dql/seq_scan.rs +++ b/src/execution/dql/seq_scan.rs @@ -26,6 +26,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SeqScan { table_name, columns, limit, + with_pk, .. } = self.op; @@ -33,7 +34,8 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SeqScan { table_cache, table_name, limit, - columns + columns, + with_pk )); while let Some(tuple) = throw!(iter.next_tuple()) { diff --git a/src/expression/mod.rs b/src/expression/mod.rs index 89875aeb..cd1a4273 100644 --- a/src/expression/mod.rs +++ b/src/expression/mod.rs @@ -1501,7 +1501,7 @@ mod test { distinct: true, kind: AggKind::Avg, args: vec![ScalarExpression::Empty], - ty: LogicalType::Integer, + ty: LogicalType::Double, }, Some((&transaction, &table_cache)), &mut reference_tables, diff --git a/src/optimizer/rule/normalization/column_pruning.rs b/src/optimizer/rule/normalization/column_pruning.rs index b8340897..95665723 100644 --- a/src/optimizer/rule/normalization/column_pruning.rs +++ b/src/optimizer/rule/normalization/column_pruning.rs @@ -99,7 +99,7 @@ impl ColumnPruning { Operator::TableScan(op) => { if !all_referenced { op.columns - .retain(|(_, column)| column_references.contains(column.summary())); + .retain(|_, column| column_references.contains(column.summary())); } } Operator::Sort(_) diff --git a/src/planner/mod.rs b/src/planner/mod.rs index 39d67a83..1220af74 100644 --- a/src/planner/mod.rs +++ b/src/planner/mod.rs @@ -159,12 +159,9 @@ impl LogicalPlan { .map(|expr| expr.output_column()) .collect_vec(), ), - Operator::TableScan(op) => SchemaOutput::Schema( - op.columns - .iter() - .map(|(_, column)| column.clone()) - .collect_vec(), - ), + Operator::TableScan(op) => { + SchemaOutput::Schema(op.columns.values().cloned().collect_vec()) + } Operator::FunctionScan(op) => { SchemaOutput::SchemaRef(op.table_function.output_schema().clone()) } diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs index d51cf047..620c7b31 100644 --- a/src/planner/operator/mod.rs +++ b/src/planner/operator/mod.rs @@ -135,9 +135,8 @@ impl Operator { Operator::Project(op) => Some(op.exprs.clone()), Operator::TableScan(op) => Some( op.columns - .iter() - .cloned() - .map(|(_, column)| ScalarExpression::ColumnRef(column)) + .values() + .map(|column| ScalarExpression::ColumnRef(column.clone())) .collect_vec(), ), Operator::Sort(_) | Operator::Limit(_) => None, @@ -210,12 +209,7 @@ impl Operator { .iter() .flat_map(|expr| expr.referenced_columns(only_column_ref)) .collect_vec(), - Operator::TableScan(op) => op - .columns - .iter() - .map(|(_, column)| column) - .cloned() - .collect_vec(), + Operator::TableScan(op) => op.columns.values().cloned().collect_vec(), Operator::FunctionScan(op) => op .table_function .args diff --git a/src/planner/operator/table_scan.rs b/src/planner/operator/table_scan.rs index 7ab1c131..fe94aafa 100644 --- a/src/planner/operator/table_scan.rs +++ b/src/planner/operator/table_scan.rs @@ -6,6 +6,7 @@ use crate::types::index::IndexInfo; use crate::types::ColumnId; use itertools::Itertools; use kite_sql_serde_macros::ReferenceSerialization; +use std::collections::BTreeMap; use std::fmt; use std::fmt::Formatter; @@ -13,17 +14,22 @@ use std::fmt::Formatter; pub struct TableScanOperator { pub(crate) table_name: TableName, pub(crate) primary_keys: Vec, - pub(crate) columns: Vec<(usize, ColumnRef)>, + pub(crate) columns: BTreeMap, // Support push down limit. pub(crate) limit: Bounds, // Support push down predicate. // If pre_where is simple predicate, for example: a > 1 then can calculate directly when read data. pub(crate) index_infos: Vec, + pub(crate) with_pk: bool, } impl TableScanOperator { - pub fn build(table_name: TableName, table_catalog: &TableCatalog) -> LogicalPlan { + pub fn build( + table_name: TableName, + table_catalog: &TableCatalog, + with_pk: bool, + ) -> LogicalPlan { let primary_keys = table_catalog .primary_keys() .iter() @@ -34,7 +40,7 @@ impl TableScanOperator { .columns() .enumerate() .map(|(i, column)| (i, column.clone())) - .collect_vec(); + .collect(); let index_infos = table_catalog .indexes .iter() @@ -51,6 +57,7 @@ impl TableScanOperator { primary_keys, columns, limit: (None, None), + with_pk, }), Childrens::None, ) @@ -61,8 +68,8 @@ impl fmt::Display for TableScanOperator { fn fmt(&self, f: &mut Formatter) -> fmt::Result { let projection_columns = self .columns - .iter() - .map(|(_, column)| column.name().to_string()) + .values() + .map(|column| column.name().to_string()) .join(", "); let (offset, limit) = self.limit; diff --git a/src/serdes/btree_map.rs b/src/serdes/btree_map.rs new file mode 100644 index 00000000..6e0678e2 --- /dev/null +++ b/src/serdes/btree_map.rs @@ -0,0 +1,40 @@ +use crate::errors::DatabaseError; +use crate::serdes::{ReferenceSerialization, ReferenceTables}; +use crate::storage::{TableCache, Transaction}; +use std::collections::BTreeMap; +use std::io::{Read, Write}; + +impl ReferenceSerialization for BTreeMap +where + K: ReferenceSerialization + Ord, + V: ReferenceSerialization, +{ + fn encode( + &self, + writer: &mut W, + is_direct: bool, + reference_tables: &mut ReferenceTables, + ) -> Result<(), DatabaseError> { + self.len().encode(writer, is_direct, reference_tables)?; + for (key, value) in self.iter() { + key.encode(writer, is_direct, reference_tables)?; + value.encode(writer, is_direct, reference_tables)?; + } + Ok(()) + } + + fn decode( + reader: &mut R, + drive: Option<(&T, &TableCache)>, + reference_tables: &ReferenceTables, + ) -> Result { + let len = ::decode(reader, drive, reference_tables)?; + let mut btree_map = BTreeMap::new(); + for _ in 0..len { + let key = K::decode(reader, drive, reference_tables)?; + let value = V::decode(reader, drive, reference_tables)?; + btree_map.insert(key, value); + } + Ok(btree_map) + } +} diff --git a/src/serdes/mod.rs b/src/serdes/mod.rs index 48ec0c9c..34d1c041 100644 --- a/src/serdes/mod.rs +++ b/src/serdes/mod.rs @@ -1,5 +1,6 @@ mod boolean; mod bound; +mod btree_map; mod char; mod char_length_units; mod column; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 838fc1e2..5b48466a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -2,9 +2,7 @@ pub mod rocksdb; pub(crate) mod table_codec; use crate::catalog::view::View; -use crate::catalog::{ - ColumnCatalog, ColumnRef, PrimaryKeyIndices, TableCatalog, TableMeta, TableName, -}; +use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableMeta, TableName}; use crate::errors::DatabaseError; use crate::expression::range_detacher::Range; use crate::optimizer::core::statistics_meta::{StatisticMetaLoader, StatisticsMeta}; @@ -16,8 +14,8 @@ use crate::types::value::DataValue; use crate::types::{ColumnId, LogicalType}; use crate::utils::lru::SharedLruCache; use itertools::Itertools; +use std::collections::{BTreeMap, Bound}; use sqlparser::keywords::NULL; -use std::collections::Bound; use std::io::Cursor; use std::mem; use std::ops::SubAssign; @@ -55,19 +53,18 @@ pub trait Transaction: Sized { table_cache: &'a TableCache, table_name: TableName, bounds: Bounds, - mut columns: Vec<(usize, ColumnRef)>, + mut columns: BTreeMap, + with_pk: bool, ) -> Result, DatabaseError> { - debug_assert!(columns.is_sorted_by_key(|(i, _)| i)); - debug_assert!(columns.iter().map(|(i, _)| i).all_unique()); + debug_assert!(columns.keys().all_unique()); let table = self .table(table_cache, table_name.clone())? .ok_or(DatabaseError::TableNotFound)?; - let pk_indices = table.primary_keys_indices(); let table_types = table.types(); - if columns.is_empty() { + if columns.is_empty() || with_pk { for (i, column) in table.primary_keys() { - columns.push((*i, column.clone())); + columns.insert(*i, column.clone()); } } let mut tuple_columns = Vec::with_capacity(columns.len()); @@ -76,6 +73,7 @@ pub trait Transaction: Sized { tuple_columns.push(column); projections.push(projection); } + let remap_pk_indices = remap_pk_indices(&projections, table.primary_keys_indices()); let (min, max) = unsafe { &*self.table_codec() }.tuple_bound(&table_name); let iter = self.range(Bound::Included(min), Bound::Included(max))?; @@ -85,50 +83,58 @@ pub trait Transaction: Sized { limit: bounds.1, table_types, tuple_columns: Arc::new(tuple_columns), - pk_indices, + remap_pk_indices, projections, + with_pk, iter, }) } + #[allow(clippy::too_many_arguments)] fn read_by_index<'a>( &'a self, table_cache: &'a TableCache, table_name: TableName, (offset_option, limit_option): Bounds, - columns: Vec<(usize, ColumnRef)>, + mut columns: BTreeMap, index_meta: IndexMetaRef, ranges: Vec, + with_pk: bool, ) -> Result, DatabaseError> { - debug_assert!(columns.is_sorted_by_key(|(i, _)| i)); - debug_assert!(columns.iter().map(|(i, _)| i).all_unique()); + debug_assert!(columns.keys().all_unique()); let table = self .table(table_cache, table_name.clone())? .ok_or(DatabaseError::TableNotFound)?; - let pk_indices = table.primary_keys_indices(); let table_types = table.types(); let table_name = table.name.as_str(); let offset = offset_option.unwrap_or(0); + if columns.is_empty() || with_pk { + for (i, column) in table.primary_keys() { + columns.insert(*i, column.clone()); + } + } let mut tuple_columns = Vec::with_capacity(columns.len()); let mut projections = Vec::with_capacity(columns.len()); for (projection, column) in columns { tuple_columns.push(column); projections.push(projection); } + let remap_pk_indices = remap_pk_indices(&projections, table.primary_keys_indices()); let inner = IndexImplEnum::instance(index_meta.ty); Ok(IndexIter { offset, limit: limit_option, - pk_indices, + remap_pk_indices, params: IndexImplParams { tuple_schema_ref: Arc::new(tuple_columns), projections, index_meta, table_name, table_types, + with_pk, tx: self, }, inner, @@ -676,14 +682,14 @@ trait IndexImpl<'bytes, T: Transaction + 'bytes> { fn index_lookup( &self, bytes: &Bytes, - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], params: &IndexImplParams, ) -> Result; fn eq_to_res<'a>( &self, value: &DataValue, - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError>; @@ -725,6 +731,7 @@ struct IndexImplParams<'a, T: Transaction> { index_meta: IndexMetaRef, table_name: &'a str, table_types: Vec, + with_pk: bool, tx: &'a T, } @@ -750,7 +757,7 @@ impl IndexImplParams<'_, T> { fn get_tuple_by_id( &self, - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], tuple_id: &TupleId, ) -> Result, DatabaseError> { let key = unsafe { &*self.table_codec() }.encode_tuple_key(self.table_name, tuple_id)?; @@ -764,6 +771,7 @@ impl IndexImplParams<'_, T> { &self.projections, &self.tuple_schema_ref, &bytes, + self.with_pk, ) }) .transpose() @@ -779,7 +787,7 @@ impl<'bytes, T: Transaction + 'bytes> IndexImpl<'bytes, T> for IndexImplEnum { fn index_lookup( &self, bytes: &Bytes, - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], params: &IndexImplParams, ) -> Result { match self { @@ -793,7 +801,7 @@ impl<'bytes, T: Transaction + 'bytes> IndexImpl<'bytes, T> for IndexImplEnum { fn eq_to_res<'a>( &self, value: &DataValue, - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { match self { @@ -823,7 +831,7 @@ impl<'bytes, T: Transaction + 'bytes> IndexImpl<'bytes, T> for PrimaryKeyIndexIm fn index_lookup( &self, bytes: &Bytes, - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], params: &IndexImplParams, ) -> Result { TableCodec::decode_tuple( @@ -832,13 +840,14 @@ impl<'bytes, T: Transaction + 'bytes> IndexImpl<'bytes, T> for PrimaryKeyIndexIm ¶ms.projections, ¶ms.tuple_schema_ref, bytes, + params.with_pk, ) } fn eq_to_res<'a>( &self, value: &DataValue, - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { let tuple = params @@ -851,6 +860,7 @@ impl<'bytes, T: Transaction + 'bytes> IndexImpl<'bytes, T> for PrimaryKeyIndexIm ¶ms.projections, ¶ms.tuple_schema_ref, &bytes, + params.with_pk, ) }) .transpose()?; @@ -869,7 +879,7 @@ impl<'bytes, T: Transaction + 'bytes> IndexImpl<'bytes, T> for PrimaryKeyIndexIm fn secondary_index_lookup( bytes: &Bytes, - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], params: &IndexImplParams, ) -> Result { let tuple_id = TableCodec::decode_index(bytes)?; @@ -882,7 +892,7 @@ impl<'bytes, T: Transaction + 'bytes> IndexImpl<'bytes, T> for UniqueIndexImpl { fn index_lookup( &self, bytes: &Bytes, - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], params: &IndexImplParams, ) -> Result { secondary_index_lookup(bytes, pk_indices, params) @@ -891,7 +901,7 @@ impl<'bytes, T: Transaction + 'bytes> IndexImpl<'bytes, T> for UniqueIndexImpl { fn eq_to_res<'a>( &self, value: &DataValue, - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { let Some(bytes) = params.tx.get(&self.bound_key(params, value, false)?)? else { @@ -920,7 +930,7 @@ impl<'bytes, T: Transaction + 'bytes> IndexImpl<'bytes, T> for NormalIndexImpl { fn index_lookup( &self, bytes: &Bytes, - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], params: &IndexImplParams, ) -> Result { secondary_index_lookup(bytes, pk_indices, params) @@ -929,7 +939,7 @@ impl<'bytes, T: Transaction + 'bytes> IndexImpl<'bytes, T> for NormalIndexImpl { fn eq_to_res<'a>( &self, value: &DataValue, - _: &PrimaryKeyIndices, + _: &[usize], params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { let min = self.bound_key(params, value, false)?; @@ -961,7 +971,7 @@ impl<'bytes, T: Transaction + 'bytes> IndexImpl<'bytes, T> for CompositeIndexImp fn index_lookup( &self, bytes: &Bytes, - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], params: &IndexImplParams, ) -> Result { secondary_index_lookup(bytes, pk_indices, params) @@ -970,7 +980,7 @@ impl<'bytes, T: Transaction + 'bytes> IndexImpl<'bytes, T> for CompositeIndexImp fn eq_to_res<'a>( &self, value: &DataValue, - _: &PrimaryKeyIndices, + _: &[usize], params: &IndexImplParams<'a, T>, ) -> Result, DatabaseError> { let min = self.bound_key(params, value, false)?; @@ -1003,8 +1013,9 @@ pub struct TupleIter<'a, T: Transaction + 'a> { limit: Option, table_types: Vec, tuple_columns: Arc>, - pk_indices: &'a PrimaryKeyIndices, + remap_pk_indices: Vec, projections: Vec, + with_pk: bool, iter: T::IterType<'a>, } @@ -1027,10 +1038,11 @@ impl<'a, T: Transaction + 'a> Iter for TupleIter<'a, T> { } let tuple = TableCodec::decode_tuple( &self.table_types, - self.pk_indices, + &self.remap_pk_indices, &self.projections, &self.tuple_columns, &value, + self.with_pk, )?; return Ok(Some(tuple)); @@ -1044,7 +1056,7 @@ pub struct IndexIter<'a, T: Transaction> { offset: usize, limit: Option, - pk_indices: &'a PrimaryKeyIndices, + remap_pk_indices: Vec, params: IndexImplParams<'a, T>, inner: IndexImplEnum, // for buffering data @@ -1099,7 +1111,7 @@ impl Iter for IndexIter<'_, T> { }; match binary { Range::Scope { min, max } => { - let table_name = self.params.table_name; + let table_name = &self.params.table_name; let index_meta = &self.params.index_meta; let bound_encode = |bound: Bound, @@ -1146,7 +1158,11 @@ impl Iter for IndexIter<'_, T> { Range::Eq(mut val) => { val = self.params.try_cast(val)?; - match self.inner.eq_to_res(&val, self.pk_indices, &self.params)? { + match self.inner.eq_to_res( + &val, + &self.remap_pk_indices, + &self.params, + )? { IndexResult::Tuple(tuple) => { if Self::offset_move(&mut self.offset) { continue; @@ -1168,9 +1184,11 @@ impl Iter for IndexIter<'_, T> { continue; } Self::limit_sub(&mut self.limit); - let tuple = - self.inner - .index_lookup(&bytes, self.pk_indices, &self.params)?; + let tuple = self.inner.index_lookup( + &bytes, + &self.remap_pk_indices, + &self.params, + )?; return Ok(Some(tuple)); } @@ -1190,6 +1208,22 @@ pub trait Iter { fn next_tuple(&mut self) -> Result, DatabaseError>; } +fn remap_pk_indices(projection: &[usize], pk_indices: &[usize]) -> Vec { + let mut result = Vec::with_capacity(pk_indices.len()); + let mut proj_idx = 0; + let mut pk_idx = 0; + + while pk_idx < pk_indices.len() && proj_idx < projection.len() { + if projection[proj_idx] == pk_indices[pk_idx] { + result.push(proj_idx); + pk_idx += 1; + } else { + proj_idx += 1; + } + } + result +} + #[cfg(test)] mod test { use crate::binder::test::build_t1_table; @@ -1208,43 +1242,44 @@ mod test { use crate::types::value::DataValue; use crate::types::{ColumnId, LogicalType}; use crate::utils::lru::SharedLruCache; - use std::collections::Bound; + use std::collections::{BTreeMap, Bound}; use std::hash::RandomState; use std::sync::Arc; use tempfile::TempDir; - fn full_columns() -> Vec<(usize, ColumnRef)> { - vec![ - ( - 0, - ColumnRef::from(ColumnCatalog::new( - "c1".to_string(), - false, - ColumnDesc::new(LogicalType::Integer, Some(0), false, None).unwrap(), - )), - ), - ( - 1, - ColumnRef::from(ColumnCatalog::new( - "c2".to_string(), - false, - ColumnDesc::new(LogicalType::Boolean, None, false, None).unwrap(), - )), - ), - ( - 2, - ColumnRef::from(ColumnCatalog::new( - "c3".to_string(), - false, - ColumnDesc::new(LogicalType::Integer, None, false, None).unwrap(), - )), - ), - ] + fn full_columns() -> BTreeMap { + let mut columns = BTreeMap::new(); + + columns.insert( + 0, + ColumnRef::from(ColumnCatalog::new( + "c1".to_string(), + false, + ColumnDesc::new(LogicalType::Integer, Some(0), false, None).unwrap(), + )), + ); + columns.insert( + 1, + ColumnRef::from(ColumnCatalog::new( + "c2".to_string(), + false, + ColumnDesc::new(LogicalType::Boolean, None, false, None).unwrap(), + )), + ); + columns.insert( + 2, + ColumnRef::from(ColumnCatalog::new( + "c3".to_string(), + false, + ColumnDesc::new(LogicalType::Integer, None, false, None).unwrap(), + )), + ); + columns } fn build_tuples() -> Vec { vec![ Tuple::new( - Some(Arc::new(vec![0])), + Some(DataValue::Int32(0)), vec![ DataValue::Int32(0), DataValue::Boolean(true), @@ -1252,7 +1287,7 @@ mod test { ], ), Tuple::new( - Some(Arc::new(vec![0])), + Some(DataValue::Int32(1)), vec![ DataValue::Int32(1), DataValue::Boolean(true), @@ -1260,7 +1295,7 @@ mod test { ], ), Tuple::new( - Some(Arc::new(vec![0])), + Some(DataValue::Int32(2)), vec![ DataValue::Int32(2), DataValue::Boolean(false), @@ -1401,6 +1436,7 @@ mod test { Arc::new("t1".to_string()), (None, None), full_columns(), + true, )?; assert_eq!(tuple_iter.next_tuple()?.unwrap(), tuples[0]); @@ -1426,6 +1462,7 @@ mod test { Arc::new("t1".to_string()), (None, None), full_columns(), + true, )?; assert_eq!(tuple_iter.next_tuple()?.unwrap(), tuples[0]); @@ -1550,6 +1587,7 @@ mod test { min: Bound::Unbounded, max: Bound::Unbounded, }], + true, ) } diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index 21abdbcf..d3ac6e6b 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -169,7 +169,7 @@ mod test { use crate::types::LogicalType; use crate::utils::lru::SharedLruCache; use itertools::Itertools; - use std::collections::Bound; + use std::collections::{BTreeMap, Bound}; use std::hash::RandomState; use std::sync::Arc; use tempfile::TempDir; @@ -214,7 +214,7 @@ mod test { transaction.append_tuple( &"test".to_string(), Tuple::new( - Some(Arc::new(vec![0])), + Some(DataValue::Int32(1)), vec![DataValue::Int32(1), DataValue::Boolean(true)], ), &[LogicalType::Integer, LogicalType::Boolean], @@ -223,22 +223,26 @@ mod test { transaction.append_tuple( &"test".to_string(), Tuple::new( - Some(Arc::new(vec![0])), + Some(DataValue::Int32(2)), vec![DataValue::Int32(2), DataValue::Boolean(true)], ), &[LogicalType::Integer, LogicalType::Boolean], false, )?; + let mut read_columns = BTreeMap::new(); + read_columns.insert(0, columns[0].clone()); + let mut iter = transaction.read( &table_cache, Arc::new("test".to_string()), (Some(1), Some(1)), - vec![(0, columns[0].clone())], + read_columns, + true, )?; let option_1 = iter.next_tuple()?; - assert_eq!(option_1.unwrap().pk_indices, Some(Arc::new(vec![0]))); + assert_eq!(option_1.unwrap().pk, Some(DataValue::Int32(2))); let option_2 = iter.next_tuple()?; assert_eq!(option_2, None); @@ -271,11 +275,10 @@ mod test { DataValue::Int32(3), DataValue::Int32(4), ]; - let pk_indices = Arc::new(vec![0]); let mut iter = IndexIter { offset: 0, limit: None, - pk_indices: &pk_indices, + remap_pk_indices: vec![0], params: IndexImplParams { tuple_schema_ref: table.schema_ref().clone(), projections: vec![0], @@ -290,6 +293,7 @@ mod test { }), table_name: &table.name, table_types: table.types(), + with_pk: true, tx: &transaction, }, ranges: vec![ @@ -305,8 +309,8 @@ mod test { }; let mut result = Vec::new(); - while let Some(mut tuple) = iter.next_tuple()? { - result.push(tuple.id().unwrap().clone()); + while let Some(tuple) = iter.next_tuple()? { + result.push(tuple.pk.unwrap()); } assert_eq!(result, tuple_ids); @@ -330,7 +334,7 @@ mod test { .table(kite_sql.state.table_cache(), Arc::new("t1".to_string()))? .unwrap() .clone(); - let columns = table.columns().cloned().enumerate().collect_vec(); + let columns = table.columns().cloned().enumerate().collect(); let mut iter = transaction .read_by_index( kite_sql.state.table_cache(), @@ -342,11 +346,12 @@ mod test { min: Bound::Excluded(DataValue::Int32(0)), max: Bound::Unbounded, }], + true, ) .unwrap(); while let Some(tuple) = iter.next_tuple()? { - assert_eq!(tuple.pk_indices, Some(Arc::new(vec![0]))); + assert_eq!(tuple.pk, Some(DataValue::Int32(1))); assert_eq!(tuple.values, vec![DataValue::Int32(1), DataValue::Int32(1)]) } diff --git a/src/storage/table_codec.rs b/src/storage/table_codec.rs index 7d2e5911..8df9e511 100644 --- a/src/storage/table_codec.rs +++ b/src/storage/table_codec.rs @@ -1,5 +1,5 @@ use crate::catalog::view::View; -use crate::catalog::{ColumnRef, ColumnRelation, PrimaryKeyIndices, TableMeta}; +use crate::catalog::{ColumnRef, ColumnRelation, TableMeta}; use crate::errors::DatabaseError; use crate::serdes::{ReferenceSerialization, ReferenceTables}; use crate::storage::{TableCache, Transaction}; @@ -260,7 +260,7 @@ impl TableCodec { tuple: &mut Tuple, types: &[LogicalType], ) -> Result<(BumpBytes, BumpBytes), DatabaseError> { - let tuple_id = tuple.id().ok_or(DatabaseError::PrimaryKeyNotFound)?; + let tuple_id = tuple.pk.as_ref().ok_or(DatabaseError::PrimaryKeyNotFound)?; let key = self.encode_tuple_key(table_name, tuple_id)?; Ok((key, tuple.serialize_to(types, &self.arena)?)) @@ -284,12 +284,13 @@ impl TableCodec { #[inline] pub fn decode_tuple( table_types: &[LogicalType], - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], projections: &[usize], schema: &Schema, bytes: &[u8], + with_pk: bool, ) -> Result { - Tuple::deserialize_from(table_types, pk_indices, projections, schema, bytes) + Tuple::deserialize_from(table_types, pk_indices, projections, schema, bytes, with_pk) } /// Key: {TableName}{INDEX_META_TAG}{BOUND_MIN_TAG}{IndexID} @@ -568,7 +569,7 @@ mod tests { let table_catalog = build_table_codec(); let mut tuple = Tuple::new( - Some(Arc::new(vec![0])), + Some(DataValue::Int32(0)), vec![DataValue::Int32(0), DataValue::Decimal(Decimal::new(1, 0))], ); let (_, bytes) = table_codec.encode_tuple( @@ -579,9 +580,16 @@ mod tests { let schema = table_catalog.schema_ref(); let pk_indices = table_catalog.primary_keys_indices(); - tuple.clear_id(); + tuple.pk = None; assert_eq!( - TableCodec::decode_tuple(&table_catalog.types(), pk_indices, &[0, 1], schema, &bytes)?, + TableCodec::decode_tuple( + &table_catalog.types(), + pk_indices, + &[0, 1], + schema, + &bytes, + false + )?, tuple ); diff --git a/src/types/tuple.rs b/src/types/tuple.rs index 364247fb..c01ac309 100644 --- a/src/types/tuple.rs +++ b/src/types/tuple.rs @@ -1,4 +1,4 @@ -use crate::catalog::{ColumnRef, PrimaryKeyIndices}; +use crate::catalog::ColumnRef; use crate::db::ResultIter; use crate::errors::DatabaseError; use crate::storage::table_codec::BumpBytes; @@ -25,48 +25,26 @@ pub fn types(schema: &Schema) -> Vec { #[derive(Clone, Debug, PartialEq)] pub struct Tuple { - pub(crate) pk_indices: Option, + pub pk: Option, pub values: Vec, - id_buf: Option>, } impl Tuple { - pub fn new(pk_indices: Option, values: Vec) -> Self { - Tuple { - pk_indices, - values, - id_buf: None, - } - } - - pub fn id(&mut self) -> Option<&TupleId> { - self.id_buf - .get_or_insert_with(|| { - self.pk_indices.as_ref().map(|pk_indices| { - if pk_indices.len() == 1 { - self.values[0].clone() - } else { - let mut values = Vec::with_capacity(pk_indices.len()); - - for i in pk_indices.iter() { - values.push(self.values[*i].clone()); - } - DataValue::Tuple(values, false) - } - }) - }) - .as_ref() + pub fn new(pk: Option, values: Vec) -> Self { + Tuple { pk, values } } #[inline] pub fn deserialize_from( table_types: &[LogicalType], - pk_indices: &PrimaryKeyIndices, + pk_indices: &[usize], projections: &[usize], schema: &Schema, bytes: &[u8], + with_pk: bool, ) -> Result { debug_assert!(!schema.is_empty()); + debug_assert!(projections.is_sorted()); debug_assert_eq!(projections.len(), schema.len()); fn is_none(bits: u8, i: usize) -> bool { @@ -96,10 +74,10 @@ impl Tuple { projection_i += 1; } } + Ok(Tuple { - pk_indices: Some(pk_indices.clone()), + pk: with_pk.then(|| Tuple::primary_projection(pk_indices, &values)), values, - id_buf: None, }) } @@ -135,8 +113,15 @@ impl Tuple { Ok(bytes) } - pub(crate) fn clear_id(&mut self) { - self.id_buf = None; + pub fn primary_projection(pk_indices: &[usize], values: &[DataValue]) -> TupleId { + if pk_indices.len() > 1 { + DataValue::Tuple( + pk_indices.iter().map(|i| values[*i].clone()).collect_vec(), + false, + ) + } else { + values[pk_indices[0]].clone() + } } } @@ -290,7 +275,7 @@ mod tests { let tuples = vec![ Tuple::new( - Some(Arc::new(vec![0])), + Some(DataValue::Int32(0)), vec![ DataValue::Int32(0), DataValue::UInt32(1), @@ -327,7 +312,7 @@ mod tests { ], ), Tuple::new( - Some(Arc::new(vec![0])), + Some(DataValue::Int32(1)), vec![ DataValue::Int32(1), DataValue::Null, @@ -361,6 +346,7 @@ mod tests { &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], &columns, &tuples[0].serialize_to(&types, &arena).unwrap(), + true, ) .unwrap(); @@ -373,6 +359,7 @@ mod tests { &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], &columns, &tuples[1].serialize_to(&types, &arena).unwrap(), + true, ) .unwrap(); diff --git a/src/types/tuple_builder.rs b/src/types/tuple_builder.rs index da6acbec..0b196c25 100644 --- a/src/types/tuple_builder.rs +++ b/src/types/tuple_builder.rs @@ -44,6 +44,10 @@ impl<'a> TupleBuilder<'a> { return Err(DatabaseError::MisMatch("types", "values")); } - Ok(Tuple::new(self.pk_indices.cloned(), values)) + let pk = self + .pk_indices + .map(|indices| Tuple::primary_projection(indices, &values)); + + Ok(Tuple::new(pk, values)) } } diff --git a/tests/slt/basic_test.slt b/tests/slt/basic_test.slt index bdda04a4..b96cb70f 100644 --- a/tests/slt/basic_test.slt +++ b/tests/slt/basic_test.slt @@ -115,4 +115,17 @@ statement ok drop table if exists t statement ok -drop table if exists t \ No newline at end of file +drop table if exists t + +statement ok +create table t2 (c1 int primary key, c2 int, c3 int primary key) + +statement ok +insert into t2 values(0, 0, 0), (1, 1, 1), (2, 2, 2); + +query III +select c1 from t2; +---- +0 +1 +2 diff --git a/tests/slt/crdb/update.slt b/tests/slt/crdb/update.slt new file mode 100644 index 00000000..e87066a7 --- /dev/null +++ b/tests/slt/crdb/update.slt @@ -0,0 +1,324 @@ +statement ok +drop table if exists t1; + +statement ok +drop table if exists t2; + +statement ok +create table t1(id INT PRIMARY KEY, a INT); + +statement ok +create table t2(id INT PRIMARY KEY, b INT); + +statement ok +insert into t1 values(0, 1), (1, 2), (2, 3), (3, 8); + +statement ok +insert into t2 values(0, 2), (1, 3); + +statement ok +update t1 set a = a + 1 where a in (select b from t2); + +query I +select * from t1 order by a; +---- +0 1 +1 3 +2 4 +3 8 + +statement ok +truncate table t1; + +statement ok +insert into t1 values(0, 1), (1, 2), (2, 3), (3, 8); + +# TODO: Exists +# statement ok +# update t1 set a = a + 1 where exists (select * from t2 where t1.a = t2.b); + +# query I +# select * from t1 order by a; +# ---- +# 1 +# 3 +# 4 +# 8 + +statement ok +truncate table t1; + +statement ok +insert into t1 values(0, 1), (1, 2), (2, 3), (3, 8); + +# sqlparser-rs not support +# statement ok +# update t1 set a = a + 1 where a < any(select b from t2); + +# query I +# select * from t1 order by a; +# ---- +# 2 +# 3 +# 3 +# 8 + +statement ok +truncate table t1; + +statement ok +insert into t1 values(0, 1), (1, 2), (2, 3), (3, 8); + +# sqlparser-rs not support +# statement ok +# update t1 set a = a + 1 where a = all(select b from t2); + +# query I +# select * from t1 order by a; +# ---- +# 1 +# 2 +# 3 +# 8 + +statement ok +update t1 set a = a + 1 where a in (select b from t2 where a > b); + +query I +select a from t1 order by a; +---- +1 +2 +3 +8 + +# sqlparser-rs not support +# statement ok +# update t1 set a = a + 1 where a = any(select b from t2 where t1.a = t2.b); + +# query I +# select * from t1 order by a; +# ---- +# 1 +# 3 +# 4 +# 8 + +statement ok +truncate table t1; + +statement ok +insert into t1 values(0, 1), (1, 2), (2, 3), (3, 8); + +# TODO: Exists +# statement ok +# update t1 set a = a + 1 where exists(select b from t2 where b > 2); + +# query I +# select * from t1 order by a; +# ---- +# 2 +# 3 +# 4 +# 9 + +statement ok +truncate table t1; + +statement ok +insert into t1 values(0, 1), (1, 2), (2, 3), (3, 8); + +# TODO: Exists +# statement ok +# update t1 set a = a + 1 where not exists(select b from t2 where b > 2); + +# query I +# select * from t1 order by a; +# ---- +# 1 +# 2 +# 3 +# 8 + +statement ok +truncate table t1; + +statement ok +insert into t1 values(1), (2), (3), (8); + +# sqlparser-rs not support +# statement ok +# update t1 set a = a + 1 where a = any(select b from t2 where t1.a = t2.b) or a != any(select b from t2 where t1.a = t2.b); + +# query I +# select * from t1 order by a; +# ---- +# 1 +# 3 +# 4 +# 8 + +statement ok +truncate table t1; + +statement ok +insert into t1 values(0, 1), (1, 2), (2, 3), (3, 8); + +# sqlparser-rs not support +# statement ok +# update t1 set a = a + 1 where a = any(select b from t2 where t1.a = t2.b) or a > 1; + +# query I +# select * from t1 order by a; +# ---- +# 1 +# 3 +# 4 +# 9 + +statement ok +truncate table t1; + +statement ok +insert into t1 values(1), (2), (3), (8); + +# sqlparser-rs not support +# statement ok +# update t1 set a = a + 1 where a = any(select b from t2 where t1.a = t2.b) or a < any(select b from t2); + +# query I +# select * from t1 order by a; +# ---- +# 2 +# 3 +# 4 +# 8 + +statement ok +truncate table t1; + +statement ok +insert into t1 values(1), (2), (3), (8); + +# sqlparser-rs not support +# statement ok +# update t1 set a = a + 1 where exists(select b from t2 where a = b); + +# query I +# select * from t1 order by a; +# ---- +# 1 +# 3 +# 4 +# 8 + +statement ok +truncate table t1; + +statement ok +insert into t1 values(0, 1), (1, 2), (2, 3), (3, 8); + +statement ok +update t1 set a = a + 1 where 200 > (select avg(a) from t1); + +query I +select * from t1 order by a; +---- +0 2 +1 3 +2 4 +3 9 + +statement ok +drop table t1; + +statement ok +drop table t2; + +# https://github.com/datafuselabs/databend/issues/15791 + +statement ok +CREATE TABLE t1 (id INT PRIMARY KEY, name VARCHAR(255) ); + +statement ok +INSERT INTO t1 (id, name) VALUES (1, 'Row 1'), (2, 'Row 2'), (3, 'Row 3'), (4, 'Row 4'), (5, 'Row 5'), (6, 'Row 6'), (7, 'Row 7'), (8, 'Row 8'), (9, 'Row 9'), (10, 'Row 10'); + +statement ok +CREATE TABLE t2 (id INT PRIMARY KEY, name VARCHAR(255) ); + +statement ok +INSERT INTO t2 (id, name) VALUES (1, 'Row 11'), (2, 'Row 12'), (3, 'Row 13'), (4, 'Row 14'), (5, 'Row 15'), (6, 'Row 16'), (7, 'Row 17'), (8, 'Row 18'), (9, 'Row 19'), (10, 'Row 20'); + +statement ok +UPDATE t1 SET name = 'test' WHERE name IS not NULL AND id in (select id from t2); + +query IT +select * from t1 order by id; +---- +1 test +2 test +3 test +4 test +5 test +6 test +7 test +8 test +9 test +10 test + +statement ok +UPDATE t1 SET name = 'test' WHERE id in (select id from t2); + +query IT +select * from t1 order by id; +---- +1 test +2 test +3 test +4 test +5 test +6 test +7 test +8 test +9 test +10 test + +statement ok +UPDATE t1 SET name = 'test' WHERE name IS not NULL; + +query IT +select * from t1 order by id; +---- +1 test +2 test +3 test +4 test +5 test +6 test +7 test +8 test +9 test +10 test + +statement ok +UPDATE t1 SET name = 'test' WHERE name IS not NULL and id = 1; + +query IT +select * from t1 order by id; +---- +1 test +2 test +3 test +4 test +5 test +6 test +7 test +8 test +9 test +10 test + +statement ok +drop table t1; + +statement ok +drop table t2; From 3d040543dd7bfcaee0cba9453f0844025bc7d4de Mon Sep 17 00:00:00 2001 From: Kould Date: Wed, 16 Jul 2025 01:28:10 +0800 Subject: [PATCH 2/2] chore: codefmt --- src/execution/dql/show_view.rs | 4 ++-- src/storage/mod.rs | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/execution/dql/show_view.rs b/src/execution/dql/show_view.rs index b16de4ce..9f11dba0 100644 --- a/src/execution/dql/show_view.rs +++ b/src/execution/dql/show_view.rs @@ -11,13 +11,13 @@ pub struct ShowViews; impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for ShowViews { fn execute( self, - (TableCache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), + (table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache), transaction: *mut T, ) -> Executor<'a> { Box::new( #[coroutine] move || { - let metas = throw!(unsafe { &mut (*transaction) }.views(TableCache)); + let metas = throw!(unsafe { &mut (*transaction) }.views(table_cache)); for View { name, .. } in metas { let values = vec![DataValue::Utf8 { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 5b48466a..c2d0d532 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -15,7 +15,6 @@ use crate::types::{ColumnId, LogicalType}; use crate::utils::lru::SharedLruCache; use itertools::Itertools; use std::collections::{BTreeMap, Bound}; -use sqlparser::keywords::NULL; use std::io::Cursor; use std::mem; use std::ops::SubAssign;