Lesson 6: External Services Integration
Ready to connect your AI workflows to the real world? In this advanced lesson, you'll learn to integrate external APIs, databases, and custom services into AgentMap workflows, creating powerful systems that bridge AI capabilities with existing infrastructure.
Learning Objectives
By the end of this lesson, you will:
- ✅ Build custom service implementations for external APIs
- ✅ Use the host protocol registry for service management
- ✅ Implement service injection patterns for flexible architectures
- ✅ Handle authentication, rate limiting, and error recovery
- ✅ Create reusable service libraries for common integrations
- ✅ Design fault-tolerant service interaction patterns
Overview: What We're Building
We'll create a Comprehensive External Integration System that:
- Connects to multiple external APIs (Slack, GitHub, email)
- Manages authentication and rate limiting automatically
- Processes data from multiple sources intelligently
- Handles failures gracefully with retry logic
- Provides a unified interface for service interactions
Step 1: Download the External Integration System
Let's get all the files for our comprehensive external services integration:
- Main workflow file: lesson6.csv
- Sample project tasks data: project_tasks.json
- Custom service (email): email_service.py
- Custom service (Slack): slack_service.py
Step 2: Understanding Service Architecture
Service Registration Pattern
AgentMap uses a service registry pattern for external integrations:
# Service registration
from agentmap.registry import ServiceRegistry
ServiceRegistry.register_service('email_service', EmailService)
ServiceRegistry.register_service('slack_service', SlackService)
ServiceRegistry.register_service('github_service', GitHubService)
Host Protocol Implementation
Services implement the host protocol interface:
from typing import Any, Dict, List, Optional

class BaseService:
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}

    def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Main service processing method."""
        raise NotImplementedError

    def health_check(self) -> bool:
        """Check if service is healthy."""
        return True

    def get_capabilities(self) -> List[str]:
        """Return list of service capabilities."""
        return []
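To make the interface concrete, here is a minimal sketch of a service built on it. The `BaseService` stand-in is repeated only so the snippet runs on its own, and `UppercaseService`, its `text` input key, and its `prefix` config key are hypothetical names chosen for illustration, not part of AgentMap.

```python
from typing import Any, Dict, List, Optional


class BaseService:
    """Stand-in mirroring the interface above (repeated so this runs alone)."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}

    def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError

    def health_check(self) -> bool:
        return True

    def get_capabilities(self) -> List[str]:
        return []


class UppercaseService(BaseService):
    """Hypothetical example service: upper-cases the 'text' input."""

    def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        text = inputs.get("text")
        if not isinstance(text, str):
            # Report bad input as data instead of raising
            return {"status": "failed", "error": "missing 'text' input"}
        prefix = self.config.get("prefix", "")
        return {"status": "success", "text": prefix + text.upper()}

    def get_capabilities(self) -> List[str]:
        return ["uppercase"]


service = UppercaseService({"prefix": ">> "})
print(service.process({"text": "hello"}))
print(service.process({}))
```

Only `process` and `get_capabilities` are overridden here; `health_check` falls back to the base default, which is reasonable for a service with no external dependency.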
Service Injection Patterns
1. Configuration-Based Injection
node,type,context
EmailNotification,email_service,"{""provider"": ""smtp"", ""retry_count"": 3}"
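The `context` column holds a JSON object, with each inner quote doubled so it survives CSV quoting. How AgentMap itself reads this column is not shown in this lesson; the sketch below only illustrates, using Python's standard `csv` and `json` modules, how such a row decodes into a configuration dictionary.

```python
import csv
import io
import json

# The two CSV lines shown above, embedded as a string for illustration.
workflow_csv = (
    'node,type,context\n'
    'EmailNotification,email_service,"{""provider"": ""smtp"", ""retry_count"": 3}"\n'
)

rows = list(csv.DictReader(io.StringIO(workflow_csv)))
for row in rows:
    # csv turns each doubled quote ("") back into one quote, leaving valid JSON.
    row["context"] = json.loads(row["context"])

print(rows[0]["type"], rows[0]["context"])
```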
2. Dynamic Service Selection
def select_service(requirements):
    if requirements.get('priority') == 'high':
        return 'premium_email_service'
    else:
        return 'standard_email_service'
3. Service Composition
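class CompositeService(BaseService):
    def __init__(self, services: List[BaseService]):
        super().__init__()  # initialize self.config like every other service
        self.services = services

    def process(self, inputs):
        results = []
        for service in self.services:
            result = service.process(inputs)
            results.append(result)
        return self.aggregate_results(results)

    def aggregate_results(self, results):
        """Combine per-service results; override to merge or summarize them."""
        return {'results': results}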
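How the per-service results should be combined depends on the workflow. One possible policy, sketched here as a free function, is to count successes and failures and keep every raw result. The `status`, `succeeded`, and `failed` keys are assumptions made for this example, as is the convention that a result containing an `error` key counts as a failure.

```python
from typing import Any, Dict, List


def aggregate_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical aggregation: count successes and keep every raw result."""
    failed = [r for r in results if "error" in r]
    if not failed:
        status = "success"
    elif len(failed) < len(results):
        status = "partial"
    else:
        status = "failed"
    return {
        "status": status,
        "succeeded": len(results) - len(failed),
        "failed": len(failed),
        "results": results,
    }


summary = aggregate_results([
    {"service": "email", "sent": 3},
    {"service": "slack", "error": "channel_not_found"},
])
print(summary["status"], summary["succeeded"], summary["failed"])
```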
Step 3: Running the External Integration System
Setup Steps
- Create Directory Structure:
mkdir -p external-integration/data
cd external-integration
- Download All Files:
  - lesson6.csv (main workflow)
  - project_tasks.json (sample project data)
  - email_service.py (email integration)
  - slack_service.py (Slack integration)
- Configure Services:
# Set environment variables for service authentication
export SMTP_USERNAME="your-email@gmail.com"
export SMTP_PASSWORD="your-app-password"
export SLACK_BOT_TOKEN="xoxb-your-bot-token"
export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/..."
- Install Service Files:
# Copy service files to AgentMap services directory
cp email_service.py ~/.agentmap/services/
cp slack_service.py ~/.agentmap/services/
- Run the Integration:
agentmap run lesson6.csv
Expected Execution Flow
📄 Loading project tasks...
🔍 Analyzing service requirements...
📧 Sending email notifications...
💬 Posting Slack updates...
🔗 Creating GitHub issues...
💾 Storing data in database...
🌐 Calling custom APIs...
📊 Aggregating all results...
📋 Generating status report...
✅ Integration complete!
Step 4: Advanced Service Patterns
Pattern 1: Circuit Breaker
Prevent cascading failures with circuit breakers:
import time

class CircuitBreakerService(BaseService):
    def __init__(self, wrapped_service, failure_threshold=5, timeout=60):
        super().__init__()
        self.wrapped_service = wrapped_service
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # seconds to stay OPEN before allowing a trial call
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def process(self, inputs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                return {"error": "Circuit breaker is OPEN"}
        try:
            result = self.wrapped_service.process(inputs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
            raise  # re-raise with the original traceback
        # A success closes the circuit and clears the consecutive-failure count,
        # so occasional failures spread over a long time do not trip the breaker
        self.state = 'CLOSED'
        self.failure_count = 0
        return result
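The state transitions are easier to follow with a clock that can be advanced by hand. The sketch below is a compact, self-contained variant of the pattern, not the lesson's class: `MiniBreaker`, its `clock` parameter, and the simulated `backend` function are all hypothetical, and this variant resets the failure count after any successful call.

```python
import time
from typing import Any, Callable, Dict


class MiniBreaker:
    """Compact breaker with an injectable clock so transitions can be tested."""

    def __init__(self, call: Callable[[Dict[str, Any]], Dict[str, Any]],
                 failure_threshold: int = 2, timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.call = call
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.clock = clock
        self.failure_count = 0
        self.opened_at = 0.0
        self.state = "CLOSED"

    def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        if self.state == "OPEN":
            if self.clock() - self.opened_at > self.timeout:
                self.state = "HALF_OPEN"  # allow one trial call
            else:
                return {"error": "Circuit breaker is OPEN"}
        try:
            result = self.call(inputs)
        except Exception:
            self.failure_count += 1
            self.opened_at = self.clock()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
            raise
        self.state = "CLOSED"
        self.failure_count = 0
        return result


now = [0.0]        # fake clock, advanced by hand
healthy = [False]  # toggles the simulated backend


def backend(inputs: Dict[str, Any]) -> Dict[str, Any]:
    if not healthy[0]:
        raise ConnectionError("backend down")
    return {"status": "success"}


breaker = MiniBreaker(backend, failure_threshold=2, timeout=60, clock=lambda: now[0])
for _ in range(2):
    try:
        breaker.process({})
    except ConnectionError:
        pass
state_after_failures = breaker.state   # two failures trip the breaker
short_circuited = breaker.process({})  # rejected without calling the backend

now[0] += 61                           # the timeout elapses
healthy[0] = True
recovered = breaker.process({})        # the trial call succeeds
print(state_after_failures, short_circuited, recovered, breaker.state)
```

Injecting the clock is a testing convenience; in production the default `time.monotonic` is usually preferable to wall-clock time because it is unaffected by system clock adjustments.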
Pattern 2: Service Proxy with Caching
Cache expensive service calls:
import hashlib
import json
import time

class CachingServiceProxy(BaseService):
    def __init__(self, wrapped_service, cache_ttl=300):
        super().__init__()
        self.wrapped_service = wrapped_service
        self.cache = {}
        self.cache_ttl = cache_ttl  # seconds a cached result stays valid

    def process(self, inputs):
        cache_key = self._generate_cache_key(inputs)
        # Check cache
        if cache_key in self.cache:
            cached_item = self.cache[cache_key]
            if time.time() - cached_item['timestamp'] < self.cache_ttl:
                return cached_item['result']
        # Call service and cache result
        result = self.wrapped_service.process(inputs)
        self.cache[cache_key] = {
            'result': result,
            'timestamp': time.time()
        }
        return result

    def _generate_cache_key(self, inputs):
        # Create a deterministic cache key (inputs must be JSON-serializable)
        return hashlib.md5(json.dumps(inputs, sort_keys=True).encode()).hexdigest()
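The cache only works if equal inputs always map to the same key. The sketch below shows why `sort_keys=True` matters: two dictionaries with the same content but different key order hash identically. Two choices here are assumptions rather than the lesson's code: SHA-256 is swapped in for MD5, and `default=str` is added so values that are not JSON-serializable (dates, paths) are stringified instead of raising.

```python
import hashlib
import json
from typing import Any, Dict


def cache_key(inputs: Dict[str, Any]) -> str:
    """Hash a canonical JSON form of the inputs."""
    # sort_keys makes key order irrelevant; default=str stringifies
    # values that json cannot encode natively.
    canonical = json.dumps(inputs, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


a = cache_key({"channel": "#general", "limit": 10})
b = cache_key({"limit": 10, "channel": "#general"})
c = cache_key({"channel": "#general", "limit": 11})
print(a == b, a == c)
```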
Pattern 3: Service Health Monitoring
Monitor service health automatically:
import time

class HealthMonitoringService(BaseService):
    def __init__(self, services: Dict[str, BaseService]):
        super().__init__()
        self.services = services
        self.health_status = {}
        self.last_check_time = {}

    def process(self, inputs):
        # Check health of all services
        for name, service in self.services.items():
            if self._should_check_health(name):
                self.health_status[name] = service.health_check()
                self.last_check_time[name] = time.time()
        # Route to healthy services only
        healthy_services = {
            name: service for name, service in self.services.items()
            if self.health_status.get(name, True)
        }
        if not healthy_services:
            return {"error": "No healthy services available"}
        # Use first healthy service (could implement load balancing)
        service_name, service = next(iter(healthy_services.items()))
        return service.process(inputs)

    def _should_check_health(self, service_name):
        last_check = self.last_check_time.get(service_name, 0)
        return time.time() - last_check > 60  # Check every minute
Step 5: Building Custom Services
Exercise 1: GitHub Service
Create a comprehensive GitHub integration service:
class GitHubService(BaseService):
    # The helpers _extract_github_config, _update_pull_request, _add_comment and
    # _aggregate_github_results are not shown here; implement them as part of the exercise.
    def __init__(self, config: Dict[str, Any] = None):
        super().__init__(config)
        self.api_token = self.config.get('api_token', '')
        self.base_url = self.config.get('base_url', 'https://api.github.com')

    def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        github_config = self._extract_github_config(inputs)
        results = []
        for config in github_config:
            action = config.get('action', 'create_issue')
            if action == 'create_issue':
                result = self._create_issue(config)
            elif action == 'update_pr':
                result = self._update_pull_request(config)
            elif action == 'add_comment':
                result = self._add_comment(config)
            else:
                result = {'error': f'Unknown action: {action}'}
            results.append(result)
        return self._aggregate_github_results(results)

    def _create_issue(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a GitHub issue."""
        repo = config.get('repository', '')
        title = config.get('title', 'AgentMap Integration')
        body = config.get('body', '')
        labels = config.get('labels', [])
        # Simulate GitHub API call: a real implementation would send this payload
        issue_data = {
            'title': title,
            'body': body,
            'labels': labels
        }
        return {
            'action': 'create_issue',
            'status': 'success',
            'repository': repo,
            'issue_number': 42,  # Simulated
            'url': f'https://github.com/{repo}/issues/42'
        }
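The exercise simulates the API call. As a hedged sketch of what a real call could look like, the function below builds the HTTP request for GitHub's "create an issue" REST endpoint using only the standard library, and deliberately stops short of sending it. `build_create_issue_request` is a hypothetical helper name, and `octo-org/demo` is a placeholder repository.

```python
import json
import urllib.request
from typing import List, Optional


def build_create_issue_request(repo: str, title: str, body: str = "",
                               labels: Optional[List[str]] = None,
                               token: str = "",
                               base_url: str = "https://api.github.com",
                               ) -> urllib.request.Request:
    """Build (but do not send) the POST request that creates an issue."""
    payload = {"title": title, "body": body, "labels": labels or []}
    headers = {
        "Accept": "application/vnd.github+json",
        "Content-Type": "application/json",
    }
    if token:
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(
        url=f"{base_url}/repos/{repo}/issues",
        data=json.dumps(payload).encode("utf-8"),
        headers=headers,
        method="POST",
    )


# "octo-org/demo" is a placeholder repository name.
request = build_create_issue_request("octo-org/demo", "AgentMap Integration",
                                     labels=["automation"])
print(request.get_method(), request.full_url)
# Sending is left out on purpose; it would look like:
#   with urllib.request.urlopen(request, timeout=30) as response:
#       issue = json.load(response)  # includes "number" and "html_url"
```

Separating request construction from sending keeps the payload logic testable without network access or a real token.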
Exercise 2: Database Service
Build a database integration service:
class DatabaseService(BaseService):
    def __init__(self, config: Dict[str, Any] = None):
        super().__init__(config)
        self.connection_string = self.config.get('connection_string', '')
        self.pool_size = self.config.get('pool_size', 5)

    def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        db_operations = self._extract_db_operations(inputs)
        results = []
        for operation in db_operations:
            op_type = operation.get('operation', 'select')
            if op_type == 'insert':
                result = self._insert_data(operation)
            elif op_type == 'update':
                result = self._update_data(operation)
            elif op_type == 'select':
                result = self._select_data(operation)
            elif op_type == 'delete':
                result = self._delete_data(operation)
            else:
                result = {'error': f'Unknown operation: {op_type}'}
            results.append(result)
        return self._aggregate_db_results(results)

    def _insert_data(self, operation: Dict[str, Any]) -> Dict[str, Any]:
        """Insert data into database."""
        table = operation.get('table', '')
        data = operation.get('data', {})
        # Simulate database insertion
        return {
            'operation': 'insert',
            'status': 'success',
            'table': table,
            'rows_affected': 1,
            'record_id': 12345
        }
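The insert above is simulated. As one way to back it with a real database, the sketch below uses Python's built-in `sqlite3` module with an in-memory database and returns the same result shape. `insert_row` and the `tasks` table are hypothetical; the lesson does not say which database or driver the workflow targets.

```python
import sqlite3
from typing import Any, Dict


def insert_row(conn: sqlite3.Connection, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert one row using bound parameters for the values."""
    # Identifiers cannot be bound as parameters, so restrict them to simple names.
    for name in (table, *data):
        if not name.isidentifier():
            raise ValueError(f"Unsafe identifier: {name!r}")
    columns = ", ".join(data)
    placeholders = ", ".join("?" for _ in data)
    with conn:  # commits on success, rolls back on error
        cursor = conn.execute(
            f"INSERT INTO {table} ({columns}) VALUES ({placeholders})",
            tuple(data.values()),
        )
    return {
        "operation": "insert",
        "status": "success",
        "table": table,
        "rows_affected": cursor.rowcount,
        "record_id": cursor.lastrowid,
    }


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT, priority TEXT)")
result = insert_row(conn, "tasks", {"title": "Ship lesson 6", "priority": "high"})
print(result)
```

Values are always passed as bound parameters; only the table and column names are interpolated, and those are validated first because SQL placeholders cannot stand in for identifiers.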
Exercise 3: Custom API Service
Create a flexible API integration service:
import time

class CustomAPIService(BaseService):
    def __init__(self, config: Dict[str, Any] = None):
        super().__init__(config)
        self.default_timeout = self.config.get('timeout', 30)
        self.retry_count = self.config.get('retry_count', 3)

    def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        api_calls = self._extract_api_calls(inputs)
        results = []
        for call in api_calls:
            result = self._make_api_call_with_retry(call)
            results.append(result)
        return self._aggregate_api_results(results)

    def _make_api_call_with_retry(self, call_config: Dict[str, Any]) -> Dict[str, Any]:
        """Make API call with retry logic."""
        last_error = None
        # One initial attempt plus retry_count retries
        for attempt in range(self.retry_count + 1):
            try:
                return self._make_single_api_call(call_config)
            except Exception as e:
                last_error = str(e)
                if attempt < self.retry_count:
                    wait_time = 2 ** attempt  # exponential backoff: 1s, 2s, 4s, ...
                    time.sleep(wait_time)
        return {
            'status': 'failed',
            'error': last_error,
            'attempts': self.retry_count + 1
        }

    def _make_single_api_call(self, call_config: Dict[str, Any]) -> Dict[str, Any]:
        """Make a single API call."""
        endpoint = call_config.get('endpoint', '')
        method = call_config.get('method', 'GET')
        headers = call_config.get('headers', {})
        data = call_config.get('data', {})
        # Simulate API call
        return {
            'status': 'success',
            'endpoint': endpoint,
            'method': method,
            'response_code': 200,
            'response_time': 0.5,
            'data': {'message': 'API call successful'}
        }
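The retry loop waits `2 ** attempt` seconds between attempts, which grows without bound and makes every client retry at the same moments. A common refinement, sketched here under assumptions, is to cap the delay and optionally add random jitter. `backoff_delays` and its `base`, `cap`, and `rng` parameters are hypothetical names, not part of the lesson's service.

```python
import random
from typing import List, Optional


def backoff_delays(retry_count: int, base: float = 1.0, cap: float = 30.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Delays before each retry: base * 2**attempt, capped at `cap`.

    When `rng` is given, each delay is drawn uniformly from [0, capped delay]
    ("full jitter") so that many clients do not retry in lockstep.
    """
    delays = []
    for attempt in range(retry_count):
        delay = min(cap, base * (2 ** attempt))
        if rng is not None:
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays


print(backoff_delays(3))          # same schedule as 2 ** attempt with three retries
print(backoff_delays(6, cap=10))  # later delays are held at the cap
```

Computing the schedule separately from sleeping also makes the retry policy easy to unit test without waiting.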
Step 6: Service Configuration and Management
Environment-Based Configuration
import os

class ServiceConfig:
    @staticmethod
    def get_email_config():
        return {
            'provider': os.getenv('EMAIL_PROVIDER', 'smtp'),
            'smtp_host': os.getenv('SMTP_HOST', 'smtp.gmail.com'),
            'smtp_port': int(os.getenv('SMTP_PORT', '587')),
            'smtp_username': os.getenv('SMTP_USERNAME', ''),
            'smtp_password': os.getenv('SMTP_PASSWORD', ''),
            'retry_count': int(os.getenv('EMAIL_RETRY_COUNT', '3'))
        }

    @staticmethod
    def get_slack_config():
        return {
            'bot_token': os.getenv('SLACK_BOT_TOKEN', ''),
            'webhook_url': os.getenv('SLACK_WEBHOOK_URL', ''),
            'default_channels': os.getenv('SLACK_DEFAULT_CHANNELS', '#general').split(','),
            'timeout': int(os.getenv('SLACK_TIMEOUT', '30'))
        }
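Defaulting secrets to an empty string means a missing token only surfaces later as an authentication error. One option, sketched below, is to validate required variables when the configuration is loaded. `missing_settings` and `load_slack_config` are hypothetical helpers, and treating `SLACK_BOT_TOKEN` as required is an assumption made for this example.

```python
import os
from typing import Dict, List, Mapping


def missing_settings(required: List[str],
                     env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variable names that are unset or empty."""
    return [name for name in required if not env.get(name, "").strip()]


def load_slack_config(env: Mapping[str, str] = os.environ) -> Dict[str, object]:
    missing = missing_settings(["SLACK_BOT_TOKEN"], env)
    if missing:
        raise RuntimeError("Missing required settings: " + ", ".join(missing))
    return {
        "bot_token": env["SLACK_BOT_TOKEN"],
        "webhook_url": env.get("SLACK_WEBHOOK_URL", ""),
        # Strip whitespace so "#a, #b" yields clean channel names
        "default_channels": [c.strip() for c in
                             env.get("SLACK_DEFAULT_CHANNELS", "#general").split(",")],
        "timeout": int(env.get("SLACK_TIMEOUT", "30")),
    }


# A plain dict stands in for os.environ so the example has no side effects.
fake_env = {"SLACK_BOT_TOKEN": "xoxb-example", "SLACK_DEFAULT_CHANNELS": "#dev, #ops"}
print(load_slack_config(fake_env)["default_channels"])
print(missing_settings(["SLACK_BOT_TOKEN", "SMTP_PASSWORD"], fake_env))
```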
Service Discovery
class ServiceDiscovery:
    def __init__(self):
        self.services = {}
        self.service_configs = {}

    def register_service(self, name: str, service_class: type, config: Dict[str, Any] = None):
        """Register a service with discovery system."""
        self.services[name] = service_class
        self.service_configs[name] = config or {}

    def get_service(self, name: str) -> BaseService:
        """Get service instance by name."""
        if name not in self.services:
            raise ValueError(f"Service {name} not found")
        service_class = self.services[name]
        config = self.service_configs[name]
        return service_class(config)

    def list_services(self) -> List[str]:
        """List all registered services."""
        return list(self.services.keys())

    def health_check_all(self) -> Dict[str, bool]:
        """Health check all registered services."""
        health_status = {}
        for name in self.services:
            try:
                service = self.get_service(name)
                health_status[name] = service.health_check()
            except Exception:
                health_status[name] = False
        return health_status
Service Middleware
import logging
import time

class ServiceMiddleware:
    def __init__(self, service: BaseService):
        self.service = service
        self.middlewares = []

    def add_middleware(self, middleware):
        """Add middleware to the service chain."""
        self.middlewares.append(middleware)

    def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Process inputs through middleware chain."""
        # Pre-processing middleware
        for middleware in self.middlewares:
            if hasattr(middleware, 'pre_process'):
                inputs = middleware.pre_process(inputs)
        # Main service processing
        result = self.service.process(inputs)
        # Post-processing middleware (reverse order, so the chain unwinds like a stack)
        for middleware in reversed(self.middlewares):
            if hasattr(middleware, 'post_process'):
                result = middleware.post_process(result)
        return result

class LoggingMiddleware:
    # Caution: inputs and results may contain credentials; redact before logging in production
    def pre_process(self, inputs):
        logging.info(f"Service call started with inputs: {inputs}")
        return inputs

    def post_process(self, result):
        logging.info(f"Service call completed with result: {result}")
        return result

class MetricsMiddleware:
    def __init__(self):
        self.call_count = 0
        self.total_time = 0
        self.start_time = None  # set by pre_process

    def pre_process(self, inputs):
        self.start_time = time.time()
        self.call_count += 1
        return inputs

    def post_process(self, result):
        if self.start_time is not None:
            self.total_time += time.time() - self.start_time
        return result
Step 7: Error Handling and Resilience
Comprehensive Error Handling
import concurrent.futures
import logging
import random

class ResilientService(BaseService):
    def __init__(self, services: List[BaseService], strategy='failover'):
        super().__init__()
        self.services = services
        self.strategy = strategy
        self.logger = logging.getLogger(__name__)  # BaseService does not define a logger

    def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        if self.strategy == 'failover':
            return self._failover_strategy(inputs)
        elif self.strategy == 'parallel':
            return self._parallel_strategy(inputs)
        elif self.strategy == 'load_balance':
            return self._load_balance_strategy(inputs)
        else:
            raise ValueError(f"Unknown strategy: {self.strategy}")

    def _failover_strategy(self, inputs):
        """Try services in order until one succeeds."""
        last_error = None
        for i, service in enumerate(self.services):
            try:
                return service.process(inputs)
            except Exception as e:
                last_error = str(e)
                self.logger.warning(f"Service {i} failed: {last_error}")
        return {"error": f"All services failed. Last error: {last_error}"}

    def _parallel_strategy(self, inputs):
        """Run all services in parallel and return first success."""
        # Note: leaving the `with` block waits for the remaining calls to finish,
        # so the first success is returned only after all services complete.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {executor.submit(service.process, inputs): service
                       for service in self.services}
            for future in concurrent.futures.as_completed(futures):
                try:
                    result = future.result()
                    if 'error' not in result:
                        return result
                except Exception as e:
                    self.logger.warning(f"Service failed: {str(e)}")
        return {"error": "All parallel services failed"}

    def _load_balance_strategy(self, inputs):
        """Distribute load across services."""
        available_services = [s for s in self.services if s.health_check()]
        if not available_services:
            return {"error": "No healthy services available"}
        selected_service = random.choice(available_services)
        return selected_service.process(inputs)
Graceful Degradation
import logging

class GracefulDegradationService(BaseService):
    def __init__(self, primary_service: BaseService, fallback_service: BaseService):
        super().__init__()
        self.primary_service = primary_service
        self.fallback_service = fallback_service
        self.logger = logging.getLogger(__name__)  # BaseService does not define a logger

    def process(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        try:
            # Try primary service first
            result = self.primary_service.process(inputs)
            # Check if result indicates degraded performance
            if self._is_degraded_result(result):
                self.logger.warning("Primary service showing degraded performance")
                return self._try_fallback(inputs, "degraded performance")
            return result
        except Exception as e:
            self.logger.error(f"Primary service failed: {str(e)}")
            return self._try_fallback(inputs, str(e))

    def _try_fallback(self, inputs, primary_error):
        """Try fallback service with reduced functionality."""
        try:
            result = self.fallback_service.process(inputs)
            result['degraded_mode'] = True
            result['message'] = 'Service running in degraded mode'
            return result
        except Exception as e:
            # Report each failure separately; primary_error is passed in by process()
            return {
                "error": "Both primary and fallback services failed",
                "primary_error": primary_error,
                "fallback_error": str(e)
            }

    def _is_degraded_result(self, result):
        """Check if result indicates degraded performance."""
        if 'response_time' in result and result['response_time'] > 5.0:
            return True
        if 'error_rate' in result and result['error_rate'] > 0.1:
            return True
        return False
Key Concepts Mastered
1. Service Architecture Patterns
- Service registration and discovery
- Host protocol implementation
- Service injection and composition
- Middleware and proxy patterns
2. External Integration Strategies
- Multi-provider service implementations
- Authentication and rate limiting
- Retry logic and circuit breakers
- Health monitoring and failover
3. Resilience and Error Handling
- Graceful degradation strategies
- Circuit breaker patterns
- Parallel and failover execution
- Comprehensive error recovery
4. Service Management
- Configuration management
- Service discovery and registration
- Health monitoring and metrics
- Load balancing and routing
Troubleshooting External Services
Common Integration Issues
Issue: Authentication failures
Symptoms: 401/403 errors from external APIs
Solutions:
- Verify API keys and tokens are correct
- Check token expiration and refresh logic
- Confirm service account permissions
- Test authentication outside of workflow
Issue: Rate limiting
Symptoms: 429 errors or throttling responses
Solutions:
- Implement exponential backoff
- Add rate limiting awareness to services
- Use service queues for high-volume operations
- Consider upgrading service plans
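"Rate limiting awareness" can mean throttling on the client side before the API has to reject a call. One common technique is a token bucket, sketched below with an injectable clock so the behavior is deterministic. `TokenBucket` and its parameters are hypothetical names for this example, and this version is not thread-safe; a lock would be needed if several threads share one bucket.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow `rate` calls per second on average, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)
        self.updated = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; never blocks."""
        now = self.clock()
        # Refill in proportion to elapsed time, without exceeding capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


now = [0.0]  # fake clock so the example is deterministic
bucket = TokenBucket(rate=1.0, capacity=2, clock=lambda: now[0])
burst = [bucket.try_acquire() for _ in range(3)]
now[0] += 1.0  # one second later, one token has been refilled
after_wait = bucket.try_acquire()
print(burst, after_wait)
```

A service would call `try_acquire()` before each outbound request and either wait or queue the work when it returns `False`.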
Issue: Network connectivity
Symptoms: Timeout errors or connection failures
Solutions:
- Check firewall and proxy configurations
- Verify DNS resolution
- Test service endpoints directly
- Implement connection pooling
Issue: Service dependencies
Symptoms: Cascading failures across services
Solutions:
- Use circuit breakers
- Implement service health checks
- Design for graceful degradation
- Add service isolation
Congratulations!
You've mastered external services integration! You can now build workflows that:
- ✅ Connect to External APIs - Integrate with any REST/GraphQL service
- ✅ Handle Authentication - Manage tokens, keys, and OAuth flows
- ✅ Implement Resilience - Circuit breakers, retries, and failover
- ✅ Monitor Service Health - Track performance and availability
- ✅ Manage Configuration - Environment-based and dynamic configuration
What's Next?
Ready for deployment and production patterns? Let's explore:
- Lesson 7: FastAPI Standalone - Deploy as standalone web services
- Lesson 8: FastAPI Integration - Integrate into existing applications
🌐 External services integration opens unlimited possibilities - connect your AI workflows to any system, API, or service in your technology ecosystem. You're now ready to build truly connected intelligent automation!