Категории

mqtt_listener.php скрипт сервис автоматизации iot-automation.service

23.11.2025 | коды из категории: IOT умный дом

обновлено 27.12.2025

Скрипт /var/www/html/iot/public_html/admin/mqtt_listener.php

Назначение: Движок автоматизаций. Постоянно проверяет таблицу sensor_data и выполняет правила из таблицы automations при срабатывании условий (по триггеру или по расписанию).

Основные функции

Что было изменено (исправлено)

  1. Исправлена ошибка SQL: Unknown column 'id'

    Исходный код пытался выполнить:

    SELECT value FROM sensor_data WHERE topic = ? ORDER BY id DESC LIMIT 1

    Но в таблице sensor_data нет столбца id (используется PRIMARY KEY (topic)).

    Исправление: запрос заменён на:

    SELECT value FROM sensor_data WHERE topic = ?

    Так как для каждого топика хранится только одно (последнее) значение — сортировка не нужна.

  2. Временно отключена логика in_progress

    Движок пытался помечать правила как «в обработке», но в таблице automations отсутствовал столбец in_progress, что вызывало скрытые ошибки и блокировку правил.

    Исправление: блоки кода, работающие с in_progress, закомментированы до добавления соответствующего столбца в БД.

  3. Добавлена защита от повторного срабатывания (частично)

    Хотя полная защита через in_progress отключена, движок использует поле last_triggered для логгирования времени срабатывания. В будущем можно будет добавить проверку «не чаще чем раз в N секунд» на его основе.

Как запускается

Работает как systemd-сервис iot-automation.service, запускается при старте системы и постоянно опрашивает базу данных с интервалом 1 секунда.

Важно!

Этот скрипт — не веб-интерфейс, а фоновый обработчик. Веб-интерфейс (logic.php) только редактирует правила. Без запущенного mqtt_listener.php автоматизации не работают.

zzick@raspberrypi:~ $ sudo cat /var/www/html/iot/public_html/admin/mqtt_listener.php
<?php

// Подключение к БД
try {
    $pdo = new PDO("mysql:host=127.0.0.1;dbname=iot_db;charset=utf8mb4", "iot_user", "123456");
    $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
} catch (PDOException $e) {
    die("❌ Ошибка подключения к БД: " . $e->getMessage() . "\n");
}

echo "🧠 IoT Automation Engine запущен (режим: только БД + расписание).\n";
date_default_timezone_set('Europe/Moscow');
echo "🌍 Установлен часовой пояс: " . date_default_timezone_get() . "
";
// Глобальные переменные
$rules = [];
$lastScheduleCheck = 0;
$scheduled = []; // отложенные действия
$pendingConfirmations = []; // ожидание подтверждений
$ruleById = [];
$lastDailyReset = 0; // ← добавь эту строку
$lastTriggeredMinute = []; // ← добавьте эту строку
$conditionStartTime = [];

// Функция перезагрузки правил из БД
function reloadRules($pdo) {
    global $rules, $lastRuleReload, $ruleById;

    $stmt = $pdo->prepare("SELECT * FROM automations WHERE enabled = 1");
    $stmt->execute();
    $rules = $stmt->fetchAll(PDO::FETCH_ASSOC);
    $ruleById = [];
    foreach ($rules as $rule) {
        $ruleById[$rule['id']] = $rule;
    }

    $count = count($rules);
    echo "✅ Загружено $count активных правил.\n";
    if ($count === 0) {
        echo "⚠️ Нет активных правил. Добавьте в logic.php.\n";
    }

    $lastRuleReload = time();
}

// Функция отправки команды в MQTT
function sendMqttCommand($topic, $payload, $ruleName = '') {
    $cmd = "/usr/bin/mosquitto_pub -h 127.0.0.1 -p 1883 -u iot_user -P 123456 -t " . escapeshellarg($topic) . " -m " . escapeshellarg($payload) . " 2>&1";
    $output = shell_exec($cmd);
    if ($output) {
        echo "⚠️ Ошибка отправки для правила '$ruleName': $output\n";
    } else {
        echo "✅ Успешно отправлено ($ruleName): $topic = $payload\n";
    }
}

// Функция проверки условия по БД
// ========== Функция оценки условия правила (поддерживает simple/script) ==========
function evaluateRuleCondition($rule, $pdo) {
    global $conditionStartTime; // ← Подключаем глобальную переменную

    $ruleId = $rule['id'];
    $duration = (int)($rule['condition_duration'] ?? 0);

    // Если условие не задано — считаем, что оно истинно и длится 0 секунд (т.е. мгновенно)
// Если условие не задано — считаем, что оно истинно и длится 0 секунд (т.е. мгновенно)
// Условие считается "не заданным", если:
// - для простого типа: оба поля operator и value пустые
// - для скриптового типа: поле script пустое
$isSimpleConditionDefined = !empty($rule['condition_operator']) && !empty($rule['condition_value']);
$isScriptConditionDefined = !empty($rule['condition_script']);

if (!$isSimpleConditionDefined && !$isScriptConditionDefined) {
    // Сбрасываем таймер, если условие не задано
    unset($conditionStartTime[$ruleId]);
    return true;
}
   // Получаем последние данные всех сенсоров для скриптов
    $stmt = $pdo->query("SELECT topic, value FROM sensor_data ORDER BY id DESC");
    $rows = $stmt->fetchAll(PDO::FETCH_KEY_PAIR);
    $sensorData = [];
    foreach ($rows as $topic => $rawValue) {
        $decoded = json_decode($rawValue, true);
        if (json_last_error() === JSON_ERROR_NONE && is_array($decoded) && isset($decoded['state'])) {
            $sensorData[$topic] = $decoded['state'];
        } else {
            $sensorData[$topic] = $rawValue;
        }
    }

    // Функция для проверки простого условия
    $checkSimpleCondition = function($triggerTopic, $operator, $value) use ($sensorData) {
        if (empty($triggerTopic)) return false;
        if (!isset($sensorData[$triggerTopic])) return false;

        $actualValue = $sensorData[$triggerTopic];
        $numActual = is_numeric($actualValue) ? (float)$actualValue : null;
        $numValue = is_numeric($value) ? (float)$value : null;

        switch ($operator) {
            case '>':  return $numActual !== null && $numValue !== null ? $numActual > $numValue : $actualValue > $value;
            case '<':  return $numActual !== null && $numValue !== null ? $numActual < $numValue : $actualValue < $value;
            case '==': return $actualValue == $value;
            case '!=': return $actualValue != $value;
            case '>=': return $numActual !== null && $numValue !== null ? $numActual >= $numValue : $actualValue >= $value;
            case '<=': return $numActual !== null && $numValue !== null ? $numActual <= $numValue : $actualValue <= $value;
            default:   return false;
        }
    };

    // Проверяем условие
    $currentConditionMet = false;

    // Простое условие
    if (!empty($rule['condition_operator']) && !empty($rule['condition_value'])) {
        $currentConditionMet = $checkSimpleCondition($rule['trigger_topic'], $rule['condition_operator'], $rule['condition_value']);
    }
    // Скриптовое условие
    else if (!empty($rule['condition_script'])) {
        try {
            $script = $rule['condition_script'];
            $script = preg_replace('/\bsensor\.([a-zA-Z0-9_]+)\b/', '$sensorData[\'$1\']', $script);
            $script = preg_replace('/\bsensor\[([\'"])([a-zA-Z0-9_]+)\1\]/', '$sensorData[\'$2\']', $script);
            $script = str_replace('data', '$data', $script);

            if (!preg_match('/^[a-zA-Z0-9_\[\]\.\(\)\+\-\*\/\!\&\|\s\=\>\<\,\{\}\'\"]+$/', $script)) {
                throw new Exception("Недопустимые символы в выражении");
            }
            $forbidden = ['eval', 'exec', 'system', 'shell_exec', 'passthru', 'assert', 'include', 'require'];
            foreach ($forbidden as $word) {
                if (stripos($script, $word) !== false) {
                    throw new Exception("Запрещённая функция: " . $word);
                }
            }
            $tmpFile = sys_get_temp_dir() . '/eval_' . uniqid() . '.php';
            file_put_contents($tmpFile, "<?php return (" . $script . ");");
            $result = null;
            try {
                $result = include $tmpFile;
            } catch (Exception $e) {
                unlink($tmpFile);
                throw new Exception("Ошибка в выражении: " . $e->getMessage());
            }
            unlink($tmpFile);
            $currentConditionMet = (bool)$result;
        } catch (Exception $e) {
            echo "❌ Ошибка в скриптовом условии правила '" . $rule['name'] . "': " . $e->getMessage() . "\n";
            $currentConditionMet = false;
        }
    }

    // Логика для condition_duration
    if ($duration <= 0) {
        // Если длительность 0 или не задана — возвращаем результат сразу
        unset($conditionStartTime[$ruleId]); // Сбрасываем таймер
        return $currentConditionMet;
    }

    if ($currentConditionMet) {
        // Условие ВЫПОЛНЯЕТСЯ прямо сейчас
        if (!isset($conditionStartTime[$ruleId])) {
            // Это первый момент, когда условие стало true — запоминаем время
            $conditionStartTime[$ruleId] = time();
            echo "⏳ Условие для правила '" . $rule['name'] . "' стало истинным. Начинаем отсчет $duration секунд.\n";
        } else {
            // Условие продолжает выполняться — проверяем, прошло ли достаточно времени
            $elapsed = time() - $conditionStartTime[$ruleId];
            if ($elapsed >= $duration) {
                // Ура! Условие выполняется достаточно долго
                unset($conditionStartTime[$ruleId]); // Сбрасываем таймер после успешного срабатывания
                echo "✅ Условие для правила '" . $rule['name'] . "' выполнялось $duration секунд — можно срабатывать!\n";
                return true;
            } else {
                // Еще не прошло достаточно времени
                echo "⏳ Условие выполняется уже $elapsed сек из $duration для правила '" . $rule['name'] . "'.\n";
                return false;
            }
        }
    } else {
        // Условие НЕ выполняется прямо сейчас — сбрасываем таймер
        if (isset($conditionStartTime[$ruleId])) {
            echo "⚠️ Условие для правила '" . $rule['name'] . "' перестало выполняться — отсчет сброшен.\n";
            unset($conditionStartTime[$ruleId]);
        }
        return false;
    }

    return false; // fallback
}




function checkCondition($pdo, $triggerTopic, $operator, $value) {
    try {
        $stmt = $pdo->prepare("SELECT value FROM sensor_data WHERE topic = ?");
        $stmt->execute([$triggerTopic]);
        $rawValue = $stmt->fetchColumn();

        if ($rawValue === false) {
            echo "⚠️ Нет данных для топика: $triggerTopic\n";
            return false;
        }

        // Всё остальное — как было
        $decoded = json_decode($rawValue, true);
        if (json_last_error() === JSON_ERROR_NONE && is_array($decoded) && isset($decoded['state'])) {
            $actualValue = $decoded['state'];
        } else {
            $actualValue = $rawValue;
        }

        $numActual = is_numeric($actualValue) ? (float)$actualValue : null;
        $numValue = is_numeric($value) ? (float)$value : null;

        switch ($operator) {
            case '>':  return $numActual !== null && $numValue !== null ? $numActual > $numValue : $actualValue > $value;
            case '<':  return $numActual !== null && $numValue !== null ? $numActual < $numValue : $actualValue < $value;
            case '==': return $actualValue == $value;
            case '!=': return $actualValue != $value;
            case '>=': return $numActual !== null && $numValue !== null ? $numActual >= $numValue : $actualValue >= $value;
            case '<=': return $numActual !== null && $numValue !== null ? $numActual <= $numValue : $actualValue <= $value;
            default:   return false;
        }
    } catch (Exception $e) {
        echo "❌ Ошибка проверки условия для '$triggerTopic': " . $e->getMessage() . "\n";
        return false;
    }
}


// Инициализация
reloadRules($pdo);
echo "✅ Готов! Проверяю правила по расписанию и по БД...\n";


function resetDailyRules($pdo) {
    global $pendingConfirmations, $ruleById;

    // Сбрасываем in_progress для всех daily-правил
    $stmt = $pdo->prepare("UPDATE automations SET in_progress = 0 WHERE schedule_type = 'daily'");
    $stmt->execute();
// Записываем факт сброса
$stmt_log = $pdo->prepare("INSERT IGNORE INTO daily_resets (reset_date) VALUES (?)");
$stmt_log->execute([date('Y-m-d')]);

    // Очищаем ожидания подтверждений для daily-правил
    foreach ($ruleById as $rule) {
        if ($rule['schedule_type'] === 'daily' && !empty($rule['confirmation_topic']) && isset($pendingConfirmations[$rule['confirmation_topic']][$rule['id']])) {
            unset($pendingConfirmations[$rule['confirmation_topic']][$rule['id']]);
            // Если больше никто не ждёт этот топик — удаляем его
            if (empty($pendingConfirmations[$rule['confirmation_topic']])) {
                unset($pendingConfirmations[$rule['confirmation_topic']]);
            }
        }
    }

    echo "🔄 Ежедневный сброс: сняты флаги in_progress и очищены ожидания подтверждений для daily-правил.\n";
}

// Основной цикл — без подписки!
while (true) {

    // 🌅 Ежедневный сброс в 00:00:05
    $currentHour = (int)date('H');
    $currentMinute = (int)date('i');
    $currentSecond = (int)date('s');
    if ($currentHour === 0 && $currentMinute === 0 && $currentSecond >= 5 && time() - $lastDailyReset > 60) {
        resetDailyRules($pdo);
        $lastDailyReset = time();
    }



   // 🔁 Перезагружаем правила КАЖДЫЕ 5 СЕКУНД — чтобы мгновенно реагировать на изменения в БД
    reloadRules($pdo);

// 🕔 Проверяем расписание каждые 10 секунд
if (time() - $lastScheduleCheck >= 1) {
    $lastScheduleCheck = time();
    $currentTime = date('H:i:s'); // ← Только часы и минуты!
echo "🔍 DEBUG: Текущее время для проверки расписания: $currentTime
";
$currentDay = strtolower(date('D')); // mon, tue, ..., sun

    foreach ($rules as $rule) {
        if ($rule['schedule_type'] === 'none') continue;

        // Проверка зависимости (если есть)
        if ($rule['dependency_topic'] && $rule['dependency_value'] !== null) {
            $stmt = $pdo->prepare("SELECT value FROM sensor_data WHERE topic = ? ORDER BY timestamp DESC LIMIT 1");
            $stmt->execute([$rule['dependency_topic']]);
            $rawValue = $stmt->fetchColumn();
            if ($rawValue === false) continue;

            $dependencyValue = $rawValue;
            $json = json_decode($rawValue, true);
            if (json_last_error() === JSON_ERROR_NONE && is_array($json) && isset($json['state'])) {
                $dependencyValue = $json['state'];
            }
            if ((string)$dependencyValue !== (string)$rule['dependency_value']) continue;
        }

// === ПРОВЕРКА РАСПИСАНИЯ С ЗАЩИТОЙ ОТ ПРОПУСКА ===
$scheduleTime = $rule['schedule_time']; // например: "08:00:00"

// Проверяем, настало ли время (лексикографическое сравнение H:i:s — корректно!)
if ($currentTime >= $scheduleTime) {
    // Определяем, срабатывало ли уже сегодня
    $lastTriggered = $rule['last_triggered'];
    $lastDate = $lastTriggered ? date('Y-m-d', strtotime($lastTriggered)) : null;
    $today = date('Y-m-d');

    // Если уже сработало сегодня — пропускаем
    if ($lastDate === $today) {
        // Уже сработало сегодня — ничего не делаем
        continue;
    }

    // Для weekly — проверяем день недели
    if ($rule['schedule_type'] === 'weekly') {
        $allowedDays = explode(',', $rule['schedule_days']);
        if (!in_array($currentDay, $allowedDays)) {
            continue;
        }
    }

    // Проверка зависимости (если есть)
    if ($rule['dependency_topic'] && $rule['dependency_value'] !== null) {
        $stmt = $pdo->prepare("SELECT value FROM sensor_data WHERE topic = ? ORDER BY timestamp DESC LIMIT 1");
        $stmt->execute([$rule['dependency_topic']]);
        $rawValue = $stmt->fetchColumn();
        if ($rawValue !== false) {
            $dependencyValue = $rawValue;
            $json = json_decode($rawValue, true);
            if (json_last_error() === JSON_ERROR_NONE && is_array($json) && isset($json['state'])) {
                $dependencyValue = $json['state'];
            }
            if ((string)$dependencyValue !== (string)$rule['dependency_value']) {
                continue;
            }
        } else {
            continue; // Нет данных — ждём
        }
    }

    // Всё готово — проверяем условие и запускаем
    echo "⏰ Сработало расписание: " . $rule['name'] . " (" . $rule['schedule_type'] . " в " . $scheduleTime . ")\n";
    if (evaluateRuleCondition($rule, $pdo)) {
        echo "✅ Условие выполнено — выполняем действие.\n";
        handleAction($rule); // внутри обновится last_triggered
    } else {
        echo "❌ Условие НЕ выполнено — пропускаем правило.\n";
    }
}

    } // <-- Закрываем foreach ($rules as $rule) для расписания
} // <-- Закрываем if (time() - $lastScheduleCheck >= 10)

    // 🔍 Проверяем ВСЕ правила по БД (не по MQTT!)
    foreach ($rules as $rule) {
        if ($rule['schedule_type'] !== 'none') continue; // пропускаем по расписанию — они уже обработаны


    // 🚦 Пропускаем, если уже в обработке
     // if (isset($rule['in_progress']) && !empty($rule['in_progress'])) {
      //      echo "🚧 Правило '" . $rule['name'] . "' пропущено — уже в обработке.\n";
      //      continue;
       // } не включается свет по датчику движения

        // Проверка зависимости
        if ($rule['dependency_topic'] && $rule['dependency_value'] !== null) {
            $stmt = $pdo->prepare("SELECT value FROM sensor_data WHERE topic = ? ORDER BY timestamp DESC LIMIT 1");
            $stmt->execute([$rule['dependency_topic']]);
            $rawValue = $stmt->fetchColumn();
            if ($rawValue === false) {
                continue;
            }
            $dependencyValue = $rawValue;
            $json = json_decode($rawValue, true);
            if (json_last_error() === JSON_ERROR_NONE && is_array($json) && isset($json['state'])) {
                $dependencyValue = $json['state'];
            }
            if ((string)$dependencyValue !== (string)$rule['dependency_value']) {
                continue;
            }
        }

        // Проверка условия по БД
        if (checkCondition($pdo, $rule['trigger_topic'], $rule['condition_operator'], $rule['condition_value'])) {
            echo "🎉 Сработало правило: " . $rule['name'] . "\n";
            handleAction($rule);
        }
    }
  // ✅✅ Проверяем, пришли ли подтверждения
    checkPendingConfirmations($pdo);

    // ⏱ Выполняем отложенные действия
    global $scheduled;
    foreach ($scheduled as $i => $task) {
        if (time() >= $task['execute_at']) {
            echo "⏰ Выполняю отложенное действие: " . $task['rule_name'] . "\n";
            sendMqttCommand($task['topic'], $task['payload'], $task['rule_name']);
            unset($scheduled[$i]);
        }
    }
    $scheduled = array_values($scheduled);

    // 💤 Ждём 5 секунд перед следующей проверкой
    sleep(1);
}

// Общая функция выполнения действия (с поддержкой delay и confirmation)
function handleAction($rule) {
 global $scheduled, $pendingConfirmations, $ruleById, $pdo;
// 📝 Обновляем время последнего срабатывания
try {
    $stmtLog = $pdo->prepare("UPDATE automations SET last_triggered = NOW() WHERE id = ?");
    $stmtLog->execute([$rule['id']]);
    echo "📝 Записано время срабатывания для правила ID " . $rule['id'] . "\n";
} catch (Exception $e) {
    echo "⚠️ Не удалось обновить last_triggered: " . $e->getMessage() . "\n";
}
// 🚦 Помечаем правило как "в обработке" — чтобы не срабатывало повторно
//    if (empty($rule['in_progress'])) { // на всякий случай — если уже помечено, не обновляем
//        $stmt = $pdo->prepare("UPDATE automations SET in_progress = 1 WHERE id = ?");
// /       $stmt->execute([$rule['id']]);
//        echo "🚦 Правило '" . $rule['name'] . "' помечено как in_progress.\n";
//    }
//не влкючается лампочка по датчику движения

    $delay = (int)$rule['delay_seconds'];
    if ($delay > 0) {
        $scheduled[] = [
            'execute_at' => time() + $delay,
            'topic' => $rule['action_topic'],
            'payload' => $rule['action_payload'],
            'rule_name' => $rule['name']
        ];
        echo "⏱ Запланировано на " . date('H:i', time() + $delay) . "\n";
    } else {
        sendMqttCommand($rule['action_topic'], $rule['action_payload'], $rule['name']);

        // Если нужно ждать подтверждения
        if (!empty($rule['confirmation_topic'])) {
            $expectedState = null;
            $actionPayloadDecoded = json_decode($rule['action_payload'], true);
            if (is_array($actionPayloadDecoded) && isset($actionPayloadDecoded['state'])) {
                $expectedState = $actionPayloadDecoded['state'];
            } else {
                $expectedState = 'any';
            }

            if (!isset($pendingConfirmations[$rule['confirmation_topic']])) {
                $pendingConfirmations[$rule['confirmation_topic']] = [];
            }
            $pendingConfirmations[$rule['confirmation_topic']][$rule['id']] = $expectedState;

            echo "⏳ Ожидаю подтверждения в топике: " . $rule['confirmation_topic'] . " (ожидается: " . $expectedState . ")\n";
} else {
    echo "✅ Правило '" . $rule['name'] . "' выполнено (без подтверждения).
";
    // 💥 Удаляем только если галочка "Повторять" НЕ стоит
    if (empty($rule['persistent'])) {
        $stmt = $pdo->prepare("DELETE FROM automations WHERE id = ?");
        $stmt->execute([$rule['id']]);
        echo "🗑️ Правило '" . $rule['name'] . "' удалено после выполнения.
";
        unset($ruleById[$rule['id']]); // чистим кеш
    } else {
        echo "🔁 Правило не удалено (галочка 'Повторять' активна).
";
    }
}







    }
}


// Проверка ожидающих подтверждений через БД
function checkPendingConfirmations($pdo) {
    global $pendingConfirmations, $ruleById;

    if (empty($pendingConfirmations)) return;

    foreach ($pendingConfirmations as $topic => $ruleList) {
        // Получаем последнее значение из БД по топику подтверждения
        $stmt = $pdo->prepare("SELECT value FROM sensor_data WHERE topic = ? ORDER BY id DESC LIMIT 1");
        $stmt->execute([$topic]);
        $rawValue = $stmt->fetchColumn();

        if ($rawValue === false) continue; // Нет данных — ждём дальше

        // У тебя в БД просто "ON" или "OFF" — не JSON, так что оставляем как есть
        $actualValue = $rawValue;

        // Проверяем каждое правило, которое ждёт подтверждения по этому топику
        foreach ($ruleList as $ruleId => $expectedState) {
            $rule = $ruleById[$ruleId] ?? null;
            if (!$rule) continue;

            // Сравниваем: если ожидали "ON", а в БД "ON" — значит, подтверждение получено!
            if ((string)$actualValue === (string)$expectedState) {
                echo "✅✅ Подтверждение получено для правила: " . $rule['name'] . " — состояние: " . $actualValue . "\n";
// ✅✅ Удаляем только если галочка "Повторять" НЕ стоит
if (empty($rule['persistent'])) {
    $stmtDel = $pdo->prepare("DELETE FROM automations WHERE id = ?");
    $stmtDel->execute([$ruleId]);
    echo "🗑️ Правило '" . $rule['name'] . "' удалено после подтверждения.
";
    // Удаляем из кеша, чтобы не было ошибок
    unset($ruleById[$ruleId]);
} else {
    echo "🔁 Правило не удалено после подтверждения (галочка 'Повторять' активна).
";
}
                unset($pendingConfirmations[$topic][$ruleId]); // Удаляем из ожидания
            }
        }

        // Если больше никто не ждёт этот топик — удаляем его из списка
        if (empty($pendingConfirmations[$topic])) {
            unset($pendingConfirmations[$topic]);
        }
    }
}

Комментарии

Пока нет комментариев. Будьте первым!

Оставить комментарий

← Назад к списку

Посетителей сегодня: 0


кто я | о блоге

© Digital Specialist | Не являемся сотрудниками Google, Яндекса и NASA