Agentic RAG Agent System Design for Technical Blog Writing

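1. System Overview

This system is an intelligent technical blog writing platform for blog authors. It uses a multi-agent collaborative architecture (Planner-Writer-Critic pattern) to automate the full writing workflow, from topic input to article publication.

1.1 Target Users and Interaction

  • Target users: technical bloggers, technical writers, content creators
  • Interaction: a web interface; users need no programming background and complete every operation through a graphical UI
  • Input: the user enters only the article topic, and the system carries out the rest of the writing workflow automatically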

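1.2 Core Capabilities

  • Intent recognition: analyzes the user's input to identify the actual writing intent and implicit requirements
  • Requirement optimization: suggests refinements to the writing direction and content based on the user's input
  • Structure planning: generates the article outline and section structure from the topic
  • Multi-source retrieval: integrates external sources such as web search, arXiv, and Semantic Scholar
  • Content generation: produces technical content with retrieval-augmented generation (RAG)
  • Citation management: generates and manages academic citations in multiple citation formats
  • Style control: supports multiple writing styles and per-user adjustments
  • AI image generation: generates technical charts, architecture diagrams, and illustrations
  • Review: grammar checking, fact verification, and quality assessment
  • Formatted output: Markdown output, with support for both Chinese and English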

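1.3 Technical Highlights

  • Intent understanding: NLP techniques and context analysis identify the user's writing intent and latent requirements
  • Multi-agent collaboration: 11 specialized agents divide the work, including the intent understanding, coordinator, planning, retrieval, and writing agents
  • RAG-enhanced generation: combines vector retrieval with a large language model to improve accuracy and timeliness
  • Multi-source data fusion: merges information from web search, academic databases, and private knowledge bases
  • Quality control: layered checks covering fact verification, logical consistency, and technical accuracy
  • Real-time information: retrieves recent technical developments and research progress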

2. System Architecture

2.1 Core Architecture Diagram

graph TB
    subgraph "User Interaction Layer"
        UI[Web UI/API]
        REQ[User Requirements]
    end

    subgraph "Intent Understanding Layer"
        INTENT[Intent Understanding Agent]
        NLP[NLP Module]
        CONTEXT[Context Analysis Module]
        OPTIM[Requirement Optimization Module]
    end

    subgraph "Coordination Layer"
        COORD[Coordinator Agent]
        PLAN[Planning Agent]
    end

    subgraph "Retrieval Layer"
        RETR[Retrieval Agent]
        WEB[Web Retrieval Module]
        ACAD[Academic Database Module]
        KNOW[Knowledge Base Module]
    end

    subgraph "Content Generation Layer"
        WRITE[Writing Agent]
        STYLE[Style Agent]
        REF[Citation Agent]
        IMG[Image Generation Agent]
    end

    subgraph "Quality Control Layer"
        QC[Review Agent]
        FACT[Fact Checking Module]
        QUAL[Quality Assessment Module]
    end

    subgraph "Output Layer"
        FMT[Formatting Agent]
        PUB[Publishing Agent]
    end

    subgraph "Data Layer"
        VEC[Vector Database]
        CACHE[Cache]
        META[Metadata Store]
        INTENT_DB[Intent Knowledge Base]
    end

    UI --> INTENT
    INTENT --> NLP
    INTENT --> CONTEXT
    INTENT --> OPTIM
    INTENT --> COORD
    COORD --> PLAN
    PLAN --> RETR
    RETR --> WEB
    RETR --> ACAD
    RETR --> KNOW
    RETR --> WRITE
    WRITE --> STYLE
    STYLE --> REF
    REF --> IMG
    IMG --> QC
    QC --> FACT
    QC --> QUAL
    QC --> FMT
    FMT --> PUB

    RETR --> VEC
    WRITE --> CACHE
    QC --> META
    INTENT --> INTENT_DB
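
The main path through the diagram is linear (intent, coordinator, planner, retrieval, writing, style, citation, image, review, formatting, publishing), so it can be modeled as an ordered list of async stages. The following is a minimal sketch under the assumption that each agent exposes one coroutine that takes and returns a shared state dict. The stage names, `make_stub_stage`, and `run_pipeline` are hypothetical and not part of the design above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

State = Dict[str, Any]
Stage = Callable[[State], Awaitable[State]]


def make_stub_stage(name: str) -> Stage:
    """Return a placeholder stage that only records that it ran."""
    async def stage(state: State) -> State:
        state.setdefault("trace", []).append(name)
        return state
    return stage


# Order follows the main path in the architecture diagram (11 agents).
STAGE_ORDER = ["intent", "coordinator", "planner", "retrieval", "writing",
               "style", "citation", "image", "review", "format", "publish"]


async def run_pipeline(stages: List[Tuple[str, Stage]], state: State) -> State:
    """Run the stages one after another, stopping at the first failure."""
    for name, stage in stages:
        try:
            state = await stage(state)
        except Exception as exc:
            # Record where the pipeline stopped so a caller can retry or report.
            state["failed_stage"] = name
            state["error"] = str(exc)
            break
    return state


stages = [(name, make_stub_stage(name)) for name in STAGE_ORDER]
result = asyncio.run(run_pipeline(stages, {"topic": "Agentic RAG"}))
print(result["trace"])
```

Real stages would replace the stubs; the supporting modules and data stores in the diagram are called from inside individual stages rather than being stages themselves.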

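3. Detailed Agent Design

3.1 Intent Understanding Agent

Responsibility: user intent recognition and requirement optimization

Core capabilities:

  • Natural-language intent recognition and classification
  • Discovery and analysis of implicit requirements
  • Clarification of writing goals
  • Generation of requirement optimization suggestions
  • User interaction optimization

Implementation: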

from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.core import VectorStoreIndex
from typing import Dict, Any, List
import re
import json

class IntentUnderstandingAgent:
    def __init__(self, service_context, intent_knowledge_base):
        self.service_context = service_context
        self.intent_kb = intent_knowledge_base
        
        # 意图分类模型
        self.intent_categories = {
            'tutorial': '教程类文章',
            'analysis': '技术分析类',
            'comparison': '对比评测类',
            'introduction': '技术介绍类',
            'best_practices': '最佳实践类',
            'troubleshooting': '问题解决类',
            'review': '技术评论类',
            'news': '技术新闻类'
        }
        
        # 创建意图理解工具
        self.intent_tools = self._create_intent_tools()
        
        # 初始化意图理解Agent
        self.agent = OpenAIAgent.from_tools(
            tools=self.intent_tools,
            llm=service_context.llm,
            system_prompt="""
            你是一个专业的意图理解专家,负责:
            1. 深度分析用户的写作需求和真实意图
            2. 识别用户输入中的隐含信息和潜在需求
            3. 提供写作方向的优化建议
            4. 明确文章的目标受众和写作风格
            5. 生成结构化的需求分析报告
            
            请始终以用户体验为中心,提供专业、准确的意图分析。
            """,
            verbose=True
        )
    
    def _create_intent_tools(self):
        """创建意图理解工具"""
        
        def analyze_user_intent(user_input: str) -> str:
            """分析用户意图"""
            try:
                # 关键词提取
                keywords = self._extract_keywords(user_input)
                
                # 意图分类
                intent_category = self._classify_intent(user_input)
                
                # 技术领域识别
                tech_domain = self._identify_tech_domain(user_input)
                
                # 目标受众分析
                target_audience = self._analyze_target_audience(user_input)
                
                analysis_result = {
                    'keywords': keywords,
                    'intent_category': intent_category,
                    'tech_domain': tech_domain,
                    'target_audience': target_audience,
                    'confidence_score': 0.85
                }
                
                return f"意图分析结果: {json.dumps(analysis_result, ensure_ascii=False, indent=2)}"
                
            except Exception as e:
                return f"意图分析失败: {str(e)}"
        
        def generate_optimization_suggestions(user_input: str, intent_analysis: Dict) -> str:
            """生成优化建议"""
            try:
                suggestions = []
                
                # 基于意图类型的建议
                if intent_analysis.get('intent_category') == 'tutorial':
                    suggestions.extend([
                        "建议添加实践案例和代码示例",
                        "考虑包含常见问题和解决方案",
                        "添加循序渐进的学习路径"
                    ])
                elif intent_analysis.get('intent_category') == 'analysis':
                    suggestions.extend([
                        "建议深入分析技术原理和机制",
                        "包含性能测试和基准对比",
                        "添加技术演进历史和趋势分析"
                    ])
                
                # 基于技术领域的建议
                tech_domain = intent_analysis.get('tech_domain', '')
                if 'AI' in tech_domain or 'machine learning' in tech_domain:
                    suggestions.extend([
                        "建议包含算法原理图解",
                        "添加数据集和模型性能指标",
                        "考虑包含实际应用场景"
                    ])
                
                return f"优化建议: {json.dumps(suggestions, ensure_ascii=False, indent=2)}"
                
            except Exception as e:
                return f"生成优化建议失败: {str(e)}"
        
        def clarify_requirements(user_input: str) -> str:
            """需求澄清"""
            try:
                clarification_questions = []
                
                # 检查是否缺少关键信息
                if not self._has_clear_scope(user_input):
                    clarification_questions.append("文章的具体范围和深度如何?")
                
                if not self._has_target_audience(user_input):
                    clarification_questions.append("目标读者是初学者、中级开发者还是专家?")
                
                if not self._has_practical_focus(user_input):
                    clarification_questions.append("更偏重理论分析还是实践应用?")
                
                return f"需求澄清问题: {json.dumps(clarification_questions, ensure_ascii=False, indent=2)}"
                
            except Exception as e:
                return f"需求澄清失败: {str(e)}"
        
        def enhance_topic_scope(original_topic: str, intent_analysis: Dict) -> str:
            """增强主题范围"""
            try:
                enhanced_topics = [original_topic]
                
                # 基于意图分析扩展主题
                intent_category = intent_analysis.get('intent_category')
                tech_domain = intent_analysis.get('tech_domain')
                
                if intent_category == 'tutorial':
                    enhanced_topics.extend([
                        f"{original_topic} - 入门指南",
                        f"{original_topic} - 实践案例",
                        f"{original_topic} - 常见问题解答"
                    ])
                elif intent_category == 'analysis':
                    enhanced_topics.extend([
                        f"{original_topic} - 技术原理深度解析",
                        f"{original_topic} - 性能评估与优化",
                        f"{original_topic} - 发展趋势分析"
                    ])
                
                return f"增强主题建议: {json.dumps(enhanced_topics, ensure_ascii=False, indent=2)}"
                
            except Exception as e:
                return f"主题增强失败: {str(e)}"
        
        return [
            FunctionTool.from_defaults(
                fn=analyze_user_intent,
                name="analyze_user_intent",
                description="分析用户输入的写作意图和需求"
            ),
            FunctionTool.from_defaults(
                fn=generate_optimization_suggestions,
                name="generate_optimization_suggestions",
                description="基于意图分析生成写作优化建议"
            ),
            FunctionTool.from_defaults(
                fn=clarify_requirements,
                name="clarify_requirements",
                description="识别需要澄清的需求并生成问题"
            ),
            FunctionTool.from_defaults(
                fn=enhance_topic_scope,
                name="enhance_topic_scope",
                description="基于意图分析增强和扩展主题范围"
            )
        ]
    
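    def _extract_keywords(self, text: str) -> List[str]:
        """Extract known technical keywords from the text."""
        # Simplified keyword extraction against a fixed vocabulary
        tech_keywords = [
            'AI', 'machine learning', 'deep learning', 'neural network',
            'Python', 'JavaScript', 'React', 'Vue', 'Node.js',
            'Docker', 'Kubernetes', 'microservices', 'API',
            'database', 'SQL', 'NoSQL', 'MongoDB', 'Redis'
        ]
        
        found_keywords = []
        for keyword in tech_keywords:
            # Match whole ASCII tokens only: a plain substring test would find
            # 'AI' inside "container". Adjacent CJK characters are still allowed.
            pattern = r'(?<![A-Za-z0-9])' + re.escape(keyword) + r'(?![A-Za-z0-9])'
            if re.search(pattern, text, re.IGNORECASE):
                found_keywords.append(keyword)
        
        return found_keywords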
    
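    def _classify_intent(self, text: str) -> str:
        """Classify the user's intent by keyword rules; the first match wins."""
        text_lower = text.lower()
        
        if any(word in text_lower for word in ['教程', 'tutorial', '如何', 'how to', '入门']):
            return 'tutorial'
        elif any(word in text_lower for word in ['分析', 'analysis', '原理', '机制']):
            return 'analysis'
        elif any(word in text_lower for word in ['对比', 'comparison', 'vs', '比较']):
            return 'comparison'
        elif any(word in text_lower for word in ['介绍', 'introduction', '什么是']):
            return 'introduction'
        elif any(word in text_lower for word in ['最佳实践', 'best practices', '经验']):
            return 'best_practices'
        # The three rules below cover the remaining keys of self.intent_categories,
        # which were otherwise unreachable. Their trigger words are illustrative.
        elif any(word in text_lower for word in ['排查', 'troubleshooting', '报错', '故障']):
            return 'troubleshooting'
        elif any(word in text_lower for word in ['评论', 'review', '点评']):
            return 'review'
        elif any(word in text_lower for word in ['新闻', 'news']):
            return 'news'
        else:
            # Fallback label; 'general' is not a key of self.intent_categories
            return 'general'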
    
    def _identify_tech_domain(self, text: str) -> str:
        """Identify the technical domain; the first matching domain wins."""
        domains = {
            'AI/ML': ['AI', 'machine learning', 'deep learning', 'neural network'],
            'Web开发': ['React', 'Vue', 'JavaScript', 'HTML', 'CSS', 'Node.js'],
            '后端开发': ['Python', 'Java', 'Go', 'API', 'microservices'],
            '数据库': ['SQL', 'NoSQL', 'MongoDB', 'Redis', 'database'],
            '运维': ['Docker', 'Kubernetes', 'DevOps', 'CI/CD']
        }
        
        for domain, keywords in domains.items():
            for keyword in keywords:
                # Whole-token match: substring tests would find 'Go' in "google"
                # and 'AI' in "container", misclassifying the domain.
                pattern = r'(?<![A-Za-z0-9])' + re.escape(keyword) + r'(?![A-Za-z0-9])'
                if re.search(pattern, text, re.IGNORECASE):
                    return domain
        
        return 'general'
    
    def _analyze_target_audience(self, text: str) -> str:
        """分析目标受众"""
        text_lower = text.lower()
        
        if any(word in text_lower for word in ['初学者', 'beginner', '入门', '新手']):
            return 'beginner'
        elif any(word in text_lower for word in ['高级', 'advanced', '专家', '深入']):
            return 'advanced'
        else:
            return 'intermediate'
    
    def _has_clear_scope(self, text: str) -> bool:
        """检查是否有明确的范围"""
        scope_indicators = ['具体', '详细', '深入', '全面', '简单', '基础']
        return any(indicator in text for indicator in scope_indicators)
    
    def _has_target_audience(self, text: str) -> bool:
        """检查是否指定了目标受众"""
        audience_indicators = ['初学者', '新手', '专家', '开发者', '工程师']
        return any(indicator in text for indicator in audience_indicators)
    
    def _has_practical_focus(self, text: str) -> bool:
        """检查是否有实践导向"""
        practical_indicators = ['实践', '案例', '示例', '代码', '项目']
        return any(indicator in text for indicator in practical_indicators)
    
    async def understand_intent(self, user_input: str) -> Dict[str, Any]:
        """理解用户意图的主要方法"""
        try:
            # 构建意图理解提示
            intent_prompt = f"""
            请分析以下用户输入的写作需求:
            
            用户输入: "{user_input}"
            
            请执行以下步骤:
            1. 使用analyze_user_intent分析用户意图
            2. 使用clarify_requirements识别需要澄清的问题
            3. 使用generate_optimization_suggestions生成优化建议
            4. 使用enhance_topic_scope增强主题范围
            
            请提供完整的意图理解报告。
            """
            
            # 使用Agent执行意图理解
            response = await self.agent.achat(intent_prompt)
            
            return {
                'success': True,
                'user_input': user_input,
                'intent_analysis': response.response,
                'recommendations': 'Based on analysis'
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'user_input': user_input
            }
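
Keyword rules on mixed Chinese and English input need care with short terms: a case-insensitive substring test reports "AI" for any text containing "container", and "Go" for "google". One way to avoid this, sketched below, is to match whole ASCII tokens with lookarounds while still allowing a keyword to sit directly next to CJK characters. The keyword list and the `find_keywords` helper are illustrative assumptions, not part of the agent's API.

```python
import re
from typing import List

TECH_KEYWORDS = ["AI", "Go", "API", "Docker", "Redis"]  # illustrative subset


def find_keywords(text: str, keywords: List[str]) -> List[str]:
    """Return the keywords that appear in text as whole ASCII tokens."""
    found = []
    for keyword in keywords:
        # The lookarounds reject matches embedded in a longer ASCII word but
        # accept a keyword that touches CJK characters or punctuation.
        pattern = r"(?<![A-Za-z0-9])" + re.escape(keyword) + r"(?![A-Za-z0-9])"
        if re.search(pattern, text, flags=re.IGNORECASE):
            found.append(keyword)
    return found


print(find_keywords("Docker container tutorial", TECH_KEYWORDS))
print(find_keywords("AI教程: building an API in Go", TECH_KEYWORDS))
```

Python's `\b` is not used here because CJK characters count as word characters in Unicode mode, so `\bAI\b` would miss "AI教程".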

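3.2 Coordinator Agent

Responsibility: global task scheduling and workflow control

Core capabilities:

  • Receives the analysis produced by the Intent Understanding Agent
  • Task decomposition and priority management
  • Coordination of inter-agent communication
  • Exception handling and error recovery
  • Progress monitoring and state management
  • Workflow optimization based on the intent analysis

Tech stack: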

from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.core.workflow import Workflow
from typing import Dict, Any
import asyncio
import logging

class CoordinatorAgent:
    def __init__(self, service_context, workflow_manager):
        self.service_context = service_context
        self.workflow_manager = workflow_manager
        self.active_tasks = {}
        self.logger = logging.getLogger(__name__)
        
        # 创建协调工具
        self.coordination_tools = self._create_coordination_tools()
        
        # 初始化协调Agent
        self.agent = OpenAIAgent.from_tools(
            tools=self.coordination_tools,
            llm=service_context.llm,
            system_prompt="""
            你是一个专业的任务协调专家,负责:
            1. 接收并整合意图理解Agent的分析结果
            2. 基于用户真实意图制定执行计划
            3. 协调各个专业Agent的工作
            4. 监控任务进度和处理异常
            5. 确保最终输出符合用户意图和质量要求
            
            请始终以用户意图为导向,保持专业、高效的工作方式。
            """,
            verbose=True
        )
    
    def _create_coordination_tools(self):
        """创建协调工具"""
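        def start_blog_workflow(topic: str, requirements: str) -> str:
            """Start the blog-writing workflow."""
            import uuid  # local import keeps this tool self-contained
            try:
                # A random suffix keeps IDs unique; len(self.active_tasks) can
                # repeat a value whenever entries are removed from the dict.
                task_id = f"blog_{uuid.uuid4().hex[:8]}"
                # Requires a running event loop in the calling thread.
                loop = asyncio.get_running_loop()
                workflow_future = loop.create_task(
                    self.workflow_manager.start_blog_writing(topic, requirements)
                )
                self.active_tasks[task_id] = {
                    'workflow': workflow_future,
                    'status': 'running',
                    'topic': topic,
                    'start_time': loop.time()
                }
                return f"Blog writing workflow started, task ID: {task_id}"
            except Exception as e:
                return f"Failed to start workflow: {str(e)}"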
        
        def check_task_status(task_id: str) -> str:
            """检查任务状态"""
            if task_id not in self.active_tasks:
                return f"任务 {task_id} 不存在"
            
            task = self.active_tasks[task_id]
            if task['workflow'].done():
                if task['workflow'].exception():
                    return f"任务 {task_id} 执行失败: {task['workflow'].exception()}"
                else:
                    return f"任务 {task_id} 执行完成"
            else:
                return f"任务 {task_id} 正在执行中"
        
        def get_task_result(task_id: str) -> str:
            """获取任务结果"""
            if task_id not in self.active_tasks:
                return f"任务 {task_id} 不存在"
            
            task = self.active_tasks[task_id]
            if not task['workflow'].done():
                return f"任务 {task_id} 尚未完成"
            
            try:
                result = task['workflow'].result()
                return f"任务结果: {result['final_content'][:500]}..."
            except Exception as e:
                return f"获取结果失败: {str(e)}"
        
        return [
            FunctionTool.from_defaults(
                fn=start_blog_workflow,
                name="start_blog_workflow",
                description="启动博客写作工作流"
            ),
            FunctionTool.from_defaults(
                fn=check_task_status,
                name="check_task_status",
                description="检查任务执行状态"
            ),
            FunctionTool.from_defaults(
                fn=get_task_result,
                name="get_task_result",
                description="获取任务执行结果"
            )
        ]
    
    async def process_request(self, user_request: str) -> Dict[str, Any]:
        """处理用户请求"""
        try:
            # 使用Agent处理请求
            response = await self.agent.achat(user_request)
            
            # 解析响应并执行相应操作
            result = {
                'success': True,
                'response': response.response,
                'active_tasks': list(self.active_tasks.keys())
            }
            
            return result
            
        except Exception as e:
            self.logger.error(f"处理请求失败: {str(e)}")
            return {
                'success': False,
                'error': str(e),
                'active_tasks': list(self.active_tasks.keys())
            }
    
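    async def monitor_tasks(self):
        """Monitor running tasks and record their final status."""
        while True:
            for task_id, task in self.active_tasks.items():
                future = task['workflow']
                if task['status'] != 'running' or not future.done():
                    continue
                
                # Finished tasks stay in active_tasks so that check_task_status
                # and get_task_result can still find them. Eviction of old
                # entries is not handled here.
                if future.cancelled():
                    task['status'] = 'cancelled'
                    self.logger.warning(f"Task {task_id} was cancelled")
                elif future.exception():
                    task['status'] = 'failed'
                    self.logger.error(f"Task {task_id} failed: {future.exception()}")
                else:
                    task['status'] = 'completed'
                    self.logger.info(f"Task {task_id} completed")
            
            await asyncio.sleep(5)  # poll every 5 seconds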
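
The task bookkeeping the coordinator needs (start a background workflow, report its status, fetch its result) can be isolated into a small registry. The sketch below assumes plain `asyncio` tasks. `TaskRegistry` and `fake_workflow` are hypothetical names used only for illustration, and the fake workflow stands in for `workflow_manager.start_blog_writing`.

```python
import asyncio
import uuid
from typing import Any, Coroutine, Dict


class TaskRegistry:
    """Track background asyncio tasks by ID and keep finished results."""

    def __init__(self) -> None:
        self._tasks: Dict[str, asyncio.Task] = {}

    def start(self, coro: Coroutine[Any, Any, Any]) -> str:
        # Must be called while an event loop is running.
        task_id = f"blog_{uuid.uuid4().hex[:8]}"
        self._tasks[task_id] = asyncio.get_running_loop().create_task(coro)
        return task_id

    def status(self, task_id: str) -> str:
        task = self._tasks.get(task_id)
        if task is None:
            return "unknown"
        if not task.done():
            return "running"
        if task.cancelled():
            return "cancelled"
        return "failed" if task.exception() else "completed"

    def result(self, task_id: str) -> Any:
        # Raises if the task is unfinished, cancelled, or failed.
        return self._tasks[task_id].result()


async def fake_workflow(topic: str) -> Dict[str, str]:
    """Stand-in for the real blog-writing workflow."""
    await asyncio.sleep(0.01)
    return {"final_content": f"Draft about {topic}"}


async def demo() -> Dict[str, str]:
    registry = TaskRegistry()
    task_id = registry.start(fake_workflow("RAG"))
    before = registry.status(task_id)
    await asyncio.sleep(0.05)
    after = registry.status(task_id)
    content = registry.result(task_id)["final_content"]
    return {"before": before, "after": after, "content": content}


outcome = asyncio.run(demo())
print(outcome)
```

Checking `cancelled()` before `exception()` matters because calling `exception()` on a cancelled task raises `CancelledError`.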

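3.3 Planning Agent

Responsibility: content structure planning and writing strategy

Core capabilities:

  • Topic analysis and outline generation
  • Content architecture design
  • Writing strategy planning
  • Resource requirement estimation

Algorithm implementation: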

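class PlanningAgent:
    """Builds a BlogPlan from a topic and the user's requirements.

    OutlineGenerator, StrategyPlanner, ResourceEstimator, BlogPlan, and the
    analyze_topic method are not shown in this section and are assumed to be
    defined elsewhere in the project.
    """

    def __init__(self):
        self.outline_generator = OutlineGenerator()
        self.strategy_planner = StrategyPlanner()
        self.resource_estimator = ResourceEstimator()
    
    async def create_plan(self, topic, requirements):
        # Analyze the topic
        topic_analysis = await self.analyze_topic(topic)
        
        # Generate the outline
        outline = await self.outline_generator.generate(
            topic_analysis, requirements
        )
        
        # Plan the writing strategy; requirements must expose an `audience` attribute
        strategy = await self.strategy_planner.plan(
            outline, target_audience=requirements.audience
        )
        
        return BlogPlan(outline, strategy, topic_analysis)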
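
To make the planning output concrete, here is a minimal, self-contained sketch of what a plan could look like as data. Everything in it is an assumption made for illustration: the `SectionPlan` and `BlogPlan` dataclasses, the outline templates keyed by intent category, and the `create_plan` function. The design above leaves these types unspecified, and a real planner would call an LLM rather than use fixed templates.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class SectionPlan:
    title: str
    key_points: List[str] = field(default_factory=list)
    target_length: int = 800  # matches the writing tool's default target_length


@dataclass
class BlogPlan:
    topic: str
    audience: str
    sections: List[SectionPlan]


# Hypothetical outline templates keyed by intent category.
OUTLINE_TEMPLATES: Dict[str, List[str]] = {
    "tutorial": ["Introduction", "Prerequisites", "Step-by-step guide",
                 "Common pitfalls", "Summary"],
    "analysis": ["Background", "How it works", "Performance", "Trends"],
}
DEFAULT_TEMPLATE = ["Introduction", "Main content", "Summary"]


def create_plan(topic: str, intent_category: str,
                audience: str = "intermediate") -> BlogPlan:
    """Build a plan from a fixed template chosen by intent category."""
    titles = OUTLINE_TEMPLATES.get(intent_category, DEFAULT_TEMPLATE)
    sections = [SectionPlan(title=f"{topic}: {title}") for title in titles]
    return BlogPlan(topic=topic, audience=audience, sections=sections)


plan = create_plan("Docker", "tutorial", audience="beginner")
for section in plan.sections:
    print(section.title, section.target_length)
```

Each `SectionPlan` carries the fields a section-level writing step would need, so the plan can be handed to the writing stage one section at a time.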

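3.4 Retrieval Agent

Responsibility: multi-source information retrieval and knowledge fusion

Core capabilities:

  • Web search and real-time information retrieval
  • Academic database retrieval
  • Private knowledge base queries
  • Multimodal information fusion

Multi-source retrieval architecture: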

from llama_index.readers.web import SimpleWebPageReader
from llama_index.readers.papers import ArxivReader  # provided by the llama-index-readers-papers package
from llama_index.readers.google import GoogleDocsReader
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import QueryEngineTool, FunctionTool
from llama_index.core import VectorStoreIndex
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from typing import List, Dict, Any
import asyncio

class RetrievalAgent:
    def __init__(self, service_context, data_fusion_engine):
        self.service_context = service_context
        self.data_fusion = data_fusion_engine
        
        # 初始化数据读取器
        self.web_reader = SimpleWebPageReader(html_to_text=True)
        self.arxiv_reader = ArxivReader()
        self.google_docs_reader = GoogleDocsReader()
        
        # 存储已建立的索引
        self.indexes = {}
        self.query_engines = {}
        
        # 创建检索工具
        self.retrieval_tools = self._create_retrieval_tools()
        
        # 初始化检索Agent
        self.agent = OpenAIAgent.from_tools(
            tools=self.retrieval_tools,
            llm=service_context.llm,
            system_prompt="""
            你是一个专业的信息检索专家,负责:
            1. 从多个数据源检索相关信息
            2. 评估信息的相关性和可信度
            3. 整合和去重检索结果
            4. 提供高质量的信息摘要
            
            请确保检索的信息准确、相关且来源可靠。
            """,
            verbose=True
        )
    
    def _create_retrieval_tools(self):
        """创建检索工具"""
        
        def search_web_content(urls: List[str], query: str) -> str:
            """搜索Web内容"""
            try:
                # 加载Web文档
                documents = self.web_reader.load_data(urls)
                
                # 为每个文档添加元数据
                for i, doc in enumerate(documents):
                    doc.metadata.update({
                        'source_type': 'web',
                        'url': urls[i] if i < len(urls) else 'unknown',
                        'credibility_score': 0.7
                    })
                
                # 创建索引
                index = VectorStoreIndex.from_documents(
                    documents, service_context=self.service_context
                )
                
                # 查询
                query_engine = index.as_query_engine(similarity_top_k=5)
                response = query_engine.query(query)
                
                return f"Web搜索结果: {response.response}"
                
            except Exception as e:
                return f"Web搜索失败: {str(e)}"
        
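        def search_arxiv_papers(paper_ids: List[str], query: str) -> str:
            """Search arXiv papers."""
            try:
                # Load arXiv papers. ArxivReader exposes load_data(search_query, ...)
                # rather than a by-ID loader, so each ID is sent as an `id:` query.
                # Assumption: verify this against the installed reader version.
                documents = []
                for paper_id in paper_ids:
                    documents.extend(
                        self.arxiv_reader.load_data(
                            search_query=f"id:{paper_id}", max_results=1
                        )
                    )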
                
                # 添加元数据
                for doc in documents:
                    doc.metadata.update({
                        'source_type': 'academic',
                        'credibility_score': 0.9
                    })
                
                # 创建索引
                index = VectorStoreIndex.from_documents(
                    documents, service_context=self.service_context
                )
                
                # 查询
                query_engine = index.as_query_engine(similarity_top_k=3)
                response = query_engine.query(query)
                
                return f"学术论文搜索结果: {response.response}"
                
            except Exception as e:
                return f"学术搜索失败: {str(e)}"
        
        def search_knowledge_base(index_name: str, query: str) -> str:
            """搜索知识库"""
            try:
                if index_name not in self.query_engines:
                    return f"知识库 {index_name} 不存在"
                
                query_engine = self.query_engines[index_name]
                response = query_engine.query(query)
                
                return f"知识库搜索结果: {response.response}"
                
            except Exception as e:
                return f"知识库搜索失败: {str(e)}"
        
        def multi_source_search(query: str, sources_config: Dict[str, Any]) -> str:
            """多源综合搜索"""
            try:
                all_results = []
                
                # Web搜索
                if 'web_urls' in sources_config:
                    web_result = search_web_content(
                        sources_config['web_urls'], query
                    )
                    all_results.append(web_result)
                
                # 学术搜索
                if 'arxiv_papers' in sources_config:
                    arxiv_result = search_arxiv_papers(
                        sources_config['arxiv_papers'], query
                    )
                    all_results.append(arxiv_result)
                
                # 知识库搜索
                if 'knowledge_bases' in sources_config:
                    for kb_name in sources_config['knowledge_bases']:
                        kb_result = search_knowledge_base(kb_name, query)
                        all_results.append(kb_result)
                
                # 整合结果
                integrated_result = "\n\n".join(all_results)
                return f"综合搜索结果:\n{integrated_result}"
                
            except Exception as e:
                return f"多源搜索失败: {str(e)}"
        
        return [
            FunctionTool.from_defaults(
                fn=search_web_content,
                name="search_web_content",
                description="从指定的Web页面搜索相关内容"
            ),
            FunctionTool.from_defaults(
                fn=search_arxiv_papers,
                name="search_arxiv_papers",
                description="从arXiv论文中搜索学术内容"
            ),
            FunctionTool.from_defaults(
                fn=search_knowledge_base,
                name="search_knowledge_base",
                description="从指定知识库中搜索内容"
            ),
            FunctionTool.from_defaults(
                fn=multi_source_search,
                name="multi_source_search",
                description="执行多源综合搜索"
            )
        ]
    
    async def retrieve_information(self, query: str, sources_config: Dict[str, Any]) -> Dict[str, Any]:
        """检索信息的主要方法"""
        try:
            # 构建检索提示
            retrieval_prompt = f"""
            请根据查询"{query}"从以下数据源检索相关信息:
            
            数据源配置: {sources_config}
            
            要求:
            1. 使用multi_source_search进行综合搜索
            2. 评估信息的相关性和可信度
            3. 提供信息摘要和关键点
            4. 标注信息来源
            """
            
            # 使用Agent执行检索
            response = await self.agent.achat(retrieval_prompt)
            
            return {
                'success': True,
                'query': query,
                'results': response.response,
                'sources': sources_config
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'query': query
            }
    
    async def build_knowledge_base(self, name: str, documents: List) -> bool:
        """构建知识库索引"""
        try:
            # 处理文档
            processed_docs = await self.data_fusion.process_and_fuse(documents)
            
            # 创建索引
            index = VectorStoreIndex.from_documents(
                processed_docs, service_context=self.service_context
            )
            
            # 创建查询引擎
            query_engine = index.as_query_engine(
                similarity_top_k=10,
                response_mode="tree_summarize"
            )
            
            # 存储
            self.indexes[name] = index
            self.query_engines[name] = query_engine
            
            return True
            
        except Exception as e:
            print(f"构建知识库失败: {str(e)}")
            return False
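
The retrieval prompt asks the agent to merge and deduplicate results, and the tools attach a `credibility_score` to each source (0.7 for web pages, 0.9 for academic papers). A deterministic version of that merge step might look like the sketch below. `RetrievedChunk` and `merge_results` are hypothetical names, and exact-match deduplication on normalized text is a simplifying assumption; the document does not specify how `data_fusion` works.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class RetrievedChunk:
    text: str
    source_type: str          # "web", "academic", or "knowledge_base"
    credibility_score: float


def _normalize(text: str) -> str:
    """Collapse whitespace and case so trivially different copies compare equal."""
    return " ".join(text.lower().split())


def merge_results(chunks: List[RetrievedChunk], top_k: int = 5) -> List[RetrievedChunk]:
    """Drop duplicate texts, keep the most credible copy, then rank by score."""
    best: Dict[str, RetrievedChunk] = {}
    for chunk in chunks:
        key = _normalize(chunk.text)
        current = best.get(key)
        if current is None or chunk.credibility_score > current.credibility_score:
            best[key] = chunk
    ranked = sorted(best.values(), key=lambda c: c.credibility_score, reverse=True)
    return ranked[:top_k]


chunks = [
    RetrievedChunk("RAG combines retrieval with generation.", "web", 0.7),
    RetrievedChunk("RAG  combines retrieval with generation.", "academic", 0.9),
    RetrievedChunk("Vector stores index embeddings.", "web", 0.7),
]
merged = merge_results(chunks)
for chunk in merged:
    print(chunk.source_type, chunk.credibility_score, chunk.text)
```

Running the merge before the results reach the writing stage keeps repeated passages out of the prompt and puts higher-credibility sources first.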

Multi-source data integration configuration:

retrieval_sources:
  web:
    - google_search_api
    - bing_api
    - duckduckgo_api
  academic:
    - arxiv
    - ieee_xplore
    - acm_digital_library
    - pubmed
    - google_scholar
  knowledge_base:
    - company_docs
    - tech_wikis
    - code_repositories
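
One way to consume this configuration is to resolve the requested categories into concrete (category, source) pairs before dispatching searches. The sketch below mirrors the YAML as a plain Python dict so that it runs without a YAML parser; in practice the dict would be loaded from the config file. `select_sources` is a hypothetical helper, not part of the Retrieval Agent shown above.

```python
from typing import Dict, List, Tuple

# Mirrors the retrieval_sources mapping from the configuration.
RETRIEVAL_SOURCES: Dict[str, List[str]] = {
    "web": ["google_search_api", "bing_api", "duckduckgo_api"],
    "academic": ["arxiv", "ieee_xplore", "acm_digital_library",
                 "pubmed", "google_scholar"],
    "knowledge_base": ["company_docs", "tech_wikis", "code_repositories"],
}


def select_sources(config: Dict[str, List[str]],
                   categories: List[str]) -> List[Tuple[str, str]]:
    """Return (category, source) pairs for the requested categories."""
    unknown = set(categories) - set(config)
    if unknown:
        # Fail early instead of silently skipping a misspelled category.
        raise ValueError(f"Unknown source categories: {sorted(unknown)}")
    return [(category, source)
            for category in categories
            for source in config[category]]


selected = select_sources(RETRIEVAL_SOURCES, ["academic", "knowledge_base"])
print(selected)
```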

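3.5 Writing Agent

Responsibility: core content generation and text authoring

Core capabilities:

  • RAG-based writing
  • Multilingual content generation
  • Explanation of technical concepts
  • Code example generation

Writing engine design: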

from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool, QueryEngineTool
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.llms import ChatMessage, MessageRole
from typing import Dict, List, Any, Optional
import json

class WritingAgent:
    def __init__(self, service_context, retrieval_agent, code_generator=None):
        self.service_context = service_context
        self.retrieval_agent = retrieval_agent
        self.code_generator = code_generator
        
        # 创建写作工具
        self.writing_tools = self._create_writing_tools()
        
        # 初始化写作Agent
        self.agent = OpenAIAgent.from_tools(
            tools=self.writing_tools,
            llm=service_context.llm,
            memory=ChatMemoryBuffer.from_defaults(token_limit=10000),
            system_prompt="""
            你是一个专业的技术写作专家,擅长创作高质量的技术博客和文档。
            
            核心职责:
            1. 基于检索到的信息生成准确、专业的技术内容
            2. 确保内容逻辑清晰、结构合理
            3. 适当插入代码示例和技术图表
            4. 保持一致的写作风格和专业术语使用
            5. 优化内容的可读性和技术深度
            
            写作原则:
            - 准确性:确保技术信息的准确性
            - 清晰性:使用简洁明了的语言表达复杂概念
            - 实用性:提供有价值的实践指导
            - 完整性:覆盖主题的关键方面
            - 专业性:使用标准的技术术语和表达方式
            """,
            verbose=True
        )
        
        # 写作历史和上下文
        self.writing_context = {
            'current_topic': None,
            'writing_style': 'technical',
            'target_audience': 'developers',
            'content_sections': []
        }
    
    def _create_writing_tools(self):
        """创建写作工具"""
        
        def write_section_with_rag(section_plan: str, retrieved_context: str) -> str:
            """基于RAG的章节写作"""
            try:
                plan_data = json.loads(section_plan)
                
                # 构建写作提示
                writing_prompt = f"""
                请根据以下信息写作一个技术博客章节:
                
                章节计划:
                - 标题: {plan_data.get('title', '')}
                - 主要内容点: {plan_data.get('key_points', [])}
                - 目标长度: {plan_data.get('target_length', 800)}字
                - 写作风格: {plan_data.get('style', 'technical')}
                - 目标读者: {plan_data.get('audience', 'developers')}
                
                检索到的相关信息:
                {retrieved_context}
                
                写作要求:
                1. 内容要准确、专业、有深度
                2. 结构清晰,逻辑连贯
                3. 适当使用技术术语,但要确保可读性
                4. 如果涉及代码,请提供清晰的示例
                5. 保持客观、专业的写作语调
                
                请生成完整的章节内容,包括适当的标题层级。
                """
                
                # Generate the section with the configured LLM; the prompt built
                # above was previously unused and a fixed placeholder was returned.
                response = self.service_context.llm.complete(writing_prompt)
                return response.text
                
            except Exception as e:
                return f"章节写作失败: {str(e)}"
        
        def generate_code_examples(requirements: str, context: str) -> str:
            """生成代码示例"""
            try:
                if self.code_generator:
                    return self.code_generator.generate(requirements, context)
                
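                # Default code-generation logic
                req_data = json.loads(requirements) if isinstance(requirements, str) else requirements
                
                fence = "`" * 3  # Markdown code fence, built here to avoid nesting literal fences
                code_template = f"""
{fence}{req_data.get('language', 'python')}
# {req_data.get('description', 'Code example')}

{req_data.get('template_code', '# Example code will be generated here')}
{fence}

**Code notes:**

- {req_data.get('explanation', 'Detailed explanation of what the code does and how to use it')}
                """

                return code_template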
                
            except Exception as e:
                return f"代码生成失败: {str(e)}"
        
        def enhance_content_with_context(base_content: str, additional_context: str) -> str:
            """使用额外上下文增强内容"""
            try:
                enhancement_prompt = f"""
                请基于以下额外上下文信息来增强和完善现有内容:
                
                现有内容:
                {base_content}
                
                额外上下文:
                {additional_context}
                
                增强要求:
                1. 添加更多技术细节和深度分析
                2. 补充相关的最佳实践和注意事项
                3. 增加实际应用场景和案例
                4. 确保信息的准确性和时效性
                5. 保持原有的结构和风格
                
                请返回增强后的完整内容。
                """
                
                # Enhance the content with the configured LLM
                enhanced_content = self.service_context.llm.complete(enhancement_prompt).text
                
                return enhanced_content
                
            except Exception as e:
                return f"内容增强失败: {str(e)}"
        
        def optimize_readability(content: str, target_audience: str) -> str:
            """优化内容可读性"""
            try:
                optimization_prompt = f"""
                请优化以下内容的可读性,目标读者是: {target_audience}
                
                原始内容:
                {content}
                
                优化要求:
                1. 调整语言复杂度以适应目标读者
                2. 改善段落结构和逻辑流程
                3. 添加必要的解释和背景信息
                4. 优化标题和子标题的层次结构
                5. 确保技术术语的适当使用和解释
                
                请返回优化后的内容。
                """
                
                # 这里应该调用LLM进行可读性优化
                optimized_content = content  # 简化示例
                
                return optimized_content
                
            except Exception as e:
                return f"可读性优化失败: {str(e)}"
        
        return [
            FunctionTool.from_defaults(
                fn=write_section_with_rag,
                name="write_section_with_rag",
                description="基于RAG检索信息和章节计划生成技术内容"
            ),
            FunctionTool.from_defaults(
                fn=generate_code_examples,
                name="generate_code_examples",
                description="根据需求生成相关的代码示例和说明"
            ),
            FunctionTool.from_defaults(
                fn=enhance_content_with_context,
                name="enhance_content_with_context",
                description="使用额外上下文信息增强和完善内容"
            ),
            FunctionTool.from_defaults(
                fn=optimize_readability,
                name="optimize_readability",
                description="优化内容的可读性以适应目标读者"
            )
        ]

    async def write_section(self, section_plan: Dict[str, Any], retrieved_info: Dict[str, Any]) -> Dict[str, Any]:
        """写作章节的主要方法"""
        try:
            # 更新写作上下文
            self.writing_context['current_topic'] = section_plan.get('title', '')

            # 构建写作提示
            writing_prompt = f"""
            请为技术博客写作一个章节,具体要求如下:
            
            章节计划: {json.dumps(section_plan, ensure_ascii=False, indent=2)}
            
            检索到的相关信息: {json.dumps(retrieved_info, ensure_ascii=False, indent=2)}
            
            请按以下步骤执行:
            1. 使用write_section_with_rag基于检索信息生成基础内容
            2. 如果需要代码示例,使用generate_code_examples生成
            3. 使用enhance_content_with_context增强内容深度
            4. 使用optimize_readability优化可读性
            
            最终输出完整的、高质量的技术章节内容。
            """
            
            # 使用Agent执行写作任务
            response = await self.agent.achat(writing_prompt)
            
            # 记录到写作上下文
            self.writing_context['content_sections'].append({
                'title': section_plan.get('title', ''),
                'content': response.response,
                'metadata': section_plan
            })
            
            return {
                'success': True,
                'content': response.response,
                'section_plan': section_plan,
                'metadata': {
                    'word_count': len(response.response.split()),
                    'has_code': '```' in response.response,
                    'section_level': section_plan.get('level', 1)
                }
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'section_plan': section_plan
            }

    async def write_complete_article(self, outline: Dict[str, Any], research_data: Dict[str, Any]) -> Dict[str, Any]:
        """写作完整文章"""
        try:
            article_sections = []
            failed_sections = []  # titles of sections whose writing failed

            # Write the sections one at a time
            for section in outline.get('sections', []):
                # Retrieve supporting information for this section
                section_context = await self.retrieval_agent.retrieve_information(
                    section.get('title', ''), research_data
                )

                # Write the section
                section_result = await self.write_section(section, section_context)

                if section_result['success']:
                    article_sections.append(section_result['content'])
                else:
                    # Record the failure so callers can see which sections are missing
                    failed_sections.append(section.get('title', ''))
                    print(f"章节写作失败: {section_result['error']}")

            # Join the sections into the complete article
            complete_article = "\n\n".join(article_sections)

            return {
                'success': True,
                'article': complete_article,
                'outline': outline,
                'metadata': {
                    'total_sections': len(article_sections),
                    'failed_sections': failed_sections,
                    'total_words': len(complete_article.split()),
                    'writing_context': self.writing_context
                }
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'outline': outline
            }
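
The `word_count` and `total_words` metadata above are computed with `str.split()`, which counts whitespace-separated tokens. Chinese text has no spaces between words, so a whole Chinese sentence counts as a single token. The sketch below shows one possible alternative for mixed Chinese/English content. The helper name `count_words` and the rule "one CJK character equals one word" are assumptions made for illustration, not part of the original design.

```python
import re

# Assumption: only the basic CJK Unified Ideographs block is considered;
# extension blocks and other scripts are ignored in this sketch.
_CJK_CHAR = re.compile(r"[\u4e00-\u9fff]")
# Runs of ASCII letters/digits, allowing inner apostrophes, hyphens, underscores.
_LATIN_WORD = re.compile(r"[A-Za-z0-9]+(?:['\-_][A-Za-z0-9]+)*")


def count_words(text: str) -> int:
    """Count each CJK character as one word, plus each Latin/digit run."""
    cjk = len(_CJK_CHAR.findall(text))
    latin = len(_LATIN_WORD.findall(text))
    return cjk + latin


sample = "RAG 结合向量检索和大语言模型"
print(len(sample.split()))   # whitespace split: 2 tokens
print(count_words(sample))   # mixed-script count: 13
```

A real deployment might prefer a proper Chinese word segmenter. Character counting is only a cheap approximation that at least scales with the length of the text.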

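3.6 Style Agent

Responsibility: content style optimization and personalized adjustment

Core capabilities:

  • Keeping the writing style consistent
  • Standardizing technical terminology
  • Optimizing readability
  • Adapting to a personalized style

Style processing flow: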

from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.core.memory import ChatMemoryBuffer
from typing import Dict, List, Any, Optional
import json
import re

class StyleAgent:
    def __init__(self, service_context, style_config=None):
        self.service_context = service_context
        self.style_config = style_config or self._default_style_config()
        
        # 创建风格化工具
        self.style_tools = self._create_style_tools()
        
        # 初始化风格化Agent
        self.agent = OpenAIAgent.from_tools(
            tools=self.style_tools,
            llm=service_context.llm,
            memory=ChatMemoryBuffer.from_defaults(token_limit=8000),
            system_prompt="""
            你是一个专业的内容风格优化专家,负责:
            1. 分析和调整文本的写作风格
            2. 确保技术术语的一致性和标准化
            3. 优化内容的可读性和专业性
            4. 适配不同目标读者的需求
            5. 保持品牌和个人写作风格的一致性
            
            风格优化原则:
            - 一致性:确保整篇文章风格统一
            - 专业性:使用准确的技术术语
            - 可读性:平衡专业性和易读性
            - 适应性:根据目标读者调整语言复杂度
            - 准确性:保持技术内容的准确性
            """,
            verbose=True
        )
        
        # 风格分析历史
        self.style_history = []
    
    def _default_style_config(self):
        """默认风格配置"""
        return {
            'tone': 'professional',
            'formality': 'semi-formal',
            'technical_level': 'intermediate',
            'target_audience': 'developers',
            'terminology_standards': {
                'AI': 'Artificial Intelligence',
                'ML': 'Machine Learning',
                'API': 'Application Programming Interface'
            },
            'style_preferences': {
                'use_active_voice': True,
                'prefer_short_sentences': False,
                'include_examples': True,
                'use_bullet_points': True
            }
        }
    
    def _create_style_tools(self):
        """创建风格化工具"""
        
        def analyze_writing_style(content: str) -> str:
            """分析写作风格"""
            try:
                # 分析各种风格指标
                analysis = {
                    'tone_analysis': self._analyze_tone(content),
                    'readability_score': self._calculate_readability(content),
                    'technical_density': self._analyze_technical_density(content),
                    'sentence_structure': self._analyze_sentence_structure(content),
                    'terminology_consistency': self._check_terminology_consistency(content),
                    'style_issues': self._identify_style_issues(content)
                }
                
                return json.dumps(analysis, ensure_ascii=False, indent=2)
                
            except Exception as e:
                return f"风格分析失败: {str(e)}"
        
        def adjust_tone_and_style(content: str, target_style: str) -> str:
            """调整语调和风格"""
            try:
                style_config = json.loads(target_style) if isinstance(target_style, str) else target_style
                
                adjustment_prompt = f"""
                请根据以下风格要求调整内容的语调和写作风格:
                
                原始内容:
                {content}
                
                目标风格配置:
                - 语调: {style_config.get('tone', 'professional')}
                - 正式程度: {style_config.get('formality', 'semi-formal')}
                - 技术水平: {style_config.get('technical_level', 'intermediate')}
                - 目标读者: {style_config.get('target_audience', 'developers')}
                
                调整要求:
                1. 保持技术内容的准确性
                2. 调整语言的正式程度和复杂度
                3. 确保语调符合目标风格
                4. 优化句式结构和表达方式
                5. 保持逻辑结构不变
                
                请返回调整后的内容。
                """
                
                # 这里应该调用LLM进行风格调整
                adjusted_content = content  # 简化示例
                
                return adjusted_content
                
            except Exception as e:
                return f"风格调整失败: {str(e)}"
        
        def standardize_terminology(content: str, terminology_dict: str) -> str:
            """标准化技术术语"""
            try:
                term_dict = json.loads(terminology_dict) if isinstance(terminology_dict, str) else terminology_dict
                
                standardized_content = content
                
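                # Expand only the first occurrence of each abbreviation
                for abbrev, full_form in term_dict.items():
                    expanded = f"{full_form} ({abbrev})"
                    # Skip terms that are already expanded so repeated calls stay idempotent
                    if expanded in standardized_content:
                        continue
                    # ASCII-only lookarounds instead of \b: Python's \b treats CJK
                    # characters as word characters, so "使用AI技术" would never match
                    pattern = rf'(?<![A-Za-z0-9_]){re.escape(abbrev)}(?![A-Za-z0-9_])'
                    # A callable replacement keeps any backslashes in full_form literal
                    standardized_content = re.sub(
                        pattern,
                        lambda _m, text=expanded: text,
                        standardized_content,
                        count=1
                    )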
                
                return standardized_content
                
            except Exception as e:
                return f"术语标准化失败: {str(e)}"
        
        def optimize_readability(content: str, target_audience: str) -> str:
            """优化可读性"""
            try:
                optimization_prompt = f"""
                请优化以下内容的可读性,目标读者是: {target_audience}
                
                原始内容:
                {content}
                
                优化要求:
                1. 简化复杂句式,提高理解度
                2. 添加适当的过渡词和连接词
                3. 优化段落结构和信息层次
                4. 确保技术概念的清晰解释
                5. 保持专业性的同时提高可读性
                
                请返回优化后的内容。
                """
                
                # 这里应该调用LLM进行可读性优化
                optimized_content = content  # 简化示例
                
                return optimized_content
                
            except Exception as e:
                return f"可读性优化失败: {str(e)}"
        
        def ensure_style_consistency(content_sections: List[str]) -> str:
            """确保风格一致性"""
            try:
                consistency_prompt = f"""
                请检查并确保以下多个章节内容的风格一致性:
                
                章节内容:
                {json.dumps(content_sections, ensure_ascii=False, indent=2)}
                
                一致性要求:
                1. 统一语调和写作风格
                2. 保持术语使用的一致性
                3. 统一句式结构和表达习惯
                4. 确保逻辑连贯性
                5. 保持专业水平的一致性
                
                请返回风格统一后的完整内容。
                """
                
                # 这里应该调用LLM进行一致性处理
                consistent_content = "\n\n".join(content_sections)  # 简化示例
                
                return consistent_content
                
            except Exception as e:
                return f"风格一致性处理失败: {str(e)}"
        
        return [
            FunctionTool.from_defaults(
                fn=analyze_writing_style,
                name="analyze_writing_style",
                description="分析内容的写作风格和特征"
            ),
            FunctionTool.from_defaults(
                fn=adjust_tone_and_style,
                name="adjust_tone_and_style",
                description="根据目标风格调整内容的语调和写作风格"
            ),
            FunctionTool.from_defaults(
                fn=standardize_terminology,
                name="standardize_terminology",
                description="标准化技术术语的使用"
            ),
            FunctionTool.from_defaults(
                fn=optimize_readability,
                name="optimize_readability",
                description="优化内容的可读性"
            ),
            FunctionTool.from_defaults(
                fn=ensure_style_consistency,
                name="ensure_style_consistency",
                description="确保多个章节间的风格一致性"
            )
        ]
    
    def _analyze_tone(self, content: str) -> Dict[str, Any]:
        """Analyze tone with a simplified keyword heuristic (English indicators only)."""
        formal_indicators = ['furthermore', 'therefore', 'consequently', 'moreover']
        informal_indicators = ['basically', 'pretty much', 'kind of', 'sort of']

        # Lowercase once instead of once per indicator
        text = content.lower()
        formal_count = sum(1 for word in formal_indicators if word in text)
        informal_count = sum(1 for word in informal_indicators if word in text)

        # A tie (including text with no indicators at all) is reported as neutral
        if formal_count > informal_count:
            tone = 'formal'
        elif informal_count > formal_count:
            tone = 'informal'
        else:
            tone = 'neutral'

        return {
            'formality_score': formal_count - informal_count,
            'tone': tone
        }
    
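    def _calculate_readability(self, content: str) -> float:
        """Compute a simplified readability score (0-100, higher is easier to read)."""
        # Split on Western and Chinese sentence terminators and drop empty fragments;
        # str.split('.') never returns an empty list, so the old emptiness check could not fire
        sentences = [s for s in re.split(r'[.!?。!?]+', content) if s.strip()]
        words = content.split()

        if not sentences:
            return 0.0

        avg_sentence_length = len(words) / len(sentences)
        return max(0.0, 100 - avg_sentence_length * 2)  # simplified formula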
    
    def _analyze_technical_density(self, content: str) -> float:
        """Estimate technical density as the percentage of words that are technical terms."""
        technical_terms = {'algorithm', 'framework', 'implementation', 'architecture', 'optimization'}
        # Extract alphabetic words so trailing punctuation ("algorithm,") does not hide a term
        words = re.findall(r'[a-z]+', content.lower())

        if not words:
            return 0.0

        technical_count = sum(1 for word in words if word in technical_terms)
        return technical_count / len(words) * 100
    
    def _analyze_sentence_structure(self, content: str) -> Dict[str, Any]:
        """Analyze sentence structure."""
        # Split on Western and Chinese sentence terminators
        sentences = [s.strip() for s in re.split(r'[.!?。!?]+', content) if s.strip()]

        if not sentences:
            # Return the same keys as the non-empty case
            return {'avg_length': 0, 'complexity': 'low', 'sentence_count': 0}

        avg_length = sum(len(s.split()) for s in sentences) / len(sentences)
        complexity = 'high' if avg_length > 20 else 'medium' if avg_length > 10 else 'low'

        return {
            'avg_length': avg_length,
            'complexity': complexity,
            'sentence_count': len(sentences)
        }
    
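    def _check_terminology_consistency(self, content: str) -> Dict[str, Any]:
        """Check terminology consistency (simplified)."""
        inconsistencies = []

        # Ignore the standard "Artificial Intelligence (AI)" introduction, then flag
        # text that still mixes the bare abbreviation with the spelled-out form
        remaining = re.sub(r'artificial intelligence\s*\(AI\)', '', content, flags=re.IGNORECASE)
        # Match "AI" only as a standalone token, not inside words such as "MAIN"
        has_abbrev = re.search(r'(?<![A-Za-z0-9_])AI(?![A-Za-z0-9_])', remaining) is not None
        has_full_form = 'artificial intelligence' in remaining.lower()
        if has_abbrev and has_full_form:
            inconsistencies.append('AI术语使用不一致')

        return {
            'inconsistencies': inconsistencies,
            'consistency_score': max(0, 100 - len(inconsistencies) * 10)
        }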
    
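    def _identify_style_issues(self, content: str) -> List[str]:
        """Identify common style issues."""
        issues = []

        # Too many exclamation marks
        if content.count('!') > 3:
            issues.append('过多使用感叹号')

        # Match "very" as a whole word so "every" or "delivery" is not flagged
        if re.search(r'\bvery\b', content, re.IGNORECASE):
            issues.append('使用了模糊限定词')

        return issues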
    
    async def stylize_content(self, content: str, style_guide: Dict[str, Any]) -> Dict[str, Any]:
        """风格化内容的主要方法"""
        try:
            # 构建风格化提示
            stylization_prompt = f"""
            请对以下内容进行全面的风格优化:
            
            原始内容: {content}
            
            风格指南: {json.dumps(style_guide, ensure_ascii=False, indent=2)}
            
            请按以下步骤执行:
            1. 使用analyze_writing_style分析当前风格
            2. 使用adjust_tone_and_style调整语调和风格
            3. 使用standardize_terminology标准化术语
            4. 使用optimize_readability优化可读性
            
            最终输出风格优化后的完整内容。
            """
            
            # 使用Agent执行风格化任务
            response = await self.agent.achat(stylization_prompt)
            
            # 记录风格化历史
            self.style_history.append({
                'original_content': content,
                'style_guide': style_guide,
                'stylized_content': response.response,
                'timestamp': self._get_timestamp()
            })
            
            return {
                'success': True,
                'stylized_content': response.response,
                'style_guide': style_guide,
                'metadata': {
                    'original_length': len(content.split()),
                    'stylized_length': len(response.response.split()),
                    'style_changes_applied': True
                }
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'original_content': content
            }
    
    def _get_timestamp(self) -> str:
        """获取时间戳"""
        from datetime import datetime
        return datetime.now().isoformat()
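
The first-occurrence expansion performed by `standardize_terminology` can be tried in isolation. The sketch below is a standalone variant written for illustration: the function name `expand_first_occurrence` is hypothetical, and it makes two assumptions that go beyond the listing above. It skips terms that are already expanded, so running it twice gives the same result, and it uses ASCII lookarounds rather than `\b`, because Python treats Chinese characters as word characters and `\b` would not match an abbreviation embedded directly in Chinese text.

```python
import re
from typing import Dict


def expand_first_occurrence(content: str, terms: Dict[str, str]) -> str:
    """Replace the first standalone occurrence of each abbreviation
    with 'Full Form (ABBR)'; later occurrences are left untouched."""
    result = content
    for abbrev, full_form in terms.items():
        expanded = f"{full_form} ({abbrev})"
        if expanded in result:
            continue  # already introduced; keeps the function idempotent
        # Standalone token: not preceded or followed by an ASCII word character
        pattern = rf"(?<![A-Za-z0-9_]){re.escape(abbrev)}(?![A-Za-z0-9_])"
        # Callable replacement so full_form is inserted literally
        result = re.sub(pattern, lambda _m, text=expanded: text, result, count=1)
    return result


terms = {
    "AI": "Artificial Intelligence",
    "API": "Application Programming Interface",
}
once = expand_first_occurrence("AI systems expose an API. The API uses AI.", terms)
twice = expand_first_occurrence(once, terms)
print(once)
print(once == twice)
```

Because the check for an existing expansion is a plain substring test, a document that introduces the term with different spacing or casing would still be expanded again; a stricter check is left out to keep the sketch short.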

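3.7 Citation Agent

Responsibility: citation management and academic conventions

Core capabilities:

  • Automatic citation generation
  • Support for multiple citation formats
  • Citation quality validation
  • Bibliography management

Citation system design: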

from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.core.memory import ChatMemoryBuffer
from typing import Dict, List, Any, Optional
import json
import re
from datetime import datetime

class CitationAgent:
    def __init__(self, service_context, citation_config=None):
        self.service_context = service_context
        self.citation_config = citation_config or self._default_citation_config()
        
        # 创建引用工具
        self.citation_tools = self._create_citation_tools()
        
        # 初始化引用Agent
        self.agent = OpenAIAgent.from_tools(
            tools=self.citation_tools,
            llm=service_context.llm,
            memory=ChatMemoryBuffer.from_defaults(token_limit=6000),
            system_prompt="""
            你是一个专业的学术引用和参考文献管理专家,负责:
            1. 识别需要引用的内容和观点
            2. 生成符合学术规范的引用格式
            3. 管理和组织参考文献
            4. 验证引用的准确性和完整性
            5. 确保引用格式的一致性
            
            引用原则:
            - 准确性:确保引用信息的准确性
            - 完整性:提供完整的引用信息
            - 一致性:保持引用格式的统一
            - 规范性:遵循学术引用标准
            - 可追溯性:确保读者能够找到原始资料
            """,
            verbose=True
        )
        
        # 引用数据库
        self.citation_database = {
            'sources': {},
            'citations': [],
            'bibliography': []
        }
    
    def _default_citation_config(self):
        """默认引用配置"""
        return {
            'default_style': 'APA',
            'supported_styles': ['APA', 'MLA', 'Chicago', 'IEEE'],
            'auto_generate': True,
            'include_urls': True,
            'include_access_dates': True,
            'validate_sources': True
        }
    
    def _create_citation_tools(self):
        """创建引用工具"""
        
        def extract_citation_points(content: str) -> str:
            """提取需要引用的内容点"""
            try:
                # 识别需要引用的内容
                citation_points = []
                
                # Find direct quotations (text inside double quotes)
                for match in re.finditer(r'"([^"]+)"', content):
                    citation_points.append({
                        'type': 'direct_quote',
                        'content': match.group(1),
                        'position': match.start(),
                        'needs_citation': True
                    })

                # Find statistics and other specific numbers; match.start() gives the
                # position of each occurrence, whereas content.find() would report
                # only the first occurrence of a repeated value
                statistics_pattern = r'\b\d+(?:\.\d+)?%|\b\d+(?:,\d{3})*(?:\.\d+)?\b'
                for match in re.finditer(statistics_pattern, content):
                    citation_points.append({
                        'type': 'statistic',
                        'content': match.group(),
                        'position': match.start(),
                        'needs_citation': True
                    })
                
                # Find phrases that attribute a claim to research or to a named source
                technical_patterns = [
                    r'according to [^,\.]+',
                    r'research shows',
                    r'studies indicate',
                    r'as demonstrated by'
                ]
                
                for pattern in technical_patterns:
                    matches = re.finditer(pattern, content, re.IGNORECASE)
                    for match in matches:
                        citation_points.append({
                            'type': 'research_reference',
                            'content': match.group(),
                            'position': match.start(),
                            'needs_citation': True
                        })
                
                return json.dumps(citation_points, ensure_ascii=False, indent=2)
                
            except Exception as e:
                return f"引用点提取失败: {str(e)}"
        
        def match_sources_to_citations(citation_points: str, available_sources: str) -> str:
            """将引用点与可用源匹配"""
            try:
                points = json.loads(citation_points)
                sources = json.loads(available_sources)
                
                matched_citations = []
                
                for point in points:
                    best_match = None
                    best_score = 0
                    
                    # 为每个引用点找到最佳匹配的源
                    for source_id, source_info in sources.items():
                        score = self._calculate_relevance_score(
                            point['content'], 
                            source_info.get('content', '') + ' ' + source_info.get('title', '')
                        )
                        
                        if score > best_score:
                            best_score = score
                            best_match = source_id
                    
                    if best_match and best_score > 0.3:  # 阈值
                        matched_citations.append({
                            'citation_point': point,
                            'source_id': best_match,
                            'source_info': sources[best_match],
                            'relevance_score': best_score
                        })
                
                return json.dumps(matched_citations, ensure_ascii=False, indent=2)
                
            except Exception as e:
                return f"源匹配失败: {str(e)}"
        
        def format_citations(matched_citations: str, citation_style: str) -> str:
            """格式化引用"""
            try:
                citations = json.loads(matched_citations)
                formatted_citations = []
                
                for citation in citations:
                    source_info = citation['source_info']
                    
                    if citation_style.upper() == 'APA':
                        formatted = self._format_apa_citation(source_info)
                    elif citation_style.upper() == 'MLA':
                        formatted = self._format_mla_citation(source_info)
                    elif citation_style.upper() == 'IEEE':
                        formatted = self._format_ieee_citation(source_info)
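                    else:
                        # No dedicated formatter exists for other styles (including 'Chicago',
                        # which the default config lists as supported), so fall back to APA
                        formatted = self._format_apa_citation(source_info)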
                    
                    formatted_citations.append({
                        'citation_point': citation['citation_point'],
                        'formatted_citation': formatted,
                        'in_text_citation': self._generate_in_text_citation(source_info, citation_style),
                        'source_id': citation['source_id']
                    })
                
                return json.dumps(formatted_citations, ensure_ascii=False, indent=2)
                
            except Exception as e:
                return f"引用格式化失败: {str(e)}"
        
        def generate_bibliography(sources: str, citation_style: str) -> str:
            """生成参考文献列表"""
            try:
                source_data = json.loads(sources)
                bibliography = []
                
                for source_id, source_info in source_data.items():
                    if citation_style.upper() == 'APA':
                        bib_entry = self._format_apa_bibliography(source_info)
                    elif citation_style.upper() == 'MLA':
                        bib_entry = self._format_mla_bibliography(source_info)
                    elif citation_style.upper() == 'IEEE':
                        bib_entry = self._format_ieee_bibliography(source_info)
                    else:
                        bib_entry = self._format_apa_bibliography(source_info)
                    
                    bibliography.append({
                        'source_id': source_id,
                        'formatted_entry': bib_entry,
                        'source_type': source_info.get('type', 'unknown')
                    })
                
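                # APA and MLA list entries alphabetically; IEEE numbers references in
                # citation order, so the input order is kept for IEEE
                if citation_style.upper() != 'IEEE':
                    bibliography.sort(key=lambda x: x['formatted_entry'].lower())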
                
                return json.dumps(bibliography, ensure_ascii=False, indent=2)
                
            except Exception as e:
                return f"参考文献生成失败: {str(e)}"
        
        def validate_citations(citations: str) -> str:
            """验证引用质量"""
            try:
                citation_data = json.loads(citations)
                validation_results = {
                    'valid_citations': [],
                    'invalid_citations': [],
                    'warnings': [],
                    'suggestions': []
                }
                
                for citation in citation_data:
                    source_info = citation.get('source_info', {})
                    issues = []
                    
                    # 检查必需字段
                    required_fields = ['title', 'author', 'year']
                    for field in required_fields:
                        if not source_info.get(field):
                            issues.append(f"缺少{field}字段")
                    
                    # 检查URL有效性
                    if source_info.get('url') and not source_info['url'].startswith(('http://', 'https://')):
                        issues.append("URL格式无效")
                    
                    # 检查年份格式
                    year = source_info.get('year')
                    if year and not re.match(r'^\d{4}$', str(year)):
                        issues.append("年份格式无效")
                    
                    if issues:
                        validation_results['invalid_citations'].append({
                            'citation': citation,
                            'issues': issues
                        })
                    else:
                        validation_results['valid_citations'].append(citation)
                
                return json.dumps(validation_results, ensure_ascii=False, indent=2)
                
            except Exception as e:
                return f"引用验证失败: {str(e)}"
        
        return [
            FunctionTool.from_defaults(
                fn=extract_citation_points,
                name="extract_citation_points",
                description="从内容中提取需要引用的关键点"
            ),
            FunctionTool.from_defaults(
                fn=match_sources_to_citations,
                name="match_sources_to_citations",
                description="将引用点与可用的源材料进行匹配"
            ),
            FunctionTool.from_defaults(
                fn=format_citations,
                name="format_citations",
                description="按指定格式格式化引用"
            ),
            FunctionTool.from_defaults(
                fn=generate_bibliography,
                name="generate_bibliography",
                description="生成完整的参考文献列表"
            ),
            FunctionTool.from_defaults(
                fn=validate_citations,
                name="validate_citations",
                description="验证引用的质量和完整性"
            )
        ]
    
    def _calculate_relevance_score(self, citation_content: str, source_content: str) -> float:
        """计算相关性分数"""
        # 简化的相关性计算
        citation_words = set(citation_content.lower().split())
        source_words = set(source_content.lower().split())
        
        if not citation_words or not source_words:
            return 0.0
        
        intersection = citation_words.intersection(source_words)
        union = citation_words.union(source_words)
        
        return len(intersection) / len(union) if union else 0.0
    
    def _format_apa_citation(self, source_info: Dict[str, Any]) -> str:
        """格式化APA引用"""
        author = source_info.get('author', 'Unknown Author')
        year = source_info.get('year', 'n.d.')
        title = source_info.get('title', 'Untitled')
        url = source_info.get('url', '')
        
        citation = f"{author} ({year}). {title}."
        if url:
            citation += f" Retrieved from {url}"
        
        return citation
    
    def _format_mla_citation(self, source_info: Dict[str, Any]) -> str:
        """格式化MLA引用"""
        author = source_info.get('author', 'Unknown Author')
        title = source_info.get('title', 'Untitled')
        year = source_info.get('year', 'n.d.')
        url = source_info.get('url', '')
        
        citation = f"{author}. \"{title}.\" {year}."
        if url:
            # Include the URL itself, followed by the access date
            citation += f" {url}. Accessed {datetime.now().strftime('%d %b %Y')}."
        
        return citation
    
    def _format_ieee_citation(self, source_info: Dict[str, Any]) -> str:
        """格式化IEEE引用"""
        author = source_info.get('author', 'Unknown Author')
        title = source_info.get('title', 'Untitled')
        year = source_info.get('year', 'n.d.')
        url = source_info.get('url', '')
        
        citation = f"{author}, \"{title},\" {year}."
        if url:
            citation += f" [Online]. Available: {url}"
        
        return citation
    
    def _generate_in_text_citation(self, source_info: Dict[str, Any], style: str) -> str:
        """生成文内引用"""
        author = source_info.get('author', 'Unknown')
        year = source_info.get('year', 'n.d.')
        
        if style.upper() == 'APA':
            return f"({author}, {year})"
        elif style.upper() == 'MLA':
            return f"({author})"
        elif style.upper() == 'IEEE':
            return f"[{source_info.get('id', '1')}]"
        else:
            return f"({author}, {year})"
    
    def _format_apa_bibliography(self, source_info: Dict[str, Any]) -> str:
        """格式化APA参考文献条目"""
        return self._format_apa_citation(source_info)
    
    def _format_mla_bibliography(self, source_info: Dict[str, Any]) -> str:
        """格式化MLA参考文献条目"""
        return self._format_mla_citation(source_info)
    
    def _format_ieee_bibliography(self, source_info: Dict[str, Any]) -> str:
        """格式化IEEE参考文献条目"""
        return self._format_ieee_citation(source_info)
    
    async def generate_citations(self, content: str, sources: Dict[str, Any], style: str = 'APA') -> Dict[str, Any]:
        """生成引用的主要方法"""
        try:
            # 构建引用生成提示
            citation_prompt = f"""
            请为以下内容生成完整的学术引用:
            
            内容: {content}
            
            可用源材料: {json.dumps(sources, ensure_ascii=False, indent=2)}
            
            引用格式: {style}
            
            请按以下步骤执行:
            1. 使用extract_citation_points提取需要引用的内容点
            2. 使用match_sources_to_citations匹配源材料
            3. 使用format_citations格式化引用
            4. 使用generate_bibliography生成参考文献
            5. 使用validate_citations验证引用质量
            
            最终输出完整的引用信息和参考文献列表。
            """
            
            # 使用Agent执行引用生成任务
            response = await self.agent.achat(citation_prompt)
            
            # 更新引用数据库
            self.citation_database['sources'].update(sources)
            
            return {
                'success': True,
                'citations': response.response,
                'style': style,
                'metadata': {
                    'source_count': len(sources),
                    'citation_style': style,
                    'generation_time': datetime.now().isoformat()
                }
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'content': content
            }
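
The IEEE branch of `_generate_in_text_citation` falls back to `[1]` whenever a source has no `id`, so sources need stable numbers before they are formatted. The sketch below shows one possible way to assign them, numbering sources in order of first citation. The names `assign_ieee_numbers`, `in_text_markers`, and the sample source keys are hypothetical and not part of the agent above.

```python
from typing import Dict, List


def assign_ieee_numbers(cited_keys: List[str]) -> Dict[str, int]:
    """Map each source key to an IEEE reference number.

    Numbers follow the order of first citation, and a source that is
    cited again keeps the number it received the first time.
    """
    numbers: Dict[str, int] = {}
    for key in cited_keys:
        if key not in numbers:
            numbers[key] = len(numbers) + 1
    return numbers


def in_text_markers(cited_keys: List[str]) -> List[str]:
    """Return the bracketed marker for every citation point, in order."""
    numbers = assign_ieee_numbers(cited_keys)
    return [f"[{numbers[key]}]" for key in cited_keys]


if __name__ == "__main__":
    # Hypothetical source keys, listed in the order they are cited
    print(in_text_markers(["rag_survey", "llamaindex_docs", "rag_survey"]))
```

The resulting number could be written into each source's `id` field before `_generate_in_text_citation` is called, so that repeated citations of one source share a marker.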

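3.8 Image Generation Agent

Responsibility: generating technical diagrams and illustrations

Core capabilities:

  • Technical architecture diagram generation
  • Flowchart creation
  • Code visualization
  • AI image generation

Image generation system: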

from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.core.memory import ChatMemoryBuffer
from typing import Dict, List, Any, Optional
import json
import base64
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
from PIL import Image, ImageDraw, ImageFont
import io

class ImageGenerationAgent:
    def __init__(self, service_context, image_config=None):
        self.service_context = service_context
        self.image_config = image_config or self._default_image_config()
        
        # 创建图像生成工具
        self.image_tools = self._create_image_tools()
        
        # 初始化图像生成Agent
        self.agent = OpenAIAgent.from_tools(
            tools=self.image_tools,
            llm=service_context.llm,
            memory=ChatMemoryBuffer.from_defaults(token_limit=6000),
            system_prompt="""
            你是一个专业的技术图表和数据可视化专家,负责:
            1. 根据文本描述生成技术图表和流程图
            2. 创建数据可视化图表
            3. 设计架构图和系统图
            4. 优化图像质量和布局
            5. 确保图表的专业性和可读性
            
            设计原则:
            - 清晰性:图表信息清晰易懂
            - 专业性:符合技术文档标准
            - 美观性:视觉效果良好
            - 一致性:风格统一
            - 实用性:服务于内容表达
            """,
            verbose=True
        )
        
        # 图像缓存
        self.image_cache = {}
        
        # 设置绘图样式
        plt.style.use('seaborn-v0_8')
        sns.set_palette("husl")
    
    def _default_image_config(self):
        """默认图像配置"""
        return {
            'default_format': 'svg',
            'default_size': (800, 600),
            'default_dpi': 300,
            'color_scheme': 'professional',
            'font_family': 'Arial',
            'enable_cache': True,
            'max_cache_size': 100
        }
    
    def _create_image_tools(self):
        """创建图像生成工具"""
        
        def generate_flowchart(description: str, chart_type: str = 'process') -> str:
            """生成流程图"""
            try:
                # 解析流程描述
                steps = self._parse_flowchart_description(description)
                
                # 创建流程图
                fig, ax = plt.subplots(figsize=(12, 8))
                ax.set_xlim(0, 10)
                ax.set_ylim(0, len(steps) + 1)
                
                # 绘制流程步骤
                for i, step in enumerate(steps):
                    y_pos = len(steps) - i
                    
                    # 绘制矩形框
                    rect = plt.Rectangle((2, y_pos-0.3), 6, 0.6, 
                                       facecolor='lightblue', 
                                       edgecolor='navy', 
                                       linewidth=2)
                    ax.add_patch(rect)
                    
                    # 添加文本
                    ax.text(5, y_pos, step['text'], 
                           ha='center', va='center', 
                           fontsize=10, fontweight='bold')
                    
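                    # Draw a connector arrow below every step except the last.
                    # It spans the gap between this box's bottom edge (y_pos - 0.3)
                    # and the next box's top edge (y_pos - 0.7).
                    if i < len(steps) - 1:
                        ax.arrow(5, y_pos-0.3, 0, -0.4, 
                               head_width=0.2, head_length=0.1, 
                               length_includes_head=True,
                               fc='navy', ec='navy')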
                
                ax.set_title(f'流程图: {chart_type}', fontsize=16, fontweight='bold')
                ax.axis('off')
                
                # 保存为SVG
                svg_buffer = io.StringIO()
                plt.savefig(svg_buffer, format='svg', bbox_inches='tight')
                svg_content = svg_buffer.getvalue()
                plt.close()
                
                return f"流程图生成成功:\n{svg_content[:500]}..."  # 返回部分内容作为确认
                
            except Exception as e:
                return f"流程图生成失败: {str(e)}"
        
        def create_architecture_diagram(components: str, relationships: str) -> str:
            """创建架构图"""
            try:
                # 解析组件和关系
                comp_data = json.loads(components)
                rel_data = json.loads(relationships)
                
                fig, ax = plt.subplots(figsize=(14, 10))
                
                # 绘制组件
                positions = {}
                colors = ['lightblue', 'lightgreen', 'lightcoral', 'lightyellow', 'lightpink']
                
                for i, comp in enumerate(comp_data):
                    x = (i % 3) * 4 + 2
                    y = (i // 3) * 3 + 2
                    positions[comp['name']] = (x, y)
                    
                    # 绘制组件框
                    rect = plt.Rectangle((x-1, y-0.5), 2, 1, 
                                       facecolor=colors[i % len(colors)], 
                                       edgecolor='black', 
                                       linewidth=2)
                    ax.add_patch(rect)
                    
                    # 添加组件名称
                    ax.text(x, y, comp['name'], 
                           ha='center', va='center', 
                           fontsize=10, fontweight='bold')
                
                # 绘制关系线
                for rel in rel_data:
                    if rel['from'] in positions and rel['to'] in positions:
                        start_pos = positions[rel['from']]
                        end_pos = positions[rel['to']]
                        
                        ax.annotate('', xy=end_pos, xytext=start_pos,
                                  arrowprops=dict(arrowstyle='->', 
                                                lw=2, color='red'))
                        
                        # 添加关系标签
                        mid_x = (start_pos[0] + end_pos[0]) / 2
                        mid_y = (start_pos[1] + end_pos[1]) / 2
                        ax.text(mid_x, mid_y + 0.2, rel.get('label', ''), 
                               ha='center', va='center', 
                               fontsize=8, style='italic')
                
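                ax.set_title('系统架构图', fontsize=16, fontweight='bold')
                # Size the y-axis from the grid (3 columns, rows spaced 3 units apart)
                # so that diagrams with more than 6 components are not clipped
                n_rows = max(1, (len(comp_data) + 2) // 3)
                ax.set_xlim(-1, 11)
                ax.set_ylim(0, n_rows * 3 + 1)
                ax.axis('off')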
                
                # 保存为SVG
                svg_buffer = io.StringIO()
                plt.savefig(svg_buffer, format='svg', bbox_inches='tight')
                svg_content = svg_buffer.getvalue()
                plt.close()
                
                return f"架构图生成成功:\n{svg_content[:500]}..."
                
            except Exception as e:
                return f"架构图生成失败: {str(e)}"
        
        def generate_data_visualization(data: str, chart_type: str = 'bar') -> str:
            """生成数据可视化图表"""
            try:
                # 解析数据
                chart_data = json.loads(data)
                
                if chart_type == 'bar':
                    fig = px.bar(x=chart_data['labels'], 
                               y=chart_data['values'],
                               title=chart_data.get('title', '数据图表'))
                elif chart_type == 'line':
                    fig = px.line(x=chart_data['labels'], 
                                y=chart_data['values'],
                                title=chart_data.get('title', '趋势图'))
                elif chart_type == 'pie':
                    fig = px.pie(values=chart_data['values'], 
                               names=chart_data['labels'],
                               title=chart_data.get('title', '饼图'))
                elif chart_type == 'scatter':
                    fig = px.scatter(x=chart_data['x'], 
                                   y=chart_data['y'],
                                   title=chart_data.get('title', '散点图'))
                else:
                    # 默认柱状图
                    fig = px.bar(x=chart_data['labels'], 
                               y=chart_data['values'],
                               title=chart_data.get('title', '数据图表'))
                
                # 更新布局
                fig.update_layout(
                    font=dict(family="Arial, sans-serif", size=12),
                    title_font_size=16,
                    showlegend=True,
                    width=800,
                    height=600
                )
                
                # 转换为SVG
                svg_content = fig.to_image(format="svg").decode('utf-8')
                
                return f"数据可视化图表生成成功:\n{svg_content[:500]}..."
                
            except Exception as e:
                return f"数据可视化生成失败: {str(e)}"
        
        def create_code_visualization(code: str, language: str = 'python') -> str:
            """创建代码可视化"""
            try:
                # 简化的代码可视化
                fig, ax = plt.subplots(figsize=(12, 8))
                
                # 代码行分析
                lines = code.split('\n')
                line_count = len(lines)
                
                # 创建代码块可视化
                ax.text(0.05, 0.95, f"代码语言: {language}", 
                       transform=ax.transAxes, fontsize=14, fontweight='bold')
                ax.text(0.05, 0.90, f"总行数: {line_count}", 
                       transform=ax.transAxes, fontsize=12)
                
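                # Show the first 20 lines; append an ellipsis only to lines
                # that are actually truncated at 60 characters
                y_pos = 0.85
                for i, line in enumerate(lines[:20]):
                    if line.strip():
                        shown = line if len(line) <= 60 else line[:60] + '...'
                        ax.text(0.05, y_pos, f"{i+1:2d}: {shown}", 
                               transform=ax.transAxes, fontsize=9, 
                               fontfamily='monospace')
                        y_pos -= 0.04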
                
                if line_count > 20:
                    ax.text(0.05, y_pos, f"... 还有 {line_count - 20} 行", 
                           transform=ax.transAxes, fontsize=9, style='italic')
                
                ax.set_title('代码结构可视化', fontsize=16, fontweight='bold')
                ax.axis('off')
                
                # 保存为SVG
                svg_buffer = io.StringIO()
                plt.savefig(svg_buffer, format='svg', bbox_inches='tight')
                svg_content = svg_buffer.getvalue()
                plt.close()
                
                return f"代码可视化生成成功:\n{svg_content[:500]}..."
                
            except Exception as e:
                return f"代码可视化生成失败: {str(e)}"
        
        def optimize_image_quality(image_description: str, optimization_goals: str) -> str:
            """优化图像质量"""
            try:
                goals = json.loads(optimization_goals)
                
                optimization_plan = {
                    'image_description': image_description,
                    'optimization_goals': goals,
                    'recommended_improvements': [],
                    'technical_adjustments': []
                }
                
                # 根据目标生成优化建议
                if 'clarity' in goals:
                    optimization_plan['recommended_improvements'].extend([
                        "增加图像分辨率",
                        "优化颜色对比度",
                        "调整字体大小和清晰度"
                    ])
                
                if 'professional' in goals:
                    optimization_plan['recommended_improvements'].extend([
                        "使用企业级配色方案",
                        "统一视觉风格",
                        "添加专业图例和标注"
                    ])
                
                if 'accessibility' in goals:
                    optimization_plan['recommended_improvements'].extend([
                        "确保色盲友好的颜色选择",
                        "提供替代文本描述",
                        "增强颜色对比度"
                    ])
                
                optimization_plan['technical_adjustments'] = [
                    "调整DPI至300以上",
                    "使用矢量格式(SVG)",
                    "优化文件大小",
                    "确保跨平台兼容性"
                ]
                
                return json.dumps(optimization_plan, ensure_ascii=False, indent=2)
                
            except Exception as e:
                return f"图像质量优化失败: {str(e)}"
        
        return [
            FunctionTool.from_defaults(
                fn=generate_flowchart,
                name="generate_flowchart",
                description="根据描述生成流程图"
            ),
            FunctionTool.from_defaults(
                fn=create_architecture_diagram,
                name="create_architecture_diagram",
                description="创建系统架构图"
            ),
            FunctionTool.from_defaults(
                fn=generate_data_visualization,
                name="generate_data_visualization",
                description="生成数据可视化图表"
            ),
            FunctionTool.from_defaults(
                fn=create_code_visualization,
                name="create_code_visualization",
                description="创建代码结构可视化"
            ),
            FunctionTool.from_defaults(
                fn=optimize_image_quality,
                name="optimize_image_quality",
                description="优化图像质量和视觉效果"
            )
        ]
    
    def _parse_flowchart_description(self, description: str) -> List[Dict[str, str]]:
        """解析流程图描述"""
        # 简化的解析逻辑
        steps = []
        lines = description.split('\n')
        
        for line in lines:
            line = line.strip()
            if line and not line.startswith('#'):
                steps.append({
                    'text': line,
                    'type': 'process'
                })
        
        return steps
    
    async def generate_images(self, content_sections: List[Dict[str, Any]]) -> Dict[str, Any]:
        """生成图像的主要方法"""
        try:
            generated_images = []
            
            for section in content_sections:
                if section.get('needs_diagram'):
                    # 构建图表生成提示
                    diagram_prompt = f"""
                    请为以下内容生成技术图表:
                    
                    内容: {section.get('content', '')}
                    图表类型: {section.get('diagram_type', 'flowchart')}
                    规格说明: {section.get('diagram_spec', '')}
                    
                    请使用适当的工具生成专业的技术图表。
                    """
                    
                    response = await self.agent.achat(diagram_prompt)
                    generated_images.append({
                        'type': 'diagram',
                        'content': response.response,
                        'section_id': section.get('id')
                    })
                
                if section.get('needs_chart'):
                    # 生成数据图表
                    chart_prompt = f"""
                    请为以下数据生成可视化图表:
                    
                    数据: {json.dumps(section.get('data', {}), ensure_ascii=False)}
                    图表类型: {section.get('chart_type', 'bar')}
                    
                    请使用generate_data_visualization工具生成图表。
                    """
                    
                    response = await self.agent.achat(chart_prompt)
                    generated_images.append({
                        'type': 'chart',
                        'content': response.response,
                        'section_id': section.get('id')
                    })
                
                if section.get('needs_code_viz'):
                    # 生成代码可视化
                    code_viz_prompt = f"""
                    请为以下代码生成可视化:
                    
                    代码: {section.get('code', '')}
                    语言: {section.get('language', 'python')}
                    
                    请使用create_code_visualization工具生成代码可视化。
                    """
                    
                    response = await self.agent.achat(code_viz_prompt)
                    generated_images.append({
                        'type': 'code_visualization',
                        'content': response.response,
                        'section_id': section.get('id')
                    })
            
            return {
                'success': True,
                'images': generated_images,
                'metadata': {
                    'total_images': len(generated_images),
                    'generation_time': datetime.now().isoformat()
                }
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'content_sections': content_sections
            }
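
`_default_image_config` defines `enable_cache` and `max_cache_size`, and `__init__` creates `self.image_cache`, but the tools above return only the first 500 characters of each SVG and store nothing. The sketch below shows how the full SVG could be kept in a bounded cache. It assumes least-recently-used eviction and a SHA-256 key built from the tool name and its arguments; `SvgCache` and its methods are hypothetical names, not part of the agent.

```python
import hashlib
import json
from collections import OrderedDict
from typing import Any, Dict, Optional


class SvgCache:
    """Bounded LRU cache for rendered SVG strings (hypothetical helper)."""

    def __init__(self, max_size: int = 100) -> None:
        self.max_size = max_size
        self._items: "OrderedDict[str, str]" = OrderedDict()

    @staticmethod
    def make_key(tool_name: str, arguments: Dict[str, Any]) -> str:
        # Serialize with sorted keys so that equal arguments give equal keys
        payload = json.dumps([tool_name, arguments], sort_keys=True, ensure_ascii=False)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def get(self, key: str) -> Optional[str]:
        if key not in self._items:
            return None
        self._items.move_to_end(key)  # mark as most recently used
        return self._items[key]

    def put(self, key: str, svg: str) -> None:
        self._items[key] = svg
        self._items.move_to_end(key)
        while len(self._items) > self.max_size:
            self._items.popitem(last=False)  # evict the least recently used entry

    def __len__(self) -> int:
        return len(self._items)
```

A tool could call `put` with the complete `svg_content` and return the key plus a short preview to the agent, which keeps the prompt small while the full image stays available to `generate_images`.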

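3.9 Review Agent

Responsibility: content quality control and error checking

Core capabilities:

  • Grammar checking and correction
  • Factual accuracy verification
  • Logical consistency checking
  • Technical accuracy review

Quality control workflow: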

from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
import re
import asyncio

class ReviewAgent:
    def __init__(self):
        # 初始化LLM
        self.llm = OpenAI(model="gpt-4", temperature=0.1)
        
        # 创建审校工具
        self.review_tools = [
            FunctionTool.from_defaults(fn=self.check_grammar),
            FunctionTool.from_defaults(fn=self.verify_facts),
            FunctionTool.from_defaults(fn=self.check_logic),
            FunctionTool.from_defaults(fn=self.review_technical_accuracy),
            FunctionTool.from_defaults(fn=self.check_consistency)
        ]
        
        # 初始化Agent
        self.agent = OpenAIAgent.from_tools(
            self.review_tools,
            llm=self.llm,
            verbose=True,
            system_prompt="""
            你是一个专业的内容审校专家。你的任务是:
            1. 检查语法和拼写错误
            2. 验证事实准确性
            3. 检查逻辑一致性
            4. 审核技术准确性
            5. 确保内容一致性
            
            请提供详细的审校报告和改进建议。
            """
        )
    
    def check_grammar(self, content: str) -> dict:
        """检查语法和拼写错误"""
        issues = []
        
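        # Heuristic checks for commonly confused English words. Each pattern
        # targets the incorrect form in a context where it is almost always
        # wrong; this is a rough screen, not a full grammar check.
        common_errors = {
            r"\b(its)\b(?=\s+(?:a|an|the|been|not)\b)": "应该使用 'it's' 而不是 'its'",
            r"\b(your)\b(?=\s+going\b)": "应该使用 'you're' 而不是 'your'",
            r"\b(their)\b(?=\s+(?:is|are)\b)": "应该使用 'there' 而不是 'their'",
        }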
        
        for pattern, suggestion in common_errors.items():
            matches = re.finditer(pattern, content, re.IGNORECASE)
            for match in matches:
                issues.append({
                    'type': 'grammar',
                    'position': match.start(),
                    'text': match.group(),
                    'suggestion': suggestion
                })
        
        return {
            'category': 'grammar',
            'issues': issues,
            'score': max(0, 100 - len(issues) * 5)
        }
    
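    def verify_facts(self, content: str, sources: list = None) -> dict:
        """Flag numeric claims that need fact-checking"""
        issues = []
        
        # Collect numbers and percentages (e.g. "42", "3.5", "95%")
        number_pattern = r'\b\d+(?:\.\d+)?(?:%|\b)'
        numbers = re.findall(number_pattern, content)
        
        for number in numbers:
            # Placeholder: an external fact-checking API could be integrated here
            issues.append({
                'type': 'fact_check',
                'text': number,
                'suggestion': f'请验证数字 "{number}" 的准确性'
            })
        
        # Report at most three items and score on that same list, so the
        # score always matches the issues returned to the caller
        reported = issues[:3]
        return {
            'category': 'facts',
            'issues': reported,
            'score': max(0, 100 - len(reported) * 10)
        }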
    
    def check_logic(self, content: str) -> dict:
        """Check logical consistency"""
        issues = []
        
        # Logical connectors that should introduce a follow-up statement
        logical_connectors = ['however', 'therefore', 'moreover', 'furthermore']
        # Split into sentences and drop empty fragments (e.g. after a trailing period)
        sentences = [s.strip() for s in content.split('.') if s.strip()]
        
        for i, sentence in enumerate(sentences):
            # Flag a sentence that consists of a connector and nothing else
            if sentence.lower().strip(' ,;:') in logical_connectors:
                issues.append({
                    'type': 'logic',
                    'position': i,
                    'suggestion': '逻辑连接词后缺少相应的论述'
                })
        
        return {
            'category': 'logic',
            'issues': issues,
            'score': max(0, 100 - len(issues) * 15)
        }
    
    def review_technical_accuracy(self, content: str) -> dict:
        """审核技术准确性"""
        issues = []
        
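        # Check that technical terms are written with consistent casing
        # (e.g. "API" vs "api"). Short terms such as "REST" can also match
        # ordinary English words, so the results are hints, not verdicts.
        tech_terms = ['API', 'REST', 'JSON', 'HTTP', 'HTTPS', 'SQL']
        term_usage = {}
        
        for term in tech_terms:
            # Match the exact term in any casing ("HTTP" must not match "HTTPS")
            variations = re.findall(rf'\b{re.escape(term)}\b', content, re.IGNORECASE)
            if variations:
                term_usage[term] = variations
        
        for term, variations in term_usage.items():
            # Keep the original casing so that differing spellings are detected
            unique_variations = set(variations)
            if len(unique_variations) > 1:
                issues.append({
                    'type': 'technical',
                    'term': term,
                    'variations': sorted(unique_variations),
                    'suggestion': f'术语 "{term}" 使用不一致'
                })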
        
        return {
            'category': 'technical',
            'issues': issues,
            'score': max(0, 100 - len(issues) * 20)
        }
    
    def check_consistency(self, content: str) -> dict:
        """检查内容一致性"""
        issues = []
        
        # 检查标题层级
        headers = re.findall(r'^(#{1,6})\s+(.+)$', content, re.MULTILINE)
        
        for i, (level, title) in enumerate(headers[:-1]):
            next_level, next_title = headers[i + 1]
            level_diff = len(next_level) - len(level)
            
            if level_diff > 1:
                issues.append({
                    'type': 'consistency',
                    'position': i,
                    'suggestion': f'标题层级跳跃过大: {title} -> {next_title}'
                })
        
        return {
            'category': 'consistency',
            'issues': issues,
            'score': max(0, 100 - len(issues) * 10)
        }
    
    async def review_content(self, content: str, sources: list = None):
        """执行完整的内容审校"""
        try:
            # 使用Agent进行综合审校
            review_prompt = f"""
            请对以下内容进行全面审校:
            
            内容:
            {content}
            
            请检查:
            1. 语法和拼写
            2. 事实准确性
            3. 逻辑一致性
            4. 技术准确性
            5. 内容一致性
            
            提供详细的审校报告和改进建议。
            """
            
            response = await self.agent.achat(review_prompt)
            
            # 执行各项检查
            grammar_result = self.check_grammar(content)
            facts_result = self.verify_facts(content, sources)
            logic_result = self.check_logic(content)
            technical_result = self.review_technical_accuracy(content)
            consistency_result = self.check_consistency(content)
            
            # 计算总体质量分数
            total_score = (
                grammar_result['score'] * 0.2 +
                facts_result['score'] * 0.3 +
                logic_result['score'] * 0.2 +
                technical_result['score'] * 0.2 +
                consistency_result['score'] * 0.1
            )
            
            return {
                'success': True,
                'agent_review': str(response),
                'detailed_analysis': {
                    'grammar': grammar_result,
                    'facts': facts_result,
                    'logic': logic_result,
                    'technical': technical_result,
                    'consistency': consistency_result
                },
                'overall_score': round(total_score, 2),
                'recommendations': self._generate_recommendations(total_score)
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'overall_score': 0
            }
    
    def _generate_recommendations(self, score: float) -> list:
        """根据分数生成改进建议"""
        if score >= 90:
            return ['内容质量优秀,可以发布']
        elif score >= 80:
            return ['内容质量良好,建议小幅修改后发布']
        elif score >= 70:
            return ['内容需要适度修改', '重点关注事实准确性和技术细节']
        else:
            return ['内容需要大幅修改', '建议重新审视逻辑结构', '加强事实验证']
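
As a worked example of the weighting in `review_content`: with category scores of 95 (grammar), 70 (facts), 85 (logic), 80 (technical) and 100 (consistency), the overall score is 0.2·95 + 0.3·70 + 0.2·85 + 0.2·80 + 0.1·100 = 19 + 21 + 17 + 16 + 10 = 83, which falls in the 80 to 89 band. The sketch below reproduces that arithmetic outside the agent. The `WEIGHTS` table mirrors the weights above, while `overall_score`, `quality_band`, the band labels, and the sample scores are illustrative names and values.

```python
from typing import Dict

# Category weights as used in review_content; they sum to 1.0
WEIGHTS: Dict[str, float] = {
    "grammar": 0.2,
    "facts": 0.3,
    "logic": 0.2,
    "technical": 0.2,
    "consistency": 0.1,
}


def overall_score(scores: Dict[str, float]) -> float:
    """Weighted sum of the per-category scores, rounded to two decimals."""
    return round(sum(scores[name] * weight for name, weight in WEIGHTS.items()), 2)


def quality_band(score: float) -> str:
    """Map a score to the same thresholds as _generate_recommendations."""
    if score >= 90:
        return "excellent"
    if score >= 80:
        return "good"
    if score >= 70:
        return "moderate revision"
    return "major revision"


if __name__ == "__main__":
    sample = {"grammar": 95, "facts": 70, "logic": 85, "technical": 80, "consistency": 100}
    score = overall_score(sample)
    print(score, quality_band(score))
```

Because facts carry the largest weight, a 10-point drop in the facts score lowers the total by 3 points, compared with 1 point for the same drop in consistency.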

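3.10 Formatting Agent

Responsibility: Markdown formatting and multilingual output

Core capabilities:

  • Markdown standardization
  • Multilingual format adaptation
  • Responsive layout optimization
  • Metadata embedding

Formatting pipeline: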

from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
import re
import yaml
import json
from datetime import datetime

class FormattingAgent:
    def __init__(self):
        # 初始化LLM
        self.llm = OpenAI(model="gpt-4", temperature=0.1)
        
        # 创建格式化工具
        self.formatting_tools = [
            FunctionTool.from_defaults(fn=self.format_markdown),
            FunctionTool.from_defaults(fn=self.adapt_multilanguage),
            FunctionTool.from_defaults(fn=self.inject_metadata),
            FunctionTool.from_defaults(fn=self.optimize_responsive_layout),
            FunctionTool.from_defaults(fn=self.standardize_formatting)
        ]
        
        # 初始化Agent
        self.agent = OpenAIAgent.from_tools(
            self.formatting_tools,
            llm=self.llm,
            verbose=True,
            system_prompt="""
            你是一个专业的内容格式化专家。你的任务是:
            1. 标准化Markdown格式
            2. 适配多语言格式
            3. 注入元数据
            4. 优化响应式布局
            5. 确保格式一致性
            
            请确保输出格式规范、美观且符合标准。
            """
        )
    
    def format_markdown(self, content: str, style: str = "standard") -> dict:
        """标准化Markdown格式"""
        try:
            formatted_content = content
            
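            # Normalize headings: ensure a space after the leading '#' marks
            formatted_content = re.sub(r'^(#{1,6})([^\s#])', r'\1 \2', formatted_content, flags=re.MULTILINE)
            
            # Normalize list items: ensure a space after the bullet marker.
            # '---' rules and lines that use '*' for emphasis are left untouched.
            formatted_content = re.sub(r'^([ \t]*)-([^\s-])', r'\1- \2', formatted_content, flags=re.MULTILINE)
            formatted_content = re.sub(r'^([ \t]*)\*([^\s*][^*\n]*)$', r'\1* \2', formatted_content, flags=re.MULTILINE)
            
            # Normalize code fences: remove spaces around the language tag
            formatted_content = re.sub(r'^```[ \t]*(\w+)[ \t]*$', r'```\1', formatted_content, flags=re.MULTILINE)
            
            # Normalize links: remove whitespace between the link text and the URL
            formatted_content = re.sub(r'\[([^\]]+)\][ \t]*\(([^\)]+)\)', r'[\1](\2)', formatted_content)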
            
            # 标准化表格格式
            lines = formatted_content.split('\n')
            formatted_lines = []
            
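            for line in lines:
                stripped = line.strip()
                if stripped.startswith('|'):
                    # Normalize a table row: trim each cell and re-join with single spaces.
                    # The closing pipe is optional in the input, so remove the outer
                    # pipes explicitly instead of discarding the last split element.
                    inner = stripped[1:]
                    if inner.endswith('|'):
                        inner = inner[:-1]
                    cells = [cell.strip() for cell in inner.split('|')]
                    formatted_lines.append('| ' + ' | '.join(cells) + ' |')
                else:
                    formatted_lines.append(line)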
            
            formatted_content = '\n'.join(formatted_lines)
            
            return {
                'success': True,
                'formatted_content': formatted_content,
                'style': style,
                'changes_made': ['标题格式化', '列表格式化', '代码块格式化', '链接格式化', '表格格式化']
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'formatted_content': content
            }
    
    def adapt_multilanguage(self, content: str, target_language: str = "zh-CN") -> dict:
        """适配多语言格式"""
        try:
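            adapted_content = content
            adaptations_made = []
            
            # Language-specific formatting adjustments
            if target_language in ['zh-CN', 'zh-TW', 'ja', 'ko']:
                # After a CJK ideograph, replace ASCII punctuation with its
                # full-width form (U+FF0C, U+3002, U+FF1B, U+FF1A). Only spaces
                # and tabs are consumed, so paragraph breaks are preserved.
                adapted_content = re.sub(r'([\u4e00-\u9fff])[ \t]*,[ \t]*', r'\1,', adapted_content)
                adapted_content = re.sub(r'([\u4e00-\u9fff])[ \t]*\.[ \t]*', r'\1。', adapted_content)
                adapted_content = re.sub(r'([\u4e00-\u9fff])[ \t]*;[ \t]*', r'\1;', adapted_content)
                adapted_content = re.sub(r'([\u4e00-\u9fff])[ \t]*:[ \t]*', r'\1:', adapted_content)
                adaptations_made.append('标点符号本地化')
                
                # Replace straight double quotes with curly quotes (U+201C, U+201D)
                adapted_content = re.sub(r'"([^"]+)"', r'“\1”', adapted_content)
                adaptations_made.append('引号格式调整')
                
            elif target_language in ['ar', 'he']:
                # Right-to-left languages: RTL-specific handling can be added here
                pass
            
            # Locale-specific date patterns, defined for reference only;
            # this method does not rewrite dates yet
            date_patterns = {
                'zh-CN': r'(\d{4})年(\d{1,2})月(\d{1,2})日',
                'en-US': r'(\w+) (\d{1,2}), (\d{4})',
                'ja': r'(\d{4})年(\d{1,2})月(\d{1,2})日'
            }
            
            return {
                'success': True,
                'adapted_content': adapted_content,
                'target_language': target_language,
                'adaptations_made': adaptations_made
            }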
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'adapted_content': content
            }
    
    def inject_metadata(self, content: str, metadata: dict) -> dict:
        """注入元数据"""
        try:
            # 构建YAML前置元数据
            frontmatter = {
                'title': metadata.get('title', ''),
                'author': metadata.get('author', ''),
                'date': metadata.get('date', datetime.now().isoformat()),
                'tags': metadata.get('tags', []),
                'categories': metadata.get('categories', []),
                'description': metadata.get('description', ''),
                'keywords': metadata.get('keywords', []),
                'lang': metadata.get('language', 'zh-CN')
            }
            
            # 生成YAML前置内容
            yaml_frontmatter = yaml.dump(frontmatter, default_flow_style=False, allow_unicode=True)
            
            # 组合最终内容
            final_content = f"---\n{yaml_frontmatter}---\n\n{content}"
            
            # 在内容末尾添加元数据注释
            metadata_comment = f"\n\n<!-- Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} -->"
            final_content += metadata_comment
            
            return {
                'success': True,
                'content_with_metadata': final_content,
                'frontmatter': frontmatter,
                'metadata_injected': True
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'content_with_metadata': content
            }
    
    def optimize_responsive_layout(self, content: str) -> dict:
        """优化响应式布局"""
        try:
            optimized_content = content
            
            # 添加响应式图片
            img_pattern = r'!\[([^\]]*)\]\(([^\)]+)\)'
            def replace_img(match):
                alt_text = match.group(1)
                img_url = match.group(2)
                return f'<img src="{img_url}" alt="{alt_text}" style="max-width: 100%; height: auto;" />'
            
            optimized_content = re.sub(img_pattern, replace_img, optimized_content)
            
            # Wrap Markdown tables in a horizontally scrollable container
            table_pattern = r'(\|[^\n]+\|\n\|[-\s\|]+\|\n(?:\|[^\n]+\|\n?)+)'
            def wrap_table(match):
                table_content = match.group(1)
                return f'<div style="overflow-x: auto;">\n\n{table_content}\n\n</div>'
            
            optimized_content = re.sub(table_pattern, wrap_table, optimized_content, flags=re.MULTILINE)
            
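            # Let fenced code blocks scroll horizontally instead of overflowing.
            # Assumption: the rule is `overflow-x: auto`, matching the table wrapper above.
            code_pattern = r'```([^\n]*)\n([^`]+)```'
            def wrap_code(match):
                lang = match.group(1)
                code_content = match.group(2)
                return f'```{lang}\n{code_content}```\n<style>\npre {{ overflow-x: auto; }}\n</style>'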
            
            optimized_content = re.sub(code_pattern, wrap_code, optimized_content, flags=re.DOTALL)
            
            return {
                'success': True,
                'optimized_content': optimized_content,
                'optimizations': ['responsive images', 'responsive tables', 'responsive code blocks']
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'optimized_content': content
            }
    
    def standardize_formatting(self, content: str) -> dict:
        """Normalize line endings, blank lines, indentation, and trailing whitespace."""
        try:
            standardized_content = content
            
            # Normalize line endings to \n
            standardized_content = re.sub(r'\r\n|\r', '\n', standardized_content)
            
            # Collapse runs of three or more newlines into a single blank line
            standardized_content = re.sub(r'\n{3,}', '\n\n', standardized_content)
            
            # Expand tabs to 4-column tab stops; str.expandtabs resets the column
            # at every newline, so no per-line loop is needed
            standardized_content = standardized_content.expandtabs(4)
            
            # Strip trailing spaces from every line
            standardized_content = re.sub(r' +$', '', standardized_content, flags=re.MULTILINE)
            
            return {
                'success': True,
                'standardized_content': standardized_content,
                'standardizations': ['normalized line endings', 'collapsed extra blank lines', 'expanded tabs', 'stripped trailing spaces']
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'standardized_content': content
            }
    
    async def format_output(self, content: str, format_spec: dict):
        """Run the full formatting pipeline and report the steps that were applied."""
        try:
            # Ask the agent for formatting advice; its reply is returned as-is
            # alongside the output of the deterministic steps below
            format_prompt = f"""
            Please format the following content:
            
            Content:
            {content}
            
            Formatting requirements:
            - Style: {format_spec.get('style', 'standard')}
            - Language: {format_spec.get('language', 'zh-CN')}
            - Include metadata: {format_spec.get('include_metadata', True)}
            - Responsive optimization: {format_spec.get('responsive', True)}
            
            Please return the formatted content.
            """
            
            response = await self.agent.achat(format_prompt)
            
            # Apply the formatting steps in order, recording each one that succeeds
            current_content = content
            applied_steps = []
            
            # 1. Markdown formatting
            markdown_result = self.format_markdown(
                current_content,
                format_spec.get('style', 'standard')
            )
            if markdown_result['success']:
                current_content = markdown_result['formatted_content']
                applied_steps.append('Markdown formatting')
            
            # 2. Multi-language adaptation (skipped for English)
            if format_spec.get('language', 'zh-CN') != 'en':
                multilang_result = self.adapt_multilanguage(
                    current_content,
                    format_spec.get('language', 'zh-CN')
                )
                if multilang_result['success']:
                    current_content = multilang_result['adapted_content']
                    applied_steps.append('multi-language adaptation')
            
            # 3. Responsive optimization
            if format_spec.get('responsive', True):
                responsive_result = self.optimize_responsive_layout(current_content)
                if responsive_result['success']:
                    current_content = responsive_result['optimized_content']
                    applied_steps.append('responsive optimization')
            
            # 4. Standardization
            standard_result = self.standardize_formatting(current_content)
            if standard_result['success']:
                current_content = standard_result['standardized_content']
                applied_steps.append('standardization')
            
            # 5. Metadata injection
            if format_spec.get('include_metadata', True):
                metadata_result = self.inject_metadata(
                    current_content,
                    format_spec.get('metadata', {})
                )
                if metadata_result['success']:
                    current_content = metadata_result['content_with_metadata']
                    applied_steps.append('metadata injection')
            
            return {
                'success': True,
                'agent_response': str(response),
                'formatted_content': current_content,
                'format_spec': format_spec,
                'processing_steps': applied_steps
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'formatted_content': content
            }
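
One caveat with the regex passes in optimize_responsive_layout: they run over the whole document, so Markdown image syntax that appears inside a fenced code block gets rewritten as well. The following is a minimal sketch of one way around that, assuming fences are always written with triple backticks. It splits the document at fenced blocks and applies a transform only to the text between them. The names `transform_outside_fences` and `responsive_images` are hypothetical helpers, not part of the agent above.

```python
import re
from typing import Callable

# Matches a complete fenced code block, including both fences.
FENCE_RE = re.compile(r"```.*?```", re.DOTALL)
# Matches Markdown image syntax: ![alt](url)
IMG_RE = re.compile(r"!\[([^\]]*)\]\(([^)]+)\)")


def transform_outside_fences(markdown: str, transform: Callable[[str], str]) -> str:
    """Apply `transform` to the text between fenced code blocks only."""
    parts = []
    last_end = 0
    for fence in FENCE_RE.finditer(markdown):
        parts.append(transform(markdown[last_end:fence.start()]))  # prose segment
        parts.append(fence.group(0))                               # code block, untouched
        last_end = fence.end()
    parts.append(transform(markdown[last_end:]))                   # trailing prose
    return "".join(parts)


def responsive_images(text: str) -> str:
    """Rewrite Markdown images as <img> tags that scale with their container."""
    return IMG_RE.sub(
        lambda m: (
            f'<img src="{m.group(2)}" alt="{m.group(1)}" '
            'style="max-width: 100%; height: auto;" />'
        ),
        text,
    )


if __name__ == "__main__":
    doc = "![a](x.png)\n```md\n![b](y.png)\n```\n"
    print(transform_outside_fences(doc, responsive_images))
```

The same wrapper could be reused for the table pass, since a pipe table quoted inside a code sample should not be wrapped in a scroll container either.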

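3.11 Publishing Agent

Responsibility: manage content publishing and distribution

Core capabilities:

  • Multi-platform publishing
  • Publish-time optimization
  • Content version management
  • Publish status monitoring

Publishing system design: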

from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
import asyncio
import hashlib
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional

class PublishingAgent:
    def __init__(self):
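        # Initialize the LLM
        self.llm = OpenAI(model="gpt-4", temperature=0.1)
        
        # Register the publishing tools; immediate_publish is a coroutine,
        # so it is passed as async_fn rather than fn
        self.publishing_tools = [
            FunctionTool.from_defaults(fn=self.create_version),
            FunctionTool.from_defaults(fn=self.adapt_for_platform),
            FunctionTool.from_defaults(fn=self.schedule_publish),
            FunctionTool.from_defaults(async_fn=self.immediate_publish),
            FunctionTool.from_defaults(fn=self.monitor_publish_status)
        ]
        
        # Initialize the agent
        self.agent = OpenAIAgent.from_tools(
            self.publishing_tools,
            llm=self.llm,
            verbose=True,
            system_prompt="""
            You are an expert in content publishing management. Your tasks are to:
            1. Manage content versions
            2. Adapt content for different publishing platforms
            3. Schedule publish times
            4. Monitor publish status
            5. Optimize the publishing strategy
            
            Make sure the publishing process is efficient, reliable, and compliant with each platform's requirements.
            """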
        )
        
        # Per-platform constraints
        self.platform_configs = {
            'blog': {
                'max_length': 10000,
                'supports_html': True,
                'supports_markdown': True,
                'image_formats': ['jpg', 'png', 'gif', 'webp']
            },
            'medium': {
                'max_length': 8000,
                'supports_html': False,
                'supports_markdown': True,
                'image_formats': ['jpg', 'png', 'gif']
            },
            'linkedin': {
                'max_length': 3000,
                'supports_html': False,
                'supports_markdown': False,
                'image_formats': ['jpg', 'png']
            },
            'twitter': {
                'max_length': 280,
                'supports_html': False,
                'supports_markdown': False,
                'image_formats': ['jpg', 'png', 'gif']
            }
        }
        
        # In-memory version store and publish history
        self.versions = {}
        self.publish_history = []
    
    def create_version(self, content: str, metadata: dict = None) -> dict:
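        """Create a content version record and store it in memory."""
        try:
            # Build the version ID from a timestamp and a short content hash
            # (MD5 serves only as a fingerprint here, not as a security measure)
            now = datetime.now()
            content_hash = hashlib.md5(content.encode()).hexdigest()[:8]
            timestamp = now.strftime('%Y%m%d_%H%M%S')
            version_id = f"v_{timestamp}_{content_hash}"
            
            # Assemble the version record
            version = {
                'id': version_id,
                'content': content,
                'metadata': metadata or {},
                'created_at': now.isoformat(),
                'content_length': len(content),
                # Whitespace-delimited token count; this undercounts Chinese text,
                # which is not separated by spaces
                'word_count': len(content.split()),
                'status': 'created'
            }
            
            # Store the version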
            self.versions[version_id] = version
            
            return {
                'success': True,
                'version': version,
                'version_id': version_id
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'version_id': None
            }
    
    def adapt_for_platform(self, content: str, platform: str) -> dict:
        """Adapt content to a platform's format and length constraints."""
        try:
            import re
            
            if platform not in self.platform_configs:
                return {
                    'success': False,
                    'error': f'Unsupported platform: {platform}',
                    'adapted_content': content
                }
            
            config = self.platform_configs[platform]
            adapted_content = content
            adaptations = []
            
            # Strip Markdown syntax on platforms that do not render it
            if not config['supports_markdown']:
                adapted_content = re.sub(r'\*\*([^*]+)\*\*', r'\1', adapted_content)  # bold
                adapted_content = re.sub(r'\*([^*]+)\*', r'\1', adapted_content)      # italic
                adapted_content = re.sub(r'`([^`]+)`', r'\1', adapted_content)        # inline code
                # Headings: only a run of '#' at the start of a line, so hashtags survive
                adapted_content = re.sub(r'^#{1,6}\s+', '', adapted_content, flags=re.MULTILINE)
                adapted_content = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', adapted_content)  # links
                adaptations.append('removed Markdown formatting')
            
            # Strip HTML tags on platforms that do not accept them
            if not config['supports_html']:
                adapted_content = re.sub(r'<[^>]+>', '', adapted_content)
                adaptations.append('removed HTML tags')
            
            # Platform-specific tweaks run before truncation so that any text
            # they add counts toward the length limit
            if platform == 'twitter':
                # Keep only a short first line and append hashtags
                # ("#技术分享" means "tech sharing")
                first_line = adapted_content.split('\n')[0]
                if len(first_line) < 200:
                    adapted_content = first_line + ' #技术分享 #AI'
                    adaptations.append('added Twitter hashtags')
            elif platform == 'linkedin':
                # Prepend a formal opening line ("In today's era of technological
                # development, ...") unless the text already starts with "在"
                if not adapted_content.startswith('在'):
                    adapted_content = '在技术发展的今天,' + adapted_content
                    adaptations.append('added LinkedIn opening line')
            
            # Enforce the length limit last; the ellipsis is part of the budget
            max_length = config['max_length']
            if len(adapted_content) > max_length:
                adapted_content = adapted_content[:max_length - 3] + '...'
                adaptations.append(f'truncated content to {max_length} characters')
            
            return {
                'success': True,
                'adapted_content': adapted_content,
                'platform': platform,
                'adaptations': adaptations,
                'original_length': len(content),
                'adapted_length': len(adapted_content)
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'adapted_content': content
            }
    
    def schedule_publish(self, content_versions: dict, schedule_time: str) -> dict:
        """Build a scheduled-publish task for the given per-platform content."""
        try:
            # Parse the schedule time (ISO 8601 string or datetime object)
            if isinstance(schedule_time, str):
                schedule_dt = datetime.fromisoformat(schedule_time)
            else:
                schedule_dt = schedule_time
            
            # Create the schedule task record
            schedule_id = f"schedule_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            
            schedule_task = {
                'id': schedule_id,
                'content_versions': content_versions,
                'schedule_time': schedule_dt.isoformat(),
                'status': 'scheduled',
                'created_at': datetime.now().isoformat()
            }
            
            # A production system would hand this task to a scheduler such as
            # Celery; this listing only simulates scheduling
            
            return {
                'success': True,
                'schedule_id': schedule_id,
                'schedule_task': schedule_task,
                'message': f'Scheduled for publishing at {schedule_dt}'
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'schedule_id': None
            }
    
    async def immediate_publish(self, content_versions: dict) -> dict:
        """Publish to every platform in content_versions right away."""
        try:
            publish_results = {}
            publish_id = f"publish_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            
            for platform, content in content_versions.items():
                try:
                    # Simulated publish; a real implementation would call
                    # each platform's API here
                    
                    if platform == 'blog':
                        result = await self._publish_to_blog(content)
                    elif platform == 'medium':
                        result = await self._publish_to_medium(content)
                    elif platform == 'linkedin':
                        result = await self._publish_to_linkedin(content)
                    elif platform == 'twitter':
                        result = await self._publish_to_twitter(content)
                    else:
                        result = {
                            'success': False,
                            'error': f'Unsupported platform: {platform}'
                        }
                    
                    publish_results[platform] = result
                    
                except Exception as e:
                    publish_results[platform] = {
                        'success': False,
                        'error': str(e),
                        'platform': platform
                    }
            
            # Record the publish history
            publish_record = {
                'id': publish_id,
                'timestamp': datetime.now().isoformat(),
                'platforms': list(content_versions.keys()),
                'results': publish_results,
                'success_count': sum(1 for r in publish_results.values() if r.get('success')),
                'total_count': len(publish_results)
            }
            
            self.publish_history.append(publish_record)
            
            return {
                'success': True,
                'publish_id': publish_id,
                'results': publish_results,
                'summary': publish_record
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'publish_id': None
            }
    
    async def _publish_to_blog(self, content: str) -> dict:
        """Publish to the blog platform."""
        # Simulated blog publish
        await asyncio.sleep(0.5)  # simulated network latency
        return {
            'success': True,
            'platform': 'blog',
            'url': 'https://blog.example.com/post/123',
            'published_at': datetime.now().isoformat()
        }
    
    async def _publish_to_medium(self, content: str) -> dict:
        """Publish to Medium (simulated)."""
        await asyncio.sleep(0.8)
        return {
            'success': True,
            'platform': 'medium',
            'url': 'https://medium.com/@user/post-abc123',
            'published_at': datetime.now().isoformat()
        }
    
    async def _publish_to_linkedin(self, content: str) -> dict:
        """Publish to LinkedIn (simulated)."""
        await asyncio.sleep(0.6)
        return {
            'success': True,
            'platform': 'linkedin',
            'url': 'https://linkedin.com/posts/user_post_123',
            'published_at': datetime.now().isoformat()
        }
    
    async def _publish_to_twitter(self, content: str) -> dict:
        """Publish to Twitter (simulated)."""
        await asyncio.sleep(0.3)
        return {
            'success': True,
            'platform': 'twitter',
            'url': 'https://twitter.com/user/status/123456789',
            'published_at': datetime.now().isoformat()
        }
    
    def monitor_publish_status(self, publish_id: str) -> dict:
        """Look up a publish record and summarize its status."""
        try:
            # Find the publish record
            publish_record = next(
                (record for record in self.publish_history if record['id'] == publish_id),
                None
            )
            
            if not publish_record:
                return {
                    'success': False,
                    'error': f'Publish record not found: {publish_id}',
                    'status': 'not_found'
                }
            
            # Derive the overall status from the per-platform results
            total_platforms = publish_record['total_count']
            success_platforms = publish_record['success_count']
            failure_platforms = total_platforms - success_platforms
            
            if failure_platforms == 0:
                status = 'completed'
            elif success_platforms > 0:
                status = 'partial_failure'
            else:
                status = 'failed'
            
            return {
                'success': True,
                'publish_id': publish_id,
                'status': status,
                'total_platforms': total_platforms,
                'success_platforms': success_platforms,
                'failure_platforms': failure_platforms,
                'publish_record': publish_record
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'status': 'error'
            }
    
    async def publish_content(self, content: str, publish_config: dict):
        """Run the full publishing flow: version, adapt, then publish or schedule."""
        try:
            # Resolve the target platforms once so the prompt and the
            # adaptation loop agree on the default
            platforms = publish_config.get('platforms', ['blog'])
            
            # Ask the agent for publishing advice
            publish_prompt = f"""
            Please manage the publishing process for the following content:
            
            Content length: {len(content)} characters
            Target platforms: {platforms}
            Publish time: {publish_config.get('schedule_time', 'publish immediately')}
            
            Please provide publishing recommendations and an optimization plan.
            """
            
            response = await self.agent.achat(publish_prompt)
            
            # 1. Create a version
            version_result = self.create_version(content, publish_config.get('metadata', {}))
            if not version_result['success']:
                return version_result
            
            version_id = version_result['version_id']
            
            # 2. Adapt the content for each platform
            adapted_content = {}
            
            for platform in platforms:
                adapt_result = self.adapt_for_platform(content, platform)
                if adapt_result['success']:
                    adapted_content[platform] = adapt_result['adapted_content']
                else:
                    adapted_content[platform] = content  # fall back to the original content
            
            # 3. Publish
            if publish_config.get('schedule_time'):
                # Scheduled publish
                publish_result = self.schedule_publish(adapted_content, publish_config['schedule_time'])
            else:
                # Immediate publish
                publish_result = await self.immediate_publish(adapted_content)
            
            return {
                'success': True,
                'agent_response': str(response),
                'version_id': version_id,
                'adapted_content': {k: len(v) for k, v in adapted_content.items()},  # lengths only
                'publish_result': publish_result,
                'platforms': platforms
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'version_id': None
            }
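
Length limits interact with anything appended during platform adaptation, such as an ellipsis or hashtags: the appended text has to come out of the same character budget, or the result overshoots the limit. The following is a minimal sketch of that budgeting, not the agent's actual method. `fit_to_limit` and its parameters are hypothetical names introduced for illustration.

```python
def fit_to_limit(text: str, max_length: int, suffix: str = "", ellipsis: str = "...") -> str:
    """Return text plus suffix, truncated so the result never exceeds max_length.

    The suffix (for example hashtags) is always kept; only the body is shortened.
    """
    budget = max_length - len(suffix)  # characters left for the body
    if budget < len(ellipsis):
        raise ValueError("max_length is too small for the suffix and ellipsis")
    if len(text) <= budget:
        return text + suffix
    # Reserve room for the ellipsis, then drop trailing whitespace at the cut.
    body = text[: budget - len(ellipsis)].rstrip()
    return body + ellipsis + suffix


if __name__ == "__main__":
    post = fit_to_limit("A long announcement. " * 40, 280, suffix=" #AI")
    print(len(post), post[-12:])
```

Computing the budget before cutting means the limit holds whatever the suffix is, which is harder to guarantee when truncation and decoration happen in separate steps.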

4. Technical Implementation Details

4.1 Retrieval-Augmented Generation (RAG) Architecture

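from llama_index.core import VectorStoreIndex, ServiceContext, StorageContext, Document
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core.postprocessor import SentenceTransformerRerank
import chromadb

class RAGEngine:
    def __init__(self):
        # Initialize the LLM and the embedding model
        self.llm = OpenAI(model="gpt-4", temperature=0.1)
        self.embed_model = OpenAIEmbedding(model="text-embedding-ada-002")
        
        # Initialize the Chroma-backed vector store
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        chroma_collection = chroma_client.get_or_create_collection("blog_knowledge")
        self.vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        
        # The StorageContext is what routes index data into the vector store
        self.storage_context = StorageContext.from_defaults(vector_store=self.vector_store)
        
        # Configure the service context. ServiceContext is deprecated in
        # llama-index 0.10 in favor of the global Settings object; it is kept
        # here because the classes in 4.2 and 4.3 take a service_context.
        self.service_context = ServiceContext.from_defaults(
            llm=self.llm,
            embed_model=self.embed_model
        )
        
        # Initialize the cross-encoder reranker
        self.reranker = SentenceTransformerRerank(
            model="cross-encoder/ms-marco-MiniLM-L-12-v2",
            top_n=10
        )
        
        self.index = None  # set by build_index()
    
    async def build_index(self, documents):
        """Build the vector index, storing embeddings in Chroma."""
        self.index = VectorStoreIndex.from_documents(
            documents,
            storage_context=self.storage_context,
            service_context=self.service_context
        )
        return self.index
    
    async def retrieve_and_generate(self, query, k=10):
        """Retrieve relevant nodes, rerank them, and generate an answer."""
        if self.index is None:
            raise RuntimeError("build_index() must be called before retrieve_and_generate()")
        
        # Over-fetch 2k candidates; the reranker then keeps its top_n (10) of them
        query_engine = self.index.as_query_engine(
            similarity_top_k=k*2,
            node_postprocessors=[self.reranker],
            response_mode="tree_summarize"
        )
        
        # Run the query
        response = await query_engine.aquery(query)
        
        return {
            'answer': response.response,
            'source_nodes': response.source_nodes,
            'metadata': response.metadata
        }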
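
The query engine above follows a two-stage pattern: a cheap vector search over-fetches candidates, then a more expensive reranker reorders them and keeps the best few. The sketch below shows that pattern in plain Python under simplifying assumptions. The token-overlap scorer `token_overlap` is only a stand-in for a cross-encoder, and `two_stage_retrieve` is a hypothetical helper, not a LlamaIndex API.

```python
import math
from typing import Callable, List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def token_overlap(query: str, doc: str) -> float:
    """Stand-in reranker score: number of distinct tokens shared with the query."""
    return float(len(set(query.split()) & set(doc.split())))


def two_stage_retrieve(
    query_text: str,
    query_vec: Sequence[float],
    corpus: List[Tuple[str, Sequence[float]]],
    rerank_score: Callable[[str, str], float],
    k: int = 2,
) -> List[str]:
    """Fetch 2k candidates by vector similarity, then keep the k best after reranking."""
    # Stage 1: coarse recall by embedding similarity
    candidates = sorted(
        corpus, key=lambda item: cosine(query_vec, item[1]), reverse=True
    )[: k * 2]
    # Stage 2: precise ordering with the reranker
    reranked = sorted(
        candidates, key=lambda item: rerank_score(query_text, item[0]), reverse=True
    )
    return [text for text, _ in reranked[:k]]


if __name__ == "__main__":
    corpus = [
        ("rag uses retrieval", [1.0, 0.0]),
        ("cats sleep", [0.0, 1.0]),
        ("retrieval augmented generation rag", [0.9, 0.1]),
    ]
    print(two_stage_retrieve("retrieval augmented generation", [1.0, 0.0],
                             corpus, token_overlap, k=1))
```

Over-fetching matters because the reranker can only promote documents that stage one returned; a wider first stage trades latency for recall.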

4.2 Multi-Source Data Fusion Algorithm

from llama_index.readers.web import SimpleWebPageReader
from llama_index.readers.papers import ArxivReader
from llama_index.readers.notion import NotionPageReader
from llama_index.core.schema import Document
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import (
    TitleExtractor,
    QuestionsAnsweredExtractor,
    SummaryExtractor
)
from llama_index.core.ingestion import IngestionPipeline

class MultiSourceDataFusion:
    def __init__(self, service_context):
        self.service_context = service_context
        
        # Initialize the data readers
        self.web_reader = SimpleWebPageReader(html_to_text=True)
        self.arxiv_reader = ArxivReader()
        self.notion_reader = NotionPageReader(integration_token="your_token")
        
        # Initialize the node parser
        self.node_parser = SentenceSplitter(
            chunk_size=1024,
            chunk_overlap=200
        )
        
        # Initialize the metadata extractors
        self.extractors = [
            TitleExtractor(nodes=5),
            QuestionsAnsweredExtractor(questions=3),
            SummaryExtractor(summaries=["prev", "self"])
        ]
        
        # Build the ingestion pipeline
        self.pipeline = IngestionPipeline(
            transformations=[
                self.node_parser,
                *self.extractors
            ]
        )
    
    async def load_multi_source_data(self, sources_config):
        """Load documents from every configured source and tag their origin."""
        all_documents = []
        
        # Web pages
        if 'web_urls' in sources_config:
            web_docs = self.web_reader.load_data(sources_config['web_urls'])
            for doc in web_docs:
                doc.metadata['source_type'] = 'web'
                doc.metadata['credibility_score'] = 0.7
            all_documents.extend(web_docs)
        
        # arXiv papers; 'arxiv_papers' is assumed to hold a search query string
        if 'arxiv_papers' in sources_config:
            arxiv_docs = self.arxiv_reader.load_data(
                search_query=sources_config['arxiv_papers']
            )
            for doc in arxiv_docs:
                doc.metadata['source_type'] = 'academic'
                doc.metadata['credibility_score'] = 0.9
            all_documents.extend(arxiv_docs)
        
        # Notion knowledge base
        if 'notion_pages' in sources_config:
            notion_docs = self.notion_reader.load_data(
                page_ids=sources_config['notion_pages']
            )
            for doc in notion_docs:
                doc.metadata['source_type'] = 'knowledge_base'
                doc.metadata['credibility_score'] = 0.8
            all_documents.extend(notion_docs)
        
        return all_documents
    
    async def process_and_fuse(self, documents):
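        """Process and fuse documents from multiple sources."""
        # Run the documents through the ingestion pipeline
        processed_nodes = await self.pipeline.arun(documents=documents)
        
        # Remove near-duplicates by semantic similarity
        unique_nodes = self._deduplicate_nodes(processed_nodes)
        
        # Rank by credibility score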
        ranked_nodes = self._rank_by_credibility(unique_nodes)
        
        return ranked_nodes
    
    def _deduplicate_nodes(self, nodes, similarity_threshold=0.85):
        """Drop near-duplicate nodes by semantic similarity, keeping the more credible one."""
        embed_model = self.service_context.embed_model
        unique_nodes = []
        unique_embeddings = []  # parallel to unique_nodes, so each node is embedded once
        
        for node in nodes:
            embedding = embed_model.get_text_embedding(node.text)
            
            # Find the first kept node that this one duplicates, if any
            duplicate_index = None
            for i, kept_embedding in enumerate(unique_embeddings):
                if self._calculate_similarity(embedding, kept_embedding) > similarity_threshold:
                    duplicate_index = i
                    break
            
            if duplicate_index is None:
                unique_nodes.append(node)
                unique_embeddings.append(embedding)
            elif (node.metadata.get('credibility_score', 0) >
                  unique_nodes[duplicate_index].metadata.get('credibility_score', 0)):
                # Replace the kept node with the more credible duplicate
                unique_nodes[duplicate_index] = node
                unique_embeddings[duplicate_index] = embedding
        
        return unique_nodes
    
    def _rank_by_credibility(self, nodes):
        """Sort nodes by credibility score, highest first."""
        return sorted(
            nodes,
            key=lambda x: x.metadata.get('credibility_score', 0),
            reverse=True
        )
    
    def _calculate_similarity(self, embed1, embed2):
        """Return the cosine similarity of two embedding vectors."""
        import numpy as np
        return np.dot(embed1, embed2) / (np.linalg.norm(embed1) * np.linalg.norm(embed2))
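
The deduplication rule above (among near-duplicates, keep the node with the highest credibility score) can also be written as a single greedy pass: sort by credibility first, then keep an item only if it is not too similar to anything already kept. The sketch below illustrates this with plain Python and precomputed embeddings. The `Item` dataclass and `dedupe_by_similarity` are hypothetical stand-ins for LlamaIndex nodes and the fusion class, and the vectors are toy values.

```python
import math
from dataclasses import dataclass
from typing import List, Sequence


@dataclass
class Item:
    text: str
    embedding: Sequence[float]
    credibility: float


def _unit(vec: Sequence[float]) -> List[float]:
    """Scale a vector to unit length so a dot product equals cosine similarity."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm else list(vec)


def dedupe_by_similarity(items: List[Item], threshold: float = 0.85) -> List[Item]:
    """Greedy dedup: visit items from most to least credible and keep an item
    only if its cosine similarity to every kept item is at or below threshold."""
    ranked = sorted(items, key=lambda it: it.credibility, reverse=True)
    kept: List[Item] = []
    kept_vecs: List[List[float]] = []
    for item in ranked:
        vec = _unit(item.embedding)
        is_duplicate = any(
            sum(a * b for a, b in zip(vec, kept_vec)) > threshold
            for kept_vec in kept_vecs
        )
        if not is_duplicate:
            kept.append(item)
            kept_vecs.append(vec)
    return kept


if __name__ == "__main__":
    items = [
        Item("web copy of the paper", [1.0, 0.0], 0.7),
        Item("the paper itself", [0.99, 0.05], 0.9),
        Item("unrelated note", [0.0, 1.0], 0.8),
    ]
    for item in dedupe_by_similarity(items):
        print(item.credibility, item.text)
```

Because the input is sorted first, the output is already ordered by credibility, so the separate ranking step becomes redundant in this variant. The cost is still quadratic in the number of kept items; an approximate nearest-neighbor index would be the usual next step for large corpora.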

4.3 Inter-Agent Communication Protocol

from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context
)
from llama_index.agent.openai import OpenAIAgent
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from typing import Any, Dict
import asyncio
from datetime import datetime

# Workflow event definitions
class PlanningEvent(Event):
    plan: Dict[str, Any]
    topic: str

class RetrievalEvent(Event):
    retrieved_data: Dict[str, Any]
    plan: Dict[str, Any]

class WritingEvent(Event):
    content: str
    metadata: Dict[str, Any]

class ReviewEvent(Event):
    reviewed_content: str
    suggestions: list

class FormattingEvent(Event):
    formatted_content: str
    format_type: str

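# Blog-writing workflow
class BlogWritingWorkflow(Workflow):
    def __init__(self, service_context, agents_config, timeout=600.0):
        # Five sequential LLM calls can outlast the default workflow timeout,
        # so a longer one is passed explicitly (600 s is an arbitrary choice)
        super().__init__(timeout=timeout)
        self.service_context = service_context
        self.agents_config = agents_config
        
        # Initialize the specialized agents
        self._initialize_agents()
    
    def _initialize_agents(self):
        """Initialize the specialized agents."""
        # Planning agent
        self.planning_agent = OpenAIAgent.from_tools(
            tools=[],
            llm=self.service_context.llm,
            system_prompt="You are an expert content planner responsible for analyzing the topic and drafting a writing plan."
        )
        
        # Retrieval agent and its tools
        retrieval_tools = self._create_retrieval_tools()
        self.retrieval_agent = OpenAIAgent.from_tools(
            tools=retrieval_tools,
            llm=self.service_context.llm,
            system_prompt="You are an information retrieval expert responsible for gathering relevant information from multiple data sources."
        )
        
        # Writing agent
        self.writing_agent = OpenAIAgent.from_tools(
            tools=[],
            llm=self.service_context.llm,
            system_prompt="You are an expert technical writer responsible for producing high-quality technical blog content."
        )
        
        # Review agent
        self.review_agent = OpenAIAgent.from_tools(
            tools=[],
            llm=self.service_context.llm,
            system_prompt="You are an expert content reviewer responsible for checking content quality and accuracy."
        )
        
        # Formatting agent
        self.formatting_agent = OpenAIAgent.from_tools(
            tools=[],
            llm=self.service_context.llm,
            system_prompt="You are an expert content formatter responsible for converting content into the requested format."
        )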
    
    def _create_retrieval_tools(self):
        """Create the retrieval tools."""
        # Placeholder: retrieval tools would be registered here
        return []
    
    @step
    async def planning_step(self, ctx: Context, ev: StartEvent) -> PlanningEvent:
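        """Planning step."""
        topic = ev.topic
        requirements = ev.requirements
        
        planning_prompt = f"""
        Please draw up a detailed writing plan for the topic "{topic}", covering:
        1. The article outline
        2. The core content of each section
        3. The kinds of information to retrieve
        4. An analysis of the target audience
        
        Requirements: {requirements}
        """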
        
        response = await self.planning_agent.achat(planning_prompt)
        plan = self._parse_plan(response.response)
        
        ctx.data["planning_result"] = {
            "plan": plan,
            "timestamp": datetime.now(),
            "agent": "planning"
        }
        
        return PlanningEvent(plan=plan, topic=topic)
    
    @step
    async def retrieval_step(self, ctx: Context, ev: PlanningEvent) -> RetrievalEvent:
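        """Retrieval step."""
        plan = ev.plan
        topic = ev.topic
        
        retrieval_prompt = f"""
        Based on the writing plan below, please retrieve relevant information:
        Topic: {topic}
        Plan: {plan}
        
        Use the available retrieval tools to obtain the most recent and most authoritative information.
        """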
        
        response = await self.retrieval_agent.achat(retrieval_prompt)
        retrieved_data = self._parse_retrieval_result(response.response)
        
        ctx.data["retrieval_result"] = {
            "data": retrieved_data,
            "timestamp": datetime.now(),
            "agent": "retrieval"
        }
        
        return RetrievalEvent(retrieved_data=retrieved_data, plan=plan)
    
    @step
    async def writing_step(self, ctx: Context, ev: RetrievalEvent) -> WritingEvent:
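        """Writing step."""
        plan = ev.plan
        retrieved_data = ev.retrieved_data
        
        writing_prompt = f"""
        Write a technical blog post based on the following information:
        
        Writing plan: {plan}
        Retrieved information: {retrieved_data}
        
        Requirements:
        1. Clear structure and rigorous logic
        2. Include code examples and real-world cases
        3. Professional but accessible language
        4. Add an appropriate level of technical detail
        """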
        
        response = await self.writing_agent.achat(writing_prompt)
        content = response.response
        
        ctx.data["writing_result"] = {
            "content": content,
            "timestamp": datetime.now(),
            "agent": "writing"
        }
        
        return WritingEvent(
            content=content,
            metadata={"sources": retrieved_data, "plan": plan}
        )
    
    @step
    async def review_step(self, ctx: Context, ev: WritingEvent) -> ReviewEvent:
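        """Review step."""
        content = ev.content
        metadata = ev.metadata
        
        review_prompt = f"""
        Please review the following technical blog content:
        
        {content}
        
        Review checklist:
        1. Technical accuracy
        2. Logical coherence
        3. Language and expression
        4. Structural completeness
        5. Citation conventions
        
        Please provide revision suggestions and a final version.
        """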
        
        response = await self.review_agent.achat(review_prompt)
        reviewed_content, suggestions = self._parse_review_result(response.response)
        
        ctx.data["review_result"] = {
            "content": reviewed_content,
            "suggestions": suggestions,
            "timestamp": datetime.now(),
            "agent": "review"
        }
        
        return ReviewEvent(
            reviewed_content=reviewed_content,
            suggestions=suggestions
        )
    
    @step
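    async def formatting_step(self, ctx: Context, ev: ReviewEvent) -> StopEvent:
        """Formatting step: convert the reviewed content to standard Markdown."""
        content = ev.reviewed_content
        
        formatting_prompt = f"""
        Please format the following content as standard Markdown:
        
        {content}
        
        Requirements:
        1. Add appropriate heading levels
        2. Format code blocks
        3. Add a table of contents
        4. Improve the layout
        """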
        
        response = await self.formatting_agent.achat(formatting_prompt)
        formatted_content = response.response
        
        ctx.data["formatting_result"] = {
            "content": formatted_content,
            "timestamp": datetime.now(),
            "agent": "formatting"
        }
        
        # Return the final result
        return StopEvent(
            result={
                "final_content": formatted_content,
                "workflow_data": ctx.data,
                "success": True
            }
        )
    
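    def _parse_plan(self, plan_text):
        """Parse the planning result."""
        # Placeholder: keeps the raw plan text as the outline; section parsing is still to be implemented
        return {"outline": plan_text, "sections": []}
    
    def _parse_retrieval_result(self, retrieval_text):
        """Parse the retrieval result."""
        # Placeholder: keeps the raw text as content; source extraction is still to be implemented
        return {"sources": [], "content": retrieval_text}
    
    def _parse_review_result(self, review_text):
        """Parse the review result."""
        # Placeholder: returns the raw text as the reviewed content, with no suggestions
        return review_text, []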

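# Workflow manager
import uuid


class WorkflowManager:
    def __init__(self, service_context):
        self.service_context = service_context
        # Maps workflow_id -> workflow instance that is currently running
        self.active_workflows = {}
    
    async def start_blog_writing(self, topic, requirements, workflow_id=None):
        """Start the blog-writing workflow and return its result."""
        workflow = BlogWritingWorkflow(
            self.service_context,
            agents_config={}
        )
        
        # Register the run so get_workflow_status can find it while it executes
        workflow_id = workflow_id or str(uuid.uuid4())
        self.active_workflows[workflow_id] = workflow
        try:
            # Run the workflow to completion
            result = await workflow.run(
                topic=topic,
                requirements=requirements
            )
        finally:
            self.active_workflows.pop(workflow_id, None)
        
        return result
    
    async def get_workflow_status(self, workflow_id):
        """Return the workflow's status, or None if the ID is not active."""
        # Assumes BlogWritingWorkflow defines get_status()
        if workflow_id in self.active_workflows:
            return self.active_workflows[workflow_id].get_status()
        return None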
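
The `_parse_review_result` helper above is a stub that returns the raw text and an empty suggestion list. One way it could be filled in is sketched below. The sketch assumes the review prompt is extended to request two labeled sections; the `SUGGESTIONS:` and `FINAL VERSION:` markers and the standalone function name are hypothetical, not part of the original design.

```python
import re

# Hypothetical section markers; the review prompt would need to request them.
SUGGESTIONS_MARKER = "SUGGESTIONS:"
FINAL_MARKER = "FINAL VERSION:"


def parse_review_result(review_text: str) -> tuple[str, list[str]]:
    """Split a review response into (final_content, suggestions).

    Falls back to (review_text, []) when the markers are missing,
    which matches the behavior of the original stub.
    """
    start = review_text.find(SUGGESTIONS_MARKER)
    end = review_text.find(FINAL_MARKER)
    if start == -1 or end == -1 or end < start:
        return review_text, []

    suggestion_block = review_text[start + len(SUGGESTIONS_MARKER):end]
    final_content = review_text[end + len(FINAL_MARKER):].strip()

    suggestions = []
    for line in suggestion_block.splitlines():
        # Strip list prefixes such as "1. ", "- " or "* ".
        item = re.sub(r"^\s*(?:\d+[.)]|[-*])\s*", "", line).strip()
        if item:
            suggestions.append(item)
    return final_content, suggestions
```

Falling back to the raw text keeps the workflow running even when the model ignores the requested layout, at the cost of losing the structured suggestions for that run.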

5. Deployment Configuration

5.1 Docker Containerized Deployment

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

5.2 Kubernetes Deployment Configuration

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agentic-rag-blog
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agentic-rag-blog
  template:
    metadata:
      labels:
        app: agentic-rag-blog
    spec:
      containers:
      - name: blog-agent
        image: agentic-rag-blog:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-secrets
              key: openai-key
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
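
The Deployment above injects `OPENAI_API_KEY` into the container from the `api-secrets` Secret. A minimal sketch of how the application might read it at startup and fail fast when it is missing follows; the helper name `require_env` is illustrative and not part of the original design.

```python
import os
from typing import Mapping, Optional


def require_env(name: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Return the value of a required environment variable.

    Raises RuntimeError when the variable is unset or empty, so a pod with a
    missing Secret fails at startup rather than on the first API call.
    """
    source = os.environ if env is None else env
    value = source.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Required environment variable {name} is not set")
    return value
```

At startup the service would call `require_env("OPENAI_API_KEY")` once and pass the value to whichever component needs it.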

5.3 Environment Configuration

# config.yaml
system:
  max_concurrent_agents: 10
  timeout_seconds: 300
  retry_attempts: 3

agents:
  intent_understanding:
    enabled: true
    confidence_threshold: 0.7
    context_window: 5
    optimization_strategies:
      - "topic_clarification"
      - "audience_identification"
      - "content_structure_suggestion"
  
  coordinator:
    enabled: true
    max_tasks: 100
    intent_integration: true
  
  planning:
    enabled: true
    outline_depth: 3
    intent_aware: true
  
  retrieval:
    enabled: true
    sources:
      - web
      - academic
      - knowledge_base
    max_results_per_source: 20
  
  writing:
    enabled: true
    model: "gpt-4"
    temperature: 0.7
    max_tokens: 2000
  
  style:
    enabled: true
    default_style: "technical"
  
  citation:
    enabled: true
    format: "APA"
  
  image_generation:
    enabled: true
    providers:
      - "dalle-3"
      - "midjourney"
  
  review:
    enabled: true
    quality_threshold: 0.8
  
  formatting:
    enabled: true
    output_formats:
      - "markdown"
      - "html"

databases:
  vector_store:
    type: "chromadb"
    url: "http://chromadb:8000"
  
  cache:
    type: "redis"
    url: "redis://redis:6379"
  
  metadata:
    type: "postgresql"
    url: "postgresql://user:pass@postgres:5432/blogdb"

apis:
  openai:
    base_url: "https://api.openai.com/v1"
    model: "gpt-4"
  
  search:
    google:
      api_key: "${GOOGLE_API_KEY}"
    bing:
      api_key: "${BING_API_KEY}"
  
  academic:
    arxiv:
      base_url: "http://export.arxiv.org/api/query"
    semantic_scholar:
      api_key: "${SEMANTIC_SCHOLAR_KEY}"
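
YAML parsers do not expand the `${GOOGLE_API_KEY}`-style placeholders in `config.yaml`, so the configuration loader has to substitute them. A minimal sketch follows, assuming the file has already been parsed into nested dicts and lists by a YAML library; `expand_env` is a hypothetical helper name.

```python
import os
import re
from typing import Any, Mapping, Optional

_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(node: Any, env: Optional[Mapping[str, str]] = None) -> Any:
    """Recursively replace ${VAR} placeholders in a parsed config tree."""
    source = os.environ if env is None else env

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in source:
            raise KeyError(f"Environment variable {name} is not set")
        return source[name]

    if isinstance(node, str):
        return _PLACEHOLDER.sub(substitute, node)
    if isinstance(node, dict):
        return {key: expand_env(value, source) for key, value in node.items()}
    if isinstance(node, list):
        return [expand_env(item, source) for item in node]
    return node  # numbers, booleans and None pass through unchanged
```

Raising on an unset variable, rather than substituting an empty string, surfaces a missing key when the config is loaded instead of as a failed search request later.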

6. Usage Examples

6.1 API Call Example

# Client usage example
import asyncio
from agentic_rag_client import BlogAgentClient

async def main():
    client = BlogAgentClient(base_url="http://localhost:8000")
    
    # Example 1: an explicit request
    task1 = await client.create_blog_task({
        "topic": "Microservice architecture design patterns",
        "language": "en",
        "target_audience": "senior_developers",
        "length": "long",
        "include_code_examples": True,
        "include_diagrams": True,
        "style": "technical_tutorial"
    })
    
    # Example 2: a vague user input (demonstrates intent understanding)
    user_input = "I'd like to learn about that distributed thing, the one that makes systems more stable"
    
    # The system performs intent understanding and requirement optimization automatically
    task2 = await client.create_blog_task_from_natural_input({
        "user_input": user_input,
        "context": {
            "user_level": "beginner",
            "previous_topics": ["infrastructure basics", "monolithic applications"]
        }
    })
    
    # Example of an intent-analysis result
    intent_analysis = await client.get_intent_analysis(task2.id)
    print(f"Identified intent: {intent_analysis.intent}")
    print(f"Optimized topic: {intent_analysis.optimized_topic}")
    print(f"Suggested content structure: {intent_analysis.suggested_structure}")
    
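    # Monitor task progress (using task1 as the example).
    # Assumes is_complete() refreshes the task state from the server;
    # if it does not, decide on the polled status instead.
    while not task1.is_complete():
        status = await client.get_task_status(task1.id)
        print(f"Progress: {status.progress}% - current stage: {status.current_stage}")
        await asyncio.sleep(5)
    
    # Fetch the final result
    result = await client.get_task_result(task1.id)
    print("Blog post generated!")
    print(f"Title: {result.title}")
    print(f"Word count: {result.word_count}")
    print(f"Quality score: {result.quality_score}")
    
    # Save to a file
    with open(f"{result.title}.md", "w", encoding="utf-8") as f:
        f.write(result.content)
    
    # Handle the result of the intent-understanding task
    result2 = await client.get_task_result(task2.id)
    print("\nIntent-understanding task complete!")
    print(f"Original input: {user_input}")
    print(f"Generated title: {result2.title}")
    print(f"Content fit score: {result2.adaptation_score}")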

if __name__ == "__main__":
    asyncio.run(main())
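
The polling loop above runs until the task reports completion, so a stalled task would keep the client waiting indefinitely. A sketch of a bounded polling helper follows. It accepts any async callable that returns a status object and assumes that object exposes a numeric `progress` field where 100 means done; this mirrors the example above but is an assumption, since the way `BlogAgentClient` signals completion is not specified here.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable


async def wait_for_completion(
    fetch_status: Callable[[], Awaitable[Any]],
    timeout: float = 300.0,
    interval: float = 5.0,
) -> Any:
    """Poll fetch_status() until progress reaches 100 or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = await fetch_status()
        if status.progress >= 100:
            return status
        # Stop if the next poll would land past the deadline.
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"Task not finished after {timeout} seconds")
        await asyncio.sleep(interval)
```

With the client from the example, the call would look like `status = await wait_for_completion(lambda: client.get_task_status(task1.id))`. The 300-second default matches `timeout_seconds` in the configuration from section 5.3.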

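6.2 Sample Output

The generated blog post contains structured content such as the following:

# Microservice Architecture Design Patterns: Best Practices for Building Scalable Distributed Systems

## Abstract
This article takes an in-depth look at the core design patterns of microservice architecture...

## Table of Contents
1. [Overview of Microservice Architecture](#overview-of-microservice-architecture)
2. [Core Design Patterns](#core-design-patterns)
3. [Implementation Best Practices](#implementation-best-practices)
...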
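
The table of contents in the sample output links each entry to a heading anchor. A minimal sketch of how such a list could be generated from the level-2 headings of a Markdown document follows. It assumes a simplified anchor rule (lowercase, punctuation dropped, spaces turned into hyphens); Markdown renderers each apply their own slug rules, so this is illustrative only, and `slugify` and `build_toc` are hypothetical helper names.

```python
import re

FENCE = "`" * 3  # Markdown code-fence delimiter


def slugify(heading: str) -> str:
    """Simplified anchor: lowercase, drop punctuation, spaces become hyphens."""
    slug = heading.strip().lower()
    slug = re.sub(r"[^\w\s-]", "", slug)  # \w also keeps CJK characters
    return re.sub(r"\s+", "-", slug)


def build_toc(markdown: str) -> str:
    """Return a numbered list that links to every level-2 heading."""
    entries = []
    in_code_fence = False
    for line in markdown.splitlines():
        if line.lstrip().startswith(FENCE):
            in_code_fence = not in_code_fence  # ignore headings inside code
            continue
        match = re.match(r"^##\s+(.+?)\s*$", line)
        if match and not in_code_fence:
            title = match.group(1)
            entries.append(f"{len(entries) + 1}. [{title}](#{slugify(title)})")
    return "\n".join(entries)
```

Generating the list in code rather than asking the formatting Agent for it keeps the anchors consistent with the headings that actually appear in the final text.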

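7. Summary

This design describes a complete, extensible system for writing technical blog posts, in which multiple collaborating Agents automate the full workflow from requirements analysis to publication. Its core capabilities are RAG-based retrieval, multi-source data fusion, intelligent writing assistance, and quality control, which together are intended to produce high-quality technical blog content.

Key advantages:

  1. Modular design: each Agent has a clearly defined responsibility, which makes the system easy to maintain and extend
  2. Multi-source retrieval: integrates Web, academic, and private data sources
  3. Quality assurance: multi-layer quality control mechanisms
  4. Multilingual support: bilingual output in Chinese and English
  5. Scalability: container-based deployment supports horizontal scaling

The system is applicable to scenarios such as content creation within technical teams, knowledge management, and external technical sharing.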