Agentic AI与提示工程:构建智能城市的技术基石与实践指南

副标题:从理论架构到落地案例,探索AI驱动的未来城市建设路径

摘要/引言

问题陈述:智能城市的“最后一公里”困境

当我们谈论“智能城市”时,脑海中往往浮现出这样的画面:自动驾驶车辆有序穿梭,能源系统根据需求动态调配,公共服务主动响应市民需求……然而,现实中的智能城市建设却常常陷入“数据孤岛”与“系统刚性”的泥潭:交通管理系统无法与能源调度实时协同,环境监测数据难以转化为应急决策,市民服务仍停留在“被动响应”而非“主动感知”。核心痛点在于缺乏一种能够自主协同、动态决策、持续进化的智能中枢,以及让这种中枢精准理解城市复杂需求的“语言桥梁”。

核心方案:Agentic AI与提示工程的双向赋能

本文提出的解决方案是**“Agentic AI+提示工程”双轮驱动架构**:

  • Agentic AI(智能体AI):通过具备自主性、目标导向性、社交能力的智能体网络,实现城市各系统的分布式协同与自主决策。每个智能体专注于特定领域(如交通、能源、安全),并能跨领域协作解决复杂问题。
  • 提示工程(Prompt Engineering):作为智能体的“语言工程师”,通过精准设计的提示词(Prompts),让智能体明确目标、理解上下文、协调交互规则,确保AI系统的输出可控、高效、贴合城市实际需求。

主要成果/价值

读完本文后,你将获得:

  1. 理论认知:深入理解Agentic AI的核心架构与提示工程的设计原则;
  2. 实践能力:掌握智能城市场景下多智能体系统的设计与提示词开发方法;
  3. 落地案例:通过交通流量优化、应急响应调度等实战案例,学会将技术转化为城市治理效能;
  4. 未来视野:洞察Agentic AI与提示工程在智能城市中的演进方向与潜在突破点。

文章导览

本文将分为四大部分展开:

  • 第一部分(引言与基础):解析智能城市的现状与挑战,铺垫Agentic AI与提示工程的核心概念;
  • 第二部分(核心内容):从理论到实践,详解智能体架构设计、提示工程方法论及系统实现步骤;
  • 第三部分(验证与扩展):通过案例验证技术有效性,探讨性能优化与最佳实践;
  • 第四部分(总结与展望):总结核心成果,展望未来技术融合与城市智能化的演进路径。

目标读者与前置知识

目标读者

  • AI工程师:希望将Agentic AI应用于实际场景的算法开发者;
  • 解决方案架构师:负责智能城市系统设计的技术决策者;
  • 提示工程架构师:专注于AI系统交互设计与输出控制的专业人员;
  • 城市治理技术管理者:关注如何通过AI提升城市运行效率的从业者。

前置知识

  • 基础AI概念:了解大语言模型(LLM)、深度学习基本原理;
  • 软件架构基础:熟悉分布式系统、微服务架构设计思想;
  • 编程能力:掌握Python编程,了解API开发(如FastAPI/Flask);
  • 工具链认知:对向量数据库(如Milvus)、消息队列(如RabbitMQ)有基本了解;
  • 智能城市场景认知:了解交通管理、能源调度、公共安全等典型城市治理场景。

文章目录

  1. 引言与基础

    • 问题背景与动机:智能城市的“智能”瓶颈
    • 核心概念与理论基础:Agentic AI与提示工程的协同逻辑
  2. 核心内容

    • 环境准备:开发工具与技术栈选型
    • 智能体架构设计:从单智能体到多智能体协同
    • 提示工程方法论:为智能城市智能体“精准导航”
    • 分步实现案例:交通流量优化多智能体系统
  3. 验证与扩展

    • 结果展示与验证:从模拟数据到真实场景反馈
    • 性能优化与最佳实践:提升智能体系统稳定性与效率
    • 常见问题与解决方案:智能体冲突、提示词失效等痛点应对
  4. 总结与展望

    • 核心成果回顾:技术赋能城市治理的关键路径
    • 未来展望:具身智能、联邦学习与城市数字孪生的融合

第一部分:引言与基础

问题背景与动机:智能城市的“智能”瓶颈

智能城市建设的现状与挑战

全球已有超1000个城市提出“智能城市”规划,但多数项目仍停留在“数字化”而非“智能化”阶段。根据Gartner 2023年报告,70%的智能城市项目因“系统协同不足”和“AI应用空转”未能达到预期目标。具体表现为:

1. 数据孤岛严重,跨部门协同困难

交通、能源、环保等部门分别建设独立系统,数据格式不统一,难以共享。例如:某城市交通部门的“拥堵预警系统”与交警的“信号控制平台”无法实时联动,导致预警信息无法转化为信号配时调整。

2. 决策依赖人工,响应滞后

多数“智能系统”仅能完成数据采集与展示(如监控大屏),复杂决策仍需人工介入。例如:突发自然灾害时,需要人工协调消防、医疗、交通等部门资源,耗时长达数小时,错过最佳响应时机。

3. 系统刚性强,难以适应动态需求

传统软件系统按固定规则运行,无法应对城市的动态变化(如大型活动、极端天气)。例如:节假日景区人流激增时,原有的人流监控系统因阈值固定,无法提前预警和疏导。

4. 用户体验同质化,缺乏个性化服务

市民服务系统(如政务APP)多采用“一刀切”模式,无法根据市民年龄、需求(如老年人无障碍服务、上班族通勤优化)提供差异化方案。

现有解决方案的局限性

为解决上述问题,行业曾尝试两种路径,但均存在明显短板:

1. 集中式AI平台(“大脑”模式)
  • 做法:构建一个“城市大脑”,集中处理所有数据并统一决策。
  • 缺陷:单点故障风险高,数据处理延迟大,难以适应城市的分布式场景;且“大脑”决策逻辑复杂,一旦出现偏差,影响范围广。
2. 单一功能AI模块(“工具”模式)
  • 做法:为每个场景开发独立AI工具(如交通预测模型、能源优化算法)。
  • 缺陷:工具间缺乏协同,无法解决跨领域问题(如“交通拥堵加剧能源消耗”);且AI输出依赖人工解读,难以直接驱动执行。

Agentic AI+提示工程:破局之道

Agentic AI与提示工程的结合,恰好弥补了上述不足:

  • Agentic AI:通过分布式智能体网络,实现“去中心化协同”——每个智能体自主处理领域内问题,并通过通信协议跨领域协作,避免单点故障;
  • 提示工程:通过精准提示词定义智能体的目标、规则和交互方式,让AI系统“理解”城市需求的复杂性(如“高峰期交通优先保障救护车通行”),并输出可直接执行的决策。

核心概念与理论基础

Agentic AI:智能体的“城市劳动者”

1. 智能体(Agent)的定义与核心特征

智能体是指能够感知环境、自主决策、执行动作以实现目标的AI实体。在智能城市中,智能体需具备以下特征:

特征 定义 城市场景示例
自主性 无需人工干预,根据环境数据独立制定行动计划 交通智能体自主调整红绿灯配时
目标导向 围绕明确目标(如“降低早高峰拥堵率15%”)优化行为 能源智能体以“单日能耗降低10%”为目标
社交能力 与其他智能体通信、协作,共同解决跨领域问题 交通智能体与医疗智能体协同调度救护车
学习与适应 通过反馈数据持续优化决策模型,适应城市动态变化 人流智能体根据节假日数据调整预警阈值
记忆能力 存储历史交互信息与决策经验,支持长周期任务规划 应急智能体调用历史灾害响应案例优化方案
2. 多智能体系统(MAS:Multi-Agent System)

智能城市是典型的多智能体系统,不同领域的智能体通过通信协议(如KQML、FIPA ACL)协同工作。其架构可分为三层:

  • 感知层:数据采集智能体(如摄像头、传感器数据处理Agent);
  • 决策层:业务智能体(如交通、能源、安全Agent);
  • 执行层:控制智能体(如信号控制、设备调度Agent)。

示例:暴雨天气下的多智能体协同流程

  1. 环境感知智能体:监测降雨量、积水深度,发送预警给交通与应急智能体;
  2. 交通智能体:调整红绿灯配时,优先放行排水车辆;
  3. 应急智能体:调度救援队伍,通知受影响区域市民;
  4. 能源智能体:暂时切断积水区域的供电,避免触电事故。
3. 智能体的内部架构(BDI模型)

为实现上述特征,单个智能体可采用BDI(信念-愿望-意图)模型设计:

  • 信念(Belief):智能体对环境的认知(如“当前早高峰车流量800辆/小时”);
  • 愿望(Desire):智能体的目标集合(如“降低拥堵率”“缩短通勤时间”);
  • 意图(Intention):从愿望中选择的优先行动方案(如“调整东西向绿灯时长至90秒”)。

代码逻辑示例(简化版BDI模型):

class TrafficAgent:  
    def __init__(self):  
        self.beliefs = {"current_flow": 0, "weather": "sunny"}  # 环境认知  
        self.desires = ["minimize_delay", "ensure_safety"]       # 目标集合  
        self.intentions = []                                    # 行动计划  

    def update_beliefs(self, sensor_data):  
        # 从传感器更新环境认知(如车流量、天气)  
        self.beliefs["current_flow"] = sensor_data["flow"]  
        self.beliefs["weather"] = sensor_data["weather"]  

    def select_desires(self):  
        # 根据环境选择优先目标(如雨天优先“安全”)  
        if self.beliefs["weather"] == "rainy":  
            return ["ensure_safety", "minimize_delay"]  
        return ["minimize_delay", "ensure_safety"]  

    def form_intentions(self):  
        # 根据优先目标生成行动计划  
        prioritized_desires = self.select_desires()  
        if prioritized_desires[0] == "minimize_delay":  
            self.intentions = self.generate_traffic_light_plan()  
        else:  
            self.intentions = self.generate_safety_plan()  

    def execute(self):  
        # 执行行动计划(如调整红绿灯)  
        for action in self.intentions:  
            self.send_command_to_traffic_light(action)  

提示工程:智能体的“语言教练”

1. 提示工程的定义与核心价值

提示工程是通过设计高质量提示词(Prompts),引导AI系统生成符合预期的输出。在智能城市中,提示工程的核心价值在于:

  • 目标校准:让智能体明确“城市治理目标”(如“公共交通优先于私家车”);
  • 规则约束:定义智能体的行为边界(如“不得泄露市民隐私数据”);
  • 上下文理解:帮助智能体处理复杂场景(如“大型活动期间,兼顾人流疏导与商业需求”);
  • 交互协调:规范多智能体的通信语言(如“用标准化JSON格式发送协作请求”)。
2. 智能城市场景下的提示工程原则

针对智能城市的复杂性,提示词设计需遵循以下原则:

原则1:场景化与具体化

避免模糊表述,需明确时间、地点、条件、目标四要素。

  • ❌ 反面示例:“优化交通流量”;
  • ✅ 正面示例:“在工作日7:00-9:00早高峰期间,针对 downtown区域主干道,将平均通行速度提升20%,同时保障公交车优先通行”。
原则2:多角色协同提示

当多个智能体协作时,需在提示中明确角色分工、通信协议、冲突解决规则

  • 示例(交通与医疗智能体协同提示):
你是交通调度智能体,当收到医疗智能体发送的急救车辆调度请求(格式:{"vehicle_id": "Ambulance-01", "route": "A街→B医院"})时,需:  
1. 立即暂停当前红绿灯配时优化任务;  
2. 规划急救车辆优先通行路线,调整沿线红绿灯为绿灯;  
3. 通过{"status": "completed", "estimated_time": X分钟}格式反馈医疗智能体;  
4. 若遇交通拥堵无法优化,发送{"status": "failed", "reason": "..."}并请求交警智能体协助。  
原则3:动态上下文嵌入

城市场景具有实时性,提示词需嵌入动态数据(如当前时间、实时流量、天气情况)。

  • 示例(结合实时数据的提示模板):
当前时间:{{current_time}},天气:{{weather}},区域:{{district}},实时车流量:{{traffic_flow}}辆/小时。  
你的任务是:基于上述数据,预测未来30分钟车流量变化,并生成红绿灯配时方案。输出格式为JSON:{"direction": ["东-西", "南-北"], "green_time": [90, 60]}。  
原则4:安全与伦理约束

必须在提示中明确禁止行为,避免智能体决策危害公共安全。

  • 示例(安全约束提示):
你的所有决策必须满足:  
1. 不违反交通法规(如红灯时长不得超过120秒);  
2. 优先保障急救车辆、消防车通行;  
3. 避免因配时调整导致行人过街等待超过90秒;  
4. 若无法同时满足多个目标,优先选择“降低事故风险”而非“提升通行效率”。  
3. 提示工程架构师的核心能力

提示工程架构师是连接“城市需求”与“AI能力”的桥梁,需具备:

  • 领域知识:深入理解智能城市各场景的业务逻辑(如交通信号配时规则、能源调度优先级);
  • AI理解力:熟悉LLM的特性(如上下文窗口大小、推理能力边界);
  • 系统思维:能设计端到端提示流程(从需求分析→提示设计→效果验证→迭代优化);
  • 沟通能力:协调城市管理者、AI工程师、智能体开发者,将业务需求转化为技术提示。

Agentic AI与提示工程的协同机制

Agentic AI与提示工程并非独立存在,而是形成“智能体执行+提示工程调控”的闭环:

  1. 提示词定义智能体的“基因”:通过初始化提示(Initial Prompt)定义智能体的角色、目标、能力边界(如“你是负责 downtown区域的交通智能体,目标是降低早高峰拥堵率”);
  2. 智能体执行中动态调用提示:在决策过程中,智能体通过工具调用(Tool Calling)获取实时数据,结合动态提示生成行动计划(如根据实时车流量调整提示中的优化参数);
  3. 反馈提示驱动迭代优化:执行结果(如“拥堵率降低10%”)通过反馈提示(Feedback Prompt)输入智能体,用于更新信念(Beliefs)和优化意图(Intentions)。

协同流程图

┌─────────────┐     初始化提示     ┌─────────────┐  
│ 城市需求    ├────────────────────► 智能体(BDI模型)│  
└─────────────┘                     └──────┬──────┘  
                                           │  
                   执行行动                 ▼  
市民/设备 ◄───────────────────────── 执行层智能体  
                                           ▲  
                                           │  
┌─────────────┐     反馈提示       ┌──────┴──────┐  
│ 结果数据    ├────────────────────► 优化模块     │  
└─────────────┘                     └─────────────┘  

第二部分:核心内容

环境准备:工具链与开发环境

核心技术栈选型

构建智能城市多智能体系统需以下工具链,本文以开源方案为主:

工具类型 推荐方案 用途说明
智能体框架 LangChain、MetaGPT、AutoGPT 快速搭建智能体架构,支持工具调用与记忆管理
LLM模型 GPT-4(精度优先)、Llama 3-70B(开源优先) 作为智能体的“大脑”,负责推理与决策
向量数据库 Milvus、Chroma 存储智能体的记忆数据(如历史交互记录)
消息队列 RabbitMQ、Celery 实现多智能体之间的异步通信
API开发 FastAPI 构建智能体与物理设备(如红绿灯、传感器)的接口
可视化工具 Grafana、Streamlit 监控智能体运行状态与城市数据可视化
部署工具 Docker、Kubernetes 容器化部署智能体,支持弹性扩缩容

环境配置步骤

步骤1:安装基础依赖

创建Python虚拟环境并安装核心库:

# 创建虚拟环境  
python -m venv agentic-ai-env  
source agentic-ai-env/bin/activate  # Linux/Mac  
agentic-ai-env\Scripts\activate     # Windows  

# 安装依赖  
pip install langchain==0.1.10 meta-gpt==0.7.2 fastapi==0.104.1 uvicorn==0.24.0 pymilvus==2.3.5 rabbitmq-client==0.1.0 python-dotenv==1.0.0  
步骤2:启动向量数据库(Milvus)

使用Docker快速启动Milvus(用于存储智能体记忆):

# 拉取Milvus Docker Compose文件  
wget https://github.com/milvus-io/milvus/releases/download/v2.3.5/milvus-standalone-docker-compose.yml -O docker-compose.yml  

# 启动Milvus  
docker-compose up -d  

# 验证启动成功(看到"Milvus standalone is ready")  
docker logs milvus-standalone  
步骤3:启动消息队列(RabbitMQ)

RabbitMQ用于多智能体通信:

docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3-management  
步骤4:配置开发环境变量

创建.env文件,存储LLM API密钥、数据库连接信息等:

# LLM配置  
OPENAI_API_KEY=your_openai_key  # 若使用GPT-4  
LLAMA3_API_BASE=http://localhost:8000/v1  # 若使用本地部署的Llama 3  

# Milvus配置  
MILVUS_HOST=localhost  
MILVUS_PORT=19530  
MILVUS_COLLECTION=agent_memory  

# RabbitMQ配置  
RABBITMQ_HOST=localhost  
RABBITMQ_QUEUE=agent_communication  

智能体架构设计:从单智能体到多智能体协同

单智能体内部架构设计

单个智能体需具备感知-决策-执行-记忆四大核心模块,以下以“交通调度智能体”为例展开设计。

模块1:感知模块(Perception Module)

功能:从传感器、数据库、其他智能体获取数据;
实现:通过API调用、消息队列订阅、数据库查询获取数据;
代码示例(FastAPI数据接口):

# 传感器数据API(提供给感知模块调用)  
from fastapi import FastAPI  
from pydantic import BaseModel  
import uvicorn  

app = FastAPI()  

class TrafficData(BaseModel):  
    timestamp: str  # 格式:YYYY-MM-DD HH:MM:SS  
    location: str   # 如"downtown-main-street"  
    flow: int       # 车流量(辆/小时)  
    speed: float    # 平均速度(km/h)  

# 模拟传感器数据存储(实际项目中对接数据库)  
traffic_db = []  

@app.post("/sensor/traffic")  
def receive_traffic_data(data: TrafficData):  
    traffic_db.append(data.dict())  
    return {"status": "success"}  

@app.get("/sensor/traffic")  
def get_traffic_data(location: str, start_time: str, end_time: str):  
    # 筛选指定位置和时间范围内的数据  
    filtered = [d for d in traffic_db if d["location"] == location and start_time <= d["timestamp"] <= end_time]  
    return {"data": filtered}  

if __name__ == "__main__":  
    uvicorn.run(app, host="0.0.0.0", port=8001)  
模块2:决策模块(Decision Module)

功能:基于感知数据和提示词,通过LLM生成行动计划;
核心:BDI模型+提示工程驱动决策;
代码示例(基于LangChain的决策逻辑):

from langchain.llms import OpenAI  
from langchain.prompts import PromptTemplate  
from dotenv import load_dotenv  
import os  

load_dotenv()  
llm = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), temperature=0.3)  # 低temperature确保决策稳定  

class TrafficDecisionModule:  
    def __init__(self):  
        # 定义决策提示模板  
        self.prompt_template = PromptTemplate(  
            input_variables=["current_time", "location", "traffic_flow", "weather", "goal"],  
            template="""  
            你是交通调度智能体的决策模块,当前信息:  
            - 当前时间:{current_time}  
            - 区域:{location}  
            - 实时车流量:{traffic_flow}辆/小时  
            - 天气:{weather}  
            - 目标:{goal}  

            请生成红绿灯配时方案,输出格式为JSON:{{"direction": ["东-西", "南-北"], "green_time": [X, Y]}},其中X和Y为绿灯时长(秒),需满足:  
            1. 单个方向绿灯时长不超过120秒,不低于30秒;  
            2. 若车流量>1000辆/小时,绿灯时长可增加20%;  
            3. 雨天时,绿灯时长减少10%以避免积水路段拥堵。  
            """  
        )  

    def generate_plan(self, current_time, location, traffic_flow, weather, goal):  
        # 填充提示模板  
        prompt = self.prompt_template.format(  
            current_time=current_time,  
            location=location,  
            traffic_flow=traffic_flow,  
            weather=weather,  
            goal=goal  
        )  
        # 调用LLM生成决策  
        response = llm(prompt)  
        return eval(response)  # 实际项目中需用json.loads,并增加错误处理  
模块3:执行模块(Execution Module)

功能:将决策转化为物理世界的动作(如控制红绿灯、发送调度指令);
代码示例(控制红绿灯设备):

import requests  

class TrafficExecutionModule:  
    def __init__(self, traffic_light_api_url="http://localhost:8002/traffic-light"):  
        self.api_url = traffic_light_api_url  

    def adjust_traffic_light(self, direction: list, green_time: list):  
        """调整红绿灯配时"""  
        payload = {  
            "direction": direction,  
            "green_time": green_time,  
            "timestamp": self.get_current_time()  
        }  
        # 调用红绿灯控制API  
        response = requests.post(f"{self.api_url}/adjust", json=payload)  
        if response.status_code == 200:  
            return {"status": "success", "message": "红绿灯配时已更新"}  
        else:  
            return {"status": "failed", "message": response.text}  

    def get_current_time(self):  
        """获取当前时间(格式:YYYY-MM-DD HH:MM:SS)"""  
        from datetime import datetime  
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")  
模块4:记忆模块(Memory Module)

功能:存储历史决策、执行结果、环境数据,支持智能体“经验学习”;
实现:使用Milvus向量数据库存储结构化+向量化数据;
代码示例(Milvus记忆存储):

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType  
import numpy as np  
from dotenv import load_dotenv  
import os  

load_dotenv()  

class TrafficMemoryModule:  
    def __init__(self):  
        # 连接Milvus  
        connections.connect(  
            alias="default",  
            host=os.getenv("MILVUS_HOST"),  
            port=os.getenv("MILVUS_PORT")  
        )  
        # 定义集合 schema(存储决策记录)  
        fields = [  
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),  
            FieldSchema(name="timestamp", dtype=DataType.VARCHAR, max_length=20),  # 时间  
            FieldSchema(name="location", dtype=DataType.VARCHAR, max_length=50),   # 区域  
            FieldSchema(name="traffic_flow", dtype=DataType.INT64),                # 车流量  
            FieldSchema(name="decision", dtype=DataType.VARCHAR, max_length=500),  # 决策JSON  
            FieldSchema(name="result", dtype=DataType.VARCHAR, max_length=500),    # 执行结果  
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536)   # 决策向量(用于相似查询)  
        ]  
        schema = CollectionSchema(fields, "交通智能体决策记忆")  
        self.collection = Collection(os.getenv("MILVUS_COLLECTION"), schema)  
        # 创建索引(用于向量检索)  
        index_params = {"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}  
        self.collection.create_index("embedding", index_params)  
        self.collection.load()  

    def store_memory(self, timestamp, location, traffic_flow, decision, result, embedding):  
        """存储决策记忆"""  
        data = [  
            [timestamp], [location], [traffic_flow], [decision], [result], [embedding]  
        ]  
        self.collection.insert(data)  

    def recall_similar_memory(self, query_embedding, top_k=5):  
        """检索相似历史决策(用于学习优化)"""  
        search_params = {"metric_type": "L2", "params": {"nprobe": 10}}  
        results = self.collection.search(  
            data=[query_embedding],  
            anns_field="embedding",  
            param=search_params,  
            limit=top_k,  
            output_fields=["timestamp", "decision", "result"]  
        )  
        return [hit.entity.to_dict() for hit in results[0]]  

多智能体协同架构设计

当多个智能体(如交通、医疗、能源)协同工作时,需设计通信协议、角色分工、冲突解决机制三大核心要素。

要素1:通信协议设计

采用发布-订阅(Pub/Sub)模式,通过RabbitMQ实现智能体间异步通信:

  • 主题(Topic)划分:按领域划分主题(如traffic.*medical.*),智能体订阅相关主题;
  • 消息格式:统一使用JSON,包含sender_id(发送者)、receiver_id(接收者)、type(消息类型)、content(内容)、timestamp(时间戳)。

代码示例(RabbitMQ消息发送与接收):

import pika  
import json  
from dotenv import load_dotenv  
import os  

load_dotenv()  

class AgentCommunicator:  
    def __init__(self, agent_id):  
        self.agent_id = agent_id  
        self.connection = pika.BlockingConnection(  
            pika.ConnectionParameters(host=os.getenv("RABBITMQ_HOST"))  
        )  
        self.channel = self.connection.channel()  

    def publish(self, topic, message_type, content, receiver_id="all"):  
        """发送消息到指定主题"""  
        message = {  
            "sender_id": self.agent_id,  
            "receiver_id": receiver_id,  
            "type": message_type,  
            "content": content,  
            "timestamp": self.get_current_time()  
        }  
        self.channel.basic_publish(  
            exchange='agent_topic_exchange',  
            routing_key=topic,  
            body=json.dumps(message)  
        )  
        print(f"[x] Sent {message} to topic {topic}")  

    def subscribe(self, topic, callback):  
        """订阅主题并注册回调函数处理消息"""  
        self.channel.exchange_declare(exchange='agent_topic_exchange', exchange_type='topic')  
        result = self.channel.queue_declare(queue='', exclusive=True)  
        queue_name = result.method.queue  
        self.channel.queue_bind(exchange='agent_topic_exchange', queue=queue_name, routing_key=topic)  
        self.channel.basic_consume(  
            queue=queue_name,  
            on_message_callback=lambda ch, method, properties, body: callback(json.loads(body)),  
            auto_ack=True  
        )  
        print(f"[*] Subscribed to topic {topic}, waiting for messages...")  
        self.channel.start_consuming()  

    def get_current_time(self):  
        from datetime import datetime  
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")  

# 示例:交通智能体订阅医疗主题,接收急救请求  
def handle_medical_request(message):  
    print(f"[Traffic Agent] Received medical request: {message}")  
    # 处理逻辑:调整红绿灯配时...  

if __name__ == "__main__":  
    communicator = AgentCommunicator(agent_id="traffic-agent-01")  
    communicator.subscribe(topic="medical.emergency", callback=handle_medical_request)  
要素2:角色分工与协同流程

以“应急响应”场景为例,定义各智能体的角色与协同流程:

智能体角色 职责 输入数据 输出动作
环境感知智能体 监测异常事件(如火灾、地震),发送预警 传感器数据(温度、震动、烟雾浓度) 预警消息(type: emergency_alert
医疗智能体 调度救护车,规划急救路线 预警消息、医院资源数据 急救车辆调度请求
交通智能体 调整交通信号,保障急救车辆优先通行 调度请求、实时交通数据 红绿灯配时调整、路线引导
能源智能体 切断事发区域非必要电源,保障救援设备供电 预警消息、电网负载数据 电源切换指令
指挥智能体 协调各智能体,解决冲突(如“交通与能源智能体同时请求优先资源”) 各智能体状态消息 冲突仲裁指令

协同流程图

环境感知智能体 → 发送预警 → [医疗智能体, 交通智能体, 能源智能体, 指挥智能体]  
    ↓               ↓               ↓               ↓               ↓  
医疗智能体生成调度请求 → 交通智能体调整红绿灯 → 能源智能体切断电源 → 指挥智能体监控全过程  
    ↓               ↓               ↓               ↓  
所有智能体向指挥智能体反馈执行结果 → 指挥智能体生成应急响应报告  
要素3:冲突解决机制

当智能体目标冲突时(如“交通智能体希望延长绿灯以疏导车流,而行人智能体希望缩短绿灯保障过街安全”),需通过以下策略解决:

策略1:优先级机制

预设角色优先级(如“急救相关 > 公共安全 > 通行效率”),冲突时高优先级智能体优先。

  • 示例(优先级规则提示):
当收到多个智能体的资源请求时,按以下优先级排序:  
1. 医疗急救相关请求(优先级1);  
2. 公共安全相关请求(优先级2);  
3. 交通效率优化请求(优先级3);  
4. 能源/商业优化请求(优先级4)。  
若优先级相同,按请求时间顺序处理。  
策略2:协商机制

低优先级智能体主动调整目标,与高优先级智能体协商。

  • 示例(交通与行人智能体协商):
行人智能体:“当前行人过街等待已达120秒,请求将东西向绿灯缩短30秒”;  
交通智能体:“当前东西向车流量较大,若缩短30秒,拥堵率将上升15%。建议折中:缩短15秒,同时启用行人二次过街模式”;  
行人智能体:“同意折中方案”。  
策略3:仲裁机制

当协商失败时,由“指挥智能体”作为仲裁者,基于城市整体目标(如“最小化总体社会成本”)强制决策。

提示工程实战:智能城市场景下的提示词设计

提示词模板库构建

针对智能城市的典型场景,构建可复用的提示词模板库,提升开发效率。

模板1:交通流量优化提示模板
TRAFFIC_OPTIMIZATION_PROMPT = """  
你是交通调度智能体,负责{location}区域的交通流量优化。当前信息:  
- 时间:{current_time}({time_period},如早高峰/平峰/晚高峰)  
- 实时数据:车流量{traffic_flow}辆/小时,平均速度{avg_speed} km/h,拥堵路段{congested_roads}  
- 特殊事件:{special_events}(如无/大型活动/施工)  

你的任务是:  
1. 分析当前拥堵原因(如“主干道车流量过大”“红绿灯配时不合理”);  
2. 生成未来{prediction_time}分钟的车流量预测;  
3. 输出红绿灯配时方案(格式:{{"direction": ["东-西", "南-北"], "green_time": [X, Y]}});  
4. 若预测拥堵率>30%,需额外生成3条交通疏导建议(如“建议临时开放公交专用道”)。  

约束条件:  
- 单个方向绿灯时长范围:30-120秒;  
- 优先保障公交车、校车通行;  
- 避免同一方向连续两次绿灯时长差异超过20秒。  
"""  
模板2:医疗急救调度提示模板
MEDICAL_EMERGENCY_PROMPT = """  
你是医疗急救智能体,收到环境感知智能体发送的紧急事件报告:  
{emergency_report}(格式:{{"event_type": "fire/accident", "location": "XX街道", "casualties_estimated": X人}})  

当前医院资源状态:  
{hospital_status}(格式:{{"hospital_id": "H1", "available_ambulances": 3, "available_beds": 10}, ...})  

你的任务是:  
1. 选择距离事发地最近且有可用救护车的医院;  
2. 调度救护车(格式:{{"ambulance_id": "A01", "departure_hospital": "H1", "estimated_arrival_time": X分钟}});  
3. 通过消息队列向交通智能体发送优先通行请求(格式:{{"vehicle_id": "A01", "route": "{location}→H1"}});  
4. 生成伤员分诊建议(如“优先救治重伤员,预计需要3辆救护车”)。  

约束条件:  
- 救护车响应时间不得超过15分钟;  
- 医院剩余床位需≥预估伤员数;  
- 若最近医院资源不足,依次选择次近医院。  
"""  
模板3:多智能体协同提示模板(指挥智能体)
COMMAND_AGENT_PROMPT = """  
你是城市应急指挥智能体,负责协调各智能体处理{event_type}事件。当前各智能体状态:  
{agents_status}(格式:{{"agent_id": "traffic-agent", "status": "idle/busy", "current_task": "..."},...})  

各智能体已发送的请求:  
{agent_requests}(格式:{{"sender_id": "medical-agent", "request_type": "traffic_priority", "content": "..."},...})  

你的任务是:  
1. 检查是否存在请求冲突(如两个智能体同时请求同一资源);  
2. 按优先级规则(急救>安全>效率)仲裁冲突;  
3. 向冲突智能体发送仲裁结果(格式:{{"status": "approved/denied", "reason": "...", "alternative_solution": "..."}});  
4. 每5分钟汇总各智能体执行进度,更新应急响应报告。  

优先级规则:  
1. 直接关系生命安全的请求(如医疗急救、火灾救援)优先级最高;  
2. 影响公共秩序的请求(如交通疏导、人群控制)次之;  
3. 资源优化类请求(如能源调度、商业活动支持)优先级最低。  
"""  

提示词动态优化技术

为应对城市场景的动态变化,需通过反馈数据持续优化提示词,提升智能体决策质量。

方法1:基于执行结果的提示词迭代

通过“执行结果→效果评估→提示词调整”闭环迭代:

  1. 效果评估指标:定义量化指标(如“交通优化后拥堵率下降百分比”“急救响应时间缩短秒数”);
  2. 提示词调整策略:若指标未达标,分析原因并调整提示词(如增加约束条件、细化目标)。

代码示例(提示词迭代优化):

class PromptOptimizer:  
    def __init__(self, initial_prompt, evaluation_metric):  
        self.prompt = initial_prompt  
        self.evaluation_metric = evaluation_metric  # 评估指标函数(如计算拥堵率下降百分比)  
        self.iteration_count = 0  

    def evaluate(self, execution_result):  
        """评估执行结果是否达标"""  
        score = self.evaluation_metric(execution_result)  
        print(f"Iteration {self.iteration_count}: Score = {score}")  
        return score  

    def optimize(self, execution_result, feedback):  
        """根据反馈优化提示词"""  
        self.iteration_count += 1  
        score = self.evaluate(execution_result)  

        # 若分数低于阈值(如目标的80%),调整提示词  
        if score < 0.8:  
            print(f"Optimizing prompt based on feedback: {feedback}")  
            # 策略1:增加约束条件(如反馈为“绿灯时长长导致拥堵”)  
            if "绿灯时长" in feedback:  
                self.prompt += "\n额外约束:单个方向绿灯时长不得超过90秒。"  
            # 策略2:细化目标(如反馈为“未优先公交车”)  
            if "公交车" in feedback:  
                self.prompt = self.prompt.replace("优先保障公交车", "优先保障公交车,若检测到公交车队列长度>5辆,绿灯时长增加10秒")  
        return self.prompt  

# 使用示例  
def congestion_reduction_metric(result):  
    """评估指标:拥堵率下降百分比"""  
    return result.get("congestion_reduction", 0)  

optimizer = PromptOptimizer(  
    initial_prompt=TRAFFIC_OPTIMIZATION_PROMPT,  
    evaluation_metric=congestion_reduction_metric  
)  

# 模拟执行结果与反馈  
execution_result = {"congestion_reduction": 0.15}  # 拥堵率下降15%(未达20%目标)  
feedback = "绿灯时长长导致拥堵,且未优先公交车"  
optimized_prompt = optimizer.optimize(execution_result, feedback)  
print("优化后的提示词:", optimized_prompt)  
方法2:上下文感知提示生成

通过工具调用获取实时上下文(如天气、交通事件),动态填充提示词模板:

  • 使用LangChain的Tool类定义工具(如“获取实时天气”“查询交通事件”);
  • 在提示词生成时,自动调用工具获取数据并嵌入模板。

代码示例(上下文感知提示生成):

from langchain.agents import Tool  
from langchain.agents import initialize_agent  
from langchain.llms import OpenAI  
import requests  

# 定义工具:获取实时天气  
def get_weather(location):  
    url = f"http://api.weatherapi.com/v1/current.json?key=YOUR_WEATHER_API_KEY&q={location}"  
    response = requests.get(url)  
    return response.json()["current"]["condition"]["text"]  # 如"rainy"  

# 定义工具:获取交通事件  
def get_traffic_events(location):  
    url = f"http://api.trafficapi.com/v1/events?location={location}"  
    response = requests.get(url)  
    return [event["description"] for event in response.json()["events"]]  

# 初始化工具列表  
tools = [  
    Tool(  
        name="GetWeather",  
        func=get_weather,  
        description="获取指定位置的实时天气,输入为城市区域名称(如'downtown')"  
    ),  
    Tool(  
        name="GetTrafficEvents",  
        func=get_traffic_events,  
        description="获取指定位置的交通事件(如施工、事故),输入为城市区域名称"  
    )  
]  

# 初始化智能体(使用LangChain的Agent)  
llm = OpenAI(temperature=0.3)  
agent = initialize_agent(  
    tools=tools,  
    llm=llm,  
    agent="zero-shot-react-description",  
    verbose=True  
)  

# 动态生成带上下文的提示词  
def generate_context_aware_prompt(location, goal):  
    # 调用工具获取天气和交通事件  
    weather = agent.run(f"获取{location}的实时天气")  
    traffic_events = agent.run(f"获取{location}的交通事件")  
    special_events = ";".join(traffic_events) if traffic_events else "无"  

    # 填充提示词模板  
    prompt = TRAFFIC_OPTIMIZATION_PROMPT.format(  
        location=location,  
        current_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  
        time_period="早高峰" if 7 <= datetime.now().hour <= 9 else "平峰",  
        traffic_flow=800,  # 假设从传感器获取  
        avg_speed=25,      # 假设从传感器获取  
        congested_roads="Main St, Oak Ave",  # 假设从传感器获取  
        special_events=special_events,  
        prediction_time=30,  
        weather=weather  
    )  
    return prompt  

# 使用示例  
prompt = generate_context_aware_prompt(location="downtown", goal="降低早高峰拥堵率20%")  
print("上下文感知提示词:", prompt)  

分步

Logo

脑启社区是一个专注类脑智能领域的开发者社区。欢迎加入社区,共建类脑智能生态。社区为开发者提供了丰富的开源类脑工具软件、类脑算法模型及数据集、类脑知识库、类脑技术培训课程以及类脑应用案例等资源。

更多推荐