Storage Integration Guide

AgentMap provides enterprise-ready storage integration with comprehensive multi-provider support, advanced caching systems, and robust security patterns. This guide covers the complete storage integration ecosystem including local storage, cloud providers, vector databases, and advanced enterprise features.

Enterprise Storage Integration

AgentMap's storage integration is enterprise-ready with 95% feature coverage, supporting all major cloud providers with consistent APIs, robust error handling, and extensive configuration options. The system provides seamless data movement between providers with enterprise security and governance features.

Storage Architecture Overview

Core Storage Services

CSV Storage Service with Smart ID Detection

The CSV storage service provides pandas-based operations with intelligent ID field detection and flexible data access patterns.

# Automatic ID detection works for standard patterns
users = storage_service.read("users", document_id=123)          # Uses 'id' column
orders = storage_service.read("orders", document_id="ORD001")   # Uses 'order_id' column

# Multiple potential ID columns (priority: 'id' > '*_id' > 'id_*')
complex_data = [
    {"record_id": "R001", "user_id": "U001", "id": 1, "data": "..."},
    {"record_id": "R002", "user_id": "U002", "id": 2, "data": "..."}
]
record = storage_service.read("complex", document_id=1)  # Uses 'id' (highest priority)
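
The priority order can be summarized as a small lookup over the column names. The sketch below is illustrative only: detect_id_field is a hypothetical helper, not the CSV storage service's internal code.

# Illustrative sketch of the ID-detection priority described above.
# detect_id_field is a hypothetical helper, not AgentMap's actual implementation.
def detect_id_field(columns):
    """Return the most likely ID column, or None if nothing matches."""
    if "id" in columns:                  # 1. an exact 'id' column wins
        return "id"
    for col in columns:                  # 2. then any '*_id' suffix
        if col.endswith("_id"):
            return col
    for col in columns:                  # 3. finally any 'id_*' prefix
        if col.startswith("id_"):
            return col
    return None

# The 'complex' collection above resolves to 'id'
print(detect_id_field(["record_id", "user_id", "id", "data"]))  # -> 'id'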

JSON Storage Service with Direct Storage Model

The JSON service uses direct storage where user data is stored exactly as provided, ensuring data integrity and predictable behavior.

# Write operation stores data exactly as provided
storage_service.write("users", {"name": "John", "age": 30}, "user123")

# Internal storage structure (in users.json)
{
    "user123": {"name": "John", "age": 30}  # User data stored directly
}

# Read operation returns data exactly as stored
user = storage_service.read("users", "user123")
# Returns: {"name": "John", "age": 30} # Data unchanged

File Storage Service with LangChain Integration

The file storage service handles text/binary files with advanced document processing capabilities.

# Text file operations
content = storage_service.read("documents", "readme.txt") # Raw content
storage_service.write("documents", "Hello, World!", "greeting.txt")

# Binary file operations
image_data = storage_service.read("images", "photo.jpg", binary_mode=True)
storage_service.write("images", binary_data, "new_photo.jpg", binary_mode=True)

# Structured format with metadata
doc = storage_service.read("documents", "readme.txt", format="structured")
# Returns: {"content": "...", "metadata": {"source": "...", "size": 123, "type": "text"}}

Cloud Storage Integration

Multi-Cloud Provider Support

AgentMap supports seamless integration with all major cloud storage providers through URI-based addressing and unified configuration.

Provider             | URI Format                       | Example                             | Use Case
---------------------|----------------------------------|-------------------------------------|-----------------------
Azure Blob Storage   | azure://container/path/blob.json | azure://documents/users.json        | Microsoft ecosystem
AWS S3               | s3://bucket/path/object.json     | s3://my-bucket/data/config.json     | AWS ecosystem
Google Cloud Storage | gs://bucket/path/blob.json       | gs://my-bucket/reports/monthly.json | Google Cloud ecosystem
Local Storage        | local://path/file.json           | local://data/users.json             | Development/testing
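
Addressing of this form can be resolved by splitting the URI scheme from the container and object path. The sketch below illustrates the idea with Python's standard urlparse; the provider mapping and function name are assumptions for illustration, not AgentMap's actual routing code.

from urllib.parse import urlparse

# Hypothetical scheme-to-provider mapping; the real registry may differ.
PROVIDERS = {"azure": "azure_blob", "s3": "aws_s3", "gs": "gcp_gcs", "local": "local_fs"}

def resolve_storage_uri(uri: str):
    """Split a storage URI into (provider, container/bucket, object path)."""
    parsed = urlparse(uri)
    provider = PROVIDERS.get(parsed.scheme)
    if provider is None:
        raise ValueError(f"Unsupported storage scheme: {parsed.scheme!r}")
    return provider, parsed.netloc, parsed.path.lstrip("/")

print(resolve_storage_uri("azure://documents/users.json"))
# -> ('azure_blob', 'documents', 'users.json')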

Multi-Cloud Workflow Patterns

graph_name,node_name,next_node,context,agent_type,next_on_success,next_on_failure,input_fields,output_field,prompt
CloudFlow,ReadAzure,,Read from Azure,cloud_json_reader,ProcessData,ErrorHandler,collection,azure_data,"azure://prod-data/users.json"
CloudFlow,ProcessData,,Transform data,DataProcessor,SaveAWS,ErrorHandler,azure_data,processed_data,
CloudFlow,SaveAWS,,Save to AWS S3,cloud_json_writer,NotifyComplete,ErrorHandler,processed_data,save_result,"s3://processed-data/users.json"
CloudFlow,NotifyComplete,,Send notification,EmailService,End,ErrorHandler,save_result,notification,
CloudFlow,End,,Completion,Echo,,,"notification",final_message,Multi-cloud processing complete
CloudFlow,ErrorHandler,,Handle errors,Echo,End,,"error",error_message,Error: {error}

Vector Storage Integration

AgentMap provides comprehensive vector storage capabilities for similarity search, embedding storage, and AI-powered data retrieval.

# Vector storage configuration
vector_config = {
    "provider": "vector",
    "options": {
        "vector_db_type": "chroma",  # or "pinecone", "weaviate", "qdrant"
        "embedding_model": "text-embedding-ada-002",
        "dimension": 1536,
        "similarity_metric": "cosine"
    }
}

# Store documents with embeddings
documents = [
    {"id": "doc1", "text": "AgentMap is a powerful workflow system", "metadata": {"category": "tech"}},
    {"id": "doc2", "text": "Storage integration supports multiple providers", "metadata": {"category": "storage"}}
]
storage_service.write("knowledge_base", documents, embedding_enabled=True)

# Similarity search
query = "workflow automation system"
similar_docs = storage_service.read("knowledge_base",
                                    query={"similarity_search": query, "limit": 5})

Vector Database Providers

vector:
  provider: chroma
  options:
    persist_directory: "./vector_db"
    embedding_function: "openai"
    collection_metadata: {"hnsw:space": "cosine"}

# Advanced ChromaDB configuration
vector:
  provider: chroma
  options:
    client_settings:
      chroma_api_url: "http://localhost:8000"
      chroma_server_host: "localhost"
      chroma_server_http_port: 8000
    embedding_model: "text-embedding-ada-002"
    batch_size: 100
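
For reference, a roughly equivalent setup using the chromadb Python client directly (outside AgentMap) looks like the sketch below. The collection name and sample documents are placeholders, and the client falls back to its default embedding function here rather than the OpenAI model named above.

# Minimal sketch using the chromadb client directly (outside AgentMap).
import chromadb

client = chromadb.PersistentClient(path="./vector_db")
collection = client.get_or_create_collection(
    name="knowledge_base",
    metadata={"hnsw:space": "cosine"},  # mirrors collection_metadata above
)

collection.add(
    ids=["doc1", "doc2"],
    documents=[
        "AgentMap is a powerful workflow system",
        "Storage integration supports multiple providers",
    ],
    metadatas=[{"category": "tech"}, {"category": "storage"}],
)

results = collection.query(query_texts=["workflow automation system"], n_results=2)
print(results["ids"])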

Cache Management System

AgentMap implements advanced caching strategies to optimize storage performance and reduce latency.

Validation Cache Service

The validation cache service stores file validation results to avoid repeated expensive validation operations.

storage:
  cache:
    validation:
      enabled: true
      provider: "redis"  # or "memory", "file"
      ttl: 3600          # 1 hour cache
      max_size: 1000     # Max cached validations

      # Redis configuration
      redis:
        host: "localhost"
        port: 6379
        db: 1
        password: "env:REDIS_PASSWORD"
        key_prefix: "agentmap:validation:"

      # File cache configuration (alternative)
      file:
        cache_directory: "./cache/validation"
        max_file_size: "10MB"
        cleanup_interval: 3600  # 1 hour
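
Conceptually, the cache keys a validation result by the file's content hash so unchanged files skip re-validation. The sketch below shows that pattern with the redis-py client; the wrapper function and key layout are assumptions, not AgentMap's internal code.

# Illustrative sketch of a content-hash validation cache backed by Redis.
import hashlib
import json
import os

import redis

r = redis.Redis(host="localhost", port=6379, db=1,
                password=os.environ.get("REDIS_PASSWORD"))

def cached_validation(file_path: str, validate_fn, ttl: int = 3600):
    """Return a cached validation result keyed by the file's content hash."""
    with open(file_path, "rb") as f:
        digest = hashlib.sha256(f.read()).hexdigest()
    key = f"agentmap:validation:{digest}"

    cached = r.get(key)
    if cached is not None:
        return json.loads(cached)        # cache hit: skip re-validation

    result = validate_fn(file_path)      # cache miss: validate and store
    r.setex(key, ttl, json.dumps(result))
    return result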

Routing Decision Cache

The routing decision cache stores LLM routing decisions to improve workflow performance and reduce LLM API costs.

storage:
  cache:
    routing:
      enabled: true
      provider: "redis"
      ttl: 1800       # 30 minutes
      max_size: 5000

      # Cache key strategy
      key_strategy: "content_hash"  # or "input_hash", "semantic_hash"

      # Invalidation rules
      invalidation:
        on_workflow_change: true
        on_agent_update: true
        max_age: 86400  # 24 hours

      # Redis configuration
      redis:
        host: "localhost"
        port: 6379
        db: 2
        key_prefix: "agentmap:routing:"
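
With the "content_hash" strategy, identical routing inputs reuse the same cached decision. A minimal sketch of such a key is shown below; the field names and key format are illustrative assumptions.

# Sketch of a "content_hash" routing cache key (illustrative only).
import hashlib
import json

def routing_cache_key(graph_name: str, node_name: str, inputs: dict) -> str:
    payload = json.dumps(
        {"graph": graph_name, "node": node_name, "inputs": inputs},
        sort_keys=True,
    )
    return "agentmap:routing:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()

print(routing_cache_key("CloudFlow", "ProcessData", {"azure_data": "..."}))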

Performance Optimization Cache

class BulkOperationCache:
    """Cache for bulk storage operations."""

    def __init__(self, cache_service):
        self.cache = cache_service

    def bulk_read(self, collections: list, batch_size: int = 10):
        """Optimized bulk read with caching."""
        results = {}
        uncached_collections = []

        # Check cache for each collection
        for collection in collections:
            cached_data = self.cache.get(f"collection:{collection}")
            if cached_data:
                results[collection] = cached_data
            else:
                uncached_collections.append(collection)

        # Fetch uncached data in batches
        if uncached_collections:
            for i in range(0, len(uncached_collections), batch_size):
                batch = uncached_collections[i:i + batch_size]
                batch_results = self._fetch_batch(batch)

                # Cache and store results
                for collection, data in batch_results.items():
                    self.cache.set(f"collection:{collection}", data, ttl=300)
                    results[collection] = data

        return results
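
Usage is straightforward; the snippet below assumes a cache service exposing get/set (such as a Redis wrapper) and a storage-backed _fetch_batch implementation.

# Example usage (cache_service is assumed to exist in your environment).
bulk_cache = BulkOperationCache(cache_service)
data = bulk_cache.bulk_read(["users", "orders", "configs"], batch_size=2)
print(list(data.keys()))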

Enterprise Storage Governance

Data Lifecycle Management

storage:
  lifecycle:
    enabled: true
    policies:
      - name: "hot_to_warm"
        conditions:
          age_days: 30
          access_frequency: "low"
        actions:
          - type: "tier_transition"
            target: "warm_storage"
          - type: "compress"
            format: "gzip"

      - name: "warm_to_cold"
        conditions:
          age_days: 90
          tier: "warm_storage"
        actions:
          - type: "tier_transition"
            target: "cold_storage"
          - type: "index_removal"

      - name: "data_retention"
        conditions:
          age_days: 2555  # 7 years
          data_type: "personal_data"
        actions:
          - type: "anonymize"
            fields: ["email", "phone", "address"]
          - type: "archive"
            destination: "long_term_archive"

      - name: "compliance_deletion"
        conditions:
          age_days: 3650  # 10 years
          compliance_flag: "gdpr_deletion_required"
        actions:
          - type: "secure_delete"
            verification: "cryptographic"
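
Each policy fires when all of its conditions hold for a data record. The sketch below shows one way such conditions could be evaluated; the record shape and helper are assumptions for illustration, not AgentMap's lifecycle engine.

# Minimal sketch of evaluating lifecycle policies like those above.
from datetime import datetime, timezone

def matching_policies(record: dict, policies: list) -> list:
    """Return the policies whose conditions all match the given record."""
    age_days = (datetime.now(timezone.utc) - record["created_at"]).days
    matched = []
    for policy in policies:
        conditions = policy["conditions"]
        if age_days < conditions.get("age_days", 0):
            continue  # record is not old enough yet
        # Every remaining condition must equal the corresponding record field.
        others = {k: v for k, v in conditions.items() if k != "age_days"}
        if all(record.get(k) == v for k, v in others.items()):
            matched.append(policy)
    return matched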

Cost Optimization

class StorageCostAnalyzer:
    """Analyze and optimize storage costs."""

    def __init__(self, cost_config):
        self.config = cost_config
        self.providers = cost_config['providers']

    def analyze_costs(self, time_period: str = "monthly"):
        """Analyze storage costs across providers."""
        cost_analysis = {
            "period": time_period,
            "total_cost": 0,
            "by_provider": {},
            "by_storage_class": {},
            "optimization_opportunities": []
        }

        for provider in self.providers:
            provider_costs = self._get_provider_costs(provider, time_period)
            cost_analysis['by_provider'][provider] = provider_costs
            cost_analysis['total_cost'] += provider_costs['total']

            # Identify optimization opportunities
            opportunities = self._identify_cost_optimizations(provider, provider_costs)
            cost_analysis['optimization_opportunities'].extend(opportunities)

        return cost_analysis

    def recommend_optimizations(self, cost_analysis):
        """Recommend cost optimizations."""
        recommendations = []

        # Check for unused storage
        for provider, costs in cost_analysis['by_provider'].items():
            if costs['unused_storage_percentage'] > 20:
                recommendations.append({
                    "type": "cleanup",
                    "provider": provider,
                    "description": f"Remove unused storage in {provider}",
                    "potential_savings": costs['unused_cost'],
                    "priority": "high"
                })

        # Check for tier optimization
        for opportunity in cost_analysis['optimization_opportunities']:
            if opportunity['type'] == 'tier_optimization':
                recommendations.append({
                    "type": "tier_migration",
                    "provider": opportunity['provider'],
                    "description": opportunity['description'],
                    "potential_savings": opportunity['savings'],
                    "priority": opportunity['priority']
                })

        return recommendations

Audit and Compliance Logging

storage:
  audit:
    enabled: true
    log_level: "detailed"  # basic, standard, detailed

    # What to audit
    events:
      - "read"
      - "write"
      - "delete"
      - "access_denied"
      - "config_change"
      - "policy_violation"

    # Where to store audit logs
    storage:
      provider: "secure_json"
      location: "audit://logs/storage-audit.json"
      encryption: true
      retention_days: 2555  # 7 years

    # Real-time monitoring
    real_time:
      enabled: true
      alert_on:
        - "unauthorized_access"
        - "bulk_deletion"
        - "policy_violation"
        - "unusual_access_pattern"

    # SIEM integration
    siem:
      enabled: true
      endpoint: "https://siem.company.com/api/events"
      format: "cef"  # Common Event Format

Advanced Integration Patterns

Batch Processing Optimization

storage:
  batch_processing:
    enabled: true
    default_batch_size: 100
    max_batch_size: 1000
    parallel_batches: 4
    timeout_per_batch: 300  # 5 minutes

    # Provider-specific optimizations
    providers:
      azure:
        batch_size: 50   # Azure Blob optimal batch size
        parallel_uploads: 3
        use_block_blobs: true

      aws:
        batch_size: 100  # S3 optimal batch size
        parallel_uploads: 5
        multipart_threshold: "64MB"

      gcp:
        batch_size: 75   # GCS optimal batch size
        parallel_uploads: 4
        resumable_threshold: "8MB"
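
In practice, batch_size controls how items are grouped and parallel_uploads controls concurrency within each group. A minimal sketch of that pattern is shown below; write_object is a hypothetical per-item writer, not an AgentMap API.

# Illustrative batching sketch honoring batch_size / parallel_uploads settings.
from concurrent.futures import ThreadPoolExecutor

def write_in_batches(items: list, write_object, batch_size: int = 100,
                     parallel_uploads: int = 4):
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    with ThreadPoolExecutor(max_workers=parallel_uploads) as pool:
        for batch in batches:
            # Upload this batch's items concurrently, then move to the next batch.
            list(pool.map(write_object, batch))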

Health Monitoring and Metrics

storage:
  monitoring:
    enabled: true
    check_interval: 60  # seconds

    health_checks:
      - name: "provider_connectivity"
        enabled: true
        timeout: 10
        critical: true

      - name: "storage_capacity"
        enabled: true
        warning_threshold: 80   # percent
        critical_threshold: 95  # percent

      - name: "operation_latency"
        enabled: true
        warning_threshold: 1000   # ms
        critical_threshold: 5000  # ms

      - name: "error_rate"
        enabled: true
        window: 300             # 5 minutes
        warning_threshold: 5    # percent
        critical_threshold: 10  # percent

    metrics:
      collection_interval: 30  # seconds
      retention_days: 30

      # Metrics to collect
      collect:
        - "operation_count"
        - "operation_latency"
        - "error_count"
        - "data_transfer"
        - "cache_hit_rate"
        - "connection_pool_utilization"

    alerting:
      enabled: true
      channels:
        - type: "email"
          recipients: ["ops@company.com"]
          severity_threshold: "warning"

        - type: "slack"
          webhook_url: "env:SLACK_WEBHOOK_URL"
          severity_threshold: "critical"

        - type: "pagerduty"
          service_key: "env:PAGERDUTY_SERVICE_KEY"
          severity_threshold: "critical"

Configuration Reference

Complete Storage Configuration

# Complete production storage configuration
storage:
  # Core storage services
  csv:
    provider: csv
    options:
      base_directory: "/var/app/data/csv"
      encoding: "utf-8"
      validate_paths: true
      max_file_size: "100MB"
      backup_enabled: true
      compression: "gzip"

  json:
    provider: json
    default_provider: "azure"
    providers:
      local:
        base_dir: "/var/app/data/json"
        backup_enabled: true

      azure:
        connection_string: "env:AZURE_STORAGE_CONNECTION_STRING"
        default_container: "agentmap-prod"
        containers:
          users: "users-prod"
          configs: "configs-prod"
          analytics: "analytics-prod"
        timeout: 30
        retry_count: 3
        encryption:
          enabled: true
          key_vault_url: "https://agentmap-kv.vault.azure.net/"

      aws:
        region: "us-west-2"
        access_key: "env:AWS_ACCESS_KEY_ID"
        secret_key: "env:AWS_SECRET_ACCESS_KEY"
        default_bucket: "agentmap-prod"
        buckets:
          backups: "agentmap-backups"
          archives: "agentmap-archives"
        encryption:
          server_side_encryption: "AES256"
          kms_key_id: "arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012"

      gcp:
        project_id: "env:GCP_PROJECT_ID"
        credentials_file: "/var/app/config/gcp-service-account.json"
        default_bucket: "agentmap-prod"
        encryption:
          kms_key_name: "projects/PROJECT_ID/locations/global/keyRings/agentmap/cryptoKeys/storage"

  file:
    provider: file
    options:
      base_directory: "/var/app/data/files"
      allow_binary: false
      max_file_size: "50MB"
      allowed_extensions: [".txt", ".md", ".json", ".csv", ".pdf"]
      virus_scanning: true
      backup_enabled: true

  # Vector storage
  vector:
    provider: pinecone
    options:
      api_key: "env:PINECONE_API_KEY"
      environment: "env:PINECONE_ENVIRONMENT"
      index_name: "agentmap-knowledge"
      dimension: 1536
      metric: "cosine"
      namespaces:
        docs: "documentation"
        support: "support-tickets"
        knowledge: "knowledge-base"

  # Cache management
  cache:
    validation:
      enabled: true
      provider: "redis"
      ttl: 3600
      max_size: 1000
      redis:
        host: "redis.internal.company.com"
        port: 6379
        db: 1
        password: "env:REDIS_PASSWORD"
        ssl: true

    routing:
      enabled: true
      provider: "redis"
      ttl: 1800
      max_size: 5000
      redis:
        host: "redis.internal.company.com"
        port: 6379
        db: 2
        password: "env:REDIS_PASSWORD"
        ssl: true

  # Data lifecycle management
  lifecycle:
    enabled: true
    policies:
      - name: "tier_transition"
        conditions:
          age_days: 30
          access_frequency: "low"
        actions:
          - type: "tier_transition"
            target: "archive_storage"

      - name: "compliance_retention"
        conditions:
          age_days: 2555  # 7 years
          data_classification: "personal"
        actions:
          - type: "anonymize"
            fields: ["email", "phone", "address"]
          - type: "archive"
            destination: "compliance_archive"

  # Monitoring and alerting
  monitoring:
    enabled: true
    check_interval: 60
    health_checks:
      - name: "provider_connectivity"
        enabled: true
        timeout: 10
        critical: true
      - name: "storage_capacity"
        enabled: true
        warning_threshold: 80
        critical_threshold: 95
      - name: "operation_latency"
        enabled: true
        warning_threshold: 1000
        critical_threshold: 5000

    metrics:
      collection_interval: 30
      retention_days: 30

    alerting:
      enabled: true
      channels:
        - type: "email"
          recipients: ["ops@company.com"]
          severity_threshold: "warning"
        - type: "pagerduty"
          service_key: "env:PAGERDUTY_SERVICE_KEY"
          severity_threshold: "critical"

  # Security and compliance
  security:
    encryption_at_rest: true
    encryption_in_transit: true
    access_logging: true
    audit_logging: true

  audit:
    enabled: true
    log_level: "detailed"
    events: ["read", "write", "delete", "access_denied", "config_change"]
    storage:
      provider: "secure_json"
      location: "audit://logs/storage-audit.json"
      encryption: true
      retention_days: 2555
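
Secrets in this configuration use the "env:NAME" placeholder convention shown above. A minimal sketch of loading the file and resolving those placeholders, inferred from the examples in this guide, might look like this; the file name follows the storage_config.yaml default used by the health check tool later in this guide.

# Sketch of loading this configuration and resolving "env:NAME" placeholders.
import os
import yaml

def resolve_env(value):
    """Recursively replace "env:NAME" strings with the environment variable's value."""
    if isinstance(value, str) and value.startswith("env:"):
        return os.environ.get(value[4:], "")
    if isinstance(value, dict):
        return {k: resolve_env(v) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_env(v) for v in value]
    return value

with open("storage_config.yaml") as f:
    config = resolve_env(yaml.safe_load(f))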

Security Best Practices

Enterprise Security Implementation

# Comprehensive encryption configuration
storage:
  security:
    # Encryption at rest
    encryption_at_rest:
      enabled: true
      algorithm: "AES-256"
      key_management: "external"  # or "internal"

      # Provider-specific encryption
      azure:
        encryption_scope: "agentmap-scope"
        key_vault:
          vault_url: "https://agentmap-kv.vault.azure.net/"
          key_name: "storage-encryption-key"

      aws:
        kms_key_id: "arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012"
        encryption_context:
          application: "agentmap"
          environment: "production"

      gcp:
        kms_key_name: "projects/PROJECT_ID/locations/global/keyRings/agentmap/cryptoKeys/storage"

    # Encryption in transit
    encryption_in_transit:
      enabled: true
      tls_version: "1.2"
      cipher_suites: ["TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"]
      certificate_validation: true

    # Field-level encryption for sensitive data
    field_encryption:
      enabled: true
      fields: ["email", "phone", "ssn", "credit_card"]
      algorithm: "AES-256-GCM"
      key_derivation: "PBKDF2"
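
Field-level encryption with AES-256-GCM and a PBKDF2-derived key can be sketched as below using the cryptography library. This is an illustrative sketch matching the algorithm/key_derivation settings above, not AgentMap's internal implementation; the helper names and iteration count are assumptions.

# Sketch of AES-256-GCM field-level encryption with a PBKDF2-derived key.
import os

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

SENSITIVE_FIELDS = {"email", "phone", "ssn", "credit_card"}

def derive_key(passphrase: bytes, salt: bytes) -> bytes:
    kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt,
                     iterations=600_000)
    return kdf.derive(passphrase)  # 32 bytes -> AES-256

def encrypt_fields(record: dict, key: bytes) -> dict:
    aesgcm = AESGCM(key)
    out = dict(record)
    for field in SENSITIVE_FIELDS & record.keys():
        nonce = os.urandom(12)  # fresh 96-bit nonce per field value
        ciphertext = aesgcm.encrypt(nonce, str(record[field]).encode(), None)
        out[field] = (nonce + ciphertext).hex()  # store nonce alongside ciphertext
    return out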

Integration Examples

Agent Catalog Integration

AgentMap storage services integrate seamlessly with the agent catalog system for automated discovery and configuration.

# Agent catalog integration for storage services
from agentmap.registry import AgentCatalog
from agentmap.services.protocols import StorageCapableAgent

class StorageAgentCatalog:
    """Agent catalog with storage service integration."""

    def __init__(self, storage_service, agent_catalog: AgentCatalog):
        self.storage_service = storage_service
        self.agent_catalog = agent_catalog

    def register_storage_agents(self):
        """Register all storage-capable agents."""

        # CSV agents
        self.agent_catalog.register_agent(
            agent_id="csv_reader",
            agent_class="CSVReaderAgent",
            capabilities=["storage", "csv", "data_reading"],
            config_schema={
                "collection": {"type": "string", "required": True},
                "format": {"type": "string", "enum": ["dict", "records", "dataframe"], "default": "dict"},
                "id_field": {"type": "string", "required": False},
                "query": {"type": "object", "required": False}
            }
        )

        # JSON agents
        self.agent_catalog.register_agent(
            agent_id="json_writer",
            agent_class="JSONWriterAgent",
            capabilities=["storage", "json", "data_writing"],
            config_schema={
                "collection": {"type": "string", "required": True},
                "mode": {"type": "string", "enum": ["write", "update", "append"], "default": "write"},
                "path": {"type": "string", "required": False},
                "create_if_missing": {"type": "boolean", "default": True}
            }
        )

        # Cloud storage agents
        self.agent_catalog.register_agent(
            agent_id="cloud_json_reader",
            agent_class="CloudJSONReaderAgent",
            capabilities=["storage", "cloud", "json", "data_reading"],
            config_schema={
                "collection": {"type": "string", "required": True, "pattern": "^(azure|s3|gs)://.*"},
                "format": {"type": "string", "enum": ["raw", "structured"], "default": "raw"},
                "timeout": {"type": "integer", "default": 30},
                "retry_count": {"type": "integer", "default": 3}
            }
        )

        # Vector search agents
        self.agent_catalog.register_agent(
            agent_id="vector_searcher",
            agent_class="VectorSearchAgent",
            capabilities=["storage", "vector", "similarity_search"],
            config_schema={
                "collection": {"type": "string", "required": True},
                "query": {"type": "string", "required": True},
                "limit": {"type": "integer", "default": 5},
                "threshold": {"type": "number", "default": 0.7}
            }
        )

    def get_storage_agents_by_capability(self, capability: str):
        """Get storage agents by specific capability."""
        return self.agent_catalog.find_agents_by_capability(capability)

    def auto_configure_storage_agents(self):
        """Automatically configure storage services for agents."""
        storage_agents = self.agent_catalog.find_agents_by_interface(StorageCapableAgent)

        for agent_id, agent_info in storage_agents.items():
            agent_instance = self.agent_catalog.get_agent(agent_id)
            if hasattr(agent_instance, 'configure_storage_service'):
                agent_instance.configure_storage_service(self.storage_service)
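
Wiring the catalog together is then a few calls; the snippet below assumes concrete storage_service and agent_catalog instances already exist in your application.

# Example wiring (storage_service and agent_catalog are assumed to exist).
catalog = StorageAgentCatalog(storage_service, agent_catalog)
catalog.register_storage_agents()
catalog.auto_configure_storage_agents()
csv_agents = catalog.get_storage_agents_by_capability("csv")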

Troubleshooting Guide

Common Issues and Solutions

Problem: Storage provider connectivity failures

Symptoms:

  • ConnectionTimeout exceptions
  • AuthenticationError messages
  • Intermittent failures

Solutions:

# Check connectivity
from datetime import datetime

async def diagnose_connectivity():
    """Diagnose storage connectivity issues."""

    # Test basic connectivity
    try:
        health_check = await storage_service.read_async("health_check", "ping")
        print("✅ Basic connectivity OK")
    except Exception as e:
        print(f"❌ Connectivity failed: {e}")
        return False

    # Test authentication
    try:
        test_write = await storage_service.write_async("test", {"timestamp": datetime.utcnow()}, "auth_test")
        if test_write.success:
            print("✅ Authentication OK")
            # Clean up test data
            await storage_service.delete_async("test", "auth_test")
        else:
            print(f"❌ Authentication failed: {test_write.error}")
    except Exception as e:
        print(f"❌ Authentication error: {e}")

    return True

# Recommended fixes
fixes = {
    "timeout": "Increase timeout values in configuration",
    "auth": "Check credentials and permissions",
    "network": "Verify firewall rules and DNS resolution",
    "ssl": "Check SSL certificate validation"
}

Diagnostic Tools

#!/usr/bin/env python3
"""
AgentMap Storage Health Check Tool
Run comprehensive health checks on the storage system.
"""

import asyncio
import json
from datetime import datetime

class StorageHealthChecker:
    def __init__(self, config_path: str):
        self.config = self._load_config(config_path)
        self.storage_service = self._init_storage_service()

    async def run_comprehensive_health_check(self):
        """Run all health checks."""
        print("🔍 Starting AgentMap Storage Health Check...")
        print("=" * 50)

        results = {
            "timestamp": datetime.utcnow().isoformat(),
            "overall_status": "healthy",
            "checks": {}
        }

        # Connectivity check
        print("📡 Checking connectivity...")
        connectivity = await self._check_connectivity()
        results["checks"]["connectivity"] = connectivity

        # Performance check
        print("⚡ Checking performance...")
        performance = await self._check_performance()
        results["checks"]["performance"] = performance

        # Data integrity check
        print("🔐 Checking data integrity...")
        integrity = await self._check_data_integrity()
        results["checks"]["data_integrity"] = integrity

        # Security check
        print("🛡️ Checking security...")
        security = await self._check_security()
        results["checks"]["security"] = security

        # Cache check
        print("💾 Checking cache systems...")
        cache = await self._check_cache_systems()
        results["checks"]["cache"] = cache

        # Determine overall status
        failed_checks = [
            name for name, check in results["checks"].items()
            if check["status"] != "healthy"
        ]

        if failed_checks:
            results["overall_status"] = "degraded" if len(failed_checks) <= 2 else "unhealthy"

        print("\n" + "=" * 50)
        print(f"🏥 Overall Status: {results['overall_status'].upper()}")

        if failed_checks:
            print(f"❌ Failed checks: {', '.join(failed_checks)}")
        else:
            print("✅ All checks passed!")

        return results

    async def _check_connectivity(self):
        """Check connectivity to all storage providers."""
        providers = ["local", "azure", "aws", "gcp"]
        results = {"status": "healthy", "details": {}}

        for provider in providers:
            try:
                # Test basic connectivity
                test_result = await self._test_provider_connectivity(provider)
                results["details"][provider] = {
                    "status": "healthy" if test_result else "unhealthy",
                    "response_time_ms": test_result.get("response_time", 0)
                }
            except Exception as e:
                results["details"][provider] = {
                    "status": "unhealthy",
                    "error": str(e)
                }
                results["status"] = "degraded"

        return results

if __name__ == "__main__":
    import sys

    config_path = sys.argv[1] if len(sys.argv) > 1 else "storage_config.yaml"

    checker = StorageHealthChecker(config_path)
    results = asyncio.run(checker.run_comprehensive_health_check())

    # Save results
    with open(f"health_check_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json", "w") as f:
        json.dump(results, f, indent=2)

    # Exit with appropriate code
    exit_code = 0 if results["overall_status"] == "healthy" else 1
    sys.exit(exit_code)

Conclusion

AgentMap's storage integration provides enterprise-ready capabilities with 95% feature coverage across all major storage providers. The system supports:

  • Unified Storage API across CSV, JSON, file, and vector storage
  • Multi-Cloud Integration with Azure, AWS, and Google Cloud
  • Advanced Caching for performance optimization
  • Enterprise Governance with compliance and lifecycle management
  • Comprehensive Security with encryption and access controls
  • Production Monitoring with health checks and metrics

The storage integration is designed for scalability, reliability, and ease of use, providing a solid foundation for enterprise data operations in AgentMap workflows.

Production Deployment

For production deployments, implement:

  1. Multi-region replication for disaster recovery
  2. Automated backup strategies with retention policies
  3. Comprehensive monitoring with real-time alerts
  4. Cost optimization through lifecycle policies
  5. Security hardening with encryption and access controls
  6. Compliance automation for regulatory requirements