core/services/auth/users.py

73 lines
1.9 KiB
Python
Raw Normal View History

import asyncio
2022-11-24 15:41:57 +00:00
from sqlalchemy.orm import selectinload, exc
from orm.user import User
2022-11-22 03:11:26 +00:00
from base.orm import local_session
class UserStorage:
    """Class-level, in-memory cache of ``User`` rows keyed by user id.

    All state lives on the class itself (``users`` and ``lock``); every
    method is a ``@staticmethod`` that operates on that shared state, so
    the class is used without being instantiated.
    """

    # Maps ``user.id`` -> user object; replaced wholesale by ``init``.
    users = {}
    # Held by the async cache accessors below while they read or modify
    # ``users``.
    # NOTE(review): the lock is created at class-definition time, i.e.
    # before any event loop is necessarily running -- confirm this is fine
    # for every Python version the service supports.
    lock = asyncio.Lock()

    @staticmethod
    def init(session):
        """Load every user from the database and rebuild the cache.

        Roles and ratings are eagerly loaded with ``selectinload``.
        The number of cached users is printed afterwards.

        Args:
            session: ORM session used to run the query.
        """
        loaded = (
            session.query(User)
            .options(selectinload(User.roles), selectinload(User.ratings))
            .all()
        )
        UserStorage.users = {user.id: user for user in loaded}
        print("[auth.users] %d precached" % len(UserStorage.users))

    @staticmethod
    async def get_user(id):
        """Fetch one user by primary key straight from the database.

        This does not consult the in-memory cache; it opens a fresh
        ``local_session`` and queries with roles and ratings eagerly loaded.

        Args:
            id: Value compared against ``User.id``.

        Returns:
            The matching user, or ``None`` when no row matches.
        """
        with local_session() as session:
            query = (
                session.query(User)
                .options(
                    selectinload(User.roles),
                    selectinload(User.ratings),
                )
                .filter(User.id == id)
            )
            try:
                return query.one()
            except exc.NoResultFound:
                return None

    @staticmethod
    async def get_all_users():
        """Return all cached users as a new list ordered by ``createdAt``."""
        from operator import attrgetter

        async with UserStorage.lock:
            return sorted(
                UserStorage.users.values(), key=attrgetter("createdAt")
            )

    @staticmethod
    async def get_top_users():
        """Return all cached users as a new list ordered by ``rating``.

        The order is ascending (lowest ``rating`` first).
        NOTE(review): "top" may imply descending order -- confirm with
        callers before changing it.
        """
        from operator import attrgetter

        async with UserStorage.lock:
            return sorted(
                UserStorage.users.values(), key=attrgetter("rating")
            )

    @staticmethod
    async def get_user_by_slug(slug):
        """Return the first cached user whose ``slug`` equals *slug*.

        Returns:
            The matching user, or ``None`` if no cached user has that slug.
        """
        async with UserStorage.lock:
            return next(
                (
                    user
                    for user in UserStorage.users.values()
                    if user.slug == slug
                ),
                None,
            )

    @staticmethod
    async def add_user(user):
        """Insert *user* into the cache under ``user.id``.

        An existing entry with the same id is overwritten.
        """
        async with UserStorage.lock:
            UserStorage.users[user.id] = user

    @staticmethod
    async def del_user(id):
        """Remove the cached user stored under *id*.

        Raises:
            KeyError: If *id* is not present in the cache.
        """
        async with UserStorage.lock:
            del UserStorage.users[id]