Skip to content

Python API

docetl.api.Dataset dataclass

Source code in docetl/api.py
58
59
60
61
@dataclass
class Dataset:
    """A named data source consumed by a pipeline."""

    type: str  # dataset kind discriminator — semantics defined by the runner, not shown here
    path: str  # location of the data (presumably a file path — confirm against the runner)

docetl.api.BaseOp dataclass

Source code in docetl/api.py
64
65
66
67
@dataclass
class BaseOp:
    """Common fields shared by every pipeline operation."""

    name: str  # operation name; presumably referenced by PipelineStep.operations
    type: str  # operation type discriminator (e.g. "map", "reduce")

docetl.api.MapOp dataclass

Bases: BaseOp

Source code in docetl/api.py
70
71
72
73
74
75
76
77
78
79
80
81
82
@dataclass
class MapOp(BaseOp):
    """A map operation: applies a prompt to each document independently.

    All fields are optional; unset (None) fields are dropped when the
    pipeline is serialized to a config dict.
    """

    output: Optional[Dict[str, Any]] = None  # output schema/config for the op
    prompt: Optional[str] = None  # prompt template applied per document
    model: Optional[str] = None  # model override; falls back to the pipeline default
    optimize: Optional[bool] = None  # whether the optimizer may rewrite this op
    recursively_optimize: Optional[bool] = None
    sample_size: Optional[int] = None
    tools: Optional[List[Dict[str, Any]]] = None  # tool/function-call definitions
    validate: Optional[List[str]] = None  # validation expressions — semantics defined by the runner
    num_retries_on_validate_failure: Optional[int] = None
    gleaning: Optional[Dict[str, Any]] = None
    drop_keys: Optional[List[str]] = None  # keys to remove from each output document

docetl.api.ResolveOp dataclass

Bases: BaseOp

Source code in docetl/api.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@dataclass
class ResolveOp(BaseOp):
    """An entity-resolution operation: compares and merges matching records."""

    comparison_prompt: str  # prompt used to decide whether two records match
    resolution_prompt: str  # prompt used to merge a group of matching records
    output: Optional[Dict[str, Any]] = None
    embedding_model: Optional[str] = None  # model for blocking embeddings
    resolution_model: Optional[str] = None
    comparison_model: Optional[str] = None
    blocking_keys: Optional[List[str]] = None  # keys used to pre-filter candidate pairs
    blocking_threshold: Optional[float] = None  # embedding-similarity cutoff for blocking
    blocking_conditions: Optional[List[str]] = None
    input: Optional[Dict[str, Any]] = None
    embedding_batch_size: Optional[int] = None
    compare_batch_size: Optional[int] = None
    limit_comparisons: Optional[int] = None  # hard cap on pairwise comparisons
    optimize: Optional[bool] = None

docetl.api.ReduceOp dataclass

Bases: BaseOp

Source code in docetl/api.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
@dataclass
class ReduceOp(BaseOp):
    """A reduce operation: aggregates documents grouped by one or more keys."""

    reduce_key: Union[str, List[str]]  # key(s) to group documents by before reducing
    output: Optional[Dict[str, Any]] = None
    prompt: Optional[str] = None  # aggregation prompt applied to each group
    optimize: Optional[bool] = None
    synthesize_resolve: Optional[bool] = None  # let the optimizer insert a resolve step — TODO confirm
    model: Optional[str] = None
    input: Optional[Dict[str, Any]] = None
    pass_through: Optional[bool] = None
    associative: Optional[bool] = None  # whether the reduction is order-independent
    fold_prompt: Optional[str] = None  # prompt for incremental (batched) folding
    fold_batch_size: Optional[int] = None
    value_sampling: Optional[Dict[str, Any]] = None
    verbose: Optional[bool] = None

docetl.api.ParallelMapOp dataclass

Bases: BaseOp

Source code in docetl/api.py
120
121
122
123
124
125
126
127
128
@dataclass
class ParallelMapOp(BaseOp):
    """A map operation that runs several prompts over each document."""

    prompts: List[Dict[str, Any]]  # one config dict per parallel prompt
    output: Optional[Dict[str, Any]] = None
    model: Optional[str] = None
    optimize: Optional[bool] = None
    recursively_optimize: Optional[bool] = None
    sample_size: Optional[int] = None
    drop_keys: Optional[List[str]] = None  # keys to remove from each output document

docetl.api.FilterOp dataclass

Bases: BaseOp

Source code in docetl/api.py
131
132
133
134
135
136
137
138
139
140
@dataclass
class FilterOp(BaseOp):
    """A filter operation: keeps or drops each document based on a prompt."""

    output: Optional[Dict[str, Any]] = None
    prompt: Optional[str] = None  # predicate prompt evaluated per document
    model: Optional[str] = None
    optimize: Optional[bool] = None
    recursively_optimize: Optional[bool] = None
    sample_size: Optional[int] = None
    validate: Optional[List[str]] = None  # validation expressions — semantics defined by the runner
    num_retries_on_validate_failure: Optional[int] = None

docetl.api.EquijoinOp dataclass

Bases: BaseOp

Source code in docetl/api.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
@dataclass
class EquijoinOp(BaseOp):
    """A join operation: matches records across two datasets via a prompt."""

    left: str  # name of the left dataset/input
    right: str  # name of the right dataset/input
    comparison_prompt: str  # prompt used to decide whether a left/right pair matches
    output: Optional[Dict[str, Any]] = None
    blocking_threshold: Optional[float] = None  # embedding-similarity cutoff for blocking
    blocking_conditions: Optional[Dict[str, List[str]]] = None  # per-side blocking rules
    limits: Optional[Dict[str, int]] = None
    comparison_model: Optional[str] = None
    optimize: Optional[bool] = None
    embedding_model: Optional[str] = None
    embedding_batch_size: Optional[int] = None
    compare_batch_size: Optional[int] = None
    limit_comparisons: Optional[int] = None  # hard cap on pairwise comparisons
    blocking_keys: Optional[Dict[str, List[str]]] = None  # per-side keys used for blocking

docetl.api.SplitOp dataclass

Bases: BaseOp

Source code in docetl/api.py
161
162
163
164
165
166
@dataclass
class SplitOp(BaseOp):
    """A split operation: breaks a document field into chunks."""

    split_key: str  # document key whose value is split
    method: str  # splitting strategy name — valid values defined by the runner
    method_kwargs: Dict[str, Any]  # arguments for the chosen method
    model: Optional[str] = None

docetl.api.GatherOp dataclass

Bases: BaseOp

Source code in docetl/api.py
169
170
171
172
173
174
175
@dataclass
class GatherOp(BaseOp):
    """A gather operation: augments each chunk with surrounding context."""

    content_key: str  # key holding the chunk content
    doc_id_key: str  # key identifying which document a chunk came from
    order_key: str  # key giving the chunk's position within its document
    peripheral_chunks: Dict[str, Any]  # config for how much neighboring context to include
    doc_header_key: Optional[str] = None

docetl.api.UnnestOp dataclass

Bases: BaseOp

Source code in docetl/api.py
178
179
180
181
182
183
184
@dataclass
class UnnestOp(BaseOp):
    """An unnest operation: expands a list/dict field into multiple documents."""

    unnest_key: str  # key whose value is expanded
    keep_empty: Optional[bool] = None  # whether to keep documents whose value is empty
    expand_fields: Optional[List[str]] = None
    recursive: Optional[bool] = None
    depth: Optional[int] = None  # recursion depth limit — presumably used with recursive; confirm

docetl.api.PipelineStep dataclass

Source code in docetl/api.py
200
201
202
203
204
@dataclass
class PipelineStep:
    """One step of a pipeline: a named sequence of operations over an input."""

    name: str
    operations: List[Union[Dict[str, Any], str]]  # operation names or inline op configs
    input: Optional[str] = None  # input dataset/step name; None presumably chains from the previous step — confirm

docetl.api.PipelineOutput dataclass

Source code in docetl/api.py
207
208
209
210
211
@dataclass
class PipelineOutput:
    """Destination for pipeline results."""

    type: str  # output kind discriminator — semantics defined by the runner
    path: str  # where final results are written
    intermediate_dir: Optional[str] = None  # optional directory for per-step intermediate outputs

docetl.api.Pipeline dataclass

Source code in docetl/api.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
@dataclass
class Pipeline:
    """A declarative docetl pipeline: datasets, operations, steps, and output.

    A pipeline can be optimized (``optimize``), executed (``run``), or
    serialized to a YAML config file (``to_yaml``).
    """

    name: str
    datasets: Dict[str, Dataset]  # dataset name -> Dataset definition
    operations: List[OpType]  # all operation definitions referenced by the steps
    steps: List[PipelineStep]  # execution order of the pipeline
    output: PipelineOutput  # where results are written
    default_model: Optional[str] = None  # fallback model when an op sets none

    def optimize(
        self,
        max_threads: Optional[int] = None,
        model: str = "gpt-4o",
        resume: bool = False,
        timeout: int = 60,
    ) -> "Pipeline":
        """
        Optimize the pipeline using the Optimizer.

        Args:
            max_threads (Optional[int]): Maximum number of threads to use for optimization.
            model (str): The model to use for optimization. Defaults to "gpt-4o".
            resume (bool): Whether to resume optimization from a previous state. Defaults to False.
            timeout (int): Timeout for optimization in seconds. Defaults to 60.

        Returns:
            Pipeline: An optimized version of the pipeline.
        """
        config = self._to_dict()
        optimizer = Optimizer(
            config,
            base_name=os.path.join(os.getcwd(), self.name),
            yaml_file_suffix=self.name,
            max_threads=max_threads,
            model=model,
            timeout=timeout,
            resume=resume,
        )
        optimizer.optimize()
        optimized_config = optimizer.clean_optimized_config()

        # Build a fresh Pipeline rather than mutating self, so the original
        # pipeline object remains usable after optimization.
        updated_pipeline = Pipeline(
            name=self.name,
            datasets=self.datasets,
            operations=self.operations,
            steps=self.steps,
            output=self.output,
            default_model=self.default_model,
        )
        updated_pipeline._update_from_dict(optimized_config)
        return updated_pipeline

    def run(self, max_threads: Optional[int] = None) -> float:
        """
        Run the pipeline using the DSLRunner.

        Args:
            max_threads (Optional[int]): Maximum number of threads to use for execution.

        Returns:
            float: The total cost of running the pipeline.
        """
        config = self._to_dict()
        runner = DSLRunner(config, max_threads=max_threads)
        result = runner.run()
        return result

    def to_yaml(self, path: str) -> None:
        """
        Convert the Pipeline object to a YAML string and save it to a file.

        Args:
            path (str): Path to save the YAML file.

        Returns:
            None
        """
        config = self._to_dict()
        with open(path, "w") as f:
            yaml.safe_dump(config, f)

        print(f"[green]Pipeline saved to {path}[/green]")

    def _to_dict(self) -> Dict[str, Any]:
        """
        Convert the Pipeline object to a dictionary representation.

        None-valued operation/step fields are dropped so the emitted config
        only contains explicitly-set options.

        Returns:
            Dict[str, Any]: Dictionary representation of the Pipeline.
        """
        return {
            "datasets": {
                name: dataset.__dict__ for name, dataset in self.datasets.items()
            },
            "operations": [
                {k: v for k, v in op.__dict__.items() if v is not None}
                for op in self.operations
            ],
            "pipeline": {
                "steps": [
                    {k: v for k, v in step.__dict__.items() if v is not None}
                    for step in self.steps
                ],
                "output": self.output.__dict__,
            },
            "default_model": self.default_model,
        }

    def _update_from_dict(self, config: Dict[str, Any]):
        """
        Update the Pipeline object in place from a dictionary representation.

        Operation dicts with an unrecognized "type" are silently skipped,
        matching the original behavior.

        Args:
            config (Dict[str, Any]): Dictionary representation of the Pipeline.
        """
        # Dispatch table: serialized "type" discriminator -> operation class.
        op_classes = {
            "map": MapOp,
            "resolve": ResolveOp,
            "reduce": ReduceOp,
            "parallel_map": ParallelMapOp,
            "filter": FilterOp,
            "equijoin": EquijoinOp,
            "split": SplitOp,
            "gather": GatherOp,
            "unnest": UnnestOp,
        }
        self.datasets = {
            name: Dataset(**dataset) for name, dataset in config["datasets"].items()
        }
        self.operations = []
        for op in config["operations"]:
            # Copy before popping "type" so the caller's config dict is not
            # destructively mutated (the previous implementation removed the
            # key from the caller's nested dicts).
            op = dict(op)
            op_type = op.pop("type")
            op_cls = op_classes.get(op_type)
            if op_cls is not None:
                self.operations.append(op_cls(**op, type=op_type))
        self.steps = [PipelineStep(**step) for step in config["pipeline"]["steps"]]
        self.output = PipelineOutput(**config["pipeline"]["output"])
        self.default_model = config.get("default_model")

optimize(max_threads=None, model='gpt-4o', resume=False, timeout=60)

Optimize the pipeline using the Optimizer.

Parameters:

Name Type Description Default
max_threads Optional[int]

Maximum number of threads to use for optimization.

None
model str

The model to use for optimization. Defaults to "gpt-4o".

'gpt-4o'
resume bool

Whether to resume optimization from a previous state. Defaults to False.

False
timeout int

Timeout for optimization in seconds. Defaults to 60.

60

Returns:

Name Type Description
Pipeline Pipeline

An optimized version of the pipeline.

Source code in docetl/api.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def optimize(
    self,
    max_threads: Optional[int] = None,
    model: str = "gpt-4o",
    resume: bool = False,
    timeout: int = 60,
) -> "Pipeline":
    """
    Optimize the pipeline using the Optimizer.

    Args:
        max_threads (Optional[int]): Maximum number of threads to use for optimization.
        model (str): The model to use for optimization. Defaults to "gpt-4o".
        resume (bool): Whether to resume optimization from a previous state. Defaults to False.
        timeout (int): Timeout for optimization in seconds. Defaults to 60.

    Returns:
        Pipeline: An optimized version of the pipeline.
    """
    pipeline_config = self._to_dict()

    # Run the optimizer over the serialized config.
    optimizer = Optimizer(
        pipeline_config,
        base_name=os.path.join(os.getcwd(), self.name),
        yaml_file_suffix=self.name,
        max_threads=max_threads,
        model=model,
        timeout=timeout,
        resume=resume,
    )
    optimizer.optimize()
    cleaned_config = optimizer.clean_optimized_config()

    # Clone this pipeline, then overwrite the clone with the optimized config
    # so the original object is left untouched.
    optimized_pipeline = Pipeline(
        name=self.name,
        datasets=self.datasets,
        operations=self.operations,
        steps=self.steps,
        output=self.output,
        default_model=self.default_model,
    )
    optimized_pipeline._update_from_dict(cleaned_config)
    return optimized_pipeline

run(max_threads=None)

Run the pipeline using the DSLRunner.

Parameters:

Name Type Description Default
max_threads Optional[int]

Maximum number of threads to use for execution.

None

Returns:

Name Type Description
float float

The total cost of running the pipeline.

Source code in docetl/api.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def run(self, max_threads: Optional[int] = None) -> float:
    """
    Run the pipeline using the DSLRunner.

    Args:
        max_threads (Optional[int]): Maximum number of threads to use for execution.

    Returns:
        float: The total cost of running the pipeline.
    """
    runner = DSLRunner(self._to_dict(), max_threads=max_threads)
    return runner.run()

to_yaml(path)

Convert the Pipeline object to a YAML string and save it to a file.

Parameters:

Name Type Description Default
path str

Path to save the YAML file.

required

Returns:

Type Description
None

None

Source code in docetl/api.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def to_yaml(self, path: str) -> None:
    """
    Convert the Pipeline object to a YAML string and save it to a file.

    Args:
        path (str): Path to save the YAML file.

    Returns:
        None
    """
    serialized = self._to_dict()
    with open(path, "w") as yaml_file:
        yaml.safe_dump(serialized, yaml_file)
    print(f"[green]Pipeline saved to {path}[/green]")