worker-fix
All checks were successful
deploy / deploy (push) Successful in 1m12s

This commit is contained in:
Untone 2023-11-26 22:36:02 +03:00
parent bf3a8bcc0c
commit 7ea2a911ef
2 changed files with 14 additions and 22 deletions

View File

@ -1,4 +1,5 @@
import json import json
import asyncio
from orm.notification import Notification from orm.notification import Notification
from services.db import local_session from services.db import local_session
@ -19,9 +20,5 @@ async def handle_reaction(notification: dict[str, str | int]):
async def reactions_worker(): async def reactions_worker():
async for message in redis.listen("reaction"): async for message in redis.listen("reaction"):
message = await message
if message: if message:
msg_data = message.get("data") await handle_reaction(message)
if msg_data:
msg = json.loads(msg_data)
await handle_reaction(msg)

View File

@ -1,4 +1,5 @@
import asyncio import asyncio
import json
import redis.asyncio as aredis import redis.asyncio as aredis
from settings import REDIS_URL from settings import REDIS_URL
@ -47,25 +48,19 @@ class RedisCache:
return return
await self._client.publish(channel, data) await self._client.publish(channel, data)
async def lrange(self, key, start, stop):
if self._client:
print(f"[redis] LRANGE {key} {start} {stop}")
return await self._client.lrange(key, start, stop)
async def mget(self, key, *keys):
if self._client:
print(f"[redis] MGET {key} {keys}")
return await self._client.mget(key, *keys)
async def listen(self, channel): async def listen(self, channel):
if self._client:
pubsub = self._client.pubsub() pubsub = self._client.pubsub()
await pubsub.subscribe(channel) await pubsub.subscribe(channel)
try:
while True: while True:
message = pubsub.get_message() message = await pubsub.get_message()
if message: if message:
yield message yield json.loads(message['data'])
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
finally:
await pubsub.unsubscribe(channel)
redis = RedisCache() redis = RedisCache()