Files
RemnaWaveBOT/core/__init__.py
2026-04-05 12:18:33 +03:00

519 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import mysql.connector
from mysql.connector import Error
from datetime import datetime, timedelta
from typing import Optional
import asyncio
from functools import wraps
from config import config
def run_sync(func):
"""Декоратор для запуска синхронных функций в async"""
@wraps(func)
async def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, func, *args, **kwargs)
return wrapper
class Database:
"""Класс для работы с базой данных"""
def __init__(self):
self.db_config = config.db_config
self.db_config_with_name = config.db_config_with_name
def get_connection(self, with_db: bool = True):
"""Подключение к MySQL"""
db_config = self.db_config_with_name if with_db else self.db_config
try:
connection = mysql.connector.connect(**db_config)
return connection
except Error as e:
print(f"Ошибка подключения к БД: {e}")
return None
async def create_database_if_not_exists(self) -> bool:
"""Создание базы данных если она не существует"""
connection = self.get_connection(with_db=False)
if not connection:
return False
try:
cursor = connection.cursor()
cursor.execute(
f"CREATE DATABASE IF NOT EXISTS `{config.db_name}` "
"CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
)
connection.commit()
cursor.close()
connection.close()
print(f"База данных '{config.db_name}' проверена/создана")
return True
except Error as e:
print(f"Ошибка создания БД: {e}")
return False
async def create_tables(self) -> bool:
"""Создание таблиц в базе данных"""
connection = self.get_connection(with_db=True)
if not connection:
return False
try:
cursor = connection.cursor()
# Таблица подписок
cursor.execute("""
CREATE TABLE IF NOT EXISTS subscriptions (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id BIGINT UNIQUE NOT NULL,
username VARCHAR(255),
is_active BOOLEAN DEFAULT FALSE,
subscription_start DATETIME,
subscription_end DATETIME,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
""")
# Таблица языков пользователей
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_languages (
user_id BIGINT PRIMARY KEY,
language_code VARCHAR(10) DEFAULT 'ru',
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
""")
# Таблица тикетов
cursor.execute("""
CREATE TABLE IF NOT EXISTS tickets (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id BIGINT NOT NULL,
username VARCHAR(255),
message TEXT NOT NULL,
status VARCHAR(20) DEFAULT 'open',
admin_response TEXT,
closed_by VARCHAR(255),
closed_at DATETIME,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user_id (user_id),
INDEX idx_status (status)
)
""")
# Индексы
cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_id ON subscriptions(user_id)")
connection.commit()
cursor.close()
connection.close()
print("Таблицы созданы/проверены")
return True
except Error as e:
print(f"Ошибка создания таблиц: {e}")
return False
async def initialize(self):
"""Инициализация БД: создание БД и таблиц"""
await self.create_database_if_not_exists()
await self.create_tables()
@run_sync
def get_user_subscription(self, user_id: int) -> bool:
"""Проверка статуса подписки пользователя"""
connection = self.get_connection()
if not connection:
return False
try:
cursor = connection.cursor(dictionary=True)
cursor.execute(
"SELECT is_active FROM subscriptions WHERE user_id = %s",
(user_id,)
)
result = cursor.fetchone()
cursor.close()
connection.close()
return result['is_active'] if result else False
except Error as e:
print(f"Ошибка БД: {e}")
return False
@run_sync
def create_user(self, user_id: int, username: str) -> bool:
"""Создание нового пользователя в базе"""
connection = self.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.execute(
"""INSERT INTO subscriptions (user_id, username, is_active)
VALUES (%s, %s, FALSE)
ON DUPLICATE KEY UPDATE username = %s""",
(user_id, username, username)
)
connection.commit()
cursor.close()
connection.close()
return True
except Error as e:
print(f"Ошибка БД: {e}")
return False
@run_sync
def activate_subscription(self, user_id: int, days: int = 30) -> bool:
"""Активация подписки на указанное количество дней"""
connection = self.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
now = datetime.now()
end_date = now + timedelta(days=days)
cursor.execute(
"""INSERT INTO subscriptions (user_id, is_active, subscription_start, subscription_end)
VALUES (%s, TRUE, %s, %s)
ON DUPLICATE KEY UPDATE
is_active = TRUE,
subscription_start = %s,
subscription_end = %s""",
(user_id, now, end_date, now, end_date)
)
connection.commit()
cursor.close()
connection.close()
return True
except Error as e:
print(f"Ошибка БД: {e}")
return False
@run_sync
def deactivate_subscription(self, user_id: int) -> bool:
"""Деактивация подписки"""
connection = self.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.execute(
"UPDATE subscriptions SET is_active = FALSE WHERE user_id = %s",
(user_id,)
)
connection.commit()
cursor.close()
connection.close()
return True
except Error as e:
print(f"Ошибка БД: {e}")
return False
@run_sync
def get_subscription_end_date(self, user_id: int) -> Optional[datetime]:
"""Получение даты окончания подписки"""
connection = self.get_connection()
if not connection:
return None
try:
cursor = connection.cursor(dictionary=True)
cursor.execute(
"SELECT subscription_end FROM subscriptions WHERE user_id = %s",
(user_id,)
)
result = cursor.fetchone()
cursor.close()
connection.close()
return result['subscription_end'] if result and result['subscription_end'] else None
except Error as e:
print(f"Ошибка БД: {e}")
return None
@run_sync
def get_user_language(self, user_id: int) -> str:
"""Получение языка пользователя"""
connection = self.get_connection()
if not connection:
return "ru"
try:
cursor = connection.cursor(dictionary=True)
cursor.execute(
"SELECT language_code FROM user_languages WHERE user_id = %s",
(user_id,)
)
result = cursor.fetchone()
cursor.close()
connection.close()
return result['language_code'] if result else "ru"
except Error as e:
print(f"Ошибка БД: {e}")
return "ru"
@run_sync
def set_user_language(self, user_id: int, language: str) -> bool:
"""Установка языка пользователя"""
connection = self.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.execute(
"""INSERT INTO user_languages (user_id, language_code)
VALUES (%s, %s)
ON DUPLICATE KEY UPDATE language_code = %s""",
(user_id, language, language)
)
connection.commit()
cursor.close()
connection.close()
return True
except Error as e:
print(f"Ошибка БД: {e}")
return False
@run_sync
def create_ticket(self, user_id: int, username: str, message: str) -> Optional[int]:
"""Создание тикета"""
connection = self.get_connection()
if not connection:
return None
try:
cursor = connection.cursor()
cursor.execute(
"""INSERT INTO tickets (user_id, username, message, status)
VALUES (%s, %s, %s, 'open')""",
(user_id, username, message)
)
connection.commit()
ticket_id = cursor.lastrowid
cursor.close()
connection.close()
return ticket_id
except Error as e:
print(f"Ошибка БД: {e}")
return None
@run_sync
def get_user_tickets(self, user_id: int) -> list:
"""Получение всех тикетов пользователя"""
connection = self.get_connection()
if not connection:
return []
try:
cursor = connection.cursor(dictionary=True)
cursor.execute(
"""SELECT id, message, status, admin_response, created_at
FROM tickets
WHERE user_id = %s
ORDER BY created_at DESC""",
(user_id,)
)
tickets = cursor.fetchall()
cursor.close()
connection.close()
return tickets
except Error as e:
print(f"Ошибка БД: {e}")
return []
@run_sync
def get_ticket(self, ticket_id: int) -> Optional[dict]:
"""Получение информации о тикете"""
connection = self.get_connection()
if not connection:
return None
try:
cursor = connection.cursor(dictionary=True)
cursor.execute(
"SELECT * FROM tickets WHERE id = %s",
(ticket_id,)
)
ticket = cursor.fetchone()
cursor.close()
connection.close()
return ticket
except Error as e:
print(f"Ошибка БД: {e}")
return None
@run_sync
def update_ticket_response(self, ticket_id: int, admin_response: str, status: str = 'answered') -> bool:
"""Обновление ответа администратора в тикете"""
connection = self.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.execute(
"""UPDATE tickets
SET admin_response = %s, status = %s
WHERE id = %s""",
(admin_response, status, ticket_id)
)
connection.commit()
cursor.close()
connection.close()
return True
except Error as e:
print(f"Ошибка БД: {e}")
return False
@run_sync
def close_ticket(self, ticket_id, closed_by):
"""Закрытие тикета"""
connection = self.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
cursor.execute(
"""UPDATE tickets
SET status = 'closed', closed_by = %s, closed_at = %s
WHERE id = %s""",
(closed_by, datetime.now(), ticket_id)
)
connection.commit()
cursor.close()
connection.close()
return True
except Error as e:
print(f"Ошибка БД: {e}")
return False
@run_sync
def append_ticket_message(self, ticket_id: int, user_message: str) -> bool:
"""Добавление сообщения к тикету"""
connection = self.get_connection()
if not connection:
return False
try:
cursor = connection.cursor()
# Получаем текущее сообщение
cursor.execute("SELECT message FROM tickets WHERE id = %s", (ticket_id,))
result = cursor.fetchone()
if result:
current_message = result[0]
new_message = f"{current_message}\n\n Дополнение:\n{user_message}"
cursor.execute(
"UPDATE tickets SET message = %s WHERE id = %s",
(new_message, ticket_id)
)
connection.commit()
cursor.close()
connection.close()
return True
return False
except Error as e:
print(f"Ошибка БД: {e}")
return False
@run_sync
def get_all_users_paginated(self, page, per_page=10):
"""
Получение всех пользователей с пагинацией
Args:
page: Номер страницы (начиная с 1)
per_page: Количество пользователей на странице
Returns:
dict: {'users': [...], 'total': int, 'total_pages': int, 'current_page': int}
"""
connection = self.get_connection()
if not connection:
return {'users': [], 'total': 0, 'total_pages': 0, 'current_page': page}
try:
cursor = connection.cursor(dictionary=True)
# Получаем общее количество пользователей
cursor.execute("SELECT COUNT(*) as count FROM subscriptions")
total = cursor.fetchone()['count']
# Вычисляем общее количество страниц
total_pages = (total + per_page - 1) // per_page if total > 0 else 0
# Ограничиваем page в допустимых пределах
page = max(1, min(page, total_pages if total_pages > 0 else 1))
# Получаем пользователей с пагинацией
offset = (page - 1) * per_page
cursor.execute(
"""SELECT id, user_id, username, is_active, subscription_start, subscription_end, created_at
FROM subscriptions
ORDER BY created_at DESC
LIMIT %s OFFSET %s""",
(per_page, offset)
)
users = cursor.fetchall()
cursor.close()
connection.close()
return {
'users': users,
'total': total,
'total_pages': total_pages,
'current_page': page
}
except Error as e:
print(f"Ошибка БД: {e}")
return {'users': [], 'total': 0, 'total_pages': 0, 'current_page': page}
@run_sync
def get_users_count(self):
"""
Получение количества пользователей по статусам
Returns:
dict: {'total': int, 'active': int, 'inactive': int}
"""
connection = self.get_connection()
if not connection:
return {'total': 0, 'active': 0, 'inactive': 0}
try:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT COUNT(*) as count FROM subscriptions")
total = cursor.fetchone()['count']
cursor.execute("SELECT COUNT(*) as count FROM subscriptions WHERE is_active = TRUE")
active = cursor.fetchone()['count']
cursor.close()
connection.close()
return {
'total': total,
'active': active,
'inactive': total - active
}
except Error as e:
print(f"Ошибка БД: {e}")
return {'total': 0, 'active': 0, 'inactive': 0}
# Глобальный экземпляр базы данных
db = Database()