fix-migration-replyto

This commit is contained in:
tonyrewin 2022-11-30 22:47:34 +03:00
parent 44bd4f6ede
commit 3ed1857f11
5 changed files with 292 additions and 307 deletions

View File

@ -96,16 +96,16 @@ async def shouts_handle(storage, args):
continue
# migrate
shout = await migrateShout(entry, storage)
if shout:
storage["shouts"]["by_oid"][entry["_id"]] = shout
storage["shouts"]["by_slug"][shout["slug"]] = shout
shout_dict = await migrateShout(entry, storage)
if shout_dict:
storage["shouts"]["by_oid"][entry["_id"]] = shout_dict
storage["shouts"]["by_slug"][shout_dict["slug"]] = shout_dict
# shouts.topics
if not shout["topics"]:
if not shout_dict["topics"]:
print("[migration] no topics!")
# with author
author: str = shout["authors"][0].dict()
author = shout_dict["authors"][0]
if author["slug"] == "discours":
discours_author += 1
if author["slug"] == "anonymous":
@ -114,18 +114,20 @@ async def shouts_handle(storage, args):
if entry.get("published"):
if "mdx" in args:
export_mdx(shout)
export_mdx(shout_dict)
pub_counter += 1
# print main counter
counter += 1
print('[migration] shouts_handle %d: %s @%s' % ((counter + 1), shout["slug"], author["slug"]))
print('[migration] shouts_handle %d: %s @%s' % (
(counter + 1), shout_dict["slug"], author["slug"]
))
b = bs4.BeautifulSoup(shout["body"], "html.parser")
texts = [shout["title"].lower().replace(r"[^а-яА-Яa-zA-Z]", "")]
b = bs4.BeautifulSoup(shout_dict["body"], "html.parser")
texts = [shout_dict["title"].lower().replace(r"[^а-яА-Яa-zA-Z]", "")]
texts = texts + b.findAll(text=True)
topics_dataset_bodies.append(" ".join([x.strip().lower() for x in texts]))
topics_dataset_tlist.append(shout["topics"])
topics_dataset_tlist.append(shout_dict["topics"])
else:
ignored += 1
@ -133,9 +135,7 @@ async def shouts_handle(storage, args):
# ', fmt='%s')
print("[migration] " + str(counter) + " content items were migrated")
print("[migration] " + str(ignored) + " content items were ignored")
print("[migration] " + str(pub_counter) + " have been published")
print("[migration] " + str(discours_author) + " authored by @discours")
print("[migration] " + str(anonymous_author) + " authored by @anonymous")

View File

@ -5,128 +5,50 @@ from dateutil.parser import parse as date_parse
from base.orm import local_session
from migration.html2text import html2text
from orm.reaction import Reaction, ReactionKind
from orm.shout import ShoutReactionsFollower, Shout
from orm.topic import TopicFollower, Topic
from orm.shout import ShoutReactionsFollower
from orm.topic import TopicFollower
from orm.user import User
from orm.shout import Shout
# from services.stat.reacted import ReactedStorage
ts = datetime.now(tz=timezone.utc)
async def migrate(entry, storage):
"""
{
"_id": "hdtwS8fSyFLxXCgSC",
"body": "<p>",
"contentItem": "mnK8KsJHPRi8DrybQ",
"createdBy": "bMFPuyNg6qAD2mhXe",
"thread": "01/",
"createdAt": "2016-04-19 04:33:53+00:00",
"ratings": [
{ "createdBy": "AqmRukvRiExNpAe8C", "value": 1 },
{ "createdBy": "YdE76Wth3yqymKEu5", "value": 1 }
],
"rating": 2,
"updatedAt": "2020-05-27 19:22:57.091000+00:00",
"updatedBy": "0"
}
->
type Reaction {
id: Int!
shout: Shout!
createdAt: DateTime!
createdBy: User!
updatedAt: DateTime
deletedAt: DateTime
deletedBy: User
range: String # full / 0:2340
kind: ReactionKind!
body: String
replyTo: Reaction
stat: Stat
old_id: String
old_thread: String
}
"""
reaction_dict = {
"createdAt": (
ts if not entry.get("createdAt") else date_parse(entry.get("createdAt"))
),
"body": html2text(entry.get("body", "")),
"oid": entry["_id"],
}
shout_oid = entry.get("contentItem")
if shout_oid not in storage["shouts"]["by_oid"]:
if len(storage["shouts"]["by_oid"]) > 0:
return shout_oid
else:
print("[migration] no shouts migrated yet")
raise Exception
return
else:
with local_session() as session:
author = session.query(User).filter(User.oid == entry["createdBy"]).first()
shout_dict = storage["shouts"]["by_oid"][shout_oid]
if shout_dict:
shout = session.query(
Shout
).where(Shout.slug == shout_dict["slug"]).one()
reaction_dict["shout"] = shout.id
reaction_dict["createdBy"] = author.id if author else 1
reaction_dict["kind"] = ReactionKind.COMMENT
# creating reaction from old comment
reaction = Reaction.create(**reaction_dict)
session.add(reaction)
def auto_followers(session, topics, reaction_dict):
# creating shout's reactions following for reaction author
following1 = session.query(
ShoutReactionsFollower
).join(
User
).join(
Shout
).where(
User.id == reaction_dict["createdBy"]
ShoutReactionsFollower.follower == reaction_dict["createdBy"]
).filter(
ShoutReactionsFollower.shout == reaction.shout
ShoutReactionsFollower.shout == reaction_dict["shout"]
).first()
if not following1:
following1 = ShoutReactionsFollower.create(
follower=reaction_dict["createdBy"],
shout=reaction.shout,
shout=reaction_dict["shout"],
auto=True
)
session.add(following1)
# creating topics followings for reaction author
for t in shout_dict["topics"]:
for t in topics:
tf = session.query(
TopicFollower
).join(
Topic
).where(
TopicFollower.follower == reaction_dict["createdBy"]
).filter(
Topic.slug == t
TopicFollower.topic == t['id']
).first()
if not tf:
topic = session.query(
Topic
).where(Topic.slug == t).one()
topic_following = TopicFollower.create(
follower=reaction_dict["createdBy"],
topic=topic.id,
topic=t['id'],
auto=True
)
session.add(topic_following)
reaction_dict["id"] = reaction.id
def migrate_ratings(session, entry, reaction_dict):
for comment_rating_old in entry.get("ratings", []):
rater = (
session.query(User)
@ -135,7 +57,7 @@ async def migrate(entry, storage):
)
re_reaction_dict = {
"shout": reaction_dict["shout"],
"replyTo": reaction.id,
"replyTo": reaction_dict["id"],
"kind": ReactionKind.LIKE
if comment_rating_old["value"] > 0
else ReactionKind.DISLIKE,
@ -162,43 +84,129 @@ async def migrate(entry, storage):
)
session.add(following2)
session.add(rr)
# await ReactedStorage.react(rr)
except Exception as e:
print("[migration] comment rating error: %r" % re_reaction_dict)
raise e
session.commit()
async def migrate(entry, storage):
"""
{
"_id": "hdtwS8fSyFLxXCgSC",
"body": "<p>",
"contentItem": "mnK8KsJHPRi8DrybQ",
"createdBy": "bMFPuyNg6qAD2mhXe",
"thread": "01/",
"createdAt": "2016-04-19 04:33:53+00:00",
"ratings": [
{ "createdBy": "AqmRukvRiExNpAe8C", "value": 1 },
{ "createdBy": "YdE76Wth3yqymKEu5", "value": 1 }
],
"rating": 2,
"updatedAt": "2020-05-27 19:22:57.091000+00:00",
"updatedBy": "0"
}
->
type Reaction {
id: Int!
shout: Shout!
createdAt: DateTime!
createdBy: User!
updatedAt: DateTime
deletedAt: DateTime
deletedBy: User
range: String # full / 0:2340
kind: ReactionKind!
body: String
replyTo: Reaction
stat: Stat
old_id: String
old_thread: String
}
"""
old_ts = entry.get("createdAt")
reaction_dict = {
"createdAt": (ts if not old_ts else date_parse(old_ts)),
"body": html2text(entry.get("body", "")),
"oid": entry["_id"],
}
shout_oid = entry.get("contentItem")
if shout_oid not in storage["shouts"]["by_oid"]:
if len(storage["shouts"]["by_oid"]) > 0:
return shout_oid
else:
print(
"[migration] error: cannot find shout for comment %r"
% reaction_dict
)
return reaction
def migrate_2stage(rr, old_new_id):
reply_oid = rr.get("replyTo")
if not reply_oid:
print("[migration] no shouts migrated yet")
raise Exception
return
new_id = old_new_id.get(rr.get("oid"))
if not new_id:
return
else:
stage = "started"
reaction = None
with local_session() as session:
comment = session.query(Reaction).filter(Reaction.id == new_id).first()
comment.replyTo = old_new_id.get(reply_oid)
session.add(comment)
author = session.query(User).filter(User.oid == entry["createdBy"]).first()
old_shout = storage["shouts"]["by_oid"].get(shout_oid)
if not old_shout:
raise Exception("no old shout in storage")
else:
stage = "author and old id found"
try:
shout = session.query(
Shout
).where(Shout.slug == old_shout["slug"]).one()
if shout:
reaction_dict["shout"] = shout.id
reaction_dict["createdBy"] = author.id if author else 1
reaction_dict["kind"] = ReactionKind.COMMENT
# creating reaction from old comment
reaction = Reaction.create(**reaction_dict)
session.add(reaction)
# session.commit()
stage = "new reaction commited"
reaction_dict = reaction.dict()
topics = [t.dict() for t in shout.topics]
auto_followers(session, topics, reaction_dict)
migrate_ratings(session, entry, reaction_dict)
return reaction
except Exception as e:
print(e)
print(reaction)
raise Exception(stage)
return
def migrate_2stage(old_comment, idmap):
if old_comment.get('body'):
new_id = idmap.get(old_comment.get('oid'))
if new_id:
new_replyto_id = None
old_replyto_id = old_comment.get("replyTo")
if old_replyto_id:
new_replyto_id = int(idmap.get(old_replyto_id, "0"))
with local_session() as session:
comment = session.query(Reaction).where(Reaction.id == new_id).first()
try:
if new_replyto_id:
new_reply = session.query(Reaction).where(Reaction.id == new_replyto_id).first()
if not new_reply:
print(new_replyto_id)
raise Exception("cannot find reply by id!")
comment.replyTo = new_reply.id
session.add(comment)
srf = session.query(ShoutReactionsFollower).where(
ShoutReactionsFollower.shout == comment.shout
).filter(
ShoutReactionsFollower.follower == comment.createdBy
).first()
if not srf:
srf = ShoutReactionsFollower.create(shout=comment.shout, follower=comment.createdBy, auto=True)
srf = ShoutReactionsFollower.create(
shout=comment.shout, follower=comment.createdBy, auto=True
)
session.add(srf)
session.commit()
if not rr["body"]:
raise Exception(rr)
except Exception:
raise Exception("cannot find a comment by oldid")

View File

@ -9,7 +9,9 @@ from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower
from orm.user import User
from orm.topic import TopicFollower, Topic
# from services.stat.reacted import ReactedStorage
from services.stat.viewed import ViewedStorage
import re
OLD_DATE = "2016-03-05 22:22:00.350000"
ts = datetime.now(tz=timezone.utc)
@ -21,6 +23,8 @@ type2layout = {
"Image": "image",
}
anondict = {"slug": "anonymous", "id": 1, "name": "Аноним"}
def get_shout_slug(entry):
slug = entry.get("slug", "")
@ -29,6 +33,7 @@ def get_shout_slug(entry):
slug = friend.get("slug", "")
if slug:
break
slug = re.sub('[^0-9a-zA-Z]+', '-', slug)
return slug
@ -39,13 +44,8 @@ def create_author_from_app(app):
user = session.query(User).where(User.email == app['email']).first()
if not user:
name = app.get('name')
slug = (
translit(name, "ru", reversed=True)
.replace(" ", "-")
.replace("'", "")
.replace(".", "-")
.lower()
)
slug = translit(name, "ru", reversed=True).lower()
slug = re.sub('[^0-9a-zA-Z]+', '-', slug)
# check if nameslug is used
user = session.query(User).where(User.slug == slug).first()
# get slug from email
@ -74,122 +74,99 @@ def create_author_from_app(app):
session.commit()
userdata = user.dict()
if not userdata:
userdata = User.default_user.dict()
userdata = User.default_user.dict() # anonymous
except Exception as e:
print(app)
raise e
return userdata
async def create_shout(shout_dict, userslug):
async def create_shout(shout_dict, user):
s = Shout.create(**shout_dict)
with local_session() as session:
follower = session.query(User).where(User.slug == userslug).one()
srf = session.query(
ShoutReactionsFollower
).join(
User
).where(
srf = session.query(ShoutReactionsFollower).where(
ShoutReactionsFollower.shout == s.id
).filter(
User.slug == userslug
ShoutReactionsFollower.follower == user.id
).first()
if not srf:
srf = ShoutReactionsFollower.create(shout=s.id, follower=follower.id, auto=True)
srf = ShoutReactionsFollower.create(shout=s.id, follower=user.id, auto=True)
session.add(srf)
session.commit()
return s
def get_userdata(entry, storage):
    """Resolve the author record for a content item being migrated.

    If ``entry`` has an ``application`` the author comes from
    ``create_author_from_app``; otherwise the record is looked up in
    ``storage["users"]["by_oid"]`` by the entry's ``createdBy`` oid. When
    neither source yields a record, a copy of the anonymous placeholder
    ``anondict`` is used.

    Returns:
        A ``(userdata, user_oid)`` tuple. ``userdata["slug"]`` is always a
        string in which every run of characters outside ``[0-9a-zA-Z]`` has
        been replaced by ``-``.
    """
    user_oid = entry.get("createdBy", "")
    app = entry.get("application")
    if app:
        userdata = create_author_from_app(app)
    else:
        userdata = storage["users"]["by_oid"].get(user_oid)
    if not userdata:
        # Copy the module-level placeholder: the slug is written back into
        # the dict below, and that must not alter the shared constant.
        userdata = dict(anondict)
    # A record without a slug would otherwise hand None to re.sub (TypeError).
    slug = userdata.get("slug") or ""
    userdata["slug"] = re.sub('[^0-9a-zA-Z]+', '-', slug)
    return userdata, user_oid
async def migrate(entry, storage):
# init, set title and layout
userdata, user_oid = get_userdata(entry, storage)
user = await get_user(userdata, storage, user_oid)
r = {
"layout": type2layout[entry["type"]],
"title": entry["title"],
"authors": [],
"topics": set([])
"authors": [userdata["slug"], ],
"slug": get_shout_slug(entry),
"cover": (
"https://assets.discours.io/unsafe/1600x/" +
entry["thumborId"] if entry.get("thumborId") else entry.get("image", {}).get("url")
),
"visibility": "public" if entry.get("published") else "authors",
"publishedAt": date_parse(entry.get("publishedAt")) if entry.get("published") else None,
"deletedAt": date_parse(entry.get("deletedAt")) if entry.get("deletedAt") else None,
"createdAt": date_parse(entry.get("createdAt", OLD_DATE)),
"updatedAt": date_parse(entry["updatedAt"]) if "updatedAt" in entry else ts,
"topics": await add_topics_follower(entry, storage, user),
"body": extract_html(entry)
}
# author
users_by_oid = storage["users"]["by_oid"]
user_oid = entry.get("createdBy", "")
userdata = users_by_oid.get(user_oid)
user = None
if not userdata:
app = entry.get("application")
if app:
userdata = create_author_from_app(app)
if userdata:
userslug = userdata.get('slug')
else:
userslug = "anonymous" # bad old id slug was found
r["authors"] = [userslug, ]
# main topic patch
r['mainTopic'] = r['topics'][0]
# slug
slug = get_shout_slug(entry)
if slug:
r["slug"] = slug
else:
raise Exception
# cover
c = ""
if entry.get("thumborId"):
c = "https://assets.discours.io/unsafe/1600x/" + entry["thumborId"]
else:
c = entry.get("image", {}).get("url")
if not c or "cloudinary" in c:
c = ""
r["cover"] = c
# timestamps
r["createdAt"] = date_parse(entry.get("createdAt", OLD_DATE))
r["updatedAt"] = date_parse(entry["updatedAt"]) if "updatedAt" in entry else ts
# visibility
# published author auto-confirm
if entry.get("published"):
r["publishedAt"] = date_parse(entry.get("publishedAt", OLD_DATE))
r["visibility"] = "public"
with local_session() as session:
# update user.emailConfirmed if published
author = session.query(User).where(User.slug == userslug).first()
author = session.query(User).where(User.slug == userdata["slug"]).first()
author.emailConfirmed = True
session.add(author)
session.commit()
else:
r["visibility"] = "authors"
if "deletedAt" in entry:
r["deletedAt"] = date_parse(entry["deletedAt"])
# topics
r['topics'] = await add_topics_follower(entry, storage, userslug)
r['mainTopic'] = r['topics'][0]
entry["topics"] = r["topics"]
entry["cover"] = r["cover"]
# body
r["body"] = extract_html(entry)
# media
media = extract_media(entry)
if media:
r["media"] = json.dumps(media, ensure_ascii=True)
r["media"] = json.dumps(media, ensure_ascii=True) if media else None
# ----------------------------------- copy
shout_dict = r.copy()
# user
user = await get_user(userslug, userdata, storage, user_oid)
shout_dict["authors"] = [user, ]
del shout_dict["topics"]
try:
# save shout to db
await create_shout(shout_dict, userslug)
shout_dict["oid"] = entry.get("_id", "")
shout = await create_shout(shout_dict, user)
except IntegrityError as e:
print(e)
await resolve_create_shout(shout_dict, userslug)
print('[migration] create_shout integrity error', e)
shout = await resolve_create_shout(shout_dict, userdata["slug"])
except Exception as e:
raise Exception(e)
# update data
shout_dict = shout.dict()
shout_dict["authors"] = [user.dict(), ]
# shout topics aftermath
shout_dict["topics"] = await topics_aftermath(r, storage)
@ -200,13 +177,12 @@ async def migrate(entry, storage):
await ViewedStorage.increment(shout_dict["slug"], amount=entry.get("views", 1))
# del shout_dict['ratings']
shout_dict["oid"] = entry.get("_id", "")
storage["shouts"]["by_oid"][entry["_id"]] = shout_dict
storage["shouts"]["by_slug"][slug] = shout_dict
storage["shouts"]["by_slug"][shout_dict["slug"]] = shout_dict
return shout_dict
async def add_topics_follower(entry, storage, userslug):
async def add_topics_follower(entry, storage, user):
topics = set([])
category = entry.get("category")
topics_by_oid = storage["topics"]["by_oid"]
@ -218,29 +194,26 @@ async def add_topics_follower(entry, storage, userslug):
ttt = list(topics)
# add author as TopicFollower
with local_session() as session:
for tpc in topics:
for tpcslug in topics:
try:
topic = session.query(Topic).where(Topic.slug == tpc).one()
follower = session.query(User).where(User.slug == userslug).one()
tpc = session.query(Topic).where(Topic.slug == tpcslug).first()
tf = session.query(
TopicFollower
).where(
TopicFollower.follower == follower.id
TopicFollower.follower == user.id
).filter(
TopicFollower.topic == topic.id
TopicFollower.topic == tpc.id
).first()
if not tf:
tf = TopicFollower.create(
topic=topic.id,
follower=follower.id,
topic=tpc.id,
follower=user.id,
auto=True
)
session.add(tf)
session.commit()
except IntegrityError:
print('[migration.shout] hidden by topic ' + tpc)
print('[migration.shout] hidden by topic ' + tpc.slug)
# main topic
maintopic = storage["replacements"].get(topics_by_oid.get(category, {}).get("slug"))
if maintopic in ttt:
@ -249,19 +222,24 @@ async def add_topics_follower(entry, storage, userslug):
return ttt
async def get_user(userslug, userdata, storage, oid):
async def get_user(userdata, storage, oid):
user = None
with local_session() as session:
if not user and userslug:
user = session.query(User).filter(User.slug == userslug).first()
if not user and userdata:
uid = userdata.get("id")
if uid:
user = session.query(User).filter(User.id == uid).first()
elif userdata:
try:
userdata["slug"] = userdata["slug"].lower().strip().replace(" ", "-")
slug = userdata["slug"].lower().strip()
slug = re.sub('[^0-9a-zA-Z]+', '-', slug)
userdata["slug"] = slug
user = User.create(**userdata)
session.add(user)
session.commit()
except IntegrityError:
print("[migration] user error: " + userdata)
print("[migration] user creating with slug %s" % userdata["slug"])
print("[migration] from userdata: %r" % userdata)
raise Exception("[migration] cannot create user in content_items.get_user()")
userdata["id"] = user.id
userdata["createdAt"] = user.createdAt
storage["users"]["by_slug"][userdata["slug"]] = userdata
@ -303,6 +281,7 @@ async def resolve_create_shout(shout_dict, userslug):
print("[migration] something went wrong with shout: \n%r" % shout_dict)
raise Exception("")
session.commit()
return s
async def topics_aftermath(entry, storage):
@ -359,43 +338,35 @@ async def content_ratings_to_reactions(entry, slug):
session.query(User)
.filter(User.oid == content_rating["createdBy"])
.first()
)
reactedBy = (
rater
if rater
else session.query(User).filter(User.slug == "anonymous").first()
)
if rater:
shout = session.query(Shout).where(Shout.slug == slug).one()
) or User.default_user
shout = session.query(Shout).where(Shout.slug == slug).first()
cts = content_rating.get("createdAt")
reaction_dict = {
"createdAt": date_parse(cts) if cts else None,
"kind": ReactionKind.LIKE
if content_rating["value"] > 0
else ReactionKind.DISLIKE,
"createdBy": reactedBy.id,
"shout": shout.id,
"createdBy": rater.id,
"shout": shout.id
}
cts = content_rating.get("createdAt")
if cts:
reaction_dict["createdAt"] = date_parse(cts)
reaction = (
session.query(Reaction).filter(
Reaction.shout == reaction_dict["shout"]
).filter(
Reaction.createdBy == reaction_dict["createdBy"]
).filter(
Reaction.kind == reaction_dict["kind"]
).first()
session.query(Reaction)
.filter(Reaction.shout == reaction_dict["shout"])
.filter(Reaction.createdBy == reaction_dict["createdBy"])
.filter(Reaction.kind == reaction_dict["kind"])
.first()
)
if reaction:
k = ReactionKind.AGREE if content_rating["value"] > 0 else ReactionKind.DISAGREE
reaction_dict["kind"] = k
reaction.update(reaction_dict)
session.add(reaction)
else:
rea = Reaction.create(**reaction_dict)
session.add(rea)
# await ReactedStorage.react(rea)
# shout_dict['ratings'].append(reaction_dict)
session.commit()
except Exception:
raise Exception("[migration] content_item.ratings error: \n%r" % content_rating)
print("[migration] content_item.ratings error: \n%r" % content_rating)

View File

@ -1,7 +1,7 @@
from dateutil.parser import parse
from sqlalchemy.exc import IntegrityError
from bs4 import BeautifulSoup
import re
from base.orm import local_session
from orm.user import AuthorFollower, User, UserRating
@ -32,9 +32,9 @@ def migrate(entry):
user_dict["lastSeen"] = parse(entry["wasOnlineAt"])
if entry.get("profile"):
# slug
user_dict["slug"] = (
entry["profile"].get("path").lower().replace(" ", "-").strip()
)
slug = entry["profile"].get("path").lower()
slug = re.sub('[^0-9a-zA-Z]+', '-', slug).strip()
user_dict["slug"] = slug
bio = BeautifulSoup(entry.get("profile").get("bio") or "", features="lxml").text
if bio.startswith('<'):
print('[migration] bio! ' + bio)

View File

@ -371,6 +371,12 @@ type User {
oid: String
}
# Draft: object type with three nullable fields (none is marked with `!`).
# NOTE(review): createdBy is an Int, presumably a user id; confirm against the resolvers.
type Draft {
title: String
body: String
createdBy: Int
}
type Collab {
authors: [String]!
invites: [String]