AI 工作流:异步状态机与工程落地

在企业级应用里,工作流编排是处理复杂业务的核心组件。随着大模型接入业务系统,工作流不再只是简单的条件分支,而是需要根据 AI 的输出来动态决定下一步该做什么。这时候,设计一套高性能、低认知负担的异步框架,就成了工程师必须解决的问题。

同步阻塞的坑

构建包含耗时操作(比如调用大模型、爬取数据、多媒体处理)的工作流时,很多系统习惯用同步阻塞调用。并发量一上来,线程池迅速耗尽,服务直接挂起,甚至死锁。

大模型调用的网络延迟通常在几秒到几十秒,这意味着必须采用完全解耦的异步非阻塞架构。如果状态控制和自动补偿机制没做好,任何一个子步骤的网络抖动,都会导致长周期工作流残留在内存里,造成资源泄漏和数据不一致。核心痛点很明确:如何设计一个容错性强、逻辑直观的异步状态机,让业务在不可靠的网络里也能跑稳。

异步状态机模型

支持长周期、有弹性的 AI 工作流,最稳妥的方案是“基于持久化事件驱动的异步状态机”。每个节点由引擎调度,状态转移时序列化保存,支持断点续传和异常自愈。

状态流转逻辑大致如下:

graph TD
    A[Pending / 初始化] --> B[Running / 任务执行中]
    B -->|执行成功| C[Success / 成功状态]
    B -->|执行失败且未超出重试限制| D[Retrying / 触发指数避退重试]
    B -->|遇到不可逆重大错误 / 超限| E[Failed / 失败状态]
    D --> B
    C -->|分析下一节点依赖| F{是否还有后续任务?}
    F -- 是 --> A
    F -- 否 --> G[Completed / 工作流完成]
    E --> H[触发自动补偿机制 / 通知人工干预]

有了这个状态闭环,任务中断时系统能自动保存上下文,崩溃恢复后可以从最后一个成功节点继续执行,对用户来说几乎是透明的。

工程实现

下面用 Python 原生 asyncio 实现一个轻量级的异步工作流调度引擎。不依赖外部大型组件,实现了任务节点注册、拓扑依赖处理、超时中断、避退重试、异常自动补偿(Compensation Rollback)以及核心事件状态变迁捕获。

# workflow_engine.py
import asyncio
import logging
from enum import Enum
from typing import Dict, List, Callable, Any, Optional

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

class TaskState(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    RETRYING = "RETRYING"
    FAILED = "FAILED"

class TaskNode:
    def __init__(self, name: str, action: Callable[[Dict[str, Any]], Any], 
                 compensation: Optional[Callable[[Dict[str, Any]], Any]] = None,
                 max_retries: int = 3, timeout: float = 5.0):
        self.name = name
        self.action = action
        self.compensation = compensation
        self.max_retries = max_retries
        self.timeout = timeout
        self.state = TaskState.PENDING
        self.retries = 0

class Workflow:
    def __init__(self):
        self.nodes: Dict[str, TaskNode] = {}
        self.dependencies: Dict[str, List[str]] = {}
        self.context: Dict[str, Any] = {}

    def add_node(self, node: TaskNode, dependencies: List[str] = []):
        self.nodes[node.name] = node
        self.dependencies[node.name] = dependencies

    async def execute_node(self, node: TaskNode) -> bool:
        node.state = TaskState.RUNNING
        logging.info(f"Task '{node.name}' started execution.")
        
        while node.retries <= node.max_retries:
            try:
                # 超时限制与异步任务包装
                result = await asyncio.wait_for(
                    asyncio.to_thread(node.action, self.context), 
                    timeout=node.timeout
                )
                node.state = TaskState.SUCCESS
                self.context[f"{node.name}_output"] = result
                logging.info(f"Task '{node.name}' finished successfully.")
                return True
            except Exception as e:
                node.retries += 1
                if node.retries <= node.max_retries:
                    node.state = TaskState.RETRYING
                    backoff = 2 ** node.retries
                    logging.warning(
                        f"Task '{node.name}' failed due to: {str(e)}. "
                        f"Retrying in {backoff} seconds... ({node.retries}/{node.max_retries})"
                    )
                    await asyncio.sleep(backoff)
                else:
                    node.state = TaskState.FAILED
                    logging.error(f"Task '{node.name}' exceeded maximum retries. Processing rollback.")
                    if node.compensation:
                        try:
                            await asyncio.to_thread(node.compensation, self.context)
                            logging.info(f"Compensation for '{node.name}' executed successfully.")
                        except Exception as comp_err:
                            logging.critical(f"Compensation for '{node.name}' failed: {str(comp_err)}")
                    return False
        return False

    async def run(self) -> bool:
        completed = set()
        running_tasks = {}

        while len(completed) < len(self.nodes):
            ready_nodes = []
            for name, node in self.nodes.items():
                if name in completed or name in running_tasks:
                    continue
                deps = self.dependencies[name]
                if all(d in completed and self.nodes[d].state == TaskState.SUCCESS for d in deps):
                    ready_nodes.append(node)

            if not ready_nodes and not running_tasks:
                logging.error("Deadlock or circular dependency detected in workflow definition.")
                return False

            for node in ready_nodes:
                task = asyncio.create_task(self.execute_node(node))
                running_tasks[node.name] = task

            if not running_tasks:
                await asyncio.sleep(0.1)
                continue

            done, _ = await asyncio.wait(running_tasks.values(), return_when=asyncio.FIRST_COMPLETED)
            
            for task in done:
                finished_name = None
                for name, t in running_tasks.items():
                    if t == task:
                        finished_name = name
                        break
                
                if finished_name:
                    success = task.result()
                    del running_tasks[finished_name]
                    if success:
                        completed.add(finished_name)
                    else:
                        logging.error(f"Workflow execution halted due to failure of critical task '{finished_name}'.")
                        return False
        return True

实际坑点:持久化、可观测性与资源泄漏

轻量级引擎虽然可控性高,但在架构层面有几个问题必须面对:

  1. 持久化与吞吐量的权衡:每一步状态变迁都写数据库,响应延迟会明显增加。高并发场景下,可以用批量事务合并,或者用 Redis 做临时状态存储。在非核心链路,适当妥协一致性来换取性能是必要的。
  2. 可观测性:异步系统排查问题很麻烦。必须全链路传递 TraceID,每个节点运行时把输出绑定到 Trace 上下文。这样出问题时,能拉出一条清晰的调用链审计日志。
  3. 资源泄漏:异步任务必须显式声明全局最大超时(TTL),禁止无期限挂起的循环等待。否则某个节点卡住,整个工作流都会悬在内存里。

结语

智能工作流平台的核心是对系统状态的掌控。通过轻量级异步状态机、关键节点的异常补偿与避退重试,再加上合理的可观测性投入,团队才能以较低的重构代价,构建出能支撑大模型复杂调用的业务系统。

Logo

脑启社区是一个专注类脑智能领域的开发者社区。欢迎加入社区,共建类脑智能生态。社区为开发者提供了丰富的开源类脑工具软件、类脑算法模型及数据集、类脑知识库、类脑技术培训课程以及类脑应用案例等资源。

更多推荐