Перейти к основному содержимому

7.05. Kafka

Разработчику Архитектору Инженеру

Kafka

Концепции топиков, разделов, производителей, потребителей, брокеров и кластеров.
Принципы работы групп потребителей и балансировки нагрузки.
Внутреннее устройство: контроллер кластера, механизм репликации, физическое хранение данных (сегменты журналов, индексы, сжатие).
Принципы авторизации и управления доступом.
Программное управление через AdminClient (создание/удаление топиков, управление конфигурацией, управление группами потребителей).
Использование утилит командной строки для администрирования.
Настройка и использование клиентских API (Producer, Consumer).

Apache Kafka — это распределённая потоковая платформа (streaming platform), которая предназначена для обработки больших объёмов данных в реальном времени. Kafka часто используется для построения систем, где требуется высокая производительность, масштабируемость и надёжность.

Официальный сайт - https://kafka.apache.org/

Kafka работает в кластере, поддерживает обработку данных в реальном времени и может обрабатывать миллионы сообщений в секунду.

image-12.png

Основные компоненты Kafka:

  1. Брокер (Broker) — это узел (сервер) в Kafka-кластере, который отвечает за хранение и управление данными. Каждый брокер хранит часть данных (топиков) и обрабатывает запросы от продюсеров и консьюмеров. В кластере может быть несколько брокеров для обеспечения отказоустойчивости и масштабируемости.
  2. Кластер (Cluster) — это группа брокеров, которые работают вместе для обработки данных. Kafka использует ZooKeeper (или Raft в новых версиях) для координации работы брокеров в кластере.
  3. Координатор (Coordinator) — это специальный брокер, который отвечает за управление группами консьюмеров. Он отслеживает, какие консьюмеры читают данные из каких партиций, и управляет оффсетами.
  4. Топик (Topic) — это логический канал, через который передаются сообщения. Каждый топик разделяется на партиции (partitions) для параллельной обработки данных.
  5. Партиция (Partition) — это упорядоченный лог данных внутри топика. Каждая партиция хранится на одном брокере, но может реплицироваться на другие брокеры для отказоустойчивости. Сообщения в партиции имеют строгий порядок, что позволяет гарантировать последовательность обработки.
  6. Оффсет (Offset) — это уникальный идентификатор сообщения в партиции. Консьюмеры используют оффсеты для отслеживания своего прогресса при чтении данных. Оффсеты сохраняются либо на стороне консьюмера, либо в Kafka.
  7. Продюсер (Producer) — это приложение или сервис, которое отправляет сообщения в Kafka. Продюсер выбирает топик и партицию для отправки сообщений.
  8. Консьюмер (Consumer) — это приложение или сервис, которое читает сообщения из Kafka. Консьюмеры организованы в группы (consumer groups), чтобы распределить нагрузку между несколькими экземплярами.

Kafka использует модель «продюсер-брокер-консьюмер» для обработки данных.

Вот как это работает:

  • продюсер отправляет сообщения, пишет их в определённый топик;
  • сообщения автоматически распределяются по партициям топика;
  • каждый брокер хранит данные в партициях - данные сохраняются в течение заданного времени (например, неделя);
  • консьюмер подключается к топику и начинает читать сообщения;
  • каждый консьюмер в группе получает данные из одной или нескольких партиций;
  • координатор следит за тем, какие консьюмеры читают данные и из каких партиций, если консьюмер выходит из строя, его партиции переназначаются другим консьюмерам.

Как настроить Kafka?

  1. Установка Java. Kafka работает поверх Java, поэтому сначала нужно установить Java Development Kit (JDK).
  2. Установка ZooKeeper. ZooKeeper — это координатор, который управляет кластером Kafka. В новых версиях Kafka (например, 3.x) ZooKeeper заменяется на Raft, но для старых версий он всё ещё обязателен. 2.1. Скачайте и распакуйте ZooKeeper; 2.2. Настройте конфигурацию. Создайте файл zoo.cfg в папке conf. 2.3. Запустите ZooKeeper
  3. Установка Kafka. 3.1. Скачайте и распакуйте Kafka. 3.2. Настройте конфигурацию. Файл конфигурации находится в config/server.properties. Основные параметры:
broker.id=1
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
num.partitions=3

3.3. Запустите Kafka:

bin/kafka-server-start.sh config/server.properties

3.4. Чтобы Kafka запускался автоматически при загрузке системы, добавьте скрипт в автозагрузку или используйте systemd. 4. Создание топика.

Топик — это логический канал для передачи сообщений.

Создайте топик:

bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

Проверьте список топиков:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  1. Отправка и получение сообщений.

Пример отправки сообщения:

bin/kafka-console-producer.sh --topic my_topic --bootstrap-server localhost:9092

Пример получения сообщения:

bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092
  1. Подключение Kafka к программам.

В разных языках программирования испольхуются соответствующие библиотеки.

6.1. Java - библиотека org.apache.kafka:kafka-clients

Пример использования:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my_topic", "key", "Hello, Kafka!"));
producer.close();
}
}

6.2. Python- библиотека kafka-python

Пример использования:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('my_topic', value=b'Hello, Kafka!')
producer.flush()

6.3. JavaScript (Node.js)- библиотека kafkajs

Пример использования:

const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: 'my_topic',
messages: [{ value: 'Hello, Kafka!' }],
});
await producer.disconnect();

6.4. PHP - библиотека php-rdkafka

Пример использования:

<?php
$rk = new RdKafka\Producer();
$rk->addBrokers("localhost:9092");

$topic = $rk->newTopic("my_topic");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Hello, Kafka!");
$rk->poll(0);
?>

  1. Мониторинг предоставляет инструмент для управления кластерами - Kafka Manager.

Пример установки:

docker run -it --rm -p 9000:9000 -e ZK_HOSTS="localhost:2181" sheepkiller/kafka-manager

Для визуализации можно использовать Grafana, а Prometheus для сбора метрик.