From 5110a9fbdd8d1cf44115f075fdfb4b8c12413503 Mon Sep 17 00:00:00 2001 From: Ryan Mahoney Date: Tue, 10 Mar 2026 00:06:05 -0400 Subject: [PATCH 1/4] perf: optimize slowest validators (shape_to_stop_matching, stop_time_travel_speed) - Optimize shape_to_stop_matching (2.631s) and stop_time_travel_speed (0.906s) - Improve shape_to_stop_matching_util with more efficient algorithms - Update dependencies in pyproject.toml for performance gains - Add tests for optimized utility functions --- pyproject.toml | 1 + specops/slowest.md | 10 +- .../validators/shape_to_stop_matching.py | 112 +++- .../validators/shape_to_stop_matching_util.py | 611 ++++++++++++++++-- .../validators/stop_time_travel_speed.py | 291 +++++---- .../test_shape_to_stop_matching_util.py | 216 +++++++ uv.lock | 2 + 7 files changed, 1066 insertions(+), 177 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a930a0979d..1f64f27560 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,6 +5,7 @@ description = "GTFS feed validator using Python and Polars" requires-python = ">=3.12" dependencies = [ "polars>=1.0", + "numpy>=1.24", "click>=8.0", "jinja2>=3.0", "httpx>=0.27", diff --git a/specops/slowest.md b/specops/slowest.md index 2317321874..b070b77765 100644 --- a/specops/slowest.md +++ b/specops/slowest.md @@ -1,5 +1,5 @@ -1. shape_to_stop_matching — 3.683s -2. stop_time_travel_speed — 2.857s -3. transfers_in_seat_transfer_type — 0.569s -4. block_trips_overlapping — 0.417s -5. shape_increasing_distance — 0.378s +1. shape_to_stop_matching — 2.631s +2. stop_time_travel_speed — 0.906s +3. transfers_in_seat_transfer_type — 0.531s +4. block_trips_overlapping — 0.382s +5. shape_increasing_distance — 0.365s diff --git a/src/gtfs_validator/validators/shape_to_stop_matching.py b/src/gtfs_validator/validators/shape_to_stop_matching.py index 6a14ef715f..1bd1023ea3 100644 --- a/src/gtfs_validator/validators/shape_to_stop_matching.py +++ b/src/gtfs_validator/validators/shape_to_stop_matching.py @@ -15,10 +15,15 @@ from gtfs_validator.context import ValidationContext from gtfs_validator.notices import Notice, Severity from gtfs_validator.validators.shape_to_stop_matching_util import ( + CandidateMatch, RAIL_ROUTE_TYPE, MatchSettings, Problem, + _VEC_MIN_SEGMENTS, + _VEC_MIN_WORK_ITEMS, + build_shape_arrays, build_shape_points, + build_shape_spatial_index, build_stop_points, compute_trip_hash, match_using_geo_distance, @@ -191,14 +196,24 @@ def validate_shape_to_stop_matching( stops_by_id: dict[str, dict] = { row["stop_id"]: row - for row in stops_df.select(available_stops_cols).to_dicts() + for row in stops_df.select(available_stops_cols).iter_rows(named=True) } routes_by_id: dict[str, dict] = { row["route_id"]: row - for row in routes_df.select(["route_id", "route_type"]).to_dicts() + for row in routes_df.select(["route_id", "route_type"]).iter_rows(named=True) } + # Group trips by shape_id first so downstream row processing can filter. + trip_cols = ["trip_id", "route_id", "shape_id", "csv_row_number"] + trips_by_shape_id: dict[str, list[dict]] = defaultdict(list) + relevant_trip_ids: set[str] = set() + for row in trips_df.select(trip_cols).iter_rows(named=True): + shape_id = row.get("shape_id") + if shape_id: + trips_by_shape_id[shape_id].append(row) + relevant_trip_ids.add(row["trip_id"]) + # Group stop_times by trip_id st_cols = ["trip_id", "stop_id", "stop_sequence", "csv_row_number"] optional_st = ["shape_dist_traveled"] @@ -209,15 +224,13 @@ def validate_shape_to_stop_matching( # Pre-sort in Polars (vectorized, GIL-released) so Python loops need no further sorting. st_by_trip_id: dict[str, list[dict]] = defaultdict(list) - for row in stop_times_df.select(available_st_cols).sort(["trip_id", "stop_sequence"]).to_dicts(): - st_by_trip_id[row["trip_id"]].append(row) - - # Group trips by shape_id - trip_cols = ["trip_id", "route_id", "shape_id", "csv_row_number"] - trips_by_shape_id: dict[str, list[dict]] = defaultdict(list) - for row in trips_df.select(trip_cols).to_dicts(): - if row.get("shape_id"): - trips_by_shape_id[row["shape_id"]].append(row) + for row in ( + stop_times_df.select(available_st_cols) + .sort(["trip_id", "stop_sequence"]) + .iter_rows(named=True) + ): + if row["trip_id"] in relevant_trip_ids: + st_by_trip_id[row["trip_id"]].append(row) # Group shapes by shape_id shape_cols = ["shape_id", "shape_pt_lat", "shape_pt_lon", "shape_pt_sequence"] @@ -227,10 +240,17 @@ def validate_shape_to_stop_matching( if col in shapes_df.columns: available_shape_cols.append(col) + relevant_shape_ids = set(trips_by_shape_id.keys()) # Pre-sort by shape_pt_sequence in Polars so build_shape_points needs no further sorting. shapes_groups: dict[str, list[dict]] = defaultdict(list) - for row in shapes_df.select(available_shape_cols).sort(["shape_id", "shape_pt_sequence"]).to_dicts(): - shapes_groups[row["shape_id"]].append(row) + for row in ( + shapes_df.select(available_shape_cols) + .sort(["shape_id", "shape_pt_sequence"]) + .iter_rows(named=True) + ): + shape_id = row["shape_id"] + if shape_id in relevant_shape_ids: + shapes_groups[shape_id].append(row) notices: list[Notice] = [] @@ -243,9 +263,26 @@ def validate_shape_to_stop_matching( if not shape_points: continue + num_segments = max(0, len(shape_points) - 1) + max_trip_stops = max( + (len(st_by_trip_id.get(trip["trip_id"], [])) for trip in trips_for_shape), + default=0, + ) + should_build_shape_arrays = ( + num_segments >= _VEC_MIN_SEGMENTS + and (num_segments * max_trip_stops) >= _VEC_MIN_WORK_ITEMS + ) + shape_arrays = build_shape_arrays(shape_points) if should_build_shape_arrays else None + shape_spatial_index = build_shape_spatial_index(shape_points) shape_has_user_dist = shape_points[-1].user_distance > 0.0 reported_stop_ids: set[str] = set() - seen_trip_hashes: set[tuple] = set() + # Two-stage dedup: quick signature first, full trip hash only on collisions. + dedup_quick_state: dict[ + tuple[int, str, str, float, float], list[dict] | set[tuple] + ] = {} + candidates_cache: dict[tuple[str, float, float, float], list[CandidateMatch]] = {} + closest_cache: dict[tuple[str, float, float], CandidateMatch] = {} + need_trip_dedup = len(trips_for_shape) > 1 for trip in trips_for_shape: trip_id = trip["trip_id"] @@ -253,10 +290,32 @@ def validate_shape_to_stop_matching( if not stop_times: continue - trip_hash = compute_trip_hash(stop_times) - if trip_hash in seen_trip_hashes: - continue - seen_trip_hashes.add(trip_hash) + if need_trip_dedup: + first = stop_times[0] + last = stop_times[-1] + quick_sig = ( + len(stop_times), + str(first.get("stop_id") or ""), + str(last.get("stop_id") or ""), + float(first.get("shape_dist_traveled") or 0.0), + float(last.get("shape_dist_traveled") or 0.0), + ) + state = dedup_quick_state.get(quick_sig) + if state is None: + dedup_quick_state[quick_sig] = stop_times + elif isinstance(state, list): + first_hash = compute_trip_hash(state) + cur_hash = compute_trip_hash(stop_times) + hash_bucket = {first_hash} + dedup_quick_state[quick_sig] = hash_bucket + if cur_hash in hash_bucket: + continue + hash_bucket.add(cur_hash) + else: + cur_hash = compute_trip_hash(stop_times) + if cur_hash in state: + continue + state.add(cur_hash) route = routes_by_id.get(trip["route_id"]) if route is None: @@ -266,7 +325,15 @@ def validate_shape_to_stop_matching( stop_points = build_stop_points(stop_times, stops_by_id, is_large_route, resolved_latlng) # Geo-distance matching (always) - geo_problems = match_using_geo_distance(stop_points, shape_points, settings) + geo_problems = match_using_geo_distance( + stop_points, + shape_points, + settings, + shape_arrays, + shape_spatial_index, + candidates_cache=candidates_cache, + closest_cache=closest_cache, + ) notices.extend( _problems_to_notices( geo_problems, @@ -282,7 +349,12 @@ def validate_shape_to_stop_matching( stops_have_user_dist = stop_points[-1].user_distance > 0.0 if stops_have_user_dist and shape_has_user_dist: user_problems = match_using_user_distance( - stop_points, shape_points, settings + stop_points, + shape_points, + settings, + shape_arrays, + shape_spatial_index, + candidates_cache=candidates_cache, ) notices.extend( _problems_to_notices( diff --git a/src/gtfs_validator/validators/shape_to_stop_matching_util.py b/src/gtfs_validator/validators/shape_to_stop_matching_util.py index 31f0a4a03f..267faea682 100644 --- a/src/gtfs_validator/validators/shape_to_stop_matching_util.py +++ b/src/gtfs_validator/validators/shape_to_stop_matching_util.py @@ -1,6 +1,7 @@ """Geometry and algorithm helpers for ShapeToStopMatchingValidator. -All functions are pure Python (no Polars) for clarity and testability. +Scalar functions are kept for clarity, testability, and single-point edge +cases. Hot-path callers use the numpy-vectorised variants below. """ from __future__ import annotations @@ -10,6 +11,8 @@ from math import asin, atan2, cos, degrees, radians, sin, sqrt from typing import Optional +import numpy as np + # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- @@ -47,6 +50,30 @@ class ShapePoint: uv: tuple[float, float, float] = field(default_factory=lambda: (0.0, 0.0, 0.0)) # precomputed unit vector +@dataclass +class ShapeArrays: + """Numpy array representation of a shape for vectorised geometry.""" + lats: np.ndarray # (N,) float64 + lons: np.ndarray # (N,) float64 + geo_distances: np.ndarray # (N,) float64 cumulative metres + user_distances: np.ndarray # (N,) float64 cumulative + uvs: np.ndarray # (N, 3) float64 unit vectors + # Precomputed per-segment bounding boxes (N-1,) + seg_min_lat: np.ndarray + seg_max_lat: np.ndarray + seg_min_lon: np.ndarray + seg_max_lon: np.ndarray + + +@dataclass +class ShapeSpatialIndex: + """Grid index mapping lat/lon cells to segment ids.""" + + cell_size_deg: float + segment_cells: dict[tuple[int, int], list[int]] + fallback_segments: list[int] + + @dataclass class StopPoint: lat: float @@ -150,7 +177,6 @@ def closest_point_on_edge( return unit_vector_to_latlng(a) # Project p onto the plane perpendicular to n - d = _dot(p, n) # p_proj = p - dot(p, n)*n (then normalised) n_unit = _normalize(n) scaled_n = _scale(n_unit, _dot(p, n_unit)) @@ -187,14 +213,15 @@ def build_shape_points(shape_rows: list[dict]) -> list[ShapePoint]: geo_dist = 0.0 user_dist = 0.0 result: list[ShapePoint] = [] - prev_lat: Optional[float] = None - prev_lon: Optional[float] = None + prev_uv: tuple[float, float, float] | None = None for row in shape_rows: lat = float(row["shape_pt_lat"]) lon = float(row["shape_pt_lon"]) - if prev_lat is not None and prev_lon is not None: - geo_dist += geo_distance_meters(prev_lat, prev_lon, lat, lon) + cur_uv = latlng_to_unit_vector(lat, lon) + if prev_uv is not None: + dot = max(-1.0, min(1.0, _dot(prev_uv, cur_uv))) + geo_dist += math.acos(dot) * EARTH_RADIUS_METERS raw_user = row.get("shape_dist_traveled") or 0.0 user_dist = max(user_dist, float(raw_user)) result.append( @@ -203,14 +230,213 @@ def build_shape_points(shape_rows: list[dict]) -> list[ShapePoint]: user_distance=user_dist, lat=lat, lon=lon, - uv=latlng_to_unit_vector(lat, lon), + uv=cur_uv, ) ) - prev_lat, prev_lon = lat, lon + prev_uv = cur_uv + + return result + + +def build_shape_arrays(shape_points: list[ShapePoint]) -> ShapeArrays: + """Convert a list of ShapePoint into contiguous numpy arrays for vectorised ops.""" + n = len(shape_points) + lats = np.array([sp.lat for sp in shape_points], dtype=np.float64) + lons = np.array([sp.lon for sp in shape_points], dtype=np.float64) + geo_distances = np.array([sp.geo_distance for sp in shape_points], dtype=np.float64) + user_distances = np.array([sp.user_distance for sp in shape_points], dtype=np.float64) + + # Unit vectors (N, 3) + lat_r = np.radians(lats) + lon_r = np.radians(lons) + cos_lat = np.cos(lat_r) + uvs = np.column_stack([cos_lat * np.cos(lon_r), cos_lat * np.sin(lon_r), np.sin(lat_r)]) + + # Segment bounding boxes (N-1,) + if n >= 2: + seg_min_lat = np.minimum(lats[:-1], lats[1:]) + seg_max_lat = np.maximum(lats[:-1], lats[1:]) + seg_min_lon = np.minimum(lons[:-1], lons[1:]) + seg_max_lon = np.maximum(lons[:-1], lons[1:]) + else: + seg_min_lat = seg_max_lat = seg_min_lon = seg_max_lon = np.empty(0) + + return ShapeArrays( + lats=lats, + lons=lons, + geo_distances=geo_distances, + user_distances=user_distances, + uvs=uvs, + seg_min_lat=seg_min_lat, + seg_max_lat=seg_max_lat, + seg_min_lon=seg_min_lon, + seg_max_lon=seg_max_lon, + ) + + +# --------------------------------------------------------------------------- +# Vectorised geometry helpers (numpy) +# --------------------------------------------------------------------------- + +_EPSILON_VEC = 1e-10 +# Minimum number of shape segments to justify numpy overhead. +# Below this, the scalar Python path is faster due to lower per-call overhead. +_VEC_MIN_SEGMENTS = 512 +# Minimum per-call geometry workload (segments * stops) to justify numpy path. +_VEC_MIN_WORK_ITEMS = 100_000 + +# Scalar spatial index tuning. +_SPATIAL_INDEX_MIN_SEGMENTS = 128 +_SPATIAL_CELL_SIZE_DEG = 0.01 +_SPATIAL_MAX_CELLS_PER_SEG = 256 + + +def _haversine_meters_vec( + lat1: np.ndarray, lon1: np.ndarray, lat2: np.ndarray, lon2: np.ndarray, +) -> np.ndarray: + """Vectorised haversine distance in metres.""" + phi1 = np.radians(lat1) + phi2 = np.radians(lat2) + dphi = phi2 - phi1 + dlam = np.radians(lon2 - lon1) + a = np.sin(dphi / 2) ** 2 + np.cos(phi1) * np.cos(phi2) * np.sin(dlam / 2) ** 2 + return 2 * np.arctan2(np.sqrt(a), np.sqrt(np.maximum(0.0, 1.0 - a))) * EARTH_RADIUS_METERS + + +def _closest_points_on_edges_vec( + p_uv: np.ndarray, + a_uvs: np.ndarray, + b_uvs: np.ndarray, +) -> np.ndarray: + """Vectorised closest-point-on-arc for M edges. + + Parameters + ---------- + p_uv : (3,) unit vector of the query point + a_uvs : (M, 3) start-point unit vectors + b_uvs : (M, 3) end-point unit vectors + + Returns + ------- + (M, 3) unit vectors of the closest point on each arc + """ + # Cross product n = a × b (M, 3) + n = np.cross(a_uvs, b_uvs) + n_norm = np.linalg.norm(n, axis=1, keepdims=True) # (M, 1) + degenerate = (n_norm.ravel() < _EPSILON_VEC) + + # Normalise n (safe — degenerate arcs handled separately) + n_safe = np.divide( + n, + n_norm, + out=np.zeros_like(n), + where=n_norm > _EPSILON_VEC, + ) + + # Project p onto the great-circle plane: p_proj = p - (p·n_unit)*n_unit + p_dot_n = (p_uv[np.newaxis, :] * n_safe).sum(axis=1, keepdims=True) # (M, 1) + p_proj_raw = p_uv[np.newaxis, :] - p_dot_n * n_safe # (M, 3) + p_proj_norm = np.linalg.norm(p_proj_raw, axis=1, keepdims=True) # (M, 1) + pole = (p_proj_norm.ravel() < _EPSILON_VEC) + + # Normalise p_proj + p_proj = np.divide( + p_proj_raw, + p_proj_norm, + out=np.zeros_like(p_proj_raw), + where=p_proj_norm > _EPSILON_VEC, + ) + + # Check if p_proj lies within arc: cross(a, p_proj)·n >= 0 AND cross(p_proj, b)·n >= 0 + cross_ap = np.cross(a_uvs, p_proj) + cross_pb = np.cross(p_proj, b_uvs) + dot_ap_n = (cross_ap * n).sum(axis=1) + dot_pb_n = (cross_pb * n).sum(axis=1) + within_arc = (dot_ap_n >= 0) & (dot_pb_n >= 0) + + # Dot products for endpoint selection + dot_a_p = (a_uvs * p_uv[np.newaxis, :]).sum(axis=1) + dot_b_p = (b_uvs * p_uv[np.newaxis, :]).sum(axis=1) + a_closer = dot_a_p >= dot_b_p + + # Build result: pick p_proj if within arc, else closer endpoint + endpoint = np.where(a_closer[:, np.newaxis], a_uvs, b_uvs) + result = np.where(within_arc[:, np.newaxis], p_proj, endpoint) + + # Handle degenerate arcs (a == b) and pole cases + result[degenerate] = a_uvs[degenerate] + pole_mask = pole & ~degenerate + result[pole_mask] = endpoint[pole_mask] return result +def _uv_to_latlng_vec(uvs: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + """Convert (M, 3) unit vectors to (lats, lons) arrays in degrees.""" + lats = np.degrees(np.arcsin(np.clip(uvs[:, 2], -1.0, 1.0))) + lons = np.degrees(np.arctan2(uvs[:, 1], uvs[:, 0])) + return lats, lons + + +def _seg_fraction_vec( + p_uv: np.ndarray, + a_uvs: np.ndarray, + b_uvs: np.ndarray, +) -> np.ndarray: + """Vectorised segment fraction for M edges. Returns (M,) array in [0, 1].""" + n = np.cross(a_uvs, b_uvs) + n_norm = np.linalg.norm(n, axis=1, keepdims=True) + degenerate = (n_norm.ravel() < _EPSILON_VEC) + + n_safe = np.divide( + n, + n_norm, + out=np.zeros_like(n), + where=n_norm > _EPSILON_VEC, + ) + p_dot_n = (p_uv[np.newaxis, :] * n_safe).sum(axis=1, keepdims=True) + p_proj_raw = p_uv[np.newaxis, :] - p_dot_n * n_safe + p_proj_norm = np.linalg.norm(p_proj_raw, axis=1, keepdims=True) + small_proj = (p_proj_norm.ravel() < _EPSILON_VEC) + + p_proj = np.divide( + p_proj_raw, + p_proj_norm, + out=np.zeros_like(p_proj_raw), + where=p_proj_norm > _EPSILON_VEC, + ) + + # Check within arc + cross_ap = np.cross(a_uvs, p_proj) + cross_pb = np.cross(p_proj, b_uvs) + dot_ap_n = (cross_ap * n).sum(axis=1) + dot_pb_n = (cross_pb * n).sum(axis=1) + within = (dot_ap_n >= 0) & (dot_pb_n >= 0) + + # Angle fractions + dot_ab = np.clip((a_uvs * b_uvs).sum(axis=1), -1.0, 1.0) + angle_ab = np.arccos(dot_ab) + dot_ap = np.clip((a_uvs * p_proj).sum(axis=1), -1.0, 1.0) + angle_ap = np.arccos(dot_ap) + + ratio = np.divide( + angle_ap, + angle_ab, + out=np.zeros_like(angle_ap), + where=angle_ab > _EPSILON_VEC, + ) + frac = np.where(angle_ab > _EPSILON_VEC, np.clip(ratio, 0.0, 1.0), 0.0) + + # Outside arc: 0 or 1 based on closer endpoint + dot_a_p = (a_uvs * p_uv[np.newaxis, :]).sum(axis=1) + dot_b_p = (b_uvs * p_uv[np.newaxis, :]).sum(axis=1) + outside_frac = np.where(dot_a_p >= dot_b_p, 0.0, 1.0) + + result = np.where(within, frac, outside_frac) + result[degenerate | small_proj] = 0.0 + return result + + def resolve_stop_location(stop_id: str, stops_by_id: dict) -> tuple[float, float]: """Resolve a stop's coordinates, walking up to parent_station if needed.""" stop = stops_by_id.get(stop_id) @@ -236,6 +462,85 @@ def resolve_stop_name(stop_id: str, stops_by_id: dict) -> str: return stop.get("stop_name") or "" +def build_shape_spatial_index(shape_points: list[ShapePoint]) -> ShapeSpatialIndex | None: + """Build a coarse lat/lon grid index over shape segments for scalar matching.""" + num_segments = len(shape_points) - 1 + if num_segments < _SPATIAL_INDEX_MIN_SEGMENTS: + return None + + cell_size = _SPATIAL_CELL_SIZE_DEG + inv_cell = 1.0 / cell_size + segment_cells: dict[tuple[int, int], list[int]] = {} + fallback_segments: list[int] = [] + + for i in range(num_segments): + a = shape_points[i] + b = shape_points[i + 1] + + min_lat = a.lat if a.lat <= b.lat else b.lat + max_lat = b.lat if a.lat <= b.lat else a.lat + min_lon = a.lon if a.lon <= b.lon else b.lon + max_lon = b.lon if a.lon <= b.lon else a.lon + + min_lat_cell = int(math.floor(min_lat * inv_cell)) + max_lat_cell = int(math.floor(max_lat * inv_cell)) + min_lon_cell = int(math.floor(min_lon * inv_cell)) + max_lon_cell = int(math.floor(max_lon * inv_cell)) + + cell_span = (max_lat_cell - min_lat_cell + 1) * (max_lon_cell - min_lon_cell + 1) + if cell_span > _SPATIAL_MAX_CELLS_PER_SEG: + fallback_segments.append(i) + continue + + for lat_cell in range(min_lat_cell, max_lat_cell + 1): + for lon_cell in range(min_lon_cell, max_lon_cell + 1): + key = (lat_cell, lon_cell) + bucket = segment_cells.get(key) + if bucket is None: + segment_cells[key] = [i] + else: + bucket.append(i) + + return ShapeSpatialIndex( + cell_size_deg=cell_size, + segment_cells=segment_cells, + fallback_segments=fallback_segments, + ) + + +def _query_shape_spatial_index( + index: ShapeSpatialIndex, + stop: StopPoint, + max_dist: float, +) -> list[int]: + """Return sorted candidate segment ids that may be within max_dist of stop.""" + meters_per_deg = 111_320.0 + lat_slack = max_dist / meters_per_deg + lon_slack = max_dist / (meters_per_deg * max(cos(radians(abs(stop.lat))), 0.01)) + + min_lat = stop.lat - lat_slack + max_lat = stop.lat + lat_slack + min_lon = stop.lon - lon_slack + max_lon = stop.lon + lon_slack + + inv_cell = 1.0 / index.cell_size_deg + min_lat_cell = int(math.floor(min_lat * inv_cell)) + max_lat_cell = int(math.floor(max_lat * inv_cell)) + min_lon_cell = int(math.floor(min_lon * inv_cell)) + max_lon_cell = int(math.floor(max_lon * inv_cell)) + + candidates: set[int] = set(index.fallback_segments) + for lat_cell in range(min_lat_cell, max_lat_cell + 1): + for lon_cell in range(min_lon_cell, max_lon_cell + 1): + segs = index.segment_cells.get((lat_cell, lon_cell)) + if segs: + candidates.update(segs) + + if not candidates: + return [] + return sorted(candidates) + + def build_stop_points( stop_times: list[dict], stops_by_id: dict, @@ -277,6 +582,23 @@ def compute_trip_hash(stop_times: list[dict]) -> tuple: ) +def _candidate_cache_key(stop: StopPoint, max_dist: float) -> tuple[str, float, float, float]: + """Stable cache key for per-shape stop-to-candidate matching.""" + stop_id = str(stop.stop_time_row.get("stop_id") or "") + return ( + stop_id, + round(stop.lat, 7), + round(stop.lon, 7), + round(max_dist, 3), + ) + + +def _closest_cache_key(stop: StopPoint) -> tuple[str, float, float]: + """Stable cache key for per-shape closest-point lookup.""" + stop_id = str(stop.stop_time_row.get("stop_id") or "") + return (stop_id, round(stop.lat, 7), round(stop.lon, 7)) + + # --------------------------------------------------------------------------- # Matching helpers # --------------------------------------------------------------------------- @@ -330,6 +652,8 @@ def find_potential_matches( stop: StopPoint, shape_points: list[ShapePoint], max_dist: float, + shape_arrays: Optional[ShapeArrays] = None, + spatial_index: ShapeSpatialIndex | None = None, ) -> list[CandidateMatch]: """Find candidate matches as local minima within max_dist along the shape.""" if len(shape_points) == 1: @@ -346,9 +670,13 @@ def find_potential_matches( ] return [] + # Vectorised path for large shapes. + if shape_arrays is not None and len(shape_points) - 1 >= _VEC_MIN_SEGMENTS: + return _find_potential_matches_vec(stop, shape_arrays, max_dist) + + # --- Scalar path (inlined for zero overhead) --- p = latlng_to_unit_vector(stop.lat, stop.lon) - # Precompute per-stop bounding-box slack to quickly skip distant segments. _METERS_PER_DEG = 111_320.0 lat_slack = max_dist / _METERS_PER_DEG lon_slack = max_dist / (_METERS_PER_DEG * max(cos(radians(abs(stop.lat))), 0.01)) @@ -358,16 +686,44 @@ def find_potential_matches( run_best_match: Optional[CandidateMatch] = None matches: list[CandidateMatch] = [] - for i in range(len(shape_points) - 1): + seg_ids = ( + _query_shape_spatial_index(spatial_index, stop, max_dist) + if spatial_index is not None + else list(range(len(shape_points) - 1)) + ) + prev_seg_id = -1 + for i in seg_ids: + if in_close_run and prev_seg_id >= 0 and i != prev_seg_id + 1: + assert run_best_match is not None + matches.append(run_best_match) + in_close_run = False + run_best_dist = math.inf + run_best_match = None + a = shape_points[i] b = shape_points[i + 1] + a_lat = a.lat + b_lat = b.lat + a_lon = a.lon + b_lon = b.lon + if a_lat <= b_lat: + min_lat = a_lat + max_lat = b_lat + else: + min_lat = b_lat + max_lat = a_lat + if a_lon <= b_lon: + min_lon = a_lon + max_lon = b_lon + else: + min_lon = b_lon + max_lon = a_lon - # Fast bbox rejection before expensive spherical geometry. if ( - stop.lat < min(a.lat, b.lat) - lat_slack - or stop.lat > max(a.lat, b.lat) + lat_slack - or stop.lon < min(a.lon, b.lon) - lon_slack - or stop.lon > max(a.lon, b.lon) + lon_slack + stop.lat < min_lat - lat_slack + or stop.lat > max_lat + lat_slack + or stop.lon < min_lon - lon_slack + or stop.lon > max_lon + lon_slack ): if in_close_run: assert run_best_match is not None @@ -382,18 +738,15 @@ def find_potential_matches( closest_lat, closest_lon = closest_point_on_edge(p, av, bv) dist = geo_distance_meters(stop.lat, stop.lon, closest_lat, closest_lon) - # Interpolate geo_distance at matched point - frac = _seg_fraction(p, av, bv) - matched_geo_dist = a.geo_distance + frac * (b.geo_distance - a.geo_distance) - - candidate = CandidateMatch( - geo_distance_to_shape=dist, - shape_geo_distance=matched_geo_dist, - lat=closest_lat, - lon=closest_lon, - ) - if dist <= max_dist: + frac = _seg_fraction(p, av, bv) + matched_geo_dist = a.geo_distance + frac * (b.geo_distance - a.geo_distance) + candidate = CandidateMatch( + geo_distance_to_shape=dist, + shape_geo_distance=matched_geo_dist, + lat=closest_lat, + lon=closest_lon, + ) if not in_close_run: in_close_run = True run_best_dist = dist @@ -408,6 +761,7 @@ def find_potential_matches( in_close_run = False run_best_dist = math.inf run_best_match = None + prev_seg_id = i if in_close_run and run_best_match is not None: matches.append(run_best_match) @@ -415,9 +769,73 @@ def find_potential_matches( return matches +def _find_potential_matches_vec( + stop: StopPoint, + sa: ShapeArrays, + max_dist: float, +) -> list[CandidateMatch]: + """Numpy-vectorised implementation for large shapes.""" + num_segs = len(sa.lats) - 1 + + _METERS_PER_DEG = 111_320.0 + lat_slack = max_dist / _METERS_PER_DEG + lon_slack = max_dist / (_METERS_PER_DEG * max(cos(radians(abs(stop.lat))), 0.01)) + + bbox_pass = ( + (stop.lat >= sa.seg_min_lat - lat_slack) + & (stop.lat <= sa.seg_max_lat + lat_slack) + & (stop.lon >= sa.seg_min_lon - lon_slack) + & (stop.lon <= sa.seg_max_lon + lon_slack) + ) + + pass_idx = np.flatnonzero(bbox_pass) + if pass_idx.size == 0: + return [] + + p_uv = np.array(latlng_to_unit_vector(stop.lat, stop.lon), dtype=np.float64) + a_uvs = sa.uvs[pass_idx] + b_uvs = sa.uvs[pass_idx + 1] + closest_uvs = _closest_points_on_edges_vec(p_uv, a_uvs, b_uvs) + closest_lats, closest_lons = _uv_to_latlng_vec(closest_uvs) + dists = _haversine_meters_vec( + np.full(pass_idx.size, stop.lat), np.full(pass_idx.size, stop.lon), + closest_lats, closest_lons, + ) + fracs = _seg_fraction_vec(p_uv, a_uvs, b_uvs) + geo_d_a = sa.geo_distances[pass_idx] + geo_d_b = sa.geo_distances[pass_idx + 1] + matched_geo_dists = geo_d_a + fracs * (geo_d_b - geo_d_a) + + full_dists = np.full(num_segs, np.inf) + full_dists[pass_idx] = dists + + is_close = full_dists <= max_dist + padded = np.concatenate([[False], is_close, [False]]) + diff = np.diff(padded.astype(np.int8)) + run_starts = np.flatnonzero(diff == 1) + run_ends = np.flatnonzero(diff == -1) + + matches: list[CandidateMatch] = [] + for rs, re in zip(run_starts, run_ends): + run_dists = full_dists[rs:re] + best_in_run = rs + int(np.argmin(run_dists)) + pos = np.searchsorted(pass_idx, best_in_run) + matches.append( + CandidateMatch( + geo_distance_to_shape=float(dists[pos]), + shape_geo_distance=float(matched_geo_dists[pos]), + lat=float(closest_lats[pos]), + lon=float(closest_lons[pos]), + ) + ) + + return matches + + def find_closest_on_shape( stop: StopPoint, shape_points: list[ShapePoint], + shape_arrays: Optional[ShapeArrays] = None, ) -> CandidateMatch: """Scan all edges and return globally closest point (no distance limit).""" if len(shape_points) == 1: @@ -430,6 +848,11 @@ def find_closest_on_shape( lon=sp.lon, ) + # Vectorised path for large shapes. + if shape_arrays is not None and len(shape_points) - 1 >= _VEC_MIN_SEGMENTS: + return _find_closest_on_shape_vec(stop, shape_arrays) + + # --- Scalar path (inlined for zero overhead) --- p = latlng_to_unit_vector(stop.lat, stop.lon) best_dist = math.inf best_match: Optional[CandidateMatch] = None @@ -456,6 +879,32 @@ def find_closest_on_shape( return best_match +def _find_closest_on_shape_vec( + stop: StopPoint, + sa: ShapeArrays, +) -> CandidateMatch: + """Numpy-vectorised implementation for large shapes.""" + p_uv = np.array(latlng_to_unit_vector(stop.lat, stop.lon), dtype=np.float64) + a_uvs = sa.uvs[:-1] + b_uvs = sa.uvs[1:] + closest_uvs = _closest_points_on_edges_vec(p_uv, a_uvs, b_uvs) + closest_lats, closest_lons = _uv_to_latlng_vec(closest_uvs) + num_segs = len(sa.lats) - 1 + dists = _haversine_meters_vec( + np.full(num_segs, stop.lat), np.full(num_segs, stop.lon), + closest_lats, closest_lons, + ) + best_i = int(np.argmin(dists)) + frac = float(_seg_fraction_vec(p_uv, a_uvs[best_i:best_i+1], b_uvs[best_i:best_i+1])[0]) + matched_geo_dist = sa.geo_distances[best_i] + frac * (sa.geo_distances[best_i + 1] - sa.geo_distances[best_i]) + return CandidateMatch( + geo_distance_to_shape=float(dists[best_i]), + shape_geo_distance=float(matched_geo_dist), + lat=float(closest_lats[best_i]), + lon=float(closest_lons[best_i]), + ) + + def find_best_assignment( candidates_per_stop: list[list[CandidateMatch]], stop_points: list[StopPoint], @@ -470,6 +919,21 @@ def find_best_assignment( if n == 0: return [], [] + # Fast path: all stops have at most one candidate, so DP is unnecessary. + if all(len(candidates) <= 1 for candidates in candidates_per_stop): + quick_assignment: list[Optional[CandidateMatch]] = [] + quick_out_of_order: list[tuple[int, CandidateMatch, CandidateMatch]] = [] + prev_match: Optional[CandidateMatch] = None + for i, candidates in enumerate(candidates_per_stop): + match = candidates[0] if candidates else None + quick_assignment.append(match) + if match is None: + continue + if prev_match is not None and prev_match.shape_geo_distance > match.shape_geo_distance: + quick_out_of_order.append((i, match, prev_match)) + prev_match = match + return quick_assignment, quick_out_of_order + # dp[j] = (min_cost, prev_candidate_idx or -1, prev_stop_idx or -1) # We keep per-stop tables. INF = math.inf @@ -503,8 +967,6 @@ def find_best_assignment( # Find the best ending candidate for the last stop last_row = dp_table[n - 1] - last_candidates = candidates_per_stop[n - 1] - # For stops that have no valid predecessor, we still need to pick the best local candidate. # Find overall best (lowest cost that is finite, else pick lowest geo_dist_to_shape) best_j = -1 @@ -531,7 +993,6 @@ def find_best_assignment( # No valid predecessor — out-of-order # Pick the best local candidate for stop i-1 as the "prev match" prev_candidates = candidates_per_stop[i - 1] - prev_row = dp_table[i - 1] best_prev = min(range(len(prev_candidates)), key=lambda k: prev_candidates[k].geo_distance_to_shape) out_of_order.append((i, candidates[cur_j], prev_candidates[best_prev])) cur_j = best_prev @@ -599,8 +1060,8 @@ def interpolate_user_distance( frac = max(0.0, min(1.0, (user_dist - a.user_distance) / denom)) # Interpolate in 3D unit-vector space - av = latlng_to_unit_vector(a.lat, a.lon) - bv = latlng_to_unit_vector(b.lat, b.lon) + av = a.uv + bv = b.uv # Linear interpolation in 3D then normalize interp = ( av[0] + frac * (bv[0] - av[0]), @@ -630,19 +1091,45 @@ def match_using_geo_distance( stop_points: list[StopPoint], shape_points: list[ShapePoint], settings: MatchSettings, + shape_arrays: Optional[ShapeArrays] = None, + spatial_index: ShapeSpatialIndex | None = None, + candidates_cache: Optional[ + dict[tuple[str, float, float, float], list[CandidateMatch]] + ] = None, + closest_cache: Optional[dict[tuple[str, float, float], CandidateMatch]] = None, ) -> list[Problem]: """Geo-distance matching: find candidates, assign, detect issues.""" problems: list[Problem] = [] candidates_per_stop: list[list[CandidateMatch]] = [] # Maps index in candidates_per_stop -> index in stop_points (skips too-far stops) candidate_stop_idx: list[int] = [] + num_segments = max(0, len(shape_points) - 1) + use_vectorized = ( + shape_arrays is not None + and num_segments >= _VEC_MIN_SEGMENTS + and (num_segments * len(stop_points)) >= _VEC_MIN_WORK_ITEMS + ) for orig_idx, stop in enumerate(stop_points): max_dist = settings.max_distance_meters if stop.is_large_station: max_dist *= settings.large_station_multiplier - candidates = find_potential_matches(stop, shape_points, max_dist) + candidates_key: Optional[tuple[str, float, float, float]] = None + candidates: Optional[list[CandidateMatch]] = None + if candidates_cache is not None: + candidates_key = _candidate_cache_key(stop, max_dist) + candidates = candidates_cache.get(candidates_key) + if candidates is None: + candidates = find_potential_matches( + stop, + shape_points, + max_dist, + shape_arrays if use_vectorized else None, + spatial_index, + ) + if candidates_cache is not None and candidates_key is not None: + candidates_cache[candidates_key] = candidates if len(candidates) > settings.potential_matches_threshold: # Find closest among candidates for reporting @@ -659,7 +1146,19 @@ def match_using_geo_distance( ) if not candidates: - closest = find_closest_on_shape(stop, shape_points) + closest_key: Optional[tuple[str, float, float]] = None + closest: Optional[CandidateMatch] = None + if closest_cache is not None: + closest_key = _closest_cache_key(stop) + closest = closest_cache.get(closest_key) + if closest is None: + closest = find_closest_on_shape( + stop, + shape_points, + shape_arrays if use_vectorized else None, + ) + if closest_cache is not None and closest_key is not None: + closest_cache[closest_key] = closest problems.append( Problem( kind="too_far", @@ -700,33 +1199,59 @@ def match_using_user_distance( stop_points: list[StopPoint], shape_points: list[ShapePoint], settings: MatchSettings, + shape_arrays: Optional[ShapeArrays] = None, + spatial_index: ShapeSpatialIndex | None = None, + candidates_cache: Optional[ + dict[tuple[str, float, float, float], list[CandidateMatch]] + ] = None, ) -> list[Problem]: """User-distance matching: interpolate by shape_dist_traveled, detect issues.""" problems: list[Problem] = [] candidates_per_stop: list[list[CandidateMatch]] = [] search_from = 0 + num_segments = max(0, len(shape_points) - 1) + use_vectorized = ( + shape_arrays is not None + and num_segments >= _VEC_MIN_SEGMENTS + and (num_segments * len(stop_points)) >= _VEC_MIN_WORK_ITEMS + ) for stop in stop_points: if stop.user_distance > 0.0: - match, search_from = interpolate_user_distance( + interp_match, search_from = interpolate_user_distance( stop.user_distance, shape_points, search_from ) # Update geo_distance_to_shape from actual stop position - actual_dist = geo_distance_meters(stop.lat, stop.lon, match.lat, match.lon) - match = CandidateMatch( + actual_dist = geo_distance_meters( + stop.lat, stop.lon, interp_match.lat, interp_match.lon + ) + user_match = CandidateMatch( geo_distance_to_shape=actual_dist, - shape_geo_distance=match.shape_geo_distance, - lat=match.lat, - lon=match.lon, + shape_geo_distance=interp_match.shape_geo_distance, + lat=interp_match.lat, + lon=interp_match.lon, ) - candidates_per_stop.append([match]) + candidates_per_stop.append([user_match]) else: max_dist = settings.max_distance_meters if stop.is_large_station: max_dist *= settings.large_station_multiplier - candidates_per_stop.append( - find_potential_matches(stop, shape_points, max_dist) - ) + candidates_key: Optional[tuple[str, float, float, float]] = None + candidates: Optional[list[CandidateMatch]] = None + if candidates_cache is not None: + candidates_key = _candidate_cache_key(stop, max_dist) + candidates = candidates_cache.get(candidates_key) + if candidates is None: + candidates = find_potential_matches( + stop, + shape_points, + max_dist, + shape_arrays if use_vectorized else None, + spatial_index, + ) + if candidates_cache is not None and candidates_key is not None: + candidates_cache[candidates_key] = candidates + candidates_per_stop.append(candidates) assignment, out_of_order = find_best_assignment(candidates_per_stop, stop_points) diff --git a/src/gtfs_validator/validators/stop_time_travel_speed.py b/src/gtfs_validator/validators/stop_time_travel_speed.py index 587dd77908..7ae2510ed9 100644 --- a/src/gtfs_validator/validators/stop_time_travel_speed.py +++ b/src/gtfs_validator/validators/stop_time_travel_speed.py @@ -234,6 +234,48 @@ def _prepare_stop_lookups( # --------------------------------------------------------------------------- +def _build_consecutive_notice( + row: dict, + stop_name_by_stop_id: Mapping[str, str | None], +) -> Notice: + """Build a consecutive-stop speed notice from a Polars row dict.""" + return Notice( + code="fast_travel_between_consecutive_stops", + severity=Severity.WARNING, + fields={ + "trip_csv_row_number": int(row["trip_csv_row_number"]), + "trip_id": str(row["trip_id"]), + "route_id": str(row["route_id"]), + "speed_kph": round(float(row["speed_kph"]), 2), + "distance_km": round(float(row["consec_distance_km"]), 4), + "csv_row_number1": row["prev_csv_row_number"], + "stop_sequence1": row["prev_stop_sequence"], + "stop_id1": row["prev_stop_id"], + "stop_name1": stop_name_by_stop_id.get(row["prev_stop_id"]) or "", + "departure_time1": secs_to_hhmmss(int(row["prev_departure_time"])), + "csv_row_number2": row["csv_row_number"], + "stop_sequence2": row["stop_sequence"], + "stop_id2": row["stop_id"], + "stop_name2": stop_name_by_stop_id.get(row["stop_id"]) or "", + "arrival_time2": secs_to_hhmmss(int(row["arrival_time"])), + }, + ) + + +def _haversine_km_expr( + lat1: str, lon1: str, lat2: str, lon2: str, +) -> pl.Expr: + """Polars expression: haversine distance in km between two lat/lon column pairs.""" + R = 6371.0 + dlat = (pl.col(lat2) - pl.col(lat1)).radians() / 2 + dlon = (pl.col(lon2) - pl.col(lon1)).radians() / 2 + a = ( + dlat.sin() ** 2 + + pl.col(lat1).radians().cos() * pl.col(lat2).radians().cos() * dlon.sin() ** 2 + ) + return R * 2 * a.sqrt().arcsin() + + def validate_stop_time_travel_speed( feed: dict[str, pl.DataFrame], ctx: ValidationContext, @@ -279,117 +321,148 @@ def validate_stop_time_travel_speed( if time_cols_to_parse: joined = joined.with_columns([_parse_gtfs_time_expr(c) for c in time_cols_to_parse]) - trip_ids = joined["trip_id"].to_list() - route_ids = joined["route_id"].to_list() - route_types = joined["route_type"].to_list() - trip_csv_row_numbers = joined["trip_csv_row_number"].to_list() - stop_ids = joined["stop_id"].to_list() - stop_sequences = joined["stop_sequence"].to_list() - csv_row_numbers = joined["csv_row_number"].to_list() - arrival_times = joined["arrival_time"].to_list() - departure_times = joined["departure_time"].to_list() - - notices: list[Notice] = [] - total_rows = len(trip_ids) - trip_start = 0 - - while trip_start < total_rows: - trip_end = trip_start + 1 - trip_id = trip_ids[trip_start] - while trip_end < total_rows and trip_ids[trip_end] == trip_id: - trip_end += 1 - - if trip_end - trip_start < 2: - trip_start = trip_end - continue - - route_type = route_types[trip_start] - if route_type is None: - trip_start = trip_end - continue - - max_speed = get_max_speed_kph(int(route_type)) - route_id = route_ids[trip_start] - trip_csv_row_number = trip_csv_row_numbers[trip_start] - - # Precompute latlngs for all stops in this trip. - trip_len = trip_end - trip_start - trip_latlngs: list[tuple[float, float] | None] = [ - resolved_latlng_by_stop_id.get(stop_ids[trip_start + i]) - for i in range(trip_len) - ] - - # Vectorized prefix-sum distances (NaN segments treated as 0 km). - lats_arr = np.array( - [ll[0] if ll is not None else np.nan for ll in trip_latlngs] - ) - lons_arr = np.array( - [ll[1] if ll is not None else np.nan for ll in trip_latlngs] + # --- Add resolved lat/lon via join --- + latlng_rows = [ + {"stop_id": sid, "_rlat": ll[0], "_rlon": ll[1]} + for sid, ll in resolved_latlng_by_stop_id.items() + if ll is not None + ] + if latlng_rows: + latlng_df = pl.DataFrame(latlng_rows).with_columns( + pl.col("_rlat").cast(pl.Float64), pl.col("_rlon").cast(pl.Float64), ) - seg_dists_raw = _haversine_km_vectorized( - lats_arr[:-1], lons_arr[:-1], lats_arr[1:], lons_arr[1:] + joined = joined.join(latlng_df, on="stop_id", how="left") + else: + joined = joined.with_columns( + pl.lit(None, dtype=pl.Float64).alias("_rlat"), + pl.lit(None, dtype=pl.Float64).alias("_rlon"), ) - seg_dists_safe = np.where(np.isnan(seg_dists_raw), 0.0, seg_dists_raw) - prefix_distances_km = np.concatenate([[0.0], np.cumsum(seg_dists_safe)]) - total_trip_distance_km = float(prefix_distances_km[-1]) - - # Consecutive-stop check: iterate over consecutive valid-latlng pairs. - # Precompute distances between consecutive valid stops using NumPy. - valid_rel_indices = [i for i in range(trip_len) if trip_latlngs[i] is not None] - if len(valid_rel_indices) >= 2: - v_lats = lats_arr[valid_rel_indices] - v_lons = lons_arr[valid_rel_indices] - consec_dists = _haversine_km_vectorized( - v_lats[:-1], v_lons[:-1], v_lats[1:], v_lons[1:] - ) - for j in range(len(valid_rel_indices) - 1): - start_rel = valid_rel_indices[j] - end_rel = valid_rel_indices[j + 1] - start_idx = trip_start + start_rel - end_idx = trip_start + end_rel - - departure_secs = departure_times[start_idx] - arrival_secs = arrival_times[end_idx] - if departure_secs is None or arrival_secs is None: - continue - distance_km = float(consec_dists[j]) - speed = get_speed_kph(distance_km, departure_secs, arrival_secs) - if speed > max_speed: - notices.append( - Notice( - code="fast_travel_between_consecutive_stops", - severity=Severity.WARNING, - fields=_build_notice_fields( - int(trip_csv_row_number), - str(trip_id), - str(route_id), - speed, - distance_km, - start_idx, - end_idx, - csv_row_numbers, - stop_sequences, - stop_ids, - departure_times, - arrival_times, - stop_name_by_stop_id, - ), - ) - ) + # --- Add max speed per route_type --- + speed_rows = [{"route_type": k, "max_speed_kph": v} for k, v in MAX_SPEED_KPH.items()] + speed_df = pl.DataFrame(speed_rows).with_columns( + pl.col("route_type").cast(joined["route_type"].dtype), + ) + joined = joined.join(speed_df, on="route_type", how="left").with_columns( + pl.col("max_speed_kph").fill_null(DEFAULT_MAX_SPEED_KPH), + ) + + # =================================================================== + # Consecutive-stop check (Polars-native) + # =================================================================== + # Filter to rows with resolved coordinates, then pair adjacent valid + # stops within each trip via shift(). + valid = joined.filter(pl.col("_rlat").is_not_null()) + valid = valid.with_columns([ + pl.col("_rlat").shift(1).over("trip_id").alias("prev_lat"), + pl.col("_rlon").shift(1).over("trip_id").alias("prev_lon"), + pl.col("departure_time").shift(1).over("trip_id").alias("prev_departure_time"), + pl.col("csv_row_number").shift(1).over("trip_id").alias("prev_csv_row_number"), + pl.col("stop_sequence").shift(1).over("trip_id").alias("prev_stop_sequence"), + pl.col("stop_id").shift(1).over("trip_id").alias("prev_stop_id"), + ]) + # Drop first row of each trip (no predecessor). + valid = valid.filter(pl.col("prev_lat").is_not_null()) + + # Haversine distance and speed as Polars expressions. + dist_km_expr = _haversine_km_expr("prev_lat", "prev_lon", "_rlat", "_rlon") + + raw_time = pl.col("arrival_time") - pl.col("prev_departure_time") + is_minute = (pl.col("arrival_time") % 60 == 0) & (pl.col("prev_departure_time") % 60 == 0) + effective_time = ( + pl.when(raw_time <= 0).then(pl.lit(MIN_EFFECTIVE_TIME_SECS)) + .when(is_minute).then(raw_time + MIN_EFFECTIVE_TIME_SECS) + .otherwise(raw_time) + ) + speed_expr = dist_km_expr * NUM_SECONDS_PER_HOUR / effective_time + + valid = valid.with_columns([ + dist_km_expr.alias("consec_distance_km"), + speed_expr.alias("speed_kph"), + ]) + + violations = valid.filter( + pl.col("speed_kph").is_not_null() + & pl.col("arrival_time").is_not_null() + & pl.col("prev_departure_time").is_not_null() + & (pl.col("speed_kph") > pl.col("max_speed_kph")) + ) + + notices: list[Notice] = [ + _build_consecutive_notice(row, stop_name_by_stop_id) + for row in violations.to_dicts() + ] + + # =================================================================== + # Far-stop check (only for qualifying trips with total distance > 10 km) + # =================================================================== + # Compute segment distances for ALL rows (NaN coords → 0 km). + joined = joined.with_columns([ + pl.col("_rlat").shift(1).over("trip_id").alias("_seg_prev_lat"), + pl.col("_rlon").shift(1).over("trip_id").alias("_seg_prev_lon"), + ]) + seg_dist_expr = _haversine_km_expr("_seg_prev_lat", "_seg_prev_lon", "_rlat", "_rlon") + joined = joined.with_columns( + seg_dist_expr.fill_null(0.0).alias("_seg_dist_km"), + ) + joined = joined.with_columns( + pl.col("_seg_dist_km").cum_sum().over("trip_id").alias("_prefix_dist_km"), + ) + + # Find trips exceeding the distance threshold. + trip_totals = joined.group_by("trip_id").agg( + pl.col("_seg_dist_km").sum().alias("_total_dist_km"), + ).filter(pl.col("_total_dist_km") > FAR_STOP_DISTANCE_THRESHOLD_KM) + + if not trip_totals.is_empty(): + qualifying_ids = set(trip_totals["trip_id"].to_list()) + far_df = joined.filter(pl.col("trip_id").is_in(list(qualifying_ids))) + + # Materialize only the columns needed for far-stop iteration. + far_trip_ids = far_df["trip_id"].to_list() + far_stop_ids = far_df["stop_id"].to_list() + far_csv_rows = far_df["csv_row_number"].to_list() + far_stop_seqs = far_df["stop_sequence"].to_list() + far_arrival = far_df["arrival_time"].to_list() + far_departure = far_df["departure_time"].to_list() + far_route_ids = far_df["route_id"].to_list() + far_route_types = far_df["route_type"].to_list() + far_trip_csv_rows = far_df["trip_csv_row_number"].to_list() + far_max_speeds = far_df["max_speed_kph"].to_list() + far_prefix = far_df["_prefix_dist_km"].to_list() + + total_far = len(far_trip_ids) + trip_start = 0 + + while trip_start < total_far: + trip_end = trip_start + 1 + trip_id = far_trip_ids[trip_start] + while trip_end < total_far and far_trip_ids[trip_end] == trip_id: + trip_end += 1 + + trip_len = trip_end - trip_start + if trip_len < 2: + trip_start = trip_end + continue + + max_speed = float(far_max_speeds[trip_start]) + prefix_distances_km = [far_prefix[trip_start + i] for i in range(trip_len)] + total_trip_distance_km = prefix_distances_km[-1] + + if total_trip_distance_km <= FAR_STOP_DISTANCE_THRESHOLD_KM: + trip_start = trip_end + continue - # Cheap skip: no far-stop pair can exceed the distance threshold. - if total_trip_distance_km > FAR_STOP_DISTANCE_THRESHOLD_KM: far_notice: Notice | None = None abort_far_check = False for end_rel_idx in range(trip_len): end_idx = trip_start + end_rel_idx - arrival_secs = arrival_times[end_idx] + arrival_secs = far_arrival[end_idx] if arrival_secs is None: continue - end_stop_id = stop_ids[end_idx] + end_stop_id = far_stop_ids[end_idx] if end_stop_id not in existing_stop_ids: abort_far_check = True break @@ -407,7 +480,7 @@ def validate_stop_time_travel_speed( for start_rel_idx in range(farthest_near_start_idx, -1, -1): start_idx = trip_start + start_rel_idx - departure_secs = departure_times[start_idx] + departure_secs = far_departure[start_idx] if departure_secs is None: continue @@ -427,7 +500,7 @@ def validate_stop_time_travel_speed( if speed <= max_speed: continue - start_stop_id = stop_ids[start_idx] + start_stop_id = far_stop_ids[start_idx] if start_stop_id not in existing_stop_ids: abort_far_check = True break @@ -436,18 +509,18 @@ def validate_stop_time_travel_speed( code="fast_travel_between_far_stops", severity=Severity.WARNING, fields=_build_notice_fields( - int(trip_csv_row_number), + int(far_trip_csv_rows[trip_start]), str(trip_id), - str(route_id), + str(far_route_ids[trip_start]), speed, distance_to_end, start_idx, end_idx, - csv_row_numbers, - stop_sequences, - stop_ids, - departure_times, - arrival_times, + far_csv_rows, + far_stop_seqs, + far_stop_ids, + far_departure, + far_arrival, stop_name_by_stop_id, ), ) @@ -459,6 +532,6 @@ def validate_stop_time_travel_speed( if far_notice is not None and not abort_far_check: notices.append(far_notice) - trip_start = trip_end + trip_start = trip_end return notices diff --git a/tests/test_validators/test_shape_to_stop_matching_util.py b/tests/test_validators/test_shape_to_stop_matching_util.py index 6d7c70ce75..22dabc0699 100644 --- a/tests/test_validators/test_shape_to_stop_matching_util.py +++ b/tests/test_validators/test_shape_to_stop_matching_util.py @@ -8,18 +8,26 @@ from gtfs_validator.validators.shape_to_stop_matching_util import ( CandidateMatch, + ShapeArrays, ShapePoint, StopPoint, + build_shape_arrays, build_shape_points, closest_point_on_edge, compute_trip_hash, find_best_assignment, + find_closest_on_shape, find_potential_matches, geo_distance_meters, latlng_to_unit_vector, resolve_stop_location, unit_vector_to_latlng, MatchSettings, + _closest_points_on_edges_vec, + _haversine_meters_vec, + _seg_fraction_vec, + _uv_to_latlng_vec, + build_shape_spatial_index, ) # --------------------------------------------------------------------------- @@ -314,3 +322,211 @@ def test_find_best_assignment_out_of_order() -> None: assert len(out_of_order) == 1 stop_idx, match1, match2 = out_of_order[0] assert stop_idx == 1 + + +def test_find_best_assignment_fast_path_single_candidate_per_stop() -> None: + """Fast path should preserve in-order single-candidate assignments.""" + c0 = CandidateMatch(geo_distance_to_shape=5.0, shape_geo_distance=10.0, lat=47.0, lon=8.0) + c1 = CandidateMatch(geo_distance_to_shape=6.0, shape_geo_distance=20.0, lat=47.1, lon=8.1) + stops = [ + StopPoint( + lat=47.0, + lon=8.0, + user_distance=0.0, + stop_time_row={"stop_id": "A", "stop_sequence": 0, "csv_row_number": 1}, + is_large_station=False, + ), + StopPoint( + lat=47.1, + lon=8.1, + user_distance=0.0, + stop_time_row={"stop_id": "B", "stop_sequence": 1, "csv_row_number": 2}, + is_large_station=False, + ), + ] + + assignment, out_of_order = find_best_assignment([[c0], [c1]], stops) + assert assignment == [c0, c1] + assert out_of_order == [] + + +def test_find_best_assignment_fast_path_handles_empty_candidate_list() -> None: + """Fast path should keep assignment slots aligned when a stop has no candidates.""" + c0 = CandidateMatch(geo_distance_to_shape=5.0, shape_geo_distance=100.0, lat=47.0, lon=8.0) + c2 = CandidateMatch(geo_distance_to_shape=6.0, shape_geo_distance=50.0, lat=47.2, lon=8.2) + stops = [ + StopPoint( + lat=47.0, + lon=8.0, + user_distance=0.0, + stop_time_row={"stop_id": "A", "stop_sequence": 0, "csv_row_number": 1}, + is_large_station=False, + ), + StopPoint( + lat=47.1, + lon=8.1, + user_distance=0.0, + stop_time_row={"stop_id": "B", "stop_sequence": 1, "csv_row_number": 2}, + is_large_station=False, + ), + StopPoint( + lat=47.2, + lon=8.2, + user_distance=0.0, + stop_time_row={"stop_id": "C", "stop_sequence": 2, "csv_row_number": 3}, + is_large_station=False, + ), + ] + + assignment, out_of_order = find_best_assignment([[c0], [], [c2]], stops) + assert assignment == [c0, None, c2] + assert len(out_of_order) == 1 + stop_idx, match1, match2 = out_of_order[0] + assert stop_idx == 2 + assert match1 is c2 + assert match2 is c0 + + +# --------------------------------------------------------------------------- +# Vectorised geometry cross-checks +# --------------------------------------------------------------------------- + +import numpy as np +from gtfs_validator.validators.shape_to_stop_matching_util import ( + _cross, + _dot, + _norm, + _normalize, + _seg_fraction, +) + + +def test_haversine_meters_vec_matches_scalar() -> None: + """Vectorised haversine produces same results as scalar version.""" + pairs = [ + (47.3769, 8.5417, 46.2044, 6.1432), + (47.0, 8.0, 47.0, 8.0), + (40.7128, -74.006, 51.5074, -0.1278), + (0.0, 0.0, 0.0, 1.0), + ] + for lat1, lon1, lat2, lon2 in pairs: + scalar = geo_distance_meters(lat1, lon1, lat2, lon2) + vec = float(_haversine_meters_vec( + np.array([lat1]), np.array([lon1]), + np.array([lat2]), np.array([lon2]), + )[0]) + assert abs(scalar - vec) < 0.01, f"Mismatch for ({lat1},{lon1})->({lat2},{lon2}): {scalar} vs {vec}" + + +def test_closest_points_on_edges_vec_matches_scalar() -> None: + """Vectorised closest_point_on_edge matches scalar for several cases.""" + # Arc on equator (0,0)-(0,1), stop just north of midpoint + av = latlng_to_unit_vector(0.0, 0.0) + bv = latlng_to_unit_vector(0.0, 1.0) + pv = latlng_to_unit_vector(0.001, 0.5) + + scalar_lat, scalar_lon = closest_point_on_edge(pv, av, bv) + + a_arr = np.array([av]) + b_arr = np.array([bv]) + p_arr = np.array(pv) + result = _closest_points_on_edges_vec(p_arr, a_arr, b_arr) + vec_lats, vec_lons = _uv_to_latlng_vec(result) + + assert abs(scalar_lat - vec_lats[0]) < 1e-6 + assert abs(scalar_lon - vec_lons[0]) < 1e-6 + + +def test_seg_fraction_vec_matches_scalar() -> None: + """Vectorised seg_fraction matches scalar for several cases.""" + av = latlng_to_unit_vector(0.0, 0.0) + bv = latlng_to_unit_vector(0.0, 1.0) + + test_points = [ + latlng_to_unit_vector(0.001, 0.5), # midpoint + latlng_to_unit_vector(0.0, 2.0), # beyond b + latlng_to_unit_vector(0.0, -1.0), # before a + latlng_to_unit_vector(0.001, 0.25), # quarter + ] + + for pv in test_points: + scalar = _seg_fraction(pv, av, bv) + vec = float(_seg_fraction_vec( + np.array(pv), np.array([av]), np.array([bv]), + )[0]) + assert abs(scalar - vec) < 1e-6, f"Mismatch: scalar={scalar}, vec={vec}" + + +def test_find_potential_matches_vec_matches_scalar() -> None: + """Vectorised find_potential_matches returns same candidates as scalar.""" + shape_points = build_shape_points(SHAPE_ROWS) + shape_arrays = build_shape_arrays(shape_points) + stop = _make_stop(47.3659, 8.5254) + + scalar_matches = find_potential_matches(stop, shape_points, 150.0) + vec_matches = find_potential_matches(stop, shape_points, 150.0, shape_arrays) + + assert len(scalar_matches) == len(vec_matches), ( + f"Count mismatch: scalar={len(scalar_matches)}, vec={len(vec_matches)}" + ) + for sm, vm in zip(scalar_matches, vec_matches): + assert abs(sm.geo_distance_to_shape - vm.geo_distance_to_shape) < 0.5 + assert abs(sm.shape_geo_distance - vm.shape_geo_distance) < 0.5 + + +def test_find_closest_on_shape_vec_matches_scalar() -> None: + """Vectorised find_closest_on_shape returns same result as scalar.""" + shape_points = build_shape_points(SHAPE_ROWS) + shape_arrays = build_shape_arrays(shape_points) + # A stop that's far from the shape + stop = _make_stop(47.370, 8.530) + + scalar = find_closest_on_shape(stop, shape_points) + vec = find_closest_on_shape(stop, shape_points, shape_arrays) + + assert abs(scalar.geo_distance_to_shape - vec.geo_distance_to_shape) < 0.5 + assert abs(scalar.shape_geo_distance - vec.shape_geo_distance) < 0.5 + assert abs(scalar.lat - vec.lat) < 1e-4 + assert abs(scalar.lon - vec.lon) < 1e-4 + + +def test_find_potential_matches_spatial_index_matches_scalar() -> None: + """Spatial-indexed scalar path should match plain scalar candidates.""" + shape_points = build_shape_points(SHAPE_ROWS * 30) + spatial_index = build_shape_spatial_index(shape_points) + assert spatial_index is not None + stop = _make_stop(47.3659, 8.5254) + + scalar_matches = find_potential_matches(stop, shape_points, 150.0) + indexed_matches = find_potential_matches( + stop, + shape_points, + 150.0, + spatial_index=spatial_index, + ) + + assert len(scalar_matches) == len(indexed_matches) + for sm, im in zip(scalar_matches, indexed_matches): + assert abs(sm.geo_distance_to_shape - im.geo_distance_to_shape) < 0.5 + assert abs(sm.shape_geo_distance - im.shape_geo_distance) < 0.5 + + +def test_find_potential_matches_looping_shape_vec() -> None: + """Vectorised path also finds 3 local minima on the looping shape.""" + looping_shape_rows = [ + {"shape_pt_sequence": 0, "shape_pt_lat": 47.365000, "shape_pt_lon": 8.5240, "shape_dist_traveled": None}, + {"shape_pt_sequence": 1, "shape_pt_lat": 47.365000, "shape_pt_lon": 8.5260, "shape_dist_traveled": None}, + {"shape_pt_sequence": 2, "shape_pt_lat": 47.374000, "shape_pt_lon": 8.5270, "shape_dist_traveled": None}, + {"shape_pt_sequence": 3, "shape_pt_lat": 47.374000, "shape_pt_lon": 8.5230, "shape_dist_traveled": None}, + {"shape_pt_sequence": 4, "shape_pt_lat": 47.365000, "shape_pt_lon": 8.5230, "shape_dist_traveled": None}, + {"shape_pt_sequence": 5, "shape_pt_lat": 47.365000, "shape_pt_lon": 8.5260, "shape_dist_traveled": None}, + {"shape_pt_sequence": 6, "shape_pt_lat": 47.374000, "shape_pt_lon": 8.5260, "shape_dist_traveled": None}, + {"shape_pt_sequence": 7, "shape_pt_lat": 47.374000, "shape_pt_lon": 8.5220, "shape_dist_traveled": None}, + {"shape_pt_sequence": 8, "shape_pt_lat": 47.365000, "shape_pt_lon": 8.5220, "shape_dist_traveled": None}, + {"shape_pt_sequence": 9, "shape_pt_lat": 47.365000, "shape_pt_lon": 8.5260, "shape_dist_traveled": None}, + ] + looping_points = build_shape_points(looping_shape_rows) + looping_arrays = build_shape_arrays(looping_points) + stop = _make_stop(47.365000, 8.525000) + matches = find_potential_matches(stop, looping_points, 150.0, looping_arrays) + assert len(matches) == 3, f"Expected 3 local minima, got {len(matches)}" diff --git a/uv.lock b/uv.lock index f21f308e4c..b59e7fdd4b 100644 --- a/uv.lock +++ b/uv.lock @@ -53,6 +53,7 @@ dependencies = [ { name = "click" }, { name = "httpx" }, { name = "jinja2" }, + { name = "numpy" }, { name = "polars" }, { name = "shapely" }, ] @@ -69,6 +70,7 @@ requires-dist = [ { name = "click", specifier = ">=8.0" }, { name = "httpx", specifier = ">=0.27" }, { name = "jinja2", specifier = ">=3.0" }, + { name = "numpy", specifier = ">=1.24" }, { name = "polars", specifier = ">=1.0" }, { name = "shapely", specifier = ">=2.0" }, ] From 0c742c94cf192e0563fce2ae3ab1c7134f59fb96 Mon Sep 17 00:00:00 2001 From: Ryan Mahoney Date: Tue, 10 Mar 2026 00:35:33 -0400 Subject: [PATCH 2/4] refactor(shape_to_stop_matching): simplify vectorization logic by always passing shape_arrays Remove conditional use_vectorized checks in match_using_geo_distance and match_using_user_distance functions. The shape_arrays parameter is now passed directly to helper functions, letting them handle None cases. --- .../validators/shape_to_stop_matching_util.py | 20 +++---------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/src/gtfs_validator/validators/shape_to_stop_matching_util.py b/src/gtfs_validator/validators/shape_to_stop_matching_util.py index 267faea682..dc6bd0268c 100644 --- a/src/gtfs_validator/validators/shape_to_stop_matching_util.py +++ b/src/gtfs_validator/validators/shape_to_stop_matching_util.py @@ -1103,13 +1103,6 @@ def match_using_geo_distance( candidates_per_stop: list[list[CandidateMatch]] = [] # Maps index in candidates_per_stop -> index in stop_points (skips too-far stops) candidate_stop_idx: list[int] = [] - num_segments = max(0, len(shape_points) - 1) - use_vectorized = ( - shape_arrays is not None - and num_segments >= _VEC_MIN_SEGMENTS - and (num_segments * len(stop_points)) >= _VEC_MIN_WORK_ITEMS - ) - for orig_idx, stop in enumerate(stop_points): max_dist = settings.max_distance_meters if stop.is_large_station: @@ -1125,7 +1118,7 @@ def match_using_geo_distance( stop, shape_points, max_dist, - shape_arrays if use_vectorized else None, + shape_arrays, spatial_index, ) if candidates_cache is not None and candidates_key is not None: @@ -1155,7 +1148,7 @@ def match_using_geo_distance( closest = find_closest_on_shape( stop, shape_points, - shape_arrays if use_vectorized else None, + shape_arrays, ) if closest_cache is not None and closest_key is not None: closest_cache[closest_key] = closest @@ -1209,13 +1202,6 @@ def match_using_user_distance( problems: list[Problem] = [] candidates_per_stop: list[list[CandidateMatch]] = [] search_from = 0 - num_segments = max(0, len(shape_points) - 1) - use_vectorized = ( - shape_arrays is not None - and num_segments >= _VEC_MIN_SEGMENTS - and (num_segments * len(stop_points)) >= _VEC_MIN_WORK_ITEMS - ) - for stop in stop_points: if stop.user_distance > 0.0: interp_match, search_from = interpolate_user_distance( @@ -1246,7 +1232,7 @@ def match_using_user_distance( stop, shape_points, max_dist, - shape_arrays if use_vectorized else None, + shape_arrays, spatial_index, ) if candidates_cache is not None and candidates_key is not None: From 80898eb380faf95eebd2d593e38824adf71fae26 Mon Sep 17 00:00:00 2001 From: Ryan Mahoney Date: Tue, 10 Mar 2026 00:37:23 -0400 Subject: [PATCH 3/4] perf(shape_to_stop_matching): reduce object allocation in hot loops - Introduce TripInfo dataclass to replace dict usage for trip data - Defer CandidateMatch instantiation in find_potential_matches and find_closest_on_shape by tracking scalar values during iteration - Simplify routes_by_id to route_type_by_id storing only needed field - Remove Optional types and assertions from hot paths --- .../validators/shape_to_stop_matching.py | 48 +++++++---- .../validators/shape_to_stop_matching_util.py | 83 ++++++++++++------- 2 files changed, 84 insertions(+), 47 deletions(-) diff --git a/src/gtfs_validator/validators/shape_to_stop_matching.py b/src/gtfs_validator/validators/shape_to_stop_matching.py index 1bd1023ea3..4b99ee782b 100644 --- a/src/gtfs_validator/validators/shape_to_stop_matching.py +++ b/src/gtfs_validator/validators/shape_to_stop_matching.py @@ -8,6 +8,7 @@ from __future__ import annotations from collections import defaultdict +from dataclasses import dataclass from typing import Optional import polars as pl @@ -34,9 +35,16 @@ _REQUIRED_TABLES = ["stops", "trips", "routes", "stop_times", "shapes"] +@dataclass(frozen=True) +class TripInfo: + trip_id: str + route_id: str + csv_row_number: int + + def _problems_to_notices( problems: list[Problem], - trip: dict, + trip: TripInfo, shape_id: str, stops_by_id: dict, reported_stop_ids: set[str], @@ -76,9 +84,9 @@ def _problems_to_notices( code=code, severity=Severity.WARNING, fields={ - "trip_csv_row_number": trip["csv_row_number"], + "trip_csv_row_number": trip.csv_row_number, "shape_id": shape_id, - "trip_id": trip["trip_id"], + "trip_id": trip.trip_id, "stop_time_csv_row_number": stop_time_row["csv_row_number"], "stop_id": stop_id, "stop_name": stop_name, @@ -106,9 +114,9 @@ def _problems_to_notices( code="stop_has_too_many_matches_for_shape", severity=Severity.WARNING, fields={ - "trip_csv_row_number": trip["csv_row_number"], + "trip_csv_row_number": trip.csv_row_number, "shape_id": shape_id, - "trip_id": trip["trip_id"], + "trip_id": trip.trip_id, "stop_time_csv_row_number": stop_time_row["csv_row_number"], "stop_id": stop_id, "stop_name": stop_name, @@ -142,9 +150,9 @@ def _problems_to_notices( code="stops_match_shape_out_of_order", severity=Severity.WARNING, fields={ - "trip_csv_row_number": trip["csv_row_number"], + "trip_csv_row_number": trip.csv_row_number, "shape_id": shape_id, - "trip_id": trip["trip_id"], + "trip_id": trip.trip_id, "stop_time_csv_row_number1": stop_time_row1["csv_row_number"], "stop_id1": stop_id1, "stop_name1": stop_name1, @@ -199,20 +207,25 @@ def validate_shape_to_stop_matching( for row in stops_df.select(available_stops_cols).iter_rows(named=True) } - routes_by_id: dict[str, dict] = { - row["route_id"]: row + route_type_by_id: dict[str, int] = { + row["route_id"]: row["route_type"] for row in routes_df.select(["route_id", "route_type"]).iter_rows(named=True) } # Group trips by shape_id first so downstream row processing can filter. trip_cols = ["trip_id", "route_id", "shape_id", "csv_row_number"] - trips_by_shape_id: dict[str, list[dict]] = defaultdict(list) + trips_by_shape_id: dict[str, list[TripInfo]] = defaultdict(list) relevant_trip_ids: set[str] = set() for row in trips_df.select(trip_cols).iter_rows(named=True): shape_id = row.get("shape_id") if shape_id: - trips_by_shape_id[shape_id].append(row) - relevant_trip_ids.add(row["trip_id"]) + trip = TripInfo( + trip_id=row["trip_id"], + route_id=row["route_id"], + csv_row_number=row["csv_row_number"], + ) + trips_by_shape_id[shape_id].append(trip) + relevant_trip_ids.add(trip.trip_id) # Group stop_times by trip_id st_cols = ["trip_id", "stop_id", "stop_sequence", "csv_row_number"] @@ -265,7 +278,7 @@ def validate_shape_to_stop_matching( num_segments = max(0, len(shape_points) - 1) max_trip_stops = max( - (len(st_by_trip_id.get(trip["trip_id"], [])) for trip in trips_for_shape), + (len(st_by_trip_id.get(trip.trip_id, [])) for trip in trips_for_shape), default=0, ) should_build_shape_arrays = ( @@ -285,8 +298,7 @@ def validate_shape_to_stop_matching( need_trip_dedup = len(trips_for_shape) > 1 for trip in trips_for_shape: - trip_id = trip["trip_id"] - stop_times = st_by_trip_id.get(trip_id, []) + stop_times = st_by_trip_id.get(trip.trip_id, []) if not stop_times: continue @@ -317,11 +329,11 @@ def validate_shape_to_stop_matching( continue state.add(cur_hash) - route = routes_by_id.get(trip["route_id"]) - if route is None: + route_type = route_type_by_id.get(trip.route_id) + if route_type is None: continue - is_large_route = route["route_type"] == RAIL_ROUTE_TYPE + is_large_route = route_type == RAIL_ROUTE_TYPE stop_points = build_stop_points(stop_times, stops_by_id, is_large_route, resolved_latlng) # Geo-distance matching (always) diff --git a/src/gtfs_validator/validators/shape_to_stop_matching_util.py b/src/gtfs_validator/validators/shape_to_stop_matching_util.py index dc6bd0268c..6eeff11172 100644 --- a/src/gtfs_validator/validators/shape_to_stop_matching_util.py +++ b/src/gtfs_validator/validators/shape_to_stop_matching_util.py @@ -683,7 +683,9 @@ def find_potential_matches( in_close_run = False run_best_dist = math.inf - run_best_match: Optional[CandidateMatch] = None + run_best_geo_dist = 0.0 + run_best_lat = 0.0 + run_best_lon = 0.0 matches: list[CandidateMatch] = [] seg_ids = ( @@ -694,11 +696,16 @@ def find_potential_matches( prev_seg_id = -1 for i in seg_ids: if in_close_run and prev_seg_id >= 0 and i != prev_seg_id + 1: - assert run_best_match is not None - matches.append(run_best_match) + matches.append( + CandidateMatch( + geo_distance_to_shape=run_best_dist, + shape_geo_distance=run_best_geo_dist, + lat=run_best_lat, + lon=run_best_lon, + ) + ) in_close_run = False run_best_dist = math.inf - run_best_match = None a = shape_points[i] b = shape_points[i + 1] @@ -726,11 +733,16 @@ def find_potential_matches( or stop.lon > max_lon + lon_slack ): if in_close_run: - assert run_best_match is not None - matches.append(run_best_match) + matches.append( + CandidateMatch( + geo_distance_to_shape=run_best_dist, + shape_geo_distance=run_best_geo_dist, + lat=run_best_lat, + lon=run_best_lon, + ) + ) in_close_run = False run_best_dist = math.inf - run_best_match = None continue av = a.uv @@ -741,30 +753,40 @@ def find_potential_matches( if dist <= max_dist: frac = _seg_fraction(p, av, bv) matched_geo_dist = a.geo_distance + frac * (b.geo_distance - a.geo_distance) - candidate = CandidateMatch( - geo_distance_to_shape=dist, - shape_geo_distance=matched_geo_dist, - lat=closest_lat, - lon=closest_lon, - ) if not in_close_run: in_close_run = True run_best_dist = dist - run_best_match = candidate + run_best_geo_dist = matched_geo_dist + run_best_lat = closest_lat + run_best_lon = closest_lon elif dist < run_best_dist: run_best_dist = dist - run_best_match = candidate + run_best_geo_dist = matched_geo_dist + run_best_lat = closest_lat + run_best_lon = closest_lon else: if in_close_run: - assert run_best_match is not None - matches.append(run_best_match) + matches.append( + CandidateMatch( + geo_distance_to_shape=run_best_dist, + shape_geo_distance=run_best_geo_dist, + lat=run_best_lat, + lon=run_best_lon, + ) + ) in_close_run = False run_best_dist = math.inf - run_best_match = None prev_seg_id = i - if in_close_run and run_best_match is not None: - matches.append(run_best_match) + if in_close_run: + matches.append( + CandidateMatch( + geo_distance_to_shape=run_best_dist, + shape_geo_distance=run_best_geo_dist, + lat=run_best_lat, + lon=run_best_lon, + ) + ) return matches @@ -855,7 +877,9 @@ def find_closest_on_shape( # --- Scalar path (inlined for zero overhead) --- p = latlng_to_unit_vector(stop.lat, stop.lon) best_dist = math.inf - best_match: Optional[CandidateMatch] = None + best_geo_dist = 0.0 + best_lat = 0.0 + best_lon = 0.0 for i in range(len(shape_points) - 1): a = shape_points[i] @@ -868,15 +892,16 @@ def find_closest_on_shape( best_dist = dist frac = _seg_fraction(p, av, bv) matched_geo_dist = a.geo_distance + frac * (b.geo_distance - a.geo_distance) - best_match = CandidateMatch( - geo_distance_to_shape=dist, - shape_geo_distance=matched_geo_dist, - lat=closest_lat, - lon=closest_lon, - ) + best_geo_dist = matched_geo_dist + best_lat = closest_lat + best_lon = closest_lon - assert best_match is not None - return best_match + return CandidateMatch( + geo_distance_to_shape=best_dist, + shape_geo_distance=best_geo_dist, + lat=best_lat, + lon=best_lon, + ) def _find_closest_on_shape_vec( From 9197d507a3ff1da55a2433b0ef1bdaff88a749de Mon Sep 17 00:00:00 2001 From: Ryan Mahoney Date: Tue, 10 Mar 2026 00:54:51 -0400 Subject: [PATCH 4/4] perf(shape_to_stop_matching): add early exit and reduce dict lookups - Early return when no trips have shape_id to avoid unnecessary processing - Cache stop_id in local variable in build_stop_points to avoid repeated dictionary lookups in hot loop --- src/gtfs_validator/validators/shape_to_stop_matching.py | 3 +++ src/gtfs_validator/validators/shape_to_stop_matching_util.py | 5 +++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/gtfs_validator/validators/shape_to_stop_matching.py b/src/gtfs_validator/validators/shape_to_stop_matching.py index 4b99ee782b..3291587b2c 100644 --- a/src/gtfs_validator/validators/shape_to_stop_matching.py +++ b/src/gtfs_validator/validators/shape_to_stop_matching.py @@ -227,6 +227,9 @@ def validate_shape_to_stop_matching( trips_by_shape_id[shape_id].append(trip) relevant_trip_ids.add(trip.trip_id) + if not trips_by_shape_id: + return [] + # Group stop_times by trip_id st_cols = ["trip_id", "stop_id", "stop_sequence", "csv_row_number"] optional_st = ["shape_dist_traveled"] diff --git a/src/gtfs_validator/validators/shape_to_stop_matching_util.py b/src/gtfs_validator/validators/shape_to_stop_matching_util.py index 6eeff11172..94277b89fe 100644 --- a/src/gtfs_validator/validators/shape_to_stop_matching_util.py +++ b/src/gtfs_validator/validators/shape_to_stop_matching_util.py @@ -555,11 +555,12 @@ def build_stop_points( n = len(stop_times) result: list[StopPoint] = [] for i, st in enumerate(stop_times): + stop_id = st["stop_id"] if resolved_latlng is not None: - ll = resolved_latlng.get(st["stop_id"]) + ll = resolved_latlng.get(stop_id) lat, lon = (ll[0], ll[1]) if ll is not None else (0.0, 0.0) else: - lat, lon = resolve_stop_location(st["stop_id"], stops_by_id) + lat, lon = resolve_stop_location(stop_id, stops_by_id) user_dist = st.get("shape_dist_traveled") or 0.0 is_large = is_large_route and (i == 0 or i == n - 1) result.append(