import asyncio
from types import SimpleNamespace

import sentry_sdk

from Craft.database.redis import Redis


class RedisCaching:
    """Static helpers that cache emoji/word pairs in Redis hashes.

    Every entry is stored under a key of the form
    ``CraftCached:<key1>:<key2>`` as a hash with the fields ``emoji`` and
    ``word``.  Lookups and deletions treat ``(key1, key2)`` as an unordered
    pair: both ``key1:key2`` and ``key2:key1`` are considered.

    Each Redis operation opens its own ``Redis()`` context.  Any exception
    raised by the Redis call is reported to Sentry and then re-raised.
    """

    @staticmethod
    def _create_cache_key(key1: str, key2: str) -> str:
        """Return the namespaced Redis key for the ordered pair of keys."""
        return f"CraftCached:{key1}:{key2}"

    @staticmethod
    def _validate_key(key: str) -> None:
        """Ensure *key* is a non-empty string.

        Raises:
            ValueError: If *key* is not a ``str`` or is empty.
        """
        if not isinstance(key, str) or not key:
            raise ValueError("Invalid key")

    @staticmethod
    def _validate_value(value: SimpleNamespace) -> None:
        """Ensure *value* is a ``SimpleNamespace`` with ``emoji`` and ``word``.

        Raises:
            ValueError: If *value* has the wrong type or lacks either
                attribute.
        """
        if (
            not isinstance(value, SimpleNamespace)
            or not hasattr(value, 'emoji')
            or not hasattr(value, 'word')
        ):
            raise ValueError("Invalid value")

    @staticmethod
    async def cache_get(key1: str, key2: str) -> dict:
        """Fetch the cached hash for the pair, trying both key orderings.

        The hash stored under ``key1:key2`` is returned when it is
        non-empty; otherwise the result of looking up ``key2:key1`` is
        returned (which may itself be empty).

        Raises:
            ValueError: If either key is not a non-empty string.
            Exception: Any error raised by the Redis calls, after it has
                been reported to Sentry.
        """
        RedisCaching._validate_key(key1)
        RedisCaching._validate_key(key2)
        key = RedisCaching._create_cache_key(key1, key2)
        async with Redis() as redis:
            try:
                data = await redis.hgetall(key)
                if not data:
                    # Nothing under key1:key2; fall back to the swapped order.
                    key = RedisCaching._create_cache_key(key2, key1)
                    data = await redis.hgetall(key)
                return data
            except Exception as e:
                sentry_sdk.capture_exception(e)
                raise

    @staticmethod
    async def cache_set(key1: str, key2: str, value: SimpleNamespace) -> None:
        """Store ``value.emoji`` and ``value.word`` under ``key1:key2``.

        Raises:
            ValueError: If a key or the value fails validation.
            Exception: Any error raised by the Redis call, after it has
                been reported to Sentry.
        """
        RedisCaching._validate_key(key1)
        RedisCaching._validate_key(key2)
        RedisCaching._validate_value(value)
        key = RedisCaching._create_cache_key(key1, key2)
        async with Redis() as redis:
            try:
                await redis.hset(
                    key,
                    mapping={"emoji": value.emoji, "word": value.word},
                )
            except Exception as e:
                sentry_sdk.capture_exception(e)
                raise

    @staticmethod
    async def cache_delete(key1: str, key2: str) -> None:
        """Delete the cached entry for the pair in both key orderings.

        ``cache_get`` resolves a pair in either order, so removing only
        ``key1:key2`` would leave an entry stored as ``key2:key1`` visible
        to later lookups.  Both orderings are therefore deleted.

        Raises:
            ValueError: If either key is not a non-empty string.
            Exception: Any error raised by the Redis calls, after it has
                been reported to Sentry.
        """
        RedisCaching._validate_key(key1)
        RedisCaching._validate_key(key2)
        # dict.fromkeys de-duplicates while keeping order, so a pair with
        # key1 == key2 results in a single delete.
        keys = dict.fromkeys(
            (
                RedisCaching._create_cache_key(key1, key2),
                RedisCaching._create_cache_key(key2, key1),
            )
        )
        async with Redis() as redis:
            try:
                for key in keys:
                    await redis.delete(key)
            except Exception as e:
                sentry_sdk.capture_exception(e)
                raise

    @staticmethod
    async def cache_keys() -> list:
        """Return the keys matching the ``CraftCached:*`` pattern.

        Raises:
            Exception: Any error raised by the Redis call, after it has
                been reported to Sentry.
        """
        async with Redis() as redis:
            try:
                return await redis.keys("CraftCached:*")
            except Exception as e:
                sentry_sdk.capture_exception(e)
                raise

    @staticmethod
    async def cache_flush() -> None:
        """Flush the Redis database via ``flushdb()``.

        NOTE(review): ``flushdb()`` is not restricted to the
        ``CraftCached:`` prefix, so this presumably removes every key in
        the selected database - confirm that is intended.

        Raises:
            Exception: Any error raised by the Redis call, after it has
                been reported to Sentry.
        """
        async with Redis() as redis:
            try:
                await redis.flushdb()
            except Exception as e:
                sentry_sdk.capture_exception(e)
                raise