-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcode.py
More file actions
111 lines (86 loc) · 3.01 KB
/
code.py
File metadata and controls
111 lines (86 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# SPDX-FileCopyrightText: 2022 Daniel Griswold
#
# SPDX-License-Identifier: MIT
"""
Temperature Sensor with data published to MQTT.
"""
import time
import ssl
import json
import board
import socketpool
import wifi
import digitalio
import microcontroller
import adafruit_minimqtt.adafruit_minimqtt as MQTT
import adafruit_ahtx0
# pylint: disable=no-name-in-module,wrong-import-order
from secrets import secrets
UPDATE_INTERVAL = 60
MQTT_ENVIRONMENT = secrets["mqtt_topic"] + "/environment"
MQTT_SYSTEM = secrets["mqtt_topic"] + "/system"
def get_sensor_data():
""" Creates dictionary of sensor values. """
_data = {}
_data["temperature"] = aht20.temperature
_data["humidity"] = aht20.relative_humidity
return _data
def get_system_data():
"""Creates dictionary of system information"""
_data = {}
_data["reset_reason"] = str(microcontroller.cpu.reset_reason)[28:]
_data["time"] = time.monotonic()
_data["ip_address"] = wifi.radio.ipv4_address
_data["board_id"] = board.board_id
return _data
if board.id == "unexpectedmaker_feathers2":
ldo2 = digitalio.DigitalInOut(board.LDO2)
ldo2.switch_to_output(True)
if board.id == "unexpectedmaker_feathers3":
ldo2 = digitalio.DigitalInOut(board.LDO2)
ldo2.switch_to_output(True)
if board.id == "adafruit_feather_esp32s2":
i2c_power = digitalio.DigitalInOut(board.I2C_POWER_INVERTED)
i2c_power.switch_to_output(False)
if board.id == "adafruit_feather_esp32s2_tft":
i2c_power = digitalio.DigitalInOut(board.TFT_I2C_POWER)
i2c_power.switch_to_output(True)
aht20 = adafruit_ahtx0.AHTx0(board.I2C())
try:
print("Connecting to %s..." % secrets["ssid"])
wifi.radio.connect(secrets["ssid"], secrets["password"])
print("Connected to %s!" % secrets["ssid"])
pool = socketpool.SocketPool(wifi.radio)
except Exception as error:
print("Could not initialize network. {}".format(error))
raise
try:
mqtt_client = MQTT.MQTT(
broker=secrets["mqtt_broker"],
port=secrets["mqtt_port"],
username=secrets["mqtt_username"],
password=secrets["mqtt_password"],
socket_pool=pool,
ssl_context=ssl.create_default_context(),
)
mqtt_client.connect()
except MQTT.MMQTTException as error:
print("Could not connect to mqtt broker. {}".format(error))
raise
last_update = 0 # pylint: disable=invalid-name
while True:
_now = time.monotonic()
if last_update + UPDATE_INTERVAL < _now:
last_update = _now
print("Publishing environmental data")
try:
mqtt_client.publish(
MQTT_ENVIRONMENT, json.dumps(get_sensor_data()), retain=True
)
except MQTT.MMQTTException as error:
print("Could not publish to mqtt broker. {}".format(error))
print("Publishing system data")
try:
mqtt_client.publish(MQTT_SYSTEM, json.dumps(get_system_data()), retain=True)
except MQTT.MMQTTException as error:
print("Could not publish to mqtt broker. {}".format(error))