Module 1 Review

  • Built an agent capable of:
    • Act: Call specific tools.
    • Observe: Pass tool outputs back to the model.
    • Reason: Decide next steps based on tool output.
    • Persist State: Use in-memory checkpointer for long-running conversations with interruptions.

Schema

  • A state schema defines the structure and data types for the graph.
  • Supports Python types and validation.

TypedDict

  • Define keys and their types using TypedDict.
  • Example:
from typing_extensions import TypedDict
 
class TypedDictState(TypedDict):
    foo: str
    bar: str
  • Use Literal for specific value constraints:
from typing import Literal
 
class TypedDictState(TypedDict):
    name: str
    mood: Literal["happy", "sad"]

Dataclass

from dataclasses import dataclass
 
@dataclass
class DataclassState:
    name: str
    mood: Literal["happy", "sad"]
  • Access keys as attributes (state.name) instead of dictionary subscripting (state["name"]).

Pydantic

from pydantic import BaseModel, field_validator, ValidationError
 
class PydanticState(BaseModel):
    name: str
    mood: str
 
    @field_validator('mood')
    @classmethod
    def validate_mood(cls, value):
        if value not in ["happy", "sad"]:
            raise ValueError("Mood must be 'happy' or 'sad'")
        return value
 
try:
    state = PydanticState(name="John Doe", mood="mad")
except ValidationError as e:
    print("Validation Error:", e)

Reducers

  • Specify how state updates are performed (details).
  • Example: Append values using operator.add.
from operator import add
from typing import Annotated
 
class State(TypedDict):
    foo: Annotated[list[int], add]
 
def node_1(state):
    print("---Node 1---")
    return {"foo": [state['foo'][0] + 1]}

Custom Reducers

  • Combine lists safely, handling None inputs.
def reduce_list(left: list | None, right: list | None) -> list:
    return (left or []) + (right or [])
 
class CustomReducerState(TypedDict):
    foo: Annotated[list[int], reduce_list]

Messages

  • Use add_messages to append messages to the messages key (details).
from langgraph.graph import MessagesState
from langgraph.graph.message import add_messages
 
class CustomMessagesState(MessagesState):
    added_key_1: str
    added_key_2: str
  • Example: Overwrite messages with the same ID.
initial_messages = [AIMessage(content="Hello!", id="1")]
new_message = HumanMessage(content="Hi!", id="1")
add_messages(initial_messages, new_message)
  • Removal: Use RemoveMessage to delete specific messages.
from langchain_core.messages import RemoveMessage
 
messages = [AIMessage("Hi", id="1"), HumanMessage("Hello", id="2")]
delete_messages = [RemoveMessage(id=m.id) for m in messages[:-1]]

Multiple Schemas

Private State

  • Use separate schemas for intermediate processing.
class OverallState(TypedDict):
    foo: int
 
class PrivateState(TypedDict):
    baz: int
 
def node_1(state: OverallState) -> PrivateState:
    return {"baz": state['foo'] + 1}
 
def node_2(state: PrivateState) -> OverallState:
    return {"foo": state['baz'] + 1}

Input/Output Schemas

  • Define specific schemas for graph input and output.
class InputState(TypedDict):
    question: str
 
class OutputState(TypedDict):
    answer: str
 
class OverallState(TypedDict):
    question: str
    answer: str
    notes: str
 
def thinking_node(state: InputState):
    return {"answer": "bye", "notes": "... name is Lance"}
 
def answer_node(state: OverallState) -> OutputState:
    return {"answer": "bye Lance"}
 
graph = StateGraph(OverallState, input=InputState, output=OutputState)

Reducer for Managing Long-Running Conversations

  • Challenge: Long conversations increase token usage and latency.
  • Solution 1: Use RemoveMessage with add_messages reducer to filter messages.
from langchain_core.messages import RemoveMessage
 
def filter_messages(state: MessagesState):
    delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
    return {"messages": delete_messages}
 
def chat_model_node(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}
 
# Build graph
builder = StateGraph(MessagesState)
builder.add_node("filter", filter_messages)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "filter")
builder.add_edge("filter", "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()

Trim Messages

  • Restrict message history to a fixed token limit using trim_messages.
from langchain_core.messages import trim_messages
 
def chat_model_node(state: MessagesState):
    messages = trim_messages(
        state["messages"],
        max_tokens=100,
        strategy="last",
        token_counter=ChatOpenAI(model="gpt-4o"),
        allow_partial=False,
    )
    return {"messages": [llm.invoke(messages)]}
 
# Build graph
builder = StateGraph(MessagesState)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()

Chatbot with Message Summarization

  • Use LLMs to summarize conversations for memory efficiency.

Custom State Schema

from langgraph.graph import MessagesState
from langchain_core.messages import SystemMessage, HumanMessage, RemoveMessage
 
class State(MessagesState):
    summary: str

Model Invocation Logic

def call_model(state: State):
    summary = state.get("summary", "")
    messages = [SystemMessage(content=f"Summary: {summary}")] + state["messages"] if summary else state["messages"]
    response = model.invoke(messages)
    return {"messages": response}

Summarize Node

def summarize_conversation(state: State):
    summary = state.get("summary", "")
    summary_message = f"Extend this summary: {summary}" if summary else "Summarize the conversation."
    messages = state["messages"] + [HumanMessage(content=summary_message)]
    response = model.invoke(messages)
    delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
    return {"summary": response.content, "messages": delete_messages}

Conditional Edge

def should_continue(state: State):
    return "summarize_conversation" if len(state["messages"]) > 6 else END

Adding Memory

  • Use MemorySaver for state persistence across sessions.
from langgraph.checkpoint.memory import MemorySaver
 
workflow = StateGraph(State)
workflow.add_node("conversation", call_model)
workflow.add_node(summarize_conversation)
workflow.add_edge(START, "conversation")
workflow.add_conditional_edges("conversation", should_continue)
workflow.add_edge("summarize_conversation", END)
memory = MemorySaver()
graph = workflow.compile(checkpointer=memory)

Threads for Grouped State

  • Use thread_id to track conversations.
config = {"configurable": {"thread_id": "1"}}
input_message = HumanMessage(content="Hi, I'm Lance!")
output = graph.invoke({"messages": [input_message]}, config)

Chatbot with Persistent Memory (Sqlite)

from langgraph.checkpoint.sqlite import SqliteSaver
 
conn = sqlite3.connect("state_db/example.db", check_same_thread=False)
memory = SqliteSaver(conn)
 
workflow = StateGraph(State)
workflow.add_node("conversation", call_model)
workflow.add_node(summarize_conversation)
workflow.add_edge(START, "conversation")
workflow.add_conditional_edges("conversation", should_continue)
workflow.add_edge("summarize_conversation", END)
graph = workflow.compile(checkpointer=memory)

Persisting State

  • State persists even after restarting the notebook.
config = {"configurable": {"thread_id": "1"}}
graph_state = graph.get_state(config)
print(graph_state)

Key Features

  • Filtering: Reduce message history with RemoveMessage.
  • Trimming: Limit token usage with trim_messages.
  • Summarization: Create a running summary for long conversations.
  • Memory: Use MemorySaver or SqliteSaver for persistent state.