ГлавнаяАкадемияИсполнительные устройства: интерлоки, таймауты → Контракт сообщения для подтверждения (Ack)

Контракт сообщения для подтверждения (Ack)

Урок 4 · Исполнительные устройства: интерлоки, таймауты · 30 мин · theory

Введение: Зачем нужен контракт сообщения?

В предыдущих уроках мы подробно разобрали фундаментальную проблему систем автоматизации — принцип "отправили и забыли" (fire-and-forget). Отправка команды исполнительному устройству без получения подтверждения о ее выполнении неизбежно приводит к рассинхронизации между желаемым состоянием системы (в логике контроллера) и ее реальным состоянием. Последствия могут варьироваться от незначительных (свет не включился) до критических (не закрылся клапан подачи воды или не отключился перегревающийся двигатель).

Решением этой проблемы является внедрение паттерна "Команда -> Подтверждение (Ack) -> Таймаут", где каждое управляющее воздействие должно быть подтверждено. Но как именно должно выглядеть это подтверждение? Что оно должно содержать? Как отличить подтверждение на команду "Включить свет" от подтверждения на команду "Закрыть шторы"? Здесь мы вводим ключевое понятие — Контракт сообщения (Message Contract).

> 💡 Подсказка: Рассматривайте контракт не как ограничение, а как инструмент для создания порядка. Единый, хорошо документированный стандарт обмена сообщениями — признак профессиональной и продуманной системы.

Контракт сообщения — это формализованное, документированное соглашение о структуре и содержании данных, передаваемых между различными компонентами системы автоматизации внутри Node-RED. По сути, это стандарт, который описывает, как должны выглядеть сообщения `msg`, которыми обмениваются ваши потоки.

Эту концепцию проще всего понять по аналогии с программными интерфейсами (API) в веб-разработке:

Без такого контракта каждый инженер будет изобретать свой собственный формат сообщений. Один поток для управления светом будет ожидать `msg.payload = "ON"`, другой — `msg.payload = true`, а третий — `msg.payload = { "command": 1 }`. Это порождает хаос и делает систему хрупкой и сложной в поддержке.

Внедрение единого контракта сообщения для команд и подтверждений дает четыре неоспоримых преимущества:

  • Предсказуемость: Вы всегда точно знаете, в каком формате придет ответ и какие поля он будет содержать. Логика обработки становится прямолинейной и надежной.
  • Упрощение отладки: При анализе потока данных в отладчике Node-RED (`Debug` node) вы видите унифицированные и понятные структуры сообщений, что в разы ускоряет поиск проблем.
  • Масштабируемость: Добавление нового устройства в систему сводится к реализации для него потока, который "говорит" на общем для всей системы "языке".
  • Повторное использование (Reusability): Вы можете создавать универсальные субпотоки (subflows) для логирования, отправки уведомлений или обработки ошибок, так как они будут получать на вход данные в стандартном формате.
  • В этом уроке мы спроектируем и внедрим на практике надежный контракт для команд и подтверждений, который станет основой для построения отказоустойчивых сценариев.

    ---

    Проектирование контракта Ack: обязательные и опциональные поля

    В основе надежного механизма подтверждений лежит способность однозначно связать отправленную команду и пришедший на нее ответ. Особенно это критично в асинхронной среде Node-RED, где одновременно могут быть запущены десятки команд для разных устройств. Если в систему придет простое сообщение `true`, как понять, на какую из команд оно является ответом?

    Ключевая роль `msg.correlation_id`

    Для решения этой задачи вводится специальное поле — Идентификатор корреляции (Correlation ID). Это уникальная строка или число, которое генерируется инициатором команды и добавляется в сообщение. Компонент, исполняющий команду, обязан скопировать этот идентификатор в свое ответное сообщение (Ack). Таким образом, `correlation_id` работает как почтовый трек-номер, однозначно связывая "посылку" (команду) и "уведомление о доставке" (подтверждение).

    Существует несколько подходов к генерации `correlation_id`:

    Для большинства задач автоматизации на платформе HI метод с `Date.now()` является оптимальным балансом между простотой и надежностью.

    Структура сообщений: Команда и Подтверждение

    Рассмотрим примеры контракта для управления реле.

    1. Сообщение-команда (Command):

    Это сообщение инициирует действие. Оно отправляется в поток, отвечающий за управление физическим устройством.

    {
    

    "topic": "hi/command/relay/living_room_light/set",

    "payload": {

    "command": "ON",

    "value": true

    },

    "correlation_id": 1678886400123,

    "request_ts": 1678886400123

    }

    2. Сообщение-подтверждение (Ack):

    Это сообщение генерируется потоком-исполнителем после успешного (или неуспешного) выполнения команды и отправляется обратно инициатору.

    {
    

    "payload": {

    "status": "ok",

    "state": true

    },

    "correlation_id": 1678886400123,

    "response_ts": 1678886400345

    }

    Соглашения по именованию топиков MQTT

    Для четкого разделения потоков данных в системах, использующих MQTT (что является стандартом для платформы HI), критически важно придерживаться соглашений по именованию топиков. Это позволяет легко фильтровать команды от статусов на уровне MQTT-брокера.

    | Тип топика | Шаблон | Пример | Назначение |

    |------------|---------------------------------------|-------------------------------------------|--------------------------------------------------------|

    | Команда | `...//set` | `hi/office/light-main/set` | Для отправки управляющих команд на устройство. |

    | Статус | `...//status` | `hi/office/light-main/status` | Для публикации фактического состояния устройства. |

    | Ack | `.../ack/` (опция) | `hi/ack/1678886400123` | Для отправки целевых подтверждений. |

    | Телеметрия | `...//value` | `hi/office/temperature/value` | Для публикации данных с датчиков. |

    Чаще всего для Ack не создают отдельный топик, а используют общий поток `Link` нод внутри Node-RED. Однако для межконтроллерного взаимодействия выделенный топик для Ack может быть полезен. В рамках одного контроллера HI мы будем использовать внутреннюю маршрутизацию.

    ---

    Практика: Реализация correlation_id и паттерн 'Message wrapper'

    Теперь перейдем от теории к практике. Создадим поток, который реализует генерацию `correlation_id` и обеспечивает его сквозную передачу. Мы будем использовать простой, но очень мощный Паттерн 'Оболочка сообщения' (Message Wrapper). Его суть в том, что перед отправкой команды в исполнительный поток мы "заворачиваем" ее в стандартную оболочку с `correlation_id` и метаданными.

    Шаг 1: Создание "Оболочки" (Wrapper)

    Предположим, у нас есть кнопка в интерфейсе, которая при нажатии отправляет простое сообщение, например `msg.payload = "ON"`. Наша задача — превратить его в полноценную команду согласно контракту.

  • Создайте ноду `Inject`, которая имитирует сигнал от кнопки. Настройте ее на отправку строки `"ON"`.
  • Соедините ее с нодой `Function`, которую назовем "Command Wrapper".
  • Вставьте в эту ноду `Function` следующий код:
  • // --- Command Wrapper Function ---
    

    // Превращает простой payload в стандартизированную команду.

    // 1. Генерируем уникальный Correlation ID

    const correlationId = Date.now();

    // 2. Сохраняем исходный payload для дальнейшего использования

    const originalPayload = msg.payload;

    // 3. Формируем новое сообщение, соответствующее контракту

    // В msg.payload отправляем команду для исполнителя

    msg.payload = {

    command: originalPayload, // "ON" или "OFF"

    value: originalPayload === "ON" // преобразуем в boolean

    };

    // 4. Добавляем correlation_id и timestamp на верхний уровень msg

    msg.correlation_id = correlationId;

    msg.request_ts = correlationId; // Можно использовать тот же timestamp

    node.status({fill:"blue", shape:"dot", text:"ID: " + correlationId});

    return msg;

    Теперь на выходе ноды "Command Wrapper" мы имеем полностью сформированное сообщение-команду.

    Шаг 2: Модификация потока-исполнителя

    Поток-исполнитель (например, драйвер Modbus-реле или просто нода `rpi gpio out`) должен быть модифицирован так, чтобы после выполнения действия он формировал Ack-сообщение.

    > ⚠️ Внимание: Крайне важно, чтобы логика, отвечающая за исполнение команды, обязательно копировала `correlation_id` в ответное сообщение. Без этого вся система таймаутов и повторов теряет смысл.

    Представим поток, который управляет реле на контроллере HI. Он получает нашу команду и должен сгенерировать подтверждение.

  • Подключите выход "Command Wrapper" к ноде `Function`, имитирующей исполнителя. Назовем ее "Relay Driver".
  • Вставьте в "Relay Driver" следующий код:
  • // --- Relay Driver Function (имитация) ---
    

    // Принимает команду, "управляет" реле и генерирует Ack.

    // 1. Извлекаем данные из команды

    const command = msg.payload.command;

    const valueToSet = msg.payload.value;

    const correlationId = msg.correlation_id; // <-- Важнейший шаг!

    // 2. Имитируем управление реле

    // В реальной системе здесь будет стоять нода rpi-gpio-out или modbus-write

    node.warn(`[ИСПОЛНИТЕЛЬ] Получена команда '${command}' с ID: ${correlationId}. Устанавливаю значение: ${valueToSet}`);

    const success = true; // Имитируем успешное выполнение

    // 3. Формируем новое сообщение - подтверждение (Ack)

    // ВАЖНО: мы создаем новый объект msg, чтобы не смешивать его с исходным

    const ackMessage = {};

    // 4. Копируем Correlation ID в Ack

    ackMessage.correlation_id = correlationId;

    ackMessage.response_ts = Date.now();

    // 5. Формируем payload подтверждения согласно контракту

    if (success) {

    ackMessage.payload = {

    status: "ok",

    state: valueToSet // Сообщаем фактическое новое состояние

    };

    node.status({fill:"green", shape:"dot", text:"Ack OK for ID: " + correlationId});

    } else {

    ackMessage.payload = {

    status: "error",

    message: "Failed to write to relay"

    };

    node.status({fill:"red", shape:"dot", text:"Ack Error for ID: " + correlationId});

    }

    // Отправляем Ack на выход

    return ackMessage;

    Визуализация полного цикла

    Давайте проследим путь `correlation_id` в нашем потоке:

    [Inject: "ON"]
    

    |

    v

    [Function: Command Wrapper]

    | (Генерируется msg.correlation_id = 123)

    `--> msg = { payload: { command: "ON", value: true }, correlation_id: 123, ... }

    |

    v

    [Function: Relay Driver]

    | (Извлекается correlation_id = 123)

    | (Копируется в новое сообщение)

    `--> ackMessage = { payload: { status: "ok", state: true }, correlation_id: 123, ... }

    |

    v

    [Логика обработки Ack]

    (Здесь мы будем ожидать сообщение именно с correlation_id = 123)

    Мы успешно создали механизм, который позволяет отслеживать жизненный цикл каждой отдельной команды.

    ---

    Обработка Ack: Нода Join как элегантное решение для корреляции

    Теперь, когда у нас есть механизм генерации и передачи `correlation_id`, нам нужен инструмент, который сможет автоматически сопоставить исходящую команду и входящее подтверждение, а также обработать ситуацию, когда подтверждение не пришло вовремя. Как мы рассматривали в уроке `COURSE-05-M04-L03`, эту задачу можно решить с помощью ноды `Trigger`. Однако у этого подхода есть недостаток: он плохо масштабируется при отправке нескольких команд одновременно.

    Гораздо более мощным и элегантным решением является использование стандартной ноды Join.

    > 🔗 Связанный материал: Этот паттерн является более функциональной альтернативой подходу с нодой `Trigger`, разобранному в уроке `COURSE-05-M04-L03`. Нода `Join` значительно упрощает логику при работе с множеством параллельных асинхронных команд.

    Принцип работы ноды `Join`

    Нода `Join` может работать в нескольких режимах. Для нашей задачи идеален режим "ключ/значение" (key/value). В этом режиме нода собирает сообщения и группирует их по значению определенного свойства, которое мы указываем как "ключ". В нашем случае, этим ключом будет, конечно же, `msg.correlation_id`.

    Алгоритм работы:
  • На вход ноды `Join` мы подаем два потока сообщений: исходные команды и приходящие подтверждения (Ack).
  • Нода `Join` "видит" первое сообщение с `correlation_id = 123` (это наша команда) и начинает ждать второе сообщение с таким же `correlation_id`.
  • Когда приходит второе сообщение (наш Ack), нода "схлопывает" их в одно, содержащее информацию из обоих.
  • Если второе сообщение не приходит за указанный таймаут, нода отправляет на выход только первое сообщение (команду).
  • Настройка потока с нодой `Join`

    Соберем полный цикл "Команда -> Ack -> Таймаут":

  • Возьмем наш поток из предыдущего раздела с нодами "Command Wrapper" и "Relay Driver".
  • Добавим ноду `Join` и настроим ее следующим образом:
  • * Mode (Режим): `key/value object` (объект ключ/значение).

    * Key Property (Свойство-ключ): `msg.correlation_id`.

    * Number of message parts (Количество частей): `2`. Это значит, что нода будет ждать два сообщения с одинаковым ключом.

    * Timeout (Таймаут): `5` секунд. Это время, которое мы даем исполнителю на отправку подтверждения.

    * And send (И отправить): `the combined message` (объединенное сообщение).

    * After a timeout send (После таймаута отправить): `the message part(s)` (часть(и) сообщения).

  • Теперь соединим потоки:
  • * Выход ноды "Command Wrapper" подключите одновременно к двум точкам:

    1. К входу "Relay Driver" (чтобы отправить команду на исполнение).

    2. К входу ноды `Join` (чтобы "зарегистрировать" ожидание).

    * Выход ноды "Relay Driver" (который генерирует Ack) подключите к этому же входу ноды `Join`.

                      +---------------------+
    

    [Inject] ---> [Command Wrapper] ---+---> [Relay Driver] ---+

    | | |

    | +-----------> [Join] <--+

    | (mode: key/value)

    | |

    +--------------------------------+---> [Обработка результата]

    Разбор результата на выходе из `Join`

    На выход ноды `Join` придет сообщение, по структуре которого мы можем судить о результате:

    `msg.payload` будет объектом, где ключами являются `correlation_id`, а значениями — `payload`'ы объединенных сообщений. Но так как `correlation_id` уникален, проще смотреть на `msg.parts`.

    `msg.parts` будет массивом из двух наших сообщений: команды и подтверждения.

    Мы можем проверить это в ноде `Function` после `Join`:

        if (msg.parts && msg.parts.length === 2) {

    // Успех! Ack получен.

    const command = msg.parts[0]; // Оригинальная команда

    const ack = msg.parts[1]; // Пришедшее подтверждение

    node.status({fill:"green", shape:"dot", text:"SUCCESS: " + command.correlation_id});

    return;

    } else {

    // Таймаут! Пришла только одна часть - команда.

    const command = msg.parts[0];

    node.status({fill:"red", shape:"dot", text:"TIMEOUT: " + command.correlation_id});

    // Здесь запускается логика Retry, как мы разбирали ранее.

    }

    `msg.parts` будет содержать только один элемент — исходное сообщение-команду. Это и есть наш сигнал для запуска логики повтора (retry) или оповещения об ошибке.

    | Метод | Преимущества | Недостатки |

    |-------------------|---------------------------------------------------------|----------------------------------------------------------------|

    | `Trigger` | Прост для понимания в линейных сценариях "одна команда за раз". | Плохо масштабируется, сложно управлять несколькими параллельными командами. |

    | `Join` | Нативно поддерживает корреляцию по ключу. Отлично работает с десятками параллельных асинхронных команд. Встроенный таймаут. | Требует более сложной обработки `msg` на выходе (`msg.parts`). |

    Для production-ready систем на платформе HI использование ноды `Join` является предпочтительным и стандартным методом.

    ---

    Расширенный контракт: передача ошибок и контекста

    Простой ответ "команда выполнена" (`status: 'ok'`) или "не выполнена" — это уже хорошо, но часто этой информации недостаточно. Что если команда не может быть выполнена по уважительной, заранее известной причине? Например, сработала программная блокировка (interlock), и реле не должно включаться.

    В таких случаях система не должна просто молчать или возвращать общую ошибку. Она должна вернуть осмысленное сообщение, объясняющее причину отказа. Для этого мы расширяем наш контракт, превращая `msg.payload` из простого флага в информативный объект.

    От примитива к объекту

    Обновленный контракт для Ack-сообщения может выглядеть так:

    {
    

    "status": "ok" | "error" | "rejected",

    "state": "...",

    "code": "...",

    "message": "..."

    }

    Это позволяет логике верхнего уровня принимать более тонкие решения, а пользовательскому интерфейсу — отображать осмысленные уведомления.

    Пример 1: Некорректная команда для диммера

    Предположим, на диммер, который ожидает яркость от 0 до 100, приходит команда установить `-10`. Поток-исполнитель для диммера должен не просто проигнорировать команду, а вежливо ответить.

        {
    

    "payload": { "command": "SET_BRIGHTNESS", "value": -10 },

    "correlation_id": 1678887000555

    }

        {
    

    "payload": {

    "status": "rejected",

    "state": 50,

    "code": "INVALID_VALUE",

    "message": "Brightness value (-10) is out of range [0, 100]"

    },

    "correlation_id": 1678887000555

    }

    Инициатор команды, получив такой ответ, понимает, что повторять (retry) эту команду бессмысленно, так как она некорректна. Он должен записать ошибку в лог. `state: 50` сообщает, что диммер остался в своем прежнем состоянии.

    Пример 2: Сработала блокировка (Interlock)

    Как мы обсуждали в модуле про реверсивные двигатели (`COURSE-05-M02`), нельзя одновременно включать реле "Вперед" и "Назад". Если команда на включение одного реле приходит, пока другое еще активно, команда должна быть отклонена.

        {
    

    "payload": { "command": "ON" },

    "correlation_id": 1678887100888

    }

        {
    

    "payload": {

    "status": "rejected",

    "state": false,

    "code": "INTERLOCK_ACTIVE",

    "message": "Cannot turn on: reverse relay is currently active."

    },

    "correlation_id": 1678887100888

    }

    Такой ответ позволяет системе немедленно понять причину отказа без необходимости ждать таймаута. Это делает систему более отзывчивой и информативной. Стандартизация кодов ошибок (`INVALID_VALUE`, `INTERLOCK_ACTIVE`) позволяет легко строить на их основе дашборды, системы оповещений и сложную логику верхнего уровня.

    ---

    Итоги и лучшие практики

    В этом уроке мы заложили один из самых важных архитектурных принципов построения надежных систем на платформе HI — Контракт Сообщения. Правильная реализация этого паттерна превращает хаотичный набор потоков в предсказуемую, прозрачную и легко масштабируемую систему.

    Давайте закрепим ключевые моменты:

    Что дальше?

    Освоив принципы создания надежного контракта сообщений, в следующем уроке мы соберем все полученные знания воедино. Мы построим комплексный, готовый к использованию на реальном объекте субпоток для управления исполнительным устройством (например, приводом клапана), который будет инкапсулировать в себе всю логику: отправку команды по контракту, обработку подтверждения через `Join`, реализацию нескольких попыток (retry) и корректное логирование всех шагов.