diff --git a/README.md b/README.md index 42ad6d2b..8c3ed1d0 100644 --- a/README.md +++ b/README.md @@ -293,8 +293,10 @@ In the GIS world, rasters are used for representing continuous phenomena (e.g. e | [Flow Direction (Dinf)](xrspatial/flow_direction_dinf.py) | Computes D-infinity flow direction as a continuous angle toward the steepest downslope facet | Tarboton 1997 | ✅️ | ✅️ | ✅️ | ✅️ | | [Flow Direction (MFD)](xrspatial/flow_direction_mfd.py) | Partitions flow to all downslope neighbors with an adaptive exponent (Qin et al. 2007) | Qin et al. 2007 | ✅️ | ✅️ | ✅️ | ✅️ | | [Flow Accumulation (D8)](xrspatial/flow_accumulation.py) | Counts upstream cells draining through each cell in a D8 flow direction grid | Jenson & Domingue 1988 | ✅️ | ✅️ | ✅️ | ✅️ | +| [Flow Accumulation (Dinf)](xrspatial/flow_accumulation_dinf.py) | Accumulates upstream area by splitting flow proportionally between two neighbors (Tarboton 1997) | Tarboton 1997 | ✅️ | ✅️ | ✅️ | 🔄 | | [Flow Accumulation (MFD)](xrspatial/flow_accumulation_mfd.py) | Accumulates upstream area through all MFD flow paths weighted by directional fractions | Qin et al. 2007 | ✅️ | ✅️ | ✅️ | 🔄 | | [Flow Length (D8)](xrspatial/flow_length.py) | Computes D8 flow path length from each cell to outlet (downstream) or from divide (upstream) | Standard (D8 tracing) | ✅️ | ✅️ | ✅️ | 🔄 | +| [Flow Length (Dinf)](xrspatial/flow_length_dinf.py) | Proportion-weighted flow path length using D-inf angle decomposition (downstream or upstream) | Tarboton 1997 | ✅️ | ✅️ | ✅️ | 🔄 | | [Flow Length (MFD)](xrspatial/flow_length_mfd.py) | Proportion-weighted flow path length using MFD fractions (downstream or upstream) | Qin et al. 2007 | ✅️ | ✅️ | ✅️ | 🔄 | | [Watershed](xrspatial/watershed.py) | Labels each cell with the pour point it drains to via D8 flow direction | Standard (D8 tracing) | ✅️ | ✅️ | ✅️ | ✅️ | | [Basins](xrspatial/watershed.py) | Delineates drainage basins by labeling each cell with its outlet ID | Standard (D8 tracing) | ✅️ | ✅️ | ✅️ | ✅️ | diff --git a/docs/source/reference/hydrology.rst b/docs/source/reference/hydrology.rst index 6cf753b0..44fa1ae2 100644 --- a/docs/source/reference/hydrology.rst +++ b/docs/source/reference/hydrology.rst @@ -53,6 +53,13 @@ Flow Length xrspatial.flow_length.flow_length +Flow Length (D-infinity) +======================== +.. autosummary:: + :toctree: _autosummary + + xrspatial.flow_length_dinf.flow_length_dinf + Flow Length (MFD) ================= .. autosummary:: diff --git a/xrspatial/__init__.py b/xrspatial/__init__.py index f9f60c6b..8de0a0dc 100644 --- a/xrspatial/__init__.py +++ b/xrspatial/__init__.py @@ -45,6 +45,7 @@ from xrspatial.flow_direction_dinf import flow_direction_dinf # noqa from xrspatial.flow_direction_mfd import flow_direction_mfd # noqa from xrspatial.flow_length import flow_length # noqa +from xrspatial.flow_length_dinf import flow_length_dinf # noqa from xrspatial.flow_length_mfd import flow_length_mfd # noqa from xrspatial.flow_path import flow_path # noqa from xrspatial.focal import mean # noqa diff --git a/xrspatial/flow_length_dinf.py b/xrspatial/flow_length_dinf.py new file mode 100644 index 00000000..4c8e0180 --- /dev/null +++ b/xrspatial/flow_length_dinf.py @@ -0,0 +1,938 @@ +"""D-infinity flow length: proportion-weighted distance from each cell to +outlet (downstream) or longest path from divide to cell (upstream). + +Algorithm +--------- +CPU : Kahn's BFS topological sort on the D-inf DAG -- O(N). + Downstream: reverse pass from outlets, accumulating weighted step + distance. Upstream: forward pass from divides, propagating max + distance. +GPU : CuPy-via-CPU (same strategy as flow_length.py). +Dask: iterative tile sweep with BoundaryStore boundary propagation. +""" + +from __future__ import annotations + +import math + +import numpy as np +import xarray as xr + +try: + import dask.array as da +except ImportError: + da = None + +from xrspatial._boundary_store import BoundaryStore +from xrspatial.flow_accumulation import _preprocess_tiles +from xrspatial.flow_accumulation_dinf import _angle_to_neighbors +from xrspatial.utils import ( + _validate_raster, + get_dataarray_resolution, + has_cuda_and_cupy, + is_cupy_array, + is_dask_cupy, + ngjit, +) +from xrspatial.dataset_support import supports_dataset + + +# ===================================================================== +# Step-distance helper +# ===================================================================== + +@ngjit +def _neighbor_dist(dy, dx, cellsize_x, cellsize_y): + """Return the travel distance for a D-inf neighbor offset.""" + ady = abs(dy) + adx = abs(dx) + if ady == 0 and adx == 0: + return 0.0 + if ady == 0: + return cellsize_x + if adx == 0: + return cellsize_y + return math.sqrt(cellsize_x * cellsize_x + cellsize_y * cellsize_y) + + +# ===================================================================== +# CPU kernels +# ===================================================================== + +@ngjit +def _flow_length_dinf_downstream_cpu(flow_dir, H, W, cellsize_x, cellsize_y): + """Downstream D-inf flow length: proportion-weighted average distance + from each cell to its outlet(s). + + Two-pass O(N): + 1. Kahn's BFS builds topological order (divides first). + 2. Reverse pass (outlets first): flow_len[cell] = + w1*(dist1 + flow_len[nb1]) + w2*(dist2 + flow_len[nb2]) + """ + in_degree = np.zeros((H, W), dtype=np.int32) + valid = np.zeros((H, W), dtype=np.int8) + flow_len = np.empty((H, W), dtype=np.float64) + + # Init + for r in range(H): + for c in range(W): + v = flow_dir[r, c] + if v == v: # not NaN + valid[r, c] = 1 + flow_len[r, c] = 0.0 + else: + flow_len[r, c] = np.nan + + # In-degrees + for r in range(H): + for c in range(W): + if valid[r, c] == 0: + continue + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(flow_dir[r, c]) + if w1 > 0.0: + nr, nc = r + dy1, c + dx1 + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + in_degree[nr, nc] += 1 + if w2 > 0.0: + nr, nc = r + dy2, c + dx2 + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + in_degree[nr, nc] += 1 + + # BFS topological order (divides first) + order_r = np.empty(H * W, dtype=np.int64) + order_c = np.empty(H * W, dtype=np.int64) + head = np.int64(0) + tail = np.int64(0) + + for r in range(H): + for c in range(W): + if valid[r, c] == 1 and in_degree[r, c] == 0: + order_r[tail] = r + order_c[tail] = c + tail += 1 + + while head < tail: + r = order_r[head] + c = order_c[head] + head += 1 + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(flow_dir[r, c]) + if w1 > 0.0: + nr, nc = r + dy1, c + dx1 + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + in_degree[nr, nc] -= 1 + if in_degree[nr, nc] == 0: + order_r[tail] = nr + order_c[tail] = nc + tail += 1 + if w2 > 0.0: + nr, nc = r + dy2, c + dx2 + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + in_degree[nr, nc] -= 1 + if in_degree[nr, nc] == 0: + order_r[tail] = nr + order_c[tail] = nc + tail += 1 + + # Reverse pass: outlets -> divides + for i in range(tail - 1, -1, -1): + r = order_r[i] + c = order_c[i] + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(flow_dir[r, c]) + total = 0.0 + + if w1 > 0.0: + d1 = _neighbor_dist(dy1, dx1, cellsize_x, cellsize_y) + nr, nc = r + dy1, c + dx1 + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + total += w1 * (d1 + flow_len[nr, nc]) + else: + total += w1 * d1 # grid edge + + if w2 > 0.0: + d2 = _neighbor_dist(dy2, dx2, cellsize_x, cellsize_y) + nr, nc = r + dy2, c + dx2 + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + total += w2 * (d2 + flow_len[nr, nc]) + else: + total += w2 * d2 # grid edge + + flow_len[r, c] = total + + return flow_len + + +@ngjit +def _flow_length_dinf_upstream_cpu(flow_dir, H, W, cellsize_x, cellsize_y): + """Upstream D-inf flow length: longest path from any divide to cell. + + Kahn's BFS from divides downstream, propagating max distance. + """ + in_degree = np.zeros((H, W), dtype=np.int32) + valid = np.zeros((H, W), dtype=np.int8) + flow_len = np.empty((H, W), dtype=np.float64) + + for r in range(H): + for c in range(W): + v = flow_dir[r, c] + if v == v: + valid[r, c] = 1 + flow_len[r, c] = 0.0 + else: + flow_len[r, c] = np.nan + + for r in range(H): + for c in range(W): + if valid[r, c] == 0: + continue + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(flow_dir[r, c]) + if w1 > 0.0: + nr, nc = r + dy1, c + dx1 + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + in_degree[nr, nc] += 1 + if w2 > 0.0: + nr, nc = r + dy2, c + dx2 + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + in_degree[nr, nc] += 1 + + # BFS queue + queue_r = np.empty(H * W, dtype=np.int64) + queue_c = np.empty(H * W, dtype=np.int64) + head = np.int64(0) + tail = np.int64(0) + + for r in range(H): + for c in range(W): + if valid[r, c] == 1 and in_degree[r, c] == 0: + queue_r[tail] = r + queue_c[tail] = c + tail += 1 + + while head < tail: + r = queue_r[head] + c = queue_c[head] + head += 1 + + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(flow_dir[r, c]) + + if w1 > 0.0: + nr, nc = r + dy1, c + dx1 + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + d1 = _neighbor_dist(dy1, dx1, cellsize_x, cellsize_y) + candidate = flow_len[r, c] + d1 + if candidate > flow_len[nr, nc]: + flow_len[nr, nc] = candidate + in_degree[nr, nc] -= 1 + if in_degree[nr, nc] == 0: + queue_r[tail] = nr + queue_c[tail] = nc + tail += 1 + + if w2 > 0.0: + nr, nc = r + dy2, c + dx2 + if 0 <= nr < H and 0 <= nc < W and valid[nr, nc] == 1: + d2 = _neighbor_dist(dy2, dx2, cellsize_x, cellsize_y) + candidate = flow_len[r, c] + d2 + if candidate > flow_len[nr, nc]: + flow_len[nr, nc] = candidate + in_degree[nr, nc] -= 1 + if in_degree[nr, nc] == 0: + queue_r[tail] = nr + queue_c[tail] = nc + tail += 1 + + return flow_len + + +# ===================================================================== +# CuPy backend (via CPU) +# ===================================================================== + +def _flow_length_dinf_cupy(flow_dir_data, direction, cellsize_x, cellsize_y): + import cupy as cp + fd_np = flow_dir_data.get().astype(np.float64) + H, W = fd_np.shape + if direction == 'downstream': + out = _flow_length_dinf_downstream_cpu(fd_np, H, W, + cellsize_x, cellsize_y) + else: + out = _flow_length_dinf_upstream_cpu(fd_np, H, W, + cellsize_x, cellsize_y) + return cp.asarray(out) + + +# ===================================================================== +# Dask tile kernels +# ===================================================================== + +@ngjit +def _flow_length_dinf_downstream_tile(flow_dir, h, w, cellsize_x, cellsize_y, + seed_top, seed_bottom, + seed_left, seed_right, + seed_tl, seed_tr, seed_bl, seed_br): + """Downstream D-inf flow length for one tile with exit seeds.""" + in_degree = np.zeros((h, w), dtype=np.int32) + valid = np.zeros((h, w), dtype=np.int8) + flow_len = np.empty((h, w), dtype=np.float64) + + # Init + for r in range(h): + for c in range(w): + v = flow_dir[r, c] + if v == v: + valid[r, c] = 1 + flow_len[r, c] = 0.0 + else: + flow_len[r, c] = np.nan + + # In-degrees + for r in range(h): + for c in range(w): + if valid[r, c] == 0: + continue + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(flow_dir[r, c]) + if w1 > 0.0: + nr, nc = r + dy1, c + dx1 + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + in_degree[nr, nc] += 1 + if w2 > 0.0: + nr, nc = r + dy2, c + dx2 + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + in_degree[nr, nc] += 1 + + # BFS topological order + order_r = np.empty(h * w, dtype=np.int64) + order_c = np.empty(h * w, dtype=np.int64) + head = np.int64(0) + tail = np.int64(0) + + for r in range(h): + for c in range(w): + if valid[r, c] == 1 and in_degree[r, c] == 0: + order_r[tail] = r + order_c[tail] = c + tail += 1 + + while head < tail: + r = order_r[head] + c = order_c[head] + head += 1 + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(flow_dir[r, c]) + if w1 > 0.0: + nr, nc = r + dy1, c + dx1 + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + in_degree[nr, nc] -= 1 + if in_degree[nr, nc] == 0: + order_r[tail] = nr + order_c[tail] = nc + tail += 1 + if w2 > 0.0: + nr, nc = r + dy2, c + dx2 + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + in_degree[nr, nc] -= 1 + if in_degree[nr, nc] == 0: + order_r[tail] = nr + order_c[tail] = nc + tail += 1 + + # Reverse pass + for i in range(tail - 1, -1, -1): + r = order_r[i] + c = order_c[i] + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(flow_dir[r, c]) + total = 0.0 + + if w1 > 0.0: + d1 = _neighbor_dist(dy1, dx1, cellsize_x, cellsize_y) + nr, nc = r + dy1, c + dx1 + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + total += w1 * (d1 + flow_len[nr, nc]) + else: + # Flows out of tile -- look up exit seed + exit_val = _get_exit_seed(nr, nc, h, w, + seed_top, seed_bottom, + seed_left, seed_right, + seed_tl, seed_tr, seed_bl, seed_br) + if exit_val == exit_val: # not NaN + total += w1 * (d1 + exit_val) + else: + total += w1 * d1 # grid edge + + if w2 > 0.0: + d2 = _neighbor_dist(dy2, dx2, cellsize_x, cellsize_y) + nr, nc = r + dy2, c + dx2 + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + total += w2 * (d2 + flow_len[nr, nc]) + else: + exit_val = _get_exit_seed(nr, nc, h, w, + seed_top, seed_bottom, + seed_left, seed_right, + seed_tl, seed_tr, seed_bl, seed_br) + if exit_val == exit_val: + total += w2 * (d2 + exit_val) + else: + total += w2 * d2 + + flow_len[r, c] = total + + return flow_len + + +@ngjit +def _get_exit_seed(nr, nc, h, w, + seed_top, seed_bottom, seed_left, seed_right, + seed_tl, seed_tr, seed_bl, seed_br): + """Look up the exit seed for a cell flowing out of the tile.""" + if nr < 0 and nc < 0: + return seed_tl + if nr < 0 and nc >= w: + return seed_tr + if nr >= h and nc < 0: + return seed_bl + if nr >= h and nc >= w: + return seed_br + if nr < 0: + return seed_top[nc] + if nr >= h: + return seed_bottom[nc] + if nc < 0: + return seed_left[nr] + if nc >= w: + return seed_right[nr] + return np.nan + + +@ngjit +def _flow_length_dinf_upstream_tile(flow_dir, h, w, cellsize_x, cellsize_y, + seed_top, seed_bottom, + seed_left, seed_right, + seed_tl, seed_tr, seed_bl, seed_br): + """Upstream D-inf flow length for one tile with entry seeds.""" + in_degree = np.zeros((h, w), dtype=np.int32) + valid = np.zeros((h, w), dtype=np.int8) + flow_len = np.empty((h, w), dtype=np.float64) + + for r in range(h): + for c in range(w): + v = flow_dir[r, c] + if v == v: + valid[r, c] = 1 + flow_len[r, c] = 0.0 + else: + flow_len[r, c] = np.nan + + # Apply entry seeds (max with existing) + for c in range(w): + if valid[0, c] == 1 and seed_top[c] > flow_len[0, c]: + flow_len[0, c] = seed_top[c] + if valid[h - 1, c] == 1 and seed_bottom[c] > flow_len[h - 1, c]: + flow_len[h - 1, c] = seed_bottom[c] + for r in range(h): + if valid[r, 0] == 1 and seed_left[r] > flow_len[r, 0]: + flow_len[r, 0] = seed_left[r] + if valid[r, w - 1] == 1 and seed_right[r] > flow_len[r, w - 1]: + flow_len[r, w - 1] = seed_right[r] + + # Corner seeds + if valid[0, 0] == 1 and seed_tl > flow_len[0, 0]: + flow_len[0, 0] = seed_tl + if valid[0, w - 1] == 1 and seed_tr > flow_len[0, w - 1]: + flow_len[0, w - 1] = seed_tr + if valid[h - 1, 0] == 1 and seed_bl > flow_len[h - 1, 0]: + flow_len[h - 1, 0] = seed_bl + if valid[h - 1, w - 1] == 1 and seed_br > flow_len[h - 1, w - 1]: + flow_len[h - 1, w - 1] = seed_br + + # In-degrees + for r in range(h): + for c in range(w): + if valid[r, c] == 0: + continue + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(flow_dir[r, c]) + if w1 > 0.0: + nr, nc = r + dy1, c + dx1 + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + in_degree[nr, nc] += 1 + if w2 > 0.0: + nr, nc = r + dy2, c + dx2 + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + in_degree[nr, nc] += 1 + + # BFS from divides + queue_r = np.empty(h * w, dtype=np.int64) + queue_c = np.empty(h * w, dtype=np.int64) + head = np.int64(0) + tail = np.int64(0) + + for r in range(h): + for c in range(w): + if valid[r, c] == 1 and in_degree[r, c] == 0: + queue_r[tail] = r + queue_c[tail] = c + tail += 1 + + while head < tail: + r = queue_r[head] + c = queue_c[head] + head += 1 + + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(flow_dir[r, c]) + + if w1 > 0.0: + nr, nc = r + dy1, c + dx1 + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + d1 = _neighbor_dist(dy1, dx1, cellsize_x, cellsize_y) + candidate = flow_len[r, c] + d1 + if candidate > flow_len[nr, nc]: + flow_len[nr, nc] = candidate + in_degree[nr, nc] -= 1 + if in_degree[nr, nc] == 0: + queue_r[tail] = nr + queue_c[tail] = nc + tail += 1 + + if w2 > 0.0: + nr, nc = r + dy2, c + dx2 + if 0 <= nr < h and 0 <= nc < w and valid[nr, nc] == 1: + d2 = _neighbor_dist(dy2, dx2, cellsize_x, cellsize_y) + candidate = flow_len[r, c] + d2 + if candidate > flow_len[nr, nc]: + flow_len[nr, nc] = candidate + in_degree[nr, nc] -= 1 + if in_degree[nr, nc] == 0: + queue_r[tail] = nr + queue_c[tail] = nc + tail += 1 + + return flow_len + + +# ===================================================================== +# Dask iterative tile sweep +# ===================================================================== + +def _compute_exit_seeds_downstream(iy, ix, boundaries, flow_bdry, + chunks_y, chunks_x, + n_tile_y, n_tile_x): + """For downstream: provide flow_length values at destination cells + in adjacent tiles. Same approach as flow_length_mfd -- seed arrays + mirror the adjacent tile's facing boundary.""" + tile_h = chunks_y[iy] + tile_w = chunks_x[ix] + + seed_top = np.full(tile_w, np.nan) + seed_bottom = np.full(tile_w, np.nan) + seed_left = np.full(tile_h, np.nan) + seed_right = np.full(tile_h, np.nan) + seed_tl = np.nan + seed_tr = np.nan + seed_bl = np.nan + seed_br = np.nan + + if iy > 0: + nb = boundaries.get('bottom', iy - 1, ix) + seed_top[:len(nb)] = nb + if iy < n_tile_y - 1: + nb = boundaries.get('top', iy + 1, ix) + seed_bottom[:len(nb)] = nb + if ix > 0: + nb = boundaries.get('right', iy, ix - 1) + seed_left[:len(nb)] = nb + if ix < n_tile_x - 1: + nb = boundaries.get('left', iy, ix + 1) + seed_right[:len(nb)] = nb + + # Diagonal corners + if iy > 0 and ix > 0: + seed_tl = float(boundaries.get('bottom', iy - 1, ix - 1)[-1]) + if iy > 0 and ix < n_tile_x - 1: + seed_tr = float(boundaries.get('bottom', iy - 1, ix + 1)[0]) + if iy < n_tile_y - 1 and ix > 0: + seed_bl = float(boundaries.get('top', iy + 1, ix - 1)[-1]) + if iy < n_tile_y - 1 and ix < n_tile_x - 1: + seed_br = float(boundaries.get('top', iy + 1, ix + 1)[0]) + + return (seed_top, seed_bottom, seed_left, seed_right, + seed_tl, seed_tr, seed_bl, seed_br) + + +def _compute_entry_seeds_upstream(iy, ix, boundaries, flow_bdry, + chunks_y, chunks_x, + n_tile_y, n_tile_x, + cellsize_x, cellsize_y): + """For upstream: check which adjacent cells flow INTO this tile, + seed with max(existing, neighbor_upstream + step_dist).""" + tile_h = chunks_y[iy] + tile_w = chunks_x[ix] + + seed_top = np.zeros(tile_w, dtype=np.float64) + seed_bottom = np.zeros(tile_w, dtype=np.float64) + seed_left = np.zeros(tile_h, dtype=np.float64) + seed_right = np.zeros(tile_h, dtype=np.float64) + seed_tl = 0.0 + seed_tr = 0.0 + seed_bl = 0.0 + seed_br = 0.0 + + # Top edge: bottom row of tile above flows south into this tile + if iy > 0: + nb_fdir = flow_bdry.get('bottom', iy - 1, ix) + nb_val = boundaries.get('bottom', iy - 1, ix) + for c in range(len(nb_fdir)): + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(nb_fdir[c]) + if w1 > 0.0 and dy1 == 1: + d = _neighbor_dist(dy1, dx1, cellsize_x, cellsize_y) + tc = c + dx1 + if 0 <= tc < tile_w: + v = nb_val[c] + d + if v > seed_top[tc]: + seed_top[tc] = v + if w2 > 0.0 and dy2 == 1: + d = _neighbor_dist(dy2, dx2, cellsize_x, cellsize_y) + tc = c + dx2 + if 0 <= tc < tile_w: + v = nb_val[c] + d + if v > seed_top[tc]: + seed_top[tc] = v + + # Bottom edge: top row of tile below flows north + if iy < n_tile_y - 1: + nb_fdir = flow_bdry.get('top', iy + 1, ix) + nb_val = boundaries.get('top', iy + 1, ix) + for c in range(len(nb_fdir)): + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(nb_fdir[c]) + if w1 > 0.0 and dy1 == -1: + d = _neighbor_dist(dy1, dx1, cellsize_x, cellsize_y) + tc = c + dx1 + if 0 <= tc < tile_w: + v = nb_val[c] + d + if v > seed_bottom[tc]: + seed_bottom[tc] = v + if w2 > 0.0 and dy2 == -1: + d = _neighbor_dist(dy2, dx2, cellsize_x, cellsize_y) + tc = c + dx2 + if 0 <= tc < tile_w: + v = nb_val[c] + d + if v > seed_bottom[tc]: + seed_bottom[tc] = v + + # Left edge: right col of tile to the left flows east + if ix > 0: + nb_fdir = flow_bdry.get('right', iy, ix - 1) + nb_val = boundaries.get('right', iy, ix - 1) + for r in range(len(nb_fdir)): + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(nb_fdir[r]) + if w1 > 0.0 and dx1 == 1: + d = _neighbor_dist(dy1, dx1, cellsize_x, cellsize_y) + tr = r + dy1 + if 0 <= tr < tile_h: + v = nb_val[r] + d + if v > seed_left[tr]: + seed_left[tr] = v + if w2 > 0.0 and dx2 == 1: + d = _neighbor_dist(dy2, dx2, cellsize_x, cellsize_y) + tr = r + dy2 + if 0 <= tr < tile_h: + v = nb_val[r] + d + if v > seed_left[tr]: + seed_left[tr] = v + + # Right edge: left col of tile to the right flows west + if ix < n_tile_x - 1: + nb_fdir = flow_bdry.get('left', iy, ix + 1) + nb_val = boundaries.get('left', iy, ix + 1) + for r in range(len(nb_fdir)): + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(nb_fdir[r]) + if w1 > 0.0 and dx1 == -1: + d = _neighbor_dist(dy1, dx1, cellsize_x, cellsize_y) + tr = r + dy1 + if 0 <= tr < tile_h: + v = nb_val[r] + d + if v > seed_right[tr]: + seed_right[tr] = v + if w2 > 0.0 and dx2 == -1: + d = _neighbor_dist(dy2, dx2, cellsize_x, cellsize_y) + tr = r + dy2 + if 0 <= tr < tile_h: + v = nb_val[r] + d + if v > seed_right[tr]: + seed_right[tr] = v + + # Diagonal corners + diag = math.sqrt(cellsize_x ** 2 + cellsize_y ** 2) + + if iy > 0 and ix > 0: + fv = flow_bdry.get('bottom', iy - 1, ix - 1)[-1] + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(fv) + av = float(boundaries.get('bottom', iy - 1, ix - 1)[-1]) + if w1 > 0.0 and dy1 == 1 and dx1 == 1: + seed_tl = max(seed_tl, av + diag) + if w2 > 0.0 and dy2 == 1 and dx2 == 1: + seed_tl = max(seed_tl, av + diag) + + if iy > 0 and ix < n_tile_x - 1: + fv = flow_bdry.get('bottom', iy - 1, ix + 1)[0] + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(fv) + av = float(boundaries.get('bottom', iy - 1, ix + 1)[0]) + if w1 > 0.0 and dy1 == 1 and dx1 == -1: + seed_tr = max(seed_tr, av + diag) + if w2 > 0.0 and dy2 == 1 and dx2 == -1: + seed_tr = max(seed_tr, av + diag) + + if iy < n_tile_y - 1 and ix > 0: + fv = flow_bdry.get('top', iy + 1, ix - 1)[-1] + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(fv) + av = float(boundaries.get('top', iy + 1, ix - 1)[-1]) + if w1 > 0.0 and dy1 == -1 and dx1 == 1: + seed_bl = max(seed_bl, av + diag) + if w2 > 0.0 and dy2 == -1 and dx2 == 1: + seed_bl = max(seed_bl, av + diag) + + if iy < n_tile_y - 1 and ix < n_tile_x - 1: + fv = flow_bdry.get('top', iy + 1, ix + 1)[0] + dy1, dx1, w1, dy2, dx2, w2 = _angle_to_neighbors(fv) + av = float(boundaries.get('top', iy + 1, ix + 1)[0]) + if w1 > 0.0 and dy1 == -1 and dx1 == -1: + seed_br = max(seed_br, av + diag) + if w2 > 0.0 and dy2 == -1 and dx2 == -1: + seed_br = max(seed_br, av + diag) + + return (seed_top, seed_bottom, seed_left, seed_right, + seed_tl, seed_tr, seed_bl, seed_br) + + +def _process_tile_downstream(iy, ix, flow_dir_da, boundaries, flow_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + cellsize_x, cellsize_y): + chunk = np.asarray( + flow_dir_da.blocks[iy, ix].compute(), dtype=np.float64) + h, w = chunk.shape + + seeds = _compute_exit_seeds_downstream( + iy, ix, boundaries, flow_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x) + + result = _flow_length_dinf_downstream_tile( + chunk, h, w, cellsize_x, cellsize_y, *seeds) + + change = 0.0 + for side, arr in (('top', result[0, :]), ('bottom', result[-1, :]), + ('left', result[:, 0]), ('right', result[:, -1])): + new = arr.copy() + old = boundaries.get(side, iy, ix) + with np.errstate(invalid='ignore'): + diff = np.abs(new - old) + diff = np.where(np.isnan(diff), 0.0, diff) + m = float(np.max(diff)) + if m > change: + change = m + boundaries.set(side, iy, ix, new) + + return change + + +def _process_tile_upstream(iy, ix, flow_dir_da, boundaries, flow_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + cellsize_x, cellsize_y): + chunk = np.asarray( + flow_dir_da.blocks[iy, ix].compute(), dtype=np.float64) + h, w = chunk.shape + + seeds = _compute_entry_seeds_upstream( + iy, ix, boundaries, flow_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + cellsize_x, cellsize_y) + + result = _flow_length_dinf_upstream_tile( + chunk, h, w, cellsize_x, cellsize_y, *seeds) + + change = 0.0 + for side, arr in (('top', result[0, :]), ('bottom', result[-1, :]), + ('left', result[:, 0]), ('right', result[:, -1])): + new = np.where(np.isnan(arr), 0.0, arr).copy() + old = boundaries.get(side, iy, ix) + with np.errstate(invalid='ignore'): + diff = np.abs(new - old) + diff = np.where(np.isnan(diff), 0.0, diff) + m = float(np.max(diff)) + if m > change: + change = m + boundaries.set(side, iy, ix, new) + + return change + + +def _flow_length_dinf_dask_iterative(flow_dir_da, direction, + cellsize_x, cellsize_y): + chunks_y = flow_dir_da.chunks[0] + chunks_x = flow_dir_da.chunks[1] + n_tile_y = len(chunks_y) + n_tile_x = len(chunks_x) + + flow_bdry = _preprocess_tiles(flow_dir_da, chunks_y, chunks_x) + flow_bdry = flow_bdry.snapshot() + + fill = np.nan if direction == 'downstream' else 0.0 + boundaries = BoundaryStore(chunks_y, chunks_x, fill_value=fill) + + process_fn = (_process_tile_downstream if direction == 'downstream' + else _process_tile_upstream) + + max_iterations = max(n_tile_y, n_tile_x) * 2 + 10 + + for _iteration in range(max_iterations): + max_change = 0.0 + + for iy in range(n_tile_y): + for ix in range(n_tile_x): + c = process_fn(iy, ix, flow_dir_da, boundaries, flow_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + cellsize_x, cellsize_y) + if c > max_change: + max_change = c + + for iy in reversed(range(n_tile_y)): + for ix in reversed(range(n_tile_x)): + c = process_fn(iy, ix, flow_dir_da, boundaries, flow_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + cellsize_x, cellsize_y) + if c > max_change: + max_change = c + + if max_change == 0.0: + break + + boundaries = boundaries.snapshot() + + return _assemble_result(flow_dir_da, boundaries, flow_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + direction, cellsize_x, cellsize_y) + + +def _assemble_result(flow_dir_da, boundaries, flow_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + direction, cellsize_x, cellsize_y): + rows = [] + for iy in range(n_tile_y): + row = [] + for ix in range(n_tile_x): + chunk = np.asarray( + flow_dir_da.blocks[iy, ix].compute(), dtype=np.float64) + h, w = chunk.shape + + if direction == 'downstream': + seeds = _compute_exit_seeds_downstream( + iy, ix, boundaries, flow_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x) + tile = _flow_length_dinf_downstream_tile( + chunk, h, w, cellsize_x, cellsize_y, *seeds) + else: + seeds = _compute_entry_seeds_upstream( + iy, ix, boundaries, flow_bdry, + chunks_y, chunks_x, n_tile_y, n_tile_x, + cellsize_x, cellsize_y) + tile = _flow_length_dinf_upstream_tile( + chunk, h, w, cellsize_x, cellsize_y, *seeds) + + row.append(da.from_array(tile, chunks=tile.shape)) + rows.append(row) + + return da.block(rows) + + +def _flow_length_dinf_dask_cupy(flow_dir_da, direction, + cellsize_x, cellsize_y): + import cupy as cp + + flow_dir_np = flow_dir_da.map_blocks( + lambda b: b.get(), dtype=flow_dir_da.dtype, + meta=np.array((), dtype=flow_dir_da.dtype), + ) + result = _flow_length_dinf_dask_iterative( + flow_dir_np, direction, cellsize_x, cellsize_y) + return result.map_blocks( + cp.asarray, dtype=result.dtype, + meta=cp.array((), dtype=result.dtype), + ) + + +# ===================================================================== +# Public API +# ===================================================================== + +@supports_dataset +def flow_length_dinf(flow_dir: xr.DataArray, + direction: str = 'downstream', + name: str = 'flow_length_dinf') -> xr.DataArray: + """Compute flow length from a D-infinity flow direction grid. + + Parameters + ---------- + flow_dir : xarray.DataArray or xr.Dataset + 2-D D-infinity flow direction grid as returned by + ``flow_direction_dinf``. Values are angles in radians + ``[0, 2*pi)``, ``-1.0`` for pits/flats, ``NaN`` for nodata. + Supported backends: NumPy, CuPy, NumPy-backed Dask, + CuPy-backed Dask. + If a Dataset is passed, the operation is applied to each + data variable independently. + direction : str, default 'downstream' + ``'downstream'``: proportion-weighted average distance from + each cell to its outlet. Flow is split between two neighbors + following Tarboton (1997) angle decomposition. + ``'upstream'``: longest path from any divide to each cell. + name : str, default 'flow_length_dinf' + Name of output DataArray. + + Returns + ------- + xarray.DataArray or xr.Dataset + 2-D float64 array of flow length values in coordinate units. + NaN where flow_dir is NaN. + + References + ---------- + Tarboton, D.G. (1997). A new method for the determination of flow + directions and upslope areas in grid digital elevation models. + Water Resources Research, 33(2), 309-319. + """ + _validate_raster(flow_dir, func_name='flow_length_dinf', name='flow_dir') + + if direction not in ('downstream', 'upstream'): + raise ValueError( + f"direction must be 'downstream' or 'upstream', got {direction!r}") + + cellsize_x, cellsize_y = get_dataarray_resolution(flow_dir) + cellsize_x = abs(cellsize_x) + cellsize_y = abs(cellsize_y) + + data = flow_dir.data + + if isinstance(data, np.ndarray): + fd = data.astype(np.float64) + H, W = fd.shape + if direction == 'downstream': + out = _flow_length_dinf_downstream_cpu( + fd, H, W, cellsize_x, cellsize_y) + else: + out = _flow_length_dinf_upstream_cpu( + fd, H, W, cellsize_x, cellsize_y) + + elif has_cuda_and_cupy() and is_cupy_array(data): + out = _flow_length_dinf_cupy(data, direction, cellsize_x, cellsize_y) + + elif has_cuda_and_cupy() and is_dask_cupy(flow_dir): + out = _flow_length_dinf_dask_cupy(data, direction, + cellsize_x, cellsize_y) + + elif da is not None and isinstance(data, da.Array): + out = _flow_length_dinf_dask_iterative(data, direction, + cellsize_x, cellsize_y) + + else: + raise TypeError(f"Unsupported array type: {type(data)}") + + return xr.DataArray(out, + name=name, + coords=flow_dir.coords, + dims=flow_dir.dims, + attrs=flow_dir.attrs) diff --git a/xrspatial/tests/test_flow_length_dinf.py b/xrspatial/tests/test_flow_length_dinf.py new file mode 100644 index 00000000..bf2ff158 --- /dev/null +++ b/xrspatial/tests/test_flow_length_dinf.py @@ -0,0 +1,307 @@ +"""Tests for xrspatial.flow_length_dinf.""" + +import math + +import numpy as np +import pytest +import xarray as xr + +from xrspatial.flow_length_dinf import flow_length_dinf +from xrspatial.tests.general_checks import ( + create_test_raster, + cuda_and_cupy_available, + dask_array_available, +) + + +def _make_dinf_raster(data, backend='numpy', chunks=(3, 3), + res=(1.0, 1.0)): + """Build a 2-D D-inf DataArray from angle data.""" + attrs = {'res': res} + return create_test_raster(data, backend=backend, name='fdir_dinf', + attrs=attrs, chunks=chunks) + + +class TestDownstreamCardinalEast: + """All cells flow east (angle=0). Last col = pit.""" + + def test_downstream_east(self): + # angle=0 means pure east, w1=1.0, w2=0.0 + fd = np.full((3, 5), 0.0, dtype=np.float64) + fd[:, -1] = -1.0 # pits at right edge + raster = _make_dinf_raster(fd) + result = flow_length_dinf(raster, direction='downstream') + for r in range(3): + for c in range(5): + expected = (4 - c) * 1.0 + np.testing.assert_allclose( + result.data[r, c], expected, rtol=1e-10, + err_msg=f"Failed at ({r},{c})") + + +class TestUpstreamCardinalEast: + """All cells flow east; upstream length = col * cellsize_x.""" + + def test_upstream_east(self): + fd = np.full((3, 5), 0.0, dtype=np.float64) + fd[:, -1] = -1.0 + raster = _make_dinf_raster(fd) + result = flow_length_dinf(raster, direction='upstream') + for r in range(3): + for c in range(5): + expected = c * 1.0 + np.testing.assert_allclose( + result.data[r, c], expected, rtol=1e-10, + err_msg=f"Failed at ({r},{c})") + + +class TestDownstreamCardinalSouth: + """All cells flow south (angle=3*pi/2). Last row = pit.""" + + def test_downstream_south(self): + # S direction: angle = 3*pi/2, k=6, alpha=0, w1=1.0 for S + angle_s = 3.0 * math.pi / 2.0 + fd = np.full((4, 3), angle_s, dtype=np.float64) + fd[-1, :] = -1.0 + raster = _make_dinf_raster(fd) + result = flow_length_dinf(raster, direction='downstream') + for r in range(4): + for c in range(3): + expected = (3 - r) * 1.0 + np.testing.assert_allclose( + result.data[r, c], expected, rtol=1e-10, + err_msg=f"Failed at ({r},{c})") + + +class TestDownstreamDiagonalNE: + """All cells flow NE (angle=pi/4). Pit at top-right corner.""" + + def test_downstream_ne(self): + # NE: angle = pi/4, k=1, alpha=0, w1=1.0 for NE + angle_ne = math.pi / 4.0 + H, W = 4, 4 + fd = np.full((H, W), angle_ne, dtype=np.float64) + fd[0, W - 1] = -1.0 # pit at (0, 3) + # Cells that would flow OOB: treat as pits too + fd[0, :] = -1.0 # top row flows OOB except corner handled + fd[:, W - 1] = -1.0 # right col flows OOB + # Actually let's simplify: only interior cells flow NE + fd[0, W - 1] = -1.0 # the outlet + + raster = _make_dinf_raster(fd) + result = flow_length_dinf(raster, direction='downstream') + diag = math.sqrt(2.0) + # (3,0): 3 diagonal steps to (0,3) + np.testing.assert_allclose(result.data[3, 0], 3 * diag, rtol=1e-10) + # (2,1): 2 diagonal steps + np.testing.assert_allclose(result.data[2, 1], 2 * diag, rtol=1e-10) + # (0,3): pit = 0 + np.testing.assert_allclose(result.data[0, 3], 0.0, rtol=1e-10) + + +class TestDownstreamProportionalSplit: + """Cell with angle pi/8 splits 50/50 between E and NE.""" + + def test_equal_split(self): + # angle=pi/8: k=0, alpha=pi/8, w1 = 1 - (pi/8)/(pi/4) = 0.5, w2=0.5 + angle = math.pi / 8.0 + # 3x3 grid: center sends 0.5 to E(1,2) and 0.5 to NE(0,2) + fd = np.full((3, 3), -1.0, dtype=np.float64) # all pits + fd[1, 1] = angle + + raster = _make_dinf_raster(fd) + result = flow_length_dinf(raster, direction='downstream') + + diag = math.sqrt(2.0) + # center: 0.5 * (1.0 + 0) + 0.5 * (diag + 0) + expected = 0.5 * 1.0 + 0.5 * diag + np.testing.assert_allclose(result.data[1, 1], expected, rtol=1e-10) + + +class TestUpstreamMaxPath: + """Upstream takes the longest path at junctions.""" + + def test_upstream_max_path(self): + # 2x4 grid: + # Row 0: all flow east (angle=0) + # Row 1: cells 0,1 flow east, cell 2 flows north (angle=pi/2) + fd = np.full((2, 4), 0.0, dtype=np.float64) + fd[0, 3] = -1.0 # pit + fd[1, 3] = -1.0 # pit + fd[1, 2] = math.pi / 2.0 # N: flows to (0,2) + + raster = _make_dinf_raster(fd) + result = flow_length_dinf(raster, direction='upstream') + + # (0,0): divide -> 0 + assert result.data[0, 0] == 0.0 + # (0,1): from (0,0) -> 1 + np.testing.assert_allclose(result.data[0, 1], 1.0, rtol=1e-10) + # (1,2): upstream from (1,0)->(1,1)->(1,2) = 2 + np.testing.assert_allclose(result.data[1, 2], 2.0, rtol=1e-10) + # (0,2): max((0,1)+1=2, (1,2)+1=3) = 3 + np.testing.assert_allclose(result.data[0, 2], 3.0, rtol=1e-10) + + +class TestNanHandling: + """NaN flow_dir should produce NaN output.""" + + def test_nan_cells(self): + fd = np.array([ + [0.0, np.nan, -1.0], + [0.0, 0.0, -1.0], + ], dtype=np.float64) + raster = _make_dinf_raster(fd) + result_ds = flow_length_dinf(raster, direction='downstream') + result_us = flow_length_dinf(raster, direction='upstream') + assert np.isnan(result_ds.data[0, 1]) + assert np.isnan(result_us.data[0, 1]) + assert not np.isnan(result_ds.data[1, 0]) + + +class TestPitCells: + """Pit cells (angle=-1) have flow_length = 0.""" + + def test_pit_zero(self): + fd = np.full((2, 2), -1.0, dtype=np.float64) + raster = _make_dinf_raster(fd) + result = flow_length_dinf(raster, direction='downstream') + for r in range(2): + for c in range(2): + assert result.data[r, c] == 0.0 + + +class TestDirectionValidation: + """Invalid direction string should raise.""" + + def test_invalid_direction(self): + fd = np.full((2, 2), -1.0, dtype=np.float64) + raster = _make_dinf_raster(fd) + with pytest.raises(ValueError, match="direction"): + flow_length_dinf(raster, direction='sideways') + + +class TestNonSquareCells: + """Test with rectangular cells.""" + + def test_rectangular_east(self): + fd = np.array([[0.0, 0.0, -1.0]], dtype=np.float64) + raster = _make_dinf_raster(fd, res=(2.0, 1.0)) + result = flow_length_dinf(raster, direction='downstream') + np.testing.assert_allclose(result.data[0, 0], 4.0, rtol=1e-10) + np.testing.assert_allclose(result.data[0, 1], 2.0, rtol=1e-10) + np.testing.assert_allclose(result.data[0, 2], 0.0, rtol=1e-10) + + +class TestFlowDirectionDinfIntegration: + """Integration test with actual flow_direction_dinf output.""" + + def test_bowl_downstream(self): + from xrspatial.flow_direction_dinf import flow_direction_dinf + + elev = np.array([ + [9, 8, 9], + [8, 1, 8], + [9, 8, 9], + ], dtype=np.float64) + elev_r = create_test_raster(elev, backend='numpy', name='elev') + dinf = flow_direction_dinf(elev_r) + result = flow_length_dinf(dinf, direction='downstream') + + # Edge cells are NaN (D-inf needs all 8 neighbors) + assert np.isnan(result.data[0, 0]) + + +@dask_array_available +class TestFlowLengthDinfDask: + """Cross-backend: numpy vs dask for both directions.""" + + @pytest.mark.parametrize("chunks", [ + (2, 2), (3, 3), (2, 5), (5, 2), + ]) + @pytest.mark.parametrize("direction", ['downstream', 'upstream']) + def test_numpy_equals_dask(self, chunks, direction): + from xrspatial.flow_direction_dinf import flow_direction_dinf + + np.random.seed(123) + elev = np.random.uniform(0, 100, (6, 6)).astype(np.float64) + elev_r = create_test_raster(elev, backend='numpy', name='elev') + dinf = flow_direction_dinf(elev_r) + fd_data = dinf.data.astype(np.float64) + + np_raster = _make_dinf_raster(fd_data, backend='numpy') + da_raster = _make_dinf_raster(fd_data, backend='dask', chunks=chunks) + + result_np = flow_length_dinf(np_raster, direction=direction) + result_da = flow_length_dinf(da_raster, direction=direction) + + np.testing.assert_allclose( + result_np.data, result_da.data.compute(), + equal_nan=True, rtol=1e-10) + + @pytest.mark.parametrize("direction", ['downstream', 'upstream']) + def test_cross_tile_flow(self, direction): + """Flow crossing tile boundaries should compute correctly.""" + fd = np.full((2, 6), 0.0, dtype=np.float64) + fd[:, -1] = -1.0 + np_raster = _make_dinf_raster(fd, backend='numpy') + da_raster = _make_dinf_raster(fd, backend='dask', chunks=(2, 2)) + + result_np = flow_length_dinf(np_raster, direction=direction) + result_da = flow_length_dinf(da_raster, direction=direction) + + np.testing.assert_allclose( + result_np.data, result_da.data.compute(), + equal_nan=True, rtol=1e-10) + + +@cuda_and_cupy_available +class TestFlowLengthDinfCuPy: + """Cross-backend: numpy vs cupy.""" + + @pytest.mark.parametrize("direction", ['downstream', 'upstream']) + def test_numpy_equals_cupy(self, direction): + from xrspatial.flow_direction_dinf import flow_direction_dinf + + np.random.seed(42) + elev = np.random.uniform(0, 100, (6, 6)).astype(np.float64) + elev_r = create_test_raster(elev, backend='numpy', name='elev') + dinf = flow_direction_dinf(elev_r) + fd_data = dinf.data.astype(np.float64) + + np_raster = _make_dinf_raster(fd_data, backend='numpy') + cu_raster = _make_dinf_raster(fd_data, backend='cupy') + + result_np = flow_length_dinf(np_raster, direction=direction) + result_cu = flow_length_dinf(cu_raster, direction=direction) + + np.testing.assert_allclose( + result_np.data, result_cu.data.get(), + equal_nan=True, rtol=1e-10) + + +@cuda_and_cupy_available +@dask_array_available +class TestFlowLengthDinfDaskCuPy: + """Cross-backend: numpy vs dask+cupy.""" + + @pytest.mark.parametrize("direction", ['downstream', 'upstream']) + def test_numpy_equals_dask_cupy(self, direction): + from xrspatial.flow_direction_dinf import flow_direction_dinf + + np.random.seed(42) + elev = np.random.uniform(0, 100, (6, 6)).astype(np.float64) + elev_r = create_test_raster(elev, backend='numpy', name='elev') + dinf = flow_direction_dinf(elev_r) + fd_data = dinf.data.astype(np.float64) + + np_raster = _make_dinf_raster(fd_data, backend='numpy') + dc_raster = _make_dinf_raster(fd_data, backend='dask+cupy', + chunks=(3, 3)) + + result_np = flow_length_dinf(np_raster, direction=direction) + result_dc = flow_length_dinf(dc_raster, direction=direction) + + np.testing.assert_allclose( + result_np.data, result_dc.data.compute().get(), + equal_nan=True, rtol=1e-10)