2828 DT_MEASURED ,
2929 EARLIEST ,
3030 LATEST ,
31+ TDS ,
32+ WATERLEVELS
3133)
3234from backend .connectors .wqp .transformer import (
3335 WQPSiteTransformer ,
@@ -160,7 +162,7 @@ def get_records(self, site_record):
160162 }
161163 params .update (get_date_range (self .config ))
162164
163- if config .parameter .lower () != "waterlevels" :
165+ if config .parameter .lower () != WATERLEVELS :
164166 params ["characteristicName" ] = get_analyte_search_param (
165167 config .parameter , WQP_ANALYTE_MAPPING
166168 )
@@ -181,6 +183,37 @@ def _parameter_units_hook(self):
181183 raise NotImplementedError (
182184 f"{ self .__class__ .__name__ } must implement _parameter_units_hook"
183185 )
186+
187+ def _clean_records (self , records ) -> list :
188+ """
189+ Remove duplicate TDS records. This is called on a site-by-site basis so does not need to account for
190+ different sites having observations on the same date.
191+ """
192+ if self .config .parameter == TDS :
193+ site_id = records [0 ]["MonitoringLocationIdentifier" ]
194+ return_records = []
195+ dates = [record ["ActivityStartDate" ] for record in records ]
196+ dates = list (set (dates ))
197+ for date in dates :
198+ # get all records for this date
199+ date_records = {
200+ record ["USGSPCode" ]: record for record in records if record ["ActivityStartDate" ] == date
201+ }
202+ if len (date_records .items ()) > 1 :
203+ if "70301" in date_records .keys ():
204+ kept_record = date_records ["70301" ]
205+ elif "70303" in date_records .keys ():
206+ kept_record = date_records ["70303" ]
207+ else :
208+ raise ValueError (
209+ f"Multiple TDS records found for { site_id } on date { date } but no 70301 or 70303 pcodes found."
210+ )
211+ else :
212+ kept_record = list (date_records .values ())[0 ]
213+ return_records .append (kept_record )
214+ return return_records
215+ else :
216+ return super ()._clean_records (records )
184217
185218
186219class WQPAnalyteSource (WQPParameterSource , BaseAnalyteSource ):
0 commit comments