Если вся автоматика — у тебя дома, зачем тащить MQTT, брокеры, Home Assistant и прочие «облака в миниатюре»? Это лишние точки отказа. Проще и надёжнее — писать данные сразу в MySQL и брать команды оттуда же.
Вот как собрать рабочую систему: Raspberry Pi → USB-to-RS485 (FT232RL+SN75176) → шина RS-485 → ESP32/Arduino с HW-097.
sudo apt update sudo apt install mariadb-server python3-pymysql sudo mysql_secure_installation
Создаём базу и таблицы:
sudo mysql -u root -p
CREATE DATABASE smart_home;
USE smart_home;
-- Таблица для данных с датчиков
CREATE TABLE sensor_data (
id INT AUTO_INCREMENT PRIMARY KEY,
device_id VARCHAR(20) NOT NULL,
temperature FLOAT,
humidity FLOAT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
);
-- Таблица команд для устройств
CREATE TABLE commands (
id INT AUTO_INCREMENT PRIMARY KEY,
device_id VARCHAR(20) NOT NULL,
command VARCHAR(100) NOT NULL,
status ENUM('pending', 'sent', 'error') DEFAULT 'pending',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
Скрипт делает две вещи:
sensor_datacommands — и отправляет их в шину
# smart_rs485.py
import serial
import pymysql
import time
import json
import re
# Настройки
SERIAL_PORT = '/dev/ttyUSB0'
BAUD_RATE = 9600
DB_CONFIG = {
'host': 'localhost',
'user': 'smartuser',
'password': 'strongpass',
'database': 'smart_home',
'charset': 'utf8mb4'
}
# Инициализация
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
def db_connect():
return pymysql.connect(**DB_CONFIG)
def send_to_device(device_id, cmd):
"""Отправляем команду в шину: формат: 'ID:CMD'"""
msg = f"{device_id}:{cmd}\n"
ser.write(msg.encode())
print(f"→ Отправлено: {msg.strip()}")
def read_from_device(timeout=0.5):
"""Ждём ответ до timeout, возвращаем строку"""
start = time.time()
buffer = b''
while time.time() - start < timeout:
if ser.in_waiting:
buffer += ser.read(ser.in_waiting)
else:
time.sleep(0.01)
return buffer.decode().strip()
def process_sensor_data(device_id, raw):
"""Парсим строку вида 'T23.5H45.2'"""
temp_match = re.search(r'T([+-]?\d+\.?\d*)', raw)
hum_match = re.search(r'H(\d+\.?\d*)', raw)
temp = float(temp_match.group(1)) if temp_match else None
hum = float(hum_match.group(1)) if hum_match else None
return temp, hum
def main():
while True:
try:
# === 1. Проверяем и отправляем команды ===
conn = db_connect()
with conn.cursor() as cursor:
cursor.execute(
"SELECT id, device_id, command FROM commands WHERE status = 'pending' LIMIT 1"
)
cmd_row = cursor.fetchone()
if cmd_row:
cmd_id, dev_id, cmd_text = cmd_row
send_to_device(dev_id, cmd_text)
# Ждём короткий ответ (можно упростить до "OK")
time.sleep(0.1)
response = read_from_device()
status = 'sent' if 'OK' in response else 'error'
cursor.execute(
"UPDATE commands SET status = %s WHERE id = %s",
(status, cmd_id)
)
conn.commit()
print(f"← Ответ: {response} → {status}")
conn.close()
# === 2. Опрашиваем датчики (пример: device 'S01') ===
send_to_device('S01', 'READ')
time.sleep(0.1)
raw_data = read_from_device()
if raw_data:
temp, hum = process_sensor_data('S01', raw_data)
if temp is not None:
conn = db_connect()
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO sensor_data (device_id, temperature, humidity) VALUES (%s, %s, %s)",
('S01', temp, hum)
)
conn.commit()
conn.close()
print(f"💾 Записано: T={temp}, H={hum}")
time.sleep(10) # Пауза между циклами
except Exception as e:
print(f"❌ Ошибка: {e}")
time.sleep(5)
if __name__ == "__main__":
main()
S01:READ → T23.5H45.2) — легко отлаживать через minicom.
— Для Modbus замени send_to_device на pymodbus.
Просто вставь в таблицу commands:
INSERT INTO commands (device_id, command) VALUES ('BLINDS_SOUTH', 'OPEN');
Скрипт сам найдёт её, отправит в шину, и пометит как sent или error.
# Создаём пользователя (без пароля в коде!) sudo mysql -u root -p CREATE USER 'smartuser'@'localhost' IDENTIFIED BY 'strongpass'; GRANT INSERT, SELECT, UPDATE ON smart_home.* TO 'smartuser'@'localhost'; # Добавляем в автозапуск (systemd) sudo nano /etc/systemd/system/smart-rs485.service
Содержимое файла:
[Unit] Description=Smart Home RS485 Gateway After=multi-user.target [Service] Type=simple User=pi WorkingDirectory=/home/pi ExecStart=/usr/bin/python3 /home/pi/smart_rs485.py Restart=always RestartSec=10 [Install] WantedBy=multi-user.target
sudo systemctl daemon-reload sudo systemctl enable smart-rs485.service sudo systemctl start smart-rs485.service
Теперь у тебя:
Придется докупить USB-to-RS485 адаптер на FT232RL + SN75176. HW-097? Пусть живёт на ESP32 в подвале — там, где он и нужен.
Комментарии
Пока нет комментариев. Будьте первым!