Skip to main content

Service Registry Patterns

The HostServiceRegistry provides a powerful way for host applications to register their own services and make them available to AgentMap agents through protocol-based dependency injection. This enables seamless integration between AgentMap workflows and existing application infrastructure.

Service Registry Benefits
  • Protocol-Based Discovery: Type-safe service contracts using Python protocols
  • Dependency Injection: Automatic service configuration for agents
  • Loose Coupling: Clean separation between host services and AgentMap logic
  • Extensibility: Easy integration of custom business services
  • Centralized Management: Single registry for all host services

Overview

The HostServiceRegistry is a centralized registry that:

  • Stores host application services and their providers
  • Tracks which protocols each service implements
  • Provides service discovery by protocol
  • Manages service metadata and lifecycle

Architecture Patterns

Protocol-Based Service Contracts

Define clear service contracts using Python's Protocol with @runtime_checkable:

from typing import Any, Dict, List, Optional, Protocol, runtime_checkable

@runtime_checkable
class EmailServiceProtocol(Protocol):
    """Structural contract for host-provided email services.

    Any object exposing these three methods satisfies the protocol; no
    inheritance is required.
    """

    def send_email(self, to: str, subject: str, body: str) -> bool:
        """Send one message to ``to`` and report the outcome as a bool."""
        ...

    def send_bulk_email(
        self, recipients: List[str], subject: str, body: str
    ) -> Dict[str, bool]:
        """Send the same message to several recipients.

        Returns a ``str -> bool`` mapping of per-recipient results.
        """
        ...

    def validate_email(self, email: str) -> bool:
        """Report whether ``email`` has a valid address format."""
        ...

@runtime_checkable
class DatabaseServiceProtocol(Protocol):
    """Structural contract for host-provided database services."""

    def query(self, sql: str, params: Optional[Dict[str, Any]] = None) -> List[Dict]:
        """Execute a query and return its rows as a list of dicts.

        ``params`` is optional; ``None`` means the statement takes no
        bound parameters.
        """
        ...

    def execute(self, sql: str, params: Optional[Dict[str, Any]] = None) -> bool:
        """Execute a statement and return its success status.

        ``params`` is optional; ``None`` means the statement takes no
        bound parameters.
        """
        ...

    def transaction(self) -> Any:
        """Return a transaction context manager."""
        ...

@runtime_checkable
class CacheServiceProtocol(Protocol):
    """Structural contract for host-provided cache services."""

    def get(self, key: str) -> Any:
        """Return the value stored under ``key``."""
        ...

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store ``value`` under ``key``.

        ``ttl`` is optional; ``None`` means no TTL was requested.
        """
        ...

    def delete(self, key: str) -> bool:
        """Delete ``key`` from the cache and report the outcome as a bool."""
        ...

    def exists(self, key: str) -> bool:
        """Report whether ``key`` is present in the cache."""
        ...

Service Implementation Patterns

class SMTPEmailService:
    """SMTP-based email service implementation.

    Provides the three methods used by email-service consumers:
    ``send_email``, ``send_bulk_email`` and ``validate_email``.
    """

    def __init__(self, smtp_host: str, smtp_port: int, username: str, password: str):
        """Store the SMTP settings; no connection is opened here."""
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.username = username
        self.password = password

    def send_email(self, to: str, subject: str, body: str) -> bool:
        """Send one plain-text email via SMTP.

        A new connection is opened for every call, upgraded with STARTTLS
        and authenticated with the stored credentials. The configured
        username is used as the ``From`` address.

        Returns:
            True when the message was handed to the server, False on any
            failure (the error is printed, never raised).
        """
        import smtplib
        from email.mime.text import MIMEText

        try:
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = self.username
            msg['To'] = to

            with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.send_message(msg)

            return True
        except Exception as e:
            # Deliberate best-effort boundary: callers rely on the bool
            # result instead of handling SMTP/network exceptions.
            print(f"Email send failed: {e}")
            return False

    def send_bulk_email(self, recipients: List[str], subject: str, body: str) -> Dict[str, bool]:
        """Send the same email to each recipient in turn.

        Returns:
            Mapping of recipient address to that recipient's
            ``send_email`` result. An empty list yields an empty dict.
        """
        return {
            recipient: self.send_email(recipient, subject, body)
            for recipient in recipients
        }

    def validate_email(self, email: str) -> bool:
        """Check that ``email`` looks like ``local@domain.tld``.

        This is a format check only; it does not verify the mailbox.
        Non-string input is reported as invalid rather than raising.
        """
        import re

        if not isinstance(email, str):
            return False

        # fullmatch instead of match + '$': '$' also matches just before a
        # trailing newline, which would accept "user@example.com\n".
        pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        return re.fullmatch(pattern, email) is not None

Registry Setup and Configuration

Basic Registry Usage

from agentmap.di import initialize_di

# Build the dependency-injection container first, then ask it for the
# HostServiceRegistry that host services are registered against.
container = initialize_di()
registry = container.host_service_registry()

# Register services with the registry
def setup_host_services(registry):
    """Register the host's email, database and cache services on ``registry``.

    Each service is registered under a fixed name together with the
    protocol it implements and a small metadata dict.
    """

    def make_database_service():
        # Registered as a factory rather than a ready-made instance.
        return DatabaseServiceFactory.create_postgres_service(
            "postgresql://user:pass@localhost:5432/agentmap"
        )

    # --- Email -----------------------------------------------------------
    smtp_service = SMTPEmailService(
        smtp_host="smtp.company.com",
        smtp_port=587,
        username="agentmap@company.com",
        password="secure_password",
    )
    email_metadata = {
        "provider": "smtp",
        "version": "1.0",
        "description": "Corporate SMTP email service",
    }
    registry.register_service_provider(
        service_name="email_service",
        provider=smtp_service,  # Can be instance, class, or factory
        protocols=[EmailServiceProtocol],
        metadata=email_metadata,
    )

    # --- Database (factory provider) -------------------------------------
    database_metadata = {
        "provider": "postgresql",
        "version": "14.0",
        "connection_pool": True,
    }
    registry.register_service_provider(
        service_name="database_service",
        provider=make_database_service,
        protocols=[DatabaseServiceProtocol],
        metadata=database_metadata,
    )

    # --- Cache -----------------------------------------------------------
    redis_service = RedisService("redis://localhost:6379/0")
    cache_metadata = {
        "provider": "redis",
        "version": "7.0",
        "cluster": False,
    }
    registry.register_service_provider(
        service_name="cache_service",
        provider=redis_service,
        protocols=[CacheServiceProtocol],
        metadata=cache_metadata,
    )

# Populate the registry obtained from the DI container with the host services
setup_host_services(registry)

Service Discovery and Configuration

Manual Service Configuration

class NotificationAgent(BaseAgent):
    """Agent that sends notifications using host services.

    Services are not required at construction time; they are injected
    later through the ``configure_*_service`` methods. Email is the only
    notification type handled; database logging is optional.
    """

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to ``BaseAgent`` and start with no services."""
        super().__init__(**kwargs)
        self._email_service = None
        self._database_service = None

    def configure_email_service(self, email_service: Any) -> None:
        """Configure email service for this agent."""
        self._email_service = email_service
        self.log_info("Email service configured")

    def configure_database_service(self, database_service: Any) -> None:
        """Configure database service for this agent."""
        self._database_service = database_service
        self.log_info("Database service configured")

    def process(self, inputs: dict) -> Any:
        """Process a notification request.

        Reads ``type`` (default ``"email"``), ``recipient``, ``subject``
        and ``message`` from ``inputs``.

        Returns:
            ``{"status": "sent"}`` or ``{"status": "failed"}`` according to
            the email service's result, or an error dict when the type is
            not ``"email"`` or no email service has been configured.

        Raises:
            KeyError: if ``recipient``, ``subject`` or ``message`` is
                missing from ``inputs``.
        """
        notification_type = inputs.get("type", "email")

        if notification_type != "email" or not self._email_service:
            return {"status": "error", "message": "Service not available"}

        success = self._email_service.send_email(
            to=inputs['recipient'],
            subject=inputs['subject'],
            body=inputs['message']
        )

        # Log to database if available
        if self._database_service:
            # Parameters are passed as a dict, so the statement must use
            # named placeholders; positional %s cannot bind a mapping.
            self._database_service.execute(
                "INSERT INTO notifications (type, recipient, success) "
                "VALUES (%(type)s, %(recipient)s, %(success)s)",
                {"type": "email", "recipient": inputs['recipient'], "success": success}
            )

        return {"status": "sent" if success else "failed"}

# Manual configuration
def configure_agent_manually(agent, registry):
    """Manually configure ``agent`` with the email and database services.

    For each known service name the provider is looked up in ``registry``.
    A callable provider (class or factory) is called to obtain the
    service; any other provider is used as the service instance itself.
    Services that are not registered (lookup returns ``None``) are skipped.
    """
    bindings = (
        ("email_service", "configure_email_service"),
        ("database_service", "configure_database_service"),
    )

    for service_name, configure_method in bindings:
        provider = registry.get_service_provider(service_name)
        # Compare against None explicitly: a registered service object may
        # be falsy (e.g. it defines __len__ and is currently empty).
        if provider is None:
            continue

        service = provider() if callable(provider) else provider
        getattr(agent, configure_method)(service)

Service Discovery Patterns

def discover_available_services(registry):
    """Summarize every registered service and its capabilities.

    Returns:
        Dict keyed by service name. Each value holds the protocol names,
        the registry's metadata, the ``valid`` flag from provider
        validation, and the ``failed_checks`` list (empty if absent).
    """

    def describe(name):
        # Look up metadata, protocols and validation for one service.
        meta = registry.get_service_metadata(name)
        implemented = registry.get_service_protocols(name)
        report = registry.validate_service_provider(name)
        return {
            "protocols": [proto.__name__ for proto in implemented],
            "metadata": meta,
            "available": report["valid"],
            "validation_errors": report.get("failed_checks", []),
        }

    return {name: describe(name) for name in registry.list_registered_services()}

# Service health monitoring
class ServiceHealthMonitor:
    """Monitor health of registered services.

    For each service the provider is resolved from the registry and one
    lightweight probe is run per recognized protocol (email, database,
    cache).
    """

    def __init__(self, registry):
        """Remember the registry to inspect."""
        self.registry = registry
        # Not read or written by any method of this class.
        self.health_cache = {}

    def check_service_health(self, service_name: str) -> dict:
        """Check health of a specific service.

        Returns:
            ``{"status": "not_found"}`` when no provider is registered.
            Otherwise a dict with ``status``, ``checks`` and ``timestamp``:
            ``"healthy"`` when every probe passed, ``"degraded"`` when any
            failed, and ``"unknown"`` when no probe applies to the
            service's protocols. Any exception is reported as
            ``{"status": "error", "error": ..., "timestamp": ...}``.
        """
        import time

        try:
            provider = self.registry.get_service_provider(service_name)
            # Explicit None test: a registered provider may itself be falsy.
            if provider is None:
                return {"status": "not_found"}

            # Create service instance
            service = provider() if callable(provider) else provider

            # Protocol-specific health checks
            health_results = {}
            for protocol in self.registry.get_service_protocols(service_name):
                if protocol is EmailServiceProtocol:
                    health_results["email"] = self._test_email_service(service)
                elif protocol is DatabaseServiceProtocol:
                    health_results["database"] = self._test_database_service(service)
                elif protocol is CacheServiceProtocol:
                    health_results["cache"] = self._test_cache_service(service)

            if not health_results:
                # all() over zero probes is True; do not claim "healthy"
                # for a service that was never actually probed.
                status = "unknown"
            elif all(result.get("healthy", False) for result in health_results.values()):
                status = "healthy"
            else:
                status = "degraded"

            return {
                "status": status,
                "checks": health_results,
                "timestamp": time.time()
            }

        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "timestamp": time.time()
            }

    def _test_email_service(self, service) -> dict:
        """Probe an email service with a validation call (sends nothing)."""
        try:
            # Test email validation (lightweight check)
            is_valid = service.validate_email("test@example.com")
            return {"healthy": True, "validation_working": is_valid}
        except Exception as e:
            return {"healthy": False, "error": str(e)}

    def _test_database_service(self, service) -> dict:
        """Probe a database service with a trivial ``SELECT 1`` query."""
        try:
            # Simple health check query
            result = service.query("SELECT 1 as health_check")
            return {"healthy": len(result) > 0, "query_working": True}
        except Exception as e:
            return {"healthy": False, "error": str(e)}

    def _test_cache_service(self, service) -> dict:
        """Probe a cache service with a set/get/delete round trip."""
        import time

        try:
            # Timestamped key so the probe does not collide with real keys.
            test_key = f"health_check_{time.time()}"
            service.set(test_key, "test_value", ttl=60)
            value = service.get(test_key)
            service.delete(test_key)

            return {
                "healthy": value == "test_value",
                "set_get_working": True
            }
        except Exception as e:
            return {"healthy": False, "error": str(e)}

    def get_all_service_health(self) -> dict:
        """Get health status for all registered services, keyed by name."""
        return {
            service_name: self.check_service_health(service_name)
            for service_name in self.registry.list_registered_services()
        }

Integration with AgentMap Workflows

Workflow Configuration

GraphName,Node,Edge,Context,AgentType,Success_Next,Failure_Next,Input_Fields,Output_Field,Prompt
ServiceFlow,LoadData,,Load user data,csv_reader,ValidateData,ErrorHandler,,users,data/users.csv
ServiceFlow,ValidateData,,Validate with service,ServiceValidator,ProcessData,ErrorHandler,users,validated_users,
ServiceFlow,ProcessData,,Process data,DataProcessor,SendNotifications,ErrorHandler,validated_users,processed_data,
ServiceFlow,SendNotifications,,Send via email service,NotificationAgent,LogResults,ErrorHandler,processed_data,notification_results,
ServiceFlow,LogResults,,Log to database,DatabaseLogger,End,ErrorHandler,notification_results,log_result,
ServiceFlow,End,,Completion,Echo,,,log_result,final_message,Processing complete with services
ServiceFlow,ErrorHandler,,Handle errors,Echo,End,,error,error_message,Error: {error}

Advanced Service Patterns

class ServiceMiddleware:
    """Middleware for service operations with logging, metrics, etc.

    Wraps a service object and forwards attribute access to it. Callable
    attributes are wrapped so each call is optionally logged and timed;
    non-callable attributes are returned unchanged.

    NOTE(review): methods are supplied through ``__getattr__`` and are not
    real attributes of this class, so ``isinstance`` checks against
    ``@runtime_checkable`` protocols may not recognize a wrapped service.
    Confirm against the registry's protocol validation.
    """

    def __init__(self, service, logger=None, metrics=None):
        """Wrap ``service``; ``logger`` and ``metrics`` are both optional."""
        self.service = service
        self.logger = logger
        self.metrics = metrics

    def __getattr__(self, name):
        """Proxy attribute access to the underlying service.

        Called only when normal attribute lookup fails.

        Raises:
            AttributeError: if the wrapped service has no such attribute,
                or if the middleware's own state has not been set yet.
        """
        import functools
        import time

        # If our own fields are missing (instance built via __new__, copy
        # or unpickle), reading self.service below would re-enter
        # __getattr__ forever. Fail with a normal AttributeError instead.
        if name in ("service", "logger", "metrics"):
            raise AttributeError(name)

        attr = getattr(self.service, name)

        if not callable(attr):
            return attr

        @functools.wraps(attr)  # keep the wrapped method's name and docstring
        def wrapper(*args, **kwargs):
            # Monotonic clock: durations must not jump with wall-clock
            # adjustments.
            start_time = time.perf_counter()

            try:
                if self.logger:
                    self.logger.info(f"Calling {name} on {type(self.service).__name__}")

                result = attr(*args, **kwargs)

                if self.metrics:
                    duration = time.perf_counter() - start_time
                    self.metrics.record_operation(
                        service=type(self.service).__name__,
                        operation=name,
                        duration=duration,
                        success=True
                    )

                return result

            except Exception as e:
                if self.logger:
                    self.logger.error(f"Error in {name}: {e}")

                if self.metrics:
                    duration = time.perf_counter() - start_time
                    self.metrics.record_operation(
                        service=type(self.service).__name__,
                        operation=name,
                        duration=duration,
                        success=False,
                        error=str(e)
                    )

                # Observe only: the caller still sees the original error.
                raise

        return wrapper

# Registry with middleware
def register_service_with_middleware(registry, service_name, service, protocols, metadata=None):
    """Register ``service`` wrapped in ``ServiceMiddleware``.

    The wrapper gets a logger named ``service.<service_name>`` and a new
    ``MetricsCollector``. The registered metadata is a copy of ``metadata``
    with ``middleware_enabled`` set to True; the caller's dict is not
    mutated.

    NOTE(review): ``MetricsCollector`` must be provided by the host
    application; it is expected to offer ``record_operation(...)`` as used
    by ``ServiceMiddleware``.
    """
    # Imported here so the snippet is self-contained.
    import logging

    # Create middleware-wrapped service
    wrapped_service = ServiceMiddleware(
        service,
        logger=logging.getLogger(f"service.{service_name}"),
        metrics=MetricsCollector()
    )

    registry.register_service_provider(
        service_name=service_name,
        provider=wrapped_service,
        protocols=protocols,
        metadata={**(metadata or {}), "middleware_enabled": True}
    )

Testing Service Integration

Unit Testing

import unittest
from unittest.mock import Mock, MagicMock

class TestServiceIntegration(unittest.TestCase):
    """Test service integration with agents."""

    def setUp(self):
        """Create mock email/database services and a mock registry."""
        self.mock_registry = Mock()
        self.mock_email_service = Mock()
        self.mock_db_service = Mock()

        # Configure mock email service
        self.mock_email_service.send_email.return_value = True
        self.mock_email_service.validate_email.return_value = True

        # Configure mock database service
        self.mock_db_service.query.return_value = [{"id": 1, "name": "test"}]
        self.mock_db_service.execute.return_value = True

        # Configure mock registry
        self.mock_registry.get_service_provider.side_effect = self._get_service_provider
        # An unconfigured Mock returns another Mock, which is not iterable;
        # code that loops over the registered names needs a real list.
        self.mock_registry.list_registered_services.return_value = [
            "email_service",
            "database_service",
        ]

    def _get_service_provider(self, service_name):
        """Mock service provider lookup; unknown names yield None."""
        if service_name == "email_service":
            return self.mock_email_service
        elif service_name == "database_service":
            return self.mock_db_service
        return None

    def test_agent_with_email_service(self):
        """Test agent functionality with email service."""
        # Arrange
        agent = NotificationAgent()
        agent.configure_email_service(self.mock_email_service)

        inputs = {
            "type": "email",
            "recipient": "test@example.com",
            "subject": "Test",
            "message": "Test message"
        }

        # Act
        result = agent.process(inputs)

        # Assert
        self.assertEqual(result["status"], "sent")
        # The agent calls send_email with keyword arguments, and a plain
        # Mock compares positional and keyword calls literally.
        self.mock_email_service.send_email.assert_called_once_with(
            to="test@example.com", subject="Test", body="Test message"
        )

    def test_service_discovery(self):
        """Test service discovery mechanism."""
        # Arrange
        agent = ServiceAwareAgent()
        # NOTE(review): assumes discovery matches services to agents by
        # protocol; confirm against configure_agent_from_registry.
        protocols_by_service = {
            "email_service": [EmailServiceProtocol],
            "database_service": [DatabaseServiceProtocol],
        }
        self.mock_registry.get_service_protocols.side_effect = (
            lambda service_name: protocols_by_service.get(service_name, [])
        )

        # Act
        configured_count = configure_agent_from_registry(agent, self.mock_registry)

        # Assert
        self.assertGreater(configured_count, 0)
        self.mock_registry.list_registered_services.assert_called()

Best Practices

Service Design Guidelines

Service Design Best Practices
  1. Protocol-First Design: Define protocols before implementations
  2. Immutable Interfaces: Keep protocol interfaces stable across versions
  3. Error Handling: Implement comprehensive error handling and logging
  4. Resource Management: Properly manage connections, file handles, etc.
  5. Thread Safety: Ensure services are thread-safe for concurrent access
  6. Documentation: Document service behavior, limitations, and dependencies
Service Implementation Checklist

Protocol Definition:

  • Use @runtime_checkable decorator
  • Define clear method signatures with type hints
  • Document expected behavior and exceptions
  • Consider async variants if needed

Implementation:

  • Handle all error conditions gracefully
  • Implement proper resource cleanup
  • Add logging for debugging
  • Include configuration validation
  • Support both sync and async patterns if applicable

Testing:

  • Unit tests for all public methods
  • Integration tests with real dependencies
  • Error condition testing
  • Performance testing for critical paths
  • Mock interfaces for dependent services

Documentation:

  • API documentation with examples
  • Configuration options
  • Error codes and meanings
  • Performance characteristics
  • Dependencies and requirements

Registry Management

class ServiceRegistry:
    """Enhanced service registry with lifecycle management.

    Stores registered providers in ``_services`` and optional
    startup/shutdown callables in ``_lifecycle_hooks``. Services start in
    registration order and shut down in the reverse order.
    """

    def __init__(self):
        """Start with no services and no lifecycle hooks."""
        self._services = {}
        self._lifecycle_hooks = {}

    def register_service_provider(self, service_name, provider, protocols, metadata=None):
        """Record ``provider`` under ``service_name``.

        Re-registering a name replaces the earlier entry. ``protocols``
        and ``metadata`` are copied so later changes by the caller do not
        affect the registry.
        """
        self._services[service_name] = {
            "provider": provider,
            "protocols": list(protocols or []),
            "metadata": dict(metadata or {}),
        }

    def register_service_with_lifecycle(self, service_name, provider, protocols,
                                        startup=None, shutdown=None, metadata=None):
        """Register a service together with optional lifecycle hooks.

        ``startup`` and ``shutdown`` are zero-argument callables. When
        neither is given, no hooks are kept for the service.
        """
        self.register_service_provider(service_name, provider, protocols, metadata)

        if startup or shutdown:
            self._lifecycle_hooks[service_name] = {
                "startup": startup,
                "shutdown": shutdown
            }
        else:
            # Re-registration without hooks must not leave stale ones.
            self._lifecycle_hooks.pop(service_name, None)

    def startup_services(self):
        """Run every startup hook in registration order.

        A failing hook is reported and does not stop the remaining ones.
        """
        for service_name, hooks in list(self._lifecycle_hooks.items()):
            if hooks.get("startup"):
                try:
                    hooks["startup"]()
                    print(f"Service {service_name} started successfully")
                except Exception as e:
                    print(f"Failed to start service {service_name}: {e}")

    def shutdown_services(self):
        """Run every shutdown hook in reverse registration order.

        Reverse order lets a later service that depends on an earlier one
        shut down first. A failing hook is reported and does not stop the
        remaining ones.
        """
        for service_name, hooks in reversed(list(self._lifecycle_hooks.items())):
            if hooks.get("shutdown"):
                try:
                    hooks["shutdown"]()
                    print(f"Service {service_name} shut down successfully")
                except Exception as e:
                    print(f"Failed to shut down service {service_name}: {e}")

# Usage with lifecycle management
def setup_services_with_lifecycle(registry):
    """Register the database service with connect/disconnect hooks.

    The service's own ``connect`` and ``disconnect`` bound methods are
    passed as the startup and shutdown hooks.
    """
    database = DatabaseService()

    # Gather the registration arguments first, then register in one call.
    registration = {
        "service_name": "database_service",
        "provider": database,
        "protocols": [DatabaseServiceProtocol],
        "startup": database.connect,
        "shutdown": database.disconnect,
        "metadata": {"managed_lifecycle": True},
    }
    registry.register_service_with_lifecycle(**registration)

Troubleshooting

Common Issues

Service Not Found

Problem: get_service_provider returns None

Solutions:

  1. Verify service is registered: registry.is_service_registered("service_name")
  2. Check service name spelling
  3. Ensure registration happened before lookup
  4. Review registry initialization order
# Debug service registration
def debug_registry(registry):
    """Print the registered service names, then one line per service.

    Each per-service line shows the service name, the names of its
    protocols and its metadata.
    """
    registered = registry.list_registered_services()
    print(f"Registered services: {registered}")

    for name in registered:
        implemented = registry.get_service_protocols(name)
        details = registry.get_service_metadata(name)
        protocol_names = [proto.__name__ for proto in implemented]
        print(f" {name}: {protocol_names} - {details}")
Protocol Recognition Issues

Problem: Protocol-based discovery not working

Solutions:

  1. Ensure protocol is decorated with @runtime_checkable
  2. Import protocol from correct module
  3. Verify the service actually implements all protocol methods
  4. Check isinstance() calls work as expected
# Test protocol recognition
def test_protocol_recognition():
    """Print whether ``MyEmailService`` satisfies ``EmailServiceProtocol``.

    Prints the ``isinstance`` result first, then one line per required
    method so a failing check can be traced to the missing method.
    """
    service = MyEmailService()

    print(f"Implements EmailServiceProtocol: {isinstance(service, EmailServiceProtocol)}")

    # Check individual methods. The list covers every method the protocol
    # declares; omitting one would hide the reason isinstance() fails.
    required_methods = ['send_email', 'send_bulk_email', 'validate_email']
    for method in required_methods:
        has_method = hasattr(service, method)
        print(f"Has {method}: {has_method}")
Service Configuration Failures

Problem: Agent configuration methods not called

Solutions:

  1. Check configuration method exists on agent
  2. Verify method name follows convention
  3. Ensure service instance created correctly
  4. Review agent initialization order
# Debug agent configuration
def debug_agent_configuration(agent, registry):
    """Print how registered services map onto the agent's configure methods.

    Prints the agent's type, every ``configure_*_service`` attribute found
    on it, and, for each protocol of each registered service, the expected
    method name and whether the agent has it. The expected name is
    ``configure_<base>_service``, where ``<base>`` is the lower-cased
    protocol name without its ``ServiceProtocol``/``Protocol`` suffix.
    """

    def strip_protocol_suffix(protocol_name):
        # Remove the suffix only. str.replace would also delete "Protocol"
        # from the middle or start of a name.
        for suffix in ("ServiceProtocol", "Protocol"):
            if protocol_name.endswith(suffix):
                return protocol_name[:-len(suffix)]
        return protocol_name

    print(f"Agent type: {type(agent).__name__}")

    # Check available configuration methods
    config_methods = [
        method for method in dir(agent)
        if method.startswith('configure_') and method.endswith('_service')
    ]
    print(f"Available config methods: {config_methods}")

    # Check services in registry
    for service_name in registry.list_registered_services():
        for protocol in registry.get_service_protocols(service_name):
            base_name = strip_protocol_suffix(protocol.__name__)
            expected_method = f"configure_{base_name.lower()}_service"

            has_method = hasattr(agent, expected_method)
            print(f"Service {service_name} -> Method {expected_method}: {has_method}")
Next Steps

The service registry provides a powerful foundation for integrating AgentMap with existing application infrastructure. Consider implementing:

  1. Health monitoring for all registered services
  2. Circuit breaker patterns for resilience
  3. Service discovery automation for dynamic environments
  4. Metrics collection for performance monitoring