Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ thiserror = "1.0"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
rand = "0.8"
nalgebra = "0.32"

[dev-dependencies]
approx = "0.5"
Expand Down
18 changes: 15 additions & 3 deletions python/eo_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,25 @@ def random_forest_predict(model_json, features):


def bfast_monitor(
stack, dates, history_start_date, monitor_start_date, level
stack,
dates,
history_start_date,
monitor_start_date,
order=1,
h=0.25,
alpha=0.05,
):
"""
Scaffold for the BFAST Monitor workflow.
BFAST Monitor workflow for change detection.
"""
return _bfast_monitor(
stack, dates, history_start_date, monitor_start_date, level
stack,
dates,
history_start_date,
monitor_start_date,
order,
h,
alpha,
)


Expand Down
11 changes: 11 additions & 0 deletions python/eo_processor/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,15 @@ def binary_closing(
input: NDArray[np.uint8], kernel_size: int = ...
) -> NDArray[np.uint8]: ...

# Workflows
def bfast_monitor(
stack: NumericArray,
dates: Sequence[int],
history_start_date: int,
monitor_start_date: int,
order: int = ...,
h: float = ...,
alpha: float = ...,
) -> NDArray[np.float64]: ...

# Raises ValueError if p < 1.0
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ pub enum CoreError {
InvalidArgument(String),
#[error("Computation error: {0}")]
ComputationError(String),
#[error("Not enough data: {0}")]
NotEnoughData(String),
}

impl From<CoreError> for PyErr {
fn from(err: CoreError) -> PyErr {
match err {
CoreError::InvalidArgument(msg) => PyValueError::new_err(msg),
CoreError::ComputationError(msg) => PyValueError::new_err(msg),
CoreError::NotEnoughData(msg) => PyValueError::new_err(msg),
}
}
}
Expand Down
229 changes: 178 additions & 51 deletions src/workflows.rs
Original file line number Diff line number Diff line change
@@ -1,70 +1,172 @@
use crate::CoreError;
use nalgebra::{DMatrix, DVector};
use ndarray::{Axis, IxDyn, Zip};
use numpy::{IntoPyArray, PyArrayDyn, PyReadonlyArrayDyn};
use pyo3::prelude::*;
use rayon::prelude::*;

const TWO_PI: f64 = 2.0 * std::f64::consts::PI;

// --- 1. BFAST Monitor Workflow ---

// Placeholder struct for model parameters
/// Represents the fitted harmonic model.
struct HarmonicModel {
mean: f64,
coefficients: DVector<f64>,
sigma: f64,
}

// Placeholder for fitting a harmonic model to the stable history period.
// In a real implementation, this would involve solving for harmonic coefficients.
fn fit_harmonic_model(y: &[f64]) -> HarmonicModel {
if y.is_empty() {
return HarmonicModel { mean: 0.0 };
/// Constructs the design matrix for a harmonic model.
///
/// # Arguments
///
/// * `dates` - A slice of fractional years.
/// * `order` - The order of the harmonic model (e.g., 1 for one sine/cosine pair).
///
/// # Returns
///
/// A 2D array representing the design matrix `X`.
fn build_design_matrix(dates: &[f64], order: usize) -> DMatrix<f64> {
let n = dates.len();
let num_coeffs = 2 * order + 2; // intercept, trend, and sin/cos pairs
let mut x = DMatrix::<f64>::zeros(n, num_coeffs);

for i in 0..n {
let t = dates[i];
x[(i, 0)] = 1.0; // Intercept
x[(i, 1)] = t; // Trend
for j in 1..=order {
let freq = TWO_PI * j as f64 * t;
x[(i, 2 * j)] = freq.cos();
x[(i, 2 * j + 1)] = freq.sin();
}
}
let sum: f64 = y.iter().sum();
HarmonicModel {
mean: sum / y.len() as f64,
x
}

/// Fits a harmonic model to the stable history period using Ordinary Least Squares (OLS).
fn fit_harmonic_model(y: &[f64], dates: &[f64], order: usize) -> Result<HarmonicModel, CoreError> {
if y.len() < (2 * order + 2) {
return Err(CoreError::NotEnoughData(
"Not enough historical data to fit model".to_string(),
));
}

let y_vec = DVector::from_vec(y.to_vec());
let x = build_design_matrix(dates, order);

let decomp = x.clone().svd(true, true);
let coeffs = decomp.solve(&y_vec, 1e-10).map_err(|e| {
CoreError::ComputationError(format!("Failed to solve OLS with nalgebra: {}", e))
})?;

let y_pred = &x * &coeffs;
let residuals = &y_vec - &y_pred;
let sum_sq_err = residuals.iter().map(|&r| r * r).sum::<f64>();
let df = (y.len() - (2 * order + 2)) as f64;
if df <= 0.0 {
return Err(CoreError::ComputationError(
"Degrees of freedom is non-positive".to_string(),
));
}
let sigma = (sum_sq_err / df).sqrt();

Ok(HarmonicModel {
coefficients: coeffs,
sigma,
})
}

// Placeholder for predicting values based on the fitted model.
fn predict_harmonic_model(_model: &HarmonicModel, dates: &[i64]) -> Vec<f64> {
// For now, just return a constant prediction (the historical mean)
vec![_model.mean; dates.len()]
/// Predicts values for the monitoring period based on the fitted model.
fn predict_harmonic_model(model: &HarmonicModel, dates: &[f64], order: usize) -> DVector<f64> {
let x_mon = build_design_matrix(dates, order);
&x_mon * &model.coefficients
}

// Placeholder for the MOSUM process to detect a break.
// Returns (break_date, magnitude)
/// Detects a break using the OLS-MOSUM process.
fn detect_mosum_break(
y_monitor: &[f64],
y_pred: &[f64],
monitor_dates: &[i64],
level: f64,
y_pred: &DVector<f64>,
monitor_dates: &[f64],
hist_len: usize,
sigma: f64,
h: f64,
alpha: f64,
) -> (f64, f64) {
if y_monitor.is_empty() || y_monitor.len() != y_pred.len() {
if y_monitor.is_empty() {
return (0.0, 0.0);
}

let n_hist = hist_len as f64;
let window_size = (h * n_hist).floor() as usize;

let residuals: Vec<f64> = y_monitor
.iter()
.zip(y_pred.iter())
.map(|(obs, pred)| obs - pred)
.collect();

let mean_residual: f64 = residuals.iter().sum::<f64>() / residuals.len() as f64;
let mut cusum = vec![0.0; residuals.len() + 1];
for i in 0..residuals.len() {
cusum[i + 1] = cusum[i] + residuals[i];
}

// We can only start calculating MOSUM after `window_size` observations
if residuals.len() < window_size {
return (0.0, 0.0);
}

let mosum_process: Vec<f64> = (window_size..residuals.len())
.map(|i| cusum[i] - cusum[i - window_size])
.collect();

let standardizer = sigma * n_hist.sqrt();
let standardized_mosum: Vec<f64> = mosum_process
.iter()
.map(|&m| (m / standardizer).abs())
.collect();

// Simplified critical boundary based on a lookup for alpha=0.05 and h=0.25
// A full implementation would use a precomputed table or a more complex calculation.
let critical_value = if alpha <= 0.05 { 1.36 } else { 1.63 }; // Approximations

// Simplified break detection: if the average residual in the monitoring period
// exceeds the significance level, flag the start of the period as a break.
if mean_residual.abs() > level {
(monitor_dates[0] as f64, mean_residual.abs())
} else {
(0.0, 0.0) // No break detected
for (i, &mosum_val) in standardized_mosum.iter().enumerate() {
// The index k starts from 1 for the monitoring period
let k = (i + 1) as f64;
let boundary = critical_value * (1.0 + k / n_hist).sqrt();

if mosum_val > boundary {
let break_idx = i + window_size;
let magnitude = (y_monitor[break_idx] - y_pred[break_idx]).abs();
return (monitor_dates[break_idx], magnitude);
}
}

(0.0, 0.0) // No break detected
}

/// Converts integer dates (YYYYMMDD) to fractional years.
fn dates_to_frac_years(dates: &[i64]) -> Vec<f64> {
dates
.iter()
.map(|&date| {
let year = (date / 10000) as f64;
let month = ((date % 10000) / 100) as f64;
let day = (date % 100) as f64;
// Simple approximation
year + (month - 1.0) / 12.0 + (day - 1.0) / 365.25
})
.collect()
}

// This is the main logic function that runs for each pixel.
fn run_bfast_monitor_per_pixel(
pixel_ts: &[f64],
dates: &[i64],
history_start: i64,
monitor_start: i64,
level: f64,
dates: &[f64],
history_start: f64,
monitor_start: f64,
order: usize,
h: f64, // h parameter for MOSUM window size
alpha: f64, // Significance level
) -> (f64, f64) {
// 1. Find the indices for the history and monitoring periods
let history_indices: Vec<usize> = dates
Expand All @@ -82,32 +184,48 @@ fn run_bfast_monitor_per_pixel(
.collect();

if history_indices.is_empty() || monitor_indices.is_empty() {
return (0.0, 0.0); // Not enough data
return (0.0, 0.0);
}

// 2. Extract the data for these periods
let history_ts: Vec<f64> = history_indices.iter().map(|&i| pixel_ts[i]).collect();
let history_dates: Vec<f64> = history_indices.iter().map(|&i| dates[i]).collect();
let monitor_ts: Vec<f64> = monitor_indices.iter().map(|&i| pixel_ts[i]).collect();
let monitor_dates: Vec<i64> = monitor_indices.iter().map(|&i| dates[i]).collect();
let monitor_dates: Vec<f64> = monitor_indices.iter().map(|&i| dates[i]).collect();

// 3. Fit model on the historical period
let model = fit_harmonic_model(&history_ts);
let model_result = fit_harmonic_model(&history_ts, &history_dates, order);
let model = match model_result {
Ok(m) => m,
Err(_) => return (0.0, 0.0), // Return no-break if model fails
};

// 4. Predict for the monitoring period
let predicted_ts = predict_harmonic_model(&model, &monitor_dates);
let predicted_ts = predict_harmonic_model(&model, &monitor_dates, order);

// 5. Detect break using MOSUM process on residuals
detect_mosum_break(&monitor_ts, &predicted_ts, &monitor_dates, level)
detect_mosum_break(
&monitor_ts,
&predicted_ts,
&monitor_dates,
history_ts.len(),
model.sigma,
h,
alpha,
)
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
pub fn bfast_monitor(
py: Python,
stack: PyReadonlyArrayDyn<f64>,
dates: Vec<i64>,
history_start_date: i64,
monitor_start_date: i64,
level: f64, // Significance level
order: usize,
h: f64,
alpha: f64,
) -> PyResult<Py<PyArrayDyn<f64>>> {
let stack_arr = stack.as_array();

Expand All @@ -132,6 +250,11 @@ pub fn bfast_monitor(
.into());
}

// Convert integer dates to fractional years for modeling
let frac_dates = dates_to_frac_years(&dates);
let history_start_frac = dates_to_frac_years(&[history_start_date])[0];
let monitor_start_frac = dates_to_frac_years(&[monitor_start_date])[0];

// Output channels: [break_date, magnitude]
let mut out_array = ndarray::ArrayD::<f64>::zeros(IxDyn(&[2, height, width]));

Expand All @@ -158,10 +281,12 @@ pub fn bfast_monitor(
.par_for_each(|break_date, magnitude, pixel_ts| {
let (bk_date, mag) = run_bfast_monitor_per_pixel(
pixel_ts.as_slice().unwrap(),
&dates,
history_start_date,
monitor_start_date,
level,
&frac_dates,
history_start_frac,
monitor_start_frac,
order,
h,
alpha,
);
*break_date = bk_date;
*magnitude = mag;
Expand Down Expand Up @@ -194,16 +319,18 @@ pub fn complex_classification(

let mut out = ndarray::ArrayD::<u8>::zeros(blue_arr.raw_dim());

out.indexed_iter_mut().par_bridge().for_each(|(idx, res)| {
let b = blue_arr[&idx];
let g = green_arr[&idx];
let r = red_arr[&idx];
let n = nir_arr[&idx];
let s1 = swir1_arr[&idx];
let s2 = swir2_arr[&idx];
let t = temp_arr[&idx];
*res = classify_pixel(b, g, r, n, s1, s2, t);
});
out.indexed_iter_mut()
.par_bridge()
.for_each(|(idx, res)| {
let b = blue_arr[&idx];
let g = green_arr[&idx];
let r = red_arr[&idx];
let n = nir_arr[&idx];
let s1 = swir1_arr[&idx];
let s2 = swir2_arr[&idx];
let t = temp_arr[&idx];
*res = classify_pixel(b, g, r, n, s1, s2, t);
});

Ok(out.into_pyarray(py).to_owned())
}
Expand Down
Loading