Перейти к основному содержимому

RabbitMQ

Разработчику Архитектору Инженеру

Справочник: Справочник по RabbitMQ.

Основы: RabbitMQ — работа с очередями сообщений.


RabbitMQ

Что такое RabbitMQ?

RabbitMQ — это популярный брокер сообщений (Message Broker), который реализует протокол AMQP. Он используется для асинхронного обмена данными между приложениями через очереди. RabbitMQ обеспечивает надёжную доставку сообщений, масштабируемость и гибкость.

Play ITЗагрузка интерактивного демо…

Официальный сайт - https://www.rabbitmq.com/

Сообщения хранятся в очередях до тех пор, пока не будут обработаны. Производитель отправляет сообщение в очередь, а потребитель забирает его позже.

image-11.png


Производитель-потребитель

RabbitMQ использует модель "производитель-потребитель" с промежуточным хранилищем — очередью:

  • Producer (Производитель) отправляет сообщения в RabbitMQ, может быть любым приложением или системой;
  • Exchange (Обменник) принимает сообщения от производителя и определяет, в какую очередь поместить сообщение, исходя из правил маршрутизации;
  • Queue (Очередь) хранит сообщения до тех пор, пока они не будут обработаны потребителем.
  • Consumer (Потребитель) забирает сообщения из очереди и обрабатывает их согласно логике приложения.

RabbitMQ поддерживает несколько типов обменников (exchanges), которые определяют правила маршрутизации сообщений:

  • Direct - сообщение отправляется в очередь, которая соответствует ключу маршрутизации;
  • Fanout - сообщение рассылается во все очереди, связанные с этим обменником;
  • Topic - сообщение отправляется в очереди, соответствующие шаблону ключа маршрутизации;
  • Headers - маршрутизация основана на заголовках сообщения, а не на ключе.

Virtual Host — это изолированное пространство внутри RabbitMQ, которое позволяет разделять ресурсы (очереди, обменники, пользователи) между различными проектами или командами. Каждый Virtual Host имеет свои собственные очереди, обменники и разрешения.

Создать Virtual Host можно через консоль или веб-интерфейс. RabbitMQ предоставляет встроенный веб-интерфейс для мониторинга и управления брокером. Этот интерфейс называется RabbitMQ Management, который представляет собой плагин rabbitmq_management.

RabbitMQ Management по умолчанию доступен по адресу http://localhost:15672 под учётными данными guest/guest (логин/пароль). Интерфейс позволяет просматривать статистику (количество сообщений, скорость обработки, использование памяти), состояние очередей, обменников и соединений, создавать и удалять очереди, обменники.


Архитектура RabbitMQ

Базовые концепции модели обмена

Очереди, обменники и привязки

Очередь (queue) — буфер хранения сообщений с семантикой FIFO. Очередь принадлежит одному узлу кластера и может быть объявлена как постоянная (durable) для сохранения после перезапуска брокера.

Обменник (exchange) — точка маршрутизации, принимающая сообщения от продюсеров и распределяющая их по очередям согласно правилам привязки (bindings). Привязка определяет критерий маршрутизации: ключ маршрутизации (routing key) для direct/topic обменников или шаблон для headers.

Сообщение содержит:

  • Тело (payload) — произвольные байты
  • Свойства (properties)content_type, message_id, correlation_id, reply_to, expiration, priority
  • Флаг delivery_mode: 1 (volatile) или 2 (persistent) для надёжного хранения

Типы обменников и стратегии маршрутизации

  • Direct — точное совпадение ключа маршрутизации с привязкой. Используется для адресной доставки.
  • Fanout — рассылка во все привязанные очереди без учёта ключа. Реализует паттерн публикации-подписки.
  • Topic — шаблонное совпадение ключа (* — один сегмент, # — ноль или более сегментов). Пример: logs.error.# соответствует logs.error.api.
  • Headers — маршрутизация по совпадению заголовков сообщения (headers) с параметрами привязки. Поддерживает режимы any (логическое ИЛИ) и all (логическое И).
  • Default (амплуа) — скрытый direct-обменник с именем "", автоматически привязанный ко всем очередям по имени очереди как ключу.

Модель доставки и управление потоком

Продюсеры и потребители

Продюсер публикует сообщения в обменник, не зная конечных очередей. Потребитель (consumer) подписывается на очередь через метод basic.consume и получает сообщения асинхронно через обратный вызов.

Альтернативно используется синхронный метод basic.get для единичного извлечения.


Подтверждения и надёжность

  • Publisher confirms — механизм подтверждения доставки сообщения в очередь. Брокер отправляет basic.ack после сохранения сообщения на диск (для постоянных очередей) или в память.
  • Consumer acknowledgements — потребитель подтверждает обработку через basic.ack. При отказе обработки отправляется basic.nack или basic.reject с флагом requeue для возврата в очередь.
  • QoS prefetch — ограничение количества неподтверждённых сообщений на канал через basic.qos(prefetch_count=N). Предотвращает перегрузку потребителя.

Возврат сообщений и обработка ошибок

Сообщение возвращается продюсеру через basic.return, если:

  • Обменник не имеет подходящих привязок (режим mandatory=true)
  • Очередь переполнена (лимит длины или дискового пространства)
  • Превышено время жизни сообщения (TTL)

Кластеризация и отказоустойчивость

Архитектура кластера

Кластер объединяет несколько узлов (nodes) под единым пространством имён очередей и обменников. Все узлы имеют общее состояние метаданных через распределённую базу Mnesia.

Очереди физически размещаются на одном узле (дискретное размещение), но доступны для публикации/подписки со всех узлов кластера.


Репликация очередей

  • Mirrored queues (устаревший механизм) — репликация очередей через политики (policies) с указанием узлов-реплик. Управление лидером выполняется автоматически.
  • Quorum queues (рекомендуемый механизм) — реализация на основе алгоритма Raft. Обеспечивает строгую упорядоченность, отказоустойчивость и защиту от потери данных при разделении сети (split-brain protection). Требует нечётного числа реплик (3, 5, 7).

Пример политики для quorum-очередей:

{
"queue-master-locator": "client-local",
"dead-letter-exchange": "dlx.events"
}

Разбор:

  • JSON показывает формат тела запроса или сообщения в очереди: имена полей должны совпадать с контрактом API или схемой consumer.

Виртуальные хосты

Виртуальный хост (vhost) — изолированное пространство имён внутри кластера, содержащее собственные очереди, обменники и пользователей. Используется для разделения окружений (например, prod, staging, dev) или клиентов в мультитенантных системах.


Внутреннее устройство и соединения

Каналы и соединения

Соединение (connection) — TCP-соединение между клиентом и брокером. Канал (channel) — лёгковесный виртуальный канал поверх соединения, инкапсулирующий отдельный диалог. Один канал гарантирует упорядоченность операций; несколько каналов в одном соединении позволяют параллельную работу без создания множества TCP-соединений.


Хранение сообщений

Сообщения хранятся в файлах журналов на диске с индексами для быстрого доступа. Для постоянных очередей (durable=true) и сообщений (delivery_mode=2) данные сбрасываются на диск перед отправкой подтверждения. Временные очереди и сообщения хранятся только в памяти. Поддерживается сжатие журналов и фоновая дефрагментация.


Авторизация и управление доступом

Модель прав доступа

Доступ контролируется на уровне виртуального хоста через права:

  • configure — управление очередями/обменниками (объявление, удаление)
  • write — публикация в обменники
  • read — потребление из очередей

Права назначаются пользователю для конкретного vhost:

rabbitmqctl set_permissions -p /prod user1 ".*" ".*" ".*"

Разбор:

  • Утилиты rabbitmqctl / rabbitmq-diagnostics управляют политиками, очередями и проверяют живость узла брокера.

Аутентификация

Поддерживаются механизмы:

  • Встроенная аутентификация (логин/пароль в базе)
  • Внешняя аутентификация через плагины (LDAP, OAuth 2.0, JWT)
  • Аутентификация по сертификатам (TLS client certificate)

Программное управление через Management API

HTTP API предоставляет операции администрирования:


Управление очередями и обменниками

Код ITЗагрузка примера кода…

Разбор:

  • Импорты подключают модули проекта; выполнение идёт сверху вниз — сначала конфигурация, затем объявление сущностей или маршрутов.

Мониторинг состояния

// C#: получение метрик очереди
var response = await httpClient.GetAsync(
"http://localhost:15672/api/queues/%2F/orders");
var metrics = await response.Content.ReadFromJsonAsync<QueueMetrics>();
// metrics.messages_ready, metrics.consumers, metrics.memory

Разбор:

  • Пространства имён группируют модели и сервисы; async/await не блокируют поток при HTTP-вызове наружу.

Управление политиками

// Java: применение политики для dead-lettering
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.main");
args.put("x-message-ttl", 60000);
channel.queueDeclare("Задачи", true, false, false, args);

Разбор:

  • Пакеты model, repository, service разделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.

Утилиты командной строки

Основные операции через rabbitmqctl

# Список узлов кластера
rabbitmqctl cluster_status

# Создание пользователя и прав
rabbitmqctl add_user app-user securepass
rabbitmqctl set_permissions -p /prod app-user ".*" ".*" ".*"

# Применение политики quorum-очередей
rabbitmqctl set_policy quorum-policy \
"^q\." '{"queue-mode":"quorum"}' --apply-to queues

# Просмотр сообщений в очереди (требует плагина shovel)
rabbitmqctl list_queues name messages_ready consumers

Разбор:

  • Утилиты rabbitmqctl / rabbitmq-diagnostics управляют политиками, очередями и проверяют живость узла брокера.

Управление через rabbitmqadmin (Python CLI)

# Экспорт конфигурации
rabbitmqadmin export config.json

# Импорт конфигурации
rabbitmqadmin import config.json

# Публикация тестового сообщения
rabbitmqadmin publish exchange=amq.default \
routing_key=debug payload="test message"

Разбор:

  • Команды выполняют из каталога сервиса или из корня с docker-compose.yml; ненулевой код выхода останавливает сценарий до исправления ошибки.

Настройка клиентских API

Producer pattern

Ключевые параметры подключения:

  • virtual_host — изолированное пространство имён
  • heartbeat — интервал проверки активности соединения (рекомендуется 60 сек)
  • connection_timeout — таймаут установки соединения
  • publisher_confirms — включение механизма подтверждений

Пример публикации с подтверждением (C#):

var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicPublish(
exchange: "orders",
routingKey: "order.created",
basicProperties: props,
body: body
);
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));

Разбор:

  • Пространства имён группируют модели и сервисы; async/await не блокируют поток при HTTP-вызове наружу.

Consumer pattern

Рекомендуемая конфигурация:

  • prefetch_count=1..10 — баланс между пропускной способностью и справедливостью
  • auto_ack=false — ручное подтверждение после успешной обработки
  • consumer_tag — уникальный идентификатор потребителя для управления

Пример обработки с подтверждением (Python):

Код ITЗагрузка примера кода…

Разбор:

  • Импорты подключают модули проекта; выполнение идёт сверху вниз — сначала конфигурация, затем объявление сущностей или маршрутов.

Dead Letter Exchange (DLX)

Механизм обработки проваленных сообщений:

  1. Очередь объявляется с аргументами x-dead-letter-exchange и x-dead-letter-routing-key
  2. При отклонении (nack/reject с requeue=false) или истечении TTL сообщение перенаправляется в DLX
  3. DLX маршрутизирует сообщение в очередь мониторинга или повторных попыток

Установка и настройка

Как настроить RabbitMQ?

Erlang

Установка Erlang на сервер — это зависимость для RabbitMQ (он работает поверх Erlang). Пример:

sudo apt update
sudo apt install erlang

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

Установка RabbitMQ.

Сначала нужно добавить официальный репозиторий RabbitMQ:

sudo apt-get install curl gnupg
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo gpg --dearmor > /usr/share/keyrings/rabbitmq-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/rabbitmq-archive-keyring.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
sudo apt update

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

А затем установить RabbitMQ:

sudo apt install rabbitmq-server

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

Запуск RabbitMQ

Для этого нужно запустить службу RabbitMQ:

sudo systemctl start rabbitmq-server

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

Желательно также включить автозапуск RabbitMQ при загрузке системы:

sudo systemctl enable rabbitmq-server

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

Чтобы убедиться в корректности запуска, можно проверить статус:

sudo systemctl status rabbitmq-server

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

Виртуальный хост

Создание виртуального хоста для изоляции проектов:

sudo rabbitmqctl add_vhost my_vhost

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

Настройка прав

Настройка прав доступа к виртуальному хосту.

Создание пользователя:

sudo rabbitmqctl add_user my_user my_password

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

Назначение прав на виртуальный хост:

sudo rabbitmqctl set_permissions -p my_vhost my_user ".*" ".*" ".*"

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

Включение плагина управления

Активировать веб-интерфейс:

sudo rabbitmq-plugins enable rabbitmq_management

Разбор:

  • Фрагмент иллюстрирует контракт, конфигурацию или протокол обмена — сверяйте каждую строку с текстом раздела выше.

Веб-интерфейс будет доступен по адресу http://localhost:15672


Подключение к RabbitMQ

В разных языках программирования используются соответствующие клиентские библиотеки. Нужно добавить к проекту программы библиотеку, а затем:

  • создать соединение с RabbitMQ, указав хост, порт и учётные данные;
  • создать канал (логическое соединение внутри физического соединения - все операции выполняются через канал);
  • создать обменник с указанием типа;
  • создать очередь с уникальным именем;
  • связать обменник с очередью, указав ключ маршрутизации.

C# - библиотека RabbitMQ.Client.

Пример использования:

Код ITЗагрузка примера кода…

Разбор:

  • IHostedService стартует фоновый consumer при запуске приложения; ConnectionFactory задаёт хост rabbitmq и учётные данные AMQP.
  • QueueDeclare(..., durable: true) создаёт устойчивую очередь; autoAck: false означает, что BasicAck отправляют только после успешной обработки.
  • Обработчик Received читает тело, десериализует OrderEvent и вызывает SendNotificationAsync — HTTP POST на callback с логированием результата.

Java - библиотека amqp-client.

Пример использования:

Код ITЗагрузка примера кода…

Разбор:

  • Пакеты model, repository, service разделяют слои — сущность, доступ к данным, бизнес-логика обработки заказов.

Python - библиотека pika.

Пример использования:


import pika

# Подключение к RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создание очереди
channel.queue_declare(queue='my_queue')
# Отправка сообщения
channel.basic_publish(exchange='', routing_key='my_queue', body='Hello, RabbitMQ!')
print("Message sent")
connection.close()

Разбор:

  • Импорты подключают модули проекта; выполнение идёт сверху вниз — сначала конфигурация, затем объявление сущностей или маршрутов.

JavaScript (Node.js) - библиотека amqplib.

Пример использования:

Код ITЗагрузка примера кода…

Разбор:

  • Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
  • После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.

PHP - библиотека php-amqplib.

Пример использования:

Код ITЗагрузка примера кода…

Разбор:

  • Прочитайте фрагмент построчно: каждая директива или вызов меняет поведение сервиса, брокера или балансировщика в цепочке микросервисов.
  • После правки перезапустите соответствующий процесс или контейнер и проверьте логи, очередь RabbitMQ или ответ HTTP.

Отправка и получение сообщений.

Отправка сообщения (Producer) выполняется через обменник.

Получение сообщения (Consumer) выполняется из очереди. Нужно подписаться на очередь и получить сообщение.

Архитектура RabbitMQ основана на гибкой модели маршрутизации через обменники и привязки, обеспечивающей декуплирование продюсеров и потребителей. Отказоустойчивость достигается через кластеризацию с репликацией очередей (quorum queues), а надёжность доставки — через подтверждения публикации и потребления. Эффективное использование требует правильной настройки QoS, управления временем жизни сообщений и применения паттернов обработки ошибок (DLX). Для продакшн-сред рекомендуется использовать quorum queues вместо mirrored, включать publisher confirms, настраивать мониторинг метрик (длина очередей, скорость обработки) и применять политики для автоматического управления параметрами очередей.


Мониторинг

В веб-интерфейсе можно анализировать список очередей и статистику, а в разделе Exchanges можно увидеть обменники и их привязки.


Масштабирование

Для масштабирования можно настроить кластер RabbitMQ. Но важно убедиться, что все узлы кластера имеют одинаковую версию RabbitMQ и Erlang.


См. также

Базовая глава в разделе "Система и сеть" — RabbitMQ — работа с очередями сообщений.


Как начать с RabbitMQ без лишней сложности

Для первого рабочего контура достаточно базовой схемы:

  1. Одна бизнес-очередь для задач.
  2. Один exchange (direct или topic) с понятным routing key.
  3. Ручные ack у потребителя.
  4. DLX/DLQ для неуспешных сообщений.
  5. prefetch под вашу скорость обработки.

Это покрывает большинство практических сценариев и помогает избежать переусложнения на старте.


Типичные ошибки в RabbitMQ-проектах

  • auto_ack=true в критичной обработке;
  • отсутствие ограничений retry и бесконечные requeue-циклы;
  • смешивание разных типов сообщений в одной очереди без явного контракта;
  • мониторинг только "сообщений в очереди" без анализа времени обработки и ошибок потребителей.

Основа по протоколу

Базовый разбор HTTP и HTTPS находится в отдельной статье — HTTP как основа веб-интеграций.