↩️ Назад

Категории

JSN-SR04T + Raspberry Pi колхозим уровень аквариума для автоматического подлива воды

05.03.2026 | коды из категории: IOT умный дом

про JSN-SR04T + Raspberry Pi колхозим уровень аквариума для автоматического подлива воды

Решил сделать нормальный контроль уровня воды в аквариуме, чтобы автодолив не был сюрпризом. Заказал JSN-SR04T — водонепроницаемый ультразвук. Хочу прикрутить к Raspberry Pi, возможно через RS485 (потому что малинка стоит далеко, а витая пара есть везде). Главный кайф — датчик не надо мочить, можно закрепить над водой или даже прижать к стеклу (хотя через стекло ультразвук не фонтан, лучше сверху).

В интернетах мало толковых мануалов именно по UART-режиму, поэтому запишу шпаргалку для себя (и для всех, кто тоже любит помучаться).

🔧 Что за зверь JSN-SR04T и причем тут TX/RX?

Этот датчик может работать в трёх режимах. Нас интересуют UART-режимы (Mode 1 и Mode 2), потому что тогда мы используем пины TX и RX, а не дёргаем эхо-ногу. Всё управление идёт через последовательный порт.

Лайфхак: На Aliexpress часто продают клон AJ-SR04M. Он выглядит так же, но резисторы режимов могут стоять наоборот (120кОм вместо 47кОм). Если твой датчик — AJ, то Mode 1 включается резистором 120кОм, Mode 2 — 47кОм. Имей в виду!

🔀 Режимы работы (переключение резистором R27 или R19)

РежимРезистор (JSN)Резистор (AJ)Описание
Mode 1 (continuous)47 кОм120 кОмДатчик сам шлёт данные раз в 100 мс. Ничего не спрашивай — просто слушай TX.
Mode 2 (command)120 кОм47 кОмЖдёт команду 0x55 на RX, делает одно измерение и отвечает. Идеально для RS485.
Mode 3 (HC-SR04)0 (перемычка)0Обычный Trig/Echo, UART не используется.

📡 Варианты подключения: напрямую или RS485?

Если малинка стоит в метре от аквариума — вешай датчик напрямую через преобразователь логики 5V→3.3V (или делитель на RX). Если дальше — бери MAX485 и витую пару. Mode 2 для RS485 подходит идеально: отправил байт — получил ответ, и линия свободна.

⚠️ ВАЖНО ПРО УРОВНИ: JSN-SR04T работает от 5V, его TX выдаёт 5V. Raspberry Pi принимает 3.3V. Прямое подключение убьёт GPIO. Нужен или резистивный делитель (например, 1кОм и 2кОм), или микросхема-преобразователь. MAX485 в данном случае ещё и отлично согласует уровни.

🐍 Код для Raspberry Pi (Python) — Mode 2 (Command)

Этот код подходит, если датчик настроен на Mode 2 (жду команду). Работает и при прямом подключении, и через пару MAX485 — просто открой последовательный порт.

import serial
import time
import struct

# Настройка последовательного порта (проверьте: /dev/ttyS0, /dev/ttyAMA0, /dev/ttyUSB0)
ser = serial.Serial(
    port='/dev/ttyS0',   # замените на свой
    baudrate=9600,
    timeout=1
)

def read_distance():
    # Очищаем буферы
    ser.reset_input_buffer()
    ser.reset_output_buffer()
    
    # Отправляем команду замера (0x55)
    ser.write(b'\x55')
    time.sleep(0.05)  # ждём, пока датчик сделает измерение
    
    # Читаем ответ (4 байта: 0xFF + H + L + SUM)
    resp = ser.read(4)
    if len(resp) == 4 and resp[0] == 0xFF:
        dist_raw = (resp[1] << 8) | resp[2]
        checksum = (resp[0] + resp[1] + resp[2]) & 0xFF
        if checksum == resp[3]:
            # Возвращаем в см (пришло в мм)
            return dist_raw / 10.0
        else:
            print("Контрольная сумма не сошлась")
    else:
        print("Нет ответа или неверный заголовок")
    return None

# Главный цикл
try:
    while True:
        d = read_distance()
        if d is not None:
            print(f"📏 Расстояние до воды: {d:.1f} см")
        else:
            print("⚠️ Ошибка чтения")
        time.sleep(2)   # опрос раз в 2 секунды
except KeyboardInterrupt:
    ser.close()
    print("Порт закрыт")

🔄 Mode 1 (continuous) — ещё проще

Если датчик настроен на Mode 1, он сам плюётся данными. Тогда код — просто чтение серийного порта:

import serial
import time

ser = serial.Serial('/dev/ttyS0', 9600, timeout=1)

try:
    while True:
        if ser.in_waiting >= 4:
            data = ser.read(4)
            if data[0] == 0xFF:
                dist = (data[1] << 8 | data[2]) / 10
                print(f"📊 Уровень: {dist} см")
        time.sleep(0.1)
except:
    ser.close()

🧠 Полезные грабли (чтобы не наступать)

🔥 Особенность AJ-SR04M (популярная версия): В некоторых партиях контрольная сумма считается иначе, и в ответе может быть 3 или 4 байта. Если расстояние не выводится — попробуй читать 3 байта или проверь, не зажат ли датчик в режиме непрерывной посылки (тогда пакеты идут один за другим, могут накладываться).

🔌 Монтаж на аквариуме

Я планирую закрепить датчик в крышке, строго вертикально вниз, чтобы луч упирался в поверхность воды. Отражение от воды чёткое, проблем быть не должно. Если хочешь измерять через стекло сбоку — ультразвук будет теряться на границах сред (стекло/вода), лучше не мучиться и ставить сверху.

Код для распбери пи с записью в базу по UART

#!/usr/bin/env python3
"""
Аквариумный монитор уровня воды
Датчик: JSN-SR04T / AJ-SR04M (режим UART)
База данных: SQLite/PostgreSQL
Автор: DIY аквариумист
"""

import serial
import time
import logging
import sqlite3
import psycopg2
from datetime import datetime
import sys
import os
from typing import Optional, Dict, Any
import schedule
import json
from dataclasses import dataclass
from enum import Enum

# ===================== КОНФИГУРАЦИЯ =====================
class DBType(Enum):
    SQLITE = "sqlite"
    POSTGRES = "postgres"

# Настройки базы данных
DB_CONFIG = {
    "type": DBType.SQLITE,  # или DBType.POSTGRES
    "sqlite_path": "/home/pi/aquarium/water_level.db",
    "postgres": {
        "host": "localhost",
        "port": 5432,
        "database": "aquarium",
        "user": "pi",
        "password": "your_password"
    }
}

# Настройки датчика
SENSOR_CONFIG = {
    "port": "/dev/ttyS0",          # UART порт на Raspberry Pi
    "baudrate": 9600,
    "timeout": 1,
    "mode": 2,                      # 1 - continuous, 2 - command
    "dead_zone": 25,                 # мёртвая зона в см
    "max_range": 450                 # максимальная дальность в см
}

# Настройки опроса
POLL_CONFIG = {
    "interval_seconds": 60,          # опрос раз в минуту
    "retry_attempts": 3,             # попыток при ошибке
    "retry_delay": 5,                 # секунд между попытками
    "log_level": logging.INFO
}

# Настройки аварийных уровней (опционально)
ALERT_CONFIG = {
    "min_level_cm": 10,    # минимальный уровень (слишком низко)
    "max_level_cm": 40,    # максимальный уровень (перелив)
    "alert_file": "/home/pi/aquarium/alerts.log"
}

# ===================== НАСТРОЙКА ЛОГИРОВАНИЯ =====================
def setup_logging():
    """Настройка логирования в файл и консоль"""
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    
    # Создаём директорию для логов если её нет
    log_dir = "/home/pi/aquarium/logs"
    os.makedirs(log_dir, exist_ok=True)
    
    # Файловый обработчик
    file_handler = logging.FileHandler(f"{log_dir}/sensor_{datetime.now().strftime('%Y%m%d')}.log")
    file_handler.setFormatter(logging.Formatter(log_format))
    
    # Консольный обработчик
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(log_format))
    
    # Настройка корневого логгера
    logger = logging.getLogger('AquariumSensor')
    logger.setLevel(POLL_CONFIG["log_level"])
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

logger = setup_logging()

# ===================== РАБОТА С БАЗОЙ ДАННЫХ =====================
class DatabaseManager:
    """Менеджер для работы с БД (SQLite или PostgreSQL)"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.connection = None
        self.db_type = config["type"]
        self.connect()
        self.init_tables()
    
    def connect(self):
        """Установка соединения с БД"""
        try:
            if self.db_type == DBType.SQLITE:
                self.connection = sqlite3.connect(
                    self.config["sqlite_path"],
                    detect_types=sqlite3.PARSE_DECLTYPES
                )
                logger.info(f"Connected to SQLite database: {self.config['sqlite_path']}")
            
            elif self.db_type == DBType.POSTGRES:
                pg_config = self.config["postgres"]
                self.connection = psycopg2.connect(
                    host=pg_config["host"],
                    port=pg_config["port"],
                    database=pg_config["database"],
                    user=pg_config["user"],
                    password=pg_config["password"]
                )
                logger.info(f"Connected to PostgreSQL database: {pg_config['database']}")
            
        except Exception as e:
            logger.error(f"Database connection failed: {e}")
            raise
    
    def init_tables(self):
        """Создание таблиц, если их нет"""
        try:
            cursor = self.connection.cursor()
            
            if self.db_type == DBType.SQLITE:
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS water_level (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        distance_cm REAL NOT NULL,
                        temperature_c REAL,
                        raw_value INTEGER,
                        sensor_status TEXT,
                        alert_level TEXT
                    )
                """)
                
                # Индекс для быстрых запросов по времени
                cursor.execute("""
                    CREATE INDEX IF NOT EXISTS idx_timestamp 
                    ON water_level(timestamp)
                """)
                
            elif self.db_type == DBType.POSTGRES:
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS water_level (
                        id SERIAL PRIMARY KEY,
                        timestamp TIMESTAMPTZ DEFAULT NOW(),
                        distance_cm REAL NOT NULL,
                        temperature_c REAL,
                        raw_value INTEGER,
                        sensor_status TEXT,
                        alert_level TEXT
                    )
                """)
                
                cursor.execute("""
                    CREATE INDEX IF NOT EXISTS idx_timestamp 
                    ON water_level(timestamp)
                """)
            
            self.connection.commit()
            logger.info("Database tables initialized")
            
        except Exception as e:
            logger.error(f"Failed to initialize tables: {e}")
            raise
    
    def save_reading(self, distance: float, raw_value: int = None, 
                     temperature: float = None, status: str = "OK", 
                     alert: str = None):
        """Сохранение показаний в БД"""
        try:
            cursor = self.connection.cursor()
            
            if self.db_type == DBType.SQLITE:
                cursor.execute("""
                    INSERT INTO water_level 
                    (timestamp, distance_cm, raw_value, temperature_c, sensor_status, alert_level)
                    VALUES (CURRENT_TIMESTAMP, ?, ?, ?, ?, ?)
                """, (distance, raw_value, temperature, status, alert))
            
            elif self.db_type == DBType.POSTGRES:
                cursor.execute("""
                    INSERT INTO water_level 
                    (timestamp, distance_cm, raw_value, temperature_c, sensor_status, alert_level)
                    VALUES (NOW(), %s, %s, %s, %s, %s)
                """, (distance, raw_value, temperature, status, alert))
            
            self.connection.commit()
            logger.debug(f"Saved to DB: {distance:.1f} cm")
            
        except Exception as e:
            logger.error(f"Failed to save to database: {e}")
            # Пробуем переподключиться
            try:
                self.connect()
            except:
                pass
    
    def close(self):
        """Закрытие соединения с БД"""
        if self.connection:
            self.connection.close()
            logger.info("Database connection closed")

# ===================== ДАТЧИК УРОВНЯ =====================
class WaterLevelSensor:
    """Класс для работы с JSN-SR04T / AJ-SR04M через UART"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.serial_port = None
        self.last_valid_reading = None
        self.error_count = 0
        self.connect()
    
    def connect(self):
        """Открытие последовательного порта"""
        try:
            self.serial_port = serial.Serial(
                port=self.config["port"],
                baudrate=self.config["baudrate"],
                timeout=self.config["timeout"]
            )
            logger.info(f"Sensor connected on {self.config['port']}")
            
            # Очистка буфера
            self.serial_port.reset_input_buffer()
            self.serial_port.reset_output_buffer()
            
        except Exception as e:
            logger.error(f"Failed to open serial port: {e}")
            raise
    
    def read_mode2(self) -> Optional[float]:
        """
        Чтение в режиме Mode 2 (command mode)
        Отправляем 0x55 и читаем ответ
        """
        try:
            # Очистка буферов
            self.serial_port.reset_input_buffer()
            self.serial_port.reset_output_buffer()
            
            # Отправка команды
            self.serial_port.write(b'\x55')
            time.sleep(0.05)  # Ждём измерения
            
            # Чтение ответа (4 байта)
            response = self.serial_port.read(4)
            
            if len(response) == 4:
                # Проверка заголовка (должен быть 0xFF)
                if response[0] == 0xFF:
                    # Извлечение расстояния (старший и младший байт)
                    distance_raw = (response[1] << 8) | response[2]
                    
                    # Проверка контрольной суммы
                    checksum = (response[0] + response[1] + response[2]) & 0xFF
                    if checksum == response[3]:
                        # Переводим из мм в см
                        distance_cm = distance_raw / 10.0
                        
                        # Проверка на мёртвую зону и максимальную дальность
                        if distance_cm < self.config["dead_zone"]:
                            logger.warning(f"Reading in dead zone: {distance_cm} cm")
                            return None
                        if distance_cm > self.config["max_range"]:
                            logger.warning(f"Reading beyond max range: {distance_cm} cm")
                            return None
                        
                        self.last_valid_reading = distance_cm
                        self.error_count = 0
                        return distance_cm
                    else:
                        logger.error(f"Checksum error: {checksum} != {response[3]}")
                else:
                    logger.error(f"Invalid header: {response[0]}")
            else:
                logger.error(f"Invalid response length: {len(response)}")
            
            return None
            
        except Exception as e:
            logger.error(f"Error reading sensor: {e}")
            return None
    
    def read_mode1(self) -> Optional[float]:
        """
        Чтение в режиме Mode 1 (continuous mode)
        Датчик сам шлёт данные
        """
        try:
            if self.serial_port.in_waiting >= 4:
                response = self.serial_port.read(4)
                
                if len(response) == 4 and response[0] == 0xFF:
                    distance_raw = (response[1] << 8) | response[2]
                    checksum = (response[0] + response[1] + response[2]) & 0xFF
                    
                    if checksum == response[3]:
                        distance_cm = distance_raw / 10.0
                        
                        if distance_cm < self.config["dead_zone"]:
                            return None
                        if distance_cm > self.config["max_range"]:
                            return None
                        
                        self.last_valid_reading = distance_cm
                        self.error_count = 0
                        return distance_cm
            return None
            
        except Exception as e:
            logger.error(f"Error reading sensor in mode1: {e}")
            return None
    
    def read_with_retry(self) -> tuple:
        """
        Чтение с повторными попытками
        Возвращает (distance, raw_value, status)
        """
        for attempt in range(POLL_CONFIG["retry_attempts"]):
            try:
                if self.config["mode"] == 1:
                    distance = self.read_mode1()
                else:
                    distance = self.read_mode2()
                
                if distance is not None:
                    return distance, int(distance * 10), "OK"
                
                logger.warning(f"Attempt {attempt + 1} failed")
                time.sleep(POLL_CONFIG["retry_delay"])
                
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} error: {e}")
                time.sleep(POLL_CONFIG["retry_delay"])
        
        self.error_count += 1
        status = "ERROR" if self.error_count > 5 else "WARNING"
        return None, None, status
    
    def reconnect(self):
        """Переподключение к датчику"""
        try:
            if self.serial_port:
                self.serial_port.close()
            time.sleep(1)
            self.connect()
            logger.info("Sensor reconnected")
        except Exception as e:
            logger.error(f"Reconnection failed: {e}")

# ===================== МОНИТОРИНГ И АВАРИИ =====================
class AlertManager:
    """Класс для отслеживания аварийных ситуаций"""
    
    def __init__(self, config: Dict):
        self.min_level = config.get("min_level_cm")
        self.max_level = config.get("max_level_cm")
        self.alert_file = config.get("alert_file")
        self.last_alert_state = None
    
    def check_level(self, distance_cm: float) -> Optional[str]:
        """
        Проверка уровня воды
        Возвращает статус аварии или None
        """
        if distance_cm is None:
            return "NO_DATA"
        
        if self.max_level and distance_cm > self.max_level:
            return "LOW_WATER"  # Воды мало (большое расстояние)
        
        if self.min_level and distance_cm < self.min_level:
            return "HIGH_WATER"  # Воды много (маленькое расстояние)
        
        return "NORMAL"
    
    def log_alert(self, level: str, distance: float):
        """Запись аварии в лог"""
        try:
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            alert_msg = f"[{timestamp}] ALERT: {level} - distance: {distance:.1f} cm\n"
            
            with open(self.alert_file, 'a') as f:
                f.write(alert_msg)
            
            logger.warning(f"Alert triggered: {level}")
            
        except Exception as e:
            logger.error(f"Failed to write alert: {e}")

# ===================== ОСНОВНАЯ ПРОГРАММА =====================
class AquariumMonitor:
    """Главный класс мониторинга аквариума"""
    
    def __init__(self):
        self.db = DatabaseManager(DB_CONFIG)
        self.sensor = WaterLevelSensor(SENSOR_CONFIG)
        self.alert = AlertManager(ALERT_CONFIG)
        self.running = True
        self.stats = {
            "total_readings": 0,
            "successful_readings": 0,
            "failed_readings": 0,
            "alerts": 0,
            "start_time": datetime.now()
        }
        
        logger.info("Aquarium Monitor initialized")
        logger.info(f"Polling interval: {POLL_CONFIG['interval_seconds']} seconds")
    
    def read_and_store(self):
        """Один цикл чтения и сохранения"""
        logger.debug("Starting reading cycle")
        
        # Чтение с датчика
        distance, raw, status = self.sensor.read_with_retry()
        
        self.stats["total_readings"] += 1
        
        # Проверка аварий
        alert_level = None
        if distance is not None:
            self.stats["successful_readings"] += 1
            alert_level = self.alert.check_level(distance)
            
            if alert_level not in [None, "NORMAL"]:
                self.stats["alerts"] += 1
                self.alert.log_alert(alert_level, distance)
            
            logger.info(f"Reading: {distance:.1f} cm (status: {alert_level})")
        else:
            self.stats["failed_readings"] += 1
            logger.error("Failed to get valid reading")
        
        # Сохранение в БД
        self.db.save_reading(
            distance=distance if distance is not None else -1,
            raw_value=raw,
            temperature=None,  # Можно добавить DS18B20 позже
            status=status,
            alert=alert_level
        )
        
        # Периодический вывод статистики
        if self.stats["total_readings"] % 10 == 0:
            self.print_stats()
    
    def print_stats(self):
        """Вывод статистики работы"""
        uptime = datetime.now() - self.stats["start_time"]
        success_rate = (self.stats["successful_readings"] / 
                       max(1, self.stats["total_readings"]) * 100)
        
        logger.info("=" * 50)
        logger.info(f"STATISTICS (uptime: {uptime})")
        logger.info(f"Total readings: {self.stats['total_readings']}")
        logger.info(f"Successful: {self.stats['successful_readings']} ({success_rate:.1f}%)")
        logger.info(f"Failed: {self.stats['failed_readings']}")
        logger.info(f"Alerts: {self.stats['alerts']}")
        logger.info("=" * 50)
    
    def run(self):
        """Основной цикл программы"""
        logger.info("Starting monitoring loop")
        
        # Планировщик задач
        schedule.every(POLL_CONFIG["interval_seconds"]).seconds.do(self.read_and_store)
        
        # Первое чтение сразу при старте
        self.read_and_store()
        
        try:
            while self.running:
                schedule.run_pending()
                time.sleep(1)
                
        except KeyboardInterrupt:
            logger.info("Received interrupt signal")
            self.cleanup()
    
    def cleanup(self):
        """Очистка ресурсов"""
        logger.info("Cleaning up...")
        self.running = False
        
        # Финальная статистика
        self.print_stats()
        
        # Закрытие соединений
        self.db.close()
        self.sensor.serial_port.close()
        
        logger.info("Aquarium Monitor stopped")
        sys.exit(0)

# ===================== ТОЧКА ВХОДА =====================
if __name__ == "__main__":
    # Проверка прав на последовательный порт
    if not os.access(SENSOR_CONFIG["port"], os.R_OK | os.W_OK):
        logger.error(f"Cannot access {SENSOR_CONFIG['port']}. Run with sudo or add user to dialout group.")
        logger.info("Try: sudo usermod -a -G dialout $USER")
        sys.exit(1)
    
    # Запуск монитора
    monitor = AquariumMonitor()
    monitor.run()



Категории:

Категории

Комментарии

Пока нет комментариев. Будьте первым!

Оставить комментарий

← Назад к списку

Посетителей сегодня: 0
о блоге | карта блога

© Digital Specialist | Не являемся сотрудниками Google, Яндекса и NASA