"""Async helper around ``redis.asyncio`` exposing a shared ``redis`` instance."""

import asyncio
import json
import logging
from typing import Any, AsyncIterator, Optional, Tuple

import redis.asyncio as aredis

from settings import REDIS_URL

logger = logging.getLogger("\t[services.redis]\t")
logger.setLevel(logging.DEBUG)


class RedisCache:
    """Thin wrapper over a ``redis.asyncio`` client.

    Every method is a no-op (or returns ``None``) until ``connect()`` has
    created the underlying client.
    """

    def __init__(self, uri: str = REDIS_URL) -> None:
        """Store the connection URI; no client is created yet."""
        self._uri: str = uri
        # Channel names recorded by subscribe() and dropped by unsubscribe().
        self.pubsub_channels: list = []
        self._client: Optional[Any] = None

    async def connect(self) -> None:
        """Create the client from the stored URI with response decoding on."""
        self._client = aredis.Redis.from_url(self._uri, decode_responses=True)

    async def disconnect(self) -> None:
        """Close the client if one was created."""
        if not self._client:
            return
        # Newer redis-py releases deprecate ``close()`` in favour of
        # ``aclose()``; use whichever the installed version provides.
        closer = getattr(self._client, "aclose", None) or self._client.close
        await closer()

    async def execute(self, command, *args, **kwargs) -> Any:
        """Run a raw Redis command and return its reply.

        Args:
            command: Command name, e.g. ``"GET"``.
            *args: Positional command arguments, forwarded unchanged.
            **kwargs: Options forwarded to ``execute_command``.

        Returns:
            The reply, or ``None`` when there is no client or the command
            raised (the error is logged, not propagated).
        """
        if not self._client:
            return None
        try:
            # Arguments may be ints/bytes; stringify them for the log line so
            # building the message cannot fail and abort the command.
            logger.debug("%s %s", command, " ".join(map(str, args)))
            return await self._client.execute_command(command, *args, **kwargs)
        except Exception as e:
            logger.error(f"{e}")
            return None

    async def subscribe(self, *channels) -> None:
        """Subscribe to each channel and record it in ``pubsub_channels``.

        NOTE(review): the pubsub object is closed when the ``async with``
        block exits, so the subscription presumably does not outlive this
        call -- confirm whether that is intended.
        """
        if not self._client:
            return
        async with self._client.pubsub() as pubsub:
            for channel in channels:
                await pubsub.subscribe(channel)
                # Record each channel once so a single unsubscribe() fully
                # clears it from the bookkeeping list.
                if channel not in self.pubsub_channels:
                    self.pubsub_channels.append(channel)

    async def unsubscribe(self, *channels) -> None:
        """Unsubscribe from each channel and forget it if it was recorded."""
        if not self._client:
            return
        async with self._client.pubsub() as pubsub:
            for channel in channels:
                await pubsub.unsubscribe(channel)
                # ``list.remove`` raises ValueError for unknown items; a
                # channel that was never recorded is simply skipped.
                if channel in self.pubsub_channels:
                    self.pubsub_channels.remove(channel)

    async def publish(self, channel, data) -> None:
        """Publish ``data`` on ``channel``; does nothing without a client."""
        if not self._client:
            return
        await self._client.publish(channel, data)

    async def listen(self, pattern) -> AsyncIterator[Tuple[Any, Any]]:
        """Yield ``(decoded_json, channel)`` for messages matching ``pattern``.

        Messages whose ``data`` is not text/bytes, or is not valid JSON, are
        skipped (invalid JSON is logged). The generator yields nothing when
        there is no client; otherwise it runs until the consumer stops it.
        """
        if not self._client:
            return
        # The context manager releases the pubsub object when the generator
        # is closed instead of leaking it.
        async with self._client.pubsub() as pubsub:
            await pubsub.psubscribe(pattern)
            while True:
                message = await pubsub.get_message()
                if message is None:
                    # Nothing pending: back off before polling again. Sleeping
                    # only when idle avoids capping delivery at one msg/sec.
                    await asyncio.sleep(1)
                    continue
                data = message.get("data")
                if not isinstance(data, (str, bytes, bytearray)):
                    continue
                logger.debug("pubsub got msg")
                try:
                    payload = json.loads(data)
                except ValueError as e:
                    # JSONDecodeError and UnicodeDecodeError are ValueErrors.
                    logger.error(f"{e}")
                    continue
                # Yield outside the try so exceptions thrown into the
                # generator by the consumer are not swallowed.
                yield payload, message.get("channel")


redis = RedisCache()

__all__ = ["redis"]