Skip to content

缓存配置

redies缓存配置

在redis使用之前,需要先对reids进行配置

在项目目录下创建cache_conf文件,在文件中指定连接信息和封装redis操作函数

import redis.asyncio as redis
import json
from typing import Any, Optional

# Redis 配置
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_DB = 0

# 创建 Redis 客户端实例
redis_client = redis.Redis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    db=REDIS_DB,
    decode_responses=True
)


def get_redis_client():
    """获取 Redis 客户端实例"""
    return redis_client


async def set_cache(key: str, value: Any, expire: int = 3600) -> bool:
    """
    设置缓存

    Args:
        key: 缓存键
        value: 缓存值
        expire: 过期时间(秒),默认1小时

    Returns:
        bool: 设置成功返回True,否则返回False
    """
    try:
        if isinstance(value, (dict, list)):
            value = json.dumps(value)
        await redis_client.setex(key, expire, value)
        return True
    except Exception as e:
        print(f"设置缓存失败: {e}")
        return False


async def get_cache(key: str) -> Optional[str]:
    """
    获取缓存

    Args:
        key: 缓存键

    Returns:
        Optional[str]: 缓存值,不存在则返回None
    """
    try:
        return await redis_client.get(key)
    except Exception as e:
        print(f"获取缓存失败: {e}")
        return None


async def get_json_cache(key: str) -> Any:
    """
    获取JSON格式的缓存数据

    Args:
        key: 缓存键

    Returns:
        Any: 解析后的JSON数据,不存在或解析失败则返回None
    """
    try:
        data = await redis_client.get(key)
        if data:
            return json.loads(data)
        return None
    except Exception as e:
        print(f"获取JSON缓存失败: {e}")
        return None


async def delete_cache(key: str) -> bool:
    """
    删除缓存

    Args:
        key: 缓存键

    Returns:
        bool: 删除成功返回True,否则返回False
    """
    try:
        return bool(await redis_client.delete(key))
    except Exception as e:
        print(f"删除缓存失败: {e}")
        return False


async def delete_cache_pattern(pattern: str) -> int:
    """
    根据模式删除缓存

    Args:
        pattern: 缓存键模式,如 "news:list:*"

    Returns:
        int: 删除的缓存数量
    """
    try:
        keys = []
        async for key in redis_client.scan_iter(match=pattern):
            keys.append(key)
        if keys:
            return await redis_client.delete(*keys)
        return 0
    except Exception as e:
        print(f"批量删除缓存失败: {e}")
        return 0


async def exists_cache(key: str) -> bool:
    """
    检查缓存是否存在

    Args:
        key: 缓存键

    Returns:
        bool: 存在返回True,否则返回False
    """
    try:
        return bool(await redis_client.exists(key))
    except Exception as e:
        print(f"检查缓存存在性失败: {e}")
        return False

image-20251025173956800