"""LangGraph ``stream_mode="updates"`` demo.

Runs a three-step linear graph and prints the per-node deltas that the
stream yields, while rebuilding the complete state on the caller's side.
"""

import operator
import time
from typing import Annotated, TypedDict, get_type_hints

from langgraph.graph import StateGraph, END


class GraphState(TypedDict):
    """State shared by every node of the workflow."""

    # Merged with ``operator.add``: each node's list is appended to the
    # existing one rather than replacing it.
    messages: Annotated[list, operator.add]
    # No reducer declared: a node's value replaces the previous one.
    step_count: int
    # No reducer declared: a node's value replaces the previous one.
    result: str


def step_1(state: GraphState) -> GraphState:
    """Simulate data preparation and return this node's state delta."""
    time.sleep(0.3)  # stand-in for real work
    return {
        "messages": ["步骤1: 开始数据准备"],
        "step_count": state.get("step_count", 0) + 1,
        "result": "数据准备完成",
    }


def step_2(state: GraphState) -> GraphState:
    """Simulate data processing and return this node's state delta."""
    time.sleep(0.4)  # stand-in for real work
    return {
        "messages": ["步骤2: 正在处理数据"],
        "step_count": state.get("step_count", 0) + 1,
        "result": "数据处理完成,准备分析",
    }


def step_3(state: GraphState) -> GraphState:
    """Simulate the analysis step and return this node's state delta.

    The summary counts the messages already in ``state`` plus the one this
    node contributes, and reports the step count after this node's increment.
    """
    time.sleep(0.5)  # stand-in for real work
    total_messages = len(state.get("messages", []))
    return {
        "messages": ["步骤3: 分析完成,生成最终结果"],
        "step_count": state.get("step_count", 0) + 1,
        "result": f"分析完成!共 {total_messages + 1} 条消息,步骤 {state.get('step_count', 0) + 1}",
    }


def create_workflow():
    """Build and compile the linear graph step_1 -> step_2 -> step_3 -> END."""
    wf = StateGraph(GraphState)
    wf.add_node("step_1", step_1)
    wf.add_node("step_2", step_2)
    wf.add_node("step_3", step_3)
    wf.set_entry_point("step_1")
    wf.add_edge("step_1", "step_2")
    wf.add_edge("step_2", "step_3")
    wf.add_edge("step_3", END)
    return wf.compile()


def _state_reducers() -> dict:
    """Return ``{key: reducer}`` for GraphState keys annotated with a callable."""
    reducers = {}
    hints = get_type_hints(GraphState, include_extras=True)
    for key, hint in hints.items():
        # Only ``Annotated[...]`` hints carry ``__metadata__``.
        for meta in getattr(hint, "__metadata__", ()):
            if callable(meta):
                reducers[key] = meta
                break
    return reducers


def _merge_update(state: dict, updates, reducers: dict) -> dict:
    """Return a new state with one node's delta folded into ``state``.

    Keys that have a reducer are combined as ``reducer(old, new)``; every
    other key is overwritten. An empty or ``None`` delta leaves the state
    unchanged.
    """
    merged = dict(state)
    if not updates:
        return merged
    for key, value in updates.items():
        reducer = reducers.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)
        else:
            merged[key] = value
    return merged


def main() -> None:
    """Stream the workflow in ``updates`` mode and print the merged final state."""
    print("=" * 60)
    print("🔄 LangGraph stream_mode='updates' 示例(仅返回增量)")
    print("=" * 60)

    app = create_workflow()
    initial = {"messages": [], "step_count": 0, "result": ""}

    # Maintain the complete state ourselves. Each chunk holds only a node's
    # delta, so a plain ``dict.update`` would replace ``messages`` with the
    # latest step's list instead of accumulating it.
    reducers = _state_reducers()
    current_state = dict(initial)
    for chunk in app.stream(initial, stream_mode="updates"):
        for node_name, updates in chunk.items():
            print(f"📦 {node_name} 更新: {updates}")
            current_state = _merge_update(current_state, updates, reducers)

    print("\n✅ 最终合并状态:")
    print(current_state)


if __name__ == "__main__":
    main()