Перейти к основному содержимому

Kafka

Разработчику Архитектору Инженеру

Справочник: Справочник по Apache Kafka.

Основы: Apache Kafka — потоковая обработка данных.


Kafka

Углублённая статья для разработчиков, архитекторов и инженеров. Определения компонентов, первый запуск и сценарии применения — в базовой главе Apache Kafka — потоковая обработка данных. CLI и таблицы параметров — в Справочнике по Apache Kafka.

Play ITЗагрузка интерактивного демо…

Ниже — архитектура кластера, гарантии доставки, интеграция, эксплуатация и безопасность.


Архитектура Kafka

Базовые концепции распределённой очереди сообщений

Топики и партиции

Топик (topic) — логический канал для хранения потока сообщений с общим назначением. Физически топик разделён на упорядоченные партиции (partitions), каждая из которых представляет собой неизменяемый упорядоченный журнал записей. Партиции обеспечивают горизонтальное масштабирование — одна партиция обрабатывается одним брокером, но топик может содержать множество партиций, распределённых по кластеру. Внутри партиции каждое сообщение имеет уникальный смещение (offset), определяющее его позицию в журнале.


Продюсеры и потребители

Продюсер (producer) — клиентское приложение, публикующее сообщения в топик. При отправке продюсер определяет целевую партицию: явно по ключу сообщения (хеширование ключа гарантирует упорядоченность для одинаковых ключей) или через стратегию балансировки (например, round-robin). Потребитель (consumer) — приложение, считывающее сообщения из партиций. Потребитель отслеживает своё текущее смещение для каждой партиции, что позволяет возобновлять чтение с последней прочитанной позиции после перезапуска.


Брокеры и кластеры

Брокер (broker) — отдельный узел Kafka, отвечающий за приём, хранение и доставку сообщений. Каждый брокер имеет уникальный идентификатор и управляет набором партиций (как лидеров, так и реплик). Кластер (cluster) — совокупность брокеров, работающих совместно под единым логическим пространством имён. Кластер обеспечивает отказоустойчивость через репликацию партиций и автоматическое перераспределение ролей при сбоях узлов.


Группы потребителей и балансировка нагрузки

Группа потребителей (consumer group) — логическая группа потребителей, совместно обрабатывающая сообщения из топика. Каждая партиция топика назначается ровно одному активному потребителю в группе. Это обеспечивает параллельную обработку: если топик содержит 12 партиций, максимум 12 потребителей в группе могут обрабатывать данные одновременно.

Балансировка нагрузки выполняется протоколом перебалансировки (rebalance protocol):

  1. Потребители регистрируются в группе через координатора группы (group coordinator) — специальный брокер, ответственный за управление состоянием группы.
  2. При изменении состава группы (добавление/удаление потребителя) запускается фаза перебалансировки.
  3. Выбирается лидер группы, который вычисляет план распределения партиций (partition assignment) с использованием стратегии (Range, RoundRobin, CooperativeSticky).
  4. План распространяется всем участникам; потребители останавливают обработку, применяют новое распределение и возобновляют чтение.

Стратегия CooperativeSticky (рекомендуемая в современных версиях) минимизирует перемещение партиций между потребителями при инкрементальных изменениях состава группы.

Static membership (group.instance.id) — consumer сохраняет "личность" при кратковременном рестарте. Кластер не запускает полный rebalance, если consumer вернулся в пределах session.timeout.ms. Это снижает "шторм" rebalance при деплоях.

Rebalance listener — callback ConsumerRebalanceListener вызывается до и после перераспределения партиций. Типичный паттерн: commit offset в onPartitionsRevoked, восстановление локального состояния в onPartitionsAssigned.


Внутреннее устройство кластера

Контроллер кластера

Контроллер (controller) — единственный брокер в кластере, ответственный за координацию метаданных:

  • Назначение лидеров партиций при старте кластера или сбое узла
  • Обработка запросов на создание/удаление топиков
  • Управление ISR (In-Sync Replicas) — множеством реплик, синхронизированных с лидером

В режиме KRaft (Kafka Raft Metadata mode, с версии 3.3; единственный режим с Kafka 4.0) контроллер использует встроенный Raft-консенсус без ZooKeeper.


Механизм репликации

Каждая партиция имеет один лидер и набор реплик (обычно 2–3). Продюсеры пишут только в лидера; реплики асинхронно синхронизируются с лидером. Реплики, отставание которых не превышает порог replica.lag.time.max.ms, входят в множество ISR. Только реплики из ISR могут быть избраны новым лидером при сбое. Фактор репликации (replication factor) определяет общее количество копий данных для отказоустойчивости.


Физическое хранение данных

Данные партиции хранятся на диске в виде сегментированных файлов журнала:

  • Основной файл .log содержит последовательность сообщений фиксированного размера (по умолчанию 1 ГБ)
  • Файл индекса .index отображает смещение в позицию в файле для быстрого поиска
  • Файл временного индекса .timeindex позволяет искать сообщения по метке времени
  • Файл .txnindex используется для поддержки транзакционных операций

Сообщения могут подвергаться сжатию на уровне партиции:

  • none — без сжатия
  • gzip, snappy, lz4, zstd — алгоритмы сжатия с разным балансом скорости/степени
  • compact — режим очистки (compaction), сохраняющий только последнее значение для каждого ключа (актуально для топиков-хранилищ состояний)

Сегменты неизменяемы после закрытия; новые сообщения пишутся в активный сегмент. Устаревшие сегменты удаляются по политике хранения (время или объём).


Гарантии надёжности Kafka

Kafka описывает поведение системы через явные гарантии — их можно сопоставить с ACID у реляционных СУБД, но семантика другая.

Гарантии платформы:

  1. Упорядоченность в партиции — если producer записал B после A в одну партицию, offset B больше offset A, и consumer прочитает B после A.
  2. Committed — producer считает запись успешной после записи во все in-sync реплики (при acks=all) или по выбранному уровню ack.
  3. Durability — committed-сообщение не теряется, пока жива хотя бы одна реплика партиции.
  4. Visibility — consumer по умолчанию читает только committed-записи; транзакционные "незавершённые" скрыты при isolation.level=read_committed.

Настройки брокера для надёжности:

ПараметрРекомендацияЗачем
replication.factor≥ 3 в prodОтказ одного-двух брокеров
min.insync.replicas2 при RF=3acks=all не подтвердит запись, если ISR < min
unclean.leader.election.enablefalseЗапрет выбора несинхронизированной реплики лидером (риск потери данных)
log.flush.interval.messagesпо умолчаниюKafka полагается на OS page cache; явный fsync на каждое сообщение редко нужен

Надёжный producer: acks=all, retries > 0, enable.idempotence=true (включает max.in.flight.requests.per.connection ≤ 5 и правильные retry).

Надёжный consumer: enable.auto.commit=false, commit offset после успешной обработки batch, идемпотентная бизнес-логика или upsert по ключу.


Семантика exactly-once

At-least-once допускает дубликаты при retry producer или повторном commit consumer. Для агрегаций и stream processing дубликат искажает результат — нужна семантика exactly-once. Общая модель слоёв и effectively exactly-onceИдемпотентность и семантика доставки.

Kafka реализует её двумя механизмами.

Идемпотентный producer (enable.idempotence=true):

  • Broker присваивает producer ID (PID) и sequence number каждому сообщению в партиции.
  • При retry broker отбрасывает дубликат с тем же sequence.
  • Работает в пределах одной сессии producer; при transactional.id PID сохраняется дольше.
  • Не защищает от дубликатов, если приложение отправляет "логически одно и то же" событие дважды с разными ключами.

Транзакционный producer (transactional.id):

  • Группирует несколько записей (и commit consumer offset) в одну атомарную транзакцию.
  • Consumer с isolation.level=read_committed видит сообщения только после commit транзакции.
  • Типичный сценарий: consume → transform → produce в другой топик + commit offset в одной транзакции (Kafka Streams делает это автоматически).
Ограничения exactly-once

Транзакции Kafka гарантируют атомарность внутри кластера Kafka. Запись в PostgreSQL и Kafka в одной distributed transaction требует паттерна outbox или двухфазного подхода. Внешние side-effect (email, HTTP) всё равно нуждаются в идемпотентности на стороне получателя.


Kafka Connect

Kafka Connect — фреймворк для массовой интеграции без написания producer/consumer вручную. Коннекторы бывают source (внешняя система → Kafka) и sink (Kafka → внешняя система).

Когда Connect, когда свой клиент:

ConnectСвой producer/consumer
Стандартный CDC (Debezium), репликация в DWHСложная бизнес-логика, фильтрация, enrichment
Много однотипных коннекторов (N таблиц → N топиков)Низкая latency, тонкий контроль retry/DLQ
Единая операционная модель (REST API Connect)Язык/стек без готового коннектора

Пример топологии: MySQL (binlog) → Debezium source → топик orders → Elasticsearch sink для полнотекстового поиска.

Connect поддерживает Single Message Transforms (SMT) — лёгкие преобразования (маскирование поля, переименование) без отдельного stream job.

Принципы data pipeline (Kafka как буфер между системами):

  • Своевременность — producer пишет в real-time, consumer может читать потоково или пакетами (раз в час).
  • Надёжность — at-least-once на уровне Kafka; exactly-once сквозь pipeline — через idempotent sink, upsert или Connect offset API.
  • Формат данных — договоритесь о схеме заранее (Avro + Schema Registry), иначе каждая пара producer/consumer "сшита" своим JSON.

Схемы событий и Schema Registry

Сырой JSON в топике быстро превращается в проблему — поля переименовываются, типы плывут, consumer'ы разных команд ломаются. Для контрактов событий обычно выбирают Avro, Protobuf или JSON Schema вместо самодельных binary-сериализаторов.

Почему Avro

  • Схема описана отдельно (обычно JSON), payload — компактный binary.
  • Обратная и совместимая эволюция — добавление поля с default, удаление optional-поля; старые consumer'ы читают новые сообщения (новые поля → default/null).
  • Один формат для Java, Python, Go через codegen или GenericRecord.

Пример эволюции: поле faxNumber заменили на email с "default": null — старые записи без email десериализуются с email=null, новые без faxNumber — с faxNumber=null.

Schema Registry

Реестр схем (часто Confluent Schema Registry, open source) хранит версии схем по subject (topic-value, topic-key). В Kafka уходит wire format: magic byte + schema id (4 bytes) + Avro payload — полная схема в каждое сообщение не кладётся.

[0x0][schema_id: 4 bytes][Avro binary data...]

Producer с KafkaAvroSerializer регистрирует схему при первой записи; consumer с KafkaAvroDeserializer подтягивает схему по id.

props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");

Режимы совместимости

РежимПравило
BACKWARD (default)Новая схема читает старые данные (добавление optional-полей)
FORWARDСтарые consumer'ы читают новые данные
FULLОба направления
NONEЛюбые изменения (опасно в prod)
Практикум vs prod

В Java-приложение с Apache Kafka и PostgreSQL для обучения используется JSON + custom serializer. Для prod event contract — Avro/Protobuf + Schema Registry и проверка совместимости в CI.

Альтернативы без Confluent — Apicurio Registry, AWS Glue Schema Registry, protobuf + buf validate.


Kafka Streams — обзор

Kafka Streams — библиотека (JVM), а не отдельный кластер. Приложение читает топики, строит топологию (граф операций) и пишет результат обратно в Kafka.

Ключевые понятия из потоковой обработки:

  • Таблично-потоковый дуализм — топик событий и compacted-топик "таблица состояния" взаимозаменяемы (changelog).
  • State stores — локальные RocksDB для агрегаций; при rebalance состояние восстанавливается из changelog-топика.
  • Окна — tumbling, hopping, session windows для агрегаций по времени.
  • Гарантии — at-least-once по умолчанию; exactly-once через processing.guarantee=exactly_once_v2.

Альтернативы — ksqlDB (SQL поверх Kafka), Flink, Spark Structured Streaming — выбор зависит от команды и требований к state и операционной модели.


Мониторинг и SLO

Consumer lag — главный индикатор "здоровья" pipeline: разница между последним offset в партиции и committed offset группы. Рост lag → consumer не успевает или упал.

Метрики брокера (JMX / Prometheus):

МетрикаЧто означает
Under-replicated partitionsРеплика отстаёт или брокер недоступен — риск потери данных при сбое
Offline partitionsНет лидера — запись/чтение недоступны
Request handler idle %Загрузка CPU брокера
Bytes in/out per secПропускная способность

Метрики producer: record-error-rate, request-latency-avg, размер batch.

Метрики consumer: records-lag-max, commit-latency-avg, частота rebalance.

SLO-пример: 99% событий обрабатываются с lag < 60 сек; алерт при URP > 0 дольше 5 мин.

Инструменты — AKHQ, Kafka UI, Confluent Control Center, Grafana + kafka_exporter, kafka-consumer-groups.sh --describe.


Выбор аппаратного обеспечения

Kafka дисковая система: последовательная запись в log-сегменты. Ориентиры по sizing:

  • Диск — SSD/NVMe; отдельные тома под log.dirs; RAID10 или JBOD (не RAID5 для write-heavy).
  • Память — page cache ОС критичен для read; 64 GB+ на брокер в высоконагруженных кластерах; heap JVM обычно 6–12 GB (больше — длинные GC-pause).
  • Сеть — 10 GbE+ между брокерами и клиентами при больших объёмах репликации.
  • CPU — умеренная нагрузка; сжатие (lz4, zstd) и SSL увеличивают потребление.

В облаке — managed-сервисы (AWS MSK, Azure Event Hubs for Kafka, Confluent Cloud) снимают часть sizing, но партиционирование и RF остаются на стороне архитектора.


Зеркальное копирование между кластерами

Mirroring — репликация данных между отдельными Kafka-кластерами (в отличие от репликации партиций внутри одного кластера). Встроенный инструмент Apache Kafka — MirrorMaker 2 (MM2).

Когда нужно несколько кластеров:

СценарийСуть
Региональные + центральныйЛокальные кластеры в ЦОД/регионах; агрегированные топики зеркалируются в центральный для аналитики
HA / DR"Горячий" или "холодный" standby-кластер с копией данных для аварийного переключения
ИзоляцияРазные SLA, security boundary или команды — отдельные кластеры вместо одного "монстра"
Active-activeДва ЦОД принимают трафик; mirroring синхронизирует топики (сложнее: порядок, идемпотентность)

Как работает MM2: consumer читает из source-кластера, producer пишет в target-кластер. Топики в target обычно получают префикс (например, source.orders). Offset'ы между кластерами не совпадают — это отдельные журналы.

Ограничения mirroring:

  • Задержка репликации (секунды–минуты) — DR-кластер отстаёт от primary.
  • Exactly-once не переносится "сквозь" mirroring автоматически.
  • Active-active требует продуманного naming, compacted topics и идемпотентных consumer'ов.

Альтернативы MM2 — Confluent Cluster Linking, LinkedIn Brooklin, Uber uReplicator — для крупных multi-DC с тонкой настройкой lag и failover.


Безопасность Kafka

Безопасность Kafka строится на пяти столпах — аутентификация (кто вы), авторизация (что можно), шифрование (защита на проводе и at rest), аудит (кто что делал), квоты (лимит ресурсов). Система защищена настолько, насколько защищено самое слабое звено — включая клиентов, CI/CD и секреты.

Протоколы listener'ов

security.protocolШифрованиеАутентификация
PLAINTEXTНетНет
SSLTLSОпционально (mTLS)
SASL_PLAINTEXTНетSASL
SASL_SSLTLS + SASLРекомендуется для prod

Типичная prod-конфигурация брокера:

listeners=SASL_SSL://0.0.0.0:9092
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

Аутентификация (SASL)

МеханизмКогда использовать
SASL/PLAINDev/test; в prod только с TLS и централизованным хранением паролей
SASL/SCRAM-SHA-256/512Стандарт для prod: хеши на брокере, без plain-text пароля в конфиге клиента
SASL/GSSAPI (Kerberos)Корпоративный AD/LDAP, единый SSO
SASL/OAUTHBEAREROAuth 2.0 / OIDC (облако, zero-trust)

Клиент задаёт механизм через sasl.mechanism и JAAS-конфиг или sasl.jaas.config.

Шифрование

  • In-transit — TLS между client↔broker и broker↔broker (ssl.keystore.location, ssl.truststore.location на брокере; CA для клиентов).
  • At rest — шифрование диска на уровне ОС/облака (EBS encryption, LUKS); Kafka не шифрует log-сегменты сама.
  • End-to-end — приложение шифрует payload до отправки; брокер видит только байты (для особо чувствительных данных).

Авторизация (ACL)

Управление через AclAuthorizer (или RBAC в Confluent). Пример через CLI:

kafka-acls.sh --bootstrap-server broker:9092 \
--add --allow-principal User:alice \
--operation Read --operation Describe \
--topic payments
ОперацияСмысл
READ / WRITEЧтение / запись в топик
CREATE / DELETE / ALTERАдминистрирование топика
DESCRIBEМетаданные без чтения данных
ALLПолный доступ к ресурсу

Ресурсы — Topic, Group, Cluster, TransactionalId. Для межброкерного трафика — отдельный principal с минимальными правами.

Аудит и квоты

  • Audit logs фиксируют denied/allowed операции — отправка в SIEM.
  • Quotas ограничивают produce/fetch bytes/sec по principal — защита от "шумного соседа".
Rolling upgrade безопасности

Добавляйте SASL_SSL параллельно с PLAINTEXT на отдельном listener, мигрируйте клиентов, затем отключайте plaintext. Ротация сертификатов — до истечения срока, с truststore на всех клиентах.


Программное управление через AdminClient

AdminClient предоставляет программный интерфейс для административных операций без использования CLI. Основные возможности:


Управление топиками

// Java: создание топика с 3 партициями и фактором репликации 2
NewTopic topic = new NewTopic("orders", 3, (short) 2);
CreateTopicsResult result = adminClient.createTopics(Collections.singleton(topic));
result.all().get();

Управление конфигурацией

# Python (confluent-kafka) — изменение параметра топика
from confluent_kafka.admin import ConfigResource, ConfigSource
config_resource = ConfigResource(ConfigResource.Type.TOPIC, "orders")
config_resource.set_config("retention.ms", "604800000")
admin.alter_configs([config_resource])

Управление группами потребителей

// C# (Confluent.Kafka): получение информации о группе
var groups = admin.ListConsumerGroupsAsync().Result;
var description = admin.DescribeConsumerGroupsAsync(new[] { "order-processors" }).Result;

AdminClient поддерживает асинхронные операции, обработку ошибок через исключения и работу с метаданными кластера (список брокеров, топиков, партиций).


Утилиты командной строки для администрирования

Стандартный дистрибутив Kafka включает скрипты в каталоге bin/:


Управление топиками

# Создание топика
kafka-topics.sh --bootstrap-server broker1:9092 --create \
--topic events --partitions 6 --replication-factor 3

# Описание топика
kafka-topics.sh --bootstrap-server broker1:9092 --describe --topic events

# Удаление топика (требует включённого delete.topic.enable)
kafka-topics.sh --bootstrap-server broker1:9092 --delete --topic deprecated-topic

Работа с потребителями

# Просмотр групп потребителей
kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list

# Описание состояния группы
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
--describe --group payment-processors

# Сброс смещений (требует остановки потребителей группы)
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
--group analytics --reset-offsets --to-earliest --execute --topic clicks

Диагностика кластера

# Проверка работоспособности брокера
kafka-broker-api-versions.sh --bootstrap-server broker1:9092

# Анализ распределения партиций
kafka-topics.sh --bootstrap-server broker1:9092 --describe \
--under-replicated-partitions

В режиме KRaft утилиты используют --bootstrap-server. Параметр --zookeeper относится к legacy-кластерам Kafka 3.x.


Настройка и использование клиентских API

Producer API

Ключевые параметры конфигурации:

  • bootstrap.servers — начальный список брокеров
  • acks — уровень подтверждения — 0 (без подтверждения), 1 (лидер), all (все реплики в ISR)
  • retries и retry.backoff.ms — политика повторных попыток
  • compression.type — алгоритм сжатия (none, gzip, snappy, lz4, zstd)
  • batch.size и linger.ms — параметры батчинга для повышения пропускной способности (теория — Пакетная работа с данными)
  • enable.idempotence — идемпотентная отправка (см. exactly-once)
  • schema.registry.url — при Avro/Protobuf через Confluent serializers

Headers — метаданные записи (trace-id, event-type); не участвуют в выборе партиции.

Interceptors — hooks до/после send для метрик и аудита без дублирования кода в каждом сервисе.

Пример отправки с ключом для упорядоченности:

ProducerRecord<String, String> record =
new ProducerRecord<>("user-events", userId, eventJson);
producer.send(record, (metadata, exception) -> {
if (exception != null) handleFailure(exception);
});

Consumer API

Ключевые параметры:

  • group.id — идентификатор группы потребителей
  • auto.offset.reset — поведение при отсутствии смещения (earliest, latest)
  • enable.auto.commit — автоматическое подтверждение смещений (рекомендуется false для контроля)
  • max.poll.records — максимум записей за один вызов poll()
  • session.timeout.ms и heartbeat.interval.ms — параметры жизненного цикла потребителя в группе

Рекомендуемый цикл обработки с ручным подтверждением:

while True:
records = consumer.poll(timeout_ms=1000)
for record in records:
process(record)
consumer.commit_sync() # Подтверждение после успешной обработки

Для обработки ошибок необходимо обрабатывать исключения CommitFailedException (возникает при перебалансировке во время коммита) и реализовывать стратегию повторных попыток для идемпотентной обработки сообщений.


Установка и эксплуатация

Материал по установке вынесен в соседние статьи, чтобы не дублировать шаги:

ЗадачаСтатья
Первый запуск, KRaft, Docker, console producer/consumerApache Kafka — потоковая обработка
KRaft single-node на Windows, Java CRM + PostgreSQLJava-приложение с Apache Kafka и PostgreSQL
CLI, параметры broker/producer/consumer, anti-patternsСправочник по Apache Kafka
Мониторинг lag, URP, SLOраздел "Мониторинг и SLO" в этой статье

См. также

Базовая глава в разделе "Система и сеть" — Apache Kafka — потоковая обработка данных.


Как внедрять event streaming поэтапно

Чтобы снизить риск, внедряйте потоковую архитектуру итеративно:

  1. Выберите 1-2 доменных события с понятной ценностью для бизнеса.
  2. Поднимите producer + consumer + мониторинг lag/error.
  3. Добавьте схему события (Avro/JSON Schema) и правила совместимости в Schema Registry.
  4. Подключите DLQ/повторную обработку.
  5. Только после стабилизации расширяйте на другие домены.

Такой путь быстрее приводит к устойчивому результату, чем "большой взрыв" с полной миграцией всех интеграций сразу.