企业级多智能体 AI 系统

基于《多智能体 AI 系统基础:理论与框架》构建的生产级多智能体 AI 系统,集成现代 AI 工具链和企业级特性。

1. 项目概述

本项目是一个生产就绪的企业级多智能体 AI 系统,采用现代软件架构设计原则,实现了:

  • BDI 认知架构:完整的信念-愿望-意图循环实现
  • 智能体协作:专业化智能体间的高效协作机制
  • 工作流编排:基于 LangGraph 的复杂业务流程自动化
  • 全链路监控:LangSmith 集成的性能追踪和分析
  • 企业级特性:高可用、可扩展、可观测的系统设计

2. 核心特性

2.1 架构设计

  • BDI 认知架构:完整的信念-愿望-意图认知循环
  • 事件驱动架构:异步消息处理和响应式设计
  • 微服务架构:松耦合、高内聚的模块化设计
  • SOLID 原则:遵循面向对象设计最佳实践

2.2 智能体能力

  • 专业化智能体:研究、分析、客服等领域专家
  • 协作机制:智能体间的高效协作和任务分配
  • 插件化扩展:支持自定义智能体和能力扩展
  • 角色适配:动态角色切换和能力组合

2.3 技术栈

  • LangGraph 工作流:复杂业务流程的可视化编排
  • LangSmith 监控:全链路追踪和性能分析
  • FastAPI 框架:高性能异步 Web API
  • 容器化部署:Docker + Kubernetes 生产部署(demo 采用 docker compose)

2.4 企业级特性

  • 高可用性:故障恢复和负载均衡
  • 安全认证:API 密钥和访问控制
  • 可观测性:日志、指标、链路追踪
  • 高性能:异步处理和资源优化

3. 项目结构

multi_agent_system/
├── 📂 src/                           # 核心源代码
│   ├── 🤖 agents/                    # 智能体模块
│   │   ├── base_agent.py            # 🧠 BDI基础智能体架构
│   │   ├── research_agent.py        # 🔬 研究专家智能体
│   │   └── analysis_agent.py        # 📊 分析专家智能体
│   ├── 📡 communication/             # 通信中间件
│   │   └── message_bus.py           # 🚌 企业级消息总线
│   ├── 📊 monitoring/                # 监控与追踪
│   │   └── langsmith_integration.py # 📈 LangSmith 监控集成
│   ├── 🔄 workflows/                 # 工作流引擎
│   │   └── langgraph_workflow.py    # 🌊 LangGraph工作流编排
│   ├── 🎯 examples/                  # 应用示例
│   │   └── customer_service_system.py # 🎧 智能客服系统
│   ├── __init__.py                  # 模块初始化
│   └── 🚀 main.py                   # 主应用入口
├── 🧪 tests/                        # 测试套件
│   ├── test_system.py              # 🔍 系统集成测试
│   ├── test_interactions.py        # 🔄 交互功能测试
│   ├── test_api_server.py          # 🚀 API 服务器测试
│   └── test_api_client.py          # 📡 API 客户端测试
├── ⚙️ config.json                   # 系统配置文件
├── 📦 requirements.txt              # Python依赖清单
├── 🐳 Dockerfile                    # 容器化配置
├── 🐳 .dockerignore                 # Docker 忽略文件
├── 🐙 docker-compose.yml           # 多服务编排
└── 📖 README.md                     # 项目文档

3.1 目录说明

目录/文件 功能描述 关键特性
src/agents/ 智能体核心实现 BDI 架构、专业化能力、协作机制
src/communication/ 通信基础设施 消息总线、发布订阅、请求响应
src/monitoring/ 监控与追踪 LangSmith 集成、链路追踪、性能分析
src/workflows/ 工作流引擎 LangGraph 集成、流程编排、状态管理
src/examples/ 业务应用示例 智能客服、研究分析、最佳实践
tests/ 测试覆盖 单元测试、集成测试、性能测试、API 测试
config.json 配置管理 环境配置、API 密钥、系统参数

4. 系统架构

4.1 分层架构设计

graph TB
    subgraph "🖥️ 表示层"
        UI["🌐 Web界面"]
        API["🚀 REST API"]
        CLI["💻 命令行"]
    end

    subgraph "🎯 应用层"
        Gateway["🚪 API网关"]
        Auth["🔐 认证授权"]
        Router["🔀 请求路由"]
    end

    subgraph "🤖 智能体层"
        Orchestrator["🎼 智能体编排器"]
        Research["🔬 研究智能体"]
        Analysis["📊 分析智能体"]
        Customer["🎧 客服智能体"]
    end

    subgraph "🔄 工作流层"
        LangGraph["🌊 LangGraph引擎"]
        StateManager["📋 状态管理"]
        TaskQueue["📝 任务队列"]
    end

    subgraph "📡 通信层"
        MessageBus["🚌 消息总线"]
        PubSub["📢 发布订阅"]
        ReqRes["🔄 请求响应"]
    end

    subgraph "💾 数据层"
        Memory["🧠 内存存储"]
        Cache["⚡ 缓存层"]
        Persistence["💿 持久化"]
    end

    subgraph "📊 监控层"
        LangSmith["🔍 LangSmith"]
        Metrics["📈 指标收集"]
        Logging["📝 日志系统"]
    end

    UI --> Gateway
    API --> Gateway
    CLI --> Gateway

    Gateway --> Orchestrator
    Auth --> Orchestrator
    Router --> Orchestrator

    Orchestrator --> Research
    Orchestrator --> Analysis
    Orchestrator --> Customer

    Research --> LangGraph
    Analysis --> LangGraph
    Customer --> LangGraph

    LangGraph --> MessageBus
    StateManager --> MessageBus
    TaskQueue --> MessageBus

    MessageBus --> Memory
    PubSub --> Cache
    ReqRes --> Persistence

    LangSmith --> Metrics
    Metrics --> Logging

4.2 核心组件说明

层级 组件 职责 技术栈
表示层 Web 界面/API 用户交互、接口暴露 FastAPI, Streamlit
应用层 API 网关 请求路由、认证授权 FastAPI, JWT
智能体层 专业智能体 业务逻辑、AI 推理 LangChain, OpenAI
工作流层 LangGraph 引擎 流程编排、状态管理 LangGraph, StateGraph
通信层 消息总线 异步通信、事件驱动 asyncio, Queue
数据层 存储管理 数据持久化、缓存 Redis, PostgreSQL
监控层 可观测性 追踪、指标、日志 LangSmith, Prometheus

5. 快速开始

5.1 一键启动

# 克隆项目
git clone <repository-url>
cd multi_agent_system

5.1.1 方式一:Docker Compose 启动(较复杂)

# 启动所有服务
docker-compose up -d

# 查看服务状态
docker-compose ps

可访问的服务:

服务 地址 用途
🌐 Web 界面 http://localhost:8501 Streamlit 用户界面
🚀 API 文档 http://localhost:8000/docs FastAPI Swagger 文档
📊 Grafana 面板 http://localhost:3000 监控面板 (admin/admin)
🔍 Prometheus http://localhost:9090 指标收集
📝 Jupyter Notebook http://localhost:8888 开发环境
🌐 Nginx 代理 http://localhost:80 反向代理

5.1.2 方式二:Python 本地启动(交互式方式)

# 安装依赖
pip install -r requirements.txt

# 启动主应用
python main.py

# 或指定参数启动
python main.py --log-level INFO --demo research
python main.py --demo customer_service
python main.py --metrics
python main.py --interactive

系统功能:

功能 描述 用途
🤖 多智能体系统 核心智能体框架 智能体协作、任务处理
📊 消息总线 智能体间通信 消息传递、事件处理
🔄 工作流引擎 LangGraph 集成 复杂任务编排
📈 监控追踪 LangSmith 集成 性能监控、调试追踪

注意: Python 本地启动方式启动多智能体系统核心服务,不包含 Web API 接口。如需 API 服务,请使用方式三的 API 服务器测试启动。

5.1.3 方式三:API 服务器测试启动

如果您需要专门启动 FastAPI 服务器进行 API 测试,可以使用以下命令:

# 激活虚拟环境
source venv/bin/activate

# 启动 API 服务器(专用于测试)
python tests/test_api_server.py

服务器功能:

  • 🌐 完整的 FastAPI Web 服务器
  • 📡 所有 RESTful API 接口
  • 📊 实时系统监控和指标
  • 🔍 自动生成的 API 文档
  • 🎯 健康检查和状态端点

可访问地址:

服务 地址 用途
🚀 API 服务 http://localhost:8000 主要 API 接口
🚀 API 文档 http://localhost:8000/docs Swagger 文档
📖 备用文档 http://localhost:8000/redoc ReDoc 文档
📊 健康检查 http://localhost:8000/health 系统状态
📈 系统指标 http://localhost:8000/metrics Prometheus 指标

适用场景:

  • API 接口开发和测试
  • 前端开发时的后端服务
  • API 文档查看和调试
  • 系统集成测试

5.2 环境配置

必需环境:

  • Python 3.11+
  • Docker(推荐)

API 密钥配置:

# 设置环境变量
export OPENAI_API_KEY="your-openai-key"
export LANGSMITH_API_KEY="your-langsmith-key"

# 或编辑 config.json
{
  "openai_api_key": "your-openai-key",
  "langsmith_api_key": "your-langsmith-key"
}

5.3 服务访问说明

根据启动方式不同,可访问的服务也有所区别:

Docker Compose 启动(完整服务):

Python 本地启动(核心服务):

5.4 验证安装

# 检查系统状态
curl http://localhost:8000/health

# 运行所有测试
python -m pytest tests/ -v

# 运行特定测试
python tests/test_interactions.py      # 基础交互测试
python tests/test_api_server.py        # 启动 API 服务器
python tests/test_api_client.py        # API 客户端测试

5.4.1 测试文件说明

项目包含完整的测试套件,验证系统的各个功能模块:

5.5.1 🔄 test_interactions.py - 基础交互测试

验证多智能体系统的核心交互功能:

# 运行基础交互测试
cd /path/to/multi_agent_system
source venv/bin/activate
python tests/test_interactions.py

测试内容:

  • ✅ 系统启动和关闭
  • ✅ 智能体创建和配置
  • ✅ 基础任务执行
  • ✅ 错误处理机制
  • ✅ 性能指标收集

5.5.2 🚀 test_api_server.py - API 服务器测试

启动完整的 FastAPI 服务器进行测试:

# 启动 API 服务器(阻塞模式)
python tests/test_api_server.py

服务器功能:

  • 🌐 FastAPI Web 服务器
  • 📡 RESTful API 接口
  • 📊 实时系统监控
  • 🔍 API 文档生成
  • 🎯 健康检查端点

可访问地址:

5.5.3 📡 test_api_client.py - API 客户端测试

验证所有 API 接口的功能:

# 运行 API 客户端测试(需要先启动服务器)
python tests/test_api_client.py

测试接口:

  • ✅ 健康检查 (GET /health)
  • ✅ 创建研究任务 (POST /tasks)
  • ✅ 查询任务状态 (GET /tasks/{task_id})
  • ✅ 数据分析 (POST /analysis)
  • ✅ 智能客服对话 (POST /chat)
  • ✅ 系统监控 (GET /metrics)

5.5.4 🔍 test_system.py - 系统集成测试

原有的系统集成测试,使用 pytest 框架:

# 运行 pytest 测试
python -m pytest tests/test_system.py -v

5.5.5 📋 测试执行顺序建议

  1. 基础功能验证

    python tests/test_interactions.py
    
  2. 启动 API 服务器(新终端):

    python tests/test_api_server.py
    
  3. 验证 API 接口(另一个新终端):

    python tests/test_api_client.py
    
  4. 运行完整测试套件

    python -m pytest tests/ -v
    

💡 提示:API 客户端测试需要先启动 API 服务器,建议在不同终端窗口中运行。

5.5 使用指南

5.5.1 核心功能

功能 接口 示例
🔬 研究任务 POST /tasks 市场分析、技术调研
📊 数据分析 POST /analysis 用户行为、趋势预测
🎧 智能客服 POST /chat 问答、工单处理
📈 系统监控 GET /metrics 性能指标、健康检查

6.2 API 使用示例

6.2.1 认证配置

环境变量设置:

# 设置 API 密钥
export OPENAI_API_KEY="your-openai-api-key"
export LANGSMITH_API_KEY="your-langsmith-api-key"

# 或使用配置文件
cp config.json.example config.json
# 编辑 config.json 文件,填入相应的 API 密钥

API 请求头:

# 所有 API 请求都需要包含以下请求头
-H "Content-Type: application/json"
-H "Accept: application/json"

6.2.2 研究任务 API

创建研究任务:

curl -X POST "http://localhost:8000/tasks" \
  -H "Content-Type: application/json" \
  -H "Accept: application/json" \
  -d '{
    "type": "research",
    "query": "人工智能在医疗领域的应用前景",
    "priority": "high",
    "context": {
      "domain": "healthcare",
      "timeframe": "2024-2025",
      "depth": "comprehensive"
    }
  }'

成功响应示例:

{
  "status": "success",
  "data": {
    "task_id": "task_12345",
    "type": "research",
    "status": "created",
    "priority": "high",
    "created_at": "2024-01-15T10:30:00Z",
    "estimated_completion": "2024-01-15T10:35:00Z"
  },
  "message": "研究任务创建成功"
}

查询任务状态:

curl -X GET "http://localhost:8000/tasks/task_12345" \
  -H "Accept: application/json"

任务状态响应示例:

{
  "status": "success",
  "data": {
    "task_id": "task_12345",
    "type": "research",
    "status": "completed",
    "progress": 100,
    "result": {
      "summary": "人工智能在医疗领域展现出巨大潜力...",
      "key_findings": [
        "AI 诊断准确率提升 15-20%",
        "药物研发周期缩短 30%",
        "医疗成本降低 25%"
      ],
      "recommendations": ["加强 AI 医疗监管框架建设", "推进医疗数据标准化"]
    },
    "created_at": "2024-01-15T10:30:00Z",
    "completed_at": "2024-01-15T10:34:23Z"
  }
}

6.2.3 数据分析 API

创建分析任务:

curl -X POST "http://localhost:8000/analysis" \
  -H "Content-Type: application/json" \
  -d '{
    "data_source": "user_behavior",
    "analysis_type": "trend_prediction",
    "parameters": {
      "time_range": "last_30_days",
      "metrics": ["engagement", "conversion", "retention"],
      "prediction_horizon": "next_7_days"
    }
  }'

分析结果响应示例:

{
  "status": "success",
  "data": {
    "analysis_id": "analysis_67890",
    "type": "trend_prediction",
    "results": {
      "current_trends": {
        "engagement": { "trend": "increasing", "rate": 12.5 },
        "conversion": { "trend": "stable", "rate": 3.2 },
        "retention": { "trend": "decreasing", "rate": -5.1 }
      },
      "predictions": {
        "next_7_days": {
          "engagement": { "predicted_change": "+8.3%", "confidence": 0.87 },
          "conversion": { "predicted_change": "+1.2%", "confidence": 0.92 },
          "retention": { "predicted_change": "-2.1%", "confidence": 0.79 }
        }
      },
      "insights": [
        "用户参与度持续上升,建议加大内容投入",
        "留存率下降需要关注,建议优化用户体验"
      ]
    },
    "generated_at": "2024-01-15T11:15:30Z"
  }
}

6.2.4 智能客服 API

发起客服对话:

curl -X POST "http://localhost:8000/chat" \
  -H "Content-Type: application/json" \
  -d '{
    "message": "我想了解你们的企业版定价方案",
    "user_id": "user_123456",
    "session_id": "session_abc789",
    "context": {
      "user_tier": "premium",
      "previous_interactions": 3,
      "preferred_language": "zh-CN"
    }
  }'

客服响应示例:

{
  "status": "success",
  "data": {
    "response": "您好!我很乐意为您介绍企业版定价方案。我们提供三个企业版套餐:\n\n1. **标准企业版**:¥999/月,支持 50 个智能体\n2. **专业企业版**:¥2999/月,支持 200 个智能体\n3. **旗舰企业版**:¥5999/月,无限智能体\n\n所有企业版都包含:7x24 技术支持、私有部署选项、定制化开发服务。\n\n您的团队规模大概是多少人呢?我可以为您推荐最适合的方案。",
    "intent": "pricing_inquiry",
    "sentiment": "neutral",
    "confidence": 0.94,
    "suggested_actions": [
      "schedule_demo",
      "send_pricing_details",
      "connect_sales_team"
    ],
    "session_id": "session_abc789",
    "timestamp": "2024-01-15T11:20:15Z"
  }
}

6.2.5 系统监控 API

获取系统指标:

curl -X GET "http://localhost:8000/metrics" \
  -H "Accept: application/json"

系统指标响应示例:

{
  "status": "success",
  "data": {
    "system_health": {
      "status": "healthy",
      "uptime": "72h 15m 30s",
      "version": "1.0.0"
    },
    "performance": {
      "cpu_usage": 45.2,
      "memory_usage": 68.7,
      "disk_usage": 23.1,
      "network_io": {
        "bytes_sent": 1024000,
        "bytes_received": 2048000
      }
    },
    "agents": {
      "total_agents": 12,
      "active_agents": 8,
      "idle_agents": 4,
      "average_response_time": "1.2s"
    },
    "tasks": {
      "total_tasks": 156,
      "completed_tasks": 142,
      "pending_tasks": 8,
      "failed_tasks": 6,
      "success_rate": 91.0
    },
    "timestamp": "2024-01-15T11:25:00Z"
  }
}

6.2.6 错误处理

常见错误响应格式:

{
  "status": "error",
  "error": {
    "code": "INVALID_REQUEST",
    "message": "请求参数无效",
    "details": {
      "field": "query",
      "issue": "查询内容不能为空"
    }
  },
  "timestamp": "2024-01-15T11:30:00Z"
}

HTTP 状态码说明:

状态码 含义 说明
200 成功 请求处理成功
201 已创建 资源创建成功
400 请求错误 请求参数无效
401 未授权 API 密钥无效或缺失
403 禁止访问 权限不足
404 未找到 资源不存在
429 请求过多 超出速率限制
500 服务器错误 内部服务器错误
503 服务不可用 服务暂时不可用

6.3 Python SDK

6.3.1 基础使用

from src.main import MultiAgentSystem
import asyncio
import json

async def main():
    # 初始化系统(使用默认配置)
    system = MultiAgentSystem()

    try:
        # 启动系统
        await system.start()
        print("✅ 多智能体系统启动成功")

        # 等待系统完全初始化
        await asyncio.sleep(2)

        # 运行示例任务
        await run_examples(system)

    except Exception as e:
        print(f"❌ 系统错误:{e}")
    finally:
        # 优雅关闭系统
        await system.shutdown()
        print("🔄 系统已关闭")

async def run_examples(system):
    """运行各种示例任务"""

    # 1. 研究任务示例
    print("\n🔬 执行研究任务...")
    research_result = await system.run_research_demo()
    print(f"研究结果:{research_result}")

    # 2. 客服系统示例
    print("\n🎧 执行客服对话...")
    customer_result = await system.run_customer_service_demo()
    print(f"客服结果:{customer_result}")

    # 3. 系统指标查看
    print("\n📊 查看系统指标...")
    await system.show_metrics()

if __name__ == "__main__":
    asyncio.run(main())

6.3.2 自定义配置

import json
from pathlib import Path
from src.main import MultiAgentSystem

# 加载自定义配置
config_path = Path("config.json")
with open(config_path, 'r', encoding='utf-8') as f:
    custom_config = json.load(f)

# 修改配置参数
custom_config.update({
    "system": {
        "name": "My Custom AI System",
        "log_level": "DEBUG",
        "debug": True
    },
    "agents": {
        "max_concurrent_tasks": 10,
        "default_timeout": 60
    }
})

# 使用自定义配置初始化系统
system = MultiAgentSystem(config=custom_config)

6.3.3 智能体交互

from src.agents.research_agent import ResearchAgent
from src.agents.analysis_agent import AnalysisAgent
from src.communication.message_bus import Message, MessageType, MessagePriority

async def agent_interaction_example(system):
    """智能体交互示例"""

    # 获取已注册的智能体
    research_agent = system.agents.get("research_agent")
    analysis_agent = system.agents.get("analysis_agent")

    if research_agent and analysis_agent:
        # 创建研究任务
        research_task = {
            "query": "2024年人工智能发展趋势",
            "domain": "technology",
            "depth": "comprehensive"
        }

        # 执行研究
        research_result = await research_agent.execute(research_task)
        print(f"🔬 研究结果:{research_result.data}")

        # 将研究结果传递给分析智能体
        analysis_task = {
            "data": research_result.data,
            "analysis_type": "trend_analysis",
            "output_format": "structured"
        }

        # 执行分析
        analysis_result = await analysis_agent.execute(analysis_task)
        print(f"📊 分析结果:{analysis_result.data}")

        return {
            "research": research_result.data,
            "analysis": analysis_result.data
        }

6.3.4 工作流编排

from src.workflows.langgraph_workflow import EnterpriseWorkflowEngine, EnhancedAgentState

async def workflow_example(system):
    """工作流编排示例"""

    # 创建工作流引擎
    workflow_engine = EnterpriseWorkflowEngine(
        tracer=system.tracer,
        message_bus=system.message_bus
    )

    # 定义初始状态
    initial_state = EnhancedAgentState(
        task_id="workflow_001",
        task_type="comprehensive_analysis",
        input_data={
            "topic": "企业数字化转型策略",
            "requirements": [
                "市场调研",
                "竞争分析",
                "技术评估",
                "实施建议"
            ]
        },
        context={"priority": "high", "deadline": "2024-01-20"}
    )

    # 执行工作流
    try:
        final_state = await workflow_engine.execute(initial_state)
        print(f"🌊 工作流执行完成:{final_state.output_data}")
        return final_state.output_data
    except Exception as e:
        print(f"❌ 工作流执行失败:{e}")
        return None

6.3.5 消息总线使用

from src.communication.message_bus import MessageBus, Message, MessageType, MessagePriority

async def message_bus_example(system):
    """消息总线使用示例"""

    message_bus = system.message_bus

    # 订阅消息
    async def task_completed_handler(message: Message):
        print(f"📨 收到任务完成消息:{message.payload}")

    await message_bus.subscribe("task.completed", task_completed_handler)

    # 发布消息
    completion_message = Message(
        type=MessageType.EVENT,
        topic="task.completed",
        payload={
            "task_id": "task_123",
            "status": "success",
            "result": "任务执行成功"
        },
        priority=MessagePriority.NORMAL,
        sender="system"
    )

    await message_bus.publish(completion_message)

    # 等待消息处理
    await asyncio.sleep(1)

6.3.6 性能监控

from src.monitoring.langsmith_integration import EnterpriseTracing, TraceLevel

async def monitoring_example(system):
    """性能监控示例"""

    tracer = system.tracer

    # 创建追踪会话
    with tracer.trace_session("custom_operation", TraceLevel.INFO) as session:
        # 执行被监控的操作
        session.log("开始执行自定义操作")

        # 模拟一些工作
        await asyncio.sleep(2)

        session.log("操作执行完成")
        session.add_metadata({
            "operation_type": "custom",
            "duration": "2s",
            "status": "success"
        })

    # 获取性能指标
    metrics = await system._collect_system_metrics()
    print(f"📈 系统性能指标:{json.dumps(metrics, indent=2, ensure_ascii=False)}")

6.3.7 错误处理和重试

import asyncio
from typing import Optional, Dict, Any

class AgentTaskManager:
    """智能体任务管理器"""

    def __init__(self, system: MultiAgentSystem):
        self.system = system
        self.max_retries = 3
        self.retry_delay = 1.0

    async def execute_with_retry(self,
                               agent_id: str,
                               task_data: Dict[str, Any],
                               max_retries: Optional[int] = None) -> Optional[Dict[str, Any]]:
        """带重试机制的任务执行"""

        max_retries = max_retries or self.max_retries

        for attempt in range(max_retries + 1):
            try:
                agent = self.system.agents.get(agent_id)
                if not agent:
                    raise ValueError(f"智能体 {agent_id} 不存在")

                result = await agent.execute(task_data)

                if result.success:
                    print(f"✅ 任务执行成功(尝试 {attempt + 1}/{max_retries + 1})")
                    return result.data
                else:
                    raise Exception(f"任务执行失败:{result.error}")

            except Exception as e:
                print(f"❌ 尝试 {attempt + 1}/{max_retries + 1} 失败:{e}")

                if attempt < max_retries:
                    await asyncio.sleep(self.retry_delay * (2 ** attempt))  # 指数退避
                else:
                    print(f"🚫 任务最终失败,已达到最大重试次数")
                    return None

# 使用示例
async def retry_example(system):
    task_manager = AgentTaskManager(system)

    result = await task_manager.execute_with_retry(
        agent_id="research_agent",
        task_data={
            "query": "量子计算发展现状",
            "timeout": 30
        },
        max_retries=5
    )

    if result:
        print(f"🎯 最终结果:{result}")
    else:
        print("💥 任务执行失败")

7. 监控和调试

Docker Compose 启动时可用:

Python 本地启动时可用:


💡 提示:本项目基于《多智能体 AI 系统基础:理论与框架》实现,展示了理论到实践的完整转化过程。