Integration Patterns
AgentMap provides multiple integration patterns to extend functionality and connect with external systems. This guide covers the major integration approaches: LangChain memory, custom host services, external APIs, and enterprise systems.
- LangChain Memory: Advanced conversation memory with multiple strategies
- Host Services: Custom service injection using protocols
- External APIs: HTTP, REST, and GraphQL integrations
- Enterprise Systems: Database, message queue, and microservice patterns
LangChain Memory Integration
AgentMap integrates with LangChain memory components to manage conversation history across turns.
Memory Types and Strategies
- Buffer Memory
- Buffer Window Memory
- Summary Memory
- Token Buffer Memory
Buffer Memory stores complete conversation history without limitations.
graph_name,node_name,next_node,context,agent_type,next_on_success,next_on_failure,input_fields,output_field,prompt
ChatBot,Conversation,,"{'memory':{'type':'buffer','memory_key':'chat_history'}}",claude,Next,Error,user_input|chat_history,response,Human: {user_input}
Best for: Short conversations where all context is needed
Buffer Window Memory keeps only the most recent k interactions.
graph_name,node_name,next_node,context,agent_type,next_on_success,next_on_failure,input_fields,output_field,prompt
ChatBot,Conversation,,"{'memory':{'type':'buffer_window','k':10,'memory_key':'chat_history'}}",claude,Next,Error,user_input|chat_history,response,Human: {user_input}
Best for: Longer conversations where recent context is most important
Summary Memory maintains a running summary instead of storing all exchanges.
graph_name,node_name,next_node,context,agent_type,next_on_success,next_on_failure,input_fields,output_field,prompt
ChatBot,Conversation,,"{'memory':{'type':'summary','memory_key':'chat_history'}}",claude,Next,Error,user_input|chat_history,response,Human: {user_input}
Best for: Very long conversations where overall context matters more than specific details
Token Buffer Memory limits memory based on token count rather than message count.
graph_name,node_name,next_node,context,agent_type,next_on_success,next_on_failure,input_fields,output_field,prompt
ChatBot,Conversation,,"{'memory':{'type':'token_buffer','max_token_limit':4000,'memory_key':'chat_history'}}",claude,Next,Error,user_input|chat_history,response,Human: {user_input}
Best for: Precise control over token usage and cost optimization
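The difference between the window and token strategies can be sketched in plain Python. This is only an illustration of the trimming idea, not LangChain's or AgentMap's implementation; the function names and the whitespace-based token counter are assumptions made for the example.

```python
from typing import Callable, List, Tuple

# One interaction = (user message, assistant reply)
Interaction = Tuple[str, str]


def trim_window(history: List[Interaction], k: int) -> List[Interaction]:
    """Keep only the k most recent interactions (buffer_window-style)."""
    return history[-k:] if k > 0 else []


def trim_to_token_limit(
    history: List[Interaction],
    max_token_limit: int,
    count_tokens: Callable[[str], int] = lambda text: len(text.split()),
) -> List[Interaction]:
    """Keep the newest interactions whose total size fits the limit (token_buffer-style).

    The default count_tokens is a crude whitespace word count used only for
    illustration; a real tokenizer would give different numbers.
    """
    kept: List[Interaction] = []
    total = 0
    # Walk from newest to oldest so recent context is kept first
    for user_msg, reply in reversed(history):
        cost = count_tokens(user_msg) + count_tokens(reply)
        if total + cost > max_token_limit:
            break
        kept.append((user_msg, reply))
        total += cost
    kept.reverse()  # Restore chronological order
    return kept


history = [
    ("hi", "hello there"),
    ("what is k", "the window size"),
    ("thanks", "you are welcome"),
]
print(trim_window(history, 2))
print(trim_to_token_limit(history, 6))
```

Buffer memory corresponds to applying no trimming at all, which is why it suits short conversations only.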
Memory Configuration Reference
| Parameter | Description | Default | Type / allowed values |
|---|---|---|---|
| type | Memory strategy | "buffer" | "buffer", "buffer_window", "summary", "token_buffer" |
| memory_key | State field for memory storage | "conversation_memory" | String |
| k | Window size (buffer_window) | 5 | Integer |
| max_token_limit | Token limit (token_buffer) | 2000 | Integer |
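To make the defaults concrete, here is a minimal sketch of how a context cell could be parsed and merged with the values in the table. How AgentMap itself parses the cell is not shown in this guide, so resolve_memory_config and the use of ast.literal_eval are assumptions for illustration.

```python
import ast
from typing import Any, Dict

# Defaults copied from the reference table
MEMORY_DEFAULTS: Dict[str, Any] = {
    "type": "buffer",
    "memory_key": "conversation_memory",
    "k": 5,
    "max_token_limit": 2000,
}
ALLOWED_TYPES = {"buffer", "buffer_window", "summary", "token_buffer"}


def resolve_memory_config(context_cell: str) -> Dict[str, Any]:
    """Parse a CSV context cell and fill in missing memory parameters."""
    # The examples use single-quoted dict syntax, which ast.literal_eval
    # accepts (json.loads would reject it).
    context = ast.literal_eval(context_cell)
    config = {**MEMORY_DEFAULTS, **context.get("memory", {})}
    if config["type"] not in ALLOWED_TYPES:
        raise ValueError(f"Unknown memory type: {config['type']}")
    return config


resolved = resolve_memory_config("{'memory':{'type':'buffer_window','k':10}}")
print(resolved)
```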
Multi-Agent Memory Sharing
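Agents in the same workflow share memory when their context uses the same memory_key (here, support_session):
graph_name,node_name,next_node,context,agent_type,next_on_success,next_on_failure,input_fields,output_field,prompt
Support,GetQuery,,Get user query,input,Classify,,query,user_query,How can we help you today?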
Support,Classify,,"{'memory':{'type':'buffer_window','k':5,'memory_key':'support_session'}}",claude,RouteQuery,Error,user_query|support_session,query_type,Classify this query: {user_query}
Support,ProductSpecialist,,"{'memory':{'type':'buffer_window','k':5,'memory_key':'support_session'}}",openai,End,Error,user_query|support_session,response,"You are a product specialist. User: {user_query}"
Support,TechSupport,,"{'memory':{'type':'buffer_window','k':5,'memory_key':'support_session'}}",openai,End,Error,user_query|support_session,response,"You are a technical support agent. User: {user_query}"
Host Service Integration
Extend AgentMap with custom services using protocol-based dependency injection.
Protocol Definition
Define service interfaces using Python protocols:
from abc import abstractmethod
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DatabaseServiceProtocol(Protocol):
    """Protocol for agents that need database access."""

    @abstractmethod
    def configure_database_service(self, database_service: Any) -> None:
        """Configure the agent with a database service."""
        ...
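Because the protocol is runtime_checkable, a container can use isinstance to decide which agents should receive a service. The sketch below shows that mechanism in isolation; ReportAgent, EchoAgent, and inject_database_service are hypothetical names, and AgentMap's real injection logic may differ.

```python
from typing import Any, List, Protocol, runtime_checkable


@runtime_checkable
class DatabaseServiceProtocol(Protocol):
    def configure_database_service(self, database_service: Any) -> None:
        ...


class ReportAgent:
    """Hypothetical agent that declares the configure method."""

    def __init__(self) -> None:
        self.database_service = None

    def configure_database_service(self, database_service: Any) -> None:
        self.database_service = database_service


class EchoAgent:
    """Hypothetical agent with no database needs."""


def inject_database_service(agents: List[Any], service: Any) -> int:
    """Configure every agent that satisfies the protocol; return how many."""
    configured = 0
    for agent in agents:
        # runtime_checkable protocols only verify that the method exists,
        # not that its signature matches.
        if isinstance(agent, DatabaseServiceProtocol):
            agent.configure_database_service(service)
            configured += 1
    return configured


agents = [ReportAgent(), EchoAgent()]
count = inject_database_service(agents, service={"name": "fake-db"})
print(count)
```

Note that ReportAgent never inherits from the protocol: structural matching is enough for the isinstance check to succeed.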
Service Implementation
Create concrete service classes:
import logging
from typing import Any, Dict, Optional


class DatabaseService:
    """Database service for AgentMap integration."""

    def __init__(self, config: Dict[str, Any], logger: logging.Logger):
        self.config = config
        self.logger = logger
        self.connection_string = config.get("connection_string")

    def execute_query(self, query: str, params: Optional[tuple] = None) -> Dict[str, Any]:
        """Execute a database query and return a result envelope."""
        try:
            # _execute_sql stands in for the driver-specific call;
            # it is not defined in this example.
            results = self._execute_sql(query, params)
            return {"success": True, "data": results}
        except Exception as e:
            self.logger.error(f"Database query failed: {e}")
            return {"success": False, "error": str(e)}
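The _execute_sql helper is left undefined above. As one possible way to fill it in, the following sketch backs the same result envelope with the standard-library sqlite3 module. SqliteDatabaseService is a hypothetical class for illustration, and a production service would likely use a different driver and connection handling.

```python
import logging
import sqlite3
from typing import Any, Dict, List, Optional


class SqliteDatabaseService:
    """Hypothetical service with the same result envelope, backed by sqlite3."""

    def __init__(self, config: Dict[str, Any], logger: logging.Logger):
        self.logger = logger
        # ":memory:" keeps the example self-contained
        self.connection = sqlite3.connect(config.get("connection_string", ":memory:"))
        self.connection.row_factory = sqlite3.Row

    def _execute_sql(self, query: str, params: Optional[tuple]) -> List[Dict[str, Any]]:
        # "with connection" commits on success and rolls back on error
        with self.connection:
            cursor = self.connection.execute(query, params or ())
            return [dict(row) for row in cursor.fetchall()]

    def execute_query(self, query: str, params: Optional[tuple] = None) -> Dict[str, Any]:
        try:
            return {"success": True, "data": self._execute_sql(query, params)}
        except sqlite3.Error as e:
            self.logger.error(f"Database query failed: {e}")
            return {"success": False, "error": str(e)}


service = SqliteDatabaseService({"connection_string": ":memory:"}, logging.getLogger("db"))
service.execute_query("CREATE TABLE users (name TEXT, email TEXT)")
service.execute_query(
    "INSERT INTO users (name, email) VALUES (?, ?)", ("Ada", "ada@example.com")
)
result = service.execute_query("SELECT * FROM users")
print(result)
```

Values are passed as parameters rather than formatted into the SQL string, which keeps the query safe from injection.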
Custom Agent with Service
Build agents that implement protocols to receive services:
from typing import Any, Dict

from agentmap.agents.base_agent import BaseAgent


class DatabaseAgent(BaseAgent, DatabaseServiceProtocol):
    """Agent that performs database operations."""

    # Stays None until configure_database_service() is called, so process()
    # can report a missing service instead of raising AttributeError.
    database_service: Any = None

    def configure_database_service(self, database_service: Any) -> None:
        """Configure database service (called automatically)."""
        self.database_service = database_service
        self.log_debug("Database service configured")

    def process(self, inputs: Dict[str, Any]) -> Any:
        """Execute database operations."""
        if not self.database_service:
            return {"error": "Database service not available"}
        operation = inputs.get("operation", "query")
        if operation == "get_users":
            return self.database_service.execute_query("SELECT * FROM users")
        elif operation == "insert_user":
            user_data = inputs.get("user_data", {})
            # Values are passed as parameters, never formatted into the SQL string
            return self.database_service.execute_query(
                "INSERT INTO users (name, email) VALUES (?, ?)",
                (user_data.get("name"), user_data.get("email")),
            )
        return {"error": f"Unknown operation: {operation}"}
Service Registration
Register services with AgentMap's dependency injection container:
from agentmap.di.containers import ApplicationContainer


def create_database_service(app_config_service, logging_service):
    """Factory function for database service."""
    config = app_config_service.get_host_service_config("database_service")
    logger = logging_service.get_logger("database_service")
    return DatabaseService(config["configuration"], logger)


# Register with AgentMap
container = ApplicationContainer()
container.register_host_factory(
    service_name="database_service",
    factory_function=create_database_service,
    dependencies=["app_config_service", "logging_service"],
    protocols=[DatabaseServiceProtocol],
)

# Register agent
agent_registry = container.agent_registry_service()
agent_registry.register_agent("database", DatabaseAgent)
External API Integration
Connect AgentMap workflows to external APIs and services.
REST API Agent
Build agents that interact with REST APIs:
from typing import Any, Dict, Optional

import requests

from agentmap.agents.base_agent import BaseAgent


class RestApiAgent(BaseAgent):
    """Agent for REST API integration."""

    def __init__(self, name: str, prompt: str, context: Optional[Dict[str, Any]] = None, **kwargs):
        super().__init__(name, prompt, context, **kwargs)
        # API configuration
        self.base_url = self.context.get("base_url", "")
        self.api_key = self.context.get("api_key", "")
        self.timeout = self.context.get("timeout", 30)
        self.headers = {"Content-Type": "application/json"}
        # Send the Authorization header only when a key is configured
        if self.api_key:
            self.headers["Authorization"] = f"Bearer {self.api_key}"

    def process(self, inputs: Dict[str, Any]) -> Any:
        """Make REST API calls based on inputs."""
        method = inputs.get("method", "GET").upper()
        endpoint = inputs.get("endpoint", "")
        data = inputs.get("data", {})
        # Join base URL and endpoint with exactly one slash
        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        try:
            if method == "GET":
                response = requests.get(url, headers=self.headers, timeout=self.timeout)
            elif method == "POST":
                response = requests.post(url, json=data, headers=self.headers, timeout=self.timeout)
            elif method == "PUT":
                response = requests.put(url, json=data, headers=self.headers, timeout=self.timeout)
            elif method == "DELETE":
                response = requests.delete(url, headers=self.headers, timeout=self.timeout)
            else:
                return {"error": f"Unsupported method: {method}"}
            # Raise for 4xx/5xx so they are reported as failures below
            response.raise_for_status()
            return {
                "success": True,
                "status_code": response.status_code,
                "data": response.json() if response.content else None,
                "url": url,
            }
        except requests.RequestException as e:
            self.log_error(f"API request failed: {e}")
            return {
                "success": False,
                "error": str(e),
                "url": url,
            }
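Two small details of the agent are easy to verify in isolation: the URL joining and the request headers. The helpers below are a standalone sketch (join_url and build_headers are names invented for this example); build_headers shows one way to omit the Authorization header entirely when no key is configured.

```python
from typing import Dict


def join_url(base_url: str, endpoint: str) -> str:
    """Join base URL and endpoint with exactly one slash between them."""
    return f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"


def build_headers(api_key: str = "") -> Dict[str, str]:
    """Return JSON headers, adding a bearer token only when a key is set."""
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    return headers


print(join_url("https://api.example.com/v1/", "/users/42"))
print(build_headers())
```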
GraphQL Integration
Create agents for GraphQL API interaction:
from typing import Any, Dict

import requests

from agentmap.agents.base_agent import BaseAgent


class GraphQLAgent(BaseAgent):
    """Agent for GraphQL API integration."""

    def process(self, inputs: Dict[str, Any]) -> Any:
        """Execute GraphQL queries and mutations."""
        query = inputs.get("query", "")
        variables = inputs.get("variables", {})
        if not query:
            return {"error": "No GraphQL query provided"}
        endpoint = self.context.get("graphql_endpoint", "")
        api_key = self.context.get("api_key", "")
        headers = {"Content-Type": "application/json"}
        # Send the Authorization header only when a key is configured
        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"
        payload = {
            "query": query,
            "variables": variables,
        }
        try:
            response = requests.post(
                endpoint,
                json=payload,
                headers=headers,
                timeout=self.context.get("timeout", 30),
            )
            response.raise_for_status()
            result = response.json()
            # GraphQL reports query-level failures in the response body
            if "errors" in result:
                return {
                    "success": False,
                    "errors": result["errors"],
                    "data": result.get("data"),
                }
            return {
                "success": True,
                "data": result.get("data"),
                "query": query,
            }
        except Exception as e:
            self.log_error(f"GraphQL query failed: {e}")
            return {"success": False, "error": str(e)}
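The body-level error check matters because GraphQL servers commonly answer a failed query with HTTP 200 and an "errors" list, so raise_for_status() alone would not catch it. The following sketch isolates that mapping so it can be exercised without a server; interpret_graphql_result and the sample responses are made up for illustration.

```python
from typing import Any, Dict


def interpret_graphql_result(result: Dict[str, Any]) -> Dict[str, Any]:
    """Map a decoded GraphQL response body onto a success/failure envelope."""
    if "errors" in result:
        # Partial data may accompany errors, so it is passed through
        return {
            "success": False,
            "errors": result["errors"],
            "data": result.get("data"),
        }
    return {"success": True, "data": result.get("data")}


ok = interpret_graphql_result({"data": {"user": {"id": "1"}}})
failed = interpret_graphql_result(
    {"errors": [{"message": "Field 'nme' not found"}], "data": None}
)
print(ok)
print(failed)
```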
Enterprise System Integration
Integrate with enterprise systems like databases, message queues, and microservices.
Database Integration
Advanced database operations with connection pooling:
from typing import Any, Dict, Optional

from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

from agentmap.agents.base_agent import BaseAgent


class EnterpriseDBAgent(BaseAgent):
    """Agent for enterprise database integration."""

    def __init__(self, name: str, prompt: str, context: Optional[Dict[str, Any]] = None, **kwargs):
        super().__init__(name, prompt, context, **kwargs)
        # Database configuration
        self.connection_string = self.context.get("connection_string", "")
        self.pool_size = self.context.get("pool_size", 5)
        self.max_overflow = self.context.get("max_overflow", 10)
        # Create engine with connection pooling
        self.engine = create_engine(
            self.connection_string,
            poolclass=QueuePool,
            pool_size=self.pool_size,
            max_overflow=self.max_overflow,
            echo=self.context.get("echo_sql", False),
        )

    def process(self, inputs: Dict[str, Any]) -> Any:
        """Execute database operations with enterprise features."""
        operation = inputs.get("operation", "query")
        try:
            # The connection is returned to the pool when the block exits
            with self.engine.connect() as connection:
                if operation == "query":
                    return self._execute_query(connection, inputs)
                elif operation == "transaction":
                    return self._execute_transaction(connection, inputs)
                elif operation == "bulk_insert":
                    # _bulk_insert is not shown in this example
                    return self._bulk_insert(connection, inputs)
                else:
                    return {"error": f"Unknown operation: {operation}"}
        except Exception as e:
            self.log_error(f"Database operation failed: {e}")
            return {"error": str(e)}

    def _execute_query(self, connection, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a SELECT query."""
        query = inputs.get("query", "")
        params = inputs.get("params", {})
        result = connection.execute(text(query), params)
        rows = [dict(row._mapping) for row in result]
        return {
            "success": True,
            "data": rows,
            "row_count": len(rows),
        }

    def _execute_transaction(self, connection, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Execute multiple operations in a transaction."""
        operations = inputs.get("operations", [])
        results = []
        # Commits when the block exits normally; rolls back if any statement raises
        with connection.begin():
            for op in operations:
                result = connection.execute(text(op["query"]), op.get("params", {}))
                results.append({
                    "affected_rows": result.rowcount,
                    "query": op["query"],
                })
        return {
            "success": True,
            "transaction_results": results,
        }
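The agent dispatches to _bulk_insert, which is not shown. As a rough illustration of the all-or-nothing behaviour such an operation usually wants, here is a sketch using the standard-library sqlite3 module as a stand-in for SQLAlchemy. The bulk_insert_users function and the users table are assumptions for the example only.

```python
import sqlite3
from typing import Any, Dict, List, Tuple


def bulk_insert_users(connection: sqlite3.Connection, rows: List[Tuple[str, str]]) -> Dict[str, Any]:
    """Insert all rows in one transaction; roll back everything on failure."""
    try:
        # The connection context manager commits on success, rolls back on error
        with connection:
            cursor = connection.executemany(
                "INSERT INTO users (name, email) VALUES (?, ?)", rows
            )
        return {"success": True, "affected_rows": cursor.rowcount}
    except sqlite3.Error as e:
        return {"success": False, "error": str(e)}


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE users (name TEXT, email TEXT UNIQUE)")

ok = bulk_insert_users(
    connection, [("Ada", "ada@example.com"), ("Alan", "alan@example.com")]
)
# The second batch violates the UNIQUE constraint, so none of its rows are kept
failed = bulk_insert_users(
    connection, [("Grace", "grace@example.com"), ("Dup", "ada@example.com")]
)
row_count = connection.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(ok, failed["success"], row_count)
```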
Message Queue Integration
Connect to message queues for asynchronous processing:
import json
from typing import Any, Dict, Optional

import pika

from agentmap.agents.base_agent import BaseAgent


class MessageQueueAgent(BaseAgent):
    """Agent for message queue integration (RabbitMQ)."""

    def __init__(self, name: str, prompt: str, context: Optional[Dict[str, Any]] = None, **kwargs):
        super().__init__(name, prompt, context, **kwargs)
        # RabbitMQ configuration
        self.host = self.context.get("host", "localhost")
        self.port = self.context.get("port", 5672)
        self.username = self.context.get("username", "guest")
        self.password = self.context.get("password", "guest")
        self.virtual_host = self.context.get("virtual_host", "/")

    def process(self, inputs: Dict[str, Any]) -> Any:
        """Handle message queue operations."""
        operation = inputs.get("operation", "publish")
        connection = None
        try:
            # Establish connection
            credentials = pika.PlainCredentials(self.username, self.password)
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=self.host,
                    port=self.port,
                    virtual_host=self.virtual_host,
                    credentials=credentials,
                )
            )
            channel = connection.channel()
            if operation == "publish":
                return self._publish_message(channel, inputs)
            elif operation == "consume":
                return self._consume_messages(channel, inputs)
            elif operation == "declare_queue":
                # _declare_queue is not shown in this example
                return self._declare_queue(channel, inputs)
            return {"error": f"Unknown operation: {operation}"}
        except Exception as e:
            self.log_error(f"Message queue operation failed: {e}")
            return {"error": str(e)}
        finally:
            # Close the connection even when an operation raises
            if connection is not None and connection.is_open:
                connection.close()

    def _publish_message(self, channel, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Publish message to queue."""
        queue_name = inputs.get("queue", "")
        message = inputs.get("message", {})
        # Declare queue if it doesn't exist
        channel.queue_declare(queue=queue_name, durable=True)
        # Publish message
        channel.basic_publish(
            exchange="",
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2),  # Make message persistent
        )
        return {
            "success": True,
            "queue": queue_name,
            "message": message,
        }

    def _consume_messages(self, channel, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Consume up to max_messages messages from the queue."""
        queue_name = inputs.get("queue", "")
        max_messages = inputs.get("max_messages", 10)
        messages = []
        for _ in range(max_messages):
            method_frame, header_frame, body = channel.basic_get(queue=queue_name)
            if not method_frame:
                break  # Queue is empty
            try:
                message = json.loads(body.decode())
            except json.JSONDecodeError:
                # Keep non-JSON payloads as raw text instead of dropping them
                message = {"raw_body": body.decode()}
            messages.append(message)
            channel.basic_ack(method_frame.delivery_tag)
        return {
            "success": True,
            "queue": queue_name,
            "messages": messages,
            "count": len(messages),
        }
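The consume loop can be tried without a RabbitMQ broker by swapping the channel for an in-process queue. This sketch uses the standard-library queue module as a stand-in for basic_get; decode_body and drain are hypothetical helpers that mirror the JSON fallback and the max_messages cap.

```python
import json
import queue
from typing import Any, List


def decode_body(body: bytes) -> Any:
    """Decode a message body as JSON, falling back to the raw text."""
    text = body.decode()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"raw_body": text}


def drain(source: "queue.Queue[bytes]", max_messages: int = 10) -> List[Any]:
    """Read up to max_messages messages without blocking."""
    messages: List[Any] = []
    for _ in range(max_messages):
        try:
            body = source.get_nowait()
        except queue.Empty:
            break  # Plays the role of basic_get returning no message
        messages.append(decode_body(body))
    return messages


inbox: "queue.Queue[bytes]" = queue.Queue()
for raw in (b'{"id": 1}', b"not json", b'{"id": 3}'):
    inbox.put(raw)

first_batch = drain(inbox, max_messages=2)
second_batch = drain(inbox, max_messages=2)
print(first_batch, second_batch)
```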
Microservices Integration
Connect to microservice architectures:
import time
from typing import Any, Dict, Optional

import consul
import requests

from agentmap.agents.base_agent import BaseAgent


class MicroserviceAgent(BaseAgent):
    """Agent for microservice integration with service discovery."""

    def __init__(self, name: str, prompt: str, context: Optional[Dict[str, Any]] = None, **kwargs):
        super().__init__(name, prompt, context, **kwargs)
        # Consul configuration for service discovery
        consul_host = self.context.get("consul_host", "localhost")
        consul_port = self.context.get("consul_port", 8500)
        self.consul = consul.Consul(host=consul_host, port=consul_port)
        # Circuit breaker configuration
        self.circuit_breaker_threshold = self.context.get("circuit_breaker_threshold", 5)
        self.circuit_breaker_timeout = self.context.get("circuit_breaker_timeout", 60)
        self.failed_requests = {}

    def process(self, inputs: Dict[str, Any]) -> Any:
        """Call microservice with service discovery."""
        service_name = inputs.get("service", "")
        endpoint = inputs.get("endpoint", "")
        method = inputs.get("method", "GET")
        data = inputs.get("data", {})
        try:
            # Service discovery
            service_url = self._discover_service(service_name)
            if not service_url:
                return {"error": f"Service not found: {service_name}"}
            # Circuit breaker check
            if self._is_circuit_open(service_name):
                return {"error": f"Circuit breaker open for {service_name}"}
            # Make service call (_make_request is not shown in this example)
            url = f"{service_url.rstrip('/')}/{endpoint.lstrip('/')}"
            response = self._make_request(method, url, data)
            # Record success
            self._record_success(service_name)
            return {
                "success": True,
                "service": service_name,
                "data": response,
            }
        except Exception as e:
            # Record failure
            self._record_failure(service_name)
            self.log_error(f"Microservice call failed: {e}")
            return {"error": str(e)}

    def _discover_service(self, service_name: str) -> Optional[str]:
        """Discover service URL using Consul; return None if no healthy instance exists."""
        try:
            _, services = self.consul.health.service(service_name, passing=True)
            if services:
                service = services[0]["Service"]
                return f"http://{service['Address']}:{service['Port']}"
            return None
        except Exception as e:
            self.log_error(f"Service discovery failed: {e}")
            return None

    def _is_circuit_open(self, service_name: str) -> bool:
        """Check if circuit breaker is open for service."""
        if service_name not in self.failed_requests:
            return False
        failures = self.failed_requests[service_name]
        if failures["count"] >= self.circuit_breaker_threshold:
            # Stay open until the timeout has elapsed since the last failure
            time_since_last_failure = time.time() - failures["last_failure"]
            return time_since_last_failure < self.circuit_breaker_timeout
        return False

    def _record_success(self, service_name: str) -> None:
        """Record successful request and reset the failure count."""
        if service_name in self.failed_requests:
            del self.failed_requests[service_name]

    def _record_failure(self, service_name: str) -> None:
        """Record failed request."""
        if service_name not in self.failed_requests:
            self.failed_requests[service_name] = {"count": 0, "last_failure": 0}
        self.failed_requests[service_name]["count"] += 1
        self.failed_requests[service_name]["last_failure"] = time.time()
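The circuit breaker logic is easier to test when it lives in its own class with an injectable clock. The sketch below extracts the same count-and-timeout rule; the CircuitBreaker class, its clock parameter, and the choice of time.monotonic as the default are assumptions for illustration rather than part of AgentMap.

```python
import time
from typing import Callable, Dict


class CircuitBreaker:
    """Per-service failure counter that blocks calls for a cooldown period."""

    def __init__(
        self,
        threshold: int = 5,
        timeout: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.threshold = threshold
        self.timeout = timeout
        self.clock = clock  # Injectable so tests do not need to sleep
        self.failures: Dict[str, Dict[str, float]] = {}

    def is_open(self, service: str) -> bool:
        entry = self.failures.get(service)
        if entry is None or entry["count"] < self.threshold:
            return False
        # Open only while the cooldown since the last failure is running
        return self.clock() - entry["last_failure"] < self.timeout

    def record_success(self, service: str) -> None:
        self.failures.pop(service, None)

    def record_failure(self, service: str) -> None:
        entry = self.failures.setdefault(service, {"count": 0, "last_failure": 0.0})
        entry["count"] += 1
        entry["last_failure"] = self.clock()


now = [0.0]  # Fake clock controlled by the example
breaker = CircuitBreaker(threshold=2, timeout=60.0, clock=lambda: now[0])
breaker.record_failure("billing")
breaker.record_failure("billing")
open_after_failures = breaker.is_open("billing")
now[0] = 61.0
open_after_cooldown = breaker.is_open("billing")
print(open_after_failures, open_after_cooldown)
```

A monotonic clock is used by default because it is not affected by system clock adjustments, which could otherwise shorten or extend the cooldown.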
Configuration Examples
LangChain Memory in CSV
graph_name,node_name,next_node,context,agent_type,next_on_success,next_on_failure,input_fields,output_field,prompt,description
# Buffer memory for short conversations
ChatBot,Basic,,"{'memory':{'type':'buffer'}}",claude,Next,Error,user_input|conversation_memory,response,Human: {user_input},Basic chat with full memory
# Window memory for longer conversations
Support,Agent,,"{'memory':{'type':'buffer_window','k':8}}",openai,Next,Error,query|conversation_memory,response,Support agent: {query},Support with recent context
# Token-limited memory for cost control
Analysis,Agent,,"{'memory':{'type':'token_buffer','max_token_limit':3000}}",openai,Next,Error,data|conversation_memory,analysis,Analyze: {data},Cost-optimized analysis
Host Service Configuration
host_application:
  enabled: true
  protocol_folders:
    - "protocols"
    - "custom_protocols"
  services:
    database_service:
      enabled: true
      configuration:
        connection_string: "${DATABASE_URL}"
        pool_size: 10
        max_overflow: 20
    api_service:
      enabled: true
      configuration:
        base_url: "${API_BASE_URL}"
        api_key: "${API_KEY}"
        timeout: 30
        retries: 3
    message_queue:
      enabled: true
      configuration:
        host: "${MQ_HOST}"
        port: 5672
        username: "${MQ_USERNAME}"
        password: "${MQ_PASSWORD}"
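The ${...} placeholders keep secrets out of the file. How AgentMap resolves them is not described here; as a minimal sketch of the idea, the following helper expands environment references in an already-parsed configuration using os.path.expandvars. The expand_env name and the demo value are assumptions.

```python
import os
from typing import Any


def expand_env(value: Any) -> Any:
    """Recursively replace ${VAR} references in strings with environment values."""
    if isinstance(value, str):
        # os.path.expandvars leaves unknown variables unchanged
        return os.path.expandvars(value)
    if isinstance(value, dict):
        return {key: expand_env(item) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item) for item in value]
    return value


os.environ["DATABASE_URL"] = "sqlite:///example.db"  # Demo value only
config = {"connection_string": "${DATABASE_URL}", "pool_size": 10}
expanded = expand_env(config)
print(expanded)
```

Because unset variables are left as literal text, a stricter version might raise an error instead so that a missing secret is caught at startup.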
Integration Workflow Example
graph_name,node_name,next_node,context,agent_type,next_on_success,next_on_failure,input_fields,output_field,prompt,description
DataPipeline,GetQuery,,"{'memory':{'type':'buffer_window','k':5}}",input,FetchData,Error,,user_query,What data do you need?,Collect user requirements
DataPipeline,FetchData,,Get from database,custom:DatabaseAgent,ProcessAPI,Error,user_query|conversation_memory,db_data,,Query database
DataPipeline,ProcessAPI,,"{'base_url':'${API_URL}','timeout':30}",custom:RestApiAgent,AnalyzeData,Error,db_data,api_data,,Enrich with API data
DataPipeline,AnalyzeData,,"{'memory':{'type':'buffer_window','k':5}}",openai,SendMessage,Error,db_data|api_data|conversation_memory,analysis,Analyze this data: {db_data} {api_data},AI analysis
DataPipeline,SendMessage,,"{'queue':'results'}",custom:MessageQueueAgent,End,Error,analysis,message_sent,,Send to queue
DataPipeline,Error,,Handle errors,echo,End,,error,error_message,,Error handling
DataPipeline,End,,Complete workflow,echo,,,analysis|error_message,result,,Workflow complete
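To see how the columns define the graph, the rows can be read with the standard csv module and turned into a routing table. This is a sketch for inspecting a workflow file, not AgentMap's loader; load_edges is a hypothetical helper and only the first two rows of the workflow are included.

```python
import csv
import io
from typing import Any, Dict

WORKFLOW_CSV = """graph_name,node_name,next_node,context,agent_type,next_on_success,next_on_failure,input_fields,output_field,prompt,description
DataPipeline,GetQuery,,"{'memory':{'type':'buffer_window','k':5}}",input,FetchData,Error,,user_query,What data do you need?,Collect user requirements
DataPipeline,FetchData,,Get from database,custom:DatabaseAgent,ProcessAPI,Error,user_query|conversation_memory,db_data,,Query database
"""


def load_edges(csv_text: str) -> Dict[str, Dict[str, Any]]:
    """Map each node to its success/failure targets and input fields."""
    edges: Dict[str, Dict[str, Any]] = {}
    for row in csv.DictReader(io.StringIO(csv_text)):
        edges[row["node_name"]] = {
            "success": row["next_on_success"] or None,
            "failure": row["next_on_failure"] or None,
            # input_fields uses "|" as its separator
            "inputs": [name for name in row["input_fields"].split("|") if name],
        }
    return edges


edges = load_edges(WORKFLOW_CSV)
print(edges["FetchData"])
```

The context cell is wrapped in double quotes in the CSV, which is what lets it contain commas without breaking the column layout.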
Best Practices
Integration Design
- Start Simple: Begin with basic integrations and add complexity gradually
- Handle Failures: Always include error handling and recovery mechanisms
- Use Protocols: Leverage protocol-based injection for clean architecture
- Monitor Performance: Track integration performance and bottlenecks
Memory Management
- Choose Appropriate Types: Select memory strategy based on conversation length
- Share Memory Wisely: Use same memory key for related agents
- Limit Memory Size: Prevent unlimited growth in long conversations
- Test Memory Behavior: Verify memory works as expected in workflows
Service Integration
- Configuration Management: Use environment variables for sensitive data
- Connection Pooling: Use connection pools for database and network resources
- Circuit Breakers: Implement circuit breakers for external service calls
- Graceful Degradation: Provide fallback behavior when services fail
Security Considerations
- Secret Management: Never hardcode API keys or passwords
- Input Validation: Validate all inputs to prevent injection attacks
- Network Security: Use HTTPS and proper authentication
- Access Control: Implement proper authorization and access controls
Related Documentation
Core Concepts
- Custom Agent Development - Building agents that use integrations
- Service Injection - Understanding dependency injection patterns
- Memory Management - Basic memory concepts
Advanced Topics
- Orchestration Patterns - Dynamic routing with memory
- Testing Strategies - Testing integrated workflows
- Best Practices - Development guidelines
Production
- Deployment - Deploying integrated workflows
- Monitoring - Monitoring integrations
- Security - Securing integrated systems
💡 Pro Tip: Start with LangChain memory for conversational features, then add custom services as your needs grow. The protocol-based approach makes it easy to add new integrations without changing existing code.
Last updated: June 28, 2025