返回笔记首页

第四章:内容生成工作流

主题配置

源码

workmind-04-workflow.zip


4.1 本章目标

学完这章你能做到:

  • 用 LangGraph 构建有固定步骤的工作流(不是 Agent 那种自主决策的)
  • 实现 4 个实用模板:周报、会议纪要、邮件润色、PRD 骨架
  • 前端用节点流程图实时展示每个步骤的执行状态
  • 实现 Human-in-the-Loop:工作流在关键节点暂停,等人工确认再继续

本章对应项目功能:模块四 — 内容生成工作流


4.2 工作流 vs Agent,什么时候用哪个

上一章做了 Agent,这章做工作流,很多人会问:这俩有什么区别?

Agent 的特点

步骤不固定,由模型自己决定。你给它一个目标,它会选择调用哪些工具、调用几次、顺序怎么安排,全是运行时动态决定的。

javascript
用户:分析 Vue3 和 React,生成报告
Agent 自己决定:
  → 调 web_search("Vue3")
  → 调 web_search("React")
  → 调 write_report(...)
  完成

适合:任务模糊、步骤不确定、需要 AI 自主规划的场景。

工作流的特点

步骤完全固定,开发者提前设计好。每一步做什么、顺序是什么,代码里写死的,不会变。

javascript
周报生成工作流,步骤永远是这样:
  步骤1:提炼工作亮点(固定)
  步骤2:识别风险阻塞(固定)
  步骤3:等人工确认(固定)
  步骤4:生成周报(固定)

适合:步骤确定、质量要求高、需要人工介入的场景。

一句话总结
  • Agent:给目标,AI 自己规划怎么做
  • 工作流:开发者规划好,AI 按步骤执行
实际工作中的选择
场景 推荐方案
帮我查一下竞品,整理个报告 Agent(步骤不确定)
生成周报 工作流(步骤固定,有模板)
帮我做一些研究 Agent
把会议记录整理成纪要 工作流(固定格式)
我有个复杂任务,帮我完成 Agent
润色这封邮件 工作流(固定步骤:分析→检查→输出)

4.3 LangGraph 工作流基础

4.3.1 工作流的三个核心概念

状态(State)

工作流执行过程中的所有数据,放在一个对象里。 每个节点读入状态、修改后输出,下一个节点接着用。

javascript
const State = Annotation.Root({
  // 用户输入
  points: Annotation({ reducer: (_, n) => n, default: () => '' }),
  // 第一步的输出
  highlights: Annotation({ reducer: (_, n) => n, default: () => '' }),
  // 第二步的输出
  risks: Annotation({ reducer: (_, n) => n, default: () => '' }),
  // 最终结果
  report: Annotation({ reducer: (_, n) => n, default: () => '' }),
})

reducer: (_, n) => n 是什么意思?

每个字段需要定义一个 reducer 函数,决定当这个字段被更新时怎么合并。 (_, n) => n 的意思是:直接用新值覆盖旧值(_ 是旧值,n 是新值)。

如果是消息列表那种场景,可以写 (old, new) => [...old, ...new],把新内容追加到旧内容后面。

节点(Node)

每个节点就是一个普通的 async 函数,接收当前状态,返回要更新的字段。

javascript
async function extractHighlights(state) {
  // 读入状态(用用户输入的工作要点)
  const result = await callLLM(state.points)

  // 只返回需要更新的字段
  // 注意:不需要返回完整的 state,只返回有变化的字段
  return { highlights: result }
}
边(Edge)

决定节点之间的流转顺序。分两种:

javascript
// 普通边:A 完成后,一定执行 B
.addEdge('node_a', 'node_b')

// 条件边:A 完成后,根据状态决定走 B 还是 C
.addConditionalEdges('node_a', routeFunction, {
  'go_b': 'node_b',
  'go_c': 'node_c',
})

4.3.2 构建第一个工作流

用周报生成来举例,把上面三个概念串起来:

javascript
import { StateGraph, END, START, Annotation } from '@langchain/langgraph'
import { MemorySaver } from '@langchain/langgraph'

// 第一步:定义状态
const State = Annotation.Root({
  points:    Annotation({ reducer: (_, n) => n, default: () => '' }),
  highlights: Annotation({ reducer: (_, n) => n, default: () => '' }),
  risks:      Annotation({ reducer: (_, n) => n, default: () => '' }),
  report:     Annotation({ reducer: (_, n) => n, default: () => '' }),
  humanFeedback: Annotation({ reducer: (_, n) => n, default: () => '' }),
})

// 第二步:定义节点函数
async function extractHighlights(state) {
  const result = await callLLM(
    '从工作要点中提炼3-5条亮点',
    state.points
  )
  return { highlights: result }
}

async function identifyRisks(state) {
  const result = await callLLM(
    '识别风险和阻塞项',
    state.points
  )
  return { risks: result }
}

// 这个函数什么都不做,是一个"占位符"
// 实际暂停行为由 interruptBefore 配置控制
async function humanReview(state) {
  return {}
}

async function generateReport(state) {
  const result = await callLLM(
    `生成周报,用户反馈:${state.humanFeedback}`,
    `亮点:${state.highlights}\n风险:${state.risks}`
  )
  return { report: result }
}

// 第三步:构建图
const checkpointer = new MemorySaver()  // 存档点,用于暂停/恢复

const graph = new StateGraph(State)
  .addNode('extract_highlights', extractHighlights)
  .addNode('identify_risks',     identifyRisks)
  .addNode('human_review',       humanReview)
  .addNode('generate_report',    generateReport)
  .addEdge(START,                'extract_highlights')
  .addEdge('extract_highlights', 'identify_risks')
  .addEdge('identify_risks',     'human_review')
  .addEdge('human_review',       'generate_report')
  .addEdge('generate_report',     END)
  .compile({
    checkpointer,
    interruptBefore: ['human_review'],   // ← 关键!在这个节点前暂停
  })

4.4 Human-in-the-Loop(人机协作)

这是本章最有价值的技术点。

4.4.1 为什么需要人工干预

AI 生成的内容不一定符合你的期望,尤其是:

  • 周报里提炼的"亮点"不是你想强调的
  • 会议纪要里漏掉了某个重要结论
  • 邮件润色后语气变得太正式了

Human-in-the-Loop 的思路:先让 AI 把前几步做完,把中间结果展示给你看,你确认没问题(或者提出修改意见),然后 AI 再继续生成最终结果。

4.4.2 暂停机制:interruptBefore

javascript
graph.compile({
  checkpointer,                         // 必须有存档点
  interruptBefore: ['human_review'],    // 在执行 human_review 节点之前暂停
})

执行工作流时,到 human_review 节点前,streamEvents 会自动停止输出,整个图就暂停了。状态被保存在 checkpointer 里。

javascript
// 启动工作流(会自动在 human_review 前暂停)
for await (const event of graph.streamEvents(input, { ...config, version: 'v2' })) {
  // 处理事件...
}
// 到这里,图已经暂停了,不是真的完成了

// 检查当前状态
const state = await graph.getState(config)
console.log(state.next)    // ['human_review'] ← 说明还没执行 human_review,暂停了
console.log(state.values)  // 包含已执行节点的输出(highlights、risks 等)

4.4.3 恢复:updateState + 传 null 继续

人工确认后,可以往状态里注入反馈,然后从暂停处继续:

javascript
// 1. 注入人工反馈(可选)
if (feedback) {
  await graph.updateState(config, { humanFeedback: feedback })
}

// 2. 传 null 给 streamEvents,从暂停处继续执行
for await (const event of graph.streamEvents(null, { ...config, version: 'v2' })) {
  // 继续处理后续节点的事件...
}

注意:config 里必须包含同样的 thread_id,LangGraph 靠这个找到对应的存档。

4.4.4 完整交互时序

javascript
用户填写输入内容
  ↓
[前端] POST /api/workflow/start/stream
  ↓
[服务端] 执行 extractHighlights → identifyRisks
  ↓
[服务端] 到 human_review 前自动暂停
  ↓
SSE 推送 → event: paused,附带 threadId + 中间产物
  ↓
[前端] 展示中间产物,显示 HumanReviewPanel
  ↓
用户查看亮点/风险,可选填写"这里写得不对,帮我换一下"
  ↓
[前端] POST /api/workflow/resume/stream(带 threadId 和 feedback)
  ↓
[服务端] 注入 humanFeedback 到状态,继续执行 generateReport
  ↓
SSE 流式推送最终周报内容
  ↓
[前端] event: completed,展示结果

4.5 服务端实现要点

4.5.1 工作流实例的生命周期管理

javascript
// 用 Map 存活跃的工作流实例
const activeWorkflows = new Map()  // threadId → { graph, meta, config }

// 启动时存入
const graph = builder()
activeWorkflows.set(threadId, { graph, meta, config })

// 恢复时取出
const wf = activeWorkflows.get(threadId)
if (!wf) {
  return res.status(404).json({ error: '工作流不存在或已过期,请重新启动' })
}

// 完成后清理
activeWorkflows.delete(threadId)

注意:MemorySaver 把状态存在内存里,服务重启后状态丢失。 生产环境要换成 PostgresSaverRedisSaver,状态持久化。

4.5.2 节点状态推送

streamEvents 的事件类型很多,我们只关心节点开始/结束:

javascript
for await (const event of graph.streamEvents(input, { ...config, version: 'v2' })) {
  const { event: eventType, name } = event

  // 节点开始执行(过滤掉 LangGraph 内部节点)
  if (eventType === 'on_chain_start') {
    const nodeInMeta = meta.nodes.find(n => n.id === name)
    if (nodeInMeta) {
      send('node_start', { nodeId: name, label: nodeInMeta.label })
    }
  }

  // 节点执行完毕
  if (eventType === 'on_chain_end') {
    const nodeInMeta = meta.nodes.find(n => n.id === name)
    if (nodeInMeta) {
      // 提取节点输出的预览文本(给前端在节点卡片上显示)
      const output = event.data?.output
      const preview = getPreview(output)
      send('node_done', { nodeId: name, preview })
    }
  }
}

为什么需要 **meta.nodes.find(n => n.id === name)** 来过滤?

on_chain_start 事件不只在业务节点上触发,LangGraph 内部也有很多节点(__start__LangGraph 等)。我们只关心自己定义的业务节点,所以用 meta 里的节点列表来过滤。

4.5.3 暂停检测

工作流暂停后,如何知道是暂停还是真的完成了?

javascript
// 遍历完 streamEvents 之后检查状态
const state = await graph.getState(config)

if (state.next?.length > 0) {
  // state.next 不为空 → 还有待执行的节点 → 说明是暂停的
  // state.next[0] 就是下一个要执行的节点名
  send('paused', {
    threadId,
    nextNode: state.next[0],
    intermediates: getIntermediates(state.values, workflowId),
  })
} else {
  // state.next 为空 → 所有节点都执行完了 → 真正完成
  send('completed', { threadId, result: state.values[meta.resultKey] })
}

4.6 前端实现要点

4.6.1 工作流 store 的状态机设计

一个工作流的执行过程有几种状态,前端要根据这些状态决定显示什么:

javascript
idle(初始)
  ↓ 用户点击开始
running(执行中)
  ↓ 到达 human_review
paused(等待人工审核)
  ↓ 用户点击继续
running(继续执行最后节点)
  ↓ 完成
completed(展示结果)

在 store 里用几个 ref 组合表达这个状态机:

javascript
const running       = ref(false)  // 是否在执行
const paused        = ref(false)  // 是否暂停在审核
const result        = ref('')     // 最终结果
const streamBuffer  = ref('')     // 流式输出的缓冲

各状态对应的 ref 值:

阶段 running paused result streamBuffer
输入阶段 false false '' ''
执行中 true false '' ''
等待审核 false true '' ''
执行最后节点 true false '' 有内容
完成 false false 有内容 ''

4.6.2 节点状态的 reactive 对象

javascript
// nodeStates 存每个节点的状态
const nodeStates = reactive({})
// { 'extract_highlights': 'done', 'identify_risks': 'running', ... }
// 可能的值:'idle' | 'running' | 'done' | 'waiting'

// nodeOutputs 存每个节点的输出预览
const nodeOutputs = reactive({})
// { 'extract_highlights': '• 完成了需求评审...\n• 发布了新版本...' }

为什么用 reactive 而不是 ref

因为 nodeStates 是一个对象,我们会频繁修改它的属性(nodeStates['extract_highlights'] = 'done')。用 reactive 时,直接修改属性,Vue 就能追踪到变化。

如果用 ref,修改内部属性需要 nodeStates.value['extract_highlights'] = 'done',在模板里访问也要 .value,更麻烦。

4.6.3 WorkflowGraph 节点可视化

节点图根据 nodeStates 实时更新样式:

vue
<div
  class="node-card"
  :class="nodeState(node.id)"  <!-- 'idle' | 'running' | 'done' | 'waiting' -->
>
  <!-- 状态圆圈 -->
  <div class="node-status-circle" :class="nodeState(node.id)">
    <span v-if="nodeState(node.id) === 'done'"    >✓</span>
    <span v-else-if="nodeState(node.id) === 'running'" class="spinner-sm" />
    <span v-else-if="nodeState(node.id) === 'waiting'" >⏸</span>
    <span v-else>{{ idx + 1 }}</span>
  </div>
</div>

节点之间的连接线,也根据上一个节点是否完成来改变颜色:

vue
<div class="connector-line" :class="{ lit: nodeState(node.id) === 'done' }" />
css
/* 默认灰色 */
.connector-line { background: var(--color-border); }
/* 上一个节点完成后,变绿色 */
.connector-line.lit { background: var(--color-success); }

这样随着工作流执行,绿色"电流"会从上到下依次点亮,视觉上很直观。

4.6.4 接收 SSE 事件,更新节点状态

store 里处理每个 SSE 事件:

javascript
await fetchStream('/api/workflow/start/stream', payload, {
  onEvent: (event, data) => {
    if (event === 'node_start') {
      // 把这个节点标记为执行中
      nodeStates[data.nodeId] = 'running'
    }

    if (event === 'node_done') {
      // 节点执行完,更新为完成
      nodeStates[data.nodeId] = 'done'
      // 如果有预览内容,一起存下来
      if (data.preview) nodeOutputs[data.nodeId] = data.preview
    }

    if (event === 'paused') {
      // 工作流暂停在审核节点
      paused.value = true
      running.value = false
      nodeStates['human_review'] = 'waiting'
      // 存下来供审核面板展示
      intermediates.value = data.intermediates || []
    }

    if (event === 'completed') {
      result.value = data.result
      running.value = false
    }
  }
})

4.6.5 HumanReviewPanel:审核界面

审核面板做了三件事:

  1. 展示中间产物(各步骤的 AI 输出)
  2. 提供一个文本框让用户填写修改意见
  3. 两个按钮:确认继续(带反馈)、取消
vue
<div class="review-panel">
  <!-- 展示中间产物 -->
  <div v-for="item in intermediates" :key="item.key" class="intermediate-item">
    <div class="item-label">{{ item.label }}</div>
    <div class="item-value">{{ item.value }}</div>
  </div>

  <!-- 修改意见 -->
  <textarea v-model="feedback" placeholder="如有问题,在此填写修改意见..." />

  <!-- 操作 -->
  <button @click="emit('approve', feedback)">
    {{ feedback.trim() ? '采纳意见并继续' : '确认并继续' }}
  </button>
</div>

父组件(WorkflowView)接收 approve 事件:

javascript
async function resumeWithFeedback(feedback) {
  await wfStore.resumeWorkflow(feedback)
}

store 里 resumeWorkflow:

javascript
async function resumeWorkflow(feedback) {
  // 1. 请求恢复接口
  await fetchStream('/api/workflow/resume/stream', { threadId, feedback }, {
    onToken: (token) => {
      // 最后一个节点流式输出
      streamBuffer.value += token
    },
    onEvent: (event, data) => {
      if (event === 'node_start') nodeStates[data.nodeId] = 'running'
      if (event === 'node_done')  nodeStates[data.nodeId] = 'done'
      if (event === 'completed')  result.value = data.result
    }
  })
}

4.7 四个工作流模板的设计思路

周报生成

为什么分三步而不是一步直接生成?

一步生成的问题:你给 AI 一堆工作要点,它直接生成周报,质量难控制。 它可能漏掉重要的成果,或者把一个小事写得很重要。

分步骤的好处:先提炼亮点(可以让人工确认),再识别风险(单独聚焦),最后整合生成。每一步的 prompt 更聚焦,质量更好。

javascript
// 第一步:专注提炼亮点
async function extractHighlights(state) {
  return chatLLM(`
    从工作要点中提炼亮点。
    输出3-5条,每条一行,用"• "开头,不超过30字。
    不要发散,只提炼原文中有的信息。
  `, state.points)
}

// 第二步:专注风险
async function identifyRisks(state) {
  return chatLLM(`
    从工作内容中识别风险和阻塞项。
    如果没有明显风险,输出"本周无明显风险项"。
    只看负面信息,不要混入成果。
  `, state.points)
}

每一步的 system prompt 都只关注一件事,比"帮我提炼亮点和风险,然后生成周报"这种一次性大 prompt 效果好得多。

会议纪要

顺序设计上的考虑:

javascript
提取参会人/议题(最简单,先做)
  ↓
提取结论(需要理解全文)
  ↓
整理 Action Items(最复杂,最后做)
  ↓
人工审核(确认结论和 action 是否正确)
  ↓
生成正式纪要

Action Items 放最后,是因为它是最容易出错的,AI 可能搞错负责人或截止时间,需要人工重点确认。

邮件润色

javascript
分析语气/目的(给后续步骤提供方向)
  ↓
检查问题(找出具体的措辞问题)
  ↓
人工审核(确认问题分析是否准确,避免把好的改坏了)
  ↓
生成润色版本(基于分析结果优化)

为什么要先分析意图再检查问题?

如果不知道这封邮件是写给客户还是上级,"语气是否合适"就没法判断。 先分析意图,后面的检查才有参考依据。


4.8 工作流 vs 直接 Prompt 工程

有同学会问:这四个场景,我直接用一个精心设计的 prompt 也能做,为什么要用工作流?

直接 prompt 的问题:

javascript
system: 你是周报助手
user: 帮我生成周报,以下是工作要点:[内容]
      要求:先提炼亮点,再识别风险,最后生成完整周报

问题1:模型可能不按你期望的顺序走,直接输出最终结果跳过中间分析
问题2:没有办法在中途让用户干预
问题3:中间产物(亮点、风险)没法单独展示给用户
问题4:某一步出错了,需要整个重来

用工作流的好处:

javascript
✓ 每一步的 prompt 更精简聚焦,效果更好
✓ 中间产物可以单独展示,用户能看到"AI 提炼的亮点是什么"
✓ 人工干预点明确(就在 human_review 节点)
✓ 某一步出错,可以从那步重新来,不用整个重跑
✓ 可以给每一步设置不同的 temperature(分析步骤用 0,生成步骤用 0.7)

4.9 本章作业

✅****基础功能

  • 四个模板卡片能正常选择和切换
  • 选择周报模板,输入工作要点,能看到节点图依次变成绿色
  • 执行到 human_review 时,看到黄色的"等待审核"状态和审核面板
  • 审核面板展示了各步骤的中间产物(提炼的亮点、识别的风险)
  • 点击"确认并继续",最终周报生成并展示

✅****进阶功能

  • 在审核面板输入修改意见(如"亮点部分再加一条关于性能优化的"),最终结果体现了修改意见
  • 节点图的绿色连接线随执行进度依次点亮
  • 执行中的节点卡片显示蓝色边框 + 旋转动画
  • 最终内容 Markdown 渲染正确(有标题、列表、加粗等)
✅****测试各模板
  1. 周报生成:粘贴一段本周工作内容
  2. 会议纪要:粘贴任意一段会议讨论内容(可以是假的)
  3. 邮件润色:把"老板我想请假"这五个字作为草稿,看看润色结果
  4. PRD 骨架:输入"做一个用户评论功能,支持点赞和回复"

4.10 常见问题

Q:工作流暂停后,刷新页面能恢复吗?

不行。我们现在用的是 MemorySaver,状态存在内存里,刷新页面服务端的 store 就没了。 生产环境要换成 PostgresSaver

javascript
import { PostgresSaver } from '@langchain/langgraph-checkpoint-postgres'

const checkpointer = PostgresSaver.fromConnString(process.env.DATABASE_URL)
await checkpointer.setup()   // 初始化表结构

这样状态持久化到数据库,刷新也不丢。

Q:节点输出的预览是乱码或者奇怪格式?

getPreview 函数从节点的 output 里取第一个字段的值:

javascript
const output = event.data?.output
const firstVal = Object.values(output)[0]
if (typeof firstVal === 'string') preview = firstVal.slice(0, 80)

如果节点返回的不是简单字符串(比如嵌套对象),需要针对具体工作流调整取值逻辑。

Q:工作流某一步 LLM 调用失败了怎么办?

节点函数里要加 try/catch:

javascript
async function extractHighlights(state) {
  try {
    const result = await callLLM(...)
    return { highlights: result }
  } catch (e) {
    // 失败时给一个默认值,让工作流继续
    return { highlights: '(亮点提取失败,请手动填写)' }
  }
}

或者抛出错误让整个工作流停止,然后路由层的 catch 处理。

Q:同时有两个用户执行同一个工作流会冲突吗?

不会。每次启动都生成一个唯一的 threadId

javascript
const threadId = `wf_${Date.now()}_${Math.random().toString(36).slice(2, 6)}`
const config   = { configurable: { thread_id: threadId } }

LangGraph 用 thread_id 隔离不同用户的状态,互不干扰。