Перейти к основному содержимому

Практика подключения MongoDB, Redis, RabbitMQ и Kafka в распределённой системе

Разработчику Архитектору Инженеру

Контекст: теория по каждому инструменту — в MongoDB, Redis в интеграции, RabbitMQ, Kafka. Здесь — одна собранная картина: кто к кому ходит по сети и зачем нужен каждый компонент.


Сценарий заказа в интернет-магазине

Разобьём систему на сервисы (каждый — отдельный процесс или контейнер со своим HTTP API):

СервисЗадачаСвоё хранилище
api-gatewayЕдиная точка входа для браузера и мобильного приложения
catalog-serviceКаталог товаров, цены, остаткиMongoDB (гибкая схема карточек)
order-serviceСоздание заказа, статусыPostgreSQL (транзакции, уникальность заказа)
notification-workerПисьма и push после заказа— (читает очередь)
analytics-consumerВитрины и отчёты по событиямClickHouse / DWH (часто заполняется из Kafka)

Общая инфраструктура (не «принадлежит» одному сервису):

КомпонентРоль в этом примере
RedisКэш карточек каталога, rate limit, короткоживущие блокировки
RabbitMQФоновые задачи: «отправить письмо», «списать бонусы»
KafkaПоток событий order.created для аналитики и других подписчиков

Как приложение «дёргает» отдельный микросервис

Правило для фронтенда

Браузер не обращается напрямую к catalog-service:3001 и order-service:3002 в продакшене:

  • внутренние хосты не видны из интернета;
  • CORS, секреты и единая авторизация проще на одном входе;
  • клиенту не нужно знать, сколько сервисов за фасадом.

Цепочка выглядит так:

  1. SPA делает POST https://shop.example.com/api/orders с JWT.
  2. API Gateway (Nginx, Kong, Traefik, AWS API Gateway) проверяет TLS, rate limit, маршрутизирует на order-service.
  3. order-service при необходимости сам вызывает catalog-service по внутреннему URL — это уже service-to-service, не браузер.

Подробнее про gateway и mesh — Коммуникация и интеграция, синхронная коммуникация.

Откуда сервис знает адрес соседа

СредаКак находят друг друга
Локально / Docker ComposeИмена сервисов в docker-compose.yml: http://catalog-service:3000
KubernetesDNS вида http://catalog-service.orders.svc.cluster.local
ОблакоService discovery, переменные окружения, конфиг из Consul/Vault

В коде адрес не хардкодят в репозитории — берут из process.env.CATALOG_SERVICE_URL / ConfigMap.

Пример вызова с gateway (клиент → заказ)

// Фронтенд — только публичный URL gateway
const res = await fetch('https://shop.example.com/api/orders', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${accessToken}`,
'Idempotency-Key': orderIdempotencyKey,
},
body: JSON.stringify({ items: [{ productId: 'PROD-001', qty: 2 }] }),
});

Разбор кода (фронтенд):

  • fetch — встроенный HTTP-клиент браузера; возвращает Promise с ответом.
  • Первый аргумент — полный URL gateway, не внутренний хост микросервиса.
  • method: 'POST' — создаём ресурс «заказ» (семантика REST для создания).
  • Content-Type: application/json — тело ниже парсится как JSON.
  • Authorization: Bearer ${accessToken} — JWT или access token после логина; gateway или order-service проверяют подпись и срок.
  • Idempotency-Key — стабильный ключ на попытку оформления; повтор после таймаута не создаст второй заказ (идемпотентность).
  • JSON.stringify({ items: [...] }) — тело запроса; productId и qty должны совпасть с контрактом API.
// order-service — внутренний вызов catalog (сервер-сервер)
const catalogBase = process.env.CATALOG_SERVICE_URL; // http://catalog-service:3000
const product = await fetch(`${catalogBase}/products/${productId}`, {
headers: { 'X-Request-Id': correlationId },
});

Разбор кода (order-service → catalog):

  • process.env.CATALOG_SERVICE_URL — базовый URL из окружения (Compose/K8s ConfigMap), не захардкожен в git.
  • Шаблонная строка `${catalogBase}/products/${productId}` — REST-ресурс «товар по id».
  • fetch без method — по умолчанию GET, только чтение карточки.
  • X-Request-Id — correlation id; тот же id попадёт в логи catalog, order и брокеров для разбора инцидента.
  • Ответ нужно проверить (res.ok / статус 404) перед созданием заказа — иначе риск нулевой цены.

После ответа пользователю order-service не ждёт отправки письма: кладёт задачу в RabbitMQ и событие в Kafka (см. ниже). Пользователь уже получил 201, остальное — фон.


MongoDB — подключение и место в системе

Зачем отдельная MongoDB у catalog-service

Каталог часто меняет поля (атрибуты, локали, медиа). Database per service: только catalog-service пишет в свою MongoDB; order-service хранит снимок productId, цену и название на момент заказа в PostgreSQL, а актуальную карточку запрашивает по HTTP при оформлении.

Установка и shell — Первые шаги с MongoDB. Модель данных — MongoDB — документоориентированная БД.

Docker для учебного стенда

# фрагмент docker-compose.yml
services:
mongo:
image: mongo:7
ports:
- "27017:27017"
volumes:
- mongo_data:/data/db

volumes:
mongo_data:

Разбор кода (Docker):

  • services.mongo — имя сервиса; в сети Compose хост mongo резолвится в этот контейнер.
  • image: mongo:7 — официальный образ MongoDB 7.x.
  • ports: "27017:27017" — проброс порта на хост для Compass и локальных драйверов.
  • volumes: mongo_data:/data/db — данные переживают пересоздание контейнера.
  • Именованный том mongo_data объявлен в секции volumes внизу файла.

TypeScript / Node.js (catalog-service)

Переменная окружения (никогда не коммитьте пароль в git):

MONGODB_URI=mongodb://mongo:27017/catalog

Разбор кода (URI):

  • mongodb:// — схема драйвера MongoDB (аналог https:// для HTTP).
  • mongo — hostname сервиса в Docker Compose (не localhost из другого контейнера).
  • 27017 — стандартный порт mongod.
  • /catalog — имя базы данных по умолчанию для client.db() без аргумента.

Минимальный модуль:

import { MongoClient } from 'mongodb';

const uri = process.env.MONGODB_URI;
if (!uri) throw new Error('Задайте MONGODB_URI');

let client;
export async function getDb() {
if (!client) {
client = new MongoClient(uri, { maxPoolSize: 20 });
await client.connect();
}
return client.db(); // база из пути URI: catalog
}

export async function closeMongo() {
await client?.close();
}

Разбор кода (модуль MongoDB):

  • import { MongoClient } from 'mongodb' — официальный драйвер Node.js.
  • process.env.MONGODB_URI — строка подключения из .env / секретов оркестратора.
  • let client — один экземпляр на процесс (singleton), чтобы не открывать TCP на каждый запрос.
  • new MongoClient(uri, { maxPoolSize: 20 }) — пул до 20 соединений к mongod.
  • await client.connect() — ленивое подключение при первом getDb().
  • client.db() — база catalog из пути URI.
  • export async function getDb() — единая точка доступа для роутов Express.
  • client?.close() — graceful shutdown при SIGTERM в K8s.

Эндпоинт «товар по id»:

import express from 'express';
import { getDb } from './mongo.js';

const app = express();

app.get('/products/:id', async (req, res) => {
const db = await getDb();
const doc = await db.collection('products').findOne({ _id: req.params.id });
if (!doc) return res.status(404).json({ error: 'not_found' });
res.json(doc);
});

Разбор кода (эндпоинт):

  • express() / app.get — HTTP-сервер и маршрут GET с параметром :id в path.
  • req.params.id — значение сегмента URL (/products/PROD-001PROD-001).
  • db.collection('products') — коллекция документов каталога (аналог таблицы).
  • findOne({ _id: req.params.id }) — поиск одного документа по первичному ключу _id.
  • res.status(404).json({ error: 'not_found' }) — контракт ошибки для order-service и gateway.
  • res.json(doc) — сериализация BSON-документа в JSON для ответа.

Readiness: в /health/ready делайте db.command({ ping: 1 }) — gateway не направит трафик на pod без живой БД (FAQ по health).


Redis — подключение и роль рядом с API

Redis не заменяет MongoDB и PostgreSQL. В нашем сценарии:

  • cache-aside для GET /products/:id — снижает нагрузку на MongoDB;
  • опционально rate limit по customerId на gateway;
  • distributed lock на короткое время при резервировании остатка (осторожно с TTL и идемпотентностью).

Полный рабочий проект (Express, Pub/Sub, Streams, docker-compose) — в Redis в интеграции.

Фрагмент cache-aside в catalog-service:

import { redis } from './redisClient.js'; // см. 129.md

const cacheKey = `product:v1:${productId}`;
const cached = await redis.get(cacheKey);
if (cached) {
res.set('X-Cache', 'HIT');
return res.json(JSON.parse(cached));
}
const doc = await db.collection('products').findOne({ _id: productId });
const ttl = 60 + Math.floor(Math.random() * 15); // jitter
await redis.setex(cacheKey, ttl, JSON.stringify(doc));
res.set('X-Cache', 'MISS');
res.json(doc);

Разбор кода (cache-aside):

  • product:v1:${productId} — ключ кэша с версией префикса v1 для массовой инвалидации.
  • redis.get(cacheKey) — чтение строки из Redis; null/undefined — промах (MISS).
  • X-Cache: HIT — служебный заголовок для отладки и метрик доли попаданий в кэш.
  • JSON.parse(cached) — в Redis храним JSON-текст, не BSON.
  • findOne в MongoDB — выполняется только при MISS.
  • Math.random() * 15 (jitter) — разброс TTL, чтобы ключи не истекали одновременно.
  • setex(key, ttl, value) — атомарно «значение + TTL в секундах».
  • X-Cache: MISS — ответ собран из источника и положен в кэш.

При изменении цены в MongoDB сбрасывайте ключ (DEL product:v1:*) или поднимайте версию префикса product:v2:.


RabbitMQ — подключение и фоновые задачи

Зачем очередь, если уже есть HTTP

HTTP от order-service к notification-worker создаёт синхронную зависимость: упал worker — тормозится или падает создание заказа. RabbitMQ развязывает по времени: заказ сохранён в PostgreSQL → сообщение в очередь → worker обрабатывает когда может, с retry и DLQ.

Типичная топология:

  • exchange orders.tasks (type direct);
  • queue notifications.email с binding key email.send;
  • DLQ для сообщений после N ошибок.

Теория, management UI, ack/nack — RabbitMQ.

Docker

services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"

Разбор кода (Docker RabbitMQ):

  • rabbitmq:3-management — образ с плагином веб-UI (Management).
  • 5672 — порт протокола AMQP для приложений.
  • 15672 — HTTP-интерфейс Management (guest/guest только для dev).

Producer в order-service (после commit в БД)

import amqp from 'amqplib';

const RABBIT_URL = process.env.RABBITMQ_URL ?? 'amqp://guest:guest@rabbitmq:5672';

export async function enqueueOrderEmail({ orderId, email, correlationId }) {
const conn = await amqp.connect(RABBIT_URL);
const ch = await conn.createChannel();
await ch.assertExchange('orders.tasks', 'direct', { durable: true });
await ch.assertQueue('notifications.email', { durable: true });
await ch.bindQueue('notifications.email', 'orders.tasks', 'email.send');

const body = Buffer.from(JSON.stringify({ orderId, email }));
ch.publish('orders.tasks', 'email.send', body, {
persistent: true,
contentType: 'application/json',
headers: { 'x-correlation-id': correlationId },
});
await ch.close();
await conn.close();
}

Разбор кода (producer RabbitMQ):

  • amqplib — клиент AMQP 0-9-1 для Node.js.
  • amqp.connect(RABBIT_URL) — TCP-сессия к брокеру; URL содержит логин, хост, vhost.
  • createChannel() — виртуальное соединение для команд (легче, чем новый TCP на каждое сообщение).
  • assertExchange(..., 'direct', { durable: true }) — exchange переживает рестарт брокера; direct маршрутизирует по точному routing key.
  • assertQueue(..., { durable: true }) — очередь на диске, не теряется при рестарте.
  • bindQueue(queue, exchange, 'email.send') — в очередь попадут сообщения с routing key email.send.
  • Buffer.from(JSON.stringify(...)) — тело сообщения в бинарном виде.
  • publish(exchange, routingKey, body, options) — отправка без привязки к имени очереди (маршрутизация через exchange).
  • persistent: truedelivery_mode=2, сообщение пишется на диск (при durable queue).
  • x-correlation-id — пользовательский заголовок для трассировки.
  • ch.close() / conn.close() — в проде чаще долгоживущий канал, здесь упрощение для учебника.

Вызывайте только после успешного COMMIT заказа в PostgreSQL. Иначе письмо уйдёт без заказа или наоборот.

Consumer в notification-worker

import amqp from 'amqplib';

const conn = await amqp.connect(process.env.RABBITMQ_URL);
const ch = await conn.createChannel();
await ch.prefetch(10); // не забирать сотни сообщений без обработки

await ch.consume('notifications.email', async (msg) => {
if (!msg) return;
try {
const payload = JSON.parse(msg.content.toString());
await sendEmail(payload); // SMTP / SendGrid
ch.ack(msg);
} catch (err) {
// reject без requeue → DLQ, если настроена политика
ch.nack(msg, false, false);
}
});

Разбор кода (consumer RabbitMQ):

  • prefetch(10) — broker отдаёт не больше 10 unacked сообщений на consumer; защита от перегрузки памяти.
  • consume(queue, callback)push-модель: брокер вызывает callback при появлении сообщения.
  • msg === null — канал закрыт, выходим из обработчика.
  • msg.content.toString() — тело AMQP → строка → JSON.parse.
  • sendEmail(payload) — бизнес-логика worker; здесь не блокирует HTTP пользователя.
  • ch.ack(msg) — подтверждение успеха; сообщение удаляется из очереди (at-least-once после ack).
  • ch.nack(msg, false, false) — отказ: не requeue (requeue=false) → политика DLQ или discard.

Идемпотентность: храните processed_message_id или проверяйте «письмо по orderId уже отправлено» — at-least-once даст дубликаты (идемпотентность).


Kafka — подключение и поток событий

Чем Kafka отличается от RabbitMQ в этом же заказе

RabbitMQ в примереKafka в примере
Задача«Сделай сейчас: отправь email»«Зафиксируй факт: заказ создан»
ПотребителиОдин worker на очередь (масштаб — несколько consumer одной очереди)Много независимых consumer group (аналитика, антифрод, поиск)
ИсторияСообщение удаляется после ackСобытия хранятся по retention; можно перечитать

Оба могут сосуществовать: RabbitMQ для команд, Kafka для событий домена.

Подробнее — Kafka, практикум KRaft — Java-приложение с Kafka и PostgreSQL.

Producer в order-service (KafkaJS, Node.js)

import { Kafka } from 'kafkajs';

const kafka = new Kafka({
clientId: 'order-service',
brokers: (process.env.KAFKA_BROKERS ?? 'kafka:9092').split(','),
});

const producer = kafka.producer();

export async function publishOrderCreated(event) {
await producer.connect();
await producer.send({
topic: 'shop.orders.v1',
messages: [
{
key: event.orderId, // порядок событий одного заказа в одной партиции
value: JSON.stringify(event),
headers: { 'correlation-id': event.correlationId },
},
],
});
}

Разбор кода (producer Kafka):

  • Kafka — точка входа KafkaJS; clientId идентифициует приложение в логах брокера.
  • brokers — список host:port; split(',') для нескольких брокеров кластера.
  • kafka.producer() — объект для записи в топики.
  • producer.connect() — установка метаданных топиков (в проде connect один раз при старте).
  • producer.send({ topic, messages }) — пакетная отправка (здесь одно сообщение); батчинг продюсера — Пакетная работа с данными.
  • key: event.orderId — все события одного заказа попадают в одну партицию → порядок внутри ключа.
  • value: JSON.stringify(event) — полезная нагрузка как UTF-8 строка (или Avro/Protobuf в зрелых системах).
  • headers — метаданные вне тела; correlation-id для observability.

Топик создают заранее (или через IaC) с нужным числом партиций; consumer group analytics и fraud читают один топик независимо.

Consumer (analytics-consumer)

const consumer = kafka.consumer({ groupId: 'analytics' });
await consumer.connect();
await consumer.subscribe({ topic: 'shop.orders.v1', fromBeginning: false });

await consumer.run({
eachMessage: async ({ message }) => {
const evt = JSON.parse(message.value.toString());
await insertIntoWarehouse(evt); // идемпотентный upsert по orderId
},
});

Разбор кода (consumer Kafka):

  • consumer({ groupId: 'analytics' })consumer group; Kafka балансирует партиции между членами группы.
  • subscribe({ topic, fromBeginning: false }) — читать только новые сообщения после подписки (не с начала топика).
  • consumer.run({ eachMessage }) — цикл опроса; callback на каждое сообщение.
  • message.value.toString() — байты значения → строка JSON.
  • insertIntoWarehouse(evt) — запись в DWH; должна быть идемпотентной по orderId.
  • Явный commit в KafkaJS настраивается отдельно; по умолчанию commit после обработки — не коммитьте до успешного upsert.

Commit offset после успешной записи в DWH. Рост consumer lag — главный сигнал отставания (FAQ).


Клиенты на разных языках

Ниже — те же операции, что в примерах выше (MongoDB findOne, Redis GET/SETEX, RabbitMQ publish в orders.tasks, Kafka produce в shop.orders.v1, HTTP GET к catalog). В каждом стеке свой официальный или де-факто стандартный клиент; имена классов и методов различаются, смысл один.

Подробные справочники по брокерам — RabbitMQ, Kafka (разделы «Подключение приложений»).

HTTP — order-service вызывает catalog-service

ЯзыкПакет / модульТипичные API
TypeScriptfetch (встроенный)fetch(url, { headers }), res.json()
Pythonhttpx или requestshttpx.get(url, headers=...), .json()
Javajava.net.http.HttpClient или Spring WebClientHttpClient.send(request, BodyHandlers.ofString())
C#System.Net.Http.HttpClientGetAsync, ReadFromJsonAsync<T>()
Gonet/httphttp.NewRequest, http.Client.Do
PHPguzzlehttp/guzzleGuzzleHttp\Client::get()
const res = await fetch(`${catalogBase}/products/${productId}`, {
headers: { 'X-Request-Id': correlationId },
});
if (!res.ok) throw new Error(`catalog ${res.status}`);
const product = await res.json();

Разбор кода (TypeScript):

  • `${catalogBase}/products/${productId}` — тот же контракт path, что у Express app.get('/products/:id').
  • res.okfalse при 4xx/5xx; без проверки можно принять ошибку за товар.
  • res.json() — парсинг тела ответа в объект JavaScript.
import httpx

r = httpx.get(f"{catalog_base}/products/{product_id}", headers={"X-Request-Id": cid}, timeout=5.0)
r.raise_for_status()
product = r.json()

Разбор кода (Python):

  • httpx.get — синхронный GET; для async-сервиса — httpx.AsyncClient.
  • timeout=5.0 — не ждать catalog дольше 5 с (согласовать с timeout gateway и order-service).
  • raise_for_status() — исключение при 404/503 вместо тихого разбора HTML ошибки.
  • r.json() — dict с полями карточки товара.
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(catalogBase + "/products/" + productId))
.header("X-Request-Id", correlationId)
.GET()
.build();
String body = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).body();

Разбор кода (Java):

  • HttpRequest.newBuilder() — fluent-сборка запроса (Java 11+).
  • .uri(URI.create(...)) — полный URL ресурса.
  • .header("X-Request-Id", ...) — произвольный заголовок трассировки.
  • .GET() — метод без тела.
  • HttpClient.send — блокирующий вызов; body() — строка JSON (дальше — Jackson/Gson).
using var client = new HttpClient();
client.DefaultRequestHeaders.Add("X-Request-Id", correlationId);
var product = await client.GetFromJsonAsync<ProductDto>($"{catalogBase}/products/{productId}");

Разбор кода (C#):

  • using var clientHttpClient на процесс (не создавать новый на каждый запрос).
  • DefaultRequestHeaders.Add — общий заголовок для всех вызовов этого клиента.
  • GetFromJsonAsync<ProductDto> — десериализация JSON в типизированный DTO.
req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/products/%s", catalogBase, productID), nil)
req.Header.Set("X-Request-Id", correlationID)
resp, err := http.DefaultClient.Do(req)

Разбор кода (Go):

  • http.NewRequest — конструктор запроса; MethodGet — GET.
  • req.Header.Set — заголовки до Do.
  • http.DefaultClient.Do — выполнение; нужно проверить resp.StatusCode и прочитать resp.Body (здесь сокращено).
$client = new \GuzzleHttp\Client(['base_uri' => $catalogBase]);
$response = $client->get("products/{$productId}", ['headers' => ['X-Request-Id' => $cid]]);
$product = json_decode($response->getBody()->getContents(), true);

Разбор кода (PHP):

  • base_uri — префикс URL; относительный путь products/{id} дописывается к нему.
  • $client->get(..., ['headers' => ...]) — GET с заголовками.
  • getBody()->getContents() — строка ответа.
  • json_decode(..., true) — ассоциативный массив PHP вместо объекта.

MongoDB — чтение карточки товара

ЯзыкПакетКлассы / функцииОперация в примере
TypeScriptmongodbMongoClient, Db, Collectioncollection.findOne({ _id })
PythonpymongoMongoClient, Database, Collectioncollection.find_one({"_id": ...})
Javamongodb-driver-syncMongoClients, MongoCollection, Filterscollection.find(eq("_id", id)).first()
C#MongoDB.DriverMongoClient, IMongoCollection<T>, FilterDefinitionFind(filter).FirstOrDefaultAsync()
Gogo.mongodb.org/mongo-drivermongo.Client, mongo.CollectionFindOne(ctx, bson.M{"_id": id})
PHPmongodb/mongodb (расширение ext-mongodb)MongoDB\Client, CollectionfindOne(['_id' => $id])
from pymongo import MongoClient

client = MongoClient(os.environ["MONGODB_URI"])
doc = client["catalog"]["products"].find_one({"_id": product_id})

Разбор кода (Python / pymongo):

  • MongoClient(uri) — пул соединений к кластеру/standalone.
  • client["catalog"] — доступ к БД catalog (квадратные скобки).
  • ["products"] — коллекция products.
  • find_one({"_id": product_id}) — один документ или None.
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.Filters;

var client = MongoClients.create(mongoUri);
var doc = client.getDatabase("catalog")
.getCollection("products")
.find(Filters.eq("_id", productId))
.first();

Разбор кода (Java):

  • MongoClients.create(mongoUri) — фабрика синхронного драйвера.
  • getDatabase("catalog").getCollection("products") — цепочка БД → коллекция.
  • find(Filters.eq("_id", productId)) — курсор с фильтром по _id.
  • .first() — первый документ или null.
using MongoDB.Driver;

var client = new MongoClient(mongoUri);
var products = client.GetDatabase("catalog").GetCollection<BsonDocument>("products");
var doc = await products
.Find(Builders<BsonDocument>.Filter.Eq("_id", productId))
.FirstOrDefaultAsync();

Разбор кода (C#):

  • MongoClient / GetDatabase / GetCollection<BsonDocument> — типизированный доступ к коллекции.
  • Builders<BsonDocument>.Filter.Eq("_id", productId) — фильтр эквивалентен { _id: id }.
  • FirstOrDefaultAsync — async-чтение одного документа.
import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

coll := client.Database("catalog").Collection("products")
var doc bson.M
err := coll.FindOne(ctx, bson.M{"_id": productID}).Decode(&doc)

Разбор кода (Go):

  • ctx — контекст отмены/таймаута (обязателен в mongo-driver).
  • bson.M{"_id": productID} — фильтр как map.
  • FindOne(...).Decode(&doc) — декодирование BSON в bson.M или struct.
$client = new MongoDB\Client(getenv('MONGODB_URI'));
$doc = $client->selectDatabase('catalog')
->selectCollection('products')
->findOne(['_id' => $productId]);

Разбор кода (PHP):

  • MongoDB\Client — высокоуровневая обёртка над ext-mongodb.
  • selectDatabase / selectCollection — явный выбор БД и коллекции.
  • findOne(['_id' => $productId]) — массив фильтра → один документ или null.

Redis — cache-aside

ЯзыкПакетКлассы / типыОперация в примере
TypeScriptioredis или redisRedisget, setex
Pythonredisredis.Redisget, setex
Javalettuce-core (или Jedis)RedisClient, StatefulRedisConnection, RedisCommandssync().get, sync().setex
C#StackExchange.RedisConnectionMultiplexer, IDatabaseStringGetAsync, StringSetAsync с TTL
Gogithub.com/redis/go-redis/v9redis.ClientGet, Set с time.Duration
PHPpredis/predisPredis\Clientget, setex
import redis

r = redis.Redis.from_url(os.environ["REDIS_URL"])
key = f"product:v1:{product_id}"
cached = r.get(key)
if cached:
return json.loads(cached)
doc = collection.find_one({"_id": product_id})
r.setex(key, 60, json.dumps(doc, default=str))

Разбор кода (Python / redis):

  • Redis.from_url — парсинг redis://host:port/db.
  • r.get(key)bytes или None; при hit — json.loads.
  • setex(key, seconds, value) — запись с TTL 60 с.
  • default=str в json.dumps — сериализация дат/ObjectId из Mongo.
RedisClient redis = RedisClient.create(redisUrl);
try (var connection = redis.connect()) {
RedisCommands<String, String> cmd = connection.sync();
String cached = cmd.get("product:v1:" + productId);
if (cached != null) { /* deserialize */ }
cmd.setex("product:v1:" + productId, 60, json);
}

Разбор кода (Java / Lettuce):

  • RedisClient.create — клиент Lettuce (Netty под капотом).
  • connect() + try-with-resources — освобождение соединения.
  • connection.sync() — синхронные команды Redis.
  • get / setex — те же команды, что GET / SETEX в redis-cli.
var mux = await ConnectionMultiplexer.ConnectAsync(redisUrl);
var db = mux.GetDatabase();
var key = $"product:v1:{productId}";
var cached = await db.StringGetAsync(key);
if (cached.HasValue) { /* hit */ }
await db.StringSetAsync(key, json, TimeSpan.FromSeconds(60));

Разбор кода (C# / StackExchange.Redis):

  • ConnectionMultiplexer — один мультиплексор на приложение (тысячи logical DB).
  • GetDatabase() — логическая БД Redis (0 по умолчанию).
  • StringGetAsync — GET; HasValue — проверка hit.
  • StringSetAsync с TimeSpan — SET с истечением.
rdb := redis.NewClient(&redis.Options{Addr: "redis:6379"})
key := "product:v1:" + productID
cached, err := rdb.Get(ctx, key).Result()
if err == nil { /* hit */ }
rdb.Set(ctx, key, jsonBytes, 60*time.Second)

Разбор кода (Go / go-redis):

  • redis.NewClient — клиент с адресом брокера.
  • Get(ctx, key).Result() — при redis.Nil — промах кэша.
  • Set(ctx, key, value, expiration) — SET с TTL (60*time.Second).
$redis = new Predis\Client(getenv('REDIS_URL'));
$key = "product:v1:$productId";
if ($cached = $redis->get($key)) {
return json_decode($cached, true);
}
$redis->setex($key, 60, json_encode($doc));

Разбор кода (PHP / Predis):

  • Predis\Client — чистый PHP-клиент (без ext-redis).
  • $redis->get($key) — в условии if — hit/miss.
  • setex($key, $seconds, $payload) — SETEX одной командой.

RabbitMQ — publish задачи «отправить email»

ЯзыкПакетКлассы / функцииОперация в примере
TypeScriptamqplibconnect, Channelpublish, consume, ack
PythonpikaBlockingConnection, channelbasic_publish, basic_consume, basic_ack
Javacom.rabbitmq:amqp-clientConnectionFactory, Connection, ChannelbasicPublish, basicConsume, basicAck
C#RabbitMQ.ClientConnectionFactory, IConnection, IModelBasicPublish, BasicConsume, BasicAck
Gogithub.com/rabbitmq/amqp091-goamqp.Dial, ChannelPublishWithContext, Consume, Ack
PHPphp-amqplib/php-amqplibAMQPStreamConnection, channelbasic_publish, basic_consume, basic_ack
import pika, json

params = pika.URLParameters(os.environ["RABBITMQ_URL"])
with pika.BlockingConnection(params) as conn:
ch = conn.channel()
ch.exchange_declare("orders.tasks", "direct", durable=True)
ch.basic_publish(
"orders.tasks",
"email.send",
json.dumps({"orderId": order_id, "email": email}),
pika.BasicProperties(delivery_mode=2, content_type="application/json"),
)

Разбор кода (Python / pika):

  • URLParameters — разбор amqp://user:pass@host:5672/vhost.
  • BlockingConnection — синхронный клиент (для worker/скриптов).
  • exchange_declare(..., durable=True) — устойчивый exchange.
  • basic_publish(exchange, routing_key, body, properties) — аналог channel.publish в Node.
  • delivery_mode=2 — persistent message.
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(rabbitUrl);
try (Connection conn = factory.newConnection(); Channel ch = conn.createChannel()) {
ch.exchangeDeclare("orders.tasks", BuiltinExchangeType.DIRECT, true);
ch.basicPublish("orders.tasks", "email.send", null, body.getBytes(StandardCharsets.UTF_8));
}

Разбор кода (Java):

  • ConnectionFactory.setUri — единая строка подключения AMQP.
  • createConnection / createChannel — TCP + виртуальный канал.
  • exchangeDeclare(..., DIRECT, true) — durable direct exchange.
  • basicPublish — 4-й аргумент byte[] тела; null properties — без доп. заголовков.
var factory = new ConnectionFactory { Uri = new Uri(rabbitUrl) };
using var conn = factory.CreateConnection();
using var ch = conn.CreateModel();
ch.ExchangeDeclare("orders.tasks", ExchangeType.Direct, durable: true);
ch.BasicPublish("orders.tasks", "email.send", body: Encoding.UTF8.GetBytes(json));

Разбор кода (C#):

  • ConnectionFactory + CreateConnection / CreateModelIModel ≈ Channel в AMQP.
  • ExchangeDeclare — объявление топологии (идемпотентно при старте).
  • BasicPublish — именованные аргументы exchange, routing key, body.
conn, _ := amqp.Dial(rabbitURL)
ch, _ := conn.Channel()
_ = ch.PublishWithContext(ctx, "orders.tasks", "email.send", false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
})

Разбор кода (Go / amqp091-go):

  • amqp.Dial — подключение по URL или host.
  • PublishWithContext — publish с отменой через ctx.
  • DeliveryMode: amqp.Persistent — сообщение на диск.
  • Первые false, false — mandatory/immediate (обычно false).
$connection = new AMQPStreamConnection('rabbitmq', 5672, 'guest', 'guest');
$channel = $connection->channel();
$msg = new AMQPMessage($json, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, 'orders.tasks', 'email.send');

Разбор кода (PHP / php-amqplib):

  • AMQPStreamConnection — host, port, user, password.
  • AMQPMessage с delivery_mode — persistent.
  • basic_publish($msg, $exchange, $routingKey) — порядок аргументов как в спецификации PHP-библиотеки.

Consumer (notification-worker) на всех языках — basic_consume / Consume + обработчик + ack после успешного SMTP; при ошибке — nack без requeue в DLQ (RabbitMQ).


Kafka — produce события order.created

ЯзыкПакетКлассы / функцииОперация в примере
TypeScriptkafkajsKafka, Producer, Consumerproducer.send, consumer.run
Pythonconfluent-kafkaProducer, Consumerproduce, poll, commit
Javaorg.apache.kafka:kafka-clientsKafkaProducer, KafkaConsumer, ProducerRecordsend, poll, commitSync
C#Confluent.KafkaProducerBuilder, ConsumerBuilder, MessageProduceAsync, Consume, Commit
Gogithub.com/segmentio/kafka-gokafka.Writer, kafka.ReaderWriteMessages, ReadMessage, CommitMessages
PHPext-rdkafka + php-rdkafka/php-rdkafkaRdKafka\Producer, RdKafka\KafkaConsumerproduce, consume, commit
from confluent_kafka import Producer
import json

p = Producer({"bootstrap.servers": os.environ["KAFKA_BROKERS"]})
p.produce(
"shop.orders.v1",
key=order_id,
value=json.dumps(event),
headers=[("correlation-id", correlation_id.encode())],
)
p.flush()

Разбор кода (Python / confluent-kafka):

  • Producer({"bootstrap.servers": ...}) — конфиг librdkafka.
  • produce(topic, key=, value=, headers=) — асинхронная постановка в очередь отправки.
  • flush() — дождаться доставки в брокер перед завершением процесса.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
producer.send(new ProducerRecord<>("shop.orders.v1", orderId, json));
}

Разбор кода (Java / kafka-clients):

  • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG — адреса брокеров.
  • KEY/VALUE_SERIALIZER_CLASS_CONFIG — String → bytes для ключа и JSON-тела.
  • KafkaProducer — потокобезопасный producer.
  • ProducerRecord(topic, key, value) — одно сообщение; send асинхронный (нужен flush при shutdown).
var config = new ProducerConfig { BootstrapServers = brokers };
using var producer = new ProducerBuilder<string, string>(config).Build();
await producer.ProduceAsync("shop.orders.v1", new Message<string, string> {
Key = orderId,
Value = json,
});

Разбор кода (C# / Confluent.Kafka):

  • ProducerConfig — bootstrap servers, security, acks.
  • ProducerBuilder<string, string>.Build() — типизированный producer.
  • ProduceAsync + Message { Key, Value } — запись в топик с await.
w := &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: "shop.orders.v1",
Balancer: &kafka.LeastBytes{},
}
_ = w.WriteMessages(ctx, kafka.Message{Key: []byte(orderID), Value: eventJSON})

Разбор кода (Go / kafka-go):

  • kafka.Writer — высокоуровневый producer segmentio.
  • Addr: kafka.TCP(brokers...) — список брокеров.
  • Balancer: &kafka.LeastBytes{} — выбор партиции по загрузке (без key — иначе hash по key).
  • WriteMessages — batch-friendly отправка.
$producer = new RdKafka\Producer();
$producer->addBrokers(getenv('KAFKA_BROKERS'));
$topic = $producer->newTopic('shop.orders.v1');
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $json, $orderId);
$producer->flush(1000);

Разбор кода (PHP / rdkafka):

  • RdKafka\Producer — обёртка над librdkafka (нужно ext-rdkafka).
  • addBrokers — список host:port.
  • newTopic + produce — partition RD_KAFKA_PARTITION_UA (брокер выберет по key).
  • flush(1000) — таймаут ожидания отправки в мс.

Consumer (analytics) — отдельная group.id (analytics), чтение топика shop.orders.v1, идемпотентный upsert в DWH, commit offset после успешной записи.


Как ориентироваться при выборе клиента

СитуацияРекомендация
Один язык во всех микросервисахОфициальный драйвер MongoDB/Kafka для платформы; Redis — ioredis / Lettuce / StackExchange.Redis
JVM-экосистемаSpring Boot — spring-boot-starter-data-mongodb, spring-kafka, spring-boot-starter-amqp поверх тех же клиентов
.NETMongoDB.Driver, Confluent.Kafka, RabbitMQ.Client, StackExchange.Redis
PHPРасширения ext-mongodb, ext-redis или Predis; Kafka — только через librdkafka (ext-rdkafka)
Строгие контракты между сервисамиOpenAPI для HTTP; JSON Schema / Avro для Kafka — см. Kafka

Полноценный учебный стенд на TypeScript (Redis, Pub/Sub, Streams) — 129.md. Java + Kafka + PostgreSQLпрактикум 1191.


Один проход — создание заказа от клика до аналитики

ШагКтоЧто происходитТехнология
1SPA → GatewayPOST /api/ordersHTTPS, JWT
2Gateway → order-serviceМаршрутизация, rate limitAPI Gateway
3order-service → catalog-serviceПроверка товара и ценыHTTP (sync)
4catalog-serviceЧтение карточкиRedis → при MISS MongoDB
5order-serviceINSERT заказ, статус createdPostgreSQL
6order-serviceЗадача «отправить email»RabbitMQ publish
7order-serviceСобытие order.createdKafka produce
8order-service → клиент201 + тело заказаHTTP
9notification-workerSMTP / pushRabbitMQ consume + ack
10analytics-consumerСтрока в витринеKafka consume + commit offset

Если на шаге 3 catalog-service недоступен — возвращайте 503 или используйте circuit breaker, не создавайте заказ с нулевой ценой (Инженерия устойчивости).


Учебный docker-compose (все порты в одном файле)

Скелет для локальной отработки связей (образы и тома сокращены; в production секреты — из vault, сеть — internal):

services:
api-gateway:
image: nginx:alpine
ports:
- "8080:80"
# upstream: catalog-service, order-service

catalog-service:
build: ./catalog-service
environment:
MONGODB_URI: mongodb://mongo:27017/catalog
REDIS_URL: redis://redis:6379/0
depends_on: [mongo, redis]

order-service:
build: ./order-service
environment:
DATABASE_URL: postgres://postgres:postgres@postgres:5432/orders
CATALOG_SERVICE_URL: http://catalog-service:3000
RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672
KAFKA_BROKERS: kafka:9092
depends_on: [postgres, rabbitmq, kafka]

notification-worker:
build: ./notification-worker
environment:
RABBITMQ_URL: amqp://guest:guest@rabbitmq:5672

analytics-consumer:
build: ./analytics-consumer
environment:
KAFKA_BROKERS: kafka:9092

mongo:
image: mongo:7
redis:
image: redis:7-alpine
postgres:
image: postgres:16-alpine
rabbitmq:
image: rabbitmq:3-management
kafka:
image: bitnami/kafka:3

Разбор кода (docker-compose):

  • build: ./catalog-service — образ из Dockerfile рядом с compose-файлом.
  • environment — URL зависимостей; имена хостов (mongo, redis) из имён services.
  • depends_on — порядок старта контейнеров (не гарантия готовности сервиса — для prod нужны healthcheck).
  • CATALOG_SERVICE_URL: http://catalog-service:3000 — внутренний DNS Compose для HTTP между сервисами.
  • DATABASE_URL — PostgreSQL только у order-service (database per service).
  • notification-worker и analytics-consumer — отдельные процессы без публичных портов, только к брокерам.
  • Инфраструктурные сервисы (mongo, redis, postgres, rabbitmq, kafka) — общие для стенда, в prod часто managed-сервисы в облаке.

В Kubernetes те же зависимости задают Deployments, Services и Secrets; см. контейнеры в микросервисах.


Как выбрать инструмент в новом сервисе

НужноИнструмент
Хранить сущности сервиса, гибкая схемаMongoDB (или своя SQL — по домену)
Ускорить чтение, сессии, блокировкиRedis
Фоновая задача «сделай X», retry, DLQRabbitMQ
Событие «произошло Y», много подписчиков, перечитать потокKafka
Ответ пользователю «здесь и сейчас»HTTP/gRPC между сервисами через gateway

Рекомендую читать дальше

ТемаСтатья
Клиенты по языкам (таблицы выше)§ Клиенты на разных языках
Типы sync/asyncТипы взаимодействия
Полный Redis-проектRedis в интеграции
RabbitMQ углублённоRabbitMQ, 8.05 RabbitMQ
Kafka углублённоKafka, 8.05 Kafka
Outbox (БД + Kafka атомарно)Проектирование API — mTLS/JWS/outbox
Микросервисы в продеМикросервисы и интеграция — о разделе
Содержание
Освоение главы0%