0.0.2
All checks were successful
deploy / deploy (push) Successful in 1m21s

This commit is contained in:
Untone 2023-11-26 14:54:07 +03:00
parent 6e6de3818b
commit f4d4fd26d4
5 changed files with 25 additions and 23 deletions

View File

@ -1,4 +1,5 @@
[0.0.2]
- services: redis listen generating method
- dx: migrated to strawberry
- dx: added sentry integration
- dx: added mypy plugins

View File

@ -15,11 +15,13 @@
## Как разрабатывать локально
Установить
1 Читаем доки [strawberry](https://strawberry.rocks/docs/general/schema-basics)
2 Устанавливаем локальные хранилища
- Redis
- Postgres
Затем
3 Запуск локального сервера
```shell
poetry env use 3.12

View File

@ -7,7 +7,7 @@ from strawberry.asgi import GraphQL
from starlette.applications import Starlette
from services.rediscache import redis
from resolvers.listener import start as listener_start, stop as listener_stop
from resolvers.listener import reactions_worker
from resolvers.schema import schema
from settings import DEV_SERVER_PID_FILE_NAME, SENTRY_DSN, MODE
@ -22,7 +22,7 @@ async def start_up():
f.write(str(os.getpid()))
else:
await redis.connect()
notification_service_task = asyncio.create_task(listener_start())
notification_service_task = asyncio.create_task(reactions_worker())
print(f"[main] {notification_service_task}")
try:
@ -45,7 +45,6 @@ async def start_up():
async def shutdown():
listener_stop()
await redis.disconnect()

View File

@ -5,7 +5,7 @@ from services.db import local_session
from services.rediscache import redis
def handle_reaction(notification: dict[str, str | int]):
async def handle_reaction(notification: dict[str, str | int]):
"""создаеёт новое хранимое уведомление"""
try:
with local_session() as session:
@ -17,20 +17,8 @@ def handle_reaction(notification: dict[str, str | int]):
print(f"[listener.handle_reaction] error: {str(e)}")
def stop(pubsub):
pubsub.unsubscribe()
pubsub.close()
def start():
pubsub = redis.pubsub()
pubsub.subscribe("reaction")
try:
# Бесконечный цикл прослушивания
while True:
msg = pubsub.get_message()
handle_reaction(json.loads(msg["data"]))
except Exception:
pass
finally:
stop(pubsub)
async def reactions_worker():
async for message in redis.listen("reaction"):
msg = json.loads(message["data"])
if msg:
await handle_reaction(msg)

View File

@ -1,3 +1,5 @@
import asyncio
import redis.asyncio as aredis
from settings import REDIS_URL
@ -55,6 +57,16 @@ class RedisCache:
print(f"[redis] MGET {key} {keys}")
return await self._client.mget(key, *keys)
async def listen(self, channel):
pubsub = self._client.pubsub()
pubsub.subscribe(channel)
while True:
message = pubsub.get_message()
if message:
yield message
await asyncio.sleep(0.1)
redis = RedisCache()