Skip to content

Commit 05aade6

Browse files
authored
Merge pull request #457 from DataIntegrationGroup/BDMS-520-1-1-Cleanup-2.0
BDMS-520-1-1-Cleanup-2.0
2 parents 1f00d38 + 9a62b05 commit 05aade6

2 files changed

Lines changed: 39 additions & 3 deletions

File tree

transfers/transfer.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,11 @@ def _run_continuous_water_level_transfers(metrics, flags):
425425
results_map[result_name] = result
426426
logger.info(f"Parallel task {result_name} completed in {elapsed:.2f}s")
427427
except Exception as e:
428-
logger.critical(f"Parallel task {name} failed: {e}")
428+
import traceback
429+
430+
logger.critical(
431+
f"Parallel task {name} failed: {traceback.format_exc()}"
432+
)
429433

430434
if "Pressure" in results_map and results_map["Pressure"]:
431435
metrics.pressure_metrics(*results_map["Pressure"])

transfers/waterlevels_transducer_transfer.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ def _install_ts(value):
154154
insert(TransducerObservation),
155155
filtered_observations,
156156
)
157-
session.add(block)
157+
block = self._get_or_create_block(session, block)
158158
logger.info(
159159
f"Added {len(observations)} water levels {release_status} block"
160160
)
@@ -250,6 +250,33 @@ def _build_itertuples_field_map(df: pd.DataFrame) -> dict[str, str]:
250250
mapping[col] = field
251251
return mapping
252252

253+
def _get_or_create_block(
254+
self, session: Session, block: TransducerObservationBlock
255+
) -> TransducerObservationBlock:
256+
existing = (
257+
session.query(TransducerObservationBlock)
258+
.filter(
259+
TransducerObservationBlock.thing_id == block.thing_id,
260+
TransducerObservationBlock.parameter_id == block.parameter_id,
261+
TransducerObservationBlock.review_status == block.review_status,
262+
TransducerObservationBlock.start_datetime == block.start_datetime,
263+
TransducerObservationBlock.end_datetime == block.end_datetime,
264+
)
265+
.one_or_none()
266+
)
267+
if existing:
268+
existing.comment = block.comment or existing.comment
269+
existing.release_status = block.release_status or existing.release_status
270+
existing.reviewer_id = block.reviewer_id or existing.reviewer_id
271+
existing.created_by_name = block.created_by_name or existing.created_by_name
272+
existing.created_by_id = block.created_by_id or existing.created_by_id
273+
existing.updated_by_name = block.updated_by_name or existing.updated_by_name
274+
existing.updated_by_id = block.updated_by_id or existing.updated_by_id
275+
return existing
276+
277+
session.add(block)
278+
return block
279+
253280

254281
class WaterLevelsContinuousPressureTransferer(WaterLevelsContinuousTransferer):
255282
source_table = "WaterLevelsContinuous_Pressure"
@@ -328,7 +355,12 @@ def _legacy_payload(self, row: pd.Series) -> dict:
328355

329356

330357
def _find_deployment(ts, deployments):
331-
date = ts.date()
358+
if isinstance(ts, Timestamp):
359+
date = ts.date()
360+
elif hasattr(ts, "date"):
361+
date = ts.date()
362+
else:
363+
date = pd.Timestamp(ts).date()
332364
for d in deployments:
333365
if d.installation_date > date:
334366
break # because sorted by start

0 commit comments

Comments
 (0)