import mysql.connector from mysql.connector import Error from datetime import datetime, timedelta from typing import Optional import asyncio from functools import wraps from config import config def run_sync(func): """Декоратор для запуска синхронных функций в async""" @wraps(func) async def wrapper(*args, **kwargs): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, func, *args, **kwargs) return wrapper class Database: """Класс для работы с базой данных""" def __init__(self): self.db_config = config.db_config self.db_config_with_name = config.db_config_with_name def get_connection(self, with_db: bool = True): """Подключение к MySQL""" db_config = self.db_config_with_name if with_db else self.db_config try: connection = mysql.connector.connect(**db_config) return connection except Error as e: print(f"Ошибка подключения к БД: {e}") return None async def create_database_if_not_exists(self) -> bool: """Создание базы данных если она не существует""" connection = self.get_connection(with_db=False) if not connection: return False try: cursor = connection.cursor() cursor.execute( f"CREATE DATABASE IF NOT EXISTS `{config.db_name}` " "CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci" ) connection.commit() cursor.close() connection.close() print(f"База данных '{config.db_name}' проверена/создана") return True except Error as e: print(f"Ошибка создания БД: {e}") return False async def create_tables(self) -> bool: """Создание таблиц в базе данных""" connection = self.get_connection(with_db=True) if not connection: return False try: cursor = connection.cursor() # Таблица подписок cursor.execute(""" CREATE TABLE IF NOT EXISTS subscriptions ( id INT AUTO_INCREMENT PRIMARY KEY, user_id BIGINT UNIQUE NOT NULL, username VARCHAR(255), is_active BOOLEAN DEFAULT FALSE, subscription_start DATETIME, subscription_end DATETIME, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ) """) # Таблица языков пользователей cursor.execute(""" CREATE TABLE IF NOT EXISTS user_languages ( user_id BIGINT PRIMARY KEY, language_code VARCHAR(10) DEFAULT 'ru', updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ) """) # Таблица тикетов cursor.execute(""" CREATE TABLE IF NOT EXISTS tickets ( id INT AUTO_INCREMENT PRIMARY KEY, user_id BIGINT NOT NULL, username VARCHAR(255), message TEXT NOT NULL, status VARCHAR(20) DEFAULT 'open', admin_response TEXT, closed_by VARCHAR(255), closed_at DATETIME, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_user_id (user_id), INDEX idx_status (status) ) """) # Индексы cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_id ON subscriptions(user_id)") connection.commit() cursor.close() connection.close() print("Таблицы созданы/проверены") return True except Error as e: print(f"Ошибка создания таблиц: {e}") return False async def initialize(self): """Инициализация БД: создание БД и таблиц""" await self.create_database_if_not_exists() await self.create_tables() @run_sync def get_user_subscription(self, user_id: int) -> bool: """Проверка статуса подписки пользователя""" connection = self.get_connection() if not connection: return False try: cursor = connection.cursor(dictionary=True) cursor.execute( "SELECT is_active FROM subscriptions WHERE user_id = %s", (user_id,) ) result = cursor.fetchone() cursor.close() connection.close() return result['is_active'] if result else False except Error as e: print(f"Ошибка БД: {e}") return False @run_sync def create_user(self, user_id: int, username: str) -> bool: """Создание нового пользователя в базе""" connection = self.get_connection() if not connection: return False try: cursor = connection.cursor() cursor.execute( """INSERT INTO subscriptions (user_id, username, is_active) VALUES (%s, %s, FALSE) ON DUPLICATE KEY UPDATE username = %s""", (user_id, username, username) ) connection.commit() cursor.close() connection.close() return True except Error as e: print(f"Ошибка БД: {e}") return False @run_sync def activate_subscription(self, user_id: int, days: int = 30) -> bool: """Активация подписки на указанное количество дней""" connection = self.get_connection() if not connection: return False try: cursor = connection.cursor() now = datetime.now() end_date = now + timedelta(days=days) cursor.execute( """INSERT INTO subscriptions (user_id, is_active, subscription_start, subscription_end) VALUES (%s, TRUE, %s, %s) ON DUPLICATE KEY UPDATE is_active = TRUE, subscription_start = %s, subscription_end = %s""", (user_id, now, end_date, now, end_date) ) connection.commit() cursor.close() connection.close() return True except Error as e: print(f"Ошибка БД: {e}") return False @run_sync def deactivate_subscription(self, user_id: int) -> bool: """Деактивация подписки""" connection = self.get_connection() if not connection: return False try: cursor = connection.cursor() cursor.execute( "UPDATE subscriptions SET is_active = FALSE WHERE user_id = %s", (user_id,) ) connection.commit() cursor.close() connection.close() return True except Error as e: print(f"Ошибка БД: {e}") return False @run_sync def get_subscription_end_date(self, user_id: int) -> Optional[datetime]: """Получение даты окончания подписки""" connection = self.get_connection() if not connection: return None try: cursor = connection.cursor(dictionary=True) cursor.execute( "SELECT subscription_end FROM subscriptions WHERE user_id = %s", (user_id,) ) result = cursor.fetchone() cursor.close() connection.close() return result['subscription_end'] if result and result['subscription_end'] else None except Error as e: print(f"Ошибка БД: {e}") return None @run_sync def get_user_language(self, user_id: int) -> str: """Получение языка пользователя""" connection = self.get_connection() if not connection: return "ru" try: cursor = connection.cursor(dictionary=True) cursor.execute( "SELECT language_code FROM user_languages WHERE user_id = %s", (user_id,) ) result = cursor.fetchone() cursor.close() connection.close() return result['language_code'] if result else "ru" except Error as e: print(f"Ошибка БД: {e}") return "ru" @run_sync def set_user_language(self, user_id: int, language: str) -> bool: """Установка языка пользователя""" connection = self.get_connection() if not connection: return False try: cursor = connection.cursor() cursor.execute( """INSERT INTO user_languages (user_id, language_code) VALUES (%s, %s) ON DUPLICATE KEY UPDATE language_code = %s""", (user_id, language, language) ) connection.commit() cursor.close() connection.close() return True except Error as e: print(f"Ошибка БД: {e}") return False @run_sync def create_ticket(self, user_id: int, username: str, message: str) -> Optional[int]: """Создание тикета""" connection = self.get_connection() if not connection: return None try: cursor = connection.cursor() cursor.execute( """INSERT INTO tickets (user_id, username, message, status) VALUES (%s, %s, %s, 'open')""", (user_id, username, message) ) connection.commit() ticket_id = cursor.lastrowid cursor.close() connection.close() return ticket_id except Error as e: print(f"Ошибка БД: {e}") return None @run_sync def get_user_tickets(self, user_id: int) -> list: """Получение всех тикетов пользователя""" connection = self.get_connection() if not connection: return [] try: cursor = connection.cursor(dictionary=True) cursor.execute( """SELECT id, message, status, admin_response, created_at FROM tickets WHERE user_id = %s ORDER BY created_at DESC""", (user_id,) ) tickets = cursor.fetchall() cursor.close() connection.close() return tickets except Error as e: print(f"Ошибка БД: {e}") return [] @run_sync def get_ticket(self, ticket_id: int) -> Optional[dict]: """Получение информации о тикете""" connection = self.get_connection() if not connection: return None try: cursor = connection.cursor(dictionary=True) cursor.execute( "SELECT * FROM tickets WHERE id = %s", (ticket_id,) ) ticket = cursor.fetchone() cursor.close() connection.close() return ticket except Error as e: print(f"Ошибка БД: {e}") return None @run_sync def update_ticket_response(self, ticket_id: int, admin_response: str, status: str = 'answered') -> bool: """Обновление ответа администратора в тикете""" connection = self.get_connection() if not connection: return False try: cursor = connection.cursor() cursor.execute( """UPDATE tickets SET admin_response = %s, status = %s WHERE id = %s""", (admin_response, status, ticket_id) ) connection.commit() cursor.close() connection.close() return True except Error as e: print(f"Ошибка БД: {e}") return False @run_sync def close_ticket(self, ticket_id, closed_by): """Закрытие тикета""" connection = self.get_connection() if not connection: return False try: cursor = connection.cursor() cursor.execute( """UPDATE tickets SET status = 'closed', closed_by = %s, closed_at = %s WHERE id = %s""", (closed_by, datetime.now(), ticket_id) ) connection.commit() cursor.close() connection.close() return True except Error as e: print(f"Ошибка БД: {e}") return False @run_sync def append_ticket_message(self, ticket_id: int, user_message: str) -> bool: """Добавление сообщения к тикету""" connection = self.get_connection() if not connection: return False try: cursor = connection.cursor() # Получаем текущее сообщение cursor.execute("SELECT message FROM tickets WHERE id = %s", (ticket_id,)) result = cursor.fetchone() if result: current_message = result[0] new_message = f"{current_message}\n\n➕ Дополнение:\n{user_message}" cursor.execute( "UPDATE tickets SET message = %s WHERE id = %s", (new_message, ticket_id) ) connection.commit() cursor.close() connection.close() return True return False except Error as e: print(f"Ошибка БД: {e}") return False @run_sync def get_all_users_paginated(self, page, per_page=10): """ Получение всех пользователей с пагинацией Args: page: Номер страницы (начиная с 1) per_page: Количество пользователей на странице Returns: dict: {'users': [...], 'total': int, 'total_pages': int, 'current_page': int} """ connection = self.get_connection() if not connection: return {'users': [], 'total': 0, 'total_pages': 0, 'current_page': page} try: cursor = connection.cursor(dictionary=True) # Получаем общее количество пользователей cursor.execute("SELECT COUNT(*) as count FROM subscriptions") total = cursor.fetchone()['count'] # Вычисляем общее количество страниц total_pages = (total + per_page - 1) // per_page if total > 0 else 0 # Ограничиваем page в допустимых пределах page = max(1, min(page, total_pages if total_pages > 0 else 1)) # Получаем пользователей с пагинацией offset = (page - 1) * per_page cursor.execute( """SELECT id, user_id, username, is_active, subscription_start, subscription_end, created_at FROM subscriptions ORDER BY created_at DESC LIMIT %s OFFSET %s""", (per_page, offset) ) users = cursor.fetchall() cursor.close() connection.close() return { 'users': users, 'total': total, 'total_pages': total_pages, 'current_page': page } except Error as e: print(f"Ошибка БД: {e}") return {'users': [], 'total': 0, 'total_pages': 0, 'current_page': page} @run_sync def get_users_count(self): """ Получение количества пользователей по статусам Returns: dict: {'total': int, 'active': int, 'inactive': int} """ connection = self.get_connection() if not connection: return {'total': 0, 'active': 0, 'inactive': 0} try: cursor = connection.cursor(dictionary=True) cursor.execute("SELECT COUNT(*) as count FROM subscriptions") total = cursor.fetchone()['count'] cursor.execute("SELECT COUNT(*) as count FROM subscriptions WHERE is_active = TRUE") active = cursor.fetchone()['count'] cursor.close() connection.close() return { 'total': total, 'active': active, 'inactive': total - active } except Error as e: print(f"Ошибка БД: {e}") return {'total': 0, 'active': 0, 'inactive': 0} # Глобальный экземпляр базы данных db = Database()