# Python API Reference

The Python API lets you define, optimize, and run DocETL pipelines programmatically. All classes are importable from `docetl.api`.
```python
from docetl.api import (
    Pipeline, Dataset, PipelineStep, PipelineOutput,
    MapOp, ReduceOp, ResolveOp, FilterOp, ParallelMapOp,
    EquijoinOp, SplitOp, GatherOp, UnnestOp, SampleOp,
    CodeMapOp, CodeReduceOp, CodeFilterOp, ExtractOp,
)
```
## Pipeline

The main class for defining and running a complete document processing pipeline.

```python
Pipeline(
    name: str,
    datasets: dict[str, Dataset],
    operations: list[OpType],
    steps: list[PipelineStep],
    output: PipelineOutput,
    default_model: str | None = None,
    parsing_tools: list[ParsingTool] = [],
)
```
| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | Name of the pipeline. |
| `datasets` | `dict[str, Dataset]` | Datasets keyed by name. |
| `operations` | `list[OpType]` | List of operation definitions. |
| `steps` | `list[PipelineStep]` | Ordered steps to execute. |
| `output` | `PipelineOutput` | Output configuration. |
| `default_model` | `str \| None` | Default LLM model for all operations. |
| `parsing_tools` | `list[ParsingTool]` | Custom parsing functions. Can be `ParsingTool` objects or plain Python functions. |
**Methods:**

- `pipeline.run() -> float` — Execute the pipeline. Returns total cost.
- `pipeline.optimize(**kwargs) -> Pipeline` — Return an optimized copy of the pipeline.
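
A minimal end-to-end sketch using the classes documented below; the file paths and model name are placeholders:

```python
from docetl.api import Pipeline, Dataset, MapOp, PipelineStep, PipelineOutput

pipeline = Pipeline(
    name="theme_extraction",
    datasets={"posts": Dataset(type="file", path="posts.json")},  # placeholder path
    operations=[
        MapOp(
            name="extract_theme",
            type="map",
            prompt="What is the main theme of this post? {{ input.text }}",
            output={"schema": {"theme": "string"}},
        )
    ],
    steps=[PipelineStep(name="analyze", input="posts", operations=["extract_theme"])],
    output=PipelineOutput(type="file", path="themes.json"),
    default_model="gpt-4o-mini",  # any litellm-supported model name
)

cost = pipeline.run()  # executes all steps and returns the total cost
print(f"Pipeline cost: ${cost:.2f}")
```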
## Dataset

```python
Dataset(
    type: "file" | "memory",
    path: str | list[dict] | pd.DataFrame,
    source: str = "local",
    parsing: list[dict[str, str]] | None = None,
)
```
| Parameter | Type | Description |
|---|---|---|
| `type` | `"file"` or `"memory"` | `"file"` to load from disk, `"memory"` for in-memory data. |
| `path` | `str \| list[dict] \| DataFrame` | File path (for `"file"` type) or data (for `"memory"` type). |
| `source` | `str` | Source identifier. Defaults to `"local"`. |
| `parsing` | `list[dict]` | Parsing instructions. Each dict has `input_key`, `function`, and `output_key`. |
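
A sketch of the loading styles; `documents.json` and the parser name `my_parser` are placeholders (the parser itself would be supplied via the Pipeline's `parsing_tools`):

```python
import pandas as pd
from docetl.api import Dataset

# In-memory data: a list of dicts or a DataFrame
records = Dataset(type="memory", path=[{"text": "hello"}, {"text": "world"}])
frame = Dataset(type="memory", path=pd.DataFrame({"text": ["hello", "world"]}))

# File-backed data with a parsing step ("my_parser" is a hypothetical
# function name that must be registered with the pipeline)
files = Dataset(
    type="file",
    path="documents.json",
    parsing=[{"input_key": "pdf_path", "function": "my_parser", "output_key": "text"}],
)
```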
## PipelineStep

```python
PipelineStep(
    name: str,
    input: str | None,
    operations: list[str | dict],
)
```
| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | Step name. |
| `input` | `str \| None` | Name of a dataset or previous step to use as input. |
| `operations` | `list[str \| dict]` | Operation names (or dicts for more complex configs) to run in this step. |
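
Steps chain by name: a later step can name an earlier step as its input. A sketch with hypothetical operation names:

```python
from docetl.api import PipelineStep

steps = [
    # First step reads from a dataset named "posts"
    PipelineStep(name="extract", input="posts", operations=["extract_theme"]),
    # Later steps consume an earlier step's output by naming it
    PipelineStep(name="summarize", input="extract", operations=["summarize_themes"]),
]
```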
## PipelineOutput

```python
PipelineOutput(
    type: str,
    path: str,
    intermediate_dir: str | None = None,
)
```
| Parameter | Type | Description |
|---|---|---|
| `type` | `str` | Output type (e.g., `"file"`). |
| `path` | `str` | Path to write output. |
| `intermediate_dir` | `str \| None` | Directory for intermediate results. |
## LLM-Powered Operations

All operations share these base fields:
| Field | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | required | Unique operation name. |
| `type` | `str` | required | Operation type (must match the Op class). |
| `skip_on_error` | `bool` | `False` | Skip documents that cause errors. |
### MapOp

Applies an LLM prompt to each document independently.

```python
MapOp(name="...", type="map", prompt="...", output={"schema": {...}})
```
| Field | Type | Default | Description |
|---|---|---|---|
| `prompt` | `str` | required | Jinja2 template. Use `{{ input.key }}` to access fields. |
| `output` | `dict` | required | Output schema, e.g. `{"schema": {"field": "string"}}`. |
| `model` | `str` | `None` | Override the default model. |
| `drop_keys` | `list[str]` | `None` | Keys to drop from output. |
| `batch_size` | `int` | `None` | Process documents in batches. |
| `batch_prompt` | `str` | `None` | Jinja2 template for batch processing. Uses `{{ inputs }}`. |
| `timeout` | `int` | `None` | Timeout in seconds per LLM call. |
| `optimize` | `bool` | `None` | Mark for optimization. |
| `limit` | `int` | `None` | Max documents to process. |
| `litellm_completion_kwargs` | `dict` | `{}` | Extra kwargs passed to litellm. |
| `enable_observability` | `bool` | `False` | Enable observability logging. |
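
A minimal sketch using only fields from the table above; the model name is a placeholder:

```python
from docetl.api import MapOp

extract_people = MapOp(
    name="extract_people",
    type="map",
    prompt=(
        "List the people mentioned in this article, comma-separated.\n"
        "Article: {{ input.text }}"
    ),
    output={"schema": {"people": "string"}},
    model="gpt-4o-mini",  # overrides the pipeline's default_model
    skip_on_error=True,   # drop documents whose LLM call fails
)
```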
### ReduceOp

Groups documents by key(s) and reduces each group with an LLM.

```python
ReduceOp(name="...", type="reduce", reduce_key="key", prompt="...", output={"schema": {...}})
```
| Field | Type | Default | Description |
|---|---|---|---|
| `reduce_key` | `str \| list[str]` | required | Key(s) to group by. Use `"_all"` for a single group. |
| `prompt` | `str` | required | Jinja2 template. Use `{% for item in inputs %}` to iterate. |
| `output` | `dict` | required | Output schema. |
| `model` | `str` | `None` | Override the default model. |
| `input` | `dict` | `None` | Input schema constraints. |
| `pass_through` | `bool` | `None` | Pass through non-reduced keys. |
| `associative` | `bool` | `None` | Whether reduce is associative (enables parallelism). |
| `fold_prompt` | `str` | `None` | Prompt for incremental fold operations. |
| `fold_batch_size` | `int` | `None` | Batch size for fold. |
| `merge_prompt` | `str` | `None` | Prompt for merging fold results. |
| `merge_batch_size` | `int` | `None` | Batch size for merge. |
| `optimize` | `bool` | `None` | Mark for optimization. |
| `timeout` | `int` | `None` | Timeout in seconds. |
| `limit` | `int` | `None` | Max groups to process. |
| `litellm_completion_kwargs` | `dict` | `{}` | Extra kwargs passed to litellm. |
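
A sketch of a per-theme summary; the prompt iterates over `inputs` as described above:

```python
from docetl.api import ReduceOp

summarize_by_theme = ReduceOp(
    name="summarize_by_theme",
    type="reduce",
    reduce_key="theme",  # or a list of keys; "_all" reduces everything at once
    prompt=(
        "Summarize the following posts about {{ inputs[0].theme }}:\n"
        "{% for item in inputs %}- {{ item.text }}\n{% endfor %}"
    ),
    output={"schema": {"summary": "string"}},
)
```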
### ResolveOp

Deduplicates/resolves entities by comparing pairs of documents.

```python
ResolveOp(name="...", type="resolve", comparison_prompt="...", resolution_prompt="...")
```
| Field | Type | Default | Description |
|---|---|---|---|
| `comparison_prompt` | `str` | required | Jinja2 template comparing `{{ input1 }}` and `{{ input2 }}`. |
| `resolution_prompt` | `str` | `None` | Prompt for resolving matched pairs. |
| `output` | `dict` | `None` | Output schema. |
| `embedding_model` | `str` | `None` | Model for blocking embeddings. |
| `comparison_model` | `str` | `None` | Model for comparisons. |
| `resolution_model` | `str` | `None` | Model for resolution. |
| `blocking_keys` | `list[str]` | `None` | Keys to use for blocking. |
| `blocking_threshold` | `float` | `None` | Similarity threshold for blocking (0–1). |
| `blocking_target_recall` | `float` | `None` | Target recall for blocking (0–1). |
| `blocking_conditions` | `list[str]` | `None` | Custom blocking conditions. |
| `optimize` | `bool` | `None` | Mark for optimization. |
| `timeout` | `int` | `None` | Timeout in seconds. |
| `litellm_completion_kwargs` | `dict` | `{}` | Extra kwargs passed to litellm. |
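
A sketch: the comparison prompt sees `input1`/`input2` as documented; the resolution prompt is assumed to iterate over the matched entries as `inputs`:

```python
from docetl.api import ResolveOp

dedupe_companies = ResolveOp(
    name="dedupe_companies",
    type="resolve",
    comparison_prompt=(
        "Do these refer to the same company?\n"
        "Company 1: {{ input1.company }}\nCompany 2: {{ input2.company }}"
    ),
    resolution_prompt=(
        "Pick one canonical name for these company mentions:\n"
        "{% for entry in inputs %}- {{ entry.company }}\n{% endfor %}"
    ),
    output={"schema": {"company": "string"}},
    blocking_keys=["company"],
    blocking_threshold=0.8,  # skip pairs below this embedding similarity
)
```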
### FilterOp

Filters documents using an LLM prompt that returns a boolean.

```python
FilterOp(name="...", type="filter", prompt="...", output={"schema": {...}})
```
| Field | Type | Default | Description |
|---|---|---|---|
| `prompt` | `str` | required | Jinja2 template. Use `{{ input.key }}`. |
| `output` | `dict` | required | Must include a boolean field in schema. |
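
A sketch, assuming `"boolean"` is the schema type string for the boolean field:

```python
from docetl.api import FilterOp

keep_relevant = FilterOp(
    name="keep_relevant",
    type="filter",
    prompt="Is this post about climate policy? {{ input.text }}",
    output={"schema": {"is_relevant": "boolean"}},  # this field decides keep/drop
)
```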
### ParallelMapOp

Runs multiple prompts on each document in parallel.

```python
ParallelMapOp(name="...", type="parallel_map", prompts=[...], output={"schema": {...}})
```
| Field | Type | Default | Description |
|---|---|---|---|
| `prompts` | `list[dict]` | required | List of prompt configurations. |
| `output` | `dict` | required | Combined output schema. |
| `drop_keys` | `list[str]` | `None` | Keys to drop from output. |
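
A sketch of the prompt-configuration shape; the per-prompt keys shown (`name`, `prompt`, `output_keys`) are an assumption not spelled out in the table above:

```python
from docetl.api import ParallelMapOp

profile_doc = ParallelMapOp(
    name="profile_doc",
    type="parallel_map",
    prompts=[
        # Assumed config shape: each prompt names the output keys it fills
        {"name": "get_topic", "prompt": "Topic of: {{ input.text }}", "output_keys": ["topic"]},
        {"name": "get_tone", "prompt": "Tone of: {{ input.text }}", "output_keys": ["tone"]},
    ],
    output={"schema": {"topic": "string", "tone": "string"}},
)
```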
### EquijoinOp

Joins two datasets by comparing document pairs with an LLM.

```python
EquijoinOp(name="...", type="equijoin", comparison_prompt="...")
```
| Field | Type | Default | Description |
|---|---|---|---|
| `comparison_prompt` | `str` | required | Jinja2 template comparing `{{ left }}` and `{{ right }}`. |
| `output` | `dict` | `None` | Output schema. |
| `blocking_keys` | `dict[str, list[str]]` | `None` | Keys for blocking per dataset. |
| `blocking_threshold` | `float` | `None` | Similarity threshold. |
| `blocking_conditions` | `list[str]` | `None` | Custom blocking conditions. |
| `limits` | `dict[str, int]` | `None` | Max matches per side. |
| `comparison_model` | `str` | `None` | Model for comparisons. |
| `embedding_model` | `str` | `None` | Model for embeddings. |
| `optimize` | `bool` | `None` | Mark for optimization. |
| `timeout` | `int` | `None` | Timeout in seconds. |
| `litellm_completion_kwargs` | `dict` | `{}` | Extra kwargs passed to litellm. |
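
A sketch of the op itself; wiring the two datasets to the join is assumed to happen in the step configuration (the dict form of `PipelineStep.operations`), not on the op:

```python
from docetl.api import EquijoinOp

match_jobs = EquijoinOp(
    name="match_jobs",
    type="equijoin",
    comparison_prompt=(
        "Does this candidate fit this job?\n"
        "Candidate: {{ left.resume }}\nJob: {{ right.description }}"
    ),
    blocking_threshold=0.7,
    limits={"left": 3, "right": 10},  # assumed "left"/"right" keys, per-side caps
)
```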
### ExtractOp

Extracts specific information from documents with line-level precision.

```python
ExtractOp(name="...", type="extract", prompt="...", document_keys=["content"])
```
| Field | Type | Default | Description |
|---|---|---|---|
| `prompt` | `str` | required | Extraction prompt. |
| `document_keys` | `list[str]` | required | Keys containing document text. |
| `model` | `str` | `None` | Override the default model. |
| `format_extraction` | `bool` | `True` | Format extracted content. |
| `extraction_key_suffix` | `str` | `None` | Suffix for extraction output keys. |
| `extraction_method` | `"line_number" \| "regex"` | `"line_number"` | Extraction method. |
| `timeout` | `int` | `None` | Timeout in seconds. |
| `limit` | `int` | `None` | Max documents to process. |
| `litellm_completion_kwargs` | `dict` | `{}` | Extra kwargs passed to litellm. |
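
A sketch; the extracted text is written back under new keys derived from each entry in `document_keys` (the exact naming, controlled by `extraction_key_suffix`, is an assumption):

```python
from docetl.api import ExtractOp

pull_findings = ExtractOp(
    name="pull_findings",
    type="extract",
    prompt="Extract every sentence that states a quantitative finding.",
    document_keys=["content"],
    extraction_method="line_number",  # LLM cites line ranges instead of copying text
)
```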
## Auxiliary Operations

### SplitOp

Splits documents into chunks.

```python
SplitOp(name="...", type="split", split_key="content", method="token_count", method_kwargs={"num_tokens": 500})
```
| Field | Type | Default | Description |
|---|---|---|---|
| `split_key` | `str` | required | Key containing text to split. |
| `method` | `str` | required | Split method (e.g., `"token_count"`, `"delimiter"`). |
| `method_kwargs` | `dict` | required | Arguments for the split method. |
| `model` | `str` | `None` | Model for token counting. |
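
The constructor line above, expanded into a sketch; the `delimiter` variant's kwarg name is an assumption:

```python
from docetl.api import SplitOp

chunk_reports = SplitOp(
    name="chunk_reports",
    type="split",
    split_key="content",
    method="token_count",
    method_kwargs={"num_tokens": 500},
    # Alternative (kwarg name assumed):
    # method="delimiter", method_kwargs={"delimiter": "\n\n"}
)
```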
### GatherOp

Adds surrounding context to chunks created by split.

```python
GatherOp(name="...", type="gather", content_key="...", doc_id_key="...", order_key="...")
```
| Field | Type | Default | Description |
|---|---|---|---|
| `content_key` | `str` | required | Key with chunk content. |
| `doc_id_key` | `str` | required | Key identifying the source document. |
| `order_key` | `str` | required | Key for chunk ordering. |
| `peripheral_chunks` | `dict` | `None` | Configuration for surrounding context. |
| `doc_header_key` | `str` | `None` | Key for document headers. |
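
A sketch pairing gather with the split op above. The `doc_id_key`/`order_key` names assume split emits keys derived from its op name, and the `peripheral_chunks` structure shown is also an assumption:

```python
from docetl.api import GatherOp

add_context = GatherOp(
    name="add_context",
    type="gather",
    content_key="content_chunk",           # chunk text produced by the split op
    doc_id_key="chunk_reports_id",         # assumed: derived from the split op's name
    order_key="chunk_reports_chunk_num",   # assumed: derived from the split op's name
    peripheral_chunks={
        "previous": {"tail": {"count": 2}},  # include the 2 chunks just before
        "next": {"head": {"count": 2}},      # and the 2 just after
    },
)
```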
### UnnestOp

Flattens a list-valued field into separate documents.

```python
UnnestOp(name="...", type="unnest", unnest_key="items")
```
| Field | Type | Default | Description |
|---|---|---|---|
| `unnest_key` | `str` | required | Key containing the list to unnest. |
| `keep_empty` | `bool` | `None` | Keep documents with empty lists. |
| `expand_fields` | `list[str]` | `None` | Additional fields to expand. |
| `recursive` | `bool` | `None` | Recursively unnest nested lists. |
| `depth` | `int` | `None` | Max recursion depth. |
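
A sketch showing the before/after shape of a document:

```python
from docetl.api import UnnestOp

# {"title": "...", "items": ["a", "b"]}  ->  two documents, one per item
explode_items = UnnestOp(
    name="explode_items",
    type="unnest",
    unnest_key="items",
    keep_empty=False,  # drop documents whose list is empty
)
```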
### SampleOp

Samples a subset of documents.

```python
SampleOp(name="...", type="sample", method="uniform", samples=100)
```
| Field | Type | Default | Description |
|---|---|---|---|
| `method` | `str` | required | One of `"uniform"`, `"outliers"`, `"custom"`, `"first"`, `"top_embedding"`, `"top_fts"`. |
| `samples` | `int \| float \| list` | `None` | Number of samples or fraction. |
| `stratify_key` | `str \| list[str]` | `None` | Key(s) for stratified sampling. |
| `samples_per_group` | `bool` | `False` | Apply sample count per group. |
| `method_kwargs` | `dict` | `{}` | Extra arguments for the sampling method. |
| `random_state` | `int` | `None` | Random seed for reproducibility. |
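
A sketch of a stratified 10% sample using only fields from the table above:

```python
from docetl.api import SampleOp

pilot = SampleOp(
    name="pilot",
    type="sample",
    method="uniform",
    samples=0.1,              # a float is treated as a fraction
    stratify_key="category",  # sample within each category
    random_state=42,          # fixed seed for reproducibility
)
```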
## Code Operations

Code operations run Python functions instead of LLM calls. The `code` parameter accepts either a string containing Python code that defines a transform function, or a regular Python function.

```python
from docetl.api import CodeMapOp

def my_transform(doc: dict) -> dict:
    # Produces the output key(s) for each document
    return {"doubled": doc["value"] * 2}

op = CodeMapOp(name="double", type="code_map", code=my_transform)
```
### CodeMapOp

| Field | Type | Default | Description |
|---|---|---|---|
| `code` | `str \| Callable` | required | Function with signature `fn(doc: dict) -> dict`. |
| `drop_keys` | `list[str]` | `None` | Keys to drop from output. |
| `limit` | `int` | `None` | Max documents to process. |
### CodeReduceOp

| Field | Type | Default | Description |
|---|---|---|---|
| `code` | `str \| Callable` | required | Function with signature `fn(group: list[dict]) -> dict`. |
| `limit` | `int` | `None` | Max groups to process. |
### CodeFilterOp

| Field | Type | Default | Description |
|---|---|---|---|
| `code` | `str \| Callable` | required | Function with signature `fn(doc: dict) -> bool`. |
| `limit` | `int` | `None` | Max documents to process. |
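
Sketches for the other two code ops. Note that `reduce_key` on `CodeReduceOp` is an assumption mirroring the LLM `ReduceOp`; it is not listed in the table above:

```python
from docetl.api import CodeReduceOp, CodeFilterOp

def total_by_group(group: list[dict]) -> dict:
    # One output dict per group of documents
    return {"count": len(group), "total": sum(d["value"] for d in group)}

def has_value(doc: dict) -> bool:
    # True keeps the document, False drops it
    return doc.get("value", 0) > 0

keep_nonzero = CodeFilterOp(name="keep_nonzero", type="code_filter", code=has_value)

# reduce_key is assumed to group documents as in ReduceOp
totals = CodeReduceOp(name="totals", type="code_reduce", reduce_key="category", code=total_by_group)
```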