54 lines
1.3 KiB
Python
54 lines
1.3 KiB
Python
from typing import TypedDict, Annotated
|
|
import operator
|
|
import time
|
|
|
|
from langgraph.graph import StateGraph, END
|
|
|
|
|
|
class GraphState(TypedDict):
    """State schema handed to ``StateGraph`` for the three-step demo workflow.

    Every node in this file returns a mapping that uses these three keys.
    """

    # Annotated with ``operator.add``; per LangGraph's reducer convention the
    # list a node returns is concatenated onto the existing one.
    messages: Annotated[list, operator.add]
    # Counter that each step function increments by one.
    step_count: int
    # Label written by the step that ran most recently ("r1", "r2", "done").
    result: str
|
|
|
|
|
|
def step_1(state: GraphState) -> GraphState:
    """Run the first node: pause briefly, then emit "S1" and bump the counter.

    Args:
        state: Current graph state; a missing ``step_count`` is treated as 0.

    Returns:
        A mapping with ``messages`` set to ``["S1"]``, ``step_count`` one
        higher than the incoming value, and ``result`` set to ``"r1"``.
    """
    time.sleep(0.2)  # fixed 0.2 s pause before the node reports back
    previous_count = state.get("step_count", 0)
    return {
        "messages": ["S1"],
        "step_count": previous_count + 1,
        "result": "r1",
    }
|
|
|
|
|
|
def step_2(state: GraphState) -> GraphState:
    """Run the second node: pause briefly, then emit "S2" and bump the counter.

    Args:
        state: Current graph state; a missing ``step_count`` is treated as 0.

    Returns:
        A mapping with ``messages`` set to ``["S2"]``, ``step_count`` one
        higher than the incoming value, and ``result`` set to ``"r2"``.
    """
    time.sleep(0.2)  # fixed 0.2 s pause before the node reports back
    previous_count = state.get("step_count", 0)
    return {
        "messages": ["S2"],
        "step_count": previous_count + 1,
        "result": "r2",
    }
|
|
|
|
|
|
def step_3(state: GraphState) -> GraphState:
    """Run the last node: pause briefly, then emit "S3" and mark the run done.

    Args:
        state: Current graph state; a missing ``step_count`` is treated as 0.

    Returns:
        A mapping with ``messages`` set to ``["S3"]``, ``step_count`` one
        higher than the incoming value, and ``result`` set to ``"done"``.
    """
    time.sleep(0.2)  # fixed 0.2 s pause before the node reports back
    previous_count = state.get("step_count", 0)
    return {
        "messages": ["S3"],
        "step_count": previous_count + 1,
        "result": "done",
    }
|
|
|
|
|
|
def create_workflow():
    """Assemble and compile the linear step_1 -> step_2 -> step_3 graph.

    Returns:
        The object produced by ``StateGraph.compile()`` for a graph whose
        entry point is ``step_1`` and whose last node is wired to ``END``.
    """
    # Insertion order doubles as execution order for the edge wiring below.
    node_table = {"step_1": step_1, "step_2": step_2, "step_3": step_3}

    builder = StateGraph(GraphState)
    for node_name, node_fn in node_table.items():
        builder.add_node(node_name, node_fn)

    builder.set_entry_point("step_1")

    # Chain each node to its successor, finishing at END.
    route = [*node_table, END]
    for source, target in zip(route, route[1:]):
        builder.add_edge(source, target)

    return builder.compile()
|
|
|
|
|
|
def main():
    """Print a banner, build the workflow, and print every debug-stream item.

    The compiled graph is streamed with ``stream_mode="debug"`` starting from
    an empty message list, a zero step count and an empty result string.
    """
    rule = "=" * 60
    print(rule)
    # Banner text is partly Chinese and is emitted verbatim.
    print("🛠️ LangGraph stream_mode='debug' 示例(开发定位用)")
    print(rule)

    graph = create_workflow()
    seed_state = {"messages": [], "step_count": 0, "result": ""}
    for event in graph.stream(seed_state, stream_mode="debug"):
        print(event)
|
|
|
|
|
|
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|