-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathPiHole.py
More file actions
145 lines (121 loc) · 5.6 KB
/
PiHole.py
File metadata and controls
145 lines (121 loc) · 5.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""Module Interacting with the PiHole API"""
import requests
from loguru import logger as log
class PiHole:
"""Class providing all the functionality in this module"""
def __init__(self, ip_address: str, api_token: str, protocol: str = "https"):
self.ip_address = ip_address
self.api_token = api_token
self.protocol = protocol
self.sid_token = None
self.csrf_token = None
self.headers = {}
self.authenticate()
log.debug("PiHole API Class initialized")
def authenticate(self):
if not self.protocol or not self.ip_address:
log.trace("No protocol or IP address provided. Cannot authenticate.")
return
"""
Authenticate and store the API token.
"""
url = f"{self.protocol}://{self.ip_address}/api/auth"
payload = {"password": self.api_token}
try:
resp = requests.post(url, json=payload, verify=False, timeout=2.5)
log.debug(f"Authentication Response: {resp.status_code}, Body: {resp.text}")
if resp.status_code == 200:
session = resp.json().get("session", {})
self.sid_token = session.get("sid")
self.csrf_token = session.get("csrf")
if self.sid_token:
self.headers = {
"X-FTL-SID": f"{self.sid_token}",
"X-FTL-CSRF": f"{self.csrf_token}",
}
log.info("Authentication with PiHole Server successful.")
else:
log.error("Authentication with PiHole Server failed: No valid session token.")
else:
log.error("Authentication with PiHole Server failed.")
except requests.RequestException as e:
log.error(f"Authentication error: {e}")
def _make_request(self, method: str, endpoint: str, data: dict = None):
"""
Helper function to send requests and handle authentication.
:param method: "GET" or "POST"
:param endpoint: API endpoint (e.g., "/api/dns/blocking")
:param data: Optional JSON payload for POST requests
:return: JSON response or None
"""
if not self.ip_address or not self.api_token:
log.trace("No IP address or API token provided. Cannot make request.")
return None
if not self.sid_token:
log.error("[Request Error] No valid authentication token. Re-authenticating...")
self.authenticate()
if not self.sid_token:
log.error("[Request Error] Failed to authenticate.")
return None
url = f"{self.protocol}://{self.ip_address}{endpoint}"
try:
resp = requests.request(method, url, headers=self.headers, json=data, verify=False, timeout=2.5)
log.debug(f"API Request using {method} at {endpoint} with Response: {resp.status_code}, Body: {resp.text}")
if resp.status_code == 200:
return resp.json()
if resp.status_code == 403:
log.error("[Request ERROR] Authentication token expired or invalid. Re-authenticating...")
self.authenticate()
return self._make_request(method, endpoint, data) # Retry after re-authentication
log.error(f"[Request Error] Failed request. Status code: {resp.status_code}")
except requests.RequestException as e:
log.error(f"[Request Error] Request failed with error: {e}")
return None
def get_summary(self):
"""
Fetch Pi-hole statistics summary.
"""
return self._make_request("GET", "/api/stats/summary")
def get_enabled(self) -> bool:
"""
Get the current blocking status of Pi-hole.
Returns True if blocking is enabled, False if disabled.
"""
response = self._make_request("GET", "/api/dns/blocking")
if not response:
return None
blocking_status = response.get("blocking")
if blocking_status == "enabled":
log.info("Blocking is enabled.")
return True
if blocking_status == "disabled":
log.info("Blocking is disabled.")
return False
log.error("[get_enabled() Error] Unexpected value for blocking status.")
return None
def disable(self, time: int) -> bool:
"""
Disable Pi-hole blocking for a given time (0 = infinite).
:param time: Time in seconds (0 = infinite)
:return: True if successful, False otherwise
"""
data = {"blocking": False, "timer": time} # JSON payload
response = self._make_request("POST", "/api/dns/blocking", data)
if response and response.get("blocking") == "disabled":
log.info("[disable({time})] Succesfully disabled blocking.", time=time)
return True
log.error("[disable({time})] Failed to disable blocking.", time=time)
return False
def enable(self, time: int) -> bool:
"""
Enable Pi-hole blocking for a given time (0 = infinite).
:param time: Time in seconds (0 = infinite)
:return: True if successful, False otherwise
"""
data = {"blocking": True, "timer": time} # JSON payload
response = self._make_request("POST", "/api/dns/blocking", data)
if response and response.get("blocking") == "enabled":
log.info("[enable({time})] Succesfully enabled blocking.", time=time)
return True
log.error("[enable({time})] Failed to enable blocking.", time=time)
return False