
Advanced Testing Patterns

Specialized testing patterns for complex AgentMap scenarios including performance testing, security validation, integration testing, and advanced mock strategies.

For Complex Scenarios

Reference these patterns when implementing:

  • 🚀 Performance & Load Testing - Scalability and timing requirements
  • 🔐 Security Testing - Authentication and authorization patterns
  • 🔄 Integration Testing - Multi-service coordination
  • 🎛️ Advanced Mocking - Complex dependency scenarios
  • 📊 Monitoring & Metrics - Observability testing

🚀 Performance Testing Patterns

Benchmark Testing with Timing Assertions

import time
import threading
import unittest
from unittest.mock import patch
from contextlib import contextmanager


class PerformanceTestMixin:
    """Mixin for performance testing capabilities."""

    @contextmanager
    def time_operation(self, max_duration=1.0, operation_name="operation"):
        """Context manager for timing operations with automatic assertions."""
        start_time = time.time()
        try:
            yield
        finally:
            duration = time.time() - start_time
            self.assertLess(
                duration,
                max_duration,
                f"{operation_name} took {duration:.2f}s, expected < {max_duration}s"
            )

    def assert_performance_improvement(self, baseline_func, optimized_func,
                                       min_improvement=0.5, iterations=3):
        """Assert that optimized function performs better than baseline."""

        # Measure baseline performance
        baseline_times = []
        for _ in range(iterations):
            start = time.time()
            baseline_func()
            baseline_times.append(time.time() - start)

        # Measure optimized performance
        optimized_times = []
        for _ in range(iterations):
            start = time.time()
            optimized_func()
            optimized_times.append(time.time() - start)

        # Calculate averages
        avg_baseline = sum(baseline_times) / len(baseline_times)
        avg_optimized = sum(optimized_times) / len(optimized_times)

        improvement = (avg_baseline - avg_optimized) / avg_baseline

        self.assertGreater(
            improvement,
            min_improvement,
            f"Expected {min_improvement*100}% improvement, got {improvement*100:.1f}%"
        )


class TestServicePerformance(unittest.TestCase, PerformanceTestMixin):
    """Performance testing for AgentMap services."""

    def setUp(self):
        # MockServiceFactory comes from the project's test utilities;
        # MyService stands in for the service under test.
        self.mock_config = MockServiceFactory.create_mock_app_config_service()
        self.mock_logging = MockServiceFactory.create_mock_logging_service()
        self.mock_llm = MockServiceFactory.create_mock_llm_service()

        # Configure fast mock responses
        self.mock_llm.process.return_value = {"result": "fast_response"}

        self.service = MyService(
            app_config_service=self.mock_config,
            logging_service=self.mock_logging,
            llm_service=self.mock_llm
        )

    def test_single_request_performance(self):
        """Test single request meets performance requirements."""
        with self.time_operation(max_duration=0.1, operation_name="single_request"):
            result = self.service.process_single_request("test_input")

        self.assertTrue(result.success)
        self.mock_llm.process.assert_called_once()

    def test_batch_processing_efficiency(self):
        """Test batch processing is more efficient than individual requests."""

        def process_individually():
            for i in range(10):
                self.service.process_single_request(f"input_{i}")

        def process_as_batch():
            inputs = [f"input_{i}" for i in range(10)]
            self.service.process_batch_requests(inputs)

        # Reset mock call counts between tests
        self.mock_llm.reset_mock()

        self.assert_performance_improvement(
            baseline_func=process_individually,
            optimized_func=process_as_batch,
            min_improvement=0.3  # 30% improvement expected
        )

    def test_caching_performance_benefit(self):
        """Test caching provides measurable performance improvement."""

        # First call - cache miss (slower)
        self.mock_storage = MockServiceFactory.create_mock_storage_service()
        self.mock_storage.cache_lookup.return_value = None
        self.mock_storage.cache_store.return_value = True
        self.service.storage = self.mock_storage

        with self.time_operation(max_duration=0.5, operation_name="cache_miss"):
            result1 = self.service.process_with_cache("test_key")

        # Second call - cache hit (faster)
        self.mock_storage.cache_lookup.return_value = {"cached": "fast_result"}

        with self.time_operation(max_duration=0.1, operation_name="cache_hit"):
            result2 = self.service.process_with_cache("test_key")

        # Verify cache was used
        self.assertEqual(self.mock_storage.cache_lookup.call_count, 2)
        self.assertEqual(result2.result, "fast_result")
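
time.time() is convenient but can have coarse resolution on some platforms. For tighter timing assertions, the standard-library monotonic clock time.perf_counter() is usually a better fit; a minimal standalone variant of the context manager above (the helper name and test_case parameter are illustrative):

from contextlib import contextmanager
import time


@contextmanager
def time_operation_precise(test_case, max_duration=1.0, operation_name="operation"):
    """Variant of time_operation that uses a monotonic, high-resolution clock."""
    start = time.perf_counter()
    try:
        yield
    finally:
        duration = time.perf_counter() - start
        test_case.assertLess(
            duration,
            max_duration,
            f"{operation_name} took {duration:.4f}s, expected < {max_duration}s"
        )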

Load Testing and Concurrency

import concurrent.futures
import queue
import threading
import time
import unittest
from unittest.mock import Mock


class LoadTestingMixin:
    """Mixin for load testing capabilities."""

    def run_concurrent_requests(self, request_func, num_requests=10,
                                max_workers=5, timeout=10):
        """Run multiple requests concurrently and collect results."""

        results = []
        errors = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all requests
            futures = []
            for i in range(num_requests):
                future = executor.submit(request_func, i)
                futures.append(future)

            # Collect results with timeout
            for future in concurrent.futures.as_completed(futures, timeout=timeout):
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    errors.append(e)

        return results, errors

    def assert_concurrent_performance(self, request_func, expected_success_rate=0.95,
                                      max_response_time=2.0, num_requests=20):
        """Assert service handles concurrent load appropriately."""

        start_time = time.time()
        results, errors = self.run_concurrent_requests(request_func, num_requests)
        total_time = time.time() - start_time

        # Calculate success rate and average response time
        success_rate = len(results) / num_requests
        avg_response_time = total_time / num_requests

        self.assertGreaterEqual(
            success_rate,
            expected_success_rate,
            f"Success rate {success_rate:.2%} below expected {expected_success_rate:.2%}"
        )

        self.assertLess(
            avg_response_time,
            max_response_time,
            f"Average response time {avg_response_time:.2f}s exceeds {max_response_time}s"
        )

        if errors:
            print(f"Encountered {len(errors)} errors during load test:")
            for error in errors[:3]:  # Show first 3 errors
                print(f"  - {error}")


class TestServiceLoadHandling(unittest.TestCase, LoadTestingMixin):
    """Load testing for service scalability."""

    def setUp(self):
        self.service = self.create_test_service()

        # Configure thread-safe mocks
        self.mock_llm = MockServiceFactory.create_mock_llm_service()
        self.mock_llm.process.return_value = {"result": "load_test_response"}
        self.service.llm_service = self.mock_llm

    def test_concurrent_request_handling(self):
        """Test service handles concurrent requests without issues."""

        def make_request(request_id):
            return self.service.process_request(f"request_{request_id}")

        self.assert_concurrent_performance(
            request_func=make_request,
            expected_success_rate=0.95,
            max_response_time=1.0,
            num_requests=20
        )

        # Verify all requests were processed
        self.assertEqual(self.mock_llm.process.call_count, 20)

    def test_memory_usage_under_load(self):
        """Test memory usage remains stable under load."""
        import psutil  # third-party dependency used only for this test
        import os

        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss

        # Process large dataset
        large_inputs = [f"large_input_{i}" * 100 for i in range(100)]

        results = []
        for input_data in large_inputs:
            result = self.service.process_large_input(input_data)
            results.append(result)

        final_memory = process.memory_info().rss
        memory_growth = final_memory - initial_memory

        # Assert memory growth is reasonable (< 100MB)
        max_growth = 100 * 1024 * 1024  # 100MB in bytes
        self.assertLess(
            memory_growth,
            max_growth,
            f"Memory grew by {memory_growth / (1024*1024):.1f}MB, expected < 100MB"
        )

        # Verify all results were processed
        self.assertEqual(len(results), 100)
        self.assertTrue(all(r.success for r in results))
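
test_memory_usage_under_load relies on the third-party psutil package. If adding that dependency is undesirable, the standard-library tracemalloc module can approximate the check by tracking Python-level allocations; a minimal sketch (the helper name and threshold are illustrative, and tracemalloc only sees allocations made through the Python allocator):

import tracemalloc


def assert_bounded_memory_growth(test_case, operation_func, max_growth_bytes=100 * 1024 * 1024):
    """Run operation_func and assert Python-level allocations stay under max_growth_bytes."""
    tracemalloc.start()
    try:
        operation_func()
        current, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()

    test_case.assertLess(
        peak,
        max_growth_bytes,
        f"Peak traced memory {peak / (1024 * 1024):.1f}MB exceeds limit"
    )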

Resource Usage and Profiling

import cProfile
import pstats
import unittest
from functools import wraps

import memory_profiler  # third-party package providing the @profile decorator


class ProfilingTestMixin:
    """Mixin for performance profiling in tests."""

    def profile_operation(self, operation_func, *args, **kwargs):
        """Profile an operation and return performance statistics."""

        profiler = cProfile.Profile()
        profiler.enable()

        try:
            result = operation_func(*args, **kwargs)
        finally:
            profiler.disable()

        # Analyze results
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')

        return result, stats

    def assert_no_expensive_operations(self, operation_func, forbidden_patterns, *args, **kwargs):
        """Assert operation doesn't perform expensive operations."""

        result, stats = self.profile_operation(operation_func, *args, **kwargs)

        # Check for forbidden operations; stats.stats maps
        # (filename, line, function_name) tuples to timing data
        for func_info in stats.stats:
            func_name = str(func_info)
            for pattern in forbidden_patterns:
                if pattern in func_name.lower():
                    self.fail(f"Forbidden operation detected: {pattern} in {func_name}")

        return result

    @memory_profiler.profile
    def memory_profile_operation(self, operation_func, *args, **kwargs):
        """Profile memory usage of an operation."""
        return operation_func(*args, **kwargs)


class TestServiceProfiling(unittest.TestCase, ProfilingTestMixin):
    """Performance profiling tests."""

    def test_no_file_operations_in_processing(self):
        """Ensure processing doesn't perform real file operations."""

        forbidden_operations = ['open', 'read', 'write', 'stat']

        result = self.assert_no_expensive_operations(
            self.service.process_data,
            forbidden_operations,
            "test_input"
        )

        self.assertTrue(result.success)

    def test_batch_processing_efficiency_profile(self):
        """Profile batch processing to identify bottlenecks."""

        large_batch = [f"item_{i}" for i in range(1000)]

        result, stats = self.profile_operation(
            self.service.process_batch,
            large_batch
        )

        # Rank functions by cumulative time; each stats entry is
        # (call count, primitive calls, total time, cumulative time, callers)
        top_functions = sorted(
            stats.stats.items(), key=lambda item: item[1][3], reverse=True
        )

        # Print top 5 for analysis
        print("Top time consumers:")
        for i, (func_info, (cc, nc, tt, ct, callers)) in enumerate(top_functions[:5]):
            print(f"{i+1}. {func_info}: {ct:.3f}s cumulative")

        # Assert reasonable performance
        self.assertTrue(result.success)
        self.assertEqual(len(result.processed_items), 1000)
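
For quick inspection during profiling work, pstats can also print a sorted report directly rather than iterating the raw stats dictionary; a small standalone helper (the function name is illustrative):

import cProfile
import pstats


def print_top_consumers(operation_func, limit=5):
    """Profile operation_func and print its top cumulative-time functions."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        operation_func()
    finally:
        profiler.disable()

    # sort_stats returns the Stats object, so the calls can be chained
    pstats.Stats(profiler).sort_stats('cumulative').print_stats(limit)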

🔐 Security Testing Patterns

Authentication and Authorization Testing

import unittest
from datetime import datetime, timedelta

import jwt  # PyJWT


class SecurityTestMixin:
    """Mixin for security testing capabilities."""

    def create_test_jwt_token(self, payload=None, secret="test_secret",
                              expires_in_minutes=60, algorithm="HS256"):
        """Create test JWT token with configurable payload."""

        default_payload = {
            "user_id": "test_user",
            "permissions": ["read", "write"],
            "exp": datetime.utcnow() + timedelta(minutes=expires_in_minutes),
            "iat": datetime.utcnow()
        }

        if payload:
            default_payload.update(payload)

        return jwt.encode(default_payload, secret, algorithm=algorithm)

    def assert_requires_authentication(self, request_func, *args, **kwargs):
        """Assert that endpoint requires authentication."""

        # Test without authentication
        response = request_func(*args, **kwargs)
        self.assertEqual(response.status_code, 401, "Expected 401 for unauthenticated request")

        # Test with invalid token
        headers = {"Authorization": "Bearer invalid_token"}
        kwargs_with_auth = kwargs.copy()
        kwargs_with_auth['headers'] = headers

        response = request_func(*args, **kwargs_with_auth)
        self.assertEqual(response.status_code, 401, "Expected 401 for invalid token")

    def assert_requires_permission(self, request_func, required_permission, *args, **kwargs):
        """Assert that endpoint requires specific permission."""

        # Test with insufficient permissions
        token = self.create_test_jwt_token({"permissions": ["read"]})
        headers = {"Authorization": f"Bearer {token}"}
        kwargs_with_auth = kwargs.copy()
        kwargs_with_auth['headers'] = headers

        response = request_func(*args, **kwargs_with_auth)
        self.assertEqual(response.status_code, 403, "Expected 403 for insufficient permissions")

        # Test with correct permissions
        token = self.create_test_jwt_token({"permissions": [required_permission]})
        headers = {"Authorization": f"Bearer {token}"}
        kwargs_with_auth['headers'] = headers

        response = request_func(*args, **kwargs_with_auth)
        self.assertNotEqual(response.status_code, 403, "Should allow access with correct permission")


class TestAPISecurityPatterns(unittest.TestCase, SecurityTestMixin):
    """Security testing for API endpoints."""

    def setUp(self):
        # create_test_client and setup_auth_service are project-specific helpers
        self.client = self.create_test_client()
        self.setup_auth_service()

    def test_authentication_matrix(self):
        """Test authentication requirements across all endpoints."""

        protected_endpoints = [
            ("GET", "/api/graphs"),
            ("POST", "/api/graphs"),
            ("PUT", "/api/graphs/test"),
            ("DELETE", "/api/graphs/test")
        ]

        for method, endpoint in protected_endpoints:
            with self.subTest(method=method, endpoint=endpoint):
                request_func = getattr(self.client, method.lower())
                self.assert_requires_authentication(request_func, endpoint)

    def test_permission_matrix(self):
        """Test permission requirements for different operations."""

        permission_tests = [
            ("GET", "/api/graphs", "graphs:read"),
            ("POST", "/api/graphs", "graphs:write"),
            ("PUT", "/api/graphs/test", "graphs:write"),
            ("DELETE", "/api/graphs/test", "graphs:delete")
        ]

        for method, endpoint, permission in permission_tests:
            with self.subTest(method=method, endpoint=endpoint, permission=permission):
                request_func = getattr(self.client, method.lower())
                self.assert_requires_permission(request_func, permission, endpoint)

    def test_token_expiration_handling(self):
        """Test expired token handling."""

        # Create expired token
        expired_token = self.create_test_jwt_token(expires_in_minutes=-60)
        headers = {"Authorization": f"Bearer {expired_token}"}

        response = self.client.get("/api/graphs", headers=headers)
        self.assertEqual(response.status_code, 401)

        response_data = response.json()
        self.assertIn("expired", response_data.get("error", "").lower())

    def test_rate_limiting(self):
        """Test rate limiting protection."""

        # Create valid token
        token = self.create_test_jwt_token()
        headers = {"Authorization": f"Bearer {token}"}

        # Make requests until the rate limit triggers
        responses = []
        for i in range(20):  # assuming a limit of roughly 10 requests per minute
            response = self.client.get("/api/graphs", headers=headers)
            responses.append(response)
            if response.status_code == 429:  # Rate limited
                break

        # Should eventually get rate limited
        rate_limited_responses = [r for r in responses if r.status_code == 429]
        self.assertGreater(len(rate_limited_responses), 0, "Rate limiting should activate")
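
The JWT helper above also makes token-tampering checks cheap: a token signed with the wrong secret should be rejected exactly like a missing one. A small additional test, assuming the same project-specific create_test_client() helper and the 401 behaviour shown above:

class TestTokenTampering(unittest.TestCase, SecurityTestMixin):
    """Extra security check that reuses the JWT helper above."""

    def setUp(self):
        # Same project-specific helper as in TestAPISecurityPatterns
        self.client = self.create_test_client()

    def test_rejects_token_signed_with_wrong_secret(self):
        """Tokens signed with an unexpected secret must not authenticate."""
        forged_token = self.create_test_jwt_token(secret="not_the_server_secret")
        headers = {"Authorization": f"Bearer {forged_token}"}

        response = self.client.get("/api/graphs", headers=headers)
        self.assertEqual(response.status_code, 401)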

Input Validation and Security

class SecurityValidationTestMixin:
    """Mixin for input validation security testing."""

    def assert_rejects_sql_injection(self, request_func, param_name, *args, **kwargs):
        """Test that endpoint rejects SQL injection attempts."""

        sql_injection_payloads = [
            "'; DROP TABLE users; --",
            "1' OR '1'='1",
            "UNION SELECT * FROM admin_users",
            "'; EXEC xp_cmdshell('dir'); --"
        ]

        for payload in sql_injection_payloads:
            with self.subTest(payload=payload):
                kwargs_copy = kwargs.copy()
                kwargs_copy[param_name] = payload

                response = request_func(*args, **kwargs_copy)
                self.assertIn(
                    response.status_code,
                    [400, 422],  # Bad Request or Unprocessable Entity
                    f"Should reject SQL injection payload: {payload}"
                )

    def assert_rejects_xss_attempts(self, request_func, param_name, *args, **kwargs):
        """Test that endpoint rejects XSS attempts."""

        xss_payloads = [
            "<script>alert('xss')</script>",
            "javascript:alert('xss')",
            "<img src=x onerror=alert('xss')>",
            "';alert('xss');//"
        ]

        for payload in xss_payloads:
            with self.subTest(payload=payload):
                kwargs_copy = kwargs.copy()
                kwargs_copy[param_name] = payload

                response = request_func(*args, **kwargs_copy)

                # Should either reject (400/422) or sanitize
                if response.status_code == 200:
                    response_text = response.text or ""
                    self.assertNotIn("<script>", response_text.lower())
                    self.assertNotIn("javascript:", response_text.lower())
                else:
                    self.assertIn(response.status_code, [400, 422])

    def assert_validates_file_uploads(self, upload_func, allowed_types, *args, **kwargs):
        """Test file upload validation."""

        dangerous_files = [
            ("malware.exe", b"MZ\x90\x00", "application/octet-stream"),
            ("script.php", b"<?php echo 'hello'; ?>", "text/php"),
            ("payload.js", b"alert('xss')", "application/javascript")
        ]

        for filename, content, content_type in dangerous_files:
            with self.subTest(filename=filename):
                # Should reject files not in allowed types
                if content_type not in allowed_types:
                    response = upload_func(filename, content, content_type, *args, **kwargs)
                    self.assertIn(response.status_code, [400, 422, 415])


class TestInputValidationSecurity(unittest.TestCase, SecurityTestMixin, SecurityValidationTestMixin):
    """Input validation security tests."""

    def test_graph_name_validation(self):
        """Test graph name input validation."""

        token = self.create_test_jwt_token({"permissions": ["graphs:write"]})
        headers = {"Authorization": f"Bearer {token}"}

        def create_graph_request(graph_name):
            return self.client.post(
                "/api/graphs",
                json={"name": graph_name, "description": "Test graph"},
                headers=headers
            )

        # Test SQL injection protection
        self.assert_rejects_sql_injection(create_graph_request, "graph_name")

        # Test XSS protection
        self.assert_rejects_xss_attempts(create_graph_request, "graph_name")

        # Test path traversal protection
        path_traversal_payloads = ["../../../etc/passwd", "..\\windows\\system32"]
        for payload in path_traversal_payloads:
            response = create_graph_request(payload)
            self.assertIn(response.status_code, [400, 422])

    def test_csv_upload_security(self):
        """Test CSV upload security validation."""

        def upload_csv(filename, content, content_type):
            return self.client.post(
                "/api/graphs/upload",
                files={"file": (filename, content, content_type)},
                headers=self.get_auth_headers()
            )

        allowed_types = ["text/csv", "application/csv"]
        self.assert_validates_file_uploads(upload_csv, allowed_types)

        # Test CSV bomb protection (large files)
        large_csv = "a,b,c\n" * 1000000  # 1M rows
        response = upload_csv("large.csv", large_csv.encode(), "text/csv")
        self.assertIn(response.status_code, [400, 413, 422])  # Should reject large files
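
test_csv_upload_security calls self.get_auth_headers(), which is not defined in this guide. A minimal sketch that reuses create_test_jwt_token from SecurityTestMixin (the default permission is illustrative) could be added to the test class:

    def get_auth_headers(self, permissions=None):
        """Build Authorization headers from a test JWT (sketch of the helper used above)."""
        token = self.create_test_jwt_token(
            {"permissions": permissions or ["graphs:write"]}
        )
        return {"Authorization": f"Bearer {token}"}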

🔄 Integration Testing Patterns

Multi-Service Integration Testing

class IntegrationTestBase(unittest.TestCase):
    """Base class for integration testing with real service coordination."""

    def setUp(self):
        """Set up integration test environment with real services."""

        # Use real config service with test configuration
        self.config_service = ConfigService()
        self.config_service.load_config("tests/data/integration_config.yaml")

        # Use real dependency injection container
        self.container = DIContainer()
        self.container.register_service("config", self.config_service)

        # Register real services
        self.container.register_service(
            "app_config",
            AppConfigService(config_service=self.config_service)
        )

        # A logging service is resolved from the container below, so register one too
        self.container.register_service(
            "logging",
            MockServiceFactory.create_mock_logging_service()
        )

        # Mock only external dependencies (LLM, storage)
        self.mock_llm_service = MockServiceFactory.create_mock_llm_service()
        self.mock_storage_service = MockServiceFactory.create_mock_storage_service()

        self.container.register_service("llm", self.mock_llm_service)
        self.container.register_service("storage", self.mock_storage_service)

        # Real services that depend on mocked externals
        self.graph_builder = GraphBuilderService(
            app_config_service=self.container.get_service("app_config"),
            logging_service=self.container.get_service("logging")
        )

        self.workflow_executor = WorkflowExecutorService(
            llm_service=self.mock_llm_service,
            storage_service=self.mock_storage_service,
            app_config_service=self.container.get_service("app_config")
        )

    def tearDown(self):
        """Clean up integration test environment."""
        self.container.clear()


class TestEndToEndWorkflowIntegration(IntegrationTestBase):
    """End-to-end workflow integration testing."""

    def test_complete_csv_to_execution_flow(self):
        """Test complete flow from CSV input to workflow execution."""

        # Step 1: Create realistic CSV
        csv_content = self.create_realistic_workflow_csv()
        csv_file = self.create_temp_csv_file(csv_content)

        # Step 2: Configure LLM responses for workflow
        self.configure_llm_for_workflow()

        # Step 3: Build graph from CSV
        graph_result = self.graph_builder.build_graph_from_csv(csv_file)
        self.assertTrue(graph_result.success)
        self.assertIsNotNone(graph_result.graph)

        # Step 4: Validate graph structure
        validation_result = self.graph_builder.validate_graph(graph_result.graph)
        self.assertTrue(validation_result.is_valid)

        # Step 5: Execute workflow
        initial_state = {"user_request": "process customer data"}
        execution_result = self.workflow_executor.execute_workflow(
            graph_result.graph,
            initial_state
        )

        # Step 6: Verify end-to-end results
        self.assertTrue(execution_result.success)
        self.assertIn("final_result", execution_result.final_state)

        # Step 7: Verify service coordination
        self.verify_service_interactions()

    def configure_llm_for_workflow(self):
        """Configure LLM mock for realistic workflow responses."""

        def llm_response_generator(prompt, **kwargs):
            if "analyze" in prompt.lower():
                return {"analysis": "Customer data analysis completed"}
            elif "process" in prompt.lower():
                return {"processed_data": "Processed customer information"}
            elif "format" in prompt.lower():
                return {"final_result": "Formatted customer report"}
            else:
                return {"result": "default_response"}

        self.mock_llm_service.process.side_effect = llm_response_generator

    def verify_service_interactions(self):
        """Verify proper coordination between services."""

        # Verify LLM was called for each processing step
        llm_calls = self.mock_llm_service.process.call_args_list
        self.assertGreaterEqual(len(llm_calls), 3)  # At least 3 processing steps

        # Verify storage interactions
        storage_calls = self.mock_storage_service.method_calls
        self.assertTrue(any("save" in str(call) for call in storage_calls))

        # Verify configuration was accessed
        config_service = self.container.get_service("app_config")
        self.assertIsInstance(config_service, AppConfigService)
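
The end-to-end test uses two helpers that are not shown here, create_realistic_workflow_csv and create_temp_csv_file. The CSV content itself is workflow-specific, but the temp-file helper can be a thin standard-library wrapper; a minimal sketch, written as a free function for clarity (as a method on IntegrationTestBase, test_case would simply be self):

import tempfile
from pathlib import Path


def create_temp_csv_file(test_case, csv_content):
    """Write CSV content to a temporary file and return its path (sketch)."""
    tmp = tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", delete=False, encoding="utf-8"
    )
    tmp.write(csv_content)
    tmp.close()

    # Remove the file once the test finishes
    test_case.addCleanup(Path(tmp.name).unlink)
    return Path(tmp.name)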

Cross-Service Communication Testing

class TestServiceCommunication(IntegrationTestBase):
    """Test communication patterns between services."""

    def test_service_event_propagation(self):
        """Test event propagation between services."""

        # Create event-aware services
        event_manager = EventManager()

        publisher_service = ServiceA(event_manager=event_manager)
        subscriber_service = ServiceB(event_manager=event_manager)

        # Subscribe to events
        events_received = []

        def event_handler(event):
            events_received.append(event)

        event_manager.subscribe("workflow.completed", event_handler)

        # Trigger workflow that should generate events
        result = publisher_service.execute_workflow("test_workflow")

        # Verify event was propagated
        self.assertTrue(result.success)
        self.assertEqual(len(events_received), 1)

        event = events_received[0]
        self.assertEqual(event.type, "workflow.completed")
        self.assertIn("workflow_id", event.data)

    def test_service_dependency_chain(self):
        """Test complex service dependency chains."""

        # Create chain: ServiceA -> ServiceB -> ServiceC -> External
        # (self.mock_external_api is assumed to be created in setUp, e.g. as a Mock)
        service_c = ServiceC(external_api=self.mock_external_api)
        service_b = ServiceB(service_c=service_c)
        service_a = ServiceA(service_b=service_b)

        # Configure external API mock
        self.mock_external_api.process.return_value = {"external_result": "success"}

        # Execute operation that flows through all services
        result = service_a.complex_operation("test_input")

        # Verify result propagated through chain
        self.assertTrue(result.success)
        self.assertEqual(result.data["external_result"], "success")

        # Verify call chain
        self.mock_external_api.process.assert_called_once()
        self.assertIn("test_input", str(self.mock_external_api.process.call_args))

    def test_circular_dependency_detection(self):
        """Test detection of circular service dependencies."""

        # Circular wiring should be caught when the container resolves dependencies
        container = DIContainer()

        with self.assertRaises(CircularDependencyError):
            container.register_service("service_a", ServiceA, depends_on=["service_b"])
            container.register_service("service_b", ServiceB, depends_on=["service_c"])
            container.register_service("service_c", ServiceC, depends_on=["service_a"])

            # This should trigger detection
            container.resolve_dependencies()
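
test_service_event_propagation assumes an EventManager with subscribe/publish semantics and an event object exposing type and data. If the project does not already provide one, a minimal in-memory implementation for tests might look like this (the names mirror the test's usage and are otherwise illustrative):

from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class Event:
    """Simple event carrier matching the attributes read in the test (type, data)."""
    type: str
    data: dict = field(default_factory=dict)


class EventManager:
    """Minimal synchronous pub/sub used only for test coordination."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._handlers[event_type].append(handler)

    def publish(self, event):
        for handler in self._handlers[event.type]:
            handler(event)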

📊 Monitoring and Observability Testing

Metrics and Telemetry Testing

class ObservabilityTestMixin:
    """Mixin for testing monitoring and observability features."""

    def setUp(self):
        super().setUp()
        self.metrics_collector = MetricsCollector()
        self.trace_recorder = TraceRecorder()

    def assert_metric_recorded(self, metric_name, expected_value=None, tags=None):
        """Assert that a specific metric was recorded."""

        metrics = self.metrics_collector.get_metrics()
        matching_metrics = [m for m in metrics if m.name == metric_name]

        self.assertGreater(
            len(matching_metrics),
            0,
            f"Metric '{metric_name}' was not recorded"
        )

        if expected_value is not None:
            metric_values = [m.value for m in matching_metrics]
            self.assertIn(expected_value, metric_values)

        if tags:
            for metric in matching_metrics:
                for tag_key, tag_value in tags.items():
                    self.assertIn(tag_key, metric.tags)
                    self.assertEqual(metric.tags[tag_key], tag_value)

    def assert_trace_recorded(self, operation_name, min_duration=None, max_duration=None):
        """Assert that a trace was recorded for an operation."""

        traces = self.trace_recorder.get_traces()
        matching_traces = [t for t in traces if t.operation_name == operation_name]

        self.assertGreater(
            len(matching_traces),
            0,
            f"No trace recorded for operation '{operation_name}'"
        )

        if min_duration or max_duration:
            for trace in matching_traces:
                if min_duration:
                    self.assertGreaterEqual(trace.duration, min_duration)
                if max_duration:
                    self.assertLessEqual(trace.duration, max_duration)


class TestServiceObservability(unittest.TestCase, ObservabilityTestMixin):
    """Test observability features of services."""

    def test_request_metrics_collection(self):
        """Test that request metrics are properly collected."""

        # Execute operations that should generate metrics
        self.service.process_request("test_input")
        self.service.process_batch_request(["input1", "input2", "input3"])

        # Verify metrics were recorded
        self.assert_metric_recorded("requests.processed", expected_value=1)
        self.assert_metric_recorded("batch_requests.processed", expected_value=1)
        self.assert_metric_recorded("items.processed", expected_value=3)

        # Verify metric tags
        self.assert_metric_recorded(
            "requests.processed",
            tags={"service": "MyService", "operation": "process_request"}
        )

    def test_error_metrics_collection(self):
        """Test that error metrics are collected appropriately."""

        # Configure service to fail
        self.mock_dependency.process.side_effect = Exception("Test error")

        # Execute operation that should fail
        with self.assertRaises(Exception):
            self.service.process_request("failing_input")

        # Verify error metrics
        self.assert_metric_recorded("requests.failed", expected_value=1)
        self.assert_metric_recorded(
            "errors.by_type",
            tags={"error_type": "Exception", "service": "MyService"}
        )

    def test_performance_tracing(self):
        """Test that performance traces are recorded."""

        # Execute operation with tracing
        with self.trace_recorder.trace("test_operation"):
            result = self.service.complex_operation("test_input")

        # Verify trace was recorded
        self.assert_trace_recorded("test_operation", min_duration=0.001)

        # Verify nested traces
        self.assert_trace_recorded("complex_operation.step1")
        self.assert_trace_recorded("complex_operation.step2")

    def test_custom_metrics_integration(self):
        """Test integration with custom business metrics."""

        # Execute business operations
        self.service.process_customer_data({"customer_id": "123", "action": "purchase"})
        self.service.process_customer_data({"customer_id": "456", "action": "refund"})

        # Verify business metrics
        self.assert_metric_recorded("customer.actions", expected_value=2)
        self.assert_metric_recorded(
            "customer.actions.by_type",
            tags={"action_type": "purchase"}
        )
        self.assert_metric_recorded(
            "customer.actions.by_type",
            tags={"action_type": "refund"}
        )
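
The observability tests assume MetricsCollector and TraceRecorder helpers with get_metrics(), get_traces(), and a trace() context manager. A minimal in-memory sketch that satisfies the assertions above (attribute and method names mirror the tests and are otherwise illustrative):

import time
from contextlib import contextmanager
from dataclasses import dataclass, field


@dataclass
class RecordedMetric:
    name: str
    value: float = 1.0
    tags: dict = field(default_factory=dict)


@dataclass
class RecordedTrace:
    operation_name: str
    duration: float = 0.0


class MetricsCollector:
    """In-memory metric sink for assertions in tests."""

    def __init__(self):
        self._metrics = []

    def record(self, name, value=1.0, tags=None):
        self._metrics.append(RecordedMetric(name, value, tags or {}))

    def get_metrics(self):
        return list(self._metrics)


class TraceRecorder:
    """In-memory trace recorder with a context manager for timing operations."""

    def __init__(self):
        self._traces = []

    @contextmanager
    def trace(self, operation_name):
        start = time.perf_counter()
        try:
            yield
        finally:
            self._traces.append(
                RecordedTrace(operation_name, time.perf_counter() - start)
            )

    def get_traces(self):
        return list(self._traces)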

🎯 Test Execution and Automation

Test Suite Organization

# tests/conftest.py - Pytest configuration for advanced patterns

import pytest
from unittest.mock import Mock

from tests.utils.test_categories import TestCategory
from tests.utils.performance_markers import PerformanceTest
from tests.utils.integration_markers import IntegrationTest


def pytest_configure(config):
    """Configure pytest with custom markers."""

    config.addinivalue_line("markers", "unit: Unit tests with mocked dependencies")
    config.addinivalue_line("markers", "integration: Integration tests with real services")
    config.addinivalue_line("markers", "performance: Performance and load tests")
    config.addinivalue_line("markers", "security: Security validation tests")
    config.addinivalue_line("markers", "fast: Fast tests suitable for every run")
    config.addinivalue_line("markers", "slow: Tests that take longer than 5 seconds")


@pytest.fixture(scope="session")
def test_database():
    """Provide test database for integration tests."""

    # Set up test database
    db = create_test_database()
    yield db

    # Clean up
    db.drop_all_tables()
    db.close()


@pytest.fixture(scope="function")
def isolated_container():
    """Provide isolated DI container for each test."""

    container = DIContainer()
    yield container
    container.clear()


# Example test categorization
@pytest.mark.unit
@pytest.mark.fast
class TestUnitPatterns:
    """Fast unit tests with full mocking."""
    pass


@pytest.mark.integration
@pytest.mark.slow
class TestIntegrationPatterns:
    """Integration tests with real service coordination."""
    pass


@pytest.mark.performance
@pytest.mark.slow
class TestPerformancePatterns:
    """Performance and load testing."""
    pass


@pytest.mark.security
class TestSecurityPatterns:
    """Security validation tests."""
    pass
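
With these markers registered, categories can be selected on the command line (for example pytest -m "unit and not slow") or programmatically via pytest.main; a small helper-script sketch:

import sys

import pytest


def run_category(marker_expression: str) -> int:
    """Run only tests matching a marker expression, e.g. 'unit and not slow'."""
    return pytest.main(["tests/", "-m", marker_expression, "-q"])


if __name__ == "__main__":
    # Default to the fast unit suite when no expression is supplied
    sys.exit(run_category(sys.argv[1] if len(sys.argv) > 1 else "unit and not slow"))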

Continuous Integration Patterns

# .github/workflows/testing-advanced.yml
name: Advanced Testing Pipeline

on: [push, pull_request]

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.11", "3.12"]

    steps:
      - uses: actions/checkout@v3
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v3
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          pip install poetry
          poetry install --with dev

      - name: Run unit tests
        run: |
          poetry run pytest tests/ -m "unit and not slow" \
            --cov=agentmap --cov-report=xml \
            --junit-xml=junit-unit.xml

      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  integration-tests:
    runs-on: ubuntu-latest
    needs: unit-tests

    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v3
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: |
          pip install poetry
          poetry install --with dev

      - name: Run integration tests
        run: |
          poetry run pytest tests/ -m "integration" \
            --junit-xml=junit-integration.xml

  performance-tests:
    runs-on: ubuntu-latest
    needs: unit-tests

    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v3
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: |
          pip install poetry
          poetry install --with dev

      - name: Run performance tests
        run: |
          poetry run pytest tests/ -m "performance" \
            --junit-xml=junit-performance.xml \
            --benchmark-json=benchmark.json

      - name: Store benchmark results
        uses: benchmark-action/github-action-benchmark@v1
        with:
          tool: 'pytest'
          output-file-path: benchmark.json
          github-token: ${{ secrets.GITHUB_TOKEN }}

  security-tests:
    runs-on: ubuntu-latest
    needs: unit-tests

    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v3
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: |
          pip install poetry
          poetry install --with dev

      - name: Run security tests
        run: |
          poetry run pytest tests/ -m "security" \
            --junit-xml=junit-security.xml

      - name: Run security scan
        run: |
          poetry run bandit -r src/ -f json -o bandit-report.json

      - name: Upload security results
        uses: github/codeql-action/upload-sarif@v2
        with:
          sarif_file: bandit-report.json

For Complex Testing Scenarios: These advanced patterns provide robust testing strategies for performance, security, integration, and specialized requirements in AgentMap applications.