From bcb9c172777294662b2a99cf52a4bb57de8025d9 Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Sun, 15 Mar 2026 17:46:57 -0700 Subject: [PATCH 1/2] Add MFD flow length computation (#1009) New flow_length_mfd() computes proportion-weighted flow path lengths from MFD fraction grids. Downstream mode gives the expected distance to outlet weighted by MFD fractions; upstream mode gives the longest path from any divide to each cell. All four backends: NumPy, CuPy, Dask+NumPy, Dask+CuPy. 29 tests covering correctness, edge cases, and cross-backend consistency. --- xrspatial/__init__.py | 1 + xrspatial/flow_length_mfd.py | 982 ++++++++++++++++++++++++ xrspatial/tests/test_flow_length_mfd.py | 419 ++++++++++ 3 files changed, 1402 insertions(+) create mode 100644 xrspatial/flow_length_mfd.py create mode 100644 xrspatial/tests/test_flow_length_mfd.py diff --git a/xrspatial/__init__.py b/xrspatial/__init__.py index a0fb4a28..f9f60c6b 100644 --- a/xrspatial/__init__.py +++ b/xrspatial/__init__.py @@ -45,6 +45,7 @@ from xrspatial.flow_direction_dinf import flow_direction_dinf # noqa from xrspatial.flow_direction_mfd import flow_direction_mfd # noqa from xrspatial.flow_length import flow_length # noqa +from xrspatial.flow_length_mfd import flow_length_mfd # noqa from xrspatial.flow_path import flow_path # noqa from xrspatial.focal import mean # noqa from xrspatial.glcm import glcm_texture # noqa diff --git a/xrspatial/flow_length_mfd.py b/xrspatial/flow_length_mfd.py new file mode 100644 index 00000000..f0d77797 --- /dev/null +++ b/xrspatial/flow_length_mfd.py @@ -0,0 +1,982 @@ +"""MFD flow length: proportion-weighted distance from each cell to outlet +(downstream) or longest path from divide to cell (upstream). + +Algorithm +--------- +CPU : Kahn's BFS topological sort on the MFD DAG -- O(N). + Downstream: reverse pass from outlets, accumulating weighted step + distance. Upstream: forward pass from divides, propagating max + distance. +GPU : CuPy-via-CPU (same strategy as flow_length.py). +Dask: iterative tile sweep with BoundaryStore boundary propagation. +""" + +from __future__ import annotations + +import math + +import numpy as np +import xarray as xr + +try: + import dask.array as da +except ImportError: + da = None + +from xrspatial._boundary_store import BoundaryStore +from xrspatial.utils import ( + _validate_raster, + get_dataarray_resolution, + has_cuda_and_cupy, + is_cupy_array, + is_dask_cupy, + ngjit, +) +from xrspatial.dataset_support import supports_dataset + +# Neighbor offsets: E, SE, S, SW, W, NW, N, NE +_DY = np.array([0, 1, 1, 1, 0, -1, -1, -1], dtype=np.int64) +_DX = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int64) + + +# ===================================================================== +# Step-distance helper +# ===================================================================== + +@ngjit +def _step_distances(cellsize_x, cellsize_y): + """Return array of 8 step distances for each neighbor direction.""" + diag = math.sqrt(cellsize_x * cellsize_x + cellsize_y * cellsize_y) + dists = np.empty(8, dtype=np.float64) + # E, SE, S, SW, W, NW, N, NE + dists[0] = cellsize_x + dists[1] = diag + dists[2] = cellsize_y + dists[3] = diag + dists[4] = cellsize_x + dists[5] = diag + dists[6] = cellsize_y + dists[7] = diag + return dists + + +# ===================================================================== +# CPU kernels +# ===================================================================== + +@ngjit +def _flow_length_mfd_downstream_cpu(fractions, H, W, cellsize_x, cellsize_y): + """Downstream MFD flow length: proportion-weighted average distance + from each cell to its outlet(s). + + Two-pass O(N): + 1. Kahn's BFS builds topological order (divides first). + 2. Reverse pass (outlets first): flow_len[cell] = + sum_k(frac[k] * (step_dist[k] + flow_len[neighbor_k])) + """ + dy = np.array([0, 1, 1, 1, 0, -1, -1, -1], dtype=np.int64) + dx = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int64) + dists = _step_distances(cellsize_x, cellsize_y) + + in_degree = np.zeros((H, W), dtype=np.int32) + valid = np.zeros((H, W), dtype=np.int8) + flow_len = np.empty((H, W), dtype=np.float64) + + # Init + for r in range(H): + for c in range(W): + v = fractions[0, r, c] + if v == v: # not NaN + valid[r, c] = 1 + flow_len[r, c] = 0.0 + else: + flow_len[r, c] = np.nan + + # In-degrees + for r in range(H): + for c in range(W): + if valid[r, c] == 0: + continue + for k in range(8): + if fractions[k, r, c] > 0.0: + nr = r + dy[k] + nc = c + dx[k] + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + in_degree[nr, nc] += 1 + + # BFS topological order (divides first) + order_r = np.empty(H * W, dtype=np.int64) + order_c = np.empty(H * W, dtype=np.int64) + head = np.int64(0) + tail = np.int64(0) + + for r in range(H): + for c in range(W): + if valid[r, c] == 1 and in_degree[r, c] == 0: + order_r[tail] = r + order_c[tail] = c + tail += 1 + + while head < tail: + r = order_r[head] + c = order_c[head] + head += 1 + for k in range(8): + if fractions[k, r, c] > 0.0: + nr = r + dy[k] + nc = c + dx[k] + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + in_degree[nr, nc] -= 1 + if in_degree[nr, nc] == 0: + order_r[tail] = nr + order_c[tail] = nc + tail += 1 + + # Reverse pass: outlets -> divides + for i in range(tail - 1, -1, -1): + r = order_r[i] + c = order_c[i] + total = 0.0 + for k in range(8): + frac = fractions[k, r, c] + if frac > 0.0: + nr = r + dy[k] + nc = c + dx[k] + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + total += frac * (dists[k] + flow_len[nr, nc]) + else: + # Flows off grid: distance = 0 at destination + total += frac * dists[k] + flow_len[r, c] = total + + return flow_len + + +@ngjit +def _flow_length_mfd_upstream_cpu(fractions, H, W, cellsize_x, cellsize_y): + """Upstream MFD flow length: longest path from any divide to cell. + + Kahn's BFS from divides downstream, propagating max distance. + """ + dy = np.array([0, 1, 1, 1, 0, -1, -1, -1], dtype=np.int64) + dx = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int64) + dists = _step_distances(cellsize_x, cellsize_y) + + in_degree = np.zeros((H, W), dtype=np.int32) + valid = np.zeros((H, W), dtype=np.int8) + flow_len = np.empty((H, W), dtype=np.float64) + + for r in range(H): + for c in range(W): + v = fractions[0, r, c] + if v == v: + valid[r, c] = 1 + flow_len[r, c] = 0.0 + else: + flow_len[r, c] = np.nan + + for r in range(H): + for c in range(W): + if valid[r, c] == 0: + continue + for k in range(8): + if fractions[k, r, c] > 0.0: + nr = r + dy[k] + nc = c + dx[k] + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + in_degree[nr, nc] += 1 + + # BFS queue + queue_r = np.empty(H * W, dtype=np.int64) + queue_c = np.empty(H * W, dtype=np.int64) + head = np.int64(0) + tail = np.int64(0) + + for r in range(H): + for c in range(W): + if valid[r, c] == 1 and in_degree[r, c] == 0: + queue_r[tail] = r + queue_c[tail] = c + tail += 1 + + while head < tail: + r = queue_r[head] + c = queue_c[head] + head += 1 + + for k in range(8): + frac = fractions[k, r, c] + if frac > 0.0: + nr = r + dy[k] + nc = c + dx[k] + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + candidate = flow_len[r, c] + dists[k] + if candidate > flow_len[nr, nc]: + flow_len[nr, nc] = candidate + in_degree[nr, nc] -= 1 + if in_degree[nr, nc] == 0: + queue_r[tail] = nr + queue_c[tail] = nc + tail += 1 + + return flow_len + + +# ===================================================================== +# CuPy backend (via CPU) +# ===================================================================== + +def _flow_length_mfd_cupy(fractions_data, direction, cellsize_x, cellsize_y): + import cupy as cp + frac_np = fractions_data.get().astype(np.float64) + _, H, W = frac_np.shape + if direction == 'downstream': + out = _flow_length_mfd_downstream_cpu(frac_np, H, W, + cellsize_x, cellsize_y) + else: + out = _flow_length_mfd_upstream_cpu(frac_np, H, W, + cellsize_x, cellsize_y) + return cp.asarray(out) + + +# ===================================================================== +# Dask tile kernels +# ===================================================================== + +@ngjit +def _flow_length_mfd_downstream_tile(fractions, h, w, cellsize_x, cellsize_y, + seed_top, seed_bottom, + seed_left, seed_right, + seed_tl, seed_tr, seed_bl, seed_br): + """Downstream MFD flow length for one tile with exit seeds. + + Boundary cells that flow out of the tile use seed values as the + known downstream flow_length at their destination. + """ + dy = np.array([0, 1, 1, 1, 0, -1, -1, -1], dtype=np.int64) + dx = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int64) + dists = _step_distances(cellsize_x, cellsize_y) + + in_degree = np.zeros((h, w), dtype=np.int32) + valid = np.zeros((h, w), dtype=np.int8) + flow_len = np.empty((h, w), dtype=np.float64) + + # Init + for r in range(h): + for c in range(w): + v = fractions[0, r, c] + if v == v: + valid[r, c] = 1 + flow_len[r, c] = 0.0 + else: + flow_len[r, c] = np.nan + + # In-degrees (only for edges within tile) + for r in range(h): + for c in range(w): + if valid[r, c] == 0: + continue + for k in range(8): + if fractions[k, r, c] > 0.0: + nr = r + dy[k] + nc = c + dx[k] + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + in_degree[nr, nc] += 1 + + # BFS topological order + order_r = np.empty(h * w, dtype=np.int64) + order_c = np.empty(h * w, dtype=np.int64) + head = np.int64(0) + tail = np.int64(0) + + for r in range(h): + for c in range(w): + if valid[r, c] == 1 and in_degree[r, c] == 0: + order_r[tail] = r + order_c[tail] = c + tail += 1 + + while head < tail: + r = order_r[head] + c = order_c[head] + head += 1 + for k in range(8): + if fractions[k, r, c] > 0.0: + nr = r + dy[k] + nc = c + dx[k] + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + in_degree[nr, nc] -= 1 + if in_degree[nr, nc] == 0: + order_r[tail] = nr + order_c[tail] = nc + tail += 1 + + # Reverse pass + for i in range(tail - 1, -1, -1): + r = order_r[i] + c = order_c[i] + total = 0.0 + for k in range(8): + frac = fractions[k, r, c] + if frac > 0.0: + nr = r + dy[k] + nc = c + dx[k] + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + total += frac * (dists[k] + flow_len[nr, nc]) + else: + # Flows out of tile -- look up exit seed + exit_val = _get_exit_seed( + r, c, nr, nc, h, w, + seed_top, seed_bottom, seed_left, seed_right, + seed_tl, seed_tr, seed_bl, seed_br) + if exit_val == exit_val: # not NaN + total += frac * (dists[k] + exit_val) + else: + # Grid edge -- distance = step only + total += frac * dists[k] + flow_len[r, c] = total + + return flow_len + + +@ngjit +def _get_exit_seed(r, c, nr, nc, h, w, + seed_top, seed_bottom, seed_left, seed_right, + seed_tl, seed_tr, seed_bl, seed_br): + """Look up the exit seed for a cell flowing out of the tile.""" + # Corner cases first + if nr < 0 and nc < 0: + return seed_tl + if nr < 0 and nc >= w: + return seed_tr + if nr >= h and nc < 0: + return seed_bl + if nr >= h and nc >= w: + return seed_br + # Edge cases + if nr < 0: + return seed_top[nc] + if nr >= h: + return seed_bottom[nc] + if nc < 0: + return seed_left[nr] + if nc >= w: + return seed_right[nr] + return np.nan + + +@ngjit +def _flow_length_mfd_upstream_tile(fractions, h, w, cellsize_x, cellsize_y, + seed_top, seed_bottom, + seed_left, seed_right, + seed_tl, seed_tr, seed_bl, seed_br): + """Upstream MFD flow length for one tile with entry seeds.""" + dy = np.array([0, 1, 1, 1, 0, -1, -1, -1], dtype=np.int64) + dx = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int64) + dists = _step_distances(cellsize_x, cellsize_y) + + in_degree = np.zeros((h, w), dtype=np.int32) + valid = np.zeros((h, w), dtype=np.int8) + flow_len = np.empty((h, w), dtype=np.float64) + + for r in range(h): + for c in range(w): + v = fractions[0, r, c] + if v == v: + valid[r, c] = 1 + flow_len[r, c] = 0.0 + else: + flow_len[r, c] = np.nan + + # Apply entry seeds (max with existing) + for c in range(w): + if valid[0, c] == 1 and seed_top[c] > flow_len[0, c]: + flow_len[0, c] = seed_top[c] + if valid[h - 1, c] == 1 and seed_bottom[c] > flow_len[h - 1, c]: + flow_len[h - 1, c] = seed_bottom[c] + for r in range(h): + if valid[r, 0] == 1 and seed_left[r] > flow_len[r, 0]: + flow_len[r, 0] = seed_left[r] + if valid[r, w - 1] == 1 and seed_right[r] > flow_len[r, w - 1]: + flow_len[r, w - 1] = seed_right[r] + + # Corner seeds + if valid[0, 0] == 1 and seed_tl > flow_len[0, 0]: + flow_len[0, 0] = seed_tl + if valid[0, w - 1] == 1 and seed_tr > flow_len[0, w - 1]: + flow_len[0, w - 1] = seed_tr + if valid[h - 1, 0] == 1 and seed_bl > flow_len[h - 1, 0]: + flow_len[h - 1, 0] = seed_bl + if valid[h - 1, w - 1] == 1 and seed_br > flow_len[h - 1, w - 1]: + flow_len[h - 1, w - 1] = seed_br + + # In-degrees + for r in range(h): + for c in range(w): + if valid[r, c] == 0: + continue + for k in range(8): + if fractions[k, r, c] > 0.0: + nr = r + dy[k] + nc = c + dx[k] + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + in_degree[nr, nc] += 1 + + # BFS from divides + queue_r = np.empty(h * w, dtype=np.int64) + queue_c = np.empty(h * w, dtype=np.int64) + head = np.int64(0) + tail = np.int64(0) + + for r in range(h): + for c in range(w): + if valid[r, c] == 1 and in_degree[r, c] == 0: + queue_r[tail] = r + queue_c[tail] = c + tail += 1 + + while head < tail: + r = queue_r[head] + c = queue_c[head] + head += 1 + for k in range(8): + frac = fractions[k, r, c] + if frac > 0.0: + nr = r + dy[k] + nc = c + dx[k] + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + candidate = flow_len[r, c] + dists[k] + if candidate > flow_len[nr, nc]: + flow_len[nr, nc] = candidate + in_degree[nr, nc] -= 1 + if in_degree[nr, nc] == 0: + queue_r[tail] = nr + queue_c[tail] = nc + tail += 1 + + return flow_len + + +# ===================================================================== +# Dask iterative tile sweep +# ===================================================================== + +def _preprocess_mfd_tiles(fractions_da, chunks_y, chunks_x): + """Extract boundary fraction strips into a dict.""" + n_tile_y = len(chunks_y) + n_tile_x = len(chunks_x) + frac_bdry = {} + + for iy in range(n_tile_y): + for ix in range(n_tile_x): + chunk = fractions_da[ + :, + sum(chunks_y[:iy]):sum(chunks_y[:iy + 1]), + sum(chunks_x[:ix]):sum(chunks_x[:ix + 1]), + ].compute() + chunk = np.asarray(chunk, dtype=np.float64) + + frac_bdry[('top', iy, ix)] = chunk[:, 0, :].copy() + frac_bdry[('bottom', iy, ix)] = chunk[:, -1, :].copy() + frac_bdry[('left', iy, ix)] = chunk[:, :, 0].copy() + frac_bdry[('right', iy, ix)] = chunk[:, :, -1].copy() + + return frac_bdry + + +def _compute_exit_seeds_downstream(iy, ix, boundaries, frac_bdry, + chunks_y, chunks_x, + n_tile_y, n_tile_x): + """For downstream: provide flow_length values at destination cells + in adjacent tiles. + + The tile kernel looks up ``seed_top[nc]`` for a cell flowing to + destination column ``nc`` in the tile above, so each seed array + is simply a copy of the adjacent tile's facing boundary. + """ + tile_h = chunks_y[iy] + tile_w = chunks_x[ix] + + seed_top = np.full(tile_w, np.nan) + seed_bottom = np.full(tile_w, np.nan) + seed_left = np.full(tile_h, np.nan) + seed_right = np.full(tile_h, np.nan) + seed_tl = np.nan + seed_tr = np.nan + seed_bl = np.nan + seed_br = np.nan + + # Top: destinations are in the bottom row of tile (iy-1, ix) + if iy > 0: + nb = boundaries.get('bottom', iy - 1, ix) + seed_top[:len(nb)] = nb + + # Bottom: destinations are in the top row of tile (iy+1, ix) + if iy < n_tile_y - 1: + nb = boundaries.get('top', iy + 1, ix) + seed_bottom[:len(nb)] = nb + + # Left: destinations are in the right col of tile (iy, ix-1) + if ix > 0: + nb = boundaries.get('right', iy, ix - 1) + seed_left[:len(nb)] = nb + + # Right: destinations are in the left col of tile (iy, ix+1) + if ix < n_tile_x - 1: + nb = boundaries.get('left', iy, ix + 1) + seed_right[:len(nb)] = nb + + # Diagonal corners + if iy > 0 and ix > 0: + seed_tl = float(boundaries.get('bottom', iy - 1, ix - 1)[-1]) + if iy > 0 and ix < n_tile_x - 1: + seed_tr = float(boundaries.get('bottom', iy - 1, ix + 1)[0]) + if iy < n_tile_y - 1 and ix > 0: + seed_bl = float(boundaries.get('top', iy + 1, ix - 1)[-1]) + if iy < n_tile_y - 1 and ix < n_tile_x - 1: + seed_br = float(boundaries.get('top', iy + 1, ix + 1)[0]) + + return (seed_top, seed_bottom, seed_left, seed_right, + seed_tl, seed_tr, seed_bl, seed_br) + + +def _compute_entry_seeds_upstream(iy, ix, boundaries, frac_bdry, + chunks_y, chunks_x, + n_tile_y, n_tile_x, + cellsize_x, cellsize_y): + """For upstream: check which adjacent cells flow INTO this tile, + seed with max(existing, neighbor_upstream + step_dist).""" + dy_arr = np.array([0, 1, 1, 1, 0, -1, -1, -1], dtype=np.int64) + dx_arr = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int64) + dists = _step_distances(cellsize_x, cellsize_y) + + tile_h = chunks_y[iy] + tile_w = chunks_x[ix] + + seed_top = np.zeros(tile_w, dtype=np.float64) + seed_bottom = np.zeros(tile_w, dtype=np.float64) + seed_left = np.zeros(tile_h, dtype=np.float64) + seed_right = np.zeros(tile_h, dtype=np.float64) + seed_tl = 0.0 + seed_tr = 0.0 + seed_bl = 0.0 + seed_br = 0.0 + + # Top edge: bottom row of tile above flows south into this tile + if iy > 0: + nb_frac = frac_bdry[('bottom', iy - 1, ix)] # (8, w) + nb_val = boundaries.get('bottom', iy - 1, ix) + w = nb_frac.shape[1] + for c in range(w): + for k in range(8): + if not (nb_frac[k, c] > 0.0): # rejects NaN + continue + ndy = dy_arr[k] + ndx = dx_arr[k] + if ndy != 1: # must flow south + continue + tc = c + ndx + if 0 <= tc < tile_w: + v = nb_val[c] + dists[k] + if v > seed_top[tc]: + seed_top[tc] = v + + # Bottom edge: top row of tile below flows north into this tile + if iy < n_tile_y - 1: + nb_frac = frac_bdry[('top', iy + 1, ix)] + nb_val = boundaries.get('top', iy + 1, ix) + w = nb_frac.shape[1] + for c in range(w): + for k in range(8): + if not (nb_frac[k, c] > 0.0): # rejects NaN + continue + ndy = dy_arr[k] + ndx = dx_arr[k] + if ndy != -1: + continue + tc = c + ndx + if 0 <= tc < tile_w: + v = nb_val[c] + dists[k] + if v > seed_bottom[tc]: + seed_bottom[tc] = v + + # Left edge: right col of tile to the left flows east into this tile + if ix > 0: + nb_frac = frac_bdry[('right', iy, ix - 1)] + nb_val = boundaries.get('right', iy, ix - 1) + h = nb_frac.shape[1] + for r in range(h): + for k in range(8): + if not (nb_frac[k, r] > 0.0): # rejects NaN + continue + ndy = dy_arr[k] + ndx = dx_arr[k] + if ndx != 1: + continue + tr = r + ndy + if 0 <= tr < tile_h: + v = nb_val[r] + dists[k] + if v > seed_left[tr]: + seed_left[tr] = v + + # Right edge: left col of tile to the right flows west into this tile + if ix < n_tile_x - 1: + nb_frac = frac_bdry[('left', iy, ix + 1)] + nb_val = boundaries.get('left', iy, ix + 1) + h = nb_frac.shape[1] + for r in range(h): + for k in range(8): + if not (nb_frac[k, r] > 0.0): # rejects NaN + continue + ndy = dy_arr[k] + ndx = dx_arr[k] + if ndx != -1: + continue + tr = r + ndy + if 0 <= tr < tile_h: + v = nb_val[r] + dists[k] + if v > seed_right[tr]: + seed_right[tr] = v + + # Diagonal corner seeds + diag = dists[1] # diagonal distance + + # TL: bottom-right cell of (iy-1, ix-1) flows SE (k=1) + if iy > 0 and ix > 0: + nb_frac = frac_bdry[('bottom', iy - 1, ix - 1)] + if nb_frac[1, -1] > 0.0: + val = boundaries.get('bottom', iy - 1, ix - 1)[-1] + if val == val: + seed_tl = val + diag + + # TR: bottom-left cell of (iy-1, ix+1) flows SW (k=3) + if iy > 0 and ix < n_tile_x - 1: + nb_frac = frac_bdry[('bottom', iy - 1, ix + 1)] + if nb_frac[3, 0] > 0.0: + val = boundaries.get('bottom', iy - 1, ix + 1)[0] + if val == val: + seed_tr = val + diag + + # BL: top-right cell of (iy+1, ix-1) flows NE (k=7) + if iy < n_tile_y - 1 and ix > 0: + nb_frac = frac_bdry[('top', iy + 1, ix - 1)] + if nb_frac[7, -1] > 0.0: + val = boundaries.get('top', iy + 1, ix - 1)[-1] + if val == val: + seed_bl = val + diag + + # BR: top-left cell of (iy+1, ix+1) flows NW (k=5) + if iy < n_tile_y - 1 and ix < n_tile_x - 1: + nb_frac = frac_bdry[('top', iy + 1, ix + 1)] + if nb_frac[5, 0] > 0.0: + val = boundaries.get('top', iy + 1, ix + 1)[0] + if val == val: + seed_br = val + diag + + return (seed_top, seed_bottom, seed_left, seed_right, + seed_tl, seed_tr, seed_bl, seed_br) + + +def _process_tile_downstream(iy, ix, fractions_da, boundaries, frac_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + cellsize_x, cellsize_y): + """Process one tile for downstream MFD flow length.""" + y_start = sum(chunks_y[:iy]) + y_end = y_start + chunks_y[iy] + x_start = sum(chunks_x[:ix]) + x_end = x_start + chunks_x[ix] + + chunk = np.asarray( + fractions_da[:, y_start:y_end, x_start:x_end].compute(), + dtype=np.float64) + _, h, w = chunk.shape + + seeds = _compute_exit_seeds_downstream( + iy, ix, boundaries, frac_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x) + + result = _flow_length_mfd_downstream_tile( + chunk, h, w, cellsize_x, cellsize_y, *seeds) + + new_top = np.where(np.isnan(result[0, :]), np.nan, result[0, :]) + new_bottom = np.where(np.isnan(result[-1, :]), np.nan, result[-1, :]) + new_left = np.where(np.isnan(result[:, 0]), np.nan, result[:, 0]) + new_right = np.where(np.isnan(result[:, -1]), np.nan, result[:, -1]) + + change = 0.0 + for side, new in (('top', new_top), ('bottom', new_bottom), + ('left', new_left), ('right', new_right)): + old = boundaries.get(side, iy, ix) + with np.errstate(invalid='ignore'): + diff = np.abs(new - old) + diff = np.where(np.isnan(diff), 0.0, diff) + m = float(np.max(diff)) + if m > change: + change = m + + boundaries.set('top', iy, ix, new_top) + boundaries.set('bottom', iy, ix, new_bottom) + boundaries.set('left', iy, ix, new_left) + boundaries.set('right', iy, ix, new_right) + + return change + + +def _process_tile_upstream(iy, ix, fractions_da, boundaries, frac_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + cellsize_x, cellsize_y): + """Process one tile for upstream MFD flow length.""" + y_start = sum(chunks_y[:iy]) + y_end = y_start + chunks_y[iy] + x_start = sum(chunks_x[:ix]) + x_end = x_start + chunks_x[ix] + + chunk = np.asarray( + fractions_da[:, y_start:y_end, x_start:x_end].compute(), + dtype=np.float64) + _, h, w = chunk.shape + + seeds = _compute_entry_seeds_upstream( + iy, ix, boundaries, frac_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + cellsize_x, cellsize_y) + + result = _flow_length_mfd_upstream_tile( + chunk, h, w, cellsize_x, cellsize_y, *seeds) + + new_top = np.where(np.isnan(result[0, :]), 0.0, result[0, :]) + new_bottom = np.where(np.isnan(result[-1, :]), 0.0, result[-1, :]) + new_left = np.where(np.isnan(result[:, 0]), 0.0, result[:, 0]) + new_right = np.where(np.isnan(result[:, -1]), 0.0, result[:, -1]) + + change = 0.0 + for side, new in (('top', new_top), ('bottom', new_bottom), + ('left', new_left), ('right', new_right)): + old = boundaries.get(side, iy, ix) + with np.errstate(invalid='ignore'): + diff = np.abs(new - old) + diff = np.where(np.isnan(diff), 0.0, diff) + m = float(np.max(diff)) + if m > change: + change = m + + boundaries.set('top', iy, ix, new_top) + boundaries.set('bottom', iy, ix, new_bottom) + boundaries.set('left', iy, ix, new_left) + boundaries.set('right', iy, ix, new_right) + + return change + + +def _flow_length_mfd_dask_iterative(fractions_da, direction, + cellsize_x, cellsize_y, + chunks_y, chunks_x): + """Iterative boundary-propagation for MFD flow length on dask arrays.""" + n_tile_y = len(chunks_y) + n_tile_x = len(chunks_x) + + frac_bdry = _preprocess_mfd_tiles(fractions_da, chunks_y, chunks_x) + + fill = np.nan if direction == 'downstream' else 0.0 + boundaries = BoundaryStore(chunks_y, chunks_x, fill_value=fill) + + if direction == 'downstream': + process_fn = _process_tile_downstream + else: + process_fn = _process_tile_upstream + + max_iterations = max(n_tile_y, n_tile_x) * 2 + 10 + + for _iteration in range(max_iterations): + max_change = 0.0 + + for iy in range(n_tile_y): + for ix in range(n_tile_x): + c = process_fn(iy, ix, fractions_da, boundaries, frac_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + cellsize_x, cellsize_y) + if c > max_change: + max_change = c + + for iy in reversed(range(n_tile_y)): + for ix in reversed(range(n_tile_x)): + c = process_fn(iy, ix, fractions_da, boundaries, frac_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + cellsize_x, cellsize_y) + if c > max_change: + max_change = c + + if max_change == 0.0: + break + + boundaries = boundaries.snapshot() + + return _assemble_result(fractions_da, boundaries, frac_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + direction, cellsize_x, cellsize_y) + + +def _assemble_result(fractions_da, boundaries, frac_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + direction, cellsize_x, cellsize_y): + """Build dask array by re-running tiles with converged boundaries.""" + rows = [] + for iy in range(n_tile_y): + row = [] + for ix in range(n_tile_x): + y_start = sum(chunks_y[:iy]) + y_end = y_start + chunks_y[iy] + x_start = sum(chunks_x[:ix]) + x_end = x_start + chunks_x[ix] + + chunk = np.asarray( + fractions_da[:, y_start:y_end, x_start:x_end].compute(), + dtype=np.float64) + _, h, w = chunk.shape + + if direction == 'downstream': + seeds = _compute_exit_seeds_downstream( + iy, ix, boundaries, frac_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x) + tile = _flow_length_mfd_downstream_tile( + chunk, h, w, cellsize_x, cellsize_y, *seeds) + else: + seeds = _compute_entry_seeds_upstream( + iy, ix, boundaries, frac_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + cellsize_x, cellsize_y) + tile = _flow_length_mfd_upstream_tile( + chunk, h, w, cellsize_x, cellsize_y, *seeds) + + row.append(da.from_array(tile, chunks=tile.shape)) + rows.append(row) + + return da.block(rows) + + +def _flow_length_mfd_dask_cupy(fractions_da, direction, + cellsize_x, cellsize_y, + chunks_y, chunks_x): + """Dask+CuPy: convert to numpy, run CPU iterative, convert back.""" + import cupy as cp + + fractions_np = fractions_da.map_blocks( + lambda b: b.get(), dtype=fractions_da.dtype, + meta=np.array((), dtype=fractions_da.dtype), + ) + result = _flow_length_mfd_dask_iterative( + fractions_np, direction, cellsize_x, cellsize_y, + chunks_y, chunks_x) + return result.map_blocks( + cp.asarray, dtype=result.dtype, + meta=cp.array((), dtype=result.dtype), + ) + + +# ===================================================================== +# Public API +# ===================================================================== + +@supports_dataset +def flow_length_mfd(flow_dir_mfd: xr.DataArray, + direction: str = 'downstream', + name: str = 'flow_length_mfd') -> xr.DataArray: + """Compute flow length from an MFD flow direction grid. + + Parameters + ---------- + flow_dir_mfd : xarray.DataArray or xr.Dataset + 3-D MFD flow direction array of shape ``(8, H, W)`` as returned + by ``flow_direction_mfd``. Values are flow fractions in + ``[0, 1]`` that sum to 1.0 at each cell (0.0 at pits/flats, + NaN at edges or nodata cells). + Supported backends: NumPy, CuPy, NumPy-backed Dask, + CuPy-backed Dask. + If a Dataset is passed, the operation is applied to each + data variable independently. + direction : str, default 'downstream' + ``'downstream'``: proportion-weighted average distance from + each cell to its outlet. Each cell gets the expected distance + across all MFD flow paths. + ``'upstream'``: longest path from any divide to each cell. + name : str, default 'flow_length_mfd' + Name of output DataArray. + + Returns + ------- + xarray.DataArray or xr.Dataset + 2-D float64 array of flow length values in coordinate units. + NaN where the input has NaN. + + References + ---------- + Qin, C., Zhu, A.X., Pei, T., Li, B., Zhou, C., and Yang, L. + (2007). An adaptive approach to selecting a flow-partition + exponent for a multiple-flow-direction algorithm. International + Journal of Geographical Information Science, 21(4), 443-458. + + Quinn, P., Beven, K., Chevallier, P., and Planchon, O. (1991). + The prediction of hillslope flow paths for distributed + hydrological modelling using digital terrain models. + Hydrological Processes, 5(1), 59-79. + """ + _validate_raster(flow_dir_mfd, func_name='flow_length_mfd', + name='flow_dir_mfd', ndim=3) + + if direction not in ('downstream', 'upstream'): + raise ValueError( + f"direction must be 'downstream' or 'upstream', got {direction!r}") + + data = flow_dir_mfd.data + + if data.ndim != 3 or data.shape[0] != 8: + raise ValueError( + "flow_dir_mfd must be a 3-D array of shape (8, H, W), " + f"got shape {data.shape}" + ) + + cellsize_x, cellsize_y = get_dataarray_resolution(flow_dir_mfd) + cellsize_x = abs(cellsize_x) + cellsize_y = abs(cellsize_y) + + if isinstance(data, np.ndarray): + frac = data.astype(np.float64) + _, H, W = frac.shape + if direction == 'downstream': + out = _flow_length_mfd_downstream_cpu( + frac, H, W, cellsize_x, cellsize_y) + else: + out = _flow_length_mfd_upstream_cpu( + frac, H, W, cellsize_x, cellsize_y) + + elif has_cuda_and_cupy() and is_cupy_array(data): + out = _flow_length_mfd_cupy(data, direction, cellsize_x, cellsize_y) + + elif has_cuda_and_cupy() and is_dask_cupy(flow_dir_mfd): + chunks_y = data.chunks[1] + chunks_x = data.chunks[2] + out = _flow_length_mfd_dask_cupy( + data, direction, cellsize_x, cellsize_y, chunks_y, chunks_x) + + elif da is not None and isinstance(data, da.Array): + chunks_y = data.chunks[1] + chunks_x = data.chunks[2] + out = _flow_length_mfd_dask_iterative( + data, direction, cellsize_x, cellsize_y, chunks_y, chunks_x) + + else: + raise TypeError(f"Unsupported array type: {type(data)}") + + # Build 2-D output coords (drop 'neighbor' dim) + spatial_dims = flow_dir_mfd.dims[1:] + coords = {k: v for k, v in flow_dir_mfd.coords.items() + if k != 'neighbor' and k not in flow_dir_mfd.dims[:1]} + for d in spatial_dims: + if d in flow_dir_mfd.coords: + coords[d] = flow_dir_mfd.coords[d] + + return xr.DataArray(out, + name=name, + coords=coords, + dims=spatial_dims, + attrs=flow_dir_mfd.attrs) diff --git a/xrspatial/tests/test_flow_length_mfd.py b/xrspatial/tests/test_flow_length_mfd.py new file mode 100644 index 00000000..7c037d65 --- /dev/null +++ b/xrspatial/tests/test_flow_length_mfd.py @@ -0,0 +1,419 @@ +"""Tests for xrspatial.flow_length_mfd.""" + +import math + +import numpy as np +import pytest +import xarray as xr + +from xrspatial.flow_length_mfd import flow_length_mfd +from xrspatial.tests.general_checks import ( + create_test_raster, + cuda_and_cupy_available, + dask_array_available, +) + + +def _make_mfd_raster(fractions, backend='numpy', chunks=(3, 3), + res=(1.0, 1.0)): + """Build a 3-D MFD DataArray from (8, H, W) fractions array. + + Mirrors the output shape of ``flow_direction_mfd``. + """ + _, H, W = fractions.shape + neighbor_names = ['E', 'SE', 'S', 'SW', 'W', 'NW', 'N', 'NE'] + y_coords = np.arange(H) * abs(res[1]) + x_coords = np.arange(W) * abs(res[0]) + + da_obj = xr.DataArray( + fractions.astype(np.float64), + dims=('neighbor', 'y', 'x'), + coords={ + 'neighbor': neighbor_names, + 'y': y_coords, + 'x': x_coords, + }, + attrs={'res': res}, + name='mfd_fdir', + ) + + if backend == 'dask': + import dask.array as da + da_obj = da_obj.chunk({'neighbor': 8, 'y': chunks[0], 'x': chunks[1]}) + elif backend == 'cupy': + import cupy as cp + da_obj = da_obj.copy(data=cp.asarray(da_obj.data)) + elif backend == 'dask+cupy': + import cupy as cp + import dask.array as da + da_obj = da_obj.chunk({'neighbor': 8, 'y': chunks[0], 'x': chunks[1]}) + da_obj = da_obj.copy( + data=da_obj.data.map_blocks(cp.asarray, dtype=da_obj.dtype, + meta=cp.array((), dtype=da_obj.dtype))) + + return da_obj + + +def _all_east_fractions(H, W): + """All cells flow east (direction 0). Last col = pit (all zeros).""" + fracs = np.zeros((8, H, W), dtype=np.float64) + fracs[0, :, :-1] = 1.0 # E direction, all but last col + # Last col: pit (all zeros) + # Mark last col as valid (not NaN) by ensuring at least one band is finite + # Actually pits have all-zero fractions, which is valid (not NaN) + return fracs + + +def _all_south_fractions(H, W): + """All cells flow south (direction 2). Last row = pit.""" + fracs = np.zeros((8, H, W), dtype=np.float64) + fracs[2, :-1, :] = 1.0 # S direction, all but last row + return fracs + + +class TestDownstreamLinear: + """All cells flow east; pit at rightmost column.""" + + def test_downstream_linear(self): + fracs = _all_east_fractions(3, 5) + raster = _make_mfd_raster(fracs) + result = flow_length_mfd(raster, direction='downstream') + for r in range(3): + for c in range(5): + expected = (4 - c) * 1.0 + np.testing.assert_allclose( + result.data[r, c], expected, rtol=1e-10, + err_msg=f"Failed at ({r},{c})") + + +class TestUpstreamLinear: + """All cells flow east; upstream length = col * cellsize_x.""" + + def test_upstream_linear(self): + fracs = _all_east_fractions(3, 5) + raster = _make_mfd_raster(fracs) + result = flow_length_mfd(raster, direction='upstream') + for r in range(3): + for c in range(5): + expected = c * 1.0 + np.testing.assert_allclose( + result.data[r, c], expected, rtol=1e-10, + err_msg=f"Failed at ({r},{c})") + + +class TestDownstreamSouth: + """All cells flow south; pit at bottom row.""" + + def test_downstream_south(self): + fracs = _all_south_fractions(4, 3) + raster = _make_mfd_raster(fracs) + result = flow_length_mfd(raster, direction='downstream') + for r in range(4): + for c in range(3): + expected = (3 - r) * 1.0 + np.testing.assert_allclose( + result.data[r, c], expected, rtol=1e-10, + err_msg=f"Failed at ({r},{c})") + + +class TestDownstreamDiagonal: + """All cells flow SE; pit at bottom-right corner.""" + + def test_downstream_diagonal(self): + H, W = 4, 4 + fracs = np.zeros((8, H, W), dtype=np.float64) + fracs[1, :, :] = 0.0 + # Only interior cells flow SE; edge cells that would go OOB get 0 + for r in range(H): + for c in range(W): + if r < H - 1 and c < W - 1: + fracs[1, r, c] = 1.0 # SE + # else: pit or flows off grid + + raster = _make_mfd_raster(fracs) + result = flow_length_mfd(raster, direction='downstream') + diag = math.sqrt(2.0) + for r in range(H): + for c in range(W): + steps = min(H - 1 - r, W - 1 - c) + expected = steps * diag + np.testing.assert_allclose( + result.data[r, c], expected, rtol=1e-10, + err_msg=f"Failed at ({r},{c})") + + +class TestDownstreamSplit: + """Cell splits flow between two neighbors: check weighted average.""" + + def test_equal_split(self): + # 1x3 grid: cell (0,0) sends 0.5 E, 0.5 unused (off grid) + # Actually, let's make a 3x3 with center sending 0.5 E, 0.5 S + fracs = np.zeros((8, 3, 3), dtype=np.float64) + # Center (1,1): 0.5 east, 0.5 south + fracs[0, 1, 1] = 0.5 # E -> (1,2) + fracs[2, 1, 1] = 0.5 # S -> (2,1) + # (1,2) and (2,1) are pits (all zero fracs) + raster = _make_mfd_raster(fracs) + result = flow_length_mfd(raster, direction='downstream') + + # center: 0.5 * (1.0 + 0) + 0.5 * (1.0 + 0) = 1.0 + np.testing.assert_allclose(result.data[1, 1], 1.0, rtol=1e-10) + # pits: 0.0 + assert result.data[1, 2] == 0.0 + assert result.data[2, 1] == 0.0 + + def test_unequal_split(self): + # Center sends 0.7 E, 0.3 SE + fracs = np.zeros((8, 3, 3), dtype=np.float64) + fracs[0, 1, 1] = 0.7 # E -> (1,2) + fracs[1, 1, 1] = 0.3 # SE -> (2,2) + raster = _make_mfd_raster(fracs) + result = flow_length_mfd(raster, direction='downstream') + + diag = math.sqrt(2.0) + expected = 0.7 * 1.0 + 0.3 * diag + np.testing.assert_allclose(result.data[1, 1], expected, rtol=1e-10) + + +class TestDownstreamChain: + """Chain: A -> split(B, C) -> D, verify weighted propagation.""" + + def test_chain_weighted(self): + # 1x4: cell 0 -> E(1.0) -> cell 1 -> 0.6 E, 0.4 goes off grid + # cell 1 -> E -> cell 2 -> E(1.0) -> cell 3 (pit) + fracs = np.zeros((8, 1, 4), dtype=np.float64) + fracs[0, 0, 0] = 1.0 # cell 0 -> E + fracs[0, 0, 1] = 1.0 # cell 1 -> E + fracs[0, 0, 2] = 1.0 # cell 2 -> E + # cell 3 = pit + raster = _make_mfd_raster(fracs) + result = flow_length_mfd(raster, direction='downstream') + + # cell 3: 0, cell 2: 1, cell 1: 2, cell 0: 3 + np.testing.assert_allclose(result.data[0, 3], 0.0, rtol=1e-10) + np.testing.assert_allclose(result.data[0, 2], 1.0, rtol=1e-10) + np.testing.assert_allclose(result.data[0, 1], 2.0, rtol=1e-10) + np.testing.assert_allclose(result.data[0, 0], 3.0, rtol=1e-10) + + +class TestUpstreamMaxPath: + """Upstream takes the longest path at junctions.""" + + def test_upstream_max_path(self): + # 2x4 grid: + # Row 0: all flow east + # Row 1: cells 0,1 flow east, cell 2 flows north to (0,2) + # (0,2) gets flow from (0,1) path length 2 and (1,2) path length 3 + # upstream[(0,2)] = max(2, 3) = 3 + fracs = np.zeros((8, 2, 4), dtype=np.float64) + fracs[0, 0, 0] = 1.0 # (0,0) -> E + fracs[0, 0, 1] = 1.0 # (0,1) -> E + fracs[0, 0, 2] = 1.0 # (0,2) -> E + # row 1 + fracs[0, 1, 0] = 1.0 # (1,0) -> E + fracs[0, 1, 1] = 1.0 # (1,1) -> E + fracs[6, 1, 2] = 1.0 # (1,2) -> N to (0,2) + # (0,3) and (1,3) are pits + + raster = _make_mfd_raster(fracs) + result = flow_length_mfd(raster, direction='upstream') + + # (0,0): divide -> 0 + assert result.data[0, 0] == 0.0 + # (0,1): from (0,0) -> 1 + np.testing.assert_allclose(result.data[0, 1], 1.0, rtol=1e-10) + # (1,2): upstream from (1,0)->(1,1)->(1,2) = 2 + np.testing.assert_allclose(result.data[1, 2], 2.0, rtol=1e-10) + # (0,2): max((0,1)+1=2, (1,2)+1=3) = 3 + np.testing.assert_allclose(result.data[0, 2], 3.0, rtol=1e-10) + + +class TestNanHandling: + """NaN flow_dir should produce NaN output.""" + + def test_nan_cells(self): + fracs = np.zeros((8, 2, 3), dtype=np.float64) + fracs[0, 0, 0] = 1.0 # (0,0) -> E + fracs[:, 0, 1] = np.nan # (0,1) is NaN + # (0,2), (1,*) are pits + + raster = _make_mfd_raster(fracs) + result_ds = flow_length_mfd(raster, direction='downstream') + result_us = flow_length_mfd(raster, direction='upstream') + + assert np.isnan(result_ds.data[0, 1]) + assert np.isnan(result_us.data[0, 1]) + # Non-NaN cells should have valid values + assert not np.isnan(result_ds.data[0, 0]) + + +class TestPitCells: + """Pit cells (all-zero fractions) have flow_length = 0.""" + + def test_pit_zero(self): + fracs = np.zeros((8, 2, 2), dtype=np.float64) + # All cells are pits + raster = _make_mfd_raster(fracs) + result = flow_length_mfd(raster, direction='downstream') + for r in range(2): + for c in range(2): + assert result.data[r, c] == 0.0 + + +class TestDirectionValidation: + """Invalid direction string should raise.""" + + def test_invalid_direction(self): + fracs = np.zeros((8, 2, 2), dtype=np.float64) + raster = _make_mfd_raster(fracs) + with pytest.raises(ValueError, match="direction"): + flow_length_mfd(raster, direction='sideways') + + +class TestNonSquareCells: + """Test with rectangular cells.""" + + def test_rectangular_east(self): + # All flow east, cellsize_x = 2.0 + fracs = _all_east_fractions(1, 3) + raster = _make_mfd_raster(fracs, res=(2.0, 1.0)) + result = flow_length_mfd(raster, direction='downstream') + np.testing.assert_allclose(result.data[0, 0], 4.0, rtol=1e-10) + np.testing.assert_allclose(result.data[0, 1], 2.0, rtol=1e-10) + np.testing.assert_allclose(result.data[0, 2], 0.0, rtol=1e-10) + + def test_rectangular_south(self): + # All flow south, cellsize_y = 3.0 + fracs = _all_south_fractions(3, 1) + raster = _make_mfd_raster(fracs, res=(1.0, 3.0)) + result = flow_length_mfd(raster, direction='downstream') + np.testing.assert_allclose(result.data[0, 0], 6.0, rtol=1e-10) + np.testing.assert_allclose(result.data[1, 0], 3.0, rtol=1e-10) + np.testing.assert_allclose(result.data[2, 0], 0.0, rtol=1e-10) + + +class TestShapeValidation: + """Non-(8, H, W) input should raise.""" + + def test_wrong_shape(self): + data = np.zeros((3, 4, 4), dtype=np.float64) + raster = xr.DataArray(data, dims=('a', 'b', 'c'), + attrs={'res': (1.0, 1.0)}) + with pytest.raises(ValueError, match="shape"): + flow_length_mfd(raster) + + +class TestFlowDirectionMfdIntegration: + """Integration test with actual flow_direction_mfd output.""" + + def test_bowl_downstream(self): + from xrspatial.flow_direction_mfd import flow_direction_mfd + + # Bowl DEM: center is lowest + elev = np.array([ + [9, 8, 9], + [8, 1, 8], + [9, 8, 9], + ], dtype=np.float64) + elev_r = create_test_raster(elev, backend='numpy', name='elev') + mfd = flow_direction_mfd(elev_r) + result = flow_length_mfd(mfd, direction='downstream') + + # Center (1,1) is a pit (no downslope) -> 0 + np.testing.assert_allclose(result.data[1, 1], 0.0, atol=1e-10) + # Edge cells are NaN (MFD needs all 8 neighbors) + assert np.isnan(result.data[0, 0]) + + +@dask_array_available +class TestFlowLengthMfdDask: + """Cross-backend: numpy vs dask for both directions.""" + + @pytest.mark.parametrize("chunks", [ + (2, 2), (3, 3), (2, 5), (5, 2), + ]) + @pytest.mark.parametrize("direction", ['downstream', 'upstream']) + def test_numpy_equals_dask(self, chunks, direction): + from xrspatial.flow_direction_mfd import flow_direction_mfd + + np.random.seed(123) + elev = np.random.uniform(0, 100, (6, 6)).astype(np.float64) + elev_r = create_test_raster(elev, backend='numpy', name='elev') + mfd = flow_direction_mfd(elev_r) + mfd_data = mfd.data.astype(np.float64) + + np_raster = _make_mfd_raster(mfd_data, backend='numpy') + da_raster = _make_mfd_raster(mfd_data, backend='dask', chunks=chunks) + + result_np = flow_length_mfd(np_raster, direction=direction) + result_da = flow_length_mfd(da_raster, direction=direction) + + np.testing.assert_allclose( + result_np.data, result_da.data.compute(), + equal_nan=True, rtol=1e-10) + + @pytest.mark.parametrize("direction", ['downstream', 'upstream']) + def test_cross_tile_flow(self, direction): + """Flow crossing tile boundaries should compute correctly.""" + fracs = _all_east_fractions(2, 6) + np_raster = _make_mfd_raster(fracs, backend='numpy') + da_raster = _make_mfd_raster(fracs, backend='dask', chunks=(2, 2)) + + result_np = flow_length_mfd(np_raster, direction=direction) + result_da = flow_length_mfd(da_raster, direction=direction) + + np.testing.assert_allclose( + result_np.data, result_da.data.compute(), + equal_nan=True, rtol=1e-10) + + +@cuda_and_cupy_available +class TestFlowLengthMfdCuPy: + """Cross-backend: numpy vs cupy.""" + + @pytest.mark.parametrize("direction", ['downstream', 'upstream']) + def test_numpy_equals_cupy(self, direction): + from xrspatial.flow_direction_mfd import flow_direction_mfd + + np.random.seed(42) + elev = np.random.uniform(0, 100, (6, 6)).astype(np.float64) + elev_r = create_test_raster(elev, backend='numpy', name='elev') + mfd = flow_direction_mfd(elev_r) + mfd_data = mfd.data.astype(np.float64) + + np_raster = _make_mfd_raster(mfd_data, backend='numpy') + cu_raster = _make_mfd_raster(mfd_data, backend='cupy') + + result_np = flow_length_mfd(np_raster, direction=direction) + result_cu = flow_length_mfd(cu_raster, direction=direction) + + np.testing.assert_allclose( + result_np.data, result_cu.data.get(), + equal_nan=True, rtol=1e-10) + + +@cuda_and_cupy_available +@dask_array_available +class TestFlowLengthMfdDaskCuPy: + """Cross-backend: numpy vs dask+cupy.""" + + @pytest.mark.parametrize("direction", ['downstream', 'upstream']) + def test_numpy_equals_dask_cupy(self, direction): + from xrspatial.flow_direction_mfd import flow_direction_mfd + + np.random.seed(42) + elev = np.random.uniform(0, 100, (6, 6)).astype(np.float64) + elev_r = create_test_raster(elev, backend='numpy', name='elev') + mfd = flow_direction_mfd(elev_r) + mfd_data = mfd.data.astype(np.float64) + + np_raster = _make_mfd_raster(mfd_data, backend='numpy') + dc_raster = _make_mfd_raster(mfd_data, backend='dask+cupy', + chunks=(3, 3)) + + result_np = flow_length_mfd(np_raster, direction=direction) + result_dc = flow_length_mfd(dc_raster, direction=direction) + + np.testing.assert_allclose( + result_np.data, result_dc.data.compute().get(), + equal_nan=True, rtol=1e-10) From a7cdaa8eb2e9be368c3c486df1d2482254b0921d Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Sun, 15 Mar 2026 17:49:41 -0700 Subject: [PATCH 2/2] Add docs, README entry, and user guide notebook for flow_length_mfd (#1009) --- README.md | 2 + docs/source/reference/hydrology.rst | 7 + examples/user_guide/32_Flow_Length_MFD.ipynb | 199 +++++++++++++++++++ 3 files changed, 208 insertions(+) create mode 100644 examples/user_guide/32_Flow_Length_MFD.ipynb diff --git a/README.md b/README.md index 2e506f7b..42ad6d2b 100644 --- a/README.md +++ b/README.md @@ -294,6 +294,8 @@ In the GIS world, rasters are used for representing continuous phenomena (e.g. e | [Flow Direction (MFD)](xrspatial/flow_direction_mfd.py) | Partitions flow to all downslope neighbors with an adaptive exponent (Qin et al. 2007) | Qin et al. 2007 | ✅️ | ✅️ | ✅️ | ✅️ | | [Flow Accumulation (D8)](xrspatial/flow_accumulation.py) | Counts upstream cells draining through each cell in a D8 flow direction grid | Jenson & Domingue 1988 | ✅️ | ✅️ | ✅️ | ✅️ | | [Flow Accumulation (MFD)](xrspatial/flow_accumulation_mfd.py) | Accumulates upstream area through all MFD flow paths weighted by directional fractions | Qin et al. 2007 | ✅️ | ✅️ | ✅️ | 🔄 | +| [Flow Length (D8)](xrspatial/flow_length.py) | Computes D8 flow path length from each cell to outlet (downstream) or from divide (upstream) | Standard (D8 tracing) | ✅️ | ✅️ | ✅️ | 🔄 | +| [Flow Length (MFD)](xrspatial/flow_length_mfd.py) | Proportion-weighted flow path length using MFD fractions (downstream or upstream) | Qin et al. 2007 | ✅️ | ✅️ | ✅️ | 🔄 | | [Watershed](xrspatial/watershed.py) | Labels each cell with the pour point it drains to via D8 flow direction | Standard (D8 tracing) | ✅️ | ✅️ | ✅️ | ✅️ | | [Basins](xrspatial/watershed.py) | Delineates drainage basins by labeling each cell with its outlet ID | Standard (D8 tracing) | ✅️ | ✅️ | ✅️ | ✅️ | | [Stream Order](xrspatial/stream_order.py) | Assigns Strahler or Shreve stream order to cells in a drainage network | Strahler 1957, Shreve 1966 | ✅️ | ✅️ | ✅️ | ✅️ | diff --git a/docs/source/reference/hydrology.rst b/docs/source/reference/hydrology.rst index 4e5216ef..6cf753b0 100644 --- a/docs/source/reference/hydrology.rst +++ b/docs/source/reference/hydrology.rst @@ -53,6 +53,13 @@ Flow Length xrspatial.flow_length.flow_length +Flow Length (MFD) +================= +.. autosummary:: + :toctree: _autosummary + + xrspatial.flow_length_mfd.flow_length_mfd + Flow Path ========= .. autosummary:: diff --git a/examples/user_guide/32_Flow_Length_MFD.ipynb b/examples/user_guide/32_Flow_Length_MFD.ipynb new file mode 100644 index 00000000..b07f8e41 --- /dev/null +++ b/examples/user_guide/32_Flow_Length_MFD.ipynb @@ -0,0 +1,199 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "a1b2c3d4", + "metadata": {}, + "source": [ + "# Flow Length (MFD)\n", + "\n", + "Flow length measures the distance water travels along its flow path. The MFD (Multiple Flow Direction) variant computes proportion-weighted path lengths using MFD fraction grids, where each cell distributes flow to all downslope neighbors.\n", + "\n", + "Two modes are supported:\n", + "- **Downstream**: Expected (weighted-average) distance from each cell to its outlet\n", + "- **Upstream**: Longest flow path from any drainage divide to each cell" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e5f6a7b8", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import matplotlib.pyplot as plt\n", + "import xarray as xr\n", + "\n", + "from xrspatial import flow_direction_mfd, flow_length_mfd, flow_accumulation_mfd\n", + "from xrspatial.terrain import generate_terrain" + ] + }, + { + "cell_type": "markdown", + "id": "c9d0e1f2", + "metadata": {}, + "source": [ + "## Generate terrain" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a3b4c5d6", + "metadata": {}, + "outputs": [], + "source": [ + "W = 400\n", + "H = 400\n", + "terrain = generate_terrain(\n", + " x_range=(-2, 2), y_range=(-2, 2), width=W, height=H, seed=42\n", + ")\n", + "\n", + "fig, ax = plt.subplots(figsize=(8, 6))\n", + "im = ax.imshow(terrain.values, cmap='terrain', origin='lower')\n", + "ax.set_title('Elevation')\n", + "plt.colorbar(im, ax=ax, label='meters')\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "id": "e7f8a9b0", + "metadata": {}, + "source": [ + "## Compute MFD flow direction and accumulation" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c1d2e3f4", + "metadata": {}, + "outputs": [], + "source": [ + "mfd_dir = flow_direction_mfd(terrain)\n", + "mfd_acc = flow_accumulation_mfd(mfd_dir)\n", + "\n", + "fig, ax = plt.subplots(figsize=(8, 6))\n", + "im = ax.imshow(np.log1p(mfd_acc.values), cmap='Blues', origin='lower')\n", + "ax.set_title('MFD flow accumulation (log scale)')\n", + "plt.colorbar(im, ax=ax, label='log(1 + accumulation)')\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "id": "a5b6c7d8", + "metadata": {}, + "source": [ + "## Downstream flow length\n", + "\n", + "Downstream flow length gives the proportion-weighted average distance from each cell to the outlet it drains to. Cells near outlets have short distances; cells far upstream have long distances." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e9f0a1b2", + "metadata": {}, + "outputs": [], + "source": [ + "downstream = flow_length_mfd(mfd_dir, direction='downstream')\n", + "\n", + "fig, ax = plt.subplots(figsize=(8, 6))\n", + "im = ax.imshow(downstream.values, cmap='viridis', origin='lower')\n", + "ax.set_title('MFD downstream flow length')\n", + "plt.colorbar(im, ax=ax, label='distance (coordinate units)')\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "id": "c3d4e5f6", + "metadata": {}, + "source": [ + "## Upstream flow length\n", + "\n", + "Upstream flow length gives the longest flow path distance from any drainage divide to each cell. Cells on divides have zero length; cells at outlets accumulate the full watershed length." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a7b8c9d0", + "metadata": {}, + "outputs": [], + "source": [ + "upstream = flow_length_mfd(mfd_dir, direction='upstream')\n", + "\n", + "fig, ax = plt.subplots(figsize=(8, 6))\n", + "im = ax.imshow(upstream.values, cmap='magma', origin='lower')\n", + "ax.set_title('MFD upstream flow length')\n", + "plt.colorbar(im, ax=ax, label='distance (coordinate units)')\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "id": "e1f2a3b4", + "metadata": {}, + "source": [ + "## Comparison: D8 vs MFD flow length\n", + "\n", + "MFD distributes flow across multiple neighbors, producing smoother flow length fields than D8 which routes everything through the single steepest neighbor." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c5d6e7f8", + "metadata": {}, + "outputs": [], + "source": [ + "from xrspatial import flow_direction, flow_length\n", + "\n", + "d8_dir = flow_direction(terrain)\n", + "d8_downstream = flow_length(d8_dir, direction='downstream')\n", + "\n", + "fig, axes = plt.subplots(1, 2, figsize=(14, 5))\n", + "\n", + "im0 = axes[0].imshow(d8_downstream.values, cmap='viridis', origin='lower')\n", + "axes[0].set_title('D8 downstream flow length')\n", + "plt.colorbar(im0, ax=axes[0], label='distance')\n", + "\n", + "im1 = axes[1].imshow(downstream.values, cmap='viridis', origin='lower')\n", + "axes[1].set_title('MFD downstream flow length')\n", + "plt.colorbar(im1, ax=axes[1], label='distance')\n", + "\n", + "plt.tight_layout()\n", + "plt.show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.14.2" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} \ No newline at end of file