From d775e2c0d08336707ea6314013a0386dbe017b94 Mon Sep 17 00:00:00 2001
From: tonyrewin
Date: Tue, 29 Nov 2022 14:51:06 +0300
Subject: [PATCH] up

---
Review notes (everything between "---" and the first "diff --git" line is
ignored by git-am):

* Formatting: this copy of the patch was rebuilt from a whitespace-collapsed
  paste. Blank lines were restored from the @@ hunk counts; indentation was
  inferred from Python syntax and may not match the target files exactly.
  Regenerate with git format-patch before applying. The "index" blob hashes
  are those of the original commit and do not reflect the fixes below.
* migration/__init__.py, main(): "sys.srgv" -> "sys.argv" (the "comments"
  branch raised AttributeError).
* migration/tables/comments.py, migrate(): pass the comment `entry` to
  migrate_ratings(); the helper reads entry.get("ratings"), and `shout_dict`
  was passed by mistake.
* migration/tables/comments.py, migrate_2stage(): chain the original
  exception (raise ... from e) instead of discarding it.
* migration/tables/content_items.py, migrate(): restore the OLD_DATE fallback
  for "publishedAt" that the removed code had.
* Not fixed here (needs new lines): handle_comments() still passes
  storage=None; its own comment marks loading the storage as a to-do.

 migration/__init__.py             |  40 ++++--
 migration/tables/comments.py      | 213 ++++++++++++++++--------------
 migration/tables/content_items.py | 155 +++++++++-------------
 orm/user.py                       |  38 +++---
 4 files changed, 225 insertions(+), 221 deletions(-)

diff --git a/migration/__init__.py b/migration/__init__.py
index 4a25931d..043b0972 100644
--- a/migration/__init__.py
+++ b/migration/__init__.py
@@ -141,6 +141,7 @@ async def shouts_handle(storage, args):
 
 
 async def comments_handle(storage):
+    print("[migration] comments")
     id_map = {}
     ignored_counter = 0
     missed_shouts = {}
@@ -280,15 +281,25 @@ def mongo_download(url):
     if not url:
         raise Exception("\n\nYou should set MONGODB_URL enviroment variable\n")
     print("[migration] mongodump " + url)
-    subprocess.call(
-        [
-            "mongodump",
-            "--uri",
-            url + "/?authSource=admin",
-            "--forceTableScan",
-        ],
-        stderr=subprocess.STDOUT,
-    )
+    for one in [
+        "content_items",
+        "users",
+        "tags",
+        "categories",
+        "comments",
+        "remarks"
+    ]:
+        subprocess.call(
+            [
+                "mongodump",
+                "--uri",
+                url + "/?authSource=admin",
+                "--forceTableScan",
+                "--db=discours",
+                "--collection=" + one
+            ],
+            stderr=subprocess.STDOUT,
+        )
 
 
 def create_pgdump():
@@ -312,10 +323,19 @@ async def handle_auto():
     create_pgdump()
 
 
+async def handle_comments():
+    # 1 load migrated users and shouts to storage
+    storage = None
+    await comments_handle(storage)
+
+
 async def main():
     if len(sys.argv) > 1:
         init_tables()
-        await handle_auto()
+        if len(sys.argv) == 2:
+            await handle_auto()
+        elif "comments" in sys.argv:
+            await handle_comments()
     else:
         print("[migration] usage: python server.py migrate")
 
diff --git a/migration/tables/comments.py b/migration/tables/comments.py
index 0ca72915..5350f00c 100644
--- a/migration/tables/comments.py
+++ b/migration/tables/comments.py
@@ -13,6 +13,84 @@ from orm.user import User
 ts = datetime.now(tz=timezone.utc)
 
 
+def auto_followers(session, shout_dict, reaction_dict):
+    # creating shout's reactions following for reaction author
+    following1 = session.query(
+        ShoutReactionsFollower
+    ).where(
+        ShoutReactionsFollower.follower == reaction_dict["createdBy"]
+    ).filter(
+        ShoutReactionsFollower.shout == reaction_dict["shout"]
+    ).first()
+    if not following1:
+        following1 = ShoutReactionsFollower.create(
+            follower=reaction_dict["createdBy"],
+            shout=reaction_dict["shout"],
+            auto=True
+        )
+        session.add(following1)
+    # creating topics followings for reaction author
+    for t in shout_dict["topics"]:
+        tf = session.query(
+            TopicFollower
+        ).where(
+            TopicFollower.follower == reaction_dict["createdBy"]
+        ).filter(
+            TopicFollower.topic == t
+        ).first()
+        if not tf:
+            topic_following = TopicFollower.create(
+                follower=reaction_dict["createdBy"],
+                topic=t,
+                auto=True
+            )
+            session.add(topic_following)
+
+
+def migrate_ratings(session, entry, reaction_dict):
+    for comment_rating_old in entry.get("ratings", []):
+        rater = (
+            session.query(User)
+            .filter(User.oid == comment_rating_old["createdBy"])
+            .first()
+        )
+        re_reaction_dict = {
+            "shout": reaction_dict["shout"],
+            "replyTo": reaction_dict["id"],
+            "kind": ReactionKind.LIKE
+            if comment_rating_old["value"] > 0
+            else ReactionKind.DISLIKE,
+            "createdBy": rater.slug if rater else "anonymous",
+        }
+        cts = comment_rating_old.get("createdAt")
+        if cts:
+            re_reaction_dict["createdAt"] = date_parse(cts)
+        try:
+            # creating reaction from old rating
+            rr = Reaction.create(**re_reaction_dict)
+            following2 = session.query(
+                ShoutReactionsFollower
+            ).where(
+                ShoutReactionsFollower.follower == re_reaction_dict['createdBy']
+            ).filter(
+                ShoutReactionsFollower.shout == rr.shout
+            ).first()
+            if not following2:
+                following2 = ShoutReactionsFollower.create(
+                    follower=re_reaction_dict['createdBy'],
+                    shout=rr.shout,
+                    auto=True
+                )
+                session.add(following2)
+            session.add(rr)
+            # await ReactedStorage.react(rr)
+
+        except Exception as e:
+            print("[migration] comment rating error: %r" % re_reaction_dict)
+            raise e
+    session.commit()
+
+
 async def migrate(entry, storage):
     """
     {
@@ -50,10 +128,9 @@ async def migrate(entry, storage):
         old_thread: String
     }
     """
+    old_ts = entry.get("createdAt")
     reaction_dict = {
-        "createdAt": (
-            ts if not entry.get("createdAt") else date_parse(entry.get("createdAt"))
-        ),
+        "createdAt": (ts if not old_ts else date_parse(old_ts)),
         "body": html2text(entry.get("body", "")),
         "oid": entry["_id"],
     }
@@ -79,81 +156,11 @@ async def migrate(entry, storage):
                 session.add(reaction)
                 # await ReactedStorage.react(reaction)
 
-                # creating shout's reactions following for reaction author
-                following1 = session.query(
-                    ShoutReactionsFollower
-                ).where(
-                    ShoutReactionsFollower.follower == reaction_dict["createdBy"]
-                ).filter(
-                    ShoutReactionsFollower.shout == reaction.shout
-                ).first()
-                if not following1:
-                    following1 = ShoutReactionsFollower.create(
-                        follower=reaction_dict["createdBy"],
-                        shout=reaction.shout,
-                        auto=True
-                    )
-                    session.add(following1)
+                reaction_dict = reaction.dict()
 
-                # creating topics followings for reaction author
-                for t in shout_dict["topics"]:
-                    tf = session.query(
-                        TopicFollower
-                    ).where(
-                        TopicFollower.follower == reaction_dict["createdBy"]
-                    ).filter(
-                        TopicFollower.topic == t
-                    ).first()
-                    if not tf:
-                        topic_following = TopicFollower.create(
-                            follower=reaction_dict["createdBy"],
-                            topic=t,
-                            auto=True
-                        )
-                        session.add(topic_following)
+                auto_followers(session, shout_dict, reaction_dict)
 
-                reaction_dict["id"] = reaction.id
-                for comment_rating_old in entry.get("ratings", []):
-                    rater = (
-                        session.query(User)
-                        .filter(User.oid == comment_rating_old["createdBy"])
-                        .first()
-                    )
-                    re_reaction_dict = {
-                        "shout": reaction_dict["shout"],
-                        "replyTo": reaction.id,
-                        "kind": ReactionKind.LIKE
-                        if comment_rating_old["value"] > 0
-                        else ReactionKind.DISLIKE,
-                        "createdBy": rater.slug if rater else "discours",
-                    }
-                    cts = comment_rating_old.get("createdAt")
-                    if cts:
-                        re_reaction_dict["createdAt"] = date_parse(cts)
-                    try:
-                        # creating reaction from old rating
-                        rr = Reaction.create(**re_reaction_dict)
-                        following2 = session.query(
-                            ShoutReactionsFollower
-                        ).where(
-                            ShoutReactionsFollower.follower == re_reaction_dict['createdBy']
-                        ).filter(
-                            ShoutReactionsFollower.shout == rr.shout
-                        ).first()
-                        if not following2:
-                            following2 = ShoutReactionsFollower.create(
-                                follower=re_reaction_dict['createdBy'],
-                                shout=rr.shout,
-                                auto=True
-                            )
-                            session.add(following2)
-                        session.add(rr)
-                        # await ReactedStorage.react(rr)
-
-                    except Exception as e:
-                        print("[migration] comment rating error: %r" % re_reaction_dict)
-                        raise e
-                session.commit()
+                migrate_ratings(session, entry, reaction_dict)
             else:
                 print(
                     "[migration] error: cannot find shout for comment %r"
@@ -162,26 +169,34 @@ async def migrate(entry, storage):
         return reaction
 
 
-def migrate_2stage(rr, old_new_id):
-    reply_oid = rr.get("replyTo")
-    if not reply_oid:
-        return
-    new_id = old_new_id.get(rr.get("oid"))
-    if not new_id:
-        return
-    with local_session() as session:
-        comment = session.query(Reaction).filter(Reaction.id == new_id).first()
-        comment.replyTo = old_new_id.get(reply_oid)
-        session.add(comment)
-
-        srf = session.query(ShoutReactionsFollower).where(
-            ShoutReactionsFollower.shout == comment.shout
-        ).filter(
-            ShoutReactionsFollower.follower == comment.createdBy
-        ).first()
-        if not srf:
-            srf = ShoutReactionsFollower.create(shout=comment.shout, follower=comment.createdBy, auto=True)
-            session.add(srf)
-        session.commit()
-    if not rr["body"]:
-        raise Exception(rr)
+def migrate_2stage(old_comment, idmap):
+    if old_comment.get('body'):
+        new_id = idmap.get(old_comment.get('oid'))
+        if new_id:
+            new_replyto_id = None
+            old_replyto_id = old_comment.get("replyTo")
+            if old_replyto_id:
+                new_replyto_id = int(idmap.get(old_replyto_id, "0"))
+            with local_session() as session:
+                comment = session.query(Reaction).where(Reaction.id == new_id).first()
+                try:
+                    if new_replyto_id:
+                        new_reply = session.query(Reaction).where(Reaction.id == new_replyto_id).first()
+                        if not new_reply:
+                            print(new_replyto_id)
+                            raise Exception("cannot find reply by id!")
+                        comment.replyTo = new_reply.id
+                    session.add(comment)
+                    srf = session.query(ShoutReactionsFollower).where(
+                        ShoutReactionsFollower.shout == comment.shout
+                    ).filter(
+                        ShoutReactionsFollower.follower == comment.createdBy
+                    ).first()
+                    if not srf:
+                        srf = ShoutReactionsFollower.create(
+                            shout=comment.shout, follower=comment.createdBy, auto=True
+                        )
+                        session.add(srf)
+                    session.commit()
+                except Exception as e:
+                    raise Exception("cannot find a comment by oldid") from e
diff --git a/migration/tables/content_items.py b/migration/tables/content_items.py
index 8680d2b8..81dd40ff 100644
--- a/migration/tables/content_items.py
+++ b/migration/tables/content_items.py
@@ -75,7 +75,7 @@ def create_author_from_app(app):
                 session.commit()
             userdata = user.dict()
         if not userdata:
-            userdata = User.default_user.dict()
+            userdata = User.default_user.dict()  # anonymous
     except Exception as e:
         print(app)
         raise e
@@ -96,84 +96,59 @@ async def create_shout(shout_dict, userslug):
         session.commit()
 
 
+def get_userdata(entry, storage):
+    user_oid = entry.get("createdBy", "")
+    userdata = None
+    app = entry.get("application")
+    if app:
+        userdata = create_author_from_app(app) or {"slug": "anonymous"}
+    else:
+        userdata = storage["users"]["by_oid"].get(user_oid) or {"slug": "anonymous"}
+    userslug = userdata.get("slug")
+    return userslug, userdata, user_oid
+
+
 async def migrate(entry, storage):
-    # init, set title and layout
+    userslug, userdata, user_oid = get_userdata(entry, storage)
+    user = await get_user(userslug, userdata, storage, user_oid)
     r = {
         "layout": type2layout[entry["type"]],
         "title": entry["title"],
-        "authors": [],
-        "topics": set([])
+        "authors": [userslug, ],
+        "slug": get_shout_slug(entry),
+        "cover": (
+            "https://assets.discours.io/unsafe/1600x/" +
+            entry["thumborId"] if entry.get("thumborId") else entry.get("image", {}).get("url")
+        ),
+        "visibility": "public" if entry.get("published") else "authors",
+        "publishedAt": date_parse(entry.get("publishedAt", OLD_DATE)) if entry.get("published") else None,
+        "deletedAt": date_parse(entry.get("deletedAt")) if entry.get("deletedAt") else None,
+        "createdAt": date_parse(entry.get("createdAt", OLD_DATE)),
+        "updatedAt": date_parse(entry["updatedAt"]) if "updatedAt" in entry else ts,
+        "topics": await add_topics_follower(entry, storage, userslug),
+        "body": extract_html(entry)
     }
 
-    # author
-    users_by_oid = storage["users"]["by_oid"]
-    user_oid = entry.get("createdBy", "")
-    userdata = users_by_oid.get(user_oid)
-    user = None
-    if not userdata:
-        app = entry.get("application")
-        if app:
-            userdata = create_author_from_app(app)
-    if userdata:
-        userslug = userdata.get('slug')
-    else:
-        userslug = "anonymous"  # bad old id slug was found
-    r["authors"] = [userslug, ]
+    # main topic patch
+    r['mainTopic'] = r['topics'][0]
 
-    # slug
-    slug = get_shout_slug(entry)
-    if slug:
-        r["slug"] = slug
-    else:
-        raise Exception
-
-    # cover
-    c = ""
-    if entry.get("thumborId"):
-        c = "https://assets.discours.io/unsafe/1600x/" + entry["thumborId"]
-    else:
-        c = entry.get("image", {}).get("url")
-        if not c or "cloudinary" in c:
-            c = ""
-    r["cover"] = c
-
-    # timestamps
-    r["createdAt"] = date_parse(entry.get("createdAt", OLD_DATE))
-    r["updatedAt"] = date_parse(entry["updatedAt"]) if "updatedAt" in entry else ts
-
-    # visibility
+    # published author auto-confirm
     if entry.get("published"):
-        r["publishedAt"] = date_parse(entry.get("publishedAt", OLD_DATE))
-        r["visibility"] = "public"
         with local_session() as session:
             # update user.emailConfirmed if published
             author = session.query(User).where(User.slug == userslug).first()
             author.emailConfirmed = True
             session.add(author)
             session.commit()
-    else:
-        r["visibility"] = "authors"
-    if "deletedAt" in entry:
-        r["deletedAt"] = date_parse(entry["deletedAt"])
-
-    # topics
-    r['topics'] = await add_topics_follower(entry, storage, userslug)
-    r['mainTopic'] = r['topics'][0]
-
-    entry["topics"] = r["topics"]
-    entry["cover"] = r["cover"]
-
-    # body
-    r["body"] = extract_html(entry)
 
+    # media
     media = extract_media(entry)
-    if media:
-        r["media"] = json.dumps(media, ensure_ascii=True)
+    r["media"] = json.dumps(media, ensure_ascii=True) if media else None
 
+    # ----------------------------------- copy
     shout_dict = r.copy()
 
     # user
-    user = await get_user(userslug, userdata, storage, user_oid)
     shout_dict["authors"] = [user, ]
     del shout_dict["topics"]
     try:
@@ -197,7 +172,7 @@ async def migrate(entry, storage):
     shout_dict["oid"] = entry.get("_id", "")
 
     storage["shouts"]["by_oid"][entry["_id"]] = shout_dict
-    storage["shouts"]["by_slug"][slug] = shout_dict
+    storage["shouts"]["by_slug"][shout_dict["slug"]] = shout_dict
 
     return shout_dict
 
@@ -342,40 +317,34 @@ async def content_ratings_to_reactions(entry, slug):
                     session.query(User)
                     .filter(User.oid == content_rating["createdBy"])
                     .first()
+                ) or User.default_user
+                cts = content_rating.get("createdAt")
+                reaction_dict = {
+                    "createdAt": date_parse(cts) if cts else None,
+                    "kind": ReactionKind.LIKE
+                    if content_rating["value"] > 0
+                    else ReactionKind.DISLIKE,
+                    "createdBy": rater.slug,
+                    "shout": slug
+                }
+                reaction = (
+                    session.query(Reaction)
+                    .filter(Reaction.shout == reaction_dict["shout"])
+                    .filter(Reaction.createdBy == reaction_dict["createdBy"])
+                    .filter(Reaction.kind == reaction_dict["kind"])
+                    .first()
                 )
-                reactedBy = (
-                    rater
-                    if rater
-                    else session.query(User).filter(User.slug == "noname").first()
-                )
-                if rater:
-                    reaction_dict = {
-                        "kind": ReactionKind.LIKE
-                        if content_rating["value"] > 0
-                        else ReactionKind.DISLIKE,
-                        "createdBy": reactedBy.slug,
-                        "shout": slug,
-                    }
-                    cts = content_rating.get("createdAt")
-                    if cts:
-                        reaction_dict["createdAt"] = date_parse(cts)
-                    reaction = (
-                        session.query(Reaction)
-                        .filter(Reaction.shout == reaction_dict["shout"])
-                        .filter(Reaction.createdBy == reaction_dict["createdBy"])
-                        .filter(Reaction.kind == reaction_dict["kind"])
-                        .first()
-                    )
-                    if reaction:
-                        k = ReactionKind.AGREE if content_rating["value"] > 0 else ReactionKind.DISAGREE
-                        reaction_dict["kind"] = k
-                        reaction.update(reaction_dict)
-                    else:
-                        rea = Reaction.create(**reaction_dict)
-                        session.add(rea)
-                        # await ReactedStorage.react(rea)
-                    # shout_dict['ratings'].append(reaction_dict)
+                if reaction:
+                    k = ReactionKind.AGREE if content_rating["value"] > 0 else ReactionKind.DISAGREE
+                    reaction_dict["kind"] = k
+                    reaction.update(reaction_dict)
+                    session.add(reaction)
+                else:
+                    rea = Reaction.create(**reaction_dict)
+                    session.add(rea)
+                    # await ReactedStorage.react(rea)
+                # shout_dict['ratings'].append(reaction_dict)
 
             session.commit()
     except Exception:
-        raise Exception("[migration] content_item.ratings error: \n%r" % content_rating)
+        print("[migration] content_item.ratings error: \n%r" % content_rating)
diff --git a/orm/user.py b/orm/user.py
index 75a2d748..5b3b8484 100644
--- a/orm/user.py
+++ b/orm/user.py
@@ -81,25 +81,25 @@ class User(Base):
     def init_table():
         with local_session() as session:
             default = session.query(User).filter(User.slug == "anonymous").first()
-        if not default:
-            defaul_dict = {
-                "email": "noreply@discours.io",
-                "username": "noreply@discours.io",
-                "name": "Аноним",
-                "slug": "anonymous",
-            }
-            default = User.create(**defaul_dict)
-            session.add(default)
-            discours_dict = {
-                "email": "welcome@discours.io",
-                "username": "welcome@discours.io",
-                "name": "Дискурс",
-                "slug": "discours",
-            }
-            discours = User.create(**discours_dict)
-            session.add(discours)
-            session.commit()
-        User.default_user = default
+            if not default:
+                default_dict = {
+                    "email": "noreply@discours.io",
+                    "username": "noreply@discours.io",
+                    "name": "Аноним",
+                    "slug": "anonymous",
+                }
+                default = User.create(**default_dict)
+                session.add(default)
+                discours_dict = {
+                    "email": "welcome@discours.io",
+                    "username": "welcome@discours.io",
+                    "name": "Дискурс",
+                    "slug": "discours",
+                }
+                discours = User.create(**discours_dict)
+                session.add(discours)
+                session.commit()
+            User.default_user = default
 
     async def get_permission(self):
         scope = {}