6.05. Развёртывание модели
Развёртывание модели
Развертывание собственной модели искусственного интеллекта — это комплексный процесс, охватывающий подготовку модели, выбор инфраструктуры, настройку API, обеспечение масштабируемости и мониторинга. В отличие от использования облачных сервисов (OpenAI, Anthropic), здесь требуется полный контроль над стеком: от весов модели до сетевых интерфейсов.
Этап 1. Подготовка модели
Перед развертыванием модель должна быть:
-
Сконвертирована в подходящий формат:
GGUF— для запуска черезllama.cpp(CPU/GPU, квантование до 4-bit).safetensorsилиbin— для Hugging Face Transformers.- ONNX — для кросс-платформенного вывода.
-
Оптимизирована:
- Применено квантование (например, 4-bit через
bitsandbytesилиGGML). - Удалены ненужные компоненты (например, decoder-only модели не требуют encoder).
- При необходимости — объединены LoRA-адаптеры с базовой моделью (
merge_lora).
- Применено квантование (например, 4-bit через
-
Протестирована локально:
Запуск черезtransformers,vLLM,llama.cppилиOllamaдля проверки корректности генерации и скорости.
Этап 2. Выбор среды выполнения
Выбор движка влияет на производительность, потребление памяти и поддерживаемые функции:
| Движок | Особенности |
|---|---|
| llama.cpp | Работает на CPU и GPU (через Metal, CUDA, Vulkan); поддерживает GGUF; идеален для edge-устройств. |
| vLLM | Высокая пропускная способность за счёт PagedAttention; поддержка OpenAI-совместимого API. |
| Text Generation Inference (TGI) | От Hugging Face; оптимизирован для LLM; поддержка streaming, token streaming, watermarking. |
| Ollama | Простой CLI и REST API; удобен для локальной разработки и тестирования. |
| TensorRT-LLM | Максимальная производительность на NVIDIA GPU; требует сложной настройки. |
Для production-сред часто выбирают vLLM или TGI из-за их отказоустойчивости и совместимости с OpenAI API.
Этап 3. Контейнеризация
Модель упаковывается в Docker-образ для переносимости:
FROM python:3.11-slim
WORKDIR /app
COPY . .
RUN pip install torch transformers accelerate vllm
EXPOSE 8000
CMD ["python", "server.py"]
Файл server.py может использовать FastAPI:
from fastapi import FastAPI
from vllm import LLM, SamplingParams
app = FastAPI()
llm = LLM(model="path/to/your/model")
@app.post("/generate")
async def generate(prompt: str):
sampling_params = SamplingParams(temperature=0.2, max_tokens=256)
outputs = llm.generate([prompt], sampling_params)
return {"text": outputs[0].outputs[0].text}
Образ собирается и загружается в registry (Docker Hub, GitLab Container Registry, внутренний Harbor).
Этап 4. Развертывание в инфраструктуре
Варианты размещения:
- Локальный сервер: для закрытых систем, где данные не покидают организацию.
- Облако (AWS, Azure, GCP): использование GPU-инстансов (например,
g5.2xlargeна AWS). - Kubernetes: для автоматического масштабирования, балансировки нагрузки и управления несколькими версиями модели.
Пример манифеста Kubernetes:
apiVersion: apps/v1
kind: Deployment
metadata:
name: ai-model
spec:
replicas: 2
template:
spec:
containers:
- name: model
image: registry.example.com/ai-model:v1
ports:
- containerPort: 8000
resources:
limits:
nvidia.com/gpu: 1
---
apiVersion: v1
kind: Service
metadata:
name: ai-model-service
spec:
type: LoadBalancer
ports:
- port: 80
targetPort: 8000
selector:
app: ai-model
Этап 5. API и интеграция
Модель предоставляется через REST или gRPC API. Для совместимости с существующими клиентами часто реализуется OpenAI-совместимый интерфейс:
- Эндпоинт
/v1/chat/completions - Поддержка параметров:
model,messages,temperature,max_tokens - Streaming через SSE (Server-Sent Events)
Это позволяет использовать готовые SDK (LangChain, LlamaIndex) без модификации.
Этап 6. Мониторинг и логирование
Критически важны метрики:
- Latency: время до первого токена и полного ответа.
- Throughput: запросов в секунду.
- GPU utilization, VRAM usage.
- Ошибки: OOM, таймауты, невалидные промпты.
- Качество: доля галлюцинаций, соответствие формату (через LLM-as-a-judge).
Инструменты:
- Prometheus + Grafana — для метрик.
- ELK или Loki — для логов.
- Собственные скрипты валидации — для качества.
Этап 7. Обновление и управление версиями
- Каждая версия модели имеет свой тег (
v1.2-finetuned-crm). - Поддержка нескольких версий одновременно (A/B-тестирование).
- Автоматическое откат при падении метрик качества.
Этап 8. Безопасность
- Аутентификация: API-ключи через заголовок
Authorization. - Rate limiting: ограничение по IP или ключу.
- PII-фильтрация: middleware перед вызовом модели.
- Шифрование: TLS для трафика, шифрование данных в rest.
- Соблюдение ФЗ-152 / GDPR: данные не покидают территорию РФ, если требуется.
Интеграция развернутой ИИ-модели в Python-приложение
Интеграция модели в код требует проектирования надёжного клиентского слоя, обработки граничных случаев и соответствия архитектурным требованиям приложения. Ниже представлены проверенные подходы для промышленной эксплуатации.
Паттерны интеграции
1. Адаптер с единой точкой входа
Создайте класс-адаптер, инкапсулирующий детали взаимодействия с моделью. Это обеспечивает заменяемость бэкенда без изменения бизнес-логики.
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
import httpx
import backoff
@dataclass
class ModelResponse:
text: str
tokens_used: int
generation_time_ms: float
model_version: str
class AIModelAdapter(ABC):
@abstractmethod
async def generate(self, prompt: str, **kwargs) -> ModelResponse:
pass
class OpenAICompatibleAdapter(AIModelAdapter):
def __init__(
self,
base_url: str,
api_key: str,
model_name: str,
timeout: float = 30.0
):
self.client = httpx.AsyncClient(
base_url=base_url,
headers={"Authorization": f"Bearer {api_key}"},
timeout=timeout
)
self.model_name = model_name
@backoff.on_exception(
backoff.expo,
(httpx.TimeoutException, httpx.HTTPStatusError),
max_tries=3
)
async def generate(self, prompt: str, **kwargs) -> ModelResponse:
payload = {
"model": self.model_name,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": kwargs.get("max_tokens", 512),
"temperature": kwargs.get("temperature", 0.7),
"stream": False
}
response = await self.client.post("/v1/chat/completions", json=payload)
response.raise_for_status()
data = response.json()
choice = data["choices"][0]
usage = data["usage"]
return ModelResponse(
text=choice["message"]["content"],
tokens_used=usage["total_tokens"],
generation_time_ms=response.elapsed.total_seconds() * 1000,
model_version=data.get("model", self.model_name)
)
async def close(self):
await self.client.aclose()
2. Синхронный клиент для легаси-кода
Для интеграции в синхронные приложения используйте requests с таймаутами и повторными попытками через urllib3.util.retry.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class SyncModelClient:
def __init__(self, base_url: str, api_key: str):
self.session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["POST"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
self.session.headers.update({
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
})
self.base_url = base_url
def generate(self, prompt: str, timeout: float = 15.0) -> Dict[str, Any]:
try:
response = self.session.post(
f"{self.base_url}/v1/chat/completions",
json={
"model": "custom-model",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 256
},
timeout=timeout
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]
except requests.exceptions.Timeout:
raise TimeoutError(f"Model inference exceeded {timeout}s")
except requests.exceptions.RequestException as e:
raise ConnectionError(f"Model API error: {e}")
Обработка ошибок и отказоустойчивость
Реализуйте стратегии для типовых сценариев отказа:
| Сценарий | Механизм обработки | Пример реализации |
|---|---|---|
| Таймаут генерации | Таймаут на уровне клиента + circuit breaker | httpx.Timeout, pybreaker |
| Перегрузка модели | Ограничение скорости на клиенте | limits + backoff |
| Некорректный промпт | Валидация до отправки | pydantic для структурированных входов |
| Отказ инстанса | Фолловер на резервный эндпоинт | Список базовых URL в конфигурации |
| Выброс памяти (OOM) | Автоматическое снижение max_tokens | Повторный запрос с уменьшенными лимитами |
Пример использования circuit breaker:
import pybreaker
class ResilientModelClient:
def __init__(self, base_url: str, api_key: str):
self.breaker = pybreaker.CircuitBreaker(
fail_max=5,
reset_timeout=60000 # 60 секунд
)
self.adapter = OpenAICompatibleAdapter(base_url, api_key, "my-model")
@pybreaker.CircuitBreakerError
def _call_model(self, prompt: str) -> ModelResponse:
return await self.adapter.generate(prompt)
async def generate_with_fallback(self, prompt: str) -> ModelResponse:
try:
return await self._call_model(prompt)
except pybreaker.CircuitBreakerError:
# Переключение на резервную модель меньшего размера
fallback = OpenAICompatibleAdapter(
base_url="https://backup.example.com",
api_key="backup-key",
model_name="my-model-small"
)
return await fallback.generate(prompt)
Асинхронная интеграция с очередями
Для высоконагруженных систем используйте асинхронную обработку через очереди (Redis, RabbitMQ):
import asyncio
import aioredis
from pydantic import BaseModel
class InferenceRequest(BaseModel):
request_id: str
prompt: str
user_id: str
priority: int = 5 # 1-10
class AsyncInferenceQueue:
def __init__(self, redis_url: str):
self.redis = aioredis.from_url(redis_url)
self.queue_key = "model:inference:queue"
async def enqueue(self, request: InferenceRequest):
payload = request.json()
# Приоритет через sorted set: score = priority
await self.redis.zadd(self.queue_key, {payload: request.priority})
async def worker(self, model_adapter: AIModelAdapter):
while True:
# Извлечение задачи с наивысшим приоритетом (минимальный score)
task = await self.redis.zpopmin(self.queue_key)
if task:
request = InferenceRequest.parse_raw(task[0][0])
try:
response = await model_adapter.generate(request.prompt)
await self._publish_result(request.request_id, response)
except Exception as e:
await self._publish_error(request.request_id, str(e))
await asyncio.sleep(0.01) # Предотвращение busy-wait
async def _publish_result(self, request_id: str, response: ModelResponse):
await self.redis.setex(
f"result:{request_id}",
3600, # TTL 1 час
response.json()
)
Кэширование семантически эквивалентных запросов
Для снижения нагрузки на модель реализуйте кэширование на основе хэша промпта:
import hashlib
from typing import Optional
class CachedModelClient:
def __init__(self, adapter: AIModelAdapter, redis_client: aioredis.Redis):
self.adapter = adapter
self.redis = redis_client
self.cache_ttl = 86400 # 24 часа
def _prompt_hash(self, prompt: str, params: Dict[str, Any]) -> str:
# Сериализация параметров для воспроизводимости хэша
param_str = "|".join(f"{k}={v}" for k, v in sorted(params.items()))
combined = f"{prompt}|{param_str}"
return hashlib.sha256(combined.encode()).hexdigest()
async def generate(self, prompt: str, **kwargs) -> ModelResponse:
cache_key = f"model:cache:{self._prompt_hash(prompt, kwargs)}"
cached = await self.redis.get(cache_key)
if cached:
return ModelResponse.parse_raw(cached)
response = await self.adapter.generate(prompt, **kwargs)
await self.redis.setex(cache_key, self.cache_ttl, response.json())
return response
Тестирование интеграции
Покройте интеграционные сценарии:
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_model_adapter_success():
mock_response = {
"choices": [{"message": {"content": "Тестовый ответ"}}],
"usage": {"total_tokens": 42},
"model": "test-model"
}
with patch("httpx.AsyncClient.post") as mock_post:
mock_post.return_value = AsyncMock(
status_code=200,
json=lambda: mock_response,
elapsed=type("obj", (), {"total_seconds": lambda: 0.123})()
)
adapter = OpenAICompatibleAdapter(
base_url="http://test",
api_key="test",
model_name="test-model"
)
result = await adapter.generate("Тестовый промпт")
assert result.text == "Тестовый ответ"
assert result.tokens_used == 42
Рекомендации по эксплуатации
- Валидация входных данных: Перед отправкой в модель проверяйте длину промпта и кодировку символов. Для русскоязычных моделей убедитесь в поддержке UTF-8.
- Лимитирование ресурсов: Устанавливайте
max_tokensв зависимости от доступной памяти. Для 4-bit квантованной модели 7B параметров безопасный лимит — 2048 токенов на запрос. - Логирование для аудита: Сохраняйте хэши промптов (не содержимое) и метаданные запросов для анализа использования без нарушения конфиденциальности.
- Грейсфул-деградация: При недоступности модели возвращайте структурированную ошибку с кодом
503и рекомендацией повторить запрос через интервал.
Интеграция модели в код требует проектирования как клиентской, так и серверной частей с едиными контрактами. Использование адаптеров и стратегий отказоустойчивости позволяет изолировать бизнес-логику от изменений в инфраструктуре вывода модели.