Категории

mqtt_to_mysql.py — скрипт сервис экспорта данных в mysql базы iot системы

23.11.2025 | коды из категории: IOT умный дом

обновлено 27.12.2025

Скрипт /home/mazzick/mqtt_to_mysql.py

Назначение: Перехват MQTT-сообщений от Zigbee2MQTT и сохранение их в базу данных MySQL/MariaDB для последующей обработки автоматизациями.

Основные функции

Технические детали

Зачем это нужно?

Без этого скрипта автоматизации (например, «включить свет при движении») не могут работать, потому что:

Важно!

При изменении логики обработки значений (особенно булевых) необходимо проверять синтаксис Python и правильность отступов — ошибка приведёт к падению systemd-сервиса.

import paho.mqtt.client as mqtt
import pymysql.cursors
import json
import logging

# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("/home/mazzick/mqtt_logger.log"),
        logging.StreamHandler()
    ]
)

# Подключение к MySQL
try:
    connection = pymysql.connect(
        host='localhost',
        user='iot_user',
        password='123456',
        database='iot_db',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    logging.info("✅ Подключено к MySQL")
except Exception as e:
    logging.error(f"❌ Ошибка подключения к MySQL: {e}")
    exit(1)

# Топики для истории
TOPICS_FOR_HISTORY = [
    'zigbee2mqtt/gorshok_big/temperature',
    'zigbee2mqtt/bedroom_sensor/temperature',
    'zigbee2mqtt/gorshok_big/humidity',
    'zigbee2mqtt/bedroom_sensor/humidity',
    'zigbee2mqtt/fish_reley_temp/temperature',
]

def on_connect(client, userdata, flags, reason_code, properties):
    logging.info("✅ MQTT: Подключено")
    client.subscribe("zigbee2mqtt/#")

def on_message(client, userdata, msg):
    try:
        topic = msg.topic
        payload = msg.payload.decode()
        logging.info(f"📥 Получено: {topic} = {payload}")

        # Игнорируем служебные топики
        if topic.endswith("/battery") or topic.startswith("zigbee2mqtt/bridge/")                                                                       :
            return

        # Сохраняем ПОЛНЫЙ JSON как есть (важно для управления!)
        save_to_sensor_data(topic, payload)

        # Пытаемся распарсить
        data = json.loads(payload)
        if isinstance(data, dict):
            for key, value in data.items():
                if key in ['linkquality', 'countdown_l1', 'countdown_l2', 'count                                                                       down_l3',
                           'power_on_behavior_l1', 'last_seen', 'update', 'updat                                                                       e_available']:
                    continue

                full_topic = f"{topic}/{key}"

                # 🔧 Исправлено: true → '1', false → '0'
                if isinstance(value, bool):
                    value_to_save = "1" if value else "0"
                else:
                    value_to_save = value

                # Сохраняем отдельное поле
                save_to_sensor_data(full_topic, str(value_to_save))

                # История — только для указанных топиков
                if full_topic in TOPICS_FOR_HISTORY:
                    try:
                        save_to_history(full_topic, float(value_to_save))
                    except (ValueError, TypeError):
                        pass

    except json.JSONDecodeError:
        # Не JSON — сохраняем как есть (редко)
        save_to_sensor_data(topic, payload)
    except Exception as e:
        logging.error(f"❌ Ошибка обработки сообщения: {e}")

def save_to_sensor_data(topic, value):
    """Запись в sensor_data с заменой по топику (остаётся только последнее значе                                                                       ние)"""
    try:
        with connection.cursor() as cursor:
            # Используем INSERT ... ON DUPLICATE KEY UPDATE
            sql = """
                INSERT INTO sensor_data (topic, value, timestamp)
                VALUES (%s, %s, NOW())
                ON DUPLICATE KEY UPDATE
                    value = VALUES(value),
                    timestamp = VALUES(timestamp)
            """
            cursor.execute(sql, (topic, str(value)))
        connection.commit()
        logging.debug(f"💾 Обновлено: {topic} = {value}")
    except Exception as e:
        logging.error(f"❌ Ошибка записи в sensor_data: {e}")

def save_to_history(topic, value):
    try:
        with connection.cursor() as cursor:
            cursor.execute(
                "INSERT INTO sensor_history (topic, value, timestamp) VALUES (%s                                                                       , %s, NOW())",
                (topic, float(value))
            )
            # Оставить последние 144 (6 часов по 1 значению в 2.5 мин)
            cursor.execute("""
                DELETE h1 FROM sensor_history h1
                LEFT JOIN (
                    SELECT id FROM sensor_history
                    WHERE topic = %s
                    ORDER BY timestamp DESC
                    LIMIT 144
                ) h2 ON h1.id = h2.id
                WHERE h2.id IS NULL AND h1.topic = %s
            """, (topic, topic))
        connection.commit()
    except Exception as e:
        logging.error(f"❌ История: {e}")

# MQTT клиент
client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
client.username_pw_set("iot_user", "123456")
client.on_connect = on_connect
client.on_message = on_message

try:
    client.connect("localhost", 1883, 60)
    logging.info("📡 Подключено к MQTT. Ожидаю данные...")
    client.loop_forever()
except Exception as e:
    logging.error(f"❌ MQTT ошибка: {e}")

Комментарии

Пока нет комментариев. Будьте первым!

Оставить комментарий

← Назад к списку

Посетителей сегодня: 0


кто я | о блоге

© Digital Specialist | Не являемся сотрудниками Google, Яндекса и NASA