Merge branch 'following' into 'main'
following-manager See merge request discoursio/discoursio-api!10
This commit is contained in:
commit
761d17e02e
|
@ -12,7 +12,7 @@ from orm.user import User, Role
|
|||
|
||||
from settings import SESSION_TOKEN_HEADER
|
||||
from auth.tokenstorage import SessionToken
|
||||
from base.exceptions import InvalidToken, OperationNotAllowed
|
||||
from base.exceptions import OperationNotAllowed
|
||||
|
||||
|
||||
class JWTAuthenticate(AuthenticationBackend):
|
||||
|
@ -30,11 +30,8 @@ class JWTAuthenticate(AuthenticationBackend):
|
|||
user_id=None
|
||||
)
|
||||
|
||||
try:
|
||||
if len(token.split('.')) > 1:
|
||||
payload = await SessionToken.verify(token)
|
||||
if payload is None:
|
||||
return AuthCredentials(scopes=[]), AuthUser(user_id=None)
|
||||
user = None
|
||||
with local_session() as session:
|
||||
try:
|
||||
|
@ -46,13 +43,8 @@ class JWTAuthenticate(AuthenticationBackend):
|
|||
User.id == payload.user_id
|
||||
).one()
|
||||
)
|
||||
except exc.NoResultFound:
|
||||
user = None
|
||||
|
||||
if not user:
|
||||
return AuthCredentials(scopes=[]), AuthUser(user_id=None)
|
||||
|
||||
scopes = {} # await user.get_permission()
|
||||
scopes = {} # TODO: integrate await user.get_permission()
|
||||
|
||||
return (
|
||||
AuthCredentials(
|
||||
|
@ -62,12 +54,10 @@ class JWTAuthenticate(AuthenticationBackend):
|
|||
),
|
||||
AuthUser(user_id=user.id),
|
||||
)
|
||||
else:
|
||||
InvalidToken("please try again")
|
||||
except Exception as e:
|
||||
print("[auth.authenticate] session token verify error")
|
||||
print(e)
|
||||
return AuthCredentials(scopes=[], error_message=str(e)), AuthUser(user_id=None)
|
||||
except exc.NoResultFound:
|
||||
pass
|
||||
|
||||
return AuthCredentials(scopes=[], error_message=str('Invalid token')), AuthUser(user_id=None)
|
||||
|
||||
|
||||
def login_required(func):
|
||||
|
|
|
@ -23,6 +23,7 @@ class JWTCodec:
|
|||
@staticmethod
|
||||
def decode(token: str, verify_exp: bool = True) -> TokenPayload:
|
||||
r = None
|
||||
payload = None
|
||||
try:
|
||||
payload = jwt.decode(
|
||||
token,
|
||||
|
|
|
@ -13,6 +13,7 @@ class DraftTopic(Base):
|
|||
id = None # type: ignore
|
||||
collab = Column(ForeignKey("draft_collab.id"), primary_key=True)
|
||||
topic = Column(ForeignKey("topic.id"), primary_key=True)
|
||||
main = Column(Boolean, default=False)
|
||||
|
||||
|
||||
class DraftAuthor(Base):
|
||||
|
|
40
orm/shout.py
40
orm/shout.py
|
@ -15,6 +15,7 @@ class ShoutTopic(Base):
|
|||
id = None # type: ignore
|
||||
shout = Column(ForeignKey("shout.id"), primary_key=True, index=True)
|
||||
topic = Column(ForeignKey("topic.id"), primary_key=True, index=True)
|
||||
main = Column(Boolean, default=False)
|
||||
|
||||
|
||||
class ShoutReactionsFollower(Base):
|
||||
|
@ -42,28 +43,33 @@ class ShoutAuthor(Base):
|
|||
class Shout(Base):
|
||||
__tablename__ = "shout"
|
||||
|
||||
slug = Column(String, unique=True)
|
||||
community = Column(ForeignKey("community.id"), default=1)
|
||||
lang = Column(String, nullable=False, default='ru', comment="Language")
|
||||
body = Column(String, nullable=False, comment="Body")
|
||||
title = Column(String, nullable=True)
|
||||
subtitle = Column(String, nullable=True)
|
||||
layout = Column(String, nullable=True)
|
||||
mainTopic = Column(ForeignKey("topic.slug"), nullable=True)
|
||||
cover = Column(String, nullable=True, comment="Cover")
|
||||
authors = relationship(lambda: User, secondary=ShoutAuthor.__tablename__)
|
||||
topics = relationship(lambda: Topic, secondary=ShoutTopic.__tablename__)
|
||||
reactions = relationship(lambda: Reaction)
|
||||
visibility = Column(String, nullable=True) # owner authors community public
|
||||
versionOf = Column(ForeignKey("shout.id"), nullable=True)
|
||||
oid = Column(String, nullable=True)
|
||||
media = Column(JSON, nullable=True)
|
||||
|
||||
# timestamps
|
||||
createdAt = Column(DateTime, nullable=False, default=datetime.now, comment="Created at")
|
||||
updatedAt = Column(DateTime, nullable=True, comment="Updated at")
|
||||
publishedAt = Column(DateTime, nullable=True)
|
||||
deletedAt = Column(DateTime, nullable=True)
|
||||
|
||||
# same with Draft
|
||||
slug = Column(String, unique=True)
|
||||
cover = Column(String, nullable=True, comment="Cover")
|
||||
body = Column(String, nullable=False, comment="Body")
|
||||
title = Column(String, nullable=True)
|
||||
subtitle = Column(String, nullable=True)
|
||||
layout = Column(String, nullable=True)
|
||||
media = Column(JSON, nullable=True)
|
||||
authors = relationship(lambda: User, secondary=ShoutAuthor.__tablename__)
|
||||
topics = relationship(lambda: Topic, secondary=ShoutTopic.__tablename__)
|
||||
|
||||
reactions = relationship(lambda: Reaction)
|
||||
|
||||
# TODO: these field should be used or modified
|
||||
community = Column(ForeignKey("community.id"), default=1)
|
||||
lang = Column(String, nullable=False, default='ru', comment="Language")
|
||||
mainTopic = Column(ForeignKey("topic.slug"), nullable=True)
|
||||
visibility = Column(String, nullable=True) # owner authors community public
|
||||
versionOf = Column(ForeignKey("shout.id"), nullable=True)
|
||||
oid = Column(String, nullable=True)
|
||||
|
||||
@staticmethod
|
||||
def init_table():
|
||||
with local_session() as session:
|
||||
|
|
|
@ -8,8 +8,8 @@ from resolvers.auth import (
|
|||
get_current_user,
|
||||
)
|
||||
|
||||
from resolvers.create.collab import load_drafts, create_draft, update_draft, delete_draft,\
|
||||
accept_coauthor, invite_coauthor
|
||||
from resolvers.create.drafts import load_drafts, create_draft, update_draft, delete_draft,\
|
||||
accept_coauthor, invite_coauthor, draft_to_shout
|
||||
from resolvers.create.migrate import markdown_body
|
||||
from resolvers.create.editor import create_shout, delete_shout, update_shout
|
||||
|
||||
|
@ -87,19 +87,18 @@ __all__ = [
|
|||
# zine.following
|
||||
"follow",
|
||||
"unfollow",
|
||||
# create.editor
|
||||
# create
|
||||
"create_shout",
|
||||
"update_shout",
|
||||
"delete_shout",
|
||||
# create.migrate
|
||||
"markdown_body",
|
||||
# create.collab
|
||||
"load_drafts",
|
||||
"create_draft",
|
||||
"update_draft",
|
||||
"delete_draft",
|
||||
"invite_coauthor",
|
||||
"accept_coauthor",
|
||||
"draft_to_shout",
|
||||
# zine.topics
|
||||
"topics_all",
|
||||
"topics_by_community",
|
||||
|
|
|
@ -6,7 +6,7 @@ from urllib.parse import quote_plus
|
|||
from graphql.type import GraphQLResolveInfo
|
||||
from starlette.responses import RedirectResponse
|
||||
from transliterate import translit
|
||||
|
||||
import re
|
||||
from auth.authenticate import login_required
|
||||
from auth.credentials import AuthCredentials
|
||||
from auth.email import send_auth_email
|
||||
|
@ -92,6 +92,7 @@ def create_user(user_dict):
|
|||
def generate_unique_slug(src):
|
||||
print('[resolvers.auth] generating slug from: ' + src)
|
||||
slug = translit(src, "ru", reversed=True).replace(".", "-").lower()
|
||||
slug = re.sub('[^0-9a-zA-Z]+', '-', slug)
|
||||
if slug != src:
|
||||
print('[resolvers.auth] translited name: ' + slug)
|
||||
c = 1
|
||||
|
|
|
@ -2,10 +2,13 @@ from auth.authenticate import login_required
|
|||
from auth.credentials import AuthCredentials
|
||||
from base.orm import local_session
|
||||
from base.resolvers import query, mutation
|
||||
from base.exceptions import ObjectNotExist, BaseHttpException
|
||||
from orm.draft import DraftCollab, DraftAuthor, DraftTopic
|
||||
from orm.draft import DraftCollab, DraftAuthor
|
||||
from orm.shout import Shout
|
||||
from orm.topic import Topic
|
||||
from orm.user import User
|
||||
from datetime import datetime, timezone
|
||||
from transliterate import translit
|
||||
import re
|
||||
|
||||
|
||||
@query.field("loadDrafts")
|
||||
|
@ -22,7 +25,7 @@ async def load_drafts(_, info):
|
|||
@login_required
|
||||
async def create_draft(_, info, draft_input):
|
||||
auth: AuthCredentials = info.context["request"].auth
|
||||
|
||||
draft_input['createdBy'] = auth.user_id
|
||||
with local_session() as session:
|
||||
collab = DraftCollab.create(**draft_input)
|
||||
session.add(collab)
|
||||
|
@ -32,24 +35,23 @@ async def create_draft(_, info, draft_input):
|
|||
return {}
|
||||
|
||||
|
||||
@mutation.field("deleteDraft") # TODO
|
||||
@mutation.field("deleteDraft")
|
||||
@login_required
|
||||
async def delete_draft(_, info, draft: int = 0):
|
||||
auth: AuthCredentials = info.context["request"].auth
|
||||
|
||||
with local_session() as session:
|
||||
collab = session.query(DraftCollab).where(DraftCollab.id == draft_input.id).one()
|
||||
if auth.user_id not in s.authors:
|
||||
d = session.query(DraftCollab).where(DraftCollab.id == draft).one()
|
||||
if auth.user_id not in d.authors:
|
||||
# raise BaseHttpException("only owner can remove coauthors")
|
||||
return {
|
||||
"error": "Only authors can update a draft"
|
||||
}
|
||||
elif not collab:
|
||||
elif not d:
|
||||
return {
|
||||
"error": "There is no draft with this id"
|
||||
}
|
||||
else:
|
||||
session.delete(collab)
|
||||
session.delete(d)
|
||||
session.commit()
|
||||
return {}
|
||||
|
||||
|
@ -60,24 +62,29 @@ async def update_draft(_, info, draft_input):
|
|||
auth: AuthCredentials = info.context["request"].auth
|
||||
|
||||
with local_session() as session:
|
||||
collab = session.query(DraftCollab).where(DraftCollab.id == draft_input.id).one() # raises Error when not found
|
||||
if auth.user_id not in s.authors:
|
||||
d = session.query(
|
||||
DraftCollab
|
||||
).where(
|
||||
DraftCollab.id == draft_input.id
|
||||
).one() # raises Error when not found
|
||||
if auth.user_id not in d.authors:
|
||||
# raise BaseHttpException("only owner can remove coauthors")
|
||||
return {
|
||||
"error": "Only authors can update draft"
|
||||
}
|
||||
elif not s:
|
||||
elif not d:
|
||||
return {
|
||||
"error": "There is no draft with this id"
|
||||
}
|
||||
else:
|
||||
draft_input["updatedAt"] = datetime.now(tz=timezone.utc)
|
||||
collab.update(draft_input)
|
||||
d.update(draft_input)
|
||||
session.commit()
|
||||
|
||||
# TODO: email notify
|
||||
return {}
|
||||
|
||||
|
||||
@mutation.field("inviteAuthor")
|
||||
@login_required
|
||||
async def invite_coauthor(_, info, author: int = 0, draft: int = 0):
|
||||
|
@ -108,20 +115,32 @@ async def invite_coauthor(_, info, author: int = 0, draft: int = 0):
|
|||
return {}
|
||||
|
||||
|
||||
def get_slug(src):
|
||||
slug = translit(src, "ru", reversed=True).replace(".", "-").lower()
|
||||
slug = re.sub('[^0-9a-zA-Z]+', '-', slug)
|
||||
return slug
|
||||
|
||||
|
||||
@mutation.field("inviteAccept")
|
||||
@login_required
|
||||
async def accept_coauthor(_, info, draft: int):
|
||||
auth: AuthCredentials = info.context["request"].auth
|
||||
|
||||
with local_session() as session:
|
||||
# c = session.query(DraftCollab).where(DraftCollab.id == draft).one()
|
||||
a = session.query(DraftAuthor).where(DraftAuthor.collab == draft).filter(DraftAuthor.author == auth.user_id).one()
|
||||
d = session.query(DraftCollab).where(DraftCollab.id == draft).one()
|
||||
if not d:
|
||||
return {
|
||||
"error": "Draft id was not found"
|
||||
}
|
||||
else:
|
||||
a = session.query(DraftAuthor).where(DraftAuthor.collab == draft).filter(
|
||||
DraftAuthor.author == auth.user_id).one()
|
||||
if not a.accepted:
|
||||
a.accepted = True
|
||||
session.commit()
|
||||
# TODO: email notify
|
||||
return {}
|
||||
elif a.accepted == True:
|
||||
elif a.accepted:
|
||||
return {
|
||||
"error": "You have accepted invite before"
|
||||
}
|
||||
|
@ -130,3 +149,41 @@ async def accept_coauthor(_, info, draft: int):
|
|||
return {
|
||||
"error": "You don't have an invitation yet"
|
||||
}
|
||||
|
||||
|
||||
@mutation.field("draftToShout")
|
||||
@login_required
|
||||
async def draft_to_shout(_, info, draft: int = 0):
|
||||
auth: AuthCredentials = info.context["request"].auth
|
||||
|
||||
with local_session() as session:
|
||||
d = session.query(DraftCollab).where(DraftCollab.id == draft).one()
|
||||
if auth.user_id not in d.authors:
|
||||
# raise BaseHttpException("you are not in authors list")
|
||||
return {
|
||||
"error": "You are not in authors list"
|
||||
}
|
||||
elif d.id:
|
||||
draft_authors = [a.author for a in d.authors]
|
||||
draft_topics = [t.topic for t in d.topics]
|
||||
authors = session.query(User).where(User.id._in(draft_authors)).all()
|
||||
topics = session.query(Topic).where(Topic.id._in(draft_topics)).all()
|
||||
new_shout = Shout.create({
|
||||
"authors": authors,
|
||||
"body": d.body,
|
||||
"title": d.title,
|
||||
"subtitle": d.subtitle or "",
|
||||
"topics": topics,
|
||||
"media": d.media,
|
||||
"slug": d.slug or get_slug(d.title),
|
||||
"layout": d.layout or "article"
|
||||
})
|
||||
session.add(new_shout)
|
||||
session.commit()
|
||||
else:
|
||||
return {
|
||||
"error": "Draft is not found"
|
||||
}
|
||||
|
||||
# TODO: email notify
|
||||
return {}
|
|
@ -12,9 +12,9 @@ from orm.topic import TopicFollower, Topic
|
|||
from orm.user import User
|
||||
from resolvers.zine.reactions import reactions_follow, reactions_unfollow
|
||||
from services.zine.gittask import GitTask
|
||||
from resolvers.inbox.chats import create_chat
|
||||
from services.inbox.storage import MessagesStorage
|
||||
from orm.draft import DraftCollab
|
||||
# from resolvers.inbox.chats import create_chat
|
||||
# from services.inbox.storage import MessagesStorage
|
||||
# from orm.draft import DraftCollab
|
||||
|
||||
|
||||
@mutation.field("createShout")
|
||||
|
|
|
@ -50,12 +50,14 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0):
|
|||
auth: AuthCredentials = info.context["request"].auth
|
||||
|
||||
cids = await redis.execute("SMEMBERS", "chats_by_user/" + str(auth.user_id))
|
||||
onliners = await redis.execute("SMEMBERS", "users-online")
|
||||
if cids:
|
||||
cids = list(cids)[offset:offset + limit]
|
||||
if not cids:
|
||||
print('[inbox.load] no chats were found')
|
||||
cids = []
|
||||
onliners = await redis.execute("SMEMBERS", "users-online")
|
||||
if not onliners:
|
||||
onliners = []
|
||||
chats = []
|
||||
for cid in cids:
|
||||
cid = cid.decode("utf-8")
|
||||
|
@ -124,8 +126,10 @@ async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0):
|
|||
async def load_recipients(_, info, limit=50, offset=0):
|
||||
chat_users = []
|
||||
auth: AuthCredentials = info.context["request"].auth
|
||||
try:
|
||||
onliners = await redis.execute("SMEMBERS", "users-online")
|
||||
if not onliners:
|
||||
onliners = []
|
||||
try:
|
||||
chat_users += await followed_authors(auth.user_id)
|
||||
limit = limit - len(chat_users)
|
||||
except Exception:
|
||||
|
|
|
@ -7,8 +7,7 @@ from auth.authenticate import login_required
|
|||
from auth.credentials import AuthCredentials
|
||||
from base.redis import redis
|
||||
from base.resolvers import mutation, subscription
|
||||
from services.inbox.helpers import ChatFollowing, MessageResult
|
||||
from services.inbox.storage import MessagesStorage
|
||||
from services.following import FollowingManager, FollowingResult, Following
|
||||
from validations.inbox import Message
|
||||
|
||||
|
||||
|
@ -51,8 +50,8 @@ async def create_message(_, info, chat: str, body: str, replyTo=None):
|
|||
"LPUSH", f"chats/{chat['id']}/unread/{user_slug}", str(message_id)
|
||||
)
|
||||
|
||||
result = MessageResult("NEW", new_message)
|
||||
await MessagesStorage.put(result)
|
||||
result = FollowingResult("NEW", 'chat', new_message)
|
||||
await FollowingManager.push('chat', result)
|
||||
|
||||
return {
|
||||
"message": new_message,
|
||||
|
@ -82,8 +81,8 @@ async def update_message(_, info, chat_id: str, message_id: int, body: str):
|
|||
|
||||
await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message))
|
||||
|
||||
result = MessageResult("UPDATED", message)
|
||||
await MessagesStorage.put(result)
|
||||
result = FollowingResult("UPDATED", 'chat', message)
|
||||
await FollowingManager.push('chat', result)
|
||||
|
||||
return {
|
||||
"message": message,
|
||||
|
@ -115,8 +114,8 @@ async def delete_message(_, info, chat_id: str, message_id: int):
|
|||
for user_id in users:
|
||||
await redis.execute("LREM", f"chats/{chat_id}/unread/{user_id}", 0, str(message_id))
|
||||
|
||||
result = MessageResult("DELETED", message)
|
||||
await MessagesStorage.put(result)
|
||||
result = FollowingResult("DELETED", 'chat', message)
|
||||
await FollowingManager.push(result)
|
||||
|
||||
return {}
|
||||
|
||||
|
@ -162,8 +161,8 @@ async def message_generator(_, info: GraphQLResolveInfo):
|
|||
user_following_chats_sorted = sorted(user_following_chats, key=lambda x: updated[x], reverse=True)
|
||||
|
||||
for chat_id in user_following_chats_sorted:
|
||||
following_chat = ChatFollowing(chat_id)
|
||||
await MessagesStorage.register_chat(following_chat)
|
||||
following_chat = Following('chat', chat_id)
|
||||
await FollowingManager.register('chat', following_chat)
|
||||
chat_task = following_chat.queue.get()
|
||||
tasks.append(chat_task)
|
||||
|
||||
|
@ -171,7 +170,7 @@ async def message_generator(_, info: GraphQLResolveInfo):
|
|||
msg = await asyncio.gather(*tasks)
|
||||
yield msg
|
||||
finally:
|
||||
await MessagesStorage.remove_chat(following_chat)
|
||||
await FollowingManager.remove('chat', following_chat)
|
||||
|
||||
|
||||
@subscription.field("newMessage")
|
||||
|
|
|
@ -1,11 +1,16 @@
|
|||
import asyncio
|
||||
from base.orm import local_session
|
||||
from base.resolvers import mutation, subscription
|
||||
from auth.authenticate import login_required
|
||||
from auth.credentials import AuthCredentials
|
||||
from base.resolvers import mutation, subscription
|
||||
# from resolvers.community import community_follow, community_unfollow
|
||||
from orm.user import AuthorFollower
|
||||
from orm.topic import TopicFollower
|
||||
from orm.shout import ShoutReactionsFollower
|
||||
from resolvers.zine.profile import author_follow, author_unfollow
|
||||
from resolvers.zine.reactions import reactions_follow, reactions_unfollow
|
||||
from resolvers.zine.topics import topic_follow, topic_unfollow
|
||||
import asyncio
|
||||
from services.following import Following, FollowingManager, FollowingResult
|
||||
from graphql.type import GraphQLResolveInfo
|
||||
|
||||
|
||||
|
@ -16,15 +21,23 @@ async def follow(_, info, what, slug):
|
|||
|
||||
try:
|
||||
if what == "AUTHOR":
|
||||
author_follow(auth.user_id, slug)
|
||||
if author_follow(auth.user_id, slug):
|
||||
result = FollowingResult("NEW", 'author', slug)
|
||||
await FollowingManager.push('author', result)
|
||||
elif what == "TOPIC":
|
||||
topic_follow(auth.user_id, slug)
|
||||
if topic_follow(auth.user_id, slug):
|
||||
result = FollowingResult("NEW", 'topic', slug)
|
||||
await FollowingManager.push('topic', result)
|
||||
elif what == "COMMUNITY":
|
||||
# community_follow(user, slug)
|
||||
pass
|
||||
if False: # TODO: use community_follow(auth.user_id, slug):
|
||||
result = FollowingResult("NEW", 'community', slug)
|
||||
await FollowingManager.push('community', result)
|
||||
elif what == "REACTIONS":
|
||||
reactions_follow(auth.user_id, slug)
|
||||
if reactions_follow(auth.user_id, slug):
|
||||
result = FollowingResult("NEW", 'shout', slug)
|
||||
await FollowingManager.push('shout', result)
|
||||
except Exception as e:
|
||||
print(Exception(e))
|
||||
return {"error": str(e)}
|
||||
|
||||
return {}
|
||||
|
@ -37,20 +50,28 @@ async def unfollow(_, info, what, slug):
|
|||
|
||||
try:
|
||||
if what == "AUTHOR":
|
||||
author_unfollow(auth.user_id, slug)
|
||||
if author_unfollow(auth.user_id, slug):
|
||||
result = FollowingResult("DELETED", 'author', slug)
|
||||
await FollowingManager.push('author', result)
|
||||
elif what == "TOPIC":
|
||||
topic_unfollow(auth.user_id, slug)
|
||||
if topic_unfollow(auth.user_id, slug):
|
||||
result = FollowingResult("DELETED", 'topic', slug)
|
||||
await FollowingManager.push('topic', result)
|
||||
elif what == "COMMUNITY":
|
||||
# community_unfollow(user, slug)
|
||||
pass
|
||||
if False: # TODO: use community_unfollow(auth.user_id, slug):
|
||||
result = FollowingResult("DELETED", 'community', slug)
|
||||
await FollowingManager.push('community', result)
|
||||
elif what == "REACTIONS":
|
||||
reactions_unfollow(auth.user_id, slug)
|
||||
if reactions_unfollow(auth.user_id, slug):
|
||||
result = FollowingResult("DELETED", 'shout', slug)
|
||||
await FollowingManager.push('shout', result)
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
return {}
|
||||
|
||||
|
||||
# by author and by topic
|
||||
@subscription.source("newShout")
|
||||
@login_required
|
||||
async def shout_generator(_, info: GraphQLResolveInfo):
|
||||
|
@ -60,7 +81,37 @@ async def shout_generator(_, info: GraphQLResolveInfo):
|
|||
try:
|
||||
tasks = []
|
||||
|
||||
# TODO: implement when noticing new shout
|
||||
with local_session() as session:
|
||||
|
||||
# notify new shout by followed authors
|
||||
following_topics = session.query(TopicFollower).where(TopicFollower.follower == user_id).all()
|
||||
|
||||
for topic_id in following_topics:
|
||||
following_topic = Following('topic', topic_id)
|
||||
await FollowingManager.register('topic', following_topic)
|
||||
following_topic_task = following_topic.queue.get()
|
||||
tasks.append(following_topic_task)
|
||||
|
||||
# by followed topics
|
||||
following_authors = session.query(AuthorFollower).where(
|
||||
AuthorFollower.follower == user_id).all()
|
||||
|
||||
for author_id in following_authors:
|
||||
following_author = Following('author', author_id)
|
||||
await FollowingManager.register('author', following_author)
|
||||
following_author_task = following_author.queue.get()
|
||||
tasks.append(following_author_task)
|
||||
|
||||
# TODO: use communities
|
||||
# by followed communities
|
||||
# following_communities = session.query(CommunityFollower).where(
|
||||
# CommunityFollower.follower == user_id).all()
|
||||
|
||||
# for community_id in following_communities:
|
||||
# following_community = Following('community', author_id)
|
||||
# await FollowingManager.register('community', following_community)
|
||||
# following_community_task = following_community.queue.get()
|
||||
# tasks.append(following_community_task)
|
||||
|
||||
while True:
|
||||
shout = await asyncio.gather(*tasks)
|
||||
|
@ -76,9 +127,18 @@ async def reaction_generator(_, info):
|
|||
auth: AuthCredentials = info.context["request"].auth
|
||||
user_id = auth.user_id
|
||||
try:
|
||||
tasks = []
|
||||
with local_session() as session:
|
||||
followings = session.query(ShoutReactionsFollower.shout).where(
|
||||
ShoutReactionsFollower.follower == user_id).unique()
|
||||
|
||||
# TODO: implement when noticing new reaction
|
||||
# notify new reaction
|
||||
|
||||
tasks = []
|
||||
for shout_id in followings:
|
||||
following_shout = Following('shout', shout_id)
|
||||
await FollowingManager.register('shout', following_shout)
|
||||
following_author_task = following_shout.queue.get()
|
||||
tasks.append(following_author_task)
|
||||
|
||||
while True:
|
||||
reaction = await asyncio.gather(*tasks)
|
||||
|
|
|
@ -3,13 +3,15 @@ from datetime import datetime, timedelta, timezone
|
|||
from sqlalchemy.orm import joinedload, aliased
|
||||
from sqlalchemy.sql.expression import desc, asc, select, func, case
|
||||
|
||||
from auth.authenticate import login_required
|
||||
from auth.credentials import AuthCredentials
|
||||
from base.exceptions import ObjectNotExist
|
||||
from base.orm import local_session
|
||||
from base.resolvers import query
|
||||
from orm import ViewedEntry
|
||||
from orm import ViewedEntry, TopicFollower
|
||||
from orm.reaction import Reaction, ReactionKind
|
||||
from orm.shout import Shout, ShoutAuthor
|
||||
from orm.shout import Shout, ShoutAuthor, ShoutTopic
|
||||
from orm.user import AuthorFollower
|
||||
|
||||
|
||||
def add_stat_columns(q):
|
||||
|
@ -193,3 +195,56 @@ async def load_shouts_by(_, info, options):
|
|||
shouts_map[shout_id].stat['viewed'] = viewed_stat
|
||||
|
||||
return shouts
|
||||
|
||||
|
||||
@query.field("myFeed")
|
||||
@login_required
|
||||
async def get_my_feed(_, info, options):
|
||||
auth: AuthCredentials = info.context["request"].auth
|
||||
user_id = auth.user_id
|
||||
|
||||
q = select(Shout).options(
|
||||
joinedload(Shout.authors),
|
||||
joinedload(Shout.topics),
|
||||
).where(
|
||||
Shout.deletedAt.is_(None)
|
||||
)
|
||||
|
||||
q = q.join(
|
||||
ShoutAuthor
|
||||
).join(
|
||||
AuthorFollower
|
||||
).where(
|
||||
AuthorFollower.follower == user_id
|
||||
).join(
|
||||
ShoutTopic
|
||||
).join(
|
||||
TopicFollower
|
||||
).where(TopicFollower.follower == user_id)
|
||||
|
||||
q = add_stat_columns(q)
|
||||
q = apply_filters(q, options.get("filters", {}), user_id)
|
||||
|
||||
order_by = options.get("order_by", Shout.createdAt)
|
||||
if order_by == 'reacted':
|
||||
aliased_reaction = aliased(Reaction)
|
||||
q.outerjoin(aliased_reaction).add_columns(func.max(aliased_reaction.createdAt).label('reacted'))
|
||||
|
||||
query_order_by = desc(order_by) if options.get('order_by_desc', True) else asc(order_by)
|
||||
offset = options.get("offset", 0)
|
||||
limit = options.get("limit", 10)
|
||||
|
||||
q = q.group_by(Shout.id).order_by(query_order_by).limit(limit).offset(offset)
|
||||
|
||||
shouts = []
|
||||
with local_session() as session:
|
||||
for [shout, reacted_stat, commented_stat, rating_stat] in session.execute(q).unique():
|
||||
shouts.append(shout)
|
||||
shout.stat = {
|
||||
"viewed": 0,
|
||||
"reacted": reacted_stat,
|
||||
"commented": commented_stat,
|
||||
"rating": rating_stat
|
||||
}
|
||||
|
||||
return shouts
|
||||
|
|
|
@ -198,11 +198,15 @@ async def rate_user(_, info, rated_userslug, value):
|
|||
|
||||
# for mutation.field("follow")
|
||||
def author_follow(user_id, slug):
|
||||
try:
|
||||
with local_session() as session:
|
||||
author = session.query(User).where(User.slug == slug).one()
|
||||
af = AuthorFollower.create(follower=user_id, author=author.id)
|
||||
session.add(af)
|
||||
session.commit()
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
# for mutation.field("unfollow")
|
||||
|
@ -217,14 +221,11 @@ def author_unfollow(user_id, slug):
|
|||
)
|
||||
).first()
|
||||
)
|
||||
if not flw:
|
||||
return {
|
||||
"error": "Follower is not exist, cant unfollow"
|
||||
}
|
||||
else:
|
||||
if flw:
|
||||
session.delete(flw)
|
||||
session.commit()
|
||||
return {}
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@query.field("authorsAll")
|
||||
|
|
|
@ -40,6 +40,7 @@ def add_reaction_stat_columns(q):
|
|||
|
||||
|
||||
def reactions_follow(user_id, shout_id: int, auto=False):
|
||||
try:
|
||||
with local_session() as session:
|
||||
shout = session.query(Shout).where(Shout.id == shout_id).one()
|
||||
|
||||
|
@ -58,9 +59,13 @@ def reactions_follow(user_id, shout_id: int, auto=False):
|
|||
)
|
||||
session.add(following)
|
||||
session.commit()
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
def reactions_unfollow(user_id: int, shout_id: int):
|
||||
try:
|
||||
with local_session() as session:
|
||||
shout = session.query(Shout).where(Shout.id == shout_id).one()
|
||||
|
||||
|
@ -74,6 +79,10 @@ def reactions_unfollow(user_id: int, shout_id: int):
|
|||
if following:
|
||||
session.delete(following)
|
||||
session.commit()
|
||||
return True
|
||||
except:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def is_published_author(session, user_id):
|
||||
|
|
|
@ -117,15 +117,20 @@ async def update_topic(_, _info, inp):
|
|||
|
||||
|
||||
def topic_follow(user_id, slug):
|
||||
try:
|
||||
with local_session() as session:
|
||||
topic = session.query(Topic).where(Topic.slug == slug).one()
|
||||
|
||||
following = TopicFollower.create(topic=topic.id, follower=user_id)
|
||||
session.add(following)
|
||||
session.commit()
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
def topic_unfollow(user_id, slug):
|
||||
try:
|
||||
with local_session() as session:
|
||||
sub = (
|
||||
session.query(TopicFollower).join(Topic).filter(
|
||||
|
@ -135,11 +140,13 @@ def topic_unfollow(user_id, slug):
|
|||
)
|
||||
).first()
|
||||
)
|
||||
if not sub:
|
||||
raise Exception("[resolvers.topics] follower not exist")
|
||||
else:
|
||||
if sub:
|
||||
session.delete(sub)
|
||||
session.commit()
|
||||
return True
|
||||
except:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
@query.field("topicsRandom")
|
||||
|
|
|
@ -69,7 +69,6 @@ type Result {
|
|||
members: [ChatMember]
|
||||
shout: Shout
|
||||
shouts: [Shout]
|
||||
drafts: [DraftCollab]
|
||||
author: Author
|
||||
authors: [Author]
|
||||
reaction: Reaction
|
||||
|
@ -78,6 +77,8 @@ type Result {
|
|||
topics: [Topic]
|
||||
community: Community
|
||||
communities: [Community]
|
||||
draft: DraftCollab
|
||||
drafts: [DraftCollab]
|
||||
}
|
||||
|
||||
enum ReactionStatus {
|
||||
|
@ -207,6 +208,7 @@ type Mutation {
|
|||
deleteDraft(draft: Int!): Result!
|
||||
inviteAccept(draft: Int!): Result!
|
||||
inviteAuthor(draft: Int!, author: Int!): Result!
|
||||
draftToShout(draft: Int!): Result!
|
||||
|
||||
# following
|
||||
follow(what: FollowingEntity!, slug: String!): Result!
|
||||
|
@ -302,6 +304,7 @@ type Query {
|
|||
userFollowedTopics(slug: String!): [Topic]!
|
||||
authorsAll: [Author]!
|
||||
getAuthor(slug: String!): User
|
||||
myFeed(options: LoadShoutsOptions): [Shout]
|
||||
|
||||
# draft/collab
|
||||
loadDrafts: [DraftCollab]!
|
||||
|
|
51
services/following.py
Normal file
51
services/following.py
Normal file
|
@ -0,0 +1,51 @@
|
|||
import asyncio
|
||||
|
||||
|
||||
class FollowingResult:
|
||||
def __init__(self, event, kind, payload):
|
||||
self.event = event
|
||||
self.kind = kind
|
||||
self.payload = payload
|
||||
|
||||
|
||||
class Following:
|
||||
queue = asyncio.Queue()
|
||||
|
||||
def __init__(self, kind, uid):
|
||||
self.kind = kind # author topic shout chat
|
||||
self.uid = uid
|
||||
|
||||
|
||||
class FollowingManager:
|
||||
lock = asyncio.Lock()
|
||||
data = {
|
||||
'author': [],
|
||||
'topic': [],
|
||||
'shout': [],
|
||||
'chat': []
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
async def register(kind, uid):
|
||||
async with FollowingManager.lock:
|
||||
FollowingManager[kind].append(uid)
|
||||
|
||||
@staticmethod
|
||||
async def remove(kind, uid):
|
||||
async with FollowingManager.lock:
|
||||
FollowingManager[kind].remove(uid)
|
||||
|
||||
@staticmethod
|
||||
async def push(kind, payload):
|
||||
try:
|
||||
async with FollowingManager.lock:
|
||||
if kind == 'chat':
|
||||
for chat in FollowingManager['chat']:
|
||||
if payload.message["chatId"] == chat.uid:
|
||||
chat.queue.put_nowait(payload)
|
||||
else:
|
||||
for entity in FollowingManager[kind]:
|
||||
if payload.shout['createdBy'] == entity.uid:
|
||||
entity.queue.put_nowait(payload)
|
||||
except Exception as e:
|
||||
print(Exception(e))
|
|
@ -1,14 +0,0 @@
|
|||
import asyncio
|
||||
|
||||
|
||||
class MessageResult:
|
||||
def __init__(self, status, message):
|
||||
self.seen = status
|
||||
self.message = message
|
||||
|
||||
|
||||
class ChatFollowing:
|
||||
queue = asyncio.Queue()
|
||||
|
||||
def __init__(self, chat_id):
|
||||
self.chat_id = chat_id
|
|
@ -1,23 +0,0 @@
|
|||
import asyncio
|
||||
|
||||
|
||||
class MessagesStorage:
|
||||
lock = asyncio.Lock()
|
||||
chats = []
|
||||
|
||||
@staticmethod
|
||||
async def register_chat(chat):
|
||||
async with MessagesStorage.lock:
|
||||
MessagesStorage.chats.append(chat)
|
||||
|
||||
@staticmethod
|
||||
async def remove_chat(chat):
|
||||
async with MessagesStorage.lock:
|
||||
MessagesStorage.chats.remove(chat)
|
||||
|
||||
@staticmethod
|
||||
async def put(message_result):
|
||||
async with MessagesStorage.lock:
|
||||
for chat in MessagesStorage.chats:
|
||||
if message_result.message["chatId"] == chat.chat_id:
|
||||
chat.queue.put_nowait(message_result)
|
Loading…
Reference in New Issue
Block a user