Cloud Storage Integration
AgentMap integrates with the major cloud storage providers for JSON document operations, letting you read and write JSON documents directly from and to Azure Blob Storage, AWS S3, and Google Cloud Storage without changing your workflow structure. Key benefits:
- Scalability: Handle large datasets without local storage limitations
- Reliability: Built-in redundancy and backup features
- Security: Enterprise-grade encryption and access controls
- Collaboration: Share data across teams and environments
- Cost-Effective: Pay only for what you use
Supported Cloud Providers
| Provider | Authentication | Features | Best For |
|---|---|---|---|
| Azure Blob Storage | Connection string or account key | Container-based organization, metadata support | Microsoft ecosystem integration |
| Amazon S3 | Access key/secret or IAM roles | Bucket-based organization, versioning | AWS ecosystem integration |
| Google Cloud Storage | Service account or application default credentials | Bucket-based organization, fine-grained permissions | Google Cloud ecosystem integration |
Configuration
Basic Configuration Structure
Update your `storage_config.yaml` file with cloud provider configurations:
```yaml
json:
  default_provider: "local"  # Default provider if not specified in URI
  providers:
    local:
      base_dir: "data/json"
    azure:
      connection_string: "env:AZURE_STORAGE_CONNECTION_STRING"
      default_container: "documents"
      containers:
        users: "users-container"
        reports: "reports-container"
    aws:
      region: "us-west-2"
      access_key: "env:AWS_ACCESS_KEY_ID"
      secret_key: "env:AWS_SECRET_ACCESS_KEY"
      default_bucket: "my-documents"
      buckets:
        users: "users-bucket"
        reports: "reports-bucket"
    gcp:
      project_id: "env:GCP_PROJECT_ID"
      credentials_file: "path/to/service-account.json"
      default_bucket: "documents"
  collections:
    # Local files
    users: "users.json"
    # Cloud storage with explicit URIs
    azure_users: "azure://users/data.json"
    aws_reports: "s3://reports/monthly.json"
    gcp_documents: "gs://documents/archive.json"
```
Provider-Specific Configuration
- Azure Configuration
- AWS Configuration
- GCP Configuration
```yaml
azure:
  # Primary authentication method (recommended)
  connection_string: "env:AZURE_STORAGE_CONNECTION_STRING"

  # Alternative authentication
  # account_name: "env:AZURE_STORAGE_ACCOUNT"
  # account_key: "env:AZURE_STORAGE_KEY"

  # Container configuration
  default_container: "documents"
  containers:
    users: "users-prod-container"
    configs: "app-configs"
    logs: "application-logs"

  # Optional settings
  timeout: 30  # Connection timeout in seconds
  retry_count: 3
  enable_logging: true
```
```yaml
aws:
  # Authentication
  region: "us-west-2"
  access_key: "env:AWS_ACCESS_KEY_ID"
  secret_key: "env:AWS_SECRET_ACCESS_KEY"

  # Optional session token for assumed roles
  # session_token: "env:AWS_SESSION_TOKEN"

  # Bucket configuration
  default_bucket: "my-documents"
  buckets:
    users: "users-prod-bucket"
    analytics: "analytics-data"
    backups: "system-backups"

  # Optional settings
  endpoint_url: null  # Custom endpoint for LocalStack/MinIO
  use_ssl: true
  verify_ssl: true
  timeout: 30
```
```yaml
gcp:
  # Project and authentication
  project_id: "env:GCP_PROJECT_ID"

  # Authentication options
  credentials_file: "path/to/service-account.json"
  # Or use application default credentials (for GCE/Cloud Run)
  # use_default_credentials: true

  # Bucket configuration
  default_bucket: "documents"
  buckets:
    users: "users-prod-bucket"
    ml_models: "ml-model-storage"
    logs: "application-logs"

  # Optional settings
  timeout: 30
  retry_count: 3
```
URI Format for Cloud Storage
Cloud storage locations are specified using these URI formats:
| Provider | URI Format | Example | Description |
|---|---|---|---|
| Azure Blob Storage | `azure://container/path/to/blob.json` | `azure://documents/users.json` | Container-based storage |
| AWS S3 | `s3://bucket/path/to/object.json` | `s3://my-bucket/data/config.json` | Bucket-based storage |
| Google Cloud Storage | `gs://bucket/path/to/blob.json` | `gs://my-bucket/reports/monthly.json` | Bucket-based storage |
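The anatomy is the same for every provider: the scheme identifies the provider, the first path segment names the container or bucket, and the remainder is the blob/object path. As a minimal illustration (not AgentMap's internal parser), the standard library can split such a URI:

```python
from urllib.parse import urlparse

def split_storage_uri(uri: str) -> tuple[str, str, str]:
    """Split a storage URI into (provider_scheme, container_or_bucket, object_path)."""
    parsed = urlparse(uri)
    return parsed.scheme, parsed.netloc, parsed.path.lstrip("/")

print(split_storage_uri("azure://documents/users.json"))     # ('azure', 'documents', 'users.json')
print(split_storage_uri("s3://my-bucket/data/config.json"))  # ('s3', 'my-bucket', 'data/config.json')
```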
URI Examples
- Simple Paths
- Nested Paths
- Environment-Specific
```yaml
# Simple paths
collections:
  user_profiles: "azure://users/profiles.json"
  app_config: "s3://config/app.json"
  reports: "gs://analytics/reports.json"
```

```yaml
# Nested paths
collections:
  daily_logs: "azure://logs/2024/01/daily.json"
  model_configs: "s3://ml/models/v1/config.json"
  user_analytics: "gs://data/users/analytics/summary.json"
```

```yaml
# Environment-specific collections
collections:
  # Production
  prod_users: "azure://prod-users/data.json"
  prod_config: "s3://prod-config/app.json"

  # Staging
  staging_users: "azure://staging-users/data.json"
  staging_config: "s3://staging-config/app.json"

  # Development
  dev_users: "azure://dev-users/data.json"
  dev_config: "s3://dev-config/app.json"
```
Authentication Methods
Environment Variables
The recommended approach is to use environment variables for sensitive credentials:
- Azure Environment
- AWS Environment
- GCP Environment
```bash
# Azure Blob Storage
export AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net"

# Alternative approach
export AZURE_STORAGE_ACCOUNT="mystorageaccount"
export AZURE_STORAGE_KEY="your-account-key"
```

```bash
# AWS S3
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export AWS_DEFAULT_REGION="us-west-2"

# For assumed roles
export AWS_SESSION_TOKEN="your-session-token"
```

```bash
# Google Cloud Storage
export GCP_PROJECT_ID="your-project-id"
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"

# For Cloud Run/GCE (uses metadata service)
# No environment variables needed when using default credentials
```
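The `env:` prefix used throughout the configuration examples refers to these environment variables rather than literal values. As a minimal sketch of that kind of lookup (illustrative only; `resolve_config_value` is a hypothetical helper, not part of the AgentMap API):

```python
import os

def resolve_config_value(value):
    """Resolve "env:VARIABLE_NAME" references to the environment variable's value."""
    if isinstance(value, str) and value.startswith("env:"):
        return os.getenv(value[len("env:"):])
    return value

print(resolve_config_value("env:AWS_ACCESS_KEY_ID"))  # Reads AWS_ACCESS_KEY_ID from the environment
print(resolve_config_value("us-west-2"))              # Literal values pass through unchanged
```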
Authentication Best Practices
- Use Environment Variables: Never hardcode credentials in configuration files
- Rotate Credentials Regularly: Set up automatic credential rotation where possible
- Least Privilege Access: Grant only the minimum required permissions
- Use IAM Roles: Prefer IAM roles over static credentials in cloud environments
- Enable Audit Logging: Track access to sensitive data
Advanced Authentication Options
Azure Managed Identity (for Azure VMs/App Service):
```yaml
azure:
  use_managed_identity: true
  default_container: "documents"
```
AWS IAM Roles (for EC2/Lambda):
```yaml
aws:
  region: "us-west-2"
  # No credentials needed - uses instance profile
  default_bucket: "my-documents"
```
GCP Service Account (detailed configuration):
```yaml
gcp:
  project_id: "env:GCP_PROJECT_ID"
  credentials_file: "/opt/app/credentials/service-account.json"
  scopes:
    - "https://www.googleapis.com/auth/devstorage.read_write"
  default_bucket: "documents"
```
Using Cloud Storage in Workflows
CSV Workflow Examples
- Basic Cloud Operations
- Named Collections
- Multi-Cloud Pipeline
```csv
GraphName,Node,Edge,Context,AgentType,Success_Next,Failure_Next,Input_Fields,Output_Field,Prompt
CloudFlow,ReadData,,Read from Azure,cloud_json_reader,Process,,collection,data,"azure://container/data.json"
CloudFlow,Process,,Process data,DataProcessor,SaveData,,data,processed_data,"Process the data"
CloudFlow,SaveData,,Save to AWS S3,cloud_json_writer,End,,processed_data,result,"s3://bucket/output.json"
CloudFlow,End,,Completion,Echo,,,"result",final_message,Data processing complete
```

```csv
GraphName,Node,Edge,Context,AgentType,Success_Next,Failure_Next,Input_Fields,Output_Field,Prompt
CloudFlow,ReadUsers,,Read user data,cloud_json_reader,ProcessUsers,,collection,users,"azure_users"
CloudFlow,ProcessUsers,,Process users,UserProcessor,SaveResults,,users,processed_users,
CloudFlow,SaveResults,,Save to cloud,cloud_json_writer,End,,processed_users,result,"aws_reports"
CloudFlow,End,,Completion,Echo,,,"result",final_message,User processing complete
```

```csv
GraphName,Node,Edge,Context,AgentType,Success_Next,Failure_Next,Input_Fields,Output_Field,Prompt
MultiCloud,ReadAzure,,Read from Azure,cloud_json_reader,Process1,,collection,azure_data,"azure://source/data.json"
MultiCloud,Process1,,Process Azure data,DataProcessor,ReadAWS,,azure_data,processed_azure,
MultiCloud,ReadAWS,,Read from AWS,cloud_json_reader,Process2,,collection,aws_data,"s3://source/data.json"
MultiCloud,Process2,,Combine data,DataCombiner,SaveGCP,,processed_azure;aws_data,combined_data,
MultiCloud,SaveGCP,,Save to GCP,cloud_json_writer,End,,combined_data,result,"gs://output/combined.json"
MultiCloud,End,,Completion,Echo,,,"result",final_message,Multi-cloud processing complete
```
Agent Context Configuration
- Cloud Reader Agent
- Cloud Writer Agent
- Batch Operations
Cloud JSON reader agent context:

```json
{
  "collection": "azure://prod-data/users.json",
  "format": "raw",
  "timeout": 30,
  "retry_count": 3,
  "cache_enabled": true
}
```

Cloud JSON writer agent context:

```json
{
  "collection": "s3://prod-output/results.json",
  "mode": "update",
  "create_if_missing": true,
  "backup_enabled": true,
  "compression": "gzip"
}
```

Batch processing agent context:

```json
{
  "source_collections": [
    "azure://data/batch1.json",
    "azure://data/batch2.json",
    "azure://data/batch3.json"
  ],
  "destination": "s3://processed/combined.json",
  "batch_size": 100,
  "parallel_processing": true
}
```
Container/Bucket Mappings
You can map logical container/bucket names to actual storage containers for better organization and environment management:
```yaml
azure:
  containers:
    users: "users-prod-container"  # Production users
    configs: "app-configs-v2"      # Application configurations
    logs: "application-logs-2024"  # Current year logs
    temp: "temporary-processing"   # Temporary data

aws:
  buckets:
    analytics: "analytics-prod-us-west-2"  # Regional analytics data
    backups: "system-backups-encrypted"    # Encrypted backups
    ml_models: "ml-models-versioned"       # Versioned ML models
    user_uploads: "user-uploads-secure"    # Secure user uploads

gcp:
  buckets:
    documents: "documents-prod-global"   # Global document storage
    images: "images-cdn-optimized"       # CDN-optimized images
    archives: "long-term-archives"       # Long-term archival
    processing: "temp-processing-queue"  # Temporary processing queue
```
Then use logical names in URIs:
```yaml
collections:
  user_data: "azure://users/profiles.json"  # Uses "users-prod-container"
  app_config: "s3://configs/app.json"       # Uses "app-configs-v2"
  documents: "gs://documents/archive.json"  # Uses "documents-prod-global"
```
Required Dependencies
Install the appropriate cloud SDK packages:
- Installation Commands
- requirements.txt
- Docker
```bash
# Azure Blob Storage
pip install azure-storage-blob

# AWS S3
pip install boto3

# Google Cloud Storage
pip install google-cloud-storage

# Install all cloud providers
pip install azure-storage-blob boto3 google-cloud-storage
```
```text
# Cloud storage dependencies
azure-storage-blob>=12.19.0
boto3>=1.34.0
google-cloud-storage>=2.10.0

# Optional: for enhanced features
aiofiles>=23.2.0      # Async file operations
tenacity>=8.2.0       # Retry logic
cryptography>=41.0.0  # Enhanced encryption
```
```dockerfile
FROM python:3.11-slim

# Install cloud storage dependencies (quote the specs so ">=" is not treated as a shell redirect)
RUN pip install \
    "azure-storage-blob>=12.19.0" \
    "boto3>=1.34.0" \
    "google-cloud-storage>=2.10.0"

# Copy your application
COPY . /app
WORKDIR /app

# Run your application
CMD ["python", "main.py"]
```
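After installation, a quick way to confirm the SDKs are importable in your environment (a simple sanity check, independent of AgentMap):

```python
# Verify that the cloud SDK packages are installed
for module in ("azure.storage.blob", "boto3", "google.cloud.storage"):
    try:
        __import__(module)
        print(f"{module}: OK")
    except ImportError as exc:
        print(f"{module}: missing ({exc})")
```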
Error Handling and Troubleshooting
Common Issues and Solutions
Authentication Failures
Problem: `AuthenticationError` or `Unauthorized` exceptions
Solutions:
- Verify environment variables are set correctly
- Check credential expiration dates
- Ensure proper permissions on containers/buckets
- Validate the connection string format
```python
# Debug authentication
import os

print("Azure connection string:", os.getenv("AZURE_STORAGE_CONNECTION_STRING", "Not set"))
print("AWS access key:", os.getenv("AWS_ACCESS_KEY_ID", "Not set"))
print("GCP project:", os.getenv("GCP_PROJECT_ID", "Not set"))
```
Network Connectivity Issues
Problem: `ConnectionError` or timeout exceptions
Solutions:
- Check internet connectivity
- Verify firewall rules allow outbound HTTPS
- Increase timeout values in configuration
- Check cloud provider service status
```yaml
# Increase timeouts
azure:
  timeout: 60  # Increase from default 30
  retry_count: 5

aws:
  timeout: 60
  verify_ssl: false  # Only for testing

gcp:
  timeout: 60
  retry_count: 5
```
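If transient network errors persist after raising timeouts, retry logic helps. The optional `tenacity` dependency listed earlier can wrap reads with exponential backoff; a minimal sketch, assuming the `storage_service` object used in the other examples on this page:

```python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=30))
def read_with_retries(uri: str):
    """Retry transient failures with exponential backoff (1s, 2s, 4s, ... capped at 30s)."""
    return storage_service.read(uri)

data = read_with_retries("s3://my-documents/data.json")
```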
Container/Bucket Not Found
Problem: `ContainerNotFound` or `NoSuchBucket` errors
Solutions:
- Verify container/bucket exists in cloud console
- Check spelling and case sensitivity
- Ensure proper region configuration
- Create containers/buckets if needed
```python
# Check that the bucket exists before workflow execution (S3/boto3 example; other SDKs are analogous)
import boto3
from botocore.exceptions import ClientError

def check_storage_exists(bucket_name: str) -> bool:
    try:
        boto3.client("s3").head_bucket(Bucket=bucket_name)
        return True
    except ClientError:
        return False
```
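For the "create containers/buckets if needed" case, the provider SDKs can do this directly. A sketch for Azure using `azure-storage-blob` (the container name is a placeholder):

```python
import os

from azure.core.exceptions import ResourceExistsError
from azure.storage.blob import BlobServiceClient

def ensure_azure_container(name: str) -> None:
    """Create the blob container if it does not already exist."""
    client = BlobServiceClient.from_connection_string(
        os.environ["AZURE_STORAGE_CONNECTION_STRING"]
    )
    try:
        client.create_container(name)
    except ResourceExistsError:
        pass  # Container already exists

ensure_azure_container("documents")
```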
Permission Denied Errors
Problem: `PermissionDenied` or `AccessDenied` exceptions
Solutions:
- Verify IAM permissions include required operations
- Check if containers/buckets have public access restrictions
- Ensure service account has proper roles
- Review bucket policies and ACLs
Required Permissions by Provider:
Azure: `Storage Blob Data Contributor`, or a custom role with:
- `Microsoft.Storage/storageAccounts/blobServices/containers/read`
- `Microsoft.Storage/storageAccounts/blobServices/containers/blobs/read`
- `Microsoft.Storage/storageAccounts/blobServices/containers/blobs/write`

AWS: a policy with the actions:
- `s3:GetObject`
- `s3:PutObject`
- `s3:DeleteObject`
- `s3:ListBucket`

GCP: the `Storage Object Admin` role, or a custom role with:
- `storage.objects.get`
- `storage.objects.create`
- `storage.objects.update`
- `storage.objects.delete`
Monitoring and Logging
- Enable Logging
- Health Checks
- Performance Metrics
```yaml
# Enhanced logging configuration
azure:
  enable_logging: true
  log_level: "INFO"  # DEBUG, INFO, WARNING, ERROR

aws:
  enable_logging: true
  log_requests: true
  log_responses: false  # Avoid logging sensitive data

gcp:
  enable_logging: true
  log_level: "INFO"
```
```python
# Storage health check implementation
async def check_cloud_storage_health():
    """Check connectivity to all configured cloud providers."""
    checks = {
        "azure": "azure://health/check.json",
        "aws": "s3://health/check.json",
        "gcp": "gs://health/check.json",
    }
    health_status = {}

    for provider, uri in checks.items():
        try:
            result = storage_service.read(uri)
            health_status[provider] = "healthy" if result else "degraded"
        except Exception as e:
            health_status[provider] = f"error: {e}"

    return health_status
```
```python
# Track performance metrics
import time

class CloudStorageMetrics:
    def __init__(self):
        self.operation_times = {}
        self.error_counts = {}

    def track_operation(self, provider, operation, duration):
        key = f"{provider}_{operation}"
        if key not in self.operation_times:
            self.operation_times[key] = []
        self.operation_times[key].append(duration)

    def track_error(self, provider, error_type):
        key = f"{provider}_{error_type}"
        self.error_counts[key] = self.error_counts.get(key, 0) + 1

    def get_stats(self):
        return {
            "average_times": {
                k: sum(v) / len(v) for k, v in self.operation_times.items()
            },
            "error_counts": self.error_counts,
        }
```
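A usage sketch for the metrics class, timing a single read. It reuses `import time` from the block above and the `storage_service` object assumed elsewhere on this page; the URI is a placeholder.

```python
metrics = CloudStorageMetrics()

start = time.perf_counter()
try:
    storage_service.read("s3://my-documents/data.json")
    metrics.track_operation("aws", "read", time.perf_counter() - start)
except Exception as exc:
    metrics.track_error("aws", type(exc).__name__)

print(metrics.get_stats())
```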
Performance Optimization
Caching Strategies
- Local Caching
- Redis Caching
```yaml
# Local in-memory cache
json:
  cache:
    enabled: true
    ttl: 300        # 5 minutes
    max_size: 100   # Max cached items
    strategy: "lru" # Least Recently Used
```

```yaml
# Redis cache
json:
  cache:
    enabled: true
    provider: "redis"
    redis_url: "redis://localhost:6379/0"
    ttl: 600  # 10 minutes
    key_prefix: "agentmap:storage:"
```
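The local caching behavior configured above can also be approximated in application code. A minimal sketch using the `cachetools` package (an extra dependency, not listed in the requirements above) with the same 100-item / 5-minute limits, and the `storage_service` object assumed elsewhere on this page:

```python
from cachetools import TTLCache

# 100 entries max, 5-minute TTL -- mirrors the local cache settings above
_read_cache = TTLCache(maxsize=100, ttl=300)

def cached_read(uri: str):
    """Read through a local TTL cache to avoid repeated cloud round-trips."""
    if uri not in _read_cache:
        _read_cache[uri] = storage_service.read(uri)
    return _read_cache[uri]
```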
Batch Operations
```python
# Efficient batch processing
import asyncio

async def process_cloud_data_batch(collections, batch_size=10):
    """Process multiple cloud collections efficiently."""
    results = []

    for i in range(0, len(collections), batch_size):
        batch = collections[i:i + batch_size]

        # Process each batch concurrently
        batch_tasks = [
            storage_service.read(collection)
            for collection in batch
        ]
        batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
        results.extend(batch_results)

    return results
```
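Driving the batch helper from synchronous code might look like this (`asyncio` is imported in the block above; the collection URIs are placeholders):

```python
collections = [
    "azure://data/batch1.json",
    "azure://data/batch2.json",
    "azure://data/batch3.json",
]
results = asyncio.run(process_cloud_data_batch(collections, batch_size=2))
```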
Connection Pooling
```yaml
# Connection pool configuration
azure:
  connection_pool:
    max_connections: 10
    max_idle_time: 300

aws:
  connection_pool:
    max_connections: 10
    max_retries: 3
    backoff_mode: "adaptive"

gcp:
  connection_pool:
    max_connections: 10
    keepalive_timeout: 300
```
Security Best Practices
Data Encryption
- Encryption in Transit
- Encryption at Rest
```yaml
# Ensure HTTPS/TLS for all providers
azure:
  use_ssl: true
  verify_ssl: true

aws:
  use_ssl: true
  verify_ssl: true

gcp:
  use_ssl: true  # Always enabled
```
```yaml
# Provider-specific encryption settings
azure:
  encryption:
    enabled: true
    key_vault_url: "https://myvault.vault.azure.net/"
    key_name: "storage-encryption-key"

aws:
  encryption:
    server_side_encryption: "AES256"
    # Or use KMS
    # kms_key_id: "arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012"

gcp:
  encryption:
    # Uses Google-managed encryption by default
    # Or specify a customer-managed key
    # kms_key_name: "projects/PROJECT_ID/locations/LOCATION/keyRings/RING_ID/cryptoKeys/KEY_ID"
```
Access Control
```yaml
# Implement least privilege access
azure:
  rbac:
    enabled: true
    roles:
      - "Storage Blob Data Reader"       # For read-only operations
      - "Storage Blob Data Contributor"  # For read/write operations

aws:
  iam:
    policy_arn: "arn:aws:iam::123456789012:policy/AgentMapStoragePolicy"
    # Custom policy with minimal required permissions

gcp:
  iam:
    service_account: "agentmap-storage@project.iam.gserviceaccount.com"
    roles:
      - "roles/storage.objectViewer"   # Read access
      - "roles/storage.objectCreator"  # Write access
```
Related Documentation
- Storage Services Overview - Core storage service concepts
- Service Registry Patterns - Host service integration
- Configuration Reference - Complete configuration options
- Security Guide - Security best practices
For production deployments, consider implementing:
- Multi-region replication for disaster recovery
- Automated backup strategies with retention policies
- Monitoring and alerting for storage operations
- Cost optimization through lifecycle policies
- Compliance controls for data governance