|
15 | 15 | # =============================================================================== |
16 | 16 | from __future__ import annotations |
17 | 17 |
|
18 | | -import textwrap |
19 | | -import uuid |
| 18 | +import tempfile |
20 | 19 | from pathlib import Path |
21 | 20 |
|
22 | 21 | from click.testing import CliRunner |
23 | 22 | from sqlalchemy import select |
24 | 23 |
|
25 | 24 | from cli.cli import cli |
26 | | -from db import FieldActivity, FieldEvent, Observation, Sample |
| 25 | +from db import FieldActivity, FieldEvent, FieldEventParticipant, Observation, Sample |
27 | 26 | from db.engine import session_ctx |
28 | 27 |
|
29 | 28 |
|
@@ -141,89 +140,184 @@ def fake_upload(file_path, *, pretty_json=False): |
141 | 140 | assert captured["pretty_json"] is True |
142 | 141 |
|
143 | 142 |
|
144 | | -def test_water_levels_cli_persists_observations(tmp_path, water_well_thing): |
| 143 | +def test_water_levels_cli_persists_observations( |
| 144 | + water_level_bulk_upload_data, water_well_thing, contact, second_contact |
| 145 | +): |
145 | 146 | """ |
146 | 147 | End-to-end CLI invocation should create FieldEvent, Sample, and Observation rows. |
| 148 | +
|
| 149 | + This is essentially the same test in tests/services/test_water_level_service.py::test_bulk_upload, |
| 150 | + but it works by invoking the command line rather than just the function directly. |
147 | 151 | """ |
148 | 152 |
|
149 | | - def _write_csv(path: Path, *, well_name: str, notes: str): |
150 | | - csv_text = textwrap.dedent( |
151 | | - f"""\ |
152 | | - field_staff,well_name_point_id,field_event_date_time,measurement_date_time,sampler,sample_method,mp_height,level_status,depth_to_water_ft,data_quality,water_level_notes |
153 | | - CLI Tester,{well_name},2025-02-15T08:00:00-07:00,2025-02-15T10:30:00-07:00,Groundwater Team,electric tape,1.5,stable,42.5,approved,{notes} |
154 | | - """ |
155 | | - ) |
156 | | - path.write_text(csv_text) |
| 153 | + # write to a CSV file in memory then delete it after processing |
| 154 | + # this is being done to avoid filesystem dependencies in tests and |
| 155 | + # to use the contact fixture for the field staff |
| 156 | + csv_headers = list(water_level_bulk_upload_data.keys()) |
| 157 | + csv_values = list(water_level_bulk_upload_data.values()) |
157 | 158 |
|
158 | | - unique_notes = f"pytest-{uuid.uuid4()}" |
159 | | - csv_file = tmp_path / "water_levels.csv" |
160 | | - _write_csv(csv_file, well_name=water_well_thing.name, notes=unique_notes) |
| 159 | + csv_content = ",".join(csv_headers) + "\n" + ",".join(csv_values) |
161 | 160 |
|
162 | | - runner = CliRunner() |
163 | | - result = runner.invoke( |
164 | | - cli, ["water-levels", "bulk-upload", "--file", str(csv_file)] |
165 | | - ) |
166 | | - |
167 | | - assert result.exit_code == 0, result.output |
| 161 | + with tempfile.NamedTemporaryFile( |
| 162 | + mode="w+", encoding="utf-8", delete_on_close=True |
| 163 | + ) as temp_csv: |
| 164 | + temp_csv.write(csv_content) |
| 165 | + temp_csv.flush() |
168 | 166 |
|
169 | | - created_ids: dict[str, int] = {} |
170 | | - with session_ctx() as session: |
171 | | - stmt = ( |
172 | | - select(Observation) |
173 | | - .join(Observation.sample) |
174 | | - .join(Sample.field_activity) |
175 | | - .join(FieldActivity.field_event) |
176 | | - .where(Sample.notes == unique_notes) |
| 167 | + runner = CliRunner() |
| 168 | + result = runner.invoke( |
| 169 | + cli, ["water-levels", "bulk-upload", "--file", str(temp_csv.name)] |
177 | 170 | ) |
178 | | - observations = session.scalars(stmt).all() |
179 | | - assert len(observations) == 1, "Expected one observation for the uploaded CSV" |
180 | | - |
181 | | - observation = observations[0] |
182 | | - sample = observation.sample |
183 | | - field_activity = sample.field_activity |
184 | | - field_event = field_activity.field_event |
185 | | - |
186 | | - assert field_event.thing_id == water_well_thing.id |
187 | | - assert sample.sample_method == "Electric tape measurement (E-probe)" |
188 | | - assert sample.sample_matrix == "water" |
189 | | - assert observation.value == 42.5 |
190 | | - assert observation.measuring_point_height == 1.5 |
191 | | - assert observation.notes == "Level status: stable | Data quality: approved" |
192 | | - assert ( |
193 | | - field_event.notes == f"Field staff: CLI Tester | {unique_notes}" |
194 | | - ), "Field event notes should capture field staff and notes" |
195 | | - |
196 | | - created_ids = { |
197 | | - "observation_id": observation.id, |
198 | | - "sample_id": sample.id, |
199 | | - "field_activity_id": field_activity.id, |
200 | | - "field_event_id": field_event.id, |
201 | | - } |
202 | | - |
203 | | - if created_ids: |
204 | | - # Clean up committed rows so other tests see a pristine database. |
| 171 | + |
| 172 | + assert result.exit_code == 0, result.output |
| 173 | + |
| 174 | + created_ids: dict[str, int] = {} |
205 | 175 | with session_ctx() as session: |
206 | | - observation = session.get(Observation, created_ids["observation_id"]) |
207 | | - sample = session.get(Sample, created_ids["sample_id"]) |
208 | | - field_activity = session.get( |
209 | | - FieldActivity, created_ids["field_activity_id"] |
| 176 | + stmt = ( |
| 177 | + select(Observation) |
| 178 | + .join(Observation.sample) |
| 179 | + .join(Sample.field_activity) |
| 180 | + .join(FieldActivity.field_event) |
| 181 | + .where( |
| 182 | + Observation.notes |
| 183 | + == water_level_bulk_upload_data["water_level_notes"] |
| 184 | + ) |
210 | 185 | ) |
211 | | - field_event = session.get(FieldEvent, created_ids["field_event_id"]) |
212 | | - |
213 | | - if observation: |
214 | | - session.delete(observation) |
215 | | - session.flush() |
216 | | - if sample: |
217 | | - session.delete(sample) |
218 | | - session.flush() |
219 | | - if field_activity: |
220 | | - session.delete(field_activity) |
221 | | - session.flush() |
222 | | - if field_event: |
223 | | - session.delete(field_event) |
224 | | - session.flush() |
225 | | - |
226 | | - session.commit() |
| 186 | + observations = session.scalars(stmt).all() |
| 187 | + print(observations) |
| 188 | + assert ( |
| 189 | + len(observations) == 1 |
| 190 | + ), "Expected one observation for the uploaded CSV" |
| 191 | + |
| 192 | + observation = observations[0] |
| 193 | + sample = observation.sample |
| 194 | + field_activity = sample.field_activity |
| 195 | + field_event = field_activity.field_event |
| 196 | + # contact is created before second_contact so will have a lower id |
| 197 | + field_event_participants = sorted( |
| 198 | + field_event.field_event_participants, key=lambda fep: fep.contact_id |
| 199 | + ) |
| 200 | + field_event_participant_1 = field_event_participants[0] |
| 201 | + field_event_participant_2 = field_event_participants[1] |
| 202 | + |
| 203 | + # ---------- |
| 204 | + # INSERTION VERIFICATION |
| 205 | + # ---------- |
| 206 | + |
| 207 | + # FieldEvent |
| 208 | + assert field_event is not None |
| 209 | + assert field_event.thing_id == water_well_thing.id |
| 210 | + # TODO: uncomment after timezone handling is fixed |
| 211 | + # assert field_event.event_date.isoformat() == "2025-02-15T15:00:00+00:00" |
| 212 | + assert ( |
| 213 | + field_event.event_date.isoformat() |
| 214 | + == water_level_bulk_upload_data["field_event_date_time"] + "+00:00" |
| 215 | + ) |
| 216 | + |
| 217 | + # FieldActivity |
| 218 | + assert field_activity is not None |
| 219 | + assert field_activity.activity_type == "groundwater level" |
| 220 | + |
| 221 | + # FieldEventParticipants |
| 222 | + assert field_event_participant_1 is not None |
| 223 | + assert field_event_participant_1.contact_id == contact.id |
| 224 | + assert field_event_participant_1.field_event_id == field_event.id |
| 225 | + assert field_event_participant_1.participant_role == "Lead" |
| 226 | + |
| 227 | + assert field_event_participant_2 is not None |
| 228 | + assert field_event_participant_2.contact_id == second_contact.id |
| 229 | + assert field_event_participant_2.field_event_id == field_event.id |
| 230 | + assert field_event_participant_2.participant_role == "Participant" |
| 231 | + |
| 232 | + # Sample |
| 233 | + assert sample is not None |
| 234 | + assert sample.field_activity_id == field_activity.id |
| 235 | + # TODO: uncomment after timezone handling is fixed |
| 236 | + # assert sample.sample_date.isoformat() == "2025-02-15T17:30:00+00:00" |
| 237 | + assert ( |
| 238 | + sample.sample_date.isoformat() |
| 239 | + == water_level_bulk_upload_data["water_level_date_time"] + "+00:00" |
| 240 | + ) |
| 241 | + assert sample.sample_name[0:3] == "wl-" |
| 242 | + assert sample.sample_matrix == "water" |
| 243 | + assert sample.sample_method == water_level_bulk_upload_data["sample_method"] |
| 244 | + assert sample.qc_type == "Normal" |
| 245 | + assert sample.depth_top is None |
| 246 | + assert sample.depth_bottom is None |
| 247 | + |
| 248 | + # Observation |
| 249 | + assert observation is not None |
| 250 | + assert observation.sample_id == sample.id |
| 251 | + # TODO: uncomment after timezone handling is fixed |
| 252 | + # assert observation.observation_datetime.isoformat() == "2025-02-15T17:30:00+00:00" |
| 253 | + assert ( |
| 254 | + observation.observation_datetime.isoformat() |
| 255 | + == water_level_bulk_upload_data["water_level_date_time"] + "+00:00" |
| 256 | + ) |
| 257 | + assert observation.value == float( |
| 258 | + water_level_bulk_upload_data["depth_to_water_ft"] |
| 259 | + ) |
| 260 | + assert observation.unit == "ft" |
| 261 | + assert observation.measuring_point_height == float( |
| 262 | + water_level_bulk_upload_data["mp_height"] |
| 263 | + ) |
| 264 | + assert ( |
| 265 | + observation.groundwater_level_reason |
| 266 | + == water_level_bulk_upload_data["level_status"] |
| 267 | + ) |
| 268 | + assert ( |
| 269 | + observation.groundwater_level_accuracy |
| 270 | + == water_level_bulk_upload_data["data_quality"] |
| 271 | + ) |
| 272 | + assert ( |
| 273 | + observation.notes == water_level_bulk_upload_data["water_level_notes"] |
| 274 | + ) |
| 275 | + |
| 276 | + created_ids = { |
| 277 | + "observation_id": observation.id, |
| 278 | + "sample_id": sample.id, |
| 279 | + "field_activity_id": field_activity.id, |
| 280 | + "field_event_id": field_event.id, |
| 281 | + "field_participant_ids": [fep.id for fep in field_event.participants], |
| 282 | + } |
| 283 | + |
| 284 | + if created_ids: |
| 285 | + # Clean up committed rows so other tests see a pristine database. |
| 286 | + with session_ctx() as session: |
| 287 | + observation = session.get(Observation, created_ids["observation_id"]) |
| 288 | + sample = session.get(Sample, created_ids["sample_id"]) |
| 289 | + field_activity = session.get( |
| 290 | + FieldActivity, created_ids["field_activity_id"] |
| 291 | + ) |
| 292 | + field_event = session.get(FieldEvent, created_ids["field_event_id"]) |
| 293 | + field_participants = ( |
| 294 | + session.query(FieldEventParticipant) |
| 295 | + .filter( |
| 296 | + FieldEventParticipant.id.in_( |
| 297 | + created_ids["field_participant_ids"] |
| 298 | + ) |
| 299 | + ) |
| 300 | + .all() |
| 301 | + ) |
| 302 | + |
| 303 | + if observation: |
| 304 | + session.delete(observation) |
| 305 | + session.flush() |
| 306 | + if sample: |
| 307 | + session.delete(sample) |
| 308 | + session.flush() |
| 309 | + if field_participants: |
| 310 | + for participant in field_participants: |
| 311 | + session.delete(participant) |
| 312 | + session.flush() |
| 313 | + if field_activity: |
| 314 | + session.delete(field_activity) |
| 315 | + session.flush() |
| 316 | + if field_event: |
| 317 | + session.delete(field_event) |
| 318 | + session.flush() |
| 319 | + |
| 320 | + session.commit() |
227 | 321 |
|
228 | 322 |
|
229 | 323 | # ============= EOF ============================================= |
0 commit comments