Skip to content

Commit

Permalink
ML-1365: performance improvements for reading parquet with start/end… (
Browse files Browse the repository at this point in the history
#315)

* ML-1365: performance improvements for reading parquet with start/end-filter

* minor fix

* fix

* docs

* pr comment
  • Loading branch information
katyakats authored Nov 24, 2021
1 parent 1fae6bc commit f852c63
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 12 deletions.
3 changes: 0 additions & 3 deletions storey/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,6 @@ def should_aggregate(self, element):
return self.aggr_filter(element)


legal_time_units = ['year', 'month', 'day', 'hour', 'minute', 'second']


class FixedWindowType(Enum):
    """Selects which fixed aggregation window a query should read from."""

    # The window that is currently open (still accumulating events).
    CurrentOpenWindow = 1
    # The most recent window that has already closed.
    LastClosedWindow = 2
13 changes: 4 additions & 9 deletions storey/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
import warnings
from datetime import datetime
from typing import List, Optional, Union, Callable, Coroutine, Iterable
import pyarrow.parquet as pq

import pandas
import pytz

from .dtypes import _termination_obj, Event, legal_time_units
from .dtypes import _termination_obj, Event
from .flow import Flow, Complete
from .utils import url_to_file_system, find_filters
from .utils import url_to_file_system, find_filters, find_partitions


class AwaitableResult:
Expand Down Expand Up @@ -821,12 +820,8 @@ def __init__(self, paths: Union[str, Iterable[str]], columns=None, start_filter:

def _read_filtered_parquet(self, path):
fs, file_path = url_to_file_system(path, self._storage_options)
dataset = pq.ParquetDataset(path, filesystem=fs)
if dataset.partitions:
partitions = dataset.partitions.partition_names
partitions_time_attributes = [j for j in legal_time_units if j in partitions]
else:
partitions_time_attributes = []

partitions_time_attributes = find_partitions(path, fs)
filters = []
find_filters(partitions_time_attributes, self._start_filter, self._end_filter, filters, self._filter_column)
return pandas.read_parquet(path, columns=self._columns, filters=filters,
Expand Down
38 changes: 38 additions & 0 deletions storey/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
import hashlib
import os
import struct
from array import array
from urllib.parse import urlparse
Expand Down Expand Up @@ -227,6 +228,43 @@ def _get_filters_for_filter_column(start, end, filter_column, side_range):
side_range.append(upper_limit_tuple)


def find_partitions(url, fs):
    """Discover the hive-style time partition columns of a parquet dataset.

    Walks down the directory tree at ``url`` on filesystem ``fs``, collecting
    the partition key from the first visible entry at each level, and returns
    the subset that are recognized time units, ordered coarse-to-fine.

    ML-1365. Assumes the partitioning is symmetrical (for example, both
    year=2020 and year=2021 directories will have inner month partitions),
    so inspecting a single branch of the tree is sufficient.

    :param url: path of the dataset root (or a single file, which yields []).
    :param fs: fsspec-style filesystem whose ``ls`` returns detail dicts
               with a "name" key.
    :return: list of time-unit partition names, e.g. ['year', 'month'].
    """

    def _is_private(path):
        # https://issues.apache.org/jira/browse/ARROW-1079 - datasets may
        # contain private dirs/files such as _SUCCESS or .metadata; a '='
        # in the name means it is a real partition, not a private entry.
        _, tail = os.path.split(path)
        return (tail.startswith('_') or tail.startswith('.')) and '=' not in tail

    found = []
    current = url
    # Descend one branch of the tree, one partition level per iteration.
    while not fs.isfile(current):
        entries = fs.ls(current)
        visible = [entry for entry in entries if not _is_private(entry["name"])]
        if not visible:
            break
        child = visible[0]["name"]
        if fs.isfile(child):
            # Reached the data files; no deeper partition levels.
            break
        # "…/year=2021" -> partition key "year".
        found.append(child.split("/")[-1].split("=")[0])
        current = child

    legal_time_units = ['year', 'month', 'day', 'hour', 'minute', 'second']

    # Keep only recognized time units, in canonical coarse-to-fine order.
    return [unit for unit in legal_time_units if unit in found]


def find_filters(partitions_time_attributes, start, end, filters, filter_column):
# this method build filters to be used by
# https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
Expand Down

0 comments on commit f852c63

Please sign in to comment.