深夜的烂笔头
技术 16 次浏览 10 分钟阅读

通用内容聚合系统设计

目录名性质核心职责包含什么提交Git?谁会修改
core/代码-框架层提供底层能力,支撑整个系统运转workflow、context、llm、storage、cache、models✅ 是架构师/框架开发者
plugins/代码-业务层提供可插拔的业务功能扩展sources、processors、outputs三类插件✅ 是业务开发者(最常改)
quality/代码-业务层质量保证和优化建议系统evaluator、rules、optimizer✅ 是业务开发者
config/配置文件定义系统行为和业务规则workflows配置、数据源配置、prompt模板✅ 是运营人员/业务开发者
data/运行时数据存储程序运行产生的所有数据raw、processed、published、cache❌ 否程序自动生成
scripts/工具脚本辅助开发和运维的便捷工具运行脚本、验证工具✅ 是开发者
tests/测试代码保证代码质量的单元测试和集成测试各种test_*.py文件✅ 是开发者

通用内容聚合系统 – 详细系统设计文档

项目名称: Universal Content Pipeline
版本: v1.0
创建日期: 2024-01-15
目标: 开发一个通用、低耦合的Python框架,实现智能内容聚合、质量评估和自动发布


一、项目背景与目标

1.1 业务背景

  • 痛点: 现有网站内容同质化严重,用户留存率低
  • 机会: 中国就业环境严峻,副业需求旺盛
  • 方向: 从海量信息中筛选出最精华的副业相关内容,每天推荐10篇文章

1.2 核心目标

  1. 高质量内容: 通过AI评估确保每篇文章质量达标(≥7.0分)
  2. 自动化运行: 支持本地和GitHub Action自动执行
  3. 可扩展性: 轻松添加新的数据源、处理器、输出格式
  4. 智能重试: 质量不达标时自动优化重试,最多3次
  5. 高性能: 支持100+数据源并发抓取,500+文章批量处理

1.3 技术要求

  • Python 3.10+
  • 模块化、低耦合、高内聚
  • 支持配置驱动,无需改代码即可调整行为
  • 完整的错误处理和日志记录

二、系统架构设计

2.1 整体架构图

┌─────────────────────────────────────────────────────────────┐
│                         用户层                               │
│  python main.py run daily  │  GitHub Action 定时触发        │
└────────────────────┬────────────────────────────────────────┘
                     ↓
┌─────────────────────────────────────────────────────────────┐
│                      工作流编排层                             │
│              core/workflow.py (WorkflowEngine)              │
│  - 读取配置 → 编排执行 → 重试循环 → 结果输出                 │
└────────────────────┬────────────────────────────────────────┘
                     ↓
        ┌────────────┴────────────┐
        ↓                         ↓
┌──────────────┐          ┌──────────────┐
│  数据采集层   │          │  质量保证层   │
│  plugins/    │          │  quality/    │
│  sources/    │          │  evaluator   │
└──────┬───────┘          └──────┬───────┘
       ↓                         ↓
┌──────────────┐          ┌──────────────┐
│  内容处理层   │          │  优化建议层   │
│  plugins/    │          │  quality/    │
│  processors/ │          │  optimizer   │
└──────┬───────┘          └──────────────┘
       ↓
┌─────────────────────────────────────────┐
│            输出发布层                    │
│         plugins/outputs/                │
└─────────────────────────────────────────┘
       ↓
┌─────────────────────────────────────────┐
│            存储层                        │
│    data/ (raw/processed/published)      │
└─────────────────────────────────────────┘

2.2 目录结构设计

universal-content-pipeline/
├── core/                          # 核心框架层
│   ├── __init__.py
│   ├── workflow.py               # 工作流编排引擎
│   ├── context.py                # 执行上下文
│   ├── llm.py                    # LLM客户端封装
│   ├── storage.py                # 数据持久化
│   ├── cache.py                  # 缓存管理
│   ├── models.py                 # 领域模型定义
│   └── exceptions.py             # 异常定义
│
├── plugins/                       # 插件层
│   ├── sources/                  # 数据源插件
│   │   ├── __init__.py
│   │   ├── base.py               # 数据源基类
│   │   ├── github.py             # GitHub Trending
│   │   ├── producthunt.py        # Product Hunt
│   │   ├── hackernews.py         # Hacker News
│   │   └── rss.py                # RSS聚合
│   ├── processors/               # 处理器插件
│   │   ├── __init__.py
│   │   ├── base.py               # 处理器基类
│   │   ├── dedup.py              # 去重处理器
│   │   ├── translate.py          # 翻译处理器
│   │   └── summarize.py          # 摘要生成器
│   └── outputs/                  # 输出插件
│       ├── __init__.py
│       ├── base.py               # 输出基类
│       ├── vitepress.py          # VitePress输出
│       └── json.py               # JSON输出
│
├── quality/                       # 质量保证层
│   ├── __init__.py
│   ├── evaluator.py              # 质量评估器
│   ├── rules.py                  # 质量规则集合
│   └── optimizer.py              # 优化建议生成器
│
├── config/                        # 配置层
│   ├── workflows/                # 工作流配置
│   │   └── daily.yaml
│   ├── sources.yaml              # 数据源配置
│   ├── quality.yaml              # 质量规则配置
│   └── prompts/                  # Prompt模板
│       ├── judge.txt
│       ├── summarize.txt
│       └── translate.txt
│
├── data/                          # 数据层(不提交Git)
│   ├── raw/                      # 原始数据
│   ├── processed/                # 处理后数据
│   ├── published/                # 发布内容
│   └── cache/                    # 缓存数据
│
├── scripts/                       # 工具脚本
│   ├── run.py                    # 本地运行脚本
│   └── validate.py               # 配置验证脚本
│
├── tests/                         # 测试
│   ├── test_core.py
│   ├── test_plugins.py
│   └── test_quality.py
│
├── .github/
│   └── workflows/
│       └── daily.yml             # GitHub Action配置
│
├── main.py                        # 命令行入口
├── pyproject.toml                 # 项目配置
├── .env.example                   # 环境变量模板
├── .gitignore
└── README.md


三、核心模块详细设计

3.1 core/models.py – 领域模型

3.1.1 Article 文章模型

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Optional

@dataclass
class Article:
    """文章领域模型 - 所有模块操作的统一数据结构"""
    
    # 基础字段
    id: str                              # 唯一标识(格式: source_原始id)
    source: str                          # 来源标识(如 'github', 'producthunt')
    raw_data: dict                       # 原始数据(保留用于调试)
    
    # 内容字段
    title: str                           # 标题
    content: str                         # 正文内容
    url: str                             # 原文链接
    published_at: datetime               # 发布时间
    
    # 元数据
    keywords: List[str] = field(default_factory=list)  # 关键词
    metadata: Dict = field(default_factory=dict)       # 其他元数据
    
    # 处理状态
    quality_score: Optional['QualityScore'] = None     # 质量评分
    processing_history: List[Dict] = field(default_factory=list)  # 处理历史
    
    def add_processing_record(self, processor_name: str, changes: Dict):
        """记录处理历史"""
        self.processing_history.append({
            'processor': processor_name,
            'timestamp': datetime.now().isoformat(),
            'changes': changes
        })

3.1.2 QualityScore 质量评分模型

@dataclass
class QualityScore:
    """质量评分模型"""
    
    total: float                         # 总分(0-10)
    dimensions: Dict[str, float]         # 各维度得分
    feedback: List[str]                  # 评分反馈
    passed: bool                         # 是否通过阈值
    
    # 示例:
    # {
    #   'total': 7.5,
    #   'dimensions': {'length': 8.0, 'freshness': 9.0, 'relevance': 7.0, 'llm_quality': 6.0},
    #   'feedback': ['相关性较好', 'LLM判断质量略低'],
    #   'passed': True
    # }

3.1.3 Context 执行上下文

@dataclass
class RetryState:
    """重试状态"""
    attempt: int = 0                     # 当前第几次尝试(0开始)
    max_attempts: int = 3                # 最大重试次数
    history: List[Dict] = field(default_factory=list)  # 历史记录
    
    def can_retry(self) -> bool:
        return self.attempt < self.max_attempts
    
    def record_attempt(self, avg_score: float, reason: str, hints: Dict):
        """记录本次尝试"""
        self.history.append({
            'attempt': self.attempt,
            'avg_score': avg_score,
            'reason': reason,
            'hints': hints,
            'timestamp': datetime.now().isoformat()
        })
        self.attempt += 1


@dataclass
class Context:
    """工作流执行上下文 - 携带所有运行时状态"""
    
    # 基础信息
    workflow_name: str                   # 工作流名称
    run_date: str                        # 运行日期(YYYY-MM-DD)
    output_dir: str                      # 输出目录
    
    # 重试相关
    retry_state: RetryState = field(default_factory=RetryState)
    optimization_hints: Dict = field(default_factory=dict)  # 优化建议
    
    # 配置
    config: Dict = field(default_factory=dict)  # 工作流配置
    
    def should_retry(self, avg_score: float) -> bool:
        """判断是否需要重试"""
        threshold = self.config.get('quality_threshold', 7.0)
        
        if avg_score >= threshold:
            return False  # 达标不需要重试
        
        if not self.retry_state.can_retry():
            return False  # 无重试机会
        
        return True

3.1.4 OptimizationHints 优化建议模型

@dataclass
class OptimizationHints:
    """优化建议模型 - 传递给processors"""
    
    should_retry: bool                   # 是否建议重试
    failed_dimensions: List[str]         # 失败的维度
    processor_hints: Dict[str, Dict]     # 给各processor的建议
    
    # 示例:
    # {
    #   'should_retry': True,
    #   'failed_dimensions': ['relevance', 'llm_quality'],
    #   'processor_hints': {
    #       'summarize': {
    #           'issue': '相关性不足',
    #           'suggestion': '强调副业相关性',
    #           'emphasize_keywords': ['副业', '独立开发', '创业']
    #       },
    #       'translate': {
    #           'suggestion': '使用更准确的翻译'
    #       }
    #   }
    # }


3.2 core/workflow.py – 工作流编排引擎

3.2.1 WorkflowEngine 类设计

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
import yaml
import logging

class WorkflowEngine:
    """工作流编排引擎 - 核心执行器"""
    
    def __init__(self, config_path: str, context: Context):
        """
        初始化工作流引擎
        
        Args:
            config_path: 工作流配置文件路径(如 config/workflows/daily.yaml)
            context: 执行上下文
        """
        self.config = self._load_config(config_path)
        self.context = context
        self.context.config = self.config
        self.logger = logging.getLogger(__name__)
        
    def run(self) -> 'WorkflowResult':
        """
        执行工作流 - 支持重试循环
        
        Returns:
            WorkflowResult: 包含最终文章列表和执行统计
        """
        self.logger.info(f"🚀 开始执行工作流: {self.context.workflow_name}")
        
        # 重试循环
        while True:
            attempt = self.context.retry_state.attempt + 1
            self.logger.info(f"{'='*60}")
            self.logger.info(f"🔄 第 {attempt} 次尝试")
            self.logger.info(f"{'='*60}")
            
            # 执行完整流程
            articles = self._execute_pipeline()
            
            # 计算平均分
            avg_score = self._calculate_avg_score(articles)
            threshold = self.config.get('quality_threshold', 7.0)
            
            self.logger.info(f"\n📊 本轮结果: 文章={len(articles)}, 平均分={avg_score:.2f}, 阈值={threshold}")
            
            # 判断是否需要重试
            if not self.context.should_retry(avg_score):
                if avg_score >= threshold:
                    self.logger.info("✅ 质量达标,流程结束!")
                else:
                    self.logger.warning("⚠️ 质量未达标但无重试机会,使用当前结果")
                
                # 输出最终结果
                self._output_results(articles)
                return WorkflowResult(articles=articles, stats=self._get_stats())
            
            # 需要重试:生成优化建议
            self.logger.info("❌ 质量未达标,准备重试...")
            hints = self._generate_optimization_hints(articles)
            self.context.optimization_hints = hints.processor_hints
            self.context.retry_state.record_attempt(avg_score, "质量不达标", hints.processor_hints)
    
    def _execute_pipeline(self) -> List[Article]:
        """执行完整流程"""
        # 阶段1: 数据采集
        self.logger.info("\n📥 [阶段1] 数据采集...")
        raw_articles = self._run_sources()
        
        # 阶段2: 内容处理
        self.logger.info("\n🔧 [阶段2] 内容处理...")
        processed_articles = self._run_processors(raw_articles)
        
        # 阶段3: 质量评估
        self.logger.info("\n⭐ [阶段3] 质量评估...")
        scored_articles = self._run_quality_evaluation(processed_articles)
        
        # 阶段4: 筛选Top N
        self.logger.info("\n🎯 [阶段4] 筛选Top文章...")
        top_articles = self._select_top_n(scored_articles)
        
        return top_articles
    
    def _run_sources(self) -> List[Article]:
        """
        并发执行所有数据源
        
        Returns:
            所有数据源抓取的文章列表
        """
        sources_config = self.config['stages'][0]['sources']
        max_workers = self.config.get('concurrency', {}).get('sources', 10)
        
        all_articles = []
        source_instances = self._create_source_instances(sources_config)
        
        # 并发执行
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_source = {
                executor.submit(source.fetch, self.context): source 
                for source in source_instances
            }
            
            for future in as_completed(future_to_source):
                source = future_to_source[future]
                try:
                    articles = future.result(timeout=30)  # 30秒超时
                    all_articles.extend(articles)
                    self.logger.info(f"  ✓ {source.name}: {len(articles)} 条")
                except Exception as e:
                    self.logger.error(f"  ✗ {source.name} 失败: {e}")
        
        # 保存原始数据
        self._save_raw_data(all_articles)
        
        self.logger.info(f"  总计: {len(all_articles)} 条")
        return all_articles
    
    def _run_processors(self, articles: List[Article]) -> List[Article]:
        """
        依次执行处理器链
        
        Args:
            articles: 输入文章列表
            
        Returns:
            处理后的文章列表
        """
        processors_config = self.config['stages'][1]['processors']
        processor_instances = self._create_processor_instances(processors_config)
        
        for processor in processor_instances:
            self.logger.info(f"  执行: {processor.name}")
            articles = processor.process(articles, self.context)
            self.logger.info(f"    剩余: {len(articles)} 条")
        
        # 保存处理后数据
        self._save_processed_data(articles)
        
        return articles
    
    def _run_quality_evaluation(self, articles: List[Article]) -> List[Article]:
        """
        质量评估
        
        Args:
            articles: 待评估文章列表
            
        Returns:
            带评分的文章列表
        """
        from quality.evaluator import QualityEvaluator
        
        evaluator = QualityEvaluator(self.config.get('quality_config'))
        
        for i, article in enumerate(articles, 1):
            score = evaluator.evaluate(article, self.context)
            article.quality_score = score
            self.logger.info(f"  [{i}/{len(articles)}] {article.title[:30]}... 得分: {score.total:.1f}")
        
        return articles
    
    def _select_top_n(self, articles: List[Article]) -> List[Article]:
        """选择得分最高的N篇文章"""
        top_n = self.config.get('output_config', {}).get('top_n', 10)
        
        sorted_articles = sorted(
            articles,
            key=lambda x: x.quality_score.total,
            reverse=True
        )
        
        selected = sorted_articles[:top_n]
        self.logger.info(f"  选中前 {top_n} 篇")
        return selected
    
    def _generate_optimization_hints(self, articles: List[Article]) -> OptimizationHints:
        """生成优化建议"""
        from quality.optimizer import QualityOptimizer
        
        optimizer = QualityOptimizer()
        evaluation_result = EvaluationResult(
            articles=articles,
            avg_score=self._calculate_avg_score(articles)
        )
        
        hints = optimizer.generate_hints(evaluation_result, self.context)
        self.logger.info(f"  优化建议: {hints.processor_hints}")
        return hints
    
    def _output_results(self, articles: List[Article]):
        """输出最终结果"""
        outputs_config = self.config['stages'][-1]['outputs']
        output_instances = self._create_output_instances(outputs_config)
        
        for output in output_instances:
            self.logger.info(f"  输出到: {output.name}")
            output.write(articles, self.context)


3.3 plugins/sources/base.py – 数据源基类

from abc import ABC, abstractmethod
from typing import List
from core.models import Article, Context

class BaseSource(ABC):
    """数据源基类 - 所有数据源必须继承此类"""
    
    def __init__(self, config: dict):
        """
        初始化数据源
        
        Args:
            config: 数据源配置(来自 config/sources.yaml)
        """
        self.config = config
        self.name = self.__class__.__name__
    
    @abstractmethod
    def fetch(self, context: Context) -> List[Article]:
        """
        抓取数据 - 子类必须实现
        
        Args:
            context: 执行上下文
            
        Returns:
            文章列表
            
        Raises:
            SourceException: 抓取失败时抛出
        """
        pass
    
    def _create_article_id(self, source_name: str, original_id: str) -> str:
        """生成统一的文章ID"""
        return f"{source_name}_{original_id}"

3.3.1 GitHub 数据源实现示例

import requests
from datetime import datetime
from plugins.sources.base import BaseSource
from core.models import Article
from core.exceptions import SourceException

class GitHubSource(BaseSource):
    """GitHub Trending 数据源"""
    
    def fetch(self, context: Context) -> List[Article]:
        """从GitHub抓取trending项目"""
        api_url = "https://api.github.com/search/repositories"
        
        # 计算时间范围(最近24小时)
        since_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
        
        params = {
            'q': f'created:>{since_date}',
            'sort': 'stars',
            'order': 'desc',
            'per_page': self.config.get('per_page', 30)
        }
        
        headers = {}
        if token := os.getenv('GITHUB_TOKEN'):
            headers['Authorization'] = f'token {token}'
        
        try:
            response = requests.get(api_url, params=params, headers=headers, timeout=10)
            response.raise_for_status()
            data = response.json()
        except Exception as e:
            raise SourceException(f"GitHub API调用失败: {e}")
        
        articles = []
        for repo in data.get('items', []):
            article = Article(
                id=self._create_article_id('github', str(repo['id'])),
                source='github',
                raw_data=repo,
                title=repo['full_name'],
                content=repo['description'] or '',
                url=repo['html_url'],
                published_at=datetime.fromisoformat(repo['created_at'].replace('Z', '+00:00')),
                keywords=[],
                metadata={
                    'stars': repo['stargazers_count'],
                    'language': repo['language'],
                    'topics': repo.get('topics', [])
                }
            )
            articles.append(article)
        
        return articles


3.4 plugins/processors/base.py – 处理器基类

from abc import ABC, abstractmethod
from typing import List
from core.models import Article, Context

class BaseProcessor(ABC):
    """处理器基类 - 所有处理器必须继承此类"""
    
    def __init__(self, config: dict = None):
        """
        初始化处理器
        
        Args:
            config: 处理器配置
        """
        self.config = config or {}
        self.name = self.__class__.__name__
    
    @abstractmethod
    def process(self, articles: List[Article], context: Context) -> List[Article]:
        """
        处理文章列表 - 子类必须实现
        
        Args:
            articles: 输入文章列表
            context: 执行上下文(包含optimization_hints)
            
        Returns:
            处理后的文章列表
        """
        pass
    
    def _get_hints(self, context: Context) -> dict:
        """从上下文获取针对本处理器的优化建议"""
        return context.optimization_hints.get(self.name, {})

3.4.1 去重处理器实现

from plugins.processors.base import BaseProcessor

class DedupProcessor(BaseProcessor):
    """去重处理器 - 基于URL和标题相似度"""
    
    def process(self, articles: List[Article], context: Context) -> List[Article]:
        """去除重复文章"""
        seen_urls = set()
        seen_titles = set()
        unique_articles = []
        
        for article in articles:
            # URL去重
            if article.url in seen_urls:
                continue
            
            # 标题相似度去重(简化版:完全相同)
            if article.title in seen_titles:
                continue
            
            seen_urls.add(article.url)
            seen_titles.add(article.title)
            unique_articles.append(article)
            
            # 记录处理历史
            article.add_processing_record('dedup', {'status': 'kept'})
        
        removed_count = len(articles) - len(unique_articles)
        if removed_count > 0:
            context.logger.info(f"    去重移除: {removed_count} 条")
        
        return unique_articles

3.4.2 翻译处理器实现

from plugins.processors.base import BaseProcessor
from core.llm import LLMClient
from core.cache import Cache

class TranslateProcessor(BaseProcessor):
    """翻译处理器 - 将英文内容翻译为中文"""
    
    def __init__(self, config: dict = None):
        super().__init__(config)
        self.llm = LLMClient()
        self.cache = Cache()
    
    def process(self, articles: List[Article], context: Context) -> List[Article]:
        """翻译文章标题和内容"""
        hints = self._get_hints(context)
        
        # 读取prompt模板
        with open('config/prompts/translate.txt', 'r', encoding='utf-8') as f:
            prompt_template = f.read()
        
        # 批量处理
        batch_size = context.config.get('concurrency', {}).get('processors', 5)
        
        for i in range(0, len(articles), batch_size):
            batch = articles[i:i+batch_size]
            self._translate_batch(batch, prompt_template, hints)
        
        return articles
    
    def _translate_batch(self, articles: List[Article], prompt_template: str, hints: dict):
        """批量翻译"""
        for article in articles:
            # 检查缓存
            cache_key = f"translate_{article.id}"
            cached = self.cache.get(cache_key)
            
            if cached:
                article.title = cached['title']
                article.content = cached['content']
                continue
            
            # 根据hints调整prompt
            if hints.get('be_more_accurate'):
                prompt_template += "\n注意:使用更准确、专业的翻译。"
            
            # 调用LLM
            prompt = prompt_template.format(
                title=article.title,
                content=article.content
            )
            
            result = self.llm.chat(prompt, temperature=hints.get('temperature', 0.3))
            translated = self._parse_result(result)
            
            # 更新文章
            article.title = translated['title']
            article.content = translated['content']
            
            # 缓存
            self.cache.set(cache_key, translated)
            
            # 记录
            article.add_processing_record('translate', {
                'original_title': article.raw_data.get('title'),
                'translated_title': article.title
            })
    
    def _parse_result(self, result: str) -> dict:
        """解析LLM返回结果"""
        # 假设LLM返回格式为:标题: xxx\n内容: xxx
        lines = result.strip().split('\n', 1)
        return {
            'title': lines[0].replace('标题:', '').strip(),
            'content': lines[1].replace('内容:', '').strip() if len(lines) > 1 else ''
        }


3.5 quality/evaluator.py – 质量评估器

from typing import List
from core.models import Article, QualityScore, Context
from quality.rules import LengthRule, FreshnessRule, RelevanceRule, LLMQualityRule

class QualityEvaluator:
    """质量评估器 - 综合评估文章质量"""
    
    def __init__(self, config: dict = None):
        """
        初始化评估器
        
        Args:
            config: 质量配置(来自 config/quality.yaml)
        """
        self.config = config or {}
        self.rules = self._init_rules()
        self.weights = self.config.get('weights', {
            'length': 0.1,
            'fresh

0

  1. 此文章尚无评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注