Files
RemnaWaveBOT/core/remnawave.py
2026-04-05 12:18:33 +03:00

440 lines
21 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import aiohttp
from config import config
class RemnawaveAPI:
"""Клиент для работы с Remnawave API"""
def __init__(self):
self.api_url = config.remwave_api_url
self.api_key = config.remwave_api_key
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
async def create_subscription(self, user_id: int, days: int, tariff: str) -> dict:
"""
Создание пользователя (подписки) в Remnawave
Args:
user_id: Telegram user ID
days: Количество дней подписки
tariff: Название тарифа
Returns:
dict: Результат создания подписки
"""
# Эндпоинт для создания пользователя
url = f"{self.api_url}/api/users"
# Генерируем случайный username и пароль
import random
import string
random_username = f"subs{user_id}"
random_password = ''.join(random.choices(string.ascii_letters + string.digits, k=12))
# Вычисляем дату истечения
from datetime import datetime, timedelta
expire_at = (datetime.now() + timedelta(days=days)).isoformat()
# Данные для создания пользователя (формат Remnawave)
data = {
"username": random_username,
"password": random_password,
"telegramId": user_id, # Число, не строка
"expireAt": expire_at # ISO формат даты
}
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, json=data, headers=self.headers) as response:
result = await response.json()
if response.status in [200, 201]:
print(f"✅ Подписка создана для user {user_id}")
print(f"📋 Ответ API: {result}")
# Получаем ссылку на подключение
# API возвращает в формате: {'response': {'subscriptionUrl': '...'}}
subscription_url = None
if isinstance(result, dict):
# Проверяем response.subscriptionUrl (основной вариант)
response_data = result.get("response", {})
if isinstance(response_data, dict):
subscription_url = response_data.get("subscriptionUrl")
# Если не нашли, проверяем другие варианты
if not subscription_url:
subscription_url = (
result.get("subscriptionUrl") or
result.get("subscription") or
result.get("link") or
result.get("url") or
result.get("data", {}).get("subscriptionUrl") or
result.get("data", {}).get("subscription") or
result.get("data", {}).get("link")
)
print(f"🔗 Subscription URL: {subscription_url}")
return {
"success": True,
"data": result,
"subscription_url": subscription_url,
"username": random_username
}
else:
print(f"❌ Ошибка создания подписки: {response.status} - {result}")
return {"success": False, "error": result, "status": response.status}
except aiohttp.ClientError as e:
print(f"❌ Ошибка подключения к Remnawave API: {e}")
return {"success": False, "error": str(e)}
async def get_user_subscription(self, user_id: int) -> dict:
"""
Получение информации о подписке пользователя
Args:
user_id: Telegram user ID
Returns:
dict: Информация о подписке
"""
url = f"{self.api_url}/api/users/telegram/{user_id}"
async with aiohttp.ClientSession() as session:
try:
async with session.get(url, headers=self.headers) as response:
result = await response.json()
if response.status == 200:
return {"success": True, "data": result}
else:
return {"success": False, "error": result, "status": response.status}
except aiohttp.ClientError as e:
return {"success": False, "error": str(e)}
async def extend_subscription(self, user_id: int, days: int) -> dict:
"""
Продление подписки
Args:
user_id: Telegram user ID
days: Количество дней для продления
Returns:
dict: Результат продления
"""
url = f"{self.api_url}/api/users/telegram/{user_id}/extend"
data = {
"durationDays": days
}
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, json=data, headers=self.headers) as response:
result = await response.json()
if response.status == 200:
print(f"✅ Подписка продлена для user {user_id} на {days} дней")
return {"success": True, "data": result}
else:
print(f"❌ Ошибка продления подписки: {response.status} - {result}")
return {"success": False, "error": result, "status": response.status}
except aiohttp.ClientError as e:
print(f"❌ Ошибка подключения к Remnawave API: {e}")
return {"success": False, "error": str(e)}
async def get_all_users(self, page: int = 0, size: int = 500) -> dict:
"""
Получение всех пользователей из Remnawave с пагинацией
Args:
page: Номер страницы (начиная с 0)
size: Размер страницы (макс 500)
Returns:
dict: Список пользователей
"""
url = f"{self.api_url}/api/users"
params = {"start": page, "size": min(size, 500)}
async with aiohttp.ClientSession() as session:
try:
async with session.get(url, headers=self.headers, params=params) as response:
result = await response.json()
print(f"📋 Remnawave API ответ: status={response.status}, keys={list(result.keys()) if isinstance(result, dict) else type(result)}")
if response.status == 200:
# Remnawave возвращает {'response': {'content': [...], 'total': N}}
users_list = []
if isinstance(result, dict):
# Пробуем разные варианты извлечения списка
if 'response' in result and isinstance(result['response'], dict):
response_data = result['response']
if 'content' in response_data and isinstance(response_data['content'], list):
users_list = response_data['content']
print(f"✅ Получено {len(users_list)} пользователей (response.content)")
elif 'users' in response_data and isinstance(response_data['users'], list):
users_list = response_data['users']
print(f"✅ Получено {len(users_list)} пользователей (response.users)")
elif 'content' in result and isinstance(result['content'], list):
users_list = result['content']
print(f"✅ Получено {len(users_list)} пользователей (content)")
elif 'data' in result and isinstance(result['data'], list):
users_list = result['data']
print(f"✅ Получено {len(users_list)} пользователей (data)")
elif 'users' in result and isinstance(result['users'], list):
users_list = result['users']
print(f"✅ Получено {len(users_list)} пользователей (users)")
if users_list:
return {"success": True, "data": {"content": users_list, "total": len(users_list)}}
print(f"⚠️ Не найдено пользователей в ответе")
return {"success": True, "data": {"content": [], "total": 0}}
else:
print(f"❌ Ошибка получения пользователей: {response.status} - {result}")
return {"success": False, "error": result, "status": response.status}
except aiohttp.ClientError as e:
print(f"❌ Ошибка подключения к Remnawave API: {e}")
return {"success": False, "error": str(e)}
async def sync_user(self, user_id: int, username: str, days: int = 30) -> dict:
"""
Синхронизация пользователя (создание если нет в Remnawave)
Args:
user_id: Telegram user ID
username: Имя пользователя
days: Количество дней
Returns:
dict: Результат синхронизации
"""
# Сначала пробуем найти пользователя по Telegram ID через API
print(f"🔍 Поиск пользователя по Telegram ID: {user_id}")
check_result = await self.get_user_subscription(user_id)
if check_result.get("success") and check_result.get("data"):
# Пользователь уже есть в Remnawave с этим telegramId
print(f"✅ Пользователь {user_id} уже найден в Remnawave по Telegram ID")
return {"success": True, "exists": True, "data": check_result.get("data")}
# Если не нашли по telegramId, пробуем найти по username (subs{user_id})
remnawave_username = f"subs{user_id}"
print(f"🔍 Не найдено по Telegram ID, поиск по username: {remnawave_username}")
# Получаем всех пользователей и ищем по username
all_users_result = await self.get_all_users(page=0, size=500)
if all_users_result.get("success"):
users_data = all_users_result.get("data", {})
users_list = users_data.get("content", [])
print(f"📋 Получено {len(users_list)} пользователей из Remnawave")
# Логируем первые несколько для отладки
if users_list and len(users_list) > 0:
first_users = [(u.get("username"), u.get("telegramId")) for u in users_list[:5] if isinstance(u, dict)]
print(f"📋 Первые пользователи: {first_users}")
# Ищем по username
for user in users_list:
if isinstance(user, dict) and user.get("username") == remnawave_username:
user_uuid = user.get("uuid")
print(f"✅ Найден пользователь {remnawave_username} с UUID {user_uuid}")
if user_uuid:
update_result = await self._update_user_telegram_id(user_uuid, user_id)
if update_result.get("success"):
print(f"✅ Обновлён telegramId для user {user_id}")
return {"success": True, "exists": True, "updated": True, "data": update_result.get("data")}
else:
return update_result
# Создаём нового пользователя
print(f" Создание нового пользователя {remnawave_username}")
create_result = await self.create_subscription(user_id, days, remnawave_username)
# Если ошибка "username already exists" — пользователь уже есть, это нормально
if not create_result.get("success"):
error_data = create_result.get("error", {})
if isinstance(error_data, dict) and error_data.get("message") == "User username already exists":
print(f" Пользователь {remnawave_username} уже существует в Remnawave (не удалось обновить telegramId)")
# Возвращаем успех с флагом exists
return {"success": True, "exists": True, "updated": False, "data": None}
else:
print(f"❌ Ошибка создания: {error_data}")
return create_result
async def _find_and_update_by_username(self, username: str, telegram_id: int) -> dict:
"""
Поиск пользователя по username и обновление telegramId
Args:
username: Имя пользователя в Remnawave
telegram_id: Telegram user ID
Returns:
dict: Результат
"""
all_users_result = await self.get_all_users(page=0, size=500)
if not all_users_result.get("success"):
return all_users_result
users_data = all_users_result.get("data", {})
users_list = users_data.get("content", []) if isinstance(users_data, dict) else (users_data if isinstance(users_data, list) else [])
for user in users_list:
if user.get("username") == username:
user_uuid = user.get("uuid")
if user_uuid:
return await self._update_user_telegram_id(user_uuid, telegram_id)
return {"success": False, "error": "User not found"}
async def _update_user_telegram_id(self, user_uuid: str, telegram_id: int) -> dict:
"""
Обновление telegramId у существующего пользователя
Args:
user_uuid: UUID пользователя в Remnawave
telegram_id: Telegram user ID
Returns:
dict: Результат обновления
"""
# Пробуем несколько вариантов обновления
urls_to_try = [
(f"{self.api_url}/api/users/{user_uuid}/telegram", "PATCH"),
(f"{self.api_url}/api/users/{user_uuid}", "PUT"),
(f"{self.api_url}/api/users/{user_uuid}/telegram-id", "POST"),
]
for url, method in urls_to_try:
async with aiohttp.ClientSession() as session:
try:
data = {"telegramId": telegram_id}
if method == "PATCH":
async with session.patch(url, json=data, headers=self.headers) as response:
result = await response.json()
if response.status in [200, 204]:
print(f"✅ Обновлён telegramId для user {telegram_id} (PATCH)")
return {"success": True, "data": result}
elif method == "PUT":
async with session.put(url, json=data, headers=self.headers) as response:
result = await response.json()
if response.status in [200, 204]:
print(f"✅ Обновлён telegramId для user {telegram_id} (PUT)")
return {"success": True, "data": result}
elif method == "POST":
async with session.post(url, json=data, headers=self.headers) as response:
result = await response.json()
if response.status in [200, 201]:
print(f"✅ Обновлён telegramId для user {telegram_id} (POST)")
return {"success": True, "data": result}
except aiohttp.ClientError as e:
continue
print(f"Не удалось обновить telegramId - API не поддерживает обновление")
return {"success": False, "error": "API does not support user update"}
async def sync_all_users(self) -> dict:
"""
Синхронизация всех пользователей: Remnawave → Локальная БД
Returns:
dict: Результаты синхронизации
"""
from core import db as local_db
# Получаем всех пользователей из Remnawave
print("🔄 Получение пользователей из Remnawave...")
remnawave_result = await self.get_all_users(page=0, size=500)
if not remnawave_result.get("success"):
return {"success": False, "error": "Не удалось получить пользователей из Remnawave"}
users_data = remnawave_result.get("data", {})
users_list = users_data.get("content", [])
print(f"📋 Получено {len(users_list)} пользователей из Remnawave")
total = len(users_list)
imported = 0
updated = 0
errors = 0
error_details = []
connection = local_db.get_connection()
cursor = connection.cursor()
for user in users_list:
if not isinstance(user, dict):
continue
user_id = user.get("telegramId")
username = user.get("username", "")
is_active = user.get("status") == "ACTIVE"
if not user_id:
# Нет Telegram ID - пропускаем
continue
try:
# Проверяем, есть ли пользователь в БД
cursor.execute(
"SELECT id FROM subscriptions WHERE user_id = %s",
(user_id,)
)
existing = cursor.fetchone()
if existing:
# Обновляем существующего
cursor.execute(
"""UPDATE subscriptions
SET username = %s, is_active = %s
WHERE user_id = %s""",
(username, is_active, user_id)
)
updated += 1
print(f"🔄 Обновлён пользователь {user_id} ({username})")
else:
# Создаём нового
cursor.execute(
"""INSERT INTO subscriptions (user_id, username, is_active)
VALUES (%s, %s, %s)""",
(user_id, username, is_active)
)
imported += 1
print(f" Импортирован пользователь {user_id} ({username})")
connection.commit()
except Exception as e:
errors += 1
error_details.append(f"User {user_id}: {str(e)}")
print(f"❌ Ошибка импорта пользователя {user_id}: {e}")
cursor.close()
connection.close()
return {
"success": True,
"total": total,
"imported": imported,
"updated": updated,
"errors": errors,
"error_details": error_details[:10]
}
# Глобальный экземпляр API
remnawave = RemnawaveAPI()