Примеры решений в Telegram-бота на Python
Telegram-боты на Python — типичный пример систем, сочетающих простоту прототипирования и глубину масштабируемости. Настоящая глава демонстрирует реализации частых задач, сгруппированные по сценариям использования. Все примеры рассчитаны на python-telegram-bot ≥ 20.0, используют асинхронный стиль (async/await), и соответствуют рекомендациям библиотеки:
- Обработчики (
handlers) — чистые функции; - Состояния — через
ContextTypes.DEFAULT_TYPEиConversationHandler; - Конфигурация — вынесена из кода (переменные окружения,
pydantic-settings,toml).
1. Базовый шаблон: запуск бота с минимальной архитектурой
# main.py
import os
import logging
from pathlib import Path
from telegram import Update
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
)
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
BOT_TOKEN = os.getenv("BOT_TOKEN")
if not BOT_TOKEN:
raise ValueError("BOT_TOKEN must be set in environment")
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Обработчик /start — стандартная точка входа."""
user = update.effective_user
await update.message.reply_html(
f"Здравствуйте, {user.mention_html()}!\n"
"Я — технический бот-помощник.\n"
"Используйте /help для получения справки."
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Справка по командам."""
await update.message.reply_text(
"/start — приветствие\n"
"/help — эта справка\n"
"/ping — проверка доступности\n"
"/settings — управление настройками"
)
async def ping(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Пинг-команда — полезна для health-check'ов."""
await update.message.reply_text("✅ OK")
def main() -> None:
"""Точка входа."""
application = ApplicationBuilder().token(BOT_TOKEN).build()
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("ping", ping))
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()
Комментарий. Такой шаблон можно использовать как основу проекта. Он поддерживает CI-развёртывание и легко расширяется добавлением обработчиков. Рекомендуется выносить логику обработки в отдельные модули (
handlers/,services/).
2. Опрос с валидацией (ConversationHandler)
Сценарий: бот задаёт последовательность вопросов (регистрация, анкета, заявка), сохраняет промежуточные данные, валидирует ответы.
from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove, Update
from telegram.ext import (
ContextTypes,
ConversationHandler,
MessageHandler,
filters,
)
# Этапы диалога — константы для читаемости
(NAME, AGE, EMAIL) = range(3)
async def start_survey(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await update.message.reply_text(
"Начнём опрос.\nКак вас зовут?",
reply_markup=ReplyKeyboardRemove()
)
return NAME
async def name_received(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
name = update.message.text.strip()
if len(name) < 2:
await update.message.reply_text("Имя должно быть ≥2 символов. Повторите:")
return NAME # остаёмся на том же этапе
context.user_data["name"] = name
await update.message.reply_text("Сколько вам лет?")
return AGE
async def age_received(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
text = update.message.text.strip()
if not text.isdigit():
await update.message.reply_text("Введите возраст цифрами.")
return AGE
age = int(text)
if not (5 <= age <= 120):
await update.message.reply_text("Некорректный возраст. Укажите от 5 до 120.")
return AGE
context.user_data["age"] = age
await update.message.reply_text("Укажите email (опционально):")
return EMAIL
async def email_received(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
email = update.message.text.strip()
# Простая валидация email (для продакшена — использовать email-validator)
if email and "@" not in email:
await update.message.reply_text("Некорректный email. Повторите или отправьте '—' для пропуска.")
return EMAIL
if email != "—":
context.user_data["email"] = email
# Финал — вывод результата
summary = (
f"Результаты опроса:\n"
f"Имя: {context.user_data['name']}\n"
f"Возраст: {context.user_data['age']}"
)
if "email" in context.user_data:
summary += f"\nEmail: {context.user_data['email']}"
await update.message.reply_text(summary, reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await update.message.reply_text("Опрос отменён.", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
# Регистрация диалога
survey_handler = ConversationHandler(
entry_points=[CommandHandler("survey", start_survey)],
states={
NAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, name_received)],
AGE: [MessageHandler(filters.TEXT & ~filters.COMMAND, age_received)],
EMAIL: [MessageHandler(filters.TEXT & ~filters.COMMAND, email_received)],
},
fallbacks=[CommandHandler("cancel", cancel)],
)
Комментарий.
ConversationHandlerпозволяет строить сложные сценарии без глобального состояния.user_data— привязано к пользователю,chat_data— к чату,bot_data— глобально. При масштабировании рекомендуется сериализовать данные в БД (например,redisилиPostgreSQL).
3. Обработка кнопок: Inline- и Reply-клавиатуры
3.1. Reply-клавиатура с опциями
async def settings_menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
keyboard = [
["🔔 Уведомления", "🌐 Язык"],
["🔙 Назад"]
]
reply_markup = ReplyKeyboardMarkup(
keyboard,
one_time_keyboard=True,
resize_keyboard=True
)
await update.message.reply_text("Выберите настройку:", reply_markup=reply_markup)
3.2. Inline-кнопки с callback-данными
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
async def send_notification_options(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# Храним текущие настройки в user_data или БД
enabled = context.user_data.get("notify_enabled", True)
status_text = "✅ Вкл." if enabled else "❌ Выкл."
toggle_payload = "notify_off" if enabled else "notify_on"
keyboard = [
[
InlineKeyboardButton(status_text, callback_data=toggle_payload),
InlineKeyboardButton("ℹ️ Описание", callback_data="notify_info")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text("Настройки уведомлений:", reply_markup=reply_markup)
async def handle_notification_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
query = update.callback_query
await query.answer() # подтверждаем нажатие
match query.data:
case "notify_on":
context.user_data["notify_enabled"] = True
text = "🔔 Уведомления включены."
case "notify_off":
context.user_data["notify_enabled"] = False
text = "🔕 Уведомления отключены."
case "notify_info":
text = (
"Уведомления приходят при:\n"
"• новом сообщении от администратора\n"
"• изменении статуса заявки"
)
case _:
text = "Неизвестная команда."
await query.edit_message_text(text=text)
Комментарий. Inline-кнопки не заменяют сообщение, а редактируют его — это позволяет сохранять контекст.
callback_dataимеет ограничение в 64 байта (UTF-8), поэтому для сложных сценариев используйте сериализацию (например, короткие hash + маппинг в памяти/БД).
4. Обработка медиа: фото, документы, голосовые
import io
from PIL import Image
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
photo_file = await update.message.photo[-1].get_file() # берём наибольшее разрешение
file_bytes = await photo_file.download_as_bytearray()
# Пример: анализ размера и формата
try:
image = Image.open(io.BytesIO(file_bytes))
width, height = image.size
fmt = image.format
await update.message.reply_text(
f"🖼 Получено изображение: {width}×{height} px, формат {fmt}"
)
# Пример обработки: конвертация в ч/б и отправка обратно
image_bw = image.convert("L")
output = io.BytesIO()
image_bw.save(output, format="PNG")
output.seek(0)
await update.message.reply_photo(
photo=output,
caption="Вот чёрно-белая версия."
)
except Exception as e:
logging.error("Image processing error", exc_info=e)
await update.message.reply_text("Не удалось обработать изображение.")
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
doc = update.message.document
await update.message.reply_text(
f"📄 Получен документ:\n"
f"Имя: {doc.file_name}\n"
f"Размер: {doc.file_size // 1024} КБ\n"
f"MIME: {doc.mime_type}"
)
# Пример: проверка на PDF
if doc.mime_type == "application/pdf":
file = await doc.get_file()
# Здесь можно передать в pdfplumber, PyPDF2 и т.д.
await update.message.reply_text("PDF обнаружен — начинаю разбор...")
Важно. Telegram не гарантирует наличие
mime_typeдля всех файлов — при критичных проверках делайте анализ по сигнатуре (магическим числам).
5. Работа с внешними API: надёжные вызовы, ретраи, кэширование
5.1. Базовый HTTP-клиент с ретраями и таймаутами
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
reraise=True
)
async def fetch_weather(city: str) -> dict | None:
"""Получение погоды через OpenWeather API с автоматическими повторами."""
API_KEY = os.getenv("OPENWEATHER_API_KEY")
if not API_KEY:
raise ValueError("OPENWEATHER_API_KEY not set")
url = "https://api.openweathermap.org/data/2.5/weather"
params = {"q": city, "appid": API_KEY, "units": "metric", "lang": "ru"}
async with httpx.AsyncClient(timeout=10.0) as client:
try:
response = await client.get(url, params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return None # город не найден — не ошибка в работе
raise # повторяем для 5xx и сетевых ошибок
except httpx.RequestError as e:
logging.warning(f"Network error fetching weather: {e}")
raise
5.2. Интеграция в обработчик команды
async def weather_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not context.args:
await update.message.reply_text("Укажите город: /weather Москва")
return
city = " ".join(context.args).strip()
try:
data = await fetch_weather(city)
if not data:
await update.message.reply_text(f"Город «{city}» не найден.")
return
temp = data["main"]["temp"]
desc = data["weather"][0]["description"].capitalize()
await update.message.reply_text(
f"🌤 В {city}: {temp:.1f}°C, {desc}."
)
except Exception as e:
logging.error("Weather fetch failed", exc_info=e)
await update.message.reply_text("Не удалось получить погоду. Повторите позже.")
5.3. Кэширование ответов (на уровне сессии или Redis)
from functools import lru_cache
from datetime import timedelta
# Простое кэширование в памяти (на 5 минут)
@lru_cache(maxsize=128)
def _cached_weather_key(city: str, ts_bucket: int) -> dict | None:
# В реальном коде — вызов fetch_weather(city), но без async
# Здесь — заглушка; для async-кэширования используйте cachetools + asyncio.Lock
pass
def get_weather_cache_key(city: str) -> tuple[str, int]:
# Группируем запросы по 5-минутным окнам
now = int(time.time() // 300)
return (city.lower(), now)
# Альтернатива — Redis + aioredis (рекомендуется для продакшена)
# import aioredis
# redis = aioredis.from_url("redis://localhost")
# cache_key = f"weather:{city}"
# cached = await redis.get(cache_key)
# if cached: return json.loads(cached)
# ...
# await redis.setex(cache_key, 300, json.dumps(data))
Комментарий. Не кэшируйте персонализированные или чувствительные данные. Для высоконагруженных ботов используйте
Redisилиmemcachedс TTL и стратегиейcache-aside.
6. Администрирование: команды для модераторов
6.1. Проверка прав доступа через декоратор
from functools import wraps
ADMIN_IDS = {int(x) for x in os.getenv("ADMIN_CHAT_IDS", "").split(",") if x}
def admin_only(func):
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
if user_id not in ADMIN_IDS:
logging.warning(f"Unauthorized admin access attempt by {user_id}")
await update.message.reply_text("⚠️ Запрещено.")
return
return await func(update, context)
return wrapper
@admin_only
async def stats_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# Пример сбора метрик
total_users = await db.fetchval("SELECT COUNT(*) FROM users")
active_last_week = await db.fetchval(
"SELECT COUNT(*) FROM users WHERE last_seen > NOW() - INTERVAL '7 days'"
)
await update.message.reply_text(
f"📊 Статистика:\n"
f"Всего пользователей: {total_users}\n"
f"Активно за неделю: {active_last_week}"
)
@admin_only
async def broadcast_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not context.args:
await update.message.reply_text("Текст рассылки: /broadcast Привет всем!")
return
text = " ".join(context.args)
user_ids = await db.fetch("SELECT user_id FROM users WHERE is_subscribed = true")
sent, failed = 0, 0
for row in user_ids:
try:
await context.bot.send_message(chat_id=row["user_id"], text=text)
sent += 1
await asyncio.sleep(0.05) # rate-limit ~20 msg/s
except Exception as e:
logging.warning(f"Failed to send to {row['user_id']}: {e}")
failed += 1
await update.message.reply_text(f"✉️ Рассылка завершена: {sent} отправлено, {failed} ошибок.")
6.2. Бан/разбан с логированием
@admin_only
async def ban_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not context.args or len(context.args) < 2:
await update.message.reply_text("/ban <user_id> <причина>")
return
try:
user_id = int(context.args[0])
reason = " ".join(context.args[1:])
except ValueError:
await update.message.reply_text("ID должен быть числом.")
return
# Заносим в БД (или в Redis-сет)
await db.execute(
"INSERT INTO bans (user_id, reason, banned_by, created_at) "
"VALUES ($1, $2, $3, NOW()) ON CONFLICT DO NOTHING",
user_id, reason, update.effective_user.id
)
# Опционально: отправляем уведомление пользователю
try:
await context.bot.send_message(
chat_id=user_id,
text=f"⛔ Вы заблокированы.\nПричина: {reason}"
)
except Exception:
pass # пользователь мог удалить бота
await update.message.reply_text(f"✅ Пользователь {user_id} заблокирован.")
Комментарий. Для избежания
flood controlиспользуйте очередь (asyncio.Queue) и фоновый воркер. Telegram допускает ~30 сообщений/сек на бота, но лучше ограничиться ~15.
7. Планирование задач: APScheduler + асинхронность
Сценарии: ежедневная рассылка, очистка временных данных, health-check.
7.1. Настройка асинхронного планировщика
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
scheduler = AsyncIOScheduler(timezone="Europe/Moscow")
async def daily_digest(context: ContextTypes.DEFAULT_TYPE) -> None:
"""Ежедневная рассылка в 09:00 МСК."""
users = await db.fetch("SELECT user_id FROM subscriptions WHERE daily_digest = true")
for user in users:
try:
await context.bot.send_message(
chat_id=user["user_id"],
text="🌅 Доброе утро!\nВаши задачи на сегодня: ...\n(реализация — в модуле business_logic)"
)
await asyncio.sleep(0.03)
except Exception as e:
logging.warning(f"Digest failed for {user['user_id']}: {e}")
# Регистрация задачи при старте приложения
def setup_scheduled_jobs(application: Application) -> None:
scheduler.add_job(
daily_digest,
CronTrigger(hour=9, minute=0),
args=[application.bot],
id="daily_digest",
replace_existing=True
)
scheduler.start()
# Graceful shutdown
import atexit
atexit.register(lambda: scheduler.shutdown(wait=False))
Замечание. APScheduler не native-async, но
AsyncIOSchedulerинтегрируется сasyncio. Избегайте блокирующих операций внутри задач.
8. Тестирование: модульные и интеграционные тесты
8.1. Модульный тест обработчика (pytest + pytest-asyncio)
# test_handlers.py
import pytest
from telegram import Update, User, Message, Chat
from telegram.ext import ContextTypes
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_start_handler():
# Подготовка моков
update = AsyncMock(spec=Update)
context = AsyncMock(spec=ContextTypes.DEFAULT_TYPE)
# Эмуляция входящих данных
update.effective_user = User(id=123, first_name="Тест", is_bot=False)
update.message = AsyncMock(spec=Message)
update.message.reply_html = AsyncMock()
# Вызов тестируемого кода
from handlers import start
await start(update, context)
# Проверка
update.message.reply_html.assert_called_once()
args, kwargs = update.message.reply_html.call_args
assert "Тест" in args[0]
8.2. Интеграционный тест с telegram-test (или эмуляцией через Mock)
Для полноценного интеграционного теста рекомендуется:
- Запускать бота в
polling-режиме наlocalhost; - Использовать
requestsилиhttpxдля отправки фейковых update через локальный вебхук (/setWebhook?url=http://localhost:8080в тестовом режиме); - Либо использовать
pytest-telegram-bot-mock(сторонняя утилита, эмулирующая API Telegram в памяти).
9. Безопасность: защита от атак и ошибок
9.1. Rate-limiting на уровне обработчика
from collections import defaultdict
import time
# In-memory rate limiter (замените на Redis для масштаба)
user_last_call = defaultdict(float)
RATE_LIMIT_WINDOW = 3.0 # секунд между вызовами
def rate_limited(func):
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
now = time.time()
if now - user_last_call[user_id] < RATE_LIMIT_WINDOW:
await update.message.reply_text("⏳ Подождите немного.")
return
user_last_call[user_id] = now
return await func(update, context)
return wrapper
@rate_limited
async def heavy_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# ... тяжёлая логика
pass
9.2. Санитизация входных данных
import html
import re
def sanitize_input(text: str) -> str:
"""Удаляет потенциально опасные символы и ограничивает длину."""
# Ограничение длины — 1024 символа
text = text[:1024]
# Экранирование HTML (если отправляете в reply_html)
text = html.escape(text, quote=False)
# Удаление управляющих символов (кроме \n, \t)
text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", text)
return text
# Применение:
# clean_text = sanitize_input(update.message.text)
9.3. Проверка webhook-подписи (при использовании)
import hmac
import hashlib
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")
async def webhook_handler(request):
signature = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
if not hmac.compare_digest(signature or "", WEBHOOK_SECRET):
return web.Response(status=403)
update = Update.de_json(await request.json(), bot)
await dp.process_update(update)
return web.Response(status=200)
10. Деплой и мониторинг
10.1. Docker-образ с многоступенчатой сборкой
Dockerfile
# Этап 1: сборка зависимостей
FROM python:3.11-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --user --no-cache-dir --upgrade pip \
&& pip install --user --no-cache-dir -r requirements.txt
# Этап 2: runtime
FROM python:3.11-slim
# Минимизация слоя: копируем только установленные пакеты и код
COPY --from=builder /root/.local /root/.local
COPY . /app
WORKDIR /app
# Безопасность: непривилегированный пользователь
RUN useradd --create-home --shell /bin/bash appuser \
&& chown -R appuser:appuser /app \
&& chmod -R 755 /app
USER appuser
ENV PATH=/root/.local/bin:$PATH
# Health-check (для оркестраторов)
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD python -c "import httpx; httpx.get('http://localhost:8081/health', timeout=3)"
# Веб-сервер для health-check (если polling)
EXPOSE 8081
CMD ["python", "main.py"]
requirements.txt (рекомендованный минимальный набор)
python-telegram-bot[httpx]==20.7
httpx==0.27.0
tenacity==9.0.0
APScheduler==3.10.4
asyncpg==0.29.0
pydantic-settings==2.5.2
structlog==24.1.0
prometheus-client==0.20.0
10.2. Health-check endpoint (для Kubernetes / Docker Swarm)
# health_check.py
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import os
HEALTH_PORT = int(os.getenv("HEALTH_PORT", "8081"))
class HealthHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/health":
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(b'{"status":"ok"}')
else:
self.send_response(404)
self.end_headers()
def start_health_server():
server = HTTPServer(("0.0.0.0", HEALTH_PORT), HealthHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
return server
В main.py:
if __name__ == "__main__":
# Запуск health-check в фоне (до polling)
health_server = start_health_server()
logging.info(f"Health server started on :{HEALTH_PORT}")
try:
application.run_polling(allowed_updates=Update.ALL_TYPES)
finally:
health_server.shutdown()
10.3. Метрики в Prometheus
from prometheus_client import Counter, Histogram, start_http_server
# Инициализация
start_http_server(8000) # отдельный порт для метрик
BOT_REQUESTS_TOTAL = Counter(
"bot_requests_total",
"Общее число обработанных команд",
["command"]
)
BOT_REQUEST_DURATION = Histogram(
"bot_request_duration_seconds",
"Время обработки команды"
)
def instrument_command(command_name: str):
def decorator(func):
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE):
BOT_REQUESTS_TOTAL.labels(command=command_name).inc()
with BOT_REQUEST_DURATION.time():
return await func(update, context)
return wrapper
return decorator
# Применение:
@instrument_command("start")
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# ...
Примечание. Для Grafana используйте дашборд «Telegram Bot Metrics» (шаблон: Grafana ID 14945).
11. Интернационализация (i18n)
11.1. Архитектура на базе gettext
Структура проекта:
locales/
├── en/
│ └── LC_MESSAGES/
│ ├── bot.mo
│ └── bot.po
├── ru/
│ └── LC_MESSAGES/
│ ├── bot.mo
│ └── bot.po
└── __init__.py
Генерация .pot и .po:
# Извлечение строк из кода
xgettext --language=Python --keyword=_ --output=locales/bot.pot main.py handlers/*.py
# Создание .po для языков
msginit --input=locales/bot.pot --locale=ru_RU --output=locales/ru/LC_MESSAGES/bot.po
msginit --input=locales/bot.pot --locale=en_US --output=locales/en/LC_MESSAGES/bot.po
# После перевода — компиляция в .mo
msgfmt locales/ru/LC_MESSAGES/bot.po -o locales/ru/LC_MESSAGES/bot.mo
11.2. Контекстно-зависимое форматирование
import gettext
from pathlib import Path
LOCALE_DIR = Path(__file__).parent / "locales"
def get_translator(lang_code: str = "ru") -> gettext.GNUTranslations:
try:
return gettext.translation("bot", localedir=LOCALE_DIR, languages=[lang_code])
except FileNotFoundError:
return gettext.NullTranslations() # fallback: no-op
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_lang = update.effective_user.language_code or "ru"
_ = get_translator(user_lang[:2]).gettext # ru, en
await update.message.reply_text(
_("Здравствуйте! Это бот на {lang} языке.").format(lang=user_lang)
)
Альтернатива. Для динамических переводов (без пересборки) — храните строки в БД или JSON-файлах и кэшируйте в
lru_cache.
12. Работа с базами данных
12.1. Асинхронное подключение через asyncpg
# db.py
import asyncpg
from pydantic_settings import BaseSettings
class DBSettings(BaseSettings):
DB_HOST: str = "localhost"
DB_PORT: int = 5432
DB_NAME: str
DB_USER: str
DB_PASS: str
@property
def dsn(self) -> str:
return f"postgresql://{self.DB_USER}:{self.DB_PASS}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}"
settings = DBSettings()
async def init_db() -> asyncpg.Pool:
return await asyncpg.create_pool(
dsn=settings.dsn,
min_size=1,
max_size=10,
command_timeout=60,
init=setup_connection # см. ниже
)
async def setup_connection(conn: asyncpg.Connection) -> None:
# Устанавливаем часовой пояс и кодировку
await conn.execute("SET TIME ZONE 'Europe/Moscow';")
await conn.execute("SET CLIENT_ENCODING TO 'UTF8';")
12.2. Безопасное выполнение с транзакциями
from contextlib import asynccontextmanager
@asynccontextmanager
async def transaction(conn: asyncpg.Connection):
tr = conn.transaction()
await tr.start()
try:
yield conn
await tr.commit()
except Exception:
await tr.rollback()
raise
# Пример использования
async def create_user_if_not_exists(conn: asyncpg.Connection, user_id: int, name: str) -> bool:
async with transaction(conn):
row = await conn.fetchrow(
"""
INSERT INTO users (user_id, name, created_at)
VALUES ($1, $2, NOW())
ON CONFLICT (user_id) DO NOTHING
RETURNING user_id
""",
user_id, name
)
return row is not None
12.3. Миграции через alembic (асинхронная конфигурация)
alembic.ini:
[alembic]
script_location = alembic
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
alembic/env.py (фрагмент для async):
from sqlalchemy.ext.asyncio import create_async_engine
from alembic import context
def run_migrations_online():
connectable = create_async_engine(
config.get_main_option("sqlalchemy.url"),
poolclass=NullPool,
)
async def run_async_migrations():
async with connectable.connect() as connection:
await connection.run_sync(
lambda sync_conn: context.configure(
connection=sync_conn,
target_metadata=target_metadata,
compare_type=True,
)
)
await connection.run_sync(lambda sync_conn: context.run_migrations())
import asyncio
asyncio.run(run_async_migrations())
13. Гибридные сценарии: бот + веб-интерфейс
Цель: бот обрабатывает интерактив, веб — настройки, аналитика, ручное управление.
13.1. FastAPI-эндпоинт для уведомлений администратора
# web.py
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
app = FastAPI()
class NotificationRequest(BaseModel):
user_id: int
text: str
silent: bool = False
async def get_admin_token(token: str):
if token != os.getenv("ADMIN_API_TOKEN"):
raise HTTPException(status_code=403, detail="Invalid token")
return True
@app.post("/notify", dependencies=[Depends(get_admin_token)])
async def send_notification(req: NotificationRequest):
try:
await bot.send_message(
chat_id=req.user_id,
text=req.text,
disable_notification=req.silent
)
return {"status": "sent"}
except Exception as e:
logging.error(f"Web notify failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
13.2. Запуск FastAPI и бота в одном процессе (для dev)
# main.py
import threading
import uvicorn
def run_web_server():
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="warning")
if __name__ == "__main__":
# Запуск веб-сервера в фоновом потоке
web_thread = threading.Thread(target=run_web_server, daemon=True)
web_thread.start()
# Основной цикл — бот
application.run_polling()
Рекомендация. В продакшене разделяйте сервисы: бот — в
pod, веб — в другом, общение черезRedis Pub/SubилиRabbitMQ.
14. Централизованная обработка ошибок
14.1. Глобальный exception handler
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
exc = context.error
logging.error("Exception while handling an update", exc_info=exc)
# Отправка уведомления администратору (только для Update-событий)
if isinstance(update, Update) and update.effective_chat:
try:
await context.bot.send_message(
chat_id=update.effective_chat.id,
text="⚠️ Произошла внутренняя ошибка. Администратор уведомлён."
)
except Exception:
pass # не ломаем обработчик ошибок при ошибках отправки
# Интеграция с Sentry
if "sentry_sdk" in globals():
sentry_sdk.capture_exception(exc)
# Регистрация
application.add_error_handler(error_handler)
14.2. Контекстное логгирование с structlog
import structlog
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
logger = structlog.get_logger()
# Пример использования в обработчике:
async def some_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
logger.info(
"user_action",
user_id=update.effective_user.id,
chat_type=update.effective_chat.type,
action="button_click",
payload=context.user_data.get("last_choice")
)
15. Оптимизация и профилирование
15.1. Профилирование «на лету» через команду /profile
import cProfile
import pstats
import io
@admin_only
async def profile_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not context.args:
await update.message.reply_text("/profile <команда> [аргументы]")
return
cmd_name = context.args[0]
handler = application.handlers_map.get(cmd_name)
if not handler:
await update.message.reply_text("Команда не найдена.")
return
# Запуск с профилированием (только для демонстрации — не в прод!)
pr = cProfile.Profile()
pr.enable()
# Имитация вызова — в реальности вызовите нужную функцию
# await handler(update, context)
pr.disable()
s = io.StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats("cumulative")
ps.print_stats(10) # топ-10
output = s.getvalue()[:4000] # укладываемся в лимит Telegram
await update.message.reply_text(f"```\n{output}\n```", parse_mode="MarkdownV2")
15.2. Кэширование тяжёлых вычислений
from cachetools import TTLCache, cached
# TTL = 5 минут, макс. 1000 записей
calc_cache = TTLCache(maxsize=1000, ttl=300)
@cached(calc_cache)
def heavy_calculation(param: str) -> str:
# Например, парсинг XML, вычисление хэша, ML-инференс
time.sleep(0.5) # имитация
return f"result_for_{param}"