Перейти к основному содержимому

Примеры решений в Telegram-бота на Python

Telegram-боты на Python — типичный пример систем, сочетающих простоту прототипирования и глубину масштабируемости. Настоящая глава демонстрирует реализации частых задач, сгруппированные по сценариям использования. Все примеры рассчитаны на python-telegram-bot ≥ 20.0, используют асинхронный стиль (async/await), и соответствуют рекомендациям библиотеки:

  • Обработчики (handlers) — чистые функции;
  • Состояния — через ContextTypes.DEFAULT_TYPE и ConversationHandler;
  • Конфигурация — вынесена из кода (переменные окружения, pydantic-settings, toml).

1. Базовый шаблон: запуск бота с минимальной архитектурой

# main.py
import os
import logging
from pathlib import Path
from telegram import Update
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
)

# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)

BOT_TOKEN = os.getenv("BOT_TOKEN")
if not BOT_TOKEN:
raise ValueError("BOT_TOKEN must be set in environment")

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Обработчик /start — стандартная точка входа."""
user = update.effective_user
await update.message.reply_html(
f"Здравствуйте, {user.mention_html()}!\n"
"Я — технический бот-помощник.\n"
"Используйте /help для получения справки."
)

async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Справка по командам."""
await update.message.reply_text(
"/start — приветствие\n"
"/help — эта справка\n"
"/ping — проверка доступности\n"
"/settings — управление настройками"
)

async def ping(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Пинг-команда — полезна для health-check'ов."""
await update.message.reply_text("✅ OK")

def main() -> None:
"""Точка входа."""
application = ApplicationBuilder().token(BOT_TOKEN).build()

application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("ping", ping))

application.run_polling(allowed_updates=Update.ALL_TYPES)

if __name__ == "__main__":
main()

Комментарий. Такой шаблон можно использовать как основу проекта. Он поддерживает CI-развёртывание и легко расширяется добавлением обработчиков. Рекомендуется выносить логику обработки в отдельные модули (handlers/, services/).


2. Опрос с валидацией (ConversationHandler)

Сценарий: бот задаёт последовательность вопросов (регистрация, анкета, заявка), сохраняет промежуточные данные, валидирует ответы.

from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove, Update
from telegram.ext import (
ContextTypes,
ConversationHandler,
MessageHandler,
filters,
)

# Этапы диалога — константы для читаемости
(NAME, AGE, EMAIL) = range(3)

async def start_survey(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await update.message.reply_text(
"Начнём опрос.\nКак вас зовут?",
reply_markup=ReplyKeyboardRemove()
)
return NAME

async def name_received(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
name = update.message.text.strip()
if len(name) < 2:
await update.message.reply_text("Имя должно быть ≥2 символов. Повторите:")
return NAME # остаёмся на том же этапе
context.user_data["name"] = name
await update.message.reply_text("Сколько вам лет?")
return AGE

async def age_received(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
text = update.message.text.strip()
if not text.isdigit():
await update.message.reply_text("Введите возраст цифрами.")
return AGE
age = int(text)
if not (5 <= age <= 120):
await update.message.reply_text("Некорректный возраст. Укажите от 5 до 120.")
return AGE
context.user_data["age"] = age
await update.message.reply_text("Укажите email (опционально):")
return EMAIL

async def email_received(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
email = update.message.text.strip()
# Простая валидация email (для продакшена — использовать email-validator)
if email and "@" not in email:
await update.message.reply_text("Некорректный email. Повторите или отправьте '—' для пропуска.")
return EMAIL
if email != "—":
context.user_data["email"] = email

# Финал — вывод результата
summary = (
f"Результаты опроса:\n"
f"Имя: {context.user_data['name']}\n"
f"Возраст: {context.user_data['age']}"
)
if "email" in context.user_data:
summary += f"\nEmail: {context.user_data['email']}"

await update.message.reply_text(summary, reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END

async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await update.message.reply_text("Опрос отменён.", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END

# Регистрация диалога
survey_handler = ConversationHandler(
entry_points=[CommandHandler("survey", start_survey)],
states={
NAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, name_received)],
AGE: [MessageHandler(filters.TEXT & ~filters.COMMAND, age_received)],
EMAIL: [MessageHandler(filters.TEXT & ~filters.COMMAND, email_received)],
},
fallbacks=[CommandHandler("cancel", cancel)],
)

Комментарий. ConversationHandler позволяет строить сложные сценарии без глобального состояния. user_data — привязано к пользователю, chat_data — к чату, bot_data — глобально. При масштабировании рекомендуется сериализовать данные в БД (например, redis или PostgreSQL).


3. Обработка кнопок: Inline- и Reply-клавиатуры

3.1. Reply-клавиатура с опциями

async def settings_menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
keyboard = [
["🔔 Уведомления", "🌐 Язык"],
["🔙 Назад"]
]
reply_markup = ReplyKeyboardMarkup(
keyboard,
one_time_keyboard=True,
resize_keyboard=True
)
await update.message.reply_text("Выберите настройку:", reply_markup=reply_markup)

3.2. Inline-кнопки с callback-данными

from telegram import InlineKeyboardButton, InlineKeyboardMarkup

async def send_notification_options(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# Храним текущие настройки в user_data или БД
enabled = context.user_data.get("notify_enabled", True)

status_text = "✅ Вкл." if enabled else "❌ Выкл."
toggle_payload = "notify_off" if enabled else "notify_on"

keyboard = [
[
InlineKeyboardButton(status_text, callback_data=toggle_payload),
InlineKeyboardButton("ℹ️ Описание", callback_data="notify_info")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text("Настройки уведомлений:", reply_markup=reply_markup)

async def handle_notification_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
query = update.callback_query
await query.answer() # подтверждаем нажатие

match query.data:
case "notify_on":
context.user_data["notify_enabled"] = True
text = "🔔 Уведомления включены."
case "notify_off":
context.user_data["notify_enabled"] = False
text = "🔕 Уведомления отключены."
case "notify_info":
text = (
"Уведомления приходят при:\n"
"• новом сообщении от администратора\n"
"• изменении статуса заявки"
)
case _:
text = "Неизвестная команда."

await query.edit_message_text(text=text)

Комментарий. Inline-кнопки не заменяют сообщение, а редактируют его — это позволяет сохранять контекст. callback_data имеет ограничение в 64 байта (UTF-8), поэтому для сложных сценариев используйте сериализацию (например, короткие hash + маппинг в памяти/БД).


4. Обработка медиа: фото, документы, голосовые

import io
from PIL import Image

async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
photo_file = await update.message.photo[-1].get_file() # берём наибольшее разрешение
file_bytes = await photo_file.download_as_bytearray()

# Пример: анализ размера и формата
try:
image = Image.open(io.BytesIO(file_bytes))
width, height = image.size
fmt = image.format
await update.message.reply_text(
f"🖼 Получено изображение: {width}×{height} px, формат {fmt}"
)

# Пример обработки: конвертация в ч/б и отправка обратно
image_bw = image.convert("L")
output = io.BytesIO()
image_bw.save(output, format="PNG")
output.seek(0)
await update.message.reply_photo(
photo=output,
caption="Вот чёрно-белая версия."
)
except Exception as e:
logging.error("Image processing error", exc_info=e)
await update.message.reply_text("Не удалось обработать изображение.")
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
doc = update.message.document
await update.message.reply_text(
f"📄 Получен документ:\n"
f"Имя: {doc.file_name}\n"
f"Размер: {doc.file_size // 1024} КБ\n"
f"MIME: {doc.mime_type}"
)

# Пример: проверка на PDF
if doc.mime_type == "application/pdf":
file = await doc.get_file()
# Здесь можно передать в pdfplumber, PyPDF2 и т.д.
await update.message.reply_text("PDF обнаружен — начинаю разбор...")

Важно. Telegram не гарантирует наличие mime_type для всех файлов — при критичных проверках делайте анализ по сигнатуре (магическим числам).


5. Работа с внешними API: надёжные вызовы, ретраи, кэширование

5.1. Базовый HTTP-клиент с ретраями и таймаутами

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
reraise=True
)
async def fetch_weather(city: str) -> dict | None:
"""Получение погоды через OpenWeather API с автоматическими повторами."""
API_KEY = os.getenv("OPENWEATHER_API_KEY")
if not API_KEY:
raise ValueError("OPENWEATHER_API_KEY not set")

url = "https://api.openweathermap.org/data/2.5/weather"
params = {"q": city, "appid": API_KEY, "units": "metric", "lang": "ru"}

async with httpx.AsyncClient(timeout=10.0) as client:
try:
response = await client.get(url, params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return None # город не найден — не ошибка в работе
raise # повторяем для 5xx и сетевых ошибок
except httpx.RequestError as e:
logging.warning(f"Network error fetching weather: {e}")
raise

5.2. Интеграция в обработчик команды

async def weather_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not context.args:
await update.message.reply_text("Укажите город: /weather Москва")
return

city = " ".join(context.args).strip()
try:
data = await fetch_weather(city)
if not data:
await update.message.reply_text(f"Город «{city}» не найден.")
return

temp = data["main"]["temp"]
desc = data["weather"][0]["description"].capitalize()
await update.message.reply_text(
f"🌤 В {city}: {temp:.1f}°C, {desc}."
)
except Exception as e:
logging.error("Weather fetch failed", exc_info=e)
await update.message.reply_text("Не удалось получить погоду. Повторите позже.")

5.3. Кэширование ответов (на уровне сессии или Redis)

from functools import lru_cache
from datetime import timedelta

# Простое кэширование в памяти (на 5 минут)
@lru_cache(maxsize=128)
def _cached_weather_key(city: str, ts_bucket: int) -> dict | None:
# В реальном коде — вызов fetch_weather(city), но без async
# Здесь — заглушка; для async-кэширования используйте cachetools + asyncio.Lock
pass

def get_weather_cache_key(city: str) -> tuple[str, int]:
# Группируем запросы по 5-минутным окнам
now = int(time.time() // 300)
return (city.lower(), now)

# Альтернатива — Redis + aioredis (рекомендуется для продакшена)
# import aioredis
# redis = aioredis.from_url("redis://localhost")
# cache_key = f"weather:{city}"
# cached = await redis.get(cache_key)
# if cached: return json.loads(cached)
# ...
# await redis.setex(cache_key, 300, json.dumps(data))

Комментарий. Не кэшируйте персонализированные или чувствительные данные. Для высоконагруженных ботов используйте Redis или memcached с TTL и стратегией cache-aside.


6. Администрирование: команды для модераторов

6.1. Проверка прав доступа через декоратор

from functools import wraps

ADMIN_IDS = {int(x) for x in os.getenv("ADMIN_CHAT_IDS", "").split(",") if x}

def admin_only(func):
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
if user_id not in ADMIN_IDS:
logging.warning(f"Unauthorized admin access attempt by {user_id}")
await update.message.reply_text("⚠️ Запрещено.")
return
return await func(update, context)
return wrapper

@admin_only
async def stats_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# Пример сбора метрик
total_users = await db.fetchval("SELECT COUNT(*) FROM users")
active_last_week = await db.fetchval(
"SELECT COUNT(*) FROM users WHERE last_seen > NOW() - INTERVAL '7 days'"
)
await update.message.reply_text(
f"📊 Статистика:\n"
f"Всего пользователей: {total_users}\n"
f"Активно за неделю: {active_last_week}"
)

@admin_only
async def broadcast_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not context.args:
await update.message.reply_text("Текст рассылки: /broadcast Привет всем!")
return

text = " ".join(context.args)
user_ids = await db.fetch("SELECT user_id FROM users WHERE is_subscribed = true")

sent, failed = 0, 0
for row in user_ids:
try:
await context.bot.send_message(chat_id=row["user_id"], text=text)
sent += 1
await asyncio.sleep(0.05) # rate-limit ~20 msg/s
except Exception as e:
logging.warning(f"Failed to send to {row['user_id']}: {e}")
failed += 1

await update.message.reply_text(f"✉️ Рассылка завершена: {sent} отправлено, {failed} ошибок.")

6.2. Бан/разбан с логированием

@admin_only
async def ban_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not context.args or len(context.args) < 2:
await update.message.reply_text("/ban <user_id> <причина>")
return

try:
user_id = int(context.args[0])
reason = " ".join(context.args[1:])
except ValueError:
await update.message.reply_text("ID должен быть числом.")
return

# Заносим в БД (или в Redis-сет)
await db.execute(
"INSERT INTO bans (user_id, reason, banned_by, created_at) "
"VALUES ($1, $2, $3, NOW()) ON CONFLICT DO NOTHING",
user_id, reason, update.effective_user.id
)

# Опционально: отправляем уведомление пользователю
try:
await context.bot.send_message(
chat_id=user_id,
text=f"⛔ Вы заблокированы.\nПричина: {reason}"
)
except Exception:
pass # пользователь мог удалить бота

await update.message.reply_text(f"✅ Пользователь {user_id} заблокирован.")

Комментарий. Для избежания flood control используйте очередь (asyncio.Queue) и фоновый воркер. Telegram допускает ~30 сообщений/сек на бота, но лучше ограничиться ~15.


7. Планирование задач: APScheduler + асинхронность

Сценарии: ежедневная рассылка, очистка временных данных, health-check.

7.1. Настройка асинхронного планировщика

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

scheduler = AsyncIOScheduler(timezone="Europe/Moscow")

async def daily_digest(context: ContextTypes.DEFAULT_TYPE) -> None:
"""Ежедневная рассылка в 09:00 МСК."""
users = await db.fetch("SELECT user_id FROM subscriptions WHERE daily_digest = true")
for user in users:
try:
await context.bot.send_message(
chat_id=user["user_id"],
text="🌅 Доброе утро!\nВаши задачи на сегодня: ...\n(реализация — в модуле business_logic)"
)
await asyncio.sleep(0.03)
except Exception as e:
logging.warning(f"Digest failed for {user['user_id']}: {e}")

# Регистрация задачи при старте приложения
def setup_scheduled_jobs(application: Application) -> None:
scheduler.add_job(
daily_digest,
CronTrigger(hour=9, minute=0),
args=[application.bot],
id="daily_digest",
replace_existing=True
)
scheduler.start()
# Graceful shutdown
import atexit
atexit.register(lambda: scheduler.shutdown(wait=False))

Замечание. APScheduler не native-async, но AsyncIOScheduler интегрируется с asyncio. Избегайте блокирующих операций внутри задач.


8. Тестирование: модульные и интеграционные тесты

8.1. Модульный тест обработчика (pytest + pytest-asyncio)

# test_handlers.py
import pytest
from telegram import Update, User, Message, Chat
from telegram.ext import ContextTypes
from unittest.mock import AsyncMock

@pytest.mark.asyncio
async def test_start_handler():
# Подготовка моков
update = AsyncMock(spec=Update)
context = AsyncMock(spec=ContextTypes.DEFAULT_TYPE)

# Эмуляция входящих данных
update.effective_user = User(id=123, first_name="Тест", is_bot=False)
update.message = AsyncMock(spec=Message)
update.message.reply_html = AsyncMock()

# Вызов тестируемого кода
from handlers import start
await start(update, context)

# Проверка
update.message.reply_html.assert_called_once()
args, kwargs = update.message.reply_html.call_args
assert "Тест" in args[0]

8.2. Интеграционный тест с telegram-test (или эмуляцией через Mock)

Для полноценного интеграционного теста рекомендуется:

  • Запускать бота в polling-режиме на localhost;
  • Использовать requests или httpx для отправки фейковых update через локальный вебхук (/setWebhook?url=http://localhost:8080 в тестовом режиме);
  • Либо использовать pytest-telegram-bot-mock (сторонняя утилита, эмулирующая API Telegram в памяти).

9. Безопасность: защита от атак и ошибок

9.1. Rate-limiting на уровне обработчика

from collections import defaultdict
import time

# In-memory rate limiter (замените на Redis для масштаба)
user_last_call = defaultdict(float)
RATE_LIMIT_WINDOW = 3.0 # секунд между вызовами

def rate_limited(func):
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
now = time.time()
if now - user_last_call[user_id] < RATE_LIMIT_WINDOW:
await update.message.reply_text("⏳ Подождите немного.")
return
user_last_call[user_id] = now
return await func(update, context)
return wrapper

@rate_limited
async def heavy_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# ... тяжёлая логика
pass

9.2. Санитизация входных данных

import html
import re

def sanitize_input(text: str) -> str:
"""Удаляет потенциально опасные символы и ограничивает длину."""
# Ограничение длины — 1024 символа
text = text[:1024]
# Экранирование HTML (если отправляете в reply_html)
text = html.escape(text, quote=False)
# Удаление управляющих символов (кроме \n, \t)
text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", text)
return text

# Применение:
# clean_text = sanitize_input(update.message.text)

9.3. Проверка webhook-подписи (при использовании)

import hmac
import hashlib

WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")

async def webhook_handler(request):
signature = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
if not hmac.compare_digest(signature or "", WEBHOOK_SECRET):
return web.Response(status=403)

update = Update.de_json(await request.json(), bot)
await dp.process_update(update)
return web.Response(status=200)

10. Деплой и мониторинг

10.1. Docker-образ с многоступенчатой сборкой

Dockerfile

# Этап 1: сборка зависимостей
FROM python:3.11-slim AS builder

WORKDIR /app
COPY requirements.txt .
RUN pip install --user --no-cache-dir --upgrade pip \
&& pip install --user --no-cache-dir -r requirements.txt

# Этап 2: runtime
FROM python:3.11-slim

# Минимизация слоя: копируем только установленные пакеты и код
COPY --from=builder /root/.local /root/.local
COPY . /app

WORKDIR /app

# Безопасность: непривилегированный пользователь
RUN useradd --create-home --shell /bin/bash appuser \
&& chown -R appuser:appuser /app \
&& chmod -R 755 /app
USER appuser

ENV PATH=/root/.local/bin:$PATH

# Health-check (для оркестраторов)
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD python -c "import httpx; httpx.get('http://localhost:8081/health', timeout=3)"

# Веб-сервер для health-check (если polling)
EXPOSE 8081

CMD ["python", "main.py"]

requirements.txt (рекомендованный минимальный набор)

python-telegram-bot[httpx]==20.7
httpx==0.27.0
tenacity==9.0.0
APScheduler==3.10.4
asyncpg==0.29.0
pydantic-settings==2.5.2
structlog==24.1.0
prometheus-client==0.20.0

10.2. Health-check endpoint (для Kubernetes / Docker Swarm)

# health_check.py
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import os

HEALTH_PORT = int(os.getenv("HEALTH_PORT", "8081"))

class HealthHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/health":
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(b'{"status":"ok"}')
else:
self.send_response(404)
self.end_headers()

def start_health_server():
server = HTTPServer(("0.0.0.0", HEALTH_PORT), HealthHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
return server

В main.py:

if __name__ == "__main__":
# Запуск health-check в фоне (до polling)
health_server = start_health_server()
logging.info(f"Health server started on :{HEALTH_PORT}")

try:
application.run_polling(allowed_updates=Update.ALL_TYPES)
finally:
health_server.shutdown()

10.3. Метрики в Prometheus

from prometheus_client import Counter, Histogram, start_http_server

# Инициализация
start_http_server(8000) # отдельный порт для метрик

BOT_REQUESTS_TOTAL = Counter(
"bot_requests_total",
"Общее число обработанных команд",
["command"]
)
BOT_REQUEST_DURATION = Histogram(
"bot_request_duration_seconds",
"Время обработки команды"
)

def instrument_command(command_name: str):
def decorator(func):
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE):
BOT_REQUESTS_TOTAL.labels(command=command_name).inc()
with BOT_REQUEST_DURATION.time():
return await func(update, context)
return wrapper
return decorator

# Применение:
@instrument_command("start")
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# ...

Примечание. Для Grafana используйте дашборд «Telegram Bot Metrics» (шаблон: Grafana ID 14945).


11. Интернационализация (i18n)

11.1. Архитектура на базе gettext

Структура проекта:

locales/
├── en/
│ └── LC_MESSAGES/
│ ├── bot.mo
│ └── bot.po
├── ru/
│ └── LC_MESSAGES/
│ ├── bot.mo
│ └── bot.po
└── __init__.py

Генерация .pot и .po:

# Извлечение строк из кода
xgettext --language=Python --keyword=_ --output=locales/bot.pot main.py handlers/*.py

# Создание .po для языков
msginit --input=locales/bot.pot --locale=ru_RU --output=locales/ru/LC_MESSAGES/bot.po
msginit --input=locales/bot.pot --locale=en_US --output=locales/en/LC_MESSAGES/bot.po

# После перевода — компиляция в .mo
msgfmt locales/ru/LC_MESSAGES/bot.po -o locales/ru/LC_MESSAGES/bot.mo

11.2. Контекстно-зависимое форматирование

import gettext
from pathlib import Path

LOCALE_DIR = Path(__file__).parent / "locales"

def get_translator(lang_code: str = "ru") -> gettext.GNUTranslations:
try:
return gettext.translation("bot", localedir=LOCALE_DIR, languages=[lang_code])
except FileNotFoundError:
return gettext.NullTranslations() # fallback: no-op

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_lang = update.effective_user.language_code or "ru"
_ = get_translator(user_lang[:2]).gettext # ru, en

await update.message.reply_text(
_("Здравствуйте! Это бот на {lang} языке.").format(lang=user_lang)
)

Альтернатива. Для динамических переводов (без пересборки) — храните строки в БД или JSON-файлах и кэшируйте в lru_cache.


12. Работа с базами данных

12.1. Асинхронное подключение через asyncpg

# db.py
import asyncpg
from pydantic_settings import BaseSettings

class DBSettings(BaseSettings):
DB_HOST: str = "localhost"
DB_PORT: int = 5432
DB_NAME: str
DB_USER: str
DB_PASS: str

@property
def dsn(self) -> str:
return f"postgresql://{self.DB_USER}:{self.DB_PASS}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}"

settings = DBSettings()

async def init_db() -> asyncpg.Pool:
return await asyncpg.create_pool(
dsn=settings.dsn,
min_size=1,
max_size=10,
command_timeout=60,
init=setup_connection # см. ниже
)

async def setup_connection(conn: asyncpg.Connection) -> None:
# Устанавливаем часовой пояс и кодировку
await conn.execute("SET TIME ZONE 'Europe/Moscow';")
await conn.execute("SET CLIENT_ENCODING TO 'UTF8';")

12.2. Безопасное выполнение с транзакциями

from contextlib import asynccontextmanager

@asynccontextmanager
async def transaction(conn: asyncpg.Connection):
tr = conn.transaction()
await tr.start()
try:
yield conn
await tr.commit()
except Exception:
await tr.rollback()
raise

# Пример использования
async def create_user_if_not_exists(conn: asyncpg.Connection, user_id: int, name: str) -> bool:
async with transaction(conn):
row = await conn.fetchrow(
"""
INSERT INTO users (user_id, name, created_at)
VALUES ($1, $2, NOW())
ON CONFLICT (user_id) DO NOTHING
RETURNING user_id
""",
user_id, name
)
return row is not None

12.3. Миграции через alembic (асинхронная конфигурация)

alembic.ini:

[alembic]
script_location = alembic
sqlalchemy.url = driver://user:pass@localhost/dbname

[post_write_hooks]

alembic/env.py (фрагмент для async):

from sqlalchemy.ext.asyncio import create_async_engine
from alembic import context

def run_migrations_online():
connectable = create_async_engine(
config.get_main_option("sqlalchemy.url"),
poolclass=NullPool,
)

async def run_async_migrations():
async with connectable.connect() as connection:
await connection.run_sync(
lambda sync_conn: context.configure(
connection=sync_conn,
target_metadata=target_metadata,
compare_type=True,
)
)
await connection.run_sync(lambda sync_conn: context.run_migrations())

import asyncio
asyncio.run(run_async_migrations())

13. Гибридные сценарии: бот + веб-интерфейс

Цель: бот обрабатывает интерактив, веб — настройки, аналитика, ручное управление.

13.1. FastAPI-эндпоинт для уведомлений администратора

# web.py
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel

app = FastAPI()

class NotificationRequest(BaseModel):
user_id: int
text: str
silent: bool = False

async def get_admin_token(token: str):
if token != os.getenv("ADMIN_API_TOKEN"):
raise HTTPException(status_code=403, detail="Invalid token")
return True

@app.post("/notify", dependencies=[Depends(get_admin_token)])
async def send_notification(req: NotificationRequest):
try:
await bot.send_message(
chat_id=req.user_id,
text=req.text,
disable_notification=req.silent
)
return {"status": "sent"}
except Exception as e:
logging.error(f"Web notify failed: {e}")
raise HTTPException(status_code=500, detail=str(e))

13.2. Запуск FastAPI и бота в одном процессе (для dev)

# main.py
import threading
import uvicorn

def run_web_server():
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="warning")

if __name__ == "__main__":
# Запуск веб-сервера в фоновом потоке
web_thread = threading.Thread(target=run_web_server, daemon=True)
web_thread.start()

# Основной цикл — бот
application.run_polling()

Рекомендация. В продакшене разделяйте сервисы: бот — в pod, веб — в другом, общение через Redis Pub/Sub или RabbitMQ.


14. Централизованная обработка ошибок

14.1. Глобальный exception handler

async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
exc = context.error
logging.error("Exception while handling an update", exc_info=exc)

# Отправка уведомления администратору (только для Update-событий)
if isinstance(update, Update) and update.effective_chat:
try:
await context.bot.send_message(
chat_id=update.effective_chat.id,
text="⚠️ Произошла внутренняя ошибка. Администратор уведомлён."
)
except Exception:
pass # не ломаем обработчик ошибок при ошибках отправки

# Интеграция с Sentry
if "sentry_sdk" in globals():
sentry_sdk.capture_exception(exc)

# Регистрация
application.add_error_handler(error_handler)

14.2. Контекстное логгирование с structlog

import structlog

structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

# Пример использования в обработчике:
async def some_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
logger.info(
"user_action",
user_id=update.effective_user.id,
chat_type=update.effective_chat.type,
action="button_click",
payload=context.user_data.get("last_choice")
)

15. Оптимизация и профилирование

15.1. Профилирование «на лету» через команду /profile

import cProfile
import pstats
import io

@admin_only
async def profile_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not context.args:
await update.message.reply_text("/profile <команда> [аргументы]")
return

cmd_name = context.args[0]
handler = application.handlers_map.get(cmd_name)
if not handler:
await update.message.reply_text("Команда не найдена.")
return

# Запуск с профилированием (только для демонстрации — не в прод!)
pr = cProfile.Profile()
pr.enable()

# Имитация вызова — в реальности вызовите нужную функцию
# await handler(update, context)

pr.disable()
s = io.StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats("cumulative")
ps.print_stats(10) # топ-10

output = s.getvalue()[:4000] # укладываемся в лимит Telegram
await update.message.reply_text(f"```\n{output}\n```", parse_mode="MarkdownV2")

15.2. Кэширование тяжёлых вычислений

from cachetools import TTLCache, cached

# TTL = 5 минут, макс. 1000 записей
calc_cache = TTLCache(maxsize=1000, ttl=300)

@cached(calc_cache)
def heavy_calculation(param: str) -> str:
# Например, парсинг XML, вычисление хэша, ML-инференс
time.sleep(0.5) # имитация
return f"result_for_{param}"