Genie Integration Patterns
Integrate Databricks Genie rooms as tools in your agent workflows, using either the Databricks Python SDK or MCP tools.
Core Concepts
What is Databricks Genie?
Genie is a conversational BI interface that:
- Translates natural language to SQL
- Executes queries against your data
- Returns formatted results and visualizations
- Maintains conversation context
Key advantage: Existing Genie rooms become instant agent tools without rebuilding data pipelines.
Integration Approaches
SDK Integration:
- Direct control via Databricks Python SDK
- Manual conversation management
- Flexible polling strategies
- Best for custom workflows
MCP Tools:
- Pre-configured tool interfaces
- Simplified API surface
- Built-in polling logic
- Best for standard patterns
Problem-Solution Patterns
Problem 1: Genie Query Timeouts
Symptoms:
- Queries hang indefinitely
- Agent gets stuck waiting for response
- No error handling for slow queries
Root causes:
- Insufficient polling timeout
- Complex SQL generated by Genie
- Large dataset queries
- No backoff strategy
Solution:
import time

from databricks.sdk import WorkspaceClient
from langchain.tools import tool


@tool
def query_genie_with_timeout(question: str, space_id: str, max_attempts: int = 30) -> str:
    """Query Genie with proper timeout handling"""
    w = WorkspaceClient()
    try:
        # Start conversation
        response = w.genie.start_conversation(
            space_id=space_id,
            content=question
        )
        conversation_id = response.conversation_id
        message_id = response.message_id
        # Poll with exponential backoff
        wait_time = 2
        for attempt in range(max_attempts):
            message = w.genie.get_message(
                space_id=space_id,
                conversation_id=conversation_id,
                message_id=message_id
            )
            # The SDK may return status as an enum rather than a string;
            # normalize to the string value before comparing
            status = getattr(message.status, "value", message.status)
            if status == "COMPLETED":
                return extract_response(message)
            elif status in ["FAILED", "CANCELLED"]:
                return f"Query failed: {status}. Try simplifying your question."
            # Exponential backoff: 2s, 2s, 4s, 4s, 8s, 8s, then 10s per attempt
            time.sleep(wait_time)
            if attempt % 2 == 1:  # Double wait time every 2 attempts
                wait_time = min(wait_time * 2, 10)  # Cap at 10 seconds
        return "Query timeout. The query may be too complex or the dataset too large."
    except Exception as e:
        return f"Error: {str(e)}"


def extract_response(message) -> str:
    """Extract formatted response from Genie message"""
    response_text = ""
    if message.attachments:
        for attachment in message.attachments:
            if hasattr(attachment, 'text') and attachment.text:
                response_text += attachment.text.content + "\n"
            elif hasattr(attachment, 'query') and attachment.query:
                # Skip missing descriptions so "None" never leaks into the output
                if getattr(attachment.query, 'description', None):
                    response_text += f"{attachment.query.description}\n"
    return response_text or message.content or "No response available"
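Later examples call a `poll_for_completion` helper that is never defined in this document. The following is a minimal sketch of what such a helper might look like, assuming it simply factors out the polling loop from Problem 1. The `sleep` parameter is an addition for testability, not part of any Databricks API, and raising `TimeoutError` on exhaustion is a design assumption chosen so callers can catch it explicitly.

```python
import time
from typing import Any, Callable

TERMINAL_FAILURES = {"FAILED", "CANCELLED"}


def poll_for_completion(
    w: Any,
    space_id: str,
    conversation_id: str,
    message_id: str,
    max_attempts: int = 30,
    sleep: Callable[[float], None] = time.sleep,  # injectable for tests (assumption)
) -> Any:
    """Poll a Genie message until it reaches a terminal status.

    Returns the message on COMPLETED, FAILED, or CANCELLED. Raises
    TimeoutError if max_attempts polls pass without a terminal status.
    """
    wait_time = 2
    for attempt in range(max_attempts):
        message = w.genie.get_message(
            space_id=space_id,
            conversation_id=conversation_id,
            message_id=message_id,
        )
        # Status may be an enum or a plain string; compare on the string value.
        status = getattr(message.status, "value", message.status)
        if status == "COMPLETED" or status in TERMINAL_FAILURES:
            return message
        sleep(wait_time)
        if attempt % 2 == 1:  # double every second attempt, capped at 10s
            wait_time = min(wait_time * 2, 10)
    raise TimeoutError(
        f"Genie message {message_id} did not finish within {max_attempts} polls"
    )
```

Because the helper returns failed messages instead of raising, callers still need to check `message.status` before extracting a response.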
Problem 2: Verbose Genie Responses Confuse Agent
Symptoms:
- Agent overwhelmed by SQL query text
- Long table outputs cause token limits
- Agent can't synthesize due to noise
Root causes:
- Not filtering Genie response attachments
- Including raw SQL in tool output
- No summarization of large results
Solution:
from databricks.sdk import WorkspaceClient
from langchain.tools import tool


@tool
def query_genie_concise(question: str, space_id: str) -> str:
    """Query Genie and return concise, agent-friendly response"""
    w = WorkspaceClient()
    # Get raw response (using pattern from Problem 1).
    # poll_for_completion is assumed to wrap the Problem 1 polling loop
    # and return the final message.
    response = w.genie.start_conversation(space_id=space_id, content=question)
    message = poll_for_completion(w, space_id, response.conversation_id, response.message_id)
    # Extract ONLY the natural language summary
    attachments = message.attachments or []
    # First pass: prioritize text summaries over raw SQL
    for attachment in attachments:
        if hasattr(attachment, 'text') and attachment.text:
            # Return first text attachment (usually the summary)
            return attachment.text.content
    # Second pass: fall back to the query description, never the SQL itself
    for attachment in attachments:
        if hasattr(attachment, 'query') and attachment.query:
            desc = getattr(attachment.query, 'description', None)
            if desc:
                # Truncate if too long
                if len(desc) > 500:
                    return desc[:500] + "... [truncated for brevity]"
                return desc
    return message.content or "No clear response from Genie"
Best practice: Return summaries, not raw data. Let Genie do the summarization.
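When a tool does have to pass tabular results on to the agent, capping the number of rows keeps the output within token limits. The sketch below is illustrative only: `summarize_rows` is a hypothetical helper, and it assumes the column names and rows have already been extracted from the Genie result by some other means.

```python
from typing import Sequence


def summarize_rows(
    columns: Sequence[str],
    rows: Sequence[Sequence[object]],
    max_rows: int = 5,
) -> str:
    """Render at most max_rows rows as text and note how many were omitted."""
    shown = rows[:max_rows]
    lines = [" | ".join(columns)]
    lines += [" | ".join(str(value) for value in row) for row in shown]
    hidden = len(rows) - len(shown)
    if hidden > 0:
        lines.append(f"... {hidden} more rows omitted")
    return "\n".join(lines)
```

The omitted-row note tells the agent that the data was truncated, so it can ask a narrower follow-up question rather than treating the sample as complete.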
Problem 3: Losing Conversation Context
Symptoms:
- Each query starts a fresh conversation
- Agent can't ask follow-up questions
- "What about last month?" fails
Root causes:
- Not tracking conversation IDs
- Creating a new conversation for each query
- No conversation state management
Solution:
from databricks.sdk import WorkspaceClient


class GenieConversationManager:
    """Manage ongoing conversations with Genie rooms"""

    def __init__(self):
        self.workspace_client = WorkspaceClient()
        self.conversations = {}  # space_id -> conversation_id

    # Plain method, not a @tool: the decorator does not handle `self`.
    # Expose it to the agent through a thin @tool wrapper instead.
    def query_genie_contextual(self, question: str, space_id: str) -> str:
        """
        Query Genie while maintaining conversation context.
        Automatically continues existing conversations or starts new ones.
        """
        # Check if we have an active conversation for this space
        conversation_id = self.conversations.get(space_id)
        if conversation_id:
            # Continue existing conversation
            response = self.workspace_client.genie.create_message(
                space_id=space_id,
                conversation_id=conversation_id,
                content=question
            )
        else:
            # Start new conversation
            response = self.workspace_client.genie.start_conversation(
                space_id=space_id,
                content=question
            )
            # Save conversation ID
            self.conversations[space_id] = response.conversation_id
        # Poll and return (using patterns from above); poll_for_completion
        # is assumed to wrap the Problem 1 polling loop
        message = poll_for_completion(
            self.workspace_client,
            space_id,
            response.conversation_id,
            response.message_id
        )
        return extract_response(message)

    def reset_conversation(self, space_id: str):
        """Start fresh conversation for a space"""
        if space_id in self.conversations:
            del self.conversations[space_id]
Usage in agent:
# Initialize once for agent lifecycle
genie_manager = GenieConversationManager()


@tool
def query_customer_behavior(question: str) -> str:
    """Query customer behavior Genie room"""
    return genie_manager.query_genie_contextual(
        question=question,
        space_id="01f09cdbacf01b5fa7ff7c237365502c"
    )
Problem 4: Genie Space ID Management
Symptoms:
- Hard-coded space IDs scattered in code
- Errors when space IDs change
- Difficulty managing multiple environments
Root causes:
- No centralized configuration
- Space IDs embedded in tool definitions
- No environment-aware setup
Solution:
# config.py
from dataclasses import dataclass
from typing import Dict, Optional
import os


@dataclass
class GenieSpaceConfig:
    space_id: str
    name: str
    description: str


class GenieConfig:
    """Centralized Genie space configuration"""

    def __init__(self, environment: Optional[str] = None):
        self.environment = environment or os.getenv("DATABRICKS_ENV", "production")
        self.spaces = self._load_spaces()

    def _load_spaces(self) -> Dict[str, GenieSpaceConfig]:
        """Load space configurations per environment"""
        # Production spaces
        if self.environment == "production":
            return {
                "customer_behavior": GenieSpaceConfig(
                    space_id="01f09cdbacf01b5fa7ff7c237365502c",
                    name="Customer Behavior Analysis",
                    description="Customer trends and preferences"
                ),
                "inventory": GenieSpaceConfig(
                    space_id="02a10defbcg02c6ga8gg8d348476613d",
                    name="Real-Time Inventory",
                    description="Stock levels and turnover"
                )
            }
        # Development spaces
        elif self.environment == "development":
            return {
                "customer_behavior": GenieSpaceConfig(
                    space_id="dev_customer_space_id",
                    name="Customer Behavior (Dev)",
                    description="Dev customer data"
                ),
                "inventory": GenieSpaceConfig(
                    space_id="dev_inventory_space_id",
                    name="Inventory (Dev)",
                    description="Dev inventory data"
                )
            }
        raise ValueError(f"Unknown environment: {self.environment}")

    def get_space(self, name: str) -> GenieSpaceConfig:
        """Get space configuration by name"""
        if name not in self.spaces:
            raise ValueError(f"Unknown Genie space: {name}")
        return self.spaces[name]


# Usage in tools
config = GenieConfig()


@tool
def query_customer_behavior(question: str) -> str:
    """Query customer behavior"""
    space = config.get_space("customer_behavior")
    # query_genie is the @tool from the Quick Reference; a decorated tool
    # is called through .invoke() with a dict of its arguments
    return query_genie.invoke({"question": question, "space_id": space.space_id})
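The configuration above still embeds space IDs in source code. One possible alternative, sketched here under assumptions, is to read them from environment variables so each deployment supplies its own values. The `GENIE_SPACE_` prefix and the `load_space_ids` function are hypothetical naming choices, not a Databricks convention.

```python
import os
from typing import Dict, Mapping, Optional

ENV_PREFIX = "GENIE_SPACE_"  # hypothetical naming convention


def load_space_ids(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Collect space IDs from variables such as GENIE_SPACE_INVENTORY.

    Returns a mapping of lower-cased space name to space ID. Variables
    with an empty value are ignored.
    """
    env = os.environ if env is None else env
    return {
        key[len(ENV_PREFIX):].lower(): value
        for key, value in env.items()
        if key.startswith(ENV_PREFIX) and value
    }
```

With `GENIE_SPACE_INVENTORY` set in the environment, `load_space_ids()["inventory"]` would return its value. Passing a plain dict as `env` makes the function easy to test without touching the real environment.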
SDK Integration Pattern
Complete SDK-Based Tool
from typing import Optional
import time

from databricks.sdk import WorkspaceClient
from langchain.tools import tool


@tool
def query_genie_sdk(
    question: str,
    space_id: str,
    conversation_id: Optional[str] = None,
    max_attempts: int = 30
) -> str:
    """
    Query Databricks Genie room using SDK.

    Args:
        question: Natural language question
        space_id: Genie space ID
        conversation_id: Optional conversation ID to continue conversation
        max_attempts: Max polling attempts

    Returns:
        Genie's response as string
    """
    w = WorkspaceClient()
    try:
        # Start or continue conversation
        if conversation_id:
            response = w.genie.create_message(
                space_id=space_id,
                conversation_id=conversation_id,
                content=question
            )
        else:
            response = w.genie.start_conversation(
                space_id=space_id,
                content=question
            )
        conv_id = response.conversation_id
        msg_id = response.message_id
        # Poll for completion with exponential backoff
        wait_time = 2
        for attempt in range(max_attempts):
            message = w.genie.get_message(
                space_id=space_id,
                conversation_id=conv_id,
                message_id=msg_id
            )
            # Check status (may be an enum; normalize to its string value)
            status = getattr(message.status, "value", message.status)
            if status == "COMPLETED":
                # Extract response
                result = ""
                if message.attachments:
                    for attachment in message.attachments:
                        if hasattr(attachment, 'text') and attachment.text:
                            result += attachment.text.content + "\n"
                        elif hasattr(attachment, 'query') and attachment.query:
                            if getattr(attachment.query, 'description', None):
                                result += attachment.query.description + "\n"
                return result.strip() or message.content or "No response"
            elif status in ["FAILED", "CANCELLED"]:
                return f"Query failed: {status}"
            # Wait with backoff
            time.sleep(wait_time)
            if attempt % 2 == 1:
                wait_time = min(wait_time * 2, 10)
        # Total wait depends on max_attempts and the backoff schedule,
        # so report the attempt count rather than a fixed duration
        return f"Query timeout after {max_attempts} polling attempts"
    except Exception as e:
        return f"Error querying Genie: {str(e)}"
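The effective timeout of the polling loop follows from its backoff schedule: start at 2 seconds, double after every second attempt, cap at 10 seconds. The short sketch below reproduces that schedule to show the total wait time. `backoff_schedule` is a name introduced here purely for illustration.

```python
from typing import List


def backoff_schedule(max_attempts: int = 30, initial: float = 2, cap: float = 10) -> List[float]:
    """Return the sleep duration used after each polling attempt."""
    waits = []
    wait_time = initial
    for attempt in range(max_attempts):
        waits.append(wait_time)
        if attempt % 2 == 1:  # double after every second attempt
            wait_time = min(wait_time * 2, cap)
    return waits


schedule = backoff_schedule()
print(schedule[:8])   # [2, 2, 4, 4, 8, 8, 10, 10]
print(sum(schedule))  # 268
```

With the default `max_attempts=30`, the loop therefore waits up to 268 seconds (about four and a half minutes) before giving up, not counting the time spent in the API calls themselves.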
MCP Tool Integration Pattern
Understanding MCP Tools
MCP (Model Context Protocol) tools provide a higher-level interface:
# MCP tools are pre-configured for specific Genie spaces
# They handle polling and conversation management automatically
# Available in environment as:
# - query_space_<SPACE_ID>
# - poll_response_<SPACE_ID>
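# Example: calling a pre-configured MCP tool
# NOTE: MCP tools are invoked through your agent framework's MCP client, not
# through WorkspaceClient; the SDK's w.genie has no query_space method.
# `mcp_client.call_tool` is a placeholder for that client's call method.
result = mcp_client.call_tool(
    "query_space_01f09cdbacf01b5fa7ff7c237365502c",
    {
        "query": "What products are trending?",
        "conversation_id": None,  # Optional: continue conversation
    },
)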
When to Use MCP vs SDK
Use MCP tools when:
- Standard query/response pattern
- Don't need custom polling logic
- Want simpler code
- MCP tools available for your spaces
Use SDK when:
- Need custom timeout handling
- Complex conversation management
- Custom response parsing
- MCP tools not configured
Performance Optimization
Pattern 1: Caching Genie Responses
from datetime import datetime, timedelta


class CachedGenieQuery:
    """Cache Genie responses with TTL"""

    def __init__(self, ttl_minutes: int = 15):
        self.cache = {}
        self.ttl = timedelta(minutes=ttl_minutes)

    def query(self, question: str, space_id: str) -> str:
        """Query with caching"""
        cache_key = f"{space_id}:{question}"
        # Check cache
        if cache_key in self.cache:
            result, timestamp = self.cache[cache_key]
            if datetime.now() - timestamp < self.ttl:
                return result
        # Query Genie (query_genie_sdk is a @tool, so call it via .invoke)
        result = query_genie_sdk.invoke({"question": question, "space_id": space_id})
        # Cache result
        self.cache[cache_key] = (result, datetime.now())
        return result
Pattern 2: Parallel Genie Queries
import concurrent.futures


def query_multiple_genie_spaces(questions: list[tuple[str, str]]) -> list[str]:
    """
    Query multiple Genie spaces in parallel.

    Args:
        questions: List of (question, space_id) tuples

    Returns:
        List of responses in same order
    """
    def query_one(question_space):
        question, space_id = question_space
        # query_genie_sdk is a @tool, so call it via .invoke
        return query_genie_sdk.invoke({"question": question, "space_id": space_id})

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # executor.map preserves input order in its results
        results = list(executor.map(query_one, questions))
    return results


# Usage in agent
questions = [
    ("What products are trending?", "customer_space_id"),
    ("Which locations have high turnover?", "inventory_space_id")
]
results = query_multiple_genie_spaces(questions)
Error Handling Best Practices
Comprehensive Error Strategy
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError
from langchain.tools import tool


@tool
def query_genie_robust(question: str, space_id: str) -> str:
    """Query Genie with comprehensive error handling"""
    w = WorkspaceClient()
    try:
        response = w.genie.start_conversation(
            space_id=space_id,
            content=question
        )
        # poll_for_completion is assumed to return the final message and to
        # raise TimeoutError when polling is exhausted
        message = poll_for_completion(
            w, space_id, response.conversation_id, response.message_id
        )
        # Status may be an enum; normalize to its string value
        status = getattr(message.status, "value", message.status)
        if status == "COMPLETED":
            return extract_response(message)
        elif status == "FAILED":
            # Provide actionable error message
            return ("Query failed. This could be due to:\n"
                    "- Invalid SQL generated\n"
                    "- Data source unavailable\n"
                    "- Permissions issue\n"
                    "Try rephrasing your question or check Genie room configuration.")
        else:
            return f"Unexpected status: {status}"
    except DatabricksError as e:
        if "not found" in str(e).lower():
            return "Genie space not found. Check space ID configuration."
        elif "permission" in str(e).lower():
            return "Permission denied. Ensure agent has access to Genie space."
        else:
            return f"Databricks API error: {str(e)}"
    except TimeoutError:
        return "Query timeout. Try a simpler question or check data volume."
    except Exception as e:
        return f"Unexpected error: {str(e)}"
Testing Genie Integration
Unit Test Pattern
def test_genie_tool():
    """Test Genie tool in isolation (requires access to a live workspace)"""
    # Test 1: Simple query (query_genie_sdk is a @tool, so call it via .invoke)
    result = query_genie_sdk.invoke({
        "question": "How many customers do we have?",
        "space_id": "01f09cdbacf01b5fa7ff7c237365502c"
    })
    assert result, "Should return non-empty result"
    assert "error" not in result.lower(), "Should not contain error"
    # Test 2: Invalid space ID
    result = query_genie_sdk.invoke({
        "question": "test",
        "space_id": "invalid_id"
    })
    assert "not found" in result.lower() or "error" in result.lower()
    # Test 3: Conversation continuity
    # (Test that conversation_id parameter works)
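Test 3 is left as a stub above. One way to cover conversation continuity without a live workspace is to mock the client, as in the sketch below. The `ask` function is a hypothetical stand-in that mirrors the start-or-continue logic of `GenieConversationManager`, and the mocked return values are assumptions about the response shape rather than captured API output.

```python
from unittest.mock import MagicMock


def ask(client, conversations: dict, space_id: str, question: str):
    """Start or continue a conversation, mirroring GenieConversationManager."""
    conversation_id = conversations.get(space_id)
    if conversation_id:
        response = client.genie.create_message(
            space_id=space_id, conversation_id=conversation_id, content=question
        )
    else:
        response = client.genie.start_conversation(space_id=space_id, content=question)
        conversations[space_id] = response.conversation_id
    return response


def test_conversation_continuity():
    client = MagicMock()
    # Assumed response shape: an object exposing conversation_id
    client.genie.start_conversation.return_value.conversation_id = "conv-1"
    conversations = {}

    ask(client, conversations, "space-a", "Sales this month?")
    ask(client, conversations, "space-a", "What about last month?")

    # The first question starts the conversation; the second must reuse it
    client.genie.start_conversation.assert_called_once()
    client.genie.create_message.assert_called_once_with(
        space_id="space-a", conversation_id="conv-1", content="What about last month?"
    )
```

Mocking keeps the test fast and deterministic. It checks only that the right SDK calls are made, so it complements the live-workspace tests rather than replacing them.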
Integration Test Pattern
def test_genie_in_agent():
    """Test Genie tool within agent workflow"""
    from your_agent import GenieAgent

    agent = GenieAgent()
    # Test single-tool query
    result = agent.query("What products are trending?")
    assert "customer_behavior" in result['intermediate_steps'][0][0].tool
    # Test multi-tool query
    result = agent.query("Trending products at risk of overstock?")
    tools_called = [step[0].tool for step in result['intermediate_steps']]
    assert "customer_behavior" in tools_called
    assert "inventory" in tools_called
Quick Reference
Minimum Viable Genie Tool (SDK)
from databricks.sdk import WorkspaceClient
from langchain.tools import tool
import time


@tool
def query_genie(question: str, space_id: str) -> str:
    """Query Genie room"""
    w = WorkspaceClient()
    resp = w.genie.start_conversation(space_id=space_id, content=question)
    for _ in range(30):
        msg = w.genie.get_message(space_id, resp.conversation_id, resp.message_id)
        # Status may be an enum; normalize to its string value
        status = getattr(msg.status, "value", msg.status)
        if status == "COMPLETED":
            # Return the first text attachment; the first attachment may be a query
            for attachment in msg.attachments or []:
                if getattr(attachment, "text", None):
                    return attachment.text.content
            return msg.content
        elif status in ["FAILED", "CANCELLED"]:
            return f"Failed: {status}"
        time.sleep(2)
    return "Timeout"
Related Skills
- mosaic-ai-agent: Design agents that use Genie tools
- agent-mlops: Deploy Genie-powered agents to production