技术博客撰写 Agentic RAG Agent 系统设计
目录
- 技术博客撰写 Agentic RAG Agent 系统设计
- 目录
- 1. 系统概述
- 2. 系统架构
- 3. Agent详细设计
- 3.1 意图理解Agent (Intent Understanding Agent)
- 3.2 协调Agent (Coordinator Agent)
- 3.3 规划Agent (Planning Agent)
- 3.4 检索Agent (Retrieval Agent)
- 3.5 写作Agent (Writing Agent)
- 3.6 风格化Agent (Style Agent)
- 3.7 引用Agent (Citation Agent)
- 3.8 图片生成Agent (Image Generation Agent)
- 3.9 审校Agent (Review Agent)
- 3.10 格式化Agent (Formatting Agent)
- 3.11 发布Agent (Publishing Agent)
- 4. 技术实现细节
- 5. 部署配置
- 6. 使用示例
- 7. 总结
1. 系统概述
本系统是一个面向博客作者的智能技术博客撰写平台,采用多Agent协作架构(Planner-Writer-Critic模式),为用户提供从主题输入到文章发布的全流程自动化写作服务。
1.1 目标用户与交互方式
- 目标用户:技术博客作者、技术写作者、内容创作者
- 交互方式:Web界面交互,用户无需编程背景,通过直观的图形界面完成所有操作
- 输入方式:用户仅需输入文章主题,系统自动完成后续所有写作流程
1.2 核心能力
- 智能意图识别:深度理解用户输入,识别真实写作意图和隐含需求
- 需求优化建议:基于用户输入提供写作方向优化和内容增强建议
- 智能结构规划:基于主题自动生成文章大纲和章节结构
- 多源信息检索:集成Web搜索、arXiv、Semantic Scholar等外部检索源
- 智能内容生成:基于RAG检索增强生成技术,产出高质量技术内容
- 自动引用管理:智能生成和管理学术引用,支持多种引用格式
- 风格化写作:支持多种写作风格控制和个性化调整
- AI图片生成:自动生成技术图表、架构图和插图
- 智能审校:提供语法检查、事实验证和质量评估
- 多格式输出:支持Markdown格式输出,兼容中英文双语
1.3 技术特色
- 智能意图理解:采用先进的NLP技术和上下文分析,准确识别用户的写作意图和潜在需求
- 多Agent协作:采用意图理解Agent、协调Agent、规划Agent、检索Agent、写作Agent等11个专业Agent分工协作
- RAG增强生成:结合向量检索和大语言模型,确保内容的准确性和时效性
- 多源数据融合:整合Web搜索、学术数据库和私有知识库的信息
- 质量控制机制:多层次质量检查,包括事实验证、逻辑一致性和技术准确性审核
- 实时信息获取:支持获取最新的技术动态和研究进展
2. 系统架构
2.1 核心架构图
graph TB
subgraph "用户交互层"
UI[Web界面/API]
REQ[用户需求]
end
subgraph "意图理解层"
INTENT[意图理解Agent]
NLP[自然语言处理模块]
CONTEXT[上下文分析模块]
OPTIM[需求优化模块]
end
subgraph "协调层"
COORD[协调Agent]
PLAN[规划Agent]
end
subgraph "检索层"
RETR[检索Agent]
WEB[Web检索模块]
ACAD[学术数据库模块]
KNOW[知识库模块]
end
subgraph "内容生成层"
WRITE[写作Agent]
STYLE[风格化Agent]
REF[引用Agent]
IMG[图片生成Agent]
end
subgraph "质量控制层"
QC[审校Agent]
FACT[事实检查模块]
QUAL[质量评估模块]
end
subgraph "输出层"
FMT[格式化Agent]
PUB[发布Agent]
end
subgraph "数据层"
VEC[向量数据库]
CACHE[缓存系统]
META[元数据存储]
INTENT_DB[意图知识库]
end
UI --> INTENT
INTENT --> NLP
INTENT --> CONTEXT
INTENT --> OPTIM
INTENT --> COORD
COORD --> PLAN
PLAN --> RETR
RETR --> WEB
RETR --> ACAD
RETR --> KNOW
RETR --> WRITE
WRITE --> STYLE
STYLE --> REF
REF --> IMG
IMG --> QC
QC --> FACT
QC --> QUAL
QC --> FMT
FMT --> PUB
RETR --> VEC
WRITE --> CACHE
QC --> META
INTENT --> INTENT_DB
3. Agent详细设计
3.1 意图理解Agent (Intent Understanding Agent)
职责: 用户意图识别与需求优化
核心能力:
- 自然语言意图识别与分类
- 隐含需求挖掘与分析
- 写作目标明确化
- 需求优化建议生成
- 用户交互优化
技术实现:
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.core import VectorStoreIndex
from typing import Dict, Any, List
import re
import json
class IntentUnderstandingAgent:
def __init__(self, service_context, intent_knowledge_base):
self.service_context = service_context
self.intent_kb = intent_knowledge_base
# 意图分类模型
self.intent_categories = {
'tutorial': '教程类文章',
'analysis': '技术分析类',
'comparison': '对比评测类',
'introduction': '技术介绍类',
'best_practices': '最佳实践类',
'troubleshooting': '问题解决类',
'review': '技术评论类',
'news': '技术新闻类'
}
# 创建意图理解工具
self.intent_tools = self._create_intent_tools()
# 初始化意图理解Agent
self.agent = OpenAIAgent.from_tools(
tools=self.intent_tools,
llm=service_context.llm,
system_prompt="""
你是一个专业的意图理解专家,负责:
1. 深度分析用户的写作需求和真实意图
2. 识别用户输入中的隐含信息和潜在需求
3. 提供写作方向的优化建议
4. 明确文章的目标受众和写作风格
5. 生成结构化的需求分析报告
请始终以用户体验为中心,提供专业、准确的意图分析。
""",
verbose=True
)
def _create_intent_tools(self):
"""创建意图理解工具"""
def analyze_user_intent(user_input: str) -> str:
"""分析用户意图"""
try:
# 关键词提取
keywords = self._extract_keywords(user_input)
# 意图分类
intent_category = self._classify_intent(user_input)
# 技术领域识别
tech_domain = self._identify_tech_domain(user_input)
# 目标受众分析
target_audience = self._analyze_target_audience(user_input)
analysis_result = {
'keywords': keywords,
'intent_category': intent_category,
'tech_domain': tech_domain,
'target_audience': target_audience,
'confidence_score': 0.85
}
return f"意图分析结果: {json.dumps(analysis_result, ensure_ascii=False, indent=2)}"
except Exception as e:
return f"意图分析失败: {str(e)}"
def generate_optimization_suggestions(user_input: str, intent_analysis: Dict) -> str:
"""生成优化建议"""
try:
suggestions = []
# 基于意图类型的建议
if intent_analysis.get('intent_category') == 'tutorial':
suggestions.extend([
"建议添加实践案例和代码示例",
"考虑包含常见问题和解决方案",
"添加循序渐进的学习路径"
])
elif intent_analysis.get('intent_category') == 'analysis':
suggestions.extend([
"建议深入分析技术原理和机制",
"包含性能测试和基准对比",
"添加技术演进历史和趋势分析"
])
# 基于技术领域的建议
tech_domain = intent_analysis.get('tech_domain', '')
if 'AI' in tech_domain or 'machine learning' in tech_domain:
suggestions.extend([
"建议包含算法原理图解",
"添加数据集和模型性能指标",
"考虑包含实际应用场景"
])
return f"优化建议: {json.dumps(suggestions, ensure_ascii=False, indent=2)}"
except Exception as e:
return f"生成优化建议失败: {str(e)}"
def clarify_requirements(user_input: str) -> str:
"""需求澄清"""
try:
clarification_questions = []
# 检查是否缺少关键信息
if not self._has_clear_scope(user_input):
clarification_questions.append("文章的具体范围和深度如何?")
if not self._has_target_audience(user_input):
clarification_questions.append("目标读者是初学者、中级开发者还是专家?")
if not self._has_practical_focus(user_input):
clarification_questions.append("更偏重理论分析还是实践应用?")
return f"需求澄清问题: {json.dumps(clarification_questions, ensure_ascii=False, indent=2)}"
except Exception as e:
return f"需求澄清失败: {str(e)}"
def enhance_topic_scope(original_topic: str, intent_analysis: Dict) -> str:
"""增强主题范围"""
try:
enhanced_topics = [original_topic]
# 基于意图分析扩展主题
intent_category = intent_analysis.get('intent_category')
tech_domain = intent_analysis.get('tech_domain')
if intent_category == 'tutorial':
enhanced_topics.extend([
f"{original_topic} - 入门指南",
f"{original_topic} - 实践案例",
f"{original_topic} - 常见问题解答"
])
elif intent_category == 'analysis':
enhanced_topics.extend([
f"{original_topic} - 技术原理深度解析",
f"{original_topic} - 性能评估与优化",
f"{original_topic} - 发展趋势分析"
])
return f"增强主题建议: {json.dumps(enhanced_topics, ensure_ascii=False, indent=2)}"
except Exception as e:
return f"主题增强失败: {str(e)}"
return [
FunctionTool.from_defaults(
fn=analyze_user_intent,
name="analyze_user_intent",
description="分析用户输入的写作意图和需求"
),
FunctionTool.from_defaults(
fn=generate_optimization_suggestions,
name="generate_optimization_suggestions",
description="基于意图分析生成写作优化建议"
),
FunctionTool.from_defaults(
fn=clarify_requirements,
name="clarify_requirements",
description="识别需要澄清的需求并生成问题"
),
FunctionTool.from_defaults(
fn=enhance_topic_scope,
name="enhance_topic_scope",
description="基于意图分析增强和扩展主题范围"
)
]
def _extract_keywords(self, text: str) -> List[str]:
"""提取关键词"""
# 简化的关键词提取逻辑
tech_keywords = [
'AI', 'machine learning', 'deep learning', 'neural network',
'Python', 'JavaScript', 'React', 'Vue', 'Node.js',
'Docker', 'Kubernetes', 'microservices', 'API',
'database', 'SQL', 'NoSQL', 'MongoDB', 'Redis'
]
found_keywords = []
text_lower = text.lower()
for keyword in tech_keywords:
if keyword.lower() in text_lower:
found_keywords.append(keyword)
return found_keywords
def _classify_intent(self, text: str) -> str:
"""分类用户意图"""
text_lower = text.lower()
if any(word in text_lower for word in ['教程', 'tutorial', '如何', 'how to', '入门']):
return 'tutorial'
elif any(word in text_lower for word in ['分析', 'analysis', '原理', '机制']):
return 'analysis'
elif any(word in text_lower for word in ['对比', 'comparison', 'vs', '比较']):
return 'comparison'
elif any(word in text_lower for word in ['介绍', 'introduction', '什么是']):
return 'introduction'
elif any(word in text_lower for word in ['最佳实践', 'best practices', '经验']):
return 'best_practices'
else:
return 'general'
def _identify_tech_domain(self, text: str) -> str:
"""识别技术领域"""
domains = {
'AI/ML': ['AI', 'machine learning', 'deep learning', 'neural network'],
'Web开发': ['React', 'Vue', 'JavaScript', 'HTML', 'CSS', 'Node.js'],
'后端开发': ['Python', 'Java', 'Go', 'API', 'microservices'],
'数据库': ['SQL', 'NoSQL', 'MongoDB', 'Redis', 'database'],
'运维': ['Docker', 'Kubernetes', 'DevOps', 'CI/CD']
}
text_lower = text.lower()
for domain, keywords in domains.items():
if any(keyword.lower() in text_lower for keyword in keywords):
return domain
return 'general'
def _analyze_target_audience(self, text: str) -> str:
"""分析目标受众"""
text_lower = text.lower()
if any(word in text_lower for word in ['初学者', 'beginner', '入门', '新手']):
return 'beginner'
elif any(word in text_lower for word in ['高级', 'advanced', '专家', '深入']):
return 'advanced'
else:
return 'intermediate'
def _has_clear_scope(self, text: str) -> bool:
"""检查是否有明确的范围"""
scope_indicators = ['具体', '详细', '深入', '全面', '简单', '基础']
return any(indicator in text for indicator in scope_indicators)
def _has_target_audience(self, text: str) -> bool:
"""检查是否指定了目标受众"""
audience_indicators = ['初学者', '新手', '专家', '开发者', '工程师']
return any(indicator in text for indicator in audience_indicators)
def _has_practical_focus(self, text: str) -> bool:
"""检查是否有实践导向"""
practical_indicators = ['实践', '案例', '示例', '代码', '项目']
return any(indicator in text for indicator in practical_indicators)
async def understand_intent(self, user_input: str) -> Dict[str, Any]:
"""理解用户意图的主要方法"""
try:
# 构建意图理解提示
intent_prompt = f"""
请分析以下用户输入的写作需求:
用户输入: "{user_input}"
请执行以下步骤:
1. 使用analyze_user_intent分析用户意图
2. 使用clarify_requirements识别需要澄清的问题
3. 使用generate_optimization_suggestions生成优化建议
4. 使用enhance_topic_scope增强主题范围
请提供完整的意图理解报告。
"""
# 使用Agent执行意图理解
response = await self.agent.achat(intent_prompt)
return {
'success': True,
'user_input': user_input,
'intent_analysis': response.response,
'recommendations': 'Based on analysis'
}
except Exception as e:
return {
'success': False,
'error': str(e),
'user_input': user_input
}
3.2 协调Agent (Coordinator Agent)
职责: 全局任务调度与流程控制
核心能力:
- 接收意图理解Agent的分析结果
- 任务分解与优先级管理
- Agent间通信协调
- 异常处理与错误恢复
- 进度监控与状态管理
- 基于意图分析优化工作流程
技术栈:
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.core.workflow import Workflow
from typing import Dict, Any
import asyncio
import logging
class CoordinatorAgent:
def __init__(self, service_context, workflow_manager):
self.service_context = service_context
self.workflow_manager = workflow_manager
self.active_tasks = {}
self.logger = logging.getLogger(__name__)
# 创建协调工具
self.coordination_tools = self._create_coordination_tools()
# 初始化协调Agent
self.agent = OpenAIAgent.from_tools(
tools=self.coordination_tools,
llm=service_context.llm,
system_prompt="""
你是一个专业的任务协调专家,负责:
1. 接收并整合意图理解Agent的分析结果
2. 基于用户真实意图制定执行计划
3. 协调各个专业Agent的工作
4. 监控任务进度和处理异常
5. 确保最终输出符合用户意图和质量要求
请始终以用户意图为导向,保持专业、高效的工作方式。
""",
verbose=True
)
def _create_coordination_tools(self):
"""创建协调工具"""
def start_blog_workflow(topic: str, requirements: str) -> str:
"""启动博客写作工作流"""
try:
task_id = f"blog_{len(self.active_tasks)}"
workflow_future = asyncio.create_task(
self.workflow_manager.start_blog_writing(topic, requirements)
)
self.active_tasks[task_id] = {
'workflow': workflow_future,
'status': 'running',
'topic': topic,
'start_time': asyncio.get_event_loop().time()
}
return f"博客写作工作流已启动,任务ID: {task_id}"
except Exception as e:
return f"启动工作流失败: {str(e)}"
def check_task_status(task_id: str) -> str:
"""检查任务状态"""
if task_id not in self.active_tasks:
return f"任务 {task_id} 不存在"
task = self.active_tasks[task_id]
if task['workflow'].done():
if task['workflow'].exception():
return f"任务 {task_id} 执行失败: {task['workflow'].exception()}"
else:
return f"任务 {task_id} 执行完成"
else:
return f"任务 {task_id} 正在执行中"
def get_task_result(task_id: str) -> str:
"""获取任务结果"""
if task_id not in self.active_tasks:
return f"任务 {task_id} 不存在"
task = self.active_tasks[task_id]
if not task['workflow'].done():
return f"任务 {task_id} 尚未完成"
try:
result = task['workflow'].result()
return f"任务结果: {result['final_content'][:500]}..."
except Exception as e:
return f"获取结果失败: {str(e)}"
return [
FunctionTool.from_defaults(
fn=start_blog_workflow,
name="start_blog_workflow",
description="启动博客写作工作流"
),
FunctionTool.from_defaults(
fn=check_task_status,
name="check_task_status",
description="检查任务执行状态"
),
FunctionTool.from_defaults(
fn=get_task_result,
name="get_task_result",
description="获取任务执行结果"
)
]
async def process_request(self, user_request: str) -> Dict[str, Any]:
"""处理用户请求"""
try:
# 使用Agent处理请求
response = await self.agent.achat(user_request)
# 解析响应并执行相应操作
result = {
'success': True,
'response': response.response,
'active_tasks': list(self.active_tasks.keys())
}
return result
except Exception as e:
self.logger.error(f"处理请求失败: {str(e)}")
return {
'success': False,
'error': str(e),
'active_tasks': list(self.active_tasks.keys())
}
async def monitor_tasks(self):
"""监控任务执行"""
while True:
completed_tasks = []
for task_id, task in self.active_tasks.items():
if task['workflow'].done():
completed_tasks.append(task_id)
if task['workflow'].exception():
self.logger.error(
f"任务 {task_id} 执行失败: {task['workflow'].exception()}"
)
else:
self.logger.info(f"任务 {task_id} 执行完成")
# 清理已完成的任务
for task_id in completed_tasks:
del self.active_tasks[task_id]
await asyncio.sleep(5) # 每5秒检查一次
3.3 规划Agent (Planning Agent)
职责: 内容结构规划与写作策略制定
核心能力:
- 主题分析与大纲生成
- 内容架构设计
- 写作策略规划
- 资源需求评估
算法实现:
class PlanningAgent:
def __init__(self):
self.outline_generator = OutlineGenerator()
self.strategy_planner = StrategyPlanner()
self.resource_estimator = ResourceEstimator()
async def create_plan(self, topic, requirements):
# 主题分析
topic_analysis = await self.analyze_topic(topic)
# 生成大纲
outline = await self.outline_generator.generate(
topic_analysis, requirements
)
# 制定写作策略
strategy = await self.strategy_planner.plan(
outline, target_audience=requirements.audience
)
return BlogPlan(outline, strategy, topic_analysis)
3.4 检索Agent (Retrieval Agent)
职责: 多源信息检索与知识融合
核心能力:
- Web搜索与实时信息获取
- 学术数据库检索
- 私有知识库查询
- 多模态信息融合
多源检索架构:
from llama_index.readers.web import SimpleWebPageReader
from llama_index.readers.arxiv import ArxivReader
from llama_index.readers.google import GoogleDocsReader
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import QueryEngineTool, FunctionTool
from llama_index.core import VectorStoreIndex
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from typing import List, Dict, Any
import asyncio
class RetrievalAgent:
def __init__(self, service_context, data_fusion_engine):
self.service_context = service_context
self.data_fusion = data_fusion_engine
# 初始化数据读取器
self.web_reader = SimpleWebPageReader(html_to_text=True)
self.arxiv_reader = ArxivReader()
self.google_docs_reader = GoogleDocsReader()
# 存储已建立的索引
self.indexes = {}
self.query_engines = {}
# 创建检索工具
self.retrieval_tools = self._create_retrieval_tools()
# 初始化检索Agent
self.agent = OpenAIAgent.from_tools(
tools=self.retrieval_tools,
llm=service_context.llm,
system_prompt="""
你是一个专业的信息检索专家,负责:
1. 从多个数据源检索相关信息
2. 评估信息的相关性和可信度
3. 整合和去重检索结果
4. 提供高质量的信息摘要
请确保检索的信息准确、相关且来源可靠。
""",
verbose=True
)
def _create_retrieval_tools(self):
"""创建检索工具"""
def search_web_content(urls: List[str], query: str) -> str:
"""搜索Web内容"""
try:
# 加载Web文档
documents = self.web_reader.load_data(urls)
# 为每个文档添加元数据
for i, doc in enumerate(documents):
doc.metadata.update({
'source_type': 'web',
'url': urls[i] if i < len(urls) else 'unknown',
'credibility_score': 0.7
})
# 创建索引
index = VectorStoreIndex.from_documents(
documents, service_context=self.service_context
)
# 查询
query_engine = index.as_query_engine(similarity_top_k=5)
response = query_engine.query(query)
return f"Web搜索结果: {response.response}"
except Exception as e:
return f"Web搜索失败: {str(e)}"
def search_arxiv_papers(paper_ids: List[str], query: str) -> str:
"""搜索arXiv论文"""
try:
# 加载arXiv论文
documents = self.arxiv_reader.load_papers(paper_ids)
# 添加元数据
for doc in documents:
doc.metadata.update({
'source_type': 'academic',
'credibility_score': 0.9
})
# 创建索引
index = VectorStoreIndex.from_documents(
documents, service_context=self.service_context
)
# 查询
query_engine = index.as_query_engine(similarity_top_k=3)
response = query_engine.query(query)
return f"学术论文搜索结果: {response.response}"
except Exception as e:
return f"学术搜索失败: {str(e)}"
def search_knowledge_base(index_name: str, query: str) -> str:
"""搜索知识库"""
try:
if index_name not in self.query_engines:
return f"知识库 {index_name} 不存在"
query_engine = self.query_engines[index_name]
response = query_engine.query(query)
return f"知识库搜索结果: {response.response}"
except Exception as e:
return f"知识库搜索失败: {str(e)}"
def multi_source_search(query: str, sources_config: Dict[str, Any]) -> str:
"""多源综合搜索"""
try:
all_results = []
# Web搜索
if 'web_urls' in sources_config:
web_result = search_web_content(
sources_config['web_urls'], query
)
all_results.append(web_result)
# 学术搜索
if 'arxiv_papers' in sources_config:
arxiv_result = search_arxiv_papers(
sources_config['arxiv_papers'], query
)
all_results.append(arxiv_result)
# 知识库搜索
if 'knowledge_bases' in sources_config:
for kb_name in sources_config['knowledge_bases']:
kb_result = search_knowledge_base(kb_name, query)
all_results.append(kb_result)
# 整合结果
integrated_result = "\n\n".join(all_results)
return f"综合搜索结果:\n{integrated_result}"
except Exception as e:
return f"多源搜索失败: {str(e)}"
return [
FunctionTool.from_defaults(
fn=search_web_content,
name="search_web_content",
description="从指定的Web页面搜索相关内容"
),
FunctionTool.from_defaults(
fn=search_arxiv_papers,
name="search_arxiv_papers",
description="从arXiv论文中搜索学术内容"
),
FunctionTool.from_defaults(
fn=search_knowledge_base,
name="search_knowledge_base",
description="从指定知识库中搜索内容"
),
FunctionTool.from_defaults(
fn=multi_source_search,
name="multi_source_search",
description="执行多源综合搜索"
)
]
async def retrieve_information(self, query: str, sources_config: Dict[str, Any]) -> Dict[str, Any]:
"""检索信息的主要方法"""
try:
# 构建检索提示
retrieval_prompt = f"""
请根据查询"{query}"从以下数据源检索相关信息:
数据源配置: {sources_config}
要求:
1. 使用multi_source_search进行综合搜索
2. 评估信息的相关性和可信度
3. 提供信息摘要和关键点
4. 标注信息来源
"""
# 使用Agent执行检索
response = await self.agent.achat(retrieval_prompt)
return {
'success': True,
'query': query,
'results': response.response,
'sources': sources_config
}
except Exception as e:
return {
'success': False,
'error': str(e),
'query': query
}
async def build_knowledge_base(self, name: str, documents: List) -> bool:
"""构建知识库索引"""
try:
# 处理文档
processed_docs = await self.data_fusion.process_and_fuse(documents)
# 创建索引
index = VectorStoreIndex.from_documents(
processed_docs, service_context=self.service_context
)
# 创建查询引擎
query_engine = index.as_query_engine(
similarity_top_k=10,
response_mode="tree_summarize"
)
# 存储
self.indexes[name] = index
self.query_engines[name] = query_engine
return True
except Exception as e:
print(f"构建知识库失败: {str(e)}")
return False
多源数据集成配置:
retrieval_sources:
web:
- google_search_api
- bing_api
- duckduckgo_api
academic:
- arxiv
- ieee_xplore
- acm_digital_library
- pubmed
- google_scholar
knowledge_base:
- company_docs
- tech_wikis
- code_repositories
3.5 写作Agent (Writing Agent)
职责: 核心内容生成与文本创作
核心能力:
- 基于RAG的智能写作
- 多语言内容生成
- 技术概念解释
- 代码示例生成
写作引擎设计:
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool, QueryEngineTool
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.llms import ChatMessage, MessageRole
from typing import Dict, List, Any, Optional
import json
class WritingAgent:
def __init__(self, service_context, retrieval_agent, code_generator=None):
self.service_context = service_context
self.retrieval_agent = retrieval_agent
self.code_generator = code_generator
# 创建写作工具
self.writing_tools = self._create_writing_tools()
# 初始化写作Agent
self.agent = OpenAIAgent.from_tools(
tools=self.writing_tools,
llm=service_context.llm,
memory=ChatMemoryBuffer.from_defaults(token_limit=10000),
system_prompt="""
你是一个专业的技术写作专家,擅长创作高质量的技术博客和文档。
核心职责:
1. 基于检索到的信息生成准确、专业的技术内容
2. 确保内容逻辑清晰、结构合理
3. 适当插入代码示例和技术图表
4. 保持一致的写作风格和专业术语使用
5. 优化内容的可读性和技术深度
写作原则:
- 准确性:确保技术信息的准确性
- 清晰性:使用简洁明了的语言表达复杂概念
- 实用性:提供有价值的实践指导
- 完整性:覆盖主题的关键方面
- 专业性:使用标准的技术术语和表达方式
""",
verbose=True
)
# 写作历史和上下文
self.writing_context = {
'current_topic': None,
'writing_style': 'technical',
'target_audience': 'developers',
'content_sections': []
}
def _create_writing_tools(self):
"""创建写作工具"""
def write_section_with_rag(section_plan: str, retrieved_context: str) -> str:
"""基于RAG的章节写作"""
try:
plan_data = json.loads(section_plan)
# 构建写作提示
writing_prompt = f"""
请根据以下信息写作一个技术博客章节:
章节计划:
- 标题: {plan_data.get('title', '')}
- 主要内容点: {plan_data.get('key_points', [])}
- 目标长度: {plan_data.get('target_length', 800)}字
- 写作风格: {plan_data.get('style', 'technical')}
- 目标读者: {plan_data.get('audience', 'developers')}
检索到的相关信息:
{retrieved_context}
写作要求:
1. 内容要准确、专业、有深度
2. 结构清晰,逻辑连贯
3. 适当使用技术术语,但要确保可读性
4. 如果涉及代码,请提供清晰的示例
5. 保持客观、专业的写作语调
请生成完整的章节内容,包括适当的标题层级。
"""
# 这里应该调用LLM生成内容
# 为了示例,返回一个结构化的响应
section_content = f"""
## {plan_data.get('title', '章节标题')}
[基于检索信息和章节计划生成的专业技术内容]
### 核心概念
[基于 {retrieved_context[:100]}... 的核心概念解释]
### 技术实现
[技术实现细节和最佳实践]
### 实际应用
[实际应用场景和案例分析]
"""
return section_content
except Exception as e:
return f"章节写作失败: {str(e)}"
def generate_code_examples(requirements: str, context: str) -> str:
"""生成代码示例"""
try:
if self.code_generator:
return self.code_generator.generate(requirements, context)
# 默认代码生成逻辑
req_data = json.loads(requirements) if isinstance(requirements, str) else requirements
code_template = f"""
{req_data.get('language', 'python')}
# {req_data.get('description', '代码示例')}
{req_data.get('template_code', '# 示例代码将在这里生成')}
**代码说明:**
- {req_data.get('explanation', '代码功能和使用方法的详细说明')}
"""
return code_template
except Exception as e:
return f"代码生成失败: {str(e)}"
def enhance_content_with_context(base_content: str, additional_context: str) -> str:
"""使用额外上下文增强内容"""
try:
enhancement_prompt = f"""
请基于以下额外上下文信息来增强和完善现有内容:
现有内容:
{base_content}
额外上下文:
{additional_context}
增强要求:
1. 添加更多技术细节和深度分析
2. 补充相关的最佳实践和注意事项
3. 增加实际应用场景和案例
4. 确保信息的准确性和时效性
5. 保持原有的结构和风格
请返回增强后的完整内容。
"""
# 这里应该调用LLM进行内容增强
enhanced_content = f"{base_content}\n\n### 深度分析\n\n[基于额外上下文的深度分析内容]\n\n### 最佳实践\n\n[相关最佳实践和建议]"
return enhanced_content
except Exception as e:
return f"内容增强失败: {str(e)}"
def optimize_readability(content: str, target_audience: str) -> str:
"""优化内容可读性"""
try:
optimization_prompt = f"""
请优化以下内容的可读性,目标读者是: {target_audience}
原始内容:
{content}
优化要求:
1. 调整语言复杂度以适应目标读者
2. 改善段落结构和逻辑流程
3. 添加必要的解释和背景信息
4. 优化标题和子标题的层次结构
5. 确保技术术语的适当使用和解释
请返回优化后的内容。
"""
# 这里应该调用LLM进行可读性优化
optimized_content = content # 简化示例
return optimized_content
except Exception as e:
return f"可读性优化失败: {str(e)}"
return [
FunctionTool.from_defaults(
fn=write_section_with_rag,
name="write_section_with_rag",
description="基于RAG检索信息和章节计划生成技术内容"
),
FunctionTool.from_defaults(
fn=generate_code_examples,
name="generate_code_examples",
description="根据需求生成相关的代码示例和说明"
),
FunctionTool.from_defaults(
fn=enhance_content_with_context,
name="enhance_content_with_context",
description="使用额外上下文信息增强和完善内容"
),
FunctionTool.from_defaults(
fn=optimize_readability,
name="optimize_readability",
description="优化内容的可读性以适应目标读者"
)
]
async def write_section(self, section_plan: Dict[str, Any], retrieved_info: Dict[str, Any]) -> Dict[str, Any]:
"""写作章节的主要方法"""
try:
# 更新写作上下文
self.writing_context['current_topic'] = section_plan.get('title', '')
# 构建写作提示
writing_prompt = f"""
请为技术博客写作一个章节,具体要求如下:
章节计划: {json.dumps(section_plan, ensure_ascii=False, indent=2)}
检索到的相关信息: {json.dumps(retrieved_info, ensure_ascii=False, indent=2)}
请按以下步骤执行:
1. 使用write_section_with_rag基于检索信息生成基础内容
2. 如果需要代码示例,使用generate_code_examples生成
3. 使用enhance_content_with_context增强内容深度
4. 使用optimize_readability优化可读性
最终输出完整的、高质量的技术章节内容。
"""
# 使用Agent执行写作任务
response = await self.agent.achat(writing_prompt)
# 记录到写作上下文
self.writing_context['content_sections'].append({
'title': section_plan.get('title', ''),
'content': response.response,
'metadata': section_plan
})
return {
'success': True,
'content': response.response,
'section_plan': section_plan,
'metadata': {
'word_count': len(response.response.split()),
'has_code': '```' in response.response,
'section_level': section_plan.get('level', 1)
}
}
except Exception as e:
return {
'success': False,
'error': str(e),
'section_plan': section_plan
}
async def write_complete_article(self, outline: Dict[str, Any], research_data: Dict[str, Any]) -> Dict[str, Any]:
"""写作完整文章"""
try:
article_sections = []
# 逐个写作章节
for section in outline.get('sections', []):
# 为每个章节检索相关信息
section_context = await self.retrieval_agent.retrieve_information(
section.get('title', ''), research_data
)
# 写作章节
section_result = await self.write_section(section, section_context)
if section_result['success']:
article_sections.append(section_result['content'])
else:
print(f"章节写作失败: {section_result['error']}")
# 组合完整文章
complete_article = "\n\n".join(article_sections)
return {
'success': True,
'article': complete_article,
'outline': outline,
'metadata': {
'total_sections': len(article_sections),
'total_words': len(complete_article.split()),
'writing_context': self.writing_context
}
}
except Exception as e:
return {
'success': False,
'error': str(e),
'outline': outline
}
3.6 风格化Agent (Style Agent)
职责: 内容风格优化与个性化调整
核心能力:
- 写作风格一致性保持
- 技术术语标准化
- 可读性优化
- 个性化风格适配
风格化处理流程:
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.core.memory import ChatMemoryBuffer
from typing import Dict, List, Any, Optional
import json
import re
class StyleAgent:
def __init__(self, service_context, style_config=None):
self.service_context = service_context
self.style_config = style_config or self._default_style_config()
# 创建风格化工具
self.style_tools = self._create_style_tools()
# 初始化风格化Agent
self.agent = OpenAIAgent.from_tools(
tools=self.style_tools,
llm=service_context.llm,
memory=ChatMemoryBuffer.from_defaults(token_limit=8000),
system_prompt="""
你是一个专业的内容风格优化专家,负责:
1. 分析和调整文本的写作风格
2. 确保技术术语的一致性和标准化
3. 优化内容的可读性和专业性
4. 适配不同目标读者的需求
5. 保持品牌和个人写作风格的一致性
风格优化原则:
- 一致性:确保整篇文章风格统一
- 专业性:使用准确的技术术语
- 可读性:平衡专业性和易读性
- 适应性:根据目标读者调整语言复杂度
- 准确性:保持技术内容的准确性
""",
verbose=True
)
# 风格分析历史
self.style_history = []
def _default_style_config(self):
"""默认风格配置"""
return {
'tone': 'professional',
'formality': 'semi-formal',
'technical_level': 'intermediate',
'target_audience': 'developers',
'terminology_standards': {
'AI': 'Artificial Intelligence',
'ML': 'Machine Learning',
'API': 'Application Programming Interface'
},
'style_preferences': {
'use_active_voice': True,
'prefer_short_sentences': False,
'include_examples': True,
'use_bullet_points': True
}
}
def _create_style_tools(self):
"""创建风格化工具"""
def analyze_writing_style(content: str) -> str:
"""分析写作风格"""
try:
# 分析各种风格指标
analysis = {
'tone_analysis': self._analyze_tone(content),
'readability_score': self._calculate_readability(content),
'technical_density': self._analyze_technical_density(content),
'sentence_structure': self._analyze_sentence_structure(content),
'terminology_consistency': self._check_terminology_consistency(content),
'style_issues': self._identify_style_issues(content)
}
return json.dumps(analysis, ensure_ascii=False, indent=2)
except Exception as e:
return f"风格分析失败: {str(e)}"
def adjust_tone_and_style(content: str, target_style: str) -> str:
"""调整语调和风格"""
try:
style_config = json.loads(target_style) if isinstance(target_style, str) else target_style
adjustment_prompt = f"""
请根据以下风格要求调整内容的语调和写作风格:
原始内容:
{content}
目标风格配置:
- 语调: {style_config.get('tone', 'professional')}
- 正式程度: {style_config.get('formality', 'semi-formal')}
- 技术水平: {style_config.get('technical_level', 'intermediate')}
- 目标读者: {style_config.get('target_audience', 'developers')}
调整要求:
1. 保持技术内容的准确性
2. 调整语言的正式程度和复杂度
3. 确保语调符合目标风格
4. 优化句式结构和表达方式
5. 保持逻辑结构不变
请返回调整后的内容。
"""
# 这里应该调用LLM进行风格调整
adjusted_content = content # 简化示例
return adjusted_content
except Exception as e:
return f"风格调整失败: {str(e)}"
def standardize_terminology(content: str, terminology_dict: str) -> str:
"""标准化技术术语"""
try:
term_dict = json.loads(terminology_dict) if isinstance(terminology_dict, str) else terminology_dict
standardized_content = content
# 应用术语标准化
for abbrev, full_form in term_dict.items():
# 首次出现时使用全称
pattern = rf'\b{re.escape(abbrev)}\b'
if re.search(pattern, standardized_content):
# 只在首次出现时替换为全称+缩写
standardized_content = re.sub(
pattern,
f"{full_form} ({abbrev})",
standardized_content,
count=1
)
return standardized_content
except Exception as e:
return f"术语标准化失败: {str(e)}"
def optimize_readability(content: str, target_audience: str) -> str:
"""优化可读性"""
try:
optimization_prompt = f"""
请优化以下内容的可读性,目标读者是: {target_audience}
原始内容:
{content}
优化要求:
1. 简化复杂句式,提高理解度
2. 添加适当的过渡词和连接词
3. 优化段落结构和信息层次
4. 确保技术概念的清晰解释
5. 保持专业性的同时提高可读性
请返回优化后的内容。
"""
# 这里应该调用LLM进行可读性优化
optimized_content = content # 简化示例
return optimized_content
except Exception as e:
return f"可读性优化失败: {str(e)}"
def ensure_style_consistency(content_sections: List[str]) -> str:
"""确保风格一致性"""
try:
consistency_prompt = f"""
请检查并确保以下多个章节内容的风格一致性:
章节内容:
{json.dumps(content_sections, ensure_ascii=False, indent=2)}
一致性要求:
1. 统一语调和写作风格
2. 保持术语使用的一致性
3. 统一句式结构和表达习惯
4. 确保逻辑连贯性
5. 保持专业水平的一致性
请返回风格统一后的完整内容。
"""
# 这里应该调用LLM进行一致性处理
consistent_content = "\n\n".join(content_sections) # 简化示例
return consistent_content
except Exception as e:
return f"风格一致性处理失败: {str(e)}"
return [
FunctionTool.from_defaults(
fn=analyze_writing_style,
name="analyze_writing_style",
description="分析内容的写作风格和特征"
),
FunctionTool.from_defaults(
fn=adjust_tone_and_style,
name="adjust_tone_and_style",
description="根据目标风格调整内容的语调和写作风格"
),
FunctionTool.from_defaults(
fn=standardize_terminology,
name="standardize_terminology",
description="标准化技术术语的使用"
),
FunctionTool.from_defaults(
fn=optimize_readability,
name="optimize_readability",
description="优化内容的可读性"
),
FunctionTool.from_defaults(
fn=ensure_style_consistency,
name="ensure_style_consistency",
description="确保多个章节间的风格一致性"
)
]
def _analyze_tone(self, content: str) -> Dict[str, Any]:
"""分析语调"""
# 简化的语调分析
formal_indicators = ['furthermore', 'therefore', 'consequently', 'moreover']
informal_indicators = ['basically', 'pretty much', 'kind of', 'sort of']
formal_count = sum(1 for word in formal_indicators if word in content.lower())
informal_count = sum(1 for word in informal_indicators if word in content.lower())
return {
'formality_score': formal_count - informal_count,
'tone': 'formal' if formal_count > informal_count else 'informal'
}
def _calculate_readability(self, content: str) -> float:
"""计算可读性分数"""
# 简化的可读性计算
sentences = content.split('.')
words = content.split()
if len(sentences) == 0:
return 0.0
avg_sentence_length = len(words) / len(sentences)
return max(0, 100 - avg_sentence_length * 2) # 简化公式
def _analyze_technical_density(self, content: str) -> float:
"""分析技术密度"""
technical_terms = ['algorithm', 'framework', 'implementation', 'architecture', 'optimization']
words = content.lower().split()
if len(words) == 0:
return 0.0
technical_count = sum(1 for word in words if word in technical_terms)
return technical_count / len(words) * 100
def _analyze_sentence_structure(self, content: str) -> Dict[str, Any]:
"""分析句式结构"""
sentences = [s.strip() for s in content.split('.') if s.strip()]
if not sentences:
return {'avg_length': 0, 'complexity': 'low'}
avg_length = sum(len(s.split()) for s in sentences) / len(sentences)
complexity = 'high' if avg_length > 20 else 'medium' if avg_length > 10 else 'low'
return {
'avg_length': avg_length,
'complexity': complexity,
'sentence_count': len(sentences)
}
def _check_terminology_consistency(self, content: str) -> Dict[str, Any]:
"""检查术语一致性"""
# 简化的一致性检查
inconsistencies = []
# 检查常见的不一致用法
if 'AI' in content and 'artificial intelligence' in content.lower():
inconsistencies.append('AI术语使用不一致')
return {
'inconsistencies': inconsistencies,
'consistency_score': max(0, 100 - len(inconsistencies) * 10)
}
def _identify_style_issues(self, content: str) -> List[str]:
"""识别风格问题"""
issues = []
# 检查常见风格问题
if content.count('!') > 3:
issues.append('过多使用感叹号')
if 'very' in content.lower():
issues.append('使用了模糊限定词')
return issues
async def stylize_content(self, content: str, style_guide: Dict[str, Any]) -> Dict[str, Any]:
"""风格化内容的主要方法"""
try:
# 构建风格化提示
stylization_prompt = f"""
请对以下内容进行全面的风格优化:
原始内容: {content}
风格指南: {json.dumps(style_guide, ensure_ascii=False, indent=2)}
请按以下步骤执行:
1. 使用analyze_writing_style分析当前风格
2. 使用adjust_tone_and_style调整语调和风格
3. 使用standardize_terminology标准化术语
4. 使用optimize_readability优化可读性
最终输出风格优化后的完整内容。
"""
# 使用Agent执行风格化任务
response = await self.agent.achat(stylization_prompt)
# 记录风格化历史
self.style_history.append({
'original_content': content,
'style_guide': style_guide,
'stylized_content': response.response,
'timestamp': self._get_timestamp()
})
return {
'success': True,
'stylized_content': response.response,
'style_guide': style_guide,
'metadata': {
'original_length': len(content.split()),
'stylized_length': len(response.response.split()),
'style_changes_applied': True
}
}
except Exception as e:
return {
'success': False,
'error': str(e),
'original_content': content
}
def _get_timestamp(self) -> str:
"""获取时间戳"""
from datetime import datetime
return datetime.now().isoformat()
3.7 引用Agent (Citation Agent)
职责: 引用管理与学术规范
核心能力:
- 自动引用生成
- 多种引用格式支持
- 引用质量验证
- 参考文献管理
引用系统设计:
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.core.memory import ChatMemoryBuffer
from typing import Dict, List, Any, Optional
import json
import re
from datetime import datetime
class CitationAgent:
def __init__(self, service_context, citation_config=None):
self.service_context = service_context
self.citation_config = citation_config or self._default_citation_config()
# 创建引用工具
self.citation_tools = self._create_citation_tools()
# 初始化引用Agent
self.agent = OpenAIAgent.from_tools(
tools=self.citation_tools,
llm=service_context.llm,
memory=ChatMemoryBuffer.from_defaults(token_limit=6000),
system_prompt="""
你是一个专业的学术引用和参考文献管理专家,负责:
1. 识别需要引用的内容和观点
2. 生成符合学术规范的引用格式
3. 管理和组织参考文献
4. 验证引用的准确性和完整性
5. 确保引用格式的一致性
引用原则:
- 准确性:确保引用信息的准确性
- 完整性:提供完整的引用信息
- 一致性:保持引用格式的统一
- 规范性:遵循学术引用标准
- 可追溯性:确保读者能够找到原始资料
""",
verbose=True
)
# 引用数据库
self.citation_database = {
'sources': {},
'citations': [],
'bibliography': []
}
def _default_citation_config(self):
"""默认引用配置"""
return {
'default_style': 'APA',
'supported_styles': ['APA', 'MLA', 'Chicago', 'IEEE'],
'auto_generate': True,
'include_urls': True,
'include_access_dates': True,
'validate_sources': True
}
def _create_citation_tools(self):
"""创建引用工具"""
def extract_citation_points(content: str) -> str:
"""提取需要引用的内容点"""
try:
# 识别需要引用的内容
citation_points = []
# 查找直接引用(引号内容)
direct_quotes = re.findall(r'"([^"]+)"', content)
for quote in direct_quotes:
citation_points.append({
'type': 'direct_quote',
'content': quote,
'position': content.find(f'"{quote}"'),
'needs_citation': True
})
# 查找统计数据和具体数字
statistics = re.findall(r'\b\d+(?:\.\d+)?%|\b\d+(?:,\d{3})*(?:\.\d+)?\b', content)
for stat in statistics:
if content.find(stat) != -1:
citation_points.append({
'type': 'statistic',
'content': stat,
'position': content.find(stat),
'needs_citation': True
})
# 查找技术概念和专业术语
technical_patterns = [
r'according to [^,\.]+',
r'research shows',
r'studies indicate',
r'as demonstrated by'
]
for pattern in technical_patterns:
matches = re.finditer(pattern, content, re.IGNORECASE)
for match in matches:
citation_points.append({
'type': 'research_reference',
'content': match.group(),
'position': match.start(),
'needs_citation': True
})
return json.dumps(citation_points, ensure_ascii=False, indent=2)
except Exception as e:
return f"引用点提取失败: {str(e)}"
def match_sources_to_citations(citation_points: str, available_sources: str) -> str:
"""将引用点与可用源匹配"""
try:
points = json.loads(citation_points)
sources = json.loads(available_sources)
matched_citations = []
for point in points:
best_match = None
best_score = 0
# 为每个引用点找到最佳匹配的源
for source_id, source_info in sources.items():
score = self._calculate_relevance_score(
point['content'],
source_info.get('content', '') + ' ' + source_info.get('title', '')
)
if score > best_score:
best_score = score
best_match = source_id
if best_match and best_score > 0.3: # 阈值
matched_citations.append({
'citation_point': point,
'source_id': best_match,
'source_info': sources[best_match],
'relevance_score': best_score
})
return json.dumps(matched_citations, ensure_ascii=False, indent=2)
except Exception as e:
return f"源匹配失败: {str(e)}"
def format_citations(matched_citations: str, citation_style: str) -> str:
"""格式化引用"""
try:
citations = json.loads(matched_citations)
formatted_citations = []
for citation in citations:
source_info = citation['source_info']
if citation_style.upper() == 'APA':
formatted = self._format_apa_citation(source_info)
elif citation_style.upper() == 'MLA':
formatted = self._format_mla_citation(source_info)
elif citation_style.upper() == 'IEEE':
formatted = self._format_ieee_citation(source_info)
else:
formatted = self._format_apa_citation(source_info) # 默认APA
formatted_citations.append({
'citation_point': citation['citation_point'],
'formatted_citation': formatted,
'in_text_citation': self._generate_in_text_citation(source_info, citation_style),
'source_id': citation['source_id']
})
return json.dumps(formatted_citations, ensure_ascii=False, indent=2)
except Exception as e:
return f"引用格式化失败: {str(e)}"
def generate_bibliography(sources: str, citation_style: str) -> str:
"""生成参考文献列表"""
try:
source_data = json.loads(sources)
bibliography = []
for source_id, source_info in source_data.items():
if citation_style.upper() == 'APA':
bib_entry = self._format_apa_bibliography(source_info)
elif citation_style.upper() == 'MLA':
bib_entry = self._format_mla_bibliography(source_info)
elif citation_style.upper() == 'IEEE':
bib_entry = self._format_ieee_bibliography(source_info)
else:
bib_entry = self._format_apa_bibliography(source_info)
bibliography.append({
'source_id': source_id,
'formatted_entry': bib_entry,
'source_type': source_info.get('type', 'unknown')
})
# 按字母顺序排序
bibliography.sort(key=lambda x: x['formatted_entry'])
return json.dumps(bibliography, ensure_ascii=False, indent=2)
except Exception as e:
return f"参考文献生成失败: {str(e)}"
def validate_citations(citations: str) -> str:
"""验证引用质量"""
try:
citation_data = json.loads(citations)
validation_results = {
'valid_citations': [],
'invalid_citations': [],
'warnings': [],
'suggestions': []
}
for citation in citation_data:
source_info = citation.get('source_info', {})
issues = []
# 检查必需字段
required_fields = ['title', 'author', 'year']
for field in required_fields:
if not source_info.get(field):
issues.append(f"缺少{field}字段")
# 检查URL有效性
if source_info.get('url') and not source_info['url'].startswith(('http://', 'https://')):
issues.append("URL格式无效")
# 检查年份格式
year = source_info.get('year')
if year and not re.match(r'^\d{4}$', str(year)):
issues.append("年份格式无效")
if issues:
validation_results['invalid_citations'].append({
'citation': citation,
'issues': issues
})
else:
validation_results['valid_citations'].append(citation)
return json.dumps(validation_results, ensure_ascii=False, indent=2)
except Exception as e:
return f"引用验证失败: {str(e)}"
return [
FunctionTool.from_defaults(
fn=extract_citation_points,
name="extract_citation_points",
description="从内容中提取需要引用的关键点"
),
FunctionTool.from_defaults(
fn=match_sources_to_citations,
name="match_sources_to_citations",
description="将引用点与可用的源材料进行匹配"
),
FunctionTool.from_defaults(
fn=format_citations,
name="format_citations",
description="按指定格式格式化引用"
),
FunctionTool.from_defaults(
fn=generate_bibliography,
name="generate_bibliography",
description="生成完整的参考文献列表"
),
FunctionTool.from_defaults(
fn=validate_citations,
name="validate_citations",
description="验证引用的质量和完整性"
)
]
def _calculate_relevance_score(self, citation_content: str, source_content: str) -> float:
"""计算相关性分数"""
# 简化的相关性计算
citation_words = set(citation_content.lower().split())
source_words = set(source_content.lower().split())
if not citation_words or not source_words:
return 0.0
intersection = citation_words.intersection(source_words)
union = citation_words.union(source_words)
return len(intersection) / len(union) if union else 0.0
def _format_apa_citation(self, source_info: Dict[str, Any]) -> str:
"""格式化APA引用"""
author = source_info.get('author', 'Unknown Author')
year = source_info.get('year', 'n.d.')
title = source_info.get('title', 'Untitled')
url = source_info.get('url', '')
citation = f"{author} ({year}). {title}."
if url:
citation += f" Retrieved from {url}"
return citation
def _format_mla_citation(self, source_info: Dict[str, Any]) -> str:
"""格式化MLA引用"""
author = source_info.get('author', 'Unknown Author')
title = source_info.get('title', 'Untitled')
year = source_info.get('year', 'n.d.')
url = source_info.get('url', '')
citation = f"{author}. \"{title}.\" {year}."
if url:
citation += f" Web. {datetime.now().strftime('%d %b %Y')}."
return citation
def _format_ieee_citation(self, source_info: Dict[str, Any]) -> str:
"""格式化IEEE引用"""
author = source_info.get('author', 'Unknown Author')
title = source_info.get('title', 'Untitled')
year = source_info.get('year', 'n.d.')
url = source_info.get('url', '')
citation = f"{author}, \"{title},\" {year}."
if url:
citation += f" [Online]. Available: {url}"
return citation
def _generate_in_text_citation(self, source_info: Dict[str, Any], style: str) -> str:
"""生成文内引用"""
author = source_info.get('author', 'Unknown')
year = source_info.get('year', 'n.d.')
if style.upper() == 'APA':
return f"({author}, {year})"
elif style.upper() == 'MLA':
return f"({author})"
elif style.upper() == 'IEEE':
return f"[{source_info.get('id', '1')}]"
else:
return f"({author}, {year})"
def _format_apa_bibliography(self, source_info: Dict[str, Any]) -> str:
"""格式化APA参考文献条目"""
return self._format_apa_citation(source_info)
def _format_mla_bibliography(self, source_info: Dict[str, Any]) -> str:
"""格式化MLA参考文献条目"""
return self._format_mla_citation(source_info)
def _format_ieee_bibliography(self, source_info: Dict[str, Any]) -> str:
"""格式化IEEE参考文献条目"""
return self._format_ieee_citation(source_info)
async def generate_citations(self, content: str, sources: Dict[str, Any], style: str = 'APA') -> Dict[str, Any]:
"""生成引用的主要方法"""
try:
# 构建引用生成提示
citation_prompt = f"""
请为以下内容生成完整的学术引用:
内容: {content}
可用源材料: {json.dumps(sources, ensure_ascii=False, indent=2)}
引用格式: {style}
请按以下步骤执行:
1. 使用extract_citation_points提取需要引用的内容点
2. 使用match_sources_to_citations匹配源材料
3. 使用format_citations格式化引用
4. 使用generate_bibliography生成参考文献
5. 使用validate_citations验证引用质量
最终输出完整的引用信息和参考文献列表。
"""
# 使用Agent执行引用生成任务
response = await self.agent.achat(citation_prompt)
# 更新引用数据库
self.citation_database['sources'].update(sources)
return {
'success': True,
'citations': response.response,
'style': style,
'metadata': {
'source_count': len(sources),
'citation_style': style,
'generation_time': datetime.now().isoformat()
}
}
except Exception as e:
return {
'success': False,
'error': str(e),
'content': content
}
3.8 图片生成Agent (Image Generation Agent)
职责: 技术图表与插图生成
核心能力:
- 技术架构图生成
- 流程图创建
- 代码可视化
- AI图片生成
图片生成系统:
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.core.memory import ChatMemoryBuffer
from typing import Dict, List, Any, Optional
import json
import base64
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
from PIL import Image, ImageDraw, ImageFont
import io
class ImageGenerationAgent:
def __init__(self, service_context, image_config=None):
self.service_context = service_context
self.image_config = image_config or self._default_image_config()
# 创建图像生成工具
self.image_tools = self._create_image_tools()
# 初始化图像生成Agent
self.agent = OpenAIAgent.from_tools(
tools=self.image_tools,
llm=service_context.llm,
memory=ChatMemoryBuffer.from_defaults(token_limit=6000),
system_prompt="""
你是一个专业的技术图表和数据可视化专家,负责:
1. 根据文本描述生成技术图表和流程图
2. 创建数据可视化图表
3. 设计架构图和系统图
4. 优化图像质量和布局
5. 确保图表的专业性和可读性
设计原则:
- 清晰性:图表信息清晰易懂
- 专业性:符合技术文档标准
- 美观性:视觉效果良好
- 一致性:风格统一
- 实用性:服务于内容表达
""",
verbose=True
)
# 图像缓存
self.image_cache = {}
# 设置绘图样式
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
def _default_image_config(self):
"""默认图像配置"""
return {
'default_format': 'svg',
'default_size': (800, 600),
'default_dpi': 300,
'color_scheme': 'professional',
'font_family': 'Arial',
'enable_cache': True,
'max_cache_size': 100
}
def _create_image_tools(self):
"""创建图像生成工具"""
def generate_flowchart(description: str, chart_type: str = 'process') -> str:
"""生成流程图"""
try:
# 解析流程描述
steps = self._parse_flowchart_description(description)
# 创建流程图
fig, ax = plt.subplots(figsize=(12, 8))
ax.set_xlim(0, 10)
ax.set_ylim(0, len(steps) + 1)
# 绘制流程步骤
for i, step in enumerate(steps):
y_pos = len(steps) - i
# 绘制矩形框
rect = plt.Rectangle((2, y_pos-0.3), 6, 0.6,
facecolor='lightblue',
edgecolor='navy',
linewidth=2)
ax.add_patch(rect)
# 添加文本
ax.text(5, y_pos, step['text'],
ha='center', va='center',
fontsize=10, fontweight='bold')
# 绘制箭头(除了最后一个步骤)
if i < len(steps) - 1:
ax.arrow(5, y_pos-0.4, 0, -0.3,
head_width=0.2, head_length=0.1,
fc='navy', ec='navy')
ax.set_title(f'流程图: {chart_type}', fontsize=16, fontweight='bold')
ax.axis('off')
# 保存为SVG
svg_buffer = io.StringIO()
plt.savefig(svg_buffer, format='svg', bbox_inches='tight')
svg_content = svg_buffer.getvalue()
plt.close()
return f"流程图生成成功:\n{svg_content[:500]}..." # 返回部分内容作为确认
except Exception as e:
return f"流程图生成失败: {str(e)}"
def create_architecture_diagram(components: str, relationships: str) -> str:
"""创建架构图"""
try:
# 解析组件和关系
comp_data = json.loads(components)
rel_data = json.loads(relationships)
fig, ax = plt.subplots(figsize=(14, 10))
# 绘制组件
positions = {}
colors = ['lightblue', 'lightgreen', 'lightcoral', 'lightyellow', 'lightpink']
for i, comp in enumerate(comp_data):
x = (i % 3) * 4 + 2
y = (i // 3) * 3 + 2
positions[comp['name']] = (x, y)
# 绘制组件框
rect = plt.Rectangle((x-1, y-0.5), 2, 1,
facecolor=colors[i % len(colors)],
edgecolor='black',
linewidth=2)
ax.add_patch(rect)
# 添加组件名称
ax.text(x, y, comp['name'],
ha='center', va='center',
fontsize=10, fontweight='bold')
# 绘制关系线
for rel in rel_data:
if rel['from'] in positions and rel['to'] in positions:
start_pos = positions[rel['from']]
end_pos = positions[rel['to']]
ax.annotate('', xy=end_pos, xytext=start_pos,
arrowprops=dict(arrowstyle='->',
lw=2, color='red'))
# 添加关系标签
mid_x = (start_pos[0] + end_pos[0]) / 2
mid_y = (start_pos[1] + end_pos[1]) / 2
ax.text(mid_x, mid_y + 0.2, rel.get('label', ''),
ha='center', va='center',
fontsize=8, style='italic')
ax.set_title('系统架构图', fontsize=16, fontweight='bold')
ax.set_xlim(-1, 11)
ax.set_ylim(0, 8)
ax.axis('off')
# 保存为SVG
svg_buffer = io.StringIO()
plt.savefig(svg_buffer, format='svg', bbox_inches='tight')
svg_content = svg_buffer.getvalue()
plt.close()
return f"架构图生成成功:\n{svg_content[:500]}..."
except Exception as e:
return f"架构图生成失败: {str(e)}"
def generate_data_visualization(data: str, chart_type: str = 'bar') -> str:
"""生成数据可视化图表"""
try:
# 解析数据
chart_data = json.loads(data)
if chart_type == 'bar':
fig = px.bar(x=chart_data['labels'],
y=chart_data['values'],
title=chart_data.get('title', '数据图表'))
elif chart_type == 'line':
fig = px.line(x=chart_data['labels'],
y=chart_data['values'],
title=chart_data.get('title', '趋势图'))
elif chart_type == 'pie':
fig = px.pie(values=chart_data['values'],
names=chart_data['labels'],
title=chart_data.get('title', '饼图'))
elif chart_type == 'scatter':
fig = px.scatter(x=chart_data['x'],
y=chart_data['y'],
title=chart_data.get('title', '散点图'))
else:
# 默认柱状图
fig = px.bar(x=chart_data['labels'],
y=chart_data['values'],
title=chart_data.get('title', '数据图表'))
# 更新布局
fig.update_layout(
font=dict(family="Arial, sans-serif", size=12),
title_font_size=16,
showlegend=True,
width=800,
height=600
)
# 转换为SVG
svg_content = fig.to_image(format="svg").decode('utf-8')
return f"数据可视化图表生成成功:\n{svg_content[:500]}..."
except Exception as e:
return f"数据可视化生成失败: {str(e)}"
def create_code_visualization(code: str, language: str = 'python') -> str:
"""创建代码可视化"""
try:
# 简化的代码可视化
fig, ax = plt.subplots(figsize=(12, 8))
# 代码行分析
lines = code.split('\n')
line_count = len(lines)
# 创建代码块可视化
ax.text(0.05, 0.95, f"代码语言: {language}",
transform=ax.transAxes, fontsize=14, fontweight='bold')
ax.text(0.05, 0.90, f"总行数: {line_count}",
transform=ax.transAxes, fontsize=12)
# 显示代码结构
y_pos = 0.85
for i, line in enumerate(lines[:20]): # 只显示前20行
if line.strip():
ax.text(0.05, y_pos, f"{i+1:2d}: {line[:60]}...",
transform=ax.transAxes, fontsize=9,
fontfamily='monospace')
y_pos -= 0.04
if line_count > 20:
ax.text(0.05, y_pos, f"... 还有 {line_count - 20} 行",
transform=ax.transAxes, fontsize=9, style='italic')
ax.set_title('代码结构可视化', fontsize=16, fontweight='bold')
ax.axis('off')
# 保存为SVG
svg_buffer = io.StringIO()
plt.savefig(svg_buffer, format='svg', bbox_inches='tight')
svg_content = svg_buffer.getvalue()
plt.close()
return f"代码可视化生成成功:\n{svg_content[:500]}..."
except Exception as e:
return f"代码可视化生成失败: {str(e)}"
def optimize_image_quality(image_description: str, optimization_goals: str) -> str:
"""优化图像质量"""
try:
goals = json.loads(optimization_goals)
optimization_plan = {
'image_description': image_description,
'optimization_goals': goals,
'recommended_improvements': [],
'technical_adjustments': []
}
# 根据目标生成优化建议
if 'clarity' in goals:
optimization_plan['recommended_improvements'].extend([
"增加图像分辨率",
"优化颜色对比度",
"调整字体大小和清晰度"
])
if 'professional' in goals:
optimization_plan['recommended_improvements'].extend([
"使用企业级配色方案",
"统一视觉风格",
"添加专业图例和标注"
])
if 'accessibility' in goals:
optimization_plan['recommended_improvements'].extend([
"确保色盲友好的颜色选择",
"提供替代文本描述",
"增强颜色对比度"
])
optimization_plan['technical_adjustments'] = [
"调整DPI至300以上",
"使用矢量格式(SVG)",
"优化文件大小",
"确保跨平台兼容性"
]
return json.dumps(optimization_plan, ensure_ascii=False, indent=2)
except Exception as e:
return f"图像质量优化失败: {str(e)}"
return [
FunctionTool.from_defaults(
fn=generate_flowchart,
name="generate_flowchart",
description="根据描述生成流程图"
),
FunctionTool.from_defaults(
fn=create_architecture_diagram,
name="create_architecture_diagram",
description="创建系统架构图"
),
FunctionTool.from_defaults(
fn=generate_data_visualization,
name="generate_data_visualization",
description="生成数据可视化图表"
),
FunctionTool.from_defaults(
fn=create_code_visualization,
name="create_code_visualization",
description="创建代码结构可视化"
),
FunctionTool.from_defaults(
fn=optimize_image_quality,
name="optimize_image_quality",
description="优化图像质量和视觉效果"
)
]
def _parse_flowchart_description(self, description: str) -> List[Dict[str, str]]:
"""解析流程图描述"""
# 简化的解析逻辑
steps = []
lines = description.split('\n')
for line in lines:
line = line.strip()
if line and not line.startswith('#'):
steps.append({
'text': line,
'type': 'process'
})
return steps
async def generate_images(self, content_sections: List[Dict[str, Any]]) -> Dict[str, Any]:
"""生成图像的主要方法"""
try:
generated_images = []
for section in content_sections:
if section.get('needs_diagram'):
# 构建图表生成提示
diagram_prompt = f"""
请为以下内容生成技术图表:
内容: {section.get('content', '')}
图表类型: {section.get('diagram_type', 'flowchart')}
规格说明: {section.get('diagram_spec', '')}
请使用适当的工具生成专业的技术图表。
"""
response = await self.agent.achat(diagram_prompt)
generated_images.append({
'type': 'diagram',
'content': response.response,
'section_id': section.get('id')
})
if section.get('needs_chart'):
# 生成数据图表
chart_prompt = f"""
请为以下数据生成可视化图表:
数据: {json.dumps(section.get('data', {}), ensure_ascii=False)}
图表类型: {section.get('chart_type', 'bar')}
请使用generate_data_visualization工具生成图表。
"""
response = await self.agent.achat(chart_prompt)
generated_images.append({
'type': 'chart',
'content': response.response,
'section_id': section.get('id')
})
if section.get('needs_code_viz'):
# 生成代码可视化
code_viz_prompt = f"""
请为以下代码生成可视化:
代码: {section.get('code', '')}
语言: {section.get('language', 'python')}
请使用create_code_visualization工具生成代码可视化。
"""
response = await self.agent.achat(code_viz_prompt)
generated_images.append({
'type': 'code_visualization',
'content': response.response,
'section_id': section.get('id')
})
return {
'success': True,
'images': generated_images,
'metadata': {
'total_images': len(generated_images),
'generation_time': datetime.now().isoformat()
}
}
except Exception as e:
return {
'success': False,
'error': str(e),
'content_sections': content_sections
}
3.9 审校Agent (Review Agent)
职责: 内容质量控制与错误检查
核心能力:
- 语法检查与修正
- 事实准确性验证
- 逻辑一致性检查
- 技术准确性审核
质量控制流程:
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
import re
import asyncio
class ReviewAgent:
def __init__(self):
# 初始化LLM
self.llm = OpenAI(model="gpt-4", temperature=0.1)
# 创建审校工具
self.review_tools = [
FunctionTool.from_defaults(fn=self.check_grammar),
FunctionTool.from_defaults(fn=self.verify_facts),
FunctionTool.from_defaults(fn=self.check_logic),
FunctionTool.from_defaults(fn=self.review_technical_accuracy),
FunctionTool.from_defaults(fn=self.check_consistency)
]
# 初始化Agent
self.agent = OpenAIAgent.from_tools(
self.review_tools,
llm=self.llm,
verbose=True,
system_prompt="""
你是一个专业的内容审校专家。你的任务是:
1. 检查语法和拼写错误
2. 验证事实准确性
3. 检查逻辑一致性
4. 审核技术准确性
5. 确保内容一致性
请提供详细的审校报告和改进建议。
"""
)
def check_grammar(self, content: str) -> dict:
"""检查语法和拼写错误"""
issues = []
# 基本语法检查
common_errors = {
r'\b(it\'s)\b(?=\s+[A-Z])': "应该使用 'its' 而不是 'it's'",
r'\b(your)\b(?=\s+going)': "应该使用 'you're' 而不是 'your'",
r'\b(there)\b(?=\s+are)': "检查是否应该使用 'their'",
}
for pattern, suggestion in common_errors.items():
matches = re.finditer(pattern, content, re.IGNORECASE)
for match in matches:
issues.append({
'type': 'grammar',
'position': match.start(),
'text': match.group(),
'suggestion': suggestion
})
return {
'category': 'grammar',
'issues': issues,
'score': max(0, 100 - len(issues) * 5)
}
def verify_facts(self, content: str, sources: list = None) -> dict:
"""验证事实准确性"""
issues = []
# 检查数字和统计数据
number_pattern = r'\b\d+(?:\.\d+)?%?\b'
numbers = re.findall(number_pattern, content)
for number in numbers:
# 这里可以集成外部事实检查API
issues.append({
'type': 'fact_check',
'text': number,
'suggestion': f'请验证数字 "{number}" 的准确性'
})
return {
'category': 'facts',
'issues': issues[:3], # 限制问题数量
'score': max(0, 100 - len(issues) * 10)
}
def check_logic(self, content: str) -> dict:
"""检查逻辑一致性"""
issues = []
# 检查逻辑连接词
logical_connectors = ['however', 'therefore', 'moreover', 'furthermore']
sentences = content.split('.')
for i, sentence in enumerate(sentences[:-1]):
next_sentence = sentences[i + 1].strip()
if any(conn in sentence.lower() for conn in logical_connectors):
if not next_sentence:
issues.append({
'type': 'logic',
'position': i,
'suggestion': '逻辑连接词后缺少相应的论述'
})
return {
'category': 'logic',
'issues': issues,
'score': max(0, 100 - len(issues) * 15)
}
def review_technical_accuracy(self, content: str) -> dict:
"""审核技术准确性"""
issues = []
# 检查技术术语的一致性
tech_terms = ['API', 'REST', 'JSON', 'HTTP', 'HTTPS', 'SQL']
term_usage = {}
for term in tech_terms:
variations = re.findall(rf'\b{term}\w*\b', content, re.IGNORECASE)
if variations:
term_usage[term] = variations
for term, variations in term_usage.items():
unique_variations = set(v.lower() for v in variations)
if len(unique_variations) > 1:
issues.append({
'type': 'technical',
'term': term,
'variations': list(unique_variations),
'suggestion': f'术语 "{term}" 使用不一致'
})
return {
'category': 'technical',
'issues': issues,
'score': max(0, 100 - len(issues) * 20)
}
def check_consistency(self, content: str) -> dict:
"""检查内容一致性"""
issues = []
# 检查标题层级
headers = re.findall(r'^(#{1,6})\s+(.+)$', content, re.MULTILINE)
for i, (level, title) in enumerate(headers[:-1]):
next_level, next_title = headers[i + 1]
level_diff = len(next_level) - len(level)
if level_diff > 1:
issues.append({
'type': 'consistency',
'position': i,
'suggestion': f'标题层级跳跃过大: {title} -> {next_title}'
})
return {
'category': 'consistency',
'issues': issues,
'score': max(0, 100 - len(issues) * 10)
}
async def review_content(self, content: str, sources: list = None):
"""执行完整的内容审校"""
try:
# 使用Agent进行综合审校
review_prompt = f"""
请对以下内容进行全面审校:
内容:
{content}
请检查:
1. 语法和拼写
2. 事实准确性
3. 逻辑一致性
4. 技术准确性
5. 内容一致性
提供详细的审校报告和改进建议。
"""
response = await self.agent.achat(review_prompt)
# 执行各项检查
grammar_result = self.check_grammar(content)
facts_result = self.verify_facts(content, sources)
logic_result = self.check_logic(content)
technical_result = self.review_technical_accuracy(content)
consistency_result = self.check_consistency(content)
# 计算总体质量分数
total_score = (
grammar_result['score'] * 0.2 +
facts_result['score'] * 0.3 +
logic_result['score'] * 0.2 +
technical_result['score'] * 0.2 +
consistency_result['score'] * 0.1
)
return {
'success': True,
'agent_review': str(response),
'detailed_analysis': {
'grammar': grammar_result,
'facts': facts_result,
'logic': logic_result,
'technical': technical_result,
'consistency': consistency_result
},
'overall_score': round(total_score, 2),
'recommendations': self._generate_recommendations(total_score)
}
except Exception as e:
return {
'success': False,
'error': str(e),
'overall_score': 0
}
def _generate_recommendations(self, score: float) -> list:
"""根据分数生成改进建议"""
if score >= 90:
return ['内容质量优秀,可以发布']
elif score >= 80:
return ['内容质量良好,建议小幅修改后发布']
elif score >= 70:
return ['内容需要适度修改', '重点关注事实准确性和技术细节']
else:
return ['内容需要大幅修改', '建议重新审视逻辑结构', '加强事实验证']
3.10 格式化Agent (Formatting Agent)
职责: Markdown格式化与多语言输出
核心能力:
- Markdown标准化
- 多语言格式适配
- 响应式布局优化
- 元数据嵌入
格式化处理:
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
import re
import yaml
import json
from datetime import datetime
class FormattingAgent:
def __init__(self):
# 初始化LLM
self.llm = OpenAI(model="gpt-4", temperature=0.1)
# 创建格式化工具
self.formatting_tools = [
FunctionTool.from_defaults(fn=self.format_markdown),
FunctionTool.from_defaults(fn=self.adapt_multilanguage),
FunctionTool.from_defaults(fn=self.inject_metadata),
FunctionTool.from_defaults(fn=self.optimize_responsive_layout),
FunctionTool.from_defaults(fn=self.standardize_formatting)
]
# 初始化Agent
self.agent = OpenAIAgent.from_tools(
self.formatting_tools,
llm=self.llm,
verbose=True,
system_prompt="""
你是一个专业的内容格式化专家。你的任务是:
1. 标准化Markdown格式
2. 适配多语言格式
3. 注入元数据
4. 优化响应式布局
5. 确保格式一致性
请确保输出格式规范、美观且符合标准。
"""
)
def format_markdown(self, content: str, style: str = "standard") -> dict:
"""标准化Markdown格式"""
try:
formatted_content = content
# 标准化标题格式
formatted_content = re.sub(r'^(#{1,6})([^\s])', r'`\1`(\2) \2', formatted_content, flags=re.MULTILINE)
# 标准化列表格式
formatted_content = re.sub(r'^(\s*)-([^\s])', r'\1- \2', formatted_content, flags=re.MULTILINE)
formatted_content = re.sub(r'^(\s*)\*([^\s])', r'\1* \2', formatted_content, flags=re.MULTILINE)
# 标准化代码块
formatted_content = re.sub(r'```(\w+)\n', r'```\1\n', formatted_content)
# 标准化链接格式
formatted_content = re.sub(r'\[([^\]]+)\]\s*\(([^\)]+)\)', r'\1', formatted_content)
# 标准化表格格式
lines = formatted_content.split('\n')
formatted_lines = []
for line in lines:
if '|' in line and line.strip().startswith('|'):
# 格式化表格行
cells = [cell.strip() for cell in line.split('|')]
formatted_line = '| ' + ' | '.join(cells[1:-1]) + ' |'
formatted_lines.append(formatted_line)
else:
formatted_lines.append(line)
formatted_content = '\n'.join(formatted_lines)
return {
'success': True,
'formatted_content': formatted_content,
'style': style,
'changes_made': ['标题格式化', '列表格式化', '代码块格式化', '链接格式化', '表格格式化']
}
except Exception as e:
return {
'success': False,
'error': str(e),
'formatted_content': content
}
def adapt_multilanguage(self, content: str, target_language: str = "zh-CN") -> dict:
"""适配多语言格式"""
try:
adapted_content = content
# 语言特定的格式调整
if target_language in ['zh-CN', 'zh-TW', 'ja', 'ko']:
# 中日韩语言的标点符号调整
adapted_content = re.sub(r'([\u4e00-\u9fff])\s*,\s*', r'\1,', adapted_content)
adapted_content = re.sub(r'([\u4e00-\u9fff])\s*\.\s*', r'\1。', adapted_content)
adapted_content = re.sub(r'([\u4e00-\u9fff])\s*;\s*', r'\1;', adapted_content)
adapted_content = re.sub(r'([\u4e00-\u9fff])\s*:\s*', r'\1:', adapted_content)
# 引号调整
adapted_content = re.sub(r'"([^"]+)"', r'"\1"', adapted_content)
elif target_language in ['ar', 'he']:
# 从右到左语言的特殊处理
# 这里可以添加RTL语言的特殊格式处理
pass
# 日期格式本地化
date_patterns = {
'zh-CN': r'(\d{4})年(\d{1,2})月(\d{1,2})日',
'en-US': r'(\w+) (\d{1,2}), (\d{4})',
'ja': r'(\d{4})年(\d{1,2})月(\d{1,2})日'
}
return {
'success': True,
'adapted_content': adapted_content,
'target_language': target_language,
'adaptations_made': ['标点符号本地化', '引号格式调整', '日期格式本地化']
}
except Exception as e:
return {
'success': False,
'error': str(e),
'adapted_content': content
}
def inject_metadata(self, content: str, metadata: dict) -> dict:
"""注入元数据"""
try:
# 构建YAML前置元数据
frontmatter = {
'title': metadata.get('title', ''),
'author': metadata.get('author', ''),
'date': metadata.get('date', datetime.now().isoformat()),
'tags': metadata.get('tags', []),
'categories': metadata.get('categories', []),
'description': metadata.get('description', ''),
'keywords': metadata.get('keywords', []),
'lang': metadata.get('language', 'zh-CN')
}
# 生成YAML前置内容
yaml_frontmatter = yaml.dump(frontmatter, default_flow_style=False, allow_unicode=True)
# 组合最终内容
final_content = f"---\n{yaml_frontmatter}---\n\n{content}"
# 在内容末尾添加元数据注释
metadata_comment = f"\n\n<!-- Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} -->"
final_content += metadata_comment
return {
'success': True,
'content_with_metadata': final_content,
'frontmatter': frontmatter,
'metadata_injected': True
}
except Exception as e:
return {
'success': False,
'error': str(e),
'content_with_metadata': content
}
def optimize_responsive_layout(self, content: str) -> dict:
"""优化响应式布局"""
try:
optimized_content = content
# 添加响应式图片
img_pattern = r'!\[([^\]]*)\]\(([^\)]+)\)'
def replace_img(match):
alt_text = match.group(1)
img_url = match.group(2)
return f'<img src="{img_url}" alt="{alt_text}" style="max-width: 100%; height: auto;" />'
optimized_content = re.sub(img_pattern, replace_img, optimized_content)
# 添加响应式表格包装
table_pattern = r'(\|[^\n]+\|\n\|[-\s\|]+\|\n(?:\|[^\n]+\|\n?)+)'
def wrap_table(match):
table_content = match.group(1)
return f'<div style="overflow-x: auto;">\n\n{table_content}\n\n</div>'
optimized_content = re.sub(table_pattern, wrap_table, optimized_content, flags=re.MULTILINE)
# 添加代码块的响应式样式
code_pattern = r'```([^\n]*)\n([^`]+)```'
def wrap_code(match):
lang = match.group(1)
code_content = match.group(2)
return f'```{lang}\n{code_content}```\n<style>\npre \n</style>'
optimized_content = re.sub(code_pattern, wrap_code, optimized_content, flags=re.DOTALL)
return {
'success': True,
'optimized_content': optimized_content,
'optimizations': ['响应式图片', '响应式表格', '响应式代码块']
}
except Exception as e:
return {
'success': False,
'error': str(e),
'optimized_content': content
}
def standardize_formatting(self, content: str) -> dict:
"""标准化格式"""
try:
standardized_content = content
# 统一行尾
standardized_content = re.sub(r'\r\n|\r', '\n', standardized_content)
# 移除多余空行
standardized_content = re.sub(r'\n{3,}', '\n\n', standardized_content)
# 统一缩进(使用空格)
lines = standardized_content.split('\n')
standardized_lines = []
for line in lines:
# 将制表符转换为4个空格
standardized_line = line.expandtabs(4)
standardized_lines.append(standardized_line)
standardized_content = '\n'.join(standardized_lines)
# 移除行尾空格
standardized_content = re.sub(r' +$', '', standardized_content, flags=re.MULTILINE)
return {
'success': True,
'standardized_content': standardized_content,
'standardizations': ['统一行尾', '移除多余空行', '统一缩进', '移除行尾空格']
}
except Exception as e:
return {
'success': False,
'error': str(e),
'standardized_content': content
}
async def format_output(self, content: str, format_spec: dict):
"""执行完整的格式化处理"""
try:
# 使用Agent进行综合格式化
format_prompt = f"""
请对以下内容进行格式化处理:
内容:
{content}
格式要求:
- 样式: {format_spec.get('style', 'standard')}
- 语言: {format_spec.get('language', 'zh-CN')}
- 包含元数据: {format_spec.get('include_metadata', True)}
- 响应式优化: {format_spec.get('responsive', True)}
请提供格式化后的内容。
"""
response = await self.agent.achat(format_prompt)
# 执行各项格式化
current_content = content
# 1. Markdown格式化
markdown_result = self.format_markdown(
current_content,
format_spec.get('style', 'standard')
)
if markdown_result['success']:
current_content = markdown_result['formatted_content']
# 2. 多语言适配
if format_spec.get('language', 'zh-CN') != 'en':
multilang_result = self.adapt_multilanguage(
current_content,
format_spec.get('language', 'zh-CN')
)
if multilang_result['success']:
current_content = multilang_result['adapted_content']
# 3. 响应式优化
if format_spec.get('responsive', True):
responsive_result = self.optimize_responsive_layout(current_content)
if responsive_result['success']:
current_content = responsive_result['optimized_content']
# 4. 标准化格式
standard_result = self.standardize_formatting(current_content)
if standard_result['success']:
current_content = standard_result['standardized_content']
# 5. 元数据注入
if format_spec.get('include_metadata', True):
metadata_result = self.inject_metadata(
current_content,
format_spec.get('metadata', {})
)
if metadata_result['success']:
current_content = metadata_result['content_with_metadata']
return {
'success': True,
'agent_response': str(response),
'formatted_content': current_content,
'format_spec': format_spec,
'processing_steps': [
'Markdown格式化',
'多语言适配',
'响应式优化',
'标准化格式',
'元数据注入'
]
}
except Exception as e:
return {
'success': False,
'error': str(e),
'formatted_content': content
}
3.11 发布Agent (Publishing Agent)
职责: 内容发布与分发管理
核心能力:
- 多平台发布支持
- 发布时机优化
- 内容版本管理
- 发布状态监控
发布系统设计:
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
import asyncio
import hashlib
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
class PublishingAgent:
def __init__(self):
# 初始化LLM
self.llm = OpenAI(model="gpt-4", temperature=0.1)
# 创建发布工具
self.publishing_tools = [
FunctionTool.from_defaults(fn=self.create_version),
FunctionTool.from_defaults(fn=self.adapt_for_platform),
FunctionTool.from_defaults(fn=self.schedule_publish),
FunctionTool.from_defaults(fn=self.immediate_publish),
FunctionTool.from_defaults(fn=self.monitor_publish_status)
]
# 初始化Agent
self.agent = OpenAIAgent.from_tools(
self.publishing_tools,
llm=self.llm,
verbose=True,
system_prompt="""
你是一个专业的内容发布管理专家。你的任务是:
1. 管理内容版本
2. 适配不同发布平台
3. 调度发布时机
4. 监控发布状态
5. 优化发布策略
请确保发布流程高效、可靠且符合各平台要求。
"""
)
# 平台配置
self.platform_configs = {
'blog': {
'max_length': 10000,
'supports_html': True,
'supports_markdown': True,
'image_formats': ['jpg', 'png', 'gif', 'webp']
},
'medium': {
'max_length': 8000,
'supports_html': False,
'supports_markdown': True,
'image_formats': ['jpg', 'png', 'gif']
},
'linkedin': {
'max_length': 3000,
'supports_html': False,
'supports_markdown': False,
'image_formats': ['jpg', 'png']
},
'twitter': {
'max_length': 280,
'supports_html': False,
'supports_markdown': False,
'image_formats': ['jpg', 'png', 'gif']
}
}
# 版本存储
self.versions = {}
self.publish_history = []
def create_version(self, content: str, metadata: dict = None) -> dict:
"""创建内容版本"""
try:
# 生成版本ID
content_hash = hashlib.md5(content.encode()).hexdigest()[:8]
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
version_id = f"v_{timestamp}_{content_hash}"
# 创建版本记录
version = {
'id': version_id,
'content': content,
'metadata': metadata or {},
'created_at': datetime.now().isoformat(),
'content_length': len(content),
'word_count': len(content.split()),
'status': 'created'
}
# 存储版本
self.versions[version_id] = version
return {
'success': True,
'version': version,
'version_id': version_id
}
except Exception as e:
return {
'success': False,
'error': str(e),
'version_id': None
}
def adapt_for_platform(self, content: str, platform: str) -> dict:
"""为特定平台适配内容"""
try:
if platform not in self.platform_configs:
return {
'success': False,
'error': f'不支持的平台: {platform}',
'adapted_content': content
}
config = self.platform_configs[platform]
adapted_content = content
adaptations = []
# 长度限制
if len(adapted_content) > config['max_length']:
adapted_content = adapted_content[:config['max_length']] + '...'
adaptations.append(f'内容截断至{config["max_length"]}字符')
# Markdown支持
if not config['supports_markdown']:
# 移除Markdown格式
import re
adapted_content = re.sub(r'\*\*([^*]+)\*\*', r'\1', adapted_content) # 粗体
adapted_content = re.sub(r'\*([^*]+)\*', r'\1', adapted_content) # 斜体
adapted_content = re.sub(r'`([^`]+)`', r'\1', adapted_content) # 代码
adapted_content = re.sub(r'#{1,6}\s*', '', adapted_content) # 标题
adapted_content = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', adapted_content) # 链接
adaptations.append('移除Markdown格式')
# HTML支持
if not config['supports_html']:
import re
adapted_content = re.sub(r'<[^>]+>', '', adapted_content)
adaptations.append('移除HTML标签')
# 平台特定优化
if platform == 'twitter':
# Twitter特殊处理:添加话题标签
lines = adapted_content.split('\n')
if lines:
first_line = lines[0]
if len(first_line) < 200:
adapted_content = first_line + ' #技术分享 #AI'
adaptations.append('添加Twitter话题标签')
elif platform == 'linkedin':
# LinkedIn特殊处理:专业化语调
if not adapted_content.startswith('在'):
adapted_content = '在技术发展的今天,' + adapted_content
adaptations.append('添加LinkedIn专业开头')
return {
'success': True,
'adapted_content': adapted_content,
'platform': platform,
'adaptations': adaptations,
'original_length': len(content),
'adapted_length': len(adapted_content)
}
except Exception as e:
return {
'success': False,
'error': str(e),
'adapted_content': content
}
def schedule_publish(self, content_versions: dict, schedule_time: str) -> dict:
"""调度发布"""
try:
from datetime import datetime
# 解析调度时间
if isinstance(schedule_time, str):
schedule_dt = datetime.fromisoformat(schedule_time)
else:
schedule_dt = schedule_time
# 创建调度任务
schedule_id = f"schedule_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
schedule_task = {
'id': schedule_id,
'content_versions': content_versions,
'schedule_time': schedule_dt.isoformat(),
'status': 'scheduled',
'created_at': datetime.now().isoformat()
}
# 这里可以集成实际的任务调度系统(如Celery)
# 现在只是模拟调度
return {
'success': True,
'schedule_id': schedule_id,
'schedule_task': schedule_task,
'message': f'已调度在 {schedule_dt} 发布'
}
except Exception as e:
return {
'success': False,
'error': str(e),
'schedule_id': None
}
async def immediate_publish(self, content_versions: dict) -> dict:
"""立即发布"""
try:
publish_results = {}
publish_id = f"publish_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
for platform, content in content_versions.items():
try:
# 模拟发布到各平台
# 实际实现中这里会调用各平台的API
if platform == 'blog':
result = await self._publish_to_blog(content)
elif platform == 'medium':
result = await self._publish_to_medium(content)
elif platform == 'linkedin':
result = await self._publish_to_linkedin(content)
elif platform == 'twitter':
result = await self._publish_to_twitter(content)
else:
result = {
'success': False,
'error': f'不支持的平台: {platform}'
}
publish_results[platform] = result
except Exception as e:
publish_results[platform] = {
'success': False,
'error': str(e),
'platform': platform
}
# 记录发布历史
publish_record = {
'id': publish_id,
'timestamp': datetime.now().isoformat(),
'platforms': list(content_versions.keys()),
'results': publish_results,
'success_count': sum(1 for r in publish_results.values() if r.get('success')),
'total_count': len(publish_results)
}
self.publish_history.append(publish_record)
return {
'success': True,
'publish_id': publish_id,
'results': publish_results,
'summary': publish_record
}
except Exception as e:
return {
'success': False,
'error': str(e),
'publish_id': None
}
async def _publish_to_blog(self, content: str) -> dict:
"""发布到博客平台"""
# 模拟博客发布
await asyncio.sleep(0.5) # 模拟网络延迟
return {
'success': True,
'platform': 'blog',
'url': 'https://blog.example.com/post/123',
'published_at': datetime.now().isoformat()
}
async def _publish_to_medium(self, content: str) -> dict:
"""发布到Medium"""
await asyncio.sleep(0.8)
return {
'success': True,
'platform': 'medium',
'url': 'https://medium.com/@user/post-abc123',
'published_at': datetime.now().isoformat()
}
async def _publish_to_linkedin(self, content: str) -> dict:
"""发布到LinkedIn"""
await asyncio.sleep(0.6)
return {
'success': True,
'platform': 'linkedin',
'url': 'https://linkedin.com/posts/user_post_123',
'published_at': datetime.now().isoformat()
}
async def _publish_to_twitter(self, content: str) -> dict:
"""发布到Twitter"""
await asyncio.sleep(0.3)
return {
'success': True,
'platform': 'twitter',
'url': 'https://twitter.com/user/status/123456789',
'published_at': datetime.now().isoformat()
}
def monitor_publish_status(self, publish_id: str) -> dict:
"""监控发布状态"""
try:
# 查找发布记录
publish_record = None
for record in self.publish_history:
if record['id'] == publish_id:
publish_record = record
break
if not publish_record:
return {
'success': False,
'error': f'未找到发布记录: {publish_id}',
'status': 'not_found'
}
# 分析发布状态
total_platforms = publish_record['total_count']
success_platforms = publish_record['success_count']
failure_platforms = total_platforms - success_platforms
status = 'completed' if failure_platforms == 0 else 'partial_failure' if success_platforms > 0 else 'failed'
return {
'success': True,
'publish_id': publish_id,
'status': status,
'total_platforms': total_platforms,
'success_platforms': success_platforms,
'failure_platforms': failure_platforms,
'publish_record': publish_record
}
except Exception as e:
return {
'success': False,
'error': str(e),
'status': 'error'
}
async def publish_content(self, content: str, publish_config: dict):
"""执行完整的发布流程"""
try:
# 使用Agent进行发布管理
publish_prompt = f"""
请管理以下内容的发布流程:
内容长度: {len(content)} 字符
目标平台: {publish_config.get('platforms', [])}
发布时间: {publish_config.get('schedule_time', '立即发布')}
请提供发布建议和优化方案。
"""
response = await self.agent.achat(publish_prompt)
# 1. 创建版本
version_result = self.create_version(content, publish_config.get('metadata', {}))
if not version_result['success']:
return version_result
version_id = version_result['version_id']
# 2. 平台适配
adapted_content = {}
platforms = publish_config.get('platforms', ['blog'])
for platform in platforms:
adapt_result = self.adapt_for_platform(content, platform)
if adapt_result['success']:
adapted_content[platform] = adapt_result['adapted_content']
else:
adapted_content[platform] = content # 使用原内容作为备选
# 3. 发布执行
if publish_config.get('schedule_time'):
# 调度发布
publish_result = self.schedule_publish(adapted_content, publish_config['schedule_time'])
else:
# 立即发布
publish_result = await self.immediate_publish(adapted_content)
return {
'success': True,
'agent_response': str(response),
'version_id': version_id,
'adapted_content': {k: len(v) for k, v in adapted_content.items()}, # 只返回长度信息
'publish_result': publish_result,
'platforms': platforms
}
except Exception as e:
return {
'success': False,
'error': str(e),
'version_id': None
}
4. 技术实现细节
4.1 RAG检索增强架构
from llama_index.core import VectorStoreIndex, ServiceContext, Document
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core.postprocessor import SentenceTransformerRerank
import chromadb
class RAGEngine:
def __init__(self):
# 初始化LLM和嵌入模型
self.llm = OpenAI(model="gpt-4", temperature=0.1)
self.embed_model = OpenAIEmbedding(model="text-embedding-ada-002")
# 初始化向量存储
chroma_client = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = chroma_client.get_or_create_collection("blog_knowledge")
self.vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
# 配置服务上下文
self.service_context = ServiceContext.from_defaults(
llm=self.llm,
embed_model=self.embed_model
)
# 初始化重排序器
self.reranker = SentenceTransformerRerank(
model="cross-encoder/ms-marco-MiniLM-L-12-v2",
top_n=10
)
async def build_index(self, documents):
"""构建向量索引"""
self.index = VectorStoreIndex.from_documents(
documents,
service_context=self.service_context,
vector_store=self.vector_store
)
return self.index
async def retrieve_and_generate(self, query, k=10):
"""检索并生成回答"""
# 创建查询引擎
query_engine = self.index.as_query_engine(
similarity_top_k=k*2,
node_postprocessors=[self.reranker],
response_mode="tree_summarize"
)
# 执行查询
response = await query_engine.aquery(query)
return {
'answer': response.response,
'source_nodes': response.source_nodes,
'metadata': response.metadata
}
4.2 多源数据融合算法
from llama_index.readers.web import SimpleWebPageReader
from llama_index.readers.arxiv import ArxivReader
from llama_index.readers.notion import NotionPageReader
from llama_index.core.schema import Document
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import (
TitleExtractor,
QuestionsAnsweredExtractor,
SummaryExtractor
)
from llama_index.core.ingestion import IngestionPipeline
class MultiSourceDataFusion:
def __init__(self, service_context):
self.service_context = service_context
# 初始化各种数据读取器
self.web_reader = SimpleWebPageReader(html_to_text=True)
self.arxiv_reader = ArxivReader()
self.notion_reader = NotionPageReader(integration_token="your_token")
# 初始化文档处理管道
self.node_parser = SentenceSplitter(
chunk_size=1024,
chunk_overlap=200
)
# 初始化元数据提取器
self.extractors = [
TitleExtractor(nodes=5),
QuestionsAnsweredExtractor(questions=3),
SummaryExtractor(summaries=["prev", "self"])
]
# 创建数据处理管道
self.pipeline = IngestionPipeline(
transformations=[
self.node_parser,
*self.extractors
]
)
async def load_multi_source_data(self, sources_config):
"""从多个数据源加载数据"""
all_documents = []
# Web数据源
if 'web_urls' in sources_config:
web_docs = self.web_reader.load_data(sources_config['web_urls'])
for doc in web_docs:
doc.metadata['source_type'] = 'web'
doc.metadata['credibility_score'] = 0.7
all_documents.extend(web_docs)
# arXiv学术论文
if 'arxiv_papers' in sources_config:
arxiv_docs = self.arxiv_reader.load_papers(
sources_config['arxiv_papers']
)
for doc in arxiv_docs:
doc.metadata['source_type'] = 'academic'
doc.metadata['credibility_score'] = 0.9
all_documents.extend(arxiv_docs)
# Notion知识库
if 'notion_pages' in sources_config:
notion_docs = self.notion_reader.load_data(
page_ids=sources_config['notion_pages']
)
for doc in notion_docs:
doc.metadata['source_type'] = 'knowledge_base'
doc.metadata['credibility_score'] = 0.8
all_documents.extend(notion_docs)
return all_documents
async def process_and_fuse(self, documents):
"""处理和融合多源数据"""
# 通过管道处理文档
processed_nodes = await self.pipeline.arun(documents=documents)
# 基于相似度去重
unique_nodes = self._deduplicate_nodes(processed_nodes)
# 基于可信度评分排序
ranked_nodes = self._rank_by_credibility(unique_nodes)
return ranked_nodes
def _deduplicate_nodes(self, nodes, similarity_threshold=0.85):
"""基于语义相似度去重"""
unique_nodes = []
for node in nodes:
is_duplicate = False
for unique_node in unique_nodes:
# 使用嵌入模型计算相似度
similarity = self._calculate_similarity(
node.text, unique_node.text
)
if similarity > similarity_threshold:
# 保留可信度更高的节点
if (node.metadata.get('credibility_score', 0) >
unique_node.metadata.get('credibility_score', 0)):
unique_nodes.remove(unique_node)
unique_nodes.append(node)
is_duplicate = True
break
if not is_duplicate:
unique_nodes.append(node)
return unique_nodes
def _rank_by_credibility(self, nodes):
"""基于可信度评分排序"""
return sorted(
nodes,
key=lambda x: x.metadata.get('credibility_score', 0),
reverse=True
)
def _calculate_similarity(self, text1, text2):
"""计算文本相似度"""
# 使用服务上下文中的嵌入模型
embed1 = self.service_context.embed_model.get_text_embedding(text1)
embed2 = self.service_context.embed_model.get_text_embedding(text2)
# 计算余弦相似度
import numpy as np
return np.dot(embed1, embed2) / (np.linalg.norm(embed1) * np.linalg.norm(embed2))
4.3 Agent间通信协议
from llama_index.core.workflow import (
Event,
StartEvent,
StopEvent,
Workflow,
step,
Context
)
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from typing import Any, Dict
import asyncio
from datetime import datetime
# 定义工作流事件
class PlanningEvent(Event):
plan: Dict[str, Any]
topic: str
class RetrievalEvent(Event):
retrieved_data: Dict[str, Any]
plan: Dict[str, Any]
class WritingEvent(Event):
content: str
metadata: Dict[str, Any]
class ReviewEvent(Event):
reviewed_content: str
suggestions: list
class FormattingEvent(Event):
formatted_content: str
format_type: str
# 博客写作工作流
class BlogWritingWorkflow(Workflow):
def __init__(self, service_context, agents_config):
super().__init__()
self.service_context = service_context
self.agents_config = agents_config
# 初始化各个专业Agent
self._initialize_agents()
def _initialize_agents(self):
"""初始化各个专业Agent"""
# 规划Agent
self.planning_agent = OpenAIAgent.from_tools(
tools=[],
llm=self.service_context.llm,
system_prompt="你是一个专业的内容规划专家,负责分析主题并制定写作计划。"
)
# 检索Agent工具
retrieval_tools = self._create_retrieval_tools()
self.retrieval_agent = OpenAIAgent.from_tools(
tools=retrieval_tools,
llm=self.service_context.llm,
system_prompt="你是一个信息检索专家,负责从多个数据源获取相关信息。"
)
# 写作Agent
self.writing_agent = OpenAIAgent.from_tools(
tools=[],
llm=self.service_context.llm,
system_prompt="你是一个专业的技术写作专家,负责创作高质量的技术博客内容。"
)
# 审校Agent
self.review_agent = OpenAIAgent.from_tools(
tools=[],
llm=self.service_context.llm,
system_prompt="你是一个专业的内容审校专家,负责检查内容质量和准确性。"
)
# 格式化Agent
self.formatting_agent = OpenAIAgent.from_tools(
tools=[],
llm=self.service_context.llm,
system_prompt="你是一个专业的内容格式化专家,负责将内容转换为指定格式。"
)
def _create_retrieval_tools(self):
"""创建检索工具"""
# 这里可以添加各种检索工具
return []
@step
async def planning_step(self, ctx: Context, ev: StartEvent) -> PlanningEvent:
"""规划步骤"""
topic = ev.topic
requirements = ev.requirements
planning_prompt = f"""
请为主题"{topic}"制定详细的写作计划,包括:
1. 文章结构大纲
2. 每个章节的核心内容
3. 需要检索的信息类型
4. 目标受众分析
要求:{requirements}
"""
response = await self.planning_agent.achat(planning_prompt)
plan = self._parse_plan(response.response)
ctx.data["planning_result"] = {
"plan": plan,
"timestamp": datetime.now(),
"agent": "planning"
}
return PlanningEvent(plan=plan, topic=topic)
@step
async def retrieval_step(self, ctx: Context, ev: PlanningEvent) -> RetrievalEvent:
"""检索步骤"""
plan = ev.plan
topic = ev.topic
retrieval_prompt = f"""
基于以下写作计划,请检索相关信息:
主题:{topic}
计划:{plan}
请使用可用的检索工具获取最新和最权威的信息。
"""
response = await self.retrieval_agent.achat(retrieval_prompt)
retrieved_data = self._parse_retrieval_result(response.response)
ctx.data["retrieval_result"] = {
"data": retrieved_data,
"timestamp": datetime.now(),
"agent": "retrieval"
}
return RetrievalEvent(retrieved_data=retrieved_data, plan=plan)
@step
async def writing_step(self, ctx: Context, ev: RetrievalEvent) -> WritingEvent:
"""写作步骤"""
plan = ev.plan
retrieved_data = ev.retrieved_data
writing_prompt = f"""
基于以下信息撰写技术博客:
写作计划:{plan}
检索信息:{retrieved_data}
要求:
1. 结构清晰,逻辑严密
2. 包含代码示例和实际案例
3. 语言专业但易懂
4. 添加适当的技术细节
"""
response = await self.writing_agent.achat(writing_prompt)
content = response.response
ctx.data["writing_result"] = {
"content": content,
"timestamp": datetime.now(),
"agent": "writing"
}
return WritingEvent(
content=content,
metadata={"sources": retrieved_data, "plan": plan}
)
@step
async def review_step(self, ctx: Context, ev: WritingEvent) -> ReviewEvent:
"""审校步骤"""
content = ev.content
metadata = ev.metadata
review_prompt = f"""
请审校以下技术博客内容:
{content}
审校要点:
1. 技术准确性
2. 逻辑连贯性
3. 语言表达
4. 结构完整性
5. 引用规范性
请提供修改建议和最终版本。
"""
response = await self.review_agent.achat(review_prompt)
reviewed_content, suggestions = self._parse_review_result(response.response)
ctx.data["review_result"] = {
"content": reviewed_content,
"suggestions": suggestions,
"timestamp": datetime.now(),
"agent": "review"
}
return ReviewEvent(
reviewed_content=reviewed_content,
suggestions=suggestions
)
@step
async def formatting_step(self, ctx: Context, ev: ReviewEvent) -> StopEvent:
"""格式化步骤"""
content = ev.reviewed_content
formatting_prompt = f"""
请将以下内容格式化为标准的Markdown格式:
{content}
要求:
1. 添加适当的标题层级
2. 格式化代码块
3. 添加目录
4. 优化排版
"""
response = await self.formatting_agent.achat(formatting_prompt)
formatted_content = response.response
ctx.data["formatting_result"] = {
"content": formatted_content,
"timestamp": datetime.now(),
"agent": "formatting"
}
# 返回最终结果
return StopEvent(
result={
"final_content": formatted_content,
"workflow_data": ctx.data,
"success": True
}
)
def _parse_plan(self, plan_text):
"""解析规划结果"""
# 实现计划解析逻辑
return {"outline": plan_text, "sections": []}
def _parse_retrieval_result(self, retrieval_text):
"""解析检索结果"""
# 实现检索结果解析逻辑
return {"sources": [], "content": retrieval_text}
def _parse_review_result(self, review_text):
"""解析审校结果"""
# 实现审校结果解析逻辑
return review_text, []
# 工作流管理器
class WorkflowManager:
def __init__(self, service_context):
self.service_context = service_context
self.active_workflows = {}
async def start_blog_writing(self, topic, requirements):
"""启动博客写作工作流"""
workflow = BlogWritingWorkflow(
self.service_context,
agents_config={}
)
# 启动工作流
result = await workflow.run(
topic=topic,
requirements=requirements
)
return result
async def get_workflow_status(self, workflow_id):
"""获取工作流状态"""
if workflow_id in self.active_workflows:
return self.active_workflows[workflow_id].get_status()
return None
5. 部署配置
5.1 Docker容器化部署
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
5.2 Kubernetes部署配置
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: agentic-rag-blog
spec:
replicas: 3
selector:
matchLabels:
app: agentic-rag-blog
template:
metadata:
labels:
app: agentic-rag-blog
spec:
containers:
- name: blog-agent
image: agentic-rag-blog:latest
ports:
- containerPort: 8000
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: api-secrets
key: openai-key
resources:
requests:
memory: "2Gi"
cpu: "1000m"
limits:
memory: "4Gi"
cpu: "2000m"
5.3 环境配置
# config.yaml
system:
max_concurrent_agents: 10
timeout_seconds: 300
retry_attempts: 3
agents:
intent_understanding:
enabled: true
confidence_threshold: 0.7
context_window: 5
optimization_strategies:
- "topic_clarification"
- "audience_identification"
- "content_structure_suggestion"
coordinator:
enabled: true
max_tasks: 100
intent_integration: true
planning:
enabled: true
outline_depth: 3
intent_aware: true
retrieval:
enabled: true
sources:
- web
- academic
- knowledge_base
max_results_per_source: 20
writing:
enabled: true
model: "gpt-4"
temperature: 0.7
max_tokens: 2000
style:
enabled: true
default_style: "technical"
citation:
enabled: true
format: "APA"
image_generation:
enabled: true
providers:
- "dalle-3"
- "midjourney"
review:
enabled: true
quality_threshold: 0.8
formatting:
enabled: true
output_formats:
- "markdown"
- "html"
databases:
vector_store:
type: "chromadb"
url: "http://chromadb:8000"
cache:
type: "redis"
url: "redis://redis:6379"
metadata:
type: "postgresql"
url: "postgresql://user:pass@postgres:5432/blogdb"
apis:
openai:
base_url: "https://api.openai.com/v1"
model: "gpt-4"
search:
google:
api_key: "${GOOGLE_API_KEY}"
bing:
api_key: "${BING_API_KEY}"
academic:
arxiv:
base_url: "http://export.arxiv.org/api/query"
semantic_scholar:
api_key: "${SEMANTIC_SCHOLAR_KEY}"
6. 使用示例
6.1 API调用示例
# 客户端调用示例
import asyncio
from agentic_rag_client import BlogAgentClient
async def main():
client = BlogAgentClient(base_url="http://localhost:8000")
# 示例1: 明确的需求
task1 = await client.create_blog_task({
"topic": "微服务架构设计模式",
"language": "zh",
"target_audience": "senior_developers",
"length": "long",
"include_code_examples": True,
"include_diagrams": True,
"style": "technical_tutorial"
})
# 示例2: 模糊的用户输入(展示意图理解能力)
user_input = "我想了解一下那个什么分布式的东西,就是能让系统更稳定的"
# 系统会自动进行意图理解和需求优化
task2 = await client.create_blog_task_from_natural_input({
"user_input": user_input,
"context": {
"user_level": "beginner",
"previous_topics": ["基础架构", "单体应用"]
}
})
# 意图理解结果示例
intent_analysis = await client.get_intent_analysis(task2.id)
print(f"识别的意图: {intent_analysis.intent}")
print(f"优化后的主题: {intent_analysis.optimized_topic}")
print(f"建议的内容结构: {intent_analysis.suggested_structure}")
# 监控任务进度(以task1为例)
while not task1.is_complete():
status = await client.get_task_status(task1.id)
print(f"进度: {status.progress}% - 当前阶段: {status.current_stage}")
await asyncio.sleep(5)
# 获取最终结果
result = await client.get_task_result(task1.id)
print("博客生成完成!")
print(f"标题: {result.title}")
print(f"字数: {result.word_count}")
print(f"质量评分: {result.quality_score}")
# 保存到文件
with open(f"{result.title}.md", "w", encoding="utf-8") as f:
f.write(result.content)
# 处理意图理解任务的结果
result2 = await client.get_task_result(task2.id)
print(f"\n意图理解任务完成!")
print(f"原始输入: {user_input}")
print(f"生成的标题: {result2.title}")
print(f"内容适配程度: {result2.adaptation_score}")
if __name__ == "__main__":
asyncio.run(main())
6.2 输出示例
生成的博客将包含以下结构化内容:
# 微服务架构设计模式:构建可扩展分布式系统的最佳实践
## 摘要
本文深入探讨微服务架构的核心设计模式...
## 目录
1. [微服务架构概述](#微服务架构概述)
2. [核心设计模式](#核心设计模式)
3. [实现最佳实践](#实现最佳实践)
...
7. 总结
本设计提供了一个完整的、可扩展的技术博客撰写系统,通过多Agent协作实现了从需求分析到内容发布的全流程自动化。系统具备强大的RAG检索能力、多源数据融合、智能写作辅助和质量控制等核心功能,能够生成高质量的技术博客内容。
主要优势:
- 模块化设计:每个Agent职责明确,易于维护和扩展
- 多源检索:整合Web、学术和私有数据源
- 质量保证:多层次质量控制机制
- 多语言支持:支持中英文双语输出
- 可扩展性:基于容器化部署,支持水平扩展
该系统可广泛应用于技术团队的内容创作、知识管理和对外技术分享等场景。