Advanced Agent Development Guide

This comprehensive guide covers advanced agent development patterns in AgentMap, including sophisticated architectures, performance optimization, and integration strategies for building production-ready agent systems.

Scaffolding-First Development Workflow

Recommended Approach

The fastest way to develop custom agents is to use AgentMap's service-aware scaffolding system first, then customize the generated code. This approach can reduce development time by up to 90% and ensures the generated code follows proper service integration patterns.

1. Design with Service Requirements

Start by designing your CSV workflow with explicit service requirements:

graph_name,node_name,agent_type,context,description,input_fields,output_field,prompt
AdvancedWorkflow,DataProcessor,AdvancedDataAgent,"{""services"": [""llm"", ""vector"", ""storage""]}","AI-powered data processing",raw_data|query,processed_result,"Analyze {raw_data} for {query}"
AdvancedWorkflow,Validator,ValidationAgent,"{""services"": [""storage""]}","Validate processed data",processed_result,validation_status,"Validate: {processed_result}"

2. Generate Agent Foundation

Use scaffolding to create service-integrated agent foundations:

agentmap scaffold --graph AdvancedWorkflow

Generated Foundation (AdvancedDataAgent):

from agentmap.agents.base_agent import BaseAgent
from agentmap.services.protocols import (
    LLMCapableAgent, VectorCapableAgent, StorageCapableAgent,
    LLMServiceProtocol, StorageServiceProtocol
)
from typing import Dict, Any, Optional

class AdvancedDataAgent(BaseAgent, LLMCapableAgent, VectorCapableAgent, StorageCapableAgent):
    """
    AI-powered data processing with LLM, vector, and storage capabilities

    Available Services:
    - self.llm_service: LLM service for calling language models
    - self.vector_service: Vector storage service for similarity search and embeddings
    - self.storage_service: Generic storage service (supports all storage types)
    """

    def __init__(self):
        super().__init__()
        # Service attributes (automatically injected during graph building)
        self.llm_service: Optional[LLMServiceProtocol] = None
        self.vector_service: Any = None  # Vector storage service
        self.storage_service: Optional[StorageServiceProtocol] = None

    def process(self, inputs: Dict[str, Any]) -> Any:
        # Generated foundation with service integration examples
        raw_data_value = inputs.get("raw_data")
        query_value = inputs.get("query")

        # Ready to customize with your logic
        return "Your AdvancedDataAgent implementation here"

3. Enhance with Advanced Patterns

Build upon the scaffolded foundation with advanced agent patterns:

from datetime import datetime
from typing import Any, Dict, Optional

from agentmap.agents.base_agent import BaseAgent
from agentmap.services.protocols import (
    LLMCapableAgent, VectorCapableAgent, StorageCapableAgent,
    LLMServiceProtocol, StorageServiceProtocol
)

class AdvancedDataAgent(BaseAgent, LLMCapableAgent, VectorCapableAgent, StorageCapableAgent):
    def __init__(self):
        super().__init__()
        # Scaffolded service attributes
        self.llm_service: Optional[LLMServiceProtocol] = None
        self.vector_service: Any = None
        self.storage_service: Optional[StorageServiceProtocol] = None

        # Advanced customizations
        self.cache = {}
        self.processing_pipeline = []
        self.validation_rules = []

    def process(self, inputs: Dict[str, Any]) -> Any:
        raw_data = inputs.get("raw_data")
        query = inputs.get("query")

        # 1. Vector search for relevant context
        context = self._get_relevant_context(query)

        # 2. LLM analysis with context
        analysis = self._analyze_with_llm(raw_data, query, context)

        # 3. Storage and caching
        self._store_results(analysis, raw_data, query)

        return analysis

    def _get_relevant_context(self, query: str) -> list:
        """Use vector service to find relevant context"""
        if self.vector_service:
            return self.vector_service.search(
                collection="knowledge_base",
                query=query,
                limit=5
            )
        return []

    def _analyze_with_llm(self, data: str, query: str, context: list) -> str:
        """Enhanced LLM analysis with context"""
        if not self.llm_service:
            return f"Basic analysis of {data} for {query}"

        # Build enriched prompt with context
        context_text = "\n".join([item.get("content", "") for item in context])
        enhanced_prompt = f"""
Data to analyze: {data}
Query: {query}
Relevant context: {context_text}

Provide detailed analysis incorporating the context.
"""

        response = self.llm_service.call_llm(
            provider="anthropic",
            messages=[{"role": "user", "content": enhanced_prompt}],
            model="claude-3-opus-20240229"
        )

        return response.get("content", "Analysis failed")

    def _store_results(self, analysis: str, raw_data: str, query: str):
        """Store analysis results for future reference"""
        if self.storage_service:
            result_data = {
                "timestamp": datetime.now().isoformat(),
                "raw_data": raw_data,
                "query": query,
                "analysis": analysis,
                "metadata": {"agent": "AdvancedDataAgent", "version": "1.0"}
            }

            self.storage_service.write(
                "json",
                f"analysis_results/{hash(query)}.json",
                result_data
            )
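
Once customized, the agent runs as part of its graph through the normal AgentMap CLI. A minimal invocation sketch (the --state flag and initial-state JSON are assumptions; check your AgentMap version's CLI reference for the exact options):

agentmap run --graph AdvancedWorkflow --state '{"raw_data": "...", "query": "..."}'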

4. Benefits of Scaffolding-First Approach

  • 🚀 Rapid Development: Generate complete agent structures in seconds
  • 🔧 Service Integration: Automatic protocol inheritance and service injection
  • 📝 Documentation: Generated code includes comprehensive documentation and examples
  • ✅ Best Practices: Templates follow AgentMap conventions and patterns
  • 🔄 Iterative Enhancement: Easy to enhance scaffolded foundation with advanced patterns

AgentMap Agent Architecture

Core Agent Pattern

All AgentMap agents inherit from BaseAgent and implement the process method:

from agentmap.agents.base_agent import BaseAgent
from typing import Any, Dict, Optional
import logging

class CustomAgent(BaseAgent):
    def __init__(
        self,
        name: str,
        prompt: str,
        context: Optional[Dict[str, Any]] = None,
        logger: Optional[logging.Logger] = None,
        execution_tracking_service=None,
        state_adapter_service=None,
    ):
        super().__init__(
            name=name,
            prompt=prompt,
            context=context,
            logger=logger,
            execution_tracking_service=execution_tracking_service,
            state_adapter_service=state_adapter_service,
        )

    def process(self, inputs: Dict[str, Any]) -> Any:
        """
        Process inputs and return output.

        Args:
            inputs: Dictionary of input values from input_fields

        Returns:
            Output value for the output_field
        """
        # Your agent logic here
        result = f"Processed: {inputs}"
        return result
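
For quick experimentation outside a graph run, an agent can be instantiated and its process method called directly. A minimal sketch (the agent name and inputs here are hypothetical):

agent = CustomAgent(name="demo", prompt="Demo prompt")
print(agent.process({"text": "hello"}))  # -> Processed: {'text': 'hello'}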

Agent Architecture Patterns

1. Pipeline Agents

Pipeline agents process data through a series of stages, each handling a specific transformation:

from agentmap.agents.base_agent import BaseAgent
from typing import Dict, Any, Callable
import time
from datetime import datetime

class PipelineAgent(BaseAgent):
    def __init__(self, name: str, prompt: str, context=None, **kwargs):
        super().__init__(name, prompt, context, **kwargs)
        self.pipeline_stages = []
        self.stage_configs = {}

    def add_stage(self, stage_name: str, processor: Callable, config: Dict = None):
        """Add a processing stage to the pipeline"""
        self.pipeline_stages.append(stage_name)
        self.stage_configs[stage_name] = {
            'processor': processor,
            'config': config or {}
        }

    def process(self, inputs: Dict[str, Any]) -> Any:
        """Process all pipeline stages in sequence"""
        current_data = inputs
        execution_trace = []

        for stage_name in self.pipeline_stages:
            stage_config = self.stage_configs[stage_name]
            processor = stage_config['processor']
            config = stage_config['config']

            try:
                # Process stage
                stage_start = time.time()
                current_data = processor(current_data, config)
                stage_duration = time.time() - stage_start

                execution_trace.append({
                    'stage': stage_name,
                    'duration': stage_duration,
                    'input_size': len(str(current_data)),
                    'status': 'success'
                })

            except Exception as e:
                execution_trace.append({
                    'stage': stage_name,
                    'error': str(e),
                    'status': 'failed'
                })

                # Handle pipeline failure (context may be None when not supplied)
                if (self.context or {}).get('stop_on_failure', True):
                    break
                else:
                    # Continue with error placeholder
                    current_data = {'error': str(e), 'stage': stage_name}

        return {
            'result': current_data,
            'execution_trace': execution_trace,
            'total_stages': len(self.pipeline_stages),
            'successful_stages': len([t for t in execution_trace if t['status'] == 'success'])
        }

# Example usage
class DataProcessingPipeline(PipelineAgent):
    def __init__(self, name: str, prompt: str, context=None, **kwargs):
        super().__init__(name, prompt, context, **kwargs)

        # Add processing stages
        self.add_stage('validate', self.validate_data, {'required_fields': ['id', 'name']})
        self.add_stage('clean', self.clean_data, {'remove_nulls': True})
        self.add_stage('transform', self.transform_data, {'format': 'normalized'})
        self.add_stage('enrich', self.enrich_data, {'add_metadata': True})

    def validate_data(self, data: Dict, config: Dict) -> Dict:
        """Validate input data structure"""
        required_fields = config.get('required_fields', [])

        if isinstance(data, list):
            # Validate list of records
            for record in data:
                missing_fields = [field for field in required_fields if field not in record]
                if missing_fields:
                    raise ValueError(f"Missing required fields: {missing_fields}")
        else:
            # Validate single record
            missing_fields = [field for field in required_fields if field not in data]
            if missing_fields:
                raise ValueError(f"Missing required fields: {missing_fields}")

        return data

    def clean_data(self, data: Dict, config: Dict) -> Dict:
        """Clean and normalize data"""
        remove_nulls = config.get('remove_nulls', False)

        if isinstance(data, list):
            cleaned_data = []
            for record in data:
                cleaned_record = self.clean_record(record, remove_nulls)
                if cleaned_record:  # Only include non-empty records
                    cleaned_data.append(cleaned_record)
            return cleaned_data
        else:
            return self.clean_record(data, remove_nulls)

    def clean_record(self, record: Dict, remove_nulls: bool) -> Dict:
        """Clean individual record"""
        cleaned = {}

        for key, value in record.items():
            # Remove null values if configured
            if remove_nulls and value in [None, '', 'null', 'NULL']:
                continue

            # Clean string values
            if isinstance(value, str):
                value = value.strip()
                if value.lower() in ['n/a', 'na', 'none']:
                    if remove_nulls:
                        continue  # Drop placeholder values entirely
                    value = None

            cleaned[key] = value

        return cleaned

    def transform_data(self, data: Dict, config: Dict) -> Dict:
        """Transform data to required format"""
        format_type = config.get('format', 'default')

        if format_type == 'normalized':
            return self.normalize_data(data)
        elif format_type == 'standardized':
            return self.standardize_data(data)
        else:
            return data

    def normalize_data(self, data: Dict) -> Dict:
        """Normalize data values"""
        if isinstance(data, list):
            return [self.normalize_record(record) for record in data]
        else:
            return self.normalize_record(data)

    def standardize_data(self, data: Dict) -> Dict:
        """Standardization hook - override with domain-specific rules"""
        return data

    def normalize_record(self, record: Dict) -> Dict:
        """Normalize individual record"""
        normalized = {}

        for key, value in record.items():
            # Normalize field names
            normalized_key = key.lower().replace(' ', '_').replace('-', '_')

            # Normalize values
            if isinstance(value, str):
                # Convert to lowercase for standardization
                if key.lower() in ['status', 'type', 'category']:
                    value = value.lower()
                # Handle boolean strings
                elif value.lower() in ['true', 'yes', '1']:
                    value = True
                elif value.lower() in ['false', 'no', '0']:
                    value = False

            normalized[normalized_key] = value

        return normalized

    def enrich_data(self, data: Dict, config: Dict) -> Dict:
        """Enrich data with additional metadata"""
        add_metadata = config.get('add_metadata', False)

        if not add_metadata:
            return data

        metadata = {
            'processed_at': datetime.now().isoformat(),
            'processor': 'DataProcessingPipeline',
            'version': '1.0'
        }

        if isinstance(data, list):
            for record in data:
                record['_metadata'] = metadata.copy()
            return data
        else:
            data['_metadata'] = metadata
            return data
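
A quick end-to-end run of the pipeline (record values are hypothetical; the output keys match the return value of PipelineAgent.process above):

pipeline = DataProcessingPipeline(name="cleaner", prompt="Clean incoming records")
output = pipeline.process({'id': '1', 'name': ' Alice ', 'status': 'ACTIVE'})
print(output['successful_stages'], 'of', output['total_stages'], 'stages succeeded')  # 4 of 4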

2. State Machine Agents

State machine agents manage complex workflows with multiple states and transitions:

from enum import Enum
from typing import Dict, Any, Optional, Set
from datetime import datetime

from agentmap.agents.base_agent import BaseAgent

class AgentState(Enum):
    INITIALIZED = "initialized"
    PROCESSING = "processing"
    WAITING_INPUT = "waiting_input"
    VALIDATING = "validating"
    COMPLETED = "completed"
    ERROR = "error"
    SUSPENDED = "suspended"

class StateMachineAgent(BaseAgent):
    def __init__(self, name: str, prompt: str, context=None, **kwargs):
        super().__init__(name, prompt, context, **kwargs)
        self.current_state = AgentState.INITIALIZED
        self.state_history = [AgentState.INITIALIZED]
        self.state_data = {}
        self.transitions = {}
        self.state_handlers = {}
        self.setup_state_machine()

    def setup_state_machine(self):
        """Define state transitions and handlers"""
        # Define valid transitions
        self.transitions = {
            AgentState.INITIALIZED: {AgentState.PROCESSING, AgentState.ERROR},
            AgentState.PROCESSING: {AgentState.WAITING_INPUT, AgentState.VALIDATING, AgentState.COMPLETED, AgentState.ERROR},
            AgentState.WAITING_INPUT: {AgentState.PROCESSING, AgentState.SUSPENDED, AgentState.ERROR},
            AgentState.VALIDATING: {AgentState.PROCESSING, AgentState.COMPLETED, AgentState.ERROR},
            AgentState.COMPLETED: {AgentState.INITIALIZED},  # Can restart
            AgentState.ERROR: {AgentState.INITIALIZED, AgentState.PROCESSING},  # Can recover
            AgentState.SUSPENDED: {AgentState.PROCESSING, AgentState.ERROR}
        }

        # Define state handlers
        self.state_handlers = {
            AgentState.INITIALIZED: self.handle_initialized,
            AgentState.PROCESSING: self.handle_processing,
            AgentState.WAITING_INPUT: self.handle_waiting_input,
            AgentState.VALIDATING: self.handle_validating,
            AgentState.COMPLETED: self.handle_completed,
            AgentState.ERROR: self.handle_error,
            AgentState.SUSPENDED: self.handle_suspended
        }

    def process(self, inputs: Dict[str, Any]) -> Any:
        """Process state machine logic"""
        self.state_data['input'] = inputs

        max_iterations = inputs.get('max_iterations', 10)
        iterations = 0

        while iterations < max_iterations and self.current_state != AgentState.COMPLETED:
            try:
                # Process current state handler
                result = self.process_current_state()

                # Check for state transition
                if 'next_state' in result:
                    self.transition_to(result['next_state'])

                # Update state data
                if 'state_data' in result:
                    self.state_data.update(result['state_data'])

                iterations += 1

            except Exception as e:
                self.transition_to(AgentState.ERROR)
                self.state_data['error'] = str(e)
                break

        return {
            'final_state': self.current_state.value,
            'state_history': [state.value for state in self.state_history],
            'iterations': iterations,
            'state_data': self.state_data,
            'completed': self.current_state == AgentState.COMPLETED
        }

    def process_current_state(self) -> Dict[str, Any]:
        """Process handler for current state"""
        handler = self.state_handlers.get(self.current_state)
        if handler:
            return handler()
        else:
            raise Exception(f"No handler defined for state: {self.current_state}")

    def transition_to(self, new_state: AgentState):
        """Transition to new state if valid"""
        valid_transitions = self.transitions.get(self.current_state, set())

        if new_state in valid_transitions:
            self.state_history.append(new_state)
            self.current_state = new_state
            self.log_info(f"Transitioned to state: {new_state.value}")
        else:
            raise Exception(f"Invalid transition from {self.current_state.value} to {new_state.value}")

    def handle_initialized(self) -> Dict[str, Any]:
        """Handle initialized state"""
        return {
            'next_state': AgentState.PROCESSING,
            'state_data': {'initialized_at': datetime.now().isoformat()}
        }

    def handle_processing(self) -> Dict[str, Any]:
        """Handle processing state - override in subclasses"""
        # Default processing logic
        input_data = self.state_data.get('input')

        if not input_data:
            return {'next_state': AgentState.ERROR, 'state_data': {'error': 'No input data'}}

        # Simulate processing
        processed_data = f"Processed: {input_data}"

        return {
            'next_state': AgentState.VALIDATING,
            'state_data': {'processed_data': processed_data}
        }

    def handle_waiting_input(self) -> Dict[str, Any]:
        """Handle waiting for input state"""
        # Check if additional input is available
        additional_input = self.state_data.get('additional_input')

        if additional_input:
            return {
                'next_state': AgentState.PROCESSING,
                'state_data': {'additional_input': additional_input}
            }
        else:
            return {'next_state': AgentState.SUSPENDED}

    def handle_validating(self) -> Dict[str, Any]:
        """Handle validation state"""
        processed_data = self.state_data.get('processed_data')

        if processed_data and len(str(processed_data)) > 0:
            return {
                'next_state': AgentState.COMPLETED,
                'state_data': {'validation_passed': True}
            }
        else:
            return {
                'next_state': AgentState.ERROR,
                'state_data': {'validation_error': 'Invalid processed data'}
            }

    def handle_completed(self) -> Dict[str, Any]:
        """Handle completed state"""
        return {
            'state_data': {
                'completed_at': datetime.now().isoformat(),
                'final_result': self.state_data.get('processed_data')
            }
        }

    def handle_error(self) -> Dict[str, Any]:
        """Handle error state"""
        error = self.state_data.get('error', 'Unknown error')

        return {
            'state_data': {
                'error_handled_at': datetime.now().isoformat(),
                'error_message': error,
                'recovery_available': True
            }
        }

    def handle_suspended(self) -> Dict[str, Any]:
        """Handle suspended state"""
        return {
            'state_data': {
                'suspended_at': datetime.now().isoformat(),
                'reason': 'Waiting for external input'
            }
        }
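
Driving the machine to completion with the default handlers (input values are hypothetical; the result shape matches the return value of process above):

sm = StateMachineAgent(name="fsm_demo", prompt="Demo state machine")
result = sm.process({'data': 'payload', 'max_iterations': 10})
print(result['final_state'])    # completed
print(result['state_history'])  # ['initialized', 'processing', 'validating', 'completed']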

3. Conditional Processing Agents

Agents that implement conditional logic and branching:

from typing import Any, Dict
import re

from agentmap.agents.base_agent import BaseAgent

class ConditionalAgent(BaseAgent):
    def __init__(self, name: str, prompt: str, context=None, **kwargs):
        super().__init__(name, prompt, context, **kwargs)
        agent_context = self.context or {}
        self.conditions = agent_context.get('conditions', [])
        self.default_action = agent_context.get('default_action', 'continue')

    def process(self, inputs: Dict[str, Any]) -> Any:
        """Process inputs based on defined conditions"""

        for condition in self.conditions:
            if self.evaluate_condition(condition, inputs):
                result = self.process_condition_action(condition, inputs)
                return {
                    'condition_met': condition.get('name', 'unnamed'),
                    'result': result,
                    'matched': True
                }

        # No conditions met - use default action
        default_result = self.process_default_action(inputs)
        return {
            'condition_met': 'default',
            'result': default_result,
            'matched': False
        }

    def evaluate_condition(self, condition: Dict, inputs: Dict[str, Any]) -> bool:
        """Evaluate if a condition is met"""
        condition_type = condition.get('type', 'equals')
        field = condition.get('field')
        expected_value = condition.get('value')

        if field not in inputs:
            return False

        actual_value = inputs[field]

        if condition_type == 'equals':
            return actual_value == expected_value
        elif condition_type == 'not_equals':
            return actual_value != expected_value
        elif condition_type == 'greater_than':
            return float(actual_value) > float(expected_value)
        elif condition_type == 'less_than':
            return float(actual_value) < float(expected_value)
        elif condition_type == 'contains':
            return expected_value in str(actual_value)
        elif condition_type == 'regex':
            return bool(re.search(expected_value, str(actual_value)))
        elif condition_type == 'exists':
            return actual_value is not None
        elif condition_type == 'custom':
            # Allow custom evaluation function
            eval_func = condition.get('eval_function')
            if eval_func and callable(eval_func):
                return eval_func(actual_value, expected_value)

        return False

    def process_condition_action(self, condition: Dict, inputs: Dict[str, Any]) -> Any:
        """Process action when condition is met"""
        action = condition.get('action', 'continue')

        if action == 'transform':
            transform_func = condition.get('transform_function')
            if transform_func and callable(transform_func):
                return transform_func(inputs)
            else:
                return inputs
        elif action == 'filter':
            filter_fields = condition.get('filter_fields', [])
            return {field: inputs[field] for field in filter_fields if field in inputs}
        elif action == 'enrich':
            enrichment_data = condition.get('enrichment', {})
            result = inputs.copy()
            result.update(enrichment_data)
            return result
        elif action == 'stop':
            return {'status': 'stopped', 'reason': condition.get('stop_reason', 'Condition met')}
        else:
            return inputs

    def process_default_action(self, inputs: Dict[str, Any]) -> Any:
        """Process default action when no conditions are met"""
        if self.default_action == 'continue':
            return inputs
        elif self.default_action == 'error':
            raise ValueError("No matching conditions found")
        elif self.default_action == 'empty':
            return {}
        else:
            return {'status': 'no_match', 'input': inputs}

# Example usage for data routing
class DataRouterAgent(ConditionalAgent):
    def __init__(self, name: str, prompt: str, context=None, **kwargs):
        # Define routing conditions
        routing_conditions = [
            {
                'name': 'high_priority',
                'type': 'greater_than',
                'field': 'priority',
                'value': 8,
                'action': 'enrich',
                'enrichment': {'route': 'priority_queue', 'urgent': True}
            },
            {
                'name': 'error_status',
                'type': 'equals',
                'field': 'status',
                'value': 'error',
                'action': 'enrich',
                'enrichment': {'route': 'error_handler', 'needs_attention': True}
            },
            {
                'name': 'large_data',
                'type': 'custom',
                'field': 'data_size',
                'eval_function': lambda value, _: int(value) > 1000000,
                'action': 'enrich',
                'enrichment': {'route': 'batch_processor', 'processing_mode': 'parallel'}
            }
        ]

        if context is None:
            context = {}
        context.update({
            'conditions': routing_conditions,
            'default_action': 'continue'
        })

        super().__init__(name, prompt, context, **kwargs)
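
Routing in action (record values are hypothetical; the enrichment keys come from the conditions defined above):

router = DataRouterAgent(name="router_demo", prompt="Route incoming records")
outcome = router.process({'priority': 9, 'status': 'ok'})
print(outcome['condition_met'])    # high_priority
print(outcome['result']['route'])  # priority_queue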

Service Integration Patterns

1. LLM-Capable Agents

For agents that need LLM capabilities:

from typing import Any, Dict, Optional

from agentmap.agents.base_agent import BaseAgent
from agentmap.services.protocols import LLMCapableAgent, LLMServiceProtocol

class LLMAgent(BaseAgent, LLMCapableAgent):
    def __init__(self, name: str, prompt: str, context=None, **kwargs):
        super().__init__(name, prompt, context, **kwargs)
        self._llm_service: Optional[LLMServiceProtocol] = None

    def configure_llm_service(self, llm_service: LLMServiceProtocol):
        """Configure LLM service using protocol-based injection"""
        self._llm_service = llm_service

    @property
    def llm_service(self) -> LLMServiceProtocol:
        """Get LLM service, raising clear error if not configured"""
        if self._llm_service is None:
            raise ValueError(f"LLM service not configured for agent '{self.name}'")
        return self._llm_service

    def process(self, inputs: Dict[str, Any]) -> Any:
        """Process inputs using LLM service"""
        # Format prompt with inputs
        formatted_prompt = self.prompt.format(**inputs)

        # Call LLM service
        response = self.llm_service.generate_response(
            prompt=formatted_prompt,
            context=inputs
        )

        return response.get('content', 'No response')

    def _get_child_service_info(self) -> Optional[Dict[str, Any]]:
        """Provide LLM agent service information"""
        return {
            "services": {
                "llm_service_configured": self._llm_service is not None,
                "supports_llm_generation": True,
            },
            "capabilities": {
                "prompt_formatting": True,
                "context_injection": True,
                "response_processing": True,
            }
        }
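
Because injection happens through a plain method call, the agent can be exercised with any object implementing the protocol. A sketch with a trivial stub (the stub mirrors the generate_response call above; all names are hypothetical):

class StubLLMService:
    def generate_response(self, prompt, context):
        return {'content': f"echo: {prompt}"}

agent = LLMAgent(name="llm_demo", prompt="Summarize: {text}")
agent.configure_llm_service(StubLLMService())
print(agent.process({'text': 'hello world'}))  # echo: Summarize: hello world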

2. Storage-Capable Agents

For agents that need storage capabilities:

from typing import Any, Dict, Optional

from agentmap.agents.base_agent import BaseAgent
from agentmap.services.protocols import StorageCapableAgent, StorageServiceProtocol

class StorageAgent(BaseAgent, StorageCapableAgent):
    def __init__(self, name: str, prompt: str, context=None, **kwargs):
        super().__init__(name, prompt, context, **kwargs)
        self._storage_service: Optional[StorageServiceProtocol] = None

    def configure_storage_service(self, storage_service: StorageServiceProtocol):
        """Configure storage service using protocol-based injection"""
        self._storage_service = storage_service

    @property
    def storage_service(self) -> StorageServiceProtocol:
        """Get storage service, raising clear error if not configured"""
        if self._storage_service is None:
            raise ValueError(f"Storage service not configured for agent '{self.name}'")
        return self._storage_service

    def process(self, inputs: Dict[str, Any]) -> Any:
        """Process inputs with storage operations"""

        # Store input data
        storage_key = f"{self.name}_{inputs.get('id', 'unknown')}"
        self.storage_service.store(storage_key, inputs)

        # Process data
        result = self.process_data(inputs)

        # Store result
        result_key = f"{storage_key}_result"
        self.storage_service.store(result_key, result)

        return {
            'result': result,
            'storage_keys': {
                'input': storage_key,
                'output': result_key
            }
        }

    def process_data(self, inputs: Dict[str, Any]) -> Any:
        """Override for specific data processing logic"""
        return f"Processed: {inputs}"

    def retrieve_previous_results(self, identifier: str) -> Any:
        """Retrieve previously stored results"""
        storage_key = f"{self.name}_{identifier}_result"
        return self.storage_service.retrieve(storage_key)
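
The same stub technique works here. A sketch with an in-memory store (the store/retrieve methods mirror the calls above; all names are hypothetical):

class DictStorageService:
    def __init__(self):
        self.data = {}

    def store(self, key, value):
        self.data[key] = value

    def retrieve(self, key):
        return self.data.get(key)

agent = StorageAgent(name="store_demo", prompt="Store and process")
agent.configure_storage_service(DictStorageService())
print(agent.process({'id': '42', 'payload': 'x'})['storage_keys'])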

Performance Optimization

1. Caching Strategies

import hashlib
import json
from typing import Any, Dict

from agentmap.agents.base_agent import BaseAgent

class CachingAgent(BaseAgent):
    def __init__(self, name: str, prompt: str, context=None, **kwargs):
        super().__init__(name, prompt, context, **kwargs)
        self.cache_config = {
            'default_ttl': 3600,  # 1 hour (declared but not enforced by this simple in-memory cache)
            'max_cache_size': 1000,
            'cache_enabled': True
        }
        self._memory_cache = {}

    def cache_key(self, inputs: Dict[str, Any]) -> str:
        """Generate cache key from inputs"""
        # Create deterministic hash from input
        if isinstance(inputs, dict):
            data_str = json.dumps(inputs, sort_keys=True)
        else:
            data_str = str(inputs)

        combined = f"{data_str}:{self.__class__.__name__}:{self.prompt}"
        return hashlib.md5(combined.encode()).hexdigest()

    def get_from_cache(self, cache_key: str) -> Any:
        """Get result from cache"""
        if not self.cache_config['cache_enabled']:
            return None

        return self._memory_cache.get(cache_key)

    def store_in_cache(self, cache_key: str, result: Any):
        """Store result in cache"""
        if not self.cache_config['cache_enabled']:
            return

        self._memory_cache[cache_key] = result

        # Simple cache size management
        if len(self._memory_cache) > self.cache_config['max_cache_size']:
            # Remove oldest entries (simplified LRU: dicts preserve insertion order)
            keys_to_remove = list(self._memory_cache.keys())[:10]
            for key in keys_to_remove:
                del self._memory_cache[key]

    def process(self, inputs: Dict[str, Any]) -> Any:
        """Process with caching"""

        # Generate cache key
        cache_key = self.cache_key(inputs)

        # Try to get from cache
        cached_result = self.get_from_cache(cache_key)

        if cached_result is not None:
            self.log_info(f"Cache hit for key: {cache_key}")
            if isinstance(cached_result, dict):
                return {**cached_result, '_cache_hit': True}
            return cached_result

        # Cache miss - process normally
        self.log_info(f"Cache miss for key: {cache_key}")
        result = self.process_data(inputs)

        # Store in cache
        result_with_meta = {**result, '_cache_hit': False} if isinstance(result, dict) else result
        self.store_in_cache(cache_key, result_with_meta)

        return result_with_meta

    def process_data(self, inputs: Dict[str, Any]) -> Any:
        """Override in subclasses for specific processing logic"""
        return {'processed_data': f"Processed: {inputs}"}
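
Calling the agent twice with identical inputs shows both paths (names are hypothetical):

agent = CachingAgent(name="cache_demo", prompt="Cached processing")
first = agent.process({'n': 1})
second = agent.process({'n': 1})
print(first['_cache_hit'], second['_cache_hit'])  # False True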

Testing and Quality Assurance

1. Agent Testing Framework

import unittest
from typing import Any, Dict
from unittest.mock import Mock

from agentmap.agents.base_agent import BaseAgent

class AgentTestCase(unittest.TestCase):
    """Base test case for agent testing"""

    def setUp(self):
        """Setup test environment"""
        self.mock_logger = Mock()
        self.mock_execution_tracker = Mock()
        self.mock_state_adapter = Mock()
        self.agent = self.create_agent_instance()

    def create_agent_instance(self):
        """Create agent instance for testing - override in subclasses"""
        return BaseAgent(
            name="test_agent",
            prompt="Test prompt",
            logger=self.mock_logger,
            execution_tracking_service=self.mock_execution_tracker,
            state_adapter_service=self.mock_state_adapter
        )

    def assert_agent_processing_successful(self, result: Any):
        """Assert that agent processing was successful"""
        self.assertIsNotNone(result)

    def assert_agent_processing_failed(self, inputs: Dict[str, Any], expected_error: str = None):
        """Assert that agent processing failed"""
        with self.assertRaises(Exception) as context:
            self.agent.process(inputs)

        if expected_error:
            self.assertIn(expected_error, str(context.exception))

class TestDataProcessingAgent(AgentTestCase):
    """Test case for DataProcessingPipeline"""

    def create_agent_instance(self):
        return DataProcessingPipeline(
            name="test_pipeline",
            prompt="Test processing pipeline",
            logger=self.mock_logger,
            execution_tracking_service=self.mock_execution_tracker,
            state_adapter_service=self.mock_state_adapter
        )

    def test_valid_data_processing(self):
        """Test processing with valid data"""
        # The pipeline operates on the record itself (or a list of records)
        valid_record = {
            'id': '123',
            'name': 'Test Item',
            'value': 100
        }

        result = self.agent.process(valid_record)

        self.assert_agent_processing_successful(result)
        self.assertIn('result', result)
        self.assertEqual(result['successful_stages'], result['total_stages'])

    def test_invalid_data_processing(self):
        """Test processing with invalid data"""
        invalid_record = {
            'name': 'Test Item'
            # Missing required 'id' field
        }

        result = self.agent.process(invalid_record)

        # Should fail at validation stage
        self.assertIn('execution_trace', result)
        validation_stage = next(
            (stage for stage in result['execution_trace'] if stage['stage'] == 'validate'),
            None
        )
        self.assertIsNotNone(validation_stage)
        self.assertEqual(validation_stage['status'], 'failed')
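
The suite runs through the standard unittest entry point:

if __name__ == '__main__':
    unittest.main()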

Best Practices

1. Error Handling

from typing import Any, Dict

from agentmap.agents.base_agent import BaseAgent

class RobustAgent(BaseAgent):
    def process(self, inputs: Dict[str, Any]) -> Any:
        """Process with comprehensive error handling"""
        try:
            # Validate inputs
            self.validate_inputs(inputs)

            # Process data
            result = self.process_data(inputs)

            # Validate output
            self.validate_output(result)

            return result

        except ValueError as e:
            self.log_error(f"Validation error: {str(e)}")
            return {'error': 'validation_failed', 'message': str(e)}
        except Exception as e:
            self.log_error(f"Processing error: {str(e)}")
            return {'error': 'processing_failed', 'message': str(e)}

    def process_data(self, inputs: Dict[str, Any]) -> Any:
        """Override in subclasses for specific processing logic"""
        return {'processed': inputs}

    def validate_inputs(self, inputs: Dict[str, Any]):
        """Validate input data"""
        required_fields = (self.context or {}).get('required_fields', [])

        for field in required_fields:
            if field not in inputs:
                raise ValueError(f"Required field missing: {field}")

    def validate_output(self, output: Any):
        """Validate output data"""
        if output is None:
            raise ValueError("Output cannot be None")

        if isinstance(output, dict) and 'error' in output:
            raise ValueError(f"Processing resulted in error: {output['error']}")
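
A quick check of the validation path (context values are hypothetical):

agent = RobustAgent(name="robust_demo", prompt="Robust processing",
                    context={'required_fields': ['id']})
print(agent.process({}))  # {'error': 'validation_failed', 'message': 'Required field missing: id'}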

2. Logging and Monitoring

import time
from datetime import datetime
from typing import Any, Dict

from agentmap.agents.base_agent import BaseAgent

class MonitoredAgent(BaseAgent):
    def process(self, inputs: Dict[str, Any]) -> Any:
        """Process with comprehensive monitoring"""
        start_time = time.time()

        # Log processing start
        self.log_info(f"Starting processing with inputs: {inputs}")

        try:
            result = self.process_data(inputs)

            # Log success
            duration = time.time() - start_time
            self.log_info(f"Processing completed successfully in {duration:.3f}s")

            # Add monitoring metadata
            if isinstance(result, dict):
                result['_monitoring'] = {
                    'duration': duration,
                    'timestamp': datetime.now().isoformat(),
                    'agent': self.name
                }

            return result

        except Exception as e:
            # Log error with context
            duration = time.time() - start_time
            self.log_error(f"Processing failed after {duration:.3f}s: {str(e)}")

            return {
                'error': str(e),
                '_monitoring': {
                    'duration': duration,
                    'timestamp': datetime.now().isoformat(),
                    'agent': self.name,
                    'failed': True
                }
            }

    def process_data(self, inputs: Dict[str, Any]) -> Any:
        """Override in subclasses for specific processing logic"""
        return {'processed': inputs}
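
Exercising the monitored path (names are hypothetical; the _monitoring block is attached by the code above):

agent = MonitoredAgent(name="monitor_demo", prompt="Monitored processing")
result = agent.process({'x': 1})
print(result['_monitoring']['duration'])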