Modern AI systems rarely live alone. They collaborate. Google’s Agent-to-Agent (A2A) protocol gives agents a common language (JSON-RPC over HTTP/S, with streaming via SSE) plus a discovery mechanism (the Agent Card) so they can find and work with each other securely.
Databricks Apps let you ship secure, serverless AI apps inside your Databricks workspace—no extra infra, integrated with Unity Catalog, SQL, Model Serving, OAuth and more. That’s perfect territory for hosting interoperable A2A agents where data governance and latency matter.
In this post, you’ll deploy a production-style A2A server as a Databricks App. The example agent (“CurrencyAgent”) uses LangGraph tools + a Databricks LLM endpoint to answer FX questions—then exposes itself as an A2A service with streaming updates and optional push notifications.
A2A standardizes inter-agent comms (HTTP + JSON-RPC + SSE) and discovery (Agent Card). Databricks Apps provide governed hosting, OAuth integration and serverless scale so you can safely place agents next to your data and models.
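Under the hood, every A2A call is a plain JSON-RPC 2.0 request. Purely as an illustration (field names follow recent versions of the spec and the a2a-sdk types used later in this post; older revisions differ), a message/send payload looks roughly like this:
# Illustrative shape of an A2A message/send request body (the SDK builds and
# validates this for you via SendMessageRequest / MessageSendParams, shown later).
a2a_request_body = {
    "jsonrpc": "2.0",
    "id": "req-1",
    "method": "message/send",
    "params": {
        "message": {
            "role": "user",
            "parts": [{"kind": "text", "text": "What is the USD to EUR rate?"}],
            "messageId": "msg-1",
        }
    },
}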
app/
  agent.py            # LangGraph agent using Databricks-hosted LLM
  agent_executor.py   # A2A AgentExecutor wiring (streaming + artifacts)
  server.py           # Starlette app exposing A2A endpoints + Agent Card
  test.py             # Notebook/REPL client using A2A SDK
app.yaml              # Databricks App config (command + env), next to the app/ package
requirements.txt      # a2a-sdk[http-server], langgraph, httpx, uvicorn, etc.
The A2A SDK (a2a-sdk) provides server/client primitives, Agent Card types, event queues, push notifications, and JSON-RPC methods.
Goal: Build a small ReAct agent that calls a currency API tool and returns a stable, typed (Pydantic) response you can safely consume downstream.
Key points:
Use ChatDatabricks (LangChain on Databricks) and point it at your Model Serving endpoint by name.
Bind your tools and enable structured output with response_format=(..., ResponseFormat).
Read the validated object from state.values["structured_response"].
Auth: set either PAT (DATABRICKS_TOKEN) or OAuth (DATABRICKS_CLIENT_ID + DATABRICKS_CLIENT_SECRET) — not both.
(Optional helper below auto-picks one if both are present.)
Frankfurter is a free, open-source currency API tracking ECB reference rates.
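To give a sense of the data the tool below works with, a direct call to Frankfurter returns a small JSON document. The shape (amount, base, date, rates) is what the tool hands back to the agent; the rate value in the comment is illustrative only:
# Quick look at the Frankfurter response shape (illustrative rate value).
import httpx

r = httpx.get("https://api.frankfurter.app/latest", params={"from": "USD", "to": "EUR"}, timeout=15.0)
print(r.json())  # e.g. {"amount": 1.0, "base": "USD", "date": "2025-01-02", "rates": {"EUR": 0.91}}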
# app/agent.py (excerpt)
import json
import logging
import os
from typing import Literal

import httpx
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
from pydantic import BaseModel

from databricks_langchain import ChatDatabricks

log = logging.getLogger(__name__)


# --- Tool ---------------------------------------------------------------------
@tool
def get_exchange_rate(currency_from: str = "USD",
                      currency_to: str = "EUR",
                      currency_date: str = "latest"):
    """Use this to get the current exchange rate."""
    r = httpx.get(
        f"https://api.frankfurter.app/{currency_date}",
        params={"from": currency_from, "to": currency_to},
        timeout=15.0,
    )
    r.raise_for_status()
    data = r.json()
    if "rates" not in data:
        return {"error": "Invalid API response format."}
    return data


# --- Structured response schema ------------------------------------------------
class ResponseFormat(BaseModel):
    status: Literal["input_required", "completed", "error"] = "input_required"
    message: str


# --- Optional: guard against PAT+OAuth at once (pick one) ----------------------
def _prefer_single_databricks_auth():
    pat = bool(os.getenv("DATABRICKS_TOKEN"))
    oauth = bool(os.getenv("DATABRICKS_CLIENT_ID") and os.getenv("DATABRICKS_CLIENT_SECRET"))
    if pat and oauth:
        prefer = (os.getenv("DBX_AUTH_PREFERENCE") or "pat").lower()
        if prefer == "oauth":
            os.environ.pop("DATABRICKS_TOKEN", None)
            log.warning("Both PAT and OAuth found; preferring OAuth (unset DATABRICKS_TOKEN).")
        else:
            os.environ.pop("DATABRICKS_CLIENT_ID", None)
            os.environ.pop("DATABRICKS_CLIENT_SECRET", None)
            log.warning("Both PAT and OAuth found; preferring PAT (unset CLIENT_ID/SECRET).")


# --- LLM factory ----------------------------------------------------------------
def _build_dbx_llm() -> ChatDatabricks:
    _prefer_single_databricks_auth()  # harmless if only one method is set
    endpoint = os.getenv("DBX_LLM_ENDPOINT")  # e.g., "databricks-dbrx-instruct" or your custom endpoint
    if not endpoint:
        raise RuntimeError("Missing required env var: DBX_LLM_ENDPOINT")
    # ChatDatabricks uses your Databricks workspace auth (PAT or OAuth).
    # When running outside a workspace, also set DATABRICKS_HOST.
    return ChatDatabricks(
        endpoint=endpoint,
        temperature=0.0,
        max_tokens=256,
        # Optional: smooths tool-calling streams for some models
        disable_streaming="tool_calling",
    )


# --- Agent ----------------------------------------------------------------------
class CurrencyAgent:
    SYSTEM_INSTRUCTION = (
        "You are a specialized assistant for currency conversions. "
        "Use the 'get_exchange_rate' tool to answer currency exchange questions. "
        "If asked about anything else, politely refuse."
    )
    # Content types advertised in the Agent Card (referenced by server.py).
    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    def __init__(self):
        llm = _build_dbx_llm()
        tools = [get_exchange_rate]
        self.graph = create_react_agent(
            llm,  # create_react_agent binds the tools itself
            tools=tools,
            prompt=self.SYSTEM_INSTRUCTION,
            response_format=(
                "Set status to input_required, completed, or error as appropriate.",
                ResponseFormat,
            ),
        )
LangGraph supports structured responses via response_format; you read the validated object from state.values["structured_response"] (see the sketch below).
Databricks Model Serving exposes a unified API, which lets you use standard chat clients such as ChatDatabricks.
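The excerpt above stops at graph construction. Here is a minimal sketch of the stream() method that agent_executor.py (next) consumes. The dict keys (is_task_complete, require_user_input, content) match what the executor expects; the sketch assumes a LangGraph checkpointer (e.g., MemorySaver passed as checkpointer= to create_react_agent) so graph.get_state() can return the validated structured_response.
# app/agent.py (continued): illustrative sketch of the streaming interface the
# executor consumes. Assumes create_react_agent(..., checkpointer=MemorySaver())
# so graph.get_state(config) can read the final state.
class CurrencyAgent:
    # ... SYSTEM_INSTRUCTION, SUPPORTED_CONTENT_TYPES, __init__ as above ...

    async def stream(self, query: str, context_id: str):
        config = {"configurable": {"thread_id": context_id}}
        inputs = {"messages": [("user", query)]}

        # Surface interim "working" updates while the graph runs.
        for _ in self.graph.stream(inputs, config, stream_mode="values"):
            yield {
                "is_task_complete": False,
                "require_user_input": False,
                "content": "Looking up exchange rates...",
            }

        # Read the validated Pydantic object from the final graph state.
        structured = self.graph.get_state(config).values.get("structured_response")
        if isinstance(structured, ResponseFormat):
            yield {
                "is_task_complete": structured.status == "completed",
                "require_user_input": structured.status == "input_required",
                "content": structured.message,
            }
        else:
            # Fallback if the model did not produce a structured response.
            yield {
                "is_task_complete": False,
                "require_user_input": True,
                "content": "Unable to process the request; please try again.",
            }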
app/agent_executor.py bridges A2A’s JSON-RPC calls to your agent:
# app/agent_executor.py (excerpt)
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import Part, TaskState, TextPart, UnsupportedOperationError
from a2a.utils import new_agent_text_message, new_task
from a2a.utils.errors import ServerError

from app.agent import CurrencyAgent


class CurrencyAgentExecutor(AgentExecutor):
    def __init__(self):
        self.agent = CurrencyAgent()

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        query = context.get_user_input()
        task = context.current_task or new_task(context.message)
        await event_queue.enqueue_event(task)

        updater = TaskUpdater(event_queue, task.id, task.context_id)
        async for item in self.agent.stream(query, task.context_id):
            if item["require_user_input"]:
                # Pause the task and ask the caller for more information.
                await updater.update_status(
                    TaskState.input_required,
                    new_agent_text_message(item["content"], task.context_id, task.id),
                    final=True,
                )
                break
            elif not item["is_task_complete"]:
                # Interim progress update (streamed to the client via SSE).
                await updater.update_status(
                    TaskState.working,
                    new_agent_text_message(item["content"], task.context_id, task.id),
                )
            else:
                # Emit the final answer as an artifact, then mark the task complete.
                await updater.add_artifact(
                    [Part(root=TextPart(text=item["content"]))],
                    name="conversion_result",
                )
                await updater.complete()
                break

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        # AgentExecutor also requires cancel(); this agent does not support it.
        raise ServerError(error=UnsupportedOperationError())
In A2A, streaming is done with SSE; clients receive a sequence of JSON-RPC responses until the task completes.
Publish capabilities and skills so other agents can discover and call yours:
# app/server.py (excerpt)
import logging

import httpx
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import (
    BasePushNotificationSender,
    InMemoryPushNotificationConfigStore,
    InMemoryTaskStore,
)
from a2a.types import AgentCapabilities, AgentCard, AgentSkill

from app.agent import CurrencyAgent
from app.agent_executor import CurrencyAgentExecutor


def build_asgi_app():
    capabilities = AgentCapabilities(streaming=True, push_notifications=True)
    skill = AgentSkill(
        id="convert_currency",
        name="Currency Exchange Rates Tool",
        description="Helps with exchange values between various currencies",
        tags=["currency conversion", "currency exchange"],
        examples=["What is exchange rate between USD and GBP?"],
    )
    agent_card = AgentCard(
        name="Currency Agent",
        description="Helps with exchange rates for currencies",
        url="/",  # important in Databricks Apps: relative path behind the reverse proxy
        version="1.0.0",
        default_input_modes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
        default_output_modes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
        capabilities=capabilities,
        skills=[skill],
    )

    httpx_client = httpx.AsyncClient()
    push_store = InMemoryPushNotificationConfigStore()
    push_sender = BasePushNotificationSender(httpx_client=httpx_client, config_store=push_store)
    request_handler = DefaultRequestHandler(
        agent_executor=CurrencyAgentExecutor(),
        task_store=InMemoryTaskStore(),
        push_config_store=push_store,
        push_sender=push_sender,
    )

    server = A2AStarletteApplication(agent_card=agent_card, http_handler=request_handler)
    app = server.build()

    async def _close_httpx():
        await httpx_client.aclose()

    app.add_event_handler("shutdown", _close_httpx)
    return app


app = build_asgi_app()
The Agent Card advertises identity, capabilities (e.g., streaming, pushNotifications), skills, and how to authenticate. Clients usually fetch it from a well-known path under your base URL.
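As a quick smoke test after deployment, you can fetch the card directly. The exact well-known filename varies across a2a-sdk versions, so treat the path below as an assumption; A2ACardResolver (used in test.py later) resolves it for you:
# Sketch: fetch the published Agent Card by hand to verify the deployment.
# The path is an assumption; some SDK versions serve /.well-known/agent-card.json instead.
import httpx

resp = httpx.get("https://<your-app-url>/.well-known/agent.json", timeout=10.0)
resp.raise_for_status()
print(resp.json()["capabilities"])  # expect streaming / pushNotifications to be true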
Create app.yaml to define how your server starts and how env vars flow in. Never hardcode secrets—use App resources + valueFrom or Databricks Secrets so Databricks injects them as environment variables at runtime.
# app.yaml
# The ASGI app lives in app/server.py, so the uvicorn target is app.server:app.
command: ["uvicorn", "app.server:app", "--host", "0.0.0.0", "--port", "8000"]

env:
  # Workspace URL & endpoint name can be plain values (non-secret).
  - name: DATABRICKS_WORKSPACE_URL
    value: "https://<your-workspace>.cloud.databricks.com"
  - name: DBX_LLM_ENDPOINT
    value: "<your-serving-endpoint>"

  # Token is secret: reference an App resource key you configured in the UI.
  # For production, prefer OAuth with the app's service principal.
  - name: DATABRICKS_TOKEN
    valueFrom: dbx_llm_token  # <-- this key is defined in App resources (Secret)
Notes
Your requirements.txt should include:
uvicorn
fastapi
httpx
langgraph
databricks_langchain
a2a-sdk[http-server]>=0.3.0
The official A2A SDK is published on PyPI as a2a-sdk (extras: [http-server], [grpc], [telemetry]).
Databricks Apps run behind a reverse proxy with platform-managed networking—avoid assumptions about request origin and keep your base URLs relative.
# test.py (excerpt): call your app via the A2A client
import json
import httpx
from uuid import uuid4
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import MessageSendParams, SendMessageRequest

BASE_URL = "https://<your-app-url>"

async def chat_once(text: str):
    async with httpx.AsyncClient(timeout=60.0) as httpx_client:
        # Fetch the Agent Card from the well-known path, then build a client from it.
        resolver = A2ACardResolver(httpx_client=httpx_client, base_url=BASE_URL)
        card = await resolver.get_agent_card()
        client = A2AClient(httpx_client=httpx_client, agent_card=card)
        req = SendMessageRequest(
            id=str(uuid4()),
            params=MessageSendParams(
                message={
                    "role": "user",
                    "parts": [{"kind": "text", "text": text}],
                    "message_id": uuid4().hex,  # required by the Message schema
                }
            ),
        )
        resp = await client.send_message(req)
        print(json.dumps(resp.model_dump(mode="json"), indent=2))

# e.g.
# await chat_once("USD to EUR today?")
The SDK exposes both non-streaming and streaming calls, plus push-notification configuration APIs.
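A streaming variant of the client above is nearly identical. Here is a sketch, reusing json and uuid4 from test.py and an A2AClient built the same way as in chat_once:
# Streaming sketch: each chunk is one JSON-RPC response delivered over SSE,
# typically wrapping Task, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent.
from a2a.types import MessageSendParams, SendStreamingMessageRequest

async def chat_streaming(client, text: str):
    req = SendStreamingMessageRequest(
        id=str(uuid4()),
        params=MessageSendParams(
            message={
                "role": "user",
                "parts": [{"kind": "text", "text": text}],
                "message_id": uuid4().hex,
            }
        ),
    )
    async for chunk in client.send_message_streaming(req):
        print(json.dumps(chunk.model_dump(mode="json", exclude_none=True), indent=2))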
# app/server.py (minimal)
import httpx
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import (
    BasePushNotificationSender,
    InMemoryPushNotificationConfigStore,
    InMemoryTaskStore,
)
from a2a.types import AgentCapabilities, AgentCard

from app.agent import CurrencyAgent
from app.agent_executor import CurrencyAgentExecutor


def build_asgi_app():
    card = AgentCard(
        name="Currency Agent",
        description="Exchange rates over A2A",
        url="/",
        version="1.0.0",
        capabilities=AgentCapabilities(streaming=True, push_notifications=True),
        default_input_modes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
        default_output_modes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
        skills=[],  # add AgentSkill entries as in the full example above
    )
    httpx_client = httpx.AsyncClient()
    push_store = InMemoryPushNotificationConfigStore()  # share one store between handler and sender
    request_handler = DefaultRequestHandler(
        agent_executor=CurrencyAgentExecutor(),
        task_store=InMemoryTaskStore(),
        push_config_store=push_store,
        push_sender=BasePushNotificationSender(httpx_client=httpx_client, config_store=push_store),
    )
    return A2AStarletteApplication(agent_card=card, http_handler=request_handler).build()


app = build_asgi_app()
# app.yaml (minimal, secrets via valueFrom)
command: ["uvicorn", "app.server:app", "--host", "0.0.0.0", "--port", "8000"]
env:
  - name: DATABRICKS_WORKSPACE_URL
    value: "https://<your-workspace>.cloud.databricks.com"
  - name: DBX_LLM_ENDPOINT
    value: "<your-serving-endpoint>"
  - name: DATABRICKS_TOKEN
    valueFrom: dbx_llm_token
With A2A providing the interoperability fabric and Databricks Apps providing a secure, serverless runtime next to your governed data and models, you can stand up enterprise-ready agents quickly—and connect them to the rest of your agent ecosystem when you’re ready. Swap the currency tool for your business tools, wire in push notifications, and you’ve got a production pattern you can reuse across many agents.