Welcome to the hands-on world of MCP Labs - where theory meets practice and concepts become reality! If you've been following our journey through the Model Context Protocol, you already understand the "what" and "why" of MCP. Now it's time to dive into the "how" with practical, interactive development experiences that will transform your understanding from theoretical knowledge to practical expertise.
MCP Labs represents the experimental playground where developers can explore cutting-edge MCP implementations, test new features, and build sophisticated agent systems without the constraints of production environments. Think of it as your personal laboratory for innovation - a space where you can experiment, fail safely, learn rapidly, and push the boundaries of what's possible with MCP.
In this immersive, hands-on lesson, you'll move beyond understanding MCP concepts to actually building with them. We'll guide you through setting up your development environment, creating your first MCP servers and clients, implementing advanced features, and solving real-world problems using the power of MCP. By the end of this session, you won't just know about MCP - you'll be a practitioner who can architect, implement, and deploy MCP-based solutions.
The beauty of MCP Labs lies in its accessibility and power. Whether you're building a simple file system connector or a complex multi-agent collaboration system, MCP Labs provides the tools, frameworks, and experimental features you need to bring your ideas to life. We'll explore everything from basic server implementations to advanced patterns like caching, security, and performance optimization.
By the end of this comprehensive, hands-on lesson, you will be able to set up an MCP development environment, build and test your own MCP servers, and create clients that coordinate multiple servers at once.
Before diving into MCP Labs, let's ensure your development environment is properly configured for optimal learning and development experience.
System Requirements:
Development Tools:
The MCP SDK provides the core libraries and tools needed for building MCP servers and clients. Let's set up your development environment step by step.
Python SDK Installation:
# Create a virtual environment for MCP development
python -m venv mcp-labs-env
source mcp-labs-env/bin/activate # On Windows: mcp-labs-env\Scripts\activate
# Install the MCP Python SDK
pip install mcp
# Install additional development dependencies
pip install "mcp[dev]" # Includes testing and development tools (quotes needed in zsh)
pip install pytest pytest-asyncio black flake8 # Development tools
Node.js/TypeScript SDK Installation:
# Initialize a new Node.js project
mkdir mcp-labs-project
cd mcp-labs-project
npm init -y
# Install MCP TypeScript SDK
npm install @modelcontextprotocol/sdk
npm install -D typescript @types/node ts-node nodemon
# Initialize TypeScript configuration
npx tsc --init
Verifying the Installation:
# Test Python installation
python -c "import mcp; print('MCP Python SDK installed successfully')"
python -c "from importlib.metadata import version; print('MCP version:', version('mcp'))"
// Test Node.js installation (save as verify.mjs and run: node verify.mjs)
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
console.log('MCP Node.js SDK installed successfully');
Proper configuration of your development environment will significantly enhance your productivity and learning experience.
VS Code Setup:
// .vscode/settings.json
{
    "python.defaultInterpreterPath": "./mcp-labs-env/bin/python",
    "python.linting.enabled": true,
    "python.linting.flake8Enabled": true,
    "python.formatting.provider": "black",
    "typescript.preferences.importModuleSpecifier": "relative",
    "editor.formatOnSave": true,
    "editor.codeActionsOnSave": {
        "source.organizeImports": true
    }
}
Recommended VS Code Extensions:
Project Structure Template:
mcp-labs-project/
├── servers/              # MCP server implementations
│   ├── filesystem/       # File system server
│   ├── database/         # Database connector server
│   └── custom/           # Custom domain-specific servers
├── clients/              # MCP client implementations
│   ├── cli/              # Command-line client
│   ├── web/              # Web-based client
│   └── agents/           # AI agent clients
├── shared/               # Shared utilities and types
├── tests/                # Test suites
├── docs/                 # Documentation
├── examples/             # Example implementations
├── scripts/              # Development and deployment scripts
└── README.md             # Project documentation
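If you want to create this layout in one step, a few lines of Python can scaffold it. This is just a convenience sketch; the directory names simply mirror the template above:

```python
from pathlib import Path

# Directories from the project structure template above
dirs = [
    "servers/filesystem", "servers/database", "servers/custom",
    "clients/cli", "clients/web", "clients/agents",
    "shared", "tests", "docs", "examples", "scripts",
]

root = Path("mcp-labs-project")
for d in dirs:
    (root / d).mkdir(parents=True, exist_ok=True)  # create nested dirs
(root / "README.md").touch()  # empty project README placeholder

print(sorted(p.name for p in root.iterdir()))
```

Running it is idempotent: `exist_ok=True` means re-running never fails on an existing tree.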
Let's start our MCP Labs journey by building a practical, useful MCP server that provides file system access through the MCP protocol. This hands-on exercise will teach you the fundamental patterns and concepts of MCP server development.
Our file system MCP server will provide secure, controlled access to local files and directories through the MCP protocol. This is a common use case that demonstrates many core MCP concepts.
Core Features:
Security Considerations:
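To make the path-restriction idea concrete before we write the server: a robust containment check resolves symlinks and compares whole path components, since a naive string-prefix test would let `/data-secret` slip past an allow-list entry of `/data`. A minimal sketch (the function name is illustrative):

```python
import os

def is_within_allowed(path: str, allowed_dirs: list[str]) -> bool:
    """Return True only if `path` resolves to a location inside an allowed dir."""
    target = os.path.realpath(path)  # resolves symlinks and ".." segments
    for base in allowed_dirs:
        base = os.path.realpath(base)
        # commonpath compares whole path components, unlike startswith
        if os.path.commonpath([base, target]) == base:
            return True
    return False

print(is_within_allowed("/data/reports/q1.txt", ["/data"]))    # True
print(is_within_allowed("/data-secret/creds.txt", ["/data"]))  # False
print(is_within_allowed("/data/../etc/passwd", ["/data"]))     # False
```

The third call shows why resolving `..` before comparing matters: the raw string starts with `/data` but points elsewhere.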
Let's build our file system server incrementally, starting with the basic structure and adding features progressively.
Step 1: Basic Server Structure:
# servers/filesystem/server.py
import asyncio
import json
import os
import mimetypes
from pathlib import Path
from typing import List, Optional, Dict, Any
from datetime import datetime

from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
    Resource, Tool, TextContent, ImageContent, EmbeddedResource,
    ListResourcesResult, ReadResourceResult, CallToolResult
)


class FileSystemServer:
    def __init__(self, allowed_paths: Optional[List[str]] = None):
        self.server = Server("filesystem-server")
        self.allowed_paths = allowed_paths or [os.getcwd()]
        self._setup_handlers()

    def _setup_handlers(self):
        """Register MCP protocol handlers"""
        self.server.list_resources = self._list_resources
        self.server.read_resource = self._read_resource
        self.server.list_tools = self._list_tools
        self.server.call_tool = self._call_tool

    def _validate_path(self, path: str) -> bool:
        """Validate that path is within allowed directories"""
        try:
            abs_path = os.path.abspath(path)
            for allowed in self.allowed_paths:
                allowed_abs = os.path.abspath(allowed)
                # Compare with a trailing separator so /data does not match /database
                if abs_path == allowed_abs or abs_path.startswith(allowed_abs + os.sep):
                    return True
            return False
        except Exception:
            return False

    async def _list_resources(self) -> ListResourcesResult:
        """List available file system resources"""
        resources = []
        for base_path in self.allowed_paths:
            for root, dirs, files in os.walk(base_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    if self._validate_path(file_path):
                        rel_path = os.path.relpath(file_path, base_path)
                        uri = f"file://{file_path}"

                        # Determine MIME type
                        mime_type, _ = mimetypes.guess_type(file_path)
                        mime_type = mime_type or "application/octet-stream"

                        # Get file metadata
                        stat = os.stat(file_path)

                        resource = Resource(
                            uri=uri,
                            name=rel_path,
                            description=f"File: {rel_path}",
                            mimeType=mime_type,
                            metadata={
                                "size": stat.st_size,
                                "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
                                "created": datetime.fromtimestamp(stat.st_ctime).isoformat()
                            }
                        )
                        resources.append(resource)
        return ListResourcesResult(resources=resources)

    async def _read_resource(self, uri: str) -> ReadResourceResult:
        """Read a file resource"""
        try:
            # Extract file path from URI
            if uri.startswith("file://"):
                file_path = uri[7:]  # Remove "file://" prefix
            else:
                file_path = uri

            if not self._validate_path(file_path):
                raise ValueError(f"Access denied: {file_path}")

            if not os.path.exists(file_path):
                raise ValueError(f"File not found: {file_path}")

            # Check file size (limit to 10MB for safety)
            file_size = os.path.getsize(file_path)
            if file_size > 10 * 1024 * 1024:
                raise ValueError(f"File too large: {file_size} bytes")

            # Read file content
            with open(file_path, 'rb') as f:
                content = f.read()

            # Determine content type
            mime_type, _ = mimetypes.guess_type(file_path)
            mime_type = mime_type or "application/octet-stream"

            # Create appropriate content object
            if mime_type.startswith("text/"):
                text_content = content.decode('utf-8', errors='replace')
                contents = [TextContent(type="text", text=text_content)]
            elif mime_type.startswith("image/"):
                import base64
                base64_content = base64.b64encode(content).decode('utf-8')
                contents = [ImageContent(type="image", mediaType=mime_type, data=base64_content)]
            else:
                # For binary files, provide metadata and hex preview
                hex_preview = content[:100].hex()
                text_content = f"Binary file: {file_path}\nSize: {file_size} bytes\nPreview (hex): {hex_preview}"
                contents = [TextContent(type="text", text=text_content)]

            return ReadResourceResult(contents=contents)
        except Exception as e:
            raise ValueError(f"Error reading resource {uri}: {str(e)}")

    async def _list_tools(self) -> List[Tool]:
        """List available tools"""
        return [
            Tool(
                name="list_directory",
                description="List contents of a directory",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "path": {"type": "string", "description": "Directory path to list"},
                        "recursive": {"type": "boolean", "description": "List recursively", "default": False}
                    },
                    "required": ["path"]
                }
            ),
            Tool(
                name="search_files",
                description="Search for files by name or content",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "pattern": {"type": "string", "description": "Search pattern (supports wildcards)"},
                        "content": {"type": "string", "description": "Content to search within files"},
                        "path": {"type": "string", "description": "Directory to search in", "default": "."}
                    }
                }
            ),
            Tool(
                name="write_file",
                description="Write content to a file",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "path": {"type": "string", "description": "File path to write"},
                        "content": {"type": "string", "description": "Content to write"},
                        "append": {"type": "boolean", "description": "Append to existing file", "default": False}
                    },
                    "required": ["path", "content"]
                }
            )
        ]

    async def _call_tool(self, name: str, arguments: Dict[str, Any]) -> CallToolResult:
        """Execute a tool"""
        try:
            if name == "list_directory":
                return await self._tool_list_directory(arguments)
            elif name == "search_files":
                return await self._tool_search_files(arguments)
            elif name == "write_file":
                return await self._tool_write_file(arguments)
            else:
                raise ValueError(f"Unknown tool: {name}")
        except Exception as e:
            return CallToolResult(
                content=[TextContent(type="text", text=f"Error: {str(e)}")],
                isError=True
            )

    async def _tool_list_directory(self, args: Dict[str, Any]) -> CallToolResult:
        """List directory contents"""
        path = args.get("path", ".")
        recursive = args.get("recursive", False)

        if not self._validate_path(path):
            raise ValueError(f"Access denied: {path}")
        if not os.path.exists(path):
            raise ValueError(f"Path not found: {path}")
        if not os.path.isdir(path):
            raise ValueError(f"Path is not a directory: {path}")

        result = []
        if recursive:
            for root, dirs, files in os.walk(path):
                for item in dirs + files:
                    item_path = os.path.join(root, item)
                    rel_path = os.path.relpath(item_path, path)
                    stat = os.stat(item_path)
                    result.append({
                        "name": item,
                        "path": rel_path,
                        "type": "directory" if os.path.isdir(item_path) else "file",
                        "size": stat.st_size,
                        "modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
                    })
        else:
            for item in os.listdir(path):
                item_path = os.path.join(path, item)
                if self._validate_path(item_path):
                    stat = os.stat(item_path)
                    result.append({
                        "name": item,
                        "path": item_path,
                        "type": "directory" if os.path.isdir(item_path) else "file",
                        "size": stat.st_size,
                        "modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
                    })

        return CallToolResult(
            content=[TextContent(
                type="text",
                text=json.dumps(result, indent=2)
            )]
        )

    async def _tool_search_files(self, args: Dict[str, Any]) -> CallToolResult:
        """Search for files"""
        pattern = args.get("pattern", "")
        content_search = args.get("content", "")
        search_path = args.get("path", ".")

        if not self._validate_path(search_path):
            raise ValueError(f"Access denied: {search_path}")
        if not os.path.exists(search_path):
            raise ValueError(f"Search path not found: {search_path}")

        import fnmatch
        results = []
        for root, dirs, files in os.walk(search_path):
            for file in files:
                file_path = os.path.join(root, file)

                # Check name pattern
                if pattern and not fnmatch.fnmatch(file, pattern):
                    continue

                # Check content if specified
                if content_search:
                    try:
                        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                            content = f.read()
                        if content_search.lower() not in content.lower():
                            continue
                    except Exception:
                        continue  # Skip files that can't be read as text

                rel_path = os.path.relpath(file_path, search_path)
                stat = os.stat(file_path)
                results.append({
                    "file": rel_path,
                    "path": file_path,
                    "size": stat.st_size,
                    "modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
                })

        return CallToolResult(
            content=[TextContent(
                type="text",
                text=f"Found {len(results)} matching files:\n" +
                     json.dumps(results, indent=2)
            )]
        )

    async def _tool_write_file(self, args: Dict[str, Any]) -> CallToolResult:
        """Write content to a file"""
        path = args["path"]
        content = args["content"]
        append = args.get("append", False)

        if not self._validate_path(path):
            raise ValueError(f"Access denied: {path}")

        # Create parent directory if needed (dirname is "" for bare filenames)
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)

        mode = 'a' if append else 'w'
        with open(path, mode, encoding='utf-8') as f:
            f.write(content)

        action = "appended to" if append else "written to"
        return CallToolResult(
            content=[TextContent(
                type="text",
                text=f"Content successfully {action} {path}"
            )]
        )


# Server entry point
async def main():
    server = FileSystemServer()

    # Use stdio transport for communication
    async with stdio_server() as (read_stream, write_stream):
        await server.server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="filesystem-server",
                server_version="1.0.0",
                capabilities=server.server.get_capabilities(
                    notification_options=None,
                    experimental_capabilities=None,
                ),
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())
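The MIME-type detection the server relies on is the standard-library `mimetypes` module. A quick illustration of its guessing behavior, together with the generic-binary fallback the server applies when the extension is unknown:

```python
import mimetypes

for name in ["notes.txt", "photo.png", "archive.xyz123"]:
    mime, _ = mimetypes.guess_type(name)
    # Fall back to a generic binary type when the extension is unknown,
    # mirroring what the server does above
    print(name, "->", mime or "application/octet-stream")
```

`guess_type` works purely from the file name, so it is cheap to call during directory walks; it never opens the file.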
Step 2: Configuration and Security:
# servers/filesystem/config.py
import os
from typing import List, Optional
from dataclasses import dataclass


@dataclass
class FileSystemConfig:
    """Configuration for file system MCP server"""
    # Security settings
    allowed_paths: List[str]
    max_file_size: int = 10 * 1024 * 1024  # 10MB
    allowed_extensions: Optional[List[str]] = None  # None means all allowed
    blocked_extensions: Optional[List[str]] = None

    # Performance settings
    max_search_results: int = 1000
    cache_enabled: bool = True
    cache_ttl: int = 300  # 5 minutes

    # Logging settings
    log_level: str = "INFO"
    log_file: str = "filesystem_server.log"
    audit_log: str = "filesystem_audit.log"

    @classmethod
    def from_env(cls) -> 'FileSystemConfig':
        """Create configuration from environment variables"""
        # os.pathsep is ":" on POSIX and ";" on Windows
        allowed_paths = os.getenv("MCP_ALLOWED_PATHS", os.getcwd()).split(os.pathsep)
        return cls(
            allowed_paths=allowed_paths,
            max_file_size=int(os.getenv("MCP_MAX_FILE_SIZE", str(10 * 1024 * 1024))),
            allowed_extensions=os.getenv("MCP_ALLOWED_EXTENSIONS", "").split(",") if os.getenv("MCP_ALLOWED_EXTENSIONS") else None,
            blocked_extensions=os.getenv("MCP_BLOCKED_EXTENSIONS", "").split(",") if os.getenv("MCP_BLOCKED_EXTENSIONS") else None,
            max_search_results=int(os.getenv("MCP_MAX_SEARCH_RESULTS", "1000")),
            cache_enabled=os.getenv("MCP_CACHE_ENABLED", "true").lower() == "true",
            cache_ttl=int(os.getenv("MCP_CACHE_TTL", "300")),
            log_level=os.getenv("MCP_LOG_LEVEL", "INFO"),
            log_file=os.getenv("MCP_LOG_FILE", "filesystem_server.log"),
            audit_log=os.getenv("MCP_AUDIT_LOG", "filesystem_audit.log")
        )

    def validate_file_access(self, file_path: str) -> bool:
        """Validate if file access is allowed based on configuration"""
        # Check path containment
        abs_path = os.path.abspath(file_path)
        for allowed in self.allowed_paths:
            if abs_path.startswith(os.path.abspath(allowed)):
                break
        else:
            return False

        # Check file size
        try:
            if os.path.getsize(file_path) > self.max_file_size:
                return False
        except OSError:
            return False

        # Check extensions
        _, ext = os.path.splitext(file_path)
        ext = ext.lower()
        if self.allowed_extensions and ext not in self.allowed_extensions:
            return False
        if self.blocked_extensions and ext in self.blocked_extensions:
            return False

        return True
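`validate_file_access` leans on Python's `for`/`else`: the `else` branch runs only when the loop finishes without hitting `break`. If the idiom is unfamiliar, here is the control flow in isolation (the helper is purely illustrative):

```python
def first_match(value: str, prefixes: list[str]):
    """Return the first matching prefix, or None (mirrors the for/else pattern)."""
    for prefix in prefixes:
        if value.startswith(prefix):
            break  # match found: the else branch is skipped
    else:
        return None  # loop completed with no break -> nothing matched
    return prefix

print(first_match("/home/user/file.txt", ["/tmp", "/home/user"]))  # /home/user
print(first_match("/etc/passwd", ["/tmp", "/home/user"]))          # None
```

An empty prefix list also falls through to the `else` branch, so the "deny by default" behavior holds with no allowed paths configured.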
Let's create a comprehensive test suite to ensure our file system server works correctly and securely.
Unit Tests:
# tests/test_filesystem_server.py
import pytest
import asyncio
import tempfile
import os
from pathlib import Path

from servers.filesystem.server import FileSystemServer
from servers.filesystem.config import FileSystemConfig


class TestFileSystemServer:
    @pytest.fixture
    def temp_dir(self):
        """Create a temporary directory for testing"""
        with tempfile.TemporaryDirectory() as temp_dir:
            yield temp_dir

    @pytest.fixture
    def server(self, temp_dir):
        """Create a test server instance"""
        config = FileSystemConfig(allowed_paths=[temp_dir])
        return FileSystemServer(allowed_paths=config.allowed_paths)

    @pytest.fixture
    def test_files(self, temp_dir):
        """Create test files for testing"""
        # Create test directory structure
        test_dir = Path(temp_dir)

        # Create test files
        (test_dir / "test.txt").write_text("Hello, World!")
        (test_dir / "subdir").mkdir()
        (test_dir / "subdir" / "nested.txt").write_text("Nested content")
        (test_dir / "data.json").write_text('{"key": "value"}')

        return test_dir

    @pytest.mark.asyncio
    async def test_list_resources(self, server, test_files):
        """Test resource listing"""
        result = await server._list_resources()
        assert len(result.resources) >= 3  # At least our test files

        resource_uris = [r.uri for r in result.resources]
        # Check that our test files are listed
        assert any("test.txt" in uri for uri in resource_uris)
        assert any("nested.txt" in uri for uri in resource_uris)
        assert any("data.json" in uri for uri in resource_uris)

    @pytest.mark.asyncio
    async def test_read_text_file(self, server, test_files):
        """Test reading a text file"""
        file_path = str(test_files / "test.txt")
        uri = f"file://{file_path}"

        result = await server._read_resource(uri)
        assert len(result.contents) == 1
        assert result.contents[0].type == "text"
        assert "Hello, World!" in result.contents[0].text

    @pytest.mark.asyncio
    async def test_read_json_file(self, server, test_files):
        """Test reading a JSON file"""
        file_path = str(test_files / "data.json")
        uri = f"file://{file_path}"

        result = await server._read_resource(uri)
        assert len(result.contents) == 1
        assert result.contents[0].type == "text"
        assert '"key": "value"' in result.contents[0].text

    @pytest.mark.asyncio
    async def test_security_path_validation(self, server, temp_dir):
        """Test security path validation"""
        # Try to access a file outside the allowed directory
        outside_file = "/etc/passwd"  # This should be blocked

        with pytest.raises(ValueError, match="Access denied"):
            await server._read_resource(f"file://{outside_file}")

    @pytest.mark.asyncio
    async def test_tool_list_directory(self, server, test_files):
        """Test directory listing tool"""
        result = await server._tool_list_directory({"path": str(test_files)})
        assert not result.isError
        assert "test.txt" in result.content[0].text
        assert "subdir" in result.content[0].text

    @pytest.mark.asyncio
    async def test_tool_search_files(self, server, test_files):
        """Test file search tool"""
        # Search by pattern
        result = await server._tool_search_files({
            "pattern": "*.txt",
            "path": str(test_files)
        })
        assert not result.isError
        assert "test.txt" in result.content[0].text
        assert "nested.txt" in result.content[0].text

    @pytest.mark.asyncio
    async def test_tool_write_file(self, server, test_files):
        """Test file writing tool"""
        new_file = test_files / "new_file.txt"

        result = await server._tool_write_file({
            "path": str(new_file),
            "content": "New content"
        })
        assert not result.isError
        assert new_file.exists()
        assert new_file.read_text() == "New content"
Integration Tests:
# tests/test_integration.py
import pytest
import asyncio
import json

from mcp.client.session import ClientSession
from mcp.client.stdio import stdio_client


class TestFileSystemIntegration:
    @pytest.mark.asyncio
    async def test_full_workflow(self):
        """Test complete client-server workflow"""
        # Start server process
        server_process = await asyncio.create_subprocess_exec(
            "python", "-m", "servers.filesystem.server",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        try:
            # Connect client
            async with stdio_client(server_process.stdin, server_process.stdout) as (read, write):
                async with ClientSession(read, write) as session:
                    # Initialize session
                    await session.initialize()

                    # List resources
                    resources = await session.list_resources()
                    assert len(resources.resources) > 0

                    # Read first resource
                    if resources.resources:
                        first_resource = resources.resources[0]
                        content = await session.read_resource(first_resource.uri)
                        assert len(content.contents) > 0

                    # List tools
                    tools = await session.list_tools()
                    assert len(tools.tools) > 0

                    # Call list_directory tool
                    if tools.tools:
                        result = await session.call_tool("list_directory", {"path": "."})
                        assert not result.isError
        finally:
            # Clean up server process
            server_process.terminate()
            await server_process.wait()
Now that you've built your first MCP server, let's create sophisticated clients that can interact with multiple servers, handle complex workflows, and provide rich user experiences.
A production-ready MCP client needs to manage connections to multiple servers, handle failover, and provide unified access to resources and tools.
Advanced Client Implementation:
# clients/advanced/multi_server_client.py
import asyncio
import json
import logging
from contextlib import AsyncExitStack
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass, field
from datetime import datetime, timedelta

from mcp.client.session import ClientSession
from mcp.client.stdio import stdio_client
from mcp.types import Resource, Tool, CallToolResult, ReadResourceResult


@dataclass
class ServerConnection:
    """Represents a connection to an MCP server"""
    name: str
    command: List[str]
    session: Optional[ClientSession] = None
    process: Optional[asyncio.subprocess.Process] = None
    exit_stack: Optional[AsyncExitStack] = None
    last_connected: Optional[datetime] = None
    is_healthy: bool = True
    retry_count: int = 0
    max_retries: int = 3


class MultiServerMCPClient:
    """Advanced MCP client that manages multiple server connections"""

    def __init__(self):
        self.servers: Dict[str, ServerConnection] = {}
        self.resource_cache: Dict[str, Any] = {}
        self.tool_cache: Dict[str, Any] = {}
        self.cache_ttl = timedelta(minutes=5)
        self.logger = logging.getLogger(__name__)

    async def add_server(self, name: str, command: List[str]) -> bool:
        """Add a new server connection"""
        try:
            connection = ServerConnection(name=name, command=command)
            await self._connect_server(connection)
            self.servers[name] = connection
            self.logger.info(f"Successfully connected to server: {name}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to server {name}: {e}")
            return False

    async def _connect_server(self, connection: ServerConnection) -> None:
        """Establish connection to a server"""
        process = await asyncio.create_subprocess_exec(
            *connection.command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        # Use an AsyncExitStack so the transport stays open after this method
        # returns; a plain `async with` block would close it on exit
        exit_stack = AsyncExitStack()
        read, write = await exit_stack.enter_async_context(
            stdio_client(process.stdin, process.stdout)
        )
        session = await exit_stack.enter_async_context(ClientSession(read, write))
        await session.initialize()

        connection.process = process
        connection.session = session
        connection.exit_stack = exit_stack
        connection.last_connected = datetime.now()
        connection.is_healthy = True
        connection.retry_count = 0

    async def health_check(self) -> Dict[str, bool]:
        """Check health of all server connections"""
        health_status = {}
        for name, connection in self.servers.items():
            try:
                if connection.session:
                    # Try to list resources as a health check
                    await connection.session.list_resources()
                    connection.is_healthy = True
                    health_status[name] = True
                else:
                    health_status[name] = False
            except Exception as e:
                self.logger.warning(f"Health check failed for {name}: {e}")
                connection.is_healthy = False
                health_status[name] = False

                # Attempt reconnection
                if connection.retry_count < connection.max_retries:
                    await self._reconnect_server(connection)
        return health_status

    async def _reconnect_server(self, connection: ServerConnection) -> None:
        """Attempt to reconnect to a server"""
        try:
            connection.retry_count += 1
            self.logger.info(f"Attempting to reconnect to {connection.name} (attempt {connection.retry_count})")

            # Clean up existing connection
            if connection.exit_stack:
                await connection.exit_stack.aclose()
            if connection.process:
                connection.process.terminate()
                await connection.process.wait()

            # Establish new connection
            await self._connect_server(connection)
            self.logger.info(f"Successfully reconnected to {connection.name}")
        except Exception as e:
            self.logger.error(f"Reconnection failed for {connection.name}: {e}")

    async def list_all_resources(self, refresh_cache: bool = False) -> Dict[str, List[Resource]]:
        """List resources from all servers"""
        all_resources = {}
        for name, connection in self.servers.items():
            if not connection.is_healthy:
                continue

            cache_key = f"resources:{name}"

            # Check cache
            if not refresh_cache and cache_key in self.resource_cache:
                cached_data, timestamp = self.resource_cache[cache_key]
                if datetime.now() - timestamp < self.cache_ttl:
                    all_resources[name] = cached_data
                    continue

            try:
                resources = await connection.session.list_resources()
                all_resources[name] = resources.resources
                # Update cache
                self.resource_cache[cache_key] = (resources.resources, datetime.now())
            except Exception as e:
                self.logger.error(f"Failed to list resources from {name}: {e}")
                all_resources[name] = []
        return all_resources

    async def list_all_tools(self, refresh_cache: bool = False) -> Dict[str, List[Tool]]:
        """List tools from all servers"""
        all_tools = {}
        for name, connection in self.servers.items():
            if not connection.is_healthy:
                continue

            cache_key = f"tools:{name}"

            # Check cache
            if not refresh_cache and cache_key in self.tool_cache:
                cached_data, timestamp = self.tool_cache[cache_key]
                if datetime.now() - timestamp < self.cache_ttl:
                    all_tools[name] = cached_data
                    continue

            try:
                tools = await connection.session.list_tools()
                all_tools[name] = tools.tools
                # Update cache
                self.tool_cache[cache_key] = (tools.tools, datetime.now())
            except Exception as e:
                self.logger.error(f"Failed to list tools from {name}: {e}")
                all_tools[name] = []
        return all_tools

    async def read_resource(self, uri: str, server_name: Optional[str] = None) -> Optional[ReadResourceResult]:
        """Read a resource from the appropriate server"""
        if server_name:
            # Read from specific server
            return await self._read_resource_from_server(server_name, uri)
        else:
            # Try to find the server that has this resource
            all_resources = await self.list_all_resources()
            for name, resources in all_resources.items():
                for resource in resources:
                    if resource.uri == uri:
                        return await self._read_resource_from_server(name, uri)
            raise ValueError(f"Resource not found: {uri}")

    async def _read_resource_from_server(self, server_name: str, uri: str) -> ReadResourceResult:
        """Read resource from a specific server"""
        connection = self.servers.get(server_name)
        if not connection or not connection.is_healthy:
            raise ValueError(f"Server not available: {server_name}")

        try:
            return await connection.session.read_resource(uri)
        except Exception as e:
            self.logger.error(f"Failed to read resource {uri} from {server_name}: {e}")
            raise

    async def call_tool(self, tool_name: str, arguments: Dict[str, Any],
                        server_name: Optional[str] = None) -> CallToolResult:
        """Call a tool on the appropriate server"""
        if server_name:
            # Call on specific server
            return await self._call_tool_on_server(server_name, tool_name, arguments)
        else:
            # Find the server that has this tool
            all_tools = await self.list_all_tools()
            for name, tools in all_tools.items():
                for tool in tools:
                    if tool.name == tool_name:
                        return await self._call_tool_on_server(name, tool_name, arguments)
            raise ValueError(f"Tool not found: {tool_name}")

    async def _call_tool_on_server(self, server_name: str, tool_name: str,
                                   arguments: Dict[str, Any]) -> CallToolResult:
        """Call tool on a specific server"""
        connection = self.servers.get(server_name)
        if not connection or not connection.is_healthy:
            raise ValueError(f"Server not available: {server_name}")

        try:
            return await connection.session.call_tool(tool_name, arguments)
        except Exception as e:
            self.logger.error(f"Failed to call tool {tool_name} on {server_name}: {e}")
            raise

    async def search_resources(self, query: str) -> Dict[str, List[Resource]]:
        """Search for resources across all servers"""
        all_resources = await self.list_all_resources()
        matching_resources = {}
        query_lower = query.lower()

        for server_name, resources in all_resources.items():
            matches = []
            for resource in resources:
                # description may be None, so guard before lowercasing
                if (query_lower in resource.name.lower() or
                        query_lower in (resource.description or "").lower()):
                    matches.append(resource)
            if matches:
                matching_resources[server_name] = matches
        return matching_resources

    async def get_server_status(self) -> Dict[str, Dict[str, Any]]:
        """Get detailed status of all servers"""
        status = {}
        for name, connection in self.servers.items():
            status[name] = {
                "is_healthy": connection.is_healthy,
                "last_connected": connection.last_connected.isoformat() if connection.last_connected else None,
                "retry_count": connection.retry_count,
                "max_retries": connection.max_retries,
                "command": " ".join(connection.command)
            }
        return status

    async def cleanup(self) -> None:
        """Clean up all server connections"""
        for name, connection in self.servers.items():
            try:
                if connection.exit_stack:
                    await connection.exit_stack.aclose()
                if connection.process:
                    connection.process.terminate()
                    await connection.process.wait()
            except Exception as e:
                self.logger.error(f"Error cleaning up server {name}: {e}")
        self.servers.clear()
        self.resource_cache.clear()
        self.tool_cache.clear()


# Usage example
async def main():
    client = MultiServerMCPClient()
    try:
        # Add servers
        await client.add_server("filesystem", ["python", "-m", "servers.filesystem.server"])
        await client.add_server("database", ["python", "-m", "servers.database.server"])

        # Check health
        health = await client.health_check()
        print("Server health:", health)

        # List all resources
        resources = await client.list_all_resources()
        print("Available resources:", {name: len(res) for name, res in resources.items()})

        # List all tools
        tools = await client.list_all_tools()
        print("Available tools:", {name: [t.name for t in tool_list] for name, tool_list in tools.items()})

        # Example: Call a tool
        if tools.get("filesystem"):
            fs_tools = tools["filesystem"]
            if any(t.name == "list_directory" for t in fs_tools):
                result = await client.call_tool("list_directory", {"path": "."}, "filesystem")
                print("Directory listing:", result.content[0].text if result.content else "No content")
    finally:
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
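The client caches results as `(data, timestamp)` pairs and compares against a TTL on every read. That pattern generalizes to a small reusable helper; a minimal sketch (the class name is illustrative):

```python
from datetime import datetime, timedelta

class TTLCache:
    """Tiny (value, timestamp) cache matching the client's expiry check."""

    def __init__(self, ttl_seconds: float = 300):
        self.ttl = timedelta(seconds=ttl_seconds)
        self._store: dict = {}

    def set(self, key, value):
        # Stamp each entry with its insertion time
        self._store[key] = (value, datetime.now())

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, stamp = entry
        if datetime.now() - stamp >= self.ttl:
            del self._store[key]  # stale: evict and report a miss
            return None
        return value

cache = TTLCache(ttl_seconds=60)
cache.set("resources:filesystem", ["file:///tmp/a.txt"])
print(cache.get("resources:filesystem"))  # ['file:///tmp/a.txt']

expired = TTLCache(ttl_seconds=0)         # zero TTL: every entry is stale
expired.set("k", "v")
print(expired.get("k"))                   # None
```

Evicting on read keeps the structure simple; for long-running clients you might also sweep stale entries periodically so the dict cannot grow without bound.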
Let's build a command-line interface that provides an interactive way to explore and interact with MCP servers.
# clients/cli/interactive_client.py
import asyncio
import cmd
import json
import sys
from typing import List, Dict, Any, Optional
from datetime import datetime
from clients.advanced.multi_server_client import MultiServerMCPClient
class InteractiveMCPClient(cmd.Cmd):
"""Interactive command-line interface for MCP"""
intro = """
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Interactive MCP Client - Interactive Mode β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ£
β Type 'help' for available commands or '?' for command help β
β Type 'servers' to see connected servers β
β Type 'resources' to list available resources β
β Type 'tools' to list available tools β
β Type 'quit' or 'exit' to leave β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
"""
prompt = "MCP> "
def __init__(self):
super().__init__()
self.client = MultiServerMCPClient()
self.current_server = None
self.setup_logging()
def setup_logging(self):
"""Setup logging for the client"""
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
async def initialize(self):
"""Initialize the client with default servers"""
print("π Initializing MCP Client...")
# Try to add common servers
servers_to_try = [
("filesystem", ["python", "-m", "servers.filesystem.server"]),
("database", ["python", "-m", "servers.database.server"]),
]
for name, command in servers_to_try:
try:
success = await self.client.add_server(name, command)
if success:
print(f"β
Connected to {name} server")
else:
print(f"β Failed to connect to {name} server")
except Exception as e:
print(f"β οΈ Could not connect to {name}: {e}")
# Perform health check
health = await self.client.health_check()
print("\nServer Status:")
for name, is_healthy in health.items():
status = "Healthy" if is_healthy else "Unhealthy"
print(f" {name}: {status}")
def do_servers(self, arg):
"""List all connected servers and their status"""
asyncio.run(self._cmd_servers())
async def _cmd_servers(self):
"""Implementation of servers command"""
status = await self.client.get_server_status()
print("\nConnected Servers:")
print("-" * 60)
for name, info in status.items():
health_icon = "[OK]" if info["is_healthy"] else "[DOWN]"
print(f"{health_icon} {name}")
print(f" Command: {info['command']}")
print(f" Last Connected: {info['last_connected'] or 'Never'}")
print(f" Retry Count: {info['retry_count']}/{info['max_retries']}")
print()
def do_resources(self, arg):
"""List available resources. Usage: resources [server_name]"""
asyncio.run(self._cmd_resources(arg))
async def _cmd_resources(self, arg):
"""Implementation of resources command"""
try:
if arg:
# List resources from specific server
resources = await self.client.list_all_resources()
if arg in resources:
self._display_resources(arg, resources[arg])
else:
print(f"Server '{arg}' not found")
else:
# List resources from all servers
all_resources = await self.client.list_all_resources()
for server_name, resource_list in all_resources.items():
if resource_list:
self._display_resources(server_name, resource_list)
except Exception as e:
print(f"Error listing resources: {e}")
def _display_resources(self, server_name: str, resources: List):
"""Display a list of resources"""
print(f"\nResources from {server_name}:")
print("-" * 60)
for i, resource in enumerate(resources, 1):
print(f"{i:2d}. {resource.name}")
print(f" URI: {resource.uri}")
print(f" Type: {resource.mimeType}")
if resource.description:
print(f" Description: {resource.description}")
if resource.metadata:
print(f" Metadata: {json.dumps(resource.metadata, indent=6)}")
print()
def do_tools(self, arg):
"""List available tools. Usage: tools [server_name]"""
asyncio.run(self._cmd_tools(arg))
async def _cmd_tools(self, arg):
"""Implementation of tools command"""
try:
if arg:
# List tools from specific server
tools = await self.client.list_all_tools()
if arg in tools:
self._display_tools(arg, tools[arg])
else:
print(f"Server '{arg}' not found")
else:
# List tools from all servers
all_tools = await self.client.list_all_tools()
for server_name, tool_list in all_tools.items():
if tool_list:
self._display_tools(server_name, tool_list)
except Exception as e:
print(f"Error listing tools: {e}")
def _display_tools(self, server_name: str, tools: List):
"""Display a list of tools"""
print(f"\nTools from {server_name}:")
print("-" * 60)
for i, tool in enumerate(tools, 1):
print(f"{i:2d}. {tool.name}")
print(f" Description: {tool.description}")
if tool.inputSchema:
print(f" Schema: {json.dumps(tool.inputSchema, indent=6)}")
print()
def do_read(self, arg):
"""Read a resource. Usage: read <resource_uri> [server_name]"""
if not arg:
print("Usage: read <resource_uri> [server_name]")
return
parts = arg.split(maxsplit=1)
uri = parts[0]
server_name = parts[1] if len(parts) > 1 else None
asyncio.run(self._cmd_read(uri, server_name))
async def _cmd_read(self, uri: str, server_name: Optional[str]):
"""Implementation of read command"""
try:
result = await self.client.read_resource(uri, server_name)
print(f"\nResource: {uri}")
print("-" * 60)
for content in result.contents:
if content.type == "text":
print(content.text)
elif content.type == "image":
print(f"[Image: {content.mediaType}, Size: {len(content.data)} bytes]")
elif content.type == "resource":
print(f"[Resource: {content.resource.uri}]")
except Exception as e:
print(f"Error reading resource: {e}")
def do_call(self, arg):
"""Call a tool. Usage: call <tool_name> [server_name]"""
if not arg:
print("Usage: call <tool_name> [server_name]")
return
parts = arg.split(maxsplit=1)
tool_name = parts[0]
server_name = parts[1] if len(parts) > 1 else None
asyncio.run(self._cmd_call(tool_name, server_name))
async def _cmd_call(self, tool_name: str, server_name: Optional[str]):
"""Implementation of call command"""
try:
# Get tool schema first
all_tools = await self.client.list_all_tools()
tool = None
if server_name:
if server_name in all_tools:
tool = next((t for t in all_tools[server_name] if t.name == tool_name), None)
else:
for tools in all_tools.values():
tool = next((t for t in tools if t.name == tool_name), None)
if tool:
break
if not tool:
print(f"Tool '{tool_name}' not found")
return
# Get arguments from user
arguments = {}
if tool.inputSchema and "properties" in tool.inputSchema:
print(f"\nTool: {tool_name}")
print(f"Description: {tool.description}")
print("\nEnter arguments (press Enter to skip optional arguments):")
for prop_name, prop_schema in tool.inputSchema["properties"].items():
is_required = prop_name in tool.inputSchema.get("required", [])
prompt = f"{prop_name}"
if not is_required:
prompt += " (optional)"
prompt += ": "
value = input(prompt).strip()
if value:
# Try to parse as JSON, otherwise use as string
try:
arguments[prop_name] = json.loads(value)
except json.JSONDecodeError:
arguments[prop_name] = value
elif is_required:
print(f"Required argument '{prop_name}' missing")
return
# Call the tool
print(f"\nCalling {tool_name}...")
result = await self.client.call_tool(tool_name, arguments, server_name)
if result.isError:
print("Tool execution failed:")
for content in result.content:
if content.type == "text":
print(content.text)
else:
print("Tool executed successfully:")
for content in result.content:
if content.type == "text":
print(content.text)
elif content.type == "image":
print(f"[Image result: {content.mediaType}]")
elif content.type == "resource":
print(f"[Resource result: {content.resource.uri}]")
except Exception as e:
print(f"Error calling tool: {e}")
def do_search(self, arg):
"""Search for resources. Usage: search <query>"""
if not arg:
print("Usage: search <query>")
return
asyncio.run(self._cmd_search(arg))
async def _cmd_search(self, query: str):
"""Implementation of search command"""
try:
results = await self.client.search_resources(query)
if not results:
print(f"No resources found for '{query}'")
return
print(f"\nSearch results for '{query}':")
print("-" * 60)
for server_name, resources in results.items():
print(f"\nFrom {server_name}:")
for resource in resources:
print(f" - {resource.name}")
print(f" URI: {resource.uri}")
print()
except Exception as e:
print(f"Error searching: {e}")
def do_health(self, arg):
"""Check health of all servers"""
asyncio.run(self._cmd_health())
async def _cmd_health(self):
"""Implementation of health command"""
try:
health = await self.client.health_check()
print("\nServer Health Check:")
print("-" * 30)
for name, is_healthy in health.items():
status = "Healthy" if is_healthy else "Unhealthy"
print(f"{name}: {status}")
except Exception as e:
print(f"Error checking health: {e}")
def do_quit(self, arg):
"""Exit the client"""
print("Goodbye!")
asyncio.run(self.client.cleanup())
return True
def do_exit(self, arg):
"""Exit the client"""
return self.do_quit(arg)
def default(self, line):
"""Handle unknown commands"""
print(f"Unknown command: {line}")
print("Type 'help' for available commands")
# Main entry point
def main():
client = InteractiveMCPClient()
# run async setup first; cmdloop() is synchronous, and each do_* command
# drives its coroutine with asyncio.run, so main itself must not be async
asyncio.run(client.initialize())
try:
client.cmdloop()
except KeyboardInterrupt:
print("\nGoodbye!")
asyncio.run(client.client.cleanup())
if __name__ == "__main__":
main()
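One detail of `do_call` above is worth isolating: user input for each tool argument is parsed as JSON when possible and falls back to a raw string otherwise, so `42`, `true`, and `[1, 2]` arrive typed while plain words like `hello` stay strings. A standalone sketch of that rule (the function name is illustrative):

```python
import json
from typing import Any

def parse_tool_argument(value: str) -> Any:
    """Parse CLI input as JSON when possible; fall back to the raw string."""
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        return value
```

This keeps the interactive prompt simple: users never need to quote plain strings, yet structured values still match the tool's JSON schema.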
Let's apply our MCP knowledge to a practical, real-world problem. We'll create a project management system that demonstrates advanced MCP patterns and best practices.
Our project management system integrates multiple MCP servers behind a unified interface for managing projects, tasks, resources, and team collaboration.
System Components:
- Task Management Server - stores projects and tasks and exposes them as MCP resources and tools
- Filesystem Server - manages each project's workspace files
- Project Management Gateway - a client that orchestrates both servers behind one interface
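The task server below serves three resource URI schemes (`project://`, `task://`, `summary://`). When hand-parsing such URIs, slicing by `len(prefix)` instead of a hard-coded index avoids off-by-one mistakes. A minimal sketch of that dispatch (the helper name is illustrative, not part of the server code):

```python
from typing import Tuple

def parse_resource_uri(uri: str) -> Tuple[str, str]:
    """Split an MCP resource URI into (scheme, identifier)."""
    for scheme in ("project", "task", "summary"):
        prefix = f"{scheme}://"
        if uri.startswith(prefix):
            # len(prefix) keeps the slice correct even if a scheme is renamed
            return scheme, uri[len(prefix):]
    raise ValueError(f"Unknown resource URI: {uri}")
```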
# servers/project_management/task_server.py
import asyncio
import json
import uuid
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
Resource, Tool, TextContent, ListResourcesResult,
ReadResourceResult, CallToolResult
)
class TaskStatus(Enum):
TODO = "todo"
IN_PROGRESS = "in_progress"
REVIEW = "review"
DONE = "done"
BLOCKED = "blocked"
class TaskPriority(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
@dataclass
class Task:
id: str
title: str
description: str
status: TaskStatus
priority: TaskPriority
assignee: Optional[str]
created_by: str
created_at: datetime
updated_at: datetime
due_date: Optional[datetime]
tags: List[str]
project_id: str
estimated_hours: Optional[float]
actual_hours: Optional[float]
@dataclass
class Project:
id: str
name: str
description: str
created_by: str
created_at: datetime
updated_at: datetime
status: str
team_members: List[str]
class TaskManagementServer:
def __init__(self, storage_file: str = "tasks.json"):
self.server = Server("task-management-server")
self.storage_file = storage_file
self.projects: Dict[str, Project] = {}
self.tasks: Dict[str, Task] = {}
self._setup_handlers()
self._load_data()
def _setup_handlers(self):
"""Register MCP protocol handlers"""
self.server.list_resources = self._list_resources
self.server.read_resource = self._read_resource
self.server.list_tools = self._list_tools
self.server.call_tool = self._call_tool
def _load_data(self):
"""Load data from storage file"""
try:
with open(self.storage_file, 'r') as f:
data = json.load(f)
# Load projects
for proj_data in data.get('projects', []):
project = Project(
id=proj_data['id'],
name=proj_data['name'],
description=proj_data['description'],
created_by=proj_data['created_by'],
created_at=datetime.fromisoformat(proj_data['created_at']),
updated_at=datetime.fromisoformat(proj_data['updated_at']),
status=proj_data['status'],
team_members=proj_data['team_members']
)
self.projects[project.id] = project
# Load tasks
for task_data in data.get('tasks', []):
task = Task(
id=task_data['id'],
title=task_data['title'],
description=task_data['description'],
status=TaskStatus(task_data['status']),
priority=TaskPriority(task_data['priority']),
assignee=task_data.get('assignee'),
created_by=task_data['created_by'],
created_at=datetime.fromisoformat(task_data['created_at']),
updated_at=datetime.fromisoformat(task_data['updated_at']),
due_date=datetime.fromisoformat(task_data['due_date']) if task_data.get('due_date') else None,
tags=task_data['tags'],
project_id=task_data['project_id'],
estimated_hours=task_data.get('estimated_hours'),
actual_hours=task_data.get('actual_hours')
)
self.tasks[task.id] = task
except FileNotFoundError:
# Initialize with sample data
self._initialize_sample_data()
except Exception as e:
print(f"Error loading data: {e}")
self._initialize_sample_data()
def _save_data(self):
"""Save data to storage file"""
try:
data = {
'projects': [asdict(proj) for proj in self.projects.values()],
'tasks': [asdict(task) for task in self.tasks.values()]
}
with open(self.storage_file, 'w') as f:
# save Enum members by value so TaskStatus(...) / TaskPriority(...) can reload them
json.dump(data, f, indent=2, default=lambda o: o.value if isinstance(o, Enum) else str(o))
except Exception as e:
print(f"Error saving data: {e}")
def _initialize_sample_data(self):
"""Initialize with sample project and task data"""
# Create sample project
project = Project(
id=str(uuid.uuid4()),
name="Sample Project",
description="A sample project for demonstration",
created_by="admin",
created_at=datetime.now(),
updated_at=datetime.now(),
status="active",
team_members=["alice", "bob", "charlie"]
)
self.projects[project.id] = project
# Create sample tasks
tasks = [
Task(
id=str(uuid.uuid4()),
title="Setup project structure",
description="Create initial project structure and documentation",
status=TaskStatus.DONE,
priority=TaskPriority.HIGH,
assignee="alice",
created_by="admin",
created_at=datetime.now() - timedelta(days=5),
updated_at=datetime.now() - timedelta(days=3),
due_date=datetime.now() - timedelta(days=2),
tags=["setup", "infrastructure"],
project_id=project.id,
estimated_hours=8.0,
actual_hours=6.5
),
Task(
id=str(uuid.uuid4()),
title="Implement core features",
description="Implement the main functionality",
status=TaskStatus.IN_PROGRESS,
priority=TaskPriority.HIGH,
assignee="bob",
created_by="admin",
created_at=datetime.now() - timedelta(days=3),
updated_at=datetime.now() - timedelta(days=1),
due_date=datetime.now() + timedelta(days=5),
tags=["development", "core"],
project_id=project.id,
estimated_hours=40.0,
actual_hours=15.0
),
Task(
id=str(uuid.uuid4()),
title="Write documentation",
description="Create comprehensive documentation",
status=TaskStatus.TODO,
priority=TaskPriority.MEDIUM,
assignee="charlie",
created_by="admin",
created_at=datetime.now() - timedelta(days=2),
updated_at=datetime.now() - timedelta(days=2),
due_date=datetime.now() + timedelta(days=10),
tags=["documentation"],
project_id=project.id,
estimated_hours=16.0,
actual_hours=None
)
]
for task in tasks:
self.tasks[task.id] = task
self._save_data()
async def _list_resources(self) -> ListResourcesResult:
"""List available task and project resources"""
resources = []
# Add project resources
for project in self.projects.values():
resources.append(Resource(
uri=f"project://{project.id}",
name=f"Project: {project.name}",
description=f"Project: {project.description}",
mimeType="application/json"
))
# Add task resources
for task in self.tasks.values():
resources.append(Resource(
uri=f"task://{task.id}",
name=f"Task: {task.title}",
description=f"Task: {task.description}",
mimeType="application/json"
))
# Add summary resources
resources.append(Resource(
uri="summary://projects",
name="Projects Summary",
description="Summary of all projects",
mimeType="application/json"
))
resources.append(Resource(
uri="summary://tasks",
name="Tasks Summary",
description="Summary of all tasks",
mimeType="application/json"
))
return ListResourcesResult(resources=resources)
async def _read_resource(self, uri: str) -> ReadResourceResult:
"""Read a specific resource"""
try:
if uri.startswith("project://"):
project_id = uri[len("project://"):] # strip the scheme prefix
if project_id in self.projects:
project = self.projects[project_id]
project_data = asdict(project)
# Add related tasks
project_tasks = [
asdict(task) for task in self.tasks.values()
if task.project_id == project_id
]
project_data['tasks'] = project_tasks
return ReadResourceResult(contents=[
TextContent(
type="text",
text=json.dumps(project_data, indent=2, default=str)
)
])
else:
raise ValueError(f"Project not found: {project_id}")
elif uri.startswith("task://"):
task_id = uri[len("task://"):] # strip the scheme prefix
if task_id in self.tasks:
task = self.tasks[task_id]
task_data = asdict(task)
# Add project information
if task.project_id in self.projects:
task_data['project'] = asdict(self.projects[task.project_id])
return ReadResourceResult(contents=[
TextContent(
type="text",
text=json.dumps(task_data, indent=2, default=str)
)
])
else:
raise ValueError(f"Task not found: {task_id}")
elif uri == "summary://projects":
projects_summary = {
"total_projects": len(self.projects),
"active_projects": len([p for p in self.projects.values() if p.status == "active"]),
"projects": [
{
"id": proj.id,
"name": proj.name,
"status": proj.status,
"team_size": len(proj.team_members),
"created_at": proj.created_at.isoformat()
}
for proj in self.projects.values()
]
}
return ReadResourceResult(contents=[
TextContent(
type="text",
text=json.dumps(projects_summary, indent=2, default=str)
)
])
elif uri == "summary://tasks":
tasks_summary = {
"total_tasks": len(self.tasks),
"by_status": {
status.value: len([t for t in self.tasks.values() if t.status == status])
for status in TaskStatus
},
"by_priority": {
priority.value: len([t for t in self.tasks.values() if t.priority == priority])
for priority in TaskPriority
},
"overdue_tasks": len([
t for t in self.tasks.values()
if t.due_date and t.due_date < datetime.now() and t.status != TaskStatus.DONE
]),
"tasks": [
{
"id": task.id,
"title": task.title,
"status": task.status.value,
"priority": task.priority.value,
"assignee": task.assignee,
"project_id": task.project_id,
"due_date": task.due_date.isoformat() if task.due_date else None
}
for task in self.tasks.values()
]
}
return ReadResourceResult(contents=[
TextContent(
type="text",
text=json.dumps(tasks_summary, indent=2, default=str)
)
])
else:
raise ValueError(f"Unknown resource URI: {uri}")
except Exception as e:
raise ValueError(f"Error reading resource {uri}: {str(e)}")
async def _list_tools(self) -> List[Tool]:
"""List available task management tools"""
return [
Tool(
name="create_project",
description="Create a new project",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string", "description": "Project name"},
"description": {"type": "string", "description": "Project description"},
"created_by": {"type": "string", "description": "Creator ID"},
"team_members": {
"type": "array",
"items": {"type": "string"},
"description": "Team member IDs"
}
},
"required": ["name", "description", "created_by"]
}
),
Tool(
name="create_task",
description="Create a new task",
inputSchema={
"type": "object",
"properties": {
"title": {"type": "string", "description": "Task title"},
"description": {"type": "string", "description": "Task description"},
"project_id": {"type": "string", "description": "Project ID"},
"assignee": {"type": "string", "description": "Assignee ID"},
"priority": {
"type": "string",
"enum": ["low", "medium", "high", "urgent"],
"description": "Task priority"
},
"due_date": {"type": "string", "description": "Due date (ISO format)"},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Task tags"
},
"estimated_hours": {"type": "number", "description": "Estimated hours"},
"created_by": {"type": "string", "description": "Creator ID"}
},
"required": ["title", "description", "project_id", "created_by"]
}
),
Tool(
name="update_task_status",
description="Update task status",
inputSchema={
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "Task ID"},
"status": {
"type": "string",
"enum": ["todo", "in_progress", "review", "done", "blocked"],
"description": "New status"
},
"updated_by": {"type": "string", "description": "Updater ID"}
},
"required": ["task_id", "status", "updated_by"]
}
),
Tool(
name="assign_task",
description="Assign a task to someone",
inputSchema={
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "Task ID"},
"assignee": {"type": "string", "description": "Assignee ID"},
"assigned_by": {"type": "string", "description": "Who made the assignment"}
},
"required": ["task_id", "assignee", "assigned_by"]
}
),
Tool(
name="search_tasks",
description="Search for tasks",
inputSchema={
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "Filter by project"},
"assignee": {"type": "string", "description": "Filter by assignee"},
"status": {
"type": "string",
"enum": ["todo", "in_progress", "review", "done", "blocked"],
"description": "Filter by status"
},
"priority": {
"type": "string",
"enum": ["low", "medium", "high", "urgent"],
"description": "Filter by priority"
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Filter by tags"
},
"query": {"type": "string", "description": "Search in title/description"}
}
}
),
Tool(
name="get_project_analytics",
description="Get analytics for a project",
inputSchema={
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "Project ID"}
},
"required": ["project_id"]
}
)
]
async def _call_tool(self, name: str, arguments: Dict[str, Any]) -> CallToolResult:
"""Execute a task management tool"""
try:
if name == "create_project":
return await self._tool_create_project(arguments)
elif name == "create_task":
return await self._tool_create_task(arguments)
elif name == "update_task_status":
return await self._tool_update_task_status(arguments)
elif name == "assign_task":
return await self._tool_assign_task(arguments)
elif name == "search_tasks":
return await self._tool_search_tasks(arguments)
elif name == "get_project_analytics":
return await self._tool_get_project_analytics(arguments)
else:
raise ValueError(f"Unknown tool: {name}")
except Exception as e:
return CallToolResult(
content=[TextContent(type="text", text=f"Error: {str(e)}")],
isError=True
)
async def _tool_create_project(self, args: Dict[str, Any]) -> CallToolResult:
"""Create a new project"""
project = Project(
id=str(uuid.uuid4()),
name=args["name"],
description=args["description"],
created_by=args["created_by"],
created_at=datetime.now(),
updated_at=datetime.now(),
status="active",
team_members=args.get("team_members", [])
)
self.projects[project.id] = project
self._save_data()
return CallToolResult(
content=[TextContent(
type="text",
text=f"Project created successfully:\n{json.dumps(asdict(project), indent=2, default=str)}"
)]
)
async def _tool_create_task(self, args: Dict[str, Any]) -> CallToolResult:
"""Create a new task"""
# Validate project exists
if args["project_id"] not in self.projects:
raise ValueError(f"Project not found: {args['project_id']}")
task = Task(
id=str(uuid.uuid4()),
title=args["title"],
description=args["description"],
status=TaskStatus.TODO,
priority=TaskPriority(args.get("priority", "medium")),
assignee=args.get("assignee"),
created_by=args["created_by"],
created_at=datetime.now(),
updated_at=datetime.now(),
due_date=datetime.fromisoformat(args["due_date"]) if args.get("due_date") else None,
tags=args.get("tags", []),
project_id=args["project_id"],
estimated_hours=args.get("estimated_hours"),
actual_hours=None
)
self.tasks[task.id] = task
self._save_data()
return CallToolResult(
content=[TextContent(
type="text",
text=f"Task created successfully:\n{json.dumps(asdict(task), indent=2, default=str)}"
)]
)
async def _tool_update_task_status(self, args: Dict[str, Any]) -> CallToolResult:
"""Update task status"""
task_id = args["task_id"]
new_status = TaskStatus(args["status"])
updated_by = args["updated_by"]
if task_id not in self.tasks:
raise ValueError(f"Task not found: {task_id}")
task = self.tasks[task_id]
old_status = task.status
task.status = new_status
task.updated_at = datetime.now()
# If task is being marked as done, set actual hours if not already set
if new_status == TaskStatus.DONE and task.actual_hours is None and task.estimated_hours:
task.actual_hours = task.estimated_hours # Default to estimated
self._save_data()
return CallToolResult(
content=[TextContent(
type="text",
text=f"Task status updated:\n"
f"Task: {task.title}\n"
f"Status: {old_status.value} -> {new_status.value}\n"
f"Updated by: {updated_by}\n"
f"Updated at: {task.updated_at.isoformat()}"
)]
)
async def _tool_assign_task(self, args: Dict[str, Any]) -> CallToolResult:
"""Assign a task to someone"""
task_id = args["task_id"]
assignee = args["assignee"]
assigned_by = args["assigned_by"]
if task_id not in self.tasks:
raise ValueError(f"Task not found: {task_id}")
task = self.tasks[task_id]
old_assignee = task.assignee
task.assignee = assignee
task.updated_at = datetime.now()
self._save_data()
return CallToolResult(
content=[TextContent(
type="text",
text=f"Task assigned:\n"
f"Task: {task.title}\n"
f"Assignee: {old_assignee} -> {assignee}\n"
f"Assigned by: {assigned_by}\n"
f"Assigned at: {task.updated_at.isoformat()}"
)]
)
async def _tool_search_tasks(self, args: Dict[str, Any]) -> CallToolResult:
"""Search for tasks"""
filtered_tasks = []
for task in self.tasks.values():
# Apply filters
if args.get("project_id") and task.project_id != args["project_id"]:
continue
if args.get("assignee") and task.assignee != args["assignee"]:
continue
if args.get("status") and task.status.value != args["status"]:
continue
if args.get("priority") and task.priority.value != args["priority"]:
continue
if args.get("tags"):
if not any(tag in task.tags for tag in args["tags"]):
continue
if args.get("query"):
query = args["query"].lower()
if (query not in task.title.lower() and
query not in task.description.lower()):
continue
filtered_tasks.append(asdict(task))
return CallToolResult(
content=[TextContent(
type="text",
text=f"Found {len(filtered_tasks)} matching tasks:\n" +
json.dumps(filtered_tasks, indent=2, default=str)
)]
)
async def _tool_get_project_analytics(self, args: Dict[str, Any]) -> CallToolResult:
"""Get analytics for a project"""
project_id = args["project_id"]
if project_id not in self.projects:
raise ValueError(f"Project not found: {project_id}")
project = self.projects[project_id]
project_tasks = [task for task in self.tasks.values() if task.project_id == project_id]
# Calculate analytics
total_tasks = len(project_tasks)
completed_tasks = len([t for t in project_tasks if t.status == TaskStatus.DONE])
in_progress_tasks = len([t for t in project_tasks if t.status == TaskStatus.IN_PROGRESS])
total_estimated = sum(t.estimated_hours or 0 for t in project_tasks)
total_actual = sum(t.actual_hours or 0 for t in project_tasks)
overdue_tasks = len([
t for t in project_tasks
if t.due_date and t.due_date < datetime.now() and t.status != TaskStatus.DONE
])
# Tasks by assignee
tasks_by_assignee = {}
for task in project_tasks:
if task.assignee:
if task.assignee not in tasks_by_assignee:
tasks_by_assignee[task.assignee] = {"total": 0, "completed": 0}
tasks_by_assignee[task.assignee]["total"] += 1
if task.status == TaskStatus.DONE:
tasks_by_assignee[task.assignee]["completed"] += 1
analytics = {
"project": {
"id": project.id,
"name": project.name,
"team_size": len(project.team_members)
},
"task_summary": {
"total_tasks": total_tasks,
"completed_tasks": completed_tasks,
"in_progress_tasks": in_progress_tasks,
"completion_rate": (completed_tasks / total_tasks * 100) if total_tasks > 0 else 0,
"overdue_tasks": overdue_tasks
},
"time_tracking": {
"total_estimated_hours": total_estimated,
"total_actual_hours": total_actual,
"efficiency": (total_estimated / total_actual) if total_actual > 0 else 0
},
"tasks_by_assignee": tasks_by_assignee,
"tasks_by_status": {
status.value: len([t for t in project_tasks if t.status == status])
for status in TaskStatus
},
"tasks_by_priority": {
priority.value: len([t for t in project_tasks if t.priority == priority])
for priority in TaskPriority
}
}
return CallToolResult(
content=[TextContent(
type="text",
text=json.dumps(analytics, indent=2, default=str)
)]
)
# Server entry point
async def main():
server = TaskManagementServer()
async with stdio_server() as (read_stream, write_stream):
await server.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="task-management-server",
server_version="1.0.0",
capabilities=server.server.get_capabilities(
notification_options=None,
experimental_capabilities=None,
),
),
)
if __name__ == "__main__":
asyncio.run(main())
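The server above persists its dataclasses with `json.dump`. Because `dataclasses.asdict` leaves `Enum` members and `datetime` objects untouched, a plain `default=str` would save `"TaskStatus.TODO"` instead of `"todo"` and break `TaskStatus(...)` on reload; the serializer needs a `default` hook that maps enums to their values, and the loader reverses it with `Status(...)` and `datetime.fromisoformat(...)`. A self-contained sketch of that round trip (using a trimmed-down `MiniTask`, not the full `Task` above):

```python
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum

class Status(Enum):
    TODO = "todo"
    DONE = "done"

@dataclass
class MiniTask:
    id: str
    status: Status
    created_at: datetime

def dump_task(task: MiniTask) -> str:
    # Enum members become their .value; datetimes become ISO strings
    return json.dumps(
        asdict(task),
        default=lambda o: o.value if isinstance(o, Enum) else o.isoformat(),
    )

def load_task(raw: str) -> MiniTask:
    data = json.loads(raw)
    return MiniTask(
        id=data["id"],
        status=Status(data["status"]),
        created_at=datetime.fromisoformat(data["created_at"]),
    )
```

The same pattern scales to the full `Task` and `Project` dataclasses: one `default` hook on save, one explicit constructor per field type on load.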
Now let's create a gateway client that orchestrates multiple servers to provide a unified project management interface.
# clients/gateway/project_management_gateway.py
import asyncio
import json
from typing import Dict, List, Any, Optional
from datetime import datetime
from clients.advanced.multi_server_client import MultiServerMCPClient
class ProjectManagementGateway:
"""Unified gateway for project management across multiple MCP servers"""
def __init__(self):
self.client = MultiServerMCPClient()
self.task_server = "task-management"
self.file_server = "filesystem"
self.analytics_server = "analytics"
async def initialize(self):
"""Initialize all required servers"""
servers = [
(self.task_server, ["python", "-m", "servers.project_management.task_server"]),
(self.file_server, ["python", "-m", "servers.filesystem.server"]),
# Add more servers as needed
]
for name, command in servers:
try:
await self.client.add_server(name, command)
print(f"Connected to {name} server")
except Exception as e:
print(f"Failed to connect to {name}: {e}")
async def create_project_workspace(self, project_name: str, description: str,
created_by: str, team_members: List[str]) -> Dict[str, Any]:
"""Create a complete project workspace with tasks and file structure"""
try:
# Create project in task management server
project_result = await self.client.call_tool(
"create_project",
{
"name": project_name,
"description": description,
"created_by": created_by,
"team_members": team_members
},
self.task_server
)
if project_result.isError:
return {"success": False, "error": "Failed to create project"}
# Extract the project record: the tool returns a status line followed by JSON
project_data = json.loads(project_result.content[0].text.split("\n", 1)[1])
project_id = project_data["id"]
# Create project directory structure
workspace_setup = await self._setup_project_workspace(project_id, project_name)
# Create initial tasks
initial_tasks = await self._create_initial_tasks(project_id, created_by)
return {
"success": True,
"project_id": project_id,
"workspace": workspace_setup,
"initial_tasks": initial_tasks
}
except Exception as e:
return {"success": False, "error": str(e)}
async def _setup_project_workspace(self, project_id: str, project_name: str) -> Dict[str, Any]:
"""Create file structure for the project"""
try:
# Create main project directory
project_dir = f"projects/{project_id}"
# Create directory structure
directories = [
f"{project_dir}/docs",
f"{project_dir}/src",
f"{project_dir}/tests",
f"{project_dir}/assets",
f"{project_dir}/meetings"
]
created_dirs = []
for directory in directories:
result = await self.client.call_tool(
"write_file",
{
"path": f"{directory}/.gitkeep",
"content": f"# {directory} directory\n"
},
self.file_server
)
created_dirs.append(directory)
# Create initial documentation
readme_content = f"""# {project_name}
## Description
Project documentation and resources.
## Structure
- `docs/` - Project documentation
- `src/` - Source code and implementation
- `tests/` - Test files and results
- `assets/` - Project assets and resources
- `meetings/` - Meeting notes and recordings
## Getting Started
Add project-specific getting started instructions here.
Created: {datetime.now().isoformat()}
"""
await self.client.call_tool(
"write_file",
{
"path": f"{project_dir}/README.md",
"content": readme_content
},
self.file_server
)
return {
"directories_created": created_dirs,
"readme_created": f"{project_dir}/README.md"
}
except Exception as e:
return {"error": str(e)}
async def _create_initial_tasks(self, project_id: str, created_by: str) -> List[Dict[str, Any]]:
"""Create initial tasks for the project"""
initial_tasks = [
{
"title": "Project Setup",
"description": "Complete initial project setup and configuration",
"priority": "high",
"tags": ["setup", "infrastructure"]
},
{
"title": "Requirements Gathering",
"description": "Gather and document project requirements",
"priority": "high",
"tags": ["planning", "requirements"]
},
{
"title": "Design Documentation",
"description": "Create technical design documents",
"priority": "medium",
"tags": ["design", "documentation"]
}
]
created_tasks = []
for task_data in initial_tasks:
try:
result = await self.client.call_tool(
"create_task",
{
"title": task_data["title"],
"description": task_data["description"],
"project_id": project_id,
"priority": task_data["priority"],
"tags": task_data["tags"],
"created_by": created_by
},
self.task_server
)
if not result.isError:
task_info = json.loads(result.content[0].text.split("\n", 1)[1]) # skip the status line
created_tasks.append(task_info)
except Exception as e:
print(f"Failed to create task {task_data['title']}: {e}")
return created_tasks
async def get_project_dashboard(self, project_id: str) -> Dict[str, Any]:
"""Get comprehensive project dashboard"""
try:
# Get project details
project_resource = await self.client.read_resource(f"project://{project_id}", self.task_server)
project_data = json.loads(project_resource.contents[0].text)
# Get project analytics
analytics_result = await self.client.call_tool(
"get_project_analytics",
{"project_id": project_id},
self.task_server
)
analytics_data = json.loads(analytics_result.content[0].text) if not analytics_result.isError else {}
# Get project files
project_files = await self.client.call_tool(
"list_directory",
{"path": f"projects/{project_id}", "recursive": True},
self.file_server
)
files_data = json.loads(project_files.content[0].text) if not project_files.isError else []
# Combine all information
dashboard = {
"project": project_data,
"analytics": analytics_data,
"files": files_data,
"generated_at": datetime.now().isoformat()
}
return dashboard
except Exception as e:
return {"error": str(e)}
async def generate_project_report(self, project_id: str, format_type: str = "json") -> str:
"""Generate comprehensive project report"""
try:
dashboard = await self.get_project_dashboard(project_id)
if "error" in dashboard:
return f"Error generating report: {dashboard['error']}"
if format_type == "markdown":
return self._generate_markdown_report(dashboard)
elif format_type == "html":
return self._generate_html_report(dashboard)
else:
return json.dumps(dashboard, indent=2, default=str)
except Exception as e:
return f"Error generating report: {str(e)}"
def _generate_markdown_report(self, dashboard: Dict[str, Any]) -> str:
"""Generate markdown format report"""
project = dashboard["project"]
analytics = dashboard["analytics"]
report = f"""# Project Report: {project['name']}
## Project Overview
- **ID**: {project['id']}
- **Status**: {project['status']}
- **Created**: {project['created_at']}
- **Team Size**: {len(project['team_members'])}
- **Description**: {project['description']}
## Task Summary
"""
if "task_summary" in analytics:
summary = analytics["task_summary"]
report += f"""- **Total Tasks**: {summary['total_tasks']}
- **Completed**: {summary['completed_tasks']}
- **In Progress**: {summary['in_progress_tasks']}
- **Completion Rate**: {summary['completion_rate']:.1f}%
- **Overdue Tasks**: {summary['overdue_tasks']}
"""
report += "\n## Team Members\n"
for member in project['team_members']:
report += f"- {member}\n"
if "tasks_by_assignee" in analytics:
report += "\n## Tasks by Assignee\n"
for assignee, stats in analytics["tasks_by_assignee"].items():
report += f"- **{assignee}**: {stats['completed']}/{stats['total']} completed\n"
report += f"\n---\n*Report generated on {dashboard['generated_at']}*\n"
return report
def _generate_html_report(self, dashboard: Dict[str, Any]) -> str:
"""Generate HTML format report"""
project = dashboard["project"]
analytics = dashboard["analytics"]
html = f"""<!DOCTYPE html>
<html>
<head>
<title>Project Report: {project['name']}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; }}
.header {{ background-color: #f0f0f0; padding: 20px; border-radius: 5px; }}
.section {{ margin: 20px 0; }}
.metric {{ display: inline-block; margin: 10px; padding: 10px; background-color: #e9ecef; border-radius: 3px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
</style>
</head>
<body>
<div class="header">
<h1>Project Report: {project['name']}</h1>
<p><strong>ID:</strong> {project['id']} | <strong>Status:</strong> {project['status']}</p>
<p>{project['description']}</p>
</div>
"""
if "task_summary" in analytics:
summary = analytics["task_summary"]
html += f"""
<div class="section">
<h2>Task Summary</h2>
<div class="metric">Total Tasks: {summary['total_tasks']}</div>
<div class="metric">Completed: {summary['completed_tasks']}</div>
<div class="metric">In Progress: {summary['in_progress_tasks']}</div>
<div class="metric">Completion Rate: {summary['completion_rate']:.1f}%</div>
</div>
"""
html += f"""
<div class="section">
<h2>Team Members</h2>
<ul>
"""
for member in project['team_members']:
html += f" <li>{member}</li>\n"
html += """ </ul>
</div>
<div class="section">
<p><em>Report generated on """ + dashboard['generated_at'] + """</em></p>
</div>
</body>
</html>"""
return html
async def cleanup(self):
"""Clean up all connections"""
await self.client.cleanup()
# Usage example
async def main():
gateway = ProjectManagementGateway()
try:
await gateway.initialize()
# Create a new project workspace
result = await gateway.create_project_workspace(
project_name="AI Development Project",
description="Develop an AI-powered application",
created_by="alice",
team_members=["alice", "bob", "charlie"]
)
if result["success"]:
project_id = result["project_id"]
print(f"✅ Project created with ID: {project_id}")
# Generate project dashboard
dashboard = await gateway.get_project_dashboard(project_id)
print("📊 Project Dashboard:")
print(json.dumps(dashboard, indent=2, default=str))
# Generate reports
markdown_report = await gateway.generate_project_report(project_id, "markdown")
print("\n📝 Markdown Report:")
print(markdown_report)
else:
print(f"❌ Failed to create project: {result['error']}")
finally:
await gateway.cleanup()
if __name__ == "__main__":
asyncio.run(main())
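The gateway's markdown assembly depends only on the shape of the dashboard dictionary, so it can be sanity-checked without any running servers. Below is a minimal standalone sketch of the same string-building logic; the `sample` dashboard is fabricated illustration data, not real server output.

```python
def build_markdown_report(dashboard: dict) -> str:
    """Reduced version of _generate_markdown_report for offline inspection."""
    project = dashboard["project"]
    summary = dashboard["analytics"]["task_summary"]
    lines = [
        f"# Project Report: {project['name']}",
        "## Task Summary",
        f"- **Total Tasks**: {summary['total_tasks']}",
        f"- **Completed**: {summary['completed_tasks']}",
        f"- **Completion Rate**: {summary['completion_rate']:.1f}%",
        "## Team Members",
    ]
    lines += [f"- {member}" for member in project["team_members"]]
    return "\n".join(lines)

# Fabricated sample data standing in for the task server's output
sample = {
    "project": {"name": "Demo", "team_members": ["alice", "bob"]},
    "analytics": {"task_summary": {
        "total_tasks": 3, "completed_tasks": 1, "completion_rate": 33.333,
    }},
}
print(build_markdown_report(sample))
```

Because the builder is a pure function of its input, it is also the easiest piece of the gateway to cover with unit tests.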
MCP Labs is not just about building standard MCP implementations; it is also about exploring cutting-edge features and experimental capabilities that push the boundaries of what is possible with MCP.
Let's implement experimental real-time collaboration features that allow multiple agents to work together on shared resources.
# servers/experimental/collaboration_server.py
import asyncio
import json
import websockets
from typing import Dict, List, Any, Set, Optional
from datetime import datetime
from dataclasses import dataclass, asdict
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
Resource, Tool, TextContent, ListResourcesResult,
ReadResourceResult, CallToolResult
)
@dataclass
class CollaborationSession:
id: str
name: str
participants: Set[str]
shared_resources: List[str]
created_at: datetime
last_activity: datetime
is_active: bool
@dataclass
class CollaborationEvent:
id: str
session_id: str
participant_id: str
event_type: str # "join", "leave", "resource_update", "message"
data: Dict[str, Any]
timestamp: datetime
class ExperimentalCollaborationServer:
"""Experimental MCP server with real-time collaboration features"""
def __init__(self):
self.server = Server("collaboration-server")
self.sessions: Dict[str, CollaborationSession] = {}
self.events: List[CollaborationEvent] = []
self.websocket_connections: Dict[str, websockets.WebSocketServerProtocol] = {}
self._setup_handlers()
def _setup_handlers(self):
"""Register MCP protocol handlers"""
self.server.list_resources = self._list_resources
self.server.read_resource = self._read_resource
self.server.list_tools = self._list_tools
self.server.call_tool = self._call_tool
async def _list_resources(self) -> ListResourcesResult:
"""List collaboration resources"""
resources = []
# Add session resources
for session in self.sessions.values():
resources.append(Resource(
uri=f"collaboration://session/{session.id}",
name=f"Session: {session.name}",
description=f"Collaboration session with {len(session.participants)} participants",
mimeType="application/json"
))
# Add event log resource
resources.append(Resource(
uri="collaboration://events",
name="Collaboration Events",
description="Real-time collaboration event log",
mimeType="application/json"
))
return ListResourcesResult(resources=resources)
async def _read_resource(self, uri: str) -> ReadResourceResult:
"""Read collaboration resources"""
try:
if uri.startswith("collaboration://session/"):
session_id = uri.split("/")[-1]
if session_id in self.sessions:
session = self.sessions[session_id]
session_data = asdict(session)
session_data['participants'] = list(session.participants)
return ReadResourceResult(contents=[
TextContent(
type="text",
text=json.dumps(session_data, indent=2, default=str)
)
])
else:
raise ValueError(f"Session not found: {session_id}")
elif uri == "collaboration://events":
events_data = [asdict(event) for event in self.events[-100:]] # Last 100 events
return ReadResourceResult(contents=[
TextContent(
type="text",
text=json.dumps(events_data, indent=2, default=str)
)
])
else:
raise ValueError(f"Unknown resource URI: {uri}")
except Exception as e:
raise ValueError(f"Error reading resource {uri}: {str(e)}")
async def _list_tools(self) -> List[Tool]:
"""List collaboration tools"""
return [
Tool(
name="create_session",
description="Create a new collaboration session",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string", "description": "Session name"},
"creator_id": {"type": "string", "description": "Creator ID"},
"initial_resources": {
"type": "array",
"items": {"type": "string"},
"description": "Initial shared resources"
}
},
"required": ["name", "creator_id"]
}
),
Tool(
name="join_session",
description="Join a collaboration session",
inputSchema={
"type": "object",
"properties": {
"session_id": {"type": "string", "description": "Session ID"},
"participant_id": {"type": "string", "description": "Participant ID"}
},
"required": ["session_id", "participant_id"]
}
),
Tool(
name="leave_session",
description="Leave a collaboration session",
inputSchema={
"type": "object",
"properties": {
"session_id": {"type": "string", "description": "Session ID"},
"participant_id": {"type": "string", "description": "Participant ID"}
},
"required": ["session_id", "participant_id"]
}
),
Tool(
name="send_message",
description="Send a message to a collaboration session",
inputSchema={
"type": "object",
"properties": {
"session_id": {"type": "string", "description": "Session ID"},
"participant_id": {"type": "string", "description": "Participant ID"},
"message": {"type": "string", "description": "Message content"},
"message_type": {
"type": "string",
"enum": ["text", "code", "file_share"],
"description": "Message type",
"default": "text"
}
},
"required": ["session_id", "participant_id", "message"]
}
),
Tool(
name="update_shared_resource",
description="Update a shared resource in a session",
inputSchema={
"type": "object",
"properties": {
"session_id": {"type": "string", "description": "Session ID"},
"participant_id": {"type": "string", "description": "Participant ID"},
"resource_uri": {"type": "string", "description": "Resource URI"},
"update_data": {"type": "object", "description": "Update data"}
},
"required": ["session_id", "participant_id", "resource_uri", "update_data"]
}
),
Tool(
name="get_active_sessions",
description="Get list of active collaboration sessions",
inputSchema={
"type": "object",
"properties": {}
}
)
]
async def _call_tool(self, name: str, arguments: Dict[str, Any]) -> CallToolResult:
"""Execute collaboration tools"""
try:
if name == "create_session":
return await self._tool_create_session(arguments)
elif name == "join_session":
return await self._tool_join_session(arguments)
elif name == "leave_session":
return await self._tool_leave_session(arguments)
elif name == "send_message":
return await self._tool_send_message(arguments)
elif name == "update_shared_resource":
return await self._tool_update_shared_resource(arguments)
elif name == "get_active_sessions":
return await self._tool_get_active_sessions(arguments)
else:
raise ValueError(f"Unknown tool: {name}")
except Exception as e:
return CallToolResult(
content=[TextContent(type="text", text=f"Error: {str(e)}")],
isError=True
)
async def _tool_create_session(self, args: Dict[str, Any]) -> CallToolResult:
"""Create a new collaboration session"""
import uuid
session_id = str(uuid.uuid4())
session = CollaborationSession(
id=session_id,
name=args["name"],
participants=set([args["creator_id"]]),
shared_resources=args.get("initial_resources", []),
created_at=datetime.now(),
last_activity=datetime.now(),
is_active=True
)
self.sessions[session_id] = session
# Log creation event
await self._log_event(session_id, args["creator_id"], "create", {
"session_name": args["name"],
"initial_resources": session.shared_resources
})
return CallToolResult(
content=[TextContent(
type="text",
text=f"Collaboration session created:\n{json.dumps(asdict(session), indent=2, default=str)}"
)]
)
async def _tool_join_session(self, args: Dict[str, Any]) -> CallToolResult:
"""Join a collaboration session"""
session_id = args["session_id"]
participant_id = args["participant_id"]
if session_id not in self.sessions:
raise ValueError(f"Session not found: {session_id}")
session = self.sessions[session_id]
session.participants.add(participant_id)
session.last_activity = datetime.now()
# Log join event
await self._log_event(session_id, participant_id, "join", {})
return CallToolResult(
content=[TextContent(
type="text",
text=f"Joined session '{session.name}' successfully. Current participants: {len(session.participants)}"
)]
)
async def _tool_leave_session(self, args: Dict[str, Any]) -> CallToolResult:
"""Leave a collaboration session"""
session_id = args["session_id"]
participant_id = args["participant_id"]
if session_id not in self.sessions:
raise ValueError(f"Session not found: {session_id}")
session = self.sessions[session_id]
session.participants.discard(participant_id)
session.last_activity = datetime.now()
# Log leave event
await self._log_event(session_id, participant_id, "leave", {})
# Deactivate session if no participants
if len(session.participants) == 0:
session.is_active = False
return CallToolResult(
content=[TextContent(
type="text",
text=f"Left session '{session.name}'. Remaining participants: {len(session.participants)}"
)]
)
async def _tool_send_message(self, args: Dict[str, Any]) -> CallToolResult:
"""Send a message to a collaboration session"""
session_id = args["session_id"]
participant_id = args["participant_id"]
message = args["message"]
message_type = args.get("message_type", "text")
if session_id not in self.sessions:
raise ValueError(f"Session not found: {session_id}")
session = self.sessions[session_id]
if participant_id not in session.participants:
raise ValueError(f"Participant not in session: {participant_id}")
session.last_activity = datetime.now()
# Log message event
await self._log_event(session_id, participant_id, "message", {
"content": message,
"message_type": message_type
})
# Broadcast to WebSocket connections
await self._broadcast_to_session(session_id, {
"type": "message",
"participant_id": participant_id,
"message": message,
"message_type": message_type,
"timestamp": datetime.now().isoformat()
})
return CallToolResult(
content=[TextContent(
type="text",
text=f"Message sent to session '{session.name}'"
)]
)
async def _tool_update_shared_resource(self, args: Dict[str, Any]) -> CallToolResult:
"""Update a shared resource"""
session_id = args["session_id"]
participant_id = args["participant_id"]
resource_uri = args["resource_uri"]
update_data = args["update_data"]
if session_id not in self.sessions:
raise ValueError(f"Session not found: {session_id}")
session = self.sessions[session_id]
if participant_id not in session.participants:
raise ValueError(f"Participant not in session: {participant_id}")
session.last_activity = datetime.now()
# Add resource to shared resources if not already there
if resource_uri not in session.shared_resources:
session.shared_resources.append(resource_uri)
# Log resource update event
await self._log_event(session_id, participant_id, "resource_update", {
"resource_uri": resource_uri,
"update_data": update_data
})
# Broadcast to WebSocket connections
await self._broadcast_to_session(session_id, {
"type": "resource_update",
"participant_id": participant_id,
"resource_uri": resource_uri,
"update_data": update_data,
"timestamp": datetime.now().isoformat()
})
return CallToolResult(
content=[TextContent(
type="text",
text=f"Resource {resource_uri} updated in session '{session.name}'"
)]
)
async def _tool_get_active_sessions(self, args: Dict[str, Any]) -> CallToolResult:
"""Get list of active collaboration sessions"""
active_sessions = [
{
"id": session.id,
"name": session.name,
"participant_count": len(session.participants),
"resource_count": len(session.shared_resources),
"created_at": session.created_at.isoformat(),
"last_activity": session.last_activity.isoformat()
}
for session in self.sessions.values()
if session.is_active
]
return CallToolResult(
content=[TextContent(
type="text",
text=json.dumps(active_sessions, indent=2)
)]
)
async def _log_event(self, session_id: str, participant_id: str,
event_type: str, data: Dict[str, Any]):
"""Log a collaboration event"""
import uuid
event = CollaborationEvent(
id=str(uuid.uuid4()),
session_id=session_id,
participant_id=participant_id,
event_type=event_type,
data=data,
timestamp=datetime.now()
)
self.events.append(event)
# Keep only last 1000 events
if len(self.events) > 1000:
self.events = self.events[-1000:]
async def _broadcast_to_session(self, session_id: str, message: Dict[str, Any]):
"""Broadcast a message to every participant with a registered WebSocket"""
session = self.sessions.get(session_id)
if session is None:
return
payload = json.dumps(message, default=str)
for participant_id in session.participants:
# Connections are keyed by participant ID; skip unregistered participants
ws = self.websocket_connections.get(participant_id)
try:
if ws is not None:
await ws.send(payload)
except websockets.exceptions.ConnectionClosed:
pass  # stale connection; it is dropped on handler cleanup
# WebSocket server for real-time collaboration
async def start_websocket_server(self, host: str = "localhost", port: int = 8765):
"""Start WebSocket server for real-time collaboration"""
async def handle_websocket(websocket, path):
# Note: newer releases of the websockets library pass only the
# connection object to the handler; (websocket, path) is the legacy API
participant_id = None
try:
async for raw_message in websocket:
data = json.loads(raw_message)
if data.get("type") == "register":
# Convention used in this sketch: a first "register" message
# associates the socket with a participant ID so that
# _broadcast_to_session can reach it
participant_id = data.get("participant_id")
if participant_id:
self.websocket_connections[participant_id] = websocket
else:
await self._handle_websocket_message(websocket, data)
except websockets.exceptions.ConnectionClosed:
pass
finally:
# Drop the registration so broadcasts skip closed sockets
if participant_id:
self.websocket_connections.pop(participant_id, None)
await websockets.serve(handle_websocket, host, port)
print(f"WebSocket server started on ws://{host}:{port}")
async def _handle_websocket_message(self, websocket, data: Dict[str, Any]):
"""Handle incoming WebSocket messages"""
# Implement WebSocket message handling
pass
# Server entry point
async def main():
server = ExperimentalCollaborationServer()
# Start WebSocket server in background
websocket_task = asyncio.create_task(server.start_websocket_server())
try:
async with stdio_server() as (read_stream, write_stream):
await server.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="collaboration-server",
server_version="1.0.0-experimental",
capabilities=server.server.get_capabilities(
notification_options=None,
experimental_capabilities={
"real_time_collaboration": True,
"websocket_support": True,
"event_streaming": True
},
),
),
)
finally:
websocket_task.cancel()
if __name__ == "__main__":
asyncio.run(main())
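A note on the event log: `_log_event` above caps history by re-slicing the list after every append once it exceeds 1000 entries. A `collections.deque` with `maxlen` keeps the same last-N semantics while evicting old entries in O(1), as this small sketch shows:

```python
from collections import deque

MAX_EVENTS = 3  # small cap for demonstration; the server above keeps 1000

events: deque = deque(maxlen=MAX_EVENTS)
for i in range(5):
    events.append({"id": i, "event_type": "message"})

# Only the last MAX_EVENTS entries survive; older ones were evicted on append
print([e["id"] for e in events])  # → [2, 3, 4]
```

One trade-off: deques do not support slicing, so the resource handler's `self.events[-100:]` would become `list(events)[-100:]`.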
Comprehensive testing and debugging are crucial for building reliable MCP applications. Let's explore advanced testing strategies and debugging tools.
# tests/framework/mcp_test_framework.py
import asyncio
import json
import pytest
import logging
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from contextlib import asynccontextmanager
from mcp.client.session import ClientSession
from mcp.client.stdio import stdio_client
from mcp.types import Resource, Tool, CallToolResult, ReadResourceResult
@dataclass
class MCPTestConfig:
"""Configuration for MCP testing"""
server_command: List[str]
timeout: float = 30.0
retry_attempts: int = 3
log_level: str = "INFO"
expected_resources: Optional[List[str]] = None
expected_tools: Optional[List[str]] = None
class MCPTestFramework:
"""Advanced testing framework for MCP servers"""
def __init__(self, config: MCPTestConfig):
self.config = config
self.logger = self._setup_logger()
self._process = None
self._session = None
def _setup_logger(self) -> logging.Logger:
"""Setup test logger"""
logger = logging.getLogger(f"mcp_test_{id(self)}")
logger.setLevel(getattr(logging, self.config.log_level))
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
@asynccontextmanager
async def client_session(self):
"""Context manager for MCP client session"""
process = None
session = None
try:
# Start server process
self.logger.info(f"Starting server: {' '.join(self.config.server_command)}")
process = await asyncio.create_subprocess_exec(
*self.config.server_command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# Wait for server to start
await asyncio.sleep(0.5)
# Create client session
async with stdio_client(process.stdin, process.stdout) as (read, write):
session = ClientSession(read, write)
await session.initialize()
yield session
except Exception as e:
self.logger.error(f"Error in client session: {e}")
raise
finally:
# Cleanup
if session:
try:
await session.close()
except Exception:
pass
if process:
try:
process.terminate()
# asyncio subprocess wait() takes no timeout; wrap it with wait_for
await asyncio.wait_for(process.wait(), timeout=5)
except asyncio.TimeoutError:
process.kill()
await process.wait()
async def test_server_startup(self) -> bool:
"""Test that server starts successfully"""
try:
async with self.client_session() as session:
self.logger.info("✅ Server started successfully")
return True
except Exception as e:
self.logger.error(f"❌ Server startup failed: {e}")
return False
async def test_resource_listing(self) -> Dict[str, Any]:
"""Test resource listing functionality"""
result = {
"success": False,
"resources": [],
"errors": []
}
try:
async with self.client_session() as session:
resources_response = await session.list_resources()
result["resources"] = resources_response.resources
result["success"] = True
# Check expected resources
if self.config.expected_resources:
found_resources = [r.uri for r in resources_response.resources]
missing = set(self.config.expected_resources) - set(found_resources)
if missing:
result["errors"].append(f"Missing expected resources: {missing}")
self.logger.info(f"✅ Found {len(resources_response.resources)} resources")
except Exception as e:
result["errors"].append(str(e))
self.logger.error(f"❌ Resource listing failed: {e}")
return result
async def test_tool_listing(self) -> Dict[str, Any]:
"""Test tool listing functionality"""
result = {
"success": False,
"tools": [],
"errors": []
}
try:
async with self.client_session() as session:
tools_response = await session.list_tools()
result["tools"] = tools_response.tools
result["success"] = True
# Check expected tools
if self.config.expected_tools:
found_tools = [t.name for t in tools_response.tools]
missing = set(self.config.expected_tools) - set(found_tools)
if missing:
result["errors"].append(f"Missing expected tools: {missing}")
self.logger.info(f"✅ Found {len(tools_response.tools)} tools")
except Exception as e:
result["errors"].append(str(e))
self.logger.error(f"❌ Tool listing failed: {e}")
return result
async def test_resource_reading(self, resource_uri: str) -> Dict[str, Any]:
"""Test reading a specific resource"""
result = {
"success": False,
"content": None,
"errors": []
}
try:
async with self.client_session() as session:
read_response = await session.read_resource(resource_uri)
result["content"] = read_response.contents
result["success"] = True
self.logger.info(f"✅ Successfully read resource: {resource_uri}")
except Exception as e:
result["errors"].append(str(e))
self.logger.error(f"❌ Resource reading failed for {resource_uri}: {e}")
return result
async def test_tool_execution(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Test executing a specific tool"""
result = {
"success": False,
"result": None,
"errors": []
}
try:
async with self.client_session() as session:
tool_result = await session.call_tool(tool_name, arguments)
result["result"] = tool_result
result["success"] = not tool_result.isError
if tool_result.isError:
result["errors"].extend([c.text for c in tool_result.content if c.type == "text"])
status = "✅" if result["success"] else "⚠️"
self.logger.info(f"{status} Tool execution: {tool_name}")
except Exception as e:
result["errors"].append(str(e))
self.logger.error(f"❌ Tool execution failed for {tool_name}: {e}")
return result
async def run_comprehensive_tests(self) -> Dict[str, Any]:
"""Run comprehensive test suite"""
self.logger.info("🚀 Starting comprehensive MCP server tests")
results = {
"server_startup": await self.test_server_startup(),
"resource_listing": await self.test_resource_listing(),
"tool_listing": await self.test_tool_listing(),
"resource_reading": {},
"tool_execution": {},
"summary": {
"total_tests": 0,
"passed_tests": 0,
"failed_tests": 0
}
}
# Test resource reading for each found resource
if results["resource_listing"]["success"]:
for resource in results["resource_listing"]["resources"][:5]: # Test first 5
resource_result = await self.test_resource_reading(resource.uri)
results["resource_reading"][resource.uri] = resource_result
# Test tool execution for each found tool (with basic arguments)
if results["tool_listing"]["success"]:
for tool in results["tool_listing"]["tools"][:5]: # Test first 5
# Generate test arguments based on tool schema
test_args = self._generate_test_arguments(tool)
if test_args:
tool_result = await self.test_tool_execution(tool.name, test_args)
results["tool_execution"][tool.name] = tool_result
# Calculate summary
results["summary"]["total_tests"] = (
1 + # server startup
1 + # resource listing
1 + # tool listing
len(results["resource_reading"]) +
len(results["tool_execution"])
)
results["summary"]["passed_tests"] = sum(
1 for result in [
results["server_startup"],
results["resource_listing"]["success"],
results["tool_listing"]["success"]
] + [
r["success"] for r in results["resource_reading"].values()
] + [
r["success"] for r in results["tool_execution"].values()
] if result
)
results["summary"]["failed_tests"] = (
results["summary"]["total_tests"] - results["summary"]["passed_tests"]
)
self.logger.info(
f"📊 Test Summary: {results['summary']['passed_tests']}/{results['summary']['total_tests']} passed"
)
return results
def _generate_test_arguments(self, tool: Tool) -> Optional[Dict[str, Any]]:
"""Generate test arguments for a tool based on its schema"""
if not tool.inputSchema or "properties" not in tool.inputSchema:
return None
args = {}
properties = tool.inputSchema["properties"]
required = tool.inputSchema.get("required", [])
for prop_name, prop_schema in properties.items():
if prop_name not in required:
continue # Skip optional properties for basic testing
prop_type = prop_schema.get("type")
if prop_type == "string":
if "enum" in prop_schema:
args[prop_name] = prop_schema["enum"][0]
else:
args[prop_name] = f"test_{prop_name}"
elif prop_type == "number":
args[prop_name] = 1.0
elif prop_type == "integer":
args[prop_name] = 1
elif prop_type == "boolean":
args[prop_name] = True
elif prop_type == "array":
args[prop_name] = []
elif prop_type == "object":
args[prop_name] = {}
return args if args else None
# Test utilities
def create_filesystem_test_config() -> MCPTestConfig:
"""Create test configuration for filesystem server"""
return MCPTestConfig(
server_command=["python", "-m", "servers.filesystem.server"],
expected_resources=["file://"], # Expect file resources
expected_tools=["list_directory", "search_files", "write_file"]
)
def create_task_management_test_config() -> MCPTestConfig:
"""Create test configuration for task management server"""
return MCPTestConfig(
server_command=["python", "-m", "servers.project_management.task_server"],
expected_resources=["project://", "task://", "summary://"],
expected_tools=["create_project", "create_task", "update_task_status"]
)
# Example usage
async def run_tests():
"""Run tests for multiple servers"""
test_configs = [
("filesystem", create_filesystem_test_config()),
("task_management", create_task_management_test_config())
]
all_results = {}
for server_name, config in test_configs:
print(f"\n🧪 Testing {server_name} server...")
framework = MCPTestFramework(config)
results = await framework.run_comprehensive_tests()
all_results[server_name] = results
# Print summary
summary = results["summary"]
print(f"📊 {server_name}: {summary['passed_tests']}/{summary['total_tests']} tests passed")
return all_results
if __name__ == "__main__":
asyncio.run(run_tests())
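The core trick in this framework is `_generate_test_arguments`: deriving placeholder inputs from each tool's JSON schema, so new tools get smoke-tested without hand-written fixtures. The same idea reduced to a standalone function (the placeholder values here are simplified; the framework above derives string values from the property name):

```python
from typing import Any, Dict, Optional

# Placeholder values by JSON-schema type; shared objects are acceptable
# here because smoke tests only read the generated arguments
DEFAULTS = {"string": "test_value", "number": 1.0, "integer": 1,
            "boolean": True, "array": [], "object": {}}

def generate_test_arguments(schema: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build placeholder arguments for each required property of a JSON schema."""
    args: Dict[str, Any] = {}
    for name, prop in schema.get("properties", {}).items():
        if name not in schema.get("required", []):
            continue  # optional properties are skipped for basic smoke tests
        if prop.get("type") == "string" and "enum" in prop:
            args[name] = prop["enum"][0]  # pick the first allowed value
        else:
            args[name] = DEFAULTS.get(prop.get("type"))
    return args or None

# Hypothetical schema shaped like the task server's create_task input
schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string"},
        "priority": {"type": "string", "enum": ["high", "medium", "low"]},
        "tags": {"type": "array", "items": {"type": "string"}},
        "notes": {"type": "string"},
    },
    "required": ["title", "priority", "tags"],
}
print(generate_test_arguments(schema))
```

Generated arguments exercise the happy path only; tools whose required inputs must reference existing entities (for example a real `project_id`) still need hand-written fixtures.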
Building production-ready MCP applications requires careful attention to performance optimization and monitoring. Let's explore advanced techniques for optimizing MCP implementations.
# monitoring/mcp_performance_monitor.py
import asyncio
import time
import json
import psutil
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from collections import defaultdict, deque
@dataclass
class PerformanceMetrics:
"""Performance metrics for MCP operations"""
operation_type: str
operation_name: str
start_time: float
end_time: float
duration: float
success: bool
error_message: Optional[str]
memory_usage: float
cpu_usage: float
@dataclass
class ServerMetrics:
"""Server-level performance metrics"""
server_name: str
uptime: float
total_operations: int
successful_operations: int
failed_operations: int
average_response_time: float
peak_memory_usage: float
average_cpu_usage: float
last_activity: datetime
class MCPPerformanceMonitor:
"""Advanced performance monitoring for MCP applications"""
def __init__(self, max_history: int = 10000):
self.max_history = max_history
self.operation_history: deque = deque(maxlen=max_history)
self.server_metrics: Dict[str, ServerMetrics] = {}
self.operation_counts: Dict[str, int] = defaultdict(int)
self.error_counts: Dict[str, int] = defaultdict(int)
self.response_times: Dict[str, deque] = defaultdict(lambda: deque(maxlen=1000))
self.start_time = time.time()
def start_operation(self, operation_type: str, operation_name: str, server_name: str) -> Dict[str, Any]:
"""Start monitoring an operation and return its context dictionary"""
operation_id = f"{operation_type}_{operation_name}_{int(time.time() * 1000000)}"
# Record initial system metrics
process = psutil.Process()
memory_info = process.memory_info()
cpu_percent = process.cpu_percent()
return {
"operation_id": operation_id,
"operation_type": operation_type,
"operation_name": operation_name,
"start_time": time.time(),
"server_name": server_name,
"initial_memory": memory_info.rss / 1024 / 1024, # MB
"initial_cpu": cpu_percent
}
def end_operation(self, operation_context: Dict[str, Any], success: bool,
error_message: Optional[str] = None) -> PerformanceMetrics:
"""End monitoring an operation and record metrics"""
end_time = time.time()
start_time = operation_context["start_time"]
duration = end_time - start_time
# Record final system metrics
process = psutil.Process()
memory_info = process.memory_info()
cpu_percent = process.cpu_percent()
metrics = PerformanceMetrics(
operation_type=operation_context.get("operation_type", "unknown"),
operation_name=operation_context.get("operation_name", "unknown"),
start_time=start_time,
end_time=end_time,
duration=duration,
success=success,
error_message=error_message,
memory_usage=memory_info.rss / 1024 / 1024, # MB
cpu_usage=cpu_percent
)
# Store metrics
self.operation_history.append(metrics)
# Update counts
operation_key = f"{metrics.operation_type}:{metrics.operation_name}"
self.operation_counts[operation_key] += 1
if not success:
self.error_counts[operation_key] += 1
# Update response times
self.response_times[operation_key].append(duration)
# Update server metrics
server_name = operation_context.get("server_name", "unknown")
self._update_server_metrics(server_name, metrics)
return metrics
def _update_server_metrics(self, server_name: str, metrics: PerformanceMetrics):
"""Update server-level metrics"""
if server_name not in self.server_metrics:
self.server_metrics[server_name] = ServerMetrics(
server_name=server_name,
uptime=0,
total_operations=0,
successful_operations=0,
failed_operations=0,
average_response_time=0,
peak_memory_usage=0,
average_cpu_usage=0,
last_activity=datetime.now()
)
server_metrics = self.server_metrics[server_name]
server_metrics.total_operations += 1
server_metrics.last_activity = datetime.now()
if metrics.success:
server_metrics.successful_operations += 1
else:
server_metrics.failed_operations += 1
# Update average response time
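# Incremental mean: mean_n = (mean_{n-1} * (n-1) + x_n) / n, so the running
# average stays exact without retaining every past duration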
total_time = server_metrics.average_response_time * (server_metrics.total_operations - 1) + metrics.duration
server_metrics.average_response_time = total_time / server_metrics.total_operations
# Update peak memory usage
if metrics.memory_usage > server_metrics.peak_memory_usage:
server_metrics.peak_memory_usage = metrics.memory_usage
# Update uptime
server_metrics.uptime = time.time() - self.start_time
    def get_performance_summary(self, time_window: Optional[timedelta] = None) -> Dict[str, Any]:
        """Get performance summary for a time window"""
        cutoff_time = time.time() - time_window.total_seconds() if time_window else 0
        recent_operations = [
            op for op in self.operation_history
            if op.start_time >= cutoff_time
        ]

        if not recent_operations:
            return {"message": "No operations in specified time window"}

        # Calculate summary statistics
        total_operations = len(recent_operations)
        successful_operations = len([op for op in recent_operations if op.success])
        failed_operations = total_operations - successful_operations

        response_times = [op.duration for op in recent_operations]
        avg_response_time = sum(response_times) / len(response_times)
        min_response_time = min(response_times)
        max_response_time = max(response_times)

        # Calculate percentiles (nearest-rank on the sorted durations)
        sorted_times = sorted(response_times)
        p50 = sorted_times[len(sorted_times) // 2]
        p95 = sorted_times[int(len(sorted_times) * 0.95)]
        p99 = sorted_times[int(len(sorted_times) * 0.99)]

        # Error analysis
        errors = [op for op in recent_operations if not op.success]
        error_rate = (len(errors) / total_operations) * 100 if total_operations > 0 else 0

        common_errors = defaultdict(int)
        for error in errors:
            error_type = f"{error.operation_type}:{error.operation_name}"
            common_errors[error_type] += 1

        return {
            "time_window": str(time_window) if time_window else "all time",
            "summary": {
                "total_operations": total_operations,
                "successful_operations": successful_operations,
                "failed_operations": failed_operations,
                "success_rate": (successful_operations / total_operations) * 100,
                "error_rate": error_rate
            },
            "response_times": {
                "average": avg_response_time,
                "minimum": min_response_time,
                "maximum": max_response_time,
                "p50": p50,
                "p95": p95,
                "p99": p99
            },
            "common_errors": dict(common_errors),
            "server_metrics": {
                name: asdict(metrics) for name, metrics in self.server_metrics.items()
            }
        }
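The percentile lines above use simple nearest-rank indexing into the sorted durations, which is coarse for small samples. If you prefer interpolated percentiles, the standard library's `statistics.quantiles` (Python 3.8+) computes them directly; a small sketch with made-up durations:

```python
import statistics

durations = [0.08, 0.09, 0.10, 0.10, 0.10, 0.11, 0.12, 0.13, 0.35, 0.90]

# n=100 yields 99 interpolated cut points; cut k-1 approximates the
# k-th percentile of the data ("inclusive" treats the data as the population).
cuts = statistics.quantiles(durations, n=100, method="inclusive")
p50, p95, p99 = cuts[49], cuts[94], cuts[98]
print(f"p50={p50:.3f}s p95={p95:.3f}s p99={p99:.3f}s")
```

Note how the single 0.90 s outlier dominates the tail percentiles while barely moving the median, which is exactly why latency summaries report p95/p99 alongside the average.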
    def get_operation_breakdown(self) -> Dict[str, Any]:
        """Get detailed breakdown of operations by type"""
        breakdown = {}

        for operation_key, count in self.operation_counts.items():
            op_type, op_name = operation_key.split(":", 1)
            if op_type not in breakdown:
                breakdown[op_type] = {}

            # Calculate statistics for this operation
            response_times = list(self.response_times[operation_key])
            error_count = self.error_counts[operation_key]

            breakdown[op_type][op_name] = {
                "total_operations": count,
                "successful_operations": count - error_count,
                "failed_operations": error_count,
                "success_rate": ((count - error_count) / count) * 100 if count > 0 else 0,
                "average_response_time": sum(response_times) / len(response_times) if response_times else 0,
                "min_response_time": min(response_times) if response_times else 0,
                "max_response_time": max(response_times) if response_times else 0
            }

        return breakdown
    def export_metrics(self, format_type: str = "json") -> str:
        """Export metrics in specified format"""
        data = {
            "export_timestamp": datetime.now().isoformat(),
            "monitoring_start_time": datetime.fromtimestamp(self.start_time).isoformat(),
            "performance_summary": self.get_performance_summary(),
            "operation_breakdown": self.get_operation_breakdown(),
            "server_metrics": {
                name: asdict(metrics) for name, metrics in self.server_metrics.items()
            }
        }

        if format_type == "json":
            return json.dumps(data, indent=2, default=str)
        elif format_type == "csv":
            return self._export_to_csv(data)
        else:
            raise ValueError(f"Unsupported export format: {format_type}")
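The `default=str` argument in the JSON branch is what lets `json.dumps` handle values it cannot natively serialize, such as `datetime` objects: any non-serializable value is passed through `str()` first. In isolation:

```python
import json
from datetime import datetime

# default=str is the fallback json.dumps calls for non-serializable values
payload = {"exported": datetime(2025, 1, 1, 12, 0, 0), "count": 3}
print(json.dumps(payload, default=str))
# → {"exported": "2025-01-01 12:00:00", "count": 3}
```

Without `default=str`, the same call raises `TypeError: Object of type datetime is not JSON serializable`.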
    def _export_to_csv(self, data: Dict[str, Any]) -> str:
        """Export metrics to CSV format"""
        import csv
        import io

        output = io.StringIO()

        # Server metrics CSV
        if data["server_metrics"]:
            writer = csv.writer(output)
            writer.writerow(["Server", "Uptime", "Total Operations", "Success Rate",
                             "Avg Response Time", "Peak Memory"])
            for server_name, metrics in data["server_metrics"].items():
                writer.writerow([
                    server_name,
                    metrics["uptime"],
                    metrics["total_operations"],
                    (metrics["successful_operations"] / metrics["total_operations"]) * 100
                    if metrics["total_operations"] > 0 else 0,
                    metrics["average_response_time"],
                    metrics["peak_memory_usage"]
                ])

        return output.getvalue()
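To sanity-check the layout `_export_to_csv` produces, you can round-trip one row through the `csv` module: `csv.DictReader` maps the header row back onto each server row. The values below are invented for illustration:

```python
import csv
import io

buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(["Server", "Uptime", "Total Operations", "Success Rate",
                 "Avg Response Time", "Peak Memory"])
writer.writerow(["filesystem", 3600.0, 120, 97.5, 0.042, 85.2])  # hypothetical row

# Reading it back: DictReader keys each data row by the header row
parsed = list(csv.DictReader(io.StringIO(buf.getvalue())))
print(parsed[0]["Server"], parsed[0]["Success Rate"])  # → filesystem 97.5
```

Note that everything comes back as strings; a consumer that wants numeric comparisons needs to convert fields with `float()` explicitly.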
    def reset_metrics(self):
        """Reset all metrics"""
        self.operation_history.clear()
        self.server_metrics.clear()
        self.operation_counts.clear()
        self.error_counts.clear()
        self.response_times.clear()
        self.start_time = time.time()
# Performance monitoring decorator
import functools  # preserves the wrapped function's name, docstring, and signature

def monitor_performance(monitor: MCPPerformanceMonitor, operation_type: str,
                        operation_name: str, server_name: str = "default"):
    """Decorator to monitor function performance"""
    def decorator(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            context = monitor.start_operation(operation_type, operation_name, server_name)
            try:
                result = await func(*args, **kwargs)
                monitor.end_operation(context, True)
                return result
            except Exception as e:
                monitor.end_operation(context, False, str(e))
                raise

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            context = monitor.start_operation(operation_type, operation_name, server_name)
            try:
                result = func(*args, **kwargs)
                monitor.end_operation(context, True)
                return result
            except Exception as e:
                monitor.end_operation(context, False, str(e))
                raise

        # Pick the wrapper that matches the decorated function's calling style
        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
    return decorator
# Example usage
async def example_usage():
    """Example of using the performance monitor"""
    monitor = MCPPerformanceMonitor()

    # Simulate some operations
    @monitor_performance(monitor, "tool_call", "list_directory", "filesystem")
    async def simulated_operation():
        await asyncio.sleep(0.1)  # Simulate work
        return "success"

    # Run some operations
    for i in range(10):
        try:
            await simulated_operation()
        except Exception:
            pass

    # Get performance summary
    summary = monitor.get_performance_summary(timedelta(minutes=5))
    print("Performance Summary:")
    print(json.dumps(summary, indent=2, default=str))

if __name__ == "__main__":
    asyncio.run(example_usage())
Congratulations! You've completed a comprehensive journey through MCP Labs, building everything from basic file system servers to advanced collaborative systems. Let's recap what you've learned and the key takeaways for your MCP development journey.
Built Complete MCP Servers:
Developed Advanced Clients:
Mastered Testing and Debugging:
Implemented Production Features:
1. MCP Architecture Patterns
2. Performance Optimization
3. Security Best Practices
4. Testing Strategies
Enterprise Integration:
AI Agent Collaboration:
Developer Tools:
Development Practices:
Security Practices:
Performance Practices:
Immediate Next Steps:
Advanced Learning Paths:
Career Opportunities:
MCP Labs represents more than just a technology - it's a paradigm shift in how we think about AI integration and system interoperability. By mastering MCP, you're positioning yourself at the forefront of the next generation of AI-powered applications.
The skills you've developed here - building protocols, designing APIs, implementing security, optimizing performance, and testing complex systems - are valuable far beyond the MCP ecosystem. You now have the foundation to build the next generation of intelligent, interconnected systems.
Remember that MCP is still evolving. Stay curious, keep experimenting, and don't be afraid to push the boundaries of what's possible. The MCP community needs innovators like you to shape the future of AI integration.
The journey doesn't end here - it's just beginning. Welcome to the world of MCP development!
You've completed Building with MCP Labs! You now have the skills and knowledge to build sophisticated, production-ready MCP applications. The future of AI integration is in your hands!