Реактивная коммуникация
Основы: Реактивные системы и потоки данных.
Реактивная коммуникация
Что такое реактивная коммуникация?
Реактивные взаимодействия фокусируются на обмене событиями в режиме реального времени. Системы реагируют на события по мере их возникновения, обеспечивая непрерывный поток данных.
Транспорт:
- WebSockets
- SSE
- Kafka Streams
- MQTT
Play ITЗагрузка интерактивного демо…
WebSockets
WebSocket чаще всего выбирают, когда нужен дуплексный канал между клиентом и сервером: после короткого HTTP‑рукопожатия по тому же TCP обе стороны обмениваются фреймами без отдельного HTTP‑запроса на каждое сообщение. Нормативное описание протокола — RFC 6455.
Архитектурные особенности
Двунаправленная связь
- После установления соединения данные передаются в обоих направлениях без необходимости повторного рукопожатия.
- Сервер может инициировать отправку сообщений клиенту без предварительного запроса (push-модель).
Единое соединение
- Сохраняется постоянное соединение на протяжении всего сеанса работы.
- Устраняется оверхед повторного установления соединения для каждого сообщения (в отличие от HTTP).
Низкая задержка
- Минимальные накладные расходы на передачу: заголовок фрейма составляет 2–14 байт.
- Отсутствие необходимости в заголовках HTTP для каждого сообщения.
Работа через прокси и брандмауэры
- Использует стандартные порты 80 (ws) и 443 (wss), совместим с существующей веб-инфраструктурой.
- Начинается с HTTP-рукопожатия, что позволяет проходить через большинство прокси.
Протокол и жизненный цикл соединения
1. Рукопожатие (Handshake)
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Sec-WebSocket-Protocol: chat, superchat
Разбор:
- Строка HTTP-запроса задаёт метод и путь ресурса; сервер сопоставляет её с маршрутом контроллера или шлюза.
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat
Разбор:
-
Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.
-
Клиент отправляет обычный HTTP-запрос с заголовками
Upgrade: websocket. -
Сервер отвечает кодом 101 (Switching Protocols), подтверждая переход на протокол WebSocket.
-
Заголовок
Sec-WebSocket-Key— случайное значение от клиента; сервер возвращаетSec-WebSocket-Accept, вычислив хеш от ключа и "магической" константы из RFC 6455. Так проверяется, что ответил именно WebSocket‑сервер, а не случайный HTTP‑кэш или прокси, который не понимает апгрейд.
2. Передача данных
- Данные передаются в виде фреймов (frames) с минимальным заголовком.
- Каждый фрейм содержит:
- Флаги (FIN, RSV1-3, opcode)
- Длина полезной нагрузки (7, 7+16, или 7+64 бита)
- Маска (4 байта, обязательна для клиентских фреймов)
- Полезная нагрузка
3. Закрытие соединения
0x88 0x00 // FIN, opcode=Close, length=0
Разбор:
-
Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.
-
Грациозное закрытие через контрольный фрейм CLOSE.
-
Код состояния (1000 — нормальное закрытие, 1001 — уход, 4000+ — пользовательские коды).
Типы фреймов
| Opcode | Назначение | Описание |
|---|---|---|
| 0x0 | Continuation | Продолжение фрагментированного сообщения |
| 0x1 | Text | Текстовые данные в UTF-8 |
| 0x2 | Binary | Бинарные данные |
| 0x8 | Close | Запрос на закрытие соединения |
| 0x9 | Ping | Проверка активности соединения |
| 0xA | Pong | Ответ на Ping |
Механизмы надёжности
Контроль целостности
- Каждое сообщение может быть фрагментировано на несколько фреймов с восстановлением на стороне получателя.
- Проверка порядка фреймов и сборка полного сообщения.
Проверка активности (Heartbeat)
- Ping/Pong фреймы для обнаружения разрыва соединения.
- Типичный интервал: 30–60 секунд.
Переподключение
- Автоматическое восстановление соединения при обнаружении разрыва.
- Экспоненциальная задержка между попытками для предотвращения шторма запросов.
Аутентификация
- Токены передаются в начальном HTTP-рукопожатии (Authorization header).
- Поддержка сессий через cookies (если разрешено CORS).
Ментальная модель — что происходит после 101
До ответа 101 Switching Protocols всё ведёт себя как обычный HTTP — работают маршрутизация, TLS, cookies, заголовки авторизации, правила на API‑шлюзе и WAF. После апгрейда поверх того же TCP открывается другой формат — последовательность бинарных WebSocket‑фреймов с коротким служебным заголовком. У каждого сообщения больше нет своего HTTP‑запроса с Host и путём; контекст пользователя нужно один раз установить при handshake и дальше сопоставлять с логическим каналом на сервере.
Текст и бинарь. В прикладном коде чаще всего шлют UTF‑8 JSON в текстовых фреймах (opcode text). Бинарные фреймы уместны для компактных форматов (Protobuf, сырые байты медиа), но тогда клиент и сервер должны явно договориться о формате.
Подпротокол. Заголовок Sec-WebSocket-Protocol позволяет выбрать "версию прикладного протокола" поверх WebSocket (например, graphql-transport-ws или собственное имя). Это удобно, когда на одном пути /ws может жить несколько типов клиентов.
Сжатие. Расширение permessage-deflate (если согласовано в handshake) сжимает полезную нагрузку на уровне фреймов. На высоких частотах сообщений оно снижает трафик, но добавляет CPU; включают осознанно.
Инфраструктура, балансировка, прокси
- wss (WebSocket поверх TLS) в продакшене — обычное требование безопасности;
wsбез шифрования оставляют для локальной разработки. - Балансировщики и reverse‑proxy (nginx, Envoy, облачные ALB) должны пропускать Upgrade и не обрывать долгое соединение из‑за короткого
proxy_read_timeout/ idle timeout. - Если за балансировщиком несколько инстансов приложения и нет общей шины сообщений (Redis Pub/Sub, брокер), два клиента в одной "комнате" чата могут попасть на разные машины и не увидеть друг друга. Решения — sticky sessions по cookie/идентификатору, общий pub/sub, или выделенный сервис real‑time.
Надёжность на уровне приложения
Протокол WebSocket гарантирует доставку байт в порядке в рамках соединения, но не гарантирует бизнес‑семантику "сообщение обработано". На практике добавляют:
- Идемпотентные сообщения с
messageIdи дедупликацией на приёме. - Очередь на сервере с ограничением длины: при перегрузе отвечают ошибкой или дросселируют, иначе память съедят медленные клиенты (у WebSocket нет встроенного backpressure на уровне API браузера).
- Прикладной heartbeat (JSON
ping/pong) там, где нет доступа к низкоуровневым Ping‑фреймам; в браузерномWebSocketотправка служебных Ping‑фреймов недоступна, их обычно генерирует сама реализация стека или прокси. - Экспоненциальный backoff при переподключении и восстановление состояния (последний
seq, версия снапшота).
WebSocket, SSE, long polling и webhook — ориентир по выбору
| Нужно | Разумный выбор |
|---|---|
| Дуплекс: клиент и сервер шлют события в любой момент | WebSocket |
| Только поток от сервера к браузеру | SSE или chunked HTTP |
| Редкие обновления, совместимость "везде" | Long polling или короткий polling |
| Внешний сервис уведомляет ваш backend о событии | Webhook (POST на зарегистрированный URL) |
| Редкие изменения, простой контроль расписания опроса | Polling |
Сводная таблица всех четырёх паттернов с sequence-диаграммами — Polling, Long Polling, SSE и Webhook. Push/pull и надёжность webhook — в Push, Pull, Webhooks.
Подробнее про SSE — в следующем разделе этой же статьи.
Типичные сценарии применения
Чаты и мессенджеры
- Мгновенная доставка сообщений между пользователями.
- Индикация набора текста, статус онлайн/оффлайн.
Коллаборативные приложения
- Совместное редактирование документов в реальном времени.
- Синхронизация состояния между несколькими клиентами.
Финансовые приложения
- Потоковые котировки на бирже.
- Уведомления о транзакциях.
Игры
- Синхронизация игрового состояния между игроками.
- Мультиплеерные взаимодействия с минимальной задержкой.
Мониторинг и телеметрия
- Поток метрик серверов и приложений.
- Обновление дашбордов в реальном времени.
Уведомления
- Push-уведомления в веб-приложениях.
- Событийные оповещения без опроса.
JavaScript (браузерный клиент)
Код ITЗагрузка примера кода…
Разбор:
- Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
- После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.
Node.js (сервер на ws)
Код ITЗагрузка примера кода…
Разбор:
- Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
- После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.
C# (ASP.NET Core)
Код ITЗагрузка примера кода…
Разбор:
AddHostedService<NotificationConsumer>()регистрирует слушателя очереди в DI;MapControllers()подключает REST-эндпоинты.- Пространства имён группируют модели и сервисы;
async/awaitне блокируют поток при HTTP-вызове наружу.
Python (FastAPI + WebSockets)
Код ITЗагрузка примера кода…
Разбор:
FastAPI()создаёт приложение; декораторы@app.post/@app.getрегистрируют маршруты и схемы ответаresponse_model.Depends(get_db)внедряет сессию БД;BackgroundTasksоткладывает публикацию в RabbitMQ, чтобы HTTP-ответ ушёл клиенту быстрее.pika.BlockingConnectionиbasic_publishотправляют JSON-событие в очередьorder_events;delivery_mode=2помечает сообщение persistent.- Перед вставкой проверяется уникальность
order_number;HTTPException(400)и(404)возвращают понятные коды клиенту.
Java (Spring WebSocket)
Код ITЗагрузка примера кода…
Разбор:
- Пакеты
model,repository,serviceразделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.
Конфигурация WebSocket в Spring:
Код ITЗагрузка примера кода…
Разбор:
- Пакеты
model,repository,serviceразделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.
SSE
Server-Sent Events (SSE) предполагает, что сервер отправляет обновления клиенту через однонаправленный канал, это протокол для однонаправленной передачи данных от сервера к клиенту через HTTP. Сервер отправляет данные клиенту, но обратная связь невозможна. Если соединение разрывается, клиент автоматически переподключается. Пример - трансляция обновлений и уведомления в реальном времени.
Server-Sent Events — стандарт веб-API для односторонней потоковой передачи данных от сервера к клиенту через обычное HTTP-соединение. Клиент устанавливает соединение и получает события в режиме реального времени без необходимости повторных запросов.
Архитектурные особенности
Односторонняя связь
- Данные передаются только от сервера к клиенту.
- Клиент не может отправлять сообщения через установленное соединение (только через отдельные HTTP-запросы).
Постоянное соединение
- После установки соединение остаётся открытым до явного закрытия или таймаута.
- Сервер отправляет данные по мере их появления, без ожидания запроса от клиента.
Текстовый формат
- Передача данных осуществляется в виде текстовых событий в кодировке UTF-8.
- Поддержка автоматического парсинга браузером через
EventSourceAPI.
Автоматическое восстановление
- При разрыве соединения браузер автоматически пытается переподключиться.
- Интервал повторных попыток настраивается сервером через поле
retry.
Совместимость с инфраструктурой
- Работает поверх стандартного HTTP/HTTPS без необходимости специальных протоколов.
- Проходит через прокси, балансировщики и брандмауэры без дополнительной настройки.
Протокол и формат сообщений
HTTP-заголовки ответа
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked
Разбор:
-
Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.
-
Content-Type: text/event-stream— обязательный заголовок, идентифицирующий SSE-поток. -
Cache-Control: no-cache— предотвращает кэширование промежуточными прокси. -
Connection: keep-alive— поддерживает постоянное соединение.
Формат события
event: message
data: {"type":"notification","text":"Новое сообщение"}
id: 12345
retry: 30000
data: Простое сообщение без типа
: Это комментарий (игнорируется клиентом)
Разбор:
- Формат SSE — поле
event— имя события,data— полезная нагрузка,id— для возобновления,retry— пауза переподключения в мс.
Структура поля события:
event— тип события (по умолчаниюmessage).data— полезная нагрузка (может занимать несколько строк).id— уникальный идентификатор события для восстановления после переподключения.retry— интервал переподключения в миллисекундах (по умолчанию 3000 мс).
Многострочные данные
data: Первая строка
data: Вторая строка
data: Третья строка
Разбор:
- Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.
После парсинга объединяются в одну строку с символами новой строки.
Разделитель событий
- Пустая строка (
\n\n) завершает событие. - Сервер должен отправлять
\n\nпосле каждого события.
Жизненный цикл соединения
1. Установление соединения
const eventSource = new EventSource('https://example.com/events');
Разбор:
- Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
- После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.
2. Получение событий
eventSource.onmessage = (event) => {
console.log('Полезная нагрузка:', event.data);
};
eventSource.addEventListener('notification', (event) => {
console.log('Тип события:', event.type);
console.log('Полезная нагрузка:', event.data);
console.log('ID события:', event.lastEventId);
});
Разбор:
- Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
- После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.
3. Автоматическое переподключение
- При разрыве соединения браузер повторяет запрос с заголовком
Last-Event-ID. - Сервер может возобновить поток с последнего полученного события.
GET /events HTTP/1.1
Host: example.com
Last-Event-ID: 12345
Разбор:
- Строка HTTP-запроса задаёт метод и путь ресурса; сервер сопоставляет её с маршрутом контроллера или шлюза.
4. Закрытие соединения
eventSource.close();
Разбор:
- Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
- После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.
Механизмы надёжности
Идентификаторы событий
- Поле
idпозволяет клиенту отслеживать последнее полученное событие. - При переподключении браузер отправляет
Last-Event-IDв заголовке запроса. - Сервер может возобновить поток с указанного события, предотвращая потерю данных.
Настройка интервала повтора
retry: 10000
Разбор:
-
Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.
-
Значение в миллисекундах.
-
Применяется ко всем последующим переподключениям до следующего изменения.
Обработка ошибок
eventSource.onerror = (error) => {
console.error('Ошибка соединения:', error);
// Проверка состояния
if (eventSource.readyState === EventSource.CLOSED) {
console.log('Соединение закрыто');
} else if (eventSource.readyState === EventSource.CONNECTING) {
console.log('Попытка переподключения...');
}
};
Разбор:
- Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
- После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.
Состояния соединения
EventSource.CONNECTING(0) — соединение отсутствует или устанавливается.EventSource.OPEN(1) — соединение открыто и готово к приёму событий.EventSource.CLOSED(2) — соединение закрыто.
JavaScript (клиент)
Код ITЗагрузка примера кода…
Разбор:
- Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
- После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.
C# (ASP.NET Core)
Код ITЗагрузка примера кода…
Разбор:
[ApiController]и[Route]задают префикс URL;[FromBody]биндит JSON тела вOrderEventдля прямых уведомлений.- Пространства имён группируют модели и сервисы;
async/awaitне блокируют поток при HTTP-вызове наружу.
Python (FastAPI)
Код ITЗагрузка примера кода…
Разбор:
FastAPI()создаёт приложение; декораторы@app.post/@app.getрегистрируют маршруты и схемы ответаresponse_model.Depends(get_db)внедряет сессию БД;BackgroundTasksоткладывает публикацию в RabbitMQ, чтобы HTTP-ответ ушёл клиенту быстрее.pika.BlockingConnectionиbasic_publishотправляют JSON-событие в очередьorder_events;delivery_mode=2помечает сообщение persistent.- Перед вставкой проверяется уникальность
order_number;HTTPException(400)и(404)возвращают понятные коды клиенту.
Java (Spring WebFlux)
Код ITЗагрузка примера кода…
Разбор:
- Пакеты
model,repository,serviceразделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.
Spring MVC альтернатива (с использованием SseEmitter):
Код ITЗагрузка примера кода…
Разбор:
- Пакеты
model,repository,serviceразделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.
Node.js (Express)
Код ITЗагрузка примера кода…
Разбор:
- Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
- После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.
Event Streaming
Основы Event Streaming
Event Streaming — это потоковые платформы, такие как Apache Kafka, позволяют системам подписываться на события и реагировать на них.
События сохраняются в упорядоченном, неизменяемом журнале и могут быть потреблены множеством независимых приложений в реальном времени или с отсрочкой.
Событие (Event)
- Атомарная запись о произошедшем факте в системе.
- Содержит — тип события, временну́ю метку, идентификаторы, полезную нагрузку.
- Неизменяемо после создания.
Топик (Topic)
- Логический канал для категоризации событий.
- Разделяет события по доменам или типам (например,
orders.created,users.registered). - Поддерживает публикацию от множества производителей и потребление множеством подписчиков.
Партиция (Partition)
- Горизонтальное разделение топика для масштабирования и параллелизма.
- События внутри партиции упорядочены.
- Ключ события определяет принадлежность к партиции (хеширование).
Смещение (Offset)
- Уникальный идентификатор события внутри партиции.
- Позволяет потребителю отслеживать позицию в потоке.
- Гарантирует доставку "хотя бы один раз" при сохранении смещения.
Группа потребителей (Consumer Group)
- Набор потребителей, совместно обрабатывающих топик.
- Каждая партиция назначается одному потребителю в группе.
- Обеспечивает масштабирование обработки и отказоустойчивость.
Retention Policy
- Политика хранения событий — по времени (например, 7 дней) или по объёму (например, 100 ГБ).
- Позволяет повторную обработку событий в течение заданного окна.
Потоковые операторы — это инструменты для обработки потоков данных в реальном времени. Они позволяют выполнять преобразования, фильтрацию и агрегацию данных.
Примеры потоковых операторов:
- Map — преобразует каждый элемент потока. Пример - увеличить все числа в потоке на 1:
stream.map(x => x + 1);
Разбор:
-
Пространства имён группируют модели и сервисы;
async/awaitне блокируют поток при HTTP-вызове наружу. -
Filter — отбирает только те элементы, кторые удовлетворяют условию. Пример - оставить только положительные числа:
stream.filter(x => x > 0);
Разбор:
-
Пространства имён группируют модели и сервисы;
async/awaitне блокируют поток при HTTP-вызове наружу. -
Reduce — агрегирует данные в один результат. Пример - подсчитать сумму всех чисел в потоке:
stream.reduce((acc, x) => acc + x, 0);
Разбор:
-
Пространства имён группируют модели и сервисы;
async/awaitне блокируют поток при HTTP-вызове наружу. -
FlatMap — преобразует каждый элемент в несколько элементов. Пример - разбить строку на слова:
stream.flatMap(sentence => sentence.split(' '));
Разбор:
- Пространства имён группируют модели и сервисы;
async/awaitне блокируют поток при HTTP-вызове наружу.
Потоковые операторы в интеграции используются как раз в Apache Kafka Streams. На практике это обработка событий в реальном времени (например, подсчёт количества кликов пользователей).
Kafta и MQTT - не только асинхронные протоколы, но и реактивные.
Реактивные системы обеспечивают мгновенный обмен данными, и они подходят для работы с большим количеством событий и пользователей. Такое можно встретить в чатах, онлайн-играх, биржевых платформах, системах мониторинга.
Apache Kafka
Архитектура
- Распределённый журнал (distributed commit log) с репликацией.
- Кластер состоит из брокеров (brokers), каждый хранит подмножество партиций.
- ZooKeeper (в версиях до 2.8, выбор controller через ZAB) или KRaft (встроенный Raft) для координации кластера — алгоритмы выбора лидера.
Гарантии
- Упорядоченность в пределах партиции.
- Персистентность через запись на диск с сегментацией.
- Репликация с настраиваемым фактором (replication factor).
- Подтверждение записи —
acks=0(без подтверждения),acks=1(лидер),acks=all(все реплики).
Компоненты экосистемы
- Kafka Connect — коннекторы для интеграции с внешними системами (базы данных, хранилища).
- Kafka Streams — библиотека для потоковой обработки на стороне приложения.
- ksqlDB — SQL-подобный движок для потоковых запросов.
- Schema Registry — централизованное управление схемами сообщений (Avro, Protobuf).
Apache Pulsar
Архитектура
- Разделение вычислений и хранения: брокеры обрабатывают запросы, BookKeeper хранит данные.
- Многоуровневое хранение (tiered storage) — горячие данные в BookKeeper, холодные в объектном хранилище (S3, GCS).
Отличия от Kafka
- Встроенная поддержка multi-tenancy с изоляцией на уровне тенантов и пространств имён.
- Гео-репликация на уровне кластера.
- Гибкая политика хранения с автоматическим перемещением данных.
NATS Streaming (STAN) / JetStream
Особенности
- Лёгкий протокол с минимальным оверхедом.
- JetStream (встроен в NATS 2.0+) добавляет персистентность и потоковую обработку.
- Поддержка "интересных" подписок (interest-based) и потоков (streams).
RabbitMQ Streams
Особенности
- Расширение RabbitMQ для потоковой обработки.
- Сохраняет совместимость с AMQP и добавляет семантику журналов.
- Поддержка offset tracking и повторного чтения.
Event Sourcing
Принцип
- Состояние агрегата определяется последовательностью событий, а не текущим снимком.
- События сохраняются в журнале как единственный источник истины.
Преимущества
- Полная история изменений для аудита и отладки.
- Возможность восстановления состояния на любой момент времени.
- Упрощение распределённых транзакций через компенсирующие события.
Реализация
Код ITЗагрузка примера кода…
Разбор:
- Пространства имён группируют модели и сервисы;
async/awaitне блокируют поток при HTTP-вызове наружу.
CQRS (Command Query Responsibility Segregation)
Принцип
- Разделение операций записи (команды) и чтения (запросы).
- Команды публикуют события в поток.
- Проекции (read models) обрабатывают события и обновляют материализованные представления.
Архитектура
┌─────────────┐ ┌──────────────┐ ┌──────────────┐
│ Command │─────▶│ Event │─────▶│ Event │
│ Handler │ │ Bus/Stream │ │ Store │
└─────────────┘ └──────────────┘ └──────────────┘
│
▼
┌─────────────────────┐
│ Projection/ │
│ Read Model │
└─────────────────────┘
│
▼
┌─────────────────────┐
│ Query Handler │
└─────────────────────┘
Разбор:
- Дерево каталогов фиксирует структуру проекта — где лежат исходники, конфиги, бинарники Kafka и лицензии.
Event-Driven Microservices
Принцип
- Микросервисы взаимодействуют через события, а не через синхронные вызовы.
- Каждый сервис имеет собственный контекст и базу данных.
- События служат для распространения изменений между границами контекстов.
Пример потока
1. Order Service: OrderCreated → Kafka (topic: orders)
2. Payment Service: потребляет OrderCreated → создаёт платеж → PaymentProcessed → Kafka (topic: payments)
3. Inventory Service: потребляет OrderCreated → резервирует товар → InventoryReserved → Kafka (topic: inventory)
4. Notification Service: потребляет все события → отправляет уведомления
Разбор:
- Схема показывает направление потока данных или сообщений между компонентами распределённой системы.
- Producer пишет в топик, Consumer читает асинхронно — UI или API не блокируются ожиданием записи в БД.
Change Data Capture (CDC)
Принцип
- Перехват изменений в базе данных и публикация их как событий.
- Позволяет синхронизировать данные между системами без изменения прикладного кода.
Инструменты
- Debezium — CDC для реляционных баз (PostgreSQL, MySQL) и MongoDB.
- Maxwell — MySQL binlog reader.
- Kafka Connect с коннекторами JDBC, JMS.
C# (.NET) с Confluent Kafka
Производитель событий
Код ITЗагрузка примера кода…
Разбор:
JsonPropertyNameсопоставляет поля JSON (orderId) со свойствами C#; десериализатор заполнит объект из тела сообщения RabbitMQ.- Пространства имён группируют модели и сервисы;
async/awaitне блокируют поток при HTTP-вызове наружу.
Потребитель событий
Код ITЗагрузка примера кода…
Разбор:
JsonPropertyNameсопоставляет поля JSON (orderId) со свойствами C#; десериализатор заполнит объект из тела сообщения RabbitMQ.- Пространства имён группируют модели и сервисы;
async/awaitне блокируют поток при HTTP-вызове наружу.
Потоковая обработка с Kafka Streams
Код ITЗагрузка примера кода…
Разбор:
- Пространства имён группируют модели и сервисы;
async/awaitне блокируют поток при HTTP-вызове наружу.
Python с confluent-kafka
Производитель
Код ITЗагрузка примера кода…
Разбор:
- Импорты подключают модули проекта; выполнение идёт сверху вниз — сначала конфигурация, затем объявление сущностей или маршрутов.
Потребитель
Код ITЗагрузка примера кода…
Разбор:
- Импорты подключают модули проекта; выполнение идёт сверху вниз — сначала конфигурация, затем объявление сущностей или маршрутов.
Потоковая обработка с Faust
Код ITЗагрузка примера кода…
Разбор:
- Импорты подключают модули проекта; выполнение идёт сверху вниз — сначала конфигурация, затем объявление сущностей или маршрутов.
Java с Spring Kafka
Конфигурация
Код ITЗагрузка примера кода…
Разбор:
- Пакеты
model,repository,serviceразделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.
Производитель
Код ITЗагрузка примера кода…
Разбор:
- Пакеты
model,repository,serviceразделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.
Потребитель
Код ITЗагрузка примера кода…
Разбор:
- Пакеты
model,repository,serviceразделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.
Потоковая обработка с Kafka Streams
Код ITЗагрузка примера кода…
Разбор:
- Пакеты
model,repository,serviceразделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.
Node.js с kafkajs
Производитель
Код ITЗагрузка примера кода…
Разбор:
- Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
- После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.
Потребитель
Код ITЗагрузка примера кода…
Разбор:
- Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
- После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.
Потоковая обработка с ksqlDB
Код ITЗагрузка примера кода…
Разбор:
- Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
- После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.
См. также
Базовая глава в разделе "Система и сеть" — Реактивные системы и потоки данных.
Практический чек-лист внедрения реактивной коммуникации
Перед запуском в продакшене проверьте базовые пункты:
wssвключён везде, где есть пользовательские или чувствительные данные;- настроены heartbeat и политика переподключения с backoff;
- есть ограничение на размер входящих сообщений и частоту отправки;
- реализованы idempotency/sequence-механизмы на уровне приложения;
- таймауты proxy/load balancer согласованы с жизненным циклом соединения;
- метрики собираются отдельно по подключениям, задержкам, ошибкам и отвалам.
Этот чек-лист снижает риск "тихих" деградаций, когда интерфейс вроде работает, но часть событий теряется или приходит с большой задержкой.
Когда WebSocket лучше не использовать
- если нужен только поток "сервер -> клиент" и нет двустороннего обмена (чаще достаточно SSE);
- если обновления редкие и допустим опрос раз в N секунд (обычный polling проще в поддержке);
- если инфраструктура строго ограничена прокси/сетевыми политиками и long-lived соединения создают операционные риски.
Выбор транспорта лучше делать от требований сценария, а не от популярности технологии.
Базовый разбор HTTP и HTTPS находится в отдельной статье — HTTP как основа веб-интеграций.