Merge pull request #50 from Discours/prepaare-comments-optimization
more optimizations
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
import time
|
||||
from base.orm import local_session
|
||||
from orm.reaction import ReactionKind, Reaction
|
||||
from services.zine.topics import TopicStorage
|
||||
@@ -175,6 +176,7 @@ class ReactedStorage:
|
||||
|
||||
@staticmethod
|
||||
async def recount_changed(session):
|
||||
start = time.time()
|
||||
self = ReactedStorage
|
||||
async with self.lock:
|
||||
sss = list(self.modified_shouts)
|
||||
@@ -191,6 +193,9 @@ class ReactedStorage:
|
||||
print("[stat.reacted] %d replies" % len(self.reacted["reactions"]))
|
||||
self.modified_shouts = set([])
|
||||
|
||||
end = time.time()
|
||||
print("[stat.reacted] recount_changed took %fs " % (end - start))
|
||||
|
||||
@staticmethod
|
||||
async def worker():
|
||||
while True:
|
||||
|
@@ -1,9 +1,9 @@
|
||||
import asyncio
|
||||
|
||||
import time
|
||||
from base.orm import local_session
|
||||
from orm.shout import Shout, ShoutTopic
|
||||
from orm.shout import Shout, ShoutTopic, ShoutAuthor
|
||||
from orm.topic import TopicFollower
|
||||
from services.zine.shoutauthor import ShoutAuthorStorage
|
||||
from sqlalchemy.sql.expression import select
|
||||
|
||||
|
||||
class TopicStat:
|
||||
@@ -17,22 +17,24 @@ class TopicStat:
|
||||
|
||||
@staticmethod
|
||||
async def load_stat(session):
|
||||
start = time.time()
|
||||
self = TopicStat
|
||||
shout_topics = session.query(ShoutTopic).all()
|
||||
shout_topics = session.query(ShoutTopic, Shout).join(Shout).all()
|
||||
all_shout_authors = session.query(ShoutAuthor).all()
|
||||
print("[stat.topics] %d links for shouts" % len(shout_topics))
|
||||
for shout_topic in shout_topics:
|
||||
for [shout_topic, shout] in shout_topics:
|
||||
tpc = shout_topic.topic
|
||||
# shouts by topics
|
||||
shout = session.query(Shout).where(Shout.slug == shout_topic.shout).first()
|
||||
# shout = session.query(Shout).where(Shout.slug == shout_topic.shout).first()
|
||||
self.shouts_by_topic[tpc] = self.shouts_by_topic.get(tpc, dict())
|
||||
self.shouts_by_topic[tpc][shout.slug] = shout
|
||||
|
||||
# authors by topics
|
||||
authors = await ShoutAuthorStorage.get_authors(shout.slug)
|
||||
shout_authors = filter(lambda asa: asa.shout == shout.slug, all_shout_authors)
|
||||
|
||||
self.authors_by_topic[tpc] = self.authors_by_topic.get(tpc, dict())
|
||||
for a in authors:
|
||||
[aslug, acaption] = a
|
||||
self.authors_by_topic[tpc][aslug] = acaption
|
||||
for sa in shout_authors:
|
||||
self.authors_by_topic[tpc][sa.shout] = sa.caption
|
||||
|
||||
self.followers_by_topic = {}
|
||||
followings = session.query(TopicFollower).all()
|
||||
@@ -43,6 +45,9 @@ class TopicStat:
|
||||
self.followers_by_topic[topic] = self.followers_by_topic.get(topic, dict())
|
||||
self.followers_by_topic[topic][userslug] = userslug
|
||||
|
||||
end = time.time()
|
||||
print("[stat.topics] load_stat took %fs " % (end - start))
|
||||
|
||||
@staticmethod
|
||||
async def get_shouts(topic):
|
||||
self = TopicStat
|
||||
@@ -52,6 +57,7 @@ class TopicStat:
|
||||
@staticmethod
|
||||
async def worker():
|
||||
self = TopicStat
|
||||
first_run = True
|
||||
while True:
|
||||
try:
|
||||
with local_session() as session:
|
||||
@@ -59,4 +65,9 @@ class TopicStat:
|
||||
await self.load_stat(session)
|
||||
except Exception as err:
|
||||
raise Exception(err)
|
||||
if first_run:
|
||||
# sleep for period + 1 min after first run
|
||||
# to distribute load on server by workers with the same period
|
||||
await asyncio.sleep(60)
|
||||
first_run = False
|
||||
await asyncio.sleep(self.period)
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
import time
|
||||
from datetime import timedelta, timezone, datetime
|
||||
from gql import Client, gql
|
||||
from gql.transport.aiohttp import AIOHTTPTransport
|
||||
@@ -9,7 +10,6 @@ from orm.viewed import ViewedEntry
|
||||
from ssl import create_default_context
|
||||
from os import environ, path
|
||||
|
||||
|
||||
load_facts = gql("""
|
||||
query getDomains {
|
||||
domains {
|
||||
@@ -82,8 +82,9 @@ class ViewedStorage:
|
||||
self.disabled = True
|
||||
|
||||
@staticmethod
|
||||
async def update_pages(session):
|
||||
async def update_pages():
|
||||
""" query all the pages from ackee sorted by views count """
|
||||
start = time.time()
|
||||
self = ViewedStorage
|
||||
async with self.lock:
|
||||
try:
|
||||
@@ -104,6 +105,9 @@ class ViewedStorage:
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
end = time.time()
|
||||
print("[stat.viewed] update_pages took %fs " % (end - start))
|
||||
|
||||
@staticmethod
|
||||
async def get_facts():
|
||||
self = ViewedStorage
|
||||
@@ -176,9 +180,8 @@ class ViewedStorage:
|
||||
async with self.lock:
|
||||
while True:
|
||||
try:
|
||||
with local_session() as session:
|
||||
await self.update_pages(session)
|
||||
failed = 0
|
||||
await self.update_pages()
|
||||
failed = 0
|
||||
except Exception:
|
||||
failed += 1
|
||||
print("[stat.viewed] update failed #%d, wait 10 seconds" % failed)
|
||||
|
@@ -1,47 +0,0 @@
|
||||
import asyncio
|
||||
|
||||
from base.orm import local_session
|
||||
from orm.shout import ShoutAuthor, Shout
|
||||
|
||||
|
||||
class ShoutAuthorStorage:
|
||||
authors_by_shout = {}
|
||||
lock = asyncio.Lock()
|
||||
period = 30 * 60 # sec
|
||||
|
||||
@staticmethod
|
||||
async def load(session):
|
||||
self = ShoutAuthorStorage
|
||||
sas = session.query(ShoutAuthor).join(Shout).all()
|
||||
for sa in sas:
|
||||
self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, [])
|
||||
self.authors_by_shout[sa.shout].append([sa.user, sa.caption])
|
||||
print("[zine.authors] %d shouts indexed by authors" % len(self.authors_by_shout))
|
||||
|
||||
@staticmethod
|
||||
async def get_authors(shout):
|
||||
self = ShoutAuthorStorage
|
||||
async with self.lock:
|
||||
return self.authors_by_shout.get(shout, [])
|
||||
|
||||
@staticmethod
|
||||
async def get_author_caption(shout, author):
|
||||
self = ShoutAuthorStorage
|
||||
async with self.lock:
|
||||
for a in self.authors_by_shout.get(shout, []):
|
||||
if author in a:
|
||||
return a[1]
|
||||
return {"error": "author caption not found"}
|
||||
|
||||
@staticmethod
|
||||
async def worker():
|
||||
self = ShoutAuthorStorage
|
||||
while True:
|
||||
try:
|
||||
with local_session() as session:
|
||||
async with self.lock:
|
||||
await self.load(session)
|
||||
print("[zine.authors] index by authors was updated")
|
||||
except Exception as err:
|
||||
print("[zine.authors] error indexing by author: %s" % (err))
|
||||
await asyncio.sleep(self.period)
|
Reference in New Issue
Block a user