
Python API

Operations

docetl.schemas.MapOp = map.MapOperation.schema (module attribute)

docetl.schemas.ResolveOp = resolve.ResolveOperation.schema (module attribute)

docetl.schemas.ReduceOp = reduce.ReduceOperation.schema (module attribute)

docetl.schemas.ParallelMapOp = map.ParallelMapOperation.schema (module attribute)

docetl.schemas.FilterOp = filter.FilterOperation.schema (module attribute)

docetl.schemas.EquijoinOp = equijoin.EquijoinOperation.schema (module attribute)

docetl.schemas.SplitOp = split.SplitOperation.schema (module attribute)

docetl.schemas.GatherOp = gather.GatherOperation.schema (module attribute)

docetl.schemas.UnnestOp = unnest.UnnestOperation.schema (module attribute)

docetl.schemas.SampleOp = sample.SampleOperation.schema (module attribute)

docetl.schemas.ClusterOp = cluster.ClusterOperation.schema (module attribute)

Dataset and Pipeline

docetl.schemas.Dataset = dataset.Dataset.schema (module attribute)

docetl.schemas.ParsingTool

Bases: BaseModel

Represents a parsing tool used for custom data parsing in the pipeline.

Attributes:

name (str): The name of the parsing tool. It should be unique within the pipeline configuration.

function_code (str): The Python code defining the parsing function. This code is executed to parse the input data according to the specified logic. The function it defines should return a list of strings, where each string is a separate document.

Example
parsing_tools:
  - name: ocr_parser
    function_code: |
      from typing import List
      import pytesseract
      from pdf2image import convert_from_path
      def ocr_parser(filename: str) -> List[str]:
          images = convert_from_path(filename)
          text = ""
          for image in images:
              text += pytesseract.image_to_string(image)
          return [text]
Source code in docetl/base_schemas.py
class ParsingTool(BaseModel):
    """
    Represents a parsing tool used for custom data parsing in the pipeline.

    Attributes:
        name (str): The name of the parsing tool. This should be unique within the pipeline configuration.
        function_code (str): The Python code defining the parsing function. This code will be executed
                             to parse the input data according to the specified logic. It should return a list of strings, where each string is its own document.

    Example:
        ```yaml
        parsing_tools:
          - name: ocr_parser
            function_code: |
              from typing import List
              import pytesseract
              from pdf2image import convert_from_path
              def ocr_parser(filename: str) -> List[str]:
                  images = convert_from_path(filename)
                  text = ""
                  for image in images:
                      text += pytesseract.image_to_string(image)
                  return [text]
        ```
    """

    name: str
    function_code: str
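The description above says `function_code` is executed to parse the input, but this page does not show how. A minimal sketch, assuming the code string is run with `exec` in its own namespace and the function is then looked up under the tool's `name`; `load_parsing_function` is a hypothetical helper, not DocETL's loader. Because the code runs in a fresh namespace, it should import whatever it references, such as `List` from `typing`.

```python
from typing import Callable, Dict, List


# Hypothetical helper: DocETL's real loader may work differently.
def load_parsing_function(name: str, function_code: str) -> Callable[..., List[str]]:
    """Execute a parsing tool's source and return the function named `name`."""
    namespace: Dict[str, object] = {}
    # Run the tool's code in an isolated namespace; only execute trusted code this way.
    exec(function_code, namespace)
    func = namespace.get(name)
    if not callable(func):
        raise ValueError(f"function_code does not define a callable named {name!r}")
    return func


# A dependency-free stand-in for the OCR example; the tool name matches the function name.
UPPERCASE_CODE = """
from typing import List

def uppercase_parser(text: str) -> List[str]:
    # Each returned string becomes its own document.
    return [text.upper()]
"""

parser = load_parsing_function("uppercase_parser", UPPERCASE_CODE)
documents = parser("hello world")
print(documents)  # ['HELLO WORLD']
```

Keeping the tool name equal to the function name, as the OCR example does, makes this kind of lookup unambiguous.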

docetl.schemas.PipelineStep

Bases: BaseModel

Represents a step in the pipeline.

Attributes:

name (str): The name of the step.

operations (List[Union[Dict[str, Any], str]]): A list of operations to apply in this step. Each entry is either a string (the name of the operation) or a dictionary (for more complex configurations).

input (Optional[str]): The input for this step: either the name of a dataset or the name of a previous step. If omitted, the step uses the output of the previous step as its input.

Example
# Simple step with a single operation
process_step = PipelineStep(
    name="process_step",
    input="my_dataset",
    operations=["process"]
)

# Step that reads the output of a previous step
summarize_step = PipelineStep(
    name="summarize_step",
    input="process_step",
    operations=["summarize"]
)

# Step with a more complex operation configuration
custom_step = PipelineStep(
    name="custom_step",
    input="previous_step",
    operations=[
        {
            "custom_operation": {
                "model": "gpt-4",
                "prompt": "Perform a custom analysis on the following text:"
            }
        }
    ]
)

These examples show different ways to configure pipeline steps, from simple single-operation steps to more complex configurations with custom parameters.

Source code in docetl/base_schemas.py
class PipelineStep(BaseModel):
    """
    Represents a step in the pipeline.

    Attributes:
        name (str): The name of the step.
        operations (List[Union[Dict[str, Any], str]]): A list of operations to be applied in this step.
            Each operation can be either a string (the name of the operation) or a dictionary
            (for more complex configurations).
        input (Optional[str]): The input for this step. It can be either the name of a dataset
            or the name of a previous step. If not provided, the step will use the output
            of the previous step as its input.

    Example:
        ```python
        # Simple step with a single operation
        process_step = PipelineStep(
            name="process_step",
            input="my_dataset",
            operations=["process"]
        )

        # Step that reads the output of a previous step
        summarize_step = PipelineStep(
            name="summarize_step",
            input="process_step",
            operations=["summarize"]
        )

        # Step with a more complex operation configuration
        custom_step = PipelineStep(
            name="custom_step",
            input="previous_step",
            operations=[
                {
                    "custom_operation": {
                        "model": "gpt-4",
                        "prompt": "Perform a custom analysis on the following text:"
                    }
                }
            ]
        )
        ```

    These examples show different ways to configure pipeline steps, from simple
    single-operation steps to more complex configurations with custom parameters.
    """

    name: str
    operations: List[Union[Dict[str, Any], str]]
    input: Optional[str] = None
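A sketch of how the `input` default described above could be resolved, assuming steps run in list order and that a dictionary entry in `operations` has a single key naming the operation (as in `custom_step`). `Step`, `operation_name`, and `resolve_inputs` are hypothetical illustrations built on a plain dataclass; the runner that actually does this in DocETL is not shown on this page.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union


@dataclass
class Step:
    """Plain-dataclass stand-in for PipelineStep (no pydantic needed)."""
    name: str
    operations: List[Union[Dict[str, Any], str]]
    input: Optional[str] = None


def operation_name(op: Union[Dict[str, Any], str]) -> str:
    # A string is the operation name; a dict is assumed to have one key, the operation name.
    if isinstance(op, str):
        return op
    if len(op) != 1:
        raise ValueError(f"expected a single-key dict, got {op!r}")
    return next(iter(op))


def resolve_inputs(steps: List[Step]) -> List[Tuple[str, str, List[str]]]:
    """Return (step name, resolved input, operation names) for each step."""
    resolved = []
    previous: Optional[str] = None
    for step in steps:
        # Fall back to the previous step's output when `input` is omitted.
        source = step.input if step.input is not None else previous
        if source is None:
            raise ValueError(f"step {step.name!r} has no input and no previous step")
        resolved.append((step.name, source, [operation_name(op) for op in step.operations]))
        previous = step.name
    return resolved


plan = resolve_inputs([
    Step(name="process_step", input="my_dataset", operations=["process"]),
    Step(name="summarize_step", operations=["summarize"]),  # input omitted
    Step(name="custom_step", operations=[{"custom_operation": {"model": "gpt-4"}}]),
])
for row in plan:
    print(row)
```

The first step must name its input explicitly; every later step can omit it and chain onto the step before it.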

docetl.schemas.PipelineOutput

Bases: BaseModel

Represents the output configuration for a pipeline.

Attributes:

type (str): The type of output, such as 'file' or 'database'.

path (str): Where the output will be stored. Depending on the type, this could be a file path, a database connection string, etc.

intermediate_dir (Optional[str]): The directory in which to store intermediate results, if applicable. Defaults to None.

Example
output = PipelineOutput(
    type="file",
    path="/path/to/output.json",
    intermediate_dir="/path/to/intermediate/results"
)
Source code in docetl/base_schemas.py
class PipelineOutput(BaseModel):
    """
    Represents the output configuration for a pipeline.

    Attributes:
        type (str): The type of output. This could be 'file', 'database', etc.
        path (str): The path where the output will be stored. This could be a file path,
                    database connection string, etc., depending on the type.
        intermediate_dir (Optional[str]): The directory to store intermediate results,
                                          if applicable. Defaults to None.

    Example:
        ```python
        output = PipelineOutput(
            type="file",
            path="/path/to/output.json",
            intermediate_dir="/path/to/intermediate/results"
        )
        ```
    """

    type: str
    path: str
    intermediate_dir: Optional[str] = None

docetl.api.Pipeline

Represents a complete document processing pipeline.

Attributes:

name (str): The name of the pipeline.

datasets (Dict[str, Dataset]): A dictionary of the datasets used in the pipeline, mapping dataset names to Dataset objects.

operations (List[OpType]): A list of operations to be performed in the pipeline.

steps (List[PipelineStep]): A list of steps that make up the pipeline.

output (PipelineOutput): The output configuration for the pipeline.

parsing_tools (List[ParsingTool]): A list of parsing tools used in the pipeline. Plain Python functions are also accepted; the constructor converts each one into a ParsingTool using the function's name and source code. Defaults to an empty list.

default_model (Optional[str]): The default language model to use for operations that require one. Defaults to None.

rate_limits (Optional[Dict[str, int]]): Optional rate limits; when set, they are included in the configuration passed to the runner. Defaults to None.

Example
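from typing import List

from docetl.api import Pipeline
from docetl.schemas import Dataset, MapOp, ReduceOp, PipelineStep, PipelineOutput

def custom_parser(text: str) -> List[str]:
    # Convert the text in the input column to uppercase.
    # Return a list of strings, where each string is a separate document.
    return [text.upper()]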

pipeline = Pipeline(
    name="document_processing_pipeline",
    datasets={
        "input_data": Dataset(type="file", path="/path/to/input.json", parsing=[{"name": "custom_parser", "input_key": "content", "output_key": "uppercase_content"}]),
    },
    parsing_tools=[custom_parser],
    operations=[
        MapOp(
            name="process",
            type="map",
            prompt="Determine what type of document this is: {{ input.uppercase_content }}",
            output={"schema": {"document_type": "string"}}
        ),
        ReduceOp(
            name="summarize",
            type="reduce",
            reduce_key="document_type",
            prompt="Summarize the processed contents: {% for item in inputs %}{{ item.uppercase_content }} {% endfor %}",
            output={"schema": {"summary": "string"}}
        )
    ],
    steps=[
        PipelineStep(name="process_step", input="input_data", operations=["process"]),
        PipelineStep(name="summarize_step", input="process_step", operations=["summarize"])
    ],
    output=PipelineOutput(type="file", path="/path/to/output.json"),
    default_model="gpt-4o-mini"
)

This example shows a complete pipeline configuration with datasets, operations, steps, and output settings.
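The `parsing` entry on the dataset names a tool together with an `input_key` and an `output_key`. One plausible reading is sketched below under stated assumptions: the tool is called on each record's `input_key` value, and every string it returns becomes a separate record with that string stored under `output_key` and the record's other fields copied. `apply_parsing_tool` is hypothetical, not DocETL's implementation, and how DocETL actually merges fields may differ.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


def custom_parser(text: str) -> List[str]:
    # Same parser as in the example above: one uppercase document per input.
    return [text.upper()]


def apply_parsing_tool(
    records: List[Record],
    tool: Callable[[str], List[str]],
    input_key: str,
    output_key: str,
) -> List[Record]:
    """Hypothetical: expand each record into one record per string the tool returns."""
    parsed: List[Record] = []
    for record in records:
        for document in tool(record[input_key]):
            # Copy the original fields and add the parsed text (an assumption of this sketch).
            parsed.append({**record, output_key: document})
    return parsed


rows = [{"id": 1, "content": "invoice no. 42"}, {"id": 2, "content": "meeting notes"}]
result = apply_parsing_tool(rows, custom_parser, "content", "uppercase_content")
print(result)
```

Because the tool returns a list, a parser that splits one file into several documents fits the same shape: each returned string simply yields one more record.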

Source code in docetl/api.py
class Pipeline:
    """
    Represents a complete document processing pipeline.

    Attributes:
        name (str): The name of the pipeline.
        datasets (Dict[str, Dataset]): A dictionary of datasets used in the pipeline,
                                       where keys are dataset names and values are Dataset objects.
        operations (List[OpType]): A list of operations to be performed in the pipeline.
        steps (List[PipelineStep]): A list of steps that make up the pipeline.
        output (PipelineOutput): The output configuration for the pipeline.
        parsing_tools (List[ParsingTool]): A list of parsing tools used in the pipeline.
                                           Defaults to an empty list.
        default_model (Optional[str]): The default language model to use for operations
                                       that require one. Defaults to None.

    Example:
        ```python
        def custom_parser(text: str) -> List[str]:
            # this will convert the text in the column to uppercase
            # You should return a list of strings, where each string is a separate document
            return [text.upper()]

        pipeline = Pipeline(
            name="document_processing_pipeline",
            datasets={
                "input_data": Dataset(type="file", path="/path/to/input.json", parsing=[{"name": "custom_parser", "input_key": "content", "output_key": "uppercase_content"}]),
            },
            parsing_tools=[custom_parser],
            operations=[
                MapOp(
                    name="process",
                    type="map",
                    prompt="Determine what type of document this is: {{ input.uppercase_content }}",
                    output={"schema": {"document_type": "string"}}
                ),
                ReduceOp(
                    name="summarize",
                    type="reduce",
                    reduce_key="document_type",
                    prompt="Summarize the processed contents: {% for item in inputs %}{{ item.uppercase_content }} {% endfor %}",
                    output={"schema": {"summary": "string"}}
                )
            ],
            steps=[
                PipelineStep(name="process_step", input="input_data", operations=["process"]),
                PipelineStep(name="summarize_step", input="process_step", operations=["summarize"])
            ],
            output=PipelineOutput(type="file", path="/path/to/output.json"),
            default_model="gpt-4o-mini"
        )
        ```

    This example shows a complete pipeline configuration with datasets, operations,
    steps, and output settings.
    """

    def __init__(
        self,
        name: str,
        datasets: Dict[str, Dataset],
        operations: List[OpType],
        steps: List[PipelineStep],
        output: PipelineOutput,
        parsing_tools: List[Union[ParsingTool, Callable]] = [],
        default_model: Optional[str] = None,
        rate_limits: Optional[Dict[str, int]] = None,
    ):
        self.name = name
        self.datasets = datasets
        self.operations = operations
        self.steps = steps
        self.output = output
        self.parsing_tools = [
            (
                tool
                if isinstance(tool, ParsingTool)
                else ParsingTool(
                    name=tool.__name__, function_code=inspect.getsource(tool)
                )
            )
            for tool in parsing_tools
        ]
        self.default_model = default_model
        self.rate_limits = rate_limits
        self._load_env()

    def _load_env(self):
        import os

        from dotenv import load_dotenv

        # Get the current working directory
        cwd = os.getcwd()

        # Load .env file from the current working directory if it exists
        env_file = os.path.join(cwd, ".env")
        if os.path.exists(env_file):
            load_dotenv(env_file)

    def optimize(
        self,
        max_threads: Optional[int] = None,
        model: str = "gpt-4o",
        resume: bool = False,
        timeout: int = 60,
    ) -> "Pipeline":
        """
        Optimize the pipeline using the Optimizer.

        Args:
            max_threads (Optional[int]): Maximum number of threads to use for optimization.
            model (str): The model to use for optimization. Defaults to "gpt-4o".
            resume (bool): Whether to resume optimization from a previous state. Defaults to False.
            timeout (int): Timeout for optimization in seconds. Defaults to 60.

        Returns:
            Pipeline: An optimized version of the pipeline.
        """
        config = self._to_dict()
        runner = DSLRunner(
            config,
            base_name=os.path.join(os.getcwd(), self.name),
            yaml_file_suffix=self.name,
            max_threads=max_threads,
        )
        optimized_config, _ = runner.optimize(return_pipeline=False)

        updated_pipeline = Pipeline(
            name=self.name,
            datasets=self.datasets,
            operations=self.operations,
            steps=self.steps,
            output=self.output,
            default_model=self.default_model,
            parsing_tools=self.parsing_tools,
        )
        updated_pipeline._update_from_dict(optimized_config)
        return updated_pipeline

    def run(self, max_threads: Optional[int] = None) -> float:
        """
        Run the pipeline using the DSLRunner.

        Args:
            max_threads (Optional[int]): Maximum number of threads to use for execution.

        Returns:
            float: The total cost of running the pipeline.
        """
        config = self._to_dict()
        runner = DSLRunner(
            config,
            base_name=os.path.join(os.getcwd(), self.name),
            yaml_file_suffix=self.name,
            max_threads=max_threads,
        )
        result = runner.load_run_save()
        return result

    def to_yaml(self, path: str) -> None:
        """
        Convert the Pipeline object to a YAML string and save it to a file.

        Args:
            path (str): Path to save the YAML file.

        Returns:
            None
        """
        config = self._to_dict()
        with open(path, "w") as f:
            yaml.safe_dump(config, f)

        print(f"[green]Pipeline saved to {path}[/green]")

    def _to_dict(self) -> Dict[str, Any]:
        """
        Convert the Pipeline object to a dictionary representation.

        Returns:
            Dict[str, Any]: Dictionary representation of the Pipeline.
        """
        d = {
            "datasets": {
                name: dataset.dict() for name, dataset in self.datasets.items()
            },
            "operations": [
                {k: v for k, v in op.dict().items() if v is not None}
                for op in self.operations
            ],
            "pipeline": {
                "steps": [
                    {k: v for k, v in step.dict().items() if v is not None}
                    for step in self.steps
                ],
                "output": self.output.dict(),
            },
            "default_model": self.default_model,
            "parsing_tools": (
                [tool.dict() for tool in self.parsing_tools]
                if self.parsing_tools
                else None
            ),
        }
        if self.rate_limits:
            d["rate_limits"] = self.rate_limits
        return d

    def _update_from_dict(self, config: Dict[str, Any]):
        """
        Update the Pipeline object from a dictionary representation.

        Args:
            config (Dict[str, Any]): Dictionary representation of the Pipeline.
        """
        self.datasets = {
            name: Dataset(
                type=dataset["type"],
                source=dataset["source"],
                path=dataset["path"],
                parsing=dataset.get("parsing"),
            )
            for name, dataset in config["datasets"].items()
        }
        self.operations = []
        for op in config["operations"]:
            op_type = op.pop("type")
            if op_type == "map":
                self.operations.append(MapOp(**op, type=op_type))
            elif op_type == "resolve":
                self.operations.append(ResolveOp(**op, type=op_type))
            elif op_type == "reduce":
                self.operations.append(ReduceOp(**op, type=op_type))
            elif op_type == "parallel_map":
                self.operations.append(ParallelMapOp(**op, type=op_type))
            elif op_type == "filter":
                self.operations.append(FilterOp(**op, type=op_type))
            elif op_type == "equijoin":
                self.operations.append(EquijoinOp(**op, type=op_type))
            elif op_type == "split":
                self.operations.append(SplitOp(**op, type=op_type))
            elif op_type == "gather":
                self.operations.append(GatherOp(**op, type=op_type))
            elif op_type == "unnest":
                self.operations.append(UnnestOp(**op, type=op_type))
            elif op_type == "cluster":
                self.operations.append(ClusterOp(**op, type=op_type))
            elif op_type == "sample":
                self.operations.append(SampleOp(**op, type=op_type))
        self.steps = [PipelineStep(**step) for step in config["pipeline"]["steps"]]
        self.output = PipelineOutput(**config["pipeline"]["output"])
        self.default_model = config.get("default_model")
        self.parsing_tools = (
            [ParsingTool(**tool) for tool in config.get("parsing_tools", [])]
            if config.get("parsing_tools")
            else []
        )
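`_update_from_dict` maps each operation's `type` string to a schema class with an `if`/`elif` chain. As written, it pops `type` from the incoming dictionaries (mutating `config`) and silently skips any type not in the chain. A table-driven alternative is sketched below with stand-in dataclasses so it runs without DocETL installed; `OP_REGISTRY`, `build_operation`, and the placeholder `MapOp`/`ReduceOp`/`FilterOp` classes are hypothetical, and raising on unknown types is a design choice of this sketch, not the library's behavior.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type


# Stand-ins for docetl.schemas.MapOp, ReduceOp, FilterOp (the real classes are pydantic models).
@dataclass
class MapOp:
    name: str
    type: str
    prompt: str


@dataclass
class ReduceOp:
    name: str
    type: str
    reduce_key: str
    prompt: str


@dataclass
class FilterOp:
    name: str
    type: str
    prompt: str


# One entry per supported `type` string; extend with the other schemas as needed.
OP_REGISTRY: Dict[str, Type[Any]] = {
    "map": MapOp,
    "reduce": ReduceOp,
    "filter": FilterOp,
}


def build_operation(op: Dict[str, Any]) -> Any:
    """Build an operation from its dict form without mutating the input."""
    op_type = op.get("type")
    try:
        cls = OP_REGISTRY[op_type]
    except KeyError:
        raise ValueError(f"unknown operation type: {op_type!r}") from None
    return cls(**op)  # `type` is passed through as a regular field


config_ops = [
    {"name": "process", "type": "map", "prompt": "Classify: {{ input.text }}"},
    {"name": "summarize", "type": "reduce", "reduce_key": "document_type", "prompt": "Summarize"},
]
operations = [build_operation(op) for op in config_ops]
print([type(op).__name__ for op in operations])  # ['MapOp', 'ReduceOp']
```

Adding a new operation type then means adding one registry entry rather than another `elif` branch.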

optimize(max_threads=None, model='gpt-4o', resume=False, timeout=60)

Optimize the pipeline using the Optimizer.

Parameters:

max_threads (Optional[int], default None): Maximum number of threads to use for optimization.

model (str, default 'gpt-4o'): The model to use for optimization.

resume (bool, default False): Whether to resume optimization from a previous state.

timeout (int, default 60): Timeout for optimization in seconds.

Returns:

Pipeline: An optimized version of the pipeline.

Source code in docetl/api.py
def optimize(
    self,
    max_threads: Optional[int] = None,
    model: str = "gpt-4o",
    resume: bool = False,
    timeout: int = 60,
) -> "Pipeline":
    """
    Optimize the pipeline using the Optimizer.

    Args:
        max_threads (Optional[int]): Maximum number of threads to use for optimization.
        model (str): The model to use for optimization. Defaults to "gpt-4o".
        resume (bool): Whether to resume optimization from a previous state. Defaults to False.
        timeout (int): Timeout for optimization in seconds. Defaults to 60.

    Returns:
        Pipeline: An optimized version of the pipeline.
    """
    config = self._to_dict()
    runner = DSLRunner(
        config,
        base_name=os.path.join(os.getcwd(), self.name),
        yaml_file_suffix=self.name,
        max_threads=max_threads,
    )
    optimized_config, _ = runner.optimize(return_pipeline=False)

    updated_pipeline = Pipeline(
        name=self.name,
        datasets=self.datasets,
        operations=self.operations,
        steps=self.steps,
        output=self.output,
        default_model=self.default_model,
        parsing_tools=self.parsing_tools,
    )
    updated_pipeline._update_from_dict(optimized_config)
    return updated_pipeline

run(max_threads=None)

Run the pipeline using the DSLRunner.

Parameters:

max_threads (Optional[int], default None): Maximum number of threads to use for execution.

Returns:

float: The total cost of running the pipeline.

Source code in docetl/api.py
def run(self, max_threads: Optional[int] = None) -> float:
    """
    Run the pipeline using the DSLRunner.

    Args:
        max_threads (Optional[int]): Maximum number of threads to use for execution.

    Returns:
        float: The total cost of running the pipeline.
    """
    config = self._to_dict()
    runner = DSLRunner(
        config,
        base_name=os.path.join(os.getcwd(), self.name),
        yaml_file_suffix=self.name,
        max_threads=max_threads,
    )
    result = runner.load_run_save()
    return result

to_yaml(path)

Serialize the Pipeline configuration to YAML and write it to a file.

Parameters:

path (str, required): Path of the YAML file to write.

Returns:

None

Source code in docetl/api.py
def to_yaml(self, path: str) -> None:
    """
    Convert the Pipeline object to a YAML string and save it to a file.

    Args:
        path (str): Path to save the YAML file.

    Returns:
        None
    """
    config = self._to_dict()
    with open(path, "w") as f:
        yaml.safe_dump(config, f)

    print(f"[green]Pipeline saved to {path}[/green]")
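`to_yaml` writes whatever `_to_dict` returns. The sketch below rebuilds that layout from plain dictionaries to show its shape: top-level `datasets`, `operations`, `pipeline` (holding `steps` and `output`), `default_model`, `parsing_tools`, and an optional `rate_limits`, with `None` values dropped from operations and steps but kept in `output`. It uses the standard-library `json` module instead of PyYAML so it runs anywhere; `build_config` and `drop_none` are hypothetical helpers, not part of DocETL.

```python
import json
from typing import Any, Dict, List, Optional


def drop_none(d: Dict[str, Any]) -> Dict[str, Any]:
    # Mirrors the `if v is not None` filter that _to_dict applies to operations and steps.
    return {k: v for k, v in d.items() if v is not None}


def build_config(
    datasets: Dict[str, Dict[str, Any]],
    operations: List[Dict[str, Any]],
    steps: List[Dict[str, Any]],
    output: Dict[str, Any],
    default_model: Optional[str] = None,
    parsing_tools: Optional[List[Dict[str, str]]] = None,
    rate_limits: Optional[Dict[str, int]] = None,
) -> Dict[str, Any]:
    config: Dict[str, Any] = {
        "datasets": datasets,
        "operations": [drop_none(op) for op in operations],
        "pipeline": {
            "steps": [drop_none(step) for step in steps],
            "output": output,  # None values are kept here, as in _to_dict
        },
        "default_model": default_model,
        "parsing_tools": parsing_tools or None,  # an empty list becomes None
    }
    if rate_limits:
        config["rate_limits"] = rate_limits
    return config


config = build_config(
    datasets={"input_data": {"type": "file", "path": "/path/to/input.json"}},
    operations=[{
        "name": "process",
        "type": "map",
        "prompt": "Determine what type of document this is: {{ input.uppercase_content }}",
        "model": None,
    }],
    steps=[{"name": "process_step", "input": "input_data", "operations": ["process"]}],
    output={"type": "file", "path": "/path/to/output.json", "intermediate_dir": None},
    default_model="gpt-4o-mini",
)
print(json.dumps(config, indent=2))
```

Inspecting this structure is a quick way to check what a saved YAML file will contain before running the pipeline.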