AI 工作流:异步状态机与工程落地
·
AI 工作流:异步状态机与工程落地
在企业级应用里,工作流编排是处理复杂业务的核心组件。随着大模型接入业务系统,工作流不再只是简单的条件分支,而是需要根据 AI 的输出来动态决定下一步该做什么。这时候,设计一套高性能、低认知负担的异步框架,就成了工程师必须解决的问题。
同步阻塞的坑
构建包含耗时操作(比如调用大模型、爬取数据、多媒体处理)的工作流时,很多系统习惯用同步阻塞调用。并发量一上来,线程池迅速耗尽,服务直接挂起,甚至死锁。
大模型调用的网络延迟通常在几秒到几十秒,这意味着必须采用完全解耦的异步非阻塞架构。如果状态控制和自动补偿机制没做好,任何一个子步骤的网络抖动,都会导致长周期工作流残留在内存里,造成资源泄漏和数据不一致。核心痛点很明确:如何设计一个容错性强、逻辑直观的异步状态机,让业务在不可靠的网络里也能跑稳。
异步状态机模型
支持长周期、有弹性的 AI 工作流,最稳妥的方案是“基于持久化事件驱动的异步状态机”。每个节点由引擎调度,状态转移时序列化保存,支持断点续传和异常自愈。
状态流转逻辑大致如下:
graph TD
A[Pending / 初始化] --> B[Running / 任务执行中]
B -->|执行成功| C[Success / 成功状态]
B -->|执行失败且未超出重试限制| D[Retrying / 触发指数避退重试]
B -->|遇到不可逆重大错误 / 超限| E[Failed / 失败状态]
D --> B
C -->|分析下一节点依赖| F{是否还有后续任务?}
F -- 是 --> A
F -- 否 --> G[Completed / 工作流完成]
E --> H[触发自动补偿机制 / 通知人工干预]
有了这个状态闭环,任务中断时系统能自动保存上下文,崩溃恢复后可以从最后一个成功节点继续执行,对用户来说几乎是透明的。
工程实现
下面用 Python 原生 asyncio 实现一个轻量级的异步工作流调度引擎。不依赖外部大型组件,实现了任务节点注册、拓扑依赖处理、超时中断、避退重试、异常自动补偿(Compensation Rollback)以及核心事件状态变迁捕获。
# workflow_engine.py
import asyncio
import logging
from enum import Enum
from typing import Dict, List, Callable, Any, Optional
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
class TaskState(Enum):
PENDING = "PENDING"
RUNNING = "RUNNING"
SUCCESS = "SUCCESS"
RETRYING = "RETRYING"
FAILED = "FAILED"
class TaskNode:
def __init__(self, name: str, action: Callable[[Dict[str, Any]], Any],
compensation: Optional[Callable[[Dict[str, Any]], Any]] = None,
max_retries: int = 3, timeout: float = 5.0):
self.name = name
self.action = action
self.compensation = compensation
self.max_retries = max_retries
self.timeout = timeout
self.state = TaskState.PENDING
self.retries = 0
class Workflow:
def __init__(self):
self.nodes: Dict[str, TaskNode] = {}
self.dependencies: Dict[str, List[str]] = {}
self.context: Dict[str, Any] = {}
def add_node(self, node: TaskNode, dependencies: List[str] = []):
self.nodes[node.name] = node
self.dependencies[node.name] = dependencies
async def execute_node(self, node: TaskNode) -> bool:
node.state = TaskState.RUNNING
logging.info(f"Task '{node.name}' started execution.")
while node.retries <= node.max_retries:
try:
# 超时限制与异步任务包装
result = await asyncio.wait_for(
asyncio.to_thread(node.action, self.context),
timeout=node.timeout
)
node.state = TaskState.SUCCESS
self.context[f"{node.name}_output"] = result
logging.info(f"Task '{node.name}' finished successfully.")
return True
except Exception as e:
node.retries += 1
if node.retries <= node.max_retries:
node.state = TaskState.RETRYING
backoff = 2 ** node.retries
logging.warning(
f"Task '{node.name}' failed due to: {str(e)}. "
f"Retrying in {backoff} seconds... ({node.retries}/{node.max_retries})"
)
await asyncio.sleep(backoff)
else:
node.state = TaskState.FAILED
logging.error(f"Task '{node.name}' exceeded maximum retries. Processing rollback.")
if node.compensation:
try:
await asyncio.to_thread(node.compensation, self.context)
logging.info(f"Compensation for '{node.name}' executed successfully.")
except Exception as comp_err:
logging.critical(f"Compensation for '{node.name}' failed: {str(comp_err)}")
return False
return False
async def run(self) -> bool:
completed = set()
running_tasks = {}
while len(completed) < len(self.nodes):
ready_nodes = []
for name, node in self.nodes.items():
if name in completed or name in running_tasks:
continue
deps = self.dependencies[name]
if all(d in completed and self.nodes[d].state == TaskState.SUCCESS for d in deps):
ready_nodes.append(node)
if not ready_nodes and not running_tasks:
logging.error("Deadlock or circular dependency detected in workflow definition.")
return False
for node in ready_nodes:
task = asyncio.create_task(self.execute_node(node))
running_tasks[node.name] = task
if not running_tasks:
await asyncio.sleep(0.1)
continue
done, _ = await asyncio.wait(running_tasks.values(), return_when=asyncio.FIRST_COMPLETED)
for task in done:
finished_name = None
for name, t in running_tasks.items():
if t == task:
finished_name = name
break
if finished_name:
success = task.result()
del running_tasks[finished_name]
if success:
completed.add(finished_name)
else:
logging.error(f"Workflow execution halted due to failure of critical task '{finished_name}'.")
return False
return True
实际坑点:持久化、可观测性与资源泄漏
轻量级引擎虽然可控性高,但在架构层面有几个问题必须面对:
- 持久化与吞吐量的权衡:每一步状态变迁都写数据库,响应延迟会明显增加。高并发场景下,可以用批量事务合并,或者用 Redis 做临时状态存储。在非核心链路,适当妥协一致性来换取性能是必要的。
- 可观测性:异步系统排查问题很麻烦。必须全链路传递 TraceID,每个节点运行时把输出绑定到 Trace 上下文。这样出问题时,能拉出一条清晰的调用链审计日志。
- 资源泄漏:异步任务必须显式声明全局最大超时(TTL),禁止无期限挂起的循环等待。否则某个节点卡住,整个工作流都会悬在内存里。
结语
智能工作流平台的核心是对系统状态的掌控。通过轻量级异步状态机、关键节点的异常补偿与避退重试,再加上合理的可观测性投入,团队才能以较低的重构代价,构建出能支撑大模型复杂调用的业务系统。
更多推荐
所有评论(0)