Перейти к основному содержимому

8.05. Реактивная коммуникация

Разработчику Архитектору Инженеру

Реактивная коммуникация

Что такое реактивная коммуникация?

Реактивные взаимодействия фокусируются на обмене событиями в режиме реального времени. Системы реагируют на события по мере их возникновения, обеспечивая непрерывный поток данных.

Транспорт:

  • WebSockets
  • SSE
  • Kafka Streams
  • MQTT

WebSockets

WebSockets является самым распространённым представителем реактивной коммуникации, ведь устанавливается двусторонний канал связи между клиентом и сервером, который позволяет обмениваться данными в реальном времени. Это протокол для двусторонней связи между клиентом и сервером через единственный постоянный канал. После установления соединения данные передаются без дополнительных HTTP-заголовков.

WebSocket — протокол полнодуплексной связи поверх единого TCP-соединения, обеспечивающий низколатентный обмен данными между клиентом и сервером в режиме реального времени.

Архитектурные особенности

Двунаправленная связь

  • После установления соединения данные передаются в обоих направлениях без необходимости повторного рукопожатия.
  • Сервер может инициировать отправку сообщений клиенту без предварительного запроса (push-модель).

Единое соединение

  • Сохраняется постоянное соединение на протяжении всего сеанса работы.
  • Устраняется оверхед повторного установления соединения для каждого сообщения (в отличие от HTTP).

Низкая задержка

  • Минимальные накладные расходы на передачу: заголовок фрейма составляет 2–14 байт.
  • Отсутствие необходимости в заголовках HTTP для каждого сообщения.

Работа через прокси и брандмауэры

  • Использует стандартные порты 80 (ws) и 443 (wss), совместим с существующей веб-инфраструктурой.
  • Начинается с HTTP-рукопожатия, что позволяет проходить через большинство прокси.

Протокол и жизненный цикл соединения

1. Рукопожатие (Handshake)

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Sec-WebSocket-Protocol: chat, superchat
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat
  • Клиент отправляет обычный HTTP-запрос с заголовками Upgrade: websocket.
  • Сервер отвечает кодом 101 (Switching Protocols), подтверждая переход на протокол WebSocket.
  • Заголовок Sec-WebSocket-Key используется для предотвращения кэширования и проверки подлинности.

2. Передача данных

  • Данные передаются в виде фреймов (frames) с минимальным заголовком.
  • Каждый фрейм содержит:
    • Флаги (FIN, RSV1-3, opcode)
    • Длина полезной нагрузки (7, 7+16, или 7+64 бита)
    • Маска (4 байта, обязательна для клиентских фреймов)
    • Полезная нагрузка

3. Закрытие соединения

0x88 0x00  // FIN, opcode=Close, length=0
  • Грациозное закрытие через контрольный фрейм CLOSE.
  • Код состояния (1000 — нормальное закрытие, 1001 — уход, 4000+ — пользовательские коды).

Типы фреймов

OpcodeНазначениеОписание
0x0ContinuationПродолжение фрагментированного сообщения
0x1TextТекстовые данные в UTF-8
0x2BinaryБинарные данные
0x8CloseЗапрос на закрытие соединения
0x9PingПроверка активности соединения
0xAPongОтвет на Ping

Механизмы надёжности

Контроль целостности

  • Каждое сообщение может быть фрагментировано на несколько фреймов с восстановлением на стороне получателя.
  • Проверка порядка фреймов и сборка полного сообщения.

Проверка активности (Heartbeat)

  • Ping/Pong фреймы для обнаружения разрыва соединения.
  • Типичный интервал: 30–60 секунд.

Переподключение

  • Автоматическое восстановление соединения при обнаружении разрыва.
  • Экспоненциальная задержка между попытками для предотвращения шторма запросов.

Аутентификация

  • Токены передаются в начальном HTTP-рукопожатии (Authorization header).
  • Поддержка сессий через cookies (если разрешено CORS).

Типичные сценарии применения

Чаты и мессенджеры

  • Мгновенная доставка сообщений между пользователями.
  • Индикация набора текста, статус онлайн/оффлайн.

Коллаборативные приложения

  • Совместное редактирование документов в реальном времени.
  • Синхронизация состояния между несколькими клиентами.

Финансовые приложения

  • Потоковые котировки на бирже.
  • Уведомления о транзакциях.

Игры

  • Синхронизация игрового состояния между игроками.
  • Мультиплеерные взаимодействия с минимальной задержкой.

Мониторинг и телеметрия

  • Поток метрик серверов и приложений.
  • Обновление дашбордов в реальном времени.

Уведомления

  • Push-уведомления в веб-приложениях.
  • Событийные оповещения без опроса.

JavaScript (браузерный клиент)

// Создание соединения
const ws = new WebSocket('wss://example.com/chat');

// Обработчики событий
ws.addEventListener('open', (event) => {
console.log('Соединение установлено');
// Отправка сообщения после установки соединения
ws.send(JSON.stringify({
type: 'join',
room: 'general',
username: 'Тимур'
}));
});

ws.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
console.log('Получено сообщение:', data);

switch (data.type) {
case 'message':
displayMessage(data.username, data.text);
break;
case 'user_joined':
updateUserList(data.users);
break;
}
});

ws.addEventListener('error', (event) => {
console.error('Ошибка соединения:', event);
});

ws.addEventListener('close', (event) => {
console.log('Соединение закрыто:', event.code, event.reason);
// Автоматическое переподключение
setTimeout(() => reconnect(), 3000);
});

// Отправка сообщения
function sendMessage(text) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'message',
text: text,
timestamp: Date.now()
}));
}
}

// Проверка активности
let pingInterval = setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ping' }));
}
}, 30000);

ws.addEventListener('close', () => {
clearInterval(pingInterval);
});

Node.js (сервер на ws)

const WebSocket = require('ws');
const http = require('http');

// HTTP сервер для обслуживания статики
const server = http.createServer((req, res) => {
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end('<h1>WebSocket Chat</h1>');
});

// WebSocket сервер
const wss = new WebSocket.Server({ server });

// Хранилище подключений
const clients = new Map();
const rooms = new Map();

wss.on('connection', (ws, req) => {
const clientId = generateId();
clients.set(clientId, ws);

console.log(`Новое соединение: ${clientId}`);

// Обработка сообщений
ws.on('message', (data) => {
try {
const message = JSON.parse(data);
handleClientMessage(clientId, message);
} catch (error) {
console.error('Ошибка обработки сообщения:', error);
}
});

// Обработка закрытия соединения
ws.on('close', () => {
console.log(`Соединение закрыто: ${clientId}`);
handleClientDisconnect(clientId);
clients.delete(clientId);
});

// Обработка ошибок
ws.on('error', (error) => {
console.error(`Ошибка соединения ${clientId}:`, error);
clients.delete(clientId);
});

// Отправка приветственного сообщения
ws.send(JSON.stringify({
type: 'welcome',
clientId: clientId,
timestamp: Date.now()
}));
});

function handleClientMessage(clientId, message) {
const ws = clients.get(clientId);
if (!ws || ws.readyState !== WebSocket.OPEN) return;

switch (message.type) {
case 'join':
joinRoom(clientId, message.room, message.username);
break;

case 'message':
broadcastToRoom(message.room, {
type: 'message',
username: message.username,
text: message.text,
timestamp: Date.now()
}, clientId);
break;

case 'ping':
ws.send(JSON.stringify({ type: 'pong' }));
break;

default:
console.warn(`Неизвестный тип сообщения: ${message.type}`);
}
}

function joinRoom(clientId, roomName, username) {
if (!rooms.has(roomName)) {
rooms.set(roomName, new Set());
}

const room = rooms.get(roomName);
room.add(clientId);

// Уведомить других участников комнаты
broadcastToRoom(roomName, {
type: 'user_joined',
username: username,
clientId: clientId,
users: Array.from(room).map(id => ({
id: id,
username: getUsername(id)
}))
});

// Подтвердить присоединение
const ws = clients.get(clientId);
if (ws) {
ws.send(JSON.stringify({
type: 'joined',
room: roomName,
users: Array.from(room)
}));
}
}

function broadcastToRoom(roomName, message, excludeClientId = null) {
const room = rooms.get(roomName);
if (!room) return;

const data = JSON.stringify(message);

room.forEach(clientId => {
if (clientId === excludeClientId) return;

const ws = clients.get(clientId);
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(data);
} else {
// Очистить мёртвые соединения
room.delete(clientId);
}
});

// Удалить пустую комнату
if (room.size === 0) {
rooms.delete(roomName);
}
}

function handleClientDisconnect(clientId) {
// Удалить клиента из всех комнат
rooms.forEach((room, roomName) => {
if (room.has(clientId)) {
room.delete(clientId);
broadcastToRoom(roomName, {
type: 'user_left',
clientId: clientId
});

if (room.size === 0) {
rooms.delete(roomName);
}
}
});
}

function generateId() {
return Math.random().toString(36).substr(2, 9);
}

// Запуск сервера
server.listen(8080, () => {
console.log('Сервер запущен на порту 8080');
});

C# (ASP.NET Core)

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();

var app = builder.Build();

// Middleware для обработки WebSocket
app.UseWebSockets(new WebSocketOptions
{
KeepAliveInterval = TimeSpan.FromSeconds(30),
ReceiveBufferSize = 4 * 1024
});

app.Map("/ws", async context =>
{
if (!context.WebSockets.IsWebSocketRequest)
{
context.Response.StatusCode = StatusCodes.Status400BadRequest;
return;
}

var webSocket = await context.WebSockets.AcceptWebSocketAsync();
await HandleWebSocketConnection(webSocket);
});

app.Run();

async Task HandleWebSocketConnection(WebSocket webSocket)
{
var clientId = Guid.NewGuid().ToString();
var buffer = new byte[4096];

Console.WriteLine($"Новое соединение: {clientId}");

try
{
while (webSocket.State == WebSocketState.Open)
{
var result = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None);

if (result.MessageType == WebSocketMessageType.Close)
{
await webSocket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
"Закрыто клиентом",
CancellationToken.None);
break;
}

// Обработка текстового сообщения
if (result.MessageType == WebSocketMessageType.Text)
{
var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
await ProcessMessage(webSocket, clientId, message);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Ошибка соединения {clientId}: {ex.Message}");
}
finally
{
Console.WriteLine($"Соединение закрыто: {clientId}");
}
}

async Task ProcessMessage(WebSocket socket, string clientId, string message)
{
try
{
using var doc = JsonDocument.Parse(message);
var root = doc.RootElement;

if (!root.TryGetProperty("type", out var typeElement))
return;

var type = typeElement.GetString();

switch (type)
{
case "ping":
await SendMessage(socket, new
{
type = "pong",
timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
});
break;

case "message":
if (root.TryGetProperty("text", out var textElement))
{
var response = new
{
type = "message",
clientId = clientId,
text = textElement.GetString(),
timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
await SendMessage(socket, response);
}
break;

default:
await SendMessage(socket, new
{
type = "error",
message = $"Неизвестный тип: {type}"
});
break;
}
}
catch (JsonException ex)
{
await SendMessage(socket, new
{
type = "error",
message = $"Ошибка парсинга JSON: {ex.Message}"
});
}
}

async Task SendMessage(WebSocket socket, object message)
{
if (socket.State != WebSocketState.Open)
return;

var json = JsonSerializer.Serialize(message);
var bytes = Encoding.UTF8.GetBytes(json);

await socket.SendAsync(
new ArraySegment<byte>(bytes),
WebSocketMessageType.Text,
true,
CancellationToken.None);
}

Python (FastAPI + WebSockets)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import Dict, Set
import json
import asyncio

app = FastAPI()

# Хранилище подключений
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.rooms: Dict[str, Set[str]] = {}

async def connect(self, websocket: WebSocket, client_id: str):
await websocket.accept()
self.active_connections[client_id] = websocket
print(f"Новое соединение: {client_id}")

def disconnect(self, client_id: str):
if client_id in self.active_connections:
del self.active_connections[client_id]
print(f"Соединение закрыто: {client_id}")

# Удалить из всех комнат
for room in self.rooms.values():
room.discard(client_id)

async def send_personal_message(self, message: dict, client_id: str):
if client_id in self.active_connections:
websocket = self.active_connections[client_id]
await websocket.send_text(json.dumps(message))

async def broadcast(self, message: dict, room: str = None):
if room and room in self.rooms:
recipients = self.rooms[room]
else:
recipients = self.active_connections.keys()

data = json.dumps(message)
disconnected = []

for client_id in recipients:
if client_id in self.active_connections:
websocket = self.active_connections[client_id]
try:
await websocket.send_text(data)
except Exception:
disconnected.append(client_id)

# Очистить мёртвые соединения
for client_id in disconnected:
self.disconnect(client_id)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket, client_id)

try:
while True:
data = await websocket.receive_text()
message = json.loads(data)

msg_type = message.get("type")

if msg_type == "ping":
await manager.send_personal_message({
"type": "pong",
"timestamp": asyncio.get_event_loop().time()
}, client_id)

elif msg_type == "join":
room = message.get("room")
if room:
if room not in manager.rooms:
manager.rooms[room] = set()
manager.rooms[room].add(client_id)

await manager.broadcast({
"type": "user_joined",
"client_id": client_id,
"room": room
}, room)

elif msg_type == "message":
room = message.get("room")
await manager.broadcast({
"type": "message",
"client_id": client_id,
"text": message.get("text"),
"timestamp": asyncio.get_event_loop().time()
}, room)

except WebSocketDisconnect:
manager.disconnect(client_id)
except Exception as e:
print(f"Ошибка: {e}")
manager.disconnect(client_id)

@app.get("/")
async def get():
return HTMLResponse("""
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<div id="messages"></div>
<input id="message" type="text" />
<button onclick="sendMessage()">Send</button>

<script>
const ws = new WebSocket("ws://localhost:8000/ws/test_client");

ws.onmessage = (event) => {
const messages = document.getElementById('messages');
const data = JSON.parse(event.data);
const message = document.createElement('div');
message.textContent = JSON.stringify(data);
messages.appendChild(message);
};

function sendMessage() {
const input = document.getElementById('message');
ws.send(JSON.stringify({
type: 'message',
text: input.value
}));
input.value = '';
}
</script>
</body>
</html>
""")

Java (Spring WebSocket)

import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Configuration
public class WebSocketHandler extends TextWebSocketHandler {

private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

@Override
public void afterConnectionEstablished(WebSocketSession session) {
String sessionId = session.getId();
sessions.put(sessionId, session);

System.out.println("Новое соединение: " + sessionId);

try {
session.sendMessage(new TextMessage(
"{\"type\":\"welcome\",\"sessionId\":\"" + sessionId + "\"}"
));
} catch (Exception e) {
System.err.println("Ошибка отправки приветствия: " + e.getMessage());
}
}

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
String sessionId = session.getId();

try {
String payload = message.getPayload();
System.out.println("Сообщение от " + sessionId + ": " + payload);

// Эхо-ответ
session.sendMessage(new TextMessage(
"{\"type\":\"echo\",\"original\":" + payload + "}"
));

} catch (Exception e) {
System.err.println("Ошибка обработки сообщения: " + e.getMessage());
}
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String sessionId = session.getId();
sessions.remove(sessionId);

System.out.println("Соединение закрыто: " + sessionId + " (" + status + ")");
}

@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
System.err.println("Ошибка транспорта: " + exception.getMessage());
}

// Метод для отправки сообщения конкретному клиенту
public void sendMessageTo(String sessionId, String message) {
WebSocketSession session = sessions.get(sessionId);
if (session != null && session.isOpen()) {
try {
session.sendMessage(new TextMessage(message));
} catch (Exception e) {
System.err.println("Ошибка отправки: " + e.getMessage());
}
}
}

// Метод для рассылки всем клиентам
public void broadcast(String message) {
TextMessage textMessage = new TextMessage(message);

sessions.forEach((sessionId, session) -> {
if (session.isOpen()) {
try {
session.sendMessage(textMessage);
} catch (Exception e) {
System.err.println("Ошибка рассылки: " + e.getMessage());
}
}
});
}
}

Конфигурация WebSocket в Spring:

import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

private final WebSocketHandler webSocketHandler;

public WebSocketConfig(WebSocketHandler webSocketHandler) {
this.webSocketHandler = webSocketHandler;
}

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketHandler, "/ws")
.setAllowedOrigins("*");
}
}

SSE

Server-Sent Events (SSE) предполагает, что сервер отправляет обновления клиенту через однонаправленный канал, это протокол для однонаправленной передачи данных от сервера к клиенту через HTTP. Сервер отправляет данные клиенту, но обратная связь невозможна. Если соединение разрывается, клиент автоматически переподключается. Пример - трансляция обновлений и уведомления в реальном времени.

Server-Sent Events — стандарт веб-API для односторонней потоковой передачи данных от сервера к клиенту через обычное HTTP-соединение. Клиент устанавливает соединение и получает события в режиме реального времени без необходимости повторных запросов.

Архитектурные особенности

Односторонняя связь

  • Данные передаются только от сервера к клиенту.
  • Клиент не может отправлять сообщения через установленное соединение (только через отдельные HTTP-запросы).

Постоянное соединение

  • После установки соединение остаётся открытым до явного закрытия или таймаута.
  • Сервер отправляет данные по мере их появления, без ожидания запроса от клиента.

Текстовый формат

  • Передача данных осуществляется в виде текстовых событий в кодировке UTF-8.
  • Поддержка автоматического парсинга браузером через EventSource API.

Автоматическое восстановление

  • При разрыве соединения браузер автоматически пытается переподключиться.
  • Интервал повторных попыток настраивается сервером через поле retry.

Совместимость с инфраструктурой

  • Работает поверх стандартного HTTP/HTTPS без необходимости специальных протоколов.
  • Проходит через прокси, балансировщики и брандмауэры без дополнительной настройки.

Протокол и формат сообщений

HTTP-заголовки ответа

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked
  • Content-Type: text/event-stream — обязательный заголовок, идентифицирующий SSE-поток.
  • Cache-Control: no-cache — предотвращает кэширование промежуточными прокси.
  • Connection: keep-alive — поддерживает постоянное соединение.

Формат события

event: message
data: {"type":"notification","text":"Новое сообщение"}
id: 12345
retry: 30000

data: Простое сообщение без типа

: Это комментарий (игнорируется клиентом)

Структура поля события:

  • event — тип события (по умолчанию message).
  • data — полезная нагрузка (может занимать несколько строк).
  • id — уникальный идентификатор события для восстановления после переподключения.
  • retry — интервал переподключения в миллисекундах (по умолчанию 3000 мс).

Многострочные данные

data: Первая строка
data: Вторая строка
data: Третья строка

После парсинга объединяются в одну строку с символами новой строки.

Разделитель событий

  • Пустая строка (\n\n) завершает событие.
  • Сервер должен отправлять \n\n после каждого события.

Жизненный цикл соединения

1. Установление соединения

const eventSource = new EventSource('https://example.com/events');

2. Получение событий

eventSource.onmessage = (event) => {
console.log('Данные:', event.data);
};

eventSource.addEventListener('notification', (event) => {
console.log('Тип события:', event.type);
console.log('Данные:', event.data);
console.log('ID события:', event.lastEventId);
});

3. Автоматическое переподключение

  • При разрыве соединения браузер повторяет запрос с заголовком Last-Event-ID.
  • Сервер может возобновить поток с последнего полученного события.
GET /events HTTP/1.1
Host: example.com
Last-Event-ID: 12345

4. Закрытие соединения

eventSource.close();

Механизмы надёжности

Идентификаторы событий

  • Поле id позволяет клиенту отслеживать последнее полученное событие.
  • При переподключении браузер отправляет Last-Event-ID в заголовке запроса.
  • Сервер может возобновить поток с указанного события, предотвращая потерю данных.

Настройка интервала повтора

retry: 10000
  • Значение в миллисекундах.
  • Применяется ко всем последующим переподключениям до следующего изменения.

Обработка ошибок

eventSource.onerror = (error) => {
console.error('Ошибка соединения:', error);

// Проверка состояния
if (eventSource.readyState === EventSource.CLOSED) {
console.log('Соединение закрыто');
} else if (eventSource.readyState === EventSource.CONNECTING) {
console.log('Попытка переподключения...');
}
};

Состояния соединения

  • EventSource.CONNECTING (0) — соединение отсутствует или устанавливается.
  • EventSource.OPEN (1) — соединение открыто и готово к приёму событий.
  • EventSource.CLOSED (2) — соединение закрыто.

JavaScript (клиент)

// Создание соединения
const eventSource = new EventSource('https://api.example.com/stream', {
withCredentials: true // Для передачи cookies
});

// Обработчик всех событий (тип по умолчанию 'message')
eventSource.onmessage = (event) => {
console.log('Получено сообщение:', event.data);
};

// Обработчик события с типом 'notification'
eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data);
console.log('Уведомление:', data);
showNotification(data.title, data.body);
});

// Обработчик события с типом 'progress'
eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
updateProgressBar(data.percent);
});

// Обработчик ошибок
eventSource.onerror = (error) => {
console.error('Ошибка SSE:', error);

// Проверка состояния соединения
switch (eventSource.readyState) {
case EventSource.CONNECTING:
console.log('Переподключение...');
break;
case EventSource.CLOSED:
console.log('Соединение закрыто');
// Можно создать новое соединение
break;
}
};

// Получение последнего ID события
console.log('Последнее событие ID:', eventSource.lastEventId);

// Закрытие соединения
function closeConnection() {
eventSource.close();
console.log('Соединение закрыто вручную');
}

// Пример использования с обработкой различных типов событий
const handlers = {
'message': (data) => console.log('Сообщение:', data),
'update': (data) => updateUI(data),
'error': (data) => showError(data),
'complete': (data) => {
console.log('Завершено:', data);
eventSource.close();
}
};

Object.entries(handlers).forEach(([type, handler]) => {
eventSource.addEventListener(type, (event) => {
try {
const data = JSON.parse(event.data);
handler(data);
} catch (e) {
console.error('Ошибка парсинга JSON:', e);
}
});
});

C# (ASP.NET Core)

using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

[ApiController]
[Route("api/[controller]")]
public class EventsController : ControllerBase
{
[HttpGet]
public async Task GetEvents()
{
// Установка заголовков SSE
Response.Headers.Append("Content-Type", "text/event-stream");
Response.Headers.Append("Cache-Control", "no-cache");
Response.Headers.Append("Connection", "keep-alive");

// Отправка начального события
await SendEventAsync("connected", new { message = "Подключено к потоку событий" });

// Генерация событий в реальном времени
var cancellationToken = HttpContext.RequestAborted;

try
{
int eventId = 0;

while (!cancellationToken.IsCancellationRequested)
{
// Отправка события каждые 2 секунды
await SendEventAsync("tick", new
{
id = Interlocked.Increment(ref eventId),
timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
message = $"Событие #{eventId}"
});

// Отправка события с указанием интервала повтора
if (eventId == 1)
{
await SendRetryAsync(5000); // 5 секунд
}

// Отправка комментария (игнорируется клиентом)
await Response.WriteAsync($": Серверное время {DateTime.Now}\n\n");
await Response.Body.FlushAsync(cancellationToken);

await Task.Delay(2000, cancellationToken);
}
}
catch (TaskCanceledException)
{
// Клиент закрыл соединение
Console.WriteLine("Клиент отключился");
}
catch (Exception ex)
{
Console.WriteLine($"Ошибка потока: {ex.Message}");
}
finally
{
// Отправка финального события
await SendEventAsync("disconnected", new { message = "Поток завершён" });
}
}

[HttpGet("notifications")]
public async Task GetNotifications()
{
Response.Headers.Append("Content-Type", "text/event-stream");
Response.Headers.Append("Cache-Control", "no-cache");

var cancellationToken = HttpContext.RequestAborted;

// Получение последнего ID события из заголовка
var lastEventId = Request.Headers["Last-Event-ID"].ToString();
int lastId = string.IsNullOrEmpty(lastEventId) ? 0 : int.Parse(lastEventId);

Console.WriteLine($"Восстановление с события #{lastId}");

try
{
// Симуляция получения уведомлений из базы данных или очереди
while (!cancellationToken.IsCancellationRequested)
{
var notifications = GetNewNotifications(lastId);

foreach (var notification in notifications)
{
await SendEventAsync("notification", new
{
id = notification.Id,
type = notification.Type,
title = notification.Title,
body = notification.Body,
timestamp = notification.Timestamp
}, notification.Id.ToString());

lastId = notification.Id;
}

await Response.Body.FlushAsync(cancellationToken);
await Task.Delay(1000, cancellationToken);
}
}
catch (TaskCanceledException)
{
Console.WriteLine("Клиент отключился от уведомлений");
}
}

/// <summary>
/// Отправка события в формате SSE
/// </summary>
private async Task SendEventAsync(string eventType, object data, string id = null)
{
var sb = new StringBuilder();

if (!string.IsNullOrEmpty(id))
{
sb.AppendLine($"id: {id}");
}

sb.AppendLine($"event: {eventType}");
sb.AppendLine($"data: {System.Text.Json.JsonSerializer.Serialize(data)}");
sb.AppendLine();

await Response.WriteAsync(sb.ToString());
}

/// <summary>
/// Установка интервала повторного подключения
/// </summary>
private async Task SendRetryAsync(int milliseconds)
{
await Response.WriteAsync($"retry: {milliseconds}\n\n");
}

/// <summary>
/// Симуляция получения новых уведомлений
/// </summary>
private List<Notification> GetNewNotifications(int afterId)
{
// В реальном приложении: запрос к базе данных или очереди сообщений
var notifications = new List<Notification>();

// Пример генерации
var random = new Random();
if (random.Next(10) > 7)
{
notifications.Add(new Notification
{
Id = afterId + 1,
Type = "info",
Title = "Новое уведомление",
Body = $"Случайное событие #{afterId + 1}",
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
});
}

return notifications;
}

private class Notification
{
public int Id { get; set; }
public string Type { get; set; }
public string Title { get; set; }
public string Body { get; set; }
public long Timestamp { get; set; }
}
}

Python (FastAPI)

from fastapi import FastAPI, Response, Request, HTTPException
from fastapi.responses import StreamingResponse
from typing import Dict, Any
import asyncio
import json
import time
import uvicorn

app = FastAPI()

def generate_sse_event(
event_type: str = "message",
data: Any = None,
event_id: str = None,
retry: int = None
) -> str:
"""Форматирование события в формате SSE"""
lines = []

if event_id is not None:
lines.append(f"id: {event_id}")

if event_type != "message":
lines.append(f"event: {event_type}")

if data is not None:
if isinstance(data, dict):
data_str = json.dumps(data)
else:
data_str = str(data)

# Многострочные данные
for line in data_str.split('\n'):
lines.append(f"data: {line}")

if retry is not None:
lines.append(f"retry: {retry}")

lines.append("") # Пустая строка завершает событие
lines.append("") # Двойной перевод строки

return "\n".join(lines)

async def event_stream(request: Request, start_id: int = 0):
"""Генератор событий для SSE"""
event_id = start_id

# Отправка начального события
yield generate_sse_event(
event_type="connected",
data={"message": "Подключено к потоку событий", "start_id": start_id},
event_id=str(event_id)
)
event_id += 1

# Установка интервала повтора
yield generate_sse_event(retry=5000)

try:
while True:
# Проверка отключения клиента
if await request.is_disconnected():
print("Клиент отключился")
break

# Генерация события
event_data = {
"id": event_id,
"timestamp": time.time(),
"message": f"Событие #{event_id}"
}

yield generate_sse_event(
event_type="tick",
data=event_data,
event_id=str(event_id)
)

event_id += 1

# Отправка комментария
yield f": Серверное время {time.strftime('%H:%M:%S')}\n\n"

await asyncio.sleep(2)

except asyncio.CancelledError:
print("Поток отменён")
raise
except Exception as e:
print(f"Ошибка в потоке: {e}")
yield generate_sse_event(
event_type="error",
data={"message": str(e)}
)
finally:
# Финальное событие
yield generate_sse_event(
event_type="disconnected",
data={"message": "Поток завершён"}
)

@app.get("/events")
async def sse_endpoint(request: Request):
"""Эндпоинт для SSE потока"""

# Получение последнего ID события
last_event_id = request.headers.get("Last-Event-ID")
start_id = int(last_event_id) if last_event_id else 0

print(f"Подключение клиента, восстановление с события #{start_id}")

return StreamingResponse(
event_stream(request, start_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Отключение буферизации nginx
}
)

# Симуляция уведомлений
notifications_store: Dict[int, Dict] = {}
notification_counter = 0

async def notification_stream(request: Request):
"""Поток уведомлений"""
global notification_counter

last_id = 0

# Получение последнего события
last_event_id = request.headers.get("Last-Event-ID")
if last_event_id:
last_id = int(last_event_id)
print(f"Восстановление уведомлений с #{last_id}")

try:
while True:
if await request.is_disconnected():
break

# Проверка новых уведомлений
new_notifications = [
(nid, data) for nid, data in notifications_store.items()
if nid > last_id
]

for nid, data in sorted(new_notifications):
yield generate_sse_event(
event_type="notification",
data=data,
event_id=str(nid)
)
last_id = nid

await asyncio.sleep(1)

except asyncio.CancelledError:
raise

@app.get("/notifications")
async def notifications_endpoint(request: Request):
"""Эндпоинт для потока уведомлений"""
return StreamingResponse(
notification_stream(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)

@app.post("/notify")
async def create_notification(notification: Dict[str, Any]):
"""Создание нового уведомления"""
global notification_counter

notification_counter += 1

notification_data = {
"id": notification_counter,
"type": notification.get("type", "info"),
"title": notification.get("title", "Уведомление"),
"body": notification.get("body", ""),
"timestamp": time.time()
}

notifications_store[notification_counter] = notification_data

return {"status": "created", "id": notification_counter}

if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

Java (Spring WebFlux)

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@RestController
public class SseController {

// Хранилище для клиентских потоков
private final Map<String, Sinks.Many<String>> clientSinks = new ConcurrentHashMap<>();
private final AtomicInteger clientIdGenerator = new AtomicInteger(0);

/**
* Простой поток событий с фиксированным интервалом
*/
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> getEvents() {
return Flux.interval(Duration.ofSeconds(2))
.map(sequence -> ServerSentEvent.<String>builder()
.id(String.valueOf(sequence))
.event("tick")
.data(String.format("{\"sequence\":%d,\"timestamp\":%d}",
sequence, System.currentTimeMillis()))
.build());
}

/**
* Поток с автоматическим переподключением
*/
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> eventStream() {
return Flux.interval(Duration.ofSeconds(1))
.take(30) // Ограничение для демонстрации
.map(sequence -> {
if (sequence == 0) {
// Первое событие с указанием интервала повтора
return ServerSentEvent.<String>builder()
.event("connected")
.data("{\"status\":\"connected\"}")
.build();
} else if (sequence == 1) {
// Установка интервала повтора
return ServerSentEvent.<String>builder()
.comment("retry: 5000")
.build();
} else {
return ServerSentEvent.<String>builder()
.id(String.valueOf(sequence))
.event("message")
.data(String.format("{\"count\":%d,\"time\":%d}",
sequence, System.currentTimeMillis()))
.build();
}
});
}

/**
* Broadcast уведомлений для всех подключенных клиентов
*/
@GetMapping(value = "/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> notificationsStream() {
String clientId = "client-" + clientIdGenerator.incrementAndGet();
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE);

clientSinks.put(clientId, sink);

System.out.println("Новый клиент подключился: " + clientId);

// Отправка приветственного сообщения
sink.tryEmitNext("{\"type\":\"connected\",\"clientId\":\"" + clientId + "\"}");

return sink.asFlux()
.map(data -> ServerSentEvent.<String>builder()
.data(data)
.build())
.doFinally(signalType -> {
System.out.println("Клиент отключился: " + clientId);
clientSinks.remove(clientId);
});
}

/**
* Отправка уведомления всем клиентам
*/
public void broadcastNotification(String type, String title, String body) {
String message = String.format(
"{\"type\":\"%s\",\"title\":\"%s\",\"body\":\"%s\",\"timestamp\":%d}",
type, title, body, System.currentTimeMillis()
);

clientSinks.forEach((clientId, sink) -> {
sink.tryEmitNext(message);
});
}

/**
* Пример использования broadcast
*/
public void simulateNotifications() {
new Thread(() -> {
int count = 0;
while (true) {
try {
Thread.sleep(3000);
count++;
broadcastNotification(
"info",
"Уведомление #" + count,
"Тестовое уведомление в " + System.currentTimeMillis()
);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}).start();
}
}

Spring MVC альтернатива (с использованием SseEmitter):

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
public class SseEmitterController {

private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final ExecutorService executor = Executors.newCachedThreadPool();

@GetMapping(value = "/sse", produces = "text/event-stream")
public SseEmitter handleSse() {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // Без таймаута

String emitterId = "emitter-" + System.currentTimeMillis();
emitters.put(emitterId, emitter);

// Настройка обработчиков
emitter.onCompletion(() -> {
System.out.println("SSE завершён: " + emitterId);
emitters.remove(emitterId);
});

emitter.onTimeout(() -> {
System.out.println("SSE таймаут: " + emitterId);
emitters.remove(emitterId);
});

emitter.onError(error -> {
System.out.println("SSE ошибка: " + emitterId + " - " + error.getMessage());
emitters.remove(emitterId);
});

// Отправка начального сообщения
try {
emitter.send(SseEmitter.event()
.name("connected")
.data("{\"status\":\"connected\",\"id\":\"" + emitterId + "\"}"));
} catch (IOException e) {
e.printStackTrace();
}

// Запуск потока отправки событий
executor.submit(() -> {
int count = 0;
try {
while (true) {
Thread.sleep(2000);

if (!emitters.containsKey(emitterId)) {
break;
}

count++;
emitter.send(SseEmitter.event()
.id(String.valueOf(count))
.name("tick")
.data("{\"count\":" + count + ",\"time\":" + System.currentTimeMillis() + "}"));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (IOException e) {
System.out.println("Ошибка отправки: " + e.getMessage());
emitters.remove(emitterId);
}
});

return emitter;
}

/**
* Broadcast сообщения всем подключенным клиентам
*/
public void broadcast(String eventName, Object data) {
emitters.forEach((id, emitter) -> {
try {
emitter.send(SseEmitter.event()
.name(eventName)
.data(data));
} catch (IOException e) {
System.out.println("Ошибка broadcast для " + id + ": " + e.getMessage());
emitters.remove(id);
}
});
}
}

Node.js (Express)

const express = require('express');
const app = express();

/**
* Простой SSE поток
*/
app.get('/events', (req, res) => {
// Установка заголовков
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); // Отключение буферизации nginx

// Получение последнего события
const lastEventId = req.headers['last-event-id'];
let eventId = lastEventId ? parseInt(lastEventId) : 0;

console.log(`Клиент подключился, последнее событие: ${lastEventId || 'нет'}`);

// Отправка начального события
sendEvent(res, 'connected', { message: 'Подключено к потоку' }, ++eventId);

// Установка интервала повтора
res.write(`retry: 5000\n\n`);

// Генерация событий
const interval = setInterval(() => {
if (res.writableEnded) {
clearInterval(interval);
console.log('Клиент отключился');
return;
}

const data = {
id: ++eventId,
timestamp: Date.now(),
message: `Событие #${eventId}`
};

sendEvent(res, 'tick', data, eventId.toString());

// Отправка комментария
res.write(`: Серверное время ${new Date().toISOString()}\n\n`);
res.flush();

}, 2000);

// Обработка отключения
req.on('close', () => {
clearInterval(interval);
console.log('Соединение закрыто клиентом');
});

// Обработка ошибок
res.on('error', (err) => {
clearInterval(interval);
console.error('Ошибка потока:', err);
});
});

/**
* Поток уведомлений
*/
const notifications = [];
let notificationId = 0;

app.get('/notifications', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');

const lastEventId = req.headers['last-event-id'];
let lastId = lastEventId ? parseInt(lastEventId) : 0;

console.log(`Уведомления: восстановление с #${lastId}`);

// Отправка накопленных уведомлений
notifications
.filter(n => n.id > lastId)
.forEach(notification => {
sendEvent(res, 'notification', notification.data, notification.id.toString());
});

// Отслеживание новых уведомлений
const checkInterval = setInterval(() => {
if (res.writableEnded) {
clearInterval(checkInterval);
return;
}

const newNotifications = notifications.filter(n => n.id > lastId);

newNotifications.forEach(notification => {
sendEvent(res, 'notification', notification.data, notification.id.toString());
lastId = notification.id;
});

if (newNotifications.length > 0) {
res.flush();
}

}, 1000);

req.on('close', () => {
clearInterval(checkInterval);
});
});

/**
* Создание уведомления
*/
app.post('/notify', express.json(), (req, res) => {
notificationId++;

const notification = {
id: notificationId,
data: {
type: req.body.type || 'info',
title: req.body.title || 'Уведомление',
body: req.body.body || '',
timestamp: Date.now()
}
};

notifications.push(notification);

console.log(`Новое уведомление #${notificationId}:`, notification.data);

res.json({ status: 'created', id: notificationId });
});

/**
* Вспомогательная функция отправки события
*/
function sendEvent(res, eventType, data, id = null) {
if (id) {
res.write(`id: ${id}\n`);
}

if (eventType !== 'message') {
res.write(`event: ${eventType}\n`);
}

const dataStr = typeof data === 'string' ? data : JSON.stringify(data);
dataStr.split('\n').forEach(line => {
res.write(`data: ${line}\n`);
});

res.write('\n'); // Пустая строка завершает событие
}

// Запуск сервера
const PORT = 3000;
app.listen(PORT, () => {
console.log(`Сервер запущен на порту ${PORT}`);
});

Event Streaming

Основы Event Streaming

Event Streaming — это потоковые платформы, такие как Apache Kafka, позволяют системам подписываться на события и реагировать на них.

События сохраняются в упорядоченном, неизменяемом журнале и могут быть потреблены множеством независимых приложений в реальном времени или с отсрочкой.

Событие (Event)

  • Атомарная запись о произошедшем факте в системе.
  • Содержит: тип события, временну́ю метку, идентификаторы, полезную нагрузку.
  • Неизменяемо после создания.

Топик (Topic)

  • Логический канал для категоризации событий.
  • Разделяет события по доменам или типам (например, orders.created, users.registered).
  • Поддерживает публикацию от множества производителей и потребление множеством подписчиков.

Партиция (Partition)

  • Горизонтальное разделение топика для масштабирования и параллелизма.
  • События внутри партиции упорядочены.
  • Ключ события определяет принадлежность к партиции (хеширование).

Смещение (Offset)

  • Уникальный идентификатор события внутри партиции.
  • Позволяет потребителю отслеживать позицию в потоке.
  • Гарантирует доставку «хотя бы один раз» при сохранении смещения.

Группа потребителей (Consumer Group)

  • Набор потребителей, совместно обрабатывающих топик.
  • Каждая партиция назначается одному потребителю в группе.
  • Обеспечивает масштабирование обработки и отказоустойчивость.

Retention Policy

  • Политика хранения событий: по времени (например, 7 дней) или по объёму (например, 100 ГБ).
  • Позволяет повторную обработку событий в течение заданного окна.

Потоковые операторы — это инструменты для обработки потоков данных в реальном времени. Они позволяют выполнять преобразования, фильтрацию и агрегацию данных.

Примеры потоковых операторов:

  • Map — преобразует каждый элемент потока. Пример - увеличить все числа в потоке на 1:
stream.map(x => x + 1);
  • Filter — отбирает только те элементы, кторые удовлетворяют условию. Пример - оставить только положительные числа:
stream.filter(x => x > 0);
  • Reduce — агрегирует данные в один результат. Пример - подсчитать сумму всех чисел в потоке:
stream.reduce((acc, x) => acc + x, 0);
  • FlatMap — преобразует каждый элемент в несколько элементов. Пример - разбить строку на слова:
stream.flatMap(sentence => sentence.split(' '));

Потоковые операторы в интеграции используются как раз в Apache Kafka Streams. На практике это обработка событий в реальном времени (например, подсчёт количества кликов пользователей).

Kafta и MQTT - не только асинхронные протоколы, но и реактивные.

Реактивные системы обеспечивают мгновенный обмен данными, и они подходят для работы с большим количеством событий и пользователей. Такое можно встретить в чатах, онлайн-играх, биржевых платформах, системах мониторинга.


Apache Kafka

Архитектура

  • Распределённый журнал (distributed commit log) с репликацией.
  • Кластер состоит из брокеров (brokers), каждый хранит подмножество партиций.
  • ZooKeeper (в версиях до 2.8) или KRaft (Kafka Raft) для координации кластера.

Гарантии

  • Упорядоченность в пределах партиции.
  • Персистентность через запись на диск с сегментацией.
  • Репликация с настраиваемым фактором (replication factor).
  • Подтверждение записи: acks=0 (без подтверждения), acks=1 (лидер), acks=all (все реплики).

Компоненты экосистемы

  • Kafka Connect — коннекторы для интеграции с внешними системами (базы данных, хранилища).
  • Kafka Streams — библиотека для потоковой обработки на стороне приложения.
  • ksqlDB — SQL-подобный движок для потоковых запросов.
  • Schema Registry — централизованное управление схемами сообщений (Avro, Protobuf).

Apache Pulsar

Архитектура

  • Разделение вычислений и хранения: брокеры обрабатывают запросы, BookKeeper хранит данные.
  • Многоуровневое хранение (tiered storage): горячие данные в BookKeeper, холодные в объектном хранилище (S3, GCS).

Отличия от Kafka

  • Встроенная поддержка multi-tenancy с изоляцией на уровне тенантов и пространств имён.
  • Гео-репликация на уровне кластера.
  • Гибкая политика хранения с автоматическим перемещением данных.

NATS Streaming (STAN) / JetStream

Особенности

  • Лёгкий протокол с минимальным оверхедом.
  • JetStream (встроен в NATS 2.0+) добавляет персистентность и потоковую обработку.
  • Поддержка «интересных» подписок (interest-based) и потоков (streams).

RabbitMQ Streams

Особенности

  • Расширение RabbitMQ для потоковой обработки.
  • Сохраняет совместимость с AMQP и добавляет семантику журналов.
  • Поддержка offset tracking и повторного чтения.

Event Sourcing

Принцип

  • Состояние агрегата определяется последовательностью событий, а не текущим снимком.
  • События сохраняются в журнале как единственный источник истины.

Преимущества

  • Полная история изменений для аудита и отладки.
  • Возможность восстановления состояния на любой момент времени.
  • Упрощение распределённых транзакций через компенсирующие события.

Реализация

// События
public record OrderCreated(Guid OrderId, string CustomerId, DateTime Timestamp);
public record OrderItemAdded(Guid OrderId, Product Product, int Quantity);
public record OrderCompleted(Guid OrderId, decimal TotalAmount);

// Агрегат
public class Order
{
private readonly List<object> _events = new();
public Guid Id { get; private set; }
public string CustomerId { get; private set; }
public List<OrderItem> Items { get; } = new();
public bool IsCompleted { get; private set; }

public void Create(string customerId)
{
var ev = new OrderCreated(Id, customerId, DateTime.UtcNow);
Apply(ev);
_events.Add(ev);
}

public void AddItem(Product product, int quantity)
{
var ev = new OrderItemAdded(Id, product, quantity);
Apply(ev);
_events.Add(ev);
}

public void Complete(decimal totalAmount)
{
var ev = new OrderCompleted(Id, totalAmount);
Apply(ev);
_events.Add(ev);
}

private void Apply(object ev)
{
switch (ev)
{
case OrderCreated created:
Id = created.OrderId;
CustomerId = created.CustomerId;
break;
case OrderItemAdded added:
Items.Add(new OrderItem(added.Product, added.Quantity));
break;
case OrderCompleted _:
IsCompleted = true;
break;
}
}

public IEnumerable<object> GetUncommittedEvents() => _events;
public void ClearEvents() => _events.Clear();
}

CQRS (Command Query Responsibility Segregation)

Принцип

  • Разделение операций записи (команды) и чтения (запросы).
  • Команды публикуют события в поток.
  • Проекции (read models) обрабатывают события и обновляют материализованные представления.

Архитектура

┌─────────────┐      ┌──────────────┐      ┌──────────────┐
│ Command │─────▶│ Event │─────▶│ Event │
│ Handler │ │ Bus/Stream │ │ Store │
└─────────────┘ └──────────────┘ └──────────────┘


┌─────────────────────┐
│ Projection/ │
│ Read Model │
└─────────────────────┘


┌─────────────────────┐
│ Query Handler │
└─────────────────────┘

Event-Driven Microservices

Принцип

  • Микросервисы взаимодействуют через события, а не через синхронные вызовы.
  • Каждый сервис имеет собственный контекст и базу данных.
  • События служат для распространения изменений между границами контекстов.

Пример потока

1. Order Service: OrderCreated → Kafka (topic: orders)
2. Payment Service: потребляет OrderCreated → создаёт платеж → PaymentProcessed → Kafka (topic: payments)
3. Inventory Service: потребляет OrderCreated → резервирует товар → InventoryReserved → Kafka (topic: inventory)
4. Notification Service: потребляет все события → отправляет уведомления

Change Data Capture (CDC)

Принцип

  • Перехват изменений в базе данных и публикация их как событий.
  • Позволяет синхронизировать данные между системами без изменения прикладного кода.

Инструменты

  • Debezium — CDC для реляционных баз (PostgreSQL, MySQL) и MongoDB.
  • Maxwell — MySQL binlog reader.
  • Kafka Connect с коннекторами JDBC, JMS.

C# (.NET) с Confluent Kafka

Производитель событий

using Confluent.Kafka;
using System.Text.Json;

public class OrderEventProducer
{
private readonly IProducer<string, string> _producer;
private readonly string _topic = "orders";

public OrderEventProducer(string bootstrapServers)
{
var config = new ProducerConfig
{
BootstrapServers = bootstrapServers,
Acks = Acks.All,
EnableIdempotence = true,
MaxInFlight = 5
};

_producer = new ProducerBuilder<string, string>(config).Build();
}

public async Task ProduceOrderCreatedAsync(Order order)
{
var eventData = new
{
type = "OrderCreated",
orderId = order.Id,
customerId = order.CustomerId,
items = order.Items,
timestamp = DateTimeOffset.UtcNow
};

var message = new Message<string, string>
{
Key = order.Id.ToString(),
Value = JsonSerializer.Serialize(eventData)
};

try
{
var result = await _producer.ProduceAsync(_topic, message);
Console.WriteLine($"Событие отправлено: {result.TopicPartitionOffset}");
}
catch (ProduceException<string, string> ex)
{
Console.WriteLine($"Ошибка отправки: {ex.Error.Reason}");
throw;
}
}

public void Dispose()
{
_producer?.Dispose();
}
}

Потребитель событий

using Confluent.Kafka;

public class OrderEventConsumer
{
private readonly IConsumer<string, string> _consumer;
private readonly string _topic = "orders";
private readonly CancellationTokenSource _cts = new();

public OrderEventConsumer(string bootstrapServers, string groupId)
{
var config = new ConsumerConfig
{
BootstrapServers = bootstrapServers,
GroupId = groupId,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false,
SessionTimeoutMs = 30000
};

_consumer = new ConsumerBuilder<string, string>(config)
.SetPartitionsAssignedHandler((c, partitions) =>
{
Console.WriteLine($"Назначены партиции: {string.Join(',', partitions)}");
})
.Build();
}

public async Task StartAsync()
{
_consumer.Subscribe(_topic);

try
{
while (!_cts.Token.IsCancellationRequested)
{
try
{
var result = _consumer.Consume(_cts.Token);

Console.WriteLine($"Получено событие: {result.Message.Key}");
ProcessEvent(result.Message.Value);

// Подтверждение обработки
_consumer.Commit(result);
}
catch (ConsumeException ex)
{
Console.WriteLine($"Ошибка потребления: {ex.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Потребление остановлено");
}
finally
{
_consumer.Close();
}
}

private void ProcessEvent(string eventData)
{
// Обработка события
Console.WriteLine($"Обработка: {eventData}");
}

public void Stop()
{
_cts.Cancel();
}

public void Dispose()
{
_consumer?.Dispose();
_cts?.Dispose();
}
}

Потоковая обработка с Kafka Streams

// Пример агрегации с использованием Kafka Streams API (Java-библиотека через IKVM или REST Proxy)
// Для .NET можно использовать Kafka REST Proxy или Confluent Cloud ksqlDB

public class OrderAnalytics
{
// Подсчёт заказов по клиентам
/*
CREATE TABLE customer_order_count AS
SELECT customer_id, COUNT(*) AS order_count
FROM orders
GROUP BY customer_id
EMIT CHANGES;
*/

// Средний чек по периодам
/*
CREATE TABLE avg_order_value AS
SELECT
WINDOWSTART() as period_start,
COUNT(*) as order_count,
AVG(total_amount) as avg_amount
FROM orders
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY WINDOWSTART()
EMIT CHANGES;
*/
}

Python с confluent-kafka

Производитель

from confluent_kafka import Producer
import json
import uuid
from datetime import datetime

class OrderProducer:
def __init__(self, bootstrap_servers):
self.producer = Producer({
'bootstrap.servers': bootstrap_servers,
'acks': 'all',
'enable.idempotence': True
})
self.topic = 'orders'

def delivery_report(self, err, msg):
if err is not None:
print(f'Ошибка доставки: {err}')
else:
print(f'Сообщение доставлено: {msg.topic()} [{msg.partition()}]')

def produce_order_created(self, order_id, customer_id, items):
event = {
'type': 'OrderCreated',
'order_id': str(order_id),
'customer_id': customer_id,
'items': items,
'timestamp': datetime.utcnow().isoformat()
}

self.producer.produce(
topic=self.topic,
key=str(order_id),
value=json.dumps(event),
callback=self.delivery_report
)

# Ожидание доставки всех сообщений
self.producer.flush()

def close(self):
self.producer.flush()

# Использование
producer = OrderProducer('localhost:9092')
producer.produce_order_created(
uuid.uuid4(),
'customer_123',
[{'product_id': 'p1', 'quantity': 2, 'price': 100.0}]
)
producer.close()

Потребитель

from confluent_kafka import Consumer, KafkaException
import json

class OrderConsumer:
def __init__(self, bootstrap_servers, group_id):
self.consumer = Consumer({
'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'session.timeout.ms': 30000
})
self.topic = 'orders'

def start(self):
self.consumer.subscribe([self.topic])

try:
while True:
msg = self.consumer.poll(timeout=1.0)

if msg is None:
continue

if msg.error():
raise KafkaException(msg.error())

try:
event = json.loads(msg.value().decode('utf-8'))
self.process_event(event)

# Подтверждение обработки
self.consumer.commit(asynchronous=False)

except json.JSONDecodeError as e:
print(f'Ошибка парсинга JSON: {e}')
except Exception as e:
print(f'Ошибка обработки события: {e}')
# Можно отправить в DLQ (dead letter queue)

except KeyboardInterrupt:
print('Потребление остановлено')
finally:
self.consumer.close()

def process_event(self, event):
event_type = event.get('type')

if event_type == 'OrderCreated':
print(f"Создан заказ: {event['order_id']}")
# Логика обработки...
elif event_type == 'OrderCompleted':
print(f"Завершён заказ: {event['order_id']}")
else:
print(f"Неизвестный тип события: {event_type}")

# Использование
consumer = OrderConsumer('localhost:9092', 'order-processing-group')
consumer.start()

Потоковая обработка с Faust

import faust
from datetime import datetime

app = faust.App(
'order-processing',
broker='kafka://localhost:9092',
store='memory://',
version=1
)

class OrderEvent(faust.Record, serializer='json'):
type: str
order_id: str
customer_id: str
items: list
timestamp: str

orders_topic = app.topic('orders', value_type=OrderEvent)

# Таблица для агрегации по клиентам
customer_orders = app.Table(
'customer_orders',
default=int
)

@app.agent(orders_topic)
async def process_orders(stream):
async for event in stream:
if event.type == 'OrderCreated':
print(f"Обработка заказа {event.order_id}")

# Инкремент счётчика заказов клиента
customer_orders[event.customer_id] += 1

# Логика обработки...
await process_order(event)

async def process_order(event):
# Бизнес-логика
pass

# Периодическая задача для отчётов
@app.timer(interval=60.0)
async def report_stats():
total = sum(customer_orders.values())
print(f"Статистика: всего заказов = {total}")
for customer, count in customer_orders.items():
print(f" Клиент {customer}: {count} заказов")

if __name__ == '__main__':
app.main()

Java с Spring Kafka

Конфигурация

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

private final String bootstrapServers = "localhost:9092";

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(
org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE
);

return factory;
}
}

Производитель

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@Service
public class OrderEventProducer {

private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private final String topic = "orders";

public OrderEventProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
this.objectMapper = new ObjectMapper();
}

public CompletableFuture<SendResult<String, String>> produceOrderCreated(
UUID orderId, String customerId, OrderItem[] items) {

try {
OrderEvent event = new OrderEvent(
"OrderCreated",
orderId.toString(),
customerId,
items,
System.currentTimeMillis()
);

String value = objectMapper.writeValueAsString(event);

return kafkaTemplate.send(topic, orderId.toString(), value)
.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Событие отправлено: " +
result.getRecordMetadata().offset());
} else {
System.err.println("Ошибка отправки: " + ex.getMessage());
}
});

} catch (Exception e) {
throw new RuntimeException("Ошибка сериализации", e);
}
}

private static class OrderEvent {
public String type;
public String orderId;
public String customerId;
public OrderItem[] items;
public long timestamp;

public OrderEvent(String type, String orderId, String customerId,
OrderItem[] items, long timestamp) {
this.type = type;
this.orderId = orderId;
this.customerId = customerId;
this.items = items;
this.timestamp = timestamp;
}
}

private static class OrderItem {
public String productId;
public int quantity;
public double price;
}
}

Потребитель

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class OrderEventConsumer {

private final ObjectMapper objectMapper = new ObjectMapper();

@KafkaListener(topics = "orders", groupId = "order-processing-group")
public void consume(@Payload String eventData, Acknowledgment ack) {
try {
OrderEvent event = objectMapper.readValue(eventData, OrderEvent.class);

System.out.println("Получено событие: " + event.type +
" для заказа " + event.orderId);

processEvent(event);

// Подтверждение обработки
ack.acknowledge();

} catch (Exception e) {
System.err.println("Ошибка обработки события: " + e.getMessage());
// Можно отправить в DLQ
}
}

private void processEvent(OrderEvent event) {
switch (event.type) {
case "OrderCreated":
handleOrderCreated(event);
break;
case "OrderCompleted":
handleOrderCompleted(event);
break;
default:
System.out.println("Неизвестный тип события: " + event.type);
}
}

private void handleOrderCreated(OrderEvent event) {
System.out.println("Обработка созданного заказа: " + event.orderId);
// Бизнес-логика...
}

private void handleOrderCompleted(OrderEvent event) {
System.out.println("Обработка завершённого заказа: " + event.orderId);
// Бизнес-логика...
}

private static class OrderEvent {
public String type;
public String orderId;
public String customerId;
public OrderItem[] items;
public long timestamp;
}

private static class OrderItem {
public String productId;
public int quantity;
public double price;
}
}

Потоковая обработка с Kafka Streams

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Properties;

@Component
public class OrderAnalyticsStreams {

@Bean
public KafkaStreams orderAnalyticsStreams() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();

// Чтение событий заказов
KStream<String, String> orders = builder.stream("orders");

// Фильтрация только созданных заказов
KStream<String, OrderEvent> createdOrders = orders
.filter((key, value) -> value.contains("\"type\":\"OrderCreated\""))
.mapValues(value -> parseOrderEvent(value));

// Агрегация по клиентам
KTable<String, Long> customerOrderCount = createdOrders
.groupBy((key, event) -> event.customerId)
.count(Materialized.as("customer-order-count"));

// Подсчёт среднего чека по часам
KTable<Windowed<String>, Double> hourlyAvgOrderValue = createdOrders
.filter((key, event) -> event.totalAmount > 0)
.groupBy((key, event) -> "all")
.windowedBy(TimeWindows.of(Duration.ofHours(1)))
.aggregate(
() -> 0.0,
(key, event, total) -> total + event.totalAmount,
(key, event, total) -> total - event.totalAmount,
Materialized.as("hourly-total-amount")
)
.mapValues(total -> total / 100.0); // Пример вычисления

// Вывод результатов в топики
customerOrderCount
.toStream()
.to("customer-order-count", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

return streams;
}

private OrderEvent parseOrderEvent(String json) {
// Парсинг JSON в объект OrderEvent
return new OrderEvent();
}

private static class OrderEvent {
String customerId;
double totalAmount;
}
}

Node.js с kafkajs

Производитель

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
clientId: 'order-service',
brokers: ['localhost:9092']
});

const producer = kafka.producer({
acks: -1, // all replicas
idempotent: true
});

async function produceOrderCreated(orderId, customerId, items) {
await producer.connect();

const event = {
type: 'OrderCreated',
orderId: orderId,
customerId: customerId,
items: items,
timestamp: new Date().toISOString()
};

await producer.send({
topic: 'orders',
messages: [{
key: orderId,
value: JSON.stringify(event)
}]
});

console.log(`Событие отправлено: ${orderId}`);
await producer.disconnect();
}

// Использование
produceOrderCreated(
'order-123',
'customer-456',
[{ productId: 'p1', quantity: 2, price: 100 }]
).catch(console.error);

Потребитель

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
clientId: 'order-processor',
brokers: ['localhost:9092']
});

const consumer = kafka.consumer({
groupId: 'order-processing-group',
sessionTimeout: 30000
});

async function startConsumer() {
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: true });

await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
const event = JSON.parse(message.value.toString());
console.log(`Получено событие: ${event.type}`);

await processEvent(event);

// Смещение автоматически подтверждается
} catch (error) {
console.error('Ошибка обработки:', error);
// Можно отправить в DLQ
}
}
});
}

async function processEvent(event) {
switch (event.type) {
case 'OrderCreated':
console.log(`Создан заказ: ${event.orderId}`);
// Логика обработки...
break;
case 'OrderCompleted':
console.log(`Завершён заказ: ${event.orderId}`);
break;
default:
console.log(`Неизвестный тип: ${event.type}`);
}
}

// Запуск
startConsumer().catch(console.error);

// Graceful shutdown
process.on('SIGTERM', async () => {
await consumer.disconnect();
process.exit(0);
});

Потоковая обработка с ksqlDB

// HTTP-запросы к ksqlDB REST API
const fetch = require('node-fetch');

async function createStream() {
const query = `
CREATE STREAM orders_stream (
type VARCHAR,
orderId VARCHAR,
customerId VARCHAR,
items ARRAY<STRUCT<productId VARCHAR, quantity INT, price DOUBLE>>,
timestamp VARCHAR
) WITH (
kafka_topic='orders',
value_format='json',
partitions=4
);
`;

await fetch('http://localhost:8088/ksql', {
method: 'POST',
headers: { 'Content-Type': 'application/vnd.ksql.v1+json' },
body: JSON.stringify({ ksql: query })
});
}

async function createAggregation() {
const query = `
CREATE TABLE customer_order_count AS
SELECT customerId, COUNT(*) AS orderCount
FROM orders_stream
GROUP BY customerId
EMIT CHANGES;
`;

await fetch('http://localhost:8088/ksql', {
method: 'POST',
headers: { 'Content-Type': 'application/vnd.ksql.v1+json' },
body: JSON.stringify({ ksql: query })
});
}

async function queryOrders() {
const response = await fetch('http://localhost:8088/query', {
method: 'POST',
headers: { 'Content-Type': 'application/vnd.ksql.v1+json' },
body: JSON.stringify({
ksql: "SELECT * FROM customer_order_count WHERE customerId = 'customer-456';"
})
});

const data = await response.json();
console.log('Результаты:', data);
}