Skip to content

Commit 335ae97

Browse files
committed
Merge branch 'staging' into well-inventory-csv
2 parents 7c09529 + 4350d31 commit 335ae97

15 files changed

Lines changed: 918 additions & 43 deletions

AGENTS.MD

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ these transfers, keep the following rules in mind to avoid hour-long runs:
2121
right instance before running destructive suites.
2222
- When done, `deactivate` to exit the venv and avoid polluting other shells.
2323

24+
## 3. Data migrations must be idempotent
25+
- Data migrations should be safe to re-run without creating duplicate rows or corrupting data.
26+
- Use upserts or duplicate checks and update source fields only after successful inserts.
27+
2428
Following this playbook keeps ETL runs measured in seconds/minutes instead of hours.
2529

2630
## Activate python venv
27-
Always use `source .venv/bin/activate` to activate the venv running python
31+
Always use `source .venv/bin/activate` to activate the project's Python virtual environment before running python.

cli/cli.py

Lines changed: 160 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -13,42 +13,57 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
# ===============================================================================
16-
import click
16+
from enum import Enum
17+
from pathlib import Path
18+
19+
import typer
1720
from dotenv import load_dotenv
1821

1922
load_dotenv()
2023

24+
# Root Typer application plus one sub-app per command group.
cli = typer.Typer(help="Command line interface for managing the application.")
water_levels = typer.Typer(help="Water-level utilities")
data_migrations = typer.Typer(help="Data migration utilities")
# Mount the sub-apps so their commands are invoked as
# `water-levels ...` and `data-migrations ...` under the root CLI.
cli.add_typer(water_levels, name="water-levels")
cli.add_typer(data_migrations, name="data-migrations")
29+
2130

22-
@click.group()
23-
def cli():
24-
"""Command line interface for managing the application."""
25-
pass
31+
class OutputFormat(str, Enum):
    """Closed set of values accepted by the ``--output`` option."""

    json = "json"
2633

2734

28-
@cli.command()
35+
@cli.command("initialize-lexicon")
def initialize_lexicon():
    """Initialize the lexicon tables via the core initializer."""
    # Imported lazily so `--help` stays fast and avoids pulling app deps
    # in at module import time.
    from core.initializers import init_lexicon

    init_lexicon()
3340

3441

35-
@cli.command()
36-
@click.argument(
37-
"root_directory",
38-
type=click.Path(exists=True, file_okay=False, dir_okay=True, readable=True),
39-
)
40-
def associate_assets_command(root_directory: str):
42+
@cli.command("associate-assets")
def associate_assets_command(
    root_directory: Path = typer.Argument(
        ...,
        exists=True,
        file_okay=False,
        dir_okay=True,
        readable=True,
    )
):
    """Associate asset files found under ROOT_DIRECTORY with their records.

    The argument is annotated as ``Path`` rather than ``str``: Typer only
    applies the ``exists``/``file_okay``/``dir_okay``/``readable`` validation
    when the annotation is ``pathlib.Path`` — with ``str`` those options are
    silently ignored and a nonexistent directory would reach the service
    layer unchecked (the previous click version did validate).
    """
    from cli.service_adapter import associate_assets

    # Pass a plain string so the downstream contract matches the original
    # click-based implementation, which delivered a str path.
    associate_assets(str(root_directory))
4455

4556

46-
@cli.command()
47-
@click.argument(
48-
"file_path",
49-
type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
50-
)
51-
def well_inventory_csv(file_path: str):
57+
@cli.command("well-inventory-csv")
58+
def well_inventory_csv(
59+
file_path: str = typer.Argument(
60+
...,
61+
exists=True,
62+
file_okay=True,
63+
dir_okay=False,
64+
readable=True,
65+
)
66+
):
5267
"""
5368
parse and upload a csv to database
5469
"""
@@ -58,38 +73,143 @@ def well_inventory_csv(file_path: str):
5873
well_inventory_csv(file_path)
5974

6075

61-
@cli.group()
62-
def water_levels():
63-
"""Water-level utilities"""
64-
pass
65-
66-
6776
@water_levels.command("bulk-upload")
def water_levels_bulk_upload(
    file_path: Path = typer.Option(
        ...,
        "--file",
        exists=True,
        file_okay=True,
        dir_okay=False,
        readable=True,
        help="Path to CSV file containing water level rows",
    ),
    output_format: OutputFormat | None = typer.Option(
        None,
        "--output",
        case_sensitive=False,
        help="Optional output format",
    ),
):
    """Parse a water-level CSV and upload its rows.

    ``--file`` is annotated as ``Path`` because Typer applies the
    ``exists``/``file_okay``/``dir_okay``/``readable`` checks only to
    ``pathlib.Path`` parameters; with a ``str`` annotation they are
    silently ignored. ``--output`` is matched case-insensitively
    (``case_sensitive=False``) to preserve the behavior of the previous
    click ``Choice(..., case_sensitive=False)`` option.
    """
    # TODO: use the same helper function used by api to parse and upload a WL csv
    from cli.service_adapter import water_levels_csv

    # Typer validates the choice before the function runs, so a plain enum
    # comparison replaces the old lowered-string check.
    pretty_json = output_format == OutputFormat.json
    water_levels_csv(str(file_path), pretty_json=pretty_json)
91101

92102

103+
@data_migrations.command("list")
def data_migrations_list():
    """Print every registered data migration, one per line."""
    from data_migrations.registry import list_migrations

    entries = list_migrations()
    if not entries:
        typer.echo("No data migrations registered.")
        return
    for entry in entries:
        suffix = " (repeatable)" if entry.is_repeatable else ""
        typer.echo(f"{entry.id}: {entry.name}{suffix}")
114+
115+
116+
@data_migrations.command("status")
def data_migrations_status():
    """Report how many times each registered data migration has run."""
    from data_migrations.runner import get_status
    from db.engine import session_ctx

    with session_ctx() as session:
        statuses = get_status(session)
        if not statuses:
            typer.echo("No data migrations registered.")
            return
        for entry in statuses:
            if entry.last_applied_at:
                when = entry.last_applied_at.isoformat()
            else:
                when = "never"
            typer.echo(
                f"{entry.id}: applied {entry.applied_count} time(s), last={when}"
            )
133+
134+
135+
@data_migrations.command("run")
def data_migrations_run(
    migration_id: str = typer.Argument(...),
    force: bool = typer.Option(
        False, "--force", help="Re-run even if already applied."
    ),
):
    """Run a single data migration identified by MIGRATION_ID."""
    from data_migrations.runner import run_migration_by_id
    from db.engine import session_ctx

    with session_ctx() as session:
        did_apply = run_migration_by_id(session, migration_id, force=force)
        if did_apply:
            typer.echo("applied")
        else:
            typer.echo("skipped")
148+
149+
150+
@data_migrations.command("run-all")
def data_migrations_run_all(
    include_repeatable: bool = typer.Option(
        False,
        "--include-repeatable/--exclude-repeatable",
        help="Whether to include repeatable migrations.",
    ),
    force: bool = typer.Option(
        False, "--force", help="Re-run non-repeatable migrations."
    ),
):
    """Run every registered data migration inside a single session."""
    from data_migrations.runner import run_all
    from db.engine import session_ctx

    with session_ctx() as session:
        applied = run_all(
            session, include_repeatable=include_repeatable, force=force
        )
        typer.echo(f"applied {len(applied)} migration(s)")
167+
168+
169+
@cli.command("alembic-upgrade-and-data")
def alembic_upgrade_and_data(
    revision: str = typer.Argument("head"),
    include_repeatable: bool = typer.Option(
        False,
        "--include-repeatable/--exclude-repeatable",
        help="Whether to include repeatable migrations.",
    ),
    force: bool = typer.Option(
        False, "--force", help="Re-run non-repeatable migrations."
    ),
):
    """Run the alembic schema upgrade, then the data migrations gated on it.

    After upgrading to ``revision``, the set of schema revisions actually
    present in the database is computed and passed to ``run_all`` as
    ``allowed_alembic_revisions`` — presumably so only data migrations whose
    prerequisite schema revision has been applied are run (confirm against
    ``data_migrations.runner.run_all``).
    """
    from alembic import command
    from alembic.config import Config
    from alembic.runtime.migration import MigrationContext
    from alembic.script import ScriptDirectory
    from db.engine import engine, session_ctx
    from data_migrations.runner import run_all

    # Build an alembic Config rooted at the repository directory
    # (the parent of this cli/ package).
    root = Path(__file__).resolve().parents[1]
    cfg = Config(str(root / "alembic.ini"))
    cfg.set_main_option("script_location", str(root / "alembic"))

    command.upgrade(cfg, revision)

    # Collect every revision reachable from the database's current head(s)
    # by walking each head back to base; a set de-duplicates revisions
    # shared across multiple heads.
    with engine.connect() as conn:
        context = MigrationContext.configure(conn)
        heads = context.get_current_heads()
        script = ScriptDirectory.from_config(cfg)
        applied_revisions: set[str] = set()
        for head in heads:
            for rev in script.iterate_revisions(head, "base"):
                applied_revisions.add(rev.revision)

    with session_ctx() as session:
        ran = run_all(
            session,
            include_repeatable=include_repeatable,
            force=force,
            allowed_alembic_revisions=applied_revisions,
        )
        typer.echo(f"applied {len(ran)} migration(s)")
212+
93213
if __name__ == "__main__":
94214
cli()
95215

data_migrations/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Data migrations package

data_migrations/base.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# ===============================================================================
2+
# Copyright 2026 ross
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
# ===============================================================================
16+
from dataclasses import dataclass
17+
from typing import Callable
18+
19+
from sqlalchemy.orm import Session
20+
21+
22+
@dataclass(frozen=True)
class DataMigration:
    """Immutable descriptor for one registered data migration."""

    # Stable unique identifier, e.g. "20260205_0001_move_nma_location_notes".
    id: str
    # Alembic schema revision associated with this migration; presumably
    # gates eligibility via run_all's allowed_alembic_revisions — confirm
    # against data_migrations.runner.
    alembic_revision: str
    # Short human-readable name (shown by the CLI `data-migrations list`).
    name: str
    # Longer description of what the migration does.
    description: str
    # Callable that performs the migration against the given ORM session.
    run: Callable[[Session], None]
    # Repeatable migrations may be re-run; non-repeatable ones are applied once.
    is_repeatable: bool = False
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# ===============================================================================
2+
# Copyright 2026 ross
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
# ===============================================================================
16+
from sqlalchemy import insert, select, update
17+
from sqlalchemy.orm import Session
18+
19+
from data_migrations.base import DataMigration
20+
from db.location import Location
21+
from db.notes import Notes
22+
23+
# Note type assigned to every migrated location note.
NOTE_TYPE = "General"
# Number of source rows buffered per insert/commit cycle.
BATCH_SIZE = 1000
25+
26+
27+
def _iter_location_notes(session: Session):
    """Yield ``(location_id, stripped_note, release_status)`` for every
    location whose ``nma_location_notes`` column holds non-blank text."""
    query = select(
        Location.id,
        Location.nma_location_notes,
        Location.release_status,
    ).where(Location.nma_location_notes.isnot(None))
    for record in session.execute(query):
        text = (record.nma_location_notes or "").strip()
        # Whitespace-only notes are treated the same as NULL: skipped.
        if text:
            yield record.id, text, record.release_status
38+
39+
40+
def run(session: Session) -> None:
    """Copy non-blank ``Location.nma_location_notes`` into ``Notes``,
    flushing in batches of ``BATCH_SIZE``.

    NOTE(review): ``_flush_batch`` commits while the source SELECT is still
    being iterated; this assumes the driver buffers result rows client-side
    — confirm for the configured database backend.
    """
    pending: list[tuple[int, str, str]] = []
    for entry in _iter_location_notes(session):
        pending.append(entry)
        if len(pending) >= BATCH_SIZE:
            _flush_batch(session, pending)
            pending = []
    # Flush the final partial batch, if any.
    if pending:
        _flush_batch(session, pending)
49+
50+
51+
def _flush_batch(session: Session, batch: list[tuple[int, str, str]]) -> None:
    """Insert missing ``Notes`` rows for one batch, then blank the source column.

    Safe to re-run: already-migrated ``(target_id, content)`` pairs are
    skipped, and the legacy column is cleared only after the inserts.
    """
    location_ids = [location_id for location_id, _, _ in batch]

    # Pre-load the (target_id, content) pairs that already exist so a
    # re-run does not create duplicate notes.
    already_present = {
        (row.target_id, row.content)
        for row in session.execute(
            select(Notes.target_id, Notes.content).where(
                Notes.target_table == "location",
                Notes.note_type == NOTE_TYPE,
                Notes.target_id.in_(location_ids),
            )
        )
    }

    payload = [
        {
            "target_id": location_id,
            "target_table": "location",
            "note_type": NOTE_TYPE,
            "content": note,
            "release_status": release_status or "draft",
        }
        for location_id, note, release_status in batch
        if (location_id, note) not in already_present
    ]
    if payload:
        session.execute(insert(Notes), payload)

    # Clear the legacy column for every location in the batch — including
    # those whose note already existed — so repeated runs converge.
    session.execute(
        update(Location)
        .where(Location.id.in_(location_ids))
        .values(nma_location_notes=None)
    )
    session.commit()
85+
86+
87+
# Module-level registry entry for this migration; presumably discovered by
# data_migrations.registry — confirm the discovery mechanism.
MIGRATION = DataMigration(
    id="20260205_0001_move_nma_location_notes",
    alembic_revision="f0c9d8e7b6a5",
    name="Move NMA location notes to Notes table",
    description="Backfill polymorphic notes from Location.nma_location_notes.",
    run=run,
    is_repeatable=False,
)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Data migrations live here.

0 commit comments

Comments
 (0)