State Management & Data Flow
AgentMap uses a service-based architecture for state management, with the StateAdapterService handling different state formats and the ExecutionTrackingService monitoring state evolution through the workflow.
State Management Architecture
Clean Architecture Approach
State management is handled by specialized services following clean architecture principles:
Core Services
- StateAdapterService: Adapts between different state formats
- ExecutionTrackingService: Tracks state changes during execution
- GraphRunnerService: Orchestrates state flow through the graph
State Structure & Evolution
The state is typically a dictionary that contains:
- Input fields from the initial state
- Output fields from each node's execution
- System fields like
last_action_success
- Optional memory fields for conversational agents
Example State Evolution
# Initial state
state = {"input": "Hello, world!"}
# After Node1 (Echo)
state = {
"input": "Hello, world!",
"echoed": "Hello, world!", # output_field from Node1
"last_action_success": True
}
# After Node2 (OpenAI)
state = {
"input": "Hello, world!",
"echoed": "Hello, world!",
"response": "Greetings, human!", # output_field from Node2
"last_action_success": True
}
AgentMap treats state transitions as immutable operations. Each agent receives the current state and returns an updated state without modifying the original.
StateAdapterService
The StateAdapterService provides a clean interface for state operations across different state formats.
Service Interface
class StateAdapterService:
"""Service for adapting state between different formats"""
def adapt_initial_state(self, state: Any, schema: Type = None) -> Dict[str, Any]:
"""Adapt initial state to required format"""
if isinstance(state, dict):
return state
elif hasattr(state, 'dict'): # Pydantic model
return state.dict()
else:
return {"input": state} # Wrap simple values
def extract_value(self, state: Any, key: str, default: Any = None) -> Any:
"""Extract value from state regardless of format"""
if isinstance(state, dict):
return state.get(key, default)
elif hasattr(state, '__getattribute__'):
return getattr(state, key, default)
return default
def update_state(self, state: Any, key: str, value: Any) -> Any:
"""Update state value maintaining format"""
if isinstance(state, dict):
state[key] = value
return state
# Handle other formats as needed
Usage in Services
class GraphRunnerService:
def __init__(self, state_adapter_service: StateAdapterService, ...):
self.state_adapter = state_adapter_service
# ... other dependencies
def run_graph(self, graph_name: str, initial_state: Any) -> ExecutionResult:
# Adapt state to standard format
adapted_state = self.state_adapter.adapt_initial_state(
initial_state,
self.get_state_schema(graph_name)
)
# Execute with adapted state
result = self.execute(adapted_state)
return result
State Flow in Agent Lifecycle
Complete Agent Execution Flow
Detailed Lifecycle Steps
-
Input Extraction:
- Agent's
run
method extracts input fields from state using StateAdapterService - Only fields listed in
Input_Fields
are accessible to the agent
- Agent's
-
Processing:
- Agent's
process
method transforms inputs to output - Custom logic determines the result based on agent implementation
- Agent's
-
Output Integration:
- Output is stored in the field specified by
Output_Field
- StateAdapterService handles different state formats transparently
- Output is stored in the field specified by
-
Success/Failure Tracking:
last_action_success
flag is set based on execution result- ExecutionTrackingService records node execution details
-
Routing:
- Next node is determined based on routing rules and
last_action_success
- Next node is determined based on routing rules and
Memory Integration
For agents with memory (like LLM agents), there's additional state handling:
# After LLM agent with memory
state = {
"input": "Hello",
"response": "Hi there!",
"chat_memory": {
"_type": "langchain_memory",
"memory_type": "buffer",
"messages": [
{"type": "human", "content": "Hello"},
{"type": "ai", "content": "Hi there!"}
]
},
"last_action_success": True
}
- Memory Serialization/Deserialization: Memory objects are serialized when stored in state and deserialized when retrieved by an agent
- Memory Flow: Memory is passed between nodes via a designated memory field (e.g.,
chat_memory
) - Memory Updates: Agents can add to the memory during processing
ExecutionTrackingService
The ExecutionTrackingService provides comprehensive tracking of state evolution throughout workflow execution.
Service Interface
class ExecutionTrackingService:
"""Service for tracking workflow execution"""
def create_tracker(self, graph_name: str) -> ExecutionTracker:
"""Create a new execution tracker"""
return ExecutionTracker(
graph_name=graph_name,
track_outputs=self.config.track_outputs,
track_inputs=self.config.track_inputs
)
class ExecutionTracker:
"""Tracks execution of a single workflow"""
def track_node_start(self, node_name: str, inputs: Dict[str, Any]):
"""Track when a node starts executing"""
def track_node_complete(self, node_name: str, outputs: Any, success: bool):
"""Track when a node completes"""
def get_summary(self) -> ExecutionSummary:
"""Get execution summary with all tracking data"""
Tracking Configuration
# In agentmap_config.yaml
execution:
tracking:
enabled: true # Enable tracking
track_outputs: false # Track output values (can be large)
track_inputs: false # Track input values
track_duration: true # Track execution times
Using Execution Tracking
# The GraphRunnerService automatically tracks execution
result = runner.run_graph("MyWorkflow", {"input": "data"})
# Access tracking data
summary = result.execution_summary
print(f"Total duration: {summary.total_duration}s")
print(f"Execution path: {' → '.join(summary.execution_path)}")
# Check node-level details
for node, details in summary.node_results.items():
print(f"{node}: {'✓' if details.success else '✗'} ({details.duration}s)")
Execution tracking is useful for:
- Debugging: Understanding workflow execution flow
- Performance: Monitoring bottlenecks and optimization opportunities
- Reliability: Identifying failing nodes and error patterns
- Analytics: Understanding usage patterns and performance metrics
State Formats & Compatibility
Supported State Types
AgentMap supports multiple state formats through the StateAdapterService:
- Dictionary (Default)
- Pydantic Models
- Custom Objects
# Standard dictionary state
state = {
"user_input": "Hello",
"response": "Hi there!",
"last_action_success": True
}
from pydantic import BaseModel
class WorkflowState(BaseModel):
user_input: str
response: Optional[str] = None
last_action_success: bool = True
# Automatically converted to dict for processing
state = WorkflowState(user_input="Hello")
class CustomState:
def __init__(self):
self.user_input = ""
self.response = ""
self.last_action_success = True
def to_dict(self):
return self.__dict__
# StateAdapter handles conversion
Error Handling & Recovery
Error State Management
When an agent encounters an error:
- Error Logging: The error is logged with context
- Success Flag:
last_action_success
is set toFalse
- Error Field: An
error
field may be added to state - Routing: Routing follows the
Failure_Next
path - Recovery: Custom error handling can be implemented in agents
Error Handling Example
def process_with_error_handling(self, inputs: Dict[str, Any]) -> Any:
"""Agent processing with comprehensive error handling."""
try:
# Normal processing logic
result = self.perform_operation(inputs)
return result
except ValidationError as e:
self.logger.error(f"Validation failed: {e}")
return {"error": f"Validation error: {e}", "recoverable": True}
except ExternalServiceError as e:
self.logger.error(f"External service failed: {e}")
return {"error": f"Service unavailable: {e}", "retry_recommended": True}
except Exception as e:
self.logger.error(f"Unexpected error: {e}")
return {"error": f"Processing failed: {e}", "recoverable": False}
CSV Error Handling Pattern
GraphName,Node,AgentType,Success_Next,Failure_Next,Input_Fields,Output_Field,Description
ErrorFlow,ProcessData,custom_processor,Success,HandleError,raw_data,processed_data,"Process incoming data"
ErrorFlow,HandleError,error_handler,Retry,FinalFailure,error,recovery_plan,"Handle processing errors"
ErrorFlow,Retry,custom_processor,Success,FinalFailure,raw_data,processed_data,"Retry processing with recovery"
ErrorFlow,Success,echo,,,processed_data,final_output,"Processing completed successfully"
ErrorFlow,FinalFailure,echo,,,error,final_output,"Processing failed after retry"
Best Practices
State Management Guidelines
- Keep State Simple: Use dictionaries for most workflows
- Minimize State Size: Only include necessary data to reduce memory usage
- Use Descriptive Field Names: Make state fields self-documenting
- Handle Optional Fields: Always provide defaults for optional state fields
- Memory Lifecycle: Understand memory serialization for conversational agents
Performance Considerations
- State Size: Large state objects impact performance
- Memory Management: Clean up unused memory objects
- Tracking Overhead: Disable detailed tracking in production if not needed
- Field Access: Extract only needed fields in agents
Debugging Tips
- Use Execution Tracking: Enable tracking to understand state flow
- Log State Transitions: Add logging in custom agents to trace state changes
- Validate State Schema: Ensure consistent state structure across nodes
- Test Error Paths: Verify error handling and recovery mechanisms
Related Documentation
- Agent Development: Creating custom agents that work with state
- Memory & Conversations: Managing conversational state
- Monitoring & Debugging: Execution tracking and debugging
- Dependency Injection: Service architecture and injection patterns