Перейти к основному содержимому

Реактивная коммуникация

Разработчику Архитектору Инженеру

Основы: Реактивные системы и потоки данных.


Реактивная коммуникация

Что такое реактивная коммуникация?

Реактивные взаимодействия фокусируются на обмене событиями в режиме реального времени. Системы реагируют на события по мере их возникновения, обеспечивая непрерывный поток данных.

Транспорт:

  • WebSockets
  • SSE
  • Kafka Streams
  • MQTT

Play ITЗагрузка интерактивного демо…


WebSockets

WebSocket чаще всего выбирают, когда нужен дуплексный канал между клиентом и сервером: после короткого HTTP‑рукопожатия по тому же TCP обе стороны обмениваются фреймами без отдельного HTTP‑запроса на каждое сообщение. Нормативное описание протокола — RFC 6455.


Архитектурные особенности

Двунаправленная связь

  • После установления соединения данные передаются в обоих направлениях без необходимости повторного рукопожатия.
  • Сервер может инициировать отправку сообщений клиенту без предварительного запроса (push-модель).

Единое соединение

  • Сохраняется постоянное соединение на протяжении всего сеанса работы.
  • Устраняется оверхед повторного установления соединения для каждого сообщения (в отличие от HTTP).

Низкая задержка

  • Минимальные накладные расходы на передачу: заголовок фрейма составляет 2–14 байт.
  • Отсутствие необходимости в заголовках HTTP для каждого сообщения.

Работа через прокси и брандмауэры

  • Использует стандартные порты 80 (ws) и 443 (wss), совместим с существующей веб-инфраструктурой.
  • Начинается с HTTP-рукопожатия, что позволяет проходить через большинство прокси.

Протокол и жизненный цикл соединения

1. Рукопожатие (Handshake)

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Sec-WebSocket-Protocol: chat, superchat

Разбор:

  • Строка HTTP-запроса задаёт метод и путь ресурса; сервер сопоставляет её с маршрутом контроллера или шлюза.
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

  • Клиент отправляет обычный HTTP-запрос с заголовками Upgrade: websocket.

  • Сервер отвечает кодом 101 (Switching Protocols), подтверждая переход на протокол WebSocket.

  • Заголовок Sec-WebSocket-Key — случайное значение от клиента; сервер возвращает Sec-WebSocket-Accept, вычислив хеш от ключа и "магической" константы из RFC 6455. Так проверяется, что ответил именно WebSocket‑сервер, а не случайный HTTP‑кэш или прокси, который не понимает апгрейд.

2. Передача данных

  • Данные передаются в виде фреймов (frames) с минимальным заголовком.
  • Каждый фрейм содержит:
    • Флаги (FIN, RSV1-3, opcode)
    • Длина полезной нагрузки (7, 7+16, или 7+64 бита)
    • Маска (4 байта, обязательна для клиентских фреймов)
    • Полезная нагрузка

3. Закрытие соединения

0x88 0x00 // FIN, opcode=Close, length=0

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

  • Грациозное закрытие через контрольный фрейм CLOSE.

  • Код состояния (1000 — нормальное закрытие, 1001 — уход, 4000+ — пользовательские коды).


Типы фреймов

OpcodeНазначениеОписание
0x0ContinuationПродолжение фрагментированного сообщения
0x1TextТекстовые данные в UTF-8
0x2BinaryБинарные данные
0x8CloseЗапрос на закрытие соединения
0x9PingПроверка активности соединения
0xAPongОтвет на Ping

Механизмы надёжности

Контроль целостности

  • Каждое сообщение может быть фрагментировано на несколько фреймов с восстановлением на стороне получателя.
  • Проверка порядка фреймов и сборка полного сообщения.

Проверка активности (Heartbeat)

  • Ping/Pong фреймы для обнаружения разрыва соединения.
  • Типичный интервал: 30–60 секунд.

Переподключение

  • Автоматическое восстановление соединения при обнаружении разрыва.
  • Экспоненциальная задержка между попытками для предотвращения шторма запросов.

Аутентификация

  • Токены передаются в начальном HTTP-рукопожатии (Authorization header).
  • Поддержка сессий через cookies (если разрешено CORS).

Ментальная модель — что происходит после 101

До ответа 101 Switching Protocols всё ведёт себя как обычный HTTP — работают маршрутизация, TLS, cookies, заголовки авторизации, правила на API‑шлюзе и WAF. После апгрейда поверх того же TCP открывается другой формат — последовательность бинарных WebSocket‑фреймов с коротким служебным заголовком. У каждого сообщения больше нет своего HTTP‑запроса с Host и путём; контекст пользователя нужно один раз установить при handshake и дальше сопоставлять с логическим каналом на сервере.

Текст и бинарь. В прикладном коде чаще всего шлют UTF‑8 JSON в текстовых фреймах (opcode text). Бинарные фреймы уместны для компактных форматов (Protobuf, сырые байты медиа), но тогда клиент и сервер должны явно договориться о формате.

Подпротокол. Заголовок Sec-WebSocket-Protocol позволяет выбрать "версию прикладного протокола" поверх WebSocket (например, graphql-transport-ws или собственное имя). Это удобно, когда на одном пути /ws может жить несколько типов клиентов.

Сжатие. Расширение permessage-deflate (если согласовано в handshake) сжимает полезную нагрузку на уровне фреймов. На высоких частотах сообщений оно снижает трафик, но добавляет CPU; включают осознанно.


Инфраструктура, балансировка, прокси

  • wss (WebSocket поверх TLS) в продакшене — обычное требование безопасности; ws без шифрования оставляют для локальной разработки.
  • Балансировщики и reverse‑proxy (nginx, Envoy, облачные ALB) должны пропускать Upgrade и не обрывать долгое соединение из‑за короткого proxy_read_timeout / idle timeout.
  • Если за балансировщиком несколько инстансов приложения и нет общей шины сообщений (Redis Pub/Sub, брокер), два клиента в одной "комнате" чата могут попасть на разные машины и не увидеть друг друга. Решения — sticky sessions по cookie/идентификатору, общий pub/sub, или выделенный сервис real‑time.

Надёжность на уровне приложения

Протокол WebSocket гарантирует доставку байт в порядке в рамках соединения, но не гарантирует бизнес‑семантику "сообщение обработано". На практике добавляют:

  • Идемпотентные сообщения с messageId и дедупликацией на приёме.
  • Очередь на сервере с ограничением длины: при перегрузе отвечают ошибкой или дросселируют, иначе память съедят медленные клиенты (у WebSocket нет встроенного backpressure на уровне API браузера).
  • Прикладной heartbeat (JSON ping/pong) там, где нет доступа к низкоуровневым Ping‑фреймам; в браузерном WebSocket отправка служебных Ping‑фреймов недоступна, их обычно генерирует сама реализация стека или прокси.
  • Экспоненциальный backoff при переподключении и восстановление состояния (последний seq, версия снапшота).

WebSocket, SSE, long polling и webhook — ориентир по выбору

НужноРазумный выбор
Дуплекс: клиент и сервер шлют события в любой моментWebSocket
Только поток от сервера к браузеруSSE или chunked HTTP
Редкие обновления, совместимость "везде"Long polling или короткий polling
Внешний сервис уведомляет ваш backend о событииWebhook (POST на зарегистрированный URL)
Редкие изменения, простой контроль расписания опросаPolling

Сводная таблица всех четырёх паттернов с sequence-диаграммами — Polling, Long Polling, SSE и Webhook. Push/pull и надёжность webhook — в Push, Pull, Webhooks.

Подробнее про SSE — в следующем разделе этой же статьи.


Типичные сценарии применения

Чаты и мессенджеры

  • Мгновенная доставка сообщений между пользователями.
  • Индикация набора текста, статус онлайн/оффлайн.

Коллаборативные приложения

  • Совместное редактирование документов в реальном времени.
  • Синхронизация состояния между несколькими клиентами.

Финансовые приложения

  • Потоковые котировки на бирже.
  • Уведомления о транзакциях.

Игры

  • Синхронизация игрового состояния между игроками.
  • Мультиплеерные взаимодействия с минимальной задержкой.

Мониторинг и телеметрия

  • Поток метрик серверов и приложений.
  • Обновление дашбордов в реальном времени.

Уведомления

  • Push-уведомления в веб-приложениях.
  • Событийные оповещения без опроса.

JavaScript (браузерный клиент)

Код ITЗагрузка примера кода…

Разбор:

  • Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
  • После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.

Node.js (сервер на ws)

Код ITЗагрузка примера кода…

Разбор:

  • Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
  • После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.

C# (ASP.NET Core)

Код ITЗагрузка примера кода…

Разбор:

  • AddHostedService<NotificationConsumer>() регистрирует слушателя очереди в DI; MapControllers() подключает REST-эндпоинты.
  • Пространства имён группируют модели и сервисы; async/await не блокируют поток при HTTP-вызове наружу.

Python (FastAPI + WebSockets)

Код ITЗагрузка примера кода…

Разбор:

  • FastAPI() создаёт приложение; декораторы @app.post / @app.get регистрируют маршруты и схемы ответа response_model.
  • Depends(get_db) внедряет сессию БД; BackgroundTasks откладывает публикацию в RabbitMQ, чтобы HTTP-ответ ушёл клиенту быстрее.
  • pika.BlockingConnection и basic_publish отправляют JSON-событие в очередь order_events; delivery_mode=2 помечает сообщение persistent.
  • Перед вставкой проверяется уникальность order_number; HTTPException(400) и (404) возвращают понятные коды клиенту.

Java (Spring WebSocket)

Код ITЗагрузка примера кода…

Разбор:

  • Пакеты model, repository, service разделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.

Конфигурация WebSocket в Spring:

Код ITЗагрузка примера кода…

Разбор:

  • Пакеты model, repository, service разделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.

SSE

Server-Sent Events (SSE) предполагает, что сервер отправляет обновления клиенту через однонаправленный канал, это протокол для однонаправленной передачи данных от сервера к клиенту через HTTP. Сервер отправляет данные клиенту, но обратная связь невозможна. Если соединение разрывается, клиент автоматически переподключается. Пример - трансляция обновлений и уведомления в реальном времени.

Server-Sent Events — стандарт веб-API для односторонней потоковой передачи данных от сервера к клиенту через обычное HTTP-соединение. Клиент устанавливает соединение и получает события в режиме реального времени без необходимости повторных запросов.


Архитектурные особенности

Односторонняя связь

  • Данные передаются только от сервера к клиенту.
  • Клиент не может отправлять сообщения через установленное соединение (только через отдельные HTTP-запросы).

Постоянное соединение

  • После установки соединение остаётся открытым до явного закрытия или таймаута.
  • Сервер отправляет данные по мере их появления, без ожидания запроса от клиента.

Текстовый формат

  • Передача данных осуществляется в виде текстовых событий в кодировке UTF-8.
  • Поддержка автоматического парсинга браузером через EventSource API.

Автоматическое восстановление

  • При разрыве соединения браузер автоматически пытается переподключиться.
  • Интервал повторных попыток настраивается сервером через поле retry.

Совместимость с инфраструктурой

  • Работает поверх стандартного HTTP/HTTPS без необходимости специальных протоколов.
  • Проходит через прокси, балансировщики и брандмауэры без дополнительной настройки.

Протокол и формат сообщений

HTTP-заголовки ответа

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

  • Content-Type: text/event-stream — обязательный заголовок, идентифицирующий SSE-поток.

  • Cache-Control: no-cache — предотвращает кэширование промежуточными прокси.

  • Connection: keep-alive — поддерживает постоянное соединение.

Формат события

event: message
data: {"type":"notification","text":"Новое сообщение"}
id: 12345
retry: 30000

data: Простое сообщение без типа

: Это комментарий (игнорируется клиентом)

Разбор:

  • Формат SSE — поле event — имя события, data — полезная нагрузка, id — для возобновления, retry — пауза переподключения в мс.

Структура поля события:

  • event — тип события (по умолчанию message).
  • data — полезная нагрузка (может занимать несколько строк).
  • id — уникальный идентификатор события для восстановления после переподключения.
  • retry — интервал переподключения в миллисекундах (по умолчанию 3000 мс).

Многострочные данные

data: Первая строка
data: Вторая строка
data: Третья строка

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

После парсинга объединяются в одну строку с символами новой строки.

Разделитель событий

  • Пустая строка (\n\n) завершает событие.
  • Сервер должен отправлять \n\n после каждого события.

Жизненный цикл соединения

1. Установление соединения

const eventSource = new EventSource('https://example.com/events');

Разбор:

  • Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
  • После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.

2. Получение событий

eventSource.onmessage = (event) => {
console.log('Полезная нагрузка:', event.data);
};

eventSource.addEventListener('notification', (event) => {
console.log('Тип события:', event.type);
console.log('Полезная нагрузка:', event.data);
console.log('ID события:', event.lastEventId);
});

Разбор:

  • Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
  • После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.

3. Автоматическое переподключение

  • При разрыве соединения браузер повторяет запрос с заголовком Last-Event-ID.
  • Сервер может возобновить поток с последнего полученного события.
GET /events HTTP/1.1
Host: example.com
Last-Event-ID: 12345

Разбор:

  • Строка HTTP-запроса задаёт метод и путь ресурса; сервер сопоставляет её с маршрутом контроллера или шлюза.

4. Закрытие соединения

eventSource.close();

Разбор:

  • Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
  • После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.

Механизмы надёжности

Идентификаторы событий

  • Поле id позволяет клиенту отслеживать последнее полученное событие.
  • При переподключении браузер отправляет Last-Event-ID в заголовке запроса.
  • Сервер может возобновить поток с указанного события, предотвращая потерю данных.

Настройка интервала повтора

retry: 10000

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

  • Значение в миллисекундах.

  • Применяется ко всем последующим переподключениям до следующего изменения.

Обработка ошибок

eventSource.onerror = (error) => {
console.error('Ошибка соединения:', error);

// Проверка состояния
if (eventSource.readyState === EventSource.CLOSED) {
console.log('Соединение закрыто');
} else if (eventSource.readyState === EventSource.CONNECTING) {
console.log('Попытка переподключения...');
}
};

Разбор:

  • Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
  • После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.

Состояния соединения

  • EventSource.CONNECTING (0) — соединение отсутствует или устанавливается.
  • EventSource.OPEN (1) — соединение открыто и готово к приёму событий.
  • EventSource.CLOSED (2) — соединение закрыто.

JavaScript (клиент)

Код ITЗагрузка примера кода…

Разбор:

  • Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
  • После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.

C# (ASP.NET Core)

Код ITЗагрузка примера кода…

Разбор:

  • [ApiController] и [Route] задают префикс URL; [FromBody] биндит JSON тела в OrderEvent для прямых уведомлений.
  • Пространства имён группируют модели и сервисы; async/await не блокируют поток при HTTP-вызове наружу.

Python (FastAPI)

Код ITЗагрузка примера кода…

Разбор:

  • FastAPI() создаёт приложение; декораторы @app.post / @app.get регистрируют маршруты и схемы ответа response_model.
  • Depends(get_db) внедряет сессию БД; BackgroundTasks откладывает публикацию в RabbitMQ, чтобы HTTP-ответ ушёл клиенту быстрее.
  • pika.BlockingConnection и basic_publish отправляют JSON-событие в очередь order_events; delivery_mode=2 помечает сообщение persistent.
  • Перед вставкой проверяется уникальность order_number; HTTPException(400) и (404) возвращают понятные коды клиенту.

Java (Spring WebFlux)

Код ITЗагрузка примера кода…

Разбор:

  • Пакеты model, repository, service разделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.

Spring MVC альтернатива (с использованием SseEmitter):

Код ITЗагрузка примера кода…

Разбор:

  • Пакеты model, repository, service разделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.

Node.js (Express)

Код ITЗагрузка примера кода…

Разбор:

  • Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
  • После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.

Event Streaming

Основы Event Streaming

Event Streaming — это потоковые платформы, такие как Apache Kafka, позволяют системам подписываться на события и реагировать на них.

События сохраняются в упорядоченном, неизменяемом журнале и могут быть потреблены множеством независимых приложений в реальном времени или с отсрочкой.

Событие (Event)

  • Атомарная запись о произошедшем факте в системе.
  • Содержит — тип события, временну́ю метку, идентификаторы, полезную нагрузку.
  • Неизменяемо после создания.

Топик (Topic)

  • Логический канал для категоризации событий.
  • Разделяет события по доменам или типам (например, orders.created, users.registered).
  • Поддерживает публикацию от множества производителей и потребление множеством подписчиков.

Партиция (Partition)

  • Горизонтальное разделение топика для масштабирования и параллелизма.
  • События внутри партиции упорядочены.
  • Ключ события определяет принадлежность к партиции (хеширование).

Смещение (Offset)

  • Уникальный идентификатор события внутри партиции.
  • Позволяет потребителю отслеживать позицию в потоке.
  • Гарантирует доставку "хотя бы один раз" при сохранении смещения.

Группа потребителей (Consumer Group)

  • Набор потребителей, совместно обрабатывающих топик.
  • Каждая партиция назначается одному потребителю в группе.
  • Обеспечивает масштабирование обработки и отказоустойчивость.

Retention Policy

  • Политика хранения событий — по времени (например, 7 дней) или по объёму (например, 100 ГБ).
  • Позволяет повторную обработку событий в течение заданного окна.

Потоковые операторы — это инструменты для обработки потоков данных в реальном времени. Они позволяют выполнять преобразования, фильтрацию и агрегацию данных.

Примеры потоковых операторов:

  • Map — преобразует каждый элемент потока. Пример - увеличить все числа в потоке на 1:
stream.map(x => x + 1);

Разбор:

  • Пространства имён группируют модели и сервисы; async/await не блокируют поток при HTTP-вызове наружу.

  • Filter — отбирает только те элементы, кторые удовлетворяют условию. Пример - оставить только положительные числа:

stream.filter(x => x > 0);

Разбор:

  • Пространства имён группируют модели и сервисы; async/await не блокируют поток при HTTP-вызове наружу.

  • Reduce — агрегирует данные в один результат. Пример - подсчитать сумму всех чисел в потоке:

stream.reduce((acc, x) => acc + x, 0);

Разбор:

  • Пространства имён группируют модели и сервисы; async/await не блокируют поток при HTTP-вызове наружу.

  • FlatMap — преобразует каждый элемент в несколько элементов. Пример - разбить строку на слова:

stream.flatMap(sentence => sentence.split(' '));

Разбор:

  • Пространства имён группируют модели и сервисы; async/await не блокируют поток при HTTP-вызове наружу.

Потоковые операторы в интеграции используются как раз в Apache Kafka Streams. На практике это обработка событий в реальном времени (например, подсчёт количества кликов пользователей).

Kafta и MQTT - не только асинхронные протоколы, но и реактивные.

Реактивные системы обеспечивают мгновенный обмен данными, и они подходят для работы с большим количеством событий и пользователей. Такое можно встретить в чатах, онлайн-играх, биржевых платформах, системах мониторинга.


Apache Kafka

Архитектура

  • Распределённый журнал (distributed commit log) с репликацией.
  • Кластер состоит из брокеров (brokers), каждый хранит подмножество партиций.
  • ZooKeeper (в версиях до 2.8, выбор controller через ZAB) или KRaft (встроенный Raft) для координации кластера — алгоритмы выбора лидера.

Гарантии

  • Упорядоченность в пределах партиции.
  • Персистентность через запись на диск с сегментацией.
  • Репликация с настраиваемым фактором (replication factor).
  • Подтверждение записи — acks=0 (без подтверждения), acks=1 (лидер), acks=all (все реплики).

Компоненты экосистемы

  • Kafka Connect — коннекторы для интеграции с внешними системами (базы данных, хранилища).
  • Kafka Streams — библиотека для потоковой обработки на стороне приложения.
  • ksqlDB — SQL-подобный движок для потоковых запросов.
  • Schema Registry — централизованное управление схемами сообщений (Avro, Protobuf).

Apache Pulsar

Архитектура

  • Разделение вычислений и хранения: брокеры обрабатывают запросы, BookKeeper хранит данные.
  • Многоуровневое хранение (tiered storage) — горячие данные в BookKeeper, холодные в объектном хранилище (S3, GCS).

Отличия от Kafka

  • Встроенная поддержка multi-tenancy с изоляцией на уровне тенантов и пространств имён.
  • Гео-репликация на уровне кластера.
  • Гибкая политика хранения с автоматическим перемещением данных.

NATS Streaming (STAN) / JetStream

Особенности

  • Лёгкий протокол с минимальным оверхедом.
  • JetStream (встроен в NATS 2.0+) добавляет персистентность и потоковую обработку.
  • Поддержка "интересных" подписок (interest-based) и потоков (streams).

RabbitMQ Streams

Особенности

  • Расширение RabbitMQ для потоковой обработки.
  • Сохраняет совместимость с AMQP и добавляет семантику журналов.
  • Поддержка offset tracking и повторного чтения.

Event Sourcing

Принцип

  • Состояние агрегата определяется последовательностью событий, а не текущим снимком.
  • События сохраняются в журнале как единственный источник истины.

Преимущества

  • Полная история изменений для аудита и отладки.
  • Возможность восстановления состояния на любой момент времени.
  • Упрощение распределённых транзакций через компенсирующие события.

Реализация

Код ITЗагрузка примера кода…

Разбор:

  • Пространства имён группируют модели и сервисы; async/await не блокируют поток при HTTP-вызове наружу.

CQRS (Command Query Responsibility Segregation)

Принцип

  • Разделение операций записи (команды) и чтения (запросы).
  • Команды публикуют события в поток.
  • Проекции (read models) обрабатывают события и обновляют материализованные представления.

Архитектура

┌─────────────┐ ┌──────────────┐ ┌──────────────┐
│ Command │─────▶│ Event │─────▶│ Event │
│ Handler │ │ Bus/Stream │ │ Store │
└─────────────┘ └──────────────┘ └──────────────┘


┌─────────────────────┐
│ Projection/ │
│ Read Model │
└─────────────────────┘


┌─────────────────────┐
│ Query Handler │
└─────────────────────┘

Разбор:

  • Дерево каталогов фиксирует структуру проекта — где лежат исходники, конфиги, бинарники Kafka и лицензии.

Event-Driven Microservices

Принцип

  • Микросервисы взаимодействуют через события, а не через синхронные вызовы.
  • Каждый сервис имеет собственный контекст и базу данных.
  • События служат для распространения изменений между границами контекстов.

Пример потока

1. Order Service: OrderCreated → Kafka (topic: orders)
2. Payment Service: потребляет OrderCreated → создаёт платеж → PaymentProcessed → Kafka (topic: payments)
3. Inventory Service: потребляет OrderCreated → резервирует товар → InventoryReserved → Kafka (topic: inventory)
4. Notification Service: потребляет все события → отправляет уведомления

Разбор:

  • Схема показывает направление потока данных или сообщений между компонентами распределённой системы.
  • Producer пишет в топик, Consumer читает асинхронно — UI или API не блокируются ожиданием записи в БД.

Change Data Capture (CDC)

Принцип

  • Перехват изменений в базе данных и публикация их как событий.
  • Позволяет синхронизировать данные между системами без изменения прикладного кода.

Инструменты

  • Debezium — CDC для реляционных баз (PostgreSQL, MySQL) и MongoDB.
  • Maxwell — MySQL binlog reader.
  • Kafka Connect с коннекторами JDBC, JMS.

C# (.NET) с Confluent Kafka

Производитель событий

Код ITЗагрузка примера кода…

Разбор:

  • JsonPropertyName сопоставляет поля JSON (orderId) со свойствами C#; десериализатор заполнит объект из тела сообщения RabbitMQ.
  • Пространства имён группируют модели и сервисы; async/await не блокируют поток при HTTP-вызове наружу.

Потребитель событий

Код ITЗагрузка примера кода…

Разбор:

  • JsonPropertyName сопоставляет поля JSON (orderId) со свойствами C#; десериализатор заполнит объект из тела сообщения RabbitMQ.
  • Пространства имён группируют модели и сервисы; async/await не блокируют поток при HTTP-вызове наружу.

Потоковая обработка с Kafka Streams

Код ITЗагрузка примера кода…

Разбор:

  • Пространства имён группируют модели и сервисы; async/await не блокируют поток при HTTP-вызове наружу.

Python с confluent-kafka

Производитель

Код ITЗагрузка примера кода…

Разбор:

  • Импорты подключают модули проекта; выполнение идёт сверху вниз — сначала конфигурация, затем объявление сущностей или маршрутов.

Потребитель

Код ITЗагрузка примера кода…

Разбор:

  • Импорты подключают модули проекта; выполнение идёт сверху вниз — сначала конфигурация, затем объявление сущностей или маршрутов.

Потоковая обработка с Faust

Код ITЗагрузка примера кода…

Разбор:

  • Импорты подключают модули проекта; выполнение идёт сверху вниз — сначала конфигурация, затем объявление сущностей или маршрутов.

Java с Spring Kafka

Конфигурация

Код ITЗагрузка примера кода…

Разбор:

  • Пакеты model, repository, service разделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.

Производитель

Код ITЗагрузка примера кода…

Разбор:

  • Пакеты model, repository, service разделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.

Потребитель

Код ITЗагрузка примера кода…

Разбор:

  • Пакеты model, repository, service разделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.

Потоковая обработка с Kafka Streams

Код ITЗагрузка примера кода…

Разбор:

  • Пакеты model, repository, service разделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.

Node.js с kafkajs

Производитель

Код ITЗагрузка примера кода…

Разбор:

  • Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
  • После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.

Потребитель

Код ITЗагрузка примера кода…

Разбор:

  • Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
  • После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.

Потоковая обработка с ksqlDB

Код ITЗагрузка примера кода…

Разбор:

  • Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
  • После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.

См. также

Базовая глава в разделе "Система и сеть" — Реактивные системы и потоки данных.


Практический чек-лист внедрения реактивной коммуникации

Перед запуском в продакшене проверьте базовые пункты:

  • wss включён везде, где есть пользовательские или чувствительные данные;
  • настроены heartbeat и политика переподключения с backoff;
  • есть ограничение на размер входящих сообщений и частоту отправки;
  • реализованы idempotency/sequence-механизмы на уровне приложения;
  • таймауты proxy/load balancer согласованы с жизненным циклом соединения;
  • метрики собираются отдельно по подключениям, задержкам, ошибкам и отвалам.

Этот чек-лист снижает риск "тихих" деградаций, когда интерфейс вроде работает, но часть событий теряется или приходит с большой задержкой.


Когда WebSocket лучше не использовать

  • если нужен только поток "сервер -> клиент" и нет двустороннего обмена (чаще достаточно SSE);
  • если обновления редкие и допустим опрос раз в N секунд (обычный polling проще в поддержке);
  • если инфраструктура строго ограничена прокси/сетевыми политиками и long-lived соединения создают операционные риски.

Выбор транспорта лучше делать от требований сценария, а не от популярности технологии.


Основа по протоколу

Базовый разбор HTTP и HTTPS находится в отдельной статье — HTTP как основа веб-интеграций.