from urllib.parse import quote_plus from datetime import datetime from graphql.type import GraphQLResolveInfo from transliterate import translit from auth.tokenstorage import TokenStorage from auth.authenticate import login_required from auth.email import send_auth_email from auth.identity import Identity, Password from base.exceptions import ( InvalidPassword, InvalidToken, ObjectNotExist, OperationNotAllowed, ) from base.orm import local_session from base.resolvers import mutation, query from orm import User, Role from resolvers.profile import get_user_subscriptions from settings import SESSION_TOKEN_HEADER @mutation.field("refreshSession") @login_required async def get_current_user(_, info): user = info.context["request"].user user.lastSeen = datetime.now() with local_session() as session: session.add(user) session.commit() token = await TokenStorage.create_session(user) return { "token": token, "user": user, "news": await get_user_subscriptions(user.slug), } @mutation.field("confirmEmail") async def confirm_email(_, _info, confirm_token): """confirm owning email address""" try: user_id = await TokenStorage.get(confirm_token) with local_session() as session: user = session.query(User).where(User.id == user_id).first() session_token = TokenStorage.create_session(user) user.emailConfirmed = True user.lastSeen = datetime.now() session.add(user) session.commit() return { "token": session_token, "user": user, "news": await get_user_subscriptions(user.slug) } except InvalidToken as e: raise InvalidToken(e.message) except Exception as e: print(e) # FIXME: debug only return {"error": "email is not confirmed"} def create_user(user_dict): user = User(**user_dict) user.roles.append(Role.default_role) with local_session() as session: session.add(user) session.commit() return user def generate_unique_slug(username): slug = translit(username, "ru", reversed=True).replace(".", "-").lower() with local_session() as session: c = 1 user = session.query(User).where(User.slug == slug).first() while user: user = session.query(User).where(User.slug == slug).first() slug = slug + '-' + str(c) c += 1 if not user: unique_slug = slug return quote_plus(unique_slug) @mutation.field("registerUser") async def register(_, _info, email: str, password: str = "", username: str = ""): """creates new user account""" with local_session() as session: user = session.query(User).filter(User.email == email).first() if user: raise OperationNotAllowed("User already exist") else: username = username or email.split("@")[0] user_dict = { "email": email, "username": username, "slug": generate_unique_slug(username) } if password: user_dict["password"] = Password.encode(password) user = create_user(user_dict) if not password: user = await auth_send_link(_, _info, email) return {"user": user} @mutation.field("sendLink") async def auth_send_link(_, _info, email): """send link with confirm code to email""" with local_session() as session: user = session.query(User).filter(User.email == email).first() if not user: raise ObjectNotExist("User not found") else: token = await TokenStorage.create_onetime(user) await send_auth_email(user, token) return user @query.field("signIn") async def login(_, _info, email: str, password: str = ""): with local_session() as session: orm_user = session.query(User).filter(User.email == email).first() if orm_user is None: print(f"[auth] {email}: email not found") # return {"error": "email not found"} raise ObjectNotExist("User not found") # contains webserver status if not password: print(f"[auth] send confirm link to {email}") token = await TokenStorage.create_onetime(orm_user) await send_auth_email(orm_user, token) # FIXME: not an error, warning return {"error": "no password, email link was sent"} else: # sign in using password if not orm_user.emailConfirmed: # not an error, warns users return {"error": "please, confirm email"} else: try: user = Identity.password(orm_user, password) session_token = await TokenStorage.create_session(user) print(f"[auth] user {email} authorized") return { "token": session_token, "user": user, "news": await get_user_subscriptions(user.slug), } except InvalidPassword: print(f"[auth] {email}: invalid password") raise InvalidPassword("invalid passoword") # contains webserver status # return {"error": "invalid password"} @query.field("signOut") @login_required async def sign_out(_, info: GraphQLResolveInfo): token = info.context["request"].headers[SESSION_TOKEN_HEADER] status = await TokenStorage.revoke(token) return status @query.field("isEmailUsed") async def is_email_used(_, _info, email): with local_session() as session: user = session.query(User).filter(User.email == email).first() return user is not None