ГлавнаяАкадемияNode-RED: установка, flows, msg/JSON, отладка → Добавление метаданных в `msg` (трассировка, timestamps)

Добавление метаданных в `msg` (трассировка, timestamps)

Урок 4 · Node-RED: установка, flows, msg/JSON, отладка · 30 мин · theory

Введение в метаданные: Зачем расширять объект msg?

В предыдущих уроках мы установили, что объект `msg` является основой всей коммуникации в Node-RED. Мы подробно рассмотрели его ключевые свойства: `msg.payload` как контейнер для основных данных и `msg.topic` как инструмент для маршрутизации. Однако для построения по-настоящему надежных и легко обслуживаемых систем автоматизации этого недостаточно.

Представьте, что ваша система получает значение температуры: `22.5`. Без дополнительной информации это просто число. Откуда оно пришло? Когда было измерено? Является ли это значение достоверным, или оно получено во время сбоя связи с датчиком? Ответы на эти вопросы лежат за пределами `payload` и `topic`.

Здесь на сцену выходят метаданные — это "данные о данных", дополнительная служебная информация, которая обогащает исходное сообщение, придавая ему контекст, историю и атрибуты качества.

> ℹ️ Информация: Метаданные превращают простое событие от датчика в подробный «паспорт» этого события, который можно отследить на всем его жизненном пути в системе.

Ограничения стандартных свойств

Хотя `msg.payload` и `msg.topic` являются мощными инструментами, их использование имеет свои пределы:

Назначение и роль метаданных

Добавляя в объект `msg` новые свойства верхнего уровня (например, `msg.timestamp`, `msg.source`, `msg.quality`), мы решаем несколько критически важных задач:

📋 Ключевые понятия:

В этом уроке мы научимся правильно создавать, структурировать и использовать метаданные, превращая наши потоки из простых цепочек обработки в интеллектуальные и наблюдаемые системы.

---

Работа со временем: Добавление и использование msg.timestamp

Одним из самых фундаментальных и полезных элементов метаданных является временная метка (timestamp). Она фиксирует точный момент, когда произошло событие, что открывает широкие возможности для анализа и контроля.

Добавление timestamp

Самый простой и надежный способ добавить временную метку — использовать узел `Function` сразу после узла, генерирующего событие (например, `mqtt in`, `modbus-read` или `inject`).

Основной инструмент для этого в JavaScript — метод `Date.now()`. Он возвращает количество миллисекунд, прошедших с начала эпохи Unix (1 января 1970 года). Это стандартный, эффективный и удобный для вычислений формат.

Пример кода в узле `Function`:
// Добавляем новое свойство 'timestamp' к объекту msg

msg.timestamp = Date.now();

// Сообщение продолжает свой путь с новой информацией

return msg;

После прохождения через этот узел объект `msg` будет выглядеть примерно так:

{

"_msgid": "a1b2c3d4.e5f6g7",

"payload": 25.5,

"topic": "sensors/room1/temperature",

"timestamp": 1678886400123

}

Форматы времени: Unix vs ISO 8601

Хотя Unix timestamp идеален для машинной обработки, он неудобен для чтения человеком. Для отладки и вывода в логах часто используется стандарт ISO 8601. Преобразовать одно в другое очень просто.

| Формат | Пример | Преимущества | Недостатки |

| ----------------- | --------------------------- | --------------------------------------------- | ------------------------------------------- |

| Unix (ms) | `1678886400123` | Компактность, легкость вычислений (разница) | Нечитаем для человека без конвертации |

| ISO 8601 | `"2023-03-15T13:20:00.123Z"` | Человекочитаемость, однозначность (включая UTC) | Требует парсинга для вычислений, больше места |

> 💡 Подсказка: Лучшая практика — хранить основную временную метку (`msg.timestamp`) в формате Unix, а при необходимости логирования или отображения добавлять отдельное свойство с форматированной строкой.

Пример кода для добавления обоих форматов:
// Основной timestamp для вычислений

msg.timestamp = Date.now();

// Дополнительное поле для удобства отладки

msg.iso_time = new Date(msg.timestamp).toISOString();

return msg;

Практическое применение: расчет задержки (Latency)

Имея временные метки, мы можем измерять, сколько времени сообщение шло от одной точки потока до другой. Это критически важно для оценки производительности и поиска узких мест.

Сценарий: Измерить время, затраченное на обработку данных от Modbus-устройства, включая валидацию и отправку в MQTT. Поток:

`[Modbus Read]` -> `[Function "Start Timer"]` -> `[Function "Validation"]` -> `[Function "Calculate Latency"]` -> `[MQTT Out]`

  • Узел `Function "Start Timer"`:
  •     msg.timestamp = Date.now();

    return msg;

  • Узел `Function "Calculate Latency"`:
  •     // Рассчитываем задержку в миллисекундах

    const latency = Date.now() - msg.timestamp;

    // Выводим информацию в лог для отладки

    node.log("Processing latency: " + latency + " ms");

    // Можно добавить это значение в метаданные для дальнейшего мониторинга

    msg.processing_time_ms = latency;

    return msg;

    Такой подход позволяет выявлять аномально долгие операции. Если вы видите, что `latency` внезапно выросло со 10 мс до 500 мс, это четкий сигнал, что один из узлов между метками времени стал работать медленнее и требует вашего внимания.

    ---

    Трассировка сообщений: Идентификация источника и пути

    Когда в системе работают десятки или сотни устройств, критически важно понимать не только что произошло, но и откуда пришло сообщение. Для этого служит механизм трассировки (tracing), который начинается с правильной идентификации источника.

    Единый источник правды и `msg.source`

    Для каждого сообщения в системе должен быть определен единый источник правды (Source of Truth) — первоначальный узел или устройство, сгенерировавшее событие. Информация об источнике должна добавляться как можно раньше и иметь стандартизированную структуру. Для этой цели идеально подходит свойство `msg.source`.

    > 💡 Подсказка: Используйте стандартизированные идентификаторы для `msg.source`, чтобы избежать путаницы. Например, `[DEVICE_TYPE]-[LOCATION]-[ID]` (e.g., `MODBUS-ELECTRO-METER-01`). Это упрощает фильтрацию и поиск.

    Рекомендуется использовать для `msg.source` не простую строку, а структурированный объект, который несет больше полезной информации.

    Пример структуры `msg.source`:
    "source": {
    

    "type": "Sensor.1-Wire",

    "id": "28-01234567abcd",

    "location": "Office.Room101.Ceiling",

    "nodeId": "f4b3c2a1.123456"

    }

    Такая структура позволяет легко фильтровать сообщения в логах, например, "показать все события от всех Modbus-датчиков в офисе".

    Реализация «хлебных крошек» с `msg.trace`

    Иногда недостаточно знать только первоначальный источник. Для сложных потоков, где сообщение проходит через множество узлов, субпотоков и даже других контроллеров (через MQTT), полезно отслеживать весь его путь. Этот механизм, похожий на "хлебные крошки", можно реализовать с помощью свойства `msg.trace`, представляющего собой массив.

    Концепция:
  • При создании сообщения (или на первой стадии обработки) создается пустой массив `msg.trace = []`.
  • Каждый ключевой узел, через который проходит сообщение, добавляет в этот массив объект с информацией о себе.
  • Пример элемента массива `msg.trace`:
    {
    

    "nodeId": "a1b2c3d4.e5f6g7",

    "nodeName": "Validate Temperature",

    "flowId": "b2c3d4e5.f6g7h8",

    "ts": 1678886400500

    }

    Таким образом, к концу потока `msg.trace` будет содержать полную историю путешествия сообщения по системе. Это бесценный инструмент для отладки сложных взаимодействий и анализа "гонок состояний" (race conditions).

    Масштабирование и управление инфраструктурой

    Стандартизация `msg.source` и использование `msg.trace` приносят огромную пользу в крупных проектах:

    ---

    Практика: Создание потока с полной трассировкой

    Давайте объединим полученные знания и создадим поток, который не просто обрабатывает данные, но и обогащает их полной информацией для наблюдаемости (observability).

    Задача: Эмулировать получение данных от датчика, добавить метаданные об источнике и времени, провести через несколько стадий обработки и собрать полную трассировку пути.

    Сборка потока

    Создайте на холсте Node-RED следующую цепочку узлов:

    `[Inject]` -> `[Change "Set Source"]` -> `[Function "Start Trace"]` -> `[Function "Process Data"]` -> `[Debug]`

  • `Inject` node: Настройте его на отправку `msg.payload` (число), например, `22.5`. Можно установить повтор каждые 10 секунд для наглядности.
  • `Change` node (Имя: "Set Source"): Этот узел добавит статическую информацию об источнике. Это эффективно, когда все сообщения в данном потоке приходят от одного и того же устройства.
  • `Function` node (Имя: "Start Trace"): Здесь мы добавим динамические метаданные — временную метку и первую запись в массив трассировки.
  • `Function` node (Имя: "Process Data"): Этот узел эмулирует некую бизнес-логику и добавляет свою метку в трассировку.
  • `Debug` node: Настроен на вывод всего объекта `msg`.
  • Конфигурация узлов

    1. Узел `Change`: "Set Source"

    Настройте правила для установки свойств `msg.source`:

    Этот узел создаст структурированный объект `msg.source` без написания кода.

    2. Узел `Function`: "Start Trace"

    Этот узел инициализирует процесс трассировки.

    Код:
    // 1. Добавляем временную метку создания события
    

    msg.timestamp = Date.now();

    // Для удобства добавим и ISO формат

    msg.iso_time = new Date(msg.timestamp).toISOString();

    // 2. Инициализируем массив для трассировки

    msg.trace = [];

    // 3. Добавляем первую запись - "хлебную крошку"

    msg.trace.push({

    nodeId: node.id,

    nodeName: node.name,

    ts: msg.timestamp

    });

    return msg;

    Этот код установит начальную точку отсчета времени и создаст первую запись в "паспорте" нашего сообщения.

    3. Узел `Function`: "Process Data"

    Этот узел имитирует полезную работу и оставляет свой след в истории.

    Код:
    // --- Имитация полезной работы ---
    

    // Предположим, мы конвертируем Цельсий в Фаренгейт

    let celsius = msg.payload;

    let fahrenheit = (celsius * 9/5) + 32;

    // Обновляем payload, но сохраняем исходное значение в метаданных

    msg.original_payload = celsius;

    msg.payload = fahrenheit.toFixed(2);

    // ------------------------------------

    // --- Добавление записи в трассировку ---

    if (!msg.trace) {

    // На случай, если предыдущий узел был пропущен

    msg.trace = [];

    }

    msg.trace.push({

    nodeId: node.id,

    nodeName: node.name,

    ts: Date.now()

    });

    return msg;

    Анализ результата в панели отладки

    После запуска потока вы увидите в панели "Debug" сложный, но очень информативный объект `msg`.

    Пример итогового объекта `msg`:
    {
    

    "_msgid": "cdef1234.567890",

    "payload": "72.50",

    "original_payload": 22.5,

    "source": {

    "type": "Sensor.Virtual",

    "id": "VIRT-TEMP-01",

    "location": "Lab.TestBench"

    },

    "timestamp": 1678887000000,

    "iso_time": "2023-03-15T13:30:00.000Z",

    "trace": [

    {

    "nodeId": "a1b2c3d4.e5f6g7",

    "nodeName": "Start Trace",

    "ts": 1678887000000

    },

    {

    "nodeId": "b2c3d4e5.f6g7h8",

    "nodeName": "Process Data",

    "ts": 1678887000002

    }

    ]

    }

    Что мы видим:

    Теперь у вас есть полный "паспорт" события, который делает вашу систему прозрачной и легко управляемой.

    ---

    Контроль целостности: Флаги качества и счетчики последовательности

    Обогащение сообщений метаданными особенно важно при работе с ненадежными каналами связи, такими как беспроводные сети (LoRaWAN, Zigbee) или промышленные шины с высоким уровнем помех. В таких случаях мы не можем слепо доверять каждому полученному значению.

    > ⚠️ Внимание: Избыточное количество или слишком сложная структура метаданных могут увеличить размер объекта `msg` и незначительно повлиять на производительность контроллера HI, особенно в высокочастотных потоках. Добавляйте только ту информацию, которая действительно необходима для отладки и мониторинга.

    Флаги качества данных (`msg.quality`)

    Проблема: Узел `modbus-read` не смог опросить устройство из-за сбоя на линии RS-485. Что он должен отправить дальше по потоку? Ничего? Старое значение? Ошибку?

    Правильный подход — отправить сообщение дальше, но с явным указанием, что данные не достоверны. Для этого вводится свойство `msg.quality`. Это может быть строка или числовой код, определяющий состояние данных.

    Пример стандартных флагов качества:

    | Флаг (`msg.quality`) | Значение | Описание |

    | ----------------------- | -------- | --------------------------------------------------------------------- |

    | `GOOD` | `0x00` | Данные получены успешно и являются достоверными. |

    | `UNCERTAIN` | `0x01` | Данные получены, но их достоверность под сомнением (например, старые). |

    | `BAD_COMM_FAILURE` | `0x02` | Ошибка связи. Устройство не ответило на запрос. |

    | `BAD_DEVICE_FAILURE` | `0x03` | Устройство ответило, но сообщило о внутренней ошибке. |

    | `BAD_OUT_OF_RANGE` | `0x04` | Полученное значение выходит за допустимые пределы (например, T > 150°C). |

    Как это работает:

    Узел, взаимодействующий с оборудованием (например, обертка над `modbus-read` в виде субпотока), должен анализировать результат операции.

    Последующие узлы в потоке (например, узел управления климатом) обязаны сначала проверить `msg.quality`. Если качество плохое, они могут проигнорировать это сообщение или перейти в безопасный режим работы.

    Счетчики последовательности (`msg.sequence`)

    Проблема: Беспроводной датчик LoRaWAN отправляет данные каждые 5 минут. Из-за помех одно из сообщений не дошло до контроллера. Система этого не заметила и продолжает работать со старыми данными, считая их актуальными.

    Решение: Использовать счетчик последовательности (sequence number). Датчик (или узел, принимающий от него данные) должен с каждым новым сообщением увеличивать счетчик на единицу и добавлять его в `msg.sequence`.

    Как это работает:

    Принимающий узел должен хранить номер последовательности предыдущего сообщения. Для этого идеально подходит контекст потока (`flow context`), так как он сохраняет состояние между выполнениями.

    Пример в узле `Function`:
    // Предполагаем, что msg.payload содержит объект от датчика,
    

    // в котором есть поле sequence_number

    const incomingSequence = msg.payload.sequence_number;

    // Получаем последнее сохраненное значение из контекста потока

    const lastSequence = flow.get("last_sequence_from_lora_sensor_1") || 0;

    // Проверяем, не пропущено ли сообщение

    if (incomingSequence > lastSequence + 1) {

    const missed_count = incomingSequence - (lastSequence + 1);

    node.warn(`Пропущено ${missed_count} сообщений от датчика LORA-1!`);

    // Здесь можно сгенерировать тревогу

    }

    // Проверяем на дубликат или перезагрузку датчика

    if (incomingSequence <= lastSequence && lastSequence !== 0) {

    node.warn(`Получен дубликат или старое сообщение (seq: ${incomingSequence})`);

    return null; // Прекращаем обработку дубликата

    }

    // Если все в порядке, обновляем последнее значение в контексте

    flow.set("last_sequence_from_lora_sensor_1", incomingSequence);

    // Добавляем sequence в метаданные верхнего уровня для стандартизации

    msg.sequence = incomingSequence;

    return msg;

    Этот механизм позволяет надежно обнаруживать потерю данных, что является ключевым требованием для построения отказоустойчивых систем.

    ---

    Итоги и лучшие практики по работе с метаданными

    Мы рассмотрели, как превратить простой объект `msg` в мощный инструмент для отладки, мониторинга и обеспечения надежности. Грамотная работа с метаданными — это признак профессионального подхода к проектированию систем на Node-RED.

    Давайте подведем итог. Вот ключевые поля метаданных, которые стоит использовать в ваших проектах:

    Контракт метаданных

    Так же, как мы создавали "контракт сообщения" для `msg.payload` в уроке [COURSE-06-M02-L04], для крупного проекта необходимо создать «контракт метаданных». Это документ или раздел в проектной wiki, который четко определяет:

  • Какие поля метаданных используются в проекте.
  • Структуру и формат каждого поля (например, формат `msg.source`).
  • Набор допустимых значений для таких полей, как `msg.quality`.
  • Единообразие — ключ к созданию поддерживаемой системы. Если в одном потоке timestamp называется `msg.ts`, в другом `msg.timestamp`, а в третьем `msg.time`, система быстро превратится в хаос.

    Баланс информативности и производительности

    Метаданные — это не накладные расходы, а инвестиция в надежность, наблюдаемость и простоту обслуживания вашей системы автоматизации. Затраты на добавление нескольких полей в объект `msg` на современном контроллере HI (4 ядра / 4 ГБ RAM) практически незаметны, а выгоды от возможности быстро найти и устранить проблему — огромны.

    Тем не менее, всегда ищите разумный баланс. Не стоит добавлять в `msg.trace` каждый узел в потоке — только ключевые точки, где меняется логика или происходит взаимодействие с внешними системами. Добавляйте только ту информацию, которая вам действительно понадобится для анализа.

    Что дальше?

    В следующем уроке мы перейдем к одной из самых мощных концепций Node-RED, которая позволяет управлять состоянием систем и избегать хаоса в логике — к управлению контекстом (`flow` и `global`). Мы научимся хранить переменные, состояния и настройки, которые переживают выполнение одного сообщения, и обеспечивать их сохранность даже после перезагрузки контроллера.