Первые шаги к микросервисам
Первые шаги к микросервисам
Микросервисная архитектура представляет собой подход к разработке программного обеспечения, при котором приложение строится как набор небольших, независимо развертываемых сервисов.
Каждый сервис отвечает за одну конкретную бизнес-функцию и общается с другими сервисами через четко определенные интерфейсы.
Такой подход позволяет масштабировать отдельные части системы независимо друг от друга, упрощает обновление технологий и повышает отказоустойчивость.
Рассмотрим практический пример создания распределенной системы обработки заявок.
Система будет состоять из трех основных компонентов:
- сервиса приема заявок на Python,
- сервиса обработки заказов на Java,
- сервиса уведомлений на C#.
Все компоненты будут взаимодействовать через REST API для синхронных запросов и RabbitMQ для асинхронного обмена сообщениями. В качестве хранилища данных выступит база PostgreSQL.
Архитектура системы
Система состоит из четырех ключевых элементов.
Первый элемент — это клиентское приложение или внешний источник, который инициирует создание заявки.
Второй элемент — сервис приема заявок (Order Ingestion Service), написанный на Python. Этот компонент принимает входящие данные, валидирует их и сохраняет информацию в базу данных.
Третий элемент — сервис обработки заказов (Order Processing Service), реализованный на Java. Он читает новые заявки из базы данных, выполняет бизнес-логику и формирует задачи для дальнейшей обработки.
Четвертый элемент — сервис уведомлений (Notification Service) на C#, который получает события о статусе заказа через брокер сообщений и отправляет уведомления пользователям.
Взаимодействие между компонентами происходит по двум каналам.
Синхронный канал использует REST API для немедленного ответа на запросы.
Асинхронный канал использует RabbitMQ для передачи событий, когда требуется фоновая обработка без блокировки основного потока выполнения.
База данных PostgreSQL служит единым источником истины для хранения структуры заявок и истории их изменений.
Техническая сводка
| Компонент | Команда запуска | Порт (API) | Описание функции |
|---|---|---|---|
| Docker (База данных PostgreSQL + Брокер RabbitMQ) | docker-compose up -d | 5432, 15672 | Запуск инфраструктуры. Порт 5432 обеспечивает подключение к базе данных, порт 15672 предоставляет веб-интерфейс для мониторинга очередей и сообщений брокера. |
| Python (Сервис приема заявок) | uvicorn main:app --reload --host 0.0.0.0 --port 8001 | 8001 | Принимает входящие запросы на создание заказов через REST API, сохраняет данные в базу и отправляет событие о создании заказа в очередь RabbitMQ. Документация API доступна по адресу http://localhost:8001/docs. |
| Java (Сервис обработки заказов) | mvn spring-boot:run | 8082 | Читает заказы со статусом "pending" из базы данных, выполняет бизнес-логику обработки, обновляет статус заказа и публикует событие о завершении работы в очередь RabbitMQ. |
| C# (Сервис уведомлений) | dotnet run | 5000 или 5001 | Подписывается на очередь RabbitMQ, получает события об изменении статуса заказа и инициирует отправку уведомления клиенту через HTTP-запрос. Сервис автоматически начинает прослушивание при запуске приложения. |
Подготовка инфраструктуры
Первый этап работы требует наличия специализированного программного обеспечения для управления контейнерами и сред выполнения языков программирования. Отсутствие этих инструментов делает запуск проекта невозможным.
| № | Элемент проверки | Статус | Примечание |
|---|---|---|---|
| 1 | Установлен Docker и Docker Compose | ☐ | Проверьте версию командой docker --version и docker-compose --version. Требуется актуальная версия для поддержки сетевых драйверов. |
| 2 | Установлен Python версии 3.9 или выше | ☐ | Подтвердите наличие интерпретатора командой python --version. Убедитесь, что переменная окружения PATH содержит путь к исполняемому файлу. |
| 3 | Установлена JDK версии 17 или выше | ☐ | Проверьте версию Java командой java -version или javac -version. Система должна использовать OpenJDK или Oracle JDK совместимой версии. |
| 4 | Установлена платформа .NET 6 или выше | ☐ | Подтвердите наличие SDK командой dotnet --version. Это необходимо для компиляции и запуска сервисов на C#. |
| 5 | Запущены контейнеры базы данных и брокера | ☐ | Выполните команду docker-compose up -d в корне проекта. Убедитесь, что статус сервисов postgres и rabbitmq равен Up. |
| 6 | Проверена связность с сетевыми службами | ☐ | Используйте утилиту ping или telnet для проверки доступа к портам postgres:5432 и rabbitmq:5672 изнутри сети Docker. |
Настройка среды разработки
Для работы с системой необходимо подготовить окружение.
Основными требованиями являются наличие Docker для запуска контейнеров с базой данных и брокером сообщений, а также установленные языки программирования Python 3.9+, Java 17+ и .NET 6+.
База данных PostgreSQL должна быть доступна для подключения по стандартному порту 5432.
Брокер сообщений RabbitMQ должен быть запущен на порту 5672 для AMQP протокола и иметь веб-интерфейс на порту 15672.
Перед началом работы создайте директорию проекта microservices-demo и внутри нее поддиректории для каждого сервиса: order-ingestion, order-processing, notifications.
Также создайте файл docker-compose.yml для оркестрации зависимостей.
Файл конфигурации Docker Compose определяет три службы:
- базу данных PostgreSQL,
- брокер RabbitMQ,
- пользовательские сервисы.
Каждая служба имеет свои параметры подключения, переменные окружения и тома для хранения данных.
version: '3.8'
services:
postgres:
image: postgres:15-alpine
container_name: microservices-db
environment:
POSTGRES_USER: admin
POSTGRES_PASSWORD: secret_password
POSTGRES_DB: orders_db
ports:
- "5432:5432"
volumes:
- pg_data:/var/lib/postgresql/data
networks:
- microservices-net
rabbitmq:
image: rabbitmq:3-management-alpine
container_name: microservices-rabbit
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: secret_password
networks:
- microservices-net
networks:
microservices-net:
driver: bridge
volumes:
pg_data:
Создание сети microservices-net обеспечивает изолированное взаимодействие между контейнерами.
Использование томов для PostgreSQL гарантирует сохранность данных после перезапуска контейнера. Веб-интерфейс RabbitMQ позволяет визуализировать очереди, обмены и состояние сообщений.
Сервис приема заявок требует изолированного пространства зависимостей. Создание виртуального окружения предотвращает конфликты версий библиотек с системными пакетами.
- Создание окружения: Выполните команду
python -m venv venv(илиpy -m venv venvна Windows) в директории сервисаorder-ingestion. - Активация: Активируйте окружение командой
source venv/bin/activate(Linux/macOS) илиvenv\Scripts\activate(Windows). - Установка зависимостей: Выполните установку пакетов из файла требований
pip install -r requirements.txtили вручную установитеfastapi,uvicorn,sqlalchemy,psycopg2-binary,pika. - Проверка: Убедитесь, что импорты модулей в коде не вызывают ошибок
ModuleNotFoundError.
Проект на базе Spring Boot управляется сборщиком Maven. Процесс сборки включает скачивание всех необходимых библиотек и компиляцию исходного кода.
- Инициализация проекта: Перейдите в директорию сервиса
order-processing. - Сборка: Выполните команду
mvn clean compileдля проверки синтаксиса иmvn packageдля создания артефакта.jar. - Зависимости: Убедитесь, что в папке
targetпоявился файл архива. Проверьте логи Maven на отсутствие ошибок разрешения зависимостей (Dependency Resolution Error). - Конфигурация: Файл
application.propertiesдолжен содержать правильные параметры подключения к базе данныхpostgresи брокеруrabbitmqвнутри сети Docker.
Сервис уведомлений использует экосистему .NET. Восстановление пакетов NuGet обеспечивает наличие всех необходимых библиотек для работы с очередями и HTTP-клиентами.
- Восстановление пакетов: Перейдите в директорию сервиса
notifications. Выполните командуdotnet restore. - Компиляция: Запустите
dotnet buildдля проверки кода. - Проверка: Убедитесь, что проект собирается без ошибок. В случае использования современных фреймворков убедитесь в наличии нужных версий пакетов
RabbitMQ.ClientиMicrosoft.AspNetCore.Mvc.NewtonsoftJson.
Сервис приема заявок на Python
Сервис приема заявок отвечает за получение данных от клиентов, валидацию входных параметров и сохранение информации в базу данных.
Для реализации используется фреймворк FastAPI, который обеспечивает высокую производительность и автоматическую генерацию документации API.
Библиотека SQLAlchemy используется для работы с базой данных через ORM.
Установите необходимые зависимости в виртуальном окружении Python:
pip install fastapi uvicorn sqlalchemy psycopg2-binary pydantic
Структура проекта включает:
- модуль
main.pyс точкой входа, - модуль
database.pyдля настройки подключения к базе данных, - модуль
models.pyс описанием схем таблиц, - модуль
schemas.pyс моделями данных Pydantic для валидации входящих запросов.
Файл database.py содержит функцию создания сессии и объект Base для декларативного определения моделей.
Подключение к базе данных осуществляется через URL, сформированный из переменных окружения.
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql://admin:secret_password@postgres/orders_db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
Модель Order описывает структуру таблицы в базе данных.
Поля включают:
- уникальный идентификатор,
- номер заказа,
- статус,
- сумму,
- дату создания.
Индексы на полях номера заказа и статуса ускоряют поиск записей.
from sqlalchemy import Column, Integer, String, Float, DateTime, Index
from datetime import datetime
class Order(Base):
__tablename__ = "orders"
id = Column(Integer, primary_key=True, index=True)
order_number = Column(String(50), unique=True, index=True, nullable=False)
status = Column(String(20), default="pending")
amount = Column(Float, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
__table_args__ = (
Index('idx_order_status', 'status'),
)
Модель Pydantic OrderCreate определяет схему валидации входящих данных.
Поле order_number должно быть уникальным и содержать не более 50 символов.
Поле amount должно быть положительным числом.
from pydantic import BaseModel, Field
class OrderCreate(BaseModel):
order_number: str = Field(..., min_length=1, max_length=50)
amount: float = Field(..., gt=0)
Основной файл main.py содержит роуты для создания заказа и получения списка заказов.
Роут /orders обрабатывает POST-запросы для создания новых записей.
Роут /orders/{order_id} возвращает информацию о конкретном заказе.
from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks
from sqlalchemy.orm import Session
from typing import List
from database import get_db, Base, engine
from models import Order
from schemas import OrderCreate, OrderResponse
import json
import pika
app = FastAPI(title="Order Ingestion Service")
# Создание таблиц
Base.metadata.create_all(bind=engine)
# Конфигурация RabbitMQ
rabbit_url = "amqp://admin:secret_password@rabbitmq/"
connection = pika.BlockingConnection(pika.URLParameters(rabbit_url))
channel = connection.channel()
channel.queue_declare(queue='order_events', durable=True)
def send_order_event(order_id: int, event_type: str):
message = json.dumps({
"order_id": order_id,
"event_type": event_type,
"timestamp": datetime.utcnow().isoformat()
})
channel.basic_publish(
exchange='',
routing_key='order_events',
body=message,
properties=pika.BasicProperties(delivery_mode=2)
)
@app.post("/orders", response_model=OrderResponse)
async def create_order(order: OrderCreate, background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
existing_order = db.query(Order).filter(Order.order_number == order.order_number).first()
if existing_order:
raise HTTPException(status_code=400, detail="Order number already exists")
new_order = Order(
order_number=order.order_number,
amount=order.amount,
status="pending"
)
db.add(new_order)
db.commit()
db.refresh(new_order)
background_tasks.add_task(send_order_event, new_order.id, "order.created")
return new_order
@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(order_id: int, db: Session = Depends(get_db)):
order = db.query(Order).filter(Order.id == order_id).first()
if not order:
raise HTTPException(status_code=404, detail="Order not found")
return order
Запуск сервиса выполняется командой:
uvicorn main:app --reload --host 0.0.0.0 --port 8001
Документация API доступна по адресу http://localhost:8001/docs.
Сервис обработки заказов на Java
Сервис обработки заказов читает pending-заказы из базы данных, выполняет бизнес-логику проверки и обновления статуса, а затем отправляет событие в очередь RabbitMQ.
Для реализации используется Spring Boot с библиотеками Spring Data JPA для работы с базой данных и Spring AMQP для взаимодействия с RabbitMQ.
Зависимости проекта Maven включают следующие пакеты:
spring-boot-starter-web,spring-boot-starter-data-jpa,postgresql-driver,spring-boot-starter-amqp,lombok,validation.
Файл pom.xml содержит конфигуцию сборки проекта с указанием версий зависимостей и настроек компиляции.
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
Модель Order соответствует структуре таблицы в базе данных.
Используется аннотация @Entity для связи с таблицей, @Id для первичного ключа и другие аннотации для управления отображением полей.
package com.example.orderservice.model;
import jakarta.persistence.*;
import java.time.LocalDateTime;
@Entity
@Table(name = "orders")
public class Order {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(unique = true, nullable = false)
private String orderNumber;
@Enumerated(EnumType.STRING)
private OrderStatus status;
private Double amount;
private LocalDateTime createdAt;
public enum OrderStatus {
PENDING, PROCESSING, COMPLETED, FAILED
}
}
Репозиторий OrderRepository расширяет интерфейс JpaRepository и предоставляет методы для поиска заказов по статусу.
Метод findByStatusOrderByCreatedAtAsc возвращает список всех заказов со статусом PENDING, отсортированных по дате создания.
package com.example.orderservice.repository;
import com.example.orderservice.model.Order;
import com.example.orderservice.model.Order.OrderStatus;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.List;
public interface OrderRepository extends JpaRepository<Order, Long> {
List<Order> findByStatusOrderByCreatedAtAsc(OrderStatus status);
}
Сервис OrderProcessingService содержит основную бизнес-логику.
Метод processPendingOrders выбирает все pending-заказы, обновляет их статус на PROCESSING, а затем на COMPLETED, имитируя выполнение работы.
После успешной обработки заказ отправляется в очередь RabbitMQ.
package com.example.orderservice.service;
import com.example.orderservice.model.Order;
import com.example.orderservice.model.Order.OrderStatus;
import com.example.orderservice.repository.OrderRepository;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class OrderProcessingService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String QUEUE_NAME = "order_events";
public void processPendingOrders() {
List<Order> pendingOrders = orderRepository.findByStatusOrderByCreatedAtAsc(OrderStatus.PENDING);
for (Order order : pendingOrders) {
try {
order.setStatus(OrderStatus.PROCESSING);
orderRepository.save(order);
// Имитация обработки
Thread.sleep(1000);
order.setStatus(OrderStatus.COMPLETED);
orderRepository.save(order);
// Отправка события
rabbitTemplate.convertAndSend("", QUEUE_NAME,
String.format("{\"orderId\": %d, \"status\": \"%s\"}", order.getId(), order.getStatus()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
order.setStatus(OrderStatus.FAILED);
orderRepository.save(order);
}
}
}
}
Файл конфигурации application.properties содержит параметры подключения к базе данных, настройку RabbitMQ и порт сервера.
spring.datasource.url=jdbc:postgresql://postgres:5432/orders_db
spring.datasource.username=admin
spring.datasource.password=secret_password
spring.datasource.driver-class-name=org.postgresql.Driver
spring.rabbitmq.host=rabbitmq
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret_password
server.port=8082
Запуск сервиса выполняется командой:
mvn spring-boot:run
Сервис автоматически запускает процессор заказов каждые 5 секунд благодаря аннотации @Scheduled.
Сервис уведомлений на C#
Сервис уведомлений слушает очередь RabbitMQ и при получении события о завершении заказа отправляет HTTP-запрос клиенту с информацией о статусе.
Для реализации используется ASP.NET Core Web API и библиотека RabbitMQ.Client.
Установите необходимые пакеты через NuGet:
dotnet add package RabbitMQ.Client
dotnet add package Microsoft.AspNetCore.Mvc.NewtonsoftJson
Модель OrderEvent описывает структуру сообщения, приходящего из очереди. Свойства соответствуют полям JSON-сообщения.
using System.Text.Json.Serialization;
namespace NotificationService.Models
{
public class OrderEvent
{
[JsonPropertyName("orderId")]
public int OrderId { get; set; }
[JsonPropertyName("status")]
public string Status { get; set; }
[JsonPropertyName("timestamp")]
public string Timestamp { get; set; }
}
}
Сервис NotificationConsumer реализует логику подписки на очередь и обработки сообщений.
Метод ConsumeMessages создает подключение к RabbitMQ, объявляет очередь и начинает слушать сообщения.
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Text.Json;
using NotificationService.Models;
namespace NotificationService.Services
{
public class NotificationConsumer : IHostedService
{
private readonly ILogger<NotificationConsumer> _logger;
private IConnection _connection;
private IChannel _channel;
private bool _running;
public NotificationConsumer(ILogger<NotificationConsumer> logger)
{
_logger = logger;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_running = true;
var factory = new ConnectionFactory
{
HostName = "rabbitmq",
UserName = "admin",
Password = "secret_password"
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(queue: "order_events", durable: true, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var orderEvent = JsonSerializer.Deserialize<OrderEvent>(message);
await SendNotificationAsync(orderEvent);
_channel.BasicAck(ea.DeliveryTag, false);
};
_channel.BasicConsume(queue: "order_events", autoAck: false, consumer: consumer);
return Task.CompletedTask;
}
private async Task SendNotificationAsync(OrderEvent orderEvent)
{
using var client = new HttpClient();
var content = new StringContent(JsonSerializer.Serialize(orderEvent), Encoding.UTF8, "application/json");
try
{
var response = await client.PostAsync("http://localhost:8001/notify", content);
_logger.LogInformation($"Notification sent for order {orderEvent.OrderId}: {response.StatusCode}");
}
catch (Exception ex)
{
_logger.LogError(ex, $"Failed to send notification for order {orderEvent.OrderId}");
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_running = false;
_channel?.Close();
_connection?.Close();
return Task.CompletedTask;
}
}
}
Файл Program.cs регистрирует сервис в DI-контейнере и настраивает маршруты.
using NotificationService.Services;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddHttpClient();
builder.Services.AddHostedService<NotificationConsumer>();
var app = builder.Build();
app.MapControllers();
app.Run();
Дополнительный контроллер NotificationController может принимать прямые уведомления от других сервисов через REST API.
using Microsoft.AspNetCore.Mvc;
using NotificationService.Models;
using System.Text.Json;
[ApiController]
[Route("[controller]")]
public class NotificationController : ControllerBase
{
private readonly ILogger<NotificationController> _logger;
public NotificationController(ILogger<NotificationController> logger)
{
_logger = logger;
}
[HttpPost]
public IActionResult ReceiveNotification([FromBody] OrderEvent orderEvent)
{
_logger.LogInformation($"Received direct notification for order {orderEvent.OrderId} with status {orderEvent.Status}");
return Ok(new { message = "Notification received" });
}
}
Запуск сервиса выполняется командой dotnet run.
Сервис автоматически начинает прослушивание очереди при старте приложения.
Взаимодействие через REST API и RabbitMQ
Система использует два типа взаимодействия для достижения оптимальной производительности и надежности. REST API применяется для синхронных операций, где требуется немедленный ответ от сервера. Это подходит для создания новых заказов и получения информации о статусе конкретного заказа. RabbitMQ используется для асинхронной передачи событий, когда операция не требует мгновенного подтверждения и может выполняться в фоне.
При создании заказа через REST API сервис Python возвращает ID созданной записи сразу после сохранения в базу данных. Параллельно в фоне отправляется сообщение в очередь RabbitMQ о том, что заказ создан. Сервис Java на Java считывает этот порядок из базы данных, обрабатывает его и отправляет событие о завершении в ту же очередь. Сервис C# получает это событие и выполняет действие по отправке уведомления.
Такая архитектура обеспечивает развязку компонентов. Если сервис уведомлений временно недоступен, система продолжает работать, так как сообщения сохраняются в очереди до тех пор, пока потребитель не станет доступен. Аналогично, если сервис обработки заказов перегружен, он продолжает читать заказы из базы данных, но скорость обработки ограничивается его собственными ресурсами, не влияя на возможность создания новых заказов.
Для тестирования системы выполните следующие действия. Запустите Docker Compose для поднятия базы данных и брокера сообщений. Затем запустите каждый сервис отдельно в разных терминалах. Отправьте POST-запрос на endpoint Python сервиса для создания заказа. Проверьте логи всех сервисов на предмет появления сообщений. Используйте веб-интерфейс RabbitMQ для просмотра содержимого очереди и количества обработанных сообщений.
Проблемы, которые стоит решить
- Пароли и подключения (
admin/secret_password) жестко зашиты в коде. Это рискованно для реальных проектов. Можно использовать переменные окружения или секреты Docker/K8s. - Возможно, стоит оборачивать ключевые моменты в
try/catch. - Все сервисы используют одну PostgreSQL. При падении БД упадет вся система. Это анти-паттерн для микросервисов. Каждому сервису — свою БД. Связывать через события, а не через общую таблицу
- Сервисы запускаются, но нет проверок их готовности (
liveness/readiness probes). В оркестраторе (K8s/Docker) это приведет к отправке трафика в еще не готовый сервис. Желательно добавить/health(liveness) и/ready(readiness) в каждый сервис - Эндпоинт
/orders— этоv1. При изменении структуры заказа (OrderCreate) вы сломаете всех клиентов. Сразу закладывайте версионирование. Допустим,/api/v2/orders(с новыми полями). - Между сервисами потеряется контекст запроса. Трудно отследить один заказ от создания до уведомления. Добавьте Correlation ID. При создании заказа генерируйте UUID и передавайте его в БД, RabbitMQ, во все логи.
- Если отправка уведомления упадет, сообщение потеряется (оно удалено из очереди). Возможно, использовать
BasicNackсrequeue=trueвместоBasicAckдо успешной обработки.