Files
agent/examples/stream_modes/values_demo.py
2025-08-30 23:28:22 +08:00

75 lines
2.1 KiB
Python

from typing import TypedDict, Annotated
import operator
import time
from langgraph.graph import StateGraph, END
class GraphState(TypedDict):
messages: Annotated[list, operator.add]
step_count: int
result: str
def step_1(state: GraphState) -> GraphState:
print("🔄 执行步骤1: 数据准备阶段")
time.sleep(0.5)
return {
"messages": ["步骤1: 开始数据准备"],
"step_count": state.get("step_count", 0) + 1,
"result": "数据准备完成",
}
def step_2(state: GraphState) -> GraphState:
print("🔄 执行步骤2: 数据处理阶段")
time.sleep(0.7)
return {
"messages": ["步骤2: 正在处理数据"],
"step_count": state.get("step_count", 0) + 1,
"result": "数据处理完成,准备分析",
}
def step_3(state: GraphState) -> GraphState:
print("🔄 执行步骤3: 数据分析阶段")
time.sleep(0.9)
total_messages = len(state.get("messages", []))
return {
"messages": ["步骤3: 分析完成,生成最终结果"],
"step_count": state.get("step_count", 0) + 1,
"result": f"分析完成!总共处理了 {total_messages + 1} 条消息,执行了 {state.get('step_count', 0) + 1} 个步骤",
}
def create_workflow():
wf = StateGraph(GraphState)
wf.add_node("step_1", step_1)
wf.add_node("step_2", step_2)
wf.add_node("step_3", step_3)
wf.set_entry_point("step_1")
wf.add_edge("step_1", "step_2")
wf.add_edge("step_2", "step_3")
wf.add_edge("step_3", END)
return wf.compile()
def main():
print("=" * 60)
print("🚀 LangGraph stream_mode='values' 示例(返回完整状态)")
print("=" * 60)
app = create_workflow()
initial = {"messages": [], "step_count": 0, "result": ""}
for i, state in enumerate(app.stream(initial, stream_mode="values")):
print(f"\n🔍 步骤 {i} 完成后的状态:")
print(f" 📝 messages: {state.get('messages', [])}")
print(f" 🔢 step_count: {state.get('step_count', 0)}")
print(f" ✨ result: {state.get('result', '')}")
print("\n✅ 完成!")
if __name__ == "__main__":
main()