Transports in the Model Context Protocol (MCP) provide the foundation for communication between clients and servers. A transport handles the underlying mechanics of how messages are sent and received.
MCP uses JSON-RPC 2.0 as its wire format. The transport layer is responsible for converting MCP protocol messages into JSON-RPC format for transmission and converting received JSON-RPC messages back into MCP protocol messages.
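To make the wire format concrete, here is a minimal sketch of the three JSON-RPC 2.0 message shapes a transport carries: requests, notifications, and responses. The helper names and the `example/method` method name are hypothetical and chosen for illustration; they are not part of any MCP SDK.

```python
import json


def make_request(msg_id, method, params=None):
    """Build a JSON-RPC 2.0 request; the id ties the eventual response back to it."""
    msg = {"jsonrpc": "2.0", "id": msg_id, "method": method}
    if params is not None:
        msg["params"] = params
    return msg


def make_notification(method, params=None):
    """Build a notification: a request without an id, so no response is expected."""
    msg = {"jsonrpc": "2.0", "method": method}
    if params is not None:
        msg["params"] = params
    return msg


def classify(msg):
    """Return 'request', 'notification', or 'response' for a decoded message."""
    if msg.get("jsonrpc") != "2.0":
        raise ValueError("not a JSON-RPC 2.0 message")
    if "method" in msg:
        return "request" if "id" in msg else "notification"
    # A response carries exactly one of "result" or "error".
    if "id" in msg and ("result" in msg) != ("error" in msg):
        return "response"
    raise ValueError("unrecognized JSON-RPC message shape")


def encode(msg):
    """Serialize a message to compact JSON text for transmission."""
    return json.dumps(msg, separators=(",", ":"))


def decode(text):
    """Parse received JSON text back into a message dict."""
    return json.loads(text)


# "example/method" is a made-up method name used only for this sketch.
wire = encode(make_request(1, "example/method", {"key": "value"}))
```

A transport only moves these encoded messages back and forth; matching a response to its request by `id` happens in the layer above it.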
The stdio transport enables communication through standard input and output streams. This is particularly useful for local integrations and command-line tools.
Use stdio when:
Building command-line tools
Implementing local integrations
Needing simple process communication
Working with shell scripts
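    import { Server } from "@modelcontextprotocol/sdk/server/index.js";
    import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";

    // Server side (TypeScript): serve MCP over this process's stdin/stdout.
    const server = new Server(
      { name: "example-server", version: "1.0.0" },
      { capabilities: {} }
    );

    const transport = new StdioServerTransport();
    await server.connect(transport);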
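    import { Client } from "@modelcontextprotocol/sdk/client/index.js";
    import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

    // Client side (TypeScript): spawn ./server and talk to it over its stdin/stdout.
    const client = new Client(
      { name: "example-client", version: "1.0.0" },
      { capabilities: {} }
    );

    const transport = new StdioClientTransport({
      command: "./server",
      args: ["--option", "value"]
    });
    await client.connect(transport);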
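    import asyncio

    from mcp.server import Server
    from mcp.server.stdio import stdio_server

    # Server side (Python): serve MCP over this process's stdin/stdout.
    app = Server("example-server")

    async def main():
        async with stdio_server() as streams:
            await app.run(
                streams[0],
                streams[1],
                app.create_initialization_options()
            )

    asyncio.run(main())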
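    import asyncio

    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    # Client side (Python): spawn ./server and talk to it over its stdin/stdout.
    params = StdioServerParameters(
        command="./server",
        args=["--option", "value"]
    )

    async def main():
        async with stdio_client(params) as streams:
            async with ClientSession(streams[0], streams[1]) as session:
                await session.initialize()

    asyncio.run(main())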
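The SDK transports above hide how messages are framed on the streams. As a rough illustration only, the sketch below assumes newline-delimited framing (one JSON message per line), which this section does not state; `write_message` and `read_messages` are hypothetical helpers, and an in-memory `io.StringIO` stands in for a real process pipe.

```python
import io
import json


def write_message(stream, message):
    """Write one JSON message as a single line and flush it."""
    # json.dumps escapes newlines inside strings, so the encoded text
    # never contains a raw line break that would split the frame.
    stream.write(json.dumps(message) + "\n")
    stream.flush()


def read_messages(stream):
    """Yield decoded messages from a text stream, one per non-empty line."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)


# In-memory stand-in for a child process's stdout (assumption for the demo).
pipe = io.StringIO()
write_message(pipe, {"jsonrpc": "2.0", "id": 1, "method": "ping"})
write_message(pipe, {
    "jsonrpc": "2.0",
    "method": "example/log",  # made-up notification name
    "params": {"text": "line one\nline two"},
})

pipe.seek(0)
received = list(read_messages(pipe))
```

With this kind of framing, anything else written to the same stream (debug prints, for example) would corrupt the message sequence, which is why diagnostics for a stdio server usually go to standard error.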
SSE transports can be vulnerable to DNS rebinding attacks if not properly secured. To prevent this:
Always validate Origin headers on incoming SSE connections to ensure they come from expected sources
Avoid binding servers to all network interfaces (0.0.0.0) when running locally - bind only to localhost (127.0.0.1) instead
Implement proper authentication for all SSE connections
Without these protections, attackers could use DNS rebinding to interact with local MCP servers from remote websites.
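The first two protections can be sketched as small checks that run before a connection is accepted. This is a framework-independent illustration, not SDK code: the allow-list contents and the function names are assumptions, and this version rejects requests that carry no Origin header, which may be too strict for servers that also accept non-browser clients.

```python
from urllib.parse import urlsplit

DEFAULT_PORTS = {"http": 80, "https": 443}


def origin_key(origin):
    """Reduce an Origin header value to (scheme, host, port), or None if malformed."""
    try:
        parts = urlsplit(origin)
        port = parts.port  # raises ValueError when the port is not numeric
    except ValueError:
        return None
    if parts.scheme not in DEFAULT_PORTS or not parts.hostname:
        return None
    return (parts.scheme, parts.hostname, port or DEFAULT_PORTS[parts.scheme])


# Hypothetical allow-list for a server running on localhost port 3000.
ALLOWED = {
    origin_key("http://localhost:3000"),
    origin_key("http://127.0.0.1:3000"),
}


def is_allowed_origin(origin_header):
    """Accept only requests whose Origin matches the allow-list exactly."""
    if not origin_header:
        return False
    key = origin_key(origin_header)
    return key is not None and key in ALLOWED


def is_loopback_bind(host):
    """True when the listen address is reachable only from the local machine."""
    return host in {"127.0.0.1", "localhost", "::1"}
```

Comparing parsed `(scheme, host, port)` tuples rather than raw strings avoids prefix tricks and case differences in the header value. Authentication, the third item in the list, still needs to be handled separately.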
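    import express from "express";
    import { Server } from "@modelcontextprotocol/sdk/server/index.js";
    import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";

    const app = express();

    const server = new Server(
      { name: "example-server", version: "1.0.0" },
      { capabilities: {} }
    );

    // Single active connection; a new GET /sse replaces the previous transport.
    let transport: SSEServerTransport | null = null;

    // The client opens the event stream here and receives server messages on it.
    app.get("/sse", async (req, res) => {
      transport = new SSEServerTransport("/messages", res);
      await server.connect(transport);
    });

    // The client posts its own messages to this endpoint.
    app.post("/messages", async (req, res) => {
      if (transport) {
        await transport.handlePostMessage(req, res);
      } else {
        res.status(400).send("No active SSE connection");
      }
    });

    // Bind to localhost only, per the DNS rebinding guidance above.
    app.listen(3000, "127.0.0.1");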
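    import { Client } from "@modelcontextprotocol/sdk/client/index.js";
    import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";

    // Client side (TypeScript): connect to the server's SSE endpoint.
    const client = new Client(
      { name: "example-client", version: "1.0.0" },
      { capabilities: {} }
    );

    const transport = new SSEClientTransport(
      new URL("http://localhost:3000/sse")
    );
    await client.connect(transport);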
MCP makes it easy to implement custom transports for specific needs. Any transport implementation just needs to conform to the Transport interface:
You can implement custom transports for:
Custom network protocols
Specialized communication channels
Integration with existing systems
Performance optimization
    interface Transport {
      // Start processing messages
      start(): Promise<void>;

      // Send a JSON-RPC message
      send(message: JSONRPCMessage): Promise<void>;

      // Close the connection
      close(): Promise<void>;

      // Callbacks
      onclose?: () => void;
      onerror?: (error: Error) => void;
      onmessage?: (message: JSONRPCMessage) => void;
    }
Note that while MCP Servers are often implemented with asyncio, we recommend
implementing low-level interfaces like transports with anyio for wider compatibility.
    from contextlib import asynccontextmanager

    import anyio
    from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
    from mcp.types import JSONRPCMessage

    @asynccontextmanager
    async def create_transport(
        read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
        write_stream: MemoryObjectSendStream[JSONRPCMessage]
    ):
        """
        Transport interface for MCP.

        Args:
            read_stream: Stream to read incoming messages from
            write_stream: Stream to write outgoing messages to
        """
        async with anyio.create_task_group() as tg:
            try:
                # Start processing messages. process_messages is a placeholder
                # for the transport's own receive loop and is not defined here.
                tg.start_soon(process_messages, read_stream)

                # Send messages
                async with write_stream:
                    yield write_stream

            except Exception:
                # Handle errors, then re-raise with the original traceback
                raise
            finally:
                # Clean up
                tg.cancel_scope.cancel()
                await write_stream.aclose()
                await read_stream.aclose()
    import logging
    from contextlib import asynccontextmanager

    import anyio
    from starlette.types import Receive, Scope, Send

    logger = logging.getLogger(__name__)

    # A fuller Python example with error handling and logging.
    @asynccontextmanager
    async def example_transport(scope: Scope, receive: Receive, send: Send):
        try:
            # Create streams for bidirectional communication
            read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
            write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

            async def message_handler():
                try:
                    async with read_stream_writer:
                        # Message handling logic
                        pass
                except Exception as exc:
                    logger.error(f"Failed to handle message: {exc}")
                    raise

            async with anyio.create_task_group() as tg:
                tg.start_soon(message_handler)
                try:
                    # Yield streams for communication
                    yield read_stream, write_stream
                except Exception as exc:
                    logger.error(f"Transport error: {exc}")
                    raise
                finally:
                    tg.cancel_scope.cancel()
                    await write_stream.aclose()
                    await read_stream.aclose()
        except Exception as exc:
            logger.error(f"Failed to initialize transport: {exc}")
            raise
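To show what conforming to the Transport interface involves, the following sketch mirrors its shape (`start`, `send`, `close`, and the three callbacks) as a pair of linked in-memory endpoints. `InMemoryTransport` and `create_pair` are hypothetical names, not SDK classes, and the sketch uses only the standard library's asyncio to stay dependency-free, even though anyio is the recommended base for real Python transports.

```python
import asyncio


class InMemoryTransport:
    """Hypothetical loopback transport shaped like the Transport interface."""

    def __init__(self):
        self.peer = None
        self.onmessage = None  # called with each received message
        self.onerror = None    # called when a message handler raises
        self.onclose = None    # called once when the transport closes
        self._inbox = asyncio.Queue()
        self._task = None
        self._closed = False

    @classmethod
    def create_pair(cls):
        """Create two endpoints; whatever one sends, the other receives."""
        a, b = cls(), cls()
        a.peer, b.peer = b, a
        return a, b

    async def start(self):
        """Start processing messages in a background task."""
        self._task = asyncio.create_task(self._pump())

    async def _pump(self):
        while True:
            message = await self._inbox.get()
            try:
                if self.onmessage:
                    self.onmessage(message)
            except Exception as exc:
                if self.onerror:
                    self.onerror(exc)

    async def send(self, message):
        """Deliver a JSON-RPC message to the peer's inbox."""
        if self._closed:
            raise RuntimeError("transport is closed")
        await self.peer._inbox.put(message)

    async def close(self):
        """Stop the background task and notify the owner once."""
        if self._closed:
            return
        self._closed = True
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        if self.onclose:
            self.onclose()


async def demo():
    client, server = InMemoryTransport.create_pair()
    received, closed = [], []
    got_message = asyncio.Event()

    def on_message(message):
        received.append(message)
        got_message.set()

    server.onmessage = on_message
    client.onclose = lambda: closed.append("client")
    server.onclose = lambda: closed.append("server")

    await client.start()
    await server.start()
    await client.send({"jsonrpc": "2.0", "id": 1, "method": "ping"})
    await asyncio.wait_for(got_message.wait(), timeout=1)

    await client.close()
    await server.close()
    return received, closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A loopback pair like this is mainly useful for tests, where a client and server can be wired together in one process without spawning a subprocess or opening a socket.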