Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 81 additions & 9 deletions custom_components/network_scanner/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,90 @@
from .sensor import NetworkScanner
"""Network Scanner integration."""
from __future__ import annotations

import logging
from datetime import timedelta

from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed

from .const import DOMAIN
from .scanner import NetworkScannerClient

_LOGGER = logging.getLogger(__name__)

PLATFORMS = ["sensor"]
SCAN_INTERVAL = timedelta(minutes=15)

async def async_setup(hass, config):

async def async_setup(hass: HomeAssistant, config: dict) -> bool:
"""Set up the Network Scanner component."""
# Store YAML configuration in hass.data
hass.data[DOMAIN] = config.get(DOMAIN, {})
hass.data.setdefault(DOMAIN, {})
return True

async def async_setup_entry(hass, config_entry):

async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Set up Network Scanner from a config entry."""
await hass.config_entries.async_forward_entry_setups(config_entry, ["sensor"])
ip_range = config_entry.data.get("ip_range")

# Collect all mac_mapping_* entries from config
mac_mappings_list = []
for i in range(25):
key = f"mac_mapping_{i+1}"
mac_mappings_list.append(config_entry.data.get(key, ""))

i = 25
while True:
key = f"mac_mapping_{i+1}"
if key in config_entry.data:
mac_mappings_list.append(config_entry.data.get(key))
i += 1
else:
break

mac_mappings = "\n".join(mac_mappings_list)

# Build the blocking scanner client in the executor — its constructor
# calls `nmap --version` synchronously, which must not run on the event loop.
client = await hass.async_add_executor_job(
NetworkScannerClient, ip_range, mac_mappings
)

async def _async_update_data():
"""Run the blocking nmap scan off the event loop."""
try:
return await hass.async_add_executor_job(client.scan)
except Exception as err:
raise UpdateFailed(f"Network scan failed: {err}") from err

coordinator: DataUpdateCoordinator = DataUpdateCoordinator(
hass,
_LOGGER,
name=f"{DOMAIN}_{ip_range}",
update_interval=SCAN_INTERVAL,
update_method=_async_update_data,
)

# IMPORTANT: do NOT await the first refresh here, and do NOT attach the
# background task to the config entry. Either of those will make HA
# consider setup as still running until the scan finishes, which triggers
# "Setup of sensor platform network_scanner is taking over 10 seconds".
# We fire-and-forget on hass instead, so setup returns immediately.
hass.data[DOMAIN][config_entry.entry_id] = coordinator

await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)

hass.async_create_background_task(
coordinator.async_refresh(),
name=f"{DOMAIN}_initial_refresh",
)

return True

async def async_unload_entry(hass, config_entry):

async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Unload a config entry."""
await hass.config_entries.async_forward_entry_unload(config_entry, "sensor")
return True
unload_ok = await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(config_entry.entry_id, None)
return unload_ok
2 changes: 1 addition & 1 deletion custom_components/network_scanner/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
"iot_class": "local_polling",
"issue_tracker": "https://github.com/parvez/network_scanner/issues",
"requirements": ["python-nmap"],
"version": "1.0.7"
"version": "1.2.1"
}
135 changes: 135 additions & 0 deletions custom_components/network_scanner/scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Pure nmap scanning logic, isolated from Home Assistant internals.

This module must not import anything from homeassistant, and must not touch
the event loop. All calls into it run inside the executor via the coordinator.
"""
from __future__ import annotations

import logging
import re
import socket
from typing import Any

import nmap

_LOGGER = logging.getLogger(__name__)

# nmap args:
# -sn : ping scan only, no port scan
# -n : skip DNS (we do fast reverse-DNS ourselves, only for live IPs)
# -T4 : aggressive timing template
# --min-parallelism : probe many hosts at once
# --max-retries 1 : don't linger on unresponsive hosts
# --host-timeout 3s : give up on any single host after 3s
NMAP_ARGS = "-sn -n -T4 --min-parallelism 64 --max-retries 1 --host-timeout 3s"


class NetworkScannerClient:
"""Blocking nmap client. One instance per config entry."""

def __init__(self, ip_range: str, mac_mapping: str) -> None:
self.ip_range = ip_range
self.mac_mapping = self._parse_mac_mapping(mac_mapping)
self.nm = nmap.PortScanner()
_LOGGER.info("Network Scanner client initialized for %s", ip_range)

# ---------------------- helpers ----------------------
@staticmethod
def _parse_mac_mapping(mapping_string: str) -> dict[str, tuple[str, str]]:
mapping: dict[str, tuple[str, str]] = {}
for line in mapping_string.split("\n"):
parts = line.split(";")
if len(parts) >= 3:
mapping[parts[0].lower()] = (parts[1], parts[2])
return mapping

@staticmethod
def _short_label(name: str) -> str | None:
"""Return lowercase host label before first dot, cleaned, or None."""
if not name:
return None
short = name.strip().rstrip(".").split(".", 1)[0].lower()
short = re.sub(r"[^a-z0-9_-]", "", short)
return short or None

@staticmethod
def _fast_rdns(ip: str, timeout: float = 0.3) -> str | None:
"""Reverse-DNS with a very short timeout; returns cleaned short label or None."""
old_to = socket.getdefaulttimeout()
socket.setdefaulttimeout(timeout)
try:
host, _, _ = socket.gethostbyaddr(ip)
return NetworkScannerClient._short_label(host)
except Exception:
return None
finally:
socket.setdefaulttimeout(old_to)

def _get_device_info_from_mac(self, mac_address: str) -> tuple[str, str]:
return self.mac_mapping.get(
mac_address.lower(), ("Unknown Device", "Unknown Device")
)

# ---------------------- main entry point ----------------------
def scan(self) -> list[dict[str, Any]]:
"""Run an nmap ping scan and return the list of discovered devices.

Blocking. Must be called from the executor, never from the event loop.
"""
try:
self.nm.scan(hosts=self.ip_range, arguments=NMAP_ARGS)
except Exception as err:
_LOGGER.error("nmap scan failed with args '%s': %s", NMAP_ARGS, err)
raise

devices: list[dict[str, Any]] = []

for host in self.nm.all_hosts():
try:
addrs = self.nm[host].get("addresses", {})
if "mac" not in addrs or "ipv4" not in addrs:
continue

ip = addrs["ipv4"]
mac = addrs["mac"]

# Vendor from nmap's OUI db, if known
vendor = "Unknown"
vendor_map = self.nm[host].get("vendor", {})
if mac in vendor_map:
vendor = vendor_map[mac]

# Hostname: try nmap result first (usually empty because -n)
raw_hostname = self.nm[host].hostname() or ""
if not raw_hostname:
for h in self.nm[host].get("hostnames", []):
n = h.get("name")
if n:
raw_hostname = n
break

hostname = self._short_label(raw_hostname)
if not hostname:
hostname = self._fast_rdns(ip, timeout=0.3)

device_name, device_type = self._get_device_info_from_mac(mac)
devices.append(
{
"ip": ip,
"mac": mac,
"name": device_name,
"type": device_type,
"vendor": vendor,
"hostname": hostname,
}
)
except Exception as err:
_LOGGER.debug("Error parsing host %s: %s", host, err)
continue

try:
devices.sort(key=lambda x: [int(num) for num in x["ip"].split(".")])
except Exception:
pass

return devices
Loading