Skip to content

Commit ddb4c24

Browse files
authored
Merge pull request #223 from DataIntegrationGroup/transfer
transfer
2 parents c56360d + 26e4fbf commit ddb4c24

9 files changed

Lines changed: 355 additions & 140 deletions

core/lexicon.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@
357357
{"categories": ["analysis_method_type"], "term": "Laboratory", "definition": "A procedure performed on a physical sample in a controlled, off-site laboratory environment. These methods typically involve complex instrumentation, standardized reagents, and formal quality control protocols."},
358358
{"categories": ["analysis_method_type"], "term": "Field Procedure", "definition": "A standardized procedure performed on-site at the time of sample collection. This can involve direct measurement of the environmental medium using a calibrated field instrument or a specific, documented technique for collecting a sample."},
359359
{"categories": ["analysis_method_type"], "term": "Calculation", "definition": "A mathematical procedure used to derive a new data point from one or more directly measured values. This type is used to document the provenance of calculated data, providing an auditable trail."},
360+
{"categories": ["organization"], "term": "NMSU", "definition": "New Mexico State University"},
360361
{"categories": ["organization"], "term": "USGS", "definition": "US Geological Survey"},
361362
{"categories": ["organization"], "term": "TWDB", "definition": "Texas Water Development Board"},
362363
{"categories": ["organization"], "term": "NMED", "definition": "New Mexico Environment Department"},

transfers/contact_transfer.py

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from transfers.logger import logger
2222
from transfers.util import (
2323
get_transfers_data_path,
24+
chunk_by_size,
2425
)
2526
from transfers.util import read_csv, filter_to_valid_point_ids, replace_nans
2627

@@ -66,50 +67,56 @@ def transfer_contacts(session):
6667
odf = filter_to_valid_point_ids(session, odf)
6768
cleaned_df = odf
6869
errors = []
69-
for i, row in odf.iterrows():
70-
thing = session.query(Thing).where(Thing.name == row.PointID).first()
71-
logger.info(f"Processing PointID: {i} {row.PointID}")
72-
if thing is None:
73-
logger.critical(
74-
f"Thing with PointID {row.PointID} not found. Skipping owner."
75-
)
76-
continue
77-
78-
# TODO: use contact_helper.add_contact
79-
try:
80-
_add_first_contact(session, row, thing, co_to_org_mapper)
81-
session.commit()
82-
session.flush()
83-
logger.info(f"added first contact for PointID {row.PointID}")
84-
except ValidationError as e:
85-
logger.critical(
86-
f"Skipping first contact for PointID {row.PointID} due to validation error: {e.errors()}"
87-
)
88-
session.rollback()
89-
errors.append({"pointid": row.PointID, "error": e.errors()})
90-
except Exception as e:
91-
logger.critical(
92-
f"Skipping first contact for PointID {row.PointID} due to error: {e}"
93-
)
94-
session.rollback()
95-
errors.append({"pointid": row.PointID, "error": e})
96-
try:
97-
_add_second_contact(session, row, thing, co_to_org_mapper)
98-
session.commit()
99-
session.flush()
100-
logger.info(f"added second contact for PointID {row.PointID}")
101-
except ValidationError as e:
102-
logger.critical(
103-
f"Skipping second contact for PointID {row.PointID} due to validation error: {e.errors()}"
104-
)
105-
session.rollback()
106-
errors.append({"pointid": row.PointID, "error": e.errors()})
107-
except Exception as e:
108-
logger.critical(
109-
f"Skipping second contact for PointID {row.PointID} due to error: {e}"
110-
)
111-
session.rollback()
112-
errors.append({"pointid": row.PointID, "error": e})
70+
# for i, row in odf.iterrows():
71+
for chunk in chunk_by_size(odf, 500):
72+
things = (
73+
session.query(Thing).filter(Thing.name.in_(chunk.PointID.tolist())).all()
74+
)
75+
for i, row in chunk.iterrows():
76+
thing = next((thing for thing in things if thing.name == row.PointID), None)
77+
logger.info(f"Processing PointID: {i} {row.PointID}")
78+
if thing is None:
79+
logger.critical(
80+
f"Thing with PointID {row.PointID} not found. Skipping owner."
81+
)
82+
continue
83+
84+
# TODO: use contact_helper.add_contact
85+
try:
86+
_add_first_contact(session, row, thing, co_to_org_mapper)
87+
session.commit()
88+
# session.flush()
89+
logger.info(f"added first contact for PointID {row.PointID}")
90+
except ValidationError as e:
91+
logger.critical(
92+
f"Skipping first contact for PointID {row.PointID} due to validation error: {e.errors()}"
93+
)
94+
session.rollback()
95+
errors.append({"pointid": row.PointID, "error": e.errors()})
96+
except Exception as e:
97+
logger.critical(
98+
f"Skipping first contact for PointID {row.PointID} due to error: {e}"
99+
)
100+
session.rollback()
101+
errors.append({"pointid": row.PointID, "error": e})
102+
103+
try:
104+
_add_second_contact(session, row, thing, co_to_org_mapper)
105+
session.commit()
106+
# session.flush()
107+
logger.info(f"added second contact for PointID {row.PointID}")
108+
except ValidationError as e:
109+
logger.critical(
110+
f"Skipping second contact for PointID {row.PointID} due to validation error: {e.errors()}"
111+
)
112+
session.rollback()
113+
errors.append({"pointid": row.PointID, "error": e.errors()})
114+
except Exception as e:
115+
logger.critical(
116+
f"Skipping second contact for PointID {row.PointID} due to error: {e}"
117+
)
118+
session.rollback()
119+
errors.append({"pointid": row.PointID, "error": e})
113120

114121
return input_df, cleaned_df, errors
115122

transfers/link_ids_transfer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ def transfer_link_ids_welldata(session):
3333

3434
ldf = filter_to_valid_point_ids(session, ldf)
3535

36-
for chunk in chunk_by_size(ldf, 25):
37-
locations = (
36+
for chunk in chunk_by_size(ldf, 100):
37+
things = (
3838
session.query(Thing).filter(Thing.name.in_(chunk.PointID.tolist())).all()
3939
)
4040
for row in chunk.itertuples():
@@ -45,7 +45,7 @@ def transfer_link_ids_welldata(session):
4545
# )
4646
continue
4747

48-
thing = next((l for l in locations if l.name == row.PointID), None)
48+
thing = next((l for l in things if l.name == row.PointID), None)
4949
if thing is None:
5050
logger.warning(
5151
f"Thing not found forPointID {row.PointID}. Skipping link ids."
@@ -162,7 +162,7 @@ def transfer_link_ids(session, site_type="GW"):
162162
ldf = replace_nans(ldf)
163163

164164
ldf = filter_to_valid_point_ids(session, ldf)
165-
for chunk in chunk_by_size(ldf, 25):
165+
for chunk in chunk_by_size(ldf, 100):
166166
locations = (
167167
session.query(Thing).filter(Thing.name.in_(chunk.PointID.tolist())).all()
168168
)

transfers/metrics.py

Lines changed: 68 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,21 @@
2222
from sqlalchemy import select, func
2323
from sqlalchemy.orm import Session
2424

25-
from db import Thing, WellScreen, Sensor, Contact, Observation, Parameter
25+
from db import (
26+
Thing,
27+
WellScreen,
28+
Sensor,
29+
Contact,
30+
Observation,
31+
Parameter,
32+
Deployment,
33+
TransducerObservation,
34+
)
2635

2736

2837
class Metrics:
38+
include_errors = False
39+
2940
def __init__(self):
3041
# create a new path for the metrics
3142
root = Path("metrics")
@@ -36,12 +47,16 @@ def __init__(self):
3647
os.mkdir(root)
3748

3849
self.path = root / f"metrics_{datetime.now()}.csv"
39-
40-
self._writer = csv.writer(self.path.open("a"), delimiter="|")
41-
self._writer.writerow(["model", "transferred", "input_count", "cleaned_count"])
50+
delimiter = "|" if self.include_errors else ","
51+
self._writer = csv.writer(self.path.open("a"), delimiter=delimiter)
52+
self._writer.writerow(
53+
["model", "input_count", "cleaned_count", "transferred", "issue_percentage"]
54+
)
4255

4356
def well_metrics(self, *args, **kw) -> None:
44-
self._handle_metrics(Thing, where=Thing.thing_type == "water well", *args, **kw)
57+
self._handle_metrics(
58+
Thing, where=Thing.thing_type == "water well", name="Well", *args, **kw
59+
)
4560

4661
def sensor_metrics(self, *args, **kw) -> None:
4762
self._handle_metrics(Sensor, *args, **kw)
@@ -56,7 +71,9 @@ def contact_metrics(self, sess, input_df, cleaned_df, errors) -> None:
5671
)
5772

5873
# since each contact in nma contacts a primary and a secondary contact multiply the count by 2
59-
metrics = [Contact.__name__, len(input_df) * 2, len(cleaned_df) * 2, count]
74+
metrics = self._make_metrics(
75+
Contact.__name__, len(input_df) * 2, len(cleaned_df) * 2, count
76+
)
6077
self._writer.writerow(metrics)
6178
self._write_errors(errors)
6279

@@ -69,33 +86,65 @@ def water_level_metrics(self, sess, input_df, cleaned_df, errors) -> None:
6986
)
7087
count = sess.execute(sql).scalar_one()
7188

72-
metrics = ["Manual Water Levels", len(input_df), len(cleaned_df), count]
89+
metrics = self._make_metrics(
90+
"Manual Water Levels", len(input_df), len(cleaned_df), count
91+
)
7392
self._writer.writerow(metrics)
7493
self._write_errors(errors)
7594

95+
def acoustic_metrics(self, *args, **kw) -> None:
96+
self._transducer_metrics("Acoustic Sounder", *args, **kw)
97+
98+
def pressure_metrics(self, *args, **kw) -> None:
99+
self._transducer_metrics("Pressure Transducer", *args, **kw)
100+
101+
def _transducer_metrics(
102+
self, sensor_type, sess, input_df, cleaned_df, errors
103+
) -> None:
104+
sql = (
105+
select(func.count())
106+
.select_from(TransducerObservation)
107+
.join(Deployment)
108+
.join(Sensor)
109+
.join(Parameter)
110+
.where(Sensor.sensor_type == sensor_type)
111+
.where(Parameter.parameter_name == "groundwater level")
112+
)
113+
count = sess.execute(sql).scalar_one()
114+
metrics = self._make_metrics(sensor_type, len(input_df), len(cleaned_df), count)
115+
self._writer.writerow(metrics)
116+
self._write_errors(errors)
117+
118+
def _make_metrics(self, name, input_n, cleaned_n, count):
    """Build one metrics CSV row: [model name, input rows, cleaned rows, transferred count, issue %].

    ``percent_issue`` is the share of cleaned rows that did NOT make it into
    the database, as a percentage.  Guard against an empty cleaned set: when
    ``cleaned_n`` is zero there is nothing to measure, so report 0 instead of
    dividing by zero.  (The original condition was inverted — it divided
    exactly when ``cleaned_n == 0`` and returned 0 otherwise.)
    """
    percent_issue = (cleaned_n - count) / cleaned_n * 100 if cleaned_n != 0 else 0
    return [name, input_n, cleaned_n, count, percent_issue]
121+
76122
def _handle_metrics(
77-
self, model, sess, input_df, cleaned_df, errors, where=None
123+
self, model, sess, input_df, cleaned_df, errors, where=None, name=None
78124
) -> None:
79125
count = self._get_count(sess, model, where=where)
80-
self._write_metrics(model.__name__, count, input_df, cleaned_df)
126+
127+
if name is None:
128+
name = model.__name__
129+
self._write_metrics(name, count, input_df, cleaned_df)
81130
self._write_errors(errors)
82131

83132
def _write_errors(self, errors: list) -> None:
84-
self._writer.writerow(["PointID", "Error"])
85-
for e in errors:
86-
error = e["error"]
87-
if not isinstance(error, (list, tuple)):
88-
error = [error]
133+
if self.include_errors:
134+
self._writer.writerow(["PointID", "Error"])
135+
for e in errors:
136+
error = e["error"]
137+
if not isinstance(error, (list, tuple)):
138+
error = [error]
89139

90-
for ee in error:
91-
self._writer.writerow([e["pointid"], ee])
92-
self._writer.writerow([])
140+
for ee in error:
141+
self._writer.writerow([e["pointid"], ee])
142+
self._writer.writerow([])
93143

94144
def _write_metrics(
95145
self, name: str, count: int, input_df: DataFrame, cleaned_df: DataFrame
96146
) -> None:
97-
metrics = [name, len(input_df), len(cleaned_df), count]
98-
147+
metrics = self._make_metrics(name, len(input_df), len(cleaned_df), count)
99148
self._writer.writerow(metrics)
100149

101150
def _get_count(self, sess: Session, model, where=None) -> int:

transfers/sensor_transfer.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
# ===============================================================================
1616
from datetime import datetime
1717

18+
from sqlalchemy import select
19+
1820
from db import Sensor, Deployment, Thing
1921
from transfers.util import read_csv, logger, filter_to_valid_point_ids, replace_nans
2022

@@ -119,6 +121,20 @@ def transfer_sensors(session):
119121
f"not an integer",
120122
}
121123
)
124+
sql = (
125+
select(Deployment)
126+
.join(Thing)
127+
.join(Sensor)
128+
.where(Thing.name == pointid)
129+
.where(Sensor.serial_no == sensor.serial_no)
130+
.where(Deployment.installation_date == installation_date)
131+
.where(Deployment.removal_date == removal_date)
132+
)
133+
134+
existing_deployment = session.execute(sql).scalars().one_or_none()
135+
if existing_deployment:
136+
logger.info("existing deployment")
137+
continue
122138

123139
# TODO: add validation
124140
deployment = Deployment(

0 commit comments

Comments
 (0)