Java-приложение с Apache Kafka и PostgreSQL
Подготовка
Я уверен, что вы пришли сюда за практикой работы с Kafka, а не только за теорией.
Давайте разбираться, и попробуем вместе с вами собрать приложение, которое будет, словно CRM, подключаться к Kafka, работать с базой данных, добавлять и отправлять. Возможно, так будет эффективнее.
Чтобы было проще ориентироваться, можно воспринимать эту практику как три шага:
- Подготовить инфраструктуру (Kafka + PostgreSQL + Java).
- Собрать рабочий контур Producer -> Topic -> Consumer -> DB.
- Проверить поток на реальных данных и зафиксировать точки отказа.
Так вы увидите не только "как запустить", но и как мыслить при проектировании интеграции в боевом сервисе.

Нас ожидает тот ещё квест, если вы не работали с Kafka. Начнём с приготовлений.
Используемые технологии
Для начала работы нам потребуется:
- Kafka;
- PostgreSQL;
- Java;
- Intellij IDEA.
Системные требования
На компьютере нужно будет минимум 4 ГБ оперативной памяти, 10 ГБ свободного места на диске.
Общая схема
Общая схема архитектуры нашего приложения будет такова:
Клиент → Producer → Kafka → Consumer → База данных
↓
Мониторинг
Эта схема отражает основной принцип работы систем на основе Apache Kafka: разделение отправки и обработки данных во времени и пространстве.
- Producer (Издатель) — это часть нашего приложения, которая отвечает за отправку событий (в нашем случае — информации о новом клиенте) в Kafka. Она не знает и не заботится о том, кто и когда будет эти данные читать.
- Kafka выступает в роли надёжного буфера-очереди. Она принимает сообщения, сохраняет их на диске и гарантирует их доставку подписчикам, даже если потребитель временно недоступен.
- Consumer (Потребитель) — это фоновый процесс, который постоянно "слушает" указанный топик (
customer-events). Как только появляется новое сообщение, он его забирает и выполняет полезную работу — в нашем случае сохраняет данные в PostgreSQL. - База данных — это конечное хранилище, где информация о клиентах хранится долговременно и структурированно.
Такой подход позволяет:
- Разгрузить пользовательский интерфейс: отправка в Kafka происходит быстро и асинхронно, поэтому GUI не зависает.
- Повысить отказоустойчивость — если база данных временно недоступна, сообщения останутся в Kafka и будут обработаны позже, как только потребитель сможет подключиться.
- Масштабировать систему — в будущем к одному и тому же топику можно будет подключить несколько разных потребителей для выполнения различных задач (например, отправка email-уведомлений, обновление аналитической базы данных и так далее).
Само приложение будет написано на языке программирования Java, подключаться к базе данных на PostgreSQL, и обладать графическим интерфейсом.
Реализация будет отправлять клиентов в Kafka, а фоновый потребитель (consumer) будет постоянно слушать топик и сохранять данные в PostgreSQL:
[GUI] → Отправка в Kafka → [Топик customer-events] → Потребитель → Сохранение в БД
↑
Фоновый поток (не блокирует UI)
Подготовка и установка
Kafka
Начнём с установки Kafka. Я в примере использовал версию 4.2.0.
Скачайте последнюю версию с сайта https://kafka.apache.org/community/downloads/.
Лучше качать бинарную версию, например, у меня была kafka_2.13-4.2.0.tgz. Эта запись расшифровывается так:
kafka— имя проекта.2.13— версия языка Scala, на котором скомпилирована эта сборка Kafka. Это внутренняя деталь, и для большинства пользователей, которые используют Kafka только как брокер сообщений через клиентские библиотеки (как мы с Java), это значение не имеет принципиального значения.4.2.0— версия самого Apache Kafka.
Для Windows рекомендую распаковать папку из архива в папку C:\kafka так, чтобы у вас получилось по этому пути следующее содержимое:
kafka/
├── bin/
│ ├── windows/
├── config/
├── Данные/
├── libs/
├── licenses/
├── logs/
├── site-docs/
└── LICENSE
└── NOTICE
Kafka лучше всего работает в Unix-подобных системах (Linux, macOS). На Windows для серьёзной разработки или prod рекомендуется WSL2 или виртуальная машина с Linux.
Java
Во-первых, установите JDK.
https://www.oracle.com/java/technologies/downloads/
Убедитесь, что Java установлена:
java -version
Во-вторых, установите Intellij IDEA:
https://www.jetbrains.com/Рu/idea/download/?section=windows
PostgreSQL
Установите PostgreSQL и pgAdmin.
https://www.postgresql.org/download/
Установка может занять некоторое время, но я думаю, что вы справитесь!
Нам понадобится программа pgAdmin для работы с базой данных, поэтому убедитесь, что она установлена и попробуйте её хотя бы раз запустить.
Конфигурация Kafka
Проблемы с версиями
Раньше (в версиях 3.x) разработчики клали примеры KRaft-конфигов в отдельную папку, чтобы не ломать привычный server.properties со старыми ZooKeeper-настройками. Начиная с версии 4.0, когда ZooKeeper полностью убрали, файл config/server.properties переписали под чистый KRaft.
Kafka не могла управлять собой сама и для хранения своего состояния (какие топики есть, кто является лидером) использовала ZooKeeper как внешний костыль. Это создавало сложности: нужно было администрировать две разные системы, а при больших нагрузках ZooKeeper начинал "тормозить". С версии 4.0 ZooKeeper удалили навсегда, сделав KRaft единственным способом запуска. Вам нужно забыть про zookeeper.properties.
В корневой папке config есть файл server.properties, но могут возникнуть проблемы, поэтому допустимо создать свой файл конфигурации, что мы и сделаем.
Важно: Используйте обратные слеши \ в PowerShell/CMD, а не прямые /. Если будете работать в GitBash или на Linux, то /, если в CMD/PowerShell то \.
Настройка конфигурации
Мы создадим собственный файл конфигурации kraft-single.properties для запуска Kafka в режиме одного узла (single-node cluster). Это идеально подходит для локальной разработки и обучения.
- Создайте файл
kraft-single.propertiesв папкеC:\kafka\config. - Добавьте в него следующее содержимое:
Код ITЗагрузка примера кода…
Что мы настроили здесь:
listeners— теперь перечислены обаlistener-а:CONTROLLERна порту 9093 иPLAINTEXTна порту 9092;controller.listener.names=CONTROLLER— указывает, какойlistenerиспользуется для межконтроллерного общения;listener.security.protocol.map— обязательный маппинг для обоих типов.
Такой конфиг настроит слушатели, чтобы не было ошибок (но не гарантирую!).
В стандартном server.properties из дистрибутива Kafka 4.2.0 нет настройки process.roles и controller.quorum.voters — без них KRaft не знает, в каком режиме запускаться (как брокер, как контроллер или как оба). Поэтому для одиночного узла лучше создать такой конфигурационный файл.
Обратите внимание, что для версии Windows существует папка C:\kafka\bin\windows, в которой лежит много .bat файлов. Запомните их.
process.roles — это ключевая настройка KRaft-режима. Она определяет, какие функции будет выполнять данный узел Kafka.
broker— отвечает за хранение данных (топиков) и обработку запросов от продюсеров и консьюмеров.controller— управляет метаданными кластера — знает о всех топиках, партициях, их лидерах и состоянии узлов.
В нашем случае, поскольку мы разворачиваем всё на одной машине, один и тот же процесс берёт на себя обе роли.
node.id — уникальный числовой идентификатор этого узла в пределах всего кластера. В single-node конфигурации он может быть любым, но обычно выбирают 1.
controller.quorum.voters — список всех узлов, которые участвуют в принятии решений контроллером (quorum). Формат записи: <node.id>@<hostname>:<port>.
Поскольку у нас только один узел с node.id=1, слушающий на localhost:9093, именно он и указан здесь. Эта настройка позволяет контроллеру понять, с кем ему нужно синхронизироваться.
listeners — определяет, на каких сетевых интерфейсах и портах будет слушать наш узел. У нас два слушателя:
CONTROLLER://localhost:9093— используется исключительно для внутреннего общения между контроллерами (в нашем случае — сам с собой).PLAINTEXT://localhost:9092— используется для общения с клиентами (нашими Java-приложениями — продюсерами и консьюмерами).
controller.listener.names — явно указывает, какой из слушателей, перечисленных в listeners, предназначен для внутреннего трафика контроллера. Это обеспечивает чёткое разделение ролей.
listener.security.protocol.map — задаёт соответствие между именем слушателя и фактическим протоколом безопасности.
CONTROLLER:PLAINTEXTозначает, что для слушателяCONTROLLERбудет использоваться незашифрованное соединение (PLAINTEXT).PLAINTEXT:PLAINTEXTозначает то же самое для клиентского слушателя. Для продакшена здесь следует использоватьSSLилиSASL_SSL.
log.dirs — путь к директории, где Kafka будет хранить все свои данные: логи топиков, индексы и метаданные. Убедитесь, что эта папка существует, или создайте её вручную перед запуском.
-
num.partitions=1— каждый новый топик будет создан с одной партицией. Партиции позволяют распараллеливать обработку, но для нашего примера одной достаточно. -
default.replication.factor=1— фактор репликации по умолчанию равен 1, то есть данные не будут дублироваться на другие узлы (их у нас и нет). -
Остальные параметры (
offsets.topic...,transaction.state.log...) настраивают системные топики Kafka. Их фактор репликации также установлен в1, так как у нас single-node кластер. -
log.retention.hours=168— сообщения в топиках будут автоматически удаляться через 168 часов (7 дней). -
log.segment.bytes— максимальный размер одного сегмента лога (файла на диске). -
log.retention.check.interval.ms— интервал, с которым Kafka проверяет, какие сегменты пора удалить.
advertised.listeners — самый важный параметр для клиентов. Он сообщает продюсерам и консьюмерам, по какому адресу и порту с ним можно связаться. В нашем случае это localhost:9092. Если бы Kafka работала на удалённом сервере, здесь нужно было бы указать его публичный IP-адрес или доменное имя.
Получение uuid
Перед первым запуском KRaft нужно сгенерировать ID кластера.
Откройте Git Bash в папке C:/kafka/:
$ bin/windows/kafka-storage.bat random-uuid
Если будете использовать обычный терминал Windows (CMD/Powershell), то обратите внимание, команда будет выглядеть иначе:
bin\windows\kafka-storage.bat random-uuid
Запомните разницу, не перепутайте.
Как заметили, система будет обращаться к файлу kafka-storage.bat и генерировать случайный uuid.
К примеру, вы можете получить что-то вроде:
2026-04-09T06:03:59.109257100Z main ERROR Reconfiguration failed: No configuration found for '5acf9800' at 'null' in 'null'
w4BJSOSHQHRdLTEmM0ZA-w
Если вы увидели ошибку, как выше - ничего страшного, но она будет связана с Log4j - это для логирования, так что не пугайтесь. Нас интересует следующая строка, к примеру как тут:
w4BJSOSHQHRdLTEmM0ZA-w
Это ваш Cluster ID. Скопируйте полученную строку.
К слову, на Linux это было бы так:
./bin/kafka-storage.sh random-uuid
Форматирование хранилища
После получения ID кластера, нужно отформатировать хранилище.
Для Linux:
./bin/kafka-storage.sh format -t <ваш-uuid> -c ./config/kraft/kraft-single.properties
Для Windows:
bin\windows\kafka-storage.bat format -t <ваш-uuid> -c config\kraft-single.properties
Запуск Kafka
Запуск брокера (из каталога Kafka):
bin\windows\kafka-server-start.bat config\kraft-single.properties
После запуска вы должны увидеть что-то вроде:
[2026-04-09 08:xx:xx,xxx] INFO [ControllerServer id=1] Started.
[2026-04-09 08:xx:xx,xxx] INFO [BrokerServer id=1] Started.
При запуске на Windows, мы можем получить ошибку:
'wmic' is not recognized as an internal or external command,
operable program or batch file.
Эта проблема связана с тем, что, начиная с Windows 10 (версия 21H1), Microsoft удалила старую консольную утилиту wmic.exe по умолчанию. Старые скрипты запуска Kafka (даже в версии 4.2.0) всё ещё пытаются её вызвать, чтобы определить разрядность системы, и, не находя её, падают с этой ошибкой.
Bash-скрипт будет чистый, и с Linux проблемы такой не будет. Куда деваться, разница систем даёт знать.
Откройте файл C:\kafka\bin\windows\kafka-server-start.bat в блокноте. Там будет вызов wmic. К примеру, может быть что-то вроде:
Код ITЗагрузка примера кода…
Замените блок с IF ["%KAFKA_HEAP_OPTS%"] EQU [""] на следующий:
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
rem Skip WMIC detection for modern Windows
set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
)
Весь файл будет выглядеть как-то так:
Код ITЗагрузка примера кода…
Тогда можно повторить, и Kafka успешно запустится. Если всё сделать правильно, после запуска Kafka сообщит в консоли:
INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
База данных
Создание базы данных
Запустите pgAdmin, настройте пароль и пользователя, создайте базу данных.
Простейший способ - выбрать сервер localhost - Databases - Create - Database.
В параметрах укажите лишь имя, к примеру, test.
Затем нажмите правой кнопкой мыши по созданной базе данных, и выберите Query Tool, чтобы сделать SQL-запрос.
Выполните последовательно несколько запросов:
- Создание таблицы для клиентов CRM:
CREATE TABLE IF NOT EXISTS customers (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE,
phone VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
- Создание индекса для email (опционально):
CREATE INDEX IF NOT EXISTS idx_customers_email ON customers(email);
И всё. В дальнейшем, чтобы проверять клиентов в базе, используйте запрос:
SELECT * FROM customers ORDER BY created_at DESC;
Написание программы
Перейдём к созданию Java-приложения.
Создание проекта
Создайте новый Maven-проект. Можете ставить себе свои настройки в конфигурации, но если хотите начать с копирования моих примеров, то сделайте как я:

Назовём проект просто 'CRM', систему сборки выберем Maven, и укажем:
- groupId:
ru.timur.crm; - artifactId:
CRM.
Настройка Maven
После создания проекта, первым делом отправляемся к pom.xml и заменяем его содержимое на следующее:
Код ITЗагрузка примера кода…
Мы устанавливаем здесь зависимости:
- Kafka клиент;
- Jackson для работы с JSON;
- slf4j для логирования;
- PostgreSQL JDBC Driver для базы данных;
- JavaFX для графического интерфейса.
С такой конфигурацией, у нас будет вариант запустить приложение через консоль - AppConsole, или через графический интерфейс - CrmApplication.
Структура проекта
Воссоздайте следующую структуру:

Архитектура следующая:
- Папка
configсодержит классы с конфигурацией -DatabaseConfigиKafkaConfig; - Папка
consumerсодержит классCustomerConsumer; - Папка
modelсодержит классCustomer; - Папка
producerсодержит классCustomerProduser; - Папка
serializerсодержит классы для обработки JSON -JsonDeserializerиJsonSerializer; - Папка
serviceсодержит классCustomerService; - В корне будут лежать классы
AppConsole(альтернатива, консольный клиент),CrmApplicationдля основного приложения с GUI, иCrmController; - В ресурсах (
resources, ниже) нужно создать файлcrm-view.fxml.
Чтобы создать папку, выбирайте New - Package. Для создания классов - New - Java Class. Все такие файлы будут иметь расширение .java.
config
DatabaseConfig.java
Папка config содержит классы, которые централизуют все настройки приложения. Это лучшая практика, так как позволяет легко изменять параметры подключения без необходимости искать их по всему коду.
В папке config, в файле DatabaseConfig, добавляем код:
// src/main/java/ru/timur/crm/config/DatabaseConfig.java
package ru.timur.crm.config;
public class DatabaseConfig {
public static final String DB_URL = "jdbc:postgresql://localhost:5432/имя_вашей_базы";
public static final String DB_USER = "логин";
public static final String DB_PASSWORD = "пароль";
// SQL для вставки клиента
public static final String INSERT_CUSTOMER_SQL =
"INSERT INTO customers (id, name, email, phone) VALUES (?, ?, ?, ?) " +
"ON CONFLICT (id) DO NOTHING";
}
Обратите внимание - здесь есть:
- DB_URL - путь к базе данных PostgreSQL;
- DB_USER - логин;
- DB_PASSWORD - пароль.
Это конфигурация для подключения к базе данных.
DB_URL— это строка подключения к базе данных в формате JDBC (Java Database Connectivity). Она состоит из:- Протокола (
jdbc:postgresql://). - Адреса сервера (
localhost). - Порта (
5432, стандартный для PostgreSQL). - Имени базы данных (
имя_вашей_базы). Вам нужно заменить это значение на имя вашей реальной базы, например,test.
- Протокола (
DB_USERиDB_PASSWORD— учётные данные для аутентификации в PostgreSQL. Замените их на ваши логин и пароль.INSERT_CUSTOMER_SQL— SQL-запрос для вставки нового клиента. Ключевая часть здесь —ON CONFLICT (id) DO NOTHING. Эта конструкция, специфичная для PostgreSQL, предотвращает ошибку, если попытаться вставить запись с уже существующимid. Вместо падения запрос просто ничего не делает, что повышает устойчивость потребителя к дублирующимся сообщениям.
KafkaConfig.java
Этот класс отвечает за создание конфигурационных объектов для Kafka-клиентов.
В папке config, в файле KafkaConfig, добавляем код:
Код ITЗагрузка примера кода…
Обратите внимание, что как раз для подключения используется локальный сервер localhost, и порт 9092.
BOOTSTRAP_SERVERS— список адресов брокеров Kafka, к которым должен подключиться клиент для получения полной информации о кластере. Для нашего локального примера этоlocalhost:9092.CUSTOMER_TOPIC— имя топика, который будет использоваться для обмена событиями о клиентах. Это центральный элемент нашей системы.
Метод producerProps() создаёт настройки для продюсера:
KEY_SERIALIZER_CLASS_CONFIG— указывает, как сериализовать ключ сообщения. Мы используем стандартныйStringSerializer, так как наш ключ — это строковыйidклиента.VALUE_SERIALIZER_CLASS_CONFIG— указывает, как сериализовать само тело сообщения. Здесь мы используем наш собственныйJsonSerializer, который преобразует объектCustomerв JSON-строку.ACKS_CONFIG = "all"— самая строгая гарантия доставки. Продюсер будет считать запись успешной только после того, как все реплики партиции подтвердят получение данных. Это предотвращает потерю данных, но немного снижает производительность. Для обучения и надёжности это оптимальный выбор.RETRIES_CONFIG = 3— количество попыток повторной отправки сообщения в случае временных сбоев (например, сетевых проблем).
Метод consumerProps() создаёт настройки для консьюмера:
GROUP_ID_CONFIG— идентификатор группы потребителей. Все консьюмеры с одинаковымgroupIdобразуют группу и совместно распределяют между собой партиции топика. В нашем GUI-приложении используетсяcrm-gui-consumer, а в консольной версии —crm-group-1.KEY_DESERIALIZER_CLASS_CONFIGиVALUE_DESERIALIZER_CLASS_CONFIG— зеркальные настройки для продюсера, но для десериализации (обратного преобразования из байтов в объекты).AUTO_OFFSET_RESET_CONFIG = "earliest"— указывает, с какого места начинать чтение, если для данной группы потребителей нет сохранённого оффсета (например, при первом запуске)."earliest"означает, что будут прочитаны все существующие в топике сообщения с самого начала.
consumer
В папке consumer, в файле CustomerConsumer, добавляем следующий код:
Код ITЗагрузка примера кода…
model
В папке model, в файле Customer, добавьте следующий код:
Код ITЗагрузка примера кода…
Так будет выглядеть модель типичной сущности "Пользователь" в нашем проекте:
idдля идентификатора;nameдля имени;emailдля адреса электронной почты;phoneдля номера телефона.
producer
В папке producer, в файле CustomerProducer добавьте код:
Код ITЗагрузка примера кода…
serializer
В папке serializer, в файле JsonDeserializer, добавьте код:
Код ITЗагрузка примера кода…
В папке serializer, в файле JsonSerializer, добавьте код:
Код ITЗагрузка примера кода…
service
Пакет service содержит бизнес-логику приложения. Класс CustomerService отвечает за взаимодействие с базой данных PostgreSQL и реализует паттерн Data Access Object (DAO), инкапсулируя все детали работы с SQL.
В папке service, в файле CustomerService, добавьте код:
Код ITЗагрузка примера кода…
Класс хранит два ключевых поля:
connection— активное соединение с базой данных.manageConnection— флаг, который определяет, кто отвечает за закрытие этого соединения. Это важный момент для управления ресурсами и предотвращения утечек.
Класс предоставляет два конструктора, что делает его гибким и пригодным для разных сценариев использования.
Конструктор для внешнего соединения CustomerService принимает уже существующее соединение извне (например, от GUI-контроллера). Флаг manageConnection установлен в false, потому что владелец соединения (в данном случае CrmController) сам будет управлять его жизненным циклом. Это эффективно, так как позволяет переиспользовать одно и то же соединение для множества операций без накладных расходов на его создание и закрытие.
Конструктор для создания нового соединения CustomerService создаёт новое соединение с нуля. Он полезен для автономных задач или консольных утилит, где нет внешнего менеджера соединений. Поскольку класс сам создаёт соединение, он также берёт на себя ответственность за его закрытие (manageConnection = true).
Метод сохранения клиента saveCustomer использует конструкцию PostgreSQL ON CONFLICT ... DO UPDATE (также известная как "upsert" — update or insert).
- Если клиент с таким
idуже существует в таблице, его данные (name,email,phone) будут обновлены новыми значениями. - Поле
created_atбудет обновлено до текущего времени только в случае обновления записи. Это позволяет отслеживать, когда данные были последний раз изменены. - Такой подход делает потребителя идемпотентным: повторная отправка одного и того же сообщения не приведёт к дублированию записей или ошибкам, что критически важно для надёжных систем на основе Kafka.
Использование PreparedStatement с конструкцией try-with-resources гарантирует, что SQL-запрос будет скомпилирован один раз и безопасно выполнен. Параметризованные запросы защищают от SQL-инъекций, а автоматическое закрытие PreparedStatement предотвращает утечки ресурсов.
Реализация интерфейса AutoCloseable позволяет использовать этот класс в блоке try-with-resources. Метод close() проверяет флаг manageConnection: он закрывает соединение только если оно было создано внутри этого объекта. Это корректное поведение, которое не нарушает контракт с внешним владельцем соединения.
AppConsole
В файл AppConsole можно оставить такой код:
Код ITЗагрузка примера кода…
CrmApplication
В файл CrmApplication добавьте код:
Код ITЗагрузка примера кода…
CrmController
В файл CrmController добавьте код:
Код ITЗагрузка примера кода…
crm-view.fxml
В файл crm-view.fxml (который должен быть в папке resources), добавьте:
Код ITЗагрузка примера кода…
Запуск программы
Теперь самое интересное!
Очистка, сборка и компиляция
Выполните в терминале:
mvn clean install -U
Система должна скачать всё что нужно.
Для компиляции попробуйте выполнить:
mvn clean compile
Запуск программы
Запуск JavaFX-приложения:
mvn javafx:run
Так мы должны увидеть наш интерфейс:

Теперь попробуйте:
- Нажать кнопку "Подключиться к Kafka";
Система должна показать, к примеру:
[14:12:14] Успешное подключение к Kafka
- Нажать кнопку "Подключиться к БД";
Система должна показать, к примеру:
[14:12:16] Успешное подключение к PostgreSQL
- Убедитесь, что статусы корректны;
- Попробуйте нажать "Загрузить клиентов из БД";
[14:13:44] Последние клиенты из БД:
1. ID: 4aff56b5-dc73-44ab-bfc7-4cd933a05528
Имя: Клиент 3, Email: client3@example.com, Телефон: +79001234563
2. ID: 9b77b8d3-78c6-44f9-8352-7079b46379d2
Имя: Клиент 2, Email: client2@example.com, Телефон: +79001234562
3. ID: 3cf8d673-797e-4697-8c79-f9aeeab953b9
Имя: Клиент 1, Email: client1@example.com, Телефон: +79001234561
- Сравните клиентов с результатов SQL-запроса в БД;
- Попробуйте "Сгенерировать ID" и заполнить поля;
- Нажмите "Отправить в Kafka".
Система должна показать, к примеру:
[15:37:22] ✓ Отправлено: ID=4f44888b-fb07-4ec6-81f0-c2b5e21c04ca, партиция=0, смещение=6
- Нажмите "Запустить потребитель", чтобы запустить фоновый поток, который:
- подписывается на топик
customer-events; - постоянно опрашивает новые сообщения (
poll()); - при получении клиента, сохраняет в БД через
CustomerService; - коммитит оффсет только после успешного сохранения (гарантия доставки).
Отправляете клиента через форму, и он попадает в топик Kafka. Потребитель получает сообщение, сохраняет в базу данных и выводит в лог:
[15:52:00] ✓ Получен клиент из Kafka: ID=6933f0ce-987e-460c-8109-1d335a67b5a1, Имя=Тест
В разделе "Информация о топике customer-events вы сможете видеть:
- количество партиций;
- общее количество сообщений (в нашем коде мы выключили);
- группу потребителей.
Вот, в принципе, и всё!
Что важно учесть перед стартом проекта
Чтобы практическая часть прошла предсказуемо, заранее проверьте:
- совпадение версий JDK, Kafka клиента и JDBC-драйвера;
- кодировку (
UTF-8) для проекта и БД; - отдельные параметры подключения для
dev/test/prod; - включённые логи Kafka consumer group и SQL-ошибок;
- минимальный smoke test — отправка события, чтение, запись в PostgreSQL.
Улучшения для боевого применения
Текущий пример хорош как учебная база. Для прод-сценария дополнительно рекомендуется:
- вынести секреты и URL в переменные окружения;
- добавить централизованный логгер вместо
System.out; - включить retry policy и DLQ для проблемных сообщений;
- перейти на Avro/Protobuf + Schema Registry вместо ad-hoc JSON;
- покрыть producer/consumer интеграционными тестами;
- ввести валидацию входных данных до публикации в Kafka.