Optimizing Pipelines with the Python API
Use .optimize() to find cost-accuracy trade-offs for your pipeline. MOAR explores different configurations (models, validation steps, operation rewrites) and returns a frontier of optimized pipelines.
Quick Example
import docetl

docetl.default_model = "gpt-4o-mini"

frame = (
    docetl.read_json("medical_transcripts.json")
    .map(
        prompt="Analyze the transcript: {{ input.src }}\nList all medications mentioned.",
        output={"schema": {"medication": "list[str]"}},
    )
)

# Define your evaluation function
@docetl.register_eval
def evaluate(results):
    correct = sum(
        1 for r in results
        for med in r.get("medication", [])
        if med.lower() in r.get("src", "").lower()
    )
    return {"medication_extraction_score": correct}

# Optimize (models are auto-detected from API keys)
optimized = frame.optimize(
    eval_fn=evaluate,
    metric_key="medication_extraction_score",
)

# Run the optimized pipeline
rows = optimized.collect()
print(f"Cost: ${optimized.total_cost:.4f}")

# Inspect the Pareto frontier
print(optimized.search_results.to_df())
Evaluation Function
Pass any callable that takes the results list and returns a dict of metrics:
@docetl.register_eval
def evaluate(results):
    correct = sum(
        1 for r in results
        for med in r.get("medication", [])
        if med.lower() in r.get("src", "").lower()
    )
    return {"medication_extraction_score": correct}

optimized = frame.optimize(eval_fn=evaluate, metric_key="medication_extraction_score")
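The function above returns a raw count, which grows with the number of rows. As a hedged sketch, assuming each result row is a dict with the same `src` and `medication` fields, a rate-based metric can be computed in plain Python. The name `evaluate_precision` and the keys `medication_precision` and `medications_extracted` are hypothetical, and the `@docetl.register_eval` decorator is left off so the snippet runs without DocETL installed.

```python
def evaluate_precision(results):
    """Return the fraction of extracted medications that appear in the source text.

    Assumes each row is a dict with a "src" string and a "medication" list,
    mirroring the fields used in the example above.
    """
    total = 0
    correct = 0
    for row in results:
        source = row.get("src", "").lower()
        for med in row.get("medication", []):
            total += 1
            if med.lower() in source:
                correct += 1
    # Avoid division by zero when nothing was extracted.
    precision = correct / total if total else 0.0
    return {"medication_precision": precision, "medications_extracted": total}


# Hypothetical sample rows for a quick local check.
sample = [
    {"src": "Patient takes Aspirin and Metformin daily.", "medication": ["aspirin", "ibuprofen"]},
    {"src": "No medications reported.", "medication": []},
]
print(evaluate_precision(sample))
```

Under those assumptions, such a function would be passed as eval_fn with metric_key="medication_precision", following the same pattern as the call above.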
File paths for CLI
The CLI uses file-based evaluation via @register_eval. See the Evaluation Functions guide for that workflow.
Configuration Options
All parameters beyond eval_fn and metric_key are optional:
optimized = frame.optimize(
    eval_fn=evaluate,                  # Your evaluation function
    metric_key="score",                # Key in eval_fn's return dict to optimize
    models=["gpt-4o", "gpt-4o-mini"],  # Override auto-detection
    agent_model="gpt-4o",              # Override auto-selection (or set docetl.agent_model)
    max_iterations=40,                 # Search budget (default: 20)
    save_dir="./moar_results",         # Where to save results (default: temp dir)
    exploration_weight=1.414,          # UCB exploration constant
    dataset_path="data/sample.json",   # Sample dataset for optimization (default: full dataset)
    max_threads=8,                     # Max concurrent LLM calls per pipeline run
    max_concurrent_agents=3,           # Parallel MCTS search agents (default: 3)
)
| Parameter | Description | Default |
|---|---|---|
| eval_fn | Callable that scores pipeline output. Takes the list of result rows and returns a dict of metrics. | Required |
| metric_key | Which key from eval_fn's return dict to use as the optimization metric. | Required |
| models | List of LiteLLM model names to explore. | Auto-detected from API keys |
| agent_model | Model for the MOAR rewrite agent. | Auto-selected best available (or docetl.agent_model) |
| max_iterations | Number of MCTS search iterations. Higher = more exploration. | 20 |
| save_dir | Directory to save optimized pipelines and results. | Temp directory |
| exploration_weight | UCB exploration constant. Higher values explore more; lower values exploit. | 1.414 |
| dataset_path | Path to a sample dataset for optimization (avoids optimizing on your full/test set). | Uses the pipeline's dataset |
| max_threads | Max concurrent LLM calls for each pipeline execution during search. | docetl.max_threads or cpu_count * 4 |
| max_concurrent_agents | Number of parallel MCTS search agents. Each agent explores a different part of the search tree. | 3 |
See the Configuration Reference for details.
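To build intuition for exploration_weight, here is a minimal sketch of the textbook UCB1 score used in many MCTS implementations. It is an illustration only: whether MOAR computes its selection score in exactly this form is an assumption, and the function `ucb1_score`, its arguments, and the sample numbers are hypothetical. The default of 1.414 is approximately the square root of 2, the constant in classic UCB1.

```python
import math


def ucb1_score(mean_reward, visits, parent_visits, exploration_weight=1.414):
    """Textbook UCB1: exploitation term plus a weighted exploration bonus."""
    if visits == 0:
        # Unvisited nodes are conventionally tried first.
        return math.inf
    bonus = exploration_weight * math.sqrt(math.log(parent_visits) / visits)
    return mean_reward + bonus


# A well-explored node with a good average vs. a rarely visited one.
well_explored = dict(mean_reward=0.80, visits=50, parent_visits=60)
rarely_visited = dict(mean_reward=0.60, visits=2, parent_visits=60)

for weight in (0.1, 1.414):
    a = ucb1_score(**well_explored, exploration_weight=weight)
    b = ucb1_score(**rarely_visited, exploration_weight=weight)
    choice = "rarely visited" if b > a else "well explored"
    print(f"exploration_weight={weight}: selects the {choice} node")
```

In this sketch a small weight keeps the search on the node with the best average so far, while a larger weight favors the node that has been tried less often.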
Working with Results
optimized = frame.optimize(eval_fn=evaluate, metric_key="score")
# The optimized frame is ready to run
rows = optimized.collect()
# Access the full MOAR search results
results = optimized.search_results
# Best accuracy on the frontier
best = results.best()
print(f"Best accuracy: {best.accuracy}, cost: ${best.cost:.4f}")
# Cheapest option on the frontier
cheap = results.cheapest()
print(f"Cheapest cost: ${cheap.cost:.4f}, accuracy: {cheap.accuracy:.4f}")
# Browse the full frontier
for plan in results.frontier:
    print(f"Cost: ${plan.cost:.4f}, Accuracy: {plan.accuracy:.4f}")
# Analyze as a DataFrame
print(results.to_df())
See Understanding Results for more details.
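For intuition about what a cost-accuracy frontier contains, the following sketch filters a list of candidate plans down to the non-dominated ones. It is plain Python and not MOAR's implementation; the `Plan` tuple, the `pareto_frontier` helper, and the numbers are hypothetical.

```python
from typing import NamedTuple


class Plan(NamedTuple):
    name: str
    cost: float
    accuracy: float


def pareto_frontier(plans):
    """Keep plans that no other plan beats on both cost and accuracy."""
    frontier = []
    best_accuracy = float("-inf")
    # Sort by ascending cost; break ties by putting higher accuracy first.
    for plan in sorted(plans, key=lambda p: (p.cost, -p.accuracy)):
        # A plan survives only if it is more accurate than every
        # cheaper or equally priced plan seen so far.
        if plan.accuracy > best_accuracy:
            frontier.append(plan)
            best_accuracy = plan.accuracy
    return frontier


candidates = [
    Plan("a", cost=0.10, accuracy=0.70),
    Plan("b", cost=0.25, accuracy=0.65),  # dominated by "a"
    Plan("c", cost=0.40, accuracy=0.85),
    Plan("d", cost=0.90, accuracy=0.88),
]
for plan in pareto_frontier(candidates):
    print(f"{plan.name}: cost=${plan.cost:.2f}, accuracy={plan.accuracy:.2f}")
```

In this sketch the first entry of the returned list is the cheapest plan and the last is the most accurate, which loosely mirrors what results.cheapest() and results.best() return above.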