Skip to content

函数式 API

函数式 API(Functional API) 允许你在现有代码中,以最少的改动集成 LangGraph 的核心功能:持久化记忆人在回路(Human-in-the-loop)流式输出

它的设计目标是将这些功能集成到使用标准语言原语(如 if 语句、for 循环和函数调用)进行分支和流程控制的现有代码中。与许多需要将代码重构为显式管道或 DAG 的数据编排框架不同,函数式 API 允许你在不强制使用严格执行模型的情况下融入这些能力。

函数式 API 使用两个关键构建块:

  • @entrypoint:将函数标记为工作流的入口点,封装逻辑并管理执行流程,包括处理长时间运行的任务和中断。
  • @task:代表一个离散的工作单元,如 API 调用或数据处理步骤,可在入口点内异步执行。任务返回一个类似 future 的对象,可以被等待或同步解析。

函数式 API 与图 API 的对比

对于偏好更声明式方法的用户,LangGraph 的图 API 允许使用图范式定义工作流。两种 API 共享相同的底层运行时,可以在同一个应用中混合使用。

主要区别:

方面函数式 API图 API
控制流使用标准 Python 结构(iffor、函数调用)需要声明 State、Nodes、Edges
状态管理状态作用域限定在函数内部需要声明 State 和 Reducer
Checkpoint任务结果保存到入口点的 checkpoint每个 super-step 产生一个 checkpoint
可视化不支持(图在运行时动态生成)易于可视化工作流图

使用 @entrypoint@task

基本示例

以下示例展示了一个使用 @entrypoint@task 的简单 Agent:

python
import time
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt


@task
def search_web(query: str) -> str:
    """搜索网络信息。"""
    time.sleep(1)  # 模拟长时间运行的 API 调用
    return f"Search results for: {query}"


@task
def summarize(text: str) -> str:
    """总结信息。"""
    return f"Summary: {text[:50]}..."


@entrypoint(checkpointer=InMemorySaver())
def research_agent(topic: str) -> dict:
    """一个简单的工作流:搜索、总结、请求审核。"""
    # 执行搜索任务
    results = search_web(topic).result()

    # 总结结果
    summary = summarize(results).result()

    # 中断以请求人工审核
    is_approved = interrupt({
        "summary": summary,
        "action": "Please approve or reject the summary",
    })

    return {
        "topic": topic,
        "summary": summary,
        "is_approved": is_approved,
    }

执行和恢复

使用 invokestream 或它们的异步版本来执行工作流:

python
from langchain_core.utils.uuid import uuid7


# 首次调用
thread_id = str(uuid7())
config = {"configurable": {"thread_id": thread_id}}

for chunk in research_agent.stream("AI in healthcare", config):
    print(chunk)
# > {'search_web': 'Search results for: AI in healthcare'}
# > {'summarize': 'Summary: Search results for: AI in healthcare...'}
# > {'__interrupt__': (Interrupt(value={'summary': ..., 'action': '...'}),)}

恢复执行(例如在人工审核后):

python
from langgraph.types import Command

human_review = True

for chunk in research_agent.stream(Command(resume=human_review), config):
    print(chunk)
# > {'research_agent': {'topic': 'AI in healthcare', 'summary': ..., 'is_approved': True}}

使用 create_agent 的函数式风格

你可以在函数式 API 内部使用 create_agent 来构建 Agent,获得两全其美的效果:

python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent
from langchain.tools import tool


@tool
def get_weather(city: str) -> str:
    """获取指定城市的天气。"""
    return f"The weather in {city} is sunny, 22°C."


@entrypoint(checkpointer=InMemorySaver())
def weather_agent(city: str) -> dict:
    """一个函数式 Agent,使用 create_agent 处理天气查询。"""
    agent = create_agent(
        model="claude-sonnet-4-6",
        tools=[get_weather],
        system_prompt=f"You are a helpful weather assistant for {city}.",
    )

    result = agent.invoke({
        "messages": [{"role": "user", "content": f"What's the weather like in {city}?"}],
    })

    return {
        "city": city,
        "response": result["messages"][-1].content,
    }


# 使用
result = weather_agent.invoke(
    "Beijing",
    {"configurable": {"thread_id": str(uuid7())}},
)

Injectable 参数

在声明 @entrypoint 时,你可以请求访问在运行时会自动注入的额外参数:

参数类型描述
previousAny访问该线程之前 checkpoint 的状态。用于短期记忆。
storeBaseStoreBaseStore 实例。用于长期记忆。
writerStreamWriter用于流式输出自定义数据。
configRunnableConfig访问运行时配置。
python
from langchain_core.runnables import RunnableConfig
from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import StreamWriter


checkpointer = InMemorySaver()
memory_store = InMemoryStore()


@entrypoint(checkpointer=checkpointer, store=memory_store)
def personalized_agent(
    user_input: dict,
    *,
    previous: Any = None,
    store: BaseStore,
    writer: StreamWriter,
    config: RunnableConfig,
) -> dict:
    # 从 store 读取长期记忆
    user_id = config["configurable"].get("user_id", "default")
    prefs = store.get(("preferences", user_id))

    # 使用 "previous" 参数获取先前的状态
    last_topic = previous.get("topic") if previous else None

    # 向 writer 写入自定义流数据
    writer({"type": "status", "message": "Processing..."})

    return {
        "response": f"Hello! Your last topic was: {last_topic}",
        "topic": user_input.get("topic"),
    }

与图 API 混合使用

函数式 API 和图 API 共享同一底层运行时,因此可以混合使用:

python
from langgraph.graph import StateGraph, START, END
from langgraph.func import entrypoint, task
from typing_extensions import TypedDict


class State(TypedDict):
    query: str
    results: str


@task
def search_task(query: str) -> str:
    return f"Results for: {query}"


# 图 API 节点可以调用函数式 API 入口点
def graph_node(state: State) -> State:
    return state


# 函数式入口点可以嵌入图的执行流程
@entrypoint()
def hybrid_workflow(query: str) -> dict:
    # 使用任务
    result = search_task(query).result()

    # 使用图
    builder = StateGraph(State)
    builder.add_node("process", graph_node)
    builder.add_edge(START, "process")
    builder.add_edge("process", END)
    graph = builder.compile()

    graph_result = graph.invoke({"query": query, "results": result})
    return graph_result


# 使用
output = hybrid_workflow.invoke("LangGraph framework")

下一步

本站为非官方中文学习站点,不代表 LangChain 官方。部分内容参考官方文档并重新整理为中文学习笔记。