Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
ec09b77
kwarg typing
jsl12 Jan 13, 2026
1d0d9b2
WIP pin/threads
jsl12 Jan 13, 2026
1c782aa
adjustments for fully async
jsl12 Jan 13, 2026
53a07d8
thread name fix
jsl12 Jan 13, 2026
3f0f518
removed thread creation from app_run_context
jsl12 Jan 13, 2026
b20c7d7
fixed mutable defaults
jsl12 Jan 13, 2026
5b19a2a
added total threads to test
jsl12 Jan 13, 2026
e0374cf
removed total_seconds
jsl12 Jan 13, 2026
e40add3
altered log line
jsl12 Jan 13, 2026
ea04bb5
added configured_appdaemon fixture
jsl12 Jan 17, 2026
c51c7e2
separated pin_thread_counts for the default case
jsl12 Jan 17, 2026
f2cf2db
error handling
jsl12 Jan 17, 2026
321e464
added pin_thread test and app
jsl12 Jan 19, 2026
a04fdcd
type fixes in app management
jsl12 Jan 19, 2026
5c7005d
added test_new_app_pins
jsl12 Jan 20, 2026
c761471
run_every tests using new test fixtures
jsl12 Jan 21, 2026
1c43a23
test_event updated to use new configured_appdaemon fixture
jsl12 Jan 21, 2026
e9480ec
fixed pinned_app_count
jsl12 Jan 22, 2026
8ca237f
test_state updates
jsl12 Jan 22, 2026
cc4d763
remaining test updates
jsl12 Jan 24, 2026
d1f5476
namespace test updates
jsl12 Jan 25, 2026
537b0e2
run_every updates
jsl12 Feb 2, 2026
e18948e
updated production mode tests
jsl12 Feb 2, 2026
c2e7fef
try/finally around the yield in configured_appdaemon
jsl12 Feb 2, 2026
9d691a8
moved
jsl12 Feb 2, 2026
432576f
fixed path
jsl12 Feb 16, 2026
b6a9539
Expanded internals docs
jsl12 Feb 17, 2026
dae7c48
Merge branch 'uv' into pin-thread
jsl12 Feb 17, 2026
4c0744f
Merge branch 'dev' into pin-thread
jsl12 Feb 17, 2026
38ed5fb
history
jsl12 Feb 17, 2026
07d41d5
moved import
jsl12 Feb 17, 2026
48837d8
fixed line break
jsl12 Feb 17, 2026
a7569b1
Merge remote-tracking branch 'origin/dev' into pin-thread
jsl12 Feb 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
352 changes: 208 additions & 144 deletions appdaemon/app_management.py

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion appdaemon/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async def add_event_callback(
oneshot: bool = False,
pin: bool | None = None,
pin_thread: int | None = None,
kwargs: dict[str, Any] = None, # Intentionally not expanding the kwargs here so that there are no name clashes
kwargs: dict[str, Any] | None = None, # Intentionally not expanding the kwargs here so that there are no name clashes
) -> str | list[str] | None:
"""Add an event callback to AppDaemon's internal dicts.

Expand All @@ -67,6 +67,9 @@ async def add_event_callback(
Returns:
``None`` or the reference to the callback handle.
"""
# Create the default kwargs dict
kwargs = {} if kwargs is None else kwargs

if oneshot: # this is still a little awkward, but it works until this can be refactored
# This needs to be in the kwargs dict here that gets passed around later, so that the dispatcher knows to
# cancel the callback after the first run.
Expand Down
23 changes: 22 additions & 1 deletion appdaemon/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,15 @@ class BadAppConfigFile(AppDaemonException):
path: Path


@dataclass
class AppConfigWriteFail(AppDaemonException):
app_name: str
path: Path

def __str__(self):
return f"Failed to write app '{self.app_name}' config to '{self.path}'"


@dataclass
class TimeOutException(AppDaemonException):
msg: str
Expand Down Expand Up @@ -440,7 +449,19 @@ class PinOutofRange(AppDaemonException):
total_threads: int

def __str__(self):
return f"Pin thread {self.pin_thread} out of range. Must be between 0 and {self.total_threads - 1}"
if self.total_threads == 0:
max_thread_id = self.total_threads - 1
return f"Pin thread {self.pin_thread} out of range. Must be between 0 and {max_thread_id}"
else:
return f"No pin threads are allowed in this configuration: {self.pin_thread}"


@dataclass
class NegativePinThread(AppDaemonException):
pin_thread: int

def __str__(self):
return f"Pin threads can't be negative: {self.pin_thread}"


@dataclass
Expand Down
5 changes: 2 additions & 3 deletions appdaemon/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
import traceback
import uuid
from collections import OrderedDict
from logging import LogRecord, Logger, StreamHandler
from logging import Logger, LogRecord, StreamHandler
from logging.handlers import RotatingFileHandler
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union


import appdaemon.utils as utils
from appdaemon.appdaemon import AppDaemon

Expand Down Expand Up @@ -184,7 +183,7 @@ class Logging(metaclass=utils.Singleton):
"""
name: str = "_logging"

config: Dict[str, Dict[str, Any]]
config: dict[str, dict[str, Any]]

log_levels = {
"CRITICAL": 50,
Expand Down
23 changes: 19 additions & 4 deletions appdaemon/models/config/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import sys
from abc import ABC
from collections.abc import Iterable, Iterator
from collections.abc import Generator, Iterable, Iterator
from copy import deepcopy
from pathlib import Path
from typing import Annotated, Any, Literal
Expand Down Expand Up @@ -54,7 +54,7 @@ class AppConfig(BaseApp, extra="allow"):
class_name: str = Field(alias="class")
"""Name of the class to use for the app. Must be accessible as an attribute of the imported `module_name`
"""
pin_app: bool = True
pin_app: bool | None = None
"""Pin this app to a particular thread. This is used to ensure that the app is always run on the same thread."""
pin_thread: int | None = None
"""Which thread ID to pin this app to."""
Expand Down Expand Up @@ -200,10 +200,25 @@ def apps_from_file(self, paths: Iterable[Path]):
cfg.config_path in paths
) # fmt: skip

@property
def active_apps(self) -> Generator[tuple[str, AppConfig]]:
for app_name, cfg in self.root.items():
match cfg:
case AppConfig(disable=False):
yield app_name, cfg

def active_app_count(self) -> int:
"""Active in this case means not disabled"""
return len([cfg for cfg in self.root.values() if isinstance(cfg, AppConfig) and not cfg.disable])
return len(list(self.active_apps()))

def pinned_apps(self) -> Generator[tuple[str, AppConfig]]:
for app_name, cfg in self.active_apps():
match cfg:
case AppConfig(pin_app=pin) if bool(pin):
yield app_name, cfg

def pinned_app_count(self) -> int:
"""Active in this case means not disabled"""
return len(list(self.pinned_apps()))

def get_active_app_count(self) -> tuple[int, int, int]:
active = 0
Expand Down
18 changes: 12 additions & 6 deletions appdaemon/models/config/appdaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class AppDaemonConfig(BaseModel, extra="allow"):
ascii_encode: bool = True
"""Set to false to disable ascii encoding of log messages. This is useful for non-English languages."""

load_distribution: str = "roundrobbin"
load_distribution: Literal["load", "random", "roundrobin"] = "roundrobin"
threads: (
Annotated[
int | None,
Expand All @@ -124,8 +124,13 @@ class AppDaemonConfig(BaseModel, extra="allow"):
"""If ``True``, AppDaemon apps will be each pinned to a particular thread. This avoids complications around
re-entrant code and locking of instance variables."""
pin_threads: int | None = None
"""Number of threads to use for pinned apps, allowing the user to section off a sub-pool just for pinned apps. By
default all threads are used for pinned apps."""
"""Number of threads to use for pinned apps.

AppDaemon will use the threads with ID 0 through (`pin_threads` - 1) for pinned apps. This allows the user to section off
a sub-pool just for pinned apps. By default all threads are used for pinned apps.

This value initially comes from the user configuration, but is sometimes later modified.
"""
thread_duration_warning_threshold: float = 10
threadpool_workers: int = 10
"""Number of threads in AppDaemon's internal thread pool, which can be used to execute functions asynchronously in
Expand Down Expand Up @@ -191,11 +196,12 @@ def model_post_init(self, __context: Any):

self.ext = ".toml" if self.write_toml else ".yaml"

if self.total_threads is not None:
self.pin_apps = False

if self.pin_threads is not None and self.total_threads is not None:
# assert self.total_threads is not None, "Using pin_threads requires total_threads to be set."
assert self.pin_threads <= self.total_threads, (
"Number of pin threads has to be less than or equal to total threads."
)

@property
def fully_async(self) -> bool:
return self.total_threads == 0
2 changes: 1 addition & 1 deletion appdaemon/models/internal/app_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ class UpdateActions:
class ManagedObject:
type: Literal["app", "plugin", "sequence"]
object: Any
pin_app: bool
id: str = field(default_factory=lambda: uuid.uuid4().hex)
module_path: Path | None = None
pin_app: bool | None = None
pin_thread: int | None = None
running: bool = False
callback_counter: int = 0
Expand Down
21 changes: 7 additions & 14 deletions appdaemon/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,31 +153,24 @@ async def insert_schedule(
callback: Callable | None,
repeat: bool = False,
type_: str | None = None,
interval: timedelta = timedelta(),
offset: timedelta = timedelta(),
interval: timedelta | None = None,
offset: timedelta | None = None,
random_start: timedelta | None = None,
random_end: timedelta | None = None,
pin: bool | None = None,
pin_thread: int | None = None,
**kwargs,
) -> str:
interval = interval if interval is not None else timedelta()
offset = offset if offset is not None else timedelta()

assert isinstance(aware_dt, datetime), "aware_dt must be a datetime object"
assert aware_dt.tzinfo is not None, "aware_dt must be timezone aware"
# aware_dt will include a timezone of some sort - convert to utc timezone
basetime = aware_dt.astimezone(pytz.utc)

if pin_thread is not None:
# If the pin_thread is specified, force pin_app to True
pin_app = True
else:
# Otherwise, use the current pin_app setting in app management
if pin is None:
pin_app = self.AD.app_management.objects[name].pin_app
else:
pin_app = pin

if pin_thread is None:
pin_thread = self.AD.app_management.objects[name].pin_thread
pin_app, pin_thread = self.AD.threading.determine_thread(name, pin, pin_thread)
self.logger.debug("App '%s' scheduled on pinned thread", name, pin_app, pin_thread)

# Ensure that there's a dict available for this app name
if name not in self.schedule:
Expand Down
4 changes: 2 additions & 2 deletions appdaemon/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ def register_service(
domain (str): Domain of the service
service (str): Name of the service
callback (Callable): Callback function to be called when the service is invoked
__silent (bool, optional): If True, do not send a registration event. Defaults to False.
__name (str | None, optional): Name of the app registering the service. Defaults to None.
silent (bool, optional): If True, do not send a registration event. Defaults to False (send the event).
name (str | None, optional): Name of the app registering the service. Defaults to None.
**kwargs: Additional keyword arguments to be passed to the callback function.
"""
self.logger.debug(
Expand Down
26 changes: 13 additions & 13 deletions appdaemon/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ async def add_state_callback(
A string made from ``uuid4().hex`` that is used to identify the callback. This can be used to cancel the
callback later.
"""
if kwargs is None:
kwargs = {}
# Create the default kwargs dict
kwargs = {} if kwargs is None else kwargs

if oneshot: # this is still a little awkward, but it works until this can be refactored
# This needs to be in the kwargs dict here that gets passed around later, so that the dispatcher knows to
Expand Down Expand Up @@ -644,7 +644,7 @@ async def get_state(
attribute: str | None = None,
default: Any | None = None,
copy: bool = True,
):
) -> Any:
self.logger.debug("get_state: %s.%s %s %s", entity_id, attribute, default, copy)

def maybe_copy(data):
Expand Down Expand Up @@ -711,16 +711,15 @@ def parse_state(

return new_state

async def add_to_state(self, name: str, namespace: str, entity_id: str, i):
value = await self.get_state(name, namespace, entity_id)
if value is not None:
value += i
await self.set_state(name, namespace, entity_id, state=value)
async def add_to_state(self, name: str, namespace: str, entity_id: str, i: int = 1):
match (await self.get_state(name, namespace, entity_id)):
case (int() | float()) as value:
await self.set_state(name, namespace, entity_id, state=value + i)

async def add_to_attr(self, name: str, namespace: str, entity_id: str, attr, i):
state = await self.get_state(name, namespace, entity_id, attribute="all")
if state is not None:
state["attributes"][attr] = copy(state["attributes"][attr]) + i
state["attributes"][attr] = copy(state["attributes"].get(attr, 0)) + i
await self.set_state(name, namespace, entity_id, attributes=state["attributes"])

def register_state_services(self, namespace: str) -> None:
Expand All @@ -736,7 +735,7 @@ async def _state_service(
*,
entity_id: str | None = None,
persist: bool = False,
writeback: Literal["safe", "hybrid"] = "safe",
writeback: ADWritebackType = ADWritebackType.safe,
**kwargs: Any
) -> Any | None:
self.logger.debug("state_services: %s, %s, %s, %s", namespace, domain, service, kwargs)
Expand Down Expand Up @@ -879,12 +878,13 @@ def set_state_simple(self, namespace: str, entity_id: str, state: Any):

async def set_namespace_state(self, namespace: str, state: dict[str, Any], persist: bool = False):
if persist:
await self.add_persistent_namespace(namespace, writeback="safe")
await self.add_persistent_namespace(namespace, writeback=ADWritebackType.safe)
self.state[namespace].update(state)
else:
# first in case it had been created before, it should be deleted
if isinstance(self.state.get(namespace), utils.PersistentDict):
await self.remove_persistent_namespace(namespace, self.state[namespace])
match self.state.get(namespace):
case utils.PersistentDict() as ns:
await self.remove_persistent_namespace(namespace, ns)
self.state[namespace] = state

def update_namespace_state(self, namespace: str | list[str], state: dict):
Expand Down
Loading