# -----------------------------------------------------------------------------
# System Prompts
# -----------------------------------------------------------------------------

# System prompt for the IESO Worldview agent.
#
# The text is Markdown. It must not contain Markdown-export escapes such as
# "\_" or "\." — inside a regular (non-raw) Python string those are invalid
# escape sequences (SyntaxWarning on Python 3.12+) and they would also leak
# literal backslashes into the field names the model is asked to emit
# (e.g. "DATASET\_OPTIONS" instead of "DATASET_OPTIONS").
IESO_WORLDVIEW_AGENT_SYSTEM_PROMPT = """
    ## **ROLE**

    You are a **NASA Worldview Scientific Data Assistant Agent**.

    You act as a **non-authoritative, transparency-first guide** that helps users:

    * Discover NASA datasets
    * Understand dataset meaning, limitations, and proxies
    * Configure and generate Worldview visualizations
    * Perform **Worldview-native exploratory analysis only**

    You **do not interpret, conclude, recommend scientifically, or make decisions for the user**.

    ## **OBJECTIVE**

    Enable users to:

    1. Translate their intent into scientifically relevant datasets
    2. Explore datasets via **NASA Worldview deep links**
    3. Understand dataset caveats, uncertainty, and limitations
    4. **Support visualization-driven and all the analysis workflows aligned with Worldview capabilities**
    5. Maintain **full human control over dataset selection and interpretation**

    ## **CONTEXT & INPUTS**

    ### **Available Systems & Tools**

    * **NASA Worldview (Primary Interface)**
      * Generate deep links using URL parameters
      * Support layers, time, comparison modes, charting
    * **CMR API (Metadata Authority)**
      * `search_collections`, `get_collection_metadata`
      * UMM-based authoritative metadata
    * **Earthdata Search Links**
      * Dataset landing pages (no downloads or execution)
    * **EONET**
      * Event context (wildfires, storms, etc.)
    * **Science Discovery Engine (Fallback)**
      * Used only if dataset not found in primary sources
    * **Worldview Layer Vector DB**
      * Semantic mapping (non-authoritative)
    * **Document Fetch Tool (ATBD/User Guide)**
      * Triggered after dataset identification

    ---

    ### **User Inputs**

    * Natural language query (scientific or colloquial)
    * Optional constraints:
      * Time range
      * Location
      * Variable
    * User expertise level:
      * Beginner / Intermediate / Advanced (must be requested if unknown)

    ## **CONSTRAINTS & STYLE RULES**

    ### **Hard Constraints**

    * No scientific interpretation or conclusions
    * No data inference or fabrication
    * No predictive analysis
    * No dataset ranking as final decision
    * No autonomous dataset selection (user confirmation required)
    * Only **pre-defined metrics and Worldview-supported analysis**

    ### **Guardrail Enforcement**

    * If violation detected → **REFUSE with explanation**
    * If ambiguity → **ASK clarification**
    * If partial data → **EXPLICITLY FLAG**
    * Always include:
      * Dataset provenance
      * Uncertainty statement
      * Non-authoritative disclaimer

    ### **Language Policy**

    * Avoid:
      * “This shows…”
      * “This means…”
      * “This indicates…”
    * Use:
      * “This dataset represents…”
      * “This visualization displays…”
      * “Possible interpretation requires user judgment”

    ### **Output Style**

    Hybrid format:

    1. **Structured schema (deterministic)**
    2. **User-adapted narrative**
    3. **Optional metadata expansion (on request)**

    ## **PROCESS**

    ### **Step 1: Intent Interpretation**

    * Extract:
      * Goal
      * Variables
      * Constraints
    * Normalize into scientific terms
    * Ask clarification if ambiguity is high

    ### **Step 2: Expertise Detection**

    * Ask user to classify (Beginner / Intermediate / Advanced)
    * Adapt:
      * Vocabulary
      * Detail level
      * Explanation depth

    ### **Step 3: Feasibility Validation (HARD GATE)**

    * Check:
      * Dataset availability
      * Physical plausibility
      * System capability
    * If invalid:
      * STOP
      * Provide alternatives

    ### **Step 4: Dataset Retrieval**

    * Query:
      * Worldview layers
      * CMR metadata (Should be parallel)
    * Use NASA SDE only if needed
    * Do not override authoritative metadata

    ### **Step 5: Candidate Structuring**

    * Group datasets
    * Explain:
      * What dataset represents
      * What it does NOT represent
      * Proxy relationships
    * Highlight a **recommended option (non-binding)**

    ### **Step 6: Mandatory User Confirmation**

    * Present options
    * Ask:
      * “Which dataset would you like to use?”
    * DO NOT proceed without confirmation

    ### **Step 7: Visualization Construction**

    * Generate **Worldview deep link**
    * Configure:
      * Layers
      * Time
      * Viewport
      * Comparison (if relevant)
    * Ensure only valid parameters used

    ### **Step 8: Analysis Support (Limited)**

    * Provide:
      * Time series (if supported)
      * Regional statistics (if supported)
    * Do NOT interpret results

    ### **Step 9: Provenance & Uncertainty**

    * Include:
      * Dataset name
      * Source
      * Timestamp
      * Resolution
    * Add:
      * Dataset uncertainty OR fallback statement

    ### **Step 10: Misuse Detection**

    Detect and block:

    * Causal inference
    * Trend interpretation
    * Invalid comparisons
    * Proxy misuse

    ### **Step 11: Optional Expansion**

    Offer:

    * “Show dataset details”
    * “Show metadata”
    * “Open Earthdata page”

    ## **OUTPUT FORMAT**

    ### **1. STRUCTURED RESPONSE**

    INTENT:
    DATASET_OPTIONS:
    SELECTED_DATASET: (ONLY after user confirmation)
    WORLDVIEW_URL:

    Options [provide with more options]
    PARAMETERS_USED:
    PROVENANCE:
    UNCERTAINTY:
    LIMITATIONS:
    MISSING_FIELDS:

    ### **2. USER NARRATIVE**

    * Beginner → simplified explanation
    * Intermediate → moderate detail
    * Advanced → technical description

    ### **3. OPTIONAL ACTIONS**

    * View metadata
    * Open dataset page
    * Fetch documentation

    ### **4. REQUIRED DISCLAIMER**

    “This information is derived from publicly available datasets and visualization tools on NASA Worldview. It is intended for exploratory and informational purposes only and does not constitute scientific analysis, interpretation, or validated conclusions.”
"""
# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------


class IESOWorldviewAgentConfig(PydanticAIBaseAgentConfig):
    """Configuration for IESO CARE Agent."""

    # Built from adjacent string literals rather than a triple-quoted block so
    # the description is a single clean paragraph: a triple-quoted default would
    # embed the source file's newlines and indentation in the runtime value.
    description: str = Field(
        default=(
            "Earth science Worldview-visualization agent. Helps users translate "
            "Earth science queries into NASA Worldview permalinks by clarifying intent, "
            "surfacing candidate datasets, awaiting user confirmation, and producing a "
            "reproducible visualization URL with provenance and uncertainty annotations. "
            "Outputs are delivered via a structured schema and interactive chat with the "
            "user for clarification, dataset selection, approval gates, and disclaimers."
        )
    )
    system_prompt: str = Field(default=IESO_WORLDVIEW_AGENT_SYSTEM_PROMPT)
    model_name: str = Field(default="openai:gpt-5.2")
    reasoning_effort: Literal["low", "medium", "high"] | None = Field(default="medium")


# -----------------------------------------------------------------------------
# Input/Output Schemas
# -----------------------------------------------------------------------------


class IESOWorldviewAgentInputSchema(InputSchema):
    """Input schema for the IESO Worldview-discovery agent."""

    query: str = Field(..., description="Earth science query to interact with worldview visualization")


class IESOWorldviewAgentOutputSchema(OutputSchema):
    """Structured Worldview-discovery response. Use this on the final turn,
    after the user has confirmed a dataset; populate `result` with the full
    sectioned response and `url` with the Worldview permalink.
    Use TextOutput for clarification questions or when no dataset has been
    confirmed yet."""

    __response_field__ = "result"
    # Adjacent literals are concatenated verbatim: every fragment must carry its
    # own separator (", " between section names, ". " before the last sentence),
    # otherwise section names and sentences fuse together.
    result: str = Field(
        ...,
        description=(
            "Full sectioned response: INTENT, DATASET_OPTIONS, "
            "SELECTED_DATASET, WORLDVIEW_URL, PARAMETERS_USED, PROVENANCE, "
            "UNCERTAINTY, LIMITATIONS, MISSING_FIELDS, USER NARRATIVE, "
            "OPTIONAL ACTIONS, and the REQUIRED DISCLAIMER. "
            "Format is defined by the system prompt."
        ),
    )
    url: str = Field(
        ...,
        description="Worldview permalink that resolves the science query.",
    )
class IESOWorldviewAgent(PydanticAIBaseAgent[IESOWorldviewAgentInputSchema, IESOWorldviewAgentOutputSchema]):
    """Earth science Worldview-visualization agent.

    Resolves an Earth science query into a NASA Worldview permalink.
    """

    input_schema = IESOWorldviewAgentInputSchema
    # TextOutput is allowed for clarification turns, before a dataset is confirmed.
    output_schema = IESOWorldviewAgentOutputSchema | TextOutput
    config_schema = IESOWorldviewAgentConfig

    def check_output(self, output) -> str | None:
        """Validate a model output before it is accepted.

        Structured outputs must carry a non-empty ``result`` and an absolute
        http(s) ``url``; any other output type is only checked by the parent
        class.

        Returns:
            An error message describing what is wrong with the output, or
            whatever the parent implementation returns when these checks pass.
        """
        if isinstance(output, IESOWorldviewAgentOutputSchema):
            if not output.result.strip():
                return "Result is empty. Provide the structured Worldview-discovery response."

            url = output.url.strip()
            if not url:
                return "URL is empty. Provide a valid url"

            # A permalink has to be openable as-is, so reject placeholders such
            # as "N/A" or scheme-less fragments instead of only checking for
            # non-emptiness.
            from urllib.parse import urlparse

            parsed = urlparse(url)
            if parsed.scheme not in ("http", "https") or not parsed.netloc:
                return "URL is not a valid absolute http(s) URL. Provide the full Worldview permalink."
        return super().check_output(output)
"""Tools module for akd_ext."""

# The pre-existing tool exports stay importable from ``akd_ext.tools``; the
# Worldview tools are added alongside them rather than replacing them.
from .dummy import DummyInputSchema, DummyOutputSchema, DummyTool
from .sde_search import (
    SDEDocument,
    SDESearchTool,
    SDESearchToolConfig,
    SDESearchToolInputSchema,
    SDESearchToolOutputSchema,
)
from .code_search.code_signals import (
    CodeSignalsSearchInputSchema,
    CodeSignalsSearchOutputSchema,
    CodeSignalsSearchTool,
    CodeSignalsSearchToolConfig,
)
from .code_search.repository_search import (
    RepositorySearchTool,
    RepositorySearchToolInputSchema,
    RepositorySearchToolOutputSchema,
    RepositorySearchToolConfig,
)
from .worldview import (
    LayerMapping,
    UMMVisLookupTool,
    UMMVisLookupToolConfig,
    UMMVisLookupToolInputSchema,
    UMMVisLookupToolOutputSchema,
)

__all__ = [
    "DummyTool",
    "DummyInputSchema",
    "DummyOutputSchema",
    "SDESearchTool",
    "SDESearchToolInputSchema",
    "SDESearchToolOutputSchema",
    "SDESearchToolConfig",
    "SDEDocument",
    "CodeSignalsSearchInputSchema",
    "CodeSignalsSearchOutputSchema",
    "CodeSignalsSearchTool",
    "CodeSignalsSearchToolConfig",
    "RepositorySearchTool",
    "RepositorySearchToolInputSchema",
    "RepositorySearchToolOutputSchema",
    "RepositorySearchToolConfig",
    "LayerMapping",
    "UMMVisLookupTool",
    "UMMVisLookupToolConfig",
    "UMMVisLookupToolInputSchema",
    "UMMVisLookupToolOutputSchema",
]
UMMVisLookupToolConfig, + UMMVisLookupToolInputSchema, + UMMVisLookupToolOutputSchema, +) + +__all__ = [ + "LayerMapping", + "UMMVisLookupTool", + "UMMVisLookupToolConfig", + "UMMVisLookupToolInputSchema", + "UMMVisLookupToolOutputSchema", +] diff --git a/akd_ext/tools/worldview/cmr/ummvis_lookup.py b/akd_ext/tools/worldview/cmr/ummvis_lookup.py new file mode 100644 index 0000000..48e5e2f --- /dev/null +++ b/akd_ext/tools/worldview/cmr/ummvis_lookup.py @@ -0,0 +1,689 @@ +""" +UMM-Vis lookup tool — find Worldview/GIBS layers associated with a CMR collection. + +Maps a CMR collection concept-id to the GIBS visualization layer ID(s) recorded in +UMM-Vis. The output's ``layer_id`` is type-compatible with WorldviewPermalinkTool's +``LayerSpec.id``, completing the chain ``query → collection → layer → permalink``. + +Lookup paths. The canonical filter ``concept-ids=`` works for records that +populate ``umm.ConceptIds[].Value`` with the parent collection's C-id. Some UAT +records still leave that field as a placeholder; this tool falls back to a +client-side scan of ``umm.SourceDatasets`` / ``umm.RepresentingDatasets`` when the +canonical query returns empty. + +Layer-ID resolution. ``umm.Name`` typically carries a processing-version suffix +(``_v1_STD`` / ``_v2_NRT`` / …) that is *not* present in the public GIBS WMTS +catalog. We prefer ``Specification.ProductIdentification.BestAvailableExternalIdentifier`` +(the canonical externally-published name); failing that, we strip the version +suffix from ``umm.Name``. Validation against the live GIBS WMTS catalog is +optional and informational — set ``validate_against_gibs=True`` to populate +``LayerMapping.available_in_gibs``. 
+""" + +from __future__ import annotations + +import os +import re +import time +from datetime import datetime +from typing import Any, Literal + +import httpx +from akd._base import InputSchema, OutputSchema +from akd.tools import BaseTool, BaseToolConfig +from loguru import logger +from pydantic import BaseModel, Field + +from akd_ext.mcp import mcp_tool + +# Worldview projection identifiers used in URLs. EPSG:3857 (Web Mercator) is +# intentionally absent — the Worldview UI does not expose it as a top-level +# projection, so an OutputProjection of "EPSG:3857" maps to None. +_EPSG_TO_WORLDVIEW: dict[str, str] = { + "EPSG:4326": "geographic", + "EPSG:3413": "arctic", + "EPSG:3031": "antarctic", +} + +# Strip trailing processing-version markers from umm.Name to recover the public +# GIBS layer identifier. Covers the common patterns we observed across all +# 1109 prod viz records (e.g. "..._v1_STD", "..._v7_NRT"). +_VERSION_SUFFIX_RE = re.compile(r"_v\d+_(STD|NRT)$") + +# Sentinel/junk values that have leaked into BestAvailableExternalIdentifier on +# both UAT and prod. These are not real GIBS layer names. +_KNOWN_JUNK_LAYER_IDS: frozenset[str] = frozenset({"DUJUAN", "TEST", "PLACEHOLDER", "YET_TO_SUPPLY"}) + +# GIBS WMTS GetCapabilities endpoints. Merging all four projections is required +# because polar-only layers (e.g. sea ice) are not advertised on epsg4326. +_GIBS_WMTS_PROJECTIONS = ("epsg4326", "epsg3857", "epsg3413", "epsg3031") + +MatchPath = Literal["concept_ids", "source_datasets_fallback"] +LayerIdSource = Literal["best", "name_stripped", "name_raw", "best_pending_gibs", "unresolved"] + +# ----------------------------------------------------------------------------- +# Tool Config +# ----------------------------------------------------------------------------- + + +class UMMVisLookupToolConfig(BaseToolConfig): + """Configuration for the UMM-Vis Lookup Tool. + - base_url: CMR Search base. UAT default; switch to OPS via CMR_BASE_URL env var. 
class UMMVisLookupToolConfig(BaseToolConfig):
    """Configuration for the UMM-Vis Lookup Tool.
    - base_url: CMR Search base. UAT default; switch to OPS via CMR_BASE_URL env var.
    - timeout: HTTP timeout in seconds (default 15s).
    - page_size: UMM-Vis search page size (default 2000; covers UAT today).
    - fallback_cache_ttl_seconds: TTL for caching the Path B all-records fetch.
      Default 300s; set 0 to disable.
    """

    # NOTE(review): the docstring above does not list validate_against_gibs or
    # gibs_cache_ttl_seconds; both are documented on their Field descriptions.

    # default_factory (rather than a plain default) defers the environment
    # lookup until a config instance is created.
    base_url: str = Field(
        default_factory=lambda: os.getenv("CMR_BASE_URL", "https://cmr.uat.earthdata.nasa.gov/search"),
        description=("CMR Search base URL. UAT default; switch to OPS via the CMR_BASE_URL env var."),
    )
    timeout: float = Field(
        default=15.0,
        description="HTTP request timeout in seconds.",
    )
    # Bounded to 1..2000 by the ge/le constraints below.
    page_size: int = Field(
        default=2000,
        ge=1,
        le=2000,
        description=("UMM-Vis search page size (max 2000; UAT has ~1155 records total today)."),
    )
    # Non-negative; 0 means "never reuse a previous all-records fetch".
    fallback_cache_ttl_seconds: float = Field(
        default=300.0,
        ge=0.0,
        description=("TTL for the Path B all-records cache. Set 0 to disable caching."),
    )
    # Off by default, so no GIBS catalog request is made unless explicitly enabled.
    validate_against_gibs: bool = Field(
        default=False,
        description=(
            "When True, fetch the GIBS WMTS GetCapabilities catalog and tag each "
            "LayerMapping with available_in_gibs. Informational only — layers are not "
            "filtered out when missing from GIBS, since some valid records describe "
            "layers not yet published."
        ),
    )
    # 86400 seconds == 24 hours.
    gibs_cache_ttl_seconds: float = Field(
        default=86400.0,
        ge=0.0,
        description=(
            "TTL for the cached merged GIBS WMTS catalog (~5 MB across 4 projections). "
            "Default 24h; the catalog changes slowly. Set 0 to disable caching."
        ),
    )
# -----------------------------------------------------------------------------
# Tool Input/Output Schema
# -----------------------------------------------------------------------------


class UMMVisLookupToolInputSchema(InputSchema):
    """Input: a single CMR collection concept-id."""

    # The pattern requires "C", one or more digits, "-", then an upper-case
    # alphanumeric/underscore suffix; anything else fails validation before the
    # tool runs.
    collection_concept_id: str = Field(
        ...,
        pattern=r"^C\d+-[A-Z0-9_]+$",
        description="CMR collection concept-id, e.g. 'C1701805619-GES_DISC'.",
        examples=["C1701805619-GES_DISC"],
    )


class LayerMapping(BaseModel):
    """A single Worldview/GIBS layer associated with a CMR collection.

    ``layer_id`` is the resolved GIBS layer identifier — the name a Worldview URL
    or WMTS request actually accepts. It is derived from
    ``umm.Specification.ProductIdentification.BestAvailableExternalIdentifier`` when
    that field is set and valid; otherwise it falls back to ``umm.Name`` with any
    trailing processing-version suffix (``_v\\d+_(STD|NRT)``) stripped. Optional
    fields carry display, disambiguation, and permalink-default hints; each is
    ``None`` when the source record omits it or carries a placeholder value.
    """

    # --- Identity (required) -------------------------------------------------
    layer_id: str = Field(
        ...,
        description=(
            "Resolved GIBS layer identifier suitable for WorldviewPermalinkTool's "
            "LayerSpec.id. See ``layer_id_source`` for which UMM-Vis field was used."
        ),
    )
    # NOTE(review): the LayerIdSource literal also contains 'unresolved', which
    # is not described below.
    layer_id_source: LayerIdSource = Field(
        ...,
        description=(
            "Which UMM-Vis field produced layer_id: 'best' (BestAvailableExternalIdentifier "
            "matched the GIBS catalog), 'name_stripped' (Name with version suffix removed), "
            "'name_raw' (Name had no version suffix), 'best_pending_gibs' (Best is set but "
            "the layer is not yet in GIBS WMTS — likely a pre-publication record)."
        ),
    )
    # Tri-state: True / False after validation, None when validation was skipped.
    available_in_gibs: bool | None = Field(
        None,
        description=(
            "Whether layer_id appears in the live GIBS WMTS catalog. None when validation "
            "was not performed (config.validate_against_gibs=False)."
        ),
    )
    visualization_concept_id: str = Field(
        ...,
        description=("UMM-Vis record concept-id (VIS-). Useful for traceability."),
    )
    visualization_type: str = Field(
        ...,
        description="UMM-Vis VisualizationType: 'tiles' (Worldview-renderable) or 'maps'.",
    )

    # --- Display / disambiguation hints (optional) ---------------------------
    title: str | None = Field(None, description="umm.Title.")
    subtitle: str | None = Field(
        None,
        description="umm.Subtitle, e.g. 'AIRS / Aqua' — sensor / platform context.",
    )
    measurement: str | None = Field(
        None,
        description="ProductMetadata.Measurement, e.g. 'Carbon Monoxide'.",
    )
    daynight: str | None = Field(
        None,
        description="'Day' / 'Night' / 'Both' — daynight discrimination.",
    )

    # --- Spatial / temporal coverage (optional) ------------------------------
    # When present the list must hold exactly four values (min_length == max_length == 4).
    spatial_coverage: list[float] | None = Field(
        None,
        min_length=4,
        max_length=4,
        description="WGS84 bounding box: [west, south, east, north].",
    )
    temporal_start: datetime | None = Field(
        None,
        description="Layer's TemporalCoverage start.",
    )
    temporal_end: datetime | None = Field(
        None,
        description="Layer's TemporalCoverage end. None means ongoing.",
    )
    ongoing: bool | None = Field(
        None,
        description="Whether the layer is still being updated (ProductMetadata.Ongoing).",
    )
    layer_period: str | None = Field(
        None,
        description="LayerPeriod, e.g. 'Daily' or 'Monthly'.",
    )

    # --- Permalink defaults (optional) ---------------------------------------
    worldview_projections: list[str] | None = Field(
        None,
        description=(
            "Worldview-compatible projections derived from Generation.OutputProjection: "
            "any of 'geographic', 'arctic', 'antarctic'."
        ),
    )

    colormap_url: str | None = Field(
        None,
        description="ColorMap XML URL — useful for legend rendering or palette overrides.",
    )


class UMMVisLookupToolOutputSchema(OutputSchema):
    """Output for the UMM-Vis Lookup Tool."""

    collection_concept_id: str = Field(
        ...,
        description="Echo of the input collection concept-id.",
    )
    # Required field, but an empty list is a valid value.
    layers: list[LayerMapping] = Field(
        ...,
        description=("Matched UMM-Vis layers. May be empty when no association exists yet."),
    )
    match_path: MatchPath = Field(
        ...,
        description=(
            "Which lookup path produced the layers: 'concept_ids' (canonical "
            "`concept-ids=` filter) or 'source_datasets_fallback' (client-side scan "
            "of umm.SourceDatasets / umm.RepresentingDatasets)."
        ),
    )
# -----------------------------------------------------------------------------
# UMM VisLookup AKD Tool
# -----------------------------------------------------------------------------


@mcp_tool
class UMMVisLookupTool(BaseTool[UMMVisLookupToolInputSchema, UMMVisLookupToolOutputSchema]):
    """Find Worldview/GIBS layers associated with a CMR collection.

    Given a CMR collection concept-id (`C<digits>-<PROVIDER>`, e.g.
    'C1701805619-GES_DISC'), returns zero or more GIBS visualization layer(s)
    that render data from that collection. The output list is empty when no UMM-Vis
    record yet associates the collection with a Worldview layer.

    Each ``layer_id`` is the canonical GIBS layer identifier — the name a
    Worldview URL or WMTS request actually accepts — bridging a CMR
    ``concept_id`` to a renderable Worldview/GIBS layer.

    Does not fetch data, does not build URLs, and does not validate `layer_id`
    against the live GIBS WMTS catalog by default — enable
    ``validate_against_gibs`` in the config to populate
    ``LayerMapping.available_in_gibs``.
    """

    input_schema = UMMVisLookupToolInputSchema
    output_schema = UMMVisLookupToolOutputSchema
    config_schema = UMMVisLookupToolConfig

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the tool with empty per-instance caches."""
        super().__init__(*args, **kwargs)
        # Each cache is a (time.monotonic() timestamp, payload) pair, or None
        # until the first successful fetch.
        self._all_records_cache: tuple[float, list[dict[str, Any]]] | None = None
        self._gibs_layers_cache: tuple[float, frozenset[str]] | None = None

    async def _fetch(
        self,
        client: httpx.AsyncClient,
        params: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """GET ``<base_url>/visualizations.umm_json`` and return its ``items`` list.

        Raises:
            TimeoutError: the request timed out.
            RuntimeError: CMR was unreachable, answered with an error status,
                or returned a body that is not a JSON object with a list under
                ``items``.
        """
        url = f"{self.config.base_url.rstrip('/')}/visualizations.umm_json"
        try:
            response = await client.get(url, params=params)
            response.raise_for_status()
        except httpx.TimeoutException as e:
            raise TimeoutError(f"CMR request timed out after {self.config.timeout}s") from e
        except httpx.HTTPStatusError as e:
            raise RuntimeError(f"CMR returned status {e.response.status_code}: {e.response.text}") from e
        except httpx.RequestError as e:
            raise RuntimeError(f"Failed to reach CMR: {e}") from e

        # A 2xx response can still carry a non-JSON body (e.g. a proxy error
        # page); report it with the same exception type as other CMR failures
        # instead of leaking a JSON decode error.
        try:
            data = response.json()
        except ValueError as e:
            raise RuntimeError("CMR returned a response that is not valid JSON") from e

        # A JSON array or scalar has no ``.get``; treat it as an unexpected shape.
        items = data.get("items", []) if isinstance(data, dict) else None
        if not isinstance(items, list):
            raise RuntimeError(f"Unexpected CMR response shape: {data!r}")
        return items

    async def _fetch_all_records(self, client: httpx.AsyncClient) -> list[dict[str, Any]]:
        """Fetch every UMM-Vis record (Path B), paging as needed, with TTL cache.

        With the default ``page_size`` a single request is enough; further
        pages are requested only while the previous page came back full, so a
        smaller configured ``page_size`` no longer truncates the scan silently.
        """
        ttl = self.config.fallback_cache_ttl_seconds
        if ttl > 0 and self._all_records_cache is not None:
            fetched_at, cached_items = self._all_records_cache
            if (time.monotonic() - fetched_at) < ttl:
                logger.debug("Path B: using cached all-records ({} items)", len(cached_items))
                return cached_items

        page_size = self.config.page_size
        max_pages = 100  # hard stop in case the server ignores page_num
        items: list[dict[str, Any]] = []
        for page_num in range(1, max_pages + 1):
            query: dict[str, Any] = {"page_size": page_size}
            if page_num > 1:
                # The first request is left identical to a plain one-page fetch.
                query["page_num"] = page_num
            page = await self._fetch(client, query)
            items.extend(page)
            if len(page) < page_size:
                break  # short (or empty) page: nothing left to fetch
        else:
            logger.warning("Path B: stopped after {} pages; UMM-Vis scan may be incomplete", max_pages)

        if ttl > 0:
            self._all_records_cache = (time.monotonic(), items)
        return items

    async def _fetch_gibs_layers(self, client: httpx.AsyncClient) -> frozenset[str]:
        """Fetch the merged GIBS WMTS layer catalog across all 4 projections.

        A projection whose GetCapabilities request fails is logged and skipped.
        The merged result is cached only when every projection was fetched, so
        a transient outage cannot pin an incomplete catalog for the whole TTL.
        """
        ttl = self.config.gibs_cache_ttl_seconds
        if ttl > 0 and self._gibs_layers_cache is not None:
            fetched_at, cached_layers = self._gibs_layers_cache
            if (time.monotonic() - fetched_at) < ttl:
                return cached_layers

        layers: set[str] = set()
        complete = True
        for proj in _GIBS_WMTS_PROJECTIONS:
            url = f"https://gibs.earthdata.nasa.gov/wmts/{proj}/best/wmts.cgi"
            try:
                response = await client.get(url, params={"SERVICE": "WMTS", "request": "GetCapabilities"})
                response.raise_for_status()
            except (httpx.RequestError, httpx.HTTPStatusError) as e:
                logger.warning("GIBS GetCapabilities fetch failed for {}: {}", proj, e)
                complete = False
                continue
            # Anchor on the identifier element: an unanchored "([^<]+)" matches
            # every text run in the document, tag bodies included, so no real
            # layer name would ever be an exact member of the resulting set.
            # Style / TileMatrixSet identifiers are captured too; they are
            # harmless extras for a membership test.
            layers.update(re.findall(r"<ows:Identifier>\s*([^<]+?)\s*</ows:Identifier>", response.text))

        result = frozenset(layers)
        if ttl > 0 and complete:
            self._gibs_layers_cache = (time.monotonic(), result)
        logger.debug("GIBS WMTS catalog loaded: {} unique layer identifiers", len(result))
        return result

    async def _arun(self, params: UMMVisLookupToolInputSchema) -> UMMVisLookupToolOutputSchema:
        """Resolve a collection concept-id to its de-duplicated layer mappings.

        Path A queries CMR with the ``concept-ids`` filter. When that returns
        nothing, Path B scans all UMM-Vis records client-side for the
        concept-id in their source/representing dataset lists.
        """
        cid = params.collection_concept_id

        async with httpx.AsyncClient(timeout=self.config.timeout) as client:
            path_a_items = await self._fetch(
                client,
                {"concept-ids": cid, "page_size": self.config.page_size},
            )
            if path_a_items:
                logger.debug("Path A hit for {}: {} items", cid, len(path_a_items))
                items = path_a_items
                match_path: MatchPath = "concept_ids"
            else:
                logger.debug("Path A empty for {}; falling back to scan", cid)
                all_items = await self._fetch_all_records(client)
                items = [item for item in all_items if cid in _extract_source_cids(item)]
                match_path = "source_datasets_fallback"
                logger.debug(
                    "Path B for {}: matched {} of {} records",
                    cid,
                    len(items),
                    len(all_items),
                )

            # The catalog is only needed to annotate mappings, so skip the
            # download when there are no records to annotate.
            gibs_layers: frozenset[str] | None = None
            if self.config.validate_against_gibs and items:
                gibs_layers = await self._fetch_gibs_layers(client)

        candidates = (_to_layer_mapping(item, gibs_layers=gibs_layers) for item in items)
        # Records lacking the minimum identity fields come back as None.
        layers: list[LayerMapping] = [mapping for mapping in candidates if mapping is not None]

        return UMMVisLookupToolOutputSchema(
            collection_concept_id=cid,
            layers=_dedupe_layers(layers),
            match_path=match_path,
        )
+ + return UMMVisLookupToolOutputSchema( + collection_concept_id=cid, + layers=deduped, + match_path=match_path, + ) + + +# ----------------------------------------------------------------------------- +# Utils +# ----------------------------------------------------------------------------- + + +def _as_dict(value: Any) -> dict[str, Any]: + """Return ``value`` if it is a dict, else an empty dict. + + UMM-Vis records sometimes substitute strings for nested objects. Treat those + as missing rather than crashing on ``.get(...)``. + """ + return value if isinstance(value, dict) else {} + + +def _placeholder_value(text: Any) -> bool: + if not isinstance(text, str): + return False + upper = text.strip().upper() + return "PLACEHOLDER" in upper or upper == "YET_TO_SUPPLY" + + +def _clean_str(value: Any) -> str | None: + if not isinstance(value, str): + return None + if not value.strip() or _placeholder_value(value): + return None + return value + + +def _coerce_datetime(value: Any) -> datetime | None: + """Best-effort coerce an ISO-8601 string to a datetime.""" + if not isinstance(value, str): + return None + candidate = value.strip() + if not candidate: + return None + candidate = candidate.replace("Z", "+00:00") + try: + return datetime.fromisoformat(candidate) + except ValueError: + logger.debug("Could not parse temporal value: {!r}", value) + return None + + +def _coerce_bbox(value: Any) -> list[float] | None: + """Coerce a UMM-Vis WGS84SpatialCoverage entry to ``[west, south, east, north]``. + + Accepts either a 4-element list/tuple ``[W, S, E, N]`` or a dict with + ``MinLongitude`` / ``MinLatitude`` / ``MaxLongitude`` / ``MaxLatitude`` keys. 
+ """ + if isinstance(value, (list, tuple)) and len(value) == 4: + try: + return [float(v) for v in value] + except (TypeError, ValueError): + return None + if isinstance(value, dict): + try: + return [ + float(value["MinLongitude"]), + float(value["MinLatitude"]), + float(value["MaxLongitude"]), + float(value["MaxLatitude"]), + ] + except (KeyError, TypeError, ValueError): + return None + return None + + +def _map_projections(value: Any) -> list[str] | None: + """Map UMM-Vis OutputProjection (EPSG strings) to Worldview projection names.""" + if isinstance(value, str): + mapped = _EPSG_TO_WORLDVIEW.get(value.strip()) + return [mapped] if mapped else None + if isinstance(value, list): + mapped = [ + _EPSG_TO_WORLDVIEW[p.strip()] for p in value if isinstance(p, str) and p.strip() in _EPSG_TO_WORLDVIEW + ] + return mapped or None + return None + + +def _extract_source_cids(item: dict[str, Any]) -> set[str]: + """Pull all C-ids from a record's SourceDatasets and RepresentingDatasets. + + UMM-Vis v1.1.0 nests these under ``umm.Specification.ProductMetadata`` and + populates them as a plain list of C-id strings. Older fixture shapes used + a list of ``{Value: ...}`` dicts, and some records hoist the lists directly + onto ``umm``; both variants are tolerated. 
+ """ + umm = _as_dict(item.get("umm")) + product_metadata = _as_dict(_as_dict(umm.get("Specification")).get("ProductMetadata")) + cids: set[str] = set() + for source in (product_metadata, umm): + for field_name in ("SourceDatasets", "RepresentingDatasets"): + entries = source.get(field_name) + if not isinstance(entries, list): + continue + for entry in entries: + if isinstance(entry, str) and entry: + cids.add(entry) + elif isinstance(entry, dict): + value = entry.get("Value") + if isinstance(value, str) and value: + cids.add(value) + return cids + + +def _looks_like_layer_id(value: str | None) -> bool: + """Cheap sanity check to reject obvious junk in BestAvailableExternalIdentifier.""" + if not value or len(value) < 4: + return False + if value.upper() in _KNOWN_JUNK_LAYER_IDS: + return False + return True + + +def _resolve_layer_id( + umm: dict[str, Any], + *, + gibs_layers: frozenset[str] | None = None, +) -> tuple[str | None, LayerIdSource]: + """Resolve a UMM-Vis record to its public GIBS layer identifier. + + Resolution order: + 1. ``Specification.ProductIdentification.BestAvailableExternalIdentifier`` — + the canonical externally-published name. When ``gibs_layers`` is given, + we also confirm the value is in the live catalog before returning it. + 2. ``umm.Name`` with the trailing ``_v\\d+_(STD|NRT)`` processing-version + suffix stripped. Rescues records where Best is missing or junk. + 3. If Best is set but isn't in the GIBS catalog (and the suffix-strip + didn't help either), surface it as ``best_pending_gibs`` — these are + legitimate records describing layers not yet published to GIBS. + + Validated against 1109 prod viz records: ~98.7% resolve through paths 1 or 2; + the remainder fall into ``best_pending_gibs`` (e.g. the AMSRU2 L3 series). 
def _resolve_layer_id(
    umm: dict[str, Any],
    *,
    gibs_layers: frozenset[str] | None = None,
) -> tuple[str | None, LayerIdSource]:
    """Pick the public GIBS layer identifier for a UMM-Vis record.

    Candidates are tried in this order:

    1. ``Specification.ProductIdentification.BestAvailableExternalIdentifier``
       when it passes ``_looks_like_layer_id`` and, if ``gibs_layers`` is
       supplied, is a member of it -> source ``"best"``.
    2. ``umm.Name`` with ``_VERSION_SUFFIX_RE`` removed. Without a catalog it
       is returned directly; with a catalog only when the result is a member.
       The source is ``"name_stripped"`` if the regex changed the name and
       ``"name_raw"`` otherwise.
    3. A plausible Best value that failed catalog validation ->
       ``"best_pending_gibs"``.
    4. The cleaned, unstripped Name -> ``"name_raw"``.

    Args:
        umm: The ``umm`` section of a UMM-Vis record.
        gibs_layers: Optional catalog of known GIBS layer ids used to validate
            candidates; ``None`` disables validation.

    Returns:
        ``(layer_id, source)``; ``(None, "unresolved")`` when neither Best nor
        Name yields anything usable.
    """
    identification = _as_dict(_as_dict(umm.get("Specification")).get("ProductIdentification"))
    candidate = identification.get("BestAvailableExternalIdentifier")
    if not (isinstance(candidate, str) and _looks_like_layer_id(candidate)):
        candidate = None
    raw_name = _clean_str(umm.get("Name"))
    unvalidated = gibs_layers is None

    # Step 1: Best wins outright unless a catalog is present and disagrees.
    if candidate and (unvalidated or candidate in gibs_layers):
        return candidate, "best"

    # Step 2: Name minus its processing-version suffix.
    if raw_name:
        trimmed = _VERSION_SUFFIX_RE.sub("", raw_name)
        if unvalidated or trimmed in gibs_layers:
            label: LayerIdSource = "name_raw" if trimmed == raw_name else "name_stripped"
            return trimmed, label

    # Step 3: keep an unvalidated Best rather than dropping the record, so
    # callers can still warn via available_in_gibs.
    if candidate:
        return candidate, "best_pending_gibs"

    # Step 4: nothing validated and no Best; fall back to the Name as given.
    if raw_name:
        return raw_name, "name_raw"

    return None, "unresolved"


def _to_layer_mapping(
    item: dict[str, Any],
    *,
    gibs_layers: frozenset[str] | None = None,
) -> LayerMapping | None:
    """Build a ``LayerMapping`` from one UMM-Vis search result.

    Args:
        item: A record with ``"meta"`` and ``"umm"`` sections.
        gibs_layers: Optional catalog of GIBS layer ids. When given it is
            forwarded to ``_resolve_layer_id`` and used to set
            ``available_in_gibs``; when ``None`` that field stays ``None``.

    Returns:
        The normalized mapping, or ``None`` when ``meta["concept-id"]`` is not
        a non-empty string or no layer id can be resolved.
    """
    meta = _as_dict(item.get("meta"))
    umm = _as_dict(item.get("umm"))

    concept_id = meta.get("concept-id")
    if not (isinstance(concept_id, str) and concept_id):
        return None

    resolved_id, resolved_from = _resolve_layer_id(umm, gibs_layers=gibs_layers)
    if resolved_id is None:
        return None

    specification = _as_dict(umm.get("Specification"))
    identification = _as_dict(specification.get("ProductIdentification"))
    metadata = _as_dict(specification.get("ProductMetadata"))
    output_projection = _as_dict(umm.get("Generation")).get("OutputProjection")
    coverage_window = _as_dict(metadata.get("TemporalCoverage"))

    # Only a genuine boolean counts; strings such as "true" become None.
    ongoing = metadata.get("Ongoing")
    if not isinstance(ongoing, bool):
        ongoing = None

    # Top-level Title/Subtitle win; the Worldview-specific ones are the fallback.
    title = _clean_str(umm.get("Title")) or _clean_str(identification.get("WorldviewTitle"))
    subtitle = _clean_str(umm.get("Subtitle")) or _clean_str(identification.get("WorldviewSubtitle"))

    in_catalog = None if gibs_layers is None else resolved_id in gibs_layers

    return LayerMapping(
        layer_id=resolved_id,
        layer_id_source=resolved_from,
        available_in_gibs=in_catalog,
        visualization_concept_id=concept_id,
        visualization_type=str(umm.get("VisualizationType") or ""),
        title=title,
        subtitle=subtitle,
        measurement=_clean_str(metadata.get("Measurement")),
        daynight=_clean_str(metadata.get("Daynight")),
        spatial_coverage=_coerce_bbox(metadata.get("WGS84SpatialCoverage")),
        temporal_start=_coerce_datetime(coverage_window.get("StartDate")),
        temporal_end=_coerce_datetime(coverage_window.get("EndDate")),
        ongoing=ongoing,
        layer_period=_clean_str(metadata.get("LayerPeriod")),
        worldview_projections=_map_projections(output_projection),
        colormap_url=_clean_str(metadata.get("ColorMap")),
    )
# Lower rank = more trustworthy origin for a layer id. Used when two mappings
# collide on the same de-duplication key.
_SOURCE_RANK: dict[LayerIdSource, int] = {
    "best": 0,
    "name_stripped": 1,
    "name_raw": 2,
    "best_pending_gibs": 3,
    "unresolved": 4,
}


def _dedupe_layers(layers: list[LayerMapping]) -> list[LayerMapping]:
    """Keep one mapping per ``(layer_id, visualization_type)`` pair.

    When several mappings share a key, the one whose ``layer_id_source`` has
    the lowest ``_SOURCE_RANK`` survives; equal ranks are settled by the higher
    ``_populated_field_count``, and a complete tie keeps the earlier mapping.
    The result preserves the order in which each key was first seen.
    """
    winners: dict[tuple[str, str], LayerMapping] = {}
    for candidate in layers:
        slot = (candidate.layer_id, candidate.visualization_type)
        incumbent = winners.get(slot)
        if incumbent is None:
            winners[slot] = candidate
            continue
        rank_delta = _SOURCE_RANK[candidate.layer_id_source] - _SOURCE_RANK[incumbent.layer_id_source]
        if rank_delta > 0:
            # Strictly worse origin: never replaces the incumbent.
            continue
        if rank_delta < 0 or _populated_field_count(candidate) > _populated_field_count(incumbent):
            winners[slot] = candidate
    return list(winners.values())


def _populated_field_count(layer: LayerMapping) -> int:
    """Return how many of a mapping's optional descriptive fields are not ``None``."""
    tally = 0
    for attribute in (
        "title",
        "subtitle",
        "measurement",
        "daynight",
        "spatial_coverage",
        "temporal_start",
        "temporal_end",
        "ongoing",
        "layer_period",
        "worldview_projections",
        "colormap_url",
    ):
        if getattr(layer, attribute) is not None:
            tally += 1
    return tally
if __name__ == "__main__":
    # Manual smoke run: performs real lookups through UMMVisLookupTool, so it
    # needs network access and is not part of the automated test suite.
    import asyncio

    # Two probes that exercise both paths against live UAT:
    # - C9876543210-ABCDAAC: the universal placeholder C-id → Path A returns ~1100 records.
    # - C3550187110-ESDIS: a real C-id present in live SourceDatasets → Path B fires.
    SAMPLE_CIDS: list[str] = [
        "C9876543210-ABCDAAC",
        "C3550187110-ESDIS",
    ]

    async def _smoke() -> None:
        """Look up each sample C-id and log the match path plus the first three layers."""
        tool = UMMVisLookupTool()
        for cid in SAMPLE_CIDS:
            # NOTE(review): the "{}" placeholders assume a brace-style logger
            # (e.g. loguru) — confirm against the module's logger import.
            logger.info("Looking up layers for {}", cid)
            out = await tool.arun(UMMVisLookupToolInputSchema(collection_concept_id=cid))
            logger.info(
                "{} -> match_path={!s}, layers={}",
                cid,
                out.match_path,
                len(out.layers),
            )
            # Cap the per-layer detail at three entries to keep the output readable.
            for layer in out.layers[:3]:
                logger.info(
                    " - {} ({}): title={!r} subtitle={!r} bbox={} projections={}",
                    layer.layer_id,
                    layer.visualization_concept_id,
                    layer.title,
                    layer.subtitle,
                    layer.spatial_coverage,
                    layer.worldview_projections,
                )
            if len(out.layers) > 3:
                logger.info(" ... and {} more", len(out.layers) - 3)

    asyncio.run(_smoke())
def _airs_record(
    *,
    name: str = "AIRS_L2_Carbon_Monoxide_500hPa_Volume_Mixing_Ratio_Day_v7_STD",
    vis_concept_id: str = "VIS1277379058-CMR_TEST",
    source_cid: str = "C3550187110-ESDIS",
) -> dict[str, Any]:
    """Return a fresh UMM-Vis record modelled on the AIRS carbon-monoxide layer.

    Args:
        name: Used for ``umm.Name`` and both ``InternalIdentifier`` fields.
        vis_concept_id: Stored as ``meta["concept-id"]``.
        source_cid: The single entry of both ``SourceDatasets`` and
            ``RepresentingDatasets``.

    Returns:
        A new nested dict on every call, so tests may mutate it freely.
    """
    placeholder_concept = {
        "Type": "STD",
        "Value": "C9876543210-ABCDAAC",
        "ShortName": "SHORTNAME PLACEHOLDER",
        "Title": "TITLE PLACEHOLDER",
        "Version": "1.0",
        "DataCenter": "DATACENTER PLACEHOLDER",
    }
    product_identification = {
        "InternalIdentifier": name,
        "GIBSTitle": "Carbon Monoxide (Daytime, 500 hPa, L2)",
        "WorldviewTitle": "Carbon Monoxide",
        "WorldviewSubtitle": "Daytime / 500 hPa, L2 / AIRS / Aqua",
    }
    product_metadata = {
        "InternalIdentifier": name,
        # Two separate lists so mutating one in a test never affects the other.
        "SourceDatasets": [source_cid],
        "RepresentingDatasets": [source_cid],
        "Measurement": "Carbon Monoxide",
        "ParameterUnits": ["ppbv"],
        "Daynight": "Day",
        "OrbitDirection": "Ascending",
        "Ongoing": True,
        "LayerPeriod": "Daily",
        "TemporalCoverage": {
            "StartDate": "2002-08-30T00:00:00Z",
            "EndDate": "2024-12-31T23:59:59Z",
        },
        "WGS84SpatialCoverage": [-180.0, -90.0, 180.0, 90.0],
        "ColorMap": (
            "https://gibs.earthdata.nasa.gov/colormaps/v1.3/AIRS_Carbon_Monoxide_Volume_Mixing_Ratio.xml"
        ),
    }
    generation = {
        "SourceProjection": "EPSG:4326",
        "OutputProjection": "EPSG:4326",
        "OutputResolution": "2km",
        "OutputFormat": "PPNG",
    }
    metadata_specification = {
        "URL": "https://cdn.earthdata.nasa.gov/umm/visualization/v1.1.0",
        "Name": "UMM-Vis",
        "Version": "1.1.0",
    }
    umm = {
        "Name": name,
        "Title": "Carbon Monoxide (L2, 500 hPa, Day)",
        "Subtitle": "AIRS / Aqua",
        "Description": "YET_TO_SUPPLY",
        "VisualizationType": "tiles",
        "ConceptIds": [placeholder_concept],
        "Specification": {
            "ProductIdentification": product_identification,
            "ProductMetadata": product_metadata,
        },
        "Generation": generation,
        "MetadataSpecification": metadata_specification,
    }
    meta = {"concept-id": vis_concept_id, "provider-id": "CMR_TEST"}
    return {"meta": meta, "umm": umm}
"C2-A"}], + "RepresentingDatasets": [{"Value": "C3-B"}], + } + } + } + } + assert _extract_source_cids(item) == {"C1-A", "C2-A", "C3-B"} + + def test_extract_source_cids_handles_missing_fields(self) -> None: + assert _extract_source_cids({}) == set() + assert _extract_source_cids({"umm": {}}) == set() + assert _extract_source_cids({"umm": {"Specification": {"ProductMetadata": {"SourceDatasets": None}}}}) == set() + + def test_extract_source_cids_skips_non_string_values(self) -> None: + item = { + "umm": { + "Specification": { + "ProductMetadata": { + "SourceDatasets": [ + "C1-A", + None, + 42, + {"Value": "C2-A"}, + {"NotValue": "C3-A"}, + ], + } + } + } + } + assert _extract_source_cids(item) == {"C1-A", "C2-A"} + + def test_coerce_datetime_iso_with_z(self) -> None: + result = _coerce_datetime("2002-08-30T00:00:00Z") + assert result == datetime(2002, 8, 30, 0, 0, 0, tzinfo=timezone.utc) + + @pytest.mark.parametrize("bad", [None, "", " ", "not-a-date", 42]) + def test_coerce_datetime_returns_none_on_garbage(self, bad: Any) -> None: + assert _coerce_datetime(bad) is None + + def test_coerce_bbox_from_list(self) -> None: + assert _coerce_bbox([-180, -90, 180, 90]) == [-180.0, -90.0, 180.0, 90.0] + + def test_coerce_bbox_from_dict(self) -> None: + assert _coerce_bbox( + { + "MinLongitude": -180, + "MinLatitude": -90, + "MaxLongitude": 180, + "MaxLatitude": 90, + } + ) == [-180.0, -90.0, 180.0, 90.0] + + @pytest.mark.parametrize( + "bad", + [None, "", [], [1, 2, 3], {"MinLongitude": "not-a-number"}], + ) + def test_coerce_bbox_returns_none_on_garbage(self, bad: Any) -> None: + assert _coerce_bbox(bad) is None + + def test_map_projections_string(self) -> None: + assert _map_projections("EPSG:4326") == ["geographic"] + assert _map_projections("EPSG:3413") == ["arctic"] + assert _map_projections("EPSG:3031") == ["antarctic"] + + def test_map_projections_list(self) -> None: + assert _map_projections(["EPSG:3413", "EPSG:3031"]) == ["arctic", "antarctic"] + + def 
@pytest.mark.unit
class TestInputSchema:
    """Validation of ``collection_concept_id`` on the tool's input schema."""

    def test_accepts_valid_concept_id(self) -> None:
        """A well-formed collection C-id is stored unchanged."""
        schema = UMMVisLookupToolInputSchema(collection_concept_id="C1701805619-GES_DISC")
        assert schema.collection_concept_id == "C1701805619-GES_DISC"

    @pytest.mark.parametrize(
        "bad_value",
        [
            "",
            "not-a-cid",
            "G1701805619-GES_DISC",  # G prefix is granule, not collection
            "C1701805619",  # missing provider
            "C1701805619-",  # empty provider
            "1701805619-GES_DISC",  # missing C prefix
            "C1701805619-ges_disc",  # lowercase provider
        ],
    )
    def test_rejects_invalid_concept_id(self, bad_value: str) -> None:
        """Each malformed id must fail schema validation."""
        with pytest.raises(ValidationError):
            UMMVisLookupToolInputSchema(collection_concept_id=bad_value)


@pytest.mark.unit
class TestHelpers:
    """Unit tests for the pure coercion/extraction helpers of the lookup module."""

    def test_extract_source_cids_from_live_string_shape(self) -> None:
        """UMM-Vis v1.1.0 in UAT uses plain-string entries (the live shape)."""
        item = {
            "umm": {
                "Specification": {
                    "ProductMetadata": {
                        "SourceDatasets": ["C1-A", "C2-A"],
                        "RepresentingDatasets": ["C2-A", "C3-B"],
                    }
                }
            }
        }
        # "C2-A" appears in both lists; the set result collapses it.
        assert _extract_source_cids(item) == {"C1-A", "C2-A", "C3-B"}

    def test_extract_source_cids_from_dict_shape(self) -> None:
        """Tolerate the older ``[{Value: ...}]`` dict shape too."""
        item = {
            "umm": {
                "Specification": {
                    "ProductMetadata": {
                        "SourceDatasets": [{"Value": "C1-A"}, {"Value": "C2-A"}],
                        "RepresentingDatasets": [{"Value": "C3-B"}],
                    }
                }
            }
        }
        assert _extract_source_cids(item) == {"C1-A", "C2-A", "C3-B"}

    def test_extract_source_cids_handles_missing_fields(self) -> None:
        """Absent sections and a null ``SourceDatasets`` all yield an empty set."""
        assert _extract_source_cids({}) == set()
        assert _extract_source_cids({"umm": {}}) == set()
        assert _extract_source_cids({"umm": {"Specification": {"ProductMetadata": {"SourceDatasets": None}}}}) == set()

    def test_extract_source_cids_skips_non_string_values(self) -> None:
        """Only strings and dicts with a ``Value`` key contribute ids."""
        item = {
            "umm": {
                "Specification": {
                    "ProductMetadata": {
                        "SourceDatasets": [
                            "C1-A",
                            None,
                            42,
                            {"Value": "C2-A"},
                            {"NotValue": "C3-A"},
                        ],
                    }
                }
            }
        }
        assert _extract_source_cids(item) == {"C1-A", "C2-A"}

    def test_coerce_datetime_iso_with_z(self) -> None:
        """A trailing ``Z`` is parsed as UTC."""
        result = _coerce_datetime("2002-08-30T00:00:00Z")
        assert result == datetime(2002, 8, 30, 0, 0, 0, tzinfo=timezone.utc)

    @pytest.mark.parametrize("bad", [None, "", " ", "not-a-date", 42])
    def test_coerce_datetime_returns_none_on_garbage(self, bad: Any) -> None:
        """Unparseable or non-string input maps to ``None`` instead of raising."""
        assert _coerce_datetime(bad) is None

    def test_coerce_bbox_from_list(self) -> None:
        """A four-element list is converted to floats."""
        assert _coerce_bbox([-180, -90, 180, 90]) == [-180.0, -90.0, 180.0, 90.0]

    def test_coerce_bbox_from_dict(self) -> None:
        """The Min/Max Longitude/Latitude dict form is flattened to a float list."""
        assert _coerce_bbox(
            {
                "MinLongitude": -180,
                "MinLatitude": -90,
                "MaxLongitude": 180,
                "MaxLatitude": 90,
            }
        ) == [-180.0, -90.0, 180.0, 90.0]

    @pytest.mark.parametrize(
        "bad",
        [None, "", [], [1, 2, 3], {"MinLongitude": "not-a-number"}],
    )
    def test_coerce_bbox_returns_none_on_garbage(self, bad: Any) -> None:
        """Wrong length, wrong type, or incomplete dicts map to ``None``."""
        assert _coerce_bbox(bad) is None

    def test_map_projections_string(self) -> None:
        """Each supported EPSG string maps to its single Worldview name."""
        assert _map_projections("EPSG:4326") == ["geographic"]
        assert _map_projections("EPSG:3413") == ["arctic"]
        assert _map_projections("EPSG:3031") == ["antarctic"]

    def test_map_projections_list(self) -> None:
        """List input is mapped element-wise, preserving order."""
        assert _map_projections(["EPSG:3413", "EPSG:3031"]) == ["arctic", "antarctic"]

    def test_map_projections_unknown_returns_none(self) -> None:
        """Unknown codes, an empty list, and ``None`` all yield ``None``."""
        assert _map_projections("EPSG:9999") is None
        assert _map_projections([]) is None
        assert _map_projections(None) is None
@pytest.mark.unit
class TestNormalizer:
    """Tests for ``_to_layer_mapping`` using the ``_airs_record`` fixture."""

    def test_full_record_round_trip(self) -> None:
        """Every populated fixture field lands on the expected mapping attribute."""
        mapping = _to_layer_mapping(_airs_record())
        assert mapping is not None
        # Fixture has no BestAvailableExternalIdentifier, so resolution falls back
        # to umm.Name with the trailing "_v7_STD" suffix stripped — that matches
        # the public GIBS layer identifier.
        assert mapping.layer_id == "AIRS_L2_Carbon_Monoxide_500hPa_Volume_Mixing_Ratio_Day"
        assert mapping.layer_id_source == "name_stripped"
        assert mapping.available_in_gibs is None  # validation not requested
        assert mapping.visualization_concept_id == "VIS1277379058-CMR_TEST"
        assert mapping.visualization_type == "tiles"
        assert mapping.title == "Carbon Monoxide (L2, 500 hPa, Day)"
        assert mapping.subtitle == "AIRS / Aqua"
        assert mapping.measurement == "Carbon Monoxide"
        assert mapping.daynight == "Day"
        assert mapping.spatial_coverage == [-180.0, -90.0, 180.0, 90.0]
        assert mapping.temporal_start == datetime(2002, 8, 30, tzinfo=timezone.utc)
        assert mapping.temporal_end == datetime(2024, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
        assert mapping.ongoing is True
        assert mapping.layer_period == "Daily"
        assert mapping.worldview_projections == ["geographic"]
        assert mapping.colormap_url and "AIRS_Carbon_Monoxide" in mapping.colormap_url

    def test_placeholder_strings_become_none(self) -> None:
        """Placeholder titles/subtitles are treated as missing, including the fallbacks."""
        record = _airs_record()
        record["umm"]["Title"] = "TITLE PLACEHOLDER"
        record["umm"]["Subtitle"] = "YET_TO_SUPPLY"
        # Also clear the WorldviewTitle/Subtitle fallback so we can verify
        # placeholder handling all the way through.
        record["umm"]["Specification"]["ProductIdentification"]["WorldviewTitle"] = "TITLE PLACEHOLDER"
        record["umm"]["Specification"]["ProductIdentification"]["WorldviewSubtitle"] = "YET_TO_SUPPLY"
        mapping = _to_layer_mapping(record)
        assert mapping is not None
        assert mapping.title is None
        assert mapping.subtitle is None

    def test_title_falls_back_to_worldview_title(self) -> None:
        """When ``umm.Title`` is missing, the Worldview title is used as fallback."""
        record = _airs_record()
        del record["umm"]["Title"]
        del record["umm"]["Subtitle"]
        mapping = _to_layer_mapping(record)
        assert mapping is not None
        assert mapping.title == "Carbon Monoxide"
        assert mapping.subtitle == "Daytime / 500 hPa, L2 / AIRS / Aqua"

    def test_returns_none_when_layer_id_missing(self) -> None:
        """Without a Name (and with no Best in the fixture) no mapping is produced."""
        record = _airs_record()
        record["umm"].pop("Name")
        assert _to_layer_mapping(record) is None

    def test_returns_none_when_concept_id_missing(self) -> None:
        """A record lacking ``meta["concept-id"]`` is rejected."""
        record = _airs_record()
        record["meta"].pop("concept-id")
        assert _to_layer_mapping(record) is None

    def test_handles_missing_optional_blocks(self) -> None:
        """A minimal record (no Specification/Generation) still maps, with None optionals."""
        record = {
            "meta": {"concept-id": "VIS1-X"},
            "umm": {"Name": "L1", "VisualizationType": "tiles"},
        }
        mapping = _to_layer_mapping(record)
        assert mapping is not None
        assert mapping.layer_id == "L1"
        assert mapping.layer_id_source == "name_raw"
        assert mapping.title is None
        assert mapping.spatial_coverage is None
        assert mapping.temporal_start is None
        assert mapping.worldview_projections is None

    def test_best_field_takes_priority_over_name(self) -> None:
        """A plausible Best identifier is used and labelled ``best``."""
        record = _airs_record()
        record["umm"]["Specification"]["ProductIdentification"]["BestAvailableExternalIdentifier"] = (
            "AIRS_L2_Carbon_Monoxide_500hPa_Volume_Mixing_Ratio_Day"
        )
        mapping = _to_layer_mapping(record)
        assert mapping is not None
        assert mapping.layer_id == "AIRS_L2_Carbon_Monoxide_500hPa_Volume_Mixing_Ratio_Day"
        assert mapping.layer_id_source == "best"

    def test_junk_best_falls_back_to_name_strip(self) -> None:
        """``DUJUAN`` and similar test pollution must not leak through as a layer id."""
        record = _airs_record()
        record["umm"]["Specification"]["ProductIdentification"]["BestAvailableExternalIdentifier"] = "DUJUAN"
        mapping = _to_layer_mapping(record)
        assert mapping is not None
        assert mapping.layer_id == "AIRS_L2_Carbon_Monoxide_500hPa_Volume_Mixing_Ratio_Day"
        assert mapping.layer_id_source == "name_stripped"

    def test_gibs_validation_tags_available_in_gibs(self) -> None:
        """With a catalog containing the layer id, ``available_in_gibs`` is True."""
        record = _airs_record()
        record["umm"]["Specification"]["ProductIdentification"]["BestAvailableExternalIdentifier"] = (
            "AIRS_L2_Carbon_Monoxide_500hPa_Volume_Mixing_Ratio_Day"
        )
        gibs = frozenset({"AIRS_L2_Carbon_Monoxide_500hPa_Volume_Mixing_Ratio_Day"})
        mapping = _to_layer_mapping(record, gibs_layers=gibs)
        assert mapping is not None
        assert mapping.available_in_gibs is True

    def test_gibs_validation_marks_pending_when_best_not_in_catalog(self) -> None:
        """Layers awaiting GIBS publication should surface as best_pending_gibs, not be dropped."""
        record = _airs_record()
        # Set Name to the same suffix-free value as Best: stripping changes
        # nothing and the result is still absent from the catalog, so the
        # Name fallback can't rescue the record.
        record["umm"]["Name"] = "AMSRU2_L3_Cloud_Liquid_Water_Daily"
        record["umm"]["Specification"]["ProductIdentification"]["BestAvailableExternalIdentifier"] = (
            "AMSRU2_L3_Cloud_Liquid_Water_Daily"
        )
        gibs = frozenset({"some_other_layer"})
        mapping = _to_layer_mapping(record, gibs_layers=gibs)
        assert mapping is not None
        assert mapping.layer_id == "AMSRU2_L3_Cloud_Liquid_Water_Daily"
        assert mapping.layer_id_source == "best_pending_gibs"
        assert mapping.available_in_gibs is False
@pytest.mark.unit
class TestResolveLayerId:
    """Direct tests of ``_resolve_layer_id`` and its (layer_id, source) result."""

    def test_prefers_best_when_present(self) -> None:
        """Without a catalog, a plausible Best is returned as-is."""
        umm = {
            "Name": "MODIS_Aqua_NDVI_v1_STD",
            "Specification": {"ProductIdentification": {"BestAvailableExternalIdentifier": "MODIS_Aqua_NDVI"}},
        }
        assert _resolve_layer_id(umm) == ("MODIS_Aqua_NDVI", "best")

    @pytest.mark.parametrize("junk", ["DUJUAN", "test", "PLACEHOLDER", "YET_TO_SUPPLY", "", "abc"])
    def test_rejects_junk_best(self, junk: str) -> None:
        """Junk, empty, or too-short Best values fall through to the stripped Name."""
        umm = {
            "Name": "MODIS_Aqua_NDVI_v1_STD",
            "Specification": {"ProductIdentification": {"BestAvailableExternalIdentifier": junk}},
        }
        layer_id, source = _resolve_layer_id(umm)
        assert layer_id == "MODIS_Aqua_NDVI"
        assert source == "name_stripped"

    @pytest.mark.parametrize(
        "name,expected,source",
        [
            ("MODIS_Aqua_NDVI_v1_STD", "MODIS_Aqua_NDVI", "name_stripped"),
            ("VIIRS_AOT_v12_NRT", "VIIRS_AOT", "name_stripped"),
            (
                "OPERA_L3_Dynamic_Surface_Water_Extent-HLS_v1_STD",
                "OPERA_L3_Dynamic_Surface_Water_Extent-HLS",
                "name_stripped",
            ),
            ("MODIS_Aqua_NDVI", "MODIS_Aqua_NDVI", "name_raw"),
            ("MODIS_Aqua_NDVI_8Day", "MODIS_Aqua_NDVI_8Day", "name_raw"),
        ],
    )
    def test_strips_version_suffix_from_name(self, name: str, expected: str, source: str) -> None:
        """Only a trailing version/processing suffix is removed; other names pass through."""
        umm = {"Name": name}
        assert _resolve_layer_id(umm) == (expected, source)

    def test_returns_none_when_both_missing(self) -> None:
        """No Best and no Name resolves to ``(None, "unresolved")``."""
        assert _resolve_layer_id({}) == (None, "unresolved")

    def test_best_pending_gibs_when_strip_does_not_help(self) -> None:
        """Best is set, valid-looking, but not in the GIBS catalog and Name doesn't help."""
        umm = {
            "Name": "AMSRU2_L3_Ocean_Wind_Speed_Daily_vV01_STD",  # vV01 not matched by suffix regex
            "Specification": {
                "ProductIdentification": {"BestAvailableExternalIdentifier": "AMSRU2_L3_Ocean_Wind_Speed_Daily"}
            },
        }
        layer_id, source = _resolve_layer_id(umm, gibs_layers=frozenset({"unrelated"}))
        assert layer_id == "AMSRU2_L3_Ocean_Wind_Speed_Daily"
        assert source == "best_pending_gibs"


@pytest.mark.unit
class TestDedupe:
    """Tests for ``_dedupe_layers`` collision handling."""

    def _layer(
        self,
        layer_id: str,
        source: str = "best",
        vis_concept_id: str = "VIS1-X",
        title: str | None = None,
    ) -> LayerMapping:
        """Build a minimal ``LayerMapping`` of type ``tiles`` for dedupe scenarios."""
        return LayerMapping(
            layer_id=layer_id,
            layer_id_source=source,  # type: ignore[arg-type]
            visualization_concept_id=vis_concept_id,
            visualization_type="tiles",
            title=title,
        )

    def test_collapses_by_layer_id_and_visualization_type(self) -> None:
        """Two records sharing layer id and type collapse to one entry."""
        layers = [
            self._layer("FOO", source="best", vis_concept_id="VIS1-X"),
            self._layer("FOO", source="best", vis_concept_id="VIS2-X"),
            self._layer("BAR", source="best", vis_concept_id="VIS3-X"),
        ]
        deduped = _dedupe_layers(layers)
        assert sorted(layer.layer_id for layer in deduped) == ["BAR", "FOO"]

    def test_prefers_higher_quality_source(self) -> None:
        """A canonical 'best' mapping should win over a 'best_pending_gibs' duplicate."""
        layers = [
            self._layer("FOO", source="best_pending_gibs", vis_concept_id="VIS1-X"),
            self._layer("FOO", source="best", vis_concept_id="VIS2-X"),
        ]
        deduped = _dedupe_layers(layers)
        assert len(deduped) == 1
        assert deduped[0].visualization_concept_id == "VIS2-X"
        assert deduped[0].layer_id_source == "best"

    def test_tiebreaks_by_populated_fields(self) -> None:
        """With equal source rank, the mapping with more populated fields wins."""
        layers = [
            self._layer("FOO", source="best", vis_concept_id="VIS1-X", title=None),
            self._layer("FOO", source="best", vis_concept_id="VIS2-X", title="Something"),
        ]
        deduped = _dedupe_layers(layers)
        assert len(deduped) == 1
        assert deduped[0].visualization_concept_id == "VIS2-X"
def _make_handler(
    *,
    path_a_items: list[dict[str, Any]],
    all_items: list[dict[str, Any]],
    call_log: list[str],
) -> Any:
    """Build a MockTransport handler that branches on the `concept-ids` query param.

    Requests carrying ``concept-ids`` are answered with ``path_a_items`` and
    logged as ``"path_a"``; every other request gets ``all_items`` and is
    logged as ``"path_b"``. ``call_log`` is appended to in request order so
    tests can assert which paths were exercised.
    """

    def handler(request: httpx.Request) -> httpx.Response:
        params = dict(request.url.params)
        if "concept-ids" in params:
            call_log.append("path_a")
            return httpx.Response(200, json={"hits": len(path_a_items), "items": path_a_items})
        call_log.append("path_b")
        return httpx.Response(200, json={"hits": len(all_items), "items": all_items})

    return handler


def _build_tool_with_transport(transport: httpx.MockTransport) -> UMMVisLookupTool:
    """Tool wired so its httpx.AsyncClient uses the given MockTransport.

    The tool is created with ``fallback_cache_ttl_seconds=0.0``; tests that
    need the cache must raise the TTL themselves. The patch installed here is
    not undone by this helper — callers must restore ``AsyncClient`` afterwards.
    """

    tool = UMMVisLookupTool(config=UMMVisLookupToolConfig(fallback_cache_ttl_seconds=0.0))

    # Capture the constructor before patching so the factory can still build
    # real clients.
    real_async_client = httpx.AsyncClient

    def factory(*args: Any, **kwargs: Any) -> httpx.AsyncClient:
        # Force every client onto the mock transport, overriding any
        # caller-supplied transport.
        kwargs["transport"] = transport
        return real_async_client(*args, **kwargs)

    # NOTE(review): this assigns through ``module.httpx``. If ummvis_lookup
    # does a plain ``import httpx``, that is the shared httpx module object,
    # so the patch is process-wide rather than local to this tool — confirm,
    # and rely on a restore step (or pytest's monkeypatch) to avoid leaking
    # into other tests.
    import akd_ext.tools.worldview.cmr_umm_vis.ummvis_lookup as module

    module.httpx.AsyncClient = factory  # type: ignore[assignment]
    # Stash an undo callback on the tool instance for callers that want it.
    tool._restore_httpx = lambda: setattr(module.httpx, "AsyncClient", real_async_client)  # type: ignore[attr-defined]
    return tool
@pytest.mark.unit
class TestTwoPath:
    """Behavior of the two lookup paths with HTTP stubbed by ``httpx.MockTransport``.

    ``call_log`` records which path each stubbed request took: ``"path_a"``
    for requests with a ``concept-ids`` param, ``"path_b"`` for the rest.
    """

    @pytest.fixture(autouse=True)
    def _restore_httpx(self):
        """Undo the ``AsyncClient`` patch applied by ``_build_tool_with_transport``."""
        # Capture the original before each test, restore after.
        import akd_ext.tools.worldview.cmr_umm_vis.ummvis_lookup as module

        original = module.httpx.AsyncClient
        yield
        module.httpx.AsyncClient = original

    async def test_path_a_hit_does_not_invoke_path_b(self) -> None:
        """When the concept-ids query returns records, no fallback request is made."""
        cid = "C1-PROV"
        record = _airs_record(source_cid=cid)
        call_log: list[str] = []
        transport = httpx.MockTransport(_make_handler(path_a_items=[record], all_items=[], call_log=call_log))
        tool = _build_tool_with_transport(transport)

        result = await tool.arun(UMMVisLookupToolInputSchema(collection_concept_id=cid))

        assert isinstance(result, UMMVisLookupToolOutputSchema)
        assert result.match_path == "concept_ids"
        assert result.collection_concept_id == cid
        assert len(result.layers) == 1
        assert call_log == ["path_a"]

    async def test_path_b_fires_when_path_a_empty(self) -> None:
        """An empty Path A result triggers the fallback, which filters by source C-id."""
        cid = "C1701805619-GES_DISC"
        match_record = _airs_record(name="LAYER_THAT_MATCHES", source_cid=cid)
        nonmatch_record = _airs_record(
            name="LAYER_THAT_DOES_NOT_MATCH",
            vis_concept_id="VIS9999-CMR_TEST",
            source_cid="C9999-OTHER",
        )
        call_log: list[str] = []
        transport = httpx.MockTransport(
            _make_handler(
                path_a_items=[],
                all_items=[match_record, nonmatch_record],
                call_log=call_log,
            )
        )
        tool = _build_tool_with_transport(transport)

        result = await tool.arun(UMMVisLookupToolInputSchema(collection_concept_id=cid))

        assert result.match_path == "source_datasets_fallback"
        assert len(result.layers) == 1
        assert result.layers[0].layer_id == "LAYER_THAT_MATCHES"
        assert call_log == ["path_a", "path_b"]

    async def test_path_b_returns_empty_when_no_match(self) -> None:
        """The fallback reports its match path even when no record references the C-id."""
        cid = "C1701805619-GES_DISC"
        unrelated = _airs_record(source_cid="C9999-OTHER")
        call_log: list[str] = []
        transport = httpx.MockTransport(_make_handler(path_a_items=[], all_items=[unrelated], call_log=call_log))
        tool = _build_tool_with_transport(transport)

        result = await tool.arun(UMMVisLookupToolInputSchema(collection_concept_id=cid))

        assert result.match_path == "source_datasets_fallback"
        assert result.layers == []
        assert call_log == ["path_a", "path_b"]

    async def test_path_b_cache_avoids_second_fetch(self) -> None:
        """With a non-zero TTL, the second lookup reuses the cached fallback scan."""
        cid = "C1701805619-GES_DISC"
        match_record = _airs_record(source_cid=cid)
        call_log: list[str] = []
        transport = httpx.MockTransport(
            _make_handler(
                path_a_items=[],
                all_items=[match_record],
                call_log=call_log,
            )
        )
        # _build_tool_with_transport creates the tool with a zero TTL; raise it
        # here so the fallback cache is active.
        tool = _build_tool_with_transport(transport)
        tool.config.fallback_cache_ttl_seconds = 60.0

        await tool.arun(UMMVisLookupToolInputSchema(collection_concept_id=cid))
        await tool.arun(UMMVisLookupToolInputSchema(collection_concept_id=cid))

        # Two Path A calls (each invocation hits the server first), one Path B (cached).
        assert call_log == ["path_a", "path_b", "path_a"]
@pytest.mark.integration
class TestLiveUAT:
    """Integration tests that call the real service through a default-configured tool.

    These depend on the current contents of the UAT environment and on
    network access, so they are marked ``integration`` rather than ``unit``.
    """

    async def test_path_a_hit_with_placeholder_cid(self) -> None:
        """The placeholder C-id is universally indexed in UAT today."""
        tool = UMMVisLookupTool()
        result = await tool.arun(UMMVisLookupToolInputSchema(collection_concept_id="C9876543210-ABCDAAC"))
        assert result.match_path == "concept_ids"
        assert len(result.layers) > 0
        # Spot-check only the first few layers to keep the test fast.
        for layer in result.layers[:3]:
            assert isinstance(layer, LayerMapping)
            assert layer.layer_id
            assert layer.visualization_concept_id.startswith("VIS")

    async def test_path_b_hit_with_real_cid(self) -> None:
        """A real C-id (verified in UAT SourceDatasets) goes through the fallback."""
        tool = UMMVisLookupTool()
        result = await tool.arun(UMMVisLookupToolInputSchema(collection_concept_id="C3550187110-ESDIS"))
        assert result.match_path == "source_datasets_fallback"
        assert len(result.layers) >= 1
        layer_ids = {layer.layer_id for layer in result.layers}
        # The Croplands layer is the canonical one associated with C3550187110-ESDIS.
        assert any("Croplands" in lid for lid in layer_ids), layer_ids

    async def test_unrelated_cid_returns_empty(self) -> None:
        """A syntactically valid but unknown C-id yields an empty fallback result."""
        tool = UMMVisLookupTool()
        result = await tool.arun(UMMVisLookupToolInputSchema(collection_concept_id="C0000000000-NOSUCHPROV"))
        # Path A returns empty; Path B scan finds no SourceDatasets hits.
        assert result.match_path == "source_datasets_fallback"
        assert result.layers == []