Skip to content

Commit c6ae994

Browse files
committed
Optimize transducer observation pagination
1 parent 7bf0032 commit c6ae994

4 files changed

Lines changed: 311 additions & 33 deletions

File tree

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""Add transducer observation deployment lookup index.
2+
3+
Revision ID: p9c0d1e2f3a4
4+
Revises: o8b9c0d1e2f3
5+
Create Date: 2026-03-19 11:05:00.000000
6+
"""
7+
8+
from alembic import op
9+
10+
# revision identifiers, used by Alembic.
11+
revision = "p9c0d1e2f3a4"
12+
down_revision = "o8b9c0d1e2f3"
13+
branch_labels = None
14+
depends_on = None
15+
16+
17+
def upgrade() -> None:
18+
op.create_index(
19+
"ix_transducer_observation_deployment_parameter_datetime",
20+
"transducer_observation",
21+
["deployment_id", "parameter_id", "observation_datetime"],
22+
unique=False,
23+
)
24+
25+
26+
def downgrade() -> None:
27+
op.drop_index(
28+
"ix_transducer_observation_deployment_parameter_datetime",
29+
table_name="transducer_observation",
30+
)

db/transducer.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,14 @@ class TransducerObservation(Base, AutoBaseMixin, ReleaseMixin):
107107
"""
108108

109109
__tablename__ = "transducer_observation"
110+
__table_args__ = (
111+
Index(
112+
"ix_transducer_observation_deployment_parameter_datetime",
113+
"deployment_id",
114+
"parameter_id",
115+
"observation_datetime",
116+
),
117+
)
110118

111119
parameter_id: Mapped[int] = mapped_column(
112120
ForeignKey("parameter.id", ondelete="CASCADE"), nullable=False, index=True

services/observation_helper.py

Lines changed: 103 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -52,34 +52,25 @@ def get_transducer_observations(
5252
order: str | None = None,
5353
filter_: str = Query(alias="filter", default=None),
5454
):
55+
deployment_rows: list[tuple[int, int]] = []
56+
deployment_to_thing: dict[int, int] = {}
57+
5558
if thing_id:
5659
item = session.get(Thing, thing_id)
5760
if item is None:
5861
empty_query = select(TransducerObservation).where(False)
5962
return paginate(query=empty_query, conn=session)
60-
61-
# Subquery to get latest block for each observation
62-
block_subq = (
63-
select(TransducerObservationBlock.id)
64-
.where(
65-
TransducerObservationBlock.parameter_id
66-
== TransducerObservation.parameter_id,
67-
TransducerObservationBlock.start_datetime
68-
<= TransducerObservation.observation_datetime,
69-
TransducerObservationBlock.end_datetime
70-
>= TransducerObservation.observation_datetime,
71-
)
72-
.order_by(desc(TransducerObservationBlock.start_datetime))
73-
.limit(1)
74-
.correlate(TransducerObservation)
75-
.scalar_subquery()
76-
)
77-
78-
query = (
79-
select(TransducerObservation, TransducerObservationBlock)
80-
.join(Deployment, TransducerObservation.deployment_id == Deployment.id)
81-
.join(TransducerObservationBlock, TransducerObservationBlock.id == block_subq)
82-
)
63+
deployment_rows = session.execute(
64+
select(Deployment.id, Deployment.thing_id).where(
65+
Deployment.thing_id == thing_id
66+
)
67+
).all()
68+
deployment_to_thing = {
69+
deployment_id: deployment_thing_id
70+
for deployment_id, deployment_thing_id in deployment_rows
71+
}
72+
73+
query = select(TransducerObservation)
8374

8475
if start_time:
8576
query = query.where(TransducerObservation.observation_datetime >= start_time)
@@ -89,23 +80,104 @@ def get_transducer_observations(
8980
if parameter_id:
9081
query = query.where(TransducerObservation.parameter_id == parameter_id)
9182
if thing_id:
92-
query = query.where(Deployment.thing_id == thing_id)
83+
deployment_ids = list(deployment_to_thing)
84+
if not deployment_ids:
85+
empty_query = select(TransducerObservation).where(False)
86+
return paginate(query=empty_query, conn=session)
87+
query = query.where(TransducerObservation.deployment_id.in_(deployment_ids))
9388

94-
def transformer(result):
89+
def transformer(observations):
9590
from schemas.transducer import (
9691
TransducerObservationWithBlockResponse,
9792
TransducerObservationResponse,
9893
TransducerObservationBlockResponse,
9994
)
10095

101-
return [
102-
TransducerObservationWithBlockResponse(
103-
observation=TransducerObservationResponse.model_validate(observation),
104-
block=TransducerObservationBlockResponse.model_validate(block),
105-
).model_dump()
106-
for observation, block in result
96+
if not observations:
97+
return []
98+
99+
deployment_ids = {observation.deployment_id for observation in observations}
100+
if not deployment_to_thing or not deployment_ids.issubset(deployment_to_thing):
101+
deployment_rows = session.execute(
102+
select(Deployment.id, Deployment.thing_id).where(
103+
Deployment.id.in_(deployment_ids)
104+
)
105+
).all()
106+
deployment_to_thing.update(
107+
{
108+
deployment_id: deployment_thing_id
109+
for deployment_id, deployment_thing_id in deployment_rows
110+
}
111+
)
112+
113+
thing_ids = {
114+
deployment_to_thing[observation.deployment_id]
115+
for observation in observations
116+
if observation.deployment_id in deployment_to_thing
117+
}
118+
parameter_ids = {observation.parameter_id for observation in observations}
119+
observation_datetimes = [
120+
observation.observation_datetime for observation in observations
107121
]
108122

123+
block_rows = session.scalars(
124+
select(TransducerObservationBlock)
125+
.where(
126+
TransducerObservationBlock.thing_id.in_(thing_ids),
127+
TransducerObservationBlock.parameter_id.in_(parameter_ids),
128+
TransducerObservationBlock.start_datetime <= max(observation_datetimes),
129+
TransducerObservationBlock.end_datetime >= min(observation_datetimes),
130+
)
131+
.order_by(
132+
TransducerObservationBlock.thing_id,
133+
TransducerObservationBlock.parameter_id,
134+
desc(TransducerObservationBlock.start_datetime),
135+
)
136+
).all()
137+
138+
block_map: dict[tuple[int, int], list[TransducerObservationBlock]] = {}
139+
for block in block_rows:
140+
key = (block.thing_id, block.parameter_id)
141+
if key not in block_map:
142+
block_map[key] = []
143+
block_map[key].append(block)
144+
145+
response_items = []
146+
for observation in observations:
147+
thing_id_for_observation = deployment_to_thing.get(
148+
observation.deployment_id
149+
)
150+
if thing_id_for_observation is None:
151+
continue
152+
153+
matching_block = next(
154+
(
155+
block
156+
for block in block_map.get(
157+
(thing_id_for_observation, observation.parameter_id), []
158+
)
159+
if block.start_datetime
160+
<= observation.observation_datetime
161+
<= block.end_datetime
162+
),
163+
None,
164+
)
165+
if matching_block is None:
166+
continue
167+
168+
response_items.append(
169+
TransducerObservationWithBlockResponse(
170+
observation=TransducerObservationResponse.model_validate(
171+
observation
172+
),
173+
block=TransducerObservationBlockResponse.model_validate(
174+
matching_block
175+
),
176+
).model_dump()
177+
)
178+
179+
return response_items
180+
109181
query = query.order_by(TransducerObservation.observation_datetime.desc())
110182

111183
return paginate(query=query, conn=session, transformer=transformer)

0 commit comments

Comments
 (0)