源码
4.1 本章目标
学完这章你能做到:
- 用 LangGraph 构建有固定步骤的工作流(不是 Agent 那种自主决策的)
- 实现 4 个实用模板:周报、会议纪要、邮件润色、PRD 骨架
- 前端用节点流程图实时展示每个步骤的执行状态
- 实现 Human-in-the-Loop:工作流在关键节点暂停,等人工确认再继续
本章对应项目功能:模块四 — 内容生成工作流
4.2 工作流 vs Agent,什么时候用哪个
上一章做了 Agent,这章做工作流,很多人会问:这俩有什么区别?
Agent 的特点
步骤不固定,由模型自己决定。你给它一个目标,它会选择调用哪些工具、调用几次、顺序怎么安排,全是运行时动态决定的。
用户:分析 Vue3 和 React,生成报告
Agent 自己决定:
→ 调 web_search("Vue3")
→ 调 web_search("React")
→ 调 write_report(...)
完成
适合:任务模糊、步骤不确定、需要 AI 自主规划的场景。
工作流的特点
步骤完全固定,开发者提前设计好。每一步做什么、顺序是什么,代码里写死的,不会变。
周报生成工作流,步骤永远是这样:
步骤1:提炼工作亮点(固定)
步骤2:识别风险阻塞(固定)
步骤3:等人工确认(固定)
步骤4:生成周报(固定)
适合:步骤确定、质量要求高、需要人工介入的场景。
一句话总结
- Agent:给目标,AI 自己规划怎么做
- 工作流:开发者规划好,AI 按步骤执行
实际工作中的选择
| 场景 | 推荐方案 |
|---|---|
| 帮我查一下竞品,整理个报告 | Agent(步骤不确定) |
| 生成周报 | 工作流(步骤固定,有模板) |
| 帮我做一些研究 | Agent |
| 把会议记录整理成纪要 | 工作流(固定格式) |
| 我有个复杂任务,帮我完成 | Agent |
| 润色这封邮件 | 工作流(固定步骤:分析→检查→输出) |
4.3 LangGraph 工作流基础
4.3.1 工作流的三个核心概念
状态(State)
工作流执行过程中的所有数据,放在一个对象里。 每个节点读入状态、修改后输出,下一个节点接着用。
const State = Annotation.Root({
// 用户输入
points: Annotation({ reducer: (_, n) => n, default: () => '' }),
// 第一步的输出
highlights: Annotation({ reducer: (_, n) => n, default: () => '' }),
// 第二步的输出
risks: Annotation({ reducer: (_, n) => n, default: () => '' }),
// 最终结果
report: Annotation({ reducer: (_, n) => n, default: () => '' }),
})
reducer: (_, n) => n 是什么意思?
每个字段需要定义一个 reducer 函数,决定当这个字段被更新时怎么合并。 (_, n) => n 的意思是:直接用新值覆盖旧值(_ 是旧值,n 是新值)。
如果是消息列表那种场景,可以写 (old, new) => [...old, ...new],把新内容追加到旧内容后面。
节点(Node)
每个节点就是一个普通的 async 函数,接收当前状态,返回要更新的字段。
async function extractHighlights(state) {
// 读入状态(用用户输入的工作要点)
const result = await callLLM(state.points)
// 只返回需要更新的字段
// 注意:不需要返回完整的 state,只返回有变化的字段
return { highlights: result }
}
边(Edge)
决定节点之间的流转顺序。分两种:
// 普通边:A 完成后,一定执行 B
.addEdge('node_a', 'node_b')
// 条件边:A 完成后,根据状态决定走 B 还是 C
.addConditionalEdges('node_a', routeFunction, {
'go_b': 'node_b',
'go_c': 'node_c',
})
4.3.2 构建第一个工作流
用周报生成来举例,把上面三个概念串起来:
import { StateGraph, END, START, Annotation } from '@langchain/langgraph'
import { MemorySaver } from '@langchain/langgraph'
// 第一步:定义状态
const State = Annotation.Root({
points: Annotation({ reducer: (_, n) => n, default: () => '' }),
highlights: Annotation({ reducer: (_, n) => n, default: () => '' }),
risks: Annotation({ reducer: (_, n) => n, default: () => '' }),
report: Annotation({ reducer: (_, n) => n, default: () => '' }),
humanFeedback: Annotation({ reducer: (_, n) => n, default: () => '' }),
})
// 第二步:定义节点函数
async function extractHighlights(state) {
const result = await callLLM(
'从工作要点中提炼3-5条亮点',
state.points
)
return { highlights: result }
}
async function identifyRisks(state) {
const result = await callLLM(
'识别风险和阻塞项',
state.points
)
return { risks: result }
}
// 这个函数什么都不做,是一个"占位符"
// 实际暂停行为由 interruptBefore 配置控制
async function humanReview(state) {
return {}
}
async function generateReport(state) {
const result = await callLLM(
`生成周报,用户反馈:${state.humanFeedback}`,
`亮点:${state.highlights}\n风险:${state.risks}`
)
return { report: result }
}
// 第三步:构建图
const checkpointer = new MemorySaver() // 存档点,用于暂停/恢复
const graph = new StateGraph(State)
.addNode('extract_highlights', extractHighlights)
.addNode('identify_risks', identifyRisks)
.addNode('human_review', humanReview)
.addNode('generate_report', generateReport)
.addEdge(START, 'extract_highlights')
.addEdge('extract_highlights', 'identify_risks')
.addEdge('identify_risks', 'human_review')
.addEdge('human_review', 'generate_report')
.addEdge('generate_report', END)
.compile({
checkpointer,
interruptBefore: ['human_review'], // ← 关键!在这个节点前暂停
})
4.4 Human-in-the-Loop(人机协作)
这是本章最有价值的技术点。
4.4.1 为什么需要人工干预
AI 生成的内容不一定符合你的期望,尤其是:
- 周报里提炼的"亮点"不是你想强调的
- 会议纪要里漏掉了某个重要结论
- 邮件润色后语气变得太正式了
Human-in-the-Loop 的思路:先让 AI 把前几步做完,把中间结果展示给你看,你确认没问题(或者提出修改意见),然后 AI 再继续生成最终结果。
4.4.2 暂停机制:interruptBefore
graph.compile({
checkpointer, // 必须有存档点
interruptBefore: ['human_review'], // 在执行 human_review 节点之前暂停
})
执行工作流时,到 human_review 节点前,streamEvents 会自动停止输出,整个图就暂停了。状态被保存在 checkpointer 里。
// 启动工作流(会自动在 human_review 前暂停)
for await (const event of graph.streamEvents(input, { ...config, version: 'v2' })) {
// 处理事件...
}
// 到这里,图已经暂停了,不是真的完成了
// 检查当前状态
const state = await graph.getState(config)
console.log(state.next) // ['human_review'] ← 说明还没执行 human_review,暂停了
console.log(state.values) // 包含已执行节点的输出(highlights、risks 等)
4.4.3 恢复:updateState + 传 null 继续
人工确认后,可以往状态里注入反馈,然后从暂停处继续:
// 1. 注入人工反馈(可选)
if (feedback) {
await graph.updateState(config, { humanFeedback: feedback })
}
// 2. 传 null 给 streamEvents,从暂停处继续执行
for await (const event of graph.streamEvents(null, { ...config, version: 'v2' })) {
// 继续处理后续节点的事件...
}
注意:config 里必须包含同样的 thread_id,LangGraph 靠这个找到对应的存档。
4.4.4 完整交互时序
用户填写输入内容
↓
[前端] POST /api/workflow/start/stream
↓
[服务端] 执行 extractHighlights → identifyRisks
↓
[服务端] 到 human_review 前自动暂停
↓
SSE 推送 → event: paused,附带 threadId + 中间产物
↓
[前端] 展示中间产物,显示 HumanReviewPanel
↓
用户查看亮点/风险,可选填写"这里写得不对,帮我换一下"
↓
[前端] POST /api/workflow/resume/stream(带 threadId 和 feedback)
↓
[服务端] 注入 humanFeedback 到状态,继续执行 generateReport
↓
SSE 流式推送最终周报内容
↓
[前端] event: completed,展示结果
4.5 服务端实现要点
4.5.1 工作流实例的生命周期管理
// 用 Map 存活跃的工作流实例
const activeWorkflows = new Map() // threadId → { graph, meta, config }
// 启动时存入
const graph = builder()
activeWorkflows.set(threadId, { graph, meta, config })
// 恢复时取出
const wf = activeWorkflows.get(threadId)
if (!wf) {
return res.status(404).json({ error: '工作流不存在或已过期,请重新启动' })
}
// 完成后清理
activeWorkflows.delete(threadId)
注意:MemorySaver 把状态存在内存里,服务重启后状态丢失。 生产环境要换成 PostgresSaver 或 RedisSaver,状态持久化。
4.5.2 节点状态推送
streamEvents 的事件类型很多,我们只关心节点开始/结束:
for await (const event of graph.streamEvents(input, { ...config, version: 'v2' })) {
const { event: eventType, name } = event
// 节点开始执行(过滤掉 LangGraph 内部节点)
if (eventType === 'on_chain_start') {
const nodeInMeta = meta.nodes.find(n => n.id === name)
if (nodeInMeta) {
send('node_start', { nodeId: name, label: nodeInMeta.label })
}
}
// 节点执行完毕
if (eventType === 'on_chain_end') {
const nodeInMeta = meta.nodes.find(n => n.id === name)
if (nodeInMeta) {
// 提取节点输出的预览文本(给前端在节点卡片上显示)
const output = event.data?.output
const preview = getPreview(output)
send('node_done', { nodeId: name, preview })
}
}
}
为什么需要 **meta.nodes.find(n => n.id === name)** 来过滤?
on_chain_start 事件不只在业务节点上触发,LangGraph 内部也有很多节点(__start__、LangGraph 等)。我们只关心自己定义的业务节点,所以用 meta 里的节点列表来过滤。
4.5.3 暂停检测
工作流暂停后,如何知道是暂停还是真的完成了?
// 遍历完 streamEvents 之后检查状态
const state = await graph.getState(config)
if (state.next?.length > 0) {
// state.next 不为空 → 还有待执行的节点 → 说明是暂停的
// state.next[0] 就是下一个要执行的节点名
send('paused', {
threadId,
nextNode: state.next[0],
intermediates: getIntermediates(state.values, workflowId),
})
} else {
// state.next 为空 → 所有节点都执行完了 → 真正完成
send('completed', { threadId, result: state.values[meta.resultKey] })
}
4.6 前端实现要点
4.6.1 工作流 store 的状态机设计
一个工作流的执行过程有几种状态,前端要根据这些状态决定显示什么:
idle(初始)
↓ 用户点击开始
running(执行中)
↓ 到达 human_review
paused(等待人工审核)
↓ 用户点击继续
running(继续执行最后节点)
↓ 完成
completed(展示结果)
在 store 里用几个 ref 组合表达这个状态机:
const running = ref(false) // 是否在执行
const paused = ref(false) // 是否暂停在审核
const result = ref('') // 最终结果
const streamBuffer = ref('') // 流式输出的缓冲
各状态对应的 ref 值:
| 阶段 | running | paused | result | streamBuffer |
|---|---|---|---|---|
| 输入阶段 | false | false | '' | '' |
| 执行中 | true | false | '' | '' |
| 等待审核 | false | true | '' | '' |
| 执行最后节点 | true | false | '' | 有内容 |
| 完成 | false | false | 有内容 | '' |
4.6.2 节点状态的 reactive 对象
// nodeStates 存每个节点的状态
const nodeStates = reactive({})
// { 'extract_highlights': 'done', 'identify_risks': 'running', ... }
// 可能的值:'idle' | 'running' | 'done' | 'waiting'
// nodeOutputs 存每个节点的输出预览
const nodeOutputs = reactive({})
// { 'extract_highlights': '• 完成了需求评审...\n• 发布了新版本...' }
为什么用 reactive 而不是 ref?
因为 nodeStates 是一个对象,我们会频繁修改它的属性(nodeStates['extract_highlights'] = 'done')。用 reactive 时,直接修改属性,Vue 就能追踪到变化。
如果用 ref,修改内部属性需要 nodeStates.value['extract_highlights'] = 'done',在模板里访问也要 .value,更麻烦。
4.6.3 WorkflowGraph 节点可视化
节点图根据 nodeStates 实时更新样式:
<div
class="node-card"
:class="nodeState(node.id)" <!-- 'idle' | 'running' | 'done' | 'waiting' -->
>
<!-- 状态圆圈 -->
<div class="node-status-circle" :class="nodeState(node.id)">
<span v-if="nodeState(node.id) === 'done'" >✓</span>
<span v-else-if="nodeState(node.id) === 'running'" class="spinner-sm" />
<span v-else-if="nodeState(node.id) === 'waiting'" >⏸</span>
<span v-else>{{ idx + 1 }}</span>
</div>
</div>
节点之间的连接线,也根据上一个节点是否完成来改变颜色:
<div class="connector-line" :class="{ lit: nodeState(node.id) === 'done' }" />
/* 默认灰色 */
.connector-line { background: var(--color-border); }
/* 上一个节点完成后,变绿色 */
.connector-line.lit { background: var(--color-success); }
这样随着工作流执行,绿色"电流"会从上到下依次点亮,视觉上很直观。
4.6.4 接收 SSE 事件,更新节点状态
store 里处理每个 SSE 事件:
await fetchStream('/api/workflow/start/stream', payload, {
onEvent: (event, data) => {
if (event === 'node_start') {
// 把这个节点标记为执行中
nodeStates[data.nodeId] = 'running'
}
if (event === 'node_done') {
// 节点执行完,更新为完成
nodeStates[data.nodeId] = 'done'
// 如果有预览内容,一起存下来
if (data.preview) nodeOutputs[data.nodeId] = data.preview
}
if (event === 'paused') {
// 工作流暂停在审核节点
paused.value = true
running.value = false
nodeStates['human_review'] = 'waiting'
// 存下来供审核面板展示
intermediates.value = data.intermediates || []
}
if (event === 'completed') {
result.value = data.result
running.value = false
}
}
})
4.6.5 HumanReviewPanel:审核界面
审核面板做了三件事:
- 展示中间产物(各步骤的 AI 输出)
- 提供一个文本框让用户填写修改意见
- 两个按钮:确认继续(带反馈)、取消
<div class="review-panel">
<!-- 展示中间产物 -->
<div v-for="item in intermediates" :key="item.key" class="intermediate-item">
<div class="item-label">{{ item.label }}</div>
<div class="item-value">{{ item.value }}</div>
</div>
<!-- 修改意见 -->
<textarea v-model="feedback" placeholder="如有问题,在此填写修改意见..." />
<!-- 操作 -->
<button @click="emit('approve', feedback)">
{{ feedback.trim() ? '采纳意见并继续' : '确认并继续' }}
</button>
</div>
父组件(WorkflowView)接收 approve 事件:
async function resumeWithFeedback(feedback) {
await wfStore.resumeWorkflow(feedback)
}
store 里 resumeWorkflow:
async function resumeWorkflow(feedback) {
// 1. 请求恢复接口
await fetchStream('/api/workflow/resume/stream', { threadId, feedback }, {
onToken: (token) => {
// 最后一个节点流式输出
streamBuffer.value += token
},
onEvent: (event, data) => {
if (event === 'node_start') nodeStates[data.nodeId] = 'running'
if (event === 'node_done') nodeStates[data.nodeId] = 'done'
if (event === 'completed') result.value = data.result
}
})
}
4.7 四个工作流模板的设计思路
周报生成
为什么分三步而不是一步直接生成?
一步生成的问题:你给 AI 一堆工作要点,它直接生成周报,质量难控制。 它可能漏掉重要的成果,或者把一个小事写得很重要。
分步骤的好处:先提炼亮点(可以让人工确认),再识别风险(单独聚焦),最后整合生成。每一步的 prompt 更聚焦,质量更好。
// 第一步:专注提炼亮点
async function extractHighlights(state) {
return chatLLM(`
从工作要点中提炼亮点。
输出3-5条,每条一行,用"• "开头,不超过30字。
不要发散,只提炼原文中有的信息。
`, state.points)
}
// 第二步:专注风险
async function identifyRisks(state) {
return chatLLM(`
从工作内容中识别风险和阻塞项。
如果没有明显风险,输出"本周无明显风险项"。
只看负面信息,不要混入成果。
`, state.points)
}
每一步的 system prompt 都只关注一件事,比"帮我提炼亮点和风险,然后生成周报"这种一次性大 prompt 效果好得多。
会议纪要
顺序设计上的考虑:
提取参会人/议题(最简单,先做)
↓
提取结论(需要理解全文)
↓
整理 Action Items(最复杂,最后做)
↓
人工审核(确认结论和 action 是否正确)
↓
生成正式纪要
Action Items 放最后,是因为它是最容易出错的,AI 可能搞错负责人或截止时间,需要人工重点确认。
邮件润色
分析语气/目的(给后续步骤提供方向)
↓
检查问题(找出具体的措辞问题)
↓
人工审核(确认问题分析是否准确,避免把好的改坏了)
↓
生成润色版本(基于分析结果优化)
为什么要先分析意图再检查问题?
如果不知道这封邮件是写给客户还是上级,"语气是否合适"就没法判断。 先分析意图,后面的检查才有参考依据。
4.8 工作流 vs 直接 Prompt 工程
有同学会问:这四个场景,我直接用一个精心设计的 prompt 也能做,为什么要用工作流?
直接 prompt 的问题:
system: 你是周报助手
user: 帮我生成周报,以下是工作要点:[内容]
要求:先提炼亮点,再识别风险,最后生成完整周报
问题1:模型可能不按你期望的顺序走,直接输出最终结果跳过中间分析
问题2:没有办法在中途让用户干预
问题3:中间产物(亮点、风险)没法单独展示给用户
问题4:某一步出错了,需要整个重来
用工作流的好处:
✓ 每一步的 prompt 更精简聚焦,效果更好
✓ 中间产物可以单独展示,用户能看到"AI 提炼的亮点是什么"
✓ 人工干预点明确(就在 human_review 节点)
✓ 某一步出错,可以从那步重新来,不用整个重跑
✓ 可以给每一步设置不同的 temperature(分析步骤用 0,生成步骤用 0.7)
4.9 本章作业
✅****基础功能
- 四个模板卡片能正常选择和切换
- 选择周报模板,输入工作要点,能看到节点图依次变成绿色
- 执行到 human_review 时,看到黄色的"等待审核"状态和审核面板
- 审核面板展示了各步骤的中间产物(提炼的亮点、识别的风险)
- 点击"确认并继续",最终周报生成并展示
✅****进阶功能
- 在审核面板输入修改意见(如"亮点部分再加一条关于性能优化的"),最终结果体现了修改意见
- 节点图的绿色连接线随执行进度依次点亮
- 执行中的节点卡片显示蓝色边框 + 旋转动画
- 最终内容 Markdown 渲染正确(有标题、列表、加粗等)
✅****测试各模板
- 周报生成:粘贴一段本周工作内容
- 会议纪要:粘贴任意一段会议讨论内容(可以是假的)
- 邮件润色:把"老板我想请假"这五个字作为草稿,看看润色结果
- PRD 骨架:输入"做一个用户评论功能,支持点赞和回复"
4.10 常见问题
Q:工作流暂停后,刷新页面能恢复吗?
不行。我们现在用的是 MemorySaver,状态存在内存里,刷新页面服务端的 store 就没了。 生产环境要换成 PostgresSaver:
import { PostgresSaver } from '@langchain/langgraph-checkpoint-postgres'
const checkpointer = PostgresSaver.fromConnString(process.env.DATABASE_URL)
await checkpointer.setup() // 初始化表结构
这样状态持久化到数据库,刷新也不丢。
Q:节点输出的预览是乱码或者奇怪格式?
getPreview 函数从节点的 output 里取第一个字段的值:
const output = event.data?.output
const firstVal = Object.values(output)[0]
if (typeof firstVal === 'string') preview = firstVal.slice(0, 80)
如果节点返回的不是简单字符串(比如嵌套对象),需要针对具体工作流调整取值逻辑。
Q:工作流某一步 LLM 调用失败了怎么办?
节点函数里要加 try/catch:
async function extractHighlights(state) {
try {
const result = await callLLM(...)
return { highlights: result }
} catch (e) {
// 失败时给一个默认值,让工作流继续
return { highlights: '(亮点提取失败,请手动填写)' }
}
}
或者抛出错误让整个工作流停止,然后路由层的 catch 处理。
Q:同时有两个用户执行同一个工作流会冲突吗?
不会。每次启动都生成一个唯一的 threadId:
const threadId = `wf_${Date.now()}_${Math.random().toString(36).slice(2, 6)}`
const config = { configurable: { thread_id: threadId } }
LangGraph 用 thread_id 隔离不同用户的状态,互不干扰。