8.05. RabbitMQ
RabbitMQ
Что такое RabbitMQ?
RabbitMQ — это популярный брокер сообщений (Message Broker), который реализует протокол AMQP. Он используется для асинхронного обмена данными между приложениями через очереди. RabbitMQ обеспечивает надёжную доставку сообщений, масштабируемость и гибкость.
Официальный сайт - https://www.rabbitmq.com/
Сообщения хранятся в очередях до тех пор, пока не будут обработаны. Производитель отправляет сообщение в очередь, а потребитель забирает его позже.

Производитель-потребитель
RabbitMQ использует модель «производитель-потребитель» с промежуточным хранилищем — очередью:
- Producer (Производитель) отправляет сообщения в RabbitMQ, может быть любым приложением или системой;
- Exchange (Обменник) принимает сообщения от производителя и определяет, в какую очередь поместить сообщение, исходя из правил маршрутизации;
- Queue (Очередь) хранит сообщения до тех пор, пока они не будут обработаны потребителем.
- Consumer (Потребитель) забирает сообщения из очереди и обрабатывает их согласно логике приложения.
RabbitMQ поддерживает несколько типов обменников (exchanges), которые определяют правила маршрутизации сообщений:
- Direct - сообщение отправляется в очередь, которая соответствует ключу маршрутизации;
- Fanout - сообщение рассылается во все очереди, связанные с этим обменником;
- Topic - сообщение отправляется в очереди, соответствующие шаблону ключа маршрутизации;
- Headers - маршрутизация основана на заголовках сообщения, а не на ключе.
Virtual Host — это изолированное пространство внутри RabbitMQ, которое позволяет разделять ресурсы (очереди, обменники, пользователи) между различными проектами или командами. Каждый Virtual Host имеет свои собственные очереди, обменники и разрешения.
Создать Virtual Host можно через консоль или веб-интерфейс. RabbitMQ предоставляет встроенный веб-интерфейс для мониторинга и управления брокером. Этот интерфейс называется RabbitMQ Management, который представляет собой плагин rabbitmq_management.
RabbitMQ Management по умолчанию доступен по адресу http://localhost:15672 под учётными данными guest/guest (логин/пароль). Интерфейс позволяет просматривать статистику (количество сообщений, скорость обработки, использование памяти), состояние очередей, обменников и соединений, создавать и удалять очереди, обменники.
Архитектура RabbitMQ
Базовые концепции модели обмена
Очереди, обменники и привязки
Очередь (queue) — буфер хранения сообщений с семантикой FIFO. Очередь принадлежит одному узлу кластера и может быть объявлена как постоянная (durable) для сохранения после перезапуска брокера.
Обменник (exchange) — точка маршрутизации, принимающая сообщения от продюсеров и распределяющая их по очередям согласно правилам привязки (bindings). Привязка определяет критерий маршрутизации: ключ маршрутизации (routing key) для direct/topic обменников или шаблон для headers.
Сообщение содержит:
- Тело (payload) — произвольные байты
- Свойства (properties):
content_type,message_id,correlation_id,reply_to,expiration,priority - Флаг
delivery_mode: 1 (volatile) или 2 (persistent) для надёжного хранения
Типы обменников и стратегии маршрутизации
- Direct — точное совпадение ключа маршрутизации с привязкой. Используется для адресной доставки.
- Fanout — рассылка во все привязанные очереди без учёта ключа. Реализует паттерн публикации-подписки.
- Topic — шаблонное совпадение ключа (
*— один сегмент,#— ноль или более сегментов). Пример:logs.error.#соответствуетlogs.error.api. - Headers — маршрутизация по совпадению заголовков сообщения (headers) с параметрами привязки. Поддерживает режимы
any(логическое ИЛИ) иall(логическое И). - Default (амплуа) — скрытый direct-обменник с именем
"", автоматически привязанный ко всем очередям по имени очереди как ключу.
Модель доставки и управление потоком
Продюсеры и потребители
Продюсер публикует сообщения в обменник, не зная конечных очередей. Потребитель (consumer) подписывается на очередь через метод basic.consume и получает сообщения асинхронно через обратный вызов.
Альтернативно используется синхронный метод basic.get для единичного извлечения.
Подтверждения и надёжность
- Publisher confirms — механизм подтверждения доставки сообщения в очередь. Брокер отправляет
basic.ackпосле сохранения сообщения на диск (для постоянных очередей) или в память. - Consumer acknowledgements — потребитель подтверждает обработку через
basic.ack. При отказе обработки отправляетсяbasic.nackилиbasic.rejectс флагомrequeueдля возврата в очередь. - QoS prefetch — ограничение количества неподтверждённых сообщений на канал через
basic.qos(prefetch_count=N). Предотвращает перегрузку потребителя.
Возврат сообщений и обработка ошибок
Сообщение возвращается продюсеру через basic.return, если:
- Обменник не имеет подходящих привязок (режим
mandatory=true) - Очередь переполнена (лимит длины или дискового пространства)
- Превышено время жизни сообщения (TTL)
Кластеризация и отказоустойчивость
Архитектура кластера
Кластер объединяет несколько узлов (nodes) под единым пространством имён очередей и обменников. Все узлы имеют общее состояние метаданных через распределённую базу Mnesia.
Очереди физически размещаются на одном узле (дискретное размещение), но доступны для публикации/подписки со всех узлов кластера.
Репликация очередей
- Mirrored queues (устаревший механизм) — репликация очередей через политики (policies) с указанием узлов-реплик. Управление лидером выполняется автоматически.
- Quorum queues (рекомендуемый механизм) — реализация на основе алгоритма Raft. Обеспечивает строгую упорядоченность, отказоустойчивость и защиту от потери данных при разделении сети (split-brain protection). Требует нечётного числа реплик (3, 5, 7).
Пример политики для quorum-очередей:
{
"queue-master-locator": "client-local",
"dead-letter-exchange": "dlx.events"
}
Виртуальные хосты
Виртуальный хост (vhost) — изолированное пространство имён внутри кластера, содержащее собственные очереди, обменники и пользователей. Используется для разделения окружений (например, prod, staging, dev) или клиентов в мультитенантных системах.
Внутреннее устройство и соединения
Каналы и соединения
Соединение (connection) — TCP-соединение между клиентом и брокером. Канал (channel) — лёгковесный виртуальный канал поверх соединения, инкапсулирующий отдельный диалог. Один канал гарантирует упорядоченность операций; несколько каналов в одном соединении позволяют параллельную работу без создания множества TCP-соединений.
Хранение сообщений
Сообщения хранятся в файлах журналов на диске с индексами для быстрого доступа. Для постоянных очередей (durable=true) и сообщений (delivery_mode=2) данные сбрасываются на диск перед отправкой подтверждения. Временные очереди и сообщения хранятся только в памяти. Поддерживается сжатие журналов и фоновая дефрагментация.
Авторизация и управление доступом
Модель прав доступа
Доступ контролируется на уровне виртуального хоста через права:
configure— управление очередями/обменниками (объявление, удаление)write— публикация в обменникиread— потребление из очередей
Права назначаются пользователю для конкретного vhost:
rabbitmqctl set_permissions -p /prod user1 ".*" ".*" ".*"
Аутентификация
Поддерживаются механизмы:
- Встроенная аутентификация (логин/пароль в базе)
- Внешняя аутентификация через плагины (LDAP, OAuth 2.0, JWT)
- Аутентификация по сертификатам (TLS client certificate)
Программное управление через Management API
HTTP API предоставляет операции администрирования:
Управление очередями и обменниками
# Python: создание очереди через HTTP API
import requests
requests.put(
"http://admin:pass@localhost:15672/api/queues/%2F/orders",
json={
"durable": True,
"arguments": {
"x-queue-type": "quorum",
"x-dead-letter-exchange": "dlx.orders"
}
}
)
Мониторинг состояния
// C#: получение метрик очереди
var response = await httpClient.GetAsync(
"http://localhost:15672/api/queues/%2F/orders");
var metrics = await response.Content.ReadFromJsonAsync<QueueMetrics>();
// metrics.messages_ready, metrics.consumers, metrics.memory
Управление политиками
// Java: применение политики для dead-lettering
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.main");
args.put("x-message-ttl", 60000);
channel.queueDeclare("tasks", true, false, false, args);
Утилиты командной строки
Основные операции через rabbitmqctl
# Список узлов кластера
rabbitmqctl cluster_status
# Создание пользователя и прав
rabbitmqctl add_user app-user securepass
rabbitmqctl set_permissions -p /prod app-user ".*" ".*" ".*"
# Применение политики quorum-очередей
rabbitmqctl set_policy quorum-policy \
"^q\." '{"queue-mode":"quorum"}' --apply-to queues
# Просмотр сообщений в очереди (требует плагина shovel)
rabbitmqctl list_queues name messages_ready consumers
Управление через rabbitmqadmin (Python CLI)
# Экспорт конфигурации
rabbitmqadmin export config.json
# Импорт конфигурации
rabbitmqadmin import config.json
# Публикация тестового сообщения
rabbitmqadmin publish exchange=amq.default \
routing_key=debug payload="test message"
Настройка клиентских API
Producer pattern
Ключевые параметры подключения:
virtual_host— изолированное пространство имёнheartbeat— интервал проверки активности соединения (рекомендуется 60 сек)connection_timeout— таймаут установки соединенияpublisher_confirms— включение механизма подтверждений
Пример публикации с подтверждением (C#):
var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicPublish(
exchange: "orders",
routingKey: "order.created",
basicProperties: props,
body: body
);
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
Consumer pattern
Рекомендуемая конфигурация:
prefetch_count=1..10— баланс между пропускной способностью и справедливостьюauto_ack=false— ручное подтверждение после успешной обработкиconsumer_tag— уникальный идентификатор потребителя для управления
Пример обработки с подтверждением (Python):
def callback(ch, method, properties, body):
try:
process(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False # отправка в DLX при настройке
)
channel.basic_qos(prefetch_count=3)
channel.basic_consume(
queue='tasks',
on_message_callback=callback,
auto_ack=False
)
channel.start_consuming()
Dead Letter Exchange (DLX)
Механизм обработки проваленных сообщений:
- Очередь объявляется с аргументами
x-dead-letter-exchangeиx-dead-letter-routing-key - При отклонении (
nack/rejectсrequeue=false) или истечении TTL сообщение перенаправляется в DLX - DLX маршрутизирует сообщение в очередь мониторинга или повторных попыток
Установка и настройка
Как настроить RabbitMQ?
Erlang
Установка Erlang на сервер — это зависимость для RabbitMQ (он работает поверх Erlang). Пример:
sudo apt update
sudo apt install erlang
Установка RabbitMQ.
Сначала нужно добавить официальный репозиторий RabbitMQ:
sudo apt-get install curl gnupg
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo gpg --dearmor > /usr/share/keyrings/rabbitmq-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/rabbitmq-archive-keyring.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
sudo apt update
А затем установить RabbitMQ:
sudo apt install rabbitmq-server
Запуск RabbitMQ
Для этого нужно запустить службу RabbitMQ:
sudo systemctl start rabbitmq-server
Желательно также включить автозапуск RabbitMQ при загрузке системы:
sudo systemctl enable rabbitmq-server
Чтобы убедиться в корректности запуска, можно проверить статус:
sudo systemctl status rabbitmq-server
Виртуальный хост
Создание виртуального хоста для изоляции проектов:
sudo rabbitmqctl add_vhost my_vhost
Настройка прав
Настройка прав доступа к виртуальному хосту.
Создание пользователя:
sudo rabbitmqctl add_user my_user my_password
Назначение прав на виртуальный хост:
sudo rabbitmqctl set_permissions -p my_vhost my_user ".*" ".*" ".*"
Включение плагина управления
Активировать веб-интерфейс:
sudo rabbitmq-plugins enable rabbitmq_management
Веб-интерфейс будет доступен по адресу http://localhost:15672
Подключение к RabbitMQ
В разных языках программирования используются соответствующие клиентские библиотеки. Нужно добавить к проекту программы библиотеку, а затем:
- создать соединение с RabbitMQ, указав хост, порт и учётные данные;
- создать канал (логическое соединение внутри физического соединения - все операции выполняются через канал);
- создать обменник с указанием типа;
- создать очередь с уникальным именем;
- связать обменник с очередью, указав ключ маршрутизации.
C# - библиотека RabbitMQ.Client.
Пример использования:
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// Создание очереди
channel.QueueDeclare(queue: "my_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
// Отправка сообщения
string message = "Hello, RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "my_queue", basicProperties: null, body: body);
Console.WriteLine("Message sent");
}
Java - библиотека amqp-client.
Пример использования:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class RabbitMQExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Создание очереди
channel.queueDeclare("my_queue", false, false, false, null);
// Отправка сообщения
String message = "Hello, RabbitMQ!";
channel.basicPublish("", "my_queue", null, message.getBytes());
System.out.println("Message sent");
}
}
}
Python - библиотека pika.
Пример использования:
import pika
# Подключение к RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создание очереди
channel.queue_declare(queue='my_queue')
# Отправка сообщения
channel.basic_publish(exchange='', routing_key='my_queue', body='Hello, RabbitMQ!')
print("Message sent")
connection.close()
JavaScript (Node.js) - библиотека amqplib.
Пример использования:
const amqp = require('amqplib');
async function sendMessage() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// Создание очереди
const queue = 'my_queue';
await channel.assertQueue(queue, { durable: false });
// Отправка сообщения
const message = 'Hello, RabbitMQ!';
channel.sendToQueue(queue, Buffer.from(message));
console.log("Message sent");
setTimeout(() => {
connection.close();
}, 500);
}
sendMessage();
PHP - библиотека php-amqplib.
Пример использования:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// Создание очереди
$channel->queue_declare('my_queue', false, false, false, false);
// Отправка сообщения
$msg = new AMQPMessage('Hello, RabbitMQ!');
$channel->basic_publish($msg, '', 'my_queue');
echo "Message sent\n";
$channel->close();
$connection->close();
Отправка и получение сообщений.
Отправка сообщения (Producer) выполняется через обменник.
Получение сообщения (Consumer) выполняется из очереди. Нужно подписаться на очередь и получить сообщение.
Архитектура RabbitMQ основана на гибкой модели маршрутизации через обменники и привязки, обеспечивающей декуплирование продюсеров и потребителей. Отказоустойчивость достигается через кластеризацию с репликацией очередей (quorum queues), а надёжность доставки — через подтверждения публикации и потребления. Эффективное использование требует правильной настройки QoS, управления временем жизни сообщений и применения паттернов обработки ошибок (DLX). Для продакшн-сред рекомендуется использовать quorum queues вместо mirrored, включать publisher confirms, настраивать мониторинг метрик (длина очередей, скорость обработки) и применять политики для автоматического управления параметрами очередей.
Мониторинг
В веб-интерфейсе можно анализировать список очередей и статистику, а в разделе Exchanges можно увидеть обменники и их привязки.
Масштабирование
Для масштабирования можно настроить кластер RabbitMQ. Но важно убедиться, что все узлы кластера имеют одинаковую версию RabbitMQ и Erlang.