views from ackee

This commit is contained in:
tonyrewin 2022-11-15 12:25:04 +03:00
parent 9942fc2558
commit efc3531c33
12 changed files with 126 additions and 196 deletions

View File

@ -1,45 +0,0 @@
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
# Provide a GraphQL query
query_ackee_views = gql(
"""
query getDomainsFacts {
domains {
statistics {
views {
id
count
}
pages {
id
count
created
}
}
facts {
activeVisitors
# averageViews
# averageDuration
viewsToday
viewsMonth
viewsYear
}
}
}
"""
)
class GraphQLClient:
# Select your transport with a defined url endpoint
transport = AIOHTTPTransport(url="https://ackee.discours.io/")
# Create a GraphQL client using the defined transport
client = Client(transport=transport, fetch_schema_from_transport=True)
@staticmethod
def get_views_by_slug(slug):
# Execute the query on the transport
domains = GraphQLClient.client.execute(query_ackee_views)
print(domains)

View File

@ -17,7 +17,7 @@ from resolvers.auth import confirm_email_handler
from services.main import storages_init
from services.stat.reacted import ReactedStorage
from services.stat.topicstat import TopicStat
from services.stat.viewed import ViewedStorage
from services.stat.views import Stat
from services.zine.gittask import GitTask
from services.zine.shoutauthor import ShoutAuthorStorage
import_module("resolvers")
@ -31,8 +31,8 @@ middleware = [
async def start_up():
await redis.connect()
viewed_storage_task = asyncio.create_task(ViewedStorage.worker())
print(viewed_storage_task)
views_stat_task = asyncio.create_task(Stat.worker())
print(views_stat_task)
reacted_storage_task = asyncio.create_task(ReactedStorage.worker())
print(reacted_storage_task)
shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker())

View File

@ -12,7 +12,7 @@ from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower
from orm.user import User
from orm.topic import TopicFollower
from services.stat.reacted import ReactedStorage
from services.stat.viewed import ViewedStorage
from services.stat.views import Stat
OLD_DATE = "2016-03-05 22:22:00.350000"
ts = datetime.now()
@ -340,7 +340,7 @@ async def migrate(entry, storage):
raise Exception("[migration] content_item.ratings error: \n%r" % content_rating)
# shout views
await ViewedStorage.increment(shout_dict["slug"], amount=entry.get("views", 1))
await Stat.increment(shout_dict["slug"], amount=entry.get("views", 1))
# del shout_dict['ratings']
shout_dict["oid"] = entry.get("_id")
storage["shouts"]["by_oid"][entry["_id"]] = shout_dict

View File

@ -6,7 +6,6 @@ from orm.reaction import Reaction
from orm.shout import Shout
from orm.topic import Topic, TopicFollower
from orm.user import User, UserRating
from orm.viewed import ViewedByDay
__all__ = [
"User",
@ -19,8 +18,7 @@ __all__ = [
"TopicFollower",
"Notification",
"Reaction",
"UserRating",
"ViewedByDay"
"UserRating"
]
Base.metadata.create_all(engine)

View File

@ -1,12 +0,0 @@
from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey, Integer
from base.orm import Base
class ViewedByDay(Base):
__tablename__ = "viewed_by_day"
id = None
shout = Column(ForeignKey("shout.slug"), primary_key=True)
day = Column(DateTime, primary_key=True, default=datetime.now)
value = Column(Integer)

View File

@ -13,7 +13,7 @@ from services.stat.reacted import ReactedStorage
async def get_reaction_stat(reaction_id):
return {
# "viewed": await ViewedStorage.get_reaction(reaction_id),
# "viewed": await Stat.get_reaction(reaction_id),
"reacted": len(await ReactedStorage.get_reaction(reaction_id)),
"rating": await ReactedStorage.get_reaction_rating(reaction_id),
"commented": len(await ReactedStorage.get_reaction_comments(reaction_id)),

View File

@ -9,7 +9,7 @@ from orm.topic import Topic, TopicFollower
from services.zine.topics import TopicStorage
from services.stat.reacted import ReactedStorage
from services.stat.topicstat import TopicStat
# from services.stat.viewed import ViewedStorage
from services.stat.views import Stat
async def get_topic_stat(slug):
@ -17,7 +17,7 @@ async def get_topic_stat(slug):
"shouts": len(TopicStat.shouts_by_topic.get(slug, {}).keys()),
"authors": len(TopicStat.authors_by_topic.get(slug, {}).keys()),
"followers": len(TopicStat.followers_by_topic.get(slug, {}).keys()),
# "viewed": await ViewedStorage.get_topic(slug),
"viewed": await Stat.get_topic(slug),
"reacted": len(await ReactedStorage.get_topic(slug)),
"commented": len(await ReactedStorage.get_topic_comments(slug)),
"rating": await ReactedStorage.get_topic_rating(slug)

View File

@ -191,49 +191,49 @@ type Mutation {
unfollow(what: FollowingEntity!, slug: String!): Result!
}
input MessagesBy {
interface By {
order: String
days: Int
stat: String
}
input MessagesBy implements By {
author: String
body: String
chat: String
days: Int
}
input AuthorsBy {
input AuthorsBy implements By{
lastSeen: DateTime
createdAt: DateTime
stat: String
slug: String
name: String
topic: String
}
input ShoutsBy {
input ShoutsBy implements By {
slug: String
title: String
body: String
topic: String
author: String
days: Int
layout: String
published: Boolean
visibility: String
stat: String
}
input ReactionBy {
input ReactionBy implements By {
shout: String
body: String
topic: String
author: String
days: Int
stat: String
}
################################### Query
type Query {
# inbox
loadChats(offset: Int, amount: Int): Result! # your chats
loadMessagesBy(by: MessagesBy!, amount: Int, offset: Int): Result!
loadMessagesBy(by: By & MessagesBy!, amount: Int, offset: Int): Result!
searchUsers(query: String!, amount: Int, offset: Int): Result!
# auth
@ -242,9 +242,9 @@ type Query {
signOut: AuthResult!
# zine
loadAuthorsBy(by: AuthorsBy, amount: Int, offset: Int): [Author]!
loadShoutsBy(by: ShoutsBy, amount: Int, offset: Int): [Shout]!
loadReactionsBy(by: ReactionBy!, amount: Int, limit: Int): [Reaction]!
loadAuthorsBy(by: By & AuthorsBy, amount: Int, offset: Int): [Author]!
loadShoutsBy(by: By & ShoutsBy, amount: Int, offset: Int): [Shout]!
loadReactionsBy(by: By & ReactionBy!, amount: Int, limit: Int): [Reaction]!
userFollowers(slug: String!): [Author]!
userFollowedAuthors(slug: String!): [Author]!
userFollowedTopics(slug: String!): [Topic]!

View File

@ -1,4 +1,4 @@
from services.stat.viewed import ViewedStorage
from services.stat.views import Stat
from services.stat.reacted import ReactedStorage
from services.auth.roles import RoleStorage
from services.auth.users import UserStorage
@ -10,7 +10,7 @@ from base.orm import local_session
async def storages_init():
with local_session() as session:
print('[main] initialize storages')
ViewedStorage.init(session)
await Stat.update()
ReactedStorage.init(session)
RoleStorage.init(session)
UserStorage.init(session)

View File

@ -2,6 +2,7 @@ import asyncio
from base.orm import local_session
from orm.reaction import ReactionKind, Reaction
from services.zine.topics import TopicStorage
from services.stat.views import Stat
def kind_to_rate(kind) -> int:
@ -35,8 +36,7 @@ class ReactedStorage:
@staticmethod
async def get_shout_stat(slug):
return {
# TODO: use ackee as datasource
"viewed": 0, # await ViewedStorage.get_shout(slug),
"viewed": await Stat.get_shout(slug),
"reacted": len(await ReactedStorage.get_shout(slug)),
"commented": len(await ReactedStorage.get_comments(slug)),
"rating": await ReactedStorage.get_rating(slug),

View File

@ -1,110 +0,0 @@
import asyncio
from datetime import datetime
from base.orm import local_session
from sqlalchemy.orm.attributes import flag_modified
from orm.shout import ShoutTopic
from orm.viewed import ViewedByDay
class ViewedStorage:
viewed = {"shouts": {}, "topics": {}, "reactions": {}}
this_day_views = {}
to_flush = []
period = 30 * 60 # sec
lock = asyncio.Lock()
@staticmethod
def init(session):
self = ViewedStorage
views = session.query(ViewedByDay).all()
for view in views:
shout = view.shout
topics = (
session.query(ShoutTopic.topic).filter(ShoutTopic.shout == shout).all()
)
value = view.value
if shout:
old_value = self.viewed["shouts"].get(shout, 0)
self.viewed["shouts"][shout] = old_value + value
for t in topics:
old_topic_value = self.viewed["topics"].get(t, 0)
self.viewed["topics"][t] = old_topic_value + value
if shout not in self.this_day_views:
self.this_day_views[shout] = view
this_day_view = self.this_day_views[shout]
if this_day_view.day < view.day:
self.this_day_views[shout] = view
print("[stat.viewed] %d shouts viewed" % len(self.viewed['shouts']))
@staticmethod
async def get_shout(shout_slug):
self = ViewedStorage
async with self.lock:
return self.viewed["shouts"].get(shout_slug, 0)
@staticmethod
async def get_topic(topic_slug):
self = ViewedStorage
async with self.lock:
return self.viewed["topics"].get(topic_slug, 0)
@staticmethod
async def get_reaction(reaction_id):
self = ViewedStorage
async with self.lock:
return self.viewed["reactions"].get(reaction_id, 0)
@staticmethod
async def increment(shout_slug, amount=1):
self = ViewedStorage
async with self.lock:
this_day_view = self.this_day_views.get(shout_slug)
day_start = datetime.now().replace(hour=0, minute=0, second=0)
if not this_day_view or this_day_view.day < day_start:
if this_day_view and getattr(this_day_view, "modified", False):
self.to_flush.append(this_day_view)
this_day_view = ViewedByDay.create(shout=shout_slug, value=1)
self.this_day_views[shout_slug] = this_day_view
else:
this_day_view.value = this_day_view.value + amount
this_day_view.modified = True
self.viewed["shouts"][shout_slug] = (self.viewed["shouts"].get(shout_slug, 0) + amount)
with local_session() as session:
topics = (
session.query(ShoutTopic.topic)
.where(ShoutTopic.shout == shout_slug)
.all()
)
for t in topics:
self.viewed["topics"][t] = self.viewed["topics"].get(t, 0) + amount
flag_modified(this_day_view, "value")
@staticmethod
async def flush_changes(session):
self = ViewedStorage
async with self.lock:
for view in self.this_day_views.values():
if getattr(view, "modified", False):
session.add(view)
flag_modified(view, "value")
view.modified = False
for view in self.to_flush:
session.add(view)
self.to_flush.clear()
session.commit()
@staticmethod
async def worker():
while True:
try:
with local_session() as session:
await ViewedStorage.flush_changes(session)
print("[stat.viewed] periodical flush")
except Exception as err:
print("[stat.viewed] : %s" % (err))
await asyncio.sleep(ViewedStorage.period)

99
services/stat/views.py Normal file
View File

@ -0,0 +1,99 @@
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
import asyncio
from services.zine.topics import TopicStorage
query_ackee_views = gql(
"""
query getDomainsFacts {
domains {
statistics {
views {
id
count
}
pages {
id
count
created
}
}
facts {
activeVisitors
# averageViews
# averageDuration
viewsToday
viewsMonth
viewsYear
}
}
}
"""
)
class Stat:
lock = asyncio.Lock()
by_slugs = {}
by_topics = {}
period = 30 * 60 # 30 minutes
transport = AIOHTTPTransport(url="https://ackee.discours.io/")
client = Client(transport=transport, fetch_schema_from_transport=True)
@staticmethod
async def load_views():
# TODO: when the struture of paylod will be transparent
# TODO: perhaps ackee token getting here
self = Stat
async with self.lock:
domains = self.client.execute(query_ackee_views)
print("[stat.ackee] loaded domains")
print(domains)
print('\n\n# TODO: something here...\n\n')
@staticmethod
async def get_shout(shout_slug):
self = Stat
async with self.lock:
return self.by_slugs.get(shout_slug) or 0
@staticmethod
async def get_topic(topic_slug):
self = Stat
async with self.lock:
shouts = self.by_topics.get(topic_slug)
topic_views = 0
for v in shouts.values():
topic_views += v
return topic_views
@staticmethod
async def increment(shout_slug, amount=1):
self = Stat
async with self.lock:
self.by_slugs[shout_slug] = self.by_slugs.get(shout_slug) or 0
self.by_slugs[shout_slug] += amount
shout_topics = await TopicStorage.get_topics_by_slugs([shout_slug, ])
for t in shout_topics:
self.by_topics[t] = self.by_topics.get(t) or {}
self.by_topics[t][shout_slug] = self.by_topics[t].get(shout_slug) or 0
self.by_topics[t][shout_slug] += amount
@staticmethod
async def update():
self = Stat
async with self.lock:
self.load_views()
@staticmethod
async def worker():
while True:
try:
await Stat.update()
except Exception as err:
print("[stat.ackee] : %s" % (err))
print("[stat.ackee] renew period: %d minutes" % (Stat.period / 60))
await asyncio.sleep(Stat.period)