## Module 1 Review

Built an agent capable of:

- **Act**: Call specific tools.
- **Observe**: Pass tool outputs back to the model.
- **Reason**: Decide next steps based on tool outputs.
- **Persist state**: Use an in-memory checkpointer to support long-running conversations with interruptions (see the sketch below).
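A minimal sketch of that loop, using LangGraph's prebuilt `create_react_agent` helper (the model and the `multiply` tool here are illustrative, not the course's exact agent):

```python
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent

def multiply(a: int, b: int) -> int:
    """Multiply two integers."""
    return a * b

# Act (call tools), observe (tool outputs return to the model),
# reason (the model decides the next step), persist (in-memory checkpointer).
agent = create_react_agent(
    ChatOpenAI(model="gpt-4o"),
    tools=[multiply],
    checkpointer=MemorySaver(),
)

config = {"configurable": {"thread_id": "1"}}
agent.invoke({"messages": [("user", "What is 3 * 4?")]}, config)
```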
## Schema

A state schema defines the structure and data types of the graph's state. Schemas support Python typing and, depending on the schema class, runtime validation.
### TypedDict

Define keys and their types using `TypedDict`. Example:

```python
from typing_extensions import TypedDict

class TypedDictState(TypedDict):
    foo: str
    bar: str
```
Use `Literal` to constrain a key to specific values:

```python
from typing import Literal
from typing_extensions import TypedDict

class TypedDictState(TypedDict):
    name: str
    mood: Literal["happy", "sad"]
```
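Note that `TypedDict` and `Literal` are static type hints only; Python does not enforce them at runtime, as a quick check shows:

```python
# No runtime error: TypedDict hints are not enforced by Python.
state = TypedDictState(name="Lance", mood="angry")
print(state["mood"])  # "angry", despite the Literal constraint
```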
### Dataclass

```python
from dataclasses import dataclass
from typing import Literal

@dataclass
class DataclassState:
    name: str
    mood: Literal["happy", "sad"]
```

Access keys as attributes (`state.name`) instead of by dictionary subscripting (`state["name"]`).
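A node then reads attributes directly but still returns its update as a plain dict (a sketch reusing `DataclassState`):

```python
def node_1(state: DataclassState):
    print(state.name)           # attribute access, not state["name"]
    return {"mood": "happy"}    # updates are still returned as dicts
```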
### Pydantic

Pydantic models add runtime validation on top of type hints:

```python
from pydantic import BaseModel, field_validator, ValidationError

class PydanticState(BaseModel):
    name: str
    mood: str

    @field_validator("mood")
    @classmethod
    def validate_mood(cls, value):
        if value not in ["happy", "sad"]:
            raise ValueError("Mood must be 'happy' or 'sad'")
        return value

try:
    state = PydanticState(name="John Doe", mood="mad")
except ValidationError as e:
    print("Validation Error:", e)
```
## Reducers

Reducers specify how updates to each state key are applied. Example: append values using `operator.add`.

```python
from operator import add
from typing import Annotated
from typing_extensions import TypedDict

class State(TypedDict):
    foo: Annotated[list[int], add]

def node_1(state):
    print("---Node 1---")
    return {"foo": [state["foo"][0] + 1]}
```
### Custom Reducers

Combine lists safely, handling `None` inputs:

```python
def reduce_list(left: list | None, right: list | None) -> list:
    """Combine two lists, treating None as an empty list."""
    return (left or []) + (right or [])

class CustomReducerState(TypedDict):
    foo: Annotated[list[int], reduce_list]
```
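For example:

```python
print(reduce_list(None, [2]))  # [2] — operator.add(None, [2]) would raise TypeError
print(reduce_list([1], None))  # [1]
```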
### Messages

Use the `add_messages` reducer to append messages to the `messages` key. `MessagesState` has this built in; extend it to add custom keys:

```python
from langgraph.graph import MessagesState
from langgraph.graph.message import add_messages

class CustomMessagesState(MessagesState):
    added_key_1: str
    added_key_2: str
```
Example: a new message with an existing ID overwrites the old one.

```python
from langchain_core.messages import AIMessage, HumanMessage

initial_messages = [AIMessage(content="Hello!", id="1")]
new_message = HumanMessage(content="Hi!", id="1")
add_messages(initial_messages, new_message)
# -> [HumanMessage(content='Hi!', id='1')]
```
**Removal**: Use `RemoveMessage` to delete specific messages.

```python
from langchain_core.messages import AIMessage, HumanMessage, RemoveMessage

messages = [AIMessage("Hi", id="1"), HumanMessage("Hello", id="2")]
delete_messages = [RemoveMessage(id=m.id) for m in messages[:-1]]
```
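Passing the deletions through the reducer applies them:

```python
add_messages(messages, delete_messages)
# -> [HumanMessage(content='Hello', id='2')] — only the last message remains
```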
## Multiple Schemas

### Private State

Use separate schemas for intermediate state that should stay internal to the graph:

```python
class OverallState(TypedDict):
    foo: int

class PrivateState(TypedDict):
    baz: int

def node_1(state: OverallState) -> PrivateState:
    return {"baz": state["foo"] + 1}

def node_2(state: PrivateState) -> OverallState:
    return {"foo": state["baz"] + 1}
```
### Input and Output Schemas

Define specific schemas for graph input and output:

```python
from langgraph.graph import StateGraph

class InputState(TypedDict):
    question: str

class OutputState(TypedDict):
    answer: str

class OverallState(TypedDict):
    question: str
    answer: str
    notes: str

def thinking_node(state: InputState):
    return {"answer": "bye", "notes": "... name is Lance"}

def answer_node(state: OverallState) -> OutputState:
    return {"answer": "bye Lance"}

builder = StateGraph(OverallState, input=InputState, output=OutputState)
```
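Continuing from the builder above (edges reconstructed from the course example), callers see only the `answer` key:

```python
from langgraph.graph import START, END

builder.add_node("thinking_node", thinking_node)
builder.add_node("answer_node", answer_node)
builder.add_edge(START, "thinking_node")
builder.add_edge("thinking_node", "answer_node")
builder.add_edge("answer_node", END)
graph = builder.compile()

print(graph.invoke({"question": "hi"}))  # {'answer': 'bye Lance'} — notes stays hidden
```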
## Managing Long-Running Conversations

**Challenge**: Long conversations increase token usage and latency.

**Solution 1 (filtering)**: Use `RemoveMessage` with the `add_messages` reducer to drop older messages.

```python
from langchain_core.messages import RemoveMessage
from langgraph.graph import StateGraph, MessagesState, START, END

def filter_messages(state: MessagesState):
    # Keep only the two most recent messages.
    delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
    return {"messages": delete_messages}

def chat_model_node(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}

# Build graph
builder = StateGraph(MessagesState)
builder.add_node("filter", filter_messages)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "filter")
builder.add_edge("filter", "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()
```
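Invoking with four messages (and assuming `llm` is a chat model such as `ChatOpenAI`), the model only ever sees the last two:

```python
from langchain_core.messages import HumanMessage

messages = [HumanMessage(content=f"Message {i}", id=str(i)) for i in range(1, 5)]
output = graph.invoke({"messages": messages})
# chat_model is invoked with only "Message 3" and "Message 4".
```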
### Trim Messages

**Solution 2 (trimming)**: Restrict the message history to a fixed token budget using `trim_messages`.

```python
from langchain_core.messages import trim_messages
from langchain_openai import ChatOpenAI

def chat_model_node(state: MessagesState):
    messages = trim_messages(
        state["messages"],
        max_tokens=100,
        strategy="last",  # keep the most recent messages that fit
        token_counter=ChatOpenAI(model="gpt-4o"),
        allow_partial=False,
    )
    return {"messages": [llm.invoke(messages)]}

# Build graph
builder = StateGraph(MessagesState)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()
```
## Chatbot with Message Summarization

Use an LLM to summarize the conversation for memory efficiency.

### Custom State Schema

```python
from langgraph.graph import MessagesState
from langchain_core.messages import SystemMessage, HumanMessage, RemoveMessage

class State(MessagesState):
    summary: str
```
### Model Invocation Logic

```python
def call_model(state: State):
    summary = state.get("summary", "")
    if summary:
        # Prepend the running summary as context for the model.
        messages = [SystemMessage(content=f"Summary: {summary}")] + state["messages"]
    else:
        messages = state["messages"]
    response = model.invoke(messages)
    return {"messages": response}
```
### Summarize Node

```python
def summarize_conversation(state: State):
    summary = state.get("summary", "")
    summary_message = (
        f"Extend this summary: {summary}"
        if summary
        else "Summarize the conversation."
    )
    messages = state["messages"] + [HumanMessage(content=summary_message)]
    response = model.invoke(messages)
    # Keep the two most recent messages; fold the rest into the summary.
    delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
    return {"summary": response.content, "messages": delete_messages}
```
### Conditional Edge

```python
def should_continue(state: State):
    # Summarize once the conversation exceeds six messages.
    return "summarize_conversation" if len(state["messages"]) > 6 else END
```
### Adding Memory

Use `MemorySaver` to persist state across invocations (in memory, so it lasts only as long as the process).

```python
from langgraph.checkpoint.memory import MemorySaver

workflow = StateGraph(State)
workflow.add_node("conversation", call_model)
workflow.add_node(summarize_conversation)
workflow.add_edge(START, "conversation")
workflow.add_conditional_edges("conversation", should_continue)
workflow.add_edge("summarize_conversation", END)

memory = MemorySaver()
graph = workflow.compile(checkpointer=memory)
```
### Threads for Grouped State

Use a `thread_id` to track conversations.

```python
config = {"configurable": {"thread_id": "1"}}
input_message = HumanMessage(content="Hi, I'm Lance!")
output = graph.invoke({"messages": [input_message]}, config)
```
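Later turns with the same `thread_id` resume from the saved state:

```python
input_message = HumanMessage(content="What's my name?")
output = graph.invoke({"messages": [input_message]}, config)
output["messages"][-1].pretty_print()  # the model recalls "Lance" from the thread
```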
## Chatbot with Persistent Memory (SQLite)

Swap the in-memory checkpointer for `SqliteSaver` so state survives restarts:

```python
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

conn = sqlite3.connect("state_db/example.db", check_same_thread=False)
memory = SqliteSaver(conn)

workflow = StateGraph(State)
workflow.add_node("conversation", call_model)
workflow.add_node(summarize_conversation)
workflow.add_edge(START, "conversation")
workflow.add_conditional_edges("conversation", should_continue)
workflow.add_edge("summarize_conversation", END)
graph = workflow.compile(checkpointer=memory)
```
### Persisting State

State persists even after restarting the notebook:

```python
config = {"configurable": {"thread_id": "1"}}
graph_state = graph.get_state(config)
print(graph_state)
```
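The thread can then be resumed where it left off:

```python
input_message = HumanMessage(content="What did we discuss earlier?")
output = graph.invoke({"messages": [input_message]}, config)
```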
## Key Features

- **Filtering**: Reduce message history with `RemoveMessage`.
- **Trimming**: Limit token usage with `trim_messages`.
- **Summarization**: Maintain a running summary of long conversations.
- **Memory**: Use `MemorySaver` or `SqliteSaver` for persistent state.