diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index b54bdb44..26e1f08f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -11,10 +11,9 @@ permissions: contents: read jobs: - run-tests: + unit-tests: runs-on: ubuntu-latest - # Set shared env vars ONCE here for all steps env: MODE: development POSTGRES_HOST: localhost @@ -56,12 +55,21 @@ jobs: uses: astral-sh/setup-uv@v5 with: enable-cache: true + cache-dependency-glob: uv.lock - name: Set up Python + id: setup-python uses: actions/setup-python@v6.2.0 with: python-version-file: "pyproject.toml" + - name: Cache project virtualenv + id: cache-venv + uses: actions/cache@v4 + with: + path: .venv + key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('uv.lock') }} + - name: Install the project run: uv sync --locked --all-extras --dev @@ -76,13 +84,74 @@ jobs: - name: Run tests run: uv run pytest -vv --durations=20 --cov --cov-report=xml --junitxml=junit.xml --ignore=tests/transfers - - name: Run BDD tests - run: | - uv run behave tests/features --tags="@backend and @production and not @skip" --no-capture - - name: Upload results to Codecov uses: codecov/codecov-action@v5 with: report_type: test_results token: ${{ secrets.CODECOV_TOKEN }} + bdd-tests: + runs-on: ubuntu-latest + + env: + MODE: development + POSTGRES_HOST: localhost + POSTGRES_PORT: 5432 + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: ocotilloapi_test + DB_DRIVER: postgres + BASE_URL: http://localhost:8000 + SESSION_SECRET_KEY: supersecretkeyforunittests + AUTHENTIK_DISABLE_AUTHENTICATION: 1 + + services: + postgis: + image: postgis/postgis:17-3.5 + env: + POSTGRES_PASSWORD: postgres + POSTGRES_PORT: 5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + steps: + - name: Check out source repository + uses: actions/checkout@v6.0.2 + + - name: Install uv + uses: 
astral-sh/setup-uv@v5 + with: + enable-cache: true + cache-dependency-glob: uv.lock + + - name: Set up Python + id: setup-python + uses: actions/setup-python@v6.2.0 + with: + python-version-file: "pyproject.toml" + + - name: Cache project virtualenv + id: cache-venv + uses: actions/cache@v4 + with: + path: .venv + key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('uv.lock') }} + + - name: Install the project + run: uv sync --locked --all-extras --dev + + - name: Show Alembic heads + run: uv run alembic heads + + - name: Create test database + run: | + PGPASSWORD=postgres psql -h localhost -p 5432 -U postgres -c "CREATE DATABASE ocotilloapi_test" + PGPASSWORD=postgres psql -h localhost -p 5432 -U postgres -d ocotilloapi_test -c "CREATE EXTENSION IF NOT EXISTS postgis" + + - name: Run BDD tests + run: uv run behave tests/features --tags="@backend and @production and not @skip" --no-capture diff --git a/core/lexicon.json b/core/lexicon.json index 1143eb6b..9da523f9 100644 --- a/core/lexicon.json +++ b/core/lexicon.json @@ -2260,7 +2260,12 @@ "categories": ["status_value"], "term": "Open", "definition": "The well is open." - }, + }, + { + "categories": ["status_value"], + "term": "Open (unequipped)", + "definition": "The well is open and unequipped." + }, { "categories": ["status_value"], "term": "Closed", diff --git a/tests/features/environment.py b/tests/features/environment.py index 0e9ada2a..266df26f 100644 --- a/tests/features/environment.py +++ b/tests/features/environment.py @@ -13,9 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# =============== ================================================================ +import os import random from datetime import datetime, timedelta +from alembic import command +from alembic.config import Config +from core.initializers import init_lexicon, init_parameter from db import ( Location, Thing, @@ -40,15 +44,14 @@ ThingAquiferAssociation, GeologicFormation, ThingGeologicFormationAssociation, - Base, Asset, Contact, Sample, + Base, ) from db.engine import session_ctx -from services.util import get_bool_env +from db.initialization import recreate_public_schema, sync_search_vector_triggers from sqlalchemy import select -from transfers.transfer import _drop_and_rebuild_db def add_context_object_container(name): @@ -499,24 +502,26 @@ def add_geologic_formation(context, session, formation_code, well): return formation -def before_all(context): - context.objects = {} +def _alembic_config() -> Config: + root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + cfg = Config(os.path.join(root, "alembic.ini")) + cfg.set_main_option("script_location", os.path.join(root, "alembic")) + return cfg - rebuild_raw = get_bool_env("DROP_AND_REBUILD_DB") - rebuild = rebuild_raw if isinstance(rebuild_raw, bool) else False - erase_data = False - if rebuild: - _drop_and_rebuild_db() - elif erase_data: - with session_ctx() as session: - for table in reversed(Base.metadata.sorted_tables): - if table.name in ("alembic_version", "parameter"): - continue - elif table.name.startswith("lexicon"): - continue - session.execute(table.delete()) - session.commit() +def _initialize_test_schema() -> None: + with session_ctx() as session: + recreate_public_schema(session) + command.upgrade(_alembic_config(), "head") + with session_ctx() as session: + sync_search_vector_triggers(session) + init_lexicon() + init_parameter() + + +def before_all(context): + context.objects = {} + _initialize_test_schema() with session_ctx() as session: diff --git a/tests/test_transfer_legacy_dates.py 
b/tests/test_transfer_legacy_dates.py index bbfce3a5..32732b97 100644 --- a/tests/test_transfer_legacy_dates.py +++ b/tests/test_transfer_legacy_dates.py @@ -24,10 +24,12 @@ import datetime from unittest.mock import patch +import numpy as np import pandas as pd import pytest from db import Sample +from transfers.well_transfer import _normalize_completion_date from transfers.util import make_location from transfers.waterlevels_transfer import WaterLevelTransferer @@ -207,6 +209,37 @@ def test_make_observation_maps_data_quality(): assert observation.nma_data_quality == "Mapped Quality" +def test_normalize_completion_date_drops_time_from_datetime(): + value = datetime.datetime(2024, 7, 3, 14, 15, 16) + normalized, parse_failed = _normalize_completion_date(value) + assert normalized == datetime.date(2024, 7, 3) + assert parse_failed is False + + +def test_normalize_completion_date_drops_time_from_timestamp_and_string(): + ts_value = pd.Timestamp("2021-05-06 23:59:00") + str_value = "2021-05-06 23:59:00.000" + normalized_ts, parse_failed_ts = _normalize_completion_date(ts_value) + normalized_str, parse_failed_str = _normalize_completion_date(str_value) + assert normalized_ts == datetime.date(2021, 5, 6) + assert normalized_str == datetime.date(2021, 5, 6) + assert parse_failed_ts is False + assert parse_failed_str is False + + +def test_normalize_completion_date_handles_numpy_datetime64(): + value = np.datetime64("2020-01-02T03:04:05") + normalized, parse_failed = _normalize_completion_date(value) + assert normalized == datetime.date(2020, 1, 2) + assert parse_failed is False + + +def test_normalize_completion_date_invalid_returns_none_and_parse_failed(): + normalized, parse_failed = _normalize_completion_date("not-a-date") + assert normalized is None + assert parse_failed is True + + def test_get_dt_utc_respects_time_datum(): transfer = WaterLevelTransferer.__new__(WaterLevelTransferer) transfer.errors = [] diff --git a/transfers/util.py b/transfers/util.py index 
Return the estimated recording interval, its unit, and an error message if applicable. +    """ point_id = record.PointID + +    # Prefer the statically defined recording interval (values provided by Ethan) when one exists
a/transfers/well_transfer.py b/transfers/well_transfer.py index 60fa3ff6..a6fa6408 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -18,9 +18,10 @@ import time import traceback from concurrent.futures import ThreadPoolExecutor, as_completed -from datetime import datetime, UTC +from datetime import date, datetime, UTC from zoneinfo import ZoneInfo +import numpy as np import pandas as pd from pandas import isna, notna from pydantic import ValidationError @@ -95,6 +96,42 @@ ] +def _normalize_completion_date(value) -> tuple[date | None, bool]: + try: + if value is None or pd.isna(value): + return None, False + except (TypeError, ValueError): + pass + + if isinstance(value, pd.Timestamp): + return value.date(), False + + if isinstance(value, np.datetime64): + return pd.Timestamp(value).date(), False + + if isinstance(value, datetime): + return value.date(), False + + if isinstance(value, date): + return value, False + + if isinstance(value, str): + stripped = value.strip() + if not stripped: + return None, False + + parsed = pd.to_datetime(stripped, errors="coerce") + if not pd.isna(parsed): + return parsed.date(), False + return None, True + + parsed = pd.to_datetime(value, errors="coerce") + if not pd.isna(parsed): + return parsed.date(), False + + return None, True + + class WellTransferer(Transferer): source_table = "WellData" @@ -279,123 +316,6 @@ def _get_dfs(self): cleaned_df = cleaned_df[cleaned_df["PointID"].isin(self.pointids)] return input_df, cleaned_df - # def _step(self, session: Session, df: pd.DataFrame, i: int, row: pd.Series): - # - # try: - # first_visit_date = get_first_visit_date(row) - # well_purposes = ( - # [] if isna(row.CurrentUse) else self._extract_well_purposes(row) - # ) - # well_casing_materials = ( - # [] if isna(row.CasingDescription) else extract_casing_materials(row) - # ) - # well_pump_type = extract_well_pump_type(row) - # - # wcm = None - # if notna(row.ConstructionMethod): - # wcm = 
self._get_lexicon_value( - # row, f"LU_ConstructionMethod:{row.ConstructionMethod}", "Unknown" - # ) - # - # mpheight = row.MPHeight - # mpheight_description = row.MeasuringPoint - # if mpheight is None: - # mphs = self._measuring_point_estimator.estimate_measuring_point_height( - # row - # ) - # if mphs: - # try: - # mpheight = mphs[0][0] - # mpheight_description = mphs[1][0] - # except IndexError: - # if self.verbose: - # logger.warning( - # f"Measuring point height estimation failed for well {row.PointID}, {mphs}" - # ) - # - # data = CreateWell( - # location_id=0, - # name=row.PointID, - # first_visit_date=first_visit_date, - # hole_depth=row.HoleDepth, - # well_depth=row.WellDepth, - # well_casing_diameter=( - # row.CasingDiameter * 12 if row.CasingDiameter else None - # ), - # well_casing_depth=row.CasingDepth, - # release_status="public" if row.PublicRelease else "private", - # measuring_point_height=mpheight, - # measuring_point_description=mpheight_description, - # notes=( - # [{"content": row.Notes, "note_type": "General"}] - # if row.Notes - # else [] - # ), - # well_completion_date=row.CompletionDate, - # well_driller_name=row.DrillerName, - # well_construction_method=wcm, - # well_pump_type=well_pump_type, - # ) - # - # CreateWell.model_validate(data) - # except ValidationError as e: - # self._capture_validation_error(row.PointID, e) - # return - # - # well = None - # try: - # well_data = data.model_dump(exclude=EXCLUDED_FIELDS) - # well_data["thing_type"] = "water well" - # well_data["nma_pk_welldata"] = row.WellID - # well_data["nma_pk_location"] = row.LocationId - # - # well = Thing(**well_data) - # session.add(well) - # - # if well_purposes: - # for wp in well_purposes: - # # TODO: add validation logic here - # if wp in WellPurposeEnum: - # wp_obj = WellPurpose(thing=well, purpose=wp) - # session.add(wp_obj) - # else: - # logger.critical(f"{well.name}. 
Invalid well purpose: {wp}") - # - # if well_casing_materials: - # for wcm in well_casing_materials: - # # TODO: add validation logic here - # if wcm in WellCasingMaterialEnum: - # wcm_obj = WellCasingMaterial(thing=well, material=wcm) - # session.add(wcm_obj) - # else: - # logger.critical( - # f"{well.name}. Invalid well casing material: {wcm}" - # ) - # except Exception as e: - # if well is not None: - # session.expunge(well) - # - # self._capture_error(row.PointID, str(e), "UnknownField") - # - # logger.critical(f"Error creating well for {row.PointID}: {e}") - # return - # - # try: - # location, elevation_method, notes = make_location( - # row, self._cached_elevations - # ) - # session.add(location) - # # session.flush() - # self._added_locations[row.PointID] = (elevation_method, notes) - # except Exception as e: - # import traceback - # - # traceback.print_exc() - # self._capture_error(row.PointID, str(e), str(e), "Location") - # logger.critical(f"Error making location for {row.PointID}: {e}") - # - # return - # def _extract_well_purposes(self, row) -> list[str]: cu = row.CurrentUse @@ -641,6 +561,16 @@ def _build_well_payload(self, row) -> CreateWell | None: except IndexError: pass + completion_date, completion_date_parse_failed = _normalize_completion_date( + row.CompletionDate + ) + if completion_date_parse_failed: + self._capture_error( + row.PointID, + f"Invalid CompletionDate value: {row.CompletionDate!r}", + "CompletionDate", + ) + data = CreateWell( location_id=0, name=row.PointID, @@ -659,7 +589,7 @@ def _build_well_payload(self, row) -> CreateWell | None: if row.Notes else [] ), - well_completion_date=row.CompletionDate, + well_completion_date=completion_date, well_driller_name=row.DrillerName, well_construction_method=wcm, well_pump_type=well_pump_type, @@ -830,8 +760,9 @@ def _add_histories(self, session: Session, row, well: Thing) -> None: ) if notna(row.Status): + sv = row.Status.strip() try: - status_value = 
lexicon_mapper.map_value(f"LU_Status:{row.Status}") + status_value = lexicon_mapper.map_value(f"LU_Status:{sv}") session.add( StatusHistory( status_type="Well Status", @@ -843,7 +774,7 @@ def _add_histories(self, session: Session, row, well: Thing) -> None: ) ) except KeyError: - pass + self._capture_error(well.name, f"Unknown status code: {sv}", "Status") if notna(row.OpenWellLoggerOK): if bool(row.OpenWellLoggerOK):