Перейти к основному содержимому

Redis в интеграции и кэшировании

Всем

Где Redis усиливает интеграцию

Redis обычно включают в интеграционную схему по трём причинам:

  • снизить нагрузку на API и базы данных через кэш;
  • удержать всплески трафика через очереди и буферы;
  • ускорить обмен событиями между сервисами.

Redis работает как быстрый слой между сервисами. Он хранит горячие данные в памяти и поддерживает структуры, удобные для распределённых сценариев: string, hash, set, sorted set, stream.


Кэширование в API и микросервисах

Кэширование через Redis помогает снять нагрузку с источника данных и стабилизировать время ответа.

Базовый паттерн:

  1. Сервис запрашивает ключ в Redis.
  2. При попадании в кэш сразу возвращается значение.
  3. При промахе сервис читает данные из БД или внешнего API.
  4. Ответ сохраняется в Redis с TTL.

Практика:

  • ставьте TTL для всех кэшируемых ключей;
  • добавляйте jitter к TTL, чтобы ключи не истекали одновременно;
  • для критичных ключей используйте префиксы версий (user:v3:42), чтобы обновлять схему без массового удаления.

Сквозной пример — веб-приложение и полное подключение к Redis

Ниже минимальный, но законченный каркас на Node.js и Express: отдельный модуль подключения к Redis, HTTP API с кэшем, публикация в Pub/Sub, запись в Stream и два отдельных процесса-воркера (подписчик и consumer group). Подробный разбор команд redis-cli и ACL — в главе про Redis.


Структура каталога

redis-integration-demo/
├── package.json
├── .env.example
├── docker-compose.yml
├── src/
│ ├── redisClient.js
│ ├── server.js
│ ├── workerPubSub.js
│ └── workerStreams.js

Зависимости

package.json:

{
"name": "redis-integration-demo",
"private": true,
"type": "module",
"scripts": {
"start": "node src/server.js",
"worker:pubsub": "node src/workerPubSub.js",
"worker:streams": "node src/workerStreams.js"
},
"dependencies": {
"dotenv": "^16.4.5",
"express": "^4.21.0",
"ioredis": "^5.4.1"
}
}

Установка:

cd redis-integration-demo
npm install

Redis в Docker

docker-compose.yml поднимает один экземпляр на порту 6379 (для учебного стенда пароль не задан; в production включите ACL и requirepass, см. основную главу по Redis):

services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
command: ["redis-server", "--appendonly", "yes"]

Запуск:

docker compose up -d

Переменные окружения

.env.example (скопируйте в .env и при необходимости поправьте хост):

PORT=3000
# Один URL — ioredis сам разберёт хост, порт, пароль, TLS (rediss://)
REDIS_URL=redis://127.0.0.1:6379/0
# Базовый TTL кэша в секундах (jitter добавится в коде)
CACHE_TTL_SECONDS=60
# Pub/Sub — канал уведомлений (воркер подписывается на тот же)
REDIS_PUBSUB_CHANNEL=channel:integration-demo:notifications
# Streams — ключ потока, группа и имя потребителя в группе
STREAM_KEY=stream:integration-demo:events
STREAM_GROUP=demo-group
STREAM_CONSUMER=worker-1

Модуль клиента Redis

src/redisClient.js — единая точка подключения, переподключение, PING для health-check, корректное закрытие при остановке процесса:


import Redis from 'ioredis';

const url = process.env.REDIS_URL;
if (!url) {
throw new Error('Задайте REDIS_URL в .env');
}

export const redis = new Redis(url, {
maxRetriesPerRequest: 3,
enableReadyCheck: true,
connectTimeout: 5000,
commandTimeout: 3000,
retryStrategy(times) {
return Math.min(times * 100, 3000);
},
});

redis.on('error', (err) => {
console.error('[redis]', err.message);
});

export async function redisPing() {
const pong = await redis.ping();
return pong === 'PONG';
}

export async function closeRedis() {
await redis.quit();
}

HTTP-сервер с кэшем, Pub/Sub и Stream

src/server.js — чтение .env, JSON API, заголовок X-Cache (HIT / MISS), публикация уведомлений и запись событий в поток:


import 'dotenv/config';
import express from 'express';
import { redis, redisPing, closeRedis } from './redisClient.js';

const app = express();
app.use(express.json());

const PORT = Number(process.env.PORT) || 3000;
const baseTtl = Number(process.env.CACHE_TTL_SECONDS) || 60;
const pubsubChannel =
process.env.REDIS_PUBSUB_CHANNEL || 'channel:integration-demo:notifications';
const streamKey = process.env.STREAM_KEY || 'stream:integration-demo:events';

function ttlWithJitter(seconds) {
const jitter = Math.floor(Math.random() * Math.min(10, seconds * 0.1));
return seconds + jitter;
}

async function fetchCatalogFromSource() {
await new Promise((r) => setTimeout(r, 150));
return {
updatedAt: new Date().toISOString(),
items: [
{ id: 'a1', name: 'Книга' },
{ id: 'b2', name: 'Ноутбук' },
],
};
}

app.get('/health', async (_req, res) => {
try {
const ok = await redisPing();
if (!ok) {
return res.status(503).json({ status: 'degraded', redis: false });
}
return res.json({ status: 'ok', redis: true });
} catch {
return res.status(503).json({ status: 'degraded', redis: false });
}
});

app.get('/api/catalog', async (_req, res) => {
const cacheKey = 'cache:integration-demo:catalog:v1';

try {
const cached = await redis.get(cacheKey);
if (cached) {
res.set('X-Cache', 'HIT');
return res.type('application/json').send(cached);
}

const body = await fetchCatalogFromSource();
const json = JSON.stringify(body);
await redis.set(cacheKey, json, 'EX', ttlWithJitter(baseTtl));

res.set('X-Cache', 'MISS');
return res.json(body);
} catch (err) {
console.error(err);
const body = await fetchCatalogFromSource();
return res.json(body);
}
});

app.post('/api/notify', async (req, res) => {
const text =
typeof req.body?.message === 'string'
? req.body.message
: JSON.stringify(req.body ?? {});
try {
const receivers = await redis.publish(pubsubChannel, text);
return res.json({ ok: true, channel: pubsubChannel, receivers });
} catch (err) {
console.error(err);
return res.status(500).json({ ok: false, error: 'publish failed' });
}
});

app.post('/api/events', async (req, res) => {
const type = String(req.body?.type || 'event');
const payload = JSON.stringify(req.body?.payload ?? {});
try {
const id = await redis.xadd(
streamKey,
'*',
'type',
type,
'payload',
payload,
'ts',
new Date().toISOString(),
);
return res.json({ ok: true, stream: streamKey, id });
} catch (err) {
console.error(err);
return res.status(500).json({ ok: false, error: 'xadd failed' });
}
});

const server = app.listen(PORT, () => {
console.log(`http://localhost:${PORT}`);
});

async function shutdown(signal) {
console.log(signal);
server.close(async () => {
await closeRedis();
process.exit(0);
});
}

process.on('SIGINT', () => shutdown('SIGINT'));
process.on('SIGTERM', () => shutdown('SIGTERM'));

Воркер Pub/Sub

Подписчик держит отдельное TCP-соединение к Redis (так устроен режим SUBSCRIBE). В отдельном процессе это второй клиент с тем же REDIS_URL.

src/workerPubSub.js:


import 'dotenv/config';
import Redis from 'ioredis';

const url = process.env.REDIS_URL;
if (!url) throw new Error('Задайте REDIS_URL в .env');

const channel =
process.env.REDIS_PUBSUB_CHANNEL || 'channel:integration-demo:notifications';

const sub = new Redis(url);

sub.on('error', (err) => console.error('[pubsub]', err.message));

sub.on('message', (ch, message) => {
console.log(`[pubsub] ${ch}${message}`);
});

await sub.subscribe(channel);
console.log(`[pubsub] подписка на ${channel}`);

async function shutdown(signal) {
console.log(signal);
await sub.quit();
process.exit(0);
}

process.on('SIGINT', () => shutdown('SIGINT'));
process.on('SIGTERM', () => shutdown('SIGTERM'));

Воркер Streams с consumer group

src/workerStreams.js создаёт группу (если её ещё нет), читает новые записи (>), логирует поля и подтверждает обработку через XACK.


import 'dotenv/config';
import Redis from 'ioredis';

const url = process.env.REDIS_URL;
if (!url) throw new Error('Задайте REDIS_URL в .env');

const streamKey = process.env.STREAM_KEY || 'stream:integration-demo:events';
const group = process.env.STREAM_GROUP || 'demo-group';
const consumer =
process.env.STREAM_CONSUMER || `worker-${process.pid}`;

const redis = new Redis(url);

redis.on('error', (err) => console.error('[streams]', err.message));

async function ensureGroup() {
try {
await redis.xgroup('CREATE', streamKey, group, '0', 'MKSTREAM');
console.log(`[streams] группа ${group} создана`);
} catch (e) {
if (!String(e.message).includes('BUSYGROUP')) throw e;
console.log(`[streams] группа ${group} уже есть`);
}
}

function fieldsArrayToObject(fields) {
const o = {};
for (let i = 0; i < fields.length; i += 2) {
o[fields[i]] = fields[i + 1];
}
return o;
}

await ensureGroup();
console.log(`[streams] consumer ${consumer}, поток ${streamKey}`);

while (true) {
const batch = await redis.xreadgroup(
'GROUP',
group,
consumer,
'COUNT',
'10',
'BLOCK',
'8000',
'STREAMS',
streamKey,
'>',
);
if (!batch) continue;

for (const [, messages] of batch) {
for (const [id, fields] of messages) {
const row = fieldsArrayToObject(fields);
console.log('[streams]', id, row);
await redis.xack(streamKey, group, id);
}
}
}

Замечание по top-level await: в примерах включён "type": "module" в package.json, Node.js 18+ выполнит эти файлы как ES-модули с await на верхнем уровне. Если проект на CommonJS, оберните тело в async function main() { ... } main();.

Схема процессов:

Браузер / curl


server.js ──GET /api/catalog──► Redis STRING (кэш)

├──POST /api/notify──► PUBLISH ──► workerPubSub.js (SUBSCRIBE)

└──POST /api/events──► XADD ──► workerStreams.js (XREADGROUP + XACK)

Для горизонтального масштабирования обработки событий запустите несколько экземпляров worker:streams с разными STREAM_CONSUMER (например worker-1, worker-2) — Redis распределит партиции сообщений между членами одной группы.


Запуск трёх процессов

В трёх терминалах (Redis в Docker уже запущен):

npm start
npm run worker:pubsub
npm run worker:streams

Проверка:

curl -s http://localhost:3000/health
curl -sI http://localhost:3000/api/catalog | findstr X-Cache
curl -s http://localhost:3000/api/catalog

В bash вместо findstr можно написать grep -i x-cache.

Первый запрос к /api/catalog вернёт X-Cache: MISS, повторный в пределах TTL — X-Cache: HIT. Если Redis недоступен, обработчик всё равно отдаст ответ из «источника», без записи в кэш.

Pub/Sub (в логах воркера появится строка с текстом):

curl -s -X POST http://localhost:3000/api/notify -H "Content-Type: application/json" -d "{\"message\":\"Заказ 42 оплачен\"}"

Stream (воркер выведет id и поля type, payload, ts):

curl -s -X POST http://localhost:3000/api/events -H "Content-Type: application/json" -d "{\"type\":\"order.paid\",\"payload\":{\"orderId\":42}}"

Подключение из другой среды

  • Удалённый Redis — поменяйте REDIS_URL на хост managed-сервиса (ElastiCache, Azure Cache, Memorystore).
  • TLS — в URL используйте схему rediss:// и при необходимости отключите проверку сертификата только на отладке (в production настройте CA через опции клиента).
  • Python — тот же сценарий реализуется парой fastapi + redis.asyncio с from_url(os.environ["REDIS_URL"]) и middleware или зависимостью для get/set с TTL.

Redis Pub/Sub и Redis Streams

Рабочий код для обоих механизмов — в блоке «Сквозной пример» выше на этой странице (POST /api/notify, POST /api/events и воркеры).

Для обмена событиями Redis даёт два разных механизма:

  • Pub/Sub подходит для быстрых уведомлений в реальном времени;
  • Streams подходят для очередей событий с историей и группами потребителей.

Короткая разница:

  • Pub/Sub не хранит сообщения после отправки;
  • Streams хранят записи и позволяют читать их повторно;
  • Streams удобны для ретраев и контроля обработки через consumer groups.

Если нужен полноценный журнал событий с длительным хранением и горизонтальным масштабированием, чаще выбирают Kafka. Если нужен устойчивый task-queue с гибкой маршрутизацией сообщений, чаще выбирают RabbitMQ.


Что выбирать — Redis, RabbitMQ или Kafka

Ориентируйтесь на тип нагрузки и требуемые гарантии:

СценарийОсновной выбор
Кэш ответов API, сессии, rate limitingRedis
Фоновые задачи и маршрутизация по правиламRabbitMQ
Потоковая аналитика и event logKafka
Быстрые внутрисервисные уведомленияRedis Pub/Sub
Очереди событий с повторным чтениемRedis Streams или Kafka

Частый рабочий вариант в production:

  • RabbitMQ или Kafka переносят бизнес-события;
  • Redis хранит горячие данные и служебные счётчики;
  • HTTP API использует Redis-кэш для ускорения чтения.

Риски и защита

Основные риски при использовании Redis в интеграции:

  • stale data при слишком длинном TTL;
  • stampede при одновременном истечении популярных ключей;
  • рост памяти при неконтролируемых ключах;
  • потеря событий в Pub/Sub, если потребитель временно недоступен.

Базовые меры:

  • лимиты памяти и политика вытеснения (maxmemory-policy);
  • мониторинг hit ratio, latency и количества ключей;
  • идемпотентная обработка событий по messageId или eventId;
  • переход с Pub/Sub на Streams или брокер сообщений в критичных процессах.

Связанные материалы

См. также

Другие статьи этого же раздела в боковом меню (как на странице "О разделе").