restructured,inbox-removed

This commit is contained in:
Tony Rewin 2023-10-05 21:46:18 +03:00
parent 6dfec6714a
commit deac939ed8
49 changed files with 886 additions and 1549 deletions

View File

@ -19,7 +19,8 @@ config.set_section_option(config.config_ini_section, "DB_URL", DB_URL)
if config.config_file_name is not None: if config.config_file_name is not None:
fileConfig(config.config_file_name) fileConfig(config.config_file_name)
from base.orm import Base from services.db import Base
target_metadata = [Base.metadata] target_metadata = [Base.metadata]
# other values from the config, defined by the needs of env.py, # other values from the config, defined by the needs of env.py,
@ -66,9 +67,7 @@ def run_migrations_online() -> None:
) )
with connectable.connect() as connection: with connectable.connect() as connection:
context.configure( context.configure(connection=connection, target_metadata=target_metadata)
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction(): with context.begin_transaction():
context.run_migrations() context.run_migrations()

View File

@ -7,57 +7,59 @@ from starlette.authentication import AuthenticationBackend
from starlette.requests import HTTPConnection from starlette.requests import HTTPConnection
from auth.credentials import AuthCredentials, AuthUser from auth.credentials import AuthCredentials, AuthUser
from base.orm import local_session from services.db import local_session
from orm.user import User, Role from orm.user import User, Role
from settings import SESSION_TOKEN_HEADER from settings import SESSION_TOKEN_HEADER
from auth.tokenstorage import SessionToken from auth.tokenstorage import SessionToken
from base.exceptions import OperationNotAllowed from services.exceptions import OperationNotAllowed
class JWTAuthenticate(AuthenticationBackend): class JWTAuthenticate(AuthenticationBackend):
async def authenticate( async def authenticate(
self, request: HTTPConnection self, request: HTTPConnection
) -> Optional[Tuple[AuthCredentials, AuthUser]]: ) -> Optional[Tuple[AuthCredentials, AuthUser]]:
if SESSION_TOKEN_HEADER not in request.headers: if SESSION_TOKEN_HEADER not in request.headers:
return AuthCredentials(scopes={}), AuthUser(user_id=None, username='') return AuthCredentials(scopes={}), AuthUser(user_id=None, username="")
token = request.headers.get(SESSION_TOKEN_HEADER) token = request.headers.get(SESSION_TOKEN_HEADER)
if not token: if not token:
print("[auth.authenticate] no token in header %s" % SESSION_TOKEN_HEADER) print("[auth.authenticate] no token in header %s" % SESSION_TOKEN_HEADER)
return AuthCredentials(scopes={}, error_message=str("no token")), AuthUser( return AuthCredentials(scopes={}, error_message=str("no token")), AuthUser(
user_id=None, username='' user_id=None, username=""
) )
if len(token.split('.')) > 1: if len(token.split(".")) > 1:
payload = await SessionToken.verify(token) payload = await SessionToken.verify(token)
with local_session() as session: with local_session() as session:
try: try:
user = ( user = (
session.query(User).options( session.query(User)
joinedload(User.roles).options(joinedload(Role.permissions)), .options(
joinedload(User.ratings) joinedload(User.roles).options(
).filter( joinedload(Role.permissions)
User.id == payload.user_id ),
).one() joinedload(User.ratings),
)
.filter(User.id == payload.user_id)
.one()
) )
scopes = {} # TODO: integrate await user.get_permission() scopes = {} # TODO: integrate await user.get_permission()
return ( return (
AuthCredentials( AuthCredentials(
user_id=payload.user_id, user_id=payload.user_id, scopes=scopes, logged_in=True
scopes=scopes,
logged_in=True
), ),
AuthUser(user_id=user.id, username=''), AuthUser(user_id=user.id, username=""),
) )
except exc.NoResultFound: except exc.NoResultFound:
pass pass
return AuthCredentials(scopes={}, error_message=str('Invalid token')), AuthUser(user_id=None, username='') return AuthCredentials(scopes={}, error_message=str("Invalid token")), AuthUser(
user_id=None, username=""
)
def login_required(func): def login_required(func):
@ -68,9 +70,7 @@ def login_required(func):
# print(auth) # print(auth)
if not auth or not auth.logged_in: if not auth or not auth.logged_in:
# raise Unauthorized(auth.error_message or "Please login") # raise Unauthorized(auth.error_message or "Please login")
return { return {"error": "Please login first"}
"error": "Please login first"
}
return await func(parent, info, *args, **kwargs) return await func(parent, info, *args, **kwargs)
return wrap return wrap
@ -79,7 +79,9 @@ def login_required(func):
def permission_required(resource, operation, func): def permission_required(resource, operation, func):
@wraps(func) @wraps(func)
async def wrap(parent, info: GraphQLResolveInfo, *args, **kwargs): async def wrap(parent, info: GraphQLResolveInfo, *args, **kwargs):
print('[auth.authenticate] permission_required for %r with info %r' % (func, info)) # debug only print(
"[auth.authenticate] permission_required for %r with info %r" % (func, info)
) # debug only
auth: AuthCredentials = info.context["request"].auth auth: AuthCredentials = info.context["request"].auth
if not auth.logged_in: if not auth.logged_in:
raise OperationNotAllowed(auth.error_message or "Please login") raise OperationNotAllowed(auth.error_message or "Please login")

View File

@ -7,8 +7,9 @@ from sqlalchemy import or_
from auth.jwtcodec import JWTCodec from auth.jwtcodec import JWTCodec
from auth.tokenstorage import TokenStorage from auth.tokenstorage import TokenStorage
# from base.exceptions import InvalidPassword, InvalidToken # from base.exceptions import InvalidPassword, InvalidToken
from base.orm import local_session from services.db import local_session
from orm import User from orm import User
from validations.auth import AuthInput from validations.auth import AuthInput
@ -57,14 +58,10 @@ class Identity:
user = User(**orm_user.dict()) user = User(**orm_user.dict())
if not user.password: if not user.password:
# raise InvalidPassword("User password is empty") # raise InvalidPassword("User password is empty")
return { return {"error": "User password is empty"}
"error": "User password is empty"
}
if not Password.verify(password, user.password): if not Password.verify(password, user.password):
# raise InvalidPassword("Wrong user password") # raise InvalidPassword("Wrong user password")
return { return {"error": "Wrong user password"}
"error": "Wrong user password"
}
return user return user
@staticmethod @staticmethod
@ -87,30 +84,24 @@ class Identity:
@staticmethod @staticmethod
async def onetime(token: str) -> User: async def onetime(token: str) -> User:
try: try:
print('[auth.identity] using one time token') print("[auth.identity] using one time token")
payload = JWTCodec.decode(token) payload = JWTCodec.decode(token)
if not await TokenStorage.exist(f"{payload.user_id}-{payload.username}-{token}"): if not await TokenStorage.exist(
f"{payload.user_id}-{payload.username}-{token}"
):
# raise InvalidToken("Login token has expired, please login again") # raise InvalidToken("Login token has expired, please login again")
return { return {"error": "Token has expired"}
"error": "Token has expired"
}
except ExpiredSignatureError: except ExpiredSignatureError:
# raise InvalidToken("Login token has expired, please try again") # raise InvalidToken("Login token has expired, please try again")
return { return {"error": "Token has expired"}
"error": "Token has expired"
}
except DecodeError: except DecodeError:
# raise InvalidToken("token format error") from e # raise InvalidToken("token format error") from e
return { return {"error": "Token format error"}
"error": "Token format error"
}
with local_session() as session: with local_session() as session:
user = session.query(User).filter_by(id=payload.user_id).first() user = session.query(User).filter_by(id=payload.user_id).first()
if not user: if not user:
# raise Exception("user not exist") # raise Exception("user not exist")
return { return {"error": "User does not exist"}
"error": "User does not exist"
}
if not user.emailConfirmed: if not user.emailConfirmed:
user.emailConfirmed = True user.emailConfirmed = True
session.commit() session.commit()

View File

@ -1,6 +1,6 @@
from datetime import datetime, timezone from datetime import datetime, timezone
import jwt import jwt
from base.exceptions import ExpiredToken, InvalidToken from services.exceptions import ExpiredToken, InvalidToken
from validations.auth import TokenPayload, AuthInput from validations.auth import TokenPayload, AuthInput
from settings import JWT_ALGORITHM, JWT_SECRET_KEY from settings import JWT_ALGORITHM, JWT_SECRET_KEY
@ -13,12 +13,12 @@ class JWTCodec:
"username": user.email or user.phone, "username": user.email or user.phone,
"exp": exp, "exp": exp,
"iat": datetime.now(tz=timezone.utc), "iat": datetime.now(tz=timezone.utc),
"iss": "discours" "iss": "discours",
} }
try: try:
return jwt.encode(payload, JWT_SECRET_KEY, JWT_ALGORITHM) return jwt.encode(payload, JWT_SECRET_KEY, JWT_ALGORITHM)
except Exception as e: except Exception as e:
print('[auth.jwtcodec] JWT encode error %r' % e) print("[auth.jwtcodec] JWT encode error %r" % e)
@staticmethod @staticmethod
def decode(token: str, verify_exp: bool = True) -> TokenPayload: def decode(token: str, verify_exp: bool = True) -> TokenPayload:
@ -33,18 +33,18 @@ class JWTCodec:
# "verify_signature": False # "verify_signature": False
}, },
algorithms=[JWT_ALGORITHM], algorithms=[JWT_ALGORITHM],
issuer="discours" issuer="discours",
) )
r = TokenPayload(**payload) r = TokenPayload(**payload)
print('[auth.jwtcodec] debug token %r' % r) print("[auth.jwtcodec] debug token %r" % r)
return r return r
except jwt.InvalidIssuedAtError: except jwt.InvalidIssuedAtError:
print('[auth.jwtcodec] invalid issued at: %r' % payload) print("[auth.jwtcodec] invalid issued at: %r" % payload)
raise ExpiredToken('check token issued time') raise ExpiredToken("check token issued time")
except jwt.ExpiredSignatureError: except jwt.ExpiredSignatureError:
print('[auth.jwtcodec] expired signature %r' % payload) print("[auth.jwtcodec] expired signature %r" % payload)
raise ExpiredToken('check token lifetime') raise ExpiredToken("check token lifetime")
except jwt.InvalidTokenError: except jwt.InvalidTokenError:
raise InvalidToken('token is not valid') raise InvalidToken("token is not valid")
except jwt.InvalidSignatureError: except jwt.InvalidSignatureError:
raise InvalidToken('token is not valid') raise InvalidToken("token is not valid")

View File

@ -2,14 +2,16 @@ from datetime import datetime, timedelta, timezone
from auth.jwtcodec import JWTCodec from auth.jwtcodec import JWTCodec
from validations.auth import AuthInput from validations.auth import AuthInput
from base.redis import redis from services.redis import redis
from settings import SESSION_TOKEN_LIFE_SPAN, ONETIME_TOKEN_LIFE_SPAN from settings import SESSION_TOKEN_LIFE_SPAN, ONETIME_TOKEN_LIFE_SPAN
async def save(token_key, life_span, auto_delete=True): async def save(token_key, life_span, auto_delete=True):
await redis.execute("SET", token_key, "True") await redis.execute("SET", token_key, "True")
if auto_delete: if auto_delete:
expire_at = (datetime.now(tz=timezone.utc) + timedelta(seconds=life_span)).timestamp() expire_at = (
datetime.now(tz=timezone.utc) + timedelta(seconds=life_span)
).timestamp()
await redis.execute("EXPIREAT", token_key, int(expire_at)) await redis.execute("EXPIREAT", token_key, int(expire_at))
@ -35,7 +37,7 @@ class SessionToken:
class TokenStorage: class TokenStorage:
@staticmethod @staticmethod
async def get(token_key): async def get(token_key):
print('[tokenstorage.get] ' + token_key) print("[tokenstorage.get] " + token_key)
# 2041-user@domain.zn-eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoyMDQxLCJ1c2VybmFtZSI6ImFudG9uLnJld2luK3Rlc3QtbG9hZGNoYXRAZ21haWwuY29tIiwiZXhwIjoxNjcxNzgwNjE2LCJpYXQiOjE2NjkxODg2MTYsImlzcyI6ImRpc2NvdXJzIn0.Nml4oV6iMjMmc6xwM7lTKEZJKBXvJFEIZ-Up1C1rITQ # 2041-user@domain.zn-eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoyMDQxLCJ1c2VybmFtZSI6ImFudG9uLnJld2luK3Rlc3QtbG9hZGNoYXRAZ21haWwuY29tIiwiZXhwIjoxNjcxNzgwNjE2LCJpYXQiOjE2NjkxODg2MTYsImlzcyI6ImRpc2NvdXJzIn0.Nml4oV6iMjMmc6xwM7lTKEZJKBXvJFEIZ-Up1C1rITQ
return await redis.execute("GET", token_key) return await redis.execute("GET", token_key)

View File

@ -1,38 +0,0 @@
from graphql.error import GraphQLError
# TODO: remove traceback from logs for defined exceptions
class BaseHttpException(GraphQLError):
code = 500
message = "500 Server error"
class ExpiredToken(BaseHttpException):
code = 401
message = "401 Expired Token"
class InvalidToken(BaseHttpException):
code = 401
message = "401 Invalid Token"
class Unauthorized(BaseHttpException):
code = 401
message = "401 Unauthorized"
class ObjectNotExist(BaseHttpException):
code = 404
message = "404 Object Does Not Exist"
class OperationNotAllowed(BaseHttpException):
code = 403
message = "403 Operation Is Not Allowed"
class InvalidPassword(BaseHttpException):
code = 403
message = "403 Invalid Password"

View File

@ -1,44 +0,0 @@
from aioredis import from_url
from asyncio import sleep
from settings import REDIS_URL
class RedisCache:
def __init__(self, uri=REDIS_URL):
self._uri: str = uri
self._instance = None
async def connect(self):
if self._instance is not None:
return
self._instance = await from_url(self._uri, encoding="utf-8")
# print(self._instance)
async def disconnect(self):
if self._instance is None:
return
await self._instance.close()
# await self._instance.wait_closed() # deprecated
self._instance = None
async def execute(self, command, *args, **kwargs):
while not self._instance:
await sleep(1)
try:
print("[redis] " + command + ' ' + ' '.join(args))
return await self._instance.execute_command(command, *args, **kwargs)
except Exception:
pass
async def lrange(self, key, start, stop):
print(f"[redis] LRANGE {key} {start} {stop}")
return await self._instance.lrange(key, start, stop)
async def mget(self, key, *keys):
print(f"[redis] MGET {key} {keys}")
return await self._instance.mget(key, *keys)
redis = RedisCache()
__all__ = ["redis"]

43
main.py
View File

@ -13,21 +13,16 @@ from orm import init_tables
from auth.authenticate import JWTAuthenticate from auth.authenticate import JWTAuthenticate
from auth.oauth import oauth_login, oauth_authorize from auth.oauth import oauth_login, oauth_authorize
from base.redis import redis from services.redis import redis
from base.resolvers import resolvers from services.schema import resolvers
from resolvers.auth import confirm_email_handler from resolvers.auth import confirm_email_handler
from resolvers.upload import upload_handler from resolvers.upload import upload_handler
from services.main import storages_init from services.main import storages_init
from services.stat.viewed import ViewedStorage from services.viewed import ViewedStorage
from services.zine.gittask import GitTask
from settings import DEV_SERVER_PID_FILE_NAME, SENTRY_DSN from settings import DEV_SERVER_PID_FILE_NAME, SENTRY_DSN
# from sse.transport import GraphQLSSEHandler
from services.inbox.presence import on_connect, on_disconnect
# from services.inbox.sse import sse_messages
from ariadne.asgi.handlers import GraphQLTransportWSHandler
import_module("resolvers") import_module("resolvers")
schema = make_executable_schema(load_schema_from_path("schema.graphql"), resolvers) # type: ignore schema = make_executable_schema(load_schema_from_path("schemas/core.graphql"), resolvers) # type: ignore
middleware = [ middleware = [
Middleware(AuthenticationMiddleware, backend=JWTAuthenticate()), Middleware(AuthenticationMiddleware, backend=JWTAuthenticate()),
@ -41,13 +36,12 @@ async def start_up():
await storages_init() await storages_init()
views_stat_task = asyncio.create_task(ViewedStorage().worker()) views_stat_task = asyncio.create_task(ViewedStorage().worker())
print(views_stat_task) print(views_stat_task)
git_task = asyncio.create_task(GitTask.git_task_worker())
print(git_task)
try: try:
import sentry_sdk import sentry_sdk
sentry_sdk.init(SENTRY_DSN) sentry_sdk.init(SENTRY_DSN)
except Exception as e: except Exception as e:
print('[sentry] init error') print("[sentry] init error")
print(e) print(e)
@ -56,7 +50,7 @@ async def dev_start_up():
await redis.connect() await redis.connect()
return return
else: else:
with open(DEV_SERVER_PID_FILE_NAME, 'w', encoding='utf-8') as f: with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f:
f.write(str(os.getpid())) f.write(str(os.getpid()))
await start_up() await start_up()
@ -67,11 +61,10 @@ async def shutdown():
routes = [ routes = [
# Route("/messages", endpoint=sse_messages),
Route("/oauth/{provider}", endpoint=oauth_login), Route("/oauth/{provider}", endpoint=oauth_login),
Route("/oauth-authorize", endpoint=oauth_authorize), Route("/oauth-authorize", endpoint=oauth_authorize),
Route("/confirm/{token}", endpoint=confirm_email_handler), Route("/confirm/{token}", endpoint=confirm_email_handler),
Route("/upload", endpoint=upload_handler, methods=['POST']) Route("/upload", endpoint=upload_handler, methods=["POST"]),
] ]
app = Starlette( app = Starlette(
@ -81,14 +74,10 @@ app = Starlette(
middleware=middleware, middleware=middleware,
routes=routes, routes=routes,
) )
app.mount("/", GraphQL( app.mount(
schema, "/",
debug=True, GraphQL(schema, debug=True),
websocket_handler=GraphQLTransportWSHandler(
on_connect=on_connect,
on_disconnect=on_disconnect
) )
))
dev_app = app = Starlette( dev_app = app = Starlette(
debug=True, debug=True,
@ -97,11 +86,7 @@ dev_app = app = Starlette(
middleware=middleware, middleware=middleware,
routes=routes, routes=routes,
) )
dev_app.mount("/", GraphQL( dev_app.mount(
schema, "/",
debug=True, GraphQL(schema, debug=True),
websocket_handler=GraphQLTransportWSHandler(
on_connect=on_connect,
on_disconnect=on_disconnect
) )
))

View File

@ -2,7 +2,7 @@ from datetime import datetime, timezone
from dateutil.parser import parse as date_parse from dateutil.parser import parse as date_parse
from base.orm import local_session from services.db import local_session
from migration.html2text import html2text from migration.html2text import html2text
from orm.reaction import Reaction, ReactionKind from orm.reaction import Reaction, ReactionKind
from orm.shout import ShoutReactionsFollower from orm.shout import ShoutReactionsFollower
@ -15,34 +15,28 @@ ts = datetime.now(tz=timezone.utc)
def auto_followers(session, topics, reaction_dict): def auto_followers(session, topics, reaction_dict):
# creating shout's reactions following for reaction author # creating shout's reactions following for reaction author
following1 = session.query( following1 = (
ShoutReactionsFollower session.query(ShoutReactionsFollower)
).where( .where(ShoutReactionsFollower.follower == reaction_dict["createdBy"])
ShoutReactionsFollower.follower == reaction_dict["createdBy"] .filter(ShoutReactionsFollower.shout == reaction_dict["shout"])
).filter( .first()
ShoutReactionsFollower.shout == reaction_dict["shout"] )
).first()
if not following1: if not following1:
following1 = ShoutReactionsFollower.create( following1 = ShoutReactionsFollower.create(
follower=reaction_dict["createdBy"], follower=reaction_dict["createdBy"], shout=reaction_dict["shout"], auto=True
shout=reaction_dict["shout"],
auto=True
) )
session.add(following1) session.add(following1)
# creating topics followings for reaction author # creating topics followings for reaction author
for t in topics: for t in topics:
tf = session.query( tf = (
TopicFollower session.query(TopicFollower)
).where( .where(TopicFollower.follower == reaction_dict["createdBy"])
TopicFollower.follower == reaction_dict["createdBy"] .filter(TopicFollower.topic == t["id"])
).filter( .first()
TopicFollower.topic == t['id'] )
).first()
if not tf: if not tf:
topic_following = TopicFollower.create( topic_following = TopicFollower.create(
follower=reaction_dict["createdBy"], follower=reaction_dict["createdBy"], topic=t["id"], auto=True
topic=t['id'],
auto=True
) )
session.add(topic_following) session.add(topic_following)
@ -68,18 +62,15 @@ def migrate_ratings(session, entry, reaction_dict):
try: try:
# creating reaction from old rating # creating reaction from old rating
rr = Reaction.create(**re_reaction_dict) rr = Reaction.create(**re_reaction_dict)
following2 = session.query( following2 = (
ShoutReactionsFollower session.query(ShoutReactionsFollower)
).where( .where(ShoutReactionsFollower.follower == re_reaction_dict["createdBy"])
ShoutReactionsFollower.follower == re_reaction_dict['createdBy'] .filter(ShoutReactionsFollower.shout == rr.shout)
).filter( .first()
ShoutReactionsFollower.shout == rr.shout )
).first()
if not following2: if not following2:
following2 = ShoutReactionsFollower.create( following2 = ShoutReactionsFollower.create(
follower=re_reaction_dict['createdBy'], follower=re_reaction_dict["createdBy"], shout=rr.shout, auto=True
shout=rr.shout,
auto=True
) )
session.add(following2) session.add(following2)
session.add(rr) session.add(rr)
@ -150,9 +141,11 @@ async def migrate(entry, storage):
else: else:
stage = "author and old id found" stage = "author and old id found"
try: try:
shout = session.query( shout = (
Shout session.query(Shout)
).where(Shout.slug == old_shout["slug"]).one() .where(Shout.slug == old_shout["slug"])
.one()
)
if shout: if shout:
reaction_dict["shout"] = shout.id reaction_dict["shout"] = shout.id
reaction_dict["createdBy"] = author.id if author else 1 reaction_dict["createdBy"] = author.id if author else 1
@ -178,9 +171,9 @@ async def migrate(entry, storage):
def migrate_2stage(old_comment, idmap): def migrate_2stage(old_comment, idmap):
if old_comment.get('body'): if old_comment.get("body"):
new_id = idmap.get(old_comment.get('oid')) new_id = idmap.get(old_comment.get("oid"))
new_id = idmap.get(old_comment.get('_id')) new_id = idmap.get(old_comment.get("_id"))
if new_id: if new_id:
new_replyto_id = None new_replyto_id = None
old_replyto_id = old_comment.get("replyTo") old_replyto_id = old_comment.get("replyTo")
@ -190,17 +183,22 @@ def migrate_2stage(old_comment, idmap):
comment = session.query(Reaction).where(Reaction.id == new_id).first() comment = session.query(Reaction).where(Reaction.id == new_id).first()
try: try:
if new_replyto_id: if new_replyto_id:
new_reply = session.query(Reaction).where(Reaction.id == new_replyto_id).first() new_reply = (
session.query(Reaction)
.where(Reaction.id == new_replyto_id)
.first()
)
if not new_reply: if not new_reply:
print(new_replyto_id) print(new_replyto_id)
raise Exception("cannot find reply by id!") raise Exception("cannot find reply by id!")
comment.replyTo = new_reply.id comment.replyTo = new_reply.id
session.add(comment) session.add(comment)
srf = session.query(ShoutReactionsFollower).where( srf = (
ShoutReactionsFollower.shout == comment.shout session.query(ShoutReactionsFollower)
).filter( .where(ShoutReactionsFollower.shout == comment.shout)
ShoutReactionsFollower.follower == comment.createdBy .filter(ShoutReactionsFollower.follower == comment.createdBy)
).first() .first()
)
if not srf: if not srf:
srf = ShoutReactionsFollower.create( srf = ShoutReactionsFollower.create(
shout=comment.shout, follower=comment.createdBy, auto=True shout=comment.shout, follower=comment.createdBy, auto=True

View File

@ -3,13 +3,13 @@ import json
from dateutil.parser import parse as date_parse from dateutil.parser import parse as date_parse
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from transliterate import translit from transliterate import translit
from base.orm import local_session from services.db import local_session
from migration.extract import extract_html, extract_media from migration.extract import extract_html, extract_media
from orm.reaction import Reaction, ReactionKind from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower
from orm.user import User from orm.user import User
from orm.topic import TopicFollower, Topic from orm.topic import TopicFollower, Topic
from services.stat.viewed import ViewedStorage from services.viewed import ViewedStorage
import re import re
OLD_DATE = "2016-03-05 22:22:00.350000" OLD_DATE = "2016-03-05 22:22:00.350000"
@ -33,7 +33,7 @@ def get_shout_slug(entry):
slug = friend.get("slug", "") slug = friend.get("slug", "")
if slug: if slug:
break break
slug = re.sub('[^0-9a-zA-Z]+', '-', slug) slug = re.sub("[^0-9a-zA-Z]+", "-", slug)
return slug return slug
@ -41,28 +41,30 @@ def create_author_from_app(app):
user = None user = None
userdata = None userdata = None
# check if email is used # check if email is used
if app['email']: if app["email"]:
with local_session() as session: with local_session() as session:
user = session.query(User).where(User.email == app['email']).first() user = session.query(User).where(User.email == app["email"]).first()
if not user: if not user:
# print('[migration] app %r' % app) # print('[migration] app %r' % app)
name = app.get('name') name = app.get("name")
if name: if name:
slug = translit(name, "ru", reversed=True).lower() slug = translit(name, "ru", reversed=True).lower()
slug = re.sub('[^0-9a-zA-Z]+', '-', slug) slug = re.sub("[^0-9a-zA-Z]+", "-", slug)
print('[migration] created slug %s' % slug) print("[migration] created slug %s" % slug)
# check if slug is used # check if slug is used
if slug: if slug:
user = session.query(User).where(User.slug == slug).first() user = session.query(User).where(User.slug == slug).first()
# get slug from email # get slug from email
if user: if user:
slug = app['email'].split('@')[0] slug = app["email"].split("@")[0]
user = session.query(User).where(User.slug == slug).first() user = session.query(User).where(User.slug == slug).first()
# one more try # one more try
if user: if user:
slug += '-author' slug += "-author"
user = session.query(User).where(User.slug == slug).first() user = (
session.query(User).where(User.slug == slug).first()
)
# create user with application data # create user with application data
if not user: if not user:
@ -80,7 +82,7 @@ def create_author_from_app(app):
user = User.create(**userdata) user = User.create(**userdata)
session.add(user) session.add(user)
session.commit() session.commit()
userdata['id'] = user.id userdata["id"] = user.id
userdata = user.dict() userdata = user.dict()
return userdata return userdata
@ -92,13 +94,16 @@ async def create_shout(shout_dict):
s = Shout.create(**shout_dict) s = Shout.create(**shout_dict)
author = s.authors[0] author = s.authors[0]
with local_session() as session: with local_session() as session:
srf = session.query(ShoutReactionsFollower).where( srf = (
ShoutReactionsFollower.shout == s.id session.query(ShoutReactionsFollower)
).filter( .where(ShoutReactionsFollower.shout == s.id)
ShoutReactionsFollower.follower == author.id .filter(ShoutReactionsFollower.follower == author.id)
).first() .first()
)
if not srf: if not srf:
srf = ShoutReactionsFollower.create(shout=s.id, follower=author.id, auto=True) srf = ShoutReactionsFollower.create(
shout=s.id, follower=author.id, auto=True
)
session.add(srf) session.add(srf)
session.commit() session.commit()
return s return s
@ -117,14 +122,14 @@ async def get_user(entry, storage):
elif user_oid: elif user_oid:
userdata = storage["users"]["by_oid"].get(user_oid) userdata = storage["users"]["by_oid"].get(user_oid)
if not userdata: if not userdata:
print('no userdata by oid, anonymous') print("no userdata by oid, anonymous")
userdata = anondict userdata = anondict
print(app) print(app)
# cleanup slug # cleanup slug
if userdata: if userdata:
slug = userdata.get("slug", "") slug = userdata.get("slug", "")
if slug: if slug:
slug = re.sub('[^0-9a-zA-Z]+', '-', slug) slug = re.sub("[^0-9a-zA-Z]+", "-", slug)
userdata["slug"] = slug userdata["slug"] = slug
else: else:
userdata = anondict userdata = anondict
@ -138,23 +143,30 @@ async def migrate(entry, storage):
r = { r = {
"layout": type2layout[entry["type"]], "layout": type2layout[entry["type"]],
"title": entry["title"], "title": entry["title"],
"authors": [author, ], "authors": [
author,
],
"slug": get_shout_slug(entry), "slug": get_shout_slug(entry),
"cover": ( "cover": (
"https://assets.discours.io/unsafe/1600x/" + "https://assets.discours.io/unsafe/1600x/" + entry["thumborId"]
entry["thumborId"] if entry.get("thumborId") else entry.get("image", {}).get("url") if entry.get("thumborId")
else entry.get("image", {}).get("url")
), ),
"visibility": "public" if entry.get("published") else "authors", "visibility": "public" if entry.get("published") else "authors",
"publishedAt": date_parse(entry.get("publishedAt")) if entry.get("published") else None, "publishedAt": date_parse(entry.get("publishedAt"))
"deletedAt": date_parse(entry.get("deletedAt")) if entry.get("deletedAt") else None, if entry.get("published")
else None,
"deletedAt": date_parse(entry.get("deletedAt"))
if entry.get("deletedAt")
else None,
"createdAt": date_parse(entry.get("createdAt", OLD_DATE)), "createdAt": date_parse(entry.get("createdAt", OLD_DATE)),
"updatedAt": date_parse(entry["updatedAt"]) if "updatedAt" in entry else ts, "updatedAt": date_parse(entry["updatedAt"]) if "updatedAt" in entry else ts,
"topics": await add_topics_follower(entry, storage, author), "topics": await add_topics_follower(entry, storage, author),
"body": extract_html(entry, cleanup=True) "body": extract_html(entry, cleanup=True),
} }
# main topic patch # main topic patch
r['mainTopic'] = r['topics'][0] r["mainTopic"] = r["topics"][0]
# published author auto-confirm # published author auto-confirm
if entry.get("published"): if entry.get("published"):
@ -177,14 +189,16 @@ async def migrate(entry, storage):
shout_dict["oid"] = entry.get("_id", "") shout_dict["oid"] = entry.get("_id", "")
shout = await create_shout(shout_dict) shout = await create_shout(shout_dict)
except IntegrityError as e: except IntegrityError as e:
print('[migration] create_shout integrity error', e) print("[migration] create_shout integrity error", e)
shout = await resolve_create_shout(shout_dict) shout = await resolve_create_shout(shout_dict)
except Exception as e: except Exception as e:
raise Exception(e) raise Exception(e)
# udpate data # udpate data
shout_dict = shout.dict() shout_dict = shout.dict()
shout_dict["authors"] = [author.dict(), ] shout_dict["authors"] = [
author.dict(),
]
# shout topics aftermath # shout topics aftermath
shout_dict["topics"] = await topics_aftermath(r, storage) shout_dict["topics"] = await topics_aftermath(r, storage)
@ -193,7 +207,9 @@ async def migrate(entry, storage):
await content_ratings_to_reactions(entry, shout_dict["slug"]) await content_ratings_to_reactions(entry, shout_dict["slug"])
# shout views # shout views
await ViewedStorage.increment(shout_dict["slug"], amount=entry.get("views", 1), viewer='old-discours') await ViewedStorage.increment(
shout_dict["slug"], amount=entry.get("views", 1), viewer="old-discours"
)
# del shout_dict['ratings'] # del shout_dict['ratings']
storage["shouts"]["by_oid"][entry["_id"]] = shout_dict storage["shouts"]["by_oid"][entry["_id"]] = shout_dict
@ -205,7 +221,9 @@ async def add_topics_follower(entry, storage, user):
topics = set([]) topics = set([])
category = entry.get("category") category = entry.get("category")
topics_by_oid = storage["topics"]["by_oid"] topics_by_oid = storage["topics"]["by_oid"]
oids = [category, ] + entry.get("tags", []) oids = [
category,
] + entry.get("tags", [])
for toid in oids: for toid in oids:
tslug = topics_by_oid.get(toid, {}).get("slug") tslug = topics_by_oid.get(toid, {}).get("slug")
if tslug: if tslug:
@ -217,23 +235,20 @@ async def add_topics_follower(entry, storage, user):
try: try:
tpc = session.query(Topic).where(Topic.slug == tpcslug).first() tpc = session.query(Topic).where(Topic.slug == tpcslug).first()
if tpc: if tpc:
tf = session.query( tf = (
TopicFollower session.query(TopicFollower)
).where( .where(TopicFollower.follower == user.id)
TopicFollower.follower == user.id .filter(TopicFollower.topic == tpc.id)
).filter( .first()
TopicFollower.topic == tpc.id )
).first()
if not tf: if not tf:
tf = TopicFollower.create( tf = TopicFollower.create(
topic=tpc.id, topic=tpc.id, follower=user.id, auto=True
follower=user.id,
auto=True
) )
session.add(tf) session.add(tf)
session.commit() session.commit()
except IntegrityError: except IntegrityError:
print('[migration.shout] hidden by topic ' + tpc.slug) print("[migration.shout] hidden by topic " + tpc.slug)
# main topic # main topic
maintopic = storage["replacements"].get(topics_by_oid.get(category, {}).get("slug")) maintopic = storage["replacements"].get(topics_by_oid.get(category, {}).get("slug"))
if maintopic in ttt: if maintopic in ttt:
@ -254,7 +269,7 @@ async def process_user(userdata, storage, oid):
if not user: if not user:
try: try:
slug = userdata["slug"].lower().strip() slug = userdata["slug"].lower().strip()
slug = re.sub('[^0-9a-zA-Z]+', '-', slug) slug = re.sub("[^0-9a-zA-Z]+", "-", slug)
userdata["slug"] = slug userdata["slug"] = slug
user = User.create(**userdata) user = User.create(**userdata)
session.add(user) session.add(user)
@ -263,7 +278,9 @@ async def process_user(userdata, storage, oid):
print(f"[migration] user creating with slug {userdata['slug']}") print(f"[migration] user creating with slug {userdata['slug']}")
print("[migration] from userdata") print("[migration] from userdata")
print(userdata) print(userdata)
raise Exception("[migration] cannot create user in content_items.get_user()") raise Exception(
"[migration] cannot create user in content_items.get_user()"
)
if user.id == 946: if user.id == 946:
print("[migration] ***************** ALPINA") print("[migration] ***************** ALPINA")
if user.id == 2: if user.id == 2:
@ -282,9 +299,9 @@ async def resolve_create_shout(shout_dict):
s = session.query(Shout).filter(Shout.slug == shout_dict["slug"]).first() s = session.query(Shout).filter(Shout.slug == shout_dict["slug"]).first()
bump = False bump = False
if s: if s:
if s.createdAt != shout_dict['createdAt']: if s.createdAt != shout_dict["createdAt"]:
# create new with different slug # create new with different slug
shout_dict["slug"] += '-' + shout_dict["layout"] shout_dict["slug"] += "-" + shout_dict["layout"]
try: try:
await create_shout(shout_dict) await create_shout(shout_dict)
except IntegrityError as e: except IntegrityError as e:
@ -375,7 +392,7 @@ async def content_ratings_to_reactions(entry, slug):
if content_rating["value"] > 0 if content_rating["value"] > 0
else ReactionKind.DISLIKE, else ReactionKind.DISLIKE,
"createdBy": rater.id, "createdBy": rater.id,
"shout": shout.id "shout": shout.id,
} }
reaction = ( reaction = (
session.query(Reaction) session.query(Reaction)
@ -385,7 +402,11 @@ async def content_ratings_to_reactions(entry, slug):
.first() .first()
) )
if reaction: if reaction:
k = ReactionKind.AGREE if content_rating["value"] > 0 else ReactionKind.DISAGREE k = (
ReactionKind.AGREE
if content_rating["value"] > 0
else ReactionKind.DISAGREE
)
reaction_dict["kind"] = k reaction_dict["kind"] = k
reaction.update(reaction_dict) reaction.update(reaction_dict)
session.add(reaction) session.add(reaction)

View File

@ -1,36 +1,28 @@
from base.orm import local_session from services.db import local_session
from migration.extract import extract_md from migration.extract import extract_md
from migration.html2text import html2text from migration.html2text import html2text
from orm.reaction import Reaction, ReactionKind from orm.reaction import Reaction, ReactionKind
def migrate(entry, storage): def migrate(entry, storage):
post_oid = entry['contentItem'] post_oid = entry["contentItem"]
print(post_oid) print(post_oid)
shout_dict = storage['shouts']['by_oid'].get(post_oid) shout_dict = storage["shouts"]["by_oid"].get(post_oid)
if shout_dict: if shout_dict:
print(shout_dict['body']) print(shout_dict["body"])
remark = { remark = {
"shout": shout_dict['id'], "shout": shout_dict["id"],
"body": extract_md( "body": extract_md(html2text(entry["body"]), shout_dict),
html2text(entry['body']), "kind": ReactionKind.REMARK,
shout_dict
),
"kind": ReactionKind.REMARK
} }
if entry.get('textBefore'): if entry.get("textBefore"):
remark['range'] = str( remark["range"] = (
shout_dict['body'] str(shout_dict["body"].index(entry["textBefore"] or ""))
.index( + ":"
entry['textBefore'] or '' + str(
) shout_dict["body"].index(entry["textAfter"] or "")
) + ':' + str( + len(entry["textAfter"] or "")
shout_dict['body']
.index(
entry['textAfter'] or ''
) + len(
entry['textAfter'] or ''
) )
) )

View File

@ -1,4 +1,4 @@
from base.orm import local_session from services.db import local_session
from migration.extract import extract_md from migration.extract import extract_md
from migration.html2text import html2text from migration.html2text import html2text
from orm import Topic from orm import Topic
@ -10,7 +10,7 @@ def migrate(entry):
"slug": entry["slug"], "slug": entry["slug"],
"oid": entry["_id"], "oid": entry["_id"],
"title": entry["title"].replace(" ", " "), "title": entry["title"].replace(" ", " "),
"body": extract_md(html2text(body_orig)) "body": extract_md(html2text(body_orig)),
} }
with local_session() as session: with local_session() as session:

View File

@ -4,7 +4,7 @@ from bs4 import BeautifulSoup
from dateutil.parser import parse from dateutil.parser import parse
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from base.orm import local_session from services.db import local_session
from orm.user import AuthorFollower, User, UserRating from orm.user import AuthorFollower, User, UserRating
@ -19,12 +19,13 @@ def migrate(entry):
"username": email, "username": email,
"email": email, "email": email,
"createdAt": parse(entry["createdAt"]), "createdAt": parse(entry["createdAt"]),
"emailConfirmed": ("@discours.io" in email) or bool(entry["emails"][0]["verified"]), "emailConfirmed": ("@discours.io" in email)
or bool(entry["emails"][0]["verified"]),
"muted": False, # amnesty "muted": False, # amnesty
"bio": entry["profile"].get("bio", ""), "bio": entry["profile"].get("bio", ""),
"links": [], "links": [],
"name": "anonymous", "name": "anonymous",
"password": entry["services"]["password"].get("bcrypt") "password": entry["services"]["password"].get("bcrypt"),
} }
if "updatedAt" in entry: if "updatedAt" in entry:
@ -34,9 +35,13 @@ def migrate(entry):
if entry.get("profile"): if entry.get("profile"):
# slug # slug
slug = entry["profile"].get("path").lower() slug = entry["profile"].get("path").lower()
slug = re.sub('[^0-9a-zA-Z]+', '-', slug).strip() slug = re.sub("[^0-9a-zA-Z]+", "-", slug).strip()
user_dict["slug"] = slug user_dict["slug"] = slug
bio = (entry.get("profile", {"bio": ""}).get("bio") or "").replace('\(', '(').replace('\)', ')') bio = (
(entry.get("profile", {"bio": ""}).get("bio") or "")
.replace("\(", "(")
.replace("\)", ")")
)
bio_text = BeautifulSoup(bio, features="lxml").text bio_text = BeautifulSoup(bio, features="lxml").text
if len(bio_text) > 120: if len(bio_text) > 120:
@ -115,7 +120,7 @@ def post_migrate():
"slug": "old-discours", "slug": "old-discours",
"username": "old-discours", "username": "old-discours",
"email": "old@discours.io", "email": "old@discours.io",
"name": "Просмотры на старой версии сайта" "name": "Просмотры на старой версии сайта",
} }
with local_session() as session: with local_session() as session:
@ -148,11 +153,9 @@ def migrate_2stage(entry, id_map):
} }
user_rating = UserRating.create(**user_rating_dict) user_rating = UserRating.create(**user_rating_dict)
if user_rating_dict['value'] > 0: if user_rating_dict["value"] > 0:
af = AuthorFollower.create( af = AuthorFollower.create(
author=user.id, author=user.id, follower=rater.id, auto=True
follower=rater.id,
auto=True
) )
session.add(af) session.add(af)
session.add(user_rating) session.add(user_rating)

View File

@ -1,4 +1,4 @@
from base.orm import Base, engine from services.db import Base, engine
from orm.community import Community from orm.community import Community
from orm.notification import Notification from orm.notification import Notification
from orm.rbac import Operation, Resource, Permission, Role from orm.rbac import Operation, Resource, Permission, Role

View File

@ -2,7 +2,7 @@ from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey, String from sqlalchemy import Column, DateTime, ForeignKey, String
from base.orm import Base from services.db import Base
class ShoutCollection(Base): class ShoutCollection(Base):

View File

@ -1,7 +1,7 @@
from datetime import datetime from datetime import datetime
from sqlalchemy import Column, String, ForeignKey, DateTime from sqlalchemy import Column, String, ForeignKey, DateTime
from base.orm import Base, local_session from services.db import Base, local_session
class CommunityFollower(Base): class CommunityFollower(Base):
@ -30,12 +30,10 @@ class Community(Base):
@staticmethod @staticmethod
def init_table(): def init_table():
with local_session() as session: with local_session() as session:
d = ( d = session.query(Community).filter(Community.slug == "discours").first()
session.query(Community).filter(Community.slug == "discours").first()
)
if not d: if not d:
d = Community.create(name="Дискурс", slug="discours") d = Community.create(name="Дискурс", slug="discours")
session.add(d) session.add(d)
session.commit() session.commit()
Community.default_community = d Community.default_community = d
print('[orm] default community id: %s' % d.id) print("[orm] default community id: %s" % d.id)

View File

@ -1,6 +1,6 @@
from datetime import datetime from datetime import datetime
from sqlalchemy import Column, String, JSON, ForeignKey, DateTime, Boolean from sqlalchemy import Column, String, JSON, ForeignKey, DateTime, Boolean
from base.orm import Base from services.db import Base
class Notification(Base): class Notification(Base):

View File

@ -3,7 +3,7 @@ import warnings
from sqlalchemy import String, Column, ForeignKey, UniqueConstraint, TypeDecorator from sqlalchemy import String, Column, ForeignKey, UniqueConstraint, TypeDecorator
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from base.orm import Base, REGISTRY, engine, local_session from services.db import Base, REGISTRY, engine, local_session
# Role Based Access Control # # Role Based Access Control #
@ -130,7 +130,16 @@ class Resource(Base):
@staticmethod @staticmethod
def init_table(): def init_table():
with local_session() as session: with local_session() as session:
for res in ["shout", "topic", "reaction", "chat", "message", "invite", "community", "user"]: for res in [
"shout",
"topic",
"reaction",
"chat",
"message",
"invite",
"community",
"user",
]:
r = session.query(Resource).filter(Resource.name == res).first() r = session.query(Resource).filter(Resource.name == res).first()
if not r: if not r:
r = Resource.create(name=res, resourceClass=res) r = Resource.create(name=res, resourceClass=res)

View File

@ -3,7 +3,7 @@ from enum import Enum as Enumeration
from sqlalchemy import Column, DateTime, Enum, ForeignKey, String from sqlalchemy import Column, DateTime, Enum, ForeignKey, String
from base.orm import Base from services.db import Base
class ReactionKind(Enumeration): class ReactionKind(Enumeration):
@ -30,11 +30,17 @@ class Reaction(Base):
createdAt = Column( createdAt = Column(
DateTime, nullable=False, default=datetime.now, comment="Created at" DateTime, nullable=False, default=datetime.now, comment="Created at"
) )
createdBy = Column(ForeignKey("user.id"), nullable=False, index=True, comment="Sender") createdBy = Column(
ForeignKey("user.id"), nullable=False, index=True, comment="Sender"
)
updatedAt = Column(DateTime, nullable=True, comment="Updated at") updatedAt = Column(DateTime, nullable=True, comment="Updated at")
updatedBy = Column(ForeignKey("user.id"), nullable=True, index=True, comment="Last Editor") updatedBy = Column(
ForeignKey("user.id"), nullable=True, index=True, comment="Last Editor"
)
deletedAt = Column(DateTime, nullable=True, comment="Deleted at") deletedAt = Column(DateTime, nullable=True, comment="Deleted at")
deletedBy = Column(ForeignKey("user.id"), nullable=True, index=True, comment="Deleted by") deletedBy = Column(
ForeignKey("user.id"), nullable=True, index=True, comment="Deleted by"
)
shout = Column(ForeignKey("shout.id"), nullable=False, index=True) shout = Column(ForeignKey("shout.id"), nullable=False, index=True)
replyTo = Column( replyTo = Column(
ForeignKey("reaction.id"), nullable=True, comment="Reply to reaction ID" ForeignKey("reaction.id"), nullable=True, comment="Reply to reaction ID"

View File

@ -3,7 +3,7 @@ from datetime import datetime
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, JSON from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, JSON
from sqlalchemy.orm import column_property, relationship from sqlalchemy.orm import column_property, relationship
from base.orm import Base, local_session from services.db import Base, local_session
from orm.reaction import Reaction from orm.reaction import Reaction
from orm.topic import Topic from orm.topic import Topic
from orm.user import User from orm.user import User
@ -43,7 +43,9 @@ class Shout(Base):
__tablename__ = "shout" __tablename__ = "shout"
# timestamps # timestamps
createdAt = Column(DateTime, nullable=False, default=datetime.now, comment="Created at") createdAt = Column(
DateTime, nullable=False, default=datetime.now, comment="Created at"
)
updatedAt = Column(DateTime, nullable=True, comment="Updated at") updatedAt = Column(DateTime, nullable=True, comment="Updated at")
publishedAt = Column(DateTime, nullable=True) publishedAt = Column(DateTime, nullable=True)
deletedAt = Column(DateTime, nullable=True) deletedAt = Column(DateTime, nullable=True)
@ -72,7 +74,7 @@ class Shout(Base):
# TODO: these field should be used or modified # TODO: these field should be used or modified
community = Column(ForeignKey("community.id"), default=1) community = Column(ForeignKey("community.id"), default=1)
lang = Column(String, nullable=False, default='ru', comment="Language") lang = Column(String, nullable=False, default="ru", comment="Language")
mainTopic = Column(ForeignKey("topic.slug"), nullable=True) mainTopic = Column(ForeignKey("topic.slug"), nullable=True)
visibility = Column(String, nullable=True) # owner authors community public visibility = Column(String, nullable=True) # owner authors community public
versionOf = Column(ForeignKey("shout.id"), nullable=True) versionOf = Column(ForeignKey("shout.id"), nullable=True)
@ -87,7 +89,7 @@ class Shout(Base):
"slug": "genesis-block", "slug": "genesis-block",
"body": "", "body": "",
"title": "Ничего", "title": "Ничего",
"lang": "ru" "lang": "ru",
} }
s = Shout.create(**entry) s = Shout.create(**entry)
session.add(s) session.add(s)

View File

@ -2,7 +2,7 @@ from datetime import datetime
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, String from sqlalchemy import Boolean, Column, DateTime, ForeignKey, String
from base.orm import Base from services.db import Base
class TopicFollower(Base): class TopicFollower(Base):
@ -24,7 +24,5 @@ class Topic(Base):
title = Column(String, nullable=False, comment="Title") title = Column(String, nullable=False, comment="Title")
body = Column(String, nullable=True, comment="Body") body = Column(String, nullable=True, comment="Body")
pic = Column(String, nullable=True, comment="Picture") pic = Column(String, nullable=True, comment="Picture")
community = Column( community = Column(ForeignKey("community.id"), default=1, comment="Community")
ForeignKey("community.id"), default=1, comment="Community"
)
oid = Column(String, nullable=True, comment="Old ID") oid = Column(String, nullable=True, comment="Old ID")

View File

@ -3,7 +3,7 @@ from datetime import datetime
from sqlalchemy import JSON as JSONType from sqlalchemy import JSON as JSONType
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from base.orm import Base, local_session from services.db import Base, local_session
from orm.rbac import Role from orm.rbac import Role

View File

@ -8,62 +8,36 @@ from resolvers.auth import (
get_current_user, get_current_user,
) )
from resolvers.create.migrate import markdown_body from resolvers.migrate import markdown_body
from resolvers.create.editor import create_shout, delete_shout, update_shout from resolvers.editor import create_shout, delete_shout, update_shout
from resolvers.profile import (
from resolvers.zine.profile import (
load_authors_by, load_authors_by,
rate_user, rate_user,
update_profile, update_profile,
get_authors_all get_authors_all,
) )
from resolvers.zine.reactions import ( from resolvers.topics import (
topics_all,
topics_by_community,
topics_by_author,
topic_follow,
topic_unfollow,
get_topic,
)
from resolvers.reactions import (
create_reaction, create_reaction,
delete_reaction, delete_reaction,
update_reaction, update_reaction,
reactions_unfollow, reactions_unfollow,
reactions_follow, reactions_follow,
load_reactions_by load_reactions_by,
)
from resolvers.zine.topics import (
topic_follow,
topic_unfollow,
topics_by_author,
topics_by_community,
topics_all,
get_topic
) )
from resolvers.zine.following import ( from resolvers.following import follow, unfollow
follow,
unfollow
)
from resolvers.zine.load import ( from resolvers.load import load_shout, load_shouts_by
load_shout,
load_shouts_by
)
from resolvers.inbox.chats import (
create_chat,
delete_chat,
update_chat
)
from resolvers.inbox.messages import (
create_message,
delete_message,
update_message,
message_generator,
mark_as_read
)
from resolvers.inbox.load import (
load_chats,
load_messages_by,
load_recipients
)
from resolvers.inbox.search import search_recipients
__all__ = [ __all__ = [
# auth # auth
@ -74,12 +48,12 @@ __all__ = [
"auth_send_link", "auth_send_link",
"sign_out", "sign_out",
"get_current_user", "get_current_user",
# zine.profile # profile
"load_authors_by", "load_authors_by",
"rate_user", "rate_user",
"update_profile", "update_profile",
"get_authors_all", "get_authors_all",
# zine.load # load
"load_shout", "load_shout",
"load_shouts_by", "load_shouts_by",
# zine.following # zine.following
@ -90,7 +64,7 @@ __all__ = [
"update_shout", "update_shout",
"delete_shout", "delete_shout",
"markdown_body", "markdown_body",
# zine.topics # topics
"topics_all", "topics_all",
"topics_by_community", "topics_by_community",
"topics_by_author", "topics_by_author",
@ -104,17 +78,4 @@ __all__ = [
"update_reaction", "update_reaction",
"delete_reaction", "delete_reaction",
"load_reactions_by", "load_reactions_by",
# inbox
"load_chats",
"load_messages_by",
"create_chat",
"delete_chat",
"update_chat",
"create_message",
"delete_message",
"update_message",
"message_generator",
"mark_as_read",
"load_recipients",
"search_recipients"
] ]

View File

@ -13,10 +13,15 @@ from auth.email import send_auth_email
from auth.identity import Identity, Password from auth.identity import Identity, Password
from auth.jwtcodec import JWTCodec from auth.jwtcodec import JWTCodec
from auth.tokenstorage import TokenStorage from auth.tokenstorage import TokenStorage
from base.exceptions import (BaseHttpException, InvalidPassword, InvalidToken, from services.exceptions import (
ObjectNotExist, Unauthorized) BaseHttpException,
from base.orm import local_session InvalidPassword,
from base.resolvers import mutation, query InvalidToken,
ObjectNotExist,
Unauthorized,
)
from services.db import local_session
from services.schema import mutation, query
from orm import Role, User from orm import Role, User
from resolvers.zine.profile import user_subscriptions from resolvers.zine.profile import user_subscriptions
from settings import SESSION_TOKEN_HEADER, FRONTEND_URL from settings import SESSION_TOKEN_HEADER, FRONTEND_URL
@ -44,7 +49,7 @@ async def get_current_user(_, info):
async def confirm_email(_, info, token): async def confirm_email(_, info, token):
"""confirm owning email address""" """confirm owning email address"""
try: try:
print('[resolvers.auth] confirm email by token') print("[resolvers.auth] confirm email by token")
payload = JWTCodec.decode(token) payload = JWTCodec.decode(token)
user_id = payload.user_id user_id = payload.user_id
await TokenStorage.get(f"{user_id}-{payload.username}-{token}") await TokenStorage.get(f"{user_id}-{payload.username}-{token}")
@ -58,7 +63,7 @@ async def confirm_email(_, info, token):
return { return {
"token": session_token, "token": session_token,
"user": user, "user": user,
"news": await user_subscriptions(user.id) "news": await user_subscriptions(user.id),
} }
except InvalidToken as e: except InvalidToken as e:
raise InvalidToken(e.message) raise InvalidToken(e.message)
@ -71,9 +76,9 @@ async def confirm_email_handler(request):
token = request.path_params["token"] # one time token = request.path_params["token"] # one time
request.session["token"] = token request.session["token"] = token
res = await confirm_email(None, {}, token) res = await confirm_email(None, {}, token)
print('[resolvers.auth] confirm_email request: %r' % request) print("[resolvers.auth] confirm_email request: %r" % request)
if "error" in res: if "error" in res:
raise BaseHttpException(res['error']) raise BaseHttpException(res["error"])
else: else:
response = RedirectResponse(url=FRONTEND_URL) response = RedirectResponse(url=FRONTEND_URL)
response.set_cookie("token", res["token"]) # session token response.set_cookie("token", res["token"]) # session token
@ -90,22 +95,22 @@ def create_user(user_dict):
def generate_unique_slug(src): def generate_unique_slug(src):
print('[resolvers.auth] generating slug from: ' + src) print("[resolvers.auth] generating slug from: " + src)
slug = translit(src, "ru", reversed=True).replace(".", "-").lower() slug = translit(src, "ru", reversed=True).replace(".", "-").lower()
slug = re.sub('[^0-9a-zA-Z]+', '-', slug) slug = re.sub("[^0-9a-zA-Z]+", "-", slug)
if slug != src: if slug != src:
print('[resolvers.auth] translited name: ' + slug) print("[resolvers.auth] translited name: " + slug)
c = 1 c = 1
with local_session() as session: with local_session() as session:
user = session.query(User).where(User.slug == slug).first() user = session.query(User).where(User.slug == slug).first()
while user: while user:
user = session.query(User).where(User.slug == slug).first() user = session.query(User).where(User.slug == slug).first()
slug = slug + '-' + str(c) slug = slug + "-" + str(c)
c += 1 c += 1
if not user: if not user:
unique_slug = slug unique_slug = slug
print('[resolvers.auth] ' + unique_slug) print("[resolvers.auth] " + unique_slug)
return quote_plus(unique_slug.replace('\'', '')).replace('+', '-') return quote_plus(unique_slug.replace("'", "")).replace("+", "-")
@mutation.field("registerUser") @mutation.field("registerUser")
@ -120,12 +125,12 @@ async def register_by_email(_, _info, email: str, password: str = "", name: str
slug = generate_unique_slug(name) slug = generate_unique_slug(name)
user = session.query(User).where(User.slug == slug).first() user = session.query(User).where(User.slug == slug).first()
if user: if user:
slug = generate_unique_slug(email.split('@')[0]) slug = generate_unique_slug(email.split("@")[0])
user_dict = { user_dict = {
"email": email, "email": email,
"username": email, # will be used to store phone number or some messenger network id "username": email, # will be used to store phone number or some messenger network id
"name": name, "name": name,
"slug": slug "slug": slug,
} }
if password: if password:
user_dict["password"] = Password.encode(password) user_dict["password"] = Password.encode(password)
@ -182,7 +187,9 @@ async def login(_, info, email: str, password: str = "", lang: str = "ru"):
} }
except InvalidPassword: except InvalidPassword:
print(f"[auth] {email}: invalid password") print(f"[auth] {email}: invalid password")
raise InvalidPassword("invalid password") # contains webserver status raise InvalidPassword(
"invalid password"
) # contains webserver status
# return {"error": "invalid password"} # return {"error": "invalid password"}

View File

@ -5,8 +5,8 @@ from sqlalchemy.orm import joinedload
from auth.authenticate import login_required from auth.authenticate import login_required
from auth.credentials import AuthCredentials from auth.credentials import AuthCredentials
from base.orm import local_session from services.db import local_session
from base.resolvers import mutation from services.schema import mutation
from orm.shout import Shout, ShoutAuthor, ShoutTopic from orm.shout import Shout, ShoutAuthor, ShoutTopic
from orm.topic import Topic from orm.topic import Topic
from resolvers.zine.reactions import reactions_follow, reactions_unfollow from resolvers.zine.reactions import reactions_follow, reactions_unfollow
@ -18,21 +18,25 @@ async def create_shout(_, info, inp):
auth: AuthCredentials = info.context["request"].auth auth: AuthCredentials = info.context["request"].auth
with local_session() as session: with local_session() as session:
topics = session.query(Topic).filter(Topic.slug.in_(inp.get('topics', []))).all() topics = (
session.query(Topic).filter(Topic.slug.in_(inp.get("topics", []))).all()
)
new_shout = Shout.create(**{ new_shout = Shout.create(
**{
"title": inp.get("title"), "title": inp.get("title"),
"subtitle": inp.get('subtitle'), "subtitle": inp.get("subtitle"),
"lead": inp.get('lead'), "lead": inp.get("lead"),
"description": inp.get('description'), "description": inp.get("description"),
"body": inp.get("body", ''), "body": inp.get("body", ""),
"layout": inp.get("layout"), "layout": inp.get("layout"),
"authors": inp.get("authors", []), "authors": inp.get("authors", []),
"slug": inp.get("slug"), "slug": inp.get("slug"),
"mainTopic": inp.get("mainTopic"), "mainTopic": inp.get("mainTopic"),
"visibility": "owner", "visibility": "owner",
"createdBy": auth.user_id "createdBy": auth.user_id,
}) }
)
for topic in topics: for topic in topics:
t = ShoutTopic.create(topic=topic.id, shout=new_shout.id) t = ShoutTopic.create(topic=topic.id, shout=new_shout.id)
@ -64,10 +68,15 @@ async def update_shout(_, info, shout_id, shout_input=None, publish=False):
auth: AuthCredentials = info.context["request"].auth auth: AuthCredentials = info.context["request"].auth
with local_session() as session: with local_session() as session:
shout = session.query(Shout).options( shout = (
session.query(Shout)
.options(
joinedload(Shout.authors), joinedload(Shout.authors),
joinedload(Shout.topics), joinedload(Shout.topics),
).filter(Shout.id == shout_id).first() )
.filter(Shout.id == shout_id)
.first()
)
if not shout: if not shout:
return {"error": "shout not found"} return {"error": "shout not found"}
@ -82,7 +91,9 @@ async def update_shout(_, info, shout_id, shout_input=None, publish=False):
del shout_input["topics"] del shout_input["topics"]
new_topics_to_link = [] new_topics_to_link = []
new_topics = [topic_input for topic_input in topics_input if topic_input["id"] < 0] new_topics = [
topic_input for topic_input in topics_input if topic_input["id"] < 0
]
for new_topic in new_topics: for new_topic in new_topics:
del new_topic["id"] del new_topic["id"]
@ -94,24 +105,40 @@ async def update_shout(_, info, shout_id, shout_input=None, publish=False):
session.commit() session.commit()
for new_topic_to_link in new_topics_to_link: for new_topic_to_link in new_topics_to_link:
created_unlinked_topic = ShoutTopic.create(shout=shout.id, topic=new_topic_to_link.id) created_unlinked_topic = ShoutTopic.create(
shout=shout.id, topic=new_topic_to_link.id
)
session.add(created_unlinked_topic) session.add(created_unlinked_topic)
existing_topics_input = [topic_input for topic_input in topics_input if topic_input.get("id", 0) > 0] existing_topics_input = [
existing_topic_to_link_ids = [existing_topic_input["id"] for existing_topic_input in existing_topics_input topic_input
if existing_topic_input["id"] not in [topic.id for topic in shout.topics]] for topic_input in topics_input
if topic_input.get("id", 0) > 0
]
existing_topic_to_link_ids = [
existing_topic_input["id"]
for existing_topic_input in existing_topics_input
if existing_topic_input["id"]
not in [topic.id for topic in shout.topics]
]
for existing_topic_to_link_id in existing_topic_to_link_ids: for existing_topic_to_link_id in existing_topic_to_link_ids:
created_unlinked_topic = ShoutTopic.create(shout=shout.id, topic=existing_topic_to_link_id) created_unlinked_topic = ShoutTopic.create(
shout=shout.id, topic=existing_topic_to_link_id
)
session.add(created_unlinked_topic) session.add(created_unlinked_topic)
topic_to_unlink_ids = [topic.id for topic in shout.topics topic_to_unlink_ids = [
if topic.id not in [topic_input["id"] for topic_input in existing_topics_input]] topic.id
for topic in shout.topics
if topic.id
not in [topic_input["id"] for topic_input in existing_topics_input]
]
shout_topics_to_remove = session.query(ShoutTopic).filter( shout_topics_to_remove = session.query(ShoutTopic).filter(
and_( and_(
ShoutTopic.shout == shout.id, ShoutTopic.shout == shout.id,
ShoutTopic.topic.in_(topic_to_unlink_ids) ShoutTopic.topic.in_(topic_to_unlink_ids),
) )
) )
@ -120,13 +147,13 @@ async def update_shout(_, info, shout_id, shout_input=None, publish=False):
shout_input["mainTopic"] = shout_input["mainTopic"]["slug"] shout_input["mainTopic"] = shout_input["mainTopic"]["slug"]
if shout_input["mainTopic"] == '': if shout_input["mainTopic"] == "":
del shout_input["mainTopic"] del shout_input["mainTopic"]
shout.update(shout_input) shout.update(shout_input)
updated = True updated = True
if publish and shout.visibility == 'owner': if publish and shout.visibility == "owner":
shout.visibility = "community" shout.visibility = "community"
shout.publishedAt = datetime.now(tz=timezone.utc) shout.publishedAt = datetime.now(tz=timezone.utc)
updated = True updated = True

View File

@ -1,8 +1,9 @@
import asyncio import asyncio
from base.orm import local_session from services.db import local_session
from base.resolvers import mutation, subscription from services.schema import mutation, subscription
from auth.authenticate import login_required from auth.authenticate import login_required
from auth.credentials import AuthCredentials from auth.credentials import AuthCredentials
# from resolvers.community import community_follow, community_unfollow # from resolvers.community import community_follow, community_unfollow
from orm.user import AuthorFollower from orm.user import AuthorFollower
from orm.topic import TopicFollower from orm.topic import TopicFollower
@ -22,20 +23,20 @@ async def follow(_, info, what, slug):
try: try:
if what == "AUTHOR": if what == "AUTHOR":
if author_follow(auth.user_id, slug): if author_follow(auth.user_id, slug):
result = FollowingResult("NEW", 'author', slug) result = FollowingResult("NEW", "author", slug)
await FollowingManager.push('author', result) await FollowingManager.push("author", result)
elif what == "TOPIC": elif what == "TOPIC":
if topic_follow(auth.user_id, slug): if topic_follow(auth.user_id, slug):
result = FollowingResult("NEW", 'topic', slug) result = FollowingResult("NEW", "topic", slug)
await FollowingManager.push('topic', result) await FollowingManager.push("topic", result)
elif what == "COMMUNITY": elif what == "COMMUNITY":
if False: # TODO: use community_follow(auth.user_id, slug): if False: # TODO: use community_follow(auth.user_id, slug):
result = FollowingResult("NEW", 'community', slug) result = FollowingResult("NEW", "community", slug)
await FollowingManager.push('community', result) await FollowingManager.push("community", result)
elif what == "REACTIONS": elif what == "REACTIONS":
if reactions_follow(auth.user_id, slug): if reactions_follow(auth.user_id, slug):
result = FollowingResult("NEW", 'shout', slug) result = FollowingResult("NEW", "shout", slug)
await FollowingManager.push('shout', result) await FollowingManager.push("shout", result)
except Exception as e: except Exception as e:
print(Exception(e)) print(Exception(e))
return {"error": str(e)} return {"error": str(e)}
@ -51,20 +52,20 @@ async def unfollow(_, info, what, slug):
try: try:
if what == "AUTHOR": if what == "AUTHOR":
if author_unfollow(auth.user_id, slug): if author_unfollow(auth.user_id, slug):
result = FollowingResult("DELETED", 'author', slug) result = FollowingResult("DELETED", "author", slug)
await FollowingManager.push('author', result) await FollowingManager.push("author", result)
elif what == "TOPIC": elif what == "TOPIC":
if topic_unfollow(auth.user_id, slug): if topic_unfollow(auth.user_id, slug):
result = FollowingResult("DELETED", 'topic', slug) result = FollowingResult("DELETED", "topic", slug)
await FollowingManager.push('topic', result) await FollowingManager.push("topic", result)
elif what == "COMMUNITY": elif what == "COMMUNITY":
if False: # TODO: use community_unfollow(auth.user_id, slug): if False: # TODO: use community_unfollow(auth.user_id, slug):
result = FollowingResult("DELETED", 'community', slug) result = FollowingResult("DELETED", "community", slug)
await FollowingManager.push('community', result) await FollowingManager.push("community", result)
elif what == "REACTIONS": elif what == "REACTIONS":
if reactions_unfollow(auth.user_id, slug): if reactions_unfollow(auth.user_id, slug):
result = FollowingResult("DELETED", 'shout', slug) result = FollowingResult("DELETED", "shout", slug)
await FollowingManager.push('shout', result) await FollowingManager.push("shout", result)
except Exception as e: except Exception as e:
return {"error": str(e)} return {"error": str(e)}
@ -82,23 +83,29 @@ async def shout_generator(_, info: GraphQLResolveInfo):
tasks = [] tasks = []
with local_session() as session: with local_session() as session:
# notify new shout by followed authors # notify new shout by followed authors
following_topics = session.query(TopicFollower).where(TopicFollower.follower == user_id).all() following_topics = (
session.query(TopicFollower)
.where(TopicFollower.follower == user_id)
.all()
)
for topic_id in following_topics: for topic_id in following_topics:
following_topic = Following('topic', topic_id) following_topic = Following("topic", topic_id)
await FollowingManager.register('topic', following_topic) await FollowingManager.register("topic", following_topic)
following_topic_task = following_topic.queue.get() following_topic_task = following_topic.queue.get()
tasks.append(following_topic_task) tasks.append(following_topic_task)
# by followed topics # by followed topics
following_authors = session.query(AuthorFollower).where( following_authors = (
AuthorFollower.follower == user_id).all() session.query(AuthorFollower)
.where(AuthorFollower.follower == user_id)
.all()
)
for author_id in following_authors: for author_id in following_authors:
following_author = Following('author', author_id) following_author = Following("author", author_id)
await FollowingManager.register('author', following_author) await FollowingManager.register("author", following_author)
following_author_task = following_author.queue.get() following_author_task = following_author.queue.get()
tasks.append(following_author_task) tasks.append(following_author_task)
@ -128,15 +135,18 @@ async def reaction_generator(_, info):
user_id = auth.user_id user_id = auth.user_id
try: try:
with local_session() as session: with local_session() as session:
followings = session.query(ShoutReactionsFollower.shout).where( followings = (
ShoutReactionsFollower.follower == user_id).unique() session.query(ShoutReactionsFollower.shout)
.where(ShoutReactionsFollower.follower == user_id)
.unique()
)
# notify new reaction # notify new reaction
tasks = [] tasks = []
for shout_id in followings: for shout_id in followings:
following_shout = Following('shout', shout_id) following_shout = Following("shout", shout_id)
await FollowingManager.register('shout', following_shout) await FollowingManager.register("shout", following_shout)
following_author_task = following_shout.queue.get() following_author_task = following_shout.queue.get()
tasks.append(following_author_task) tasks.append(following_author_task)

View File

@ -1,124 +0,0 @@
import json
import uuid
from datetime import datetime, timezone
from auth.authenticate import login_required
from auth.credentials import AuthCredentials
from base.redis import redis
from base.resolvers import mutation
from validations.inbox import Chat
@mutation.field("updateChat")
@login_required
async def update_chat(_, info, chat_new: Chat):
"""
updating chat
requires info["request"].user.slug to be in chat["admins"]
:param info: GraphQLInfo with request
:param chat_new: dict with chat data
:return: Result { error chat }
"""
auth: AuthCredentials = info.context["request"].auth
chat_id = chat_new["id"]
chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat:
return {
"error": "chat not exist"
}
chat = dict(json.loads(chat))
# TODO
if auth.user_id in chat["admins"]:
chat.update({
"title": chat_new.get("title", chat["title"]),
"description": chat_new.get("description", chat["description"]),
"updatedAt": int(datetime.now(tz=timezone.utc).timestamp()),
"admins": chat_new.get("admins", chat.get("admins") or []),
"users": chat_new.get("users", chat["users"])
})
await redis.execute("SET", f"chats/{chat.id}", json.dumps(chat))
await redis.execute("COMMIT")
return {
"error": None,
"chat": chat
}
@mutation.field("createChat")
@login_required
async def create_chat(_, info, title="", members=[]):
auth: AuthCredentials = info.context["request"].auth
chat = {}
print('create_chat members: %r' % members)
if auth.user_id not in members:
members.append(int(auth.user_id))
# reuse chat craeted before if exists
if len(members) == 2 and title == "":
chat = None
print(members)
chatset1 = await redis.execute("SMEMBERS", f"chats_by_user/{members[0]}")
if not chatset1:
chatset1 = set([])
print(chatset1)
chatset2 = await redis.execute("SMEMBERS", f"chats_by_user/{members[1]}")
if not chatset2:
chatset2 = set([])
print(chatset2)
chatset = chatset1.intersection(chatset2)
print(chatset)
for c in chatset:
chat = await redis.execute("GET", f"chats/{c.decode('utf-8')}")
if chat:
chat = json.loads(chat)
if chat['title'] == "":
print('[inbox] createChat found old chat')
print(chat)
break
if chat:
return {
"chat": chat,
"error": "existed"
}
chat_id = str(uuid.uuid4())
chat = {
"id": chat_id,
"users": members,
"title": title,
"createdBy": auth.user_id,
"createdAt": int(datetime.now(tz=timezone.utc).timestamp()),
"updatedAt": int(datetime.now(tz=timezone.utc).timestamp()),
"admins": members if (len(members) == 2 and title == "") else []
}
for m in members:
await redis.execute("SADD", f"chats_by_user/{m}", chat_id)
await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat))
await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0))
await redis.execute("COMMIT")
return {
"error": None,
"chat": chat
}
@mutation.field("deleteChat")
@login_required
async def delete_chat(_, info, chat_id: str):
auth: AuthCredentials = info.context["request"].auth
chat = await redis.execute("GET", f"/chats/{chat_id}")
if chat:
chat = dict(json.loads(chat))
if auth.user_id in chat['admins']:
await redis.execute("DEL", f"chats/{chat_id}")
await redis.execute("SREM", "chats_by_user/" + str(auth.user_id), chat_id)
await redis.execute("COMMIT")
else:
return {
"error": "chat not exist"
}

View File

@ -1,152 +0,0 @@
import json
# from datetime import datetime, timedelta, timezone
from auth.authenticate import login_required
from auth.credentials import AuthCredentials
from base.redis import redis
from base.orm import local_session
from base.resolvers import query
from orm.user import User
from resolvers.zine.profile import followed_authors
from .unread import get_unread_counter
async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids=[]):
''' load :limit messages for :chat_id with :offset '''
messages = []
message_ids = []
if ids:
message_ids += ids
try:
if limit:
mids = await redis.lrange(f"chats/{chat_id}/message_ids",
offset,
offset + limit
)
mids = [mid.decode("utf-8") for mid in mids]
message_ids += mids
except Exception as e:
print(e)
if message_ids:
message_keys = [f"chats/{chat_id}/messages/{mid}" for mid in message_ids]
messages = await redis.mget(*message_keys)
messages = [json.loads(msg.decode('utf-8')) for msg in messages]
replies = []
for m in messages:
rt = m.get('replyTo')
if rt:
rt = int(rt)
if rt not in message_ids:
replies.append(rt)
if replies:
messages += await load_messages(chat_id, limit=0, ids=replies)
return messages
@query.field("loadChats")
@login_required
async def load_chats(_, info, limit: int = 50, offset: int = 0):
""" load :limit chats of current user with :offset """
auth: AuthCredentials = info.context["request"].auth
cids = await redis.execute("SMEMBERS", "chats_by_user/" + str(auth.user_id))
if cids:
cids = list(cids)[offset:offset + limit]
if not cids:
print('[inbox.load] no chats were found')
cids = []
onliners = await redis.execute("SMEMBERS", "users-online")
if not onliners:
onliners = []
chats = []
for cid in cids:
cid = cid.decode("utf-8")
c = await redis.execute("GET", "chats/" + cid)
if c:
c = dict(json.loads(c))
c['messages'] = await load_messages(cid, 5, 0)
c['unread'] = await get_unread_counter(cid, auth.user_id)
with local_session() as session:
c['members'] = []
for uid in c["users"]:
a = session.query(User).where(User.id == uid).first()
if a:
c['members'].append({
"id": a.id,
"slug": a.slug,
"userpic": a.userpic,
"name": a.name,
"lastSeen": a.lastSeen,
"online": a.id in onliners
})
chats.append(c)
return {
"chats": chats,
"error": None
}
@query.field("loadMessagesBy")
@login_required
async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0):
''' load :limit messages of :chat_id with :offset '''
auth: AuthCredentials = info.context["request"].auth
userchats = await redis.execute("SMEMBERS", "chats_by_user/" + str(auth.user_id))
userchats = [c.decode('utf-8') for c in userchats]
# print('[inbox] userchats: %r' % userchats)
if userchats:
# print('[inbox] loading messages by...')
messages = []
by_chat = by.get('chat')
if by_chat in userchats:
chat = await redis.execute("GET", f"chats/{by_chat}")
# print(chat)
if not chat:
return {
"messages": [],
"error": "chat not exist"
}
# everyone's messages in filtered chat
messages = await load_messages(by_chat, limit, offset)
return {
"messages": sorted(
list(messages),
key=lambda m: m['createdAt']
),
"error": None
}
else:
return {
"error": "Cannot access messages of this chat"
}
@query.field("loadRecipients")
async def load_recipients(_, info, limit=50, offset=0):
chat_users = []
auth: AuthCredentials = info.context["request"].auth
onliners = await redis.execute("SMEMBERS", "users-online")
if not onliners:
onliners = []
try:
chat_users += await followed_authors(auth.user_id)
limit = limit - len(chat_users)
except Exception:
pass
with local_session() as session:
chat_users += session.query(User).where(User.emailConfirmed).limit(limit).offset(offset)
members = []
for a in chat_users:
members.append({
"id": a.id,
"slug": a.slug,
"userpic": a.userpic,
"name": a.name,
"lastSeen": a.lastSeen,
"online": a.id in onliners
})
return {
"members": members,
"error": None
}

View File

@ -1,179 +0,0 @@
import asyncio
import json
from typing import Any
from datetime import datetime, timezone
from graphql.type import GraphQLResolveInfo
from auth.authenticate import login_required
from auth.credentials import AuthCredentials
from base.redis import redis
from base.resolvers import mutation, subscription
from services.following import FollowingManager, FollowingResult, Following
from validations.inbox import Message
@mutation.field("createMessage")
@login_required
async def create_message(_, info, chat: str, body: str, replyTo=None):
""" create message with :body for :chat_id replying to :replyTo optionally """
auth: AuthCredentials = info.context["request"].auth
chat = await redis.execute("GET", f"chats/{chat}")
if not chat:
return {
"error": "chat is not exist"
}
else:
chat = dict(json.loads(chat))
message_id = await redis.execute("GET", f"chats/{chat['id']}/next_message_id")
message_id = int(message_id)
new_message = {
"chatId": chat['id'],
"id": message_id,
"author": auth.user_id,
"body": body,
"createdAt": int(datetime.now(tz=timezone.utc).timestamp())
}
if replyTo:
new_message['replyTo'] = replyTo
chat['updatedAt'] = new_message['createdAt']
await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat))
print(f"[inbox] creating message {new_message}")
await redis.execute(
"SET", f"chats/{chat['id']}/messages/{message_id}", json.dumps(new_message)
)
await redis.execute("LPUSH", f"chats/{chat['id']}/message_ids", str(message_id))
await redis.execute("SET", f"chats/{chat['id']}/next_message_id", str(message_id + 1))
users = chat["users"]
for user_slug in users:
await redis.execute(
"LPUSH", f"chats/{chat['id']}/unread/{user_slug}", str(message_id)
)
result = FollowingResult("NEW", 'chat', new_message)
await FollowingManager.push('chat', result)
return {
"message": new_message,
"error": None
}
@mutation.field("updateMessage")
@login_required
async def update_message(_, info, chat_id: str, message_id: int, body: str):
auth: AuthCredentials = info.context["request"].auth
chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat:
return {"error": "chat not exist"}
message = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}")
if not message:
return {"error": "message not exist"}
message = json.loads(message)
if message["author"] != auth.user_id:
return {"error": "access denied"}
message["body"] = body
message["updatedAt"] = int(datetime.now(tz=timezone.utc).timestamp())
await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message))
result = FollowingResult("UPDATED", 'chat', message)
await FollowingManager.push('chat', result)
return {
"message": message,
"error": None
}
@mutation.field("deleteMessage")
@login_required
async def delete_message(_, info, chat_id: str, message_id: int):
auth: AuthCredentials = info.context["request"].auth
chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat:
return {"error": "chat not exist"}
chat = json.loads(chat)
message = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}")
if not message:
return {"error": "message not exist"}
message = json.loads(message)
if message["author"] != auth.user_id:
return {"error": "access denied"}
await redis.execute("LREM", f"chats/{chat_id}/message_ids", 0, str(message_id))
await redis.execute("DEL", f"chats/{chat_id}/messages/{str(message_id)}")
users = chat["users"]
for user_id in users:
await redis.execute("LREM", f"chats/{chat_id}/unread/{user_id}", 0, str(message_id))
result = FollowingResult("DELETED", 'chat', message)
await FollowingManager.push(result)
return {}
@mutation.field("markAsRead")
@login_required
async def mark_as_read(_, info, chat_id: str, messages: [int]):
auth: AuthCredentials = info.context["request"].auth
chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat:
return {"error": "chat not exist"}
chat = json.loads(chat)
users = set(chat["users"])
if auth.user_id not in users:
return {"error": "access denied"}
for message_id in messages:
await redis.execute("LREM", f"chats/{chat_id}/unread/{auth.user_id}", 0, str(message_id))
return {
"error": None
}
@subscription.source("newMessage")
async def message_generator(_, info: GraphQLResolveInfo):
print(f"[resolvers.messages] generator {info}")
auth: AuthCredentials = info.context["request"].auth
user_id = auth.user_id
try:
user_following_chats = await redis.execute("GET", f"chats_by_user/{user_id}")
if user_following_chats:
user_following_chats = list(json.loads(user_following_chats)) # chat ids
else:
user_following_chats = []
tasks = []
updated = {}
for chat_id in user_following_chats:
chat = await redis.execute("GET", f"chats/{chat_id}")
updated[chat_id] = chat['updatedAt']
user_following_chats_sorted = sorted(user_following_chats, key=lambda x: updated[x], reverse=True)
for chat_id in user_following_chats_sorted:
following_chat = Following('chat', chat_id)
await FollowingManager.register('chat', following_chat)
chat_task = following_chat.queue.get()
tasks.append(chat_task)
while True:
msg = await asyncio.gather(*tasks)
yield msg
finally:
await FollowingManager.remove('chat', following_chat)
@subscription.field("newMessage")
@login_required
async def message_resolver(message: Message, info: Any):
return message

View File

@ -1,95 +0,0 @@
import json
from datetime import datetime, timezone, timedelta
from auth.authenticate import login_required
from auth.credentials import AuthCredentials
from base.redis import redis
from base.resolvers import query
from base.orm import local_session
from orm.user import AuthorFollower, User
from resolvers.inbox.load import load_messages
@query.field("searchRecipients")
@login_required
async def search_recipients(_, info, query: str, limit: int = 50, offset: int = 0):
result = []
# TODO: maybe redis scan?
auth: AuthCredentials = info.context["request"].auth
talk_before = await redis.execute("GET", f"/chats_by_user/{auth.user_id}")
if talk_before:
talk_before = list(json.loads(talk_before))[offset:offset + limit]
for chat_id in talk_before:
members = await redis.execute("GET", f"/chats/{chat_id}/users")
if members:
members = list(json.loads(members))
for member in members:
if member.startswith(query):
if member not in result:
result.append(member)
more_amount = limit - len(result)
with local_session() as session:
# followings
result += session.query(AuthorFollower.author).join(
User, User.id == AuthorFollower.follower
).where(
User.slug.startswith(query)
).offset(offset + len(result)).limit(more_amount)
more_amount = limit
# followers
result += session.query(AuthorFollower.follower).join(
User, User.id == AuthorFollower.author
).where(
User.slug.startswith(query)
).offset(offset + len(result)).limit(offset + len(result) + limit)
return {
"members": list(result),
"error": None
}
@query.field("searchMessages")
@login_required
async def search_user_chats(by, messages, user_id: int, limit, offset):
cids = set([])
cids.union(set(await redis.execute("SMEMBERS", "chats_by_user/" + str(user_id))))
messages = []
by_author = by.get('author')
if by_author:
# all author's messages
cids.union(set(await redis.execute("SMEMBERS", f"chats_by_user/{by_author}")))
# author's messages in filtered chat
messages.union(set(filter(lambda m: m["author"] == by_author, list(messages))))
for c in cids:
c = c.decode('utf-8')
messages = await load_messages(c, limit, offset)
body_like = by.get('body')
if body_like:
# search in all messages in all user's chats
for c in cids:
# FIXME: use redis scan here
c = c.decode('utf-8')
mmm = await load_messages(c, limit, offset)
for m in mmm:
if body_like in m["body"]:
messages.add(m)
else:
# search in chat's messages
messages.extend(filter(lambda m: body_like in m["body"], list(messages)))
days = by.get("days")
if days:
messages.extend(filter(
list(messages),
key=lambda m: (
datetime.now(tz=timezone.utc) - int(m["createdAt"]) < timedelta(days=by["days"])
)
))
return {
"messages": messages,
"error": None
}

View File

@ -1,22 +0,0 @@
from base.redis import redis
import json
async def get_unread_counter(chat_id: str, user_id: int):
try:
unread = await redis.execute("LLEN", f"chats/{chat_id.decode('utf-8')}/unread/{user_id}")
if unread:
return unread
except Exception:
return 0
async def get_total_unread_counter(user_id: int):
chats = await redis.execute("GET", f"chats_by_user/{str(user_id)}")
unread = 0
if chats:
chats = json.loads(chats)
for chat_id in chats:
n = await get_unread_counter(chat_id.decode('utf-8'), user_id)
unread += n
return unread

View File

@ -1,13 +1,13 @@
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from sqlalchemy.orm import joinedload, aliased from sqlalchemy.orm import joinedload, aliased
from sqlalchemy.sql.expression import desc, asc, select, func, case, and_, text, nulls_last from sqlalchemy.sql.expression import desc, asc, select, func, case, and_, nulls_last
from auth.authenticate import login_required from auth.authenticate import login_required
from auth.credentials import AuthCredentials from auth.credentials import AuthCredentials
from base.exceptions import ObjectNotExist, OperationNotAllowed from services.exceptions import ObjectNotExist
from base.orm import local_session from services.db import local_session
from base.resolvers import query from services.schema import query
from orm import TopicFollower from orm import TopicFollower
from orm.reaction import Reaction, ReactionKind from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout, ShoutAuthor, ShoutTopic from orm.shout import Shout, ShoutAuthor, ShoutTopic
@ -18,16 +18,12 @@ def add_stat_columns(q):
aliased_reaction = aliased(Reaction) aliased_reaction = aliased(Reaction)
q = q.outerjoin(aliased_reaction).add_columns( q = q.outerjoin(aliased_reaction).add_columns(
func.sum(aliased_reaction.id).label("reacted_stat"),
func.sum( func.sum(
aliased_reaction.id case((aliased_reaction.kind == ReactionKind.COMMENT, 1), else_=0)
).label('reacted_stat'), ).label("commented_stat"),
func.sum( func.sum(
case( case(
(aliased_reaction.kind == ReactionKind.COMMENT, 1),
else_=0
)
).label('commented_stat'),
func.sum(case(
# do not count comments' reactions # do not count comments' reactions
(aliased_reaction.replyTo.is_not(None), 0), (aliased_reaction.replyTo.is_not(None), 0),
(aliased_reaction.kind == ReactionKind.AGREE, 1), (aliased_reaction.kind == ReactionKind.AGREE, 1),
@ -38,12 +34,16 @@ def add_stat_columns(q):
(aliased_reaction.kind == ReactionKind.REJECT, -1), (aliased_reaction.kind == ReactionKind.REJECT, -1),
(aliased_reaction.kind == ReactionKind.LIKE, 1), (aliased_reaction.kind == ReactionKind.LIKE, 1),
(aliased_reaction.kind == ReactionKind.DISLIKE, -1), (aliased_reaction.kind == ReactionKind.DISLIKE, -1),
else_=0) else_=0,
).label('rating_stat'), )
func.max(case( ).label("rating_stat"),
func.max(
case(
(aliased_reaction.kind != ReactionKind.COMMENT, None), (aliased_reaction.kind != ReactionKind.COMMENT, None),
else_=aliased_reaction.createdAt else_=aliased_reaction.createdAt,
)).label('last_comment')) )
).label("last_comment"),
)
return q return q
@ -60,7 +60,7 @@ def apply_filters(q, filters, user_id=None):
if filters.get("layout"): if filters.get("layout"):
q = q.filter(Shout.layout == filters.get("layout")) q = q.filter(Shout.layout == filters.get("layout"))
if filters.get('excludeLayout'): if filters.get("excludeLayout"):
q = q.filter(Shout.layout != filters.get("excludeLayout")) q = q.filter(Shout.layout != filters.get("excludeLayout"))
if filters.get("author"): if filters.get("author"):
q = q.filter(Shout.authors.any(slug=filters.get("author"))) q = q.filter(Shout.authors.any(slug=filters.get("author")))
@ -71,7 +71,9 @@ def apply_filters(q, filters, user_id=None):
if filters.get("body"): if filters.get("body"):
q = q.filter(Shout.body.ilike(f'%{filters.get("body")}%s')) q = q.filter(Shout.body.ilike(f'%{filters.get("body")}%s'))
if filters.get("days"): if filters.get("days"):
before = datetime.now(tz=timezone.utc) - timedelta(days=int(filters.get("days")) or 30) before = datetime.now(tz=timezone.utc) - timedelta(
days=int(filters.get("days")) or 30
)
q = q.filter(Shout.createdAt > before) q = q.filter(Shout.createdAt > before)
return q return q
@ -87,30 +89,32 @@ async def load_shout(_, info, slug=None, shout_id=None):
q = add_stat_columns(q) q = add_stat_columns(q)
if slug is not None: if slug is not None:
q = q.filter( q = q.filter(Shout.slug == slug)
Shout.slug == slug
)
if shout_id is not None: if shout_id is not None:
q = q.filter( q = q.filter(Shout.id == shout_id)
Shout.id == shout_id
)
q = q.filter( q = q.filter(Shout.deletedAt.is_(None)).group_by(Shout.id)
Shout.deletedAt.is_(None)
).group_by(Shout.id)
try: try:
[shout, reacted_stat, commented_stat, rating_stat, last_comment] = session.execute(q).first() [
shout,
reacted_stat,
commented_stat,
rating_stat,
last_comment,
] = session.execute(q).first()
shout.stat = { shout.stat = {
"viewed": shout.views, "viewed": shout.views,
"reacted": reacted_stat, "reacted": reacted_stat,
"commented": commented_stat, "commented": commented_stat,
"rating": rating_stat "rating": rating_stat,
} }
for author_caption in session.query(ShoutAuthor).join(Shout).where(Shout.slug == slug): for author_caption in (
session.query(ShoutAuthor).join(Shout).where(Shout.slug == slug)
):
for author in shout.authors: for author in shout.authors:
if author.id == author_caption.user: if author.id == author_caption.user:
author.caption = author_caption.caption author.caption = author_caption.caption
@ -142,14 +146,13 @@ async def load_shouts_by(_, info, options):
:return: Shout[] :return: Shout[]
""" """
q = select(Shout).options( q = (
select(Shout)
.options(
joinedload(Shout.authors), joinedload(Shout.authors),
joinedload(Shout.topics), joinedload(Shout.topics),
).where(
and_(
Shout.deletedAt.is_(None),
Shout.layout.is_not(None)
) )
.where(and_(Shout.deletedAt.is_(None), Shout.layout.is_not(None)))
) )
q = add_stat_columns(q) q = add_stat_columns(q)
@ -159,23 +162,36 @@ async def load_shouts_by(_, info, options):
order_by = options.get("order_by", Shout.publishedAt) order_by = options.get("order_by", Shout.publishedAt)
query_order_by = desc(order_by) if options.get('order_by_desc', True) else asc(order_by) query_order_by = (
desc(order_by) if options.get("order_by_desc", True) else asc(order_by)
)
offset = options.get("offset", 0) offset = options.get("offset", 0)
limit = options.get("limit", 10) limit = options.get("limit", 10)
q = q.group_by(Shout.id).order_by(nulls_last(query_order_by)).limit(limit).offset(offset) q = (
q.group_by(Shout.id)
.order_by(nulls_last(query_order_by))
.limit(limit)
.offset(offset)
)
shouts = [] shouts = []
with local_session() as session: with local_session() as session:
shouts_map = {} shouts_map = {}
for [shout, reacted_stat, commented_stat, rating_stat, last_comment] in session.execute(q).unique(): for [
shout,
reacted_stat,
commented_stat,
rating_stat,
last_comment,
] in session.execute(q).unique():
shouts.append(shout) shouts.append(shout)
shout.stat = { shout.stat = {
"viewed": shout.views, "viewed": shout.views,
"reacted": reacted_stat, "reacted": reacted_stat,
"commented": commented_stat, "commented": commented_stat,
"rating": rating_stat "rating": rating_stat,
} }
shouts_map[shout.id] = shout shouts_map[shout.id] = shout
@ -187,11 +203,13 @@ async def get_drafts(_, info):
auth: AuthCredentials = info.context["request"].auth auth: AuthCredentials = info.context["request"].auth
user_id = auth.user_id user_id = auth.user_id
q = select(Shout).options( q = (
select(Shout)
.options(
joinedload(Shout.authors), joinedload(Shout.authors),
joinedload(Shout.topics), joinedload(Shout.topics),
).where( )
and_(Shout.deletedAt.is_(None), Shout.createdBy == user_id) .where(and_(Shout.deletedAt.is_(None), Shout.createdBy == user_id))
) )
q = q.group_by(Shout.id) q = q.group_by(Shout.id)
@ -210,24 +228,26 @@ async def get_my_feed(_, info, options):
auth: AuthCredentials = info.context["request"].auth auth: AuthCredentials = info.context["request"].auth
user_id = auth.user_id user_id = auth.user_id
subquery = select(Shout.id).join( subquery = (
ShoutAuthor select(Shout.id)
).join( .join(ShoutAuthor)
AuthorFollower, AuthorFollower.follower == user_id .join(AuthorFollower, AuthorFollower.follower == user_id)
).join( .join(ShoutTopic)
ShoutTopic .join(TopicFollower, TopicFollower.follower == user_id)
).join(
TopicFollower, TopicFollower.follower == user_id
) )
q = select(Shout).options( q = (
select(Shout)
.options(
joinedload(Shout.authors), joinedload(Shout.authors),
joinedload(Shout.topics), joinedload(Shout.topics),
).where( )
.where(
and_( and_(
Shout.publishedAt.is_not(None), Shout.publishedAt.is_not(None),
Shout.deletedAt.is_(None), Shout.deletedAt.is_(None),
Shout.id.in_(subquery) Shout.id.in_(subquery),
)
) )
) )
@ -236,22 +256,35 @@ async def get_my_feed(_, info, options):
order_by = options.get("order_by", Shout.publishedAt) order_by = options.get("order_by", Shout.publishedAt)
query_order_by = desc(order_by) if options.get('order_by_desc', True) else asc(order_by) query_order_by = (
desc(order_by) if options.get("order_by_desc", True) else asc(order_by)
)
offset = options.get("offset", 0) offset = options.get("offset", 0)
limit = options.get("limit", 10) limit = options.get("limit", 10)
q = q.group_by(Shout.id).order_by(nulls_last(query_order_by)).limit(limit).offset(offset) q = (
q.group_by(Shout.id)
.order_by(nulls_last(query_order_by))
.limit(limit)
.offset(offset)
)
shouts = [] shouts = []
with local_session() as session: with local_session() as session:
shouts_map = {} shouts_map = {}
for [shout, reacted_stat, commented_stat, rating_stat, last_comment] in session.execute(q).unique(): for [
shout,
reacted_stat,
commented_stat,
rating_stat,
last_comment,
] in session.execute(q).unique():
shouts.append(shout) shouts.append(shout)
shout.stat = { shout.stat = {
"viewed": shout.views, "viewed": shout.views,
"reacted": reacted_stat, "reacted": reacted_stat,
"commented": commented_stat, "commented": commented_stat,
"rating": rating_stat "rating": rating_stat,
} }
shouts_map[shout.id] = shout shouts_map[shout.id] = shout

View File

@ -1,5 +1,4 @@
from services.schema import query
from base.resolvers import query
from resolvers.auth import login_required from resolvers.auth import login_required
from migration.extract import extract_md from migration.extract import extract_md

View File

@ -5,8 +5,8 @@ from sqlalchemy.orm import aliased, joinedload
from auth.authenticate import login_required from auth.authenticate import login_required
from auth.credentials import AuthCredentials from auth.credentials import AuthCredentials
from base.orm import local_session from services.orm import local_session
from base.resolvers import mutation, query from services.schema import mutation, query
from orm.reaction import Reaction from orm.reaction import Reaction
from orm.shout import ShoutAuthor, ShoutTopic from orm.shout import ShoutAuthor, ShoutTopic
from orm.topic import Topic from orm.topic import Topic
@ -23,36 +23,31 @@ def add_author_stat_columns(q, include_heavy_stat=False):
shout_author_aliased = aliased(ShoutAuthor) shout_author_aliased = aliased(ShoutAuthor)
q = q.outerjoin(shout_author_aliased).add_columns( q = q.outerjoin(shout_author_aliased).add_columns(
func.count(distinct(shout_author_aliased.shout)).label('shouts_stat') func.count(distinct(shout_author_aliased.shout)).label("shouts_stat")
) )
q = q.outerjoin(author_followers, author_followers.author == User.id).add_columns( q = q.outerjoin(author_followers, author_followers.author == User.id).add_columns(
func.count(distinct(author_followers.follower)).label('followers_stat') func.count(distinct(author_followers.follower)).label("followers_stat")
) )
q = q.outerjoin(author_following, author_following.follower == User.id).add_columns( q = q.outerjoin(author_following, author_following.follower == User.id).add_columns(
func.count(distinct(author_following.author)).label('followings_stat') func.count(distinct(author_following.author)).label("followings_stat")
) )
if include_heavy_stat: if include_heavy_stat:
user_rating_aliased = aliased(UserRating) user_rating_aliased = aliased(UserRating)
q = q.outerjoin(user_rating_aliased, user_rating_aliased.user == User.id).add_columns( q = q.outerjoin(
func.sum(user_rating_aliased.value).label('rating_stat') user_rating_aliased, user_rating_aliased.user == User.id
) ).add_columns(func.sum(user_rating_aliased.value).label("rating_stat"))
else: else:
q = q.add_columns(literal(-1).label('rating_stat')) q = q.add_columns(literal(-1).label("rating_stat"))
if include_heavy_stat: if include_heavy_stat:
q = q.outerjoin( q = q.outerjoin(
Reaction, Reaction, and_(Reaction.createdBy == User.id, Reaction.body.is_not(None))
and_( ).add_columns(func.count(distinct(Reaction.id)).label("commented_stat"))
Reaction.createdBy == User.id,
Reaction.body.is_not(None)
)).add_columns(
func.count(distinct(Reaction.id)).label('commented_stat')
)
else: else:
q = q.add_columns(literal(-1).label('commented_stat')) q = q.add_columns(literal(-1).label("commented_stat"))
q = q.group_by(User.id) q = q.group_by(User.id)
@ -60,13 +55,19 @@ def add_author_stat_columns(q, include_heavy_stat=False):
def add_stat(author, stat_columns): def add_stat(author, stat_columns):
[shouts_stat, followers_stat, followings_stat, rating_stat, commented_stat] = stat_columns [
shouts_stat,
followers_stat,
followings_stat,
rating_stat,
commented_stat,
] = stat_columns
author.stat = { author.stat = {
"shouts": shouts_stat, "shouts": shouts_stat,
"followers": followers_stat, "followers": followers_stat,
"followings": followings_stat, "followings": followings_stat,
"rating": rating_stat, "rating": rating_stat,
"commented": commented_stat "commented": commented_stat,
} }
return author return author
@ -84,9 +85,15 @@ def get_authors_from_query(q):
async def user_subscriptions(user_id: int): async def user_subscriptions(user_id: int):
return { return {
"unread": await get_total_unread_counter(user_id), # unread inbox messages counter "unread": await get_total_unread_counter(
"topics": [t.slug for t in await followed_topics(user_id)], # followed topics slugs user_id
"authors": [a.slug for a in await followed_authors(user_id)], # followed authors slugs ), # unread inbox messages counter
"topics": [
t.slug for t in await followed_topics(user_id)
], # followed topics slugs
"authors": [
a.slug for a in await followed_authors(user_id)
], # followed authors slugs
"reactions": await followed_reactions(user_id) "reactions": await followed_reactions(user_id)
# "communities": [c.slug for c in followed_communities(slug)], # communities # "communities": [c.slug for c in followed_communities(slug)], # communities
} }
@ -101,13 +108,12 @@ async def followed_discussions(_, info, user_id) -> List[Topic]:
async def followed_reactions(user_id): async def followed_reactions(user_id):
with local_session() as session: with local_session() as session:
user = session.query(User).where(User.id == user_id).first() user = session.query(User).where(User.id == user_id).first()
return session.query( return (
Reaction.shout session.query(Reaction.shout)
).where( .where(Reaction.createdBy == user.id)
Reaction.createdBy == user.id .filter(Reaction.createdAt > user.lastSeen)
).filter( .all()
Reaction.createdAt > user.lastSeen )
).all()
# dufok mod (^*^') : # dufok mod (^*^') :
@ -158,10 +164,10 @@ async def user_followers(_, _info, slug) -> List[User]:
q = add_author_stat_columns(q) q = add_author_stat_columns(q)
aliased_user = aliased(User) aliased_user = aliased(User)
q = q.join(AuthorFollower, AuthorFollower.follower == User.id).join( q = (
aliased_user, aliased_user.id == AuthorFollower.author q.join(AuthorFollower, AuthorFollower.follower == User.id)
).where( .join(aliased_user, aliased_user.id == AuthorFollower.author)
aliased_user.slug == slug .where(aliased_user.slug == slug)
) )
return get_authors_from_query(q) return get_authors_from_query(q)
@ -189,15 +195,10 @@ async def update_profile(_, info, profile):
with local_session() as session: with local_session() as session:
user = session.query(User).filter(User.id == user_id).one() user = session.query(User).filter(User.id == user_id).one()
if not user: if not user:
return { return {"error": "canoot find user"}
"error": "canoot find user"
}
user.update(profile) user.update(profile)
session.commit() session.commit()
return { return {"error": None, "author": user}
"error": None,
"author": user
}
@mutation.field("rateUser") @mutation.field("rateUser")
@ -208,7 +209,11 @@ async def rate_user(_, info, rated_userslug, value):
with local_session() as session: with local_session() as session:
rating = ( rating = (
session.query(UserRating) session.query(UserRating)
.filter(and_(UserRating.rater == auth.user_id, UserRating.user == rated_userslug)) .filter(
and_(
UserRating.rater == auth.user_id, UserRating.user == rated_userslug
)
)
.first() .first()
) )
if rating: if rating:
@ -239,13 +244,10 @@ def author_follow(user_id, slug):
def author_unfollow(user_id, slug): def author_unfollow(user_id, slug):
with local_session() as session: with local_session() as session:
flw = ( flw = (
session.query( session.query(AuthorFollower)
AuthorFollower .join(User, User.id == AuthorFollower.author)
).join(User, User.id == AuthorFollower.author).filter( .filter(and_(AuthorFollower.follower == user_id, User.slug == slug))
and_( .first()
AuthorFollower.follower == user_id, User.slug == slug
)
).first()
) )
if flw: if flw:
session.delete(flw) session.delete(flw)
@ -281,7 +283,12 @@ async def load_authors_by(_, info, by, limit, offset):
elif by.get("name"): elif by.get("name"):
q = q.filter(User.name.ilike(f"%{by['name']}%")) q = q.filter(User.name.ilike(f"%{by['name']}%"))
elif by.get("topic"): elif by.get("topic"):
q = q.join(ShoutAuthor).join(ShoutTopic).join(Topic).where(Topic.slug == by["topic"]) q = (
q.join(ShoutAuthor)
.join(ShoutTopic)
.join(Topic)
.where(Topic.slug == by["topic"])
)
if by.get("lastSeen"): # in days if by.get("lastSeen"): # in days
days_before = datetime.now(tz=timezone.utc) - timedelta(days=by["lastSeen"]) days_before = datetime.now(tz=timezone.utc) - timedelta(days=by["lastSeen"])
q = q.filter(User.lastSeen > days_before) q = q.filter(User.lastSeen > days_before)
@ -289,8 +296,6 @@ async def load_authors_by(_, info, by, limit, offset):
days_before = datetime.now(tz=timezone.utc) - timedelta(days=by["createdAt"]) days_before = datetime.now(tz=timezone.utc) - timedelta(days=by["createdAt"])
q = q.filter(User.createdAt > days_before) q = q.filter(User.createdAt > days_before)
q = q.order_by( q = q.order_by(by.get("order", User.createdAt)).limit(limit).offset(offset)
by.get("order", User.createdAt)
).limit(limit).offset(offset)
return get_authors_from_query(q) return get_authors_from_query(q)

View File

@ -4,9 +4,9 @@ from sqlalchemy.orm import aliased
from auth.authenticate import login_required from auth.authenticate import login_required
from auth.credentials import AuthCredentials from auth.credentials import AuthCredentials
from base.exceptions import OperationNotAllowed from services.exceptions import OperationNotAllowed
from base.orm import local_session from services.db import local_session
from base.resolvers import mutation, query from services.schema import mutation, query
from orm.reaction import Reaction, ReactionKind from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout, ShoutReactionsFollower from orm.shout import Shout, ShoutReactionsFollower
from orm.user import User from orm.user import User
@ -15,17 +15,15 @@ from orm.user import User
def add_reaction_stat_columns(q): def add_reaction_stat_columns(q):
aliased_reaction = aliased(Reaction) aliased_reaction = aliased(Reaction)
q = q.outerjoin(aliased_reaction, Reaction.id == aliased_reaction.replyTo).add_columns( q = q.outerjoin(
func.sum( aliased_reaction, Reaction.id == aliased_reaction.replyTo
aliased_reaction.id ).add_columns(
).label('reacted_stat'), func.sum(aliased_reaction.id).label("reacted_stat"),
func.sum(case((aliased_reaction.body.is_not(None), 1), else_=0)).label(
"commented_stat"
),
func.sum( func.sum(
case( case(
(aliased_reaction.body.is_not(None), 1),
else_=0
)
).label('commented_stat'),
func.sum(case(
(aliased_reaction.kind == ReactionKind.AGREE, 1), (aliased_reaction.kind == ReactionKind.AGREE, 1),
(aliased_reaction.kind == ReactionKind.DISAGREE, -1), (aliased_reaction.kind == ReactionKind.DISAGREE, -1),
(aliased_reaction.kind == ReactionKind.PROOF, 1), (aliased_reaction.kind == ReactionKind.PROOF, 1),
@ -34,8 +32,10 @@ def add_reaction_stat_columns(q):
(aliased_reaction.kind == ReactionKind.REJECT, -1), (aliased_reaction.kind == ReactionKind.REJECT, -1),
(aliased_reaction.kind == ReactionKind.LIKE, 1), (aliased_reaction.kind == ReactionKind.LIKE, 1),
(aliased_reaction.kind == ReactionKind.DISLIKE, -1), (aliased_reaction.kind == ReactionKind.DISLIKE, -1),
else_=0) else_=0,
).label('rating_stat')) )
).label("rating_stat"),
)
return q return q
@ -46,17 +46,19 @@ def reactions_follow(user_id, shout_id: int, auto=False):
shout = session.query(Shout).where(Shout.id == shout_id).one() shout = session.query(Shout).where(Shout.id == shout_id).one()
following = ( following = (
session.query(ShoutReactionsFollower).where(and_( session.query(ShoutReactionsFollower)
.where(
and_(
ShoutReactionsFollower.follower == user_id, ShoutReactionsFollower.follower == user_id,
ShoutReactionsFollower.shout == shout.id, ShoutReactionsFollower.shout == shout.id,
)).first() )
)
.first()
) )
if not following: if not following:
following = ShoutReactionsFollower.create( following = ShoutReactionsFollower.create(
follower=user_id, follower=user_id, shout=shout.id, auto=auto
shout=shout.id,
auto=auto
) )
session.add(following) session.add(following)
session.commit() session.commit()
@ -71,10 +73,14 @@ def reactions_unfollow(user_id: int, shout_id: int):
shout = session.query(Shout).where(Shout.id == shout_id).one() shout = session.query(Shout).where(Shout.id == shout_id).one()
following = ( following = (
session.query(ShoutReactionsFollower).where(and_( session.query(ShoutReactionsFollower)
.where(
and_(
ShoutReactionsFollower.follower == user_id, ShoutReactionsFollower.follower == user_id,
ShoutReactionsFollower.shout == shout.id ShoutReactionsFollower.shout == shout.id,
)).first() )
)
.first()
) )
if following: if following:
@ -87,30 +93,31 @@ def reactions_unfollow(user_id: int, shout_id: int):
def is_published_author(session, user_id): def is_published_author(session, user_id):
''' checks if user has at least one publication ''' """checks if user has at least one publication"""
return session.query( return (
Shout session.query(Shout)
).where( .where(Shout.authors.contains(user_id))
Shout.authors.contains(user_id) .filter(and_(Shout.publishedAt.is_not(None), Shout.deletedAt.is_(None)))
).filter( .count()
and_( > 0
Shout.publishedAt.is_not(None),
Shout.deletedAt.is_(None)
) )
).count() > 0
def check_to_publish(session, user_id, reaction): def check_to_publish(session, user_id, reaction):
''' set shout to public if publicated approvers amount > 4 ''' """set shout to public if publicated approvers amount > 4"""
if not reaction.replyTo and reaction.kind in [ if not reaction.replyTo and reaction.kind in [
ReactionKind.ACCEPT, ReactionKind.ACCEPT,
ReactionKind.LIKE, ReactionKind.LIKE,
ReactionKind.PROOF ReactionKind.PROOF,
]: ]:
if is_published_author(user_id): if is_published_author(user_id):
# now count how many approvers are voted already # now count how many approvers are voted already
approvers_reactions = session.query(Reaction).where(Reaction.shout == reaction.shout).all() approvers_reactions = (
approvers = [user_id, ] session.query(Reaction).where(Reaction.shout == reaction.shout).all()
)
approvers = [
user_id,
]
for ar in approvers_reactions: for ar in approvers_reactions:
a = ar.createdBy a = ar.createdBy
if is_published_author(session, a): if is_published_author(session, a):
@ -121,20 +128,22 @@ def check_to_publish(session, user_id, reaction):
def check_to_hide(session, user_id, reaction): def check_to_hide(session, user_id, reaction):
''' hides any shout if 20% of reactions are negative ''' """hides any shout if 20% of reactions are negative"""
if not reaction.replyTo and reaction.kind in [ if not reaction.replyTo and reaction.kind in [
ReactionKind.REJECT, ReactionKind.REJECT,
ReactionKind.DISLIKE, ReactionKind.DISLIKE,
ReactionKind.DISPROOF ReactionKind.DISPROOF,
]: ]:
# if is_published_author(user): # if is_published_author(user):
approvers_reactions = session.query(Reaction).where(Reaction.shout == reaction.shout).all() approvers_reactions = (
session.query(Reaction).where(Reaction.shout == reaction.shout).all()
)
rejects = 0 rejects = 0
for r in approvers_reactions: for r in approvers_reactions:
if r.kind in [ if r.kind in [
ReactionKind.REJECT, ReactionKind.REJECT,
ReactionKind.DISLIKE, ReactionKind.DISLIKE,
ReactionKind.DISPROOF ReactionKind.DISPROOF,
]: ]:
rejects += 1 rejects += 1
if len(approvers_reactions) / rejects < 5: if len(approvers_reactions) / rejects < 5:
@ -145,14 +154,14 @@ def check_to_hide(session, user_id, reaction):
def set_published(session, shout_id): def set_published(session, shout_id):
s = session.query(Shout).where(Shout.id == shout_id).first() s = session.query(Shout).where(Shout.id == shout_id).first()
s.publishedAt = datetime.now(tz=timezone.utc) s.publishedAt = datetime.now(tz=timezone.utc)
s.visibility = text('public') s.visibility = text("public")
session.add(s) session.add(s)
session.commit() session.commit()
def set_hidden(session, shout_id): def set_hidden(session, shout_id):
s = session.query(Shout).where(Shout.id == shout_id).first() s = session.query(Shout).where(Shout.id == shout_id).first()
s.visibility = text('community') s.visibility = text("community")
session.add(s) session.add(s)
session.commit() session.commit()
@ -161,37 +170,46 @@ def set_hidden(session, shout_id):
@login_required @login_required
async def create_reaction(_, info, reaction): async def create_reaction(_, info, reaction):
auth: AuthCredentials = info.context["request"].auth auth: AuthCredentials = info.context["request"].auth
reaction['createdBy'] = auth.user_id reaction["createdBy"] = auth.user_id
rdict = {} rdict = {}
with local_session() as session: with local_session() as session:
shout = session.query(Shout).where(Shout.id == reaction["shout"]).one() shout = session.query(Shout).where(Shout.id == reaction["shout"]).one()
author = session.query(User).where(User.id == auth.user_id).one() author = session.query(User).where(User.id == auth.user_id).one()
if reaction["kind"] in [ if reaction["kind"] in [ReactionKind.DISLIKE.name, ReactionKind.LIKE.name]:
ReactionKind.DISLIKE.name, existing_reaction = (
ReactionKind.LIKE.name session.query(Reaction)
]: .where(
existing_reaction = session.query(Reaction).where(
and_( and_(
Reaction.shout == reaction["shout"], Reaction.shout == reaction["shout"],
Reaction.createdBy == auth.user_id, Reaction.createdBy == auth.user_id,
Reaction.kind == reaction["kind"], Reaction.kind == reaction["kind"],
Reaction.replyTo == reaction.get("replyTo") Reaction.replyTo == reaction.get("replyTo"),
)
)
.first()
) )
).first()
if existing_reaction is not None: if existing_reaction is not None:
raise OperationNotAllowed("You can't vote twice") raise OperationNotAllowed("You can't vote twice")
opposite_reaction_kind = ReactionKind.DISLIKE if reaction["kind"] == ReactionKind.LIKE.name else ReactionKind.LIKE opposite_reaction_kind = (
opposite_reaction = session.query(Reaction).where( ReactionKind.DISLIKE
if reaction["kind"] == ReactionKind.LIKE.name
else ReactionKind.LIKE
)
opposite_reaction = (
session.query(Reaction)
.where(
and_( and_(
Reaction.shout == reaction["shout"], Reaction.shout == reaction["shout"],
Reaction.createdBy == auth.user_id, Reaction.createdBy == auth.user_id,
Reaction.kind == opposite_reaction_kind, Reaction.kind == opposite_reaction_kind,
Reaction.replyTo == reaction.get("replyTo") Reaction.replyTo == reaction.get("replyTo"),
)
)
.first()
) )
).first()
if opposite_reaction is not None: if opposite_reaction is not None:
session.delete(opposite_reaction) session.delete(opposite_reaction)
@ -199,14 +217,18 @@ async def create_reaction(_, info, reaction):
r = Reaction.create(**reaction) r = Reaction.create(**reaction)
# Proposal accepting logix # Proposal accepting logix
if r.replyTo is not None and \ if (
r.kind == ReactionKind.ACCEPT and \ r.replyTo is not None
auth.user_id in shout.dict()['authors']: and r.kind == ReactionKind.ACCEPT
replied_reaction = session.query(Reaction).where(Reaction.id == r.replyTo).first() and auth.user_id in shout.dict()["authors"]
):
replied_reaction = (
session.query(Reaction).where(Reaction.id == r.replyTo).first()
)
if replied_reaction and replied_reaction.kind == ReactionKind.PROPOSE: if replied_reaction and replied_reaction.kind == ReactionKind.PROPOSE:
if replied_reaction.range: if replied_reaction.range:
old_body = shout.body old_body = shout.body
start, end = replied_reaction.range.split(':') start, end = replied_reaction.range.split(":")
start = int(start) start = int(start)
end = int(end) end = int(end)
new_body = old_body[:start] + replied_reaction.body + old_body[end:] new_body = old_body[:start] + replied_reaction.body + old_body[end:]
@ -216,8 +238,8 @@ async def create_reaction(_, info, reaction):
session.add(r) session.add(r)
session.commit() session.commit()
rdict = r.dict() rdict = r.dict()
rdict['shout'] = shout.dict() rdict["shout"] = shout.dict()
rdict['createdBy'] = author.dict() rdict["createdBy"] = author.dict()
# self-regulation mechanics # self-regulation mechanics
@ -231,11 +253,7 @@ async def create_reaction(_, info, reaction):
except Exception as e: except Exception as e:
print(f"[resolvers.reactions] error on reactions autofollowing: {e}") print(f"[resolvers.reactions] error on reactions autofollowing: {e}")
rdict['stat'] = { rdict["stat"] = {"commented": 0, "reacted": 0, "rating": 0}
"commented": 0,
"reacted": 0,
"rating": 0
}
return {"reaction": rdict} return {"reaction": rdict}
@ -250,7 +268,9 @@ async def update_reaction(_, info, id, reaction={}):
q = add_reaction_stat_columns(q) q = add_reaction_stat_columns(q)
q = q.group_by(Reaction.id) q = q.group_by(Reaction.id)
[r, reacted_stat, commented_stat, rating_stat] = session.execute(q).unique().one() [r, reacted_stat, commented_stat, rating_stat] = (
session.execute(q).unique().one()
)
if not r: if not r:
return {"error": "invalid reaction id"} return {"error": "invalid reaction id"}
@ -268,7 +288,7 @@ async def update_reaction(_, info, id, reaction={}):
r.stat = { r.stat = {
"commented": commented_stat, "commented": commented_stat,
"reacted": reacted_stat, "reacted": reacted_stat,
"rating": rating_stat "rating": rating_stat,
} }
return {"reaction": r} return {"reaction": r}
@ -286,17 +306,12 @@ async def delete_reaction(_, info, id):
if r.createdBy != auth.user_id: if r.createdBy != auth.user_id:
return {"error": "access denied"} return {"error": "access denied"}
if r.kind in [ if r.kind in [ReactionKind.LIKE, ReactionKind.DISLIKE]:
ReactionKind.LIKE,
ReactionKind.DISLIKE
]:
session.delete(r) session.delete(r)
else: else:
r.deletedAt = datetime.now(tz=timezone.utc) r.deletedAt = datetime.now(tz=timezone.utc)
session.commit() session.commit()
return { return {"reaction": r}
"reaction": r
}
@query.field("loadReactionsBy") @query.field("loadReactionsBy")
@ -317,12 +332,10 @@ async def load_reactions_by(_, _info, by, limit=50, offset=0):
:return: Reaction[] :return: Reaction[]
""" """
q = select( q = (
Reaction, User, Shout select(Reaction, User, Shout)
).join( .join(User, Reaction.createdBy == User.id)
User, Reaction.createdBy == User.id .join(Shout, Reaction.shout == Shout.id)
).join(
Shout, Reaction.shout == Shout.id
) )
if by.get("shout"): if by.get("shout"):
@ -340,7 +353,7 @@ async def load_reactions_by(_, _info, by, limit=50, offset=0):
if by.get("comment"): if by.get("comment"):
q = q.filter(func.length(Reaction.body) > 0) q = q.filter(func.length(Reaction.body) > 0)
if len(by.get('search', '')) > 2: if len(by.get("search", "")) > 2:
q = q.filter(Reaction.body.ilike(f'%{by["body"]}%')) q = q.filter(Reaction.body.ilike(f'%{by["body"]}%'))
if by.get("days"): if by.get("days"):
@ -348,13 +361,9 @@ async def load_reactions_by(_, _info, by, limit=50, offset=0):
q = q.filter(Reaction.createdAt > after) q = q.filter(Reaction.createdAt > after)
order_way = asc if by.get("sort", "").startswith("-") else desc order_way = asc if by.get("sort", "").startswith("-") else desc
order_field = by.get("sort", "").replace('-', '') or Reaction.createdAt order_field = by.get("sort", "").replace("-", "") or Reaction.createdAt
q = q.group_by( q = q.group_by(Reaction.id, User.id, Shout.id).order_by(order_way(order_field))
Reaction.id, User.id, Shout.id
).order_by(
order_way(order_field)
)
q = add_reaction_stat_columns(q) q = add_reaction_stat_columns(q)
@ -363,13 +372,20 @@ async def load_reactions_by(_, _info, by, limit=50, offset=0):
reactions = [] reactions = []
with local_session() as session: with local_session() as session:
for [reaction, user, shout, reacted_stat, commented_stat, rating_stat] in session.execute(q): for [
reaction,
user,
shout,
reacted_stat,
commented_stat,
rating_stat,
] in session.execute(q):
reaction.createdBy = user reaction.createdBy = user
reaction.shout = shout reaction.shout = shout
reaction.stat = { reaction.stat = {
"rating": rating_stat, "rating": rating_stat,
"commented": commented_stat, "commented": commented_stat,
"reacted": reacted_stat "reacted": reacted_stat,
} }
reaction.kind = reaction.kind.name reaction.kind = reaction.kind.name

View File

@ -2,8 +2,8 @@ from sqlalchemy import and_, select, distinct, func
from sqlalchemy.orm import aliased from sqlalchemy.orm import aliased
from auth.authenticate import login_required from auth.authenticate import login_required
from base.orm import local_session from services.db import local_session
from base.resolvers import mutation, query from services.schema import mutation, query
from orm.shout import ShoutTopic, ShoutAuthor from orm.shout import ShoutTopic, ShoutAuthor
from orm.topic import Topic, TopicFollower from orm.topic import Topic, TopicFollower
from orm import User from orm import User
@ -13,12 +13,19 @@ def add_topic_stat_columns(q):
aliased_shout_author = aliased(ShoutAuthor) aliased_shout_author = aliased(ShoutAuthor)
aliased_topic_follower = aliased(TopicFollower) aliased_topic_follower = aliased(TopicFollower)
q = q.outerjoin(ShoutTopic, Topic.id == ShoutTopic.topic).add_columns( q = (
func.count(distinct(ShoutTopic.shout)).label('shouts_stat') q.outerjoin(ShoutTopic, Topic.id == ShoutTopic.topic)
).outerjoin(aliased_shout_author, ShoutTopic.shout == aliased_shout_author.shout).add_columns( .add_columns(func.count(distinct(ShoutTopic.shout)).label("shouts_stat"))
func.count(distinct(aliased_shout_author.user)).label('authors_stat') .outerjoin(aliased_shout_author, ShoutTopic.shout == aliased_shout_author.shout)
).outerjoin(aliased_topic_follower).add_columns( .add_columns(
func.count(distinct(aliased_topic_follower.follower)).label('followers_stat') func.count(distinct(aliased_shout_author.user)).label("authors_stat")
)
.outerjoin(aliased_topic_follower)
.add_columns(
func.count(distinct(aliased_topic_follower.follower)).label(
"followers_stat"
)
)
) )
q = q.group_by(Topic.id) q = q.group_by(Topic.id)
@ -31,7 +38,7 @@ def add_stat(topic, stat_columns):
topic.stat = { topic.stat = {
"shouts": shouts_stat, "shouts": shouts_stat,
"authors": authors_stat, "authors": authors_stat,
"followers": followers_stat "followers": followers_stat,
} }
return topic return topic
@ -133,12 +140,10 @@ def topic_unfollow(user_id, slug):
try: try:
with local_session() as session: with local_session() as session:
sub = ( sub = (
session.query(TopicFollower).join(Topic).filter( session.query(TopicFollower)
and_( .join(Topic)
TopicFollower.follower == user_id, .filter(and_(TopicFollower.follower == user_id, Topic.slug == slug))
Topic.slug == slug .first()
)
).first()
) )
if sub: if sub:
session.delete(sub) session.delete(sub)

View File

@ -2,12 +2,6 @@ scalar DateTime
################################### Payload ################################### ################################### Payload ###################################
enum MessageStatus {
NEW
UPDATED
DELETED
}
type UserFollowings { type UserFollowings {
unread: Int unread: Int
topics: [String] topics: [String]
@ -23,18 +17,6 @@ type AuthResult {
news: UserFollowings news: UserFollowings
} }
type ChatMember {
id: Int!
slug: String!
name: String!
userpic: String
lastSeen: DateTime
online: Boolean
# invitedAt: DateTime
# invitedBy: String # user slug
# TODO: keep invite databit
}
type AuthorStat { type AuthorStat {
followings: Int followings: Int
followers: Int followers: Int
@ -62,11 +44,6 @@ type Author {
type Result { type Result {
error: String error: String
slugs: [String] slugs: [String]
chat: Chat
chats: [Chat]
message: Message
messages: [Message]
members: [ChatMember]
shout: Shout shout: Shout
shouts: [Shout] shouts: [Shout]
author: Author author: Author
@ -140,12 +117,6 @@ input ReactionInput {
replyTo: Int replyTo: Int
} }
input ChatInput {
id: String!
title: String
description: String
}
enum FollowingEntity { enum FollowingEntity {
TOPIC TOPIC
AUTHOR AUTHOR
@ -156,15 +127,6 @@ enum FollowingEntity {
################################### Mutation ################################### Mutation
type Mutation { type Mutation {
# inbox
createChat(title: String, members: [Int]!): Result!
updateChat(chat: ChatInput!): Result!
deleteChat(chatId: String!): Result!
createMessage(chat: String!, body: String!, replyTo: Int): Result!
updateMessage(chatId: String!, id: Int!, body: String!): Result!
deleteMessage(chatId: String!, id: Int!): Result!
markAsRead(chatId: String!, ids: [Int]!): Result!
# auth # auth
getSession: AuthResult! getSession: AuthResult!
@ -198,15 +160,6 @@ type Mutation {
unfollow(what: FollowingEntity!, slug: String!): Result! unfollow(what: FollowingEntity!, slug: String!): Result!
} }
input MessagesBy {
author: String
body: String
chat: String
order: String
days: Int
stat: String
}
input AuthorsBy { input AuthorsBy {
lastSeen: DateTime lastSeen: DateTime
createdAt: DateTime createdAt: DateTime
@ -252,12 +205,6 @@ input ReactionBy {
################################### Query ################################### Query
type Query { type Query {
# inbox
loadChats( limit: Int, offset: Int): Result! # your chats
loadMessagesBy(by: MessagesBy!, limit: Int, offset: Int): Result!
loadRecipients(limit: Int, offset: Int): Result!
searchRecipients(query: String!, limit: Int, offset: Int): Result!
searchMessages(by: MessagesBy!, limit: Int, offset: Int): Result!
# auth # auth
isEmailUsed(email: String!): Boolean! isEmailUsed(email: String!): Boolean!
@ -288,14 +235,6 @@ type Query {
topicsByAuthor(author: String!): [Topic]! topicsByAuthor(author: String!): [Topic]!
} }
############################################ Subscription
type Subscription {
newMessage: Message # new messages in inbox
newShout: Shout # personal feed new shout
newReaction: Reaction # new reactions to notify
}
############################################ Entities ############################################ Entities
type Resource { type Resource {
@ -474,29 +413,3 @@ type Token {
usedAt: DateTime usedAt: DateTime
value: String! value: String!
} }
type Message {
author: Int!
chatId: String!
body: String!
createdAt: Int!
id: Int!
replyTo: Int
updatedAt: Int
seen: Boolean
}
type Chat {
id: String!
createdAt: Int!
createdBy: Int!
updatedAt: Int!
title: String
description: String
users: [Int]
members: [ChatMember]
admins: [Int]
messages: [Message]
unread: Int
private: Boolean
}

39
services/exceptions.py Normal file
View File

@ -0,0 +1,39 @@
from starlette.exceptions import HTTPException
# TODO: remove traceback from logs for defined exceptions
class BaseHttpException(HTTPException):
states_code = 500
detail = "500 Server error"
class ExpiredToken(BaseHttpException):
states_code = 401
detail = "401 Expired Token"
class InvalidToken(BaseHttpException):
states_code = 401
detail = "401 Invalid Token"
class Unauthorized(BaseHttpException):
states_code = 401
detail = "401 Unauthorized"
class ObjectNotExist(BaseHttpException):
code = 404
detail = "404 Object Does Not Exist"
class OperationNotAllowed(BaseHttpException):
states_code = 403
detail = "403 Operation Is Not Allowed"
class InvalidPassword(BaseHttpException):
states_code = 403
message = "403 Invalid Password"

View File

@ -1,46 +0,0 @@
# from base.exceptions import Unauthorized
from auth.tokenstorage import SessionToken
from base.redis import redis
async def set_online_status(user_id, status):
if user_id:
if status:
await redis.execute("SADD", "users-online", user_id)
else:
await redis.execute("SREM", "users-online", user_id)
async def on_connect(req, params):
if not isinstance(params, dict):
req.scope["connection_params"] = {}
return
token = params.get('token')
if not token:
# raise Unauthorized("Please login")
return {
"error": "Please login first"
}
else:
payload = await SessionToken.verify(token)
if payload and payload.user_id:
req.scope["user_id"] = payload.user_id
await set_online_status(payload.user_id, True)
async def on_disconnect(req):
user_id = req.scope.get("user_id")
await set_online_status(user_id, False)
# FIXME: not used yet
def context_value(request):
context = {}
print(f"[inbox.presense] request debug: {request}")
if request.scope["type"] == "websocket":
# request is an instance of WebSocket
context.update(request.scope["connection_params"])
else:
context["token"] = request.META.get("authorization")
return context

View File

@ -1,22 +0,0 @@
from sse_starlette.sse import EventSourceResponse
from starlette.requests import Request
from graphql.type import GraphQLResolveInfo
from resolvers.inbox.messages import message_generator
# from base.exceptions import Unauthorized
# https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md
async def sse_messages(request: Request):
print(f'[SSE] request\n{request}\n')
info = GraphQLResolveInfo()
info.context['request'] = request.scope
user_id = request.scope['user'].user_id
if user_id:
event_generator = await message_generator(None, info)
return EventSourceResponse(event_generator)
else:
# raise Unauthorized("Please login")
return {
"error": "Please login first"
}

View File

@ -1,13 +1,13 @@
from services.search import SearchService from services.search import SearchService
from services.stat.viewed import ViewedStorage from stat.viewed import ViewedStorage
from base.orm import local_session from services.db import local_session
async def storages_init(): async def storages_init():
with local_session() as session: with local_session() as session:
print('[main] initialize SearchService') print("[main] initialize SearchService")
await SearchService.init(session) await SearchService.init(session)
print('[main] SearchService initialized') print("[main] SearchService initialized")
print('[main] initialize storages') print("[main] initialize storages")
await ViewedStorage.init() await ViewedStorage.init()
print('[main] storages initialized') print("[main] storages initialized")

35
services/presence.py Normal file
View File

@ -0,0 +1,35 @@
import json
from orm.reaction import Reaction
from orm.shout import Shout
from services.redis import redis
async def notify_reaction(reaction: Reaction):
channel_name = f"new_reaction"
data = {**reaction, "kind": f"new_reaction{reaction.kind}"}
try:
await redis.publish(channel_name, json.dumps(data))
except Exception as e:
print(f"Failed to publish to channel {channel_name}: {e}")
async def notify_shout(shout: Shout):
channel_name = f"new_shout"
data = {**shout, "kind": "new_shout"}
try:
await redis.publish(channel_name, json.dumps(data))
except Exception as e:
print(f"Failed to publish to channel {channel_name}: {e}")
async def notify_follower(follower_id: int, author_id: int):
channel_name = f"new_follower"
data = {
"follower_id": follower_id,
"author_id": author_id,
"kind": "new_follower",
}
try:
await redis.publish(channel_name, json.dumps(data))
except Exception as e:
print(f"Failed to publish to channel {channel_name}: {e}")

58
services/redis.py Normal file
View File

@ -0,0 +1,58 @@
import asyncio
import aredis
from settings import REDIS_URL
class RedisCache:
def __init__(self, uri=REDIS_URL):
self._uri: str = uri
self.pubsub_channels = []
self._instance = None
async def connect(self):
self._instance = aredis.StrictRedis.from_url(self._uri, decode_responses=True)
async def disconnect(self):
self._instance.connection_pool.disconnect()
self._instance = None
async def execute(self, command, *args, **kwargs):
while not self._instance:
await asyncio.sleep(1)
try:
print("[redis] " + command + " " + " ".join(args))
return await self._instance.execute_command(command, *args, **kwargs)
except Exception:
pass
async def subscribe(self, *channels):
if not self._instance:
await self.connect()
for channel in channels:
await self._instance.subscribe(channel)
self.pubsub_channels.append(channel)
async def unsubscribe(self, *channels):
if not self._instance:
return
for channel in channels:
await self._instance.unsubscribe(channel)
self.pubsub_channels.remove(channel)
async def publish(self, channel, data):
if not self._instance:
return
await self._instance.publish(channel, data)
async def lrange(self, key, start, stop):
print(f"[redis] LRANGE {key} {start} {stop}")
return await self._instance.lrange(key, start, stop)
async def mget(self, key, *keys):
print(f"[redis] MGET {key} {keys}")
return await self._instance.mget(key, *keys)
redis = RedisCache()
__all__ = ["redis"]

View File

@ -1,8 +1,8 @@
import asyncio import asyncio
import json import json
from base.redis import redis from services.redis import redis
from orm.shout import Shout from db.shout import Shout
from resolvers.zine.load import load_shouts_by from schema.zine.load import load_shouts_by
class SearchService: class SearchService:
@ -12,7 +12,7 @@ class SearchService:
@staticmethod @staticmethod
async def init(session): async def init(session):
async with SearchService.lock: async with SearchService.lock:
print('[search.service] did nothing') print("[search.service] did nothing")
SearchService.cache = {} SearchService.cache = {}
@staticmethod @staticmethod
@ -24,7 +24,7 @@ class SearchService:
"title": text, "title": text,
"body": text, "body": text,
"limit": limit, "limit": limit,
"offset": offset "offset": offset,
} }
payload = await load_shouts_by(None, None, options) payload = await load_shouts_by(None, None, options)
await redis.execute("SET", text, json.dumps(payload)) await redis.execute("SET", text, json.dumps(payload))

View File

@ -6,13 +6,13 @@ from ssl import create_default_context
from gql import Client, gql from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport from gql.transport.aiohttp import AIOHTTPTransport
from sqlalchemy import func
from base.orm import local_session from services.db import local_session
from orm import User, Topic from orm import Topic
from orm.shout import ShoutTopic, Shout from orm.shout import ShoutTopic, Shout
load_facts = gql(""" load_facts = gql(
"""
query getDomains { query getDomains {
domains { domains {
id id
@ -25,9 +25,11 @@ query getDomains {
} }
} }
} }
""") """
)
load_pages = gql(""" load_pages = gql(
"""
query getDomains { query getDomains {
domains { domains {
title title
@ -41,8 +43,9 @@ query getDomains {
} }
} }
} }
""") """
schema_str = open(path.dirname(__file__) + '/ackee.graphql').read() )
schema_str = open(path.dirname(__file__) + "/ackee.graphql").read()
token = environ.get("ACKEE_TOKEN", "") token = environ.get("ACKEE_TOKEN", "")
@ -52,8 +55,8 @@ def create_client(headers=None, schema=None):
transport=AIOHTTPTransport( transport=AIOHTTPTransport(
url="https://ackee.discours.io/api", url="https://ackee.discours.io/api",
ssl=create_default_context(), ssl=create_default_context(),
headers=headers headers=headers,
) ),
) )
@ -75,10 +78,13 @@ class ViewedStorage:
self = ViewedStorage self = ViewedStorage
async with self.lock: async with self.lock:
if token: if token:
self.client = create_client({ self.client = create_client(
"Authorization": "Bearer %s" % str(token) {"Authorization": "Bearer %s" % str(token)}, schema=schema_str
}, schema=schema_str) )
print("[stat.viewed] * authorized permanentely by ackee.discours.io: %s" % token) print(
"[stat.viewed] * authorized permanentely by ackee.discours.io: %s"
% token
)
else: else:
print("[stat.viewed] * please set ACKEE_TOKEN") print("[stat.viewed] * please set ACKEE_TOKEN")
self.disabled = True self.disabled = True
@ -96,7 +102,7 @@ class ViewedStorage:
try: try:
for page in self.pages: for page in self.pages:
p = page["value"].split("?")[0] p = page["value"].split("?")[0]
slug = p.split('discours.io/')[-1] slug = p.split("discours.io/")[-1]
shouts[slug] = page["count"] shouts[slug] = page["count"]
for slug in shouts.keys(): for slug in shouts.keys():
await ViewedStorage.increment(slug, shouts[slug]) await ViewedStorage.increment(slug, shouts[slug])
@ -126,7 +132,9 @@ class ViewedStorage:
shout_views = 0 shout_views = 0
with local_session() as session: with local_session() as session:
try: try:
shout = session.query(Shout).where(Shout.slug == shout_slug).one() shout = (
session.query(Shout).where(Shout.slug == shout_slug).one()
)
self.by_shouts[shout_slug] = shout.views self.by_shouts[shout_slug] = shout.views
self.update_topics(session, shout_slug) self.update_topics(session, shout_slug)
except Exception as e: except Exception as e:
@ -148,22 +156,26 @@ class ViewedStorage:
def update_topics(session, shout_slug): def update_topics(session, shout_slug):
"""updates topics counters by shout slug""" """updates topics counters by shout slug"""
self = ViewedStorage self = ViewedStorage
for [shout_topic, topic] in session.query(ShoutTopic, Topic).join(Topic).join(Shout).where( for [shout_topic, topic] in (
Shout.slug == shout_slug session.query(ShoutTopic, Topic)
).all(): .join(Topic)
.join(Shout)
.where(Shout.slug == shout_slug)
.all()
):
if not self.by_topics.get(topic.slug): if not self.by_topics.get(topic.slug):
self.by_topics[topic.slug] = {} self.by_topics[topic.slug] = {}
self.by_topics[topic.slug][shout_slug] = self.by_shouts[shout_slug] self.by_topics[topic.slug][shout_slug] = self.by_shouts[shout_slug]
@staticmethod @staticmethod
async def increment(shout_slug, amount=1, viewer='ackee'): async def increment(shout_slug, amount=1, viewer="ackee"):
"""the only way to change views counter""" """the only way to change views counter"""
self = ViewedStorage self = ViewedStorage
async with self.lock: async with self.lock:
# TODO optimize, currenty we execute 1 DB transaction per shout # TODO optimize, currenty we execute 1 DB transaction per shout
with local_session() as session: with local_session() as session:
shout = session.query(Shout).where(Shout.slug == shout_slug).one() shout = session.query(Shout).where(Shout.slug == shout_slug).one()
if viewer == 'old-discours': if viewer == "old-discours":
# this is needed for old db migration # this is needed for old db migration
if shout.viewsOld == amount: if shout.viewsOld == amount:
print(f"viewsOld amount: {amount}") print(f"viewsOld amount: {amount}")
@ -174,7 +186,9 @@ class ViewedStorage:
if shout.viewsAckee == amount: if shout.viewsAckee == amount:
print(f"viewsAckee amount: {amount}") print(f"viewsAckee amount: {amount}")
else: else:
print(f"viewsAckee amount changed: {shout.viewsAckee} --> {amount}") print(
f"viewsAckee amount changed: {shout.viewsAckee} --> {amount}"
)
shout.viewsAckee = amount shout.viewsAckee = amount
session.commit() session.commit()
@ -205,9 +219,10 @@ class ViewedStorage:
if failed == 0: if failed == 0:
when = datetime.now(timezone.utc) + timedelta(seconds=self.period) when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
t = format(when.astimezone().isoformat()) t = format(when.astimezone().isoformat())
print("[stat.viewed] ⎩ next update: %s" % ( print(
t.split("T")[0] + " " + t.split("T")[1].split(".")[0] "[stat.viewed] ⎩ next update: %s"
)) % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0])
)
await asyncio.sleep(self.period) await asyncio.sleep(self.period)
else: else:
await asyncio.sleep(10) await asyncio.sleep(10)

View File

@ -1,70 +0,0 @@
import asyncio
import subprocess
from pathlib import Path
from settings import SHOUTS_REPO
class GitTask:
"""every shout update use a new task"""
queue = asyncio.Queue()
def __init__(self, input, username, user_email, comment):
self.slug = input["slug"]
self.shout_body = input["body"]
self.username = username
self.user_email = user_email
self.comment = comment
GitTask.queue.put_nowait(self)
def init_repo(self):
repo_path = "%s" % (SHOUTS_REPO)
Path(repo_path).mkdir()
cmd = (
"cd %s && git init && "
"git config user.name 'discours' && "
"git config user.email 'discours@discours.io' && "
"touch initial && git add initial && "
"git commit -m 'init repo'" % (repo_path)
)
output = subprocess.check_output(cmd, shell=True)
print(output)
def execute(self):
repo_path = "%s" % (SHOUTS_REPO)
if not Path(repo_path).exists():
self.init_repo()
# cmd = "cd %s && git checkout master" % (repo_path)
# output = subprocess.check_output(cmd, shell=True)
# print(output)
shout_filename = "%s.mdx" % (self.slug)
shout_full_filename = "%s/%s" % (repo_path, shout_filename)
with open(shout_full_filename, mode="w", encoding="utf-8") as shout_file:
shout_file.write(bytes(self.shout_body, "utf-8").decode("utf-8", "ignore"))
author = "%s <%s>" % (self.username, self.user_email)
cmd = "cd %s && git add %s && git commit -m '%s' --author='%s'" % (
repo_path,
shout_filename,
self.comment,
author,
)
output = subprocess.check_output(cmd, shell=True)
print(output)
@staticmethod
async def git_task_worker():
print("[service.git] starting task worker")
while True:
task = await GitTask.queue.get()
try:
task.execute()
except Exception as err:
print("[service.git] worker error: %s" % (err))