Решил сделать нормальный контроль уровня воды в аквариуме, чтобы автодолив не был сюрпризом. Заказал JSN-SR04T — водонепроницаемый ультразвук. Хочу прикрутить к Raspberry Pi, возможно через RS485 (потому что малинка стоит далеко, а витая пара есть везде). Главный кайф — датчик не надо мочить, можно закрепить над водой или даже прижать к стеклу (хотя через стекло ультразвук не фонтан, лучше сверху).
В интернетах мало толковых мануалов именно по UART-режиму, поэтому запишу шпаргалку для себя (и для всех, кто тоже любит помучаться).
🔧 Что за зверь JSN-SR04T и причем тут TX/RX?
Этот датчик может работать в трёх режимах. Нас интересуют UART-режимы (Mode 1 и Mode 2), потому что тогда мы используем пины TX и RX, а не дёргаем эхо-ногу. Всё управление идёт через последовательный порт.
🔀 Режимы работы (переключение резистором R27 или R19)
| Режим | Резистор (JSN) | Резистор (AJ) | Описание |
|---|---|---|---|
| Mode 1 (continuous) | 47 кОм | 120 кОм | Датчик сам шлёт данные раз в 100 мс. Ничего не спрашивай — просто слушай TX. |
| Mode 2 (command) | 120 кОм | 47 кОм | Ждёт команду 0x55 на RX, делает одно измерение и отвечает. Идеально для RS485. |
| Mode 3 (HC-SR04) | 0 (перемычка) | 0 | Обычный Trig/Echo, UART не используется. |
📡 Варианты подключения: напрямую или RS485?
Если малинка стоит в метре от аквариума — вешай датчик напрямую через преобразователь логики 5V→3.3V (или делитель на RX). Если дальше — бери MAX485 и витую пару. Mode 2 для RS485 подходит идеально: отправил байт — получил ответ, и линия свободна.
🐍 Код для Raspberry Pi (Python) — Mode 2 (Command)
Этот код подходит, если датчик настроен на Mode 2 (жду команду). Работает и при прямом подключении, и через пару MAX485 — просто открой последовательный порт.
import serial
import time
import struct
# Настройка последовательного порта (проверьте: /dev/ttyS0, /dev/ttyAMA0, /dev/ttyUSB0)
ser = serial.Serial(
port='/dev/ttyS0', # замените на свой
baudrate=9600,
timeout=1
)
def read_distance():
# Очищаем буферы
ser.reset_input_buffer()
ser.reset_output_buffer()
# Отправляем команду замера (0x55)
ser.write(b'\x55')
time.sleep(0.05) # ждём, пока датчик сделает измерение
# Читаем ответ (4 байта: 0xFF + H + L + SUM)
resp = ser.read(4)
if len(resp) == 4 and resp[0] == 0xFF:
dist_raw = (resp[1] << 8) | resp[2]
checksum = (resp[0] + resp[1] + resp[2]) & 0xFF
if checksum == resp[3]:
# Возвращаем в см (пришло в мм)
return dist_raw / 10.0
else:
print("Контрольная сумма не сошлась")
else:
print("Нет ответа или неверный заголовок")
return None
# Главный цикл
try:
while True:
d = read_distance()
if d is not None:
print(f"📏 Расстояние до воды: {d:.1f} см")
else:
print("⚠️ Ошибка чтения")
time.sleep(2) # опрос раз в 2 секунды
except KeyboardInterrupt:
ser.close()
print("Порт закрыт")
🔄 Mode 1 (continuous) — ещё проще
Если датчик настроен на Mode 1, он сам плюётся данными. Тогда код — просто чтение серийного порта:
import serial
import time
ser = serial.Serial('/dev/ttyS0', 9600, timeout=1)
try:
while True:
if ser.in_waiting >= 4:
data = ser.read(4)
if data[0] == 0xFF:
dist = (data[1] << 8 | data[2]) / 10
print(f"📊 Уровень: {dist} см")
time.sleep(0.1)
except:
ser.close()
🧠 Полезные грабли (чтобы не наступать)
- Мёртвая зона: JSN-SR04T не видит ближе 20–25 см. Если вода поднимется слишком близко к датчику — начнёт показывать чушь или 0. Рассчитывай монтаж так, чтобы при макс. уровне оставалось минимум 30 см до датчика.
- AJ-SR04M: У него другой порядок резисторов для режимов. Если не читается — проверь модель и подбери режим перепайкой.
- Плата датчика НЕ водонепроницаема. Только сам ультразвуковой преобразователь на проводе. Плату прячь в коробку!
- RS485: Не забудь про терминаторы на концах линии (120 Ом) и про то, что MAX485 нужно питание 5V.
🔌 Монтаж на аквариуме
Я планирую закрепить датчик в крышке, строго вертикально вниз, чтобы луч упирался в поверхность воды. Отражение от воды чёткое, проблем быть не должно. Если хочешь измерять через стекло сбоку — ультразвук будет теряться на границах сред (стекло/вода), лучше не мучиться и ставить сверху.
Код для распбери пи с записью в базу по UART
- Опрашивать датчик JSN-SR04T
- Сохранять данные в базу данных (SQLite или PostgreSQL)
- Иметь нормальное логирование и обработку ошибок
#!/usr/bin/env python3
"""
Аквариумный монитор уровня воды
Датчик: JSN-SR04T / AJ-SR04M (режим UART)
База данных: SQLite/PostgreSQL
Автор: DIY аквариумист
"""
import serial
import time
import logging
import sqlite3
import psycopg2
from datetime import datetime
import sys
import os
from typing import Optional, Dict, Any
import schedule
import json
from dataclasses import dataclass
from enum import Enum
# ===================== КОНФИГУРАЦИЯ =====================
class DBType(Enum):
SQLITE = "sqlite"
POSTGRES = "postgres"
# Настройки базы данных
DB_CONFIG = {
"type": DBType.SQLITE, # или DBType.POSTGRES
"sqlite_path": "/home/pi/aquarium/water_level.db",
"postgres": {
"host": "localhost",
"port": 5432,
"database": "aquarium",
"user": "pi",
"password": "your_password"
}
}
# Настройки датчика
SENSOR_CONFIG = {
"port": "/dev/ttyS0", # UART порт на Raspberry Pi
"baudrate": 9600,
"timeout": 1,
"mode": 2, # 1 - continuous, 2 - command
"dead_zone": 25, # мёртвая зона в см
"max_range": 450 # максимальная дальность в см
}
# Настройки опроса
POLL_CONFIG = {
"interval_seconds": 60, # опрос раз в минуту
"retry_attempts": 3, # попыток при ошибке
"retry_delay": 5, # секунд между попытками
"log_level": logging.INFO
}
# Настройки аварийных уровней (опционально)
ALERT_CONFIG = {
"min_level_cm": 10, # минимальный уровень (слишком низко)
"max_level_cm": 40, # максимальный уровень (перелив)
"alert_file": "/home/pi/aquarium/alerts.log"
}
# ===================== НАСТРОЙКА ЛОГИРОВАНИЯ =====================
def setup_logging():
"""Настройка логирования в файл и консоль"""
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
# Создаём директорию для логов если её нет
log_dir = "/home/pi/aquarium/logs"
os.makedirs(log_dir, exist_ok=True)
# Файловый обработчик
file_handler = logging.FileHandler(f"{log_dir}/sensor_{datetime.now().strftime('%Y%m%d')}.log")
file_handler.setFormatter(logging.Formatter(log_format))
# Консольный обработчик
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(log_format))
# Настройка корневого логгера
logger = logging.getLogger('AquariumSensor')
logger.setLevel(POLL_CONFIG["log_level"])
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
logger = setup_logging()
# ===================== РАБОТА С БАЗОЙ ДАННЫХ =====================
class DatabaseManager:
"""Менеджер для работы с БД (SQLite или PostgreSQL)"""
def __init__(self, config: Dict):
self.config = config
self.connection = None
self.db_type = config["type"]
self.connect()
self.init_tables()
def connect(self):
"""Установка соединения с БД"""
try:
if self.db_type == DBType.SQLITE:
self.connection = sqlite3.connect(
self.config["sqlite_path"],
detect_types=sqlite3.PARSE_DECLTYPES
)
logger.info(f"Connected to SQLite database: {self.config['sqlite_path']}")
elif self.db_type == DBType.POSTGRES:
pg_config = self.config["postgres"]
self.connection = psycopg2.connect(
host=pg_config["host"],
port=pg_config["port"],
database=pg_config["database"],
user=pg_config["user"],
password=pg_config["password"]
)
logger.info(f"Connected to PostgreSQL database: {pg_config['database']}")
except Exception as e:
logger.error(f"Database connection failed: {e}")
raise
def init_tables(self):
"""Создание таблиц, если их нет"""
try:
cursor = self.connection.cursor()
if self.db_type == DBType.SQLITE:
cursor.execute("""
CREATE TABLE IF NOT EXISTS water_level (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
distance_cm REAL NOT NULL,
temperature_c REAL,
raw_value INTEGER,
sensor_status TEXT,
alert_level TEXT
)
""")
# Индекс для быстрых запросов по времени
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_timestamp
ON water_level(timestamp)
""")
elif self.db_type == DBType.POSTGRES:
cursor.execute("""
CREATE TABLE IF NOT EXISTS water_level (
id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ DEFAULT NOW(),
distance_cm REAL NOT NULL,
temperature_c REAL,
raw_value INTEGER,
sensor_status TEXT,
alert_level TEXT
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_timestamp
ON water_level(timestamp)
""")
self.connection.commit()
logger.info("Database tables initialized")
except Exception as e:
logger.error(f"Failed to initialize tables: {e}")
raise
def save_reading(self, distance: float, raw_value: int = None,
temperature: float = None, status: str = "OK",
alert: str = None):
"""Сохранение показаний в БД"""
try:
cursor = self.connection.cursor()
if self.db_type == DBType.SQLITE:
cursor.execute("""
INSERT INTO water_level
(timestamp, distance_cm, raw_value, temperature_c, sensor_status, alert_level)
VALUES (CURRENT_TIMESTAMP, ?, ?, ?, ?, ?)
""", (distance, raw_value, temperature, status, alert))
elif self.db_type == DBType.POSTGRES:
cursor.execute("""
INSERT INTO water_level
(timestamp, distance_cm, raw_value, temperature_c, sensor_status, alert_level)
VALUES (NOW(), %s, %s, %s, %s, %s)
""", (distance, raw_value, temperature, status, alert))
self.connection.commit()
logger.debug(f"Saved to DB: {distance:.1f} cm")
except Exception as e:
logger.error(f"Failed to save to database: {e}")
# Пробуем переподключиться
try:
self.connect()
except:
pass
def close(self):
"""Закрытие соединения с БД"""
if self.connection:
self.connection.close()
logger.info("Database connection closed")
# ===================== ДАТЧИК УРОВНЯ =====================
class WaterLevelSensor:
"""Класс для работы с JSN-SR04T / AJ-SR04M через UART"""
def __init__(self, config: Dict):
self.config = config
self.serial_port = None
self.last_valid_reading = None
self.error_count = 0
self.connect()
def connect(self):
"""Открытие последовательного порта"""
try:
self.serial_port = serial.Serial(
port=self.config["port"],
baudrate=self.config["baudrate"],
timeout=self.config["timeout"]
)
logger.info(f"Sensor connected on {self.config['port']}")
# Очистка буфера
self.serial_port.reset_input_buffer()
self.serial_port.reset_output_buffer()
except Exception as e:
logger.error(f"Failed to open serial port: {e}")
raise
def read_mode2(self) -> Optional[float]:
"""
Чтение в режиме Mode 2 (command mode)
Отправляем 0x55 и читаем ответ
"""
try:
# Очистка буферов
self.serial_port.reset_input_buffer()
self.serial_port.reset_output_buffer()
# Отправка команды
self.serial_port.write(b'\x55')
time.sleep(0.05) # Ждём измерения
# Чтение ответа (4 байта)
response = self.serial_port.read(4)
if len(response) == 4:
# Проверка заголовка (должен быть 0xFF)
if response[0] == 0xFF:
# Извлечение расстояния (старший и младший байт)
distance_raw = (response[1] << 8) | response[2]
# Проверка контрольной суммы
checksum = (response[0] + response[1] + response[2]) & 0xFF
if checksum == response[3]:
# Переводим из мм в см
distance_cm = distance_raw / 10.0
# Проверка на мёртвую зону и максимальную дальность
if distance_cm < self.config["dead_zone"]:
logger.warning(f"Reading in dead zone: {distance_cm} cm")
return None
if distance_cm > self.config["max_range"]:
logger.warning(f"Reading beyond max range: {distance_cm} cm")
return None
self.last_valid_reading = distance_cm
self.error_count = 0
return distance_cm
else:
logger.error(f"Checksum error: {checksum} != {response[3]}")
else:
logger.error(f"Invalid header: {response[0]}")
else:
logger.error(f"Invalid response length: {len(response)}")
return None
except Exception as e:
logger.error(f"Error reading sensor: {e}")
return None
def read_mode1(self) -> Optional[float]:
"""
Чтение в режиме Mode 1 (continuous mode)
Датчик сам шлёт данные
"""
try:
if self.serial_port.in_waiting >= 4:
response = self.serial_port.read(4)
if len(response) == 4 and response[0] == 0xFF:
distance_raw = (response[1] << 8) | response[2]
checksum = (response[0] + response[1] + response[2]) & 0xFF
if checksum == response[3]:
distance_cm = distance_raw / 10.0
if distance_cm < self.config["dead_zone"]:
return None
if distance_cm > self.config["max_range"]:
return None
self.last_valid_reading = distance_cm
self.error_count = 0
return distance_cm
return None
except Exception as e:
logger.error(f"Error reading sensor in mode1: {e}")
return None
def read_with_retry(self) -> tuple:
"""
Чтение с повторными попытками
Возвращает (distance, raw_value, status)
"""
for attempt in range(POLL_CONFIG["retry_attempts"]):
try:
if self.config["mode"] == 1:
distance = self.read_mode1()
else:
distance = self.read_mode2()
if distance is not None:
return distance, int(distance * 10), "OK"
logger.warning(f"Attempt {attempt + 1} failed")
time.sleep(POLL_CONFIG["retry_delay"])
except Exception as e:
logger.error(f"Attempt {attempt + 1} error: {e}")
time.sleep(POLL_CONFIG["retry_delay"])
self.error_count += 1
status = "ERROR" if self.error_count > 5 else "WARNING"
return None, None, status
def reconnect(self):
"""Переподключение к датчику"""
try:
if self.serial_port:
self.serial_port.close()
time.sleep(1)
self.connect()
logger.info("Sensor reconnected")
except Exception as e:
logger.error(f"Reconnection failed: {e}")
# ===================== МОНИТОРИНГ И АВАРИИ =====================
class AlertManager:
"""Класс для отслеживания аварийных ситуаций"""
def __init__(self, config: Dict):
self.min_level = config.get("min_level_cm")
self.max_level = config.get("max_level_cm")
self.alert_file = config.get("alert_file")
self.last_alert_state = None
def check_level(self, distance_cm: float) -> Optional[str]:
"""
Проверка уровня воды
Возвращает статус аварии или None
"""
if distance_cm is None:
return "NO_DATA"
if self.max_level and distance_cm > self.max_level:
return "LOW_WATER" # Воды мало (большое расстояние)
if self.min_level and distance_cm < self.min_level:
return "HIGH_WATER" # Воды много (маленькое расстояние)
return "NORMAL"
def log_alert(self, level: str, distance: float):
"""Запись аварии в лог"""
try:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
alert_msg = f"[{timestamp}] ALERT: {level} - distance: {distance:.1f} cm\n"
with open(self.alert_file, 'a') as f:
f.write(alert_msg)
logger.warning(f"Alert triggered: {level}")
except Exception as e:
logger.error(f"Failed to write alert: {e}")
# ===================== ОСНОВНАЯ ПРОГРАММА =====================
class AquariumMonitor:
"""Главный класс мониторинга аквариума"""
def __init__(self):
self.db = DatabaseManager(DB_CONFIG)
self.sensor = WaterLevelSensor(SENSOR_CONFIG)
self.alert = AlertManager(ALERT_CONFIG)
self.running = True
self.stats = {
"total_readings": 0,
"successful_readings": 0,
"failed_readings": 0,
"alerts": 0,
"start_time": datetime.now()
}
logger.info("Aquarium Monitor initialized")
logger.info(f"Polling interval: {POLL_CONFIG['interval_seconds']} seconds")
def read_and_store(self):
"""Один цикл чтения и сохранения"""
logger.debug("Starting reading cycle")
# Чтение с датчика
distance, raw, status = self.sensor.read_with_retry()
self.stats["total_readings"] += 1
# Проверка аварий
alert_level = None
if distance is not None:
self.stats["successful_readings"] += 1
alert_level = self.alert.check_level(distance)
if alert_level not in [None, "NORMAL"]:
self.stats["alerts"] += 1
self.alert.log_alert(alert_level, distance)
logger.info(f"Reading: {distance:.1f} cm (status: {alert_level})")
else:
self.stats["failed_readings"] += 1
logger.error("Failed to get valid reading")
# Сохранение в БД
self.db.save_reading(
distance=distance if distance is not None else -1,
raw_value=raw,
temperature=None, # Можно добавить DS18B20 позже
status=status,
alert=alert_level
)
# Периодический вывод статистики
if self.stats["total_readings"] % 10 == 0:
self.print_stats()
def print_stats(self):
"""Вывод статистики работы"""
uptime = datetime.now() - self.stats["start_time"]
success_rate = (self.stats["successful_readings"] /
max(1, self.stats["total_readings"]) * 100)
logger.info("=" * 50)
logger.info(f"STATISTICS (uptime: {uptime})")
logger.info(f"Total readings: {self.stats['total_readings']}")
logger.info(f"Successful: {self.stats['successful_readings']} ({success_rate:.1f}%)")
logger.info(f"Failed: {self.stats['failed_readings']}")
logger.info(f"Alerts: {self.stats['alerts']}")
logger.info("=" * 50)
def run(self):
"""Основной цикл программы"""
logger.info("Starting monitoring loop")
# Планировщик задач
schedule.every(POLL_CONFIG["interval_seconds"]).seconds.do(self.read_and_store)
# Первое чтение сразу при старте
self.read_and_store()
try:
while self.running:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("Received interrupt signal")
self.cleanup()
def cleanup(self):
"""Очистка ресурсов"""
logger.info("Cleaning up...")
self.running = False
# Финальная статистика
self.print_stats()
# Закрытие соединений
self.db.close()
self.sensor.serial_port.close()
logger.info("Aquarium Monitor stopped")
sys.exit(0)
# ===================== ТОЧКА ВХОДА =====================
if __name__ == "__main__":
# Проверка прав на последовательный порт
if not os.access(SENSOR_CONFIG["port"], os.R_OK | os.W_OK):
logger.error(f"Cannot access {SENSOR_CONFIG['port']}. Run with sudo or add user to dialout group.")
logger.info("Try: sudo usermod -a -G dialout $USER")
sys.exit(1)
# Запуск монитора
monitor = AquariumMonitor()
monitor.run()
Комментарии
Пока нет комментариев. Будьте первым!