Skip to content

新闻缓存

新闻数据缓存

对于新闻数据来说,用户需要频繁进行访问,如果每次访问都查询Mysql数据库,无形中增加Mysql的查询压力,并且当数据量特别大时,Mysql的查询数据的耗时也较长。此时就需要将新闻数据进行缓存。

当用户首次获取新闻数据时,可以从mysql中查询,让后将数据缓存到redis,后续用户再访问时,就可以直接从reids中查询返回数据。

函数功能实现

cache目录下创建news_cache.py文件,使用redis的操作函数完成新闻数据缓存的功能

实现功能如下:

  • 缓存新闻详情
  • 获取缓存的新闻详情
  • 缓存新闻列表
  • 获取缓存的新闻列表
  • 缓存新闻分类列表
  • 获取缓存的新闻分类列表
from typing import Optional, List, Dict, Any
from models.news import News, Category
from cache_conf import set_cache, get_json_cache, delete_cache, delete_cache_pattern

# 缓存键前缀定义
NEWS_DETAIL_PREFIX = "news:detail:"
NEWS_LIST_PREFIX = "news:list:"
CATEGORIES_KEY = "news:categories"


async def cache_news_detail(news_id: int, news_data: Dict[str, Any], expire: int = 3600) -> bool:
    """
    缓存新闻详情

    Args:
        news_id: 新闻ID
        news_data: 新闻数据字典
        expire: 过期时间(秒)

    Returns:
        bool: 缓存成功返回True
    """
    key = f"{NEWS_DETAIL_PREFIX}{news_id}"
    return await set_cache(key, news_data, expire)


async def get_cached_news_detail(news_id: int) -> Optional[Dict[str, Any]]:
    """
    获取缓存的新闻详情

    Args:
        news_id: 新闻ID

    Returns:
        Optional[Dict[str, Any]]: 新闻数据,不存在则返回None
    """
    key = f"{NEWS_DETAIL_PREFIX}{news_id}"
    return await get_json_cache(key)


async def cache_news_list(category_id: Optional[int], page: int, size: int,
                    news_list: List[Dict[str, Any]], expire: int = 1800) -> bool:
    """
    缓存新闻列表

    Args:
        category_id: 分类ID,None表示全部分类
        page: 页码
        size: 每页大小
        news_list: 新闻列表数据
        expire: 过期时间(秒)

    Returns:
        bool: 缓存成功返回True
    """
    category_part = category_id if category_id is not None else "all"
    key = f"{NEWS_LIST_PREFIX}{category_part}:{page}:{size}"
    return await set_cache(key, news_list, expire)


async def get_cached_news_list(category_id: Optional[int], page: int, size: int) -> Optional[List[Dict[str, Any]]]:
    """
    获取缓存的新闻列表

    Args:
        category_id: 分类ID,None表示全部分类
        page: 页码
        size: 每页大小

    Returns:
        Optional[List[Dict[str, Any]]]: 新闻列表数据,不存在则返回None
    """
    category_part = category_id if category_id is not None else "all"
    key = f"{NEWS_LIST_PREFIX}{category_part}:{page}:{size}"
    return await get_json_cache(key)


async def cache_categories(categories_data: List[Dict[str, Any]], expire: int = 7200) -> bool:
    """
    缓存新闻分类列表

    Args:
        categories_data: 分类数据列表
        expire: 过期时间(秒)

    Returns:
        bool: 缓存成功返回True
    """
    return await set_cache(CATEGORIES_KEY, categories_data, expire)


async def get_cached_categories() -> Optional[List[Dict[str, Any]]]:
    """
    获取缓存的新闻分类列表

    Returns:
        Optional[List[Dict[str, Any]]]: 分类数据列表,不存在则返回None
    """
    return await get_json_cache(CATEGORIES_KEY)

image-20251025175037335

模型类增加to_dict方法

在进行reids数据缓存时,需要将查询到的数据转为dict字典形式,此时可以使用模型类封装一个to_dict方法,在使用模型类对象时可以直接将数据转为字典

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Integer, DateTime,Index,Text,ForeignKey
from datetime import datetime

from typing import Optional

class Base(DeclarativeBase):
    pass


class Category(Base):
    """
    新闻分类模型
    对应数据库中的 news_category 表
    """
    __tablename__ = 'news_category'

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, comment="分类ID")
    name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, comment="分类名称")
    sort_order: Mapped[int] = mapped_column(Integer, default=0, nullable=False, comment="排序顺序")
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, comment="创建时间")
    updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now,
                                                 comment="更新时间")

    def __repr__(self):
        return f"<Category(id={self.id}, name='{self.name}', sort_order={self.sort_order})>"

    def to_dict(self):
        """将Category对象转换为字典"""
        return {
            "id": self.id,
            "name": self.name,
            "sort_order": self.sort_order,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None
        }


class News(Base):
    """
    新闻模型
    对应数据库中的 news 表
    """
    __tablename__ = 'news'

    # 创建索引
    __table_args__ = (
        Index('fk_news_category_idx', 'category_id'),
        Index('idx_publish_time', 'publish_time'),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, comment="新闻ID")
    title: Mapped[str] = mapped_column(String(255), nullable=False, comment="新闻标题")
    description: Mapped[Optional[str]] = mapped_column(String(500), comment="新闻简介")
    content: Mapped[str] = mapped_column(Text, nullable=False, comment="新闻内容")
    image: Mapped[Optional[str]] = mapped_column(String(255), comment="封面图片URL")
    author: Mapped[Optional[str]] = mapped_column(String(50), comment="作者")
    category_id: Mapped[int] = mapped_column(Integer, ForeignKey('news_category.id'), nullable=False, comment="分类ID")
    views: Mapped[int] = mapped_column(Integer, default=0, nullable=False, comment="浏览量")
    publish_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, comment="发布时间")
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, comment="创建时间")
    updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now,
                                                 comment="更新时间")

    def __repr__(self):
        return f"<News(id={self.id}, title='{self.title}', views={self.views})>"

    def to_dict(self):
        """将News对象转换为字典"""
        return {
            "id": self.id,
            "title": self.title,
            "description": self.description,
            "content": self.content,
            "image": self.image,
            "author": self.author,
            "category_id": self.category_id,
            "views": self.views,
            "publish_time": self.publish_time.isoformat() if self.publish_time else None,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None
        }

image-20251025175106475

修改新闻数据操作方法

curd目录下创建news_cache.py文件

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List, Optional

from models.news import News, Category


from cache.news_cache import (
    get_cached_news_detail, cache_news_detail,
    get_cached_news_list, cache_news_list,
    get_cached_categories, cache_categories
)


async def get_news_by_id(db: AsyncSession, news_id: int) -> Optional[News]:
    # 先尝试从缓存获取
    cached_news = await get_cached_news_detail(news_id)
    if cached_news:
        return News(**cached_news)

    # 缓存未命中,查询数据库
    result = await db.execute(select(News).where(News.id == news_id))
    news = result.scalar_one_or_none()

    # 如果查询到数据,存入缓存
    if news:
        await cache_news_detail(news_id, news.to_dict())

    return news


async def get_news_list(
        db: AsyncSession,
        category_id: Optional[int] = None,
        skip: int = 0,
        limit: int = 10
) -> List[News]:
    # 先尝试从缓存获取
    page = skip // limit + 1
    cached_list = await get_cached_news_list(category_id, page, limit)
    if cached_list:
        return [News(**item) for item in cached_list]

    # 缓存未命中,查询数据库
    query = select(News)
    if category_id:
        query = query.where(News.category_id == category_id)
    query = query.offset(skip).limit(limit)

    result = await db.execute(query)
    news_list = result.scalars().all()

    # 如果查询到数据,存入缓存
    if news_list:
        news_data = [news.to_dict() for news in news_list]
        await cache_news_list(category_id, page, limit, news_data)

    return news_list


async def get_categories(db: AsyncSession) -> List[Category]:
    # 先尝试从缓存获取
    cached_categories = await get_cached_categories()
    if cached_categories:
        return [Category(**item) for item in cached_categories]

    # 缓存未命中,查询数据库
    result = await db.execute(select(Category))
    categories = result.scalars().all()

    # 如果查询到数据,存入缓存
    if categories:
        categories_data = [cat.to_dict() for cat in categories]
        await cache_categories(categories_data)

    return categories

image-20251025175152309

验证是否存储成功

访问首页新闻

image-20251025175249613

查看reids数据

image-20251025175336204