feat: add pandas df accessor #287

Merged on Jan 25, 2025 (4 commits). Changes from all commits are shown below.
3 changes: 2 additions & 1 deletion docetl/__init__.py
@@ -2,5 +2,6 @@
 from docetl.runner import DSLRunner
 from docetl.optimizer import Optimizer
+from docetl.apis.pd_accessors import SemanticAccessor
 
-__all__ = ["DSLRunner", "Optimizer"]
+__all__ = ["DSLRunner", "Optimizer", "SemanticAccessor"]
Empty file added docetl/apis/__init__.py
633 changes: 633 additions & 0 deletions docetl/apis/pd_accessors.py

Large diffs are not rendered by default.
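
The 633-line accessor module is collapsed in the diff view. Based on the `SemanticAccessor` export above and the `from_df_accessors` runner flag added below, its registration plausibly follows pandas' standard extension pattern. A minimal sketch, assuming the accessor name `semantic` and a map-style method (both hypothetical, since the file is not rendered):

```python
import pandas as pd

# Sketch only: the accessor name "semantic" and the map() signature
# are assumptions; the real module is collapsed in this diff.
@pd.api.extensions.register_dataframe_accessor("semantic")
class SemanticAccessor:
    def __init__(self, df: pd.DataFrame):
        self._df = df

    def map(self, prompt: str, output_schema: dict) -> pd.DataFrame:
        # Would translate the arguments into a DocETL map operation
        # config and run it over the DataFrame's records.
        raise NotImplementedError

# Usage: pandas exposes the accessor as an attribute on every DataFrame,
# e.g. df.semantic.map(prompt="...", output_schema={"symptoms": "list[str]"})
```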

1 change: 1 addition & 0 deletions docetl/config_wrapper.py
@@ -56,6 +56,7 @@ def __init__(
         yaml_file_suffix: Optional[str] = None,
         max_threads: int = None,
         console: Optional[Console] = None,
+        **kwargs,
     ):
         self.config = config
         self.base_name = base_name
4 changes: 4 additions & 0 deletions docetl/operations/filter.py
@@ -113,4 +113,8 @@ def execute(
         if not is_build:
             results = [result for result in results if result[filter_key]]
 
+        # Drop the filter_key from the results
+        for result in results:
+            result.pop(filter_key, None)
+
         return results, total_cost
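
The effect of this change, illustrated with hypothetical records (the real filter key is generated by the operation):

```python
# "_keep" stands in for the operation's internal filter key.
filter_key = "_keep"
results = [{"text": "relevant", "_keep": True}, {"text": "noise", "_keep": False}]

results = [result for result in results if result[filter_key]]
for result in results:
    result.pop(filter_key, None)

assert results == [{"text": "relevant"}]  # the internal flag no longer leaks out
```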
61 changes: 40 additions & 21 deletions docetl/operations/resolve.py
@@ -27,7 +27,7 @@ class ResolveOperation(BaseOperation):
     class schema(BaseOperation.schema):
         type: str = "resolve"
         comparison_prompt: str
-        resolution_prompt: str
+        resolution_prompt: Optional[str] = None
         output: Optional[Dict[str, Any]] = None
         embedding_model: Optional[str] = None
         resolution_model: Optional[str] = None
@@ -119,14 +119,16 @@ def syntax_check(self) -> None:
                     f"Missing required key '{key}' in ResolveOperation configuration"
                 )
 
-        if "schema" not in self.config["output"]:
+        if "schema" not in self.config["output"] and not self.runner._from_df_accessors:
             raise ValueError("Missing 'schema' in 'output' configuration")
-
-        if not isinstance(self.config["output"]["schema"], dict):
-            raise TypeError("'schema' in 'output' configuration must be a dictionary")
-
-        if not self.config["output"]["schema"]:
-            raise ValueError("'schema' in 'output' configuration cannot be empty")
+        elif not self.runner._from_df_accessors:
+            if not isinstance(self.config["output"]["schema"], dict):
+                raise TypeError(
+                    "'schema' in 'output' configuration must be a dictionary"
+                )
+
+            if not self.config["output"]["schema"]:
+                raise ValueError("'schema' in 'output' configuration cannot be empty")

         # Check if the comparison_prompt is a valid Jinja2 template
         try:
@@ -140,7 +142,7 @@ def syntax_check(self) -> None:
             or "input2" not in comparison_var_names
         ):
             raise ValueError(
-                "'comparison_prompt' must contain both 'input1' and 'input2' variables"
+                f"'comparison_prompt' must contain both 'input1' and 'input2' variables. {self.config['comparison_prompt']}"
             )
 
         if "resolution_prompt" in self.config:
@@ -674,19 +676,36 @@ def process_cluster(cluster):
             f"Number of distinct keys after resolution: {num_clusters_after}"
         )
 
-        with ThreadPoolExecutor(max_workers=self.max_threads) as executor:
-            futures = [
-                executor.submit(process_cluster, cluster) for cluster in final_clusters
-            ]
-            for future in rich_as_completed(
-                futures,
-                total=len(futures),
-                desc="Determining resolved key for each group of equivalent keys",
-                console=self.console,
-            ):
-                cluster_results, cluster_cost = future.result()
-                results.extend(cluster_results)
-                total_cost += cluster_cost
+        # If no resolution prompt is provided, we can skip the resolution phase
+        # and simply select the most common value for each key
+        if not self.config.get("resolution_prompt", None):
+            for cluster in final_clusters:
+                if len(cluster) > 1:
+                    for key in self.config["output"]["keys"]:
+                        most_common_value = max(
+                            set(input_data[i][key] for i in cluster),
+                            key=lambda x: sum(
+                                1 for i in cluster if input_data[i][key] == x
+                            ),
+                        )
+                        for i in cluster:
+                            input_data[i][key] = most_common_value
+            results = input_data
+        else:
+            with ThreadPoolExecutor(max_workers=self.max_threads) as executor:
+                futures = [
+                    executor.submit(process_cluster, cluster)
+                    for cluster in final_clusters
+                ]
+                for future in rich_as_completed(
+                    futures,
+                    total=len(futures),
+                    desc="Determining resolved key for each group of equivalent keys",
+                    console=self.console,
+                ):
+                    cluster_results, cluster_cost = future.result()
+                    results.extend(cluster_results)
+                    total_cost += cluster_cost

         total_pairs = len(input_data) * (len(input_data) - 1) // 2
         true_match_count = sum(
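
To make the fallback concrete: when `resolution_prompt` is absent, each cluster of matched records is canonicalized by majority vote instead of an LLM call. A standalone sketch of that selection (data values illustrative):

```python
from collections import Counter

def majority_value(values):
    # Most frequent value wins; ties break arbitrarily, mirroring the
    # max-over-set expression in the hunk above.
    return Counter(values).most_common(1)[0][0]

cluster = [{"company": "Acme Corp"}, {"company": "ACME"}, {"company": "Acme Corp"}]
assert majority_value([r["company"] for r in cluster]) == "Acme Corp"
```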
3 changes: 2 additions & 1 deletion docetl/optimizer.py
@@ -119,7 +119,8 @@ def __init__(
         if self.config.get("optimizer_config", {}).get("sample_sizes", {}):
             self.sample_size_map.update(self.config["optimizer_config"]["sample_sizes"])
 
-        self.print_optimizer_config()
+        if not self.runner._from_df_accessors:
+            self.print_optimizer_config()
 
     def print_optimizer_config(self):
         """
83 changes: 52 additions & 31 deletions docetl/optimizers/join_optimizer.py
@@ -545,7 +545,11 @@ def optimize_resolve(
         return optimized_config, embedding_cost + comparison_cost
 
     def optimize_equijoin(
-        self, left_data: List[Dict[str, Any]], right_data: List[Dict[str, Any]]
+        self,
+        left_data: List[Dict[str, Any]],
+        right_data: List[Dict[str, Any]],
+        skip_map_gen: bool = False,
+        skip_containment_gen: bool = False,
     ) -> Tuple[Dict[str, Any], float, Dict[str, Any]]:
         left_keys = self.op_config.get("blocking_keys", {}).get("left", [])
         right_keys = self.op_config.get("blocking_keys", {}).get("right", [])
@@ -554,12 +558,16 @@
         # Ask the LLM agent if it would be beneficial to do a map operation on
         # one of the datasets before doing an equijoin
         apply_transformation, dataset_to_transform, reason = (
-            self._should_apply_map_transformation(
-                left_keys, right_keys, left_data, right_data
+            (
+                self._should_apply_map_transformation(
+                    left_keys, right_keys, left_data, right_data
+                )
             )
+            if not skip_map_gen
+            else (False, None, None)
         )
 
-        if apply_transformation:
+        if apply_transformation and not skip_map_gen:
             self.console.log(
                 f"LLM agent suggested applying a map transformation to {dataset_to_transform} dataset because: {reason}"
             )
@@ -717,21 +725,25 @@ def optimize_equijoin(
         containment_rules = self._generate_containment_rules_equijoin(
             left_data, right_data
         )
-        self.console.log(
-            f"[bold]Generated {len(containment_rules)} containment rules. Please select which ones to use as blocking conditions:[/bold]"
-        )
-        selected_containment_rules = []
-        for rule in containment_rules:
-            self.console.log(f"[green]{rule}[/green]")
-            # Temporarily stop the status
-            if self.status:
-                self.status.stop()
-            # Use Rich's Confirm for input
-            if Confirm.ask("Use this rule?", console=self.console):
-                selected_containment_rules.append(rule)
-            # Restart the status
-            if self.status:
-                self.status.start()
+        if not skip_containment_gen:
+            self.console.log(
+                f"[bold]Generated {len(containment_rules)} containment rules. Please select which ones to use as blocking conditions:[/bold]"
+            )
+            selected_containment_rules = []
+            for rule in containment_rules:
+                self.console.log(f"[green]{rule}[/green]")
+                # Temporarily stop the status
+                if self.status:
+                    self.status.stop()
+                # Use Rich's Confirm for input
+                if Confirm.ask("Use this rule?", console=self.console):
+                    selected_containment_rules.append(rule)
+                # Restart the status
+                if self.status:
+                    self.status.start()
+        else:
+            # Take first 2
+            selected_containment_rules = containment_rules[:2]
 
         if len(containment_rules) > 0:
             self.console.log(
@@ -1416,18 +1428,27 @@ def _generate_containment_rules_equijoin(
         right_keys = set(right_data[0].keys())
 
         # Find the keys that are in the config's prompt
-        left_prompt_keys = set(
-            self.op_config.get("comparison_prompt", "")
-            .split("{{ left.")[1]
-            .split(" }}")[0]
-            .split(".")
-        )
-        right_prompt_keys = set(
-            self.op_config.get("comparison_prompt", "")
-            .split("{{ right.")[1]
-            .split(" }}")[0]
-            .split(".")
-        )
+        try:
+            left_prompt_keys = set(
+                self.op_config.get("comparison_prompt", "")
+                .split("{{ left.")[1]
+                .split(" }}")[0]
+                .split(".")
+            )
+        except Exception as e:
+            self.console.log(f"[red]Error parsing comparison prompt: {e}[/red]")
+            left_prompt_keys = left_keys
+
+        try:
+            right_prompt_keys = set(
+                self.op_config.get("comparison_prompt", "")
+                .split("{{ right.")[1]
+                .split(" }}")[0]
+                .split(".")
+            )
+        except Exception as e:
+            self.console.log(f"[red]Error parsing comparison prompt: {e}[/red]")
+            right_prompt_keys = right_keys
 
         # Sample a few records from each dataset
         sample_left = random.sample(left_data, min(3, len(left_data)))
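
Taken together, the two new flags let a non-interactive caller (presumably the pandas accessor path, which cannot prompt the user with `Confirm.ask`) run equijoin optimization unattended. A hypothetical invocation, with optimizer construction and data loading elided:

```python
# Sketch only; join_optimizer, left_data, and right_data are assumed
# to have been set up as elsewhere in the optimizer.
optimized_config, cost, transform_info = join_optimizer.optimize_equijoin(
    left_data,
    right_data,
    skip_map_gen=True,          # skip the LLM's pre-join map-transform suggestion
    skip_containment_gen=True,  # auto-select the first two containment rules
)
```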
4 changes: 3 additions & 1 deletion docetl/runner.py
@@ -121,7 +121,9 @@ def __init__(self, config: Dict, max_threads: int = None, **kwargs):
         self._compute_operation_hashes()
 
         # Run initial validation
-        self.syntax_check()
+        self._from_df_accessors = kwargs.get("from_df_accessors", False)
+        if not self._from_df_accessors:
+            self.syntax_check()
 
     def _initialize_state(self) -> None:
         """Initialize basic runner state and datasets"""
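
A minimal sketch of the new escape hatch; the kwarg name comes straight from the diff, while the config and thread count are illustrative:

```python
from docetl.runner import DSLRunner

config: dict = {}  # hypothetical pipeline config assembled programmatically

# With from_df_accessors=True the eager syntax_check() is skipped,
# since accessor-built configs are validated along a different path.
runner = DSLRunner(config, max_threads=4, from_df_accessors=True)
```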
11 changes: 11 additions & 0 deletions docs/concepts/pipelines.md
@@ -60,6 +60,16 @@ This approach allows DocETL to dynamically load and process various file types,
 
 Currently, DocETL only supports JSON files or CSV files as input datasets. If you're interested in support for other data types or cloud-based datasets, please reach out to us or join our open-source community and contribute! We welcome new ideas and contributions to expand the capabilities of DocETL.
 
+### Dataset Description and Persona
+
+You can define a description of your dataset and a persona for the LLM to adopt when executing operations on it. This is useful for providing context to the LLM and for optimizing the operations.
+
+```yaml
+system_prompt: # This is optional, but recommended for better performance. It is applied to all operations in the pipeline.
+  dataset_description: a collection of transcripts of doctor visits
+  persona: a medical practitioner analyzing patient symptoms and reactions to medications
+```
+
 ### Operators
 
 Operators are the building blocks of your pipeline, defining the transformations and analyses to be performed on your data. They are detailed in the [Operators](../concepts/operators.md) documentation. Operators can include map, reduce, filter, and other types of operations.
@@ -85,6 +95,7 @@
Expand All @@ -85,6 +95,7 @@ pipeline:
output:
type: file
path: "country_summaries.json"
intermediate_dir: "intermediate_data" # Optional: If you want to store intermediate outputs in a directory
```

For a practical example of how these components come together, refer to the [Tutorial](../tutorial.md), which demonstrates a complete pipeline for analyzing user behavior data.