core/auth/oauth.py

190 lines
7.1 KiB
Python
Raw Normal View History

2023-10-26 20:38:31 +00:00
from authlib.integrations.starlette_client import OAuth
2025-05-16 06:23:48 +00:00
from authlib.oauth2.rfc7636 import create_s256_code_challenge
from starlette.responses import RedirectResponse, JSONResponse
from secrets import token_urlsafe
import time
2023-10-30 21:00:55 +00:00
2023-10-26 21:07:35 +00:00
from auth.tokenstorage import TokenStorage
2025-05-16 06:23:48 +00:00
from auth.orm import Author
from services.db import local_session
2023-10-30 21:00:55 +00:00
from settings import FRONTEND_URL, OAUTH_CLIENTS
oauth = OAuth()
2025-05-16 06:23:48 +00:00
# Конфигурация провайдеров
PROVIDERS = {
"google": {
"name": "google",
"server_metadata_url": "https://accounts.google.com/.well-known/openid-configuration",
"client_kwargs": {"scope": "openid email profile", "prompt": "select_account"},
},
"github": {
"name": "github",
"access_token_url": "https://github.com/login/oauth/access_token",
"authorize_url": "https://github.com/login/oauth/authorize",
"api_base_url": "https://api.github.com/",
"client_kwargs": {"scope": "user:email"},
},
"facebook": {
"name": "facebook",
"access_token_url": "https://graph.facebook.com/v13.0/oauth/access_token",
"authorize_url": "https://www.facebook.com/v13.0/dialog/oauth",
"api_base_url": "https://graph.facebook.com/",
"client_kwargs": {"scope": "public_profile email"},
},
}
2025-05-16 06:23:48 +00:00
# Регистрация провайдеров
for provider, config in PROVIDERS.items():
if provider in OAUTH_CLIENTS:
oauth.register(
name=config["name"],
client_id=OAUTH_CLIENTS[provider.upper()]["id"],
client_secret=OAUTH_CLIENTS[provider.upper()]["key"],
**config,
)
async def get_user_profile(provider: str, client, token) -> dict:
"""Получает профиль пользователя от провайдера OAuth"""
if provider == "google":
userinfo = token.get("userinfo", {})
return {
"id": userinfo.get("sub"),
"email": userinfo.get("email"),
"name": userinfo.get("name"),
"picture": userinfo.get("picture", "").replace("=s96", "=s600"),
}
elif provider == "github":
profile = await client.get("user", token=token)
profile_data = profile.json()
emails = await client.get("user/emails", token=token)
emails_data = emails.json()
primary_email = next((email["email"] for email in emails_data if email["primary"]), None)
return {
"id": str(profile_data["id"]),
"email": primary_email or profile_data.get("email"),
"name": profile_data.get("name") or profile_data.get("login"),
"picture": profile_data.get("avatar_url"),
}
elif provider == "facebook":
profile = await client.get("me?fields=id,name,email,picture.width(600)", token=token)
profile_data = profile.json()
return {
"id": profile_data["id"],
"email": profile_data.get("email"),
"name": profile_data.get("name"),
"picture": profile_data.get("picture", {}).get("data", {}).get("url"),
}
return {}
async def oauth_login(request):
2025-05-16 06:23:48 +00:00
"""Начинает процесс OAuth авторизации"""
provider = request.path_params["provider"]
2025-05-16 06:23:48 +00:00
if provider not in PROVIDERS:
return JSONResponse({"error": "Invalid provider"}, status_code=400)
client = oauth.create_client(provider)
2025-05-16 06:23:48 +00:00
if not client:
return JSONResponse({"error": "Provider not configured"}, status_code=400)
2025-05-16 06:23:48 +00:00
# Генерируем PKCE challenge
code_verifier = token_urlsafe(32)
code_challenge = create_s256_code_challenge(code_verifier)
2025-05-16 06:23:48 +00:00
# Сохраняем code_verifier в сессии
request.session["code_verifier"] = code_verifier
request.session["provider"] = provider
request.session["state"] = token_urlsafe(16)
redirect_uri = f"{FRONTEND_URL}/oauth/callback"
try:
return await client.authorize_redirect(
request,
redirect_uri,
code_challenge=code_challenge,
code_challenge_method="S256",
state=request.session["state"],
)
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=500)
async def oauth_callback(request):
"""Обрабатывает callback от OAuth провайдера"""
try:
provider = request.session.get("provider")
if not provider:
return JSONResponse({"error": "No active OAuth session"}, status_code=400)
# Проверяем state
state = request.query_params.get("state")
if state != request.session.get("state"):
return JSONResponse({"error": "Invalid state"}, status_code=400)
client = oauth.create_client(provider)
if not client:
return JSONResponse({"error": "Provider not configured"}, status_code=400)
# Получаем токен с PKCE verifier
token = await client.authorize_access_token(
request, code_verifier=request.session.get("code_verifier")
)
# Получаем профиль пользователя
profile = await get_user_profile(provider, client, token)
if not profile.get("email"):
return JSONResponse({"error": "Email not provided"}, status_code=400)
# Создаем или обновляем пользователя
with local_session() as session:
author = session.query(Author).filter(Author.email == profile["email"]).first()
if not author:
author = Author(
email=profile["email"],
name=profile["name"],
username=profile["name"],
pic=profile.get("picture"),
oauth=f"{provider}:{profile['id']}",
email_verified=True,
created_at=int(time.time()),
updated_at=int(time.time()),
last_seen=int(time.time()),
)
session.add(author)
else:
author.name = profile["name"]
author.pic = profile.get("picture") or author.pic
author.oauth = f"{provider}:{profile['id']}"
author.email_verified = True
author.updated_at = int(time.time())
author.last_seen = int(time.time())
session.commit()
# Создаем сессию
session_token = await TokenStorage.create_session(author)
# Очищаем сессию OAuth
request.session.pop("code_verifier", None)
request.session.pop("provider", None)
request.session.pop("state", None)
# Возвращаем токен через cookie
response = RedirectResponse(url=f"{FRONTEND_URL}/auth/success")
response.set_cookie(
"session_token",
session_token,
httponly=True,
secure=True,
samesite="lax",
max_age=30 * 24 * 60 * 60, # 30 days
)
return response
except Exception as e:
return RedirectResponse(url=f"{FRONTEND_URL}/auth/error?message={str(e)}")