from datetime import datetime

from dateutil.parser import parse as date_parse
from sqlalchemy.exc import IntegrityError
from transliterate import translit

from base.orm import local_session
from migration.extract import prepare_html_body
from orm.community import Community
from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower
from orm.user import User
from orm.topic import TopicFollower
from services.stat.reacted import ReactedStorage
from services.stat.viewed import ViewedStorage

# Fallback timestamp for legacy records that lack createdAt / publishedAt.
OLD_DATE = "2016-03-05 22:22:00.350000"
# Captured once at import time and reused as "now" for every record migrated
# in this run (user createdAt/lastSeen, shout updatedAt fallback).
ts = datetime.now()

# Legacy content ``type`` -> new shout ``layout`` (Article maps to "").
type2layout = {
    "Article": "",
    "Literature": "literature",
    "Music": "audio",
    "Video": "video",
    "Image": "image",
}


def get_shout_slug(entry):
    """Return the slug for a legacy content item.

    Prefers ``entry["slug"]``. When that is missing or empty, falls back to
    the first non-empty ``slug`` found in ``entry["friendlySlugs"]``. The
    result is falsy when no slug could be found.
    """
    slug = entry.get("slug", "")
    if not slug:
        for friend in entry.get("friendlySlugs") or []:
            slug = friend.get("slug", "")
            if slug:
                break
    return slug


def _slugify_name(name):
    """Transliterate a Russian display name into a lowercase dash-separated slug."""
    return (
        translit(name, "ru", reversed=True)
        .replace(" ", "-")
        .replace("'", "")
        .replace(".", "-")
        .lower()
    )


def _find_or_create_app_user(session, app):
    """Create a ``User`` for an application author under the first free slug.

    Candidate slugs are tried in this order:

    1. the transliterated name (skipped when the application has no name);
    2. the email local part;
    3. the email local part with an ``-author`` suffix.

    If every candidate is already taken, the user owning the last candidate
    is returned and nothing is created.
    """
    email_slug = app["email"].split("@")[0]
    name = app.get("name")
    candidates = [_slugify_name(name)] if name else []
    candidates += [email_slug, email_slug + "-author"]

    user = None
    for slug in candidates:
        user = session.query(User).where(User.slug == slug).first()
        if not user:
            break
    if user:
        # NOTE(review): every candidate slug is taken, so an unrelated
        # existing user is returned (behaviour kept from the original code).
        return user

    # create user with application data under the first free slug
    userdata = {
        "username": app["email"],
        "email": app["email"],
        "name": app.get("name", ""),
        "bio": app.get("bio", ""),
        "emailConfirmed": False,
        "slug": slug,
        "createdAt": ts,
        "lastSeen": ts,
    }
    user = User.create(**userdata)
    session.add(user)
    session.commit()
    return user


def create_author_from_app(app):
    """Return ``User.dict()`` data for the author of a legacy application.

    ``app`` must contain ``email``; ``name`` and ``bio`` are optional. A user
    already registered with the same email is reused. Otherwise a new user is
    created under a free slug. Falls back to ``User.default_user`` when no
    user data can be produced.

    Any error is re-raised after printing ``app`` for diagnosis.
    """
    try:
        with local_session() as session:
            # an account registered with the same email is reused as-is
            user = session.query(User).where(User.email == app["email"]).first()
            if not user:
                user = _find_or_create_app_user(session, app)
            # built for both the email-match and the creation path
            # (previously only the creation path assigned it)
            userdata = user.dict() if user else None
            if not userdata:
                userdata = User.default_user.dict()
    except Exception:
        print(app)
        raise
    return userdata


async def create_shout(shout_dict, userslug):
    """Create a ``Shout`` from ``shout_dict`` and subscribe ``userslug`` to it.

    The ``ShoutReactionsFollower`` row (``auto=True``) is created only when it
    does not exist yet. Errors from ``Shout.create`` propagate: ``migrate``
    treats an ``IntegrityError`` here as a clash with an existing shout.
    """
    shout = Shout.create(**shout_dict)
    with local_session() as session:
        follower = (
            session.query(ShoutReactionsFollower)
            .where(ShoutReactionsFollower.shout == shout.slug)
            .filter(ShoutReactionsFollower.follower == userslug)
            .first()
        )
        if not follower:
            follower = ShoutReactionsFollower.create(
                shout=shout.slug, follower=userslug, auto=True
            )
            session.add(follower)
        session.commit()


def _get_cover(entry):
    """Return the cover URL for a content item, or "" when it has no usable one."""
    if entry.get("thumborId"):
        return "https://assets.discours.io/unsafe/1600x/" + entry["thumborId"]
    url = (entry.get("image") or {}).get("url")
    # cloudinary-hosted images are deliberately dropped
    if not url or "cloudinary" in url:
        return ""
    return url


def _follow_topics(userslug, topics):
    """Make ``userslug`` an (auto) follower of every topic slug in ``topics``."""
    with local_session() as session:
        for topic in topics:
            follow = (
                session.query(TopicFollower)
                .where(TopicFollower.follower == userslug)
                .filter(TopicFollower.topic == topic)
                .first()
            )
            if not follow:
                follow = TopicFollower.create(topic=topic, follower=userslug, auto=True)
                session.add(follow)
        # commit like create_shout does; the original only add()-ed here
        session.commit()


def _ensure_user(storage, author_oid, userslug, userdata):
    """Return the ``User`` for ``userslug``, creating it from ``userdata`` if needed.

    A newly created user is cached in ``storage["users"]`` by slug and by
    ``author_oid``.

    Raises:
        Exception: if no user can be found or created.
    """
    user = None
    with local_session() as session:
        if userslug:
            user = session.query(User).filter(User.slug == userslug).first()
        if not user and userdata:
            try:
                userdata["slug"] = userdata["slug"].lower().strip().replace(" ", "-")
                user = User.create(**userdata)
                session.add(user)
                session.commit()
            except IntegrityError:
                # %r: userdata is a dict and cannot be concatenated to a str
                print("[migration] user error: %r" % userdata)
                user = None
            if user:
                userdata["id"] = user.id
                userdata["createdAt"] = user.createdAt
                storage["users"]["by_slug"][userdata["slug"]] = userdata
                # keyed by the author oid, which is what migrate() looks up
                storage["users"]["by_oid"][author_oid] = userdata
    if not user:
        raise Exception("could not get a user")
    return user


def _first_author_slug(shout):
    """Return the slug of the shout's first author, or None if it has no authors."""
    authors = shout.authors
    if not authors:
        return None
    first = authors[0]
    # authors are assigned as User objects (see migrate); tolerate plain slugs too
    return getattr(first, "slug", first)


def _report_shout_diff(shout, shout_dict):
    """Print each field where ``shout`` differs from ``shout_dict``; return True if any."""
    differs = False
    for key, value in shout_dict.items():
        if key not in shout.__dict__:
            print("[migration] shout already exists, but lacks %s" % key)
            differs = True
        elif shout.__dict__[key] != value:
            print("[migration] shout already exists, but differs in %s" % key)
            differs = True
    return differs


async def _save_shout(shout_dict, userslug):
    """Create the shout, resolving a slug clash with an already stored shout.

    On ``IntegrityError`` the stored shout with the same slug is loaded:

    * If it belongs to another author, a new shout is created under
      ``<slug>-<layout>``. This mutates ``shout_dict["slug"]``.
    * If it belongs to the same author, the stored shout is updated when any
      field differs.

    Raises:
        IntegrityError: if no stored shout with that slug can be found after
            the clash.
        Exception: chained from any other error of ``create_shout``.
    """
    try:
        await create_shout(shout_dict, userslug)
    except IntegrityError:
        with local_session() as session:
            existing = (
                session.query(Shout).filter(Shout.slug == shout_dict["slug"]).first()
            )
            if not existing:
                print("[migration] something went wrong with shout: \n%r" % shout_dict)
                raise
            if _first_author_slug(existing) != userslug:
                # create new with different slug
                shout_dict["slug"] += "-" + shout_dict["layout"]
                try:
                    await create_shout(shout_dict, userslug)
                except IntegrityError as retry_error:
                    print(retry_error)
            elif _report_shout_diff(existing, shout_dict):
                # update old
                existing.update(shout_dict)
            session.commit()
    except Exception as e:
        print(e)
        raise Exception(
            "[migration] could not save shout %r" % shout_dict.get("slug")
        ) from e


def _migrate_shout_topics(shout_slug, topics, replacements):
    """Link the shout to its topics, applying slug replacements.

    Returns the list of (deduplicated) replacement slugs that were linked.
    """
    linked = []
    for oldslug in topics:
        newslug = replacements.get(oldslug, oldslug)
        if not newslug:
            # tpc is a slug string; the original indexed it like a dict
            print("[migration] ignored topic slug: \n%r" % oldslug)
            continue
        with local_session() as session:
            shout_topic_old = (
                session.query(ShoutTopic)
                .filter(ShoutTopic.shout == shout_slug)
                .filter(ShoutTopic.topic == oldslug)
                .first()
            )
            if shout_topic_old:
                # re-point the existing link; ShoutTopic has no "slug" field
                shout_topic_old.update({"topic": newslug})
            else:
                shout_topic_new = (
                    session.query(ShoutTopic)
                    .filter(ShoutTopic.shout == shout_slug)
                    .filter(ShoutTopic.topic == newslug)
                    .first()
                )
                if not shout_topic_new:
                    try:
                        ShoutTopic.create(**{"shout": shout_slug, "topic": newslug})
                    except Exception:
                        print("[migration] shout topic error: " + newslug)
            session.commit()
        if newslug not in linked:
            linked.append(newslug)
    return linked


async def _migrate_ratings(ratings, shout_slug):
    """Convert legacy content ratings into LIKE/DISLIKE (or AGREE/DISAGREE) reactions.

    Ratings whose ``createdBy`` oid matches no ``User`` are skipped. A rating
    that duplicates an existing reaction of the same kind turns that reaction
    into AGREE/DISAGREE.

    Raises:
        Exception: chained from the first failure, naming the offending rating.
    """
    content_rating = None  # bound up front so the error message never fails
    try:
        with local_session() as session:
            for content_rating in ratings:
                rater = (
                    session.query(User)
                    .filter(User.oid == content_rating["createdBy"])
                    .first()
                )
                if not rater:
                    continue
                positive = content_rating["value"] > 0
                reaction_dict = {
                    "kind": ReactionKind.LIKE if positive else ReactionKind.DISLIKE,
                    "createdBy": rater.slug,
                    "shout": shout_slug,
                }
                cts = content_rating.get("createdAt")
                if cts:
                    reaction_dict["createdAt"] = date_parse(cts)
                reaction = (
                    session.query(Reaction)
                    .filter(Reaction.shout == reaction_dict["shout"])
                    .filter(Reaction.createdBy == reaction_dict["createdBy"])
                    .filter(Reaction.kind == reaction_dict["kind"])
                    .first()
                )
                if reaction:
                    reaction_dict["kind"] = (
                        ReactionKind.AGREE if positive else ReactionKind.DISAGREE
                    )
                    reaction.update(reaction_dict)
                else:
                    rea = Reaction.create(**reaction_dict)
                    session.add(rea)
                    await ReactedStorage.react(rea)
            session.commit()
    except Exception as e:
        raise Exception(
            "[migration] content_item.ratings error: \n%r" % content_rating
        ) from e


async def migrate(entry, storage):
    """Migrate one legacy content item into a ``Shout`` and return its dict.

    ``storage`` is the shared migration state. This function reads and writes
    ``storage["topics"]["by_oid"]``, ``storage["users"]["by_oid"]``,
    ``storage["users"]["by_slug"]``, ``storage["replacements"]`` (old topic
    slug -> new slug), ``storage["shouts"]["by_oid"]`` and
    ``storage["shouts"]["by_slug"]``.

    Raises:
        KeyError: for an unknown ``entry["type"]`` or a missing ``category``.
        Exception: when the item has no slug, no author user can be obtained,
            or saving the shout or its ratings fails.
    """
    # init, set title and layout
    r = {
        "layout": type2layout[entry["type"]],
        "title": entry["title"],
        "community": Community.default_community.id,
        "authors": [],
        "topics": set(),
    }
    topics_by_oid = storage["topics"]["by_oid"]
    users_by_oid = storage["users"]["by_oid"]

    # author: cached user data first, then the author application if present
    author_oid = entry.get("createdBy", entry.get("_id", entry.get("oid")))
    userdata = users_by_oid.get(author_oid)
    if not userdata:
        app = entry.get("application")
        if app:
            userdata = create_author_from_app(app)
    # "anonymous" when the legacy author id could not be resolved
    userslug = userdata.get("slug") if userdata else "anonymous"
    r["authors"] = [userslug]

    # slug
    slug = get_shout_slug(entry)
    if not slug:
        raise Exception("[migration] no slug for content item %r" % entry.get("_id"))
    r["slug"] = slug

    r["cover"] = _get_cover(entry)

    # timestamps
    r["createdAt"] = date_parse(entry.get("createdAt", OLD_DATE))
    r["updatedAt"] = date_parse(entry["updatedAt"]) if "updatedAt" in entry else ts
    if entry.get("published"):
        r["publishedAt"] = date_parse(entry.get("publishedAt", OLD_DATE))
    if "deletedAt" in entry:
        r["deletedAt"] = date_parse(entry["deletedAt"])

    # topics: main category plus tags, resolved through topics_by_oid
    category = entry["category"]
    main_topic = topics_by_oid.get(category)
    if main_topic:
        r["mainTopic"] = storage["replacements"].get(
            main_topic["slug"], main_topic["slug"]
        )
    for topic_oid in [category, *(entry.get("tags") or [])]:
        if topic_oid in topics_by_oid:
            r["topics"].add(topics_by_oid[topic_oid]["slug"])
        else:
            print("[migration] unknown old topic id: %s" % topic_oid)
    r["topics"] = list(r["topics"])

    # add author as TopicFollower
    _follow_topics(userslug, r["topics"])

    # written back before prepare_html_body(entry) — presumably it uses them
    entry["topics"] = r["topics"]
    entry["cover"] = r["cover"]
    r["body"] = prepare_html_body(entry)

    # save shout to db; topics are linked separately below
    shout_dict = r.copy()
    del shout_dict["topics"]
    user = _ensure_user(storage, author_oid, userslug, userdata)
    shout_dict["authors"] = [user]
    await _save_shout(shout_dict, userslug)

    # shout topics aftermath (uses the possibly suffixed shout slug)
    shout_dict["topics"] = _migrate_shout_topics(
        shout_dict["slug"], r["topics"], storage["replacements"]
    )

    # content_item ratings to reactions
    await _migrate_ratings(entry.get("ratings") or [], shout_dict["slug"])

    # shout views
    await ViewedStorage.increment(shout_dict["slug"], amount=entry.get("views", 1))

    shout_dict["oid"] = entry.get("_id")
    storage["shouts"]["by_oid"][entry["_id"]] = shout_dict
    storage["shouts"]["by_slug"][slug] = shout_dict
    return shout_dict