-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathsensor.py
More file actions
395 lines (349 loc) · 18.4 KB
/
sensor.py
File metadata and controls
395 lines (349 loc) · 18.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
"""Platform for sensor integration."""
from __future__ import annotations
import re
from datetime import timedelta, datetime, date
from functools import partial
from logging import getLogger
from typing import Any, Callable
from homeassistant.components.sensor import (
SensorEntity, RestoreSensor,
)
from homeassistant.const import CONF_NAME, CONF_ICON, CONF_ENTITY_ID, EVENT_STATE_CHANGED
from homeassistant.core import HomeAssistant, EventStateChangedData, callback
from homeassistant.exceptions import ServiceValidationError, HomeAssistantError
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import generate_entity_id
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_call_later
from homeassistant.util import slugify
from homeassistant.util.dt import UTC
from .const import DOMAIN, CONF_TASK_INTERVAL_VALUE, CONF_NOTIFICATION_INTERVAL, CONF_TAGS, CONF_ACTIVE, \
CONST_DUE, CONST_DUE_SOON, CONST_INACTIVE, CONST_DONE, CONF_TASK_INTERVAL_TYPE, \
CONF_DUE_SOON_DAYS, CONF_TODO_LISTS, CONF_DAY, CONF_ACTIVE_OVERRIDE, CONF_TASK_INTERVAL_OVERRIDE, \
CONF_DUE_SOON_OVERRIDE, CONF_REPEAT_EVERY, \
CONF_REPEAT_EVERY_WEEKDAY, CONF_REPEAT_EVERY_DAY_OF_MONTH, \
CONF_REPEAT_EVERY_WEEKDAY_OF_MONTH, CONF_REPEAT_EVERY_DAYS_BEFORE_END_OF_MONTH
from .coordinator import TaskTrackerCoordinator
# Module-level logger, named after this module.
LOGGER = getLogger(__name__)
async def async_setup_entry(
    hass: HomeAssistant,
    entry: ConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Create the status sensor for a config entry and hand it to Home Assistant.

    The coordinator is looked up in ``hass.data[DOMAIN]`` under the entry id.
    The task name is read from ``entry.data``; every other setting is read from
    ``entry.options``. The three override entity ids are optional and resolve to
    ``None`` when they are not present in the options.
    """
    task_coordinator: TaskTrackerCoordinator = hass.data[DOMAIN][entry.entry_id]
    entry_data = entry.data
    entry_options = entry.options

    # Keyword arguments keep the long parameter list readable; they are listed
    # in the constructor's declaration order.
    sensor = TaskTrackerSensor(
        coordinator=task_coordinator,
        entry_name=entry_data[CONF_NAME],
        task_interval_value=entry_options[CONF_TASK_INTERVAL_VALUE],
        task_interval_type=entry_options[CONF_TASK_INTERVAL_TYPE],
        notification_interval=entry_options[CONF_NOTIFICATION_INTERVAL],
        todo_lists=entry_options[CONF_TODO_LISTS],
        due_soon_days=entry_options[CONF_DUE_SOON_DAYS],
        tags=entry_options[CONF_TAGS],
        active=entry_options[CONF_ACTIVE],
        icon=entry_options[CONF_ICON],
        entry_id=entry.entry_id,
        hass=hass,
        active_override=entry_options.get(CONF_ACTIVE_OVERRIDE),
        task_interval_override=entry_options.get(CONF_TASK_INTERVAL_OVERRIDE),
        due_soon_override=entry_options.get(CONF_DUE_SOON_OVERRIDE),
    )
    async_add_entities([sensor])
class TaskTrackerSensor(RestoreSensor, SensorEntity):
    """Status sensor for a single tracked task.

    The native value is one of ``CONST_DONE``, ``CONST_DUE_SOON``, ``CONST_DUE``
    or ``CONST_INACTIVE`` and is recomputed in :meth:`async_update`. The
    coordinator owns ``last_done`` and the due-date calculation; this entity
    derives its state from it, optionally applies override entities, and keeps
    an item named after the task in each configured todo list in sync.
    """

    _attr_has_entity_name = True
    _attr_should_poll = False
    _attr_translation_key = "status"

    def __init__(self, coordinator: TaskTrackerCoordinator, entry_name: str,
                 task_interval_value: int, task_interval_type: str,
                 notification_interval: int, todo_lists: list[str], due_soon_days: int, tags: str, active: bool,
                 icon: str, entry_id: str, hass: HomeAssistant,
                 active_override: str | None = None,
                 task_interval_override: str | None = None,
                 due_soon_override: str | None = None) -> None:
        """Initialize the sensor from the configured task settings.

        ``tags`` is a single string that is split on runs of ``;``, ``,`` and
        spaces. The three ``*_override`` arguments are optional entity ids whose
        current state, when usable, replaces the corresponding configured value.
        """
        self.coordinator = coordinator
        self.task_interval_value: int = task_interval_value
        self.task_interval_type: str = task_interval_type
        self.notification_interval: int = notification_interval
        self.todo_lists: list[str] = todo_lists
        self.due_soon_days: int = due_soon_days
        self.entry_id = entry_id
        self.entry_name = entry_name
        # ``tags or ""`` keeps a missing (None) tags option from crashing re.split.
        tags_list = re.split(r'[;, ]+', tags or "")
        self.tags: list[str] = [tag.strip() for tag in tags_list if tag]
        self.active: bool = active
        self.icon: str = icon
        self.due_date: date = date(1970, 1, 1)
        self.due_in: int = 0
        # Cancel handle of the pending deferred todo-list check, if any.
        self.mark_as_done_scheduled: Callable[[], None] | None = None
        self.active_override: str | None = active_override
        self.task_interval_override: str | None = task_interval_override
        self.due_soon_override: str | None = due_soon_override
        # Effective values after applying overrides; initialised to configured values
        self._effective_active: bool = active
        self._effective_due_soon_days: int = due_soon_days
        device_id = f"{DOMAIN}_{self.entry_id}"
        self._attr_name = None
        self._attr_unique_id = f"{self.entry_id}_status"
        self.entity_id = generate_entity_id("sensor.task_tracker_{}", slugify(entry_name), hass=hass)
        self._attr_native_value = "due"
        self._attr_device_info = DeviceInfo(
            identifiers={(DOMAIN, device_id)},
            manufacturer="Gensyn",
            model="Task Tracker",
            name=entry_name,
        )
        # Register coordinator listener so state changes are reflected in HA.
        # Registered here (not just in async_added_to_hass) so that tests which
        # call async_mark_as_done / async_set_last_done_date without adding the
        # entity to hass also trigger the update callback.
        self.async_on_remove(
            self.coordinator.async_add_listener(
                lambda: self.async_schedule_update_ha_state(force_refresh=True)
            )
        )

    async def async_added_to_hass(self) -> None:
        """Restore the last known state and subscribe to state-change events."""
        await super().async_added_to_hass()

        last_sensor_state = await self.async_get_last_sensor_data()
        if last_sensor_state is not None:
            self._attr_native_value = last_sensor_state.native_value

        last_state = await self.async_get_last_state()
        if last_state is not None:
            last_done = last_state.attributes.get("last_done", "1970-01-01")
            try:
                restored_last_done = datetime.strptime(last_done, "%Y-%m-%d").date()
            except (ValueError, TypeError):
                # A malformed stored attribute must not abort entity setup;
                # the coordinator simply keeps its current last_done.
                LOGGER.warning(
                    "Could not restore last_done %r for task %s; keeping current value",
                    last_done,
                    self.entry_name,
                )
            else:
                # Restore last_done into the coordinator — it is the single source of truth.
                self.coordinator.last_done = restored_last_done
                self._attr_extra_state_attributes: dict[str, str | int | list] = {
                    "last_done": last_done,
                }

        # Make sure a pending deferred todo-list check does not outlive the entity.
        self.async_on_remove(self._cancel_scheduled_mark_as_done)

        # Subscribe to todo list state changes
        self.async_on_remove(
            self.hass.bus.async_listen(
                EVENT_STATE_CHANGED,
                self.async_todo_list_changed,
                self._filter_state_changes
            )
        )

        # Subscribe to override entity state changes
        if any((self.active_override, self.task_interval_override, self.due_soon_override)):
            @callback
            def _async_override_state_changed(_event: Any) -> None:
                self.async_schedule_update_ha_state(force_refresh=True)

            self.async_on_remove(
                self.hass.bus.async_listen(
                    EVENT_STATE_CHANGED,
                    _async_override_state_changed,
                    self._filter_override_changes,
                )
            )

        await self.async_update()

    @callback
    def _cancel_scheduled_mark_as_done(self) -> None:
        """Cancel the pending deferred todo-list check, if one is scheduled."""
        if self.mark_as_done_scheduled:
            self.mark_as_done_scheduled()
            self.mark_as_done_scheduled = None

    def _read_override_state(self, entity_id: str | None) -> str | None:
        """Return the state string of an override entity, or ``None`` if unusable.

        An override is unusable when no entity id is configured, the entity has
        no state object, or its state is ``"unavailable"`` or ``"unknown"``.
        """
        if not entity_id:
            return None
        override_state = self.hass.states.get(entity_id)
        if override_state is None or override_state.state in ("unavailable", "unknown"):
            return None
        return override_state.state

    def _resolve_active_override(self) -> bool:
        """Return the effective ``active`` value after applying any override entity."""
        raw_state = self._read_override_state(self.active_override)
        if raw_state is not None:
            return raw_state == "on"
        return self.active

    def _resolve_task_interval_override(self) -> tuple[int, str]:
        """Return the effective (interval_value, interval_type) after applying any override entity.

        When an override entity is active the interval type is always ``CONF_DAY``; the
        override represents a number of days regardless of the configured interval type.
        The override value is clamped to a minimum of 1; a non-numeric state falls
        back to the configured interval.
        """
        raw_state = self._read_override_state(self.task_interval_override)
        if raw_state is not None:
            try:
                return max(1, int(float(raw_state))), CONF_DAY
            except (ValueError, TypeError):
                pass  # non-numeric override state: use the configured interval
        return self.task_interval_value, self.task_interval_type

    def _resolve_due_soon_override(self) -> int:
        """Return the effective due soon days after applying any override entity.

        The override value is clamped to a minimum of 0; a non-numeric state falls
        back to the configured number of days.
        """
        raw_state = self._read_override_state(self.due_soon_override)
        if raw_state is not None:
            try:
                return max(0, int(float(raw_state)))
            except (ValueError, TypeError):
                pass  # non-numeric override state: use the configured value
        return self.due_soon_days

    async def async_update(self) -> None:
        """Recalculate state, attributes, and sync all configured todo lists."""
        effective_active = self._resolve_active_override()
        effective_task_interval_value, effective_task_interval_type = self._resolve_task_interval_override()
        effective_due_soon_days = self._resolve_due_soon_override()

        self.due_date = self.coordinator.calculate_due_date(effective_task_interval_value, effective_task_interval_type)

        # Take one snapshot of "today" so due_in and overdue_by cannot disagree
        # when the update happens to straddle midnight.
        today = date.today()
        self.due_in: int = max((self.due_date - today).days, 0)
        overdue_by: int = max((today - self.due_date).days, 0)

        if not effective_active:
            self._attr_native_value = CONST_INACTIVE
        elif self.due_in == 0:
            self._attr_native_value = CONST_DUE
        elif self.due_in <= effective_due_soon_days:
            self._attr_native_value = CONST_DUE_SOON
        else:
            self._attr_native_value = CONST_DONE

        # Remembered for async_sync_todo_list, which runs below.
        self._effective_active: bool = effective_active
        self._effective_due_soon_days: int = effective_due_soon_days

        attributes: dict[str, str | int | list] = {
            "last_done": str(self.coordinator.last_done),
            "due_date": str(self.due_date),
            "due_in": self.due_in,
            "overdue_by": overdue_by,
            "repeat_mode": self.coordinator.repeat_mode,
            "icon": self.icon,
            "tags": self.tags,
            "todo_lists": self.todo_lists,
            "due_soon_days": effective_due_soon_days,
            "notification_interval": self.notification_interval,
        }
        if self.coordinator.repeat_mode == CONF_REPEAT_EVERY:
            repeat_every_type = self.coordinator.repeat_every_type
            attributes["repeat_every_type"] = repeat_every_type
            if repeat_every_type == CONF_REPEAT_EVERY_WEEKDAY:
                attributes["repeat_weekday"] = self.coordinator.repeat_weekday
                attributes["repeat_weeks_interval"] = self.coordinator.repeat_weeks_interval
            elif repeat_every_type == CONF_REPEAT_EVERY_DAY_OF_MONTH:
                attributes["repeat_month_day"] = self.coordinator.repeat_month_day
            elif repeat_every_type == CONF_REPEAT_EVERY_WEEKDAY_OF_MONTH:
                attributes["repeat_weekday"] = self.coordinator.repeat_weekday
                attributes["repeat_nth_occurrence"] = self.coordinator.repeat_nth_occurrence
            elif repeat_every_type == CONF_REPEAT_EVERY_DAYS_BEFORE_END_OF_MONTH:
                attributes["repeat_days_before_end"] = self.coordinator.repeat_days_before_end
        else:
            attributes["task_interval_value"] = effective_task_interval_value
            attributes["task_interval_type"] = effective_task_interval_type
        self._attr_extra_state_attributes: dict[str, str | int | list] = attributes

        for todo_list in self.todo_lists:
            await self.async_sync_todo_list(todo_list)

    @callback
    def _filter_state_changes(self, event_data: EventStateChangedData) -> bool:
        """Listen only for events regarding todo list entities of our task.

        Also filter out events where the number of open items did not decrease.
        Todo list states are integer strings (e.g. "5"), so an explicit int()
        conversion is required — pure string comparison gives wrong results for
        multi-digit counts (e.g. ``"5" < "10"`` is ``False`` lexicographically).
        """
        if event_data["entity_id"] not in self.todo_lists:
            return False
        old_state = event_data.get("old_state")
        new_state = event_data.get("new_state")
        if not old_state or not new_state:
            return False
        try:
            return int(new_state.state) < int(old_state.state)
        except (ValueError, TypeError):
            # Non-numeric states cannot be compared as counts.
            return False

    @callback
    def _filter_override_changes(self, event_data: EventStateChangedData) -> bool:
        """Listen only for state changes in override entities."""
        override_entities = {e for e in
                             [self.active_override, self.task_interval_override, self.due_soon_override] if e}
        return event_data["entity_id"] in override_entities

    async def async_todo_list_changed(self, event: Any) -> None:
        """Handle a todo list state-change event.

        Cancels any previously scheduled deferred update, then waits 5 seconds
        before acting — giving the user a chance to undo an accidental completion.
        """
        self._cancel_scheduled_mark_as_done()
        # allow 5 seconds for the user to change their mind - it might have been a misclick
        self.mark_as_done_scheduled = async_call_later(
            self.hass,
            5,
            partial(self.async_todo_list_changed_deferred, event)
        )

    @staticmethod
    def _parse_completed_timestamp(value: Any) -> datetime | None:
        """Parse an ISO 8601 ``completed`` timestamp into an aware datetime.

        Returns ``None`` when the value is not a parseable string or carries no
        UTC offset. ``fromisoformat`` is used instead of a fixed ``strptime``
        pattern because ``datetime.isoformat()`` omits the fractional seconds
        when they are zero, which a pattern containing ``%f`` rejects.
        """
        try:
            parsed = datetime.fromisoformat(value)
        except (ValueError, TypeError):
            return None
        if parsed.tzinfo is None:
            # A naive timestamp cannot be compared with an aware "now".
            return None
        return parsed

    async def async_todo_list_changed_deferred(self, event: Any, _) -> None:
        """Mark task as done if the todo item was completed within the last 5 minutes."""
        todo_list = event.data[CONF_ENTITY_ID]
        existing_item = await self.async_get_item_from_todo_list(todo_list)
        if existing_item is None:
            # the item does not exist, so no action is needed
            return
        completed_string = existing_item.get("completed")
        if not completed_string:
            return
        completed = self._parse_completed_timestamp(completed_string)
        if completed is None:
            LOGGER.warning(
                "Could not parse completed timestamp %r for task %s; skipping mark-as-done",
                completed_string,
                self.entry_name,
            )
            return
        if datetime.now(UTC) - completed < timedelta(minutes=5):
            # the item was marked as done, so we need to update our last_done date
            await self.async_mark_as_done()

    async def async_sync_todo_list(self, todo_list: str) -> None:
        """Add, update, or remove this task's item in *todo_list* as appropriate."""
        existing_item: dict | None = await self.async_get_item_from_todo_list(todo_list)
        item_expected = self._effective_active and self.due_in <= self._effective_due_soon_days
        if item_expected:
            if existing_item is None:
                # The item does not exist, so we need to add it
                await self.async_add_item_to_todo_list(todo_list)
                return
            # the item exists but the status and due date might be wrong
            # even if the current status and due date are correct, it is easiest to just update the item
            await self.async_update_item_in_todo_list(todo_list)
            return
        # there is NOT supposed to be an item in the todo list
        if existing_item is not None:
            await self.async_remove_item_from_todo_list(todo_list)

    async def async_add_item_to_todo_list(self, todo_list: str) -> None:
        """Add the item to the todo list."""
        await self.async_call_service("add_item", {
            CONF_ENTITY_ID: todo_list,
            "item": self.entry_name,
            "due_date": self.due_date
        })

    async def async_remove_item_from_todo_list(self, todo_list: str) -> None:
        """Remove the item from the todo list."""
        await self.async_call_service("remove_item", {
            CONF_ENTITY_ID: todo_list,
            "item": self.entry_name,
        })

    async def async_update_item_in_todo_list(self, todo_list: str) -> None:
        """Update the item in the todo list, resetting it to ``needs_action``."""
        await self.async_call_service("update_item", {
            CONF_ENTITY_ID: todo_list,
            "item": self.entry_name,
            "status": "needs_action",
            "due_date": self.due_date,
        })

    async def async_get_item_from_todo_list(self, todo_list: str) -> dict | None:
        """Return the item whose summary equals the task name, or ``None``.

        Both open and completed items are requested from the todo list.
        """
        response = await self.async_call_service("get_items", {
            CONF_ENTITY_ID: todo_list,
            "status": ["needs_action", "completed"],
        }, blocking=True, response=True)
        if not response:
            return None
        for item in response.get(todo_list, {}).get("items", []):
            if item.get("summary") == self.entry_name:
                return item
        return None

    async def async_call_service(self, service: str, service_data: dict[str, Any], blocking: bool = False,
                                 response: bool = False) -> dict | None:
        """Call a service in the ``todo`` domain.

        Failures are logged and then re-raised to the caller.
        """
        try:
            return await self.hass.services.async_call(
                domain="todo",
                service=service,
                service_data=service_data,
                blocking=blocking,
                return_response=response,
            )
        except (ServiceValidationError, HomeAssistantError) as err:
            LOGGER.error("Failed to call service \"todo.%s\": %s", service, err)
            raise

    async def async_mark_as_done(self) -> None:
        """Mark the task as done for today.

        For ``repeat_every`` tasks the action is skipped when the task is
        inactive. The coordinator itself determines whether to use the most
        recent past occurrence (DUE) or the next upcoming occurrence
        (DUE_SOON / DONE) based on the computed due date.
        """
        if (self.coordinator.repeat_mode == CONF_REPEAT_EVERY
                and self._attr_native_value == CONST_INACTIVE):
            return
        await self.coordinator.async_mark_as_done()

    async def async_set_last_done_date(self, new_date: date) -> None:
        """Set the last done date by delegating to the coordinator."""
        await self.coordinator.async_set_last_done_date(new_date)