| 目录名 | 性质 | 核心职责 | 包含什么 | 提交Git? | 谁会修改 |
|---|---|---|---|---|---|
| core/ | 代码-框架层 | 提供底层能力,支撑整个系统运转 | workflow、context、llm、storage、cache、models | ✅ 是 | 架构师/框架开发者 |
| plugins/ | 代码-业务层 | 提供可插拔的业务功能扩展 | sources、processors、outputs三类插件 | ✅ 是 | 业务开发者(最常改) |
| quality/ | 代码-业务层 | 质量保证和优化建议系统 | evaluator、rules、optimizer | ✅ 是 | 业务开发者 |
| config/ | 配置文件 | 定义系统行为和业务规则 | workflows配置、数据源配置、prompt模板 | ✅ 是 | 运营人员/业务开发者 |
| data/ | 运行时数据 | 存储程序运行产生的所有数据 | raw、processed、published、cache | ❌ 否 | 程序自动生成 |
| scripts/ | 工具脚本 | 辅助开发和运维的便捷工具 | 运行脚本、验证工具 | ✅ 是 | 开发者 |
| tests/ | 测试代码 | 保证代码质量的单元测试和集成测试 | 各种test_*.py文件 | ✅ 是 | 开发者 |
通用内容聚合系统 – 详细系统设计文档
项目名称: Universal Content Pipeline
版本: v1.0
创建日期: 2024-01-15
目标: 开发一个通用、低耦合的Python框架,实现智能内容聚合、质量评估和自动发布
一、项目背景与目标
1.1 业务背景
- 痛点: 现有网站内容同质化严重,用户留存率低
- 机会: 中国就业环境严峻,副业需求旺盛
- 方向: 从海量信息中筛选出最精华的副业相关内容,每天推荐10篇文章
1.2 核心目标
- 高质量内容: 通过AI评估确保每篇文章质量达标(≥7.0分)
- 自动化运行: 支持本地和GitHub Action自动执行
- 可扩展性: 轻松添加新的数据源、处理器、输出格式
- 智能重试: 质量不达标时自动优化重试,最多3次
- 高性能: 支持100+数据源并发抓取,500+文章批量处理
1.3 技术要求
- Python 3.10+
- 模块化、低耦合、高内聚
- 支持配置驱动,无需改代码即可调整行为
- 完整的错误处理和日志记录
二、系统架构设计
2.1 整体架构图
┌─────────────────────────────────────────────────────────────┐
│ 用户层 │
│ python main.py run daily │ GitHub Action 定时触发 │
└────────────────────┬────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 工作流编排层 │
│ core/workflow.py (WorkflowEngine) │
│ - 读取配置 → 编排执行 → 重试循环 → 结果输出 │
└────────────────────┬────────────────────────────────────────┘
↓
┌────────────┴────────────┐
↓ ↓
┌──────────────┐ ┌──────────────┐
│ 数据采集层 │ │ 质量保证层 │
│ plugins/ │ │ quality/ │
│ sources/ │ │ evaluator │
└──────┬───────┘ └──────┬───────┘
↓ ↓
┌──────────────┐ ┌──────────────┐
│ 内容处理层 │ │ 优化建议层 │
│ plugins/ │ │ quality/ │
│ processors/ │ │ optimizer │
└──────┬───────┘ └──────────────┘
↓
┌─────────────────────────────────────────┐
│ 输出发布层 │
│ plugins/outputs/ │
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ 存储层 │
│ data/ (raw/processed/published) │
└─────────────────────────────────────────┘
2.2 目录结构设计
universal-content-pipeline/
├── core/ # 核心框架层
│ ├── __init__.py
│ ├── workflow.py # 工作流编排引擎
│ ├── context.py # 执行上下文
│ ├── llm.py # LLM客户端封装
│ ├── storage.py # 数据持久化
│ ├── cache.py # 缓存管理
│ ├── models.py # 领域模型定义
│ └── exceptions.py # 异常定义
│
├── plugins/ # 插件层
│ ├── sources/ # 数据源插件
│ │ ├── __init__.py
│ │ ├── base.py # 数据源基类
│ │ ├── github.py # GitHub Trending
│ │ ├── producthunt.py # Product Hunt
│ │ ├── hackernews.py # Hacker News
│ │ └── rss.py # RSS聚合
│ ├── processors/ # 处理器插件
│ │ ├── __init__.py
│ │ ├── base.py # 处理器基类
│ │ ├── dedup.py # 去重处理器
│ │ ├── translate.py # 翻译处理器
│ │ └── summarize.py # 摘要生成器
│ └── outputs/ # 输出插件
│ ├── __init__.py
│ ├── base.py # 输出基类
│ ├── vitepress.py # VitePress输出
│ └── json.py # JSON输出
│
├── quality/ # 质量保证层
│ ├── __init__.py
│ ├── evaluator.py # 质量评估器
│ ├── rules.py # 质量规则集合
│ └── optimizer.py # 优化建议生成器
│
├── config/ # 配置层
│ ├── workflows/ # 工作流配置
│ │ └── daily.yaml
│ ├── sources.yaml # 数据源配置
│ ├── quality.yaml # 质量规则配置
│ └── prompts/ # Prompt模板
│ ├── judge.txt
│ ├── summarize.txt
│ └── translate.txt
│
├── data/ # 数据层(不提交Git)
│ ├── raw/ # 原始数据
│ ├── processed/ # 处理后数据
│ ├── published/ # 发布内容
│ └── cache/ # 缓存数据
│
├── scripts/ # 工具脚本
│ ├── run.py # 本地运行脚本
│ └── validate.py # 配置验证脚本
│
├── tests/ # 测试
│ ├── test_core.py
│ ├── test_plugins.py
│ └── test_quality.py
│
├── .github/
│ └── workflows/
│ └── daily.yml # GitHub Action配置
│
├── main.py # 命令行入口
├── pyproject.toml # 项目配置
├── .env.example # 环境变量模板
├── .gitignore
└── README.md
三、核心模块详细设计
3.1 core/models.py – 领域模型
3.1.1 Article 文章模型
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Optional
@dataclass
class Article:
"""文章领域模型 - 所有模块操作的统一数据结构"""
# 基础字段
id: str # 唯一标识(格式: source_原始id)
source: str # 来源标识(如 'github', 'producthunt')
raw_data: dict # 原始数据(保留用于调试)
# 内容字段
title: str # 标题
content: str # 正文内容
url: str # 原文链接
published_at: datetime # 发布时间
# 元数据
keywords: List[str] = field(default_factory=list) # 关键词
metadata: Dict = field(default_factory=dict) # 其他元数据
# 处理状态
quality_score: Optional['QualityScore'] = None # 质量评分
processing_history: List[Dict] = field(default_factory=list) # 处理历史
def add_processing_record(self, processor_name: str, changes: Dict):
"""记录处理历史"""
self.processing_history.append({
'processor': processor_name,
'timestamp': datetime.now().isoformat(),
'changes': changes
})
3.1.2 QualityScore 质量评分模型
@dataclass
class QualityScore:
"""质量评分模型"""
total: float # 总分(0-10)
dimensions: Dict[str, float] # 各维度得分
feedback: List[str] # 评分反馈
passed: bool # 是否通过阈值
# 示例:
# {
# 'total': 7.5,
# 'dimensions': {'length': 8.0, 'freshness': 9.0, 'relevance': 7.0, 'llm_quality': 6.0},
# 'feedback': ['相关性较好', 'LLM判断质量略低'],
# 'passed': True
# }
3.1.3 Context 执行上下文
@dataclass
class RetryState:
"""重试状态"""
attempt: int = 0 # 当前第几次尝试(0开始)
max_attempts: int = 3 # 最大重试次数
history: List[Dict] = field(default_factory=list) # 历史记录
def can_retry(self) -> bool:
return self.attempt < self.max_attempts
def record_attempt(self, avg_score: float, reason: str, hints: Dict):
"""记录本次尝试"""
self.history.append({
'attempt': self.attempt,
'avg_score': avg_score,
'reason': reason,
'hints': hints,
'timestamp': datetime.now().isoformat()
})
self.attempt += 1
@dataclass
class Context:
"""工作流执行上下文 - 携带所有运行时状态"""
# 基础信息
workflow_name: str # 工作流名称
run_date: str # 运行日期(YYYY-MM-DD)
output_dir: str # 输出目录
# 重试相关
retry_state: RetryState = field(default_factory=RetryState)
optimization_hints: Dict = field(default_factory=dict) # 优化建议
# 配置
config: Dict = field(default_factory=dict) # 工作流配置
def should_retry(self, avg_score: float) -> bool:
"""判断是否需要重试"""
threshold = self.config.get('quality_threshold', 7.0)
if avg_score >= threshold:
return False # 达标不需要重试
if not self.retry_state.can_retry():
return False # 无重试机会
return True
3.1.4 OptimizationHints 优化建议模型
@dataclass
class OptimizationHints:
"""优化建议模型 - 传递给processors"""
should_retry: bool # 是否建议重试
failed_dimensions: List[str] # 失败的维度
processor_hints: Dict[str, Dict] # 给各processor的建议
# 示例:
# {
# 'should_retry': True,
# 'failed_dimensions': ['relevance', 'llm_quality'],
# 'processor_hints': {
# 'summarize': {
# 'issue': '相关性不足',
# 'suggestion': '强调副业相关性',
# 'emphasize_keywords': ['副业', '独立开发', '创业']
# },
# 'translate': {
# 'suggestion': '使用更准确的翻译'
# }
# }
# }
3.2 core/workflow.py – 工作流编排引擎
3.2.1 WorkflowEngine 类设计
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
import yaml
import logging
class WorkflowEngine:
"""工作流编排引擎 - 核心执行器"""
def __init__(self, config_path: str, context: Context):
"""
初始化工作流引擎
Args:
config_path: 工作流配置文件路径(如 config/workflows/daily.yaml)
context: 执行上下文
"""
self.config = self._load_config(config_path)
self.context = context
self.context.config = self.config
self.logger = logging.getLogger(__name__)
def run(self) -> 'WorkflowResult':
"""
执行工作流 - 支持重试循环
Returns:
WorkflowResult: 包含最终文章列表和执行统计
"""
self.logger.info(f"🚀 开始执行工作流: {self.context.workflow_name}")
# 重试循环
while True:
attempt = self.context.retry_state.attempt + 1
self.logger.info(f"{'='*60}")
self.logger.info(f"🔄 第 {attempt} 次尝试")
self.logger.info(f"{'='*60}")
# 执行完整流程
articles = self._execute_pipeline()
# 计算平均分
avg_score = self._calculate_avg_score(articles)
threshold = self.config.get('quality_threshold', 7.0)
self.logger.info(f"\n📊 本轮结果: 文章={len(articles)}, 平均分={avg_score:.2f}, 阈值={threshold}")
# 判断是否需要重试
if not self.context.should_retry(avg_score):
if avg_score >= threshold:
self.logger.info("✅ 质量达标,流程结束!")
else:
self.logger.warning("⚠️ 质量未达标但无重试机会,使用当前结果")
# 输出最终结果
self._output_results(articles)
return WorkflowResult(articles=articles, stats=self._get_stats())
# 需要重试:生成优化建议
self.logger.info("❌ 质量未达标,准备重试...")
hints = self._generate_optimization_hints(articles)
self.context.optimization_hints = hints.processor_hints
self.context.retry_state.record_attempt(avg_score, "质量不达标", hints.processor_hints)
def _execute_pipeline(self) -> List[Article]:
"""执行完整流程"""
# 阶段1: 数据采集
self.logger.info("\n📥 [阶段1] 数据采集...")
raw_articles = self._run_sources()
# 阶段2: 内容处理
self.logger.info("\n🔧 [阶段2] 内容处理...")
processed_articles = self._run_processors(raw_articles)
# 阶段3: 质量评估
self.logger.info("\n⭐ [阶段3] 质量评估...")
scored_articles = self._run_quality_evaluation(processed_articles)
# 阶段4: 筛选Top N
self.logger.info("\n🎯 [阶段4] 筛选Top文章...")
top_articles = self._select_top_n(scored_articles)
return top_articles
def _run_sources(self) -> List[Article]:
"""
并发执行所有数据源
Returns:
所有数据源抓取的文章列表
"""
sources_config = self.config['stages'][0]['sources']
max_workers = self.config.get('concurrency', {}).get('sources', 10)
all_articles = []
source_instances = self._create_source_instances(sources_config)
# 并发执行
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_source = {
executor.submit(source.fetch, self.context): source
for source in source_instances
}
for future in as_completed(future_to_source):
source = future_to_source[future]
try:
articles = future.result(timeout=30) # 30秒超时
all_articles.extend(articles)
self.logger.info(f" ✓ {source.name}: {len(articles)} 条")
except Exception as e:
self.logger.error(f" ✗ {source.name} 失败: {e}")
# 保存原始数据
self._save_raw_data(all_articles)
self.logger.info(f" 总计: {len(all_articles)} 条")
return all_articles
def _run_processors(self, articles: List[Article]) -> List[Article]:
"""
依次执行处理器链
Args:
articles: 输入文章列表
Returns:
处理后的文章列表
"""
processors_config = self.config['stages'][1]['processors']
processor_instances = self._create_processor_instances(processors_config)
for processor in processor_instances:
self.logger.info(f" 执行: {processor.name}")
articles = processor.process(articles, self.context)
self.logger.info(f" 剩余: {len(articles)} 条")
# 保存处理后数据
self._save_processed_data(articles)
return articles
def _run_quality_evaluation(self, articles: List[Article]) -> List[Article]:
"""
质量评估
Args:
articles: 待评估文章列表
Returns:
带评分的文章列表
"""
from quality.evaluator import QualityEvaluator
evaluator = QualityEvaluator(self.config.get('quality_config'))
for i, article in enumerate(articles, 1):
score = evaluator.evaluate(article, self.context)
article.quality_score = score
self.logger.info(f" [{i}/{len(articles)}] {article.title[:30]}... 得分: {score.total:.1f}")
return articles
def _select_top_n(self, articles: List[Article]) -> List[Article]:
"""选择得分最高的N篇文章"""
top_n = self.config.get('output_config', {}).get('top_n', 10)
sorted_articles = sorted(
articles,
key=lambda x: x.quality_score.total,
reverse=True
)
selected = sorted_articles[:top_n]
self.logger.info(f" 选中前 {top_n} 篇")
return selected
def _generate_optimization_hints(self, articles: List[Article]) -> OptimizationHints:
"""生成优化建议"""
from quality.optimizer import QualityOptimizer
optimizer = QualityOptimizer()
evaluation_result = EvaluationResult(
articles=articles,
avg_score=self._calculate_avg_score(articles)
)
hints = optimizer.generate_hints(evaluation_result, self.context)
self.logger.info(f" 优化建议: {hints.processor_hints}")
return hints
def _output_results(self, articles: List[Article]):
"""输出最终结果"""
outputs_config = self.config['stages'][-1]['outputs']
output_instances = self._create_output_instances(outputs_config)
for output in output_instances:
self.logger.info(f" 输出到: {output.name}")
output.write(articles, self.context)
3.3 plugins/sources/base.py – 数据源基类
from abc import ABC, abstractmethod
from typing import List
from core.models import Article, Context
class BaseSource(ABC):
"""数据源基类 - 所有数据源必须继承此类"""
def __init__(self, config: dict):
"""
初始化数据源
Args:
config: 数据源配置(来自 config/sources.yaml)
"""
self.config = config
self.name = self.__class__.__name__
@abstractmethod
def fetch(self, context: Context) -> List[Article]:
"""
抓取数据 - 子类必须实现
Args:
context: 执行上下文
Returns:
文章列表
Raises:
SourceException: 抓取失败时抛出
"""
pass
def _create_article_id(self, source_name: str, original_id: str) -> str:
"""生成统一的文章ID"""
return f"{source_name}_{original_id}"
3.3.1 GitHub 数据源实现示例
import requests
from datetime import datetime
from plugins.sources.base import BaseSource
from core.models import Article
from core.exceptions import SourceException
class GitHubSource(BaseSource):
"""GitHub Trending 数据源"""
def fetch(self, context: Context) -> List[Article]:
"""从GitHub抓取trending项目"""
api_url = "https://api.github.com/search/repositories"
# 计算时间范围(最近24小时)
since_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
params = {
'q': f'created:>{since_date}',
'sort': 'stars',
'order': 'desc',
'per_page': self.config.get('per_page', 30)
}
headers = {}
if token := os.getenv('GITHUB_TOKEN'):
headers['Authorization'] = f'token {token}'
try:
response = requests.get(api_url, params=params, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
except Exception as e:
raise SourceException(f"GitHub API调用失败: {e}")
articles = []
for repo in data.get('items', []):
article = Article(
id=self._create_article_id('github', str(repo['id'])),
source='github',
raw_data=repo,
title=repo['full_name'],
content=repo['description'] or '',
url=repo['html_url'],
published_at=datetime.fromisoformat(repo['created_at'].replace('Z', '+00:00')),
keywords=[],
metadata={
'stars': repo['stargazers_count'],
'language': repo['language'],
'topics': repo.get('topics', [])
}
)
articles.append(article)
return articles
3.4 plugins/processors/base.py – 处理器基类
from abc import ABC, abstractmethod
from typing import List
from core.models import Article, Context
class BaseProcessor(ABC):
"""处理器基类 - 所有处理器必须继承此类"""
def __init__(self, config: dict = None):
"""
初始化处理器
Args:
config: 处理器配置
"""
self.config = config or {}
self.name = self.__class__.__name__
@abstractmethod
def process(self, articles: List[Article], context: Context) -> List[Article]:
"""
处理文章列表 - 子类必须实现
Args:
articles: 输入文章列表
context: 执行上下文(包含optimization_hints)
Returns:
处理后的文章列表
"""
pass
def _get_hints(self, context: Context) -> dict:
"""从上下文获取针对本处理器的优化建议"""
return context.optimization_hints.get(self.name, {})
3.4.1 去重处理器实现
from plugins.processors.base import BaseProcessor
class DedupProcessor(BaseProcessor):
"""去重处理器 - 基于URL和标题相似度"""
def process(self, articles: List[Article], context: Context) -> List[Article]:
"""去除重复文章"""
seen_urls = set()
seen_titles = set()
unique_articles = []
for article in articles:
# URL去重
if article.url in seen_urls:
continue
# 标题相似度去重(简化版:完全相同)
if article.title in seen_titles:
continue
seen_urls.add(article.url)
seen_titles.add(article.title)
unique_articles.append(article)
# 记录处理历史
article.add_processing_record('dedup', {'status': 'kept'})
removed_count = len(articles) - len(unique_articles)
if removed_count > 0:
context.logger.info(f" 去重移除: {removed_count} 条")
return unique_articles
3.4.2 翻译处理器实现
from plugins.processors.base import BaseProcessor
from core.llm import LLMClient
from core.cache import Cache
class TranslateProcessor(BaseProcessor):
"""翻译处理器 - 将英文内容翻译为中文"""
def __init__(self, config: dict = None):
super().__init__(config)
self.llm = LLMClient()
self.cache = Cache()
def process(self, articles: List[Article], context: Context) -> List[Article]:
"""翻译文章标题和内容"""
hints = self._get_hints(context)
# 读取prompt模板
with open('config/prompts/translate.txt', 'r', encoding='utf-8') as f:
prompt_template = f.read()
# 批量处理
batch_size = context.config.get('concurrency', {}).get('processors', 5)
for i in range(0, len(articles), batch_size):
batch = articles[i:i+batch_size]
self._translate_batch(batch, prompt_template, hints)
return articles
def _translate_batch(self, articles: List[Article], prompt_template: str, hints: dict):
"""批量翻译"""
for article in articles:
# 检查缓存
cache_key = f"translate_{article.id}"
cached = self.cache.get(cache_key)
if cached:
article.title = cached['title']
article.content = cached['content']
continue
# 根据hints调整prompt
if hints.get('be_more_accurate'):
prompt_template += "\n注意:使用更准确、专业的翻译。"
# 调用LLM
prompt = prompt_template.format(
title=article.title,
content=article.content
)
result = self.llm.chat(prompt, temperature=hints.get('temperature', 0.3))
translated = self._parse_result(result)
# 更新文章
article.title = translated['title']
article.content = translated['content']
# 缓存
self.cache.set(cache_key, translated)
# 记录
article.add_processing_record('translate', {
'original_title': article.raw_data.get('title'),
'translated_title': article.title
})
def _parse_result(self, result: str) -> dict:
"""解析LLM返回结果"""
# 假设LLM返回格式为:标题: xxx\n内容: xxx
lines = result.strip().split('\n', 1)
return {
'title': lines[0].replace('标题:', '').strip(),
'content': lines[1].replace('内容:', '').strip() if len(lines) > 1 else ''
}
3.5 quality/evaluator.py – 质量评估器
from typing import List
from core.models import Article, QualityScore, Context
from quality.rules import LengthRule, FreshnessRule, RelevanceRule, LLMQualityRule
class QualityEvaluator:
"""质量评估器 - 综合评估文章质量"""
def __init__(self, config: dict = None):
"""
初始化评估器
Args:
config: 质量配置(来自 config/quality.yaml)
"""
self.config = config or {}
self.rules = self._init_rules()
self.weights = self.config.get('weights', {
'length': 0.1,
'fresh
0