语音 Agent
本教程将指导你构建一个完整的语音 Agent——你可以对它说话,它能听懂并回答你。我们将串接语音识别(Speech-to-Text)、大语言模型(LLM)和语音合成(Text-to-Speech)三个环节,并通过 LangGraph 编排它们。
概述
语音 Agent 的核心流程:
用户语音 → 语音识别 (STT) → 文本 → LLM 处理 → 文本 → 语音合成 (TTS) → 语音输出
                                      ↑
                                对话历史 (记忆)
使用 LangGraph 实现的主要优势:
- 状态管理:统一管理对话历史和音频文件
- 灵活编排:可插入错误处理、重试、中断等逻辑
- 可观察性:每个环节的输入输出都可追踪
环境准备
bash
pip install langchain langgraph langchain-openai
pip install openai-whisper sounddevice soundfile numpy
pip install edge-tts # 免费的 TTS 引擎(无需 API Key)
# 或者使用 OpenAI TTS: pip install openai
配置 API 密钥(如果使用 OpenAI 的服务):
python
import getpass
import os
if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key: ")
定义状态
python
from typing import Annotated, List, Optional, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, MessagesState
from langchain_core.messages import BaseMessage
class VoiceAgentState(MessagesState):
    """State passed between the nodes of the voice agent graph.

    Adds the artifacts of the STT -> LLM -> TTS pipeline on top of the
    fields inherited from ``MessagesState``.
    """

    # --- input side ---
    audio_input_path: Optional[str]  # path of the input audio file
    transcription: Optional[str]  # text recognised from the input audio

    # --- output side ---
    response_text: Optional[str]  # answer text generated by the LLM
    audio_output_path: Optional[str]  # path of the output audio file

    # --- bookkeeping ---
    conversation_id: str  # conversation identifier
    error: Optional[str]  # error message, if any
构建语音识别节点 (STT)
我们提供两种方案:本地 Whisper 和 OpenAI Whisper API。
方案 A:本地 Whisper
python
import whisper
import numpy as np
class LocalSTT:
    """Speech-to-text backed by a locally loaded Whisper model."""

    def __init__(self, model_size: str = "base"):
        # Load the model once here; transcribe() reuses it on every call.
        self.model = whisper.load_model(model_size)

    def transcribe(self, audio_path: str) -> str:
        """Transcribe the audio file at ``audio_path`` and return its text.

        The recognition language is fixed to "zh"; leading and trailing
        whitespace is stripped from the recognised text.
        """
        recognition = self.model.transcribe(audio_path, language="zh")
        raw_text = recognition["text"]
        return raw_text.strip()
# 初始化
local_stt = LocalSTT(model_size="base")
方案 B:OpenAI Whisper API
python
from openai import OpenAI
class OpenAISTT:
    """Speech-to-text backed by the OpenAI Whisper API."""

    def __init__(self):
        # The client is created with the SDK defaults (no explicit arguments).
        self.client = OpenAI()

    def transcribe(self, audio_path: str) -> str:
        """Send the audio file at ``audio_path`` to the API and return the stripped text."""
        request_options = {"model": "whisper-1", "language": "zh"}
        with open(audio_path, "rb") as audio_file:
            transcript = self.client.audio.transcriptions.create(
                file=audio_file,
                **request_options
            )
        recognised = transcript.text
        return recognised.strip()
音频录制工具
python
import sounddevice as sd
import soundfile as sf
def record_audio(
filename: str = "input.wav",
duration: int = 5,
sample_rate: int = 16000
):
"""录制麦克风音频。"""
print(f"🎤 录音中...({duration}秒)")
audio = sd.rec(
int(duration * sample_rate),
samplerate=sample_rate,
channels=1
)
sd.wait()
sf.write(filename, audio, sample_rate)
print(f"✅ 录音完成: {filename}")
return filenameSTT 节点
python
def stt_node(state: VoiceAgentState) -> VoiceAgentState:
"""语音识别节点:将音频转换为文本。"""
audio_path = state.get("audio_input_path")
if not audio_path:
return {**state, "error": "缺少音频输入文件"}
try:
# 使用 OpenAI API(也可替换为本地 Whisper)
stt = OpenAISTT()
text = stt.transcribe(audio_path)
return {**state, "transcription": text, "error": None}
except Exception as e:
return {**state, "error": f"语音识别失败: {str(e)}"}构建 LLM 处理节点
python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
VOICE_SYSTEM_PROMPT = """你是语音助手"小言",用中文回答用户的问题。
注意:
1. 回答要简洁口语化,适合语音播报
2. 避免使用列表、表格等不适合听觉的格式
3. 使用自然的停顿和连接词
4. 内容长度控制在 200 字以内"""
def llm_processing_node(state: VoiceAgentState) -> VoiceAgentState:
    """LLM node: generate an answer for the transcribed user text.

    The prompt sent to the model is the system prompt, the stored
    conversation history and the new user message. Only the new user/assistant
    pair is written back to ``messages``.

    Returns:
        A copy of the state with ``response_text`` set and the new turn added
        to ``messages``; or the state with ``error`` set when there is no
        transcription; or the state unchanged when an earlier node already
        reported an error.
    """
    # Keep the first failure: an upstream error (e.g. from STT) must not be
    # replaced by the less specific "missing input" error below.
    if state.get("error"):
        return {**state}

    user_text = state.get("transcription", "")
    if not user_text:
        return {**state, "error": "缺少用户输入文本"}

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

    # Prompt = system prompt + prior history + the new user message.
    user_message = HumanMessage(content=user_text)
    history = list(state.get("messages") or [])
    prompt = [SystemMessage(content=VOICE_SYSTEM_PROMPT), *history, user_message]

    response = llm.invoke(prompt)

    return {
        **state,
        "response_text": response.content,
        # MessagesState merges returned messages into the existing history
        # (add_messages reducer), so return only the new turn. Returning the
        # whole prompt would store the system prompt in the history and
        # duplicate it on every subsequent turn.
        "messages": [user_message, AIMessage(content=response.content)],
    }
构建语音合成节点 (TTS)
方案 A:Edge TTS(免费,无需 API Key)
python
import edge_tts
import asyncio
class EdgeTTS:
    """Text-to-speech using Edge TTS (free, no API key required)."""

    def __init__(self, voice: str = "zh-CN-XiaoxiaoNeural"):
        # Some Chinese voices that can be passed as ``voice``:
        #   zh-CN-XiaoxiaoNeural (female)
        #   zh-CN-YunxiNeural    (male)
        #   zh-CN-XiaohanNeural  (female, warm)
        self.voice = voice

    async def synthesize(self, text: str, output_path: str):
        """Synthesize ``text`` with the configured voice and save it to ``output_path``.

        Returns:
            ``output_path``.
        """
        communicate = edge_tts.Communicate(text, self.voice)
        await communicate.save(output_path)
        return output_path

    def run(self, text: str, output_path: str) -> str:
        """Synchronous wrapper around :meth:`synthesize`.

        Works both from plain synchronous code and from code that is already
        executing inside an event loop (e.g. a Jupyter notebook), where a
        direct ``asyncio.run`` call raises ``RuntimeError``.

        Returns:
            ``output_path``.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: asyncio.run is safe.
            asyncio.run(self.synthesize(text, output_path))
            return output_path

        # A loop is already running in this thread, so drive the coroutine on
        # a fresh loop in a worker thread and block until it has finished.
        import concurrent.futures

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            pool.submit(asyncio.run, self.synthesize(text, output_path)).result()
        return output_path
# 初始化
tts = EdgeTTS(voice="zh-CN-XiaoxiaoNeural")
方案 B:OpenAI TTS
python
class OpenAITTS:
"""使用 OpenAI TTS API 进行语音合成。"""
def __init__(self, voice: str = "alloy"):
self.client = OpenAI()
self.voice = voice
# 可选:alloy, echo, fable, nova, shimmer
def synthesize(self, text: str, output_path: str) -> str:
"""文本转语音并保存为音频文件。"""
response = self.client.audio.speech.create(
model="tts-1",
voice=self.voice,
input=text
)
response.stream_to_file(output_path)
return output_pathTTS 节点
python
def tts_node(state: VoiceAgentState) -> VoiceAgentState:
    """TTS node: turn the answer text into an audio file.

    The audio is written to ``response_<conversation_id>.mp3`` (``default``
    when the state carries no conversation id).

    Returns:
        A copy of the state with ``audio_output_path`` set and ``error``
        cleared on success; the state with ``error`` set when there is no
        answer text or synthesis raises; or the state unchanged when an
        earlier node already reported an error.
    """
    # Keep the first failure: an upstream error must not be replaced by the
    # less specific "missing answer text" error below.
    if state.get("error"):
        return {**state}

    text = state.get("response_text", "")
    if not text:
        return {**state, "error": "缺少回答文本"}

    try:
        output_path = f"response_{state.get('conversation_id', 'default')}.mp3"
        tts = EdgeTTS()
        tts.run(text, output_path)
        return {**state, "audio_output_path": output_path, "error": None}
    except Exception as e:
        return {**state, "error": f"语音合成失败: {str(e)}"}
播放音频
python
def play_audio(audio_path: str):
    """Read the audio file at ``audio_path``, play it, and wait for playback to end."""
    import sounddevice as sd
    import soundfile as sf

    samples, rate = sf.read(audio_path)
    sd.play(samples, rate)
    sd.wait()
def play_and_return(state: VoiceAgentState) -> VoiceAgentState:
"""播放生成的语音并返回状态。"""
audio_path = state.get("audio_output_path")
if audio_path:
play_audio(audio_path)
return state组装 StateGraph
python
from langgraph.graph import END, StateGraph
# 初始化图
workflow = StateGraph(VoiceAgentState)
# 添加节点
workflow.add_node("stt", stt_node)
workflow.add_node("llm_process", llm_processing_node)
workflow.add_node("tts", tts_node)
workflow.add_node("play_audio", play_and_return)
# 入口点
workflow.set_entry_point("stt")
# 连接边
workflow.add_edge("stt", "llm_process")
workflow.add_edge("llm_process", "tts")
workflow.add_edge("tts", "play_audio")
workflow.add_edge("play_audio", END)
# 编译
voice_agent = workflow.compile()运行语音 Agent
完整流水线
python
import uuid
def run_voice_agent(duration: int = 5):
    """Run one complete voice interaction: record, transcribe, answer, synthesize, play.

    Args:
        duration: Recording length in seconds.

    Returns:
        The final state returned by the graph.
    """
    conversation_id = str(uuid.uuid4())

    # 1. Record the user's speech.
    recorded_path = record_audio("input.wav", duration=duration)

    # 2. Run the graph on the recording.
    initial_state = {
        "audio_input_path": recorded_path,
        "conversation_id": conversation_id
    }
    result = voice_agent.invoke(initial_state)

    # 3. Report either the failure or the transcript and the answer.
    if result.get("error"):
        print(f"❌ 错误: {result['error']}")
        return result

    print(f"🗣️ 你说: {result['transcription']}")
    print(f"🤖 回答: {result['response_text']}")
    print(f"🔊 语音已播放: {result.get('audio_output_path')}")
    return result
# 交互式对话循环
def interactive_voice_chat():
    """Run a multi-turn voice conversation until interrupted with Ctrl+C.

    Each turn records a fresh clip, runs the graph with the accumulated
    message history, prints the transcript and the answer, and keeps the
    returned messages as the history for the next turn. A turn that ends
    with an error is reported and skipped without touching the history.
    """
    print("🎙️ 语音助手已启动!按 Ctrl+C 退出")
    print("=" * 40)

    history = []
    conversation_id = str(uuid.uuid4())

    try:
        while True:
            clip_name = f"input_{uuid.uuid4().hex[:8]}.wav"
            audio_path = record_audio(clip_name, duration=4)

            turn_input = {
                "audio_input_path": audio_path,
                "conversation_id": conversation_id,
                "messages": history
            }
            result = voice_agent.invoke(turn_input)

            if result.get("error"):
                print(f"❌ 错误: {result['error']}")
                continue

            print(f"🗣️ 你说: {result['transcription']}")
            print(f"🤖 回答: {result['response_text']}")
            print("=" * 40)

            # Carry the updated conversation history into the next turn.
            history = result.get("messages", [])
    except KeyboardInterrupt:
        print("\n👋 语音助手已退出")
python
# 运行单次交互
# run_voice_agent(duration=5)
# 或运行交互式对话
# interactive_voice_chat()
使用 create_agent API 构建语音 Agent
LangChain 的 create_agent(位于 langchain.agents,底层基于 LangGraph 运行)可以快速构建包含工具调用的语音 Agent:
python
# create_agent is exported by LangChain (langchain.agents); langgraph.prebuilt
# does not provide a function of that name, so importing it from there fails.
from langchain.agents import create_agent
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.tools import tool
# 定义工具
@tool
def get_weather(location: str) -> str:
    """获取指定地点的天气信息。"""
    # NOTE: the docstring above is kept verbatim because @tool exposes it to
    # the model as the tool description.
    # Static sample data; a real weather API can be plugged in here.
    weather_data = {
        "北京": "晴,25°C",
        "上海": "多云,28°C",
        "广州": "阵雨,30°C",
        "深圳": "多云,29°C"
    }
    if location in weather_data:
        return weather_data[location]
    return f"抱歉,没有 {location} 的天气数据"
@tool
def set_reminder(text: str, minutes: int) -> str:
    """设置一个提醒。"""
    # NOTE: the docstring above is kept verbatim because @tool exposes it to
    # the model as the tool description. No reminder is actually scheduled;
    # only a confirmation string is produced.
    confirmation = f"✅ 已设置提醒:{minutes}分钟后提醒你{text}"
    return confirmation
# Create the voice agent: chat model + the two tools + the voice system prompt.
# A MemorySaver checkpointer is attached; callers select a conversation via the
# "thread_id" they pass in the invoke config.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
voice_agent_with_tools = create_agent(
    llm,
    tools=[get_weather, set_reminder],
    system_prompt=VOICE_SYSTEM_PROMPT,
    checkpointer=MemorySaver()
)
# 语音交互处理函数
def process_voice_input(audio_path: str, thread_id: str = "voice-1"):
    """Handle one spoken request with the tool-calling agent and speak the answer.

    Args:
        audio_path: Path of the recorded user audio.
        thread_id: Conversation thread passed to the agent's config; it is
            also used in the name of the generated audio file.

    Returns:
        The answer text (content of the last message returned by the agent).
    """
    # 1. Speech recognition.
    text = OpenAISTT().transcribe(audio_path)
    print(f"🗣️ 你说: {text}")

    # 2. Agent turn (may call tools).
    user_turn = {"role": "user", "content": text}
    run_config = {"configurable": {"thread_id": thread_id}}
    result = voice_agent_with_tools.invoke(
        {"messages": [user_turn]},
        config=run_config
    )
    response_text = result["messages"][-1].content
    print(f"🤖 回答: {response_text}")

    # 3. Speech synthesis.
    output_path = f"response_{thread_id}.mp3"
    EdgeTTS().run(response_text, output_path)

    # 4. Playback.
    play_audio(output_path)
    return response_text
# 使用示例
# process_voice_input("input.wav")
进阶:带中断和确认的语音 Agent
python
class AdvancedVoiceState(VoiceAgentState):
    """Voice agent state extended with confirmation and interruption flags."""

    # TypedDict items cannot carry default values: an assignment in the class
    # body is ignored at runtime and rejected by type checkers. Nodes should
    # therefore read these keys with ``state.get(...)`` and treat a missing
    # key as "not set".
    requires_confirmation: bool  # a spoken confirmation is needed before acting
    pending_action: Optional[str]  # description of the action awaiting confirmation
    is_interrupted: bool  # interruption flag
# Fragments that mark a refusal. They are checked first because plain
# substring matching on the affirmative words would accept negated replies:
# "不是" contains "是" and "不确定" contains "确定".
_NEGATIVE_MARKERS = ("不", "否", "别", "取消")
_AFFIRMATIVE_MARKERS = ("确定", "是")


def _is_affirmative(reply: str) -> bool:
    """Return True only when ``reply`` confirms and contains no refusal marker."""
    if any(marker in reply for marker in _NEGATIVE_MARKERS):
        return False
    return any(marker in reply for marker in _AFFIRMATIVE_MARKERS)


def confirmation_node(state: AdvancedVoiceState) -> AdvancedVoiceState:
    """Ask for a spoken confirmation before a critical action is executed.

    When ``requires_confirmation`` is set, the question is synthesized and
    played, a 3-second reply is recorded and transcribed, and then:

    * on confirmation, the flag is cleared and the pending action becomes the
      ``transcription`` to be processed;
    * otherwise ``response_text`` is set to the cancellation message.

    When no confirmation is required the state is returned unchanged.
    """
    if not state.get("requires_confirmation"):
        return state

    # Speak the confirmation question.
    question = f"你确定要执行{state.get('pending_action', '这个操作')}吗?"
    tts = EdgeTTS()
    confirm_path = f"confirm_{state.get('conversation_id')}.mp3"
    tts.run(question, confirm_path)
    play_audio(confirm_path)

    # Record and transcribe the user's reply.
    audio_path = record_audio("confirm_input.wav", duration=3)
    stt = OpenAISTT()
    reply = stt.transcribe(audio_path)

    if _is_affirmative(reply):
        return {
            **state,
            "requires_confirmation": False,
            "transcription": state.get("pending_action", "")
        }
    return {
        **state,
        "response_text": "已取消操作",
    }
延迟与性能优化
语音 Agent 的延迟主要由三个环节组成:
| 环节 | 延迟 | 优化方案 |
|---|---|---|
| STT | 1-3s (API) / 0.5-2s (本地) | 使用流式 STT |
| LLM | 1-5s | 使用流式输出 |
| TTS | 0.5-2s | 使用流式 TTS |
流式语音输出(尽可能降低延迟)
python
async def stream_voice_response(audio_path: str):
    """Sketch of a lower-latency voice reply.

    The audio is transcribed with a single call, the LLM answer is consumed
    chunk by chunk and concatenated, and the complete text is then handed to
    Edge TTS, whose audio chunks are iterated as they arrive. Playing those
    chunks is left as a placeholder, and nothing is returned.
    """
    # 1. Speech recognition.
    user_text = OpenAISTT().transcribe(audio_path)

    # 2. Stream the LLM answer and collect its text pieces.
    llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
    prompt = [
        SystemMessage(content=VOICE_SYSTEM_PROMPT),
        HumanMessage(content=user_text)
    ]
    pieces = []
    async for chunk in llm.astream(prompt):
        piece = getattr(chunk, "content", None)
        if piece:
            pieces.append(piece)
    full_response = "".join(pieces)

    # 3. Stream the synthesized audio.
    from edge_tts import Communicate
    communicate = Communicate(full_response, "zh-CN-XiaoxiaoNeural")

    async for chunk in communicate.stream():
        if chunk["type"] == "audio":
            # Placeholder: hand chunk["data"] to an audio player here,
            # e.g. play_audio_chunk(chunk["data"]).
            pass
下一步
- 学习 自定义 RAG Agent
- 探索 LangGraph 流式输出
- 了解 工具调用
- 查看 生产部署案例