函数式 API
函数式 API(Functional API) 允许你在现有代码中,以最少的改动集成 LangGraph 的核心功能:持久化、记忆、人在回路(Human-in-the-loop)和流式输出。
它的设计目标是将这些功能集成到使用标准语言原语(如 if 语句、for 循环和函数调用)进行分支和流程控制的现有代码中。与许多需要将代码重构为显式管道或 DAG 的数据编排框架不同,函数式 API 允许你在不强制使用严格执行模型的情况下融入这些能力。
函数式 API 使用两个关键构建块:
@entrypoint:将函数标记为工作流的入口点,封装逻辑并管理执行流程,包括处理长时间运行的任务和中断。@task:代表一个离散的工作单元,如 API 调用或数据处理步骤,可在入口点内异步执行。任务返回一个类似 future 的对象,可以被等待或同步解析。
函数式 API 与图 API 的对比
对于偏好更声明式方法的用户,LangGraph 的图 API 允许使用图范式定义工作流。两种 API 共享相同的底层运行时,可以在同一个应用中混合使用。
主要区别:
| 方面 | 函数式 API | 图 API |
|---|---|---|
| 控制流 | 使用标准 Python 结构(if、for、函数调用) | 需要声明 State、Nodes、Edges |
| 状态管理 | 状态作用域限定在函数内部 | 需要声明 State 和 Reducer |
| Checkpoint | 任务结果保存到入口点的 checkpoint | 每个 super-step 产生一个 checkpoint |
| 可视化 | 不支持(图在运行时动态生成) | 易于可视化工作流图 |
使用 @entrypoint 和 @task
基本示例
以下示例展示了一个使用 @entrypoint 和 @task 的简单 Agent:
python
import time
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
@task
def search_web(query: str) -> str:
"""搜索网络信息。"""
time.sleep(1) # 模拟长时间运行的 API 调用
return f"Search results for: {query}"
@task
def summarize(text: str) -> str:
"""总结信息。"""
return f"Summary: {text[:50]}..."
@entrypoint(checkpointer=InMemorySaver())
def research_agent(topic: str) -> dict:
"""一个简单的工作流:搜索、总结、请求审核。"""
# 执行搜索任务
results = search_web(topic).result()
# 总结结果
summary = summarize(results).result()
# 中断以请求人工审核
is_approved = interrupt({
"summary": summary,
"action": "Please approve or reject the summary",
})
return {
"topic": topic,
"summary": summary,
"is_approved": is_approved,
}执行和恢复
使用 invoke、stream 或它们的异步版本来执行工作流:
python
from langchain_core.utils.uuid import uuid7
# 首次调用
thread_id = str(uuid7())
config = {"configurable": {"thread_id": thread_id}}
for chunk in research_agent.stream("AI in healthcare", config):
print(chunk)
# > {'search_web': 'Search results for: AI in healthcare'}
# > {'summarize': 'Summary: Search results for: AI in healthcare...'}
# > {'__interrupt__': (Interrupt(value={'summary': ..., 'action': '...'}),)}恢复执行(例如在人工审核后):
python
from langgraph.types import Command
human_review = True
for chunk in research_agent.stream(Command(resume=human_review), config):
print(chunk)
# > {'research_agent': {'topic': 'AI in healthcare', 'summary': ..., 'is_approved': True}}使用 create_agent 的函数式风格
你可以在函数式 API 内部使用 create_agent 来构建 Agent,获得两全其美的效果:
python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent
from langchain.tools import tool
@tool
def get_weather(city: str) -> str:
"""获取指定城市的天气。"""
return f"The weather in {city} is sunny, 22°C."
@entrypoint(checkpointer=InMemorySaver())
def weather_agent(city: str) -> dict:
"""一个函数式 Agent,使用 create_agent 处理天气查询。"""
agent = create_agent(
model="claude-sonnet-4-6",
tools=[get_weather],
system_prompt=f"You are a helpful weather assistant for {city}.",
)
result = agent.invoke({
"messages": [{"role": "user", "content": f"What's the weather like in {city}?"}],
})
return {
"city": city,
"response": result["messages"][-1].content,
}
# 使用
result = weather_agent.invoke(
"Beijing",
{"configurable": {"thread_id": str(uuid7())}},
)Injectable 参数
在声明 @entrypoint 时,你可以请求访问在运行时会自动注入的额外参数:
| 参数 | 类型 | 描述 |
|---|---|---|
| previous | Any | 访问该线程之前 checkpoint 的状态。用于短期记忆。 |
| store | BaseStore | BaseStore 实例。用于长期记忆。 |
| writer | StreamWriter | 用于流式输出自定义数据。 |
| config | RunnableConfig | 访问运行时配置。 |
python
from langchain_core.runnables import RunnableConfig
from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import StreamWriter
checkpointer = InMemorySaver()
memory_store = InMemoryStore()
@entrypoint(checkpointer=checkpointer, store=memory_store)
def personalized_agent(
user_input: dict,
*,
previous: Any = None,
store: BaseStore,
writer: StreamWriter,
config: RunnableConfig,
) -> dict:
# 从 store 读取长期记忆
user_id = config["configurable"].get("user_id", "default")
prefs = store.get(("preferences", user_id))
# 使用 "previous" 参数获取先前的状态
last_topic = previous.get("topic") if previous else None
# 向 writer 写入自定义流数据
writer({"type": "status", "message": "Processing..."})
return {
"response": f"Hello! Your last topic was: {last_topic}",
"topic": user_input.get("topic"),
}与图 API 混合使用
函数式 API 和图 API 共享同一底层运行时,因此可以混合使用:
python
from langgraph.graph import StateGraph, START, END
from langgraph.func import entrypoint, task
from typing_extensions import TypedDict
class State(TypedDict):
query: str
results: str
@task
def search_task(query: str) -> str:
return f"Results for: {query}"
# 图 API 节点可以调用函数式 API 入口点
def graph_node(state: State) -> State:
return state
# 函数式入口点可以嵌入图的执行流程
@entrypoint()
def hybrid_workflow(query: str) -> dict:
# 使用任务
result = search_task(query).result()
# 使用图
builder = StateGraph(State)
builder.add_node("process", graph_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)
graph = builder.compile()
graph_result = graph.invoke({"query": query, "results": result})
return graph_result
# 使用
output = hybrid_workflow.invoke("LangGraph framework")