feat/sv-searching-txtai #2

Closed
dufok wants to merge 44 commits from feat/sv-searching-txtai into dev
11 changed files with 1060 additions and 228 deletions

View File

@ -29,7 +29,16 @@ jobs:
if: github.ref == 'refs/heads/dev'
uses: dokku/github-action@master
with:
branch: 'dev'
branch: 'main'
force: true
git_remote_url: 'ssh://dokku@v2.discours.io:22/core'
ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}
- name: Push to dokku for staging branch
if: github.ref == 'refs/heads/staging'
uses: dokku/github-action@master
with:
branch: 'dev'
git_remote_url: 'ssh://dokku@staging.discours.io:22/core'
ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}
git_push_flags: '--force'

6
.gitignore vendored
View File

@ -128,6 +128,9 @@ dmypy.json
.idea
temp.*
# Debug
DEBUG.log
discours.key
discours.crt
discours.pem
@ -161,4 +164,5 @@ views.json
*.key
*.crt
*cache.json
.cursor
.cursor
.devcontainer/

48
main.py
View File

@ -17,7 +17,8 @@ from cache.revalidator import revalidation_manager
from services.exception import ExceptionHandlerMiddleware
from services.redis import redis
from services.schema import create_all_tables, resolvers
from services.search import search_service
#from services.search import search_service
from services.search import search_service, initialize_search_index
from services.viewed import ViewedStorage
from services.webhook import WebhookEndpoint, create_webhook_endpoint
from settings import DEV_SERVER_PID_FILE_NAME, MODE
@ -34,24 +35,67 @@ async def start():
f.write(str(os.getpid()))
print(f"[main] process started in {MODE} mode")
async def check_search_service():
"""Check if search service is available and log result"""
info = await search_service.info()
if info.get("status") in ["error", "unavailable"]:
print(f"[WARNING] Search service unavailable: {info.get('message', 'unknown reason')}")
else:
print(f"[INFO] Search service is available: {info}")
# indexing DB data
# async def indexing():
# from services.db import fetch_all_shouts
# all_shouts = await fetch_all_shouts()
# await initialize_search_index(all_shouts)
async def lifespan(_app):
try:
print("[lifespan] Starting application initialization")
create_all_tables()
await asyncio.gather(
redis.connect(),
precache_data(),
ViewedStorage.init(),
create_webhook_endpoint(),
search_service.info(),
check_search_service(),
start(),
revalidation_manager.start(),
)
print("[lifespan] Basic initialization complete")
# Add a delay before starting the intensive search indexing
print("[lifespan] Waiting for system stabilization before search indexing...")
await asyncio.sleep(10) # 10-second delay to let the system stabilize
# Start search indexing as a background task with lower priority
asyncio.create_task(initialize_search_index_background())
yield
finally:
print("[lifespan] Shutting down application services")
tasks = [redis.disconnect(), ViewedStorage.stop(), revalidation_manager.stop()]
await asyncio.gather(*tasks, return_exceptions=True)
print("[lifespan] Shutdown complete")
# Initialize search index in the background
async def initialize_search_index_background():
"""Run search indexing as a background task with low priority"""
try:
print("[search] Starting background search indexing process")
from services.db import fetch_all_shouts
# Get total count first (optional)
all_shouts = await fetch_all_shouts()
total_count = len(all_shouts) if all_shouts else 0
print(f"[search] Fetched {total_count} shouts for background indexing")
# Start the indexing process with the fetched shouts
print("[search] Beginning background search index initialization...")
await initialize_search_index(all_shouts)
print("[search] Background search index initialization complete")
except Exception as e:
print(f"[search] Error in background search indexing: {str(e)}")
# Создаем экземпляр GraphQL
graphql_app = GraphQL(schema, debug=True)

View File

@ -71,6 +71,34 @@ class ShoutAuthor(Base):
class Shout(Base):
"""
Публикация в системе.
Attributes:
body (str)
slug (str)
cover (str) : "Cover image url"
cover_caption (str) : "Cover image alt caption"
lead (str)
title (str)
subtitle (str)
layout (str)
media (dict)
authors (list[Author])
topics (list[Topic])
reactions (list[Reaction])
lang (str)
version_of (int)
oid (str)
seo (str) : JSON
draft (int)
created_at (int)
updated_at (int)
published_at (int)
featured_at (int)
deleted_at (int)
created_by (int)
updated_by (int)
deleted_by (int)
community (int)
"""
__tablename__ = "shout"

View File

@ -13,6 +13,10 @@ starlette
gql
ariadne
granian
# NLP and search
httpx
orjson
pydantic
trafilatura

View File

@ -10,7 +10,7 @@ from orm.shout import Shout, ShoutAuthor, ShoutTopic
from orm.topic import Topic
from services.db import json_array_builder, json_builder, local_session
from services.schema import query
from services.search import search_text
from services.search import search_text, get_search_count
from services.viewed import ViewedStorage
from utils.logger import root_logger as logger
@ -187,12 +187,10 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
"""
shouts = []
try:
# logger.info(f"Starting get_shouts_with_links with limit={limit}, offset={offset}")
q = q.limit(limit).offset(offset)
with local_session() as session:
shouts_result = session.execute(q).all()
# logger.info(f"Got {len(shouts_result) if shouts_result else 0} shouts from query")
if not shouts_result:
logger.warning("No shouts found in query result")
@ -203,7 +201,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
shout = None
if hasattr(row, "Shout"):
shout = row.Shout
# logger.debug(f"Processing shout#{shout.id} at index {idx}")
if shout:
shout_id = int(f"{shout.id}")
shout_dict = shout.dict()
@ -231,20 +228,16 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
topics = None
if has_field(info, "topics") and hasattr(row, "topics"):
topics = orjson.loads(row.topics) if isinstance(row.topics, str) else row.topics
# logger.debug(f"Shout#{shout_id} topics: {topics}")
shout_dict["topics"] = topics
if has_field(info, "main_topic"):
main_topic = None
if hasattr(row, "main_topic"):
# logger.debug(f"Raw main_topic for shout#{shout_id}: {row.main_topic}")
main_topic = (
orjson.loads(row.main_topic) if isinstance(row.main_topic, str) else row.main_topic
)
# logger.debug(f"Parsed main_topic for shout#{shout_id}: {main_topic}")
if not main_topic and topics and len(topics) > 0:
# logger.info(f"No main_topic found for shout#{shout_id}, using first topic from list")
main_topic = {
"id": topics[0]["id"],
"title": topics[0]["title"],
@ -252,10 +245,8 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
"is_main": True,
}
elif not main_topic:
logger.warning(f"No main_topic and no topics found for shout#{shout_id}")
main_topic = {"id": 0, "title": "no topic", "slug": "notopic", "is_main": True}
shout_dict["main_topic"] = main_topic
# logger.debug(f"Final main_topic for shout#{shout_id}: {main_topic}")
if has_field(info, "authors") and hasattr(row, "authors"):
shout_dict["authors"] = (
@ -282,7 +273,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
logger.error(f"Fatal error in get_shouts_with_links: {e}", exc_info=True)
raise
finally:
logger.info(f"Returning {len(shouts)} shouts from get_shouts_with_links")
return shouts
@ -401,8 +391,17 @@ async def load_shouts_search(_, info, text, options):
"""
limit = options.get("limit", 10)
offset = options.get("offset", 0)
if isinstance(text, str) and len(text) > 2:
# Get search results with pagination
results = await search_text(text, limit, offset)
# If no results, return empty list
if not results:
logger.info(f"No search results found for '{text}'")
return []
# Extract IDs and scores
scores = {}
hits_ids = []
for sr in results:
@ -412,22 +411,42 @@ async def load_shouts_search(_, info, text, options):
scores[shout_id] = sr.get("score")
hits_ids.append(shout_id)
q = (
query_with_stat(info)
if has_field(info, "stat")
else select(Shout).filter(and_(Shout.published_at.is_not(None), Shout.deleted_at.is_(None)))
)
# Query DB for only the IDs in the current page
q = query_with_stat(info)
q = q.filter(Shout.id.in_(hits_ids))
q = apply_filters(q, options)
q = apply_sorting(q, options)
shouts = get_shouts_with_links(info, q, limit, offset)
q = apply_filters(q, options.get("filters", {}))
#
shouts = get_shouts_with_links(info, q, len(hits_ids), 0)
# Add scores from search results
for shout in shouts:
shout.score = scores[f"{shout.id}"]
shouts.sort(key=lambda x: x.score, reverse=True)
shout_id = str(shout['id'])
shout["score"] = scores.get(shout_id, 0)
# Re-sort by search score to maintain ranking
shouts.sort(key=lambda x: scores.get(str(x['id']), 0), reverse=True)
return shouts
return []
@query.field("get_search_results_count")
async def get_search_results_count(_, info, text):
"""
Returns the total count of search results for a search query.
:param _: Root query object (unused)
:param info: GraphQL context information
:param text: Search query text
:return: Total count of results
"""
if isinstance(text, str) and len(text) > 2:
count = await get_search_count(text)
return {"count": count}
return {"count": 0}
@query.field("load_shouts_unrated")
async def load_shouts_unrated(_, info, options):
"""

View File

@ -33,6 +33,7 @@ type Query {
get_shout(slug: String, shout_id: Int): Shout
load_shouts_by(options: LoadShoutsOptions): [Shout]
load_shouts_search(text: String!, options: LoadShoutsOptions): [SearchResult]
get_search_results_count(text: String!): CountResult!
load_shouts_bookmarked(options: LoadShoutsOptions): [Shout]
# rating

View File

@ -207,6 +207,7 @@ type CommonResult {
}
type SearchResult {
id: Int!
slug: String!
title: String!
cover: String
@ -274,3 +275,7 @@ type MyRateComment {
my_rate: ReactionKind
}
type CountResult {
count: Int!
}

34
server.py Normal file
View File

@ -0,0 +1,34 @@
import sys
from pathlib import Path
from granian.constants import Interfaces
from granian.log import LogLevels
from granian.server import Server
from settings import PORT
from utils.logger import root_logger as logger
if __name__ == "__main__":
logger.info("started")
try:
granian_instance = Server(
"main:app",
address="0.0.0.0",
port=PORT,
interface=Interfaces.ASGI,
workers=1,
websockets=False,
log_level=LogLevels.debug,
backlog=2048,
)
if "dev" in sys.argv:
logger.info("dev mode, building ssl context")
granian_instance.build_ssl_context(cert=Path("localhost.pem"), key=Path("localhost-key.pem"), password=None)
granian_instance.serve()
except Exception as error:
logger.error(error, exc_info=True)
raise
finally:
logger.info("stopped")

View File

@ -19,7 +19,7 @@ from sqlalchemy import (
inspect,
text,
)
from sqlalchemy.orm import Session, configure_mappers, declarative_base
from sqlalchemy.orm import Session, configure_mappers, declarative_base, joinedload
from sqlalchemy.sql.schema import Table
from settings import DB_URL
@ -259,3 +259,32 @@ def get_json_builder():
# Используем их в коде
json_builder, json_array_builder, json_cast = get_json_builder()
# Fetch all shouts, with authors preloaded
# This function is used for search indexing
async def fetch_all_shouts(session=None):
"""Fetch all published shouts for search indexing with authors preloaded"""
from orm.shout import Shout
close_session = False
if session is None:
session = local_session()
close_session = True
try:
# Fetch only published and non-deleted shouts with authors preloaded
query = session.query(Shout).options(
joinedload(Shout.authors)
).filter(
Shout.published_at.is_not(None),
Shout.deleted_at.is_(None)
)
shouts = query.all()
return shouts
except Exception as e:
logger.error(f"Error fetching shouts for search indexing: {e}")
return []
finally:
if close_session:
session.close()

File diff suppressed because it is too large Load Diff