Skip to content

Commit

Permalink
Update pydeephaven for aj/raj (#3914)
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith authored Jun 2, 2023
1 parent a0dca3d commit a4676d8
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 69 deletions.
4 changes: 2 additions & 2 deletions py/client/pydeephaven/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

from .session import Session
from .dherror import DHError
from ._table_interface import SortDirection, MatchRule
from ._table_interface import SortDirection
from .query import Query

__all__ = ["Session", "DHError", "SortDirection", "MatchRule"]
__all__ = ["Session", "DHError", "SortDirection"]
__version__ = "0.25.0"
32 changes: 15 additions & 17 deletions py/client/pydeephaven/_table_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from pydeephaven import agg
from pydeephaven._table_ops import UpdateOp, LazyUpdateOp, ViewOp, UpdateViewOp, SelectOp, DropColumnsOp, \
SelectDistinctOp, SortOp, UnstructuredFilterOp, HeadOp, TailOp, HeadByOp, TailByOp, UngroupOp, NaturalJoinOp, \
ExactJoinOp, CrossJoinOp, AsOfJoinOp, UpdateByOp, SnapshotTableOp, SnapshotWhenTableOp, WhereInTableOp, \
AggregateAllOp, AggregateOp, MatchRule, SortDirection
ExactJoinOp, CrossJoinOp, AjOp, RajOp, UpdateByOp, SnapshotTableOp, SnapshotWhenTableOp, WhereInTableOp, \
AggregateAllOp, AggregateOp, SortDirection
from pydeephaven._utils import to_list
from pydeephaven.agg import Aggregation, _AggregationColumns
from pydeephaven.dherror import DHError
Expand Down Expand Up @@ -278,8 +278,7 @@ def join(self, table: Table, on: Union[str, List[str]] = None, joins: Union[str,
table_op = CrossJoinOp(table=table, keys=to_list(on), columns_to_add=to_list(joins), reserve_bits=reserve_bits)
return self.table_op_handler(table_op)

def aj(self, table: Table, on: Union[str, List[str]], joins: Union[str, List[str]] = None,
match_rule: MatchRule = MatchRule.LESS_THAN_EQUAL) -> Union[Table, Query]:
def aj(self, table: Table, on: Union[str, List[str]], joins: Union[str, List[str]] = None) -> Union[Table, Query]:
"""The aj (as-of join) method creates a new table containing all the rows and columns of the left table,
plus additional columns containing data from the right table. For columns appended to the left table (joins),
row values equal the row values from the right table where the keys from the left table most closely match
Expand All @@ -288,25 +287,24 @@ def aj(self, table: Table, on: Union[str, List[str]], joins: Union[str, List[str
Args:
table (Table): the right-table of the join
on (Union[str, List[str]]): the column(s) to match, can be a common name or an equal expression,
i.e. "col_a = col_b" for different column names
on (Union[str, List[str]]): the column(s) to match, can be a common name or a match condition of two
columns, e.g. 'col_a = col_b'. The first 'N-1' matches are exact matches. The final match is an inexact
match. The inexact match can use either '>' or '>='. If a common name is used for the inexact match,
'>=' is used for the comparison.
joins (Union[str, List[str]], optional): the column(s) to be added from the right table to the result
table, can be renaming expressions, i.e. "new_col = col"; default is None,, which means all the columns
table, can be renaming expressions, i.e. "new_col = col"; default is None, which means all the columns
from the right table, excluding those specified in 'on'
match_rule (MatchRule, optional): the match rule for the as-of join, default is LESS_THAN_EQUAL
Returns:
a Table or Query object
Raises:
DHError
"""
match_rule = MatchRule.LESS_THAN if match_rule == MatchRule.LESS_THAN else MatchRule.LESS_THAN_EQUAL
table_op = AsOfJoinOp(table=table, keys=to_list(on), columns_to_add=to_list(joins), match_rule=match_rule)
table_op = AjOp(table=table, keys=to_list(on), columns_to_add=to_list(joins))
return self.table_op_handler(table_op)

def raj(self, table: Table, on: Union[str, List[str]], joins: Union[str, List[str]] = None,
match_rule: MatchRule = MatchRule.GREATER_THAN_EQUAL) -> Union[Table, Query]:
def raj(self, table: Table, on: Union[str, List[str]], joins: Union[str, List[str]] = None) -> Union[Table, Query]:
"""The raj (reverse as-of join) method creates a new table containing all the rows and columns of the left
table, plus additional columns containing data from the right table. For columns appended to the left table (
joins), row values equal the row values from the right table where the keys from the left table most closely
Expand All @@ -315,21 +313,21 @@ def raj(self, table: Table, on: Union[str, List[str]], joins: Union[str, List[st
Args:
table (Table): the right-table of the join
on (Union[str, List[str]]): the column(s) to match, can be a common name or an equal expression,
i.e. "col_a = col_b" for different column names
on (Union[str, List[str]]): the column(s) to match, can be a common name or a match condition of two
columns, e.g. 'col_a = col_b'. The first 'N-1' matches are exact matches. The final match is an inexact
match. The inexact match can use either '<' or '<='. If a common name is used for the inexact match,
'<=' is used for the comparison.
joins (Union[str, List[str]], optional): the column(s) to be added from the right table to the result
table, can be renaming expressions, i.e. "new_col = col"; default is None, which means all the columns
from the right table, excluding those specified in 'on'
match_rule (MatchRule, optional): the match rule for the as-of join, default is GREATER_THAN_EQUAL
Returns:
a Table or Query object
Raises:
DHError
"""
match_rule = MatchRule.GREATER_THAN if match_rule == MatchRule.GREATER_THAN else MatchRule.GREATER_THAN_EQUAL
table_op = AsOfJoinOp(table=table, keys=to_list(on), columns_to_add=to_list(joins), match_rule=match_rule)
table_op = RajOp(table=table, keys=to_list(on), columns_to_add=to_list(joins))
return self.table_op_handler(table_op)

def head_by(self, num_rows: int, by: Union[str, List[str]]) -> Union[Table, Query]:
Expand Down
56 changes: 31 additions & 25 deletions py/client/pydeephaven/_table_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,6 @@
from pydeephaven.updateby import UpdateByOperation


class MatchRule(Enum):
"""An enum defining the match rules for the as-of and reverse-as-of joins."""

""""""
LESS_THAN_EQUAL = table_pb2.AsOfJoinTablesRequest.MatchRule.LESS_THAN_EQUAL
""""""
LESS_THAN = table_pb2.AsOfJoinTablesRequest.MatchRule.LESS_THAN
""""""
GREATER_THAN_EQUAL = table_pb2.AsOfJoinTablesRequest.MatchRule.GREATER_THAN_EQUAL
""""""
GREATER_THAN = table_pb2.AsOfJoinTablesRequest.MatchRule.GREATER_THAN
""""""


class SortDirection(Enum):
"""An enum defining the sort ordering."""

Expand Down Expand Up @@ -432,33 +418,53 @@ def make_grpc_request_for_batch(self, result_id, source_id) -> Any:
return table_pb2.BatchTableRequest.Operation(
cross_join=self.make_grpc_request(result_id=result_id, source_id=source_id))


class AsOfJoinOp(TableOp):
def __init__(self, table: Any, keys: List[str] = [], columns_to_add: List[str] = [],
match_rule: MatchRule = MatchRule.LESS_THAN_EQUAL):
class AjOp(TableOp):
def __init__(self, table: Any, keys: List[str] = [], columns_to_add: List[str] = []):
self.table = table
self.keys = keys
self.columns_to_add = columns_to_add
self.match_rule = match_rule

@classmethod
def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
return table_service_stub.AsOfJoinTables
return table_service_stub.AjTables

def make_grpc_request(self, result_id, source_id) -> Any:
left_id = source_id
right_id = table_pb2.TableReference(ticket=self.table.ticket)
return table_pb2.AsOfJoinTablesRequest(result_id=result_id,
return table_pb2.AjRajTablesRequest(result_id=result_id,
left_id=left_id,
right_id=right_id,
columns_to_match=self.keys,
columns_to_add=self.columns_to_add,
as_of_match_rule=self.match_rule.value)
exact_match_columns=self.keys[:-1],
as_of_column=self.keys[-1],
columns_to_add=self.columns_to_add)

def make_grpc_request_for_batch(self, result_id, source_id) -> Any:
return table_pb2.BatchTableRequest.Operation(
as_of_join=self.make_grpc_request(result_id=result_id, source_id=source_id))
aj=self.make_grpc_request(result_id=result_id, source_id=source_id))

class RajOp(TableOp):
def __init__(self, table: Any, keys: List[str] = [], columns_to_add: List[str] = []):
self.table = table
self.keys = keys
self.columns_to_add = columns_to_add

@classmethod
def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
return table_service_stub.RajTables

def make_grpc_request(self, result_id, source_id) -> Any:
left_id = source_id
right_id = table_pb2.TableReference(ticket=self.table.ticket)
return table_pb2.AjRajTablesRequest(result_id=result_id,
left_id=left_id,
right_id=right_id,
exact_match_columns=self.keys[:-1],
as_of_column=self.keys[-1],
columns_to_add=self.columns_to_add)

def make_grpc_request_for_batch(self, result_id, source_id) -> Any:
return table_pb2.BatchTableRequest.Operation(
raj=self.make_grpc_request(result_id=result_id, source_id=source_id))

class FlattenOp(TableOp):
@classmethod
Expand Down
22 changes: 11 additions & 11 deletions py/client/pydeephaven/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,34 +217,34 @@ def join(self, table: Any, on: Union[str, List[str]] = None, joins: Union[str, L
"""
return super().join(table, on, joins)

def aj(self, table: Any, on: Union[str, List[str]], joins: Union[str, List[str]] = None,
match_rule: MatchRule = MatchRule.LESS_THAN_EQUAL) -> Query:
def aj(self, table: Any, on: Union[str, List[str]], joins: Union[str, List[str]] = None) -> Query:
"""Adds a as-of join operation to the query.
Args:
table (Table): the right-table of the join
on (Union[str, List[str]]): the column(s) to match, can be a common name or an equal expression,
i.e. "col_a = col_b" for different column names
on (Union[str, List[str]]): the column(s) to match, can be a common name or a match condition of two
columns, e.g. 'col_a = col_b'. The first 'N-1' matches are exact matches. The final match is an inexact
match. The inexact match can use either '>' or '>='. If a common name is used for the inexact match,
'>=' is used for the comparison.
joins (Union[str, List[str]], optional): the column(s) to be added from the right table to the result
table, can be renaming expressions, i.e. "new_col = col"; default is None
match_rule (MatchRule, optional): the match rule for the as-of join, default is LESS_THAN_EQUAL
Returns:
self
"""
return super().aj(table, on, joins, match_rule)
return super().aj(table, on, joins)

def raj(self, table: Any, on: Union[str, List[str]], joins: Union[str, List[str]] = None,
match_rule: MatchRule = MatchRule.GREATER_THAN_EQUAL) -> Query:
def raj(self, table: Any, on: Union[str, List[str]], joins: Union[str, List[str]] = None) -> Query:
"""Adds a reverse as-of join operation to the query.
Args:
table (Table): the right-table of the join
on (Union[str, List[str]]): the column(s) to match, can be a common name or an equal expression,
i.e. "col_a = col_b" for different column names
on (Union[str, List[str]]): the column(s) to match, can be a common name or a match condition of two
columns, e.g. 'col_a = col_b'. The first 'N-1' matches are exact matches. The final match is an inexact
match. The inexact match can use either '<' or '<='. If a common name is used for the inexact match,
'<=' is used for the comparison.
joins (Union[str, List[str]], optional): the column(s) to be added from the right table to the result
table, can be renaming expressions, i.e. "new_col = col"; default is None
match_rule (MatchRule, optional): the match rule for the as-of join, default is GREATER_THAN_EQUAL
Returns:
self
Expand Down
28 changes: 14 additions & 14 deletions py/client/pydeephaven/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import pyarrow as pa

from pydeephaven._table_ops import MetaTableOp, SortDirection, MatchRule
from pydeephaven._table_ops import MetaTableOp, SortDirection
from pydeephaven.agg import Aggregation
from pydeephaven.dherror import DHError
from pydeephaven._table_interface import TableInterface
Expand Down Expand Up @@ -330,8 +330,7 @@ def join(self, table: Table, on: Union[str, List[str]] = None, joins: Union[str,
"""
return super(Table, self).join(table, on, joins)

def aj(self, table: Table, on: Union[str, List[str]], joins: Union[str, List[str]] = None,
match_rule: MatchRule = MatchRule.LESS_THAN_EQUAL) -> Table:
def aj(self, table: Table, on: Union[str, List[str]], joins: Union[str, List[str]] = None) -> Table:
"""The aj (as-of join) method creates a new table containing all the rows and columns of the left table,
plus additional columns containing data from the right table. For columns appended to the left table (joins),
row values equal the row values from the right table where the keys from the left table most closely match
Expand All @@ -340,23 +339,23 @@ def aj(self, table: Table, on: Union[str, List[str]], joins: Union[str, List[str
Args:
table (Table): the right-table of the join
on (Union[str, List[str]]): the column(s) to match, can be a common name or an equal expression,
i.e. "col_a = col_b" for different column names
on (Union[str, List[str]]): the column(s) to match, can be a common name or a match condition of two
columns, e.g. 'col_a = col_b'. The first 'N-1' matches are exact matches. The final match is an inexact
match. The inexact match can use either '>' or '>='. If a common name is used for the inexact match,
'>=' is used for the comparison.
joins (Union[str, List[str]], optional): the column(s) to be added from the right table to the result
table, can be renaming expressions, i.e. "new_col = col"; default is None,, which means all the columns
table, can be renaming expressions, i.e. "new_col = col"; default is None, which means all the columns
from the right table, excluding those specified in 'on'
match_rule (MatchRule, optional): the match rule for the as-of join, default is LESS_THAN_EQUAL
Returns:
a Table object
Raises:
DHError
"""
return super(Table, self).aj(table, on, joins, match_rule)
return super(Table, self).aj(table, on, joins)

def raj(self, table: Table, on: Union[str, List[str]], joins: Union[str, List[str]] = None,
match_rule: MatchRule = MatchRule.GREATER_THAN_EQUAL) -> Table:
def raj(self, table: Table, on: Union[str, List[str]], joins: Union[str, List[str]] = None) -> Table:
"""The raj (reverse as-of join) method creates a new table containing all the rows and columns of the left
table, plus additional columns containing data from the right table. For columns appended to the left table (
joins), row values equal the row values from the right table where the keys from the left table most closely
Expand All @@ -365,20 +364,21 @@ def raj(self, table: Table, on: Union[str, List[str]], joins: Union[str, List[st
Args:
table (Table): the right-table of the join
on (Union[str, List[str]]): the column(s) to match, can be a common name or an equal expression,
i.e. "col_a = col_b" for different column names
on (Union[str, List[str]]): the column(s) to match, can be a common name or a match condition of two
columns, e.g. 'col_a = col_b'. The first 'N-1' matches are exact matches. The final match is an inexact
match. The inexact match can use either '<' or '<='. If a common name is used for the inexact match,
'<=' is used for the comparison.
joins (Union[str, List[str]], optional): the column(s) to be added from the right table to the result
table, can be renaming expressions, i.e. "new_col = col"; default is None, which means all the columns
from the right table, excluding those specified in 'on'
match_rule (MatchRule, optional): the match rule for the as-of join, default is GREATER_THAN_EQUAL
Returns:
a Table object
Raises:
DHError
"""
return super(Table, self).raj(table, on, joins, match_rule)
return super(Table, self).raj(table, on, joins)

def head_by(self, num_rows: int, by: Union[str, List[str]]) -> Table:
"""The head_by method creates a new table containing the first number of rows for each group.
Expand Down

0 comments on commit a4676d8

Please sign in to comment.