Практика подключения MongoDB, Redis, RabbitMQ и Kafka в распределённой системе
Контекст: теория по каждому инструменту — в MongoDB, Redis в интеграции, RabbitMQ, Kafka. Здесь — одна собранная картина: кто к кому ходит по сети и зачем нужен каждый компонент.
Сценарий заказа в интернет-магазине
Разобьём систему на сервисы (каждый — отдельный процесс или контейнер со своим HTTP API):
| Сервис | Задача | Своё хранилище |
|---|---|---|
| api-gateway | Единая точка входа для браузера и мобильного приложения | — |
| catalog-service | Каталог товаров, цены, остатки | MongoDB (гибкая схема карточек) |
| order-service | Создание заказа, статусы | PostgreSQL (транзакции, уникальность заказа) |
| notification-worker | Письма и push после заказа | — (читает очередь) |
| analytics-consumer | Витрины и отчёты по событиям | ClickHouse / DWH (часто заполняется из Kafka) |
Общая инфраструктура (не «принадлежит» одному сервису):
| Компонент | Роль в этом примере |
|---|---|
| Redis | Кэш карточек каталога, rate limit, короткоживущие блокировки |
| RabbitMQ | Фоновые задачи: «отправить письмо», «списать бонусы» |
| Kafka | Поток событий order.created для аналитики и других подписчиков |
Как приложение «дёргает» отдельный микросервис
Правило для фронтенда
Браузер не обращается напрямую к catalog-service:3001 и order-service:3002 в продакшене:
- внутренние хосты не видны из интернета;
- CORS, секреты и единая авторизация проще на одном входе;
- клиенту не нужно знать, сколько сервисов за фасадом.
Цепочка выглядит так:
- SPA делает
POST https://shop.example.com/api/ordersс JWT. - API Gateway (Nginx, Kong, Traefik, AWS API Gateway) проверяет TLS, rate limit, маршрутизирует на
order-service. order-serviceпри необходимости сам вызываетcatalog-serviceпо внутреннему URL — это уже service-to-service, не браузер.
Подробнее про gateway и mesh — Коммуникация и интеграция, синхронная коммуникация.
Откуда сервис знает адрес соседа
| Среда | Как находят друг друга |
|---|---|
| Локально / Docker Compose | Имена сервисов в docker-compose.yml: http://catalog-service:3000 |
| Kubernetes | DNS вида http://catalog-service.orders.svc.cluster.local |
| Облако | Service discovery, переменные окружения, конфиг из Consul/Vault |
В коде адрес не хардкодят в репозитории — берут из process.env.CATALOG_SERVICE_URL / ConfigMap.
Пример вызова с gateway (клиент → заказ)
// Фронтенд — только публичный URL gateway
const res = await fetch('https://shop.example.com/api/orders', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${accessToken}`,
'Idempotency-Key': orderIdempotencyKey,
},
body: JSON.stringify({ items: [{ productId: 'PROD-001', qty: 2 }] }),
});
Разбор кода (фронтенд):
fetch— встроенный HTTP-клиент браузера; возвращаетPromiseс ответом.- Первый аргумент — полный URL gateway, не внутренний хост микросервиса.
method: 'POST'— создаём ресурс «заказ» (семантика REST для создания).Content-Type: application/json— тело ниже парсится как JSON.Authorization: Bearer ${accessToken}— JWT или access token после логина; gateway или order-service проверяют подпись и срок.Idempotency-Key— стабильный ключ на попытку оформления; повтор после таймаута не создаст второй заказ (идемпотентность).JSON.stringify({ items: [...] })— тело запроса;productIdиqtyдолжны совпасть с контрактом API.
// order-service — внутренний вызов catalog (сервер-сервер)
const catalogBase = process.env.CATALOG_SERVICE_URL; // http://catalog-service:3000
const product = await fetch(`${catalogBase}/products/${productId}`, {
headers: { 'X-Request-Id': correlationId },
});
Разбор кода (order-service → catalog):
process.env.CATALOG_SERVICE_URL— базовый URL из окружения (Compose/K8s ConfigMap), не захардкожен в git.- Шаблонная строка
`${catalogBase}/products/${productId}`— REST-ресурс «товар по id». fetchбезmethod— по умолчанию GET, только чтение карточки.X-Request-Id— correlation id; тот же id попадёт в логи catalog, order и брокеров для разбора инцидента.- Ответ нужно проверить (
res.ok/ статус 404) перед созданием заказа — иначе риск нулевой цены.
После ответа пользователю order-service не ждёт отправки письма: кладёт задачу в RabbitMQ и событие в Kafka (см. ниже). Пользователь уже получил 201, остальное — фон.
MongoDB — подключение и место в системе
Зачем отдельная MongoDB у catalog-service
Каталог часто меняет поля (атрибуты, локали, медиа). Database per service: только catalog-service пишет в свою MongoDB; order-service хранит снимок productId, цену и название на момент заказа в PostgreSQL, а актуальную карточку запрашивает по HTTP при оформлении.
Установка и shell — Первые шаги с MongoDB. Модель данных — MongoDB — документоориентированная БД.
Docker для учебного стенда
# фрагмент docker-compose.yml
services:
mongo:
image: mongo:7
ports:
- "27017:27017"
volumes:
- mongo_data:/data/db
volumes:
mongo_data:
Разбор кода (Docker):
services.mongo— имя сервиса; в сети Compose хостmongoрезолвится в этот контейнер.image: mongo:7— официальный образ MongoDB 7.x.ports: "27017:27017"— проброс порта на хост для Compass и локальных драйверов.volumes: mongo_data:/data/db— данные переживают пересоздание контейнера.- Именованный том
mongo_dataобъявлен в секцииvolumesвнизу файла.
TypeScript / Node.js (catalog-service)
Переменная окружения (никогда не коммитьте пароль в git):
MONGODB_URI=mongodb://mongo:27017/catalog
Разбор кода (URI):
mongodb://— схема драйвера MongoDB (аналогhttps://для HTTP).mongo— hostname сервиса в Docker Compose (неlocalhostиз другого контейнера).27017— стандартный портmongod./catalog— имя базы данных по умолчанию дляclient.db()без аргумента.
Минимальный модуль:
import { MongoClient } from 'mongodb';
const uri = process.env.MONGODB_URI;
if (!uri) throw new Error('Задайте MONGODB_URI');
let client;
export async function getDb() {
if (!client) {
client = new MongoClient(uri, { maxPoolSize: 20 });
await client.connect();
}
return client.db(); // база из пути URI: catalog
}
export async function closeMongo() {
await client?.close();
}
Разбор кода (модуль MongoDB):
import { MongoClient } from 'mongodb'— официальный драйвер Node.js.process.env.MONGODB_URI— строка подключения из.env/ секретов оркестратора.let client— один экземпляр на процесс (singleton), чтобы не открывать TCP на каждый запрос.new MongoClient(uri, { maxPoolSize: 20 })— пул до 20 соединений кmongod.await client.connect()— ленивое подключение при первомgetDb().client.db()— базаcatalogиз пути URI.export async function getDb()— единая точка доступа для роутов Express.client?.close()— graceful shutdown при SIGTERM в K8s.
Эндпоинт «товар по id»:
import express from 'express';
import { getDb } from './mongo.js';
const app = express();
app.get('/products/:id', async (req, res) => {
const db = await getDb();
const doc = await db.collection('products').findOne({ _id: req.params.id });
if (!doc) return res.status(404).json({ error: 'not_found' });
res.json(doc);
});
Разбор кода (эндпоинт):
express()/app.get— HTTP-сервер и маршрут GET с параметром:idв path.req.params.id— значение сегмента URL (/products/PROD-001→PROD-001).db.collection('products')— коллекция документов каталога (аналог таблицы).findOne({ _id: req.params.id })— поиск одного документа по первичному ключу_id.res.status(404).json({ error: 'not_found' })— контракт ошибки для order-service и gateway.res.json(doc)— сериализация BSON-документа в JSON для ответа.
Readiness: в /health/ready делайте db.command({ ping: 1 }) — gateway не направит трафик на pod без живой БД (FAQ по health).
Redis — подключение и роль рядом с API
Redis не заменяет MongoDB и PostgreSQL. В нашем сценарии:
- cache-aside для
GET /products/:id— снижает нагрузку на MongoDB; - опционально rate limit по
customerIdна gateway; - distributed lock на короткое время при резервировании остатка (осторожно с TTL и идемпотентностью).
Полный рабочий проект (Express, Pub/Sub, Streams, docker-compose) — в Redis в интеграции.
Фрагмент cache-aside в catalog-service:
import { redis } from './redisClient.js'; // см. 129.md
const cacheKey = `product:v1:${productId}`;
const cached = await redis.get(cacheKey);
if (cached) {
res.set('X-Cache', 'HIT');
return res.json(JSON.parse(cached));
}
const doc = await db.collection('products').findOne({ _id: productId });
const ttl = 60 + Math.floor(Math.random() * 15); // jitter
await redis.setex(cacheKey, ttl, JSON.stringify(doc));
res.set('X-Cache', 'MISS');
res.json(doc);
Разбор кода (cache-aside):
product:v1:${productId}— ключ кэша с версией префиксаv1для массовой инвалидации.redis.get(cacheKey)— чтение строки из Redis;null/undefined— промах (MISS).X-Cache: HIT— служебный заголовок для отладки и метрик доли попаданий в кэш.JSON.parse(cached)— в Redis храним JSON-текст, не BSON.findOneв MongoDB — выполняется только при MISS.Math.random() * 15(jitter) — разброс TTL, чтобы ключи не истекали одновременно.setex(key, ttl, value)— атомарно «значение + TTL в секундах».X-Cache: MISS— ответ собран из источника и положен в кэш.
При изменении цены в MongoDB сбрасывайте ключ (DEL product:v1:*) или поднимайте версию префикса product:v2:.
RabbitMQ — подключение и фоновые задачи
Зачем очередь, если уже есть HTTP
HTTP от order-service к notification-worker создаёт синхронную зависимость: упал worker — тормозится или падает создание заказа. RabbitMQ развязывает по времени: заказ сохранён в PostgreSQL → сообщение в очередь → worker обрабатывает когда может, с retry и DLQ.
Типичная топология:
- exchange
orders.tasks(typedirect); - queue
notifications.emailс binding keyemail.send; - DLQ для сообщений после N ошибок.
Теория, management UI, ack/nack — RabbitMQ.
Docker
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
Разбор кода (Docker RabbitMQ):
rabbitmq:3-management— образ с плагином веб-UI (Management).5672— порт протокола AMQP для приложений.15672— HTTP-интерфейс Management (guest/guestтолько для dev).
Producer в order-service (после commit в БД)
import amqp from 'amqplib';
const RABBIT_URL = process.env.RABBITMQ_URL ?? 'amqp://guest:guest@rabbitmq:5672';
export async function enqueueOrderEmail({ orderId, email, correlationId }) {
const conn = await amqp.connect(RABBIT_URL);
const ch = await conn.createChannel();
await ch.assertExchange('orders.tasks', 'direct', { durable: true });
await ch.assertQueue('notifications.email', { durable: true });
await ch.bindQueue('notifications.email', 'orders.tasks', 'email.send');
const body = Buffer.from(JSON.stringify({ orderId, email }));
ch.publish('orders.tasks', 'email.send', body, {
persistent: true,
contentType: 'application/json',
headers: { 'x-correlation-id': correlationId },
});
await ch.close();
await conn.close();
}
Разбор кода (producer RabbitMQ):
amqplib— клиент AMQP 0-9-1 для Node.js.amqp.connect(RABBIT_URL)— TCP-сессия к брокеру; URL содержит логин, хост, vhost.createChannel()— виртуальное соединение для команд (легче, чем новый TCP на каждое сообщение).assertExchange(..., 'direct', { durable: true })— exchange переживает рестарт брокера;directмаршрутизирует по точному routing key.assertQueue(..., { durable: true })— очередь на диске, не теряется при рестарте.bindQueue(queue, exchange, 'email.send')— в очередь попадут сообщения с routing keyemail.send.Buffer.from(JSON.stringify(...))— тело сообщения в бинарном виде.publish(exchange, routingKey, body, options)— отправка без привязки к имени очереди (маршрутизация через exchange).persistent: true—delivery_mode=2, сообщение пишется на диск (при durable queue).x-correlation-id— пользовательский заголовок для трассировки.ch.close()/conn.close()— в проде чаще долгоживущий канал, здесь упрощение для учебника.
Вызывайте только после успешного COMMIT заказа в PostgreSQL. Иначе письмо уйдёт без заказа или наоборот.
Consumer в notification-worker
import amqp from 'amqplib';
const conn = await amqp.connect(process.env.RABBITMQ_URL);
const ch = await conn.createChannel();
await ch.prefetch(10); // не забирать сотни сообщений без обработки
await ch.consume('notifications.email', async (msg) => {
if (!msg) return;
try {
const payload = JSON.parse(msg.content.toString());
await sendEmail(payload); // SMTP / SendGrid
ch.ack(msg);
} catch (err) {
// reject без requeue → DLQ, если настроена политика
ch.nack(msg, false, false);
}
});
Разбор кода (consumer RabbitMQ):
prefetch(10)— broker отдаёт не больше 10 unacked сообщений на consumer; защита от перегрузки памяти.consume(queue, callback)— push-модель: брокер вызывает callback при появлении сообщения.msg === null— канал закрыт, выходим из обработчика.msg.content.toString()— тело AMQP → строка →JSON.parse.sendEmail(payload)— бизнес-логика worker; здесь не блокирует HTTP пользователя.ch.ack(msg)— подтверждение успеха; сообщение удаляется из очереди (at-least-once после ack).ch.nack(msg, false, false)— отказ: не requeue (requeue=false) → политика DLQ или discard.
Идемпотентность: храните processed_message_id или проверяйте «письмо по orderId уже отправлено» — at-least-once даст дубликаты (идемпотентность).
Kafka — подключение и поток событий
Чем Kafka отличается от RabbitMQ в этом же заказе
| RabbitMQ в примере | Kafka в примере | |
|---|---|---|
| Задача | «Сделай сейчас: отправь email» | «Зафиксируй факт: заказ создан» |
| Потребители | Один worker на очередь (масштаб — несколько consumer одной очереди) | Много независимых consumer group (аналитика, антифрод, поиск) |
| История | Сообщение удаляется после ack | События хранятся по retention; можно перечитать |
Оба могут сосуществовать: RabbitMQ для команд, Kafka для событий домена.
Подробнее — Kafka, практикум KRaft — Java-приложение с Kafka и PostgreSQL.
Producer в order-service (KafkaJS, Node.js)
import { Kafka } from 'kafkajs';
const kafka = new Kafka({
clientId: 'order-service',
brokers: (process.env.KAFKA_BROKERS ?? 'kafka:9092').split(','),
});
const producer = kafka.producer();
export async function publishOrderCreated(event) {
await producer.connect();
await producer.send({
topic: 'shop.orders.v1',
messages: [
{
key: event.orderId, // порядок событий одного заказа в одной партиции
value: JSON.stringify(event),
headers: { 'correlation-id': event.correlationId },
},
],
});
}
Разбор кода (producer Kafka):
Kafka— точка входа KafkaJS;clientIdидентифициует приложение в логах брокера.brokers— списокhost:port;split(',')для нескольких брокеров кластера.kafka.producer()— объект для записи в топики.producer.connect()— установка метаданных топиков (в проде connect один раз при старте).producer.send({ topic, messages })— пакетная отправка (здесь одно сообщение); батчинг продюсера — Пакетная работа с данными.key: event.orderId— все события одного заказа попадают в одну партицию → порядок внутри ключа.value: JSON.stringify(event)— полезная нагрузка как UTF-8 строка (или Avro/Protobuf в зрелых системах).headers— метаданные вне тела;correlation-idдля observability.
Топик создают заранее (или через IaC) с нужным числом партиций; consumer group analytics и fraud читают один топик независимо.
Consumer (analytics-consumer)
const consumer = kafka.consumer({ groupId: 'analytics' });
await consumer.connect();
await consumer.subscribe({ topic: 'shop.orders.v1', fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const evt = JSON.parse(message.value.toString());
await insertIntoWarehouse(evt); // идемпотентный upsert по orderId
},
});
Разбор кода (consumer Kafka):
consumer({ groupId: 'analytics' })— consumer group; Kafka балансирует партиции между членами группы.subscribe({ topic, fromBeginning: false })— читать только новые сообщения после подписки (не с начала топика).consumer.run({ eachMessage })— цикл опроса; callback на каждое сообщение.message.value.toString()— байты значения → строка JSON.insertIntoWarehouse(evt)— запись в DWH; должна быть идемпотентной поorderId.- Явный
commitв KafkaJS настраивается отдельно; по умолчанию commit после обработки — не коммитьте до успешного upsert.
Commit offset после успешной записи в DWH. Рост consumer lag — главный сигнал отставания (FAQ).
Клиенты на разных языках
Ниже — те же операции, что в примерах выше (MongoDB findOne, Redis GET/SETEX, RabbitMQ publish в orders.tasks, Kafka produce в shop.orders.v1, HTTP GET к catalog). В каждом стеке свой официальный или де-факто стандартный клиент; имена классов и методов различаются, смысл один.
Подробные справочники по брокерам — RabbitMQ, Kafka (разделы «Подключение приложений»).
HTTP — order-service вызывает catalog-service
| Язык | Пакет / модуль | Типичные API |
|---|---|---|
| TypeScript | fetch (встроенный) | fetch(url, { headers }), res.json() |
| Python | httpx или requests | httpx.get(url, headers=...), .json() |
| Java | java.net.http.HttpClient или Spring WebClient | HttpClient.send(request, BodyHandlers.ofString()) |
| C# | System.Net.Http.HttpClient | GetAsync, ReadFromJsonAsync<T>() |
| Go | net/http | http.NewRequest, http.Client.Do |
| PHP | guzzlehttp/guzzle | GuzzleHttp\Client::get() |
const res = await fetch(`${catalogBase}/products/${productId}`, {
headers: { 'X-Request-Id': correlationId },
});
if (!res.ok) throw new Error(`catalog ${res.status}`);
const product = await res.json();
Разбор кода (TypeScript):
`${catalogBase}/products/${productId}`— тот же контракт path, что у Expressapp.get('/products/:id').res.ok—falseпри 4xx/5xx; без проверки можно принять ошибку за товар.res.json()— парсинг тела ответа в объект JavaScript.
import httpx
r = httpx.get(f"{catalog_base}/products/{product_id}", headers={"X-Request-Id": cid}, timeout=5.0)
r.raise_for_status()
product = r.json()
Разбор кода (Python):
httpx.get— синхронный GET; для async-сервиса —httpx.AsyncClient.timeout=5.0— не ждать catalog дольше 5 с (согласовать с timeout gateway и order-service).raise_for_status()— исключение при 404/503 вместо тихого разбора HTML ошибки.r.json()— dict с полями карточки товара.
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(catalogBase + "/products/" + productId))
.header("X-Request-Id", correlationId)
.GET()
.build();
String body = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).body();
Разбор кода (Java):
HttpRequest.newBuilder()— fluent-сборка запроса (Java 11+)..uri(URI.create(...))— полный URL ресурса..header("X-Request-Id", ...)— произвольный заголовок трассировки..GET()— метод без тела.HttpClient.send— блокирующий вызов;body()— строка JSON (дальше — Jackson/Gson).
using var client = new HttpClient();
client.DefaultRequestHeaders.Add("X-Request-Id", correlationId);
var product = await client.GetFromJsonAsync<ProductDto>($"{catalogBase}/products/{productId}");
Разбор кода (C#):
using var client—HttpClientна процесс (не создавать новый на каждый запрос).DefaultRequestHeaders.Add— общий заголовок для всех вызовов этого клиента.GetFromJsonAsync<ProductDto>— десериализация JSON в типизированный DTO.
req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/products/%s", catalogBase, productID), nil)
req.Header.Set("X-Request-Id", correlationID)
resp, err := http.DefaultClient.Do(req)
Разбор кода (Go):
http.NewRequest— конструктор запроса;MethodGet— GET.req.Header.Set— заголовки доDo.http.DefaultClient.Do— выполнение; нужно проверитьresp.StatusCodeи прочитатьresp.Body(здесь сокращено).
$client = new \GuzzleHttp\Client(['base_uri' => $catalogBase]);
$response = $client->get("products/{$productId}", ['headers' => ['X-Request-Id' => $cid]]);
$product = json_decode($response->getBody()->getContents(), true);
Разбор кода (PHP):
base_uri— префикс URL; относительный путьproducts/{id}дописывается к нему.$client->get(..., ['headers' => ...])— GET с заголовками.getBody()->getContents()— строка ответа.json_decode(..., true)— ассоциативный массив PHP вместо объекта.
MongoDB — чтение карточки товара
| Язык | Пакет | Классы / функции | Операция в примере |
|---|---|---|---|
| TypeScript | mongodb | MongoClient, Db, Collection | collection.findOne({ _id }) |
| Python | pymongo | MongoClient, Database, Collection | collection.find_one({"_id": ...}) |
| Java | mongodb-driver-sync | MongoClients, MongoCollection, Filters | collection.find(eq("_id", id)).first() |
| C# | MongoDB.Driver | MongoClient, IMongoCollection<T>, FilterDefinition | Find(filter).FirstOrDefaultAsync() |
| Go | go.mongodb.org/mongo-driver | mongo.Client, mongo.Collection | FindOne(ctx, bson.M{"_id": id}) |
| PHP | mongodb/mongodb (расширение ext-mongodb) | MongoDB\Client, Collection | findOne(['_id' => $id]) |
from pymongo import MongoClient
client = MongoClient(os.environ["MONGODB_URI"])
doc = client["catalog"]["products"].find_one({"_id": product_id})
Разбор кода (Python / pymongo):
MongoClient(uri)— пул соединений к кластеру/standalone.client["catalog"]— доступ к БДcatalog(квадратные скобки).["products"]— коллекцияproducts.find_one({"_id": product_id})— один документ илиNone.
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.Filters;
var client = MongoClients.create(mongoUri);
var doc = client.getDatabase("catalog")
.getCollection("products")
.find(Filters.eq("_id", productId))
.first();
Разбор кода (Java):
MongoClients.create(mongoUri)— фабрика синхронного драйвера.getDatabase("catalog").getCollection("products")— цепочка БД → коллекция.find(Filters.eq("_id", productId))— курсор с фильтром по_id..first()— первый документ илиnull.
using MongoDB.Driver;
var client = new MongoClient(mongoUri);
var products = client.GetDatabase("catalog").GetCollection<BsonDocument>("products");
var doc = await products
.Find(Builders<BsonDocument>.Filter.Eq("_id", productId))
.FirstOrDefaultAsync();
Разбор кода (C#):
MongoClient/GetDatabase/GetCollection<BsonDocument>— типизированный доступ к коллекции.Builders<BsonDocument>.Filter.Eq("_id", productId)— фильтр эквивалентен{ _id: id }.FirstOrDefaultAsync— async-чтение одного документа.
import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
coll := client.Database("catalog").Collection("products")
var doc bson.M
err := coll.FindOne(ctx, bson.M{"_id": productID}).Decode(&doc)
Разбор кода (Go):
ctx— контекст отмены/таймаута (обязателен в mongo-driver).bson.M{"_id": productID}— фильтр как map.FindOne(...).Decode(&doc)— декодирование BSON вbson.Mили struct.
$client = new MongoDB\Client(getenv('MONGODB_URI'));
$doc = $client->selectDatabase('catalog')
->selectCollection('products')
->findOne(['_id' => $productId]);
Разбор кода (PHP):
MongoDB\Client— высокоуровневая обёртка надext-mongodb.selectDatabase/selectCollection— явный выбор БД и коллекции.findOne(['_id' => $productId])— массив фильтра → один документ илиnull.
Redis — cache-aside
| Язык | Пакет | Классы / типы | Операция в примере |
|---|---|---|---|
| TypeScript | ioredis или redis | Redis | get, setex |
| Python | redis | redis.Redis | get, setex |
| Java | lettuce-core (или Jedis) | RedisClient, StatefulRedisConnection, RedisCommands | sync().get, sync().setex |
| C# | StackExchange.Redis | ConnectionMultiplexer, IDatabase | StringGetAsync, StringSetAsync с TTL |
| Go | github.com/redis/go-redis/v9 | redis.Client | Get, Set с time.Duration |
| PHP | predis/predis | Predis\Client | get, setex |
import redis
r = redis.Redis.from_url(os.environ["REDIS_URL"])
key = f"product:v1:{product_id}"
cached = r.get(key)
if cached:
return json.loads(cached)
doc = collection.find_one({"_id": product_id})
r.setex(key, 60, json.dumps(doc, default=str))
Разбор кода (Python / redis):
Redis.from_url— парсингredis://host:port/db.r.get(key)—bytesилиNone; при hit —json.loads.setex(key, seconds, value)— запись с TTL 60 с.default=strвjson.dumps— сериализация дат/ObjectId из Mongo.
RedisClient redis = RedisClient.create(redisUrl);
try (var connection = redis.connect()) {
RedisCommands<String, String> cmd = connection.sync();
String cached = cmd.get("product:v1:" + productId);
if (cached != null) { /* deserialize */ }
cmd.setex("product:v1:" + productId, 60, json);
}
Разбор кода (Java / Lettuce):
RedisClient.create— клиент Lettuce (Netty под капотом).connect()+ try-with-resources — освобождение соединения.connection.sync()— синхронные команды Redis.get/setex— те же команды, чтоGET/SETEXв redis-cli.
var mux = await ConnectionMultiplexer.ConnectAsync(redisUrl);
var db = mux.GetDatabase();
var key = $"product:v1:{productId}";
var cached = await db.StringGetAsync(key);
if (cached.HasValue) { /* hit */ }
await db.StringSetAsync(key, json, TimeSpan.FromSeconds(60));
Разбор кода (C# / StackExchange.Redis):
ConnectionMultiplexer— один мультиплексор на приложение (тысячи logical DB).GetDatabase()— логическая БД Redis (0 по умолчанию).StringGetAsync— GET;HasValue— проверка hit.StringSetAsyncсTimeSpan— SET с истечением.
rdb := redis.NewClient(&redis.Options{Addr: "redis:6379"})
key := "product:v1:" + productID
cached, err := rdb.Get(ctx, key).Result()
if err == nil { /* hit */ }
rdb.Set(ctx, key, jsonBytes, 60*time.Second)
Разбор кода (Go / go-redis):
redis.NewClient— клиент с адресом брокера.Get(ctx, key).Result()— приredis.Nil— промах кэша.Set(ctx, key, value, expiration)— SET с TTL (60*time.Second).
$redis = new Predis\Client(getenv('REDIS_URL'));
$key = "product:v1:$productId";
if ($cached = $redis->get($key)) {
return json_decode($cached, true);
}
$redis->setex($key, 60, json_encode($doc));
Разбор кода (PHP / Predis):
Predis\Client— чистый PHP-клиент (без ext-redis).$redis->get($key)— в условииif— hit/miss.setex($key, $seconds, $payload)— SETEX одной командой.
RabbitMQ — publish задачи «отправить email»
| Язык | Пакет | Классы / функции | Операция в примере |
|---|---|---|---|
| TypeScript | amqplib | connect, Channel | publish, consume, ack |
| Python | pika | BlockingConnection, channel | basic_publish, basic_consume, basic_ack |
| Java | com.rabbitmq:amqp-client | ConnectionFactory, Connection, Channel | basicPublish, basicConsume, basicAck |
| C# | RabbitMQ.Client | ConnectionFactory, IConnection, IModel | BasicPublish, BasicConsume, BasicAck |
| Go | github.com/rabbitmq/amqp091-go | amqp.Dial, Channel | PublishWithContext, Consume, Ack |
| PHP | php-amqplib/php-amqplib | AMQPStreamConnection, channel | basic_publish, basic_consume, basic_ack |
import pika, json
params = pika.URLParameters(os.environ["RABBITMQ_URL"])
with pika.BlockingConnection(params) as conn:
ch = conn.channel()
ch.exchange_declare("orders.tasks", "direct", durable=True)
ch.basic_publish(
"orders.tasks",
"email.send",
json.dumps({"orderId": order_id, "email": email}),
pika.BasicProperties(delivery_mode=2, content_type="application/json"),
)
Разбор кода (Python / pika):
URLParameters— разборamqp://user:pass@host:5672/vhost.BlockingConnection— синхронный клиент (для worker/скриптов).exchange_declare(..., durable=True)— устойчивый exchange.basic_publish(exchange, routing_key, body, properties)— аналогchannel.publishв Node.delivery_mode=2— persistent message.
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(rabbitUrl);
try (Connection conn = factory.newConnection(); Channel ch = conn.createChannel()) {
ch.exchangeDeclare("orders.tasks", BuiltinExchangeType.DIRECT, true);
ch.basicPublish("orders.tasks", "email.send", null, body.getBytes(StandardCharsets.UTF_8));
}
Разбор кода (Java):
ConnectionFactory.setUri— единая строка подключения AMQP.createConnection/createChannel— TCP + виртуальный канал.exchangeDeclare(..., DIRECT, true)— durable direct exchange.basicPublish— 4-й аргументbyte[]тела;nullproperties — без доп. заголовков.
var factory = new ConnectionFactory { Uri = new Uri(rabbitUrl) };
using var conn = factory.CreateConnection();
using var ch = conn.CreateModel();
ch.ExchangeDeclare("orders.tasks", ExchangeType.Direct, durable: true);
ch.BasicPublish("orders.tasks", "email.send", body: Encoding.UTF8.GetBytes(json));
Разбор кода (C#):
ConnectionFactory+CreateConnection/CreateModel—IModel≈ Channel в AMQP.ExchangeDeclare— объявление топологии (идемпотентно при старте).BasicPublish— именованные аргументы exchange, routing key, body.
conn, _ := amqp.Dial(rabbitURL)
ch, _ := conn.Channel()
_ = ch.PublishWithContext(ctx, "orders.tasks", "email.send", false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
})
Разбор кода (Go / amqp091-go):
amqp.Dial— подключение по URL или host.PublishWithContext— publish с отменой черезctx.DeliveryMode: amqp.Persistent— сообщение на диск.- Первые
false, false— mandatory/immediate (обычно false).
$connection = new AMQPStreamConnection('rabbitmq', 5672, 'guest', 'guest');
$channel = $connection->channel();
$msg = new AMQPMessage($json, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, 'orders.tasks', 'email.send');
Разбор кода (PHP / php-amqplib):
AMQPStreamConnection— host, port, user, password.AMQPMessageсdelivery_mode— persistent.basic_publish($msg, $exchange, $routingKey)— порядок аргументов как в спецификации PHP-библиотеки.
Consumer (notification-worker) на всех языках — basic_consume / Consume + обработчик + ack после успешного SMTP; при ошибке — nack без requeue в DLQ (RabbitMQ).
Kafka — produce события order.created
| Язык | Пакет | Классы / функции | Операция в примере |
|---|---|---|---|
| TypeScript | kafkajs | Kafka, Producer, Consumer | producer.send, consumer.run |
| Python | confluent-kafka | Producer, Consumer | produce, poll, commit |
| Java | org.apache.kafka:kafka-clients | KafkaProducer, KafkaConsumer, ProducerRecord | send, poll, commitSync |
| C# | Confluent.Kafka | ProducerBuilder, ConsumerBuilder, Message | ProduceAsync, Consume, Commit |
| Go | github.com/segmentio/kafka-go | kafka.Writer, kafka.Reader | WriteMessages, ReadMessage, CommitMessages |
| PHP | ext-rdkafka + php-rdkafka/php-rdkafka | RdKafka\Producer, RdKafka\KafkaConsumer | produce, consume, commit |
from confluent_kafka import Producer
import json
p = Producer({"bootstrap.servers": os.environ["KAFKA_BROKERS"]})
p.produce(
"shop.orders.v1",
key=order_id,
value=json.dumps(event),
headers=[("correlation-id", correlation_id.encode())],
)
p.flush()
Разбор кода (Python / confluent-kafka):
Producer({"bootstrap.servers": ...})— конфиг librdkafka.produce(topic, key=, value=, headers=)— асинхронная постановка в очередь отправки.flush()— дождаться доставки в брокер перед завершением процесса.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
producer.send(new ProducerRecord<>("shop.orders.v1", orderId, json));
}
Разбор кода (Java / kafka-clients):
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG— адреса брокеров.KEY/VALUE_SERIALIZER_CLASS_CONFIG— String → bytes для ключа и JSON-тела.KafkaProducer— потокобезопасный producer.ProducerRecord(topic, key, value)— одно сообщение;sendасинхронный (нуженflushпри shutdown).
var config = new ProducerConfig { BootstrapServers = brokers };
using var producer = new ProducerBuilder<string, string>(config).Build();
await producer.ProduceAsync("shop.orders.v1", new Message<string, string> {
Key = orderId,
Value = json,
});
Разбор кода (C# / Confluent.Kafka):
ProducerConfig— bootstrap servers, security, acks.ProducerBuilder<string, string>.Build()— типизированный producer.ProduceAsync+Message { Key, Value }— запись в топик с await.
w := &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: "shop.orders.v1",
Balancer: &kafka.LeastBytes{},
}
_ = w.WriteMessages(ctx, kafka.Message{Key: []byte(orderID), Value: eventJSON})
Разбор кода (Go / kafka-go):
kafka.Writer— высокоуровневый producer segmentio.Addr: kafka.TCP(brokers...)— список брокеров.Balancer: &kafka.LeastBytes{}— выбор партиции по загрузке (без key — иначе hash по key).WriteMessages— batch-friendly отправка.
$producer = new RdKafka\Producer();
$producer->addBrokers(getenv('KAFKA_BROKERS'));
$topic = $producer->newTopic('shop.orders.v1');
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $json, $orderId);
$producer->flush(1000);
Разбор кода (PHP / rdkafka):
RdKafka\Producer— обёртка над librdkafka (нужноext-rdkafka).addBrokers— списокhost:port.newTopic+produce— partitionRD_KAFKA_PARTITION_UA(брокер выберет по key).flush(1000)— таймаут ожидания отправки в мс.
Consumer (analytics) — отдельная group.id (analytics), чтение топика shop.orders.v1, идемпотентный upsert в DWH, commit offset после успешной записи.
Как ориентироваться при выборе клиента
| Ситуация | Рекомендация |
|---|---|
| Один язык во всех микросервисах | Официальный драйвер MongoDB/Kafka для платформы; Redis — ioredis / Lettuce / StackExchange.Redis |
| JVM-экосистема | Spring Boot — spring-boot-starter-data-mongodb, spring-kafka, spring-boot-starter-amqp поверх тех же клиентов |
| .NET | MongoDB.Driver, Confluent.Kafka, RabbitMQ.Client, StackExchange.Redis |
| PHP | Расширения ext-mongodb, ext-redis или Predis; Kafka — только через librdkafka (ext-rdkafka) |
| Строгие контракты между сервисами | OpenAPI для HTTP; JSON Schema / Avro для Kafka — см. Kafka |
Полноценный учебный стенд на TypeScript (Redis, Pub/Sub, Streams) — 129.md. Java + Kafka + PostgreSQL — практикум 1191.
Один проход — создание заказа от клика до аналитики
| Шаг | Кто | Что происходит | Технология |
|---|---|---|---|
| 1 | SPA → Gateway | POST /api/orders | HTTPS, JWT |
| 2 | Gateway → order-service | Маршрутизация, rate limit | API Gateway |
| 3 | order-service → catalog-service | Проверка товара и цены | HTTP (sync) |
| 4 | catalog-service | Чтение карточки | Redis → при MISS MongoDB |
| 5 | order-service | INSERT заказ, статус created | PostgreSQL |
| 6 | order-service | Задача «отправить email» | RabbitMQ publish |
| 7 | order-service | Событие order.created | Kafka produce |
| 8 | order-service → клиент | 201 + тело заказа | HTTP |
| 9 | notification-worker | SMTP / push | RabbitMQ consume + ack |
| 10 | analytics-consumer | Строка в витрине | Kafka consume + commit offset |
Если на шаге 3 catalog-service недоступен — возвращайте 503 или используйте circuit breaker, не создавайте заказ с нулевой ценой (Инженерия устойчивости).
Учебный docker-compose (все порты в одном файле)
Скелет для локальной отработки связей (образы и тома сокращены; в production секреты — из vault, сеть — internal):
services:
api-gateway:
image: nginx:alpine
ports:
- "8080:80"
# upstream: catalog-service, order-service
catalog-service:
build: ./catalog-service
environment:
MONGODB_URI: mongodb://mongo:27017/catalog
REDIS_URL: redis://redis:6379/0
depends_on: [mongo, redis]
order-service:
build: ./order-service
environment:
DATABASE_URL: postgres://postgres:postgres@postgres:5432/orders
CATALOG_SERVICE_URL: http://catalog-service:3000
RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672
KAFKA_BROKERS: kafka:9092
depends_on: [postgres, rabbitmq, kafka]
notification-worker:
build: ./notification-worker
environment:
RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672
analytics-consumer:
build: ./analytics-consumer
environment:
KAFKA_BROKERS: kafka:9092
mongo:
image: mongo:7
redis:
image: redis:7-alpine
postgres:
image: postgres:16-alpine
rabbitmq:
image: rabbitmq:3-management
kafka:
image: bitnami/kafka:3
Разбор кода (docker-compose):
build: ./catalog-service— образ из Dockerfile рядом с compose-файлом.environment— URL зависимостей; имена хостов (mongo,redis) из имён services.depends_on— порядок старта контейнеров (не гарантия готовности сервиса — для prod нужны healthcheck).CATALOG_SERVICE_URL: http://catalog-service:3000— внутренний DNS Compose для HTTP между сервисами.DATABASE_URL— PostgreSQL только у order-service (database per service).notification-workerиanalytics-consumer— отдельные процессы без публичных портов, только к брокерам.- Инфраструктурные сервисы (
mongo,redis,postgres,rabbitmq,kafka) — общие для стенда, в prod часто managed-сервисы в облаке.
В Kubernetes те же зависимости задают Deployments, Services и Secrets; см. контейнеры в микросервисах.
Как выбрать инструмент в новом сервисе
| Нужно | Инструмент |
|---|---|
| Хранить сущности сервиса, гибкая схема | MongoDB (или своя SQL — по домену) |
| Ускорить чтение, сессии, блокировки | Redis |
| Фоновая задача «сделай X», retry, DLQ | RabbitMQ |
| Событие «произошло Y», много подписчиков, перечитать поток | Kafka |
| Ответ пользователю «здесь и сейчас» | HTTP/gRPC между сервисами через gateway |
Рекомендую читать дальше
| Тема | Статья |
|---|---|
| Клиенты по языкам (таблицы выше) | § Клиенты на разных языках |
| Типы sync/async | Типы взаимодействия |
| Полный Redis-проект | Redis в интеграции |
| RabbitMQ углублённо | RabbitMQ, 8.05 RabbitMQ |
| Kafka углублённо | Kafka, 8.05 Kafka |
| Outbox (БД + Kafka атомарно) | Проектирование API — mTLS/JWS/outbox |
| Микросервисы в проде | Микросервисы и интеграция — о разделе |