Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions beacon-functions/src/util/mask_if_not_null.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::sync::Arc;

Copilot AI Sep 30, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The std::sync::Arc import is unused in this file. Consider removing it to keep imports clean.

Suggested change
use std::sync::Arc;

Copilot uses AI. Check for mistakes.

use arrow::datatypes::{DataType, Field, FieldRef};

Copilot AI Sep 30, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Field and FieldRef imports from arrow::datatypes are unused in this file. Consider removing them to keep imports clean.

Suggested change
use arrow::datatypes::{DataType, Field, FieldRef};
use arrow::datatypes::DataType;

Copilot uses AI. Check for mistakes.
use datafusion::{
common::{exec_err, internal_err, ExprSchema},
logical_expr::{
conditional_expressions::CaseBuilder,
simplify::{ExprSimplifyResult, SimplifyInfo},
ColumnarValue, ExprSchemable, ReturnFieldArgs, ScalarUDF, ScalarUDFImpl, Signature,

Copilot AI Sep 30, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ExprSchemable and ReturnFieldArgs imports are unused in this file. Consider removing them to keep imports clean.

Suggested change
ColumnarValue, ExprSchemable, ReturnFieldArgs, ScalarUDF, ScalarUDFImpl, Signature,
ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature,

Copilot uses AI. Check for mistakes.
Volatility,
},
physical_plan::expressions::CaseExpr,
prelude::{is_null, Expr},
};

pub fn mask_if_not_null() -> ScalarUDF {
ScalarUDF::new_from_impl(MaskIfNotNullFunc::new())
}

#[derive(Debug, Clone)]
pub struct MaskIfNotNullFunc {
signature: Signature,
}

impl MaskIfNotNullFunc {
pub fn new() -> Self {
Self {
signature: Signature::any(2, Volatility::Immutable),
}
}
}

impl ScalarUDFImpl for MaskIfNotNullFunc {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn name(&self) -> &str {
"mask_if_not_null"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn is_nullable(&self, args: &[Expr], schema: &dyn ExprSchema) -> bool {
args.iter().any(|e| e.nullable(schema).ok().unwrap_or(true))
}

fn return_type(&self, arg_types: &[DataType]) -> datafusion::error::Result<DataType> {
if arg_types.len() != 2 {
return exec_err!(
"mask_if_not_null requires exactly two arguments, got {}",
arg_types.len()
);
}
Ok(arg_types[1].clone())
}

fn simplify(
&self,
mut args: Vec<Expr>,
info: &dyn SimplifyInfo,
) -> datafusion::error::Result<ExprSimplifyResult> {
if args.len() != 2 {
return exec_err!(
"mask_if_not_null requires exactly two arguments, got {}",
args.len()
);
}
let left = args.remove(0);
let right = args.remove(0);

// If the first argument is known to be non-null, we can simplify to the second argument
if let Ok(false) = info.nullable(&left) {
return Ok(ExprSimplifyResult::Simplified(right));
}

let new_expr = CaseBuilder::new(
None,
vec![left.is_not_null()],
vec![right],
Some(Box::new(Expr::Literal(
datafusion::scalar::ScalarValue::Null,
None,
))),
)
.end()?;

Ok(ExprSimplifyResult::Simplified(new_expr))
}

fn invoke_with_args(
&self,
args: datafusion::logical_expr::ScalarFunctionArgs,
) -> datafusion::error::Result<ColumnarValue> {
internal_err!("invoke_with_args should not be called for mask_if_not_null")
}
}
99 changes: 99 additions & 0 deletions beacon-functions/src/util/mask_if_null.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::sync::Arc;

Copilot AI Sep 30, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The std::sync::Arc import is unused in this file. Consider removing it to keep imports clean.

Suggested change
use std::sync::Arc;

Copilot uses AI. Check for mistakes.

use arrow::datatypes::{DataType, Field, FieldRef};

Copilot AI Sep 30, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Field and FieldRef imports from arrow::datatypes are unused in this file. Consider removing them to keep imports clean.

Suggested change
use arrow::datatypes::{DataType, Field, FieldRef};
use arrow::datatypes::DataType;

Copilot uses AI. Check for mistakes.
use datafusion::{
common::{exec_err, internal_err, ExprSchema},
logical_expr::{
conditional_expressions::CaseBuilder,
simplify::{ExprSimplifyResult, SimplifyInfo},
ColumnarValue, ExprSchemable, ReturnFieldArgs, ScalarUDF, ScalarUDFImpl, Signature,

Copilot AI Sep 30, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ExprSchemable and ReturnFieldArgs imports are unused in this file. Consider removing them to keep imports clean.

Suggested change
ColumnarValue, ExprSchemable, ReturnFieldArgs, ScalarUDF, ScalarUDFImpl, Signature,
ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature,

Copilot uses AI. Check for mistakes.
Volatility,
},
physical_plan::expressions::CaseExpr,
prelude::{is_null, Expr},
};

pub fn mask_if_null() -> ScalarUDF {
ScalarUDF::new_from_impl(MaskIfNullFunc::new())
}

#[derive(Debug, Clone)]
pub struct MaskIfNullFunc {
signature: Signature,
}

impl MaskIfNullFunc {
pub fn new() -> Self {
Self {
signature: Signature::any(2, Volatility::Immutable),
}
}
}

impl ScalarUDFImpl for MaskIfNullFunc {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn name(&self) -> &str {
"mask_if_null"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn is_nullable(&self, args: &[Expr], schema: &dyn ExprSchema) -> bool {
args.iter().any(|e| e.nullable(schema).ok().unwrap_or(true))
}

fn return_type(&self, arg_types: &[DataType]) -> datafusion::error::Result<DataType> {
if arg_types.len() != 2 {
return exec_err!(
"mask_if_null requires exactly two arguments, got {}",
arg_types.len()
);
}
Ok(arg_types[1].clone())
}

fn simplify(
&self,
mut args: Vec<Expr>,
info: &dyn SimplifyInfo,
) -> datafusion::error::Result<ExprSimplifyResult> {
if args.len() != 2 {
return exec_err!(
"mask_if_null requires exactly two arguments, got {}",
args.len()
);
}
let left = args.remove(0);
let right = args.remove(0);

// If the first argument is known to be non-null, we can simplify to the second argument
if let Ok(false) = info.nullable(&left) {
return Ok(ExprSimplifyResult::Simplified(right));
Comment on lines +74 to +76

Copilot AI Sep 30, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The simplification logic is incorrect for mask_if_null. When the first argument is known to be non-null, the function should return null, not the second argument. The correct simplification should be Ok(ExprSimplifyResult::Simplified(Expr::Literal(datafusion::scalar::ScalarValue::Null))).

Suggested change
// If the first argument is known to be non-null, we can simplify to the second argument
if let Ok(false) = info.nullable(&left) {
return Ok(ExprSimplifyResult::Simplified(right));
// If the first argument is known to be non-null, we can simplify to null
if let Ok(false) = info.nullable(&left) {
return Ok(ExprSimplifyResult::Simplified(Expr::Literal(
datafusion::scalar::ScalarValue::Null,
None,
)));

Copilot uses AI. Check for mistakes.
}

let new_expr = CaseBuilder::new(
None,
vec![left.is_null()],
vec![right],
Some(Box::new(Expr::Literal(
datafusion::scalar::ScalarValue::Null,
None,
))),
)
.end()?;

Ok(ExprSimplifyResult::Simplified(new_expr))
}

fn invoke_with_args(
&self,
args: datafusion::logical_expr::ScalarFunctionArgs,
) -> datafusion::error::Result<ColumnarValue> {
internal_err!("invoke_with_args should not be called for mask_if_null")
}
}
4 changes: 4 additions & 0 deletions beacon-functions/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ use datafusion::logical_expr::ScalarUDF;

pub mod cast_int8_as_char;
pub mod coalesce_label;
pub mod mask_if_not_null;
pub mod mask_if_null;
pub mod try_arrow_cast;

pub fn util_udfs() -> Vec<ScalarUDF> {
vec![
cast_int8_as_char::cast_int8_as_char(),
try_arrow_cast::try_arrow_cast(),
coalesce_label::coalesce_label(),
mask_if_null::mask_if_null(),
mask_if_not_null::mask_if_not_null(),
]
}