In the world of agentic AI, performance isn't just about speed—it's about efficiency, resource utilization, and delivering value at scale. As your AI agents move from prototype to production, optimization becomes the critical factor that separates successful deployments from costly failures.
Think of optimization like tuning a high-performance engine. Every component must work in harmony, resources must be allocated precisely, and the system must adapt to varying conditions. Poorly optimized agents can lead to skyrocketing cloud costs, sluggish user experiences, and system failures under load.
This comprehensive lesson explores the art and science of optimizing agentic AI systems. We'll dive deep into performance tuning techniques, resource management strategies, and efficiency patterns that will transform your agents from resource-hungry prototypes into lean, scalable production systems.
Whether you're optimizing a single agent or a complex multi-agent ecosystem, understanding these optimization techniques is essential for building cost-effective, responsive, and reliable AI systems that can handle real-world workloads.
By the end of this lesson, you will be able to identify compute, memory, I/O, and network bottlenecks in agent systems; apply caching, batching, and memory-management strategies; optimize model inference with quantization and pruning; and monitor, adapt, and scale agents in production.
Performance bottlenecks in agentic AI systems typically fall into four categories:
1. Compute-Bound Bottlenecks. Characteristics: high CPU/GPU utilization, slow processing times. Common causes: complex model inference, intensive calculations, inefficient algorithms.
Identification:
```python
import time
import psutil
from contextlib import contextmanager

@contextmanager
def performance_monitor(operation_name):
    """Monitor performance of an operation"""
    start_time = time.time()
    start_cpu = psutil.cpu_percent()
    start_memory = psutil.virtual_memory().percent

    try:
        yield
    finally:
        end_time = time.time()
        end_cpu = psutil.cpu_percent()
        end_memory = psutil.virtual_memory().percent

        duration = end_time - start_time
        cpu_usage = end_cpu - start_cpu
        memory_usage = end_memory - start_memory

        print(f"Operation: {operation_name}")
        print(f"Duration: {duration:.2f}s")
        print(f"CPU Usage: {cpu_usage:.1f}%")
        print(f"Memory Usage: {memory_usage:.1f}%")

# Usage example
with performance_monitor("model_inference"):
    result = model.predict(input_data)
```
2. Memory-Bound Bottlenecks. Characteristics: high memory usage, frequent garbage collection, out-of-memory errors. Common causes: large context windows, inefficient data structures, memory leaks.
Memory Optimization Example:
```python
import gc
from typing import Generator

class MemoryOptimizedAgent:
    def __init__(self):
        self.context_window = []
        self.max_context_size = 1000

    def add_context(self, item):
        """Add context with memory management"""
        self.context_window.append(item)

        # Remove old items to prevent memory bloat
        if len(self.context_window) > self.max_context_size:
            self.context_window = self.context_window[-self.max_context_size:]

    def process_batch(self, items: list) -> Generator:
        """Process items in batches to reduce memory pressure"""
        batch_size = 100
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            yield self.process_batch_items(batch)

            # Explicit cleanup
            del batch
            gc.collect()

    def process_batch_items(self, batch):
        """Process a single batch"""
        # Batch processing logic goes here; pass items through as a placeholder
        results = [item for item in batch]
        return results
```
3. I/O-Bound Bottlenecks. Characteristics: slow response times, high network/disk usage. Common causes: database queries, API calls, file operations.
I/O Optimization Example:
```python
import asyncio
import aiohttp
from asyncio import Semaphore

class AsyncOptimizedAgent:
    def __init__(self, max_concurrent_requests=10):
        self.semaphore = Semaphore(max_concurrent_requests)
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch_data_concurrent(self, urls):
        """Fetch multiple URLs concurrently with rate limiting"""
        tasks = []
        for url in urls:
            task = self.fetch_single_url(url)
            tasks.append(task)

        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

    async def fetch_single_url(self, url):
        """Fetch single URL with semaphore control"""
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    return await response.json()
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return None
```
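As a quick, hedged sketch of how the class above might be driven (the URLs are placeholders, not real endpoints), the async context manager handles session lifecycle while the semaphore caps concurrency:

```python
# Hypothetical usage sketch: the URLs below are placeholders for illustration only.
async def main():
    urls = [f"https://api.example.com/items/{i}" for i in range(25)]
    async with AsyncOptimizedAgent(max_concurrent_requests=5) as agent:
        results = await agent.fetch_data_concurrent(urls)
    # Count the fetches that returned a parsed body rather than None or an exception
    ok = sum(r is not None for r in results if not isinstance(r, Exception))
    print(f"Fetched {ok} of {len(urls)} responses")

asyncio.run(main())
```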
4. Network-Bound Bottlenecks. Characteristics: high latency, bandwidth limitations, connection overhead. Common causes: remote API calls, distributed agent communication.
Network Optimization Example:
```python
import zlib
import json
import aiohttp
from typing import Dict, Any

class NetworkOptimizedAgent:
    def __init__(self):
        self.compression_threshold = 1024  # Compress payloads > 1KB

    def optimize_payload(self, data: Dict[str, Any]) -> bytes:
        """Optimize payload for network transmission"""
        # Serialize to compact JSON
        json_data = json.dumps(data, separators=(',', ':'))

        # Compress if large enough
        if len(json_data) > self.compression_threshold:
            compressed = zlib.compress(json_data.encode())
            return b'COMP:' + compressed
        else:
            return json_data.encode()

    def decompress_payload(self, payload: bytes) -> Dict[str, Any]:
        """Decompress and deserialize payload"""
        if payload.startswith(b'COMP:'):
            compressed_data = payload[5:]  # Remove 'COMP:' prefix
            decompressed = zlib.decompress(compressed_data)
            return json.loads(decompressed.decode())
        else:
            return json.loads(payload.decode())

    async def send_optimized_request(self, url: str, data: Dict[str, Any]):
        """Send optimized network request"""
        optimized_payload = self.optimize_payload(data)
        headers = {
            'Content-Type': 'application/json',
            # Signal compression based on whether the payload was actually compressed
            'Content-Encoding': 'deflate' if optimized_payload.startswith(b'COMP:') else 'identity'
        }

        # Send request with optimized payload
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=optimized_payload, headers=headers) as response:
                return await response.json()
```
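A quick local round trip (no network involved, sample payload invented for the example) shows the compression path kicking in for larger payloads:

```python
# Illustrative round trip; the payload content is made up.
agent = NetworkOptimizedAgent()
payload = {"history": ["hello world"] * 200, "user_id": "u-123"}

wire_bytes = agent.optimize_payload(payload)
restored = agent.decompress_payload(wire_bytes)

print(f"Compressed on the wire: {wire_bytes.startswith(b'COMP:')}")
print(f"Round trip intact: {restored == payload}")
```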
Effective caching is one of the most powerful optimization techniques for agentic AI systems. A well-designed caching strategy can reduce response times by orders of magnitude while significantly cutting costs.
In-Memory Caching. Use case: frequently accessed data, session information, model outputs. Benefits: fastest access, reduces repeated computations.
```python
import time
import hashlib
from functools import wraps
from typing import Any, Optional

class InMemoryCache:
    def __init__(self, max_size=1000, ttl=3600):
        self.cache = {}
        self.max_size = max_size
        self.ttl = ttl
        self.access_times = {}

    def _generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """Generate cache key from function arguments"""
        key_data = f"{func_name}:{str(args)}:{str(sorted(kwargs.items()))}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def get(self, key: str) -> Optional[Any]:
        """Get item from cache"""
        if key in self.cache:
            item, timestamp = self.cache[key]

            # Check TTL
            if time.time() - timestamp < self.ttl:
                self.access_times[key] = time.time()
                return item
            else:
                # Expired, remove from cache
                del self.cache[key]
                if key in self.access_times:
                    del self.access_times[key]

        return None

    def set(self, key: str, value: Any):
        """Set item in cache"""
        # Remove least recently accessed item if cache is full
        if len(self.cache) >= self.max_size:
            oldest_key = min(self.access_times.keys(), key=lambda k: self.access_times[k])
            del self.cache[oldest_key]
            del self.access_times[oldest_key]

        self.cache[key] = (value, time.time())
        self.access_times[key] = time.time()

def cached(cache_instance):
    """Decorator for caching function results"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = cache_instance._generate_key(func.__name__, args, kwargs)

            # Try to get from cache
            cached_result = cache_instance.get(key)
            if cached_result is not None:
                return cached_result

            # Execute function and cache result
            result = func(*args, **kwargs)
            cache_instance.set(key, result)
            return result
        return wrapper
    return decorator

# Usage example
memory_cache = InMemoryCache(max_size=500, ttl=1800)

@cached(memory_cache)
def expensive_model_inference(prompt: str, context: dict) -> str:
    """Expensive model inference with caching"""
    # Simulate expensive computation
    time.sleep(2)
    return f"Response to: {prompt}"
```
Distributed Caching. Use case: multi-agent systems, microservices, shared data. Benefits: consistent across services, scalable, persistent.
```python
import redis
import pickle
from typing import Any, Optional

class DistributedCache:
    def __init__(self, redis_url: str, default_ttl: int = 3600):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = default_ttl

    def serialize(self, data: Any) -> bytes:
        """Serialize data for storage"""
        return pickle.dumps(data)

    def deserialize(self, data: bytes) -> Any:
        """Deserialize data from storage"""
        return pickle.loads(data)

    async def get(self, key: str) -> Optional[Any]:
        """Get item from distributed cache"""
        try:
            data = self.redis_client.get(key)
            if data:
                return self.deserialize(data)
            return None
        except Exception as e:
            print(f"Cache get error: {e}")
            return None

    async def set(self, key: str, value: Any, ttl: int = None) -> bool:
        """Set item in distributed cache"""
        try:
            ttl = ttl or self.default_ttl
            serialized_data = self.serialize(value)
            return self.redis_client.setex(key, ttl, serialized_data)
        except Exception as e:
            print(f"Cache set error: {e}")
            return False

    async def delete(self, key: str) -> bool:
        """Delete item from cache"""
        try:
            return bool(self.redis_client.delete(key))
        except Exception as e:
            print(f"Cache delete error: {e}")
            return False

    async def get_or_set(self, key: str, factory_func, ttl: int = None) -> Any:
        """Get from cache or set using factory function"""
        value = await self.get(key)
        if value is None:
            value = await factory_func()
            await self.set(key, value, ttl)
        return value

# Usage in a multi-agent system
class OptimizedAgent:
    def __init__(self, cache_url: str):
        self.cache = DistributedCache(cache_url)

    async def get_user_preferences(self, user_id: str) -> dict:
        """Get user preferences with caching"""
        cache_key = f"user_prefs:{user_id}"

        async def fetch_preferences():
            # Simulate database call
            return {"theme": "dark", "language": "en", "notifications": True}

        return await self.cache.get_or_set(cache_key, fetch_preferences, ttl=1800)
```
Model Response Caching. Use case: model inference results, API responses, computed results. Benefits: reduces API costs, improves response times, handles rate limits.
```python
import hashlib
import json
from typing import Optional

class ModelResponseCache:
    def __init__(self, cache_backend):
        self.cache = cache_backend

    def generate_request_hash(self, model: str, prompt: str, parameters: dict) -> str:
        """Generate hash for model request"""
        request_data = {
            "model": model,
            "prompt": prompt,
            "parameters": sorted(parameters.items())
        }
        request_str = json.dumps(request_data, sort_keys=True)
        return hashlib.sha256(request_str.encode()).hexdigest()

    async def get_cached_response(self, model: str, prompt: str, parameters: dict) -> Optional[str]:
        """Get cached model response"""
        cache_key = self.generate_request_hash(model, prompt, parameters)
        return await self.cache.get(cache_key)

    async def cache_response(self, model: str, prompt: str, parameters: dict, response: str, ttl: int = 86400):
        """Cache model response"""
        cache_key = self.generate_request_hash(model, prompt, parameters)
        await self.cache.set(cache_key, response, ttl)

    async def get_or_generate(self, model: str, prompt: str, parameters: dict, generator_func) -> str:
        """Get cached response or generate a new one"""
        # Try cache first
        cached_response = await self.get_cached_response(model, prompt, parameters)
        if cached_response:
            return cached_response

        # Generate new response
        response = await generator_func(model, prompt, parameters)

        # Cache the response
        await self.cache_response(model, prompt, parameters, response)
        return response

# Usage example
class CachedAIAgent:
    def __init__(self, cache_backend, api_client):
        self.response_cache = ModelResponseCache(cache_backend)
        self.api_client = api_client

    async def generate_response(self, prompt: str, model: str = "gpt-3.5-turbo", temperature: float = 0.7) -> str:
        """Generate AI response with caching"""
        parameters = {"temperature": temperature, "max_tokens": 1000}

        async def call_api(model, prompt, parameters):
            return await self.api_client.chat_completion(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                **parameters
            )

        return await self.response_cache.get_or_generate(
            model, prompt, parameters, call_api
        )
```
Memory management is crucial for agentic AI systems, especially when dealing with large language models, extensive context windows, or multiple concurrent agents.
```python
from collections import deque
from typing import List, Dict

class ContextManager:
    def __init__(self, max_tokens: int = 4000, token_reserve: int = 500):
        self.max_tokens = max_tokens
        self.token_reserve = token_reserve
        self.available_tokens = max_tokens - token_reserve
        # Each entry is (message, token_count) so counts stay in sync
        # when old messages are trimmed.
        self.conversation_history = deque()

    def estimate_tokens(self, text: str) -> int:
        """Rough token estimation (approximately 4 chars per token)"""
        return len(text) // 4

    def add_message(self, role: str, content: str):
        """Add message to conversation with token tracking"""
        message = {"role": role, "content": content}
        token_count = self.estimate_tokens(content)
        self.conversation_history.append((message, token_count))

        # Trim if necessary
        self.trim_if_needed()

    def trim_if_needed(self):
        """Trim conversation to fit within token limits"""
        total_tokens = sum(count for _, count in self.conversation_history)
        while total_tokens > self.available_tokens and self.conversation_history:
            # Remove oldest message
            _, oldest_tokens = self.conversation_history.popleft()
            total_tokens -= oldest_tokens

    def get_context(self) -> List[Dict[str, str]]:
        """Get current conversation context"""
        return [message for message, _ in self.conversation_history]

    def get_token_usage(self) -> Dict[str, float]:
        """Get current token usage statistics"""
        total_tokens = sum(count for _, count in self.conversation_history)
        return {
            "used": total_tokens,
            "available": self.available_tokens,
            "percentage": (total_tokens / self.available_tokens) * 100
        }

# Usage example
class MemoryOptimizedAgent:
    def __init__(self):
        self.context_manager = ContextManager(max_tokens=4000)

    async def process_message(self, user_input: str) -> str:
        """Process user message with memory optimization"""
        # Add user message
        self.context_manager.add_message("user", user_input)

        # Get context for model
        context = self.context_manager.get_context()

        # Generate response
        response = await self.generate_response(context)

        # Add assistant response
        self.context_manager.add_message("assistant", response)
        return response
```
Batching amortizes per-request overhead by grouping many small requests into a single model call:

```python
import asyncio
from typing import List, Any, Callable
from dataclasses import dataclass

@dataclass
class BatchConfig:
    batch_size: int = 32
    max_wait_time: float = 0.1  # Maximum time to wait for a batch to fill
    max_concurrent_batches: int = 4

class BatchProcessor:
    def __init__(self, config: BatchConfig, process_func: Callable):
        self.config = config
        self.process_func = process_func
        self.current_batch = []
        self.batch_semaphore = asyncio.Semaphore(config.max_concurrent_batches)
        self.processing = False

    async def add_item(self, item: Any) -> Any:
        """Add item to batch and return its result"""
        # Create future for the result
        future = asyncio.Future()

        # Add to current batch
        self.current_batch.append((item, future))

        # Start processing if batch is full, otherwise start the flush timer
        if len(self.current_batch) >= self.config.batch_size:
            asyncio.create_task(self.process_batch())
        elif not self.processing:
            self.processing = True
            asyncio.create_task(self.batch_timer())

        # Wait for result
        return await future

    async def batch_timer(self):
        """Flush the batch after max_wait_time"""
        await asyncio.sleep(self.config.max_wait_time)
        if self.current_batch:
            await self.process_batch()
        self.processing = False

    async def process_batch(self):
        """Process current batch"""
        if not self.current_batch:
            return

        # Take the current batch and reset it
        batch = self.current_batch.copy()
        self.current_batch.clear()

        # Process batch with semaphore control
        async with self.batch_semaphore:
            try:
                # Extract items and futures
                items = [item for item, _ in batch]
                futures = [future for _, future in batch]

                # Process batch
                results = await self.process_func(items)

                # Set results for futures
                for future, result in zip(futures, results):
                    if not future.done():
                        future.set_result(result)
            except Exception as e:
                # Set exception for all futures in batch
                for _, future in batch:
                    if not future.done():
                        future.set_exception(e)

# Usage example
async def batch_model_inference(items: List[str]) -> List[str]:
    """Process multiple prompts in batch"""
    # Simulate batch API call
    await asyncio.sleep(0.5)  # Simulate API latency
    return [f"Response to: {item}" for item in items]

class BatchOptimizedAgent:
    def __init__(self):
        batch_config = BatchConfig(batch_size=16, max_wait_time=0.05)
        self.batch_processor = BatchProcessor(batch_config, batch_model_inference)

    async def respond(self, prompt: str) -> str:
        """Get response with batch optimization"""
        return await self.batch_processor.add_item(prompt)
```
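As a rough sketch of how callers interact with this batching layer (assuming the classes above), many concurrent `respond()` calls get coalesced into a handful of batched inferences:

```python
# Hypothetical usage sketch: concurrent callers share batched model calls.
async def main():
    agent = BatchOptimizedAgent()
    prompts = [f"prompt {i}" for i in range(40)]

    # 40 concurrent requests are served by far fewer batch inferences.
    responses = await asyncio.gather(*(agent.respond(p) for p in prompts))
    print(responses[0], "...", responses[-1])

asyncio.run(main())
```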
Optimizing AI model inference is crucial for production systems. These techniques can dramatically reduce latency and costs while maintaining acceptable quality.
```python
import io
import time
import torch
import torch.nn as nn
from typing import Dict, Any

class ModelQuantizer:
    def __init__(self, model: nn.Module):
        self.model = model
        self.quantized_model = None

    def quantize_dynamic(self, quantization_config: Dict[str, Any] = None):
        """Apply dynamic quantization to the model"""
        config = quantization_config or {
            "dtype": torch.qint8,
            "qconfig_spec": {
                nn.Linear: torch.quantization.default_dynamic_qconfig,
                nn.LSTM: torch.quantization.default_dynamic_qconfig,
                nn.GRU: torch.quantization.default_dynamic_qconfig
            }
        }

        # Apply dynamic quantization
        self.quantized_model = torch.quantization.quantize_dynamic(
            self.model.eval(),
            config["qconfig_spec"],
            dtype=config["dtype"]
        )
        return self.quantized_model

    def quantize_static(self, calibration_data, quantization_config: Dict[str, Any] = None):
        """Apply static quantization with calibration"""
        config = quantization_config or {
            "dtype": torch.qint8,
            "qconfig_spec": {
                nn.Linear: torch.quantization.get_default_qconfig('fbgemm'),
                nn.Conv2d: torch.quantization.get_default_qconfig('fbgemm')
            }
        }

        # Prepare model for quantization
        self.model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
        torch.quantization.prepare(self.model, inplace=True)

        # Calibrate with sample data
        with torch.no_grad():
            for data in calibration_data:
                self.model(data)

        # Convert to quantized model
        self.quantized_model = torch.quantization.convert(self.model.eval(), inplace=False)
        return self.quantized_model

    def compare_models(self, test_data) -> Dict[str, float]:
        """Compare performance between original and quantized models"""
        if not self.quantized_model:
            raise ValueError("Model not quantized yet")

        original_times = []
        quantized_times = []

        # Time the original model (CPU wall-clock; quantized models run on CPU)
        self.model.eval()
        with torch.no_grad():
            for data in test_data:
                start = time.perf_counter()
                _ = self.model(data)
                original_times.append((time.perf_counter() - start) * 1000)  # ms

        # Time the quantized model
        self.quantized_model.eval()
        with torch.no_grad():
            for data in test_data:
                start = time.perf_counter()
                _ = self.quantized_model(data)
                quantized_times.append((time.perf_counter() - start) * 1000)  # ms

        return {
            "original_avg_time": sum(original_times) / len(original_times),
            "quantized_avg_time": sum(quantized_times) / len(quantized_times),
            "speedup_ratio": sum(original_times) / sum(quantized_times),
            "model_size_reduction": self._calculate_size_reduction()
        }

    def _calculate_size_reduction(self) -> float:
        """Calculate model size reduction ratio based on serialized size"""
        def serialized_size(model: nn.Module) -> int:
            buffer = io.BytesIO()
            torch.save(model.state_dict(), buffer)
            return buffer.getbuffer().nbytes

        return serialized_size(self.model) / serialized_size(self.quantized_model)
```
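To ground this, here is a minimal sketch (the toy feed-forward model is invented for illustration and stands in for a real agent model) that dynamically quantizes the linear layers and compares timings:

```python
# Toy model invented for illustration; real agent models will differ.
toy_model = nn.Sequential(
    nn.Linear(512, 1024),
    nn.ReLU(),
    nn.Linear(1024, 256),
)

quantizer = ModelQuantizer(toy_model)
quantizer.quantize_dynamic()

test_batches = [torch.randn(8, 512) for _ in range(20)]
stats = quantizer.compare_models(test_batches)
print(f"Speedup: {stats['speedup_ratio']:.2f}x, "
      f"size reduction: {stats['model_size_reduction']:.2f}x")
```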
Pruning removes unnecessary parameters to shrink the model and speed up inference:

```python
import torch.nn as nn
import torch.nn.utils.prune as prune
from typing import List

class ModelPruner:
    def __init__(self, model: nn.Module):
        self.model = model
        self.pruned_model = None

    def prune_layerwise(self, pruning_ratio: float = 0.2, layers_to_prune: List[str] = None):
        """Apply L1 unstructured pruning to specified layers"""
        layers_to_prune = layers_to_prune or self._get_prunable_layers()

        for name, module in self.model.named_modules():
            if name in layers_to_prune and hasattr(module, 'weight'):
                # Prune the given fraction of connections in this layer
                prune.l1_unstructured(
                    module,
                    name='weight',
                    amount=pruning_ratio
                )

        # Remove pruning reparametrization to make pruning permanent
        for name, module in self.model.named_modules():
            if name in layers_to_prune:
                prune.remove(module, 'weight')

        self.pruned_model = self.model
        return self.pruned_model

    def prune_global(self, pruning_ratio: float = 0.1):
        """Apply global unstructured pruning"""
        parameters_to_prune = []
        for name, module in self.model.named_modules():
            if isinstance(module, (nn.Conv2d, nn.Linear)):
                parameters_to_prune.append((module, 'weight'))

        # Apply global pruning
        prune.global_unstructured(
            parameters_to_prune,
            pruning_method=prune.L1Unstructured,
            amount=pruning_ratio
        )

        # Remove pruning masks
        for module, param_name in parameters_to_prune:
            prune.remove(module, param_name)

        self.pruned_model = self.model
        return self.pruned_model

    def _get_prunable_layers(self) -> List[str]:
        """Get list of layers that can be pruned"""
        prunable_layers = []
        for name, module in self.model.named_modules():
            if isinstance(module, (nn.Conv2d, nn.Linear)):
                prunable_layers.append(name)
        return prunable_layers

    def fine_tune_pruned_model(self, train_loader, optimizer, criterion, epochs: int = 5):
        """Fine-tune pruned model to recover accuracy"""
        if not self.pruned_model:
            raise ValueError("Model not pruned yet")

        self.pruned_model.train()
        for epoch in range(epochs):
            total_loss = 0
            for batch_idx, (data, target) in enumerate(train_loader):
                optimizer.zero_grad()
                output = self.pruned_model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            print(f"Epoch {epoch+1}, Loss: {total_loss/len(train_loader):.4f}")

        return self.pruned_model
```
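A hedged sketch of applying global pruning to a small toy network (made up for illustration) and checking the resulting weight sparsity:

```python
# Toy network invented for illustration.
toy_net = nn.Sequential(nn.Linear(256, 256), nn.ReLU(), nn.Linear(256, 64))

pruner = ModelPruner(toy_net)
pruner.prune_global(pruning_ratio=0.3)

# Measure how many linear weights were zeroed out by pruning
zeros = sum((m.weight == 0).sum().item()
            for m in toy_net.modules() if isinstance(m, nn.Linear))
total = sum(m.weight.numel() for m in toy_net.modules() if isinstance(m, nn.Linear))
print(f"Weight sparsity after pruning: {zeros / total:.1%}")
```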
Modern agentic AI systems need to adapt their optimization strategies based on current conditions, workload patterns, and performance requirements.
```python
import asyncio
import psutil
from typing import Dict
from dataclasses import dataclass
from enum import Enum

class ResourceLevel(Enum):
    MINIMAL = "minimal"
    STANDARD = "standard"
    HIGH_PERFORMANCE = "high_performance"

@dataclass
class ResourceConfig:
    cpu_threshold_low: float = 30.0
    cpu_threshold_high: float = 80.0
    memory_threshold_low: float = 40.0
    memory_threshold_high: float = 85.0
    response_time_threshold: float = 2.0  # seconds

class AdaptiveOptimizer:
    def __init__(self, config: ResourceConfig):
        self.config = config
        self.current_level = ResourceLevel.STANDARD
        self.performance_history = []
        self.optimization_strategies = {
            ResourceLevel.MINIMAL: self._minimal_optimization,
            ResourceLevel.STANDARD: self._standard_optimization,
            ResourceLevel.HIGH_PERFORMANCE: self._high_performance_optimization
        }

    async def monitor_and_optimize(self, agent_system):
        """Continuously monitor and optimize system performance"""
        while True:
            # Collect metrics
            metrics = await self._collect_metrics(agent_system)

            # Determine optimal resource level
            optimal_level = self._determine_optimal_level(metrics)

            # Apply optimization if level changed
            if optimal_level != self.current_level:
                await self._apply_optimization(agent_system, optimal_level)
                self.current_level = optimal_level

            # Wait before next check
            await asyncio.sleep(30)  # Check every 30 seconds

    async def _collect_metrics(self, agent_system) -> Dict[str, float]:
        """Collect system performance metrics"""
        cpu_percent = psutil.cpu_percent(interval=1)
        memory_percent = psutil.virtual_memory().percent

        # Get agent-specific metrics
        avg_response_time = await agent_system.get_average_response_time()
        queue_size = await agent_system.get_queue_size()

        return {
            "cpu_percent": cpu_percent,
            "memory_percent": memory_percent,
            "avg_response_time": avg_response_time,
            "queue_size": queue_size
        }

    def _determine_optimal_level(self, metrics: Dict[str, float]) -> ResourceLevel:
        """Determine optimal resource level based on metrics"""
        cpu = metrics["cpu_percent"]
        memory = metrics["memory_percent"]
        response_time = metrics["avg_response_time"]

        # High performance needed
        if (cpu > self.config.cpu_threshold_high or
                memory > self.config.memory_threshold_high or
                response_time > self.config.response_time_threshold):
            return ResourceLevel.HIGH_PERFORMANCE

        # Minimal resources sufficient
        elif (cpu < self.config.cpu_threshold_low and
                memory < self.config.memory_threshold_low and
                response_time < self.config.response_time_threshold / 2):
            return ResourceLevel.MINIMAL

        # Standard resources
        else:
            return ResourceLevel.STANDARD

    async def _apply_optimization(self, agent_system, level: ResourceLevel):
        """Apply optimization strategy for given level"""
        strategy = self.optimization_strategies[level]
        await strategy(agent_system)
        print(f"Applied {level.value} optimization strategy")

    async def _minimal_optimization(self, agent_system):
        """Minimal resource usage optimization"""
        await agent_system.reduce_batch_size(0.5)
        await agent_system.increase_cache_ttl(7200)  # 2 hours
        await agent_system.enable_aggressive_gc()
        await agent_system.reduce_concurrent_requests(2)

    async def _standard_optimization(self, agent_system):
        """Standard balanced optimization"""
        await agent_system.reset_batch_size()
        await agent_system.reset_cache_ttl()
        await agent_system.disable_aggressive_gc()
        await agent_system.reset_concurrent_requests()

    async def _high_performance_optimization(self, agent_system):
        """High performance optimization"""
        await agent_system.increase_batch_size(2.0)
        await agent_system.decrease_cache_ttl(900)  # 15 minutes
        await agent_system.enable_preloading()
        await agent_system.increase_concurrent_requests(10)
```
Predictive scaling uses recent usage history to anticipate load rather than merely reacting to it:

```python
import asyncio
import psutil
import numpy as np
from typing import Dict, Any
from datetime import datetime, timedelta

class PredictiveScaler:
    def __init__(self, history_window: int = 24):
        self.history_window = history_window
        self.usage_history = []
        self.scaling_predictions = []

    def record_usage(self, timestamp: datetime, metrics: Dict[str, float]):
        """Record system usage metrics"""
        self.usage_history.append({
            "timestamp": timestamp,
            "metrics": metrics
        })

        # Keep only recent history
        cutoff_time = timestamp - timedelta(hours=self.history_window)
        self.usage_history = [
            entry for entry in self.usage_history
            if entry["timestamp"] > cutoff_time
        ]

    def predict_load(self, hours_ahead: int = 1) -> Dict[str, float]:
        """Predict system load for a future time period"""
        if len(self.usage_history) < 2:
            return {"cpu": 50.0, "memory": 60.0, "requests": 100}  # Default prediction

        # Extract time series data
        cpu_usage = [entry["metrics"]["cpu_percent"] for entry in self.usage_history]
        memory_usage = [entry["metrics"]["memory_percent"] for entry in self.usage_history]
        request_count = [entry["metrics"]["requests_per_minute"] for entry in self.usage_history]

        # Simple linear trend prediction
        predictions = {}
        for metric_name, values in [("cpu", cpu_usage), ("memory", memory_usage), ("requests", request_count)]:
            # Calculate trend
            x = np.arange(len(values))
            coeffs = np.polyfit(x, values, 1)  # Linear fit

            # Predict future value
            future_x = len(values) + (hours_ahead * 60)  # Assuming one data point per minute
            predicted_value = np.polyval(coeffs, future_x)

            # Clamp to reasonable bounds
            if metric_name in ["cpu", "memory"]:
                predicted_value = max(0, min(100, predicted_value))
            else:
                predicted_value = max(0, predicted_value)

            predictions[metric_name] = float(predicted_value)

        return predictions

    def get_scaling_recommendation(self, hours_ahead: int = 1) -> Dict[str, Any]:
        """Get scaling recommendation based on predictions"""
        predicted_load = self.predict_load(hours_ahead)

        recommendations = {
            "scale_up": False,
            "scale_down": False,
            "target_replicas": 1,
            "resource_adjustments": {},
            "confidence": "medium"
        }

        # CPU-based scaling
        if predicted_load["cpu"] > 80:
            recommendations["scale_up"] = True
            recommendations["target_replicas"] = min(10, int(predicted_load["cpu"] / 40))
            recommendations["confidence"] = "high"
        elif predicted_load["cpu"] < 20:
            recommendations["scale_down"] = True
            recommendations["target_replicas"] = max(1, int(predicted_load["cpu"] / 40))

        # Memory-based adjustments
        if predicted_load["memory"] > 85:
            recommendations["resource_adjustments"]["memory"] = "increase"
        elif predicted_load["memory"] < 30:
            recommendations["resource_adjustments"]["memory"] = "decrease"

        # Request-based scaling
        if predicted_load["requests"] > 500:
            recommendations["resource_adjustments"]["batch_size"] = "increase"
        elif predicted_load["requests"] < 100:
            recommendations["resource_adjustments"]["batch_size"] = "decrease"

        return recommendations

class AutoScalingAgent:
    def __init__(self, min_replicas: int = 1, max_replicas: int = 10):
        self.min_replicas = min_replicas
        self.max_replicas = max_replicas
        self.current_replicas = min_replicas
        self.predictive_scaler = PredictiveScaler()
        self.scaling_cooldown = 300  # 5 minutes
        self.last_scaling_time = None

    async def auto_scale_loop(self, agent_system):
        """Main auto-scaling loop"""
        while True:
            # Record current metrics
            current_metrics = await self._collect_current_metrics(agent_system)
            self.predictive_scaler.record_usage(datetime.now(), current_metrics)

            # Check if we can scale (cooldown period)
            if self._can_scale():
                # Get scaling recommendation
                recommendation = self.predictive_scaler.get_scaling_recommendation()

                # Apply scaling if needed
                if recommendation["scale_up"] and self.current_replicas < self.max_replicas:
                    await self._scale_up(agent_system, recommendation["target_replicas"])
                elif recommendation["scale_down"] and self.current_replicas > self.min_replicas:
                    await self._scale_down(agent_system, recommendation["target_replicas"])

            await asyncio.sleep(60)  # Check every minute

    async def _collect_current_metrics(self, agent_system) -> Dict[str, float]:
        """Collect current system metrics"""
        return {
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "requests_per_minute": await agent_system.get_requests_per_minute()
        }

    def _can_scale(self) -> bool:
        """Check if scaling is allowed (cooldown period)"""
        if self.last_scaling_time is None:
            return True

        time_since_last_scaling = (datetime.now() - self.last_scaling_time).total_seconds()
        return time_since_last_scaling >= self.scaling_cooldown

    async def _scale_up(self, agent_system, target_replicas: int):
        """Scale up to target number of replicas"""
        print(f"Scaling up from {self.current_replicas} to {target_replicas} replicas")
        await agent_system.scale_replicas(target_replicas)
        self.current_replicas = target_replicas
        self.last_scaling_time = datetime.now()

    async def _scale_down(self, agent_system, target_replicas: int):
        """Scale down to target number of replicas"""
        print(f"Scaling down from {self.current_replicas} to {target_replicas} replicas")
        await agent_system.scale_replicas(target_replicas)
        self.current_replicas = target_replicas
        self.last_scaling_time = datetime.now()
```
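To see the predictor in isolation, here is a sketch that feeds it synthetic, steadily rising usage data (the values are invented for illustration) and prints the resulting recommendation:

```python
# Synthetic usage history invented for illustration.
scaler = PredictiveScaler(history_window=24)
now = datetime.now()

for minute in range(120):
    scaler.record_usage(
        now - timedelta(minutes=120 - minute),
        {
            "cpu_percent": 30 + minute * 0.4,        # climbing CPU load
            "memory_percent": 50 + minute * 0.1,
            "requests_per_minute": 100 + minute * 3,
        },
    )

print(scaler.predict_load(hours_ahead=1))
print(scaler.get_scaling_recommendation(hours_ahead=1))
```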
Continuous optimization requires comprehensive monitoring and profiling to identify bottlenecks and measure improvements.
```python
import time
import asyncio
import statistics
import psutil
import numpy as np
from typing import Dict, List, Any
from dataclasses import dataclass
from collections import defaultdict, deque
from datetime import datetime

@dataclass
class PerformanceMetrics:
    operation_name: str
    duration: float
    timestamp: float
    cpu_usage: float
    memory_usage: float
    success: bool
    error_message: str = ""

class PerformanceProfiler:
    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.metrics_history = defaultdict(lambda: deque(maxlen=window_size))
        self.active_operations = {}
        self.alert_thresholds = {
            "response_time_p95": 2.0,  # seconds
            "error_rate": 0.05,        # 5%
            "cpu_usage": 80.0,         # percentage
            "memory_usage": 85.0       # percentage
        }

    def start_operation(self, operation_id: str, operation_name: str):
        """Start tracking an operation"""
        self.active_operations[operation_id] = {
            "name": operation_name,
            "start_time": time.time(),
            "start_cpu": psutil.cpu_percent(),
            "start_memory": psutil.virtual_memory().percent
        }

    def end_operation(self, operation_id: str, success: bool = True, error_message: str = ""):
        """End tracking an operation and record metrics"""
        if operation_id not in self.active_operations:
            return

        operation = self.active_operations.pop(operation_id)

        # Calculate metrics
        duration = time.time() - operation["start_time"]
        end_cpu = psutil.cpu_percent()
        end_memory = psutil.virtual_memory().percent

        metrics = PerformanceMetrics(
            operation_name=operation["name"],
            duration=duration,
            timestamp=time.time(),
            cpu_usage=(operation["start_cpu"] + end_cpu) / 2,
            memory_usage=(operation["start_memory"] + end_memory) / 2,
            success=success,
            error_message=error_message
        )

        # Store metrics
        self.metrics_history[operation["name"]].append(metrics)

        # Check for alerts
        self._check_alerts(operation["name"])

        return metrics

    def get_operation_stats(self, operation_name: str) -> Dict[str, Any]:
        """Get statistics for a specific operation"""
        metrics = list(self.metrics_history[operation_name])
        if not metrics:
            return {}

        durations = [m.duration for m in metrics]
        success_rate = sum(1 for m in metrics if m.success) / len(metrics)

        return {
            "operation_name": operation_name,
            "total_operations": len(metrics),
            "avg_duration": statistics.mean(durations),
            "p50_duration": statistics.median(durations),
            "p95_duration": np.percentile(durations, 95),
            "p99_duration": np.percentile(durations, 99),
            "min_duration": min(durations),
            "max_duration": max(durations),
            "success_rate": success_rate,
            "error_rate": 1 - success_rate,
            "avg_cpu_usage": statistics.mean([m.cpu_usage for m in metrics]),
            "avg_memory_usage": statistics.mean([m.memory_usage for m in metrics])
        }

    def _check_alerts(self, operation_name: str):
        """Check if any performance thresholds are exceeded"""
        stats = self.get_operation_stats(operation_name)
        if not stats:
            return

        alerts = []

        # Check response time
        if stats["p95_duration"] > self.alert_thresholds["response_time_p95"]:
            alerts.append(f"High P95 response time: {stats['p95_duration']:.2f}s")

        # Check error rate
        if stats["error_rate"] > self.alert_thresholds["error_rate"]:
            alerts.append(f"High error rate: {stats['error_rate']:.2%}")

        # Check resource usage
        if stats["avg_cpu_usage"] > self.alert_thresholds["cpu_usage"]:
            alerts.append(f"High CPU usage: {stats['avg_cpu_usage']:.1f}%")

        if stats["avg_memory_usage"] > self.alert_thresholds["memory_usage"]:
            alerts.append(f"High memory usage: {stats['avg_memory_usage']:.1f}%")

        if alerts:
            print(f"ALERT for {operation_name}: {'; '.join(alerts)}")

    def get_optimization_recommendations(self, operation_name: str) -> List[str]:
        """Get optimization recommendations based on performance data"""
        stats = self.get_operation_stats(operation_name)
        recommendations = []

        if not stats:
            return recommendations

        # Response time recommendations
        if stats["p95_duration"] > 1.0:
            recommendations.append("Consider implementing caching for frequently accessed data")
            recommendations.append("Optimize database queries and add indexes")

        if stats["p99_duration"] > 2.0:
            recommendations.append("Implement request batching for bulk operations")
            recommendations.append("Consider async processing for long-running tasks")

        # Error rate recommendations
        if stats["error_rate"] > 0.02:
            recommendations.append("Implement better error handling and retry mechanisms")
            recommendations.append("Add input validation and sanitization")

        # Resource usage recommendations
        if stats["avg_cpu_usage"] > 70:
            recommendations.append("Consider CPU-intensive task offloading")
            recommendations.append("Implement algorithmic optimizations")

        if stats["avg_memory_usage"] > 75:
            recommendations.append("Implement memory pooling and object reuse")
            recommendations.append("Optimize data structures and reduce memory footprint")

        return recommendations

# Usage example
class MonitoredAgent:
    def __init__(self):
        self.profiler = PerformanceProfiler()

    async def process_request(self, request_data):
        """Process request with performance monitoring"""
        operation_id = f"req_{int(time.time() * 1000)}"

        try:
            # Start monitoring
            self.profiler.start_operation(operation_id, "process_request")

            # Process the request
            result = await self._actually_process_request(request_data)

            # End monitoring with success
            self.profiler.end_operation(operation_id, success=True)
            return result
        except Exception as e:
            # End monitoring with error
            self.profiler.end_operation(operation_id, success=False, error_message=str(e))
            raise

    async def _actually_process_request(self, request_data):
        """Actual request processing logic"""
        # Simulate processing time
        await asyncio.sleep(0.1)
        return {"status": "success", "data": request_data}

    def get_performance_report(self) -> Dict[str, Any]:
        """Get comprehensive performance report"""
        report = {
            "timestamp": datetime.now().isoformat(),
            "operations": {}
        }

        for operation_name in self.profiler.metrics_history.keys():
            stats = self.profiler.get_operation_stats(operation_name)
            recommendations = self.profiler.get_optimization_recommendations(operation_name)

            report["operations"][operation_name] = {
                **stats,
                "recommendations": recommendations
            }

        return report
```
You've mastered advanced optimization techniques for agentic AI systems!
In the next lesson, "MCP Ecosystem Future", we'll look at where the Model Context Protocol ecosystem is heading and what its evolution means for agent developers.
This knowledge will position you at the forefront of agentic AI development, ready to leverage the latest advancements in the MCP ecosystem and build future-proof agent systems.
| Term | Definition |
|---|---|
| Quantization | Process of reducing model precision to improve performance and reduce memory usage |
| Pruning | Technique of removing unnecessary model parameters to reduce size and improve speed |
| Batch Processing | Processing multiple items together to improve throughput |
| Context Window | Maximum amount of information a model can consider at once |
| Predictive Scaling | Using historical data to predict and prepare for future resource needs |
| Adaptive Optimization | Dynamic adjustment of optimization strategies based on current conditions |
| Performance Profiling | Detailed analysis of system performance to identify bottlenecks |
| Resource Allocation | Distribution of computational resources among competing processes |
| Caching Strategy | Approach to storing frequently accessed data for faster retrieval |
| Auto-scaling | Automatic adjustment of resources based on current demand |
Optimization is not a one-time task but a continuous process of measurement, analysis, and improvement. Master these techniques to build agentic AI systems that are not just functional, but truly efficient and scalable in production environments!