From 19487cb117a106110c5f7d6d080c6b28ba5b5f5a Mon Sep 17 00:00:00 2001 From: lingyuzeng Date: Sat, 30 Aug 2025 23:28:22 +0800 Subject: [PATCH] add langgraph use --- examples/stream_modes/debug_demo.py | 53 +++++++++++++++++ examples/stream_modes/graph_visualize.py | 67 +++++++++++++++++++++ examples/stream_modes/messages_demo.py | 43 ++++++++++++++ examples/stream_modes/updates_demo.py | 75 ++++++++++++++++++++++++ examples/stream_modes/values_demo.py | 74 +++++++++++++++++++++++ 5 files changed, 312 insertions(+) create mode 100644 examples/stream_modes/debug_demo.py create mode 100644 examples/stream_modes/graph_visualize.py create mode 100644 examples/stream_modes/messages_demo.py create mode 100644 examples/stream_modes/updates_demo.py create mode 100644 examples/stream_modes/values_demo.py diff --git a/examples/stream_modes/debug_demo.py b/examples/stream_modes/debug_demo.py new file mode 100644 index 0000000..9819723 --- /dev/null +++ b/examples/stream_modes/debug_demo.py @@ -0,0 +1,53 @@ +from typing import TypedDict, Annotated +import operator +import time + +from langgraph.graph import StateGraph, END + + +class GraphState(TypedDict): + messages: Annotated[list, operator.add] + step_count: int + result: str + + +def step_1(state: GraphState) -> GraphState: + time.sleep(0.2) + return {"messages": ["S1"], "step_count": state.get("step_count", 0) + 1, "result": "r1"} + + +def step_2(state: GraphState) -> GraphState: + time.sleep(0.2) + return {"messages": ["S2"], "step_count": state.get("step_count", 0) + 1, "result": "r2"} + + +def step_3(state: GraphState) -> GraphState: + time.sleep(0.2) + return {"messages": ["S3"], "step_count": state.get("step_count", 0) + 1, "result": "done"} + + +def create_workflow(): + wf = StateGraph(GraphState) + wf.add_node("step_1", step_1) + wf.add_node("step_2", step_2) + wf.add_node("step_3", step_3) + wf.set_entry_point("step_1") + wf.add_edge("step_1", "step_2") + wf.add_edge("step_2", "step_3") + wf.add_edge("step_3", END) + return wf.compile() + + +def main(): + print("=" * 60) + print("🛠️ LangGraph stream_mode='debug' 示例(开发定位用)") + print("=" * 60) + app = create_workflow() + initial = {"messages": [], "step_count": 0, "result": ""} + for info in app.stream(initial, stream_mode="debug"): + print(info) + + +if __name__ == "__main__": + main() + diff --git a/examples/stream_modes/graph_visualize.py b/examples/stream_modes/graph_visualize.py new file mode 100644 index 0000000..d0a2873 --- /dev/null +++ b/examples/stream_modes/graph_visualize.py @@ -0,0 +1,67 @@ +""" +Demonstrate compiling a LangGraph and visualizing its structure. +Requires no external viz packages for ASCII. To export PNG, install: + uv pip install -e '.[viz]' +and ensure Graphviz is present on your system. +""" + +from typing import TypedDict, Annotated +import operator + +from langgraph.graph import StateGraph, START, END + + +class S(TypedDict): + a: int + b: int + c: int + + +def n1(s: S) -> S: + return {"a": s.get("a", 0) + 1, "b": s.get("b", 0), "c": s.get("c", 0)} + + +def n2(s: S) -> S: + return {"a": s.get("a", 0), "b": s.get("b", 0) + 2, "c": s.get("c", 0)} + + +def n3(s: S) -> S: + return {"a": s.get("a", 0), "b": s.get("b", 0), "c": s.get("c", 0) + 3} + + +def build_app(): + g = StateGraph(S) + g.add_node("n1", n1) + g.add_node("n2", n2) + g.add_node("n3", n3) + g.add_edge(START, "n1") + g.add_edge("n1", "n2") + g.add_edge("n2", "n3") + g.add_edge("n3", END) + return g.compile() + + +def main(): + app = build_app() + graph = app.get_graph() + + # Try ASCII render if available + draw_ascii = getattr(graph, "draw_ascii", None) + if callable(draw_ascii): + print("=== Graph ASCII ===") + print(draw_ascii()) + + # Try PNG export if available and pydot/graphviz installed + draw_png = getattr(graph, "draw_png", None) + if callable(draw_png): + try: + out = "graph_demo.png" + draw_png(out) + print(f"Saved PNG: {out}") + except Exception as e: + print(f"PNG export unavailable: {e}") + + +if __name__ == "__main__": + main() + diff --git a/examples/stream_modes/messages_demo.py b/examples/stream_modes/messages_demo.py new file mode 100644 index 0000000..385d6a5 --- /dev/null +++ b/examples/stream_modes/messages_demo.py @@ -0,0 +1,43 @@ +from typing import TypedDict + +from langgraph.graph import StateGraph, START +from langchain_core.messages import HumanMessage + +from langgraph_qwen import ChatQwenOpenAICompat + + +class SimpleState(TypedDict): + topic: str + joke: str + + +model = ChatQwenOpenAICompat(temperature=0) + + +def call_model(state: SimpleState): + llm_response = model.invoke([HumanMessage(content=f"Generate a joke about {state['topic']}")]) + return {"joke": llm_response.content} + + +graph = ( + StateGraph(SimpleState) + .add_node("call_model", call_model) + .add_edge(START, "call_model") + .compile() +) + + +def main(): + print("=" * 60) + print("💬 LangGraph stream_mode='messages' 示例(捕获 LLM tokens)") + print("=" * 60) + for msg, meta in graph.stream({"topic": "cats"}, stream_mode="messages"): + if hasattr(msg, "content") and msg.content: + node = meta.get("langgraph_node", "unknown") + print(f"[{node}] {msg.content}", end="", flush=True) + print("\n\n✅ 完成!") + + +if __name__ == "__main__": + main() + diff --git a/examples/stream_modes/updates_demo.py b/examples/stream_modes/updates_demo.py new file mode 100644 index 0000000..6a21588 --- /dev/null +++ b/examples/stream_modes/updates_demo.py @@ -0,0 +1,75 @@ +from typing import TypedDict, Annotated +import operator +import time + +from langgraph.graph import StateGraph, END + + +class GraphState(TypedDict): + messages: Annotated[list, operator.add] + step_count: int + result: str + + +def step_1(state: GraphState) -> GraphState: + time.sleep(0.3) + return { + "messages": ["步骤1: 开始数据准备"], + "step_count": state.get("step_count", 0) + 1, + "result": "数据准备完成", + } + + +def step_2(state: GraphState) -> GraphState: + time.sleep(0.4) + return { + "messages": ["步骤2: 正在处理数据"], + "step_count": state.get("step_count", 0) + 1, + "result": "数据处理完成,准备分析", + } + + +def step_3(state: GraphState) -> GraphState: + time.sleep(0.5) + total_messages = len(state.get("messages", [])) + return { + "messages": ["步骤3: 分析完成,生成最终结果"], + "step_count": state.get("step_count", 0) + 1, + "result": f"分析完成!共 {total_messages + 1} 条消息,步骤 {state.get('step_count', 0) + 1}", + } + + +def create_workflow(): + wf = StateGraph(GraphState) + wf.add_node("step_1", step_1) + wf.add_node("step_2", step_2) + wf.add_node("step_3", step_3) + wf.set_entry_point("step_1") + wf.add_edge("step_1", "step_2") + wf.add_edge("step_2", "step_3") + wf.add_edge("step_3", END) + return wf.compile() + + +def main(): + print("=" * 60) + print("🔄 LangGraph stream_mode='updates' 示例(仅返回增量)") + print("=" * 60) + + app = create_workflow() + initial = {"messages": [], "step_count": 0, "result": ""} + + # 自行维护完整状态 + current_state = dict(initial) + for chunk in app.stream(initial, stream_mode="updates"): + for node_name, updates in chunk.items(): + print(f"📦 {node_name} 更新: {updates}") + current_state.update(updates) + + print("\n✅ 最终合并状态:") + print(current_state) + + +if __name__ == "__main__": + main() + diff --git a/examples/stream_modes/values_demo.py b/examples/stream_modes/values_demo.py new file mode 100644 index 0000000..a0a6b27 --- /dev/null +++ b/examples/stream_modes/values_demo.py @@ -0,0 +1,74 @@ +from typing import TypedDict, Annotated +import operator +import time + +from langgraph.graph import StateGraph, END + + +class GraphState(TypedDict): + messages: Annotated[list, operator.add] + step_count: int + result: str + + +def step_1(state: GraphState) -> GraphState: + print("🔄 执行步骤1: 数据准备阶段") + time.sleep(0.5) + return { + "messages": ["步骤1: 开始数据准备"], + "step_count": state.get("step_count", 0) + 1, + "result": "数据准备完成", + } + + +def step_2(state: GraphState) -> GraphState: + print("🔄 执行步骤2: 数据处理阶段") + time.sleep(0.7) + return { + "messages": ["步骤2: 正在处理数据"], + "step_count": state.get("step_count", 0) + 1, + "result": "数据处理完成,准备分析", + } + + +def step_3(state: GraphState) -> GraphState: + print("🔄 执行步骤3: 数据分析阶段") + time.sleep(0.9) + total_messages = len(state.get("messages", [])) + return { + "messages": ["步骤3: 分析完成,生成最终结果"], + "step_count": state.get("step_count", 0) + 1, + "result": f"分析完成!总共处理了 {total_messages + 1} 条消息,执行了 {state.get('step_count', 0) + 1} 个步骤", + } + + +def create_workflow(): + wf = StateGraph(GraphState) + wf.add_node("step_1", step_1) + wf.add_node("step_2", step_2) + wf.add_node("step_3", step_3) + wf.set_entry_point("step_1") + wf.add_edge("step_1", "step_2") + wf.add_edge("step_2", "step_3") + wf.add_edge("step_3", END) + return wf.compile() + + +def main(): + print("=" * 60) + print("🚀 LangGraph stream_mode='values' 示例(返回完整状态)") + print("=" * 60) + app = create_workflow() + initial = {"messages": [], "step_count": 0, "result": ""} + for i, state in enumerate(app.stream(initial, stream_mode="values")): + print(f"\n🔍 步骤 {i} 完成后的状态:") + print(f" 📝 messages: {state.get('messages', [])}") + print(f" 🔢 step_count: {state.get('step_count', 0)}") + print(f" ✨ result: {state.get('result', '')}") + + print("\n✅ 完成!") + + +if __name__ == "__main__": + main() +