Skip to content

Python API Examples

This document provides two examples of how to use DocETL's Python API. Each example demonstrates a single pipeline step with multiple operations.

Example 1: Extract and Summarize Product Review Themes

This example extracts themes and quotes from product reviews, then summarizes the top themes across ALL documents using the special _all reduce key:

from docetl.api import Pipeline, Dataset, MapOp, ReduceOp, PipelineStep, PipelineOutput

# Define dataset - A CSV of product reviews
dataset = Dataset(
    type="file",
    path="product_reviews.csv"  # Contains columns: review_id, product_id, rating, review_text
)

# Define operations
operations = [
    # Extract themes and quotes from each review
    MapOp(
        name="extract_themes",
        type="map",
        prompt="""
        Analyze this product review and extract the key themes and representative quotes:

        Review: {{ input.review_text }}
        Rating: {{ input.rating }}

        Identify 2-3 major themes (e.g., usability, quality, value) and extract direct quotes that best represent each theme.
        """,
        output={
            "schema": {
                "themes": "list[string]",
                "quotes": "list[string]",
                "sentiment": "string"
            }
        }
    ),
    # Summarize all themes across reviews using _all key
    ReduceOp(
        name="summarize_themes",
        type="reduce",
        reduce_key="_all",  # Special key to reduce across all items
        prompt="""
        Analyze and synthesize the themes and quotes from these product reviews:

        {% for item in inputs %}
        Review ID: {{ item.review_id }}
        Product: {{ item.product_id }}
        Rating: {{ item.rating }}
        Themes: {{ item.themes | join(", ") }}
        Quotes: 
        {% for quote in item.quotes %}
        - "{{ quote }}"
        {% endfor %}
        Sentiment: {{ item.sentiment }}

        {% endfor %}

        Summarize the most frequent themes, the most representative quotes for each theme, and the overall sentiment.
        """,
        output={
            "schema": {
                "summary": "string"
            }
        }
    )
]

# Define pipeline step (can consist of multiple operations)
step = PipelineStep(
    name="review_analysis",
    input="product_reviews",
    operations=["extract_themes", "summarize_themes"]
)

# Define output
output = PipelineOutput(type="file", path="review_analysis_summary.json")

# Create and run pipeline
pipeline = Pipeline(
    name="review_analysis_pipeline",
    datasets={"product_reviews": dataset},
    operations=operations,
    steps=[step],
    output=output,
    default_model="gpt-4o-mini"
)

# Run the pipeline
cost = pipeline.run()
print(f"Pipeline execution completed. Total cost: ${cost:.2f}")

Example 2: Map-Unnest-Resolve-Reduce on Theme Keys

This example extracts theme-quote pairs from reviews, unnests them, resolves similar themes, and then reduces on the theme key. Here we will also optimize the pipeline.

from docetl.api import Pipeline, Dataset, MapOp, UnnestOp, ResolveOp, ReduceOp, PipelineStep, PipelineOutput

# Define dataset - A JSON file with product reviews
dataset = Dataset(
    type="file",
    path="product_reviews.csv" # Same csv as previously
)

# Define operations
operations = [
    # Extract theme-quote pairs from each review
    MapOp(
        name="extract_theme_quotes",
        type="map",
        prompt="""
        Extract theme and quote pairs from this product review:

        Review: {{ input.review_text }}
        Product: {{ input.product_name }}
        Rating: {{ input.rating }}

        For each distinct theme in the review, extract a direct quote that best represents that theme.
        Return each theme and its representative quote as a separate object in the "theme_quotes" array.
        """,
        output={
            "schema": {
                "theme_quotes": "array"  # Array of objects with theme and quote properties
            }
        }
    ),
    # Unnest to create separate items for each theme-quote pair
    UnnestOp(
        name="unnest_theme_quotes",
        type="unnest",
        array_path="theme_quotes"
    ),
    # Resolve similar themes using fuzzy matching
    ResolveOp(
        name="resolve_themes",
        type="resolve",
        comparison_prompt="""
        Determine if these two themes are the same or closely related:

        Theme 1: {{ input1.theme }}
        Theme 2: {{ input2.theme }}

        Consider semantic similarity, synonyms, and conceptual overlap.
        """,
        resolution_prompt="""
        Given the following list of similar themes, determine a canonical name that best represents all of them:

        {% for item in inputs %}
        Theme: {{ item.theme }}
        {% endfor %}

        Choose a clear, concise name that accurately captures the core concept shared across all these related themes.
        """
    ),
    # Reduce by theme to aggregate quotes and insights
    ReduceOp(
        name="aggregate_by_theme",
        type="reduce",
        reduce_key="theme",
        prompt="""
        Analyze all quotes related to the theme "{{ reduce_key }}":

        {% for item in inputs %}
        Product: {{ item.product_name }}
        Rating: {{ item.rating }}
        Quote: "{{ item.quote }}"

        {% endfor %}

        Summarize the key insights about this theme across all products and ratings.
        """,
        output={
            "schema": {
                "summary": "string"
            }
        }
    )
]

# Define pipeline with a single step
step = PipelineStep(
    name="theme_analysis",
    input="product_reviews",
    operations=["extract_theme_quotes", "unnest_theme_quotes", "resolve_themes", "aggregate_by_theme"]
)

# Define output
output = PipelineOutput(type="file", path="theme_analysis_results.json")

# Create the pipeline
pipeline = Pipeline(
    name="theme_analysis_pipeline",
    datasets={"product_reviews": dataset},
    operations=operations,
    steps=[step],
    output=output,
    default_model="gpt-4o"
)

# Optimize the pipeline before running
optimized_pipeline = pipeline.optimize()

# Run the optimized pipeline
cost = optimized_pipeline.run()
print(f"Pipeline execution completed. Total cost: ${cost:.2f}")

Note that datasets can be json or CSV.