Merge pull request #34 from Discours/cudl

CUDL
Tony committed 2022-11-17 09:30:27 +03:00 (committed by GitHub)
commit ae1857cc93
38 changed files with 598 additions and 1407 deletions


@@ -66,7 +66,7 @@ async def oauth_login(request):
     provider = request.path_params["provider"]
     request.session["provider"] = provider
     client = oauth.create_client(provider)
-    redirect_uri = "https://newapi.discours.io/oauth-authorize"
+    redirect_uri = "https://v2.discours.io/oauth-authorize"
     return await client.authorize_redirect(request, redirect_uri)


@ -1,14 +0,0 @@
#!/bin/bash
openssl req -newkey rsa:4096 \
-x509 \
-sha256 \
-days 3650 \
-nodes \
-out server.crt \
-keyout server.key \
-subj "/C=RU/ST=Moscow/L=Moscow/O=Discours/OU=Site/CN=newapi.discours.io"
openssl x509 -in server.crt -out server.pem -outform PEM
tar cvf server.tar server.crt server.key
dokku certs:add discoursio-api < server.tar

main.py

@@ -14,11 +14,10 @@ from auth.oauth import oauth_login, oauth_authorize
 from base.redis import redis
 from base.resolvers import resolvers
 from resolvers.auth import confirm_email_handler
-from resolvers.zine import ShoutsCache
 from services.main import storages_init
 from services.stat.reacted import ReactedStorage
 from services.stat.topicstat import TopicStat
-from services.stat.viewed import ViewedStorage
+from services.stat.views import ViewStat
 from services.zine.gittask import GitTask
 from services.zine.shoutauthor import ShoutAuthorStorage
 import_module("resolvers")

@@ -32,20 +31,17 @@ middleware = [
 async def start_up():
     await redis.connect()
-    viewed_storage_task = asyncio.create_task(ViewedStorage.worker())
-    print(viewed_storage_task)
+    await storages_init()
+    views_stat_task = asyncio.create_task(ViewStat.worker())
+    print(views_stat_task)
     reacted_storage_task = asyncio.create_task(ReactedStorage.worker())
     print(reacted_storage_task)
-    shouts_cache_task = asyncio.create_task(ShoutsCache.worker())
-    print(shouts_cache_task)
     shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker())
     print(shout_author_task)
     topic_stat_task = asyncio.create_task(TopicStat.worker())
     print(topic_stat_task)
     git_task = asyncio.create_task(GitTask.git_task_worker())
     print(git_task)
-    await storages_init()
-    print()

 async def shutdown():


@@ -7,7 +7,7 @@ import sys
 from datetime import datetime
 import bs4
+from base.redis import redis
 from migration.tables.comments import migrate as migrateComment
 from migration.tables.comments import migrate_2stage as migrateComment_2stage
 from migration.tables.content_items import get_shout_slug

@@ -181,7 +181,11 @@ async def all_handle(storage, args):
     print("[migration] handle everything")
     await users_handle(storage)
     await topics_handle(storage)
+    print("[migration] users and topics are migrated")
+    await redis.connect()
+    print("[migration] redis connected")
     await shouts_handle(storage, args)
+    print("[migration] migrating comments")
     await comments_handle(storage)
     # export_email_subscriptions()
     print("[migration] done!")

@@ -295,9 +299,9 @@ def create_pgdump():
 async def handle_auto():
+    print("[migration] no option given, auto mode")
     url = os.getenv("MONGODB_URL")
     if url:
+        print("[migration] connecting mongo")
         mongo_download(url)
     bson_handle()
     await all_handle(data_load(), sys.argv)


@@ -285,13 +285,13 @@ def prepare_md_body(entry):
         if "title" in m:
             trackname += m.get("title", "")
         addon += (
-            '<MusicPlayer src="'
+            '<AudioPlayer src="'
             + m.get("fileUrl", "")
             + '" title="'
             + trackname
             + '" />\n'
         )
-    body = "import MusicPlayer from '$/components/Article/MusicPlayer'\n\n" + addon
+    body = "import AudioPlayer from '$/components/Article/AudioPlayer'\n\n" + addon
     body_orig, media = extract_html(entry)
     if body_orig:


@@ -1,5 +1,5 @@
 from datetime import datetime
+import json
 from dateutil.parser import parse as date_parse
 from sqlalchemy.exc import IntegrityError
 from transliterate import translit

@@ -12,7 +12,7 @@ from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower
 from orm.user import User
 from orm.topic import TopicFollower
 from services.stat.reacted import ReactedStorage
-from services.stat.viewed import ViewedStorage
+from services.stat.views import ViewStat

 OLD_DATE = "2016-03-05 22:22:00.350000"
 ts = datetime.now()

@@ -149,6 +149,12 @@ async def migrate(entry, storage):
     if entry.get("published"):
         r["publishedAt"] = date_parse(entry.get("publishedAt", OLD_DATE))
         r["visibility"] = "public"
+        with local_session() as session:
+            # update user.emailConfirmed if published
+            author = session.query(User).where(User.slug == userslug).first()
+            author.emailConfirmed = True
+            session.add(author)
+            session.commit()
     else:
         r["visibility"] = "authors"
     if "deletedAt" in entry:

@@ -192,7 +198,7 @@ async def migrate(entry, storage):
     # body
     r["body"], media = prepare_html_body(entry)
     if media:
-        print(media)
+        r["media"] = json.dumps(media)
     # save shout to db
     s = object()
     shout_dict = r.copy()

@@ -340,7 +346,7 @@ async def migrate(entry, storage):
         raise Exception("[migration] content_item.ratings error: \n%r" % content_rating)
     # shout views
-    await ViewedStorage.increment(shout_dict["slug"], amount=entry.get("views", 1))
+    await ViewStat.increment(shout_dict["slug"], amount=entry.get("views", 1))
     # del shout_dict['ratings']
     shout_dict["oid"] = entry.get("_id")
     storage["shouts"]["by_oid"][entry["_id"]] = shout_dict


@@ -34,7 +34,9 @@ def migrate(entry):
     user_dict["slug"] = (
         entry["profile"].get("path").lower().replace(" ", "-").strip()
     )
-    user_dict["bio"] = html2text(entry.get("profile").get("bio") or "")
+    user_dict["bio"] = html2text(
+        entry.get("profile").get("bio") or ""
+    ).replace('\(', '(').replace('\)', ')')
     # userpic
     try:


@@ -5,7 +5,7 @@
 {{ $upstream_port := index $port_map_list 2 }}

 map $http_origin $allow_origin {
-    ~^https?:\/\/((.*\.)?localhost(:\d+)?|discoursio-webapp.*\.vercel\.app|(.*\.)?discours\.io(:\d+)?)$ $http_origin;
+    ~^https?:\/\/((.*\.)?localhost(:\d+)?|discoursio-webapp-(.*)?\.vercel\.app|(.*\.)?discours\.io(:\d+)?)$ $http_origin;
     default "";
 }


@@ -6,7 +6,6 @@ from orm.reaction import Reaction
 from orm.shout import Shout
 from orm.topic import Topic, TopicFollower
 from orm.user import User, UserRating
-from orm.viewed import ViewedByDay

 __all__ = [
     "User",

@@ -19,8 +18,7 @@ __all__ = [
     "TopicFollower",
     "Notification",
     "Reaction",
-    "UserRating",
-    "ViewedByDay"
+    "UserRating"
 ]

 Base.metadata.create_all(engine)


@@ -1,6 +1,6 @@
 from datetime import datetime
-from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String
+from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, JSON
 from sqlalchemy.orm import relationship

 from base.orm import Base

@@ -58,9 +58,9 @@ class Shout(Base):
     versionOf = Column(ForeignKey("shout.slug"), nullable=True)
     lang = Column(String, default='ru')
     oid = Column(String, nullable=True)
+    media = Column(JSON, nullable=True)
     createdAt = Column(DateTime, nullable=False, default=datetime.now, comment="Created at")
     updatedAt = Column(DateTime, nullable=True, comment="Updated at")
     publishedAt = Column(DateTime, nullable=True)
     deletedAt = Column(DateTime, nullable=True)
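Note: the new media JSON column pairs with the migration change in content_items.py, which now stores the extracted playlist as a JSON string. A minimal sketch of that round trip with hypothetical track data; the fileUrl and title keys mirror the ones read in prepare_md_body() above:

import json

# hypothetical media payload, shaped like the tracks handled in prepare_md_body()
media = [{"fileUrl": "https://example.com/track.mp3", "title": "Track 1"}]

shout_dict = {"slug": "example-shout"}
if media:
    # the migration stores the list as a JSON string: r["media"] = json.dumps(media)
    shout_dict["media"] = json.dumps(media)

# readers of the JSON column decode it back into a list of tracks
tracks = json.loads(shout_dict["media"]) if shout_dict.get("media") else []
print(tracks[0]["title"])  # Track 1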


@ -1,12 +0,0 @@
from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey, Integer
from base.orm import Base
class ViewedByDay(Base):
__tablename__ = "viewed_by_day"
id = None
shout = Column(ForeignKey("shout.slug"), primary_key=True)
day = Column(DateTime, primary_key=True, default=datetime.now)
value = Column(Integer)


@@ -1,11 +1,13 @@
 python-frontmatter~=1.0.0
 aioredis~=2.0.1
+aiohttp
 ariadne>=0.16.0
 PyYAML>=5.4
 pyjwt>=2.6.0
 starlette~=0.20.4
 sqlalchemy>=1.4.41
 graphql-core
+gql
 uvicorn>=0.18.3
 pydantic>=1.10.2
 passlib~=1.7.4


@ -8,32 +8,23 @@ from resolvers.auth import (
get_current_user, get_current_user,
) )
from resolvers.collab import remove_author, invite_author from resolvers.collab import remove_author, invite_author
from resolvers.community import (
create_community,
delete_community,
get_community,
get_communities,
)
from resolvers.migrate import markdown_body from resolvers.migrate import markdown_body
# from resolvers.collab import invite_author, remove_author # from resolvers.collab import invite_author, remove_author
from resolvers.editor import create_shout, delete_shout, update_shout from resolvers.editor import create_shout, delete_shout, update_shout
from resolvers.profile import ( from resolvers.profile import (
get_users_by_slugs, load_authors_by,
get_user_reacted_shouts, rate_user,
get_user_roles, update_profile
get_top_authors,
get_author
) )
# from resolvers.feed import shouts_for_feed, my_candidates
from resolvers.reactions import ( from resolvers.reactions import (
create_reaction, create_reaction,
delete_reaction, delete_reaction,
update_reaction, update_reaction,
reactions_unfollow, reactions_unfollow,
reactions_follow, reactions_follow,
load_reactions_by
) )
from resolvers.topics import ( from resolvers.topics import (
topic_follow, topic_follow,
@ -45,36 +36,31 @@ from resolvers.topics import (
) )
from resolvers.zine import ( from resolvers.zine import (
get_shout_by_slug,
follow, follow,
unfollow, unfollow,
increment_view, load_shouts_by
top_month,
top_overall,
recent_published,
recent_all,
recent_commented,
recent_reacted,
shouts_by_authors,
shouts_by_topics,
shouts_by_layout_recent,
shouts_by_layout_top,
shouts_by_layout_topmonth,
shouts_by_communities,
) )
from resolvers.inbox.chats import load_chats, \ from resolvers.inbox.chats import (
create_chat, delete_chat, update_chat, \ create_chat,
invite_to_chat, enter_chat delete_chat,
from resolvers.inbox.messages import load_chat_messages, \ update_chat,
create_message, delete_message, update_message, \ invite_to_chat
message_generator, mark_as_read )
from resolvers.inbox.search import search_users, \ from resolvers.inbox.messages import (
search_messages, search_chats create_message,
delete_message,
update_message,
message_generator,
mark_as_read
)
from resolvers.inbox.load import (
load_chats,
load_messages_by
)
from resolvers.inbox.search import search_users
__all__ = [ __all__ = [
"follow",
"unfollow",
# auth # auth
"login", "login",
"register_by_email", "register_by_email",
@ -83,27 +69,15 @@ __all__ = [
"auth_send_link", "auth_send_link",
"sign_out", "sign_out",
"get_current_user", "get_current_user",
# profile # authors
"get_users_by_slugs", "load_authors_by",
"get_user_roles", "rate_user",
"get_top_authors", "update_profile",
"get_author", "get_authors_all",
# zine # zine
"recent_published", "load_shouts_by",
"recent_commented", "follow",
"recent_reacted", "unfollow",
"recent_all",
"shouts_by_topics",
"shouts_by_layout_recent",
"shouts_by_layout_topmonth",
"shouts_by_layout_top",
"shouts_by_authors",
"shouts_by_communities",
"get_user_reacted_shouts",
"top_month",
"top_overall",
"increment_view",
"get_shout_by_slug",
# editor # editor
"create_shout", "create_shout",
"update_shout", "update_shout",
@ -120,31 +94,24 @@ __all__ = [
"topic_follow", "topic_follow",
"topic_unfollow", "topic_unfollow",
"get_topic", "get_topic",
# communities
"get_community",
"get_communities",
"create_community",
"delete_community",
# reactions # reactions
"reactions_follow", "reactions_follow",
"reactions_unfollow", "reactions_unfollow",
"create_reaction", "create_reaction",
"update_reaction", "update_reaction",
"delete_reaction", "delete_reaction",
"load_reactions_by",
# inbox # inbox
"load_chats",
"load_messages_by",
"invite_to_chat",
"create_chat", "create_chat",
"delete_chat", "delete_chat",
"update_chat", "update_chat",
"load_chats",
"create_message", "create_message",
"delete_message", "delete_message",
"update_message", "update_message",
"load_chat_messages",
"message_generator", "message_generator",
"mark_as_read", "mark_as_read",
"search_users", "search_users"
"search_chats",
"search_messages",
"enter_chat",
"invite_to_chat"
] ]


@@ -32,16 +32,16 @@ async def invite_author(_, info, author, shout):
         authors = [a.id for a in shout.authors]
         if user_id not in authors:
             return {"error": "access denied"}
-        author = session.query(User).filter(User.slug == author).first()
-        if author.id in authors:
-            return {"error": "already added"}
-        shout.authors.append(author)
+        author = session.query(User).filter(User.id == author.id).first()
+        if author:
+            if author.id in authors:
+                return {"error": "already added"}
+            shout.authors.append(author)
         shout.updated_at = datetime.now()
         session.add(shout)
         session.commit()
     # TODO: email notify
     return {}

@@ -59,9 +59,10 @@ async def remove_author(_, info, author, shout):
         if user_id not in authors:
             return {"error": "access denied"}
         author = session.query(User).filter(User.slug == author).first()
-        if author.id not in authors:
-            return {"error": "not in authors"}
-        shout.authors.remove(author)
+        if author:
+            if author.id not in authors:
+                return {"error": "not in authors"}
+            shout.authors.remove(author)
         shout.updated_at = datetime.now()
         session.add(shout)
         session.commit()


@ -1,104 +0,0 @@
from datetime import datetime
from sqlalchemy import and_
from auth.authenticate import login_required
from base.orm import local_session
from base.resolvers import mutation, query
from orm.collection import Collection, ShoutCollection
from orm.user import User
@mutation.field("createCollection")
@login_required
async def create_collection(_, _info, inp):
# auth = info.context["request"].auth
# user_id = auth.user_id
collection = Collection.create(
slug=inp.get("slug", ""),
title=inp.get("title", ""),
desc=inp.get("desc", ""),
pic=inp.get("pic", ""),
)
return {"collection": collection}
@mutation.field("updateCollection")
@login_required
async def update_collection(_, info, inp):
auth = info.context["request"].auth
user_id = auth.user_id
collection_slug = inp.get("slug", "")
with local_session() as session:
owner = session.query(User).filter(User.id == user_id) # note list here
collection = (
session.query(Collection).filter(Collection.slug == collection_slug).first()
)
editors = [e.slug for e in collection.editors]
if not collection:
return {"error": "invalid collection id"}
if collection.createdBy not in (owner + editors):
return {"error": "access denied"}
collection.title = inp.get("title", "")
collection.desc = inp.get("desc", "")
collection.pic = inp.get("pic", "")
collection.updatedAt = datetime.now()
session.commit()
@mutation.field("deleteCollection")
@login_required
async def delete_collection(_, info, slug):
auth = info.context["request"].auth
user_id = auth.user_id
with local_session() as session:
collection = session.query(Collection).filter(Collection.slug == slug).first()
if not collection:
return {"error": "invalid collection slug"}
if collection.owner != user_id:
return {"error": "access denied"}
collection.deletedAt = datetime.now()
session.add(collection)
session.commit()
return {}
@query.field("getUserCollections")
async def get_user_collections(_, _info, userslug):
collections = []
with local_session() as session:
user = session.query(User).filter(User.slug == userslug).first()
if user:
# TODO: check rights here
collections = (
session.query(Collection)
.where(
and_(Collection.createdBy == userslug, Collection.publishedAt.is_not(None))
)
.all()
)
for c in collections:
shouts = (
session.query(ShoutCollection)
.filter(ShoutCollection.collection == c.id)
.all()
)
c.amount = len(shouts)
return collections
@query.field("getMyColelctions")
@login_required
async def get_my_collections(_, info):
auth = info.context["request"].auth
user_id = auth.user_id
with local_session() as session:
collections = (
session.query(Collection).when(Collection.createdBy == user_id).all()
)
return collections
# TODO: get shouts list by collection


@ -1,134 +0,0 @@
from datetime import datetime
from typing import List
from sqlalchemy import and_
from auth.authenticate import login_required
from base.orm import local_session
from base.resolvers import mutation, query
from orm.community import Community, CommunityFollower
from orm.user import User
@mutation.field("createCommunity")
@login_required
async def create_community(_, info, input):
auth = info.context["request"].auth
user_id = auth.user_id
with local_session() as session:
user = session.query(User).where(User.id == user_id).first()
community = Community.create(
slug=input.get("slug", ""),
title=input.get("title", ""),
desc=input.get("desc", ""),
pic=input.get("pic", ""),
createdBy=user.slug,
createdAt=datetime.now(),
)
session.add(community)
session.commit()
return {"community": community}
@mutation.field("updateCommunity")
@login_required
async def update_community(_, info, input):
auth = info.context["request"].auth
user_id = auth.user_id
community_slug = input.get("slug", "")
with local_session() as session:
owner = session.query(User).filter(User.id == user_id) # note list here
community = (
session.query(Community).filter(Community.slug == community_slug).first()
)
editors = [e.slug for e in community.editors]
if not community:
return {"error": "invalid community id"}
if community.createdBy not in (owner + editors):
return {"error": "access denied"}
community.title = input.get("title", "")
community.desc = input.get("desc", "")
community.pic = input.get("pic", "")
community.updatedAt = datetime.now()
session.add(community)
session.commit()
@mutation.field("deleteCommunity")
@login_required
async def delete_community(_, info, slug):
auth = info.context["request"].auth
user_id = auth.user_id
with local_session() as session:
community = session.query(Community).filter(Community.slug == slug).first()
if not community:
return {"error": "invalid community slug"}
if community.owner != user_id:
return {"error": "access denied"}
community.deletedAt = datetime.now()
session.add(community)
session.commit()
return {}
@query.field("getCommunity")
async def get_community(_, info, slug):
with local_session() as session:
community = session.query(Community).filter(Community.slug == slug).first()
if not community:
return {"error": "invalid community id"}
return community
@query.field("getCommunities")
async def get_communities(_, info):
with local_session() as session:
communities = session.query(Community)
return communities
def community_follow(user, slug):
with local_session() as session:
cf = CommunityFollower.create(follower=user.slug, community=slug)
session.add(cf)
session.commit()
def community_unfollow(user, slug):
with local_session() as session:
following = (
session.query(CommunityFollower)
.filter(
and_(
CommunityFollower.follower == user.slug,
CommunityFollower.community == slug,
)
)
.first()
)
if not following:
raise Exception("[orm.community] following was not exist")
session.delete(following)
session.commit()
@query.field("userFollowedCommunities")
def get_followed_communities(_, _info, user_slug) -> List[Community]:
return followed_communities(user_slug)
def followed_communities(user_slug) -> List[Community]:
ccc = []
with local_session() as session:
ccc = (
session.query(Community.slug)
.join(CommunityFollower)
.where(CommunityFollower.follower == user_slug)
.all()
)
return ccc


@@ -73,9 +73,14 @@ async def update_shout(_, info, inp):
         shout.update(inp)
         shout.updatedAt = datetime.now()
         session.add(shout)
-        for topic in inp.get("topic_slugs", []):
-            st = ShoutTopic.create(shout=slug, topic=topic)
-            session.add(st)
+        if inp.get("topics"):
+            # remove old links
+            links = session.query(ShoutTopic).where(ShoutTopic.shout == slug).all()
+            for topiclink in links:
+                session.delete(topiclink)
+            # add new topic links
+            for topic in inp.get("topics", []):
+                ShoutTopic.create(shout=slug, topic=topic)
         session.commit()

     GitTask(inp, user.username, user.email, "update shout %s" % (slug))
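A hedged sketch of calling this resolver from a client with the gql package that this PR adds to requirements.txt; the endpoint URL, mutation name, and input type are assumptions here, not taken from the schema in this diff:

from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

# assumed GraphQL endpoint; adjust to the actual deployment
transport = AIOHTTPTransport(url="https://v2.discours.io/graphql")
client = Client(transport=transport, fetch_schema_from_transport=False)

# assumed field and type names, for illustration only
mutation = gql("""
mutation UpdateShout($inp: ShoutInput!) {
  updateShout(inp: $inp) { error }
}
""")

# passing "topics" triggers the delete-and-recreate of ShoutTopic links shown above
result = client.execute(mutation, variable_values={
    "inp": {"slug": "example-shout", "topics": ["culture", "music"]}
})
print(result)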


@ -1,53 +0,0 @@
from typing import List
from sqlalchemy import and_, desc
from auth.authenticate import login_required
from base.orm import local_session
from base.resolvers import query
from orm.shout import Shout, ShoutAuthor, ShoutTopic
from orm.topic import TopicFollower
from orm.user import AuthorFollower
from services.zine.shoutscache import prepare_shouts
@query.field("shoutsForFeed")
@login_required
async def get_user_feed(_, info, offset, limit) -> List[Shout]:
user = info.context["request"].user
shouts = []
with local_session() as session:
shouts = (
session.query(Shout)
.join(ShoutAuthor)
.join(AuthorFollower)
.where(AuthorFollower.follower == user.slug)
.order_by(desc(Shout.createdAt))
)
topic_rows = (
session.query(Shout)
.join(ShoutTopic)
.join(TopicFollower)
.where(TopicFollower.follower == user.slug)
.order_by(desc(Shout.createdAt))
)
shouts = shouts.union(topic_rows).limit(limit).offset(offset).all()
return shouts
@query.field("recentCandidates")
@login_required
async def user_unpublished_shouts(_, info, offset, limit) -> List[Shout]:
user = info.context["request"].user
with local_session() as session:
shouts = prepare_shouts(
session.query(Shout)
.join(ShoutAuthor)
.where(and_(Shout.publishedAt.is_(None), ShoutAuthor.user == user.slug))
.order_by(desc(Shout.createdAt))
.group_by(Shout.id)
.limit(limit)
.offset(offset)
.all()
)
return shouts


@@ -5,7 +5,7 @@ from datetime import datetime
 from auth.authenticate import login_required
 from base.redis import redis
 from base.resolvers import mutation, query
-from resolvers.inbox.load import load_messages, load_user_chats
+from services.auth.users import UserStorage


 async def add_user_to_chat(user_slug: str, chat_id: str, chat=None):

@@ -20,40 +20,6 @@ async def add_user_to_chat(user_slug: str, chat_id: str, chat=None):
         await redis.execute("SET", f"chats_by_user/{member}", json.dumps(chats_ids))


-@query.field("loadChats")
-@login_required
-async def load_chats(_, info):
-    user = info.context["request"].user
-    return await load_user_chats(user.slug)
-
-
-@mutation.field("enterChat")
-@login_required
-async def enter_chat(_, info, chat_id: str):
-    ''' enter to public chat with :chat_id '''
-    user = info.context["request"].user
-    chat = await redis.execute("GET", f"chats/{chat_id}")
-    if not chat:
-        return {
-            "error": "chat not exist"
-        }
-    else:
-        chat = dict(json.loads(chat))
-        if chat['private']:
-            return {
-                "error": "cannot enter private chat"
-            }
-        if user.slug not in chat["users"]:
-            chat["users"].append(user.slug)
-            await add_user_to_chat(user.slug, chat_id, chat)
-            await redis.execute("SET" f"chats/{chat_id}", json.dumps(chat))
-        chat['messages'] = await load_messages(chat_id)
-    return {
-        "chat": chat,
-        "error": None
-    }
-
-
 @mutation.field("inviteChat")
 async def invite_to_chat(_, info, invited: str, chat_id: str):
     ''' invite user with :slug to chat with :chat_id '''

@@ -156,3 +122,10 @@ async def delete_chat(_, info, chat_id: str):
     return {
         "error": "chat not exist"
     }
+
+
+@query.field("chatUsersAll")
+@login_required
+async def get_chat_users_all(_, info):
+    chat_users = await UserStorage.get_all_chat_users()
+    return chat_users


@ -1,51 +1,36 @@
import json import json
from datetime import datetime, timedelta
from auth.authenticate import login_required
from base.redis import redis from base.redis import redis
from base.resolvers import query
async def get_unread_counter(chat_id: str, user_slug: str): async def get_unread_counter(chat_id: str, user_slug: str):
try: try:
return int(await redis.execute("LLEN", f"chats/{chat_id}/unread/{user_slug}")) unread = await redis.execute("LLEN", f"chats/{chat_id}/unread/{user_slug}")
if unread:
return unread
except Exception: except Exception:
return 0 return 0
async def get_total_unread_counter(user_slug: str): async def get_total_unread_counter(user_slug: str):
chats = await redis.execute("GET", f"chats_by_user/{user_slug}") chats = await redis.execute("GET", f"chats_by_user/{user_slug}")
if not chats:
return 0
chats = json.loads(chats)
unread = 0 unread = 0
for chat_id in chats: if chats:
n = await get_unread_counter(chat_id, user_slug) chats = json.loads(chats)
unread += n for chat_id in chats:
n = await get_unread_counter(chat_id, user_slug)
unread += n
return unread return unread
async def load_user_chats(slug, offset: int, amount: int): async def load_messages(chatId: str, limit: int, offset: int):
""" load :amount chats of :slug user with :offset """ ''' load :limit messages for :chatId with :offset '''
chats = await redis.execute("GET", f"chats_by_user/{slug}")
if chats:
chats = list(json.loads(chats))[offset:offset + amount]
if not chats:
chats = []
for c in chats:
c['messages'] = await load_messages(c['id'])
c['unread'] = await get_unread_counter(c['id'], slug)
return {
"chats": chats,
"error": None
}
async def load_messages(chatId: str, offset: int, amount: int):
''' load :amount messages for :chatId with :offset '''
messages = [] messages = []
message_ids = await redis.lrange( message_ids = await redis.lrange(
f"chats/{chatId}/message_ids", 0 - offset - amount, 0 - offset f"chats/{chatId}/message_ids", 0 - offset - limit, 0 - offset
) )
if message_ids: if message_ids:
message_keys = [ message_keys = [
@ -57,3 +42,61 @@ async def load_messages(chatId: str, offset: int, amount: int):
"messages": messages, "messages": messages,
"error": None "error": None
} }
@query.field("loadChats")
@login_required
async def load_chats(_, info, limit: int, offset: int):
""" load :limit chats of current user with :offset """
user = info.context["request"].user
chats = await redis.execute("GET", f"chats_by_user/{user.slug}")
if chats:
chats = list(json.loads(chats))[offset:offset + limit]
if not chats:
chats = []
for c in chats:
c['messages'] = await load_messages(c['id'], limit, offset)
c['unread'] = await get_unread_counter(c['id'], user.slug)
return {
"chats": chats,
"error": None
}
@query.field("loadMessagesBy")
@login_required
async def load_messages_by(_, info, by, limit: int = 50, offset: int = 0):
''' load :amolimitunt messages of :chat_id with :offset '''
user = info.context["request"].user
my_chats = await redis.execute("GET", f"chats_by_user/{user.slug}")
chat_id = by.get('chat')
if chat_id:
chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat:
return {
"error": "chat not exist"
}
messages = await load_messages(chat_id, limit, offset)
user_id = by.get('author')
if user_id:
chats = await redis.execute("GET", f"chats_by_user/{user_id}")
our_chats = list(set(chats) & set(my_chats))
for c in our_chats:
messages += await load_messages(c, limit, offset)
body_like = by.get('body')
if body_like:
for c in my_chats:
mmm = await load_messages(c, limit, offset)
for m in mmm:
if body_like in m["body"]:
messages.append(m)
days = by.get("days")
if days:
messages = filter(
lambda m: datetime.now() - int(m["createdAt"]) < timedelta(days=by.get("days")),
messages
)
return {
"messages": messages,
"error": None
}
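For orientation, a small sketch of the redis key layout these loaders rely on, written against the repo's base.redis wrapper and using only the keys and calls visible in this diff; it has to run inside the app's event loop with redis already connected:

import json
from base.redis import redis

async def peek_chat(chat_id: str, user_slug: str):
    # chats/{id} holds the chat object as JSON
    chat = json.loads(await redis.execute("GET", f"chats/{chat_id}"))
    # chats/{id}/message_ids lists message ids, newest first (LPUSH in create_message)
    message_ids = await redis.lrange(f"chats/{chat_id}/message_ids", 0, 10)
    # chats/{id}/unread/{slug} tracks unread message ids per member
    unread = await redis.execute("LLEN", f"chats/{chat_id}/unread/{user_slug}")
    # chats_by_user/{slug} maps a user to their chat ids
    my_chats = json.loads(await redis.execute("GET", f"chats_by_user/{user_slug}") or "[]")
    return chat, message_ids, unread, my_chats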


@ -4,67 +4,52 @@ from datetime import datetime
from auth.authenticate import login_required from auth.authenticate import login_required
from base.redis import redis from base.redis import redis
from base.resolvers import mutation, query, subscription from base.resolvers import mutation, subscription
from services.inbox import ChatFollowing, MessageResult, MessagesStorage from services.inbox import ChatFollowing, MessageResult, MessagesStorage
from resolvers.inbox.load import load_messages
@query.field("loadMessages")
@login_required
async def load_chat_messages(_, info, chat_id: str, offset: int = 0, amount: int = 50):
''' load [amount] chat's messages with [offset] '''
chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat:
return {
"error": "chat not exist"
}
messages = await load_messages(chat_id, offset, amount)
return {
"messages": messages,
"error": None
}
@mutation.field("createMessage") @mutation.field("createMessage")
@login_required @login_required
async def create_message(_, info, chat_id: str, body: str, replyTo=None): async def create_message(_, info, chat: str, body: str, replyTo=None):
""" create message with :body for :chat_id replying to :replyTo optionally """ """ create message with :body for :chat_id replying to :replyTo optionally """
user = info.context["request"].user user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chat_id}") chat = await redis.execute("GET", f"chats/{chat}")
if not chat: if not chat:
return { return {
"error": "chat not exist" "error": "chat not exist"
} }
message_id = await redis.execute("GET", f"chats/{chat_id}/next_message_id") else:
message_id = int(message_id) chat = dict(json.loads(chat))
new_message = { message_id = await redis.execute("GET", f"chats/{chat['id']}/next_message_id")
"chatId": chat_id, message_id = int(message_id)
"id": message_id, new_message = {
"author": user.slug, "chatId": chat['id'],
"body": body, "id": message_id,
"replyTo": replyTo, "author": user.slug,
"createdAt": int(datetime.now().timestamp()), "body": body,
} "replyTo": replyTo,
await redis.execute( "createdAt": int(datetime.now().timestamp()),
"SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(new_message) }
)
await redis.execute("LPUSH", f"chats/{chat_id}/message_ids", str(message_id))
await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(message_id + 1))
chat = json.loads(chat)
users = chat["users"]
for user_slug in users:
await redis.execute( await redis.execute(
"LPUSH", f"chats/{chat_id}/unread/{user_slug}", str(message_id) "SET", f"chats/{chat['id']}/messages/{message_id}", json.dumps(new_message)
) )
await redis.execute("LPUSH", f"chats/{chat['id']}/message_ids", str(message_id))
await redis.execute("SET", f"chats/{chat['id']}/next_message_id", str(message_id + 1))
result = MessageResult("NEW", new_message) chat = json.loads(chat)
await MessagesStorage.put(result) users = chat["users"]
for user_slug in users:
await redis.execute(
"LPUSH", f"chats/{chat['id']}/unread/{user_slug}", str(message_id)
)
return { result = MessageResult("NEW", new_message)
"message": new_message, await MessagesStorage.put(result)
"error": None
} return {
"message": new_message,
"error": None
}
@mutation.field("updateMessage") @mutation.field("updateMessage")
@ -174,6 +159,7 @@ async def message_generator(obj, info):
while True: while True:
msg = await asyncio.gather(*tasks) msg = await asyncio.gather(*tasks)
print('[inbox] %d new messages' % len(tasks))
yield msg yield msg
finally: finally:
await MessagesStorage.remove_chat(following_chat) await MessagesStorage.remove_chat(following_chat)


@@ -9,13 +9,13 @@ from orm.user import AuthorFollower

 @query.field("searchUsers")
 @login_required
-async def search_users(_, info, query: str, offset: int = 0, amount: int = 50):
+async def search_users(_, info, query: str, limit: int = 50, offset: int = 0):
     result = []
     # TODO: maybe redis scan?
     user = info.context["request"].user
     talk_before = await redis.execute("GET", f"/chats_by_user/{user.slug}")
     if talk_before:
-        talk_before = list(json.loads(talk_before))[offset:offset + amount]
+        talk_before = list(json.loads(talk_before))[offset:offset + limit]
         for chat_id in talk_before:
             members = await redis.execute("GET", f"/chats/{chat_id}/users")
             if members:

@@ -26,54 +26,18 @@ async def search_users(_, info, query: str, offset: int = 0, amount: int = 50):
                 result.append(member)

     user = info.context["request"].user
-    more_amount = amount - len(result)
+    more_amount = limit - len(result)

     with local_session() as session:
         # followings
         result += session.query(AuthorFollower.author).where(AuthorFollower.follower.startswith(query))\
             .offset(offset + len(result)).limit(more_amount)

-        more_amount = amount
+        more_amount = limit
         # followers
         result += session.query(AuthorFollower.follower).where(AuthorFollower.author.startswith(query))\
-            .offset(offset + len(result)).limit(offset + len(result) + amount)
+            .offset(offset + len(result)).limit(offset + len(result) + limit)
     return {
         "slugs": list(result),
         "error": None
     }
@query.field("searchChats")
@login_required
async def search_chats(_, info, query: str, offset: int = 0, amount: int = 50):
user = info.context["request"].user
my_chats = await redis.execute("GET", f"/chats_by_user/{user.slug}")
chats = []
for chat_id in my_chats:
chat = await redis.execute("GET", f"chats/{chat_id}")
if chat:
chat = dict(json.loads(chat))
chats.append(chat)
return {
"chats": chats,
"error": None
}
@query.field("searchMessages")
@login_required
async def search_messages(_, info, query: str, offset: int = 0, amount: int = 50):
user = info.context["request"].user
my_chats = await redis.execute("GET", f"/chats_by_user/{user.slug}")
chats = []
if my_chats:
my_chats = list(json.loads(my_chats))
for chat_id in my_chats:
chat = await redis.execute("GET", f"chats/{chat_id}")
if chat:
chat = dict(json.loads(chat))
chats.append(chat)
return {
"chats": chats,
"error": None
}


@ -1,20 +1,20 @@
from typing import List from typing import List
from datetime import datetime, timedelta
from sqlalchemy import and_, desc, func from sqlalchemy import and_, func
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
from auth.authenticate import login_required from auth.authenticate import login_required
from base.orm import local_session from base.orm import local_session
from base.resolvers import mutation, query from base.resolvers import mutation, query
from orm.reaction import Reaction from orm.reaction import Reaction
from orm.shout import Shout
from orm.topic import Topic, TopicFollower from orm.topic import Topic, TopicFollower
from orm.user import AuthorFollower, Role, User, UserRating, UserRole from orm.user import AuthorFollower, Role, User, UserRating, UserRole
from services.auth.users import UserStorage from services.auth.users import UserStorage
from services.stat.reacted import ReactedStorage from services.stat.reacted import ReactedStorage
from services.zine.shoutscache import ShoutsCache from services.stat.topicstat import TopicStat
from services.zine.shoutauthor import ShoutAuthorStorage
from .community import followed_communities # from .community import followed_communities
from .inbox.load import get_total_unread_counter from .inbox.load import get_total_unread_counter
from .topics import get_topic_stat from .topics import get_topic_stat
@ -25,7 +25,7 @@ async def user_subscriptions(slug: str):
"topics": [t.slug for t in await followed_topics(slug)], # followed topics slugs "topics": [t.slug for t in await followed_topics(slug)], # followed topics slugs
"authors": [a.slug for a in await followed_authors(slug)], # followed authors slugs "authors": [a.slug for a in await followed_authors(slug)], # followed authors slugs
"reactions": await ReactedStorage.get_shouts_by_author(slug), "reactions": await ReactedStorage.get_shouts_by_author(slug),
"communities": [c.slug for c in followed_communities(slug)], # communities # "communities": [c.slug for c in followed_communities(slug)], # communities
} }
@ -46,24 +46,6 @@ async def get_author_stat(slug):
} }
@query.field("userReactedShouts")
async def get_user_reacted_shouts(_, slug: str, offset: int, limit: int) -> List[Shout]:
user = await UserStorage.get_user_by_slug(slug)
if not user:
return []
with local_session() as session:
shouts = (
session.query(Shout)
.join(Reaction)
.where(Reaction.createdBy == user.slug)
.order_by(desc(Reaction.createdAt))
.limit(limit)
.offset(offset)
.all()
)
return shouts
@query.field("userFollowedTopics") @query.field("userFollowedTopics")
@login_required @login_required
async def get_followed_topics(_, info, slug) -> List[Topic]: async def get_followed_topics(_, info, slug) -> List[Topic]:
@ -115,20 +97,7 @@ async def user_followers(_, _info, slug) -> List[User]:
return users return users
@query.field("getUsersBySlugs") async def get_user_roles(slug):
async def get_users_by_slugs(_, _info, slugs):
with local_session() as session:
users = (
session.query(User)
.options(selectinload(User.ratings))
.filter(User.slug in slugs)
.all()
)
return users
@query.field("getUserRoles")
async def get_user_roles(_, _info, slug):
with local_session() as session: with local_session() as session:
user = session.query(User).where(User.slug == slug).first() user = session.query(User).where(User.slug == slug).first()
roles = ( roles = (
@ -206,22 +175,41 @@ def author_unfollow(user, slug):
@query.field("authorsAll") @query.field("authorsAll")
async def get_authors_all(_, _info): async def get_authors_all(_, _info):
users = await UserStorage.get_all_users() users = await UserStorage.get_all_users()
authorslugs = await ShoutsCache.get_all_authors_slugs()
authors = [] authors = []
for author in users: for author in users:
if author.slug in authorslugs: if ShoutAuthorStorage.shouts_by_author.get(author.slug):
author.stat = await get_author_stat(author.slug) author.stat = await get_author_stat(author.slug)
authors.append(author) authors.append(author)
return authors return authors
@query.field("topAuthors") @query.field("loadAuthorsBy")
def get_top_authors(_, _info, offset, limit): async def load_authors_by(_, info, by, limit, offset):
return list(UserStorage.get_top_users())[offset : offset + limit] # type: ignore authors = []
with local_session() as session:
aq = session.query(User)
@query.field("getAuthor") if by.get("slug"):
async def get_author(_, _info, slug): aq = aq.filter(User.slug.ilike(f"%{by['slug']}%"))
a = await UserStorage.get_user_by_slug(slug) elif by.get("name"):
a.stat = await get_author_stat(slug) aq = aq.filter(User.name.ilike(f"%{by['name']}%"))
return a elif by.get("topic"):
aaa = list(map(lambda a: a.slug, TopicStat.authors_by_topic.get(by["topic"])))
aq = aq.filter(User.name._in(aaa))
if by.get("lastSeen"): # in days
days_before = datetime.now() - timedelta(days=by["lastSeen"])
aq = aq.filter(User.lastSeen > days_before)
elif by.get("createdAt"): # in days
days_before = datetime.now() - timedelta(days=by["createdAt"])
aq = aq.filter(User.createdAt > days_before)
aq = aq.group_by(
User.id
).order_by(
by.get("order") or "createdAt"
).limit(limit).offset(offset)
print(aq)
authors = list(map(lambda r: r.User, session.execute(aq)))
if by.get("stat"):
for a in authors:
a.stat = await get_author_stat(a.slug)
authors = list(set(authors)).sort(authors, key=lambda a: a["stat"].get(by.get("stat")))
return authors
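The filter object accepted by the new loadAuthorsBy resolver, as read from the code above; the concrete values are illustrative only:

by = {
    "slug": "dis",           # substring match on User.slug
    # "name": "Tony",        # or substring match on User.name
    # "topic": "culture",    # or authors known to TopicStat for this topic
    "lastSeen": 7,            # only authors active within the last N days
    # "createdAt": 30,       # or authors registered within the last N days
    "order": "createdAt",    # sort column, defaults to createdAt
    "stat": "shouts",        # optional post-query sort by this author stat
}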


@ -1,6 +1,7 @@
from datetime import datetime from datetime import datetime, timedelta
from sqlalchemy import and_, desc from sqlalchemy import and_, desc, select, text, func
from sqlalchemy.orm import selectinload
from auth.authenticate import login_required from auth.authenticate import login_required
from base.orm import local_session from base.orm import local_session
@ -8,14 +9,12 @@ from base.resolvers import mutation, query
from orm.reaction import Reaction, ReactionKind from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout, ShoutReactionsFollower from orm.shout import Shout, ShoutReactionsFollower
from orm.user import User from orm.user import User
from services.auth.users import UserStorage
from services.stat.reacted import ReactedStorage from services.stat.reacted import ReactedStorage
from services.stat.viewed import ViewedStorage
async def get_reaction_stat(reaction_id): async def get_reaction_stat(reaction_id):
return { return {
"viewed": await ViewedStorage.get_reaction(reaction_id), # "viewed": await ViewStat.get_reaction(reaction_id),
"reacted": len(await ReactedStorage.get_reaction(reaction_id)), "reacted": len(await ReactedStorage.get_reaction(reaction_id)),
"rating": await ReactedStorage.get_reaction_rating(reaction_id), "rating": await ReactedStorage.get_reaction_rating(reaction_id),
"commented": len(await ReactedStorage.get_reaction_comments(reaction_id)), "commented": len(await ReactedStorage.get_reaction_comments(reaction_id)),
@ -117,14 +116,14 @@ def set_published(session, slug, publisher):
s = session.query(Shout).where(Shout.slug == slug).first() s = session.query(Shout).where(Shout.slug == slug).first()
s.publishedAt = datetime.now() s.publishedAt = datetime.now()
s.publishedBy = publisher s.publishedBy = publisher
s.visibility = 'public' s.visibility = text('public')
session.add(s) session.add(s)
session.commit() session.commit()
def set_hidden(session, slug): def set_hidden(session, slug):
s = session.query(Shout).where(Shout.slug == slug).first() s = session.query(Shout).where(Shout.slug == slug).first()
s.visibility = 'authors' s.visibility = text('authors')
s.publishedAt = None # TODO: discuss s.publishedAt = None # TODO: discuss
s.publishedBy = None # TODO: store changes history in git s.publishedBy = None # TODO: store changes history in git
session.add(s) session.add(s)
@ -202,57 +201,58 @@ async def delete_reaction(_, info, rid):
return {} return {}
@query.field("reactionsForShouts") @query.field("loadReactionsBy")
async def get_reactions_for_shouts(_, info, shouts, offset, limit): async def load_reactions_by(_, info, by, limit=50, offset=0):
return await reactions_for_shouts(shouts, offset, limit) """
:param by: {
shout: 'some-slug'
author: 'discours',
topic: 'culture',
body: 'something else',
stat: 'rating' | 'comments' | 'reacted' | 'views',
days: 30
}
:param limit: int amount of shouts
:param offset: int offset in this order
:return: Reaction[]
"""
q = select(Reaction).options(
selectinload(Reaction.shout),
).where(
Reaction.deletedAt.is_(None)
).join(
Shout,
Shout.slug == Reaction.shout
)
if by.get("slug"):
q = q.filter(Shout.slug == by["slug"])
else:
if by.get("reacted"):
user = info.context["request"].user
q = q.filter(Reaction.createdBy == user.slug)
if by.get("author"):
q = q.filter(Reaction.createdBy == by["author"])
if by.get("topic"):
q = q.filter(Shout.topics.contains(by["topic"]))
if by.get("body"):
if by["body"] is True:
q = q.filter(func.length(Reaction.body) > 0)
else:
q = q.filter(Reaction.body.ilike(f'%{by["body"]}%'))
if by.get("days"):
before = datetime.now() - timedelta(days=int(by["days"]) or 30)
q = q.filter(Reaction.createdAt > before)
q = q.group_by(Shout.id).order_by(
desc(by.get("order") or "createdAt")
).limit(limit).offset(offset)
async def reactions_for_shouts(shouts, offset, limit): rrr = []
reactions = []
with local_session() as session: with local_session() as session:
for slug in shouts: # post query stats and author's captions
reactions += ( for r in list(map(lambda r: r.Reaction, session.execute(q))):
session.query(Reaction) r.stat = await get_reaction_stat(r.id)
.filter(Reaction.shout == slug) rrr.append(r)
.where(Reaction.deletedAt.is_not(None)) if by.get("stat"):
.order_by(desc("createdAt")) rrr.sort(lambda r: r.stat.get(by["stat"]) or r.createdAt)
.offset(offset) return rrr
.limit(limit)
.all()
)
for r in reactions:
r.stat = await get_reaction_stat(r.id)
r.createdBy = await UserStorage.get_user(r.createdBy or "discours")
return reactions
reactions = []
with local_session() as session:
for slug in shouts:
reactions += (
session.query(Reaction)
.filter(Reaction.shout == slug)
.where(Reaction.deletedAt.is_not(None))
.order_by(desc("createdAt"))
.offset(offset)
.limit(limit)
.all()
)
for r in reactions:
r.stat = await get_reaction_stat(r.id)
r.createdBy = await UserStorage.get_user(r.createdBy or "discours")
return reactions
@query.field("reactionsByAuthor")
async def get_reactions_by_author(_, info, slug, limit=50, offset=0):
reactions = []
with local_session() as session:
reactions = (
session.query(Reaction)
.where(Reaction.createdBy == slug)
.limit(limit)
.offset(offset)
)
for r in reactions:
r.stat = await get_reaction_stat(r.id)
r.createdBy = await UserStorage.get_user(r.createdBy or "discours")
return reactions


@@ -6,11 +6,10 @@ from auth.authenticate import login_required
 from base.orm import local_session
 from base.resolvers import mutation, query
 from orm.topic import Topic, TopicFollower
-from services.zine.shoutscache import ShoutsCache
 from services.zine.topics import TopicStorage
 from services.stat.reacted import ReactedStorage
 from services.stat.topicstat import TopicStat
-from services.stat.viewed import ViewedStorage
+from services.stat.views import ViewStat


 async def get_topic_stat(slug):

@@ -18,7 +17,7 @@ async def get_topic_stat(slug):
         "shouts": len(TopicStat.shouts_by_topic.get(slug, {}).keys()),
         "authors": len(TopicStat.authors_by_topic.get(slug, {}).keys()),
         "followers": len(TopicStat.followers_by_topic.get(slug, {}).keys()),
-        "viewed": await ViewedStorage.get_topic(slug),
+        "viewed": await ViewStat.get_topic(slug),
         "reacted": len(await ReactedStorage.get_topic(slug)),
         "commented": len(await ReactedStorage.get_topic_comments(slug)),
         "rating": await ReactedStorage.get_topic_rating(slug)

@@ -43,7 +42,7 @@ async def topics_by_community(_, info, community):
 @query.field("topicsByAuthor")
 async def topics_by_author(_, _info, author):
-    shouts = ShoutsCache.by_author.get(author, [])
+    shouts = TopicStorage.get_topics_by_author(author)
     author_topics = set()
     for s in shouts:
         for tpc in s.topics:


@ -1,249 +1,87 @@
from graphql.type import GraphQLResolveInfo
from datetime import datetime, timedelta from datetime import datetime, timedelta
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
from sqlalchemy.sql.expression import and_, desc, select from sqlalchemy.sql.expression import or_, desc, select
from auth.authenticate import login_required from auth.authenticate import login_required
from base.orm import local_session from base.orm import local_session
from base.resolvers import mutation, query from base.resolvers import mutation, query
from orm.collection import ShoutCollection from orm.shout import Shout
from orm.shout import Shout, ShoutTopic from orm.reaction import Reaction
from orm.topic import Topic # from resolvers.community import community_follow, community_unfollow
from resolvers.community import community_follow, community_unfollow
from resolvers.profile import author_follow, author_unfollow from resolvers.profile import author_follow, author_unfollow
from resolvers.reactions import reactions_follow, reactions_unfollow from resolvers.reactions import reactions_follow, reactions_unfollow
from resolvers.topics import topic_follow, topic_unfollow from resolvers.topics import topic_follow, topic_unfollow
from services.search import SearchService
from services.stat.viewed import ViewedStorage
from services.zine.shoutauthor import ShoutAuthorStorage from services.zine.shoutauthor import ShoutAuthorStorage
from services.zine.shoutscache import ShoutsCache, get_shout_stat from services.stat.reacted import ReactedStorage
@mutation.field("incrementView") @query.field("loadShoutsBy")
async def increment_view(_, _info, shout): async def load_shouts_by(_, info, by, limit=50, offset=0):
# TODO: use ackee to collect views """
async with ViewedStorage.lock: :param by: {
return ViewedStorage.increment(shout) layout: 'audio',
visibility: "public",
author: 'discours',
topic: 'culture',
title: 'something',
body: 'something else',
stat: 'rating' | 'comments' | 'reacted' | 'views',
days: 30
}
:param limit: int amount of shouts
:param offset: int offset in this order
:return: Shout[]
"""
q = select(Shout, Reaction).options(
@query.field("topMonth") selectinload(Shout.authors),
async def top_month(_, _info, offset, limit): selectinload(Shout.topics),
async with ShoutsCache.lock: selectinload(Shout.reactions)
return ShoutsCache.top_month[offset : offset + limit] ).where(
Shout.deletedAt.is_(None)
).join(
@query.field("topPublished") Reaction, Reaction.shout == Shout.slug
async def top_published(_, _info, daysago, offset, limit): )
async with ShoutsCache.lock: if by.get("slug"):
return ShoutsCache.get_top_published_before(daysago, offset, limit) q = q.filter(Shout.slug == by["slug"])
@query.field("topCommented")
async def top_commented(_, _info, offset, limit):
async with ShoutsCache.lock:
return ShoutsCache.top_commented[offset : offset + limit]
@query.field("topOverall")
async def top_overall(_, _info, offset, limit):
async with ShoutsCache.lock:
return ShoutsCache.top_overall[offset : offset + limit]
@query.field("recentPublished")
async def recent_published(_, _info, offset, limit):
async with ShoutsCache.lock:
return ShoutsCache.recent_published[offset : offset + limit]
@query.field("recentAll")
async def recent_all(_, _info, offset, limit):
async with ShoutsCache.lock:
return ShoutsCache.recent_all[offset : offset + limit]
@query.field("recentReacted")
async def recent_reacted(_, _info, offset, limit):
async with ShoutsCache.lock:
return ShoutsCache.recent_reacted[offset : offset + limit]
@query.field("recentCommented")
async def recent_commented(_, _info, offset, limit):
async with ShoutsCache.lock:
return ShoutsCache.recent_commented[offset : offset + limit]
@query.field("getShoutBySlug")
async def get_shout_by_slug(_, info, slug):
all_fields = [
node.name.value for node in info.field_nodes[0].selection_set.selections
]
selected_fields = set(["authors", "topics"]).intersection(all_fields)
select_options = [selectinload(getattr(Shout, field)) for field in selected_fields]
with local_session() as session:
# s = text(open("src/queries/shout-by-slug.sql", "r").read() % slug)
shout = (
session.query(Shout)
.options(select_options)
.filter(Shout.slug == slug)
.first()
)
if not shout:
print(f"shout with slug {slug} not exist")
return {"error": "shout not found"}
else:
for a in shout.authors:
a.caption = await ShoutAuthorStorage.get_author_caption(slug, a.slug)
return shout
@query.field("searchQuery")
async def get_search_results(_, _info, searchtext, offset, limit):
shouts = SearchService.search(searchtext)
# TODO: sort and filter types for search service
for s in shouts:
shout = s.dict()
for a in shout['authors']:
a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug)
s.stat.relevance = 1 # FIXME: expecting search engine rated relevance
return shouts[offset : offset + limit]
@query.field("shoutsByAuthors")
async def shouts_by_authors(_, _info, slugs, offset=0, limit=100):
async with ShoutsCache.lock:
shouts = {}
for author in slugs:
shouts_by_author = list(ShoutsCache.by_author.get(author, {}).values())
for s in shouts_by_author:
for a in s.authors:
a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug)
if bool(s.publishedAt):
shouts[s.slug] = s
shouts_prepared = list(shouts.values())
shouts_prepared.sort(key=lambda s: s.publishedAt, reverse=True)
return shouts_prepared[offset : offset + limit]
@query.field("recentLayoutShouts")
async def shouts_by_layout_recent(_param, _info: GraphQLResolveInfo, layout, amount=100, offset=0):
async with ShoutsCache.lock:
shouts = {}
# for layout in ['image', 'audio', 'video', 'literature']:
shouts_by_layout = list(ShoutsCache.by_layout.get(layout, []))
for s in shouts_by_layout:
if s.visibility == 'public': # if bool(s.publishedAt):
shouts[s.slug] = s
for a in s.authors:
a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug)
shouts_prepared = list(shouts.values())
shouts_prepared.sort(key=lambda s: s.createdAt, reverse=True)
return shouts_prepared[offset : offset + amount]
@query.field("topLayoutShouts")
async def shouts_by_layout_top(_param, _info: GraphQLResolveInfo, layout, amount=100, offset=0):
async with ShoutsCache.lock:
shouts = {}
# for layout in ['image', 'audio', 'video', 'literature']:
shouts_by_layout = list(ShoutsCache.by_layout.get(layout, []))
for s in shouts_by_layout:
if s.visibility == 'public': # if bool(s.publishedAt):
shouts[s.slug] = s
for a in s.authors:
a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug)
s.stat = await get_shout_stat(s.slug)
shouts_prepared = list(shouts.values())
shouts_prepared.sort(key=lambda s: s.stat["rating"], reverse=True)
return shouts_prepared[offset : offset + amount]
@query.field("topMonthLayoutShouts")
async def shouts_by_layout_topmonth(_param, _info: GraphQLResolveInfo, layout, amount=100, offset=0):
async with ShoutsCache.lock:
shouts = {}
# for layout in ['image', 'audio', 'video', 'literature']:
shouts_by_layout = list(ShoutsCache.by_layout.get(layout, []))
month_ago = datetime.now() - timedelta(days=30)
for s in shouts_by_layout:
if s.visibility == 'public' and s.createdAt > month_ago:
shouts[s.slug] = s
for a in s.authors:
a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug)
shouts_prepared = list(shouts.values())
shouts_prepared.sort(key=lambda s: s.stat["rating"], reverse=True)
return shouts_prepared[offset : offset + amount]
@query.field("shoutsByTopics")
async def shouts_by_topics(_, _info, slugs, offset=0, limit=100):
async with ShoutsCache.lock:
shouts = {}
for topic in slugs:
shouts_by_topic = list(ShoutsCache.by_topic.get(topic, {}).values())
for s in shouts_by_topic:
for a in s.authors:
a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug)
if bool(s.publishedAt):
shouts[s.slug] = s
shouts_prepared = list(shouts.values())
shouts_prepared.sort(key=lambda s: s.publishedAt, reverse=True)
return shouts_prepared[offset : offset + limit]
@query.field("shoutsByCollection")
async def shouts_by_collection(_, _info, collection, offset, limit):
with local_session() as session:
shouts = (
session.query(Shout)
.join(ShoutCollection, ShoutCollection.collection == collection)
.where(and_(ShoutCollection.shout == Shout.slug, Shout.publishedAt.is_not(None)))
.order_by(desc("publishedAt"))
.limit(limit)
.offset(offset)
)
for s in shouts:
for a in s.authors:
a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug)
return shouts
SINGLE_COMMUNITY = True
@query.field("shoutsByCommunities")
async def shouts_by_communities(_, info, slugs, offset, limit):
if SINGLE_COMMUNITY:
return recent_published(_, info, offset, limit)
else: else:
with local_session() as session: if by.get("reacted"):
# TODO fix postgres high load user = info.context["request"].user
shouts = ( q = q.filter(Reaction.createdBy == user.slug)
session.query(Shout) if by.get("visibility"):
.distinct() q = q.filter(or_(
.join(ShoutTopic) Shout.visibility.ilike(f"%{by.get('visibility')}%"),
.where( Shout.visibility.ilike(f"%{'public'}%"),
and_( ))
Shout.publishedAt.is_not(None), if by.get("layout"):
ShoutTopic.topic.in_( q = q.filter(Shout.layout == by["layout"])
select(Topic.slug).where(Topic.community.in_(slugs)) if by.get("author"):
), q = q.filter(Shout.authors.contains(by["author"]))
) if by.get("topic"):
) q = q.filter(Shout.topics.contains(by["topic"]))
.order_by(desc("publishedAt")) if by.get("title"):
.limit(limit) q = q.filter(Shout.title.ilike(f'%{by["title"]}%'))
.offset(offset) if by.get("body"):
) q = q.filter(Shout.body.ilike(f'%{by["body"]}%'))
if by.get("days"):
for s in shouts: before = datetime.now() - timedelta(days=int(by["days"]) or 30)
q = q.filter(Shout.createdAt > before)
q = q.group_by(Shout.id, Reaction.id).order_by(
desc(by.get("order") or "createdAt")
).limit(limit).offset(offset)
print(q)
shouts = []
with local_session() as session:
# post query stats and author's captions
for s in list(map(lambda r: r.Shout, session.execute(q))):
s.stat = await ReactedStorage.get_shout_stat(s.slug)
for a in s.authors: for a in s.authors:
a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug)
return shouts shouts.append(s)
if by.get("stat"):
shouts.sort(key=lambda s: s.stat.get(by["stat"]) or s.createdAt)
return shouts
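A minimal call sketch for the filter-driven resolver above; the resolver name load_shouts_by and its exact signature are assumptions, since the new function header falls outside this hunk:
    # hypothetical call: public image shouts from the last 30 days,
    # ordered by createdAt in SQL, then re-sorted by the "rating" stat
    shouts = await load_shouts_by(None, info, by={
        "layout": "image",
        "visibility": "public",
        "days": 30,
        "order": "createdAt",
        "stat": "rating",
    }, limit=20, offset=0)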
@mutation.field("follow") @mutation.field("follow")
@ -256,7 +94,8 @@ async def follow(_, info, what, slug):
elif what == "TOPIC": elif what == "TOPIC":
topic_follow(user, slug) topic_follow(user, slug)
elif what == "COMMUNITY": elif what == "COMMUNITY":
community_follow(user, slug) # community_follow(user, slug)
pass
elif what == "REACTIONS": elif what == "REACTIONS":
reactions_follow(user, slug) reactions_follow(user, slug)
except Exception as e: except Exception as e:
@ -276,7 +115,8 @@ async def unfollow(_, info, what, slug):
elif what == "TOPIC": elif what == "TOPIC":
topic_unfollow(user, slug) topic_unfollow(user, slug)
elif what == "COMMUNITY": elif what == "COMMUNITY":
community_unfollow(user, slug) # community_unfollow(user, slug)
pass
elif what == "REACTIONS": elif what == "REACTIONS":
reactions_unfollow(user, slug) reactions_unfollow(user, slug)
except Exception as e: except Exception as e:

View File

@ -110,18 +110,6 @@ input ProfileInput {
bio: String bio: String
} }
input CommunityInput {
title: String!
desc: String
pic: String
}
input CollectionInput {
title: String!
desc: String
pic: String
}
input TopicInput { input TopicInput {
slug: String! slug: String!
community: String! community: String!
@ -161,8 +149,8 @@ type Mutation {
updateChat(chat: ChatInput!): Result! updateChat(chat: ChatInput!): Result!
deleteChat(chatId: String!): Result! deleteChat(chatId: String!): Result!
inviteChat(chatId: String!, userslug: String!): Result! inviteChat(chatId: String!, userslug: String!): Result!
enterChat(chatId: String!): Result!
createMessage(chatId: String!, body: String!, replyTo: String): Result! createMessage(chat: String!, body: String!, replyTo: String): Result!
updateMessage(chatId: String!, id: Int!, body: String!): Result! updateMessage(chatId: String!, id: Int!, body: String!): Result!
deleteMessage(chatId: String!, id: Int!): Result! deleteMessage(chatId: String!, id: Int!): Result!
markAsRead(chatId: String!, ids: [Int]!): Result! markAsRead(chatId: String!, ids: [Int]!): Result!
@ -180,7 +168,7 @@ type Mutation {
# user profile # user profile
rateUser(slug: String!, value: Int!): Result! rateUser(slug: String!, value: Int!): Result!
# updateOnlineStatus: Result! updateOnlineStatus: Result!
updateProfile(profile: ProfileInput!): Result! updateProfile(profile: ProfileInput!): Result!
# topics # topics
@ -189,22 +177,11 @@ type Mutation {
updateTopic(input: TopicInput!): Result! updateTopic(input: TopicInput!): Result!
destroyTopic(slug: String!): Result! destroyTopic(slug: String!): Result!
# reactions # reactions
createReaction(reaction: ReactionInput!): Result! createReaction(reaction: ReactionInput!): Result!
updateReaction(reaction: ReactionInput!): Result! updateReaction(reaction: ReactionInput!): Result!
deleteReaction(id: Int!): Result! deleteReaction(id: Int!): Result!
# community
createCommunity(community: CommunityInput!): Result!
updateCommunity(community: CommunityInput!): Result!
deleteCommunity(slug: String!): Result!
# collection
createCollection(collection: CollectionInput!): Result!
updateCollection(collection: CollectionInput!): Result!
deleteCollection(slug: String!): Result!
# collab # collab
inviteAuthor(author: String!, shout: String!): Result! inviteAuthor(author: String!, shout: String!): Result!
removeAuthor(author: String!, shout: String!): Result! removeAuthor(author: String!, shout: String!): Result!
@ -212,65 +189,77 @@ type Mutation {
# following # following
follow(what: FollowingEntity!, slug: String!): Result! follow(what: FollowingEntity!, slug: String!): Result!
unfollow(what: FollowingEntity!, slug: String!): Result! unfollow(what: FollowingEntity!, slug: String!): Result!
# seen
incrementView(shout: String!): Result!
} }
input MessagesBy {
author: String
body: String
chat: String
order: String
days: Int
stat: String
}
input AuthorsBy {
lastSeen: DateTime
createdAt: DateTime
slug: String
name: String
topic: String
order: String
days: Int
stat: String
}
input ShoutsBy {
slug: String
title: String
body: String
topic: String
topics: [String]
author: String
authors: [String]
layout: String
visibility: String
order: String
days: Int
stat: String
}
input ReactionBy {
shout: String
shouts: [String]
body: String
topic: String
author: String
order: String
days: Int
stat: String
}
################################### Query ################################### Query
type Query { type Query {
# inbox # inbox
loadChats(offset: Int, amount: Int): Result! loadChats(limit: Int, offset: Int): Result! # your chats
loadMessages(chatId: String!, offset: Int, amount: Int): Result! loadMessagesBy(by: MessagesBy!, limit: Int, offset: Int): Result!
searchUsers(q: String!, offset: Int, amount: Int): Result! searchUsers(query: String!, limit: Int, offset: Int): Result!
searchChats(q: String!, offset: Int, amount: Int): Result! chatUsersAll: [ChatUser]!
searchMessages(q: String!, offset: Int, amount: Int): Result!
# auth # auth
isEmailUsed(email: String!): Boolean! isEmailUsed(email: String!): Boolean!
signIn(email: String!, password: String, lang: String): AuthResult! signIn(email: String!, password: String, lang: String): AuthResult!
signOut: AuthResult! signOut: AuthResult!
# profile # zine
getUsersBySlugs(slugs: [String]!): [Author]! loadAuthorsBy(by: AuthorsBy, limit: Int, offset: Int): [Author]!
loadShoutsBy(by: ShoutsBy, limit: Int, offset: Int): [Shout]!
loadReactionsBy(by: ReactionBy!, limit: Int, offset: Int): [Reaction]!
userFollowers(slug: String!): [Author]! userFollowers(slug: String!): [Author]!
userFollowedAuthors(slug: String!): [Author]! userFollowedAuthors(slug: String!): [Author]!
userFollowedTopics(slug: String!): [Topic]! userFollowedTopics(slug: String!): [Topic]!
userFollowedCommunities(slug: String!): [Community]!
userReactedShouts(slug: String!): [Shout]! # test
getUserRoles(slug: String!): [Role]!
authorsAll: [Author]! authorsAll: [Author]!
getAuthor(slug: String!): User! getAuthor(slug: String!): User!
# shouts
getShoutBySlug(slug: String!): Shout!
shoutsForFeed(offset: Int!, limit: Int!): [Shout]! # test
shoutsByLayout(layout: String, amount: Int!, offset: Int!): [Shout]!
shoutsByTopics(slugs: [String]!, offset: Int!, limit: Int!): [Shout]!
shoutsByAuthors(slugs: [String]!, offset: Int!, limit: Int!): [Shout]!
shoutsByCommunities(slugs: [String]!, offset: Int!, limit: Int!): [Shout]!
# topReacted(offset: Int!, limit: Int!): [Shout]!
topAuthors(offset: Int!, limit: Int!): [Author]! # by User.rating
topPublished(daysago: Int!, offset: Int!, limit: Int!): [Shout]!
topMonth(offset: Int!, limit: Int!): [Shout]! # TODO: implement topPublishedAfter(day, offset, limit)
topOverall(offset: Int!, limit: Int!): [Shout]!
topCommented(offset: Int!, limit: Int!): [Shout]!
recentPublished(offset: Int!, limit: Int!): [Shout]! # homepage
recentReacted(offset: Int!, limit: Int!): [Shout]! # TODO: use in design!
recentCommented(offset: Int!, limit: Int!): [Shout]!
recentAll(offset: Int!, limit: Int!): [Shout]!
recentCandidates(offset: Int!, limit: Int!): [Shout]!
# expo
topMonthLayoutShouts(layout: String!, amount: Int, offset: Int): [Shout]!
topLayoutShouts(layout: String!, amount: Int, offset: Int): [Shout]!
recentLayoutShouts(layout: String!, amount: Int, offset: Int): [Shout]!
# reactions
reactionsByAuthor(slug: String!, offset: Int!, limit: Int!): [Reaction]!
reactionsForShouts(shouts: [String]!, offset: Int!, limit: Int!): [Reaction]!
# collab # collab
getCollabs: [Collab]! getCollabs: [Collab]!
@ -283,18 +272,6 @@ type Query {
topicsRandom(amount: Int): [Topic]! topicsRandom(amount: Int): [Topic]!
topicsByCommunity(community: String!): [Topic]! topicsByCommunity(community: String!): [Topic]!
topicsByAuthor(author: String!): [Topic]! topicsByAuthor(author: String!): [Topic]!
# collections
collectionsAll: [Collection]!
getUserCollections(author: String!): [Collection]!
shoutsByCollection(collection: String!, offset: Int!, limit: Int!): [Shout]!
# communities
getCommunity(slug: String): Community!
getCommunities: [Community]! # all
# search
searchQuery(q: String, offset: Int!, limit: Int!): [Shout]
} }
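A client-side sketch of the reworked zine query, using the same gql transport the stats service below relies on; the endpoint URL is an assumption, and the Stat fields are inferred from get_shout_stat rather than from a schema hunk in this commit:
    from gql import Client, gql
    from gql.transport.aiohttp import AIOHTTPTransport

    API_URL = "https://example-api.discours.local/"  # assumed endpoint
    transport = AIOHTTPTransport(url=API_URL)
    client = Client(transport=transport, fetch_schema_from_transport=True)

    load_recent_shouts = gql("""
    query {
      loadShoutsBy(by: { layout: "image", visibility: "public", days: 30 }, limit: 20, offset: 0) {
        slug
        title
        publishedAt
        stat { viewed reacted commented rating }
      }
    }
    """)

    result = await client.execute_async(load_recent_shouts)
    shouts = result["loadShoutsBy"]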
############################################ Subscription ############################################ Subscription
@ -372,6 +349,14 @@ type User {
oid: String oid: String
} }
type ChatUser {
id: Int!
slug: String!
name: String!
userpic: String
lastSeen: DateTime
}
type Collab { type Collab {
authors: [String]! authors: [String]!
invites: [String] invites: [String]
@ -440,6 +425,7 @@ type Shout {
deletedBy: User deletedBy: User
publishedBy: User publishedBy: User
publishedAt: DateTime publishedAt: DateTime
media: String
stat: Stat stat: Stat
} }

View File

@ -1,7 +1,6 @@
import asyncio import asyncio
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
from base.orm import local_session
from orm.user import User from orm.user import User
@ -34,6 +33,11 @@ class UserStorage:
aaa.sort(key=lambda user: user.createdAt) aaa.sort(key=lambda user: user.createdAt)
return aaa return aaa
@staticmethod
async def get_all_chat_users():
with local_session() as session:
return session.query(User).where(User.emailConfirmed).all()
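The new chatUsersAll query from the schema hunk above can be served by this helper; the resolver below is an assumed wiring, not part of this commit:
    @query.field("chatUsersAll")
    async def chat_users_all(_, _info):
        # only users with a confirmed email are listed as chat users
        return await UserStorage.get_all_chat_users()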
@staticmethod @staticmethod
async def get_top_users(): async def get_top_users():
self = UserStorage self = UserStorage

View File

@ -1,4 +1,3 @@
from services.stat.viewed import ViewedStorage
from services.stat.reacted import ReactedStorage from services.stat.reacted import ReactedStorage
from services.auth.roles import RoleStorage from services.auth.roles import RoleStorage
from services.auth.users import UserStorage from services.auth.users import UserStorage
@ -10,7 +9,6 @@ from base.orm import local_session
async def storages_init(): async def storages_init():
with local_session() as session: with local_session() as session:
print('[main] initialize storages') print('[main] initialize storages')
ViewedStorage.init(session)
ReactedStorage.init(session) ReactedStorage.init(session)
RoleStorage.init(session) RoleStorage.init(session)
UserStorage.init(session) UserStorage.init(session)

View File

@ -9,6 +9,7 @@ class SearchService:
@staticmethod @staticmethod
async def init(session): async def init(session):
async with SearchService.lock: async with SearchService.lock:
print('[search.service] init')
SearchService.cache = {} SearchService.cache = {}
@staticmethod @staticmethod

View File

@ -2,6 +2,7 @@ import asyncio
from base.orm import local_session from base.orm import local_session
from orm.reaction import ReactionKind, Reaction from orm.reaction import ReactionKind, Reaction
from services.zine.topics import TopicStorage from services.zine.topics import TopicStorage
from services.stat.views import ViewStat
def kind_to_rate(kind) -> int: def kind_to_rate(kind) -> int:
@ -32,6 +33,15 @@ class ReactedStorage:
lock = asyncio.Lock() lock = asyncio.Lock()
modified_shouts = set([]) modified_shouts = set([])
@staticmethod
async def get_shout_stat(slug):
return {
"viewed": await ViewStat.get_shout(slug),
"reacted": len(await ReactedStorage.get_shout(slug)),
"commented": len(await ReactedStorage.get_comments(slug)),
"rating": await ReactedStorage.get_rating(slug),
}
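For reference, the aggregate is a plain dict keyed by counter name; the slug and values below are illustrative only:
    stat = await ReactedStorage.get_shout_stat("some-shout-slug")  # hypothetical slug
    # e.g. {"viewed": 120, "reacted": 14, "commented": 6, "rating": 9}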
@staticmethod @staticmethod
async def get_shout(shout_slug): async def get_shout(shout_slug):
self = ReactedStorage self = ReactedStorage
@ -158,22 +168,25 @@ class ReactedStorage:
self = ReactedStorage self = ReactedStorage
all_reactions = session.query(Reaction).all() all_reactions = session.query(Reaction).all()
self.modified_shouts = list(set([r.shout for r in all_reactions])) self.modified_shouts = list(set([r.shout for r in all_reactions]))
print("[stat.reacted] %d shouts with reactions loaded" % len(self.modified_shouts)) print("[stat.reacted] %d shouts with reactions" % len(self.modified_shouts))
@staticmethod @staticmethod
async def recount_changed(session): async def recount_changed(session):
self = ReactedStorage self = ReactedStorage
async with self.lock: async with self.lock:
print('[stat.reacted] recounting...') sss = list(self.modified_shouts)
for slug in list(self.modified_shouts): c = 0
for slug in sss:
siblings = session.query(Reaction).where(Reaction.shout == slug).all() siblings = session.query(Reaction).where(Reaction.shout == slug).all()
c += len(siblings)
await self.recount(siblings) await self.recount(siblings)
print("[stat.reacted] %d reactions total" % c)
print("[stat.reacted] %d shouts" % len(self.modified_shouts)) print("[stat.reacted] %d shouts" % len(self.modified_shouts))
print("[stat.reacted] %d topics" % len(self.reacted["topics"].values())) print("[stat.reacted] %d topics" % len(self.reacted["topics"].values()))
print("[stat.reacted] %d shouts" % len(self.reacted["shouts"])) print("[stat.reacted] %d shouts" % len(self.reacted["shouts"]))
print("[stat.reacted] %d authors" % len(self.reacted["authors"].values())) print("[stat.reacted] %d authors" % len(self.reacted["authors"].values()))
print("[stat.reacted] %d reactions" % len(self.reacted["reactions"])) print("[stat.reacted] %d reactions replied" % len(self.reacted["reactions"]))
self.modified_shouts = set([]) self.modified_shouts = set([])
@staticmethod @staticmethod

View File

@ -19,7 +19,7 @@ class TopicStat:
async def load_stat(session): async def load_stat(session):
self = TopicStat self = TopicStat
shout_topics = session.query(ShoutTopic).all() shout_topics = session.query(ShoutTopic).all()
print("[stat.topics] shouts linked %d times" % len(shout_topics)) print("[stat.topics] %d links for shouts" % len(shout_topics))
for shout_topic in shout_topics: for shout_topic in shout_topics:
tpc = shout_topic.topic tpc = shout_topic.topic
# shouts by topics # shouts by topics
@ -34,17 +34,14 @@ class TopicStat:
[aslug, acaption] = a [aslug, acaption] = a
self.authors_by_topic[tpc][aslug] = acaption self.authors_by_topic[tpc][aslug] = acaption
print("[stat.topics] shouts indexed by %d topics" % len(self.shouts_by_topic.keys()))
print("[stat.topics] authors indexed by %d topics" % len(self.authors_by_topic.keys()))
self.followers_by_topic = {} self.followers_by_topic = {}
followings = session.query(TopicFollower).all() followings = session.query(TopicFollower).all()
print("[stat.topics] %d followings by users" % len(followings))
for flw in followings: for flw in followings:
topic = flw.topic topic = flw.topic
userslug = flw.follower userslug = flw.follower
self.followers_by_topic[topic] = self.followers_by_topic.get(topic, dict()) self.followers_by_topic[topic] = self.followers_by_topic.get(topic, dict())
self.followers_by_topic[topic][userslug] = userslug self.followers_by_topic[topic][userslug] = userslug
print("[stat.topics] followers indexed by %d topics" % len(self.followers_by_topic.keys()))
@staticmethod @staticmethod
async def get_shouts(topic): async def get_shouts(topic):

View File

@ -1,110 +0,0 @@
import asyncio
from datetime import datetime
from base.orm import local_session
from sqlalchemy.orm.attributes import flag_modified
from orm.shout import ShoutTopic
from orm.viewed import ViewedByDay
class ViewedStorage:
viewed = {"shouts": {}, "topics": {}, "reactions": {}}
this_day_views = {}
to_flush = []
period = 30 * 60 # sec
lock = asyncio.Lock()
@staticmethod
def init(session):
self = ViewedStorage
views = session.query(ViewedByDay).all()
for view in views:
shout = view.shout
topics = (
session.query(ShoutTopic.topic).filter(ShoutTopic.shout == shout).all()
)
value = view.value
if shout:
old_value = self.viewed["shouts"].get(shout, 0)
self.viewed["shouts"][shout] = old_value + value
for t in topics:
old_topic_value = self.viewed["topics"].get(t, 0)
self.viewed["topics"][t] = old_topic_value + value
if shout not in self.this_day_views:
self.this_day_views[shout] = view
this_day_view = self.this_day_views[shout]
if this_day_view.day < view.day:
self.this_day_views[shout] = view
print("[stat.viewed] %d shouts viewed" % len(self.viewed['shouts']))
@staticmethod
async def get_shout(shout_slug):
self = ViewedStorage
async with self.lock:
return self.viewed["shouts"].get(shout_slug, 0)
@staticmethod
async def get_topic(topic_slug):
self = ViewedStorage
async with self.lock:
return self.viewed["topics"].get(topic_slug, 0)
@staticmethod
async def get_reaction(reaction_id):
self = ViewedStorage
async with self.lock:
return self.viewed["reactions"].get(reaction_id, 0)
@staticmethod
async def increment(shout_slug, amount=1):
self = ViewedStorage
async with self.lock:
this_day_view = self.this_day_views.get(shout_slug)
day_start = datetime.now().replace(hour=0, minute=0, second=0)
if not this_day_view or this_day_view.day < day_start:
if this_day_view and getattr(this_day_view, "modified", False):
self.to_flush.append(this_day_view)
this_day_view = ViewedByDay.create(shout=shout_slug, value=1)
self.this_day_views[shout_slug] = this_day_view
else:
this_day_view.value = this_day_view.value + amount
this_day_view.modified = True
self.viewed["shouts"][shout_slug] = (self.viewed["shouts"].get(shout_slug, 0) + amount)
with local_session() as session:
topics = (
session.query(ShoutTopic.topic)
.where(ShoutTopic.shout == shout_slug)
.all()
)
for t in topics:
self.viewed["topics"][t] = self.viewed["topics"].get(t, 0) + amount
flag_modified(this_day_view, "value")
@staticmethod
async def flush_changes(session):
self = ViewedStorage
async with self.lock:
for view in self.this_day_views.values():
if getattr(view, "modified", False):
session.add(view)
flag_modified(view, "value")
view.modified = False
for view in self.to_flush:
session.add(view)
self.to_flush.clear()
session.commit()
@staticmethod
async def worker():
while True:
try:
with local_session() as session:
await ViewedStorage.flush_changes(session)
print("[stat.viewed] periodical flush")
except Exception as err:
print("[stat.viewed] : %s" % (err))
await asyncio.sleep(ViewedStorage.period)

services/stat/views.py (new file, 127 lines)
View File

@ -0,0 +1,127 @@
import asyncio
import json
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from base.redis import redis
from services.zine.topics import TopicStorage
from ssl import create_default_context
query_ackee_views = gql(
"""
query getDomainsFacts {
domains {
statistics {
views {
id
count
}
pages {
id
count
created
}
}
facts {
activeVisitors
# averageViews
# averageDuration
viewsToday
viewsMonth
viewsYear
}
}
}
"""
)
ssl = create_default_context()
class ViewStat:
lock = asyncio.Lock()
by_slugs = {}
by_topics = {}
period = 5 * 60 # 5 minutes
transport = AIOHTTPTransport(url="https://ackee.discours.io/", ssl=ssl)
client = Client(transport=transport, fetch_schema_from_transport=True)
@staticmethod
async def load_views():
# TODO: revisit when the structure of the payload becomes transparent
# TODO: perhaps ackee token getting here
self = ViewStat
async with self.lock:
self.by_topics = await redis.execute("GET", "views_by_topics")
if self.by_topics:
self.by_topics = dict(json.loads(self.by_topics))
else:
self.by_topics = {}
self.by_slugs = await redis.execute("GET", "views_by_shouts")
if self.by_slugs:
self.by_slugs = dict(json.loads(self.by_slugs))
else:
self.by_slugs = {}
domains = await self.client.execute_async(query_ackee_views)
print("[stat.ackee] loaded domains")
print(domains)
print('\n\n# TODO: something here...\n\n')
@staticmethod
async def get_shout(shout_slug):
self = ViewStat
async with self.lock:
return self.by_slugs.get(shout_slug) or 0
@staticmethod
async def get_topic(topic_slug):
self = ViewStat
async with self.lock:
shouts = self.by_topics.get(topic_slug) or {}
topic_views = 0
for v in shouts.values():
topic_views += v
return topic_views
@staticmethod
async def increment(shout_slug, amount=1):
self = ViewStat
async with self.lock:
self.by_slugs[shout_slug] = self.by_slugs.get(shout_slug) or 0
self.by_slugs[shout_slug] += amount
await redis.execute(
"SET",
f"views_by_shouts/{shout_slug}",
str(self.by_slugs[shout_slug])
)
shout_topics = await TopicStorage.get_topics_by_slugs([shout_slug, ])
for t in shout_topics:
self.by_topics[t] = self.by_topics.get(t) or {}
self.by_topics[t][shout_slug] = self.by_topics[t].get(shout_slug) or 0
self.by_topics[t][shout_slug] += amount
await redis.execute(
"SET",
f"views_by_topics/{t}/{shout_slug}",
str(self.by_topics[t][shout_slug])
)
@staticmethod
async def reset():
self = ViewStat
self.by_topics = {}
self.by_slugs = {}
@staticmethod
async def worker():
self = ViewStat
while True:
try:
await self.load_views()
except Exception as err:
print("[stat.ackee] : %s" % (err))
print("[stat.ackee] renew period: %d minutes" % (ViewStat.period / 60))
await asyncio.sleep(self.period)
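The schema above adds incrementView(shout: String!): Result!; a sketch of how that mutation could be wired to ViewStat (the resolver and the exact Result payload are assumptions):
    @mutation.field("incrementView")
    async def increment_view(_, _info, shout):
        # bump the in-memory counter and mirror the value to redis
        await ViewStat.increment(shout, amount=1)
        return {"error": None}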

View File

@ -17,7 +17,10 @@ class ShoutAuthorStorage:
for sa in sas: for sa in sas:
self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, []) self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, [])
self.authors_by_shout[sa.shout].append([sa.user, sa.caption]) self.authors_by_shout[sa.shout].append([sa.user, sa.caption])
print("[zine.shouts] %d shouts indexed by authors" % len(self.authors_by_shout)) self.shouts_by_author[sa.user] = self.shouts_by_author.get(sa.user, [])
self.shouts_by_author[sa.user].append(sa.shout)
print("[zine.authors] %d shouts indexed by authors" % len(self.authors_by_shout))
print("[zine.authors] %d authors indexed by shouts" % len(self.shouts_by_author))
@staticmethod @staticmethod
async def get_authors(shout): async def get_authors(shout):
@ -42,7 +45,7 @@ class ShoutAuthorStorage:
with local_session() as session: with local_session() as session:
async with self.lock: async with self.lock:
await self.load(session) await self.load(session)
print("[zine.shouts] index by authors was updated") print("[zine.authors] index by authors was updated")
except Exception as err: except Exception as err:
print("[zine.shouts] error indexing by author: %s" % (err)) print("[zine.authors] error indexing by author: %s" % (err))
await asyncio.sleep(self.period) await asyncio.sleep(self.period)

View File

@ -1,285 +0,0 @@
import asyncio
from datetime import datetime, timedelta
from sqlalchemy import and_, desc, func, select
from sqlalchemy.orm import selectinload
from base.orm import local_session
from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout
from services.stat.reacted import ReactedStorage
async def get_shout_stat(slug):
return {
# TODO: use ackee as datasource
"viewed": 0, # await ViewedStorage.get_shout(slug),
"reacted": len(await ReactedStorage.get_shout(slug)),
"commented": len(await ReactedStorage.get_comments(slug)),
"rating": await ReactedStorage.get_rating(slug),
}
async def prepare_shouts(session, stmt):
shouts = []
print(stmt)
for s in list(map(lambda r: r.Shout, session.execute(stmt))):
s.stat = await get_shout_stat(s.slug)
shouts.append(s)
return shouts
LAYOUTS = ['audio', 'video', 'image', 'literature']
class ShoutsCache:
# limit = 200
period = 60 * 60 # 1 hour
lock = asyncio.Lock()
recent_published = []
recent_all = []
recent_reacted = []
recent_commented = []
top_month = []
top_overall = []
top_commented = []
by_author = {}
by_topic = {}
by_layout = {}
@staticmethod
async def prepare_recent_published():
with local_session() as session:
shouts = await prepare_shouts(
session,
(
select(Shout)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics)
)
.where(Shout.deletedAt.is_(None))
.filter(Shout.publishedAt.is_not(None))
.group_by(Shout.id)
.order_by(desc("publishedAt"))
# .limit(ShoutsCache.limit)
),
)
async with ShoutsCache.lock:
for s in shouts:
for a in s.authors:
ShoutsCache.by_author[a.slug] = ShoutsCache.by_author.get(a.slug, {})
ShoutsCache.by_author[a.slug][s.slug] = s
for t in s.topics:
ShoutsCache.by_topic[t.slug] = ShoutsCache.by_topic.get(t.slug, {})
ShoutsCache.by_topic[t.slug][s.slug] = s
if s.layout in LAYOUTS:
ShoutsCache.by_layout[s.layout] = ShoutsCache.by_layout.get(s.layout, [])
ShoutsCache.by_layout[s.layout].append(s)
print("[zine.cache] indexed by %d topics " % len(ShoutsCache.by_topic.keys()))
print("[zine.cache] indexed by %d authors " % len(ShoutsCache.by_author.keys()))
print("[zine.cache] indexed by %d layouts " % len(ShoutsCache.by_layout.keys()))
ShoutsCache.recent_published = shouts
print("[zine.cache] %d recently published shouts " % len(shouts))
@staticmethod
async def prepare_recent_all():
with local_session() as session:
shouts = await prepare_shouts(
session,
(
select(Shout)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics)
)
.where(Shout.deletedAt.is_(None))
.group_by(Shout.id)
.order_by(desc("createdAt"))
# .limit(ShoutsCache.limit)
)
)
async with ShoutsCache.lock:
ShoutsCache.recent_all = shouts
print("[zine.cache] %d recently created shouts " % len(ShoutsCache.recent_all))
@staticmethod
async def prepare_recent_reacted():
with local_session() as session:
reactions = session.query(Reaction).order_by(Reaction.createdAt).all()
# .limit(ShoutsCache.limit)
reacted_slugs = set([])
for r in reactions:
reacted_slugs.add(r.shout)
shouts = await prepare_shouts(
session,
(
select(
Shout,
Reaction.createdAt.label('reactedAt')
)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics),
selectinload(Shout.reactions),
)
.join(Reaction)
.where(and_(Shout.deletedAt.is_(None), Shout.slug.in_(reacted_slugs)))
.filter(Shout.publishedAt.is_not(None))
.group_by(Shout.id, "reactedAt")
.order_by(desc("reactedAt"))
# .limit(ShoutsCache.limit)
)
)
async with ShoutsCache.lock:
ShoutsCache.recent_reacted = shouts
print("[zine.cache] %d recently reacted shouts " % len(shouts))
@staticmethod
async def prepare_recent_commented():
with local_session() as session:
reactions = session.query(Reaction).order_by(Reaction.createdAt).all()
# .limit(ShoutsCache.limit)
commented_slugs = set([])
for r in reactions:
if r.body and len(r.body) > 0:
commented_slugs.add(r.shout)
shouts = await prepare_shouts(
session,
(
select(
Shout,
Reaction.createdAt.label('reactedAt')
)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics),
selectinload(Shout.reactions),
)
.join(Reaction)
.where(and_(Shout.deletedAt.is_(None), Shout.slug.in_(commented_slugs)))
.group_by(Shout.id, "reactedAt")
.order_by(desc("reactedAt"))
# .limit(ShoutsCache.limit)
)
)
async with ShoutsCache.lock:
ShoutsCache.recent_commented = shouts
print("[zine.cache] %d recently commented shouts " % len(shouts))
@staticmethod
async def prepare_top_overall():
with local_session() as session:
shouts = await prepare_shouts(
session,
(
select(
Shout,
func.sum(Reaction.id).label('reacted')
)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics),
selectinload(Shout.reactions),
)
.join(Reaction, Reaction.kind == ReactionKind.LIKE)
.where(Shout.deletedAt.is_(None))
.filter(Shout.publishedAt.is_not(None))
.group_by(Shout.id)
.order_by(desc("reacted"))
# .limit(ShoutsCache.limit)
),
)
shouts.sort(key=lambda s: s.stat["rating"], reverse=True)
async with ShoutsCache.lock:
print("[zine.cache] %d top rated published " % len(shouts))
ShoutsCache.top_overall = shouts
@staticmethod
async def prepare_top_month():
month_ago = datetime.now() - timedelta(days=30)
with local_session() as session:
shouts = await prepare_shouts(
session,
(
select(Shout)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics),
selectinload(Shout.reactions),
)
.join(Reaction)
.where(Shout.deletedAt.is_(None))
.filter(Shout.publishedAt > month_ago)
.group_by(Shout.id)
# .limit(ShoutsCache.limit)
),
)
shouts.sort(key=lambda s: s.stat["rating"], reverse=True)
async with ShoutsCache.lock:
ShoutsCache.top_month = shouts
print("[zine.cache] %d top month published " % len(ShoutsCache.top_month))
@staticmethod
async def prepare_top_commented():
month_ago = datetime.now() - timedelta(days=30)
with local_session() as session:
shouts = await prepare_shouts(
session,
(
select(
Shout,
func.sum(Reaction.id).label("commented")
)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics),
selectinload(Shout.reactions)
)
.join(Reaction, func.length(Reaction.body) > 0)
.where(Shout.deletedAt.is_(None))
.filter(Shout.publishedAt > month_ago)
.group_by(Shout.id)
.order_by(desc("commented"))
# .limit(ShoutsCache.limit)
),
)
shouts.sort(key=lambda s: s.stat["commented"], reverse=True)
async with ShoutsCache.lock:
ShoutsCache.top_commented = shouts
print("[zine.cache] %d last month top commented shouts " % len(ShoutsCache.top_commented))
@staticmethod
async def get_top_published_before(daysago, offset, limit):
shouts_by_rating = []
before = datetime.now() - timedelta(days=daysago)
for s in ShoutsCache.recent_published:
if s.publishedAt >= before:
shouts_by_rating.append(s)
shouts_by_rating.sort(key=lambda s: s.stat["rating"], reverse=True)
return shouts_by_rating
@staticmethod
async def get_all_authors_slugs():
slugs = ShoutsCache.by_author.keys()
return slugs
@staticmethod
async def worker():
while True:
try:
await ShoutsCache.prepare_top_month()
await ShoutsCache.prepare_top_overall()
await ShoutsCache.prepare_top_commented()
await ShoutsCache.prepare_recent_published()
await ShoutsCache.prepare_recent_all()
await ShoutsCache.prepare_recent_reacted()
await ShoutsCache.prepare_recent_commented()
print("[zine.cache] periodical update")
except Exception as err:
print("[zine.cache] error: %s" % (err))
raise err
await asyncio.sleep(ShoutsCache.period)

View File

@ -8,8 +8,8 @@ DB_URL = (
) )
JWT_ALGORITHM = "HS256" JWT_ALGORITHM = "HS256"
JWT_SECRET_KEY = environ.get("JWT_SECRET_KEY") or "8f1bd7696ffb482d8486dfbc6e7d16dd-secret-key" JWT_SECRET_KEY = environ.get("JWT_SECRET_KEY") or "8f1bd7696ffb482d8486dfbc6e7d16dd-secret-key"
SESSION_TOKEN_LIFE_SPAN = 24 * 60 * 60 # seconds SESSION_TOKEN_LIFE_SPAN = 30 * 24 * 60 * 60 # 1 month in seconds
ONETIME_TOKEN_LIFE_SPAN = 1 * 60 * 60 # seconds ONETIME_TOKEN_LIFE_SPAN = 24 * 60 * 60 # 1 day in seconds
REDIS_URL = environ.get("REDIS_URL") or "redis://127.0.0.1" REDIS_URL = environ.get("REDIS_URL") or "redis://127.0.0.1"
MAILGUN_API_KEY = environ.get("MAILGUN_API_KEY") MAILGUN_API_KEY = environ.get("MAILGUN_API_KEY")