diff --git a/disabled-CHECKS b/CHECKS similarity index 100% rename from disabled-CHECKS rename to CHECKS diff --git a/auth/oauth.py b/auth/oauth.py index ac63fa5a..88950c8d 100644 --- a/auth/oauth.py +++ b/auth/oauth.py @@ -66,7 +66,7 @@ async def oauth_login(request): provider = request.path_params["provider"] request.session["provider"] = provider client = oauth.create_client(provider) - redirect_uri = "https://newapi.discours.io/oauth-authorize" + redirect_uri = "https://v2.discours.io/oauth-authorize" return await client.authorize_redirect(request, redirect_uri) diff --git a/create_crt.sh b/create_crt.sh deleted file mode 100644 index 9994eb82..00000000 --- a/create_crt.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash - -openssl req -newkey rsa:4096 \ - -x509 \ - -sha256 \ - -days 3650 \ - -nodes \ - -out server.crt \ - -keyout server.key \ - -subj "/C=RU/ST=Moscow/L=Moscow/O=Discours/OU=Site/CN=newapi.discours.io" - -openssl x509 -in server.crt -out server.pem -outform PEM -tar cvf server.tar server.crt server.key -dokku certs:add discoursio-api < server.tar diff --git a/main.py b/main.py index 10bd865e..965b9e94 100644 --- a/main.py +++ b/main.py @@ -14,11 +14,10 @@ from auth.oauth import oauth_login, oauth_authorize from base.redis import redis from base.resolvers import resolvers from resolvers.auth import confirm_email_handler -from resolvers.zine import ShoutsCache from services.main import storages_init from services.stat.reacted import ReactedStorage from services.stat.topicstat import TopicStat -from services.stat.viewed import ViewedStorage +from services.stat.views import ViewStat from services.zine.gittask import GitTask from services.zine.shoutauthor import ShoutAuthorStorage import_module("resolvers") @@ -32,20 +31,17 @@ middleware = [ async def start_up(): await redis.connect() - viewed_storage_task = asyncio.create_task(ViewedStorage.worker()) - print(viewed_storage_task) + await storages_init() + views_stat_task = asyncio.create_task(ViewStat.worker()) + print(views_stat_task) reacted_storage_task = asyncio.create_task(ReactedStorage.worker()) print(reacted_storage_task) - shouts_cache_task = asyncio.create_task(ShoutsCache.worker()) - print(shouts_cache_task) shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker()) print(shout_author_task) topic_stat_task = asyncio.create_task(TopicStat.worker()) print(topic_stat_task) git_task = asyncio.create_task(GitTask.git_task_worker()) print(git_task) - await storages_init() - print() async def shutdown(): diff --git a/migration/__init__.py b/migration/__init__.py index 7bed642b..61593bbd 100644 --- a/migration/__init__.py +++ b/migration/__init__.py @@ -7,7 +7,7 @@ import sys from datetime import datetime import bs4 - +from base.redis import redis from migration.tables.comments import migrate as migrateComment from migration.tables.comments import migrate_2stage as migrateComment_2stage from migration.tables.content_items import get_shout_slug @@ -181,7 +181,11 @@ async def all_handle(storage, args): print("[migration] handle everything") await users_handle(storage) await topics_handle(storage) + print("[migration] users and topics are migrated") + await redis.connect() + print("[migration] redis connected") await shouts_handle(storage, args) + print("[migration] migrating comments") await comments_handle(storage) # export_email_subscriptions() print("[migration] done!") @@ -295,9 +299,9 @@ def create_pgdump(): async def handle_auto(): - print("[migration] no option given, auto mode") url = os.getenv("MONGODB_URL") if url: + print("[migration] connecting mongo") mongo_download(url) bson_handle() await all_handle(data_load(), sys.argv) diff --git a/migration/extract.py b/migration/extract.py index 48ed5d68..4ea44d04 100644 --- a/migration/extract.py +++ b/migration/extract.py @@ -285,13 +285,13 @@ def prepare_md_body(entry): if "title" in m: trackname += m.get("title", "") addon += ( - '\n' ) - body = "import MusicPlayer from '$/components/Article/MusicPlayer'\n\n" + addon + body = "import AudioPlayer from '$/components/Article/AudioPlayer'\n\n" + addon body_orig, media = extract_html(entry) if body_orig: diff --git a/migration/tables/content_items.py b/migration/tables/content_items.py index 5adae72e..89392051 100644 --- a/migration/tables/content_items.py +++ b/migration/tables/content_items.py @@ -1,5 +1,5 @@ from datetime import datetime - +import json from dateutil.parser import parse as date_parse from sqlalchemy.exc import IntegrityError from transliterate import translit @@ -12,7 +12,7 @@ from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower from orm.user import User from orm.topic import TopicFollower from services.stat.reacted import ReactedStorage -from services.stat.viewed import ViewedStorage +from services.stat.views import ViewStat OLD_DATE = "2016-03-05 22:22:00.350000" ts = datetime.now() @@ -149,6 +149,12 @@ async def migrate(entry, storage): if entry.get("published"): r["publishedAt"] = date_parse(entry.get("publishedAt", OLD_DATE)) r["visibility"] = "public" + with local_session() as session: + # update user.emailConfirmed if published + author = session.query(User).where(User.slug == userslug).first() + author.emailConfirmed = True + session.add(author) + session.commit() else: r["visibility"] = "authors" if "deletedAt" in entry: @@ -192,7 +198,7 @@ async def migrate(entry, storage): # body r["body"], media = prepare_html_body(entry) if media: - print(media) + r["media"] = json.dumps(media) # save shout to db s = object() shout_dict = r.copy() @@ -340,7 +346,7 @@ async def migrate(entry, storage): raise Exception("[migration] content_item.ratings error: \n%r" % content_rating) # shout views - await ViewedStorage.increment(shout_dict["slug"], amount=entry.get("views", 1)) + await ViewStat.increment(shout_dict["slug"], amount=entry.get("views", 1)) # del shout_dict['ratings'] shout_dict["oid"] = entry.get("_id") storage["shouts"]["by_oid"][entry["_id"]] = shout_dict diff --git a/migration/tables/users.py b/migration/tables/users.py index 982f3abc..17b6cead 100644 --- a/migration/tables/users.py +++ b/migration/tables/users.py @@ -34,7 +34,9 @@ def migrate(entry): user_dict["slug"] = ( entry["profile"].get("path").lower().replace(" ", "-").strip() ) - user_dict["bio"] = html2text(entry.get("profile").get("bio") or "") + user_dict["bio"] = html2text( + entry.get("profile").get("bio") or "" + ).replace('\(', '(').replace('\)', ')') # userpic try: diff --git a/nginx.conf.sigil b/nginx.conf.sigil index 5a74d410..eb458974 100644 --- a/nginx.conf.sigil +++ b/nginx.conf.sigil @@ -5,7 +5,7 @@ {{ $upstream_port := index $port_map_list 2 }} map $http_origin $allow_origin { - ~^https?:\/\/((.*\.)?localhost(:\d+)?|discoursio-webapp.*\.vercel\.app|(.*\.)?discours\.io(:\d+)?)$ $http_origin; + ~^https?:\/\/((.*\.)?localhost(:\d+)?|discoursio-webapp-(.*)?\.vercel\.app|(.*\.)?discours\.io(:\d+)?)$ $http_origin; default ""; } diff --git a/orm/__init__.py b/orm/__init__.py index 8c9f6412..4ac7cee3 100644 --- a/orm/__init__.py +++ b/orm/__init__.py @@ -6,7 +6,6 @@ from orm.reaction import Reaction from orm.shout import Shout from orm.topic import Topic, TopicFollower from orm.user import User, UserRating -from orm.viewed import ViewedByDay __all__ = [ "User", @@ -19,8 +18,7 @@ __all__ = [ "TopicFollower", "Notification", "Reaction", - "UserRating", - "ViewedByDay" + "UserRating" ] Base.metadata.create_all(engine) diff --git a/orm/shout.py b/orm/shout.py index 7828aa62..2a3909ad 100644 --- a/orm/shout.py +++ b/orm/shout.py @@ -1,6 +1,6 @@ from datetime import datetime -from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String +from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, JSON from sqlalchemy.orm import relationship from base.orm import Base @@ -58,9 +58,9 @@ class Shout(Base): versionOf = Column(ForeignKey("shout.slug"), nullable=True) lang = Column(String, default='ru') oid = Column(String, nullable=True) + media = Column(JSON, nullable=True) createdAt = Column(DateTime, nullable=False, default=datetime.now, comment="Created at") updatedAt = Column(DateTime, nullable=True, comment="Updated at") publishedAt = Column(DateTime, nullable=True) deletedAt = Column(DateTime, nullable=True) - diff --git a/orm/viewed.py b/orm/viewed.py deleted file mode 100644 index 7db97418..00000000 --- a/orm/viewed.py +++ /dev/null @@ -1,12 +0,0 @@ -from datetime import datetime -from sqlalchemy import Column, DateTime, ForeignKey, Integer -from base.orm import Base - - -class ViewedByDay(Base): - __tablename__ = "viewed_by_day" - - id = None - shout = Column(ForeignKey("shout.slug"), primary_key=True) - day = Column(DateTime, primary_key=True, default=datetime.now) - value = Column(Integer) diff --git a/requirements.txt b/requirements.txt index e1d477fc..e47130af 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,13 @@ python-frontmatter~=1.0.0 aioredis~=2.0.1 +aiohttp ariadne>=0.16.0 PyYAML>=5.4 pyjwt>=2.6.0 starlette~=0.20.4 sqlalchemy>=1.4.41 graphql-core +gql uvicorn>=0.18.3 pydantic>=1.10.2 passlib~=1.7.4 diff --git a/resolvers/__init__.py b/resolvers/__init__.py index 1fe2933f..25b516b2 100644 --- a/resolvers/__init__.py +++ b/resolvers/__init__.py @@ -8,32 +8,23 @@ from resolvers.auth import ( get_current_user, ) from resolvers.collab import remove_author, invite_author -from resolvers.community import ( - create_community, - delete_community, - get_community, - get_communities, -) - from resolvers.migrate import markdown_body # from resolvers.collab import invite_author, remove_author from resolvers.editor import create_shout, delete_shout, update_shout from resolvers.profile import ( - get_users_by_slugs, - get_user_reacted_shouts, - get_user_roles, - get_top_authors, - get_author + load_authors_by, + rate_user, + update_profile ) -# from resolvers.feed import shouts_for_feed, my_candidates from resolvers.reactions import ( create_reaction, delete_reaction, update_reaction, reactions_unfollow, reactions_follow, + load_reactions_by ) from resolvers.topics import ( topic_follow, @@ -45,36 +36,31 @@ from resolvers.topics import ( ) from resolvers.zine import ( - get_shout_by_slug, follow, unfollow, - increment_view, - top_month, - top_overall, - recent_published, - recent_all, - recent_commented, - recent_reacted, - shouts_by_authors, - shouts_by_topics, - shouts_by_layout_recent, - shouts_by_layout_top, - shouts_by_layout_topmonth, - shouts_by_communities, + load_shouts_by ) -from resolvers.inbox.chats import load_chats, \ - create_chat, delete_chat, update_chat, \ - invite_to_chat, enter_chat -from resolvers.inbox.messages import load_chat_messages, \ - create_message, delete_message, update_message, \ - message_generator, mark_as_read -from resolvers.inbox.search import search_users, \ - search_messages, search_chats +from resolvers.inbox.chats import ( + create_chat, + delete_chat, + update_chat, + invite_to_chat +) +from resolvers.inbox.messages import ( + create_message, + delete_message, + update_message, + message_generator, + mark_as_read +) +from resolvers.inbox.load import ( + load_chats, + load_messages_by +) +from resolvers.inbox.search import search_users __all__ = [ - "follow", - "unfollow", # auth "login", "register_by_email", @@ -83,27 +69,15 @@ __all__ = [ "auth_send_link", "sign_out", "get_current_user", - # profile - "get_users_by_slugs", - "get_user_roles", - "get_top_authors", - "get_author", + # authors + "load_authors_by", + "rate_user", + "update_profile", + "get_authors_all", # zine - "recent_published", - "recent_commented", - "recent_reacted", - "recent_all", - "shouts_by_topics", - "shouts_by_layout_recent", - "shouts_by_layout_topmonth", - "shouts_by_layout_top", - "shouts_by_authors", - "shouts_by_communities", - "get_user_reacted_shouts", - "top_month", - "top_overall", - "increment_view", - "get_shout_by_slug", + "load_shouts_by", + "follow", + "unfollow", # editor "create_shout", "update_shout", @@ -120,31 +94,24 @@ __all__ = [ "topic_follow", "topic_unfollow", "get_topic", - # communities - "get_community", - "get_communities", - "create_community", - "delete_community", # reactions "reactions_follow", "reactions_unfollow", "create_reaction", "update_reaction", "delete_reaction", + "load_reactions_by", # inbox + "load_chats", + "load_messages_by", + "invite_to_chat", "create_chat", "delete_chat", "update_chat", - "load_chats", "create_message", "delete_message", "update_message", - "load_chat_messages", "message_generator", "mark_as_read", - "search_users", - "search_chats", - "search_messages", - "enter_chat", - "invite_to_chat" + "search_users" ] diff --git a/resolvers/collab.py b/resolvers/collab.py index 8271b478..7db4620e 100644 --- a/resolvers/collab.py +++ b/resolvers/collab.py @@ -32,16 +32,16 @@ async def invite_author(_, info, author, shout): authors = [a.id for a in shout.authors] if user_id not in authors: return {"error": "access denied"} - author = session.query(User).filter(User.slug == author).first() - if author.id in authors: - return {"error": "already added"} - shout.authors.append(author) + author = session.query(User).filter(User.id == author.id).first() + if author: + if author.id in authors: + return {"error": "already added"} + shout.authors.append(author) shout.updated_at = datetime.now() session.add(shout) session.commit() # TODO: email notify - return {} @@ -59,9 +59,10 @@ async def remove_author(_, info, author, shout): if user_id not in authors: return {"error": "access denied"} author = session.query(User).filter(User.slug == author).first() - if author.id not in authors: - return {"error": "not in authors"} - shout.authors.remove(author) + if author: + if author.id not in authors: + return {"error": "not in authors"} + shout.authors.remove(author) shout.updated_at = datetime.now() session.add(shout) session.commit() diff --git a/resolvers/collection.py b/resolvers/collection.py deleted file mode 100644 index 9c300b3b..00000000 --- a/resolvers/collection.py +++ /dev/null @@ -1,104 +0,0 @@ -from datetime import datetime - -from sqlalchemy import and_ - -from auth.authenticate import login_required -from base.orm import local_session -from base.resolvers import mutation, query -from orm.collection import Collection, ShoutCollection -from orm.user import User - - -@mutation.field("createCollection") -@login_required -async def create_collection(_, _info, inp): - # auth = info.context["request"].auth - # user_id = auth.user_id - collection = Collection.create( - slug=inp.get("slug", ""), - title=inp.get("title", ""), - desc=inp.get("desc", ""), - pic=inp.get("pic", ""), - ) - - return {"collection": collection} - - -@mutation.field("updateCollection") -@login_required -async def update_collection(_, info, inp): - auth = info.context["request"].auth - user_id = auth.user_id - collection_slug = inp.get("slug", "") - with local_session() as session: - owner = session.query(User).filter(User.id == user_id) # note list here - collection = ( - session.query(Collection).filter(Collection.slug == collection_slug).first() - ) - editors = [e.slug for e in collection.editors] - if not collection: - return {"error": "invalid collection id"} - if collection.createdBy not in (owner + editors): - return {"error": "access denied"} - collection.title = inp.get("title", "") - collection.desc = inp.get("desc", "") - collection.pic = inp.get("pic", "") - collection.updatedAt = datetime.now() - session.commit() - - -@mutation.field("deleteCollection") -@login_required -async def delete_collection(_, info, slug): - auth = info.context["request"].auth - user_id = auth.user_id - with local_session() as session: - collection = session.query(Collection).filter(Collection.slug == slug).first() - if not collection: - return {"error": "invalid collection slug"} - if collection.owner != user_id: - return {"error": "access denied"} - collection.deletedAt = datetime.now() - session.add(collection) - session.commit() - - return {} - - -@query.field("getUserCollections") -async def get_user_collections(_, _info, userslug): - collections = [] - with local_session() as session: - user = session.query(User).filter(User.slug == userslug).first() - if user: - # TODO: check rights here - collections = ( - session.query(Collection) - .where( - and_(Collection.createdBy == userslug, Collection.publishedAt.is_not(None)) - ) - .all() - ) - for c in collections: - shouts = ( - session.query(ShoutCollection) - .filter(ShoutCollection.collection == c.id) - .all() - ) - c.amount = len(shouts) - return collections - - -@query.field("getMyColelctions") -@login_required -async def get_my_collections(_, info): - auth = info.context["request"].auth - user_id = auth.user_id - with local_session() as session: - collections = ( - session.query(Collection).when(Collection.createdBy == user_id).all() - ) - return collections - - -# TODO: get shouts list by collection diff --git a/resolvers/community.py b/resolvers/community.py deleted file mode 100644 index e37e0e7e..00000000 --- a/resolvers/community.py +++ /dev/null @@ -1,134 +0,0 @@ -from datetime import datetime -from typing import List - -from sqlalchemy import and_ - -from auth.authenticate import login_required -from base.orm import local_session -from base.resolvers import mutation, query -from orm.community import Community, CommunityFollower -from orm.user import User - - -@mutation.field("createCommunity") -@login_required -async def create_community(_, info, input): - auth = info.context["request"].auth - user_id = auth.user_id - with local_session() as session: - user = session.query(User).where(User.id == user_id).first() - community = Community.create( - slug=input.get("slug", ""), - title=input.get("title", ""), - desc=input.get("desc", ""), - pic=input.get("pic", ""), - createdBy=user.slug, - createdAt=datetime.now(), - ) - session.add(community) - session.commit() - - return {"community": community} - - -@mutation.field("updateCommunity") -@login_required -async def update_community(_, info, input): - auth = info.context["request"].auth - user_id = auth.user_id - community_slug = input.get("slug", "") - - with local_session() as session: - owner = session.query(User).filter(User.id == user_id) # note list here - community = ( - session.query(Community).filter(Community.slug == community_slug).first() - ) - editors = [e.slug for e in community.editors] - if not community: - return {"error": "invalid community id"} - if community.createdBy not in (owner + editors): - return {"error": "access denied"} - community.title = input.get("title", "") - community.desc = input.get("desc", "") - community.pic = input.get("pic", "") - community.updatedAt = datetime.now() - session.add(community) - session.commit() - - -@mutation.field("deleteCommunity") -@login_required -async def delete_community(_, info, slug): - auth = info.context["request"].auth - user_id = auth.user_id - - with local_session() as session: - community = session.query(Community).filter(Community.slug == slug).first() - if not community: - return {"error": "invalid community slug"} - if community.owner != user_id: - return {"error": "access denied"} - community.deletedAt = datetime.now() - session.add(community) - session.commit() - - return {} - - -@query.field("getCommunity") -async def get_community(_, info, slug): - with local_session() as session: - community = session.query(Community).filter(Community.slug == slug).first() - if not community: - return {"error": "invalid community id"} - - return community - - -@query.field("getCommunities") -async def get_communities(_, info): - with local_session() as session: - communities = session.query(Community) - return communities - - -def community_follow(user, slug): - with local_session() as session: - cf = CommunityFollower.create(follower=user.slug, community=slug) - session.add(cf) - session.commit() - - -def community_unfollow(user, slug): - with local_session() as session: - following = ( - session.query(CommunityFollower) - .filter( - and_( - CommunityFollower.follower == user.slug, - CommunityFollower.community == slug, - ) - ) - .first() - ) - if not following: - raise Exception("[orm.community] following was not exist") - session.delete(following) - session.commit() - - -@query.field("userFollowedCommunities") -def get_followed_communities(_, _info, user_slug) -> List[Community]: - return followed_communities(user_slug) - - -def followed_communities(user_slug) -> List[Community]: - ccc = [] - with local_session() as session: - ccc = ( - session.query(Community.slug) - .join(CommunityFollower) - .where(CommunityFollower.follower == user_slug) - .all() - ) - return ccc diff --git a/resolvers/editor.py b/resolvers/editor.py index 06abd34d..a44e15f2 100644 --- a/resolvers/editor.py +++ b/resolvers/editor.py @@ -73,9 +73,14 @@ async def update_shout(_, info, inp): shout.update(inp) shout.updatedAt = datetime.now() session.add(shout) - for topic in inp.get("topic_slugs", []): - st = ShoutTopic.create(shout=slug, topic=topic) - session.add(st) + if inp.get("topics"): + # remove old links + links = session.query(ShoutTopic).where(ShoutTopic.shout == slug).all() + for topiclink in links: + session.delete(topiclink) + # add new topic links + for topic in inp.get("topics", []): + ShoutTopic.create(shout=slug, topic=topic) session.commit() GitTask(inp, user.username, user.email, "update shout %s" % (slug)) diff --git a/resolvers/feed.py b/resolvers/feed.py deleted file mode 100644 index f53955f4..00000000 --- a/resolvers/feed.py +++ /dev/null @@ -1,53 +0,0 @@ -from typing import List - -from sqlalchemy import and_, desc - -from auth.authenticate import login_required -from base.orm import local_session -from base.resolvers import query -from orm.shout import Shout, ShoutAuthor, ShoutTopic -from orm.topic import TopicFollower -from orm.user import AuthorFollower -from services.zine.shoutscache import prepare_shouts - - -@query.field("shoutsForFeed") -@login_required -async def get_user_feed(_, info, offset, limit) -> List[Shout]: - user = info.context["request"].user - shouts = [] - with local_session() as session: - shouts = ( - session.query(Shout) - .join(ShoutAuthor) - .join(AuthorFollower) - .where(AuthorFollower.follower == user.slug) - .order_by(desc(Shout.createdAt)) - ) - topic_rows = ( - session.query(Shout) - .join(ShoutTopic) - .join(TopicFollower) - .where(TopicFollower.follower == user.slug) - .order_by(desc(Shout.createdAt)) - ) - shouts = shouts.union(topic_rows).limit(limit).offset(offset).all() - return shouts - - -@query.field("recentCandidates") -@login_required -async def user_unpublished_shouts(_, info, offset, limit) -> List[Shout]: - user = info.context["request"].user - with local_session() as session: - shouts = prepare_shouts( - session.query(Shout) - .join(ShoutAuthor) - .where(and_(Shout.publishedAt.is_(None), ShoutAuthor.user == user.slug)) - .order_by(desc(Shout.createdAt)) - .group_by(Shout.id) - .limit(limit) - .offset(offset) - .all() - ) - return shouts diff --git a/resolvers/inbox/chats.py b/resolvers/inbox/chats.py index e6923811..04fd691d 100644 --- a/resolvers/inbox/chats.py +++ b/resolvers/inbox/chats.py @@ -5,7 +5,7 @@ from datetime import datetime from auth.authenticate import login_required from base.redis import redis from base.resolvers import mutation, query -from resolvers.inbox.load import load_messages, load_user_chats +from services.auth.users import UserStorage async def add_user_to_chat(user_slug: str, chat_id: str, chat=None): @@ -20,40 +20,6 @@ async def add_user_to_chat(user_slug: str, chat_id: str, chat=None): await redis.execute("SET", f"chats_by_user/{member}", json.dumps(chats_ids)) -@query.field("loadChats") -@login_required -async def load_chats(_, info): - user = info.context["request"].user - return await load_user_chats(user.slug) - - -@mutation.field("enterChat") -@login_required -async def enter_chat(_, info, chat_id: str): - ''' enter to public chat with :chat_id ''' - user = info.context["request"].user - chat = await redis.execute("GET", f"chats/{chat_id}") - if not chat: - return { - "error": "chat not exist" - } - else: - chat = dict(json.loads(chat)) - if chat['private']: - return { - "error": "cannot enter private chat" - } - if user.slug not in chat["users"]: - chat["users"].append(user.slug) - await add_user_to_chat(user.slug, chat_id, chat) - await redis.execute("SET" f"chats/{chat_id}", json.dumps(chat)) - chat['messages'] = await load_messages(chat_id) - return { - "chat": chat, - "error": None - } - - @mutation.field("inviteChat") async def invite_to_chat(_, info, invited: str, chat_id: str): ''' invite user with :slug to chat with :chat_id ''' @@ -156,3 +122,10 @@ async def delete_chat(_, info, chat_id: str): return { "error": "chat not exist" } + + +@query.field("chatUsersAll") +@login_required +async def get_chat_users_all(_, info): + chat_users = await UserStorage.get_all_chat_users() + return chat_users diff --git a/resolvers/inbox/load.py b/resolvers/inbox/load.py index 49c1e766..a3eb24fb 100644 --- a/resolvers/inbox/load.py +++ b/resolvers/inbox/load.py @@ -1,51 +1,36 @@ import json +from datetime import datetime, timedelta +from auth.authenticate import login_required from base.redis import redis +from base.resolvers import query async def get_unread_counter(chat_id: str, user_slug: str): try: - return int(await redis.execute("LLEN", f"chats/{chat_id}/unread/{user_slug}")) + unread = await redis.execute("LLEN", f"chats/{chat_id}/unread/{user_slug}") + if unread: + return unread except Exception: return 0 async def get_total_unread_counter(user_slug: str): chats = await redis.execute("GET", f"chats_by_user/{user_slug}") - if not chats: - return 0 - - chats = json.loads(chats) unread = 0 - for chat_id in chats: - n = await get_unread_counter(chat_id, user_slug) - unread += n - + if chats: + chats = json.loads(chats) + for chat_id in chats: + n = await get_unread_counter(chat_id, user_slug) + unread += n return unread -async def load_user_chats(slug, offset: int, amount: int): - """ load :amount chats of :slug user with :offset """ - - chats = await redis.execute("GET", f"chats_by_user/{slug}") - if chats: - chats = list(json.loads(chats))[offset:offset + amount] - if not chats: - chats = [] - for c in chats: - c['messages'] = await load_messages(c['id']) - c['unread'] = await get_unread_counter(c['id'], slug) - return { - "chats": chats, - "error": None - } - - -async def load_messages(chatId: str, offset: int, amount: int): - ''' load :amount messages for :chatId with :offset ''' +async def load_messages(chatId: str, limit: int, offset: int): + ''' load :limit messages for :chatId with :offset ''' messages = [] message_ids = await redis.lrange( - f"chats/{chatId}/message_ids", 0 - offset - amount, 0 - offset + f"chats/{chatId}/message_ids", 0 - offset - limit, 0 - offset ) if message_ids: message_keys = [ @@ -57,3 +42,61 @@ async def load_messages(chatId: str, offset: int, amount: int): "messages": messages, "error": None } + + +@query.field("loadChats") +@login_required +async def load_chats(_, info, limit: int, offset: int): + """ load :limit chats of current user with :offset """ + user = info.context["request"].user + chats = await redis.execute("GET", f"chats_by_user/{user.slug}") + if chats: + chats = list(json.loads(chats))[offset:offset + limit] + if not chats: + chats = [] + for c in chats: + c['messages'] = await load_messages(c['id'], limit, offset) + c['unread'] = await get_unread_counter(c['id'], user.slug) + return { + "chats": chats, + "error": None + } + + +@query.field("loadMessagesBy") +@login_required +async def load_messages_by(_, info, by, limit: int = 50, offset: int = 0): + ''' load :amolimitunt messages of :chat_id with :offset ''' + user = info.context["request"].user + my_chats = await redis.execute("GET", f"chats_by_user/{user.slug}") + chat_id = by.get('chat') + if chat_id: + chat = await redis.execute("GET", f"chats/{chat_id}") + if not chat: + return { + "error": "chat not exist" + } + messages = await load_messages(chat_id, limit, offset) + user_id = by.get('author') + if user_id: + chats = await redis.execute("GET", f"chats_by_user/{user_id}") + our_chats = list(set(chats) & set(my_chats)) + for c in our_chats: + messages += await load_messages(c, limit, offset) + body_like = by.get('body') + if body_like: + for c in my_chats: + mmm = await load_messages(c, limit, offset) + for m in mmm: + if body_like in m["body"]: + messages.append(m) + days = by.get("days") + if days: + messages = filter( + lambda m: datetime.now() - int(m["createdAt"]) < timedelta(days=by.get("days")), + messages + ) + return { + "messages": messages, + "error": None + } diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py index e6494808..48f45b4a 100644 --- a/resolvers/inbox/messages.py +++ b/resolvers/inbox/messages.py @@ -4,67 +4,52 @@ from datetime import datetime from auth.authenticate import login_required from base.redis import redis -from base.resolvers import mutation, query, subscription +from base.resolvers import mutation, subscription from services.inbox import ChatFollowing, MessageResult, MessagesStorage -from resolvers.inbox.load import load_messages - - -@query.field("loadMessages") -@login_required -async def load_chat_messages(_, info, chat_id: str, offset: int = 0, amount: int = 50): - ''' load [amount] chat's messages with [offset] ''' - chat = await redis.execute("GET", f"chats/{chat_id}") - if not chat: - return { - "error": "chat not exist" - } - messages = await load_messages(chat_id, offset, amount) - return { - "messages": messages, - "error": None - } @mutation.field("createMessage") @login_required -async def create_message(_, info, chat_id: str, body: str, replyTo=None): +async def create_message(_, info, chat: str, body: str, replyTo=None): """ create message with :body for :chat_id replying to :replyTo optionally """ user = info.context["request"].user - chat = await redis.execute("GET", f"chats/{chat_id}") + chat = await redis.execute("GET", f"chats/{chat}") if not chat: return { "error": "chat not exist" } - message_id = await redis.execute("GET", f"chats/{chat_id}/next_message_id") - message_id = int(message_id) - new_message = { - "chatId": chat_id, - "id": message_id, - "author": user.slug, - "body": body, - "replyTo": replyTo, - "createdAt": int(datetime.now().timestamp()), - } - await redis.execute( - "SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(new_message) - ) - await redis.execute("LPUSH", f"chats/{chat_id}/message_ids", str(message_id)) - await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(message_id + 1)) - - chat = json.loads(chat) - users = chat["users"] - for user_slug in users: + else: + chat = dict(json.loads(chat)) + message_id = await redis.execute("GET", f"chats/{chat['id']}/next_message_id") + message_id = int(message_id) + new_message = { + "chatId": chat['id'], + "id": message_id, + "author": user.slug, + "body": body, + "replyTo": replyTo, + "createdAt": int(datetime.now().timestamp()), + } await redis.execute( - "LPUSH", f"chats/{chat_id}/unread/{user_slug}", str(message_id) + "SET", f"chats/{chat['id']}/messages/{message_id}", json.dumps(new_message) ) + await redis.execute("LPUSH", f"chats/{chat['id']}/message_ids", str(message_id)) + await redis.execute("SET", f"chats/{chat['id']}/next_message_id", str(message_id + 1)) - result = MessageResult("NEW", new_message) - await MessagesStorage.put(result) + chat = json.loads(chat) + users = chat["users"] + for user_slug in users: + await redis.execute( + "LPUSH", f"chats/{chat['id']}/unread/{user_slug}", str(message_id) + ) - return { - "message": new_message, - "error": None - } + result = MessageResult("NEW", new_message) + await MessagesStorage.put(result) + + return { + "message": new_message, + "error": None + } @mutation.field("updateMessage") @@ -174,6 +159,7 @@ async def message_generator(obj, info): while True: msg = await asyncio.gather(*tasks) + print('[inbox] %d new messages' % len(tasks)) yield msg finally: await MessagesStorage.remove_chat(following_chat) diff --git a/resolvers/inbox/search.py b/resolvers/inbox/search.py index 82048e00..5d6d4b82 100644 --- a/resolvers/inbox/search.py +++ b/resolvers/inbox/search.py @@ -9,13 +9,13 @@ from orm.user import AuthorFollower @query.field("searchUsers") @login_required -async def search_users(_, info, query: str, offset: int = 0, amount: int = 50): +async def search_users(_, info, query: str, limit: int = 50, offset: int = 0): result = [] # TODO: maybe redis scan? user = info.context["request"].user talk_before = await redis.execute("GET", f"/chats_by_user/{user.slug}") if talk_before: - talk_before = list(json.loads(talk_before))[offset:offset + amount] + talk_before = list(json.loads(talk_before))[offset:offset + limit] for chat_id in talk_before: members = await redis.execute("GET", f"/chats/{chat_id}/users") if members: @@ -26,54 +26,18 @@ async def search_users(_, info, query: str, offset: int = 0, amount: int = 50): result.append(member) user = info.context["request"].user - more_amount = amount - len(result) + more_amount = limit - len(result) with local_session() as session: # followings result += session.query(AuthorFollower.author).where(AuthorFollower.follower.startswith(query))\ .offset(offset + len(result)).limit(more_amount) - more_amount = amount + more_amount = limit # followers result += session.query(AuthorFollower.follower).where(AuthorFollower.author.startswith(query))\ - .offset(offset + len(result)).limit(offset + len(result) + amount) + .offset(offset + len(result)).limit(offset + len(result) + limit) return { "slugs": list(result), "error": None } - - -@query.field("searchChats") -@login_required -async def search_chats(_, info, query: str, offset: int = 0, amount: int = 50): - user = info.context["request"].user - my_chats = await redis.execute("GET", f"/chats_by_user/{user.slug}") - chats = [] - for chat_id in my_chats: - chat = await redis.execute("GET", f"chats/{chat_id}") - if chat: - chat = dict(json.loads(chat)) - chats.append(chat) - return { - "chats": chats, - "error": None - } - - -@query.field("searchMessages") -@login_required -async def search_messages(_, info, query: str, offset: int = 0, amount: int = 50): - user = info.context["request"].user - my_chats = await redis.execute("GET", f"/chats_by_user/{user.slug}") - chats = [] - if my_chats: - my_chats = list(json.loads(my_chats)) - for chat_id in my_chats: - chat = await redis.execute("GET", f"chats/{chat_id}") - if chat: - chat = dict(json.loads(chat)) - chats.append(chat) - return { - "chats": chats, - "error": None - } diff --git a/resolvers/profile.py b/resolvers/profile.py index bfd79dd7..fbd3aed2 100644 --- a/resolvers/profile.py +++ b/resolvers/profile.py @@ -1,20 +1,20 @@ from typing import List - -from sqlalchemy import and_, desc, func +from datetime import datetime, timedelta +from sqlalchemy import and_, func from sqlalchemy.orm import selectinload from auth.authenticate import login_required from base.orm import local_session from base.resolvers import mutation, query from orm.reaction import Reaction -from orm.shout import Shout from orm.topic import Topic, TopicFollower from orm.user import AuthorFollower, Role, User, UserRating, UserRole from services.auth.users import UserStorage from services.stat.reacted import ReactedStorage -from services.zine.shoutscache import ShoutsCache +from services.stat.topicstat import TopicStat +from services.zine.shoutauthor import ShoutAuthorStorage -from .community import followed_communities +# from .community import followed_communities from .inbox.load import get_total_unread_counter from .topics import get_topic_stat @@ -25,7 +25,7 @@ async def user_subscriptions(slug: str): "topics": [t.slug for t in await followed_topics(slug)], # followed topics slugs "authors": [a.slug for a in await followed_authors(slug)], # followed authors slugs "reactions": await ReactedStorage.get_shouts_by_author(slug), - "communities": [c.slug for c in followed_communities(slug)], # communities + # "communities": [c.slug for c in followed_communities(slug)], # communities } @@ -46,24 +46,6 @@ async def get_author_stat(slug): } -@query.field("userReactedShouts") -async def get_user_reacted_shouts(_, slug: str, offset: int, limit: int) -> List[Shout]: - user = await UserStorage.get_user_by_slug(slug) - if not user: - return [] - with local_session() as session: - shouts = ( - session.query(Shout) - .join(Reaction) - .where(Reaction.createdBy == user.slug) - .order_by(desc(Reaction.createdAt)) - .limit(limit) - .offset(offset) - .all() - ) - return shouts - - @query.field("userFollowedTopics") @login_required async def get_followed_topics(_, info, slug) -> List[Topic]: @@ -115,20 +97,7 @@ async def user_followers(_, _info, slug) -> List[User]: return users -@query.field("getUsersBySlugs") -async def get_users_by_slugs(_, _info, slugs): - with local_session() as session: - users = ( - session.query(User) - .options(selectinload(User.ratings)) - .filter(User.slug in slugs) - .all() - ) - return users - - -@query.field("getUserRoles") -async def get_user_roles(_, _info, slug): +async def get_user_roles(slug): with local_session() as session: user = session.query(User).where(User.slug == slug).first() roles = ( @@ -206,22 +175,41 @@ def author_unfollow(user, slug): @query.field("authorsAll") async def get_authors_all(_, _info): users = await UserStorage.get_all_users() - authorslugs = await ShoutsCache.get_all_authors_slugs() authors = [] for author in users: - if author.slug in authorslugs: + if ShoutAuthorStorage.shouts_by_author.get(author.slug): author.stat = await get_author_stat(author.slug) authors.append(author) return authors -@query.field("topAuthors") -def get_top_authors(_, _info, offset, limit): - return list(UserStorage.get_top_users())[offset : offset + limit] # type: ignore - - -@query.field("getAuthor") -async def get_author(_, _info, slug): - a = await UserStorage.get_user_by_slug(slug) - a.stat = await get_author_stat(slug) - return a +@query.field("loadAuthorsBy") +async def load_authors_by(_, info, by, limit, offset): + authors = [] + with local_session() as session: + aq = session.query(User) + if by.get("slug"): + aq = aq.filter(User.slug.ilike(f"%{by['slug']}%")) + elif by.get("name"): + aq = aq.filter(User.name.ilike(f"%{by['name']}%")) + elif by.get("topic"): + aaa = list(map(lambda a: a.slug, TopicStat.authors_by_topic.get(by["topic"]))) + aq = aq.filter(User.name._in(aaa)) + if by.get("lastSeen"): # in days + days_before = datetime.now() - timedelta(days=by["lastSeen"]) + aq = aq.filter(User.lastSeen > days_before) + elif by.get("createdAt"): # in days + days_before = datetime.now() - timedelta(days=by["createdAt"]) + aq = aq.filter(User.createdAt > days_before) + aq = aq.group_by( + User.id + ).order_by( + by.get("order") or "createdAt" + ).limit(limit).offset(offset) + print(aq) + authors = list(map(lambda r: r.User, session.execute(aq))) + if by.get("stat"): + for a in authors: + a.stat = await get_author_stat(a.slug) + authors = list(set(authors)).sort(authors, key=lambda a: a["stat"].get(by.get("stat"))) + return authors diff --git a/resolvers/reactions.py b/resolvers/reactions.py index f1b789ad..2ca7ebd2 100644 --- a/resolvers/reactions.py +++ b/resolvers/reactions.py @@ -1,6 +1,7 @@ -from datetime import datetime +from datetime import datetime, timedelta -from sqlalchemy import and_, desc +from sqlalchemy import and_, desc, select, text, func +from sqlalchemy.orm import selectinload from auth.authenticate import login_required from base.orm import local_session @@ -8,14 +9,12 @@ from base.resolvers import mutation, query from orm.reaction import Reaction, ReactionKind from orm.shout import Shout, ShoutReactionsFollower from orm.user import User -from services.auth.users import UserStorage from services.stat.reacted import ReactedStorage -from services.stat.viewed import ViewedStorage async def get_reaction_stat(reaction_id): return { - "viewed": await ViewedStorage.get_reaction(reaction_id), + # "viewed": await ViewStat.get_reaction(reaction_id), "reacted": len(await ReactedStorage.get_reaction(reaction_id)), "rating": await ReactedStorage.get_reaction_rating(reaction_id), "commented": len(await ReactedStorage.get_reaction_comments(reaction_id)), @@ -117,14 +116,14 @@ def set_published(session, slug, publisher): s = session.query(Shout).where(Shout.slug == slug).first() s.publishedAt = datetime.now() s.publishedBy = publisher - s.visibility = 'public' + s.visibility = text('public') session.add(s) session.commit() def set_hidden(session, slug): s = session.query(Shout).where(Shout.slug == slug).first() - s.visibility = 'authors' + s.visibility = text('authors') s.publishedAt = None # TODO: discuss s.publishedBy = None # TODO: store changes history in git session.add(s) @@ -202,57 +201,58 @@ async def delete_reaction(_, info, rid): return {} -@query.field("reactionsForShouts") -async def get_reactions_for_shouts(_, info, shouts, offset, limit): - return await reactions_for_shouts(shouts, offset, limit) +@query.field("loadReactionsBy") +async def load_reactions_by(_, info, by, limit=50, offset=0): + """ + :param by: { + shout: 'some-slug' + author: 'discours', + topic: 'culture', + body: 'something else', + stat: 'rating' | 'comments' | 'reacted' | 'views', + days: 30 + } + :param limit: int amount of shouts + :param offset: int offset in this order + :return: Reaction[] + """ + q = select(Reaction).options( + selectinload(Reaction.shout), + ).where( + Reaction.deletedAt.is_(None) + ).join( + Shout, + Shout.slug == Reaction.shout + ) + if by.get("slug"): + q = q.filter(Shout.slug == by["slug"]) + else: + if by.get("reacted"): + user = info.context["request"].user + q = q.filter(Reaction.createdBy == user.slug) + if by.get("author"): + q = q.filter(Reaction.createdBy == by["author"]) + if by.get("topic"): + q = q.filter(Shout.topics.contains(by["topic"])) + if by.get("body"): + if by["body"] is True: + q = q.filter(func.length(Reaction.body) > 0) + else: + q = q.filter(Reaction.body.ilike(f'%{by["body"]}%')) + if by.get("days"): + before = datetime.now() - timedelta(days=int(by["days"]) or 30) + q = q.filter(Reaction.createdAt > before) + q = q.group_by(Shout.id).order_by( + desc(by.get("order") or "createdAt") + ).limit(limit).offset(offset) -async def reactions_for_shouts(shouts, offset, limit): - reactions = [] + rrr = [] with local_session() as session: - for slug in shouts: - reactions += ( - session.query(Reaction) - .filter(Reaction.shout == slug) - .where(Reaction.deletedAt.is_not(None)) - .order_by(desc("createdAt")) - .offset(offset) - .limit(limit) - .all() - ) - for r in reactions: - r.stat = await get_reaction_stat(r.id) - r.createdBy = await UserStorage.get_user(r.createdBy or "discours") - return reactions - reactions = [] - with local_session() as session: - for slug in shouts: - reactions += ( - session.query(Reaction) - .filter(Reaction.shout == slug) - .where(Reaction.deletedAt.is_not(None)) - .order_by(desc("createdAt")) - .offset(offset) - .limit(limit) - .all() - ) - for r in reactions: - r.stat = await get_reaction_stat(r.id) - r.createdBy = await UserStorage.get_user(r.createdBy or "discours") - return reactions - - -@query.field("reactionsByAuthor") -async def get_reactions_by_author(_, info, slug, limit=50, offset=0): - reactions = [] - with local_session() as session: - reactions = ( - session.query(Reaction) - .where(Reaction.createdBy == slug) - .limit(limit) - .offset(offset) - ) - for r in reactions: - r.stat = await get_reaction_stat(r.id) - r.createdBy = await UserStorage.get_user(r.createdBy or "discours") - return reactions + # post query stats and author's captions + for r in list(map(lambda r: r.Reaction, session.execute(q))): + r.stat = await get_reaction_stat(r.id) + rrr.append(r) + if by.get("stat"): + rrr.sort(lambda r: r.stat.get(by["stat"]) or r.createdAt) + return rrr diff --git a/resolvers/topics.py b/resolvers/topics.py index 12d914f8..f1c7f1d1 100644 --- a/resolvers/topics.py +++ b/resolvers/topics.py @@ -6,11 +6,10 @@ from auth.authenticate import login_required from base.orm import local_session from base.resolvers import mutation, query from orm.topic import Topic, TopicFollower -from services.zine.shoutscache import ShoutsCache from services.zine.topics import TopicStorage from services.stat.reacted import ReactedStorage from services.stat.topicstat import TopicStat -from services.stat.viewed import ViewedStorage +from services.stat.views import ViewStat async def get_topic_stat(slug): @@ -18,7 +17,7 @@ async def get_topic_stat(slug): "shouts": len(TopicStat.shouts_by_topic.get(slug, {}).keys()), "authors": len(TopicStat.authors_by_topic.get(slug, {}).keys()), "followers": len(TopicStat.followers_by_topic.get(slug, {}).keys()), - "viewed": await ViewedStorage.get_topic(slug), + "viewed": await ViewStat.get_topic(slug), "reacted": len(await ReactedStorage.get_topic(slug)), "commented": len(await ReactedStorage.get_topic_comments(slug)), "rating": await ReactedStorage.get_topic_rating(slug) @@ -43,7 +42,7 @@ async def topics_by_community(_, info, community): @query.field("topicsByAuthor") async def topics_by_author(_, _info, author): - shouts = ShoutsCache.by_author.get(author, []) + shouts = TopicStorage.get_topics_by_author(author) author_topics = set() for s in shouts: for tpc in s.topics: diff --git a/resolvers/zine.py b/resolvers/zine.py index 29adf225..c782e759 100644 --- a/resolvers/zine.py +++ b/resolvers/zine.py @@ -1,249 +1,87 @@ -from graphql.type import GraphQLResolveInfo from datetime import datetime, timedelta from sqlalchemy.orm import selectinload -from sqlalchemy.sql.expression import and_, desc, select +from sqlalchemy.sql.expression import or_, desc, select from auth.authenticate import login_required from base.orm import local_session from base.resolvers import mutation, query -from orm.collection import ShoutCollection -from orm.shout import Shout, ShoutTopic -from orm.topic import Topic -from resolvers.community import community_follow, community_unfollow +from orm.shout import Shout +from orm.reaction import Reaction +# from resolvers.community import community_follow, community_unfollow from resolvers.profile import author_follow, author_unfollow from resolvers.reactions import reactions_follow, reactions_unfollow from resolvers.topics import topic_follow, topic_unfollow -from services.search import SearchService -from services.stat.viewed import ViewedStorage from services.zine.shoutauthor import ShoutAuthorStorage -from services.zine.shoutscache import ShoutsCache, get_shout_stat +from services.stat.reacted import ReactedStorage -@mutation.field("incrementView") -async def increment_view(_, _info, shout): - # TODO: use ackee to collect views - async with ViewedStorage.lock: - return ViewedStorage.increment(shout) +@query.field("loadShoutsBy") +async def load_shouts_by(_, info, by, limit=50, offset=0): + """ + :param by: { + layout: 'audio', + visibility: "public", + author: 'discours', + topic: 'culture', + title: 'something', + body: 'something else', + stat: 'rating' | 'comments' | 'reacted' | 'views', + days: 30 + } + :param limit: int amount of shouts + :param offset: int offset in this order + :return: Shout[] + """ - -@query.field("topMonth") -async def top_month(_, _info, offset, limit): - async with ShoutsCache.lock: - return ShoutsCache.top_month[offset : offset + limit] - - -@query.field("topPublished") -async def top_published(_, _info, daysago, offset, limit): - async with ShoutsCache.lock: - return ShoutsCache.get_top_published_before(daysago, offset, limit) - - -@query.field("topCommented") -async def top_commented(_, _info, offset, limit): - async with ShoutsCache.lock: - return ShoutsCache.top_commented[offset : offset + limit] - - -@query.field("topOverall") -async def top_overall(_, _info, offset, limit): - async with ShoutsCache.lock: - return ShoutsCache.top_overall[offset : offset + limit] - - -@query.field("recentPublished") -async def recent_published(_, _info, offset, limit): - async with ShoutsCache.lock: - return ShoutsCache.recent_published[offset : offset + limit] - - -@query.field("recentAll") -async def recent_all(_, _info, offset, limit): - async with ShoutsCache.lock: - return ShoutsCache.recent_all[offset : offset + limit] - - -@query.field("recentReacted") -async def recent_reacted(_, _info, offset, limit): - async with ShoutsCache.lock: - return ShoutsCache.recent_reacted[offset : offset + limit] - - -@query.field("recentCommented") -async def recent_commented(_, _info, offset, limit): - async with ShoutsCache.lock: - return ShoutsCache.recent_commented[offset : offset + limit] - - -@query.field("getShoutBySlug") -async def get_shout_by_slug(_, info, slug): - all_fields = [ - node.name.value for node in info.field_nodes[0].selection_set.selections - ] - selected_fields = set(["authors", "topics"]).intersection(all_fields) - select_options = [selectinload(getattr(Shout, field)) for field in selected_fields] - with local_session() as session: - # s = text(open("src/queries/shout-by-slug.sql", "r").read() % slug) - shout = ( - session.query(Shout) - .options(select_options) - .filter(Shout.slug == slug) - .first() - ) - - if not shout: - print(f"shout with slug {slug} not exist") - return {"error": "shout not found"} - else: - for a in shout.authors: - a.caption = await ShoutAuthorStorage.get_author_caption(slug, a.slug) - return shout - - -@query.field("searchQuery") -async def get_search_results(_, _info, searchtext, offset, limit): - shouts = SearchService.search(searchtext) - # TODO: sort and filter types for search service - for s in shouts: - shout = s.dict() - for a in shout['authors']: - a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) - s.stat.relevance = 1 # FIXME: expecting search engine rated relevance - return shouts[offset : offset + limit] - - -@query.field("shoutsByAuthors") -async def shouts_by_authors(_, _info, slugs, offset=0, limit=100): - async with ShoutsCache.lock: - shouts = {} - for author in slugs: - shouts_by_author = list(ShoutsCache.by_author.get(author, {}).values()) - for s in shouts_by_author: - for a in s.authors: - a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) - if bool(s.publishedAt): - shouts[s.slug] = s - shouts_prepared = list(shouts.values()) - shouts_prepared.sort(key=lambda s: s.publishedAt, reverse=True) - return shouts_prepared[offset : offset + limit] - - -@query.field("recentLayoutShouts") -async def shouts_by_layout_recent(_param, _info: GraphQLResolveInfo, layout, amount=100, offset=0): - async with ShoutsCache.lock: - shouts = {} - # for layout in ['image', 'audio', 'video', 'literature']: - shouts_by_layout = list(ShoutsCache.by_layout.get(layout, [])) - for s in shouts_by_layout: - if s.visibility == 'public': # if bool(s.publishedAt): - shouts[s.slug] = s - for a in s.authors: - a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) - - shouts_prepared = list(shouts.values()) - shouts_prepared.sort(key=lambda s: s.createdAt, reverse=True) - return shouts_prepared[offset : offset + amount] - - -@query.field("topLayoutShouts") -async def shouts_by_layout_top(_param, _info: GraphQLResolveInfo, layout, amount=100, offset=0): - async with ShoutsCache.lock: - shouts = {} - # for layout in ['image', 'audio', 'video', 'literature']: - shouts_by_layout = list(ShoutsCache.by_layout.get(layout, [])) - for s in shouts_by_layout: - if s.visibility == 'public': # if bool(s.publishedAt): - shouts[s.slug] = s - for a in s.authors: - a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) - s.stat = await get_shout_stat(s.slug) - shouts_prepared = list(shouts.values()) - shouts_prepared.sort(key=lambda s: s.stat["rating"], reverse=True) - return shouts_prepared[offset : offset + amount] - - -@query.field("topMonthLayoutShouts") -async def shouts_by_layout_topmonth(_param, _info: GraphQLResolveInfo, layout, amount=100, offset=0): - async with ShoutsCache.lock: - shouts = {} - # for layout in ['image', 'audio', 'video', 'literature']: - shouts_by_layout = list(ShoutsCache.by_layout.get(layout, [])) - month_ago = datetime.now() - timedelta(days=30) - for s in shouts_by_layout: - if s.visibility == 'public' and s.createdAt > month_ago: - shouts[s.slug] = s - for a in s.authors: - a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) - - shouts_prepared = list(shouts.values()) - shouts_prepared.sort(key=lambda s: s.stat["rating"], reverse=True) - return shouts_prepared[offset : offset + amount] - - -@query.field("shoutsByTopics") -async def shouts_by_topics(_, _info, slugs, offset=0, limit=100): - async with ShoutsCache.lock: - shouts = {} - for topic in slugs: - shouts_by_topic = list(ShoutsCache.by_topic.get(topic, {}).values()) - for s in shouts_by_topic: - for a in s.authors: - a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) - if bool(s.publishedAt): - shouts[s.slug] = s - shouts_prepared = list(shouts.values()) - shouts_prepared.sort(key=lambda s: s.publishedAt, reverse=True) - return shouts_prepared[offset : offset + limit] - - -@query.field("shoutsByCollection") -async def shouts_by_collection(_, _info, collection, offset, limit): - with local_session() as session: - shouts = ( - session.query(Shout) - .join(ShoutCollection, ShoutCollection.collection == collection) - .where(and_(ShoutCollection.shout == Shout.slug, Shout.publishedAt.is_not(None))) - .order_by(desc("publishedAt")) - .limit(limit) - .offset(offset) - ) - for s in shouts: - for a in s.authors: - a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) - return shouts - - -SINGLE_COMMUNITY = True - - -@query.field("shoutsByCommunities") -async def shouts_by_communities(_, info, slugs, offset, limit): - if SINGLE_COMMUNITY: - return recent_published(_, info, offset, limit) + q = select(Shout, Reaction).options( + selectinload(Shout.authors), + selectinload(Shout.topics), + selectinload(Shout.reactions) + ).where( + Shout.deletedAt.is_(None) + ).join( + Reaction, Reaction.shout == Shout.slug + ) + if by.get("slug"): + q = q.filter(Shout.slug == by["slug"]) else: - with local_session() as session: - # TODO fix postgres high load - shouts = ( - session.query(Shout) - .distinct() - .join(ShoutTopic) - .where( - and_( - Shout.publishedAt.is_not(None), - ShoutTopic.topic.in_( - select(Topic.slug).where(Topic.community.in_(slugs)) - ), - ) - ) - .order_by(desc("publishedAt")) - .limit(limit) - .offset(offset) - ) - - for s in shouts: + if by.get("reacted"): + user = info.context["request"].user + q = q.filter(Reaction.createdBy == user.slug) + if by.get("visibility"): + q = q.filter(or_( + Shout.visibility.ilike(f"%{by.get('visibility')}%"), + Shout.visibility.ilike(f"%{'public'}%"), + )) + if by.get("layout"): + q = q.filter(Shout.layout == by["layout"]) + if by.get("author"): + q = q.filter(Shout.authors.contains(by["author"])) + if by.get("topic"): + q = q.filter(Shout.topics.contains(by["topic"])) + if by.get("title"): + q = q.filter(Shout.title.ilike(f'%{by["title"]}%')) + if by.get("body"): + q = q.filter(Shout.body.ilike(f'%{by["body"]}%')) + if by.get("days"): + before = datetime.now() - timedelta(days=int(by["days"]) or 30) + q = q.filter(Shout.createdAt > before) + q = q.group_by(Shout.id, Reaction.id).order_by( + desc(by.get("order") or "createdAt") + ).limit(limit).offset(offset) + print(q) + shouts = [] + with local_session() as session: + # post query stats and author's captions + for s in list(map(lambda r: r.Shout, session.execute(q))): + s.stat = await ReactedStorage.get_shout_stat(s.slug) for a in s.authors: a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) - return shouts + shouts.append(s) + if by.get("stat"): + shouts.sort(lambda s: s.stat.get(by["stat"]) or s.createdAt) + return shouts @mutation.field("follow") @@ -256,7 +94,8 @@ async def follow(_, info, what, slug): elif what == "TOPIC": topic_follow(user, slug) elif what == "COMMUNITY": - community_follow(user, slug) + # community_follow(user, slug) + pass elif what == "REACTIONS": reactions_follow(user, slug) except Exception as e: @@ -276,7 +115,8 @@ async def unfollow(_, info, what, slug): elif what == "TOPIC": topic_unfollow(user, slug) elif what == "COMMUNITY": - community_unfollow(user, slug) + # community_unfollow(user, slug) + pass elif what == "REACTIONS": reactions_unfollow(user, slug) except Exception as e: diff --git a/schema.graphql b/schema.graphql index 14420949..3dace2c9 100644 --- a/schema.graphql +++ b/schema.graphql @@ -110,18 +110,6 @@ input ProfileInput { bio: String } -input CommunityInput { - title: String! - desc: String - pic: String -} - -input CollectionInput { - title: String! - desc: String - pic: String -} - input TopicInput { slug: String! community: String! @@ -161,8 +149,8 @@ type Mutation { updateChat(chat: ChatInput!): Result! deleteChat(chatId: String!): Result! inviteChat(chatId: String!, userslug: String!): Result! - enterChat(chatId: String!): Result! - createMessage(chatId: String!, body: String!, replyTo: String): Result! + + createMessage(chat: String!, body: String!, replyTo: String): Result! updateMessage(chatId: String!, id: Int!, body: String!): Result! deleteMessage(chatId: String!, id: Int!): Result! markAsRead(chatId: String!, ids: [Int]!): Result! @@ -180,7 +168,7 @@ type Mutation { # user profile rateUser(slug: String!, value: Int!): Result! - # updateOnlineStatus: Result! + updateOnlineStatus: Result! updateProfile(profile: ProfileInput!): Result! # topics @@ -189,22 +177,11 @@ type Mutation { updateTopic(input: TopicInput!): Result! destroyTopic(slug: String!): Result! - # reactions createReaction(reaction: ReactionInput!): Result! updateReaction(reaction: ReactionInput!): Result! deleteReaction(id: Int!): Result! - # community - createCommunity(community: CommunityInput!): Result! - updateCommunity(community: CommunityInput!): Result! - deleteCommunity(slug: String!): Result! - - # collection - createCollection(collection: CollectionInput!): Result! - updateCollection(collection: CollectionInput!): Result! - deleteCollection(slug: String!): Result! - # collab inviteAuthor(author: String!, shout: String!): Result! removeAuthor(author: String!, shout: String!): Result! @@ -212,65 +189,77 @@ type Mutation { # following follow(what: FollowingEntity!, slug: String!): Result! unfollow(what: FollowingEntity!, slug: String!): Result! - - # seen - incrementView(shout: String!): Result! } +input MessagesBy { + author: String + body: String + chat: String + order: String + days: Int + stat: String +} + +input AuthorsBy { + lastSeen: DateTime + createdAt: DateTime + slug: String + name: String + topic: String + order: String + days: Int + stat: String +} + +input ShoutsBy { + slug: String + title: String + body: String + topic: String + topics: [String] + author: String + authors: [String] + layout: String + visibility: String + order: String + days: Int + stat: String +} + +input ReactionBy { + shout: String + shouts: [String] + body: String + topic: String + author: String + order: String + days: Int + stat: String +} ################################### Query type Query { # inbox - loadChats(offset: Int, amount: Int): Result! - loadMessages(chatId: String!, offset: Int, amount: Int): Result! - searchUsers(q: String!, offset: Int, amount: Int): Result! - searchChats(q: String!, offset: Int, amount: Int): Result! - searchMessages(q: String!, offset: Int, amount: Int): Result! + loadChats( limit: Int, offset: Int): Result! # your chats + loadMessagesBy(by: MessagesBy!, limit: Int, offset: Int): Result! + searchUsers(query: String!, limit: Int, offset: Int): Result! + chatUsersAll: [ChatUser]! # auth isEmailUsed(email: String!): Boolean! signIn(email: String!, password: String, lang: String): AuthResult! signOut: AuthResult! - # profile - getUsersBySlugs(slugs: [String]!): [Author]! + # zine + loadAuthorsBy(by: AuthorsBy, limit: Int, offset: Int): [Author]! + loadShoutsBy(by: ShoutsBy, limit: Int, offset: Int): [Shout]! + loadReactionsBy(by: ReactionBy!, limit: Int, offset: Int): [Reaction]! userFollowers(slug: String!): [Author]! userFollowedAuthors(slug: String!): [Author]! userFollowedTopics(slug: String!): [Topic]! - userFollowedCommunities(slug: String!): [Community]! - userReactedShouts(slug: String!): [Shout]! # test - getUserRoles(slug: String!): [Role]! authorsAll: [Author]! getAuthor(slug: String!): User! - # shouts - getShoutBySlug(slug: String!): Shout! - shoutsForFeed(offset: Int!, limit: Int!): [Shout]! # test - shoutsByLayout(layout: String, amount: Int!, offset: Int!): [Shout]! - shoutsByTopics(slugs: [String]!, offset: Int!, limit: Int!): [Shout]! - shoutsByAuthors(slugs: [String]!, offset: Int!, limit: Int!): [Shout]! - shoutsByCommunities(slugs: [String]!, offset: Int!, limit: Int!): [Shout]! - # topReacted(offset: Int!, limit: Int!): [Shout]! - topAuthors(offset: Int!, limit: Int!): [Author]! # by User.rating - topPublished(daysago: Int!, offset: Int!, limit: Int!): [Shout]! - topMonth(offset: Int!, limit: Int!): [Shout]! # TODO: implement topPublishedAfter(day, offset, limit) - topOverall(offset: Int!, limit: Int!): [Shout]! - topCommented(offset: Int!, limit: Int!): [Shout]! - recentPublished(offset: Int!, limit: Int!): [Shout]! # homepage - recentReacted(offset: Int!, limit: Int!): [Shout]! # TODO: use in design! - recentCommented(offset: Int!, limit: Int!): [Shout]! - recentAll(offset: Int!, limit: Int!): [Shout]! - recentCandidates(offset: Int!, limit: Int!): [Shout]! - - # expo - topMonthLayoutShouts(layout: String!, amount: Int, offset: Int): [Shout]! - topLayoutShouts(layout: String!, amount: Int, offset: Int): [Shout]! - recentLayoutShouts(layout: String!, amount: Int, offset: Int): [Shout]! - - # reactons - reactionsByAuthor(slug: String!, offset: Int!, limit: Int!): [Reaction]! - reactionsForShouts(shouts: [String]!, offset: Int!, limit: Int!): [Reaction]! - # collab getCollabs: [Collab]! @@ -283,18 +272,6 @@ type Query { topicsRandom(amount: Int): [Topic]! topicsByCommunity(community: String!): [Topic]! topicsByAuthor(author: String!): [Topic]! - - # collections - collectionsAll: [Collection]! - getUserCollections(author: String!): [Collection]! - shoutsByCollection(collection: String!, offset: Int!, limit: Int!): [Shout]! - - # communities - getCommunity(slug: String): Community! - getCommunities: [Community]! # all - - # search - searchQuery(q: String, offset: Int!, limit: Int!): [Shout] } ############################################ Subscription @@ -372,6 +349,14 @@ type User { oid: String } +type ChatUser { + id: Int! + slug: String! + name: String! + userpic: String + lastSeen: DateTime +} + type Collab { authors: [String]! invites: [String] @@ -440,6 +425,7 @@ type Shout { deletedBy: User publishedBy: User publishedAt: DateTime + media: String stat: Stat } diff --git a/services/auth/users.py b/services/auth/users.py index 489155ce..7d4d6c0b 100644 --- a/services/auth/users.py +++ b/services/auth/users.py @@ -1,7 +1,6 @@ import asyncio - from sqlalchemy.orm import selectinload - +from base.orm import local_session from orm.user import User @@ -34,6 +33,11 @@ class UserStorage: aaa.sort(key=lambda user: user.createdAt) return aaa + @staticmethod + async def get_all_chat_users(): + with local_session() as session: + return session.query(User).where(User.emailConfirmed).all() + @staticmethod async def get_top_users(): self = UserStorage diff --git a/services/main.py b/services/main.py index 3783b55f..bcd886f0 100644 --- a/services/main.py +++ b/services/main.py @@ -1,4 +1,3 @@ -from services.stat.viewed import ViewedStorage from services.stat.reacted import ReactedStorage from services.auth.roles import RoleStorage from services.auth.users import UserStorage @@ -10,7 +9,6 @@ from base.orm import local_session async def storages_init(): with local_session() as session: print('[main] initialize storages') - ViewedStorage.init(session) ReactedStorage.init(session) RoleStorage.init(session) UserStorage.init(session) diff --git a/services/search.py b/services/search.py index 89e6abce..e11a7372 100644 --- a/services/search.py +++ b/services/search.py @@ -9,6 +9,7 @@ class SearchService: @staticmethod async def init(session): async with SearchService.lock: + print('[search.service] init') SearchService.cache = {} @staticmethod diff --git a/services/stat/reacted.py b/services/stat/reacted.py index e6ec7b37..d91b2daf 100644 --- a/services/stat/reacted.py +++ b/services/stat/reacted.py @@ -2,6 +2,7 @@ import asyncio from base.orm import local_session from orm.reaction import ReactionKind, Reaction from services.zine.topics import TopicStorage +from services.stat.views import ViewStat def kind_to_rate(kind) -> int: @@ -32,6 +33,15 @@ class ReactedStorage: lock = asyncio.Lock() modified_shouts = set([]) + @staticmethod + async def get_shout_stat(slug): + return { + "viewed": await ViewStat.get_shout(slug), + "reacted": len(await ReactedStorage.get_shout(slug)), + "commented": len(await ReactedStorage.get_comments(slug)), + "rating": await ReactedStorage.get_rating(slug), + } + @staticmethod async def get_shout(shout_slug): self = ReactedStorage @@ -158,22 +168,25 @@ class ReactedStorage: self = ReactedStorage all_reactions = session.query(Reaction).all() self.modified_shouts = list(set([r.shout for r in all_reactions])) - print("[stat.reacted] %d shouts with reactions loaded" % len(self.modified_shouts)) + print("[stat.reacted] %d shouts with reactions" % len(self.modified_shouts)) @staticmethod async def recount_changed(session): self = ReactedStorage async with self.lock: - print('[stat.reacted] recounting...') - for slug in list(self.modified_shouts): + sss = list(self.modified_shouts) + c = 0 + for slug in sss: siblings = session.query(Reaction).where(Reaction.shout == slug).all() + c += len(siblings) await self.recount(siblings) + print("[stat.reacted] %d reactions total" % c) print("[stat.reacted] %d shouts" % len(self.modified_shouts)) print("[stat.reacted] %d topics" % len(self.reacted["topics"].values())) print("[stat.reacted] %d shouts" % len(self.reacted["shouts"])) print("[stat.reacted] %d authors" % len(self.reacted["authors"].values())) - print("[stat.reacted] %d reactions" % len(self.reacted["reactions"])) + print("[stat.reacted] %d reactions replied" % len(self.reacted["reactions"])) self.modified_shouts = set([]) @staticmethod diff --git a/services/stat/topicstat.py b/services/stat/topicstat.py index 086d83c8..034967f5 100644 --- a/services/stat/topicstat.py +++ b/services/stat/topicstat.py @@ -19,7 +19,7 @@ class TopicStat: async def load_stat(session): self = TopicStat shout_topics = session.query(ShoutTopic).all() - print("[stat.topics] shouts linked %d times" % len(shout_topics)) + print("[stat.topics] %d links for shouts" % len(shout_topics)) for shout_topic in shout_topics: tpc = shout_topic.topic # shouts by topics @@ -34,17 +34,14 @@ class TopicStat: [aslug, acaption] = a self.authors_by_topic[tpc][aslug] = acaption - print("[stat.topics] shouts indexed by %d topics" % len(self.shouts_by_topic.keys())) - print("[stat.topics] authors indexed by %d topics" % len(self.authors_by_topic.keys())) - self.followers_by_topic = {} followings = session.query(TopicFollower).all() + print("[stat.topics] %d followings by users" % len(followings)) for flw in followings: topic = flw.topic userslug = flw.follower self.followers_by_topic[topic] = self.followers_by_topic.get(topic, dict()) self.followers_by_topic[topic][userslug] = userslug - print("[stat.topics] followers indexed by %d topics" % len(self.followers_by_topic.keys())) @staticmethod async def get_shouts(topic): diff --git a/services/stat/viewed.py b/services/stat/viewed.py deleted file mode 100644 index 70f1fc30..00000000 --- a/services/stat/viewed.py +++ /dev/null @@ -1,110 +0,0 @@ -import asyncio -from datetime import datetime - -from base.orm import local_session - -from sqlalchemy.orm.attributes import flag_modified - -from orm.shout import ShoutTopic -from orm.viewed import ViewedByDay - - -class ViewedStorage: - viewed = {"shouts": {}, "topics": {}, "reactions": {}} - this_day_views = {} - to_flush = [] - period = 30 * 60 # sec - lock = asyncio.Lock() - - @staticmethod - def init(session): - self = ViewedStorage - views = session.query(ViewedByDay).all() - - for view in views: - shout = view.shout - topics = ( - session.query(ShoutTopic.topic).filter(ShoutTopic.shout == shout).all() - ) - value = view.value - if shout: - old_value = self.viewed["shouts"].get(shout, 0) - self.viewed["shouts"][shout] = old_value + value - for t in topics: - old_topic_value = self.viewed["topics"].get(t, 0) - self.viewed["topics"][t] = old_topic_value + value - if shout not in self.this_day_views: - self.this_day_views[shout] = view - this_day_view = self.this_day_views[shout] - if this_day_view.day < view.day: - self.this_day_views[shout] = view - - print("[stat.viewed] %d shouts viewed" % len(self.viewed['shouts'])) - - @staticmethod - async def get_shout(shout_slug): - self = ViewedStorage - async with self.lock: - return self.viewed["shouts"].get(shout_slug, 0) - - @staticmethod - async def get_topic(topic_slug): - self = ViewedStorage - async with self.lock: - return self.viewed["topics"].get(topic_slug, 0) - - @staticmethod - async def get_reaction(reaction_id): - self = ViewedStorage - async with self.lock: - return self.viewed["reactions"].get(reaction_id, 0) - - @staticmethod - async def increment(shout_slug, amount=1): - self = ViewedStorage - async with self.lock: - this_day_view = self.this_day_views.get(shout_slug) - day_start = datetime.now().replace(hour=0, minute=0, second=0) - if not this_day_view or this_day_view.day < day_start: - if this_day_view and getattr(this_day_view, "modified", False): - self.to_flush.append(this_day_view) - this_day_view = ViewedByDay.create(shout=shout_slug, value=1) - self.this_day_views[shout_slug] = this_day_view - else: - this_day_view.value = this_day_view.value + amount - this_day_view.modified = True - self.viewed["shouts"][shout_slug] = (self.viewed["shouts"].get(shout_slug, 0) + amount) - with local_session() as session: - topics = ( - session.query(ShoutTopic.topic) - .where(ShoutTopic.shout == shout_slug) - .all() - ) - for t in topics: - self.viewed["topics"][t] = self.viewed["topics"].get(t, 0) + amount - flag_modified(this_day_view, "value") - - @staticmethod - async def flush_changes(session): - self = ViewedStorage - async with self.lock: - for view in self.this_day_views.values(): - if getattr(view, "modified", False): - session.add(view) - flag_modified(view, "value") - view.modified = False - for view in self.to_flush: - session.add(view) - self.to_flush.clear() - session.commit() - - @staticmethod - async def worker(): - while True: - try: - with local_session() as session: - await ViewedStorage.flush_changes(session) - print("[stat.viewed] periodical flush") - except Exception as err: - print("[stat.viewed] : %s" % (err)) - await asyncio.sleep(ViewedStorage.period) diff --git a/services/stat/views.py b/services/stat/views.py new file mode 100644 index 00000000..722d745d --- /dev/null +++ b/services/stat/views.py @@ -0,0 +1,127 @@ +import asyncio +import json + +from gql import Client, gql +from gql.transport.aiohttp import AIOHTTPTransport + +from base.redis import redis +from services.zine.topics import TopicStorage +from ssl import create_default_context + + +query_ackee_views = gql( + """ + query getDomainsFacts { + domains { + statistics { + views { + id + count + } + pages { + id + count + created + } + } + facts { + activeVisitors + # averageViews + # averageDuration + viewsToday + viewsMonth + viewsYear + } + } + } + """ +) + +ssl = create_default_context() + + +class ViewStat: + lock = asyncio.Lock() + by_slugs = {} + by_topics = {} + period = 5 * 60 # 5 minutes + transport = AIOHTTPTransport(url="https://ackee.discours.io/", ssl=ssl) + client = Client(transport=transport, fetch_schema_from_transport=True) + + @staticmethod + async def load_views(): + # TODO: when the struture of paylod will be transparent + # TODO: perhaps ackee token getting here + + self = ViewStat + async with self.lock: + self.by_topics = await redis.execute("GET", "views_by_topics") + if self.by_topics: + self.by_topics = dict(json.loads(self.by_topics)) + else: + self.by_topics = {} + self.by_slugs = await redis.execute("GET", "views_by_shouts") + if self.by_slugs: + self.by_slugs = dict(json.loads(self.by_slugs)) + else: + self.by_slugs = {} + domains = await self.client.execute_async(query_ackee_views) + print("[stat.ackee] loaded domains") + print(domains) + + print('\n\n# TODO: something here...\n\n') + + @staticmethod + async def get_shout(shout_slug): + self = ViewStat + async with self.lock: + return self.by_slugs.get(shout_slug) or 0 + + @staticmethod + async def get_topic(topic_slug): + self = ViewStat + async with self.lock: + shouts = self.by_topics.get(topic_slug) or {} + topic_views = 0 + for v in shouts.values(): + topic_views += v + return topic_views + + @staticmethod + async def increment(shout_slug, amount=1): + self = ViewStat + async with self.lock: + self.by_slugs[shout_slug] = self.by_slugs.get(shout_slug) or 0 + self.by_slugs[shout_slug] += amount + await redis.execute( + "SET", + f"views_by_shouts/{shout_slug}", + str(self.by_slugs[shout_slug]) + ) + shout_topics = await TopicStorage.get_topics_by_slugs([shout_slug, ]) + for t in shout_topics: + self.by_topics[t] = self.by_topics.get(t) or {} + self.by_topics[t][shout_slug] = self.by_topics[t].get(shout_slug) or 0 + self.by_topics[t][shout_slug] += amount + await redis.execute( + "SET", + f"views_by_topics/{t}/{shout_slug}", + str(self.by_topics[t][shout_slug]) + ) + + @staticmethod + async def reset(): + self = ViewStat + self.by_topics = {} + self.by_slugs = {} + + @staticmethod + async def worker(): + self = ViewStat + while True: + try: + await self.load_views() + except Exception as err: + print("[stat.ackee] : %s" % (err)) + print("[stat.ackee] renew period: %d minutes" % (ViewStat.period / 60)) + await asyncio.sleep(self.period) diff --git a/services/zine/shoutauthor.py b/services/zine/shoutauthor.py index 6906c9ef..b56e3ed0 100644 --- a/services/zine/shoutauthor.py +++ b/services/zine/shoutauthor.py @@ -17,7 +17,10 @@ class ShoutAuthorStorage: for sa in sas: self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, []) self.authors_by_shout[sa.shout].append([sa.user, sa.caption]) - print("[zine.shouts] %d shouts indexed by authors" % len(self.authors_by_shout)) + self.shouts_by_author[sa.user] = self.shouts_by_author.get(sa.user, []) + self.shouts_by_author[sa.user].append(sa.shout) + print("[zine.authors] %d shouts indexed by authors" % len(self.authors_by_shout)) + print("[zine.authors] %d authors indexed by shouts" % len(self.shouts_by_author)) @staticmethod async def get_authors(shout): @@ -42,7 +45,7 @@ class ShoutAuthorStorage: with local_session() as session: async with self.lock: await self.load(session) - print("[zine.shouts] index by authors was updated") + print("[zine.authors] index by authors was updated") except Exception as err: - print("[zine.shouts] error indexing by author: %s" % (err)) + print("[zine.authors] error indexing by author: %s" % (err)) await asyncio.sleep(self.period) diff --git a/services/zine/shoutscache.py b/services/zine/shoutscache.py deleted file mode 100644 index 442c09ab..00000000 --- a/services/zine/shoutscache.py +++ /dev/null @@ -1,285 +0,0 @@ -import asyncio -from datetime import datetime, timedelta - -from sqlalchemy import and_, desc, func, select -from sqlalchemy.orm import selectinload - -from base.orm import local_session -from orm.reaction import Reaction, ReactionKind -from orm.shout import Shout -from services.stat.reacted import ReactedStorage - - -async def get_shout_stat(slug): - return { - # TODO: use ackee as datasource - "viewed": 0, # await ViewedStorage.get_shout(slug), - "reacted": len(await ReactedStorage.get_shout(slug)), - "commented": len(await ReactedStorage.get_comments(slug)), - "rating": await ReactedStorage.get_rating(slug), - } - - -async def prepare_shouts(session, stmt): - shouts = [] - print(stmt) - for s in list(map(lambda r: r.Shout, session.execute(stmt))): - s.stat = await get_shout_stat(s.slug) - shouts.append(s) - return shouts - - -LAYOUTS = ['audio', 'video', 'image', 'literature'] - - -class ShoutsCache: - # limit = 200 - period = 60 * 60 # 1 hour - lock = asyncio.Lock() - - recent_published = [] - recent_all = [] - recent_reacted = [] - recent_commented = [] - top_month = [] - top_overall = [] - top_commented = [] - - by_author = {} - by_topic = {} - by_layout = {} - - @staticmethod - async def prepare_recent_published(): - with local_session() as session: - shouts = await prepare_shouts( - session, - ( - select(Shout) - .options( - selectinload(Shout.authors), - selectinload(Shout.topics) - ) - .where(Shout.deletedAt.is_(None)) - .filter(Shout.publishedAt.is_not(None)) - .group_by(Shout.id) - .order_by(desc("publishedAt")) - # .limit(ShoutsCache.limit) - ), - ) - async with ShoutsCache.lock: - for s in shouts: - for a in s.authors: - ShoutsCache.by_author[a.slug] = ShoutsCache.by_author.get(a.slug, {}) - ShoutsCache.by_author[a.slug][s.slug] = s - for t in s.topics: - ShoutsCache.by_topic[t.slug] = ShoutsCache.by_topic.get(t.slug, {}) - ShoutsCache.by_topic[t.slug][s.slug] = s - if s.layout in LAYOUTS: - ShoutsCache.by_layout[s.layout] = ShoutsCache.by_layout.get(s.layout, []) - ShoutsCache.by_layout[s.layout].append(s) - print("[zine.cache] indexed by %d topics " % len(ShoutsCache.by_topic.keys())) - print("[zine.cache] indexed by %d authors " % len(ShoutsCache.by_author.keys())) - print("[zine.cache] indexed by %d layouts " % len(ShoutsCache.by_layout.keys())) - ShoutsCache.recent_published = shouts - print("[zine.cache] %d recently published shouts " % len(shouts)) - - @staticmethod - async def prepare_recent_all(): - with local_session() as session: - shouts = await prepare_shouts( - session, - ( - select(Shout) - .options( - selectinload(Shout.authors), - selectinload(Shout.topics) - ) - .where(Shout.deletedAt.is_(None)) - .group_by(Shout.id) - .order_by(desc("createdAt")) - # .limit(ShoutsCache.limit) - ) - ) - async with ShoutsCache.lock: - ShoutsCache.recent_all = shouts - print("[zine.cache] %d recently created shouts " % len(ShoutsCache.recent_all)) - - @staticmethod - async def prepare_recent_reacted(): - with local_session() as session: - reactions = session.query(Reaction).order_by(Reaction.createdAt).all() - # .limit(ShoutsCache.limit) - reacted_slugs = set([]) - for r in reactions: - reacted_slugs.add(r.shout) - shouts = await prepare_shouts( - session, - ( - select( - Shout, - Reaction.createdAt.label('reactedAt') - ) - .options( - selectinload(Shout.authors), - selectinload(Shout.topics), - selectinload(Shout.reactions), - ) - .join(Reaction) - .where(and_(Shout.deletedAt.is_(None), Shout.slug.in_(reacted_slugs))) - .filter(Shout.publishedAt.is_not(None)) - .group_by(Shout.id, "reactedAt") - .order_by(desc("reactedAt")) - # .limit(ShoutsCache.limit) - ) - ) - async with ShoutsCache.lock: - ShoutsCache.recent_reacted = shouts - print("[zine.cache] %d recently reacted shouts " % len(shouts)) - - @staticmethod - async def prepare_recent_commented(): - with local_session() as session: - reactions = session.query(Reaction).order_by(Reaction.createdAt).all() - # .limit(ShoutsCache.limit) - commented_slugs = set([]) - for r in reactions: - if r.body and len(r.body) > 0: - commented_slugs.add(r.shout) - shouts = await prepare_shouts( - session, - ( - select( - Shout, - Reaction.createdAt.label('reactedAt') - ) - .options( - selectinload(Shout.authors), - selectinload(Shout.topics), - selectinload(Shout.reactions), - ) - .join(Reaction) - .where(and_(Shout.deletedAt.is_(None), Shout.slug.in_(commented_slugs))) - .group_by(Shout.id, "reactedAt") - .order_by(desc("reactedAt")) - # .limit(ShoutsCache.limit) - ) - ) - async with ShoutsCache.lock: - ShoutsCache.recent_commented = shouts - print("[zine.cache] %d recently commented shouts " % len(shouts)) - - @staticmethod - async def prepare_top_overall(): - with local_session() as session: - shouts = await prepare_shouts( - session, - ( - select( - Shout, - func.sum(Reaction.id).label('reacted') - ) - .options( - selectinload(Shout.authors), - selectinload(Shout.topics), - selectinload(Shout.reactions), - ) - .join(Reaction, Reaction.kind == ReactionKind.LIKE) - .where(Shout.deletedAt.is_(None)) - .filter(Shout.publishedAt.is_not(None)) - .group_by(Shout.id) - .order_by(desc("reacted")) - # .limit(ShoutsCache.limit) - ), - ) - shouts.sort(key=lambda s: s.stat["rating"], reverse=True) - async with ShoutsCache.lock: - print("[zine.cache] %d top rated published " % len(shouts)) - ShoutsCache.top_overall = shouts - - @staticmethod - async def prepare_top_month(): - month_ago = datetime.now() - timedelta(days=30) - with local_session() as session: - shouts = await prepare_shouts( - session, - ( - select(Shout) - .options( - selectinload(Shout.authors), - selectinload(Shout.topics), - selectinload(Shout.reactions), - ) - .join(Reaction) - .where(Shout.deletedAt.is_(None)) - .filter(Shout.publishedAt > month_ago) - .group_by(Shout.id) - # .limit(ShoutsCache.limit) - ), - ) - shouts.sort(key=lambda s: s.stat["rating"], reverse=True) - async with ShoutsCache.lock: - ShoutsCache.top_month = shouts - print("[zine.cache] %d top month published " % len(ShoutsCache.top_month)) - - @staticmethod - async def prepare_top_commented(): - month_ago = datetime.now() - timedelta(days=30) - with local_session() as session: - shouts = await prepare_shouts( - session, - ( - select( - Shout, - func.sum(Reaction.id).label("commented") - ) - .options( - selectinload(Shout.authors), - selectinload(Shout.topics), - selectinload(Shout.reactions) - ) - .join(Reaction, func.length(Reaction.body) > 0) - .where(Shout.deletedAt.is_(None)) - .filter(Shout.publishedAt > month_ago) - .group_by(Shout.id) - .order_by(desc("commented")) - # .limit(ShoutsCache.limit) - ), - ) - shouts.sort(key=lambda s: s.stat["commented"], reverse=True) - async with ShoutsCache.lock: - ShoutsCache.top_commented = shouts - print("[zine.cache] %d last month top commented shouts " % len(ShoutsCache.top_commented)) - - @staticmethod - async def get_top_published_before(daysago, offset, limit): - shouts_by_rating = [] - before = datetime.now() - timedelta(days=daysago) - for s in ShoutsCache.recent_published: - if s.publishedAt >= before: - shouts_by_rating.append(s) - shouts_by_rating.sort(lambda s: s.stat["rating"], reverse=True) - return shouts_by_rating - - @staticmethod - async def get_all_authors_slugs(): - slugs = ShoutsCache.by_author.keys() - return slugs - - @staticmethod - async def worker(): - while True: - try: - await ShoutsCache.prepare_top_month() - await ShoutsCache.prepare_top_overall() - await ShoutsCache.prepare_top_commented() - - await ShoutsCache.prepare_recent_published() - await ShoutsCache.prepare_recent_all() - await ShoutsCache.prepare_recent_reacted() - await ShoutsCache.prepare_recent_commented() - print("[zine.cache] periodical update") - except Exception as err: - print("[zine.cache] error: %s" % (err)) - raise err - await asyncio.sleep(ShoutsCache.period) diff --git a/settings.py b/settings.py index 7f2e9a8e..7712303d 100644 --- a/settings.py +++ b/settings.py @@ -8,8 +8,8 @@ DB_URL = ( ) JWT_ALGORITHM = "HS256" JWT_SECRET_KEY = environ.get("JWT_SECRET_KEY") or "8f1bd7696ffb482d8486dfbc6e7d16dd-secret-key" -SESSION_TOKEN_LIFE_SPAN = 24 * 60 * 60 # seconds -ONETIME_TOKEN_LIFE_SPAN = 1 * 60 * 60 # seconds +SESSION_TOKEN_LIFE_SPAN = 30 * 24 * 60 * 60 # 1 month in seconds +ONETIME_TOKEN_LIFE_SPAN = 24 * 60 * 60 # 1 day in seconds REDIS_URL = environ.get("REDIS_URL") or "redis://127.0.0.1" MAILGUN_API_KEY = environ.get("MAILGUN_API_KEY")