admin管理员组

文章数量:1531524

引言

LangGraph是一个基于图状态机构建复杂、稳定的AI agent的库。本文介绍LangGraph的核心概念。

基于LangGraph官方文档。

背景

尽管可能每个人对AI Agent由什么构成的定义不同,这里将agent看成是利用语言模型来控制工作流(workflow)循环和采取行动的系统。原型的LLM agent使用ReAct式设计,将LLM应用于驱动一个基本循环,具体步骤如下:

  • 推理并规划要采取的动作
  • 调用工具(普通的函数)来采取动作
  • 观察工具的影响(结果)然后重新规划乎或行动

虽然LLM agent在这方面效果出人意料,但单纯的agent循环在大规模情况下并不能提供用户期待的可靠性。它们具有美丽的随机性。设计精良的系统充分利用这种随机性,使得该系统能容忍LLM输出中的错误。

AI设计模式应该应用良好工程实践,包括:

  • AI应用必须在自主运行和用户控制之间取得平衡;
  • Agent应用类似分布式系统,都需要具备容错和纠正能力;
  • Multi-agent(多智能体)系统在并行时需要解决多人在线类似的冲突;
  • 需要有撤回和版本控制;

LangGraph的StateGraph抽象支持这些需求,提供比AgentExecutor框架更低级别的API,可以完全控制何时何地以及如何使用AI。

核心设计

LangGraph将Agent的工作流建模为状态机,可以使用三个关键组件定义Agent的行为:

  1. State: 表示应用当前快照的共享数据结构,可以是任何Python类型,通常是TypedDictPydanticBaseModel
  2. Node: 编码Agent逻辑的Python函数,接收当前状态,执行一些计算或副作用(side-effect),返回更新后的状态;
  3. Edge: 根据当前状态确定下一个要执行的节点的控制流规则,可以是条件分支或固定转换;

通过组合节点和边,可以创建随时间演变状态的复杂循环工作流,LangGraph的强大在于它如何管理这些状态。

LangGraph的底层图算法使用消息传递来定义通信程序,当一个节点完成时,它沿着一个或多个边发送消息给其他节点。这些节点运行它们的函数,将结果消息传递给下一组节点,以此类推。受Pregel启发,程序按离散的超步(super-step)进行,这些超步在概念上都是并行执行。

当运行图时,所有的节点都处于非活动(inactive)状态,每当入边(incoming edge/channel)接收到新消息(状态)时,节点变为活动(active)状态。运行函数,并响应更新。在每个超步结束时,每个节点都会通过将自身标记为非活动状态来投票停止(vote to halt)。如果没有更多的消息传入,当所有节点都处于非活动状态且无消息在传输时图终止。

Node

节点通常是Python函数,第一个位置参数都是状态,第二个可选的位置参数是config,包含配置参数(例如thread_id)。使用add_node方法将这些节点添加到图中。

from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, START, StateGraph

builder = StateGraph(dict)


def my_node(state: dict, config: RunnableConfig):
    print("In node: ", config["configurable"]["user_id"])
    return {"results": f"Hello, {state['input']}!"}


# The second argument is optional
def my_other_node(state: dict):
    return state


builder.add_node("my_node", my_node)
builder.add_node("other_node", my_other_node)
builder.add_edge(START, "my_node")
builder.add_edge("my_node", "other_node")
builder.add_edge("other_node", END)
graph = builder.compile()
graph.invoke({"input": "Will"}, {"configurable": {"user_id": "abcd-123"}})
# In node:  abcd-123
# {'results': 'Hello, Will!'}

在底层,函数会被转换为RunnableLambda,它添加了批处理和异步支持,以及跟踪和调试功能。

Edge

边定义了逻辑的路由方式和图如何决定停止。类似节点,它们接收图的当前状态并返回一个值。默认该值是要将状态发送到下一个节点或节点的名称。所有的这些节点将作为下一个超步的一部分并行运行。

如果想重用边,可以选择提供一个字典,将边的输出映射到下一个节点的名称。

如果希望始终从节点A到节点B,可以直接使用add_edge方法。

如果要选择性地路由到一个或多个边(或选择性地终止),可以使用add_conditional_edges方法。

如果一个节点有多个出边,所有这些目标节点将作为下一个超步的一部分并行执行

状态管理

LangGraph引入了状态管理的两个关键概念:状态模式(state schema)和reducer。

状态模式定义了提供给图中每个节点的对象的类型。

Reducer定义了如何将节点输出应用于当前状态。例如,可以使用reducer将心的对话响应合并到对话历史记录中,或将多个Agent节点的输出平均聚合在一起(average together)。

下面通过一个示例来看看reducer的工作原理,比较下面两个状态。

from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph import END, START, StateGraph


class StateA(TypedDict):
    value: int


builder = StateGraph(StateA)
builder.add_node("my_node", lambda state: {"value": 1}) # 更新value为1
builder.add_edge(START, "my_node")
builder.add_edge("my_node", END)
graph = builder.compile()
graph.invoke({"value": 5})

和StateB:

from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph import END, START, StateGraph



def add(existing: int, new: int):
    return existing + new


class StateB(TypedDict):
    # 高亮的新行
    value: Annotated[int, add]


builder = StateGraph(StateB)
builder.add_node("my_node", lambda state: {"value": 1}) # 更新为 5 + 1
builder.add_edge(START, "my_node")
builder.add_edge("my_node", END)
graph = builder.compile()
graph.invoke({"value": 5})

在StateA中,结果是1。因为状态的默认reducer是直接覆盖。在StateB中,结果是6,因为我们将add函数创建为reducer,它接收现有状态和状态更新,并返回更新后的值。

虽然我们通常使用TypedDict作为State的state schema,实际上可以是几乎任何类型,下面的代码也是有效的:

# Analogous to StateA above
builder = StateGraph(int)
builder.add_node("my_node", lambda state: 1)
builder.add_edge(START, "my_node")
builder.add_edge("my_node", END)
builder.compile().invoke(5)

# Analogous to StateB
def add(left, right):
    return left + right


builder = StateGraph(Annotated[int, add])
builder.add_node("my_node", lambda state: 1)
builder.add_edge(START, "my_node")
builder.add_edge("my_node", END)
graph = builder.compile()
graph.invoke(5)

这意味着可以使用Pydantic BaseModel作为图的状态,可以添加默认值和额外的数据验证

当构建像ChatGPT这样的聊天机器人时,状态可能仅仅是一个聊天消息列表。这是MessageGraph(StateGrpah的轻量包装器)使用的状态,仅比下面的稍微复杂一点:

builder = StateGraph(Annotated[list, add])

在图中使用共享状态涉及一些设计的权衡。共享一个类型化状态提供了很多与构建AI工作流相关的优势,包括:

  1. 在每个超步之和和之后完全检验数据流;
  2. 状态是可变的,可以让用户(或其他客户端)在超步之间写入相同的状态来控制Agent的方向(通过update_state)变得容易;
  3. 明确定义了检查点,可以很容易保存和恢复。

持久化

任何智能系统都需要记忆才能运作。AI智能体也是一样,需要跨一个或多个时间范围(timeframe)的记忆:

  • 它们需要记住已经完成的步骤(以避免回答特定问题时重复自己);
  • 需要记住与用户进行的多轮对话中的先前轮次(用于指代消歧或提供额外的上下文);
  • 理想情况下需要从与用户之前交互中记住上下文,以便在行为上更加个性化和高效;

最后一种记忆形式涵盖了很多内容(个性化、优化、持续学习等),超出了本次的内容。

前两种记忆形式通过基于检查点的StateGraph的API来支持。

检查点

检查点(checkpoint)代表应用程序和用户之间进行的多轮互动中线程的状态。在单次运行中创建的检查点将具有一组在从该状态开始时执行的下一个节点。在给定运行结束时创建的检查点是相同的,只是没有下一个节点可以转换(正在等待用户输入)。

检查点支持聊天记忆等功能,可以tag并持久化系统中已经采取的每个状态。

单轮记忆

Agent的每一步都被设为检查点,在代理未能实现你的目标而遇到错误的情况下,可以随时从其中一个保存的检查点恢复它的任务。

这可以支持human-in-the-loop工作流,在执行给定节点之前或之后,可以中断图的执行将控制权交给用户,这个用户可以立即回复,也可以之后回复。你的工作流都可以随时恢复。

多轮记忆

检查点保存在一个thread_id下,来支持用户和系统之间的多轮交互。在如何配置图以添加多轮记忆支持方面没有任何区别,因为检查点工作在整个过程中都是相同的。

如果要在多轮对话中保留一部分状态并将一些状态视为"短暂的",你可以在图的最终节点中清除相关状态。

使用检查点就像调用compile(checkpointer=my_checkpointer)一样简单,然后在其可配置参数中使用一个thread_id来调用它。

Thread

线程表示图的不同会话,它们将状态检查点组织在离散会话中,以便在应用中支持多用户对话。

一个典型的聊天机器人应用为每个用户创建了多个线程,每个线程代表一次对话,都具有自己的持久化的聊天记录和其他状态。线程内的检查点可以根据需要进行回放和分支。

当一个StateGraph基于checkpointer编译,每次调用图时都需要通过配置(configuration)提供一个thread_id

Configuration

对于任何给定的图部署,你可能希望有一些可在运行时控制的可配置值。这些与图输入不同,因为它们不是要视为状态变量。

一个常见的例子是对话线程thread_id、用户user_id、选择使用哪个LLM、在检索器中返回多少个文档等。虽然你可以将这些值传递到状态中,但最好将其与常规数据流分开。

我们来看一个例子,看多轮记忆是如何工作的。

from typing import Annotated

from typing_extensions import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph


def add(left, right):
    return left + right


class State(TypedDict):
    total: Annotated[int, add]
    turn: str


builder = StateGraph(State) # 不存在检查点
builder.add_node("add_one", lambda x: {"total": 1}) # 默认初始或增加1
builder.add_edge(START, "add_one") # 进入add_one
builder.add_edge("add_one", END) # 结束

memory = MemorySaver()
graph = builder.compile(checkpointer=memory) # 基于checkpointer编译
thread_id = "some-thread"
config = {"configurable": {"thread_id": thread_id}} # 配置thread_id
result = graph.invoke({"total": 1, "turn": "First Turn"}, config) # 第一次运行,累加到2
result2 = graph.invoke({"turn": "Next Turn"}, config) # 累加到3,默认传入total=1
result3 = graph.invoke({"total": 5}, config) # 累加 5+1,变成9
result4 = graph.invoke({"total": 5}, {"configurable": {"thread_id": "new-thread-id"}}) # 累加到6,因为是新的对话,从1开始累加

对于第一次运行,不存在检查点,因此图是在原始输入上运行的。total值从1增加到2,turn设置为First Turn

对于第二次运行,用户更新了turn,但没有更新total!由于我们是从状态中加载的,先前的结果增加了1(在add_one节点中),然后turn被用户覆盖。

对于第三次运行,turn保持不变,因为它是从检查点加载的,而没有被用户覆盖。total增加了用户提供的值,因为这个值是通过add函数reduce(更新)的。

对于第四次运行,使用了一个新的线程id,但没有找到检查点,所以结果仅仅是默认的total增加1

这种面向用户的行为等同于没有检查点情况下运行以下内容:

graph = builder.compile()
result = graph.invoke({"total": 1, "turn": "First Turn"})
result2 = graph.invoke({**result, "turn": "Next Turn"})
result3 = graph.invoke({**result2, "total": result2["total"] + 5})
result4 = graph.invoke({"total": 5})

对于StateGraph的单轮执行数据流

下面我们看复杂一点的例子,通过在上面的玩具示例中添加一个条件边。

from typing import Annotated, Literal

from typing_extensions import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph



def add(left, right):
    return left + right


class State(TypedDict):
    total: Annotated[int, add]


builder = StateGraph(State)
builder.add_node("add_one", lambda x: {"total": 1}) # 新增1 
builder.add_node("double", lambda x: {"total": x["total"]}) # 新增现在的x值,即翻倍
builder.add_edge(START, "add_one")

# 定义一个路由 route -> double 也可能 route -> end
def route(state: State) -> Literal["double", "__end__"]:
    if state["total"] < 6: 
        return "double" # 路由到double
    return "__end__" # 结束


builder.add_conditional_edges("add_one", route) # add_one -> route
builder.add_edge("double", "add_one") # double -> add_one

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

然后第一次调用:

thread_id = "some-thread"
config = {"configurable": {"thread_id": thread_id}}
for step in graph.stream({"total": 1}, config, stream_mode="debug"):
    print(step["step"], step["type"], step["payload"].get("values"))
# 0 checkpoint {'total': 1} 将输入的1增加到初始值0中,得到1
# 1 checkpoint {'total': 2}  进入 add_one ,新增了1
# 2 checkpoint {'total': 4} 进入route,到double,翻倍
# 3 checkpoint {'total': 5}  double返回到add_one,新增了1
# 4 checkpoint {'total': 10} 进入route,到double,翻倍
# 5 checkpoint {'total': 11} double返回到add_one,新增了1,然后进入route到end

下面详细介绍执行过程:

  1. 图查找检查点。 没有找到检查点,因此状态被初始化为0。
  2. 接下来,图将用户的输入作为状态更新应用,将输入1加到现有值0中。 在这一超步结束时,总量(total)为1
  3. 调用add_one节点,返回1
  4. 将此更新加到现有总量(1)中。 状态现在是2
  5. 调用条件边route,由于值小于6,继续到double节点。
  6. doube输出现有的总量2并返回。 然后将其加到现有状态中。 状态现在是4
  7. 图通过add_one返回(5),检查条件边并继续进行,因为它小于6
  8. 经过double后,总量为10
  9. 固定边回到add_one,总量为11,检查条件边,由于大于6,程序终止。

第二轮调用,我们使用同样的配置:

for step in graph.stream(
    {"total": -2, "turn": "First Turn"}, config, stream_mode="debug"
):
    print(step["step"], step["type"], step["payload"].get("values"))
# 7 checkpoint {'total': 9} 输入为-2,11-2=9
# 8 checkpoint {'total': 10} 进入add_one,增加了1,变成10
  1. 它应用了来自用户输入的更新。 add reducer 将总量从0更改为-2
  2. 图寻找检查点。 将其加载到内存中作为初始状态。 总数现在为9=-2+11
  3. add_one节点以此状态被调用。 它返回10
  4. 使用reducer应用该更新,将值更新为10
  5. 进入route,由于值大于6,终止程序。

本文标签: 核心概念LangGraph