Agent MLOps: Production Deployment & Monitoring
Deploy, evaluate, and monitor AI agents in production using Databricks MLflow and Model Serving.
Core Concepts
Agent MLOps Lifecycle
Development → Logging → Evaluation → Deployment → Monitoring → Iteration
↑ ↓
└────────────────────────────────────────────────────────────┘
Key difference from traditional MLOps: Agents make dynamic decisions, requiring evaluation of decision-making quality, not just prediction accuracy.
Problem-Solution Patterns
Problem 1: Can't Debug Agent Decisions
Symptoms:
- •Agent makes unexpected tool choices
- •No visibility into reasoning process
- •Can't reproduce issues
- •Debugging requires re-running entire workflow
Root causes:
- •No tracing enabled
- •Tool calls not logged
- •Intermediate steps discarded
Solution: Enable MLflow Tracing
import mlflow
# Enable automatic tracing for LangChain
mlflow.langchain.autolog()
# All agent executions now automatically traced
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
return_intermediate_steps=True # Critical for tracing
)
# Execute with tracing
with mlflow.start_run(run_name="agent_execution"):
result = agent_executor.invoke({"input": "user query"})
# Tracing captures:
# - LLM calls and prompts
# - Tool selection decisions
# - Tool inputs/outputs
# - Execution time per step
# - Token usage
View traces in MLflow UI:
- •Open MLflow experiment
- •Click run
- •Navigate to "Traces" tab
- •See full execution tree
Problem 2: No Systematic Evaluation
Symptoms:
- •Testing is manual and ad-hoc
- •Can't measure improvement over versions
- •Regression issues slip through
- •No confidence in deployment
Root causes:
- •No test dataset
- •No evaluation metrics
- •Manual testing only
Solution: Implement Agent Evaluation
import mlflow
import pandas as pd
from mlflow.metrics.genai import answer_similarity
# Step 1: Create evaluation dataset
eval_data = pd.DataFrame({
"question": [
"What products are trending?",
"Which locations have lowest turnover?",
"Trending products at risk of overstock?",
],
"expected_tools": [
"customer_behavior",
"inventory",
"customer_behavior,inventory"
],
"ground_truth_answer": [
"Products X, Y, Z are currently trending based on purchase data",
"Location A has lowest turnover at 2.1x annually",
"Products X and Y are trending but have 60-day supply"
]
})
# Step 2: Define evaluation metrics
def tool_selection_accuracy(predictions, targets):
"""Custom metric: Did agent call expected tools?"""
correct = 0
for pred_tools, expected_tools in zip(predictions, targets):
expected_set = set(expected_tools.split(','))
pred_set = set([step[0].tool for step in pred_tools['intermediate_steps']])
if expected_set.issubset(pred_set):
correct += 1
return correct / len(predictions)
# Step 3: Run evaluation
with mlflow.start_run(run_name="agent_evaluation"):
results = mlflow.evaluate(
model=agent_uri, # URI of logged model
data=eval_data,
targets="ground_truth_answer",
model_type="question-answering",
evaluators="default",
extra_metrics=[
answer_similarity,
tool_selection_accuracy
]
)
# View results
print(f"Answer Similarity: {results.metrics['answer_similarity/v1/mean']}")
print(f"Tool Selection Accuracy: {results.metrics['tool_selection_accuracy']}")
Problem 3: Model Serving Deployment Failures
Symptoms:
- •Endpoint won't start
- •Timeout errors
- •Import errors in production
- •Works locally, fails in serving
Root causes:
- •Missing dependencies
- •Environment mismatch
- •Incorrect model signature
- •No validation before deployment
Solution: Proper Model Packaging
import mlflow
from mlflow.models import ModelSignature
from mlflow.types.schema import ColSpec, Schema
import pandas as pd
# Step 1: Define clear signature
input_schema = Schema([
ColSpec("string", "question")
])
output_schema = Schema([
ColSpec("string", "response")
])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)
# Step 2: Create example for validation
input_example = pd.DataFrame({
"question": ["What products are trending?"]
})
# Step 3: Specify all dependencies
pip_requirements = [
"databricks-sdk>=0.20.0",
"mlflow>=2.10.0",
"langchain>=0.1.0",
"langchain-community>=0.0.1",
]
# Step 4: Create MLflow model wrapper
class AgentModel(mlflow.pyfunc.PythonModel):
def load_context(self, context):
"""Initialize agent when model loads"""
from your_module import GenieAgent
self.agent = GenieAgent()
def predict(self, context, model_input):
"""Handle predictions"""
responses = []
for question in model_input['question']:
result = self.agent.query(question)
responses.append(result['output'])
return pd.DataFrame({'response': responses})
# Step 5: Log with all metadata
with mlflow.start_run(run_name="agent_v1"):
mlflow.pyfunc.log_model(
artifact_path="agent",
python_model=AgentModel(),
signature=signature,
input_example=input_example,
pip_requirements=pip_requirements,
registered_model_name="main.default.my_agent"
)
# Log configuration
mlflow.log_param("foundation_model", "llama-3-1-70b")
mlflow.log_param("tools", "customer,inventory,web")
mlflow.log_param("temperature", 0.1)
# Step 6: Test locally before deploying
model_uri = "runs:/<run_id>/agent"
loaded_model = mlflow.pyfunc.load_model(model_uri)
# Validate it works
test_df = pd.DataFrame({"question": ["test query"]})
result = loaded_model.predict(test_df)
assert result is not None, "Model prediction failed"
Problem 4: High Latency in Production
Symptoms:
- •Responses take >30 seconds
- •Users abandoning queries
- •High timeout rates
- •Poor user experience
Root causes:
- •Cold starts on serverless
- •No request batching
- •Inefficient tool implementations
- •Wrong workload size
Solutions:
Strategy 1: Configure Model Serving Properly
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import (
ServedEntityInput,
EndpointCoreConfigInput
)
w = WorkspaceClient()
# For agents, optimize configuration:
w.serving_endpoints.create(
name="agent-endpoint",
config=EndpointCoreConfigInput(
served_entities=[
ServedEntityInput(
entity_name="main.default.my_agent",
entity_version=1,
workload_size="Small", # Start small
scale_to_zero_enabled=True, # Cost optimization
# For latency-critical apps:
# scale_to_zero_enabled=False # Keep warm
)
],
# Optional: Traffic config for A/B testing
traffic_config={
"routes": [
{"served_model_name": "my_agent-1", "traffic_percentage": 100}
]
}
)
)
Strategy 2: Implement Request Batching
class AgentModel(mlflow.pyfunc.PythonModel):
def predict(self, context, model_input):
"""Batch predictions for efficiency"""
questions = model_input['question'].tolist()
# Process in batches
batch_size = 5
all_responses = []
for i in range(0, len(questions), batch_size):
batch = questions[i:i+batch_size]
# Parallel processing within batch
with concurrent.futures.ThreadPoolExecutor() as executor:
responses = list(executor.map(
lambda q: self.agent.query(q)['output'],
batch
))
all_responses.extend(responses)
return pd.DataFrame({'response': all_responses})
Strategy 3: Cache at Multiple Levels
from functools import lru_cache
import hashlib
class CachedAgentModel(mlflow.pyfunc.PythonModel):
def __init__(self):
self.response_cache = {} # Simple in-memory cache
def predict(self, context, model_input):
responses = []
for question in model_input['question']:
# Check cache
cache_key = hashlib.md5(question.encode()).hexdigest()
if cache_key in self.response_cache:
responses.append(self.response_cache[cache_key])
else:
# Execute agent
result = self.agent.query(question)
response = result['output']
# Cache result
self.response_cache[cache_key] = response
responses.append(response)
return pd.DataFrame({'response': responses})
Problem 5: Can't Track Agent Costs
Symptoms:
- •Unexpected high bills
- •Can't attribute costs to queries
- •No visibility into token usage
- •Can't optimize expensive queries
Root causes:
- •Not logging token usage
- •No cost tracking per query
- •Missing Foundation Model API metrics
Solution: Comprehensive Cost Tracking
import mlflow
from datetime import datetime
class CostTrackingAgent(mlflow.pyfunc.PythonModel):
def predict(self, context, model_input):
responses = []
for question in model_input['question']:
start_time = datetime.now()
# Execute agent
result = self.agent.query(question)
end_time = datetime.now()
latency_ms = (end_time - start_time).total_seconds() * 1000
# Extract metrics from result
tool_calls = len(result.get('intermediate_steps', []))
# Log to MLflow
with mlflow.start_span(name="agent_query") as span:
span.set_attribute("question", question)
span.set_attribute("latency_ms", latency_ms)
span.set_attribute("tool_calls", tool_calls)
span.set_attribute("response_length", len(result['output']))
# If LLM provides token counts:
if 'token_usage' in result:
span.set_attribute("input_tokens", result['token_usage']['input'])
span.set_attribute("output_tokens", result['token_usage']['output'])
# Calculate cost (example for Llama 3.1 70B)
input_cost = result['token_usage']['input'] * 0.001 / 1000
output_cost = result['token_usage']['output'] * 0.002 / 1000
total_cost = input_cost + output_cost
span.set_attribute("cost_usd", total_cost)
responses.append(result['output'])
return pd.DataFrame({'response': responses})
MLflow Integration Patterns
Pattern 1: Development Workflow
import mlflow
# Set experiment
mlflow.set_experiment("/Users/your_email/agent_development")
# Enable tracing
mlflow.langchain.autolog()
# Development iteration
with mlflow.start_run(run_name="experiment_1"):
# Build agent
agent = GenieAgent(temperature=0.1)
# Test queries
test_queries = ["query1", "query2", "query3"]
for query in test_queries:
result = agent.query(query)
# Automatically traced
# Log parameters
mlflow.log_param("temperature", 0.1)
mlflow.log_param("model", "llama-3-1-70b")
mlflow.log_param("tools", "customer,inventory")
# Log metrics
mlflow.log_metric("avg_latency_ms", 5000)
mlflow.log_metric("tool_calls_per_query", 1.5)
Pattern 2: Model Registration
# After development, register for production
with mlflow.start_run(run_name="production_candidate"):
mlflow.pyfunc.log_model(
artifact_path="agent",
python_model=AgentModel(),
signature=signature,
registered_model_name="main.default.my_agent"
)
run_id = mlflow.active_run().info.run_id
# Promote to production
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Add model version alias
client.set_registered_model_alias(
name="main.default.my_agent",
alias="production",
version=1
)
Pattern 3: A/B Testing
# Deploy two versions for comparison
w.serving_endpoints.update_config(
name="agent-endpoint",
served_entities=[
ServedEntityInput(
entity_name="main.default.my_agent",
entity_version=1,
workload_size="Small",
scale_to_zero_enabled=True
),
ServedEntityInput(
entity_name="main.default.my_agent",
entity_version=2,
workload_size="Small",
scale_to_zero_enabled=True
)
],
traffic_config={
"routes": [
{"served_model_name": "my_agent-1", "traffic_percentage": 50},
{"served_model_name": "my_agent-2", "traffic_percentage": 50}
]
}
)
# Monitor performance of each version
# After validation, shift 100% to winner
Agent Evaluation Best Practices
Building Evaluation Datasets
# Principle: Cover all agent capabilities
eval_cases = [
# Single-tool queries
{
"question": "What products are trending?",
"expected_tools": ["customer_behavior"],
"expected_answer": "Products X, Y, Z trending...",
"category": "single_tool"
},
# Multi-tool queries
{
"question": "Trending products at risk of overstock?",
"expected_tools": ["customer_behavior", "inventory"],
"expected_answer": "X and Y trending but overstocked...",
"category": "multi_tool"
},
# Edge cases
{
"question": "Tell me about products",
"expected_behavior": "ask_clarification",
"category": "ambiguous"
},
# Error handling
{
"question": "Query invalid data source",
"expected_behavior": "graceful_error",
"category": "error"
}
]
eval_df = pd.DataFrame(eval_cases)
Custom Evaluation Metrics
def evaluate_tool_selection(predictions, targets):
"""Did agent call the right tools?"""
correct = 0
for pred, expected in zip(predictions, targets):
expected_tools = set(expected.split(','))
actual_tools = set([
step[0].tool
for step in pred['intermediate_steps']
])
if expected_tools == actual_tools:
correct += 1
return correct / len(predictions)
def evaluate_latency(predictions):
"""Measure response time"""
latencies = [pred['latency_ms'] for pred in predictions]
return {
"mean": sum(latencies) / len(latencies),
"p95": sorted(latencies)[int(len(latencies) * 0.95)],
"max": max(latencies)
}
def evaluate_cost(predictions):
"""Measure cost per query"""
costs = [pred['cost_usd'] for pred in predictions]
return {
"mean": sum(costs) / len(costs),
"total": sum(costs)
}
Running Evaluation
# Step 1: Load evaluation data
eval_df = pd.read_csv("agent_eval_dataset.csv")
# Step 2: Run evaluation
with mlflow.start_run(run_name="evaluation_v1"):
results = mlflow.evaluate(
model="main.default.my_agent@production",
data=eval_df,
targets="expected_answer",
model_type="question-answering",
evaluators=["default"],
extra_metrics=[
evaluate_tool_selection,
evaluate_latency,
evaluate_cost
]
)
# Step 3: Log results
mlflow.log_metrics({
"tool_accuracy": results.metrics['tool_selection'],
"mean_latency": results.metrics['latency_mean'],
"p95_latency": results.metrics['latency_p95'],
"cost_per_query": results.metrics['cost_mean']
})
# Step 4: Log evaluation dataset
mlflow.log_table(eval_df, "evaluation_dataset.json")
Production Monitoring
Key Metrics to Track
# 1. Usage Metrics - queries_per_minute - unique_users - query_distribution_by_tool # 2. Performance Metrics - latency_p50, p95, p99 - timeout_rate - error_rate - tool_call_distribution # 3. Cost Metrics - cost_per_query - total_daily_cost - tokens_per_query - tool_calls_per_query # 4. Quality Metrics - user_feedback_score - retry_rate - clarification_request_rate
Monitoring Implementation
from databricks.sdk import WorkspaceClient
import time
def monitor_agent_endpoint(endpoint_name: str):
"""Monitor agent serving endpoint"""
w = WorkspaceClient()
while True:
# Get endpoint metrics
metrics = w.serving_endpoints.get(endpoint_name)
# Log to MLflow
with mlflow.start_run(run_name="monitoring"):
mlflow.log_metric("requests_per_minute", metrics.rpm)
mlflow.log_metric("error_rate", metrics.error_rate)
mlflow.log_metric("p95_latency_ms", metrics.p95_latency)
# Alert if thresholds exceeded
if metrics.error_rate > 0.05: # 5% error rate
send_alert("High error rate on agent endpoint")
if metrics.p95_latency > 30000: # 30 seconds
send_alert("High latency on agent endpoint")
time.sleep(60) # Check every minute
CI/CD for Agents
Automated Deployment Pipeline
# .github/workflows/agent-deployment.yml
# or Databricks Jobs workflow
# Step 1: Run tests
def test_agent():
agent = GenieAgent()
# Unit tests
assert agent.query("test")['output']
# Integration tests
eval_df = load_eval_dataset()
results = run_evaluation(agent, eval_df)
# Quality gates
assert results['tool_accuracy'] > 0.90
assert results['mean_latency'] < 10000 # 10 seconds
return results
# Step 2: Log to MLflow
with mlflow.start_run():
test_results = test_agent()
mlflow.pyfunc.log_model(
artifact_path="agent",
python_model=AgentModel(),
signature=signature,
registered_model_name="main.default.my_agent"
)
run_id = mlflow.active_run().info.run_id
# Step 3: Deploy to staging
deploy_to_endpoint(
model_uri=f"runs:/{run_id}/agent",
endpoint_name="agent-staging"
)
# Step 4: Run validation
validation_results = validate_endpoint("agent-staging")
# Step 5: Deploy to production if validation passes
if validation_results['success']:
deploy_to_endpoint(
model_uri=f"runs:/{run_id}/agent",
endpoint_name="agent-production"
)
Quick Reference
Deployment Checklist
- • MLflow tracing enabled
- • Evaluation dataset created (20+ diverse examples)
- • Model signature defined
- • Dependencies specified
- • Input example provided
- • Local testing passed
- • Evaluation metrics > thresholds
- • Model registered in Unity Catalog
- • Deployed to staging endpoint
- • Staging validation passed
- • Production deployment
- • Monitoring configured
- • Alerts set up
Common MLflow Commands
# Set experiment
mlflow.set_experiment("/path/to/experiment")
# Enable tracing
mlflow.langchain.autolog()
# Log model
mlflow.pyfunc.log_model(
artifact_path="agent",
python_model=model,
signature=signature,
registered_model_name="catalog.schema.model"
)
# Load model
model = mlflow.pyfunc.load_model("models:/catalog.schema.model/1")
# Run evaluation
results = mlflow.evaluate(
model=model_uri,
data=eval_df,
targets="expected",
evaluators="default"
)
Related Skills
- •mosaic-ai-agent: Build the agents to deploy
- •genie-integration: Integrate Genie rooms in production agents