Категории

mqtt_to_mysql.py — скрипт сервис экспорта данных в mysql базы iot системы

23.11.2025 18:32 | коды из категории: IOT умный дом

Loaded: loaded (/etc/systemd/system/mqtt-to-mysql.service;

Этот скрипт на Python служит мостом между MQTT-брокером и базой данных MySQL в системе Интернета вещей (IoT). Он автоматически получает телеметрию с датчиков и устройств (например, через Zigbee2MQTT), обрабатывает её и сохраняет в две таблицы MySQL: основную и историческую.

Основные функции

Особенности обработки данных

Скрипт различает два типа данных:

  1. Управляющие команды (параметр state) — такие данные сохраняются с суффиксом /status и не попадают в историю, так как это не телеметрия, а фактическое состояние устройства (например, вкл/выкл света).
  2. Телеметрия — все остальные параметры (температура, влажность, мощность и т.п.) обрабатываются по ключам и сохраняются как есть. Булевы значения (true/false) преобразуются в 1/0 для удобства хранения.

Структура базы данных (рекомендуемая)

Для работы скрипта нужны две таблицы:

Пример структуры таблицы sensor_history:

CREATE TABLE sensor_history (
    id INT AUTO_INCREMENT PRIMARY KEY,
    topic VARCHAR(255) NOT NULL,
    value FLOAT NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Настройки скрипта

Вы можете легко адаптировать его под свою систему:

Зачем это нужно?

В IoT-системах важно не только знать текущее состояние устройств, но и анализировать динамику: нагревается ли комната ночью, как меняется влажность, как часто срабатывает датчик движения. Этот скрипт автоматически собирает такие данные без участия веб-сервера или дополнительной логики, обеспечивая «тихий» сбор информации в фоне.

Такой подход идеален для локальных систем умного дома на базе Raspberry Pi, ESP32, Zigbee2MQTT и похожих решений.

import paho.mqtt.client as mqtt
import pymysql.cursors
import json
import logging

# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("/home/user/mqtt_logger.log"),
        logging.StreamHandler()
    ]
)

# Подключение к MySQL
try:
    connection = pymysql.connect(
        host='localhost',
        user='iot_user',
        password='123456',
        database='iot_db',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    logging.info("✅ Подключено к MySQL")
except Exception as e:
    logging.error(f"❌ Ошибка подключения к MySQL: {e}")
    exit(1)

# === Топики, для которых нужно хранить историю (анализ тенденций) ===
TOPICS_FOR_HISTORY = [
    'zigbee2mqtt/0x00124b003548b510/temperature',   # 👈 Твой датчик "kitchen"
    'zigbee2mqtt/bedroom_sensor/temperature',       # 👈 Твой датчик "bedroom"
'zigbee2mqtt/0x00124b003548b510/humidity',
 'zigbee2mqtt/bedroom_sensor/humidity',
]

def on_connect(client, userdata, flags, reason_code, properties):
    logging.info("✅ MQTT: Подключено")
    client.subscribe("zigbee2mqtt/#")

def on_message(client, userdata, msg):
    try:
        topic = msg.topic
        payload = msg.payload.decode()
        logging.info(f"📥 Получено: {topic} = {payload}")

        # Парсим JSON
        data = json.loads(payload)
        if isinstance(data, dict):
            # === 1. Обрабатываем state → сохраняем как .../status (для управления в панели) ===
            if 'state' in data:
                clean_topic = topic.replace('/set', '')
                status_topic = f"{clean_topic}/status"  # <-- ИСПРАВЛЕНО: используем clean_topic
                save_to_sensor_data(status_topic, data['state'])
                # НЕ добавляем в историю — управляемые устройства не анализируем на тренды

            # === 2. Обрабатываем остальные параметры (power, voltage, current и т.д.) ===
            for key in ['temperature', 'humidity', 'battery', 'voltage', 'pressure', 'co2', 'pm25', 'power', 'current', 'contact', 'vibration', 'illuminance', 'presence', 'linkquality']:
                if key in data:  # ← 4 пробела после for
                    full_topic = f"{topic}/{key}"
                    value_to_save = 1 if data[key] is True else 0 if data[key] is False else data[key]
                    save_to_sensor_data(full_topic, value_to_save)
                    if full_topic in TOPICS_FOR_HISTORY:
                        save_to_history(full_topic, float(value_to_save))

    except json.JSONDecodeError:
        logging.warning(f"⚠️ Не JSON: {payload}")
    except Exception as e:
        logging.error(f"❌ Ошибка обработки сообщения: {e}")

def save_to_sensor_data(topic, value):
    """Запись в основную таблицу sensor_data"""
    try:
        with connection.cursor() as cursor:
            sql = "INSERT INTO sensor_data (topic, value) VALUES (%s, %s)"
            cursor.execute(sql, (topic, str(value)))
        connection.commit()
        logging.info(f"💾 Записано в БД: {topic} = {value}")
    except Exception as e:
        logging.error(f"❌ Ошибка записи в sensor_data: {e}")

def save_to_history(topic, value):
    """Запись в историю и очистка старых записей (макс. 20)"""
    try:
        with connection.cursor() as cursor:
            # Вставляем новое значение
            sql_insert = "INSERT INTO sensor_history (topic, value) VALUES (%s, %s)"
            cursor.execute(sql_insert, (topic, float(value)))

            # Удаляем старые записи, оставляя только последние 20
            sql_delete = """
                DELETE FROM sensor_history 
                WHERE topic = %s AND id NOT IN (
                    SELECT id FROM (
                        SELECT id FROM sensor_history 
                        WHERE topic = %s 
                        ORDER BY timestamp DESC LIMIT 144
                    ) AS temp
                )
            """
            cursor.execute(sql_delete, (topic, topic))

        connection.commit()
        logging.info(f"📊 История обновлена: {topic} = {value} (только последние 20)")

    except Exception as e:
        logging.error(f"❌ Ошибка записи в sensor_history: {e}")

# Настройка MQTT клиента
client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
client.username_pw_set("iot_user", "123456")
client.on_connect = on_connect
client.on_message = on_message

try:
    client.connect("localhost", 1883, 60)
    logging.info("📡 Подключено к MQTT-брокеру. Ожидаю данные...")
    client.loop_forever()
except Exception as e:
    logging.error(f"❌ Ошибка подключения к MQTT: {e}")

Комментарии

Пока нет комментариев. Будьте первым!

Оставить комментарий

← Назад к списку

Важно: Блог-эксперимент

Блог только запустил, все статьи генерирую через нейросеть т.к. лень, возможны ошибки. Просто чтобы вы знали и не запускали ядерный реактор по моим статьям ))
Если у вас есть вопросы, или Нашли неточность? пишите в коментах — вместе поправим и сделаем статью более качественной. Я лично объясню нюансы из практики.

Посетителей сегодня: 0


кто я | книга | контакты без контактов

© Digital Specialist | Не являемся сотрудниками Google, Яндекса и NASA