Этот скрипт на Python служит мостом между MQTT-брокером и базой данных MySQL в системе Интернета вещей (IoT). Он автоматически получает телеметрию с датчиков и устройств (например, через Zigbee2MQTT), обрабатывает её и сохраняет в две таблицы MySQL: основную и историческую.
zigbee2mqtt/#.sensor_data для отображения в веб-панели или другой логике.sensor_history.sensor_history остаётся не более 144 последних записей на топик (можно настроить), чтобы не перегружать БД./home/mazzick/mqtt_logger.log.Скрипт различает два типа данных:
state) — такие данные сохраняются с суффиксом /status и не попадают в историю, так как это не телеметрия, а фактическое состояние устройства (например, вкл/выкл света).true/false) преобразуются в 1/0 для удобства хранения.Для работы скрипта нужны две таблицы:
sensor_data — хранит последние актуальные значения всех топиков.sensor_history — хранит ограниченную историю только для важных датчиков.Пример структуры таблицы sensor_history:
CREATE TABLE sensor_history (
id INT AUTO_INCREMENT PRIMARY KEY,
topic VARCHAR(255) NOT NULL,
value FLOAT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Вы можете легко адаптировать его под свою систему:
TOPICS_FOR_HISTORY — только для них будет вестись история.В IoT-системах важно не только знать текущее состояние устройств, но и анализировать динамику: нагревается ли комната ночью, как меняется влажность, как часто срабатывает датчик движения. Этот скрипт автоматически собирает такие данные без участия веб-сервера или дополнительной логики, обеспечивая «тихий» сбор информации в фоне.
Такой подход идеален для локальных систем умного дома на базе Raspberry Pi, ESP32, Zigbee2MQTT и похожих решений.
import paho.mqtt.client as mqtt
import pymysql.cursors
import json
import logging
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("/home/user/mqtt_logger.log"),
logging.StreamHandler()
]
)
# Подключение к MySQL
try:
connection = pymysql.connect(
host='localhost',
user='iot_user',
password='123456',
database='iot_db',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
logging.info("✅ Подключено к MySQL")
except Exception as e:
logging.error(f"❌ Ошибка подключения к MySQL: {e}")
exit(1)
# === Топики, для которых нужно хранить историю (анализ тенденций) ===
TOPICS_FOR_HISTORY = [
'zigbee2mqtt/0x00124b003548b510/temperature', # 👈 Твой датчик "kitchen"
'zigbee2mqtt/bedroom_sensor/temperature', # 👈 Твой датчик "bedroom"
'zigbee2mqtt/0x00124b003548b510/humidity',
'zigbee2mqtt/bedroom_sensor/humidity',
]
def on_connect(client, userdata, flags, reason_code, properties):
logging.info("✅ MQTT: Подключено")
client.subscribe("zigbee2mqtt/#")
def on_message(client, userdata, msg):
try:
topic = msg.topic
payload = msg.payload.decode()
logging.info(f"📥 Получено: {topic} = {payload}")
# Парсим JSON
data = json.loads(payload)
if isinstance(data, dict):
# === 1. Обрабатываем state → сохраняем как .../status (для управления в панели) ===
if 'state' in data:
clean_topic = topic.replace('/set', '')
status_topic = f"{clean_topic}/status" # <-- ИСПРАВЛЕНО: используем clean_topic
save_to_sensor_data(status_topic, data['state'])
# НЕ добавляем в историю — управляемые устройства не анализируем на тренды
# === 2. Обрабатываем остальные параметры (power, voltage, current и т.д.) ===
for key in ['temperature', 'humidity', 'battery', 'voltage', 'pressure', 'co2', 'pm25', 'power', 'current', 'contact', 'vibration', 'illuminance', 'presence', 'linkquality']:
if key in data: # ← 4 пробела после for
full_topic = f"{topic}/{key}"
value_to_save = 1 if data[key] is True else 0 if data[key] is False else data[key]
save_to_sensor_data(full_topic, value_to_save)
if full_topic in TOPICS_FOR_HISTORY:
save_to_history(full_topic, float(value_to_save))
except json.JSONDecodeError:
logging.warning(f"⚠️ Не JSON: {payload}")
except Exception as e:
logging.error(f"❌ Ошибка обработки сообщения: {e}")
def save_to_sensor_data(topic, value):
"""Запись в основную таблицу sensor_data"""
try:
with connection.cursor() as cursor:
sql = "INSERT INTO sensor_data (topic, value) VALUES (%s, %s)"
cursor.execute(sql, (topic, str(value)))
connection.commit()
logging.info(f"💾 Записано в БД: {topic} = {value}")
except Exception as e:
logging.error(f"❌ Ошибка записи в sensor_data: {e}")
def save_to_history(topic, value):
"""Запись в историю и очистка старых записей (макс. 20)"""
try:
with connection.cursor() as cursor:
# Вставляем новое значение
sql_insert = "INSERT INTO sensor_history (topic, value) VALUES (%s, %s)"
cursor.execute(sql_insert, (topic, float(value)))
# Удаляем старые записи, оставляя только последние 20
sql_delete = """
DELETE FROM sensor_history
WHERE topic = %s AND id NOT IN (
SELECT id FROM (
SELECT id FROM sensor_history
WHERE topic = %s
ORDER BY timestamp DESC LIMIT 144
) AS temp
)
"""
cursor.execute(sql_delete, (topic, topic))
connection.commit()
logging.info(f"📊 История обновлена: {topic} = {value} (только последние 20)")
except Exception as e:
logging.error(f"❌ Ошибка записи в sensor_history: {e}")
# Настройка MQTT клиента
client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
client.username_pw_set("iot_user", "123456")
client.on_connect = on_connect
client.on_message = on_message
try:
client.connect("localhost", 1883, 60)
logging.info("📡 Подключено к MQTT-брокеру. Ожидаю данные...")
client.loop_forever()
except Exception as e:
logging.error(f"❌ Ошибка подключения к MQTT: {e}")
Блог только запустил, все статьи генерирую через нейросеть т.к. лень, возможны ошибки. Просто чтобы вы знали и не запускали ядерный реактор по моим статьям ))
Если у вас есть вопросы, или Нашли неточность? пишите в коментах — вместе поправим и сделаем статью более качественной. Я лично объясню нюансы из практики.
Комментарии
Пока нет комментариев. Будьте первым!