Skip to content

Commit

Permalink
Disable Transfer Consolidation (#194)
Browse files Browse the repository at this point in the history
We will spend more gas to do individual transfers for all Rewards, Gas Reimbursements and Positive slippage. This is supposed to make it easier to validate the solver payouts and in some sense it kinda does. The records are sorted in the same way they will appear in the dashboard.
  • Loading branch information
bh2smith authored Feb 20, 2023
1 parent 17d7205 commit 30a6ad3
Show file tree
Hide file tree
Showing 12 changed files with 467 additions and 125 deletions.
2 changes: 1 addition & 1 deletion src/fetch/dune.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def _parameterized_query(
def _get_query_results(self, query: Query) -> list[dict[str, str]]:
"""Internally every dune query execution is routed through here."""
log.info(f"Fetching {query.name} from query: {query}")
exec_result = self.dune.refresh(query, ping_frequency=5)
exec_result = self.dune.refresh(query, ping_frequency=15)
log.info(f"Fetch completed for execution {exec_result.execution_id}")
# TODO - use a real logger:
# https://github.com/cowprotocol/dune-client/issues/34
Expand Down
42 changes: 26 additions & 16 deletions src/fetch/transfer_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,25 @@
SAFE_URL,
FILE_OUT_DIR,
)
from src.fetch.dune import DuneFetcher
from src.models.accounting_period import AccountingPeriod
from src.models.transfer import Transfer, CSVTransfer
from src.multisend import post_multisend, prepend_unwrap_if_necessary
from src.slack import post_to_slack
from src.utils.print_store import Category
from src.utils.print_store import Category, PrintStore
from src.utils.script_args import generic_script_init


def manual_propose(dune: DuneFetcher) -> None:
def manual_propose(transfers: list[Transfer], period: AccountingPeriod) -> None:
"""
Entry point to manual creation of rewards payout transaction.
This function generates the CSV transfer file to be pasted into the COW Safe app
"""
print(
f"Please double check the batches with unusual slippage: "
f"{dune.period.unusual_slippage_url()}"
f"{period.unusual_slippage_url()}"
)
transfers = Transfer.consolidate(dune.get_transfers())
csv_transfers = [asdict(CSVTransfer.from_transfer(t)) for t in transfers]
FileIO(FILE_OUT_DIR).write_csv(csv_transfers, f"transfers-{dune.period}.csv")
FileIO(FILE_OUT_DIR).write_csv(csv_transfers, f"transfers-{period}.csv")

print(Transfer.summarize(transfers))
print(
Expand All @@ -50,7 +49,12 @@ def manual_propose(dune: DuneFetcher) -> None:
)


def auto_propose(dune: DuneFetcher, slack_client: WebClient, dry_run: bool) -> None:
def auto_propose(
transfers: list[Transfer],
log_saver: PrintStore,
slack_client: WebClient,
dry_run: bool,
) -> None:
"""
Entry point auto creation of rewards payout transaction.
This function encodes the multisend of reward transfers and posts
Expand All @@ -61,13 +65,12 @@ def auto_propose(dune: DuneFetcher, slack_client: WebClient, dry_run: bool) -> N
signing_key = os.environ["PROPOSER_PK"]
client = EthereumClient(URI(NODE_URL))

transfers = Transfer.consolidate(dune.get_transfers())
dune.log_saver.print(Transfer.summarize(transfers), category=Category.TOTALS)
log_saver.print(Transfer.summarize(transfers), category=Category.TOTALS)
transactions = prepend_unwrap_if_necessary(
client, SAFE_ADDRESS, transactions=[t.as_multisend_tx() for t in transfers]
)
if len(transactions) > len(transfers):
dune.log_saver.print("Prepended WETH unwrap", Category.GENERAL)
log_saver.print("Prepended WETH unwrap", Category.GENERAL)

if not dry_run:
nonce = post_multisend(
Expand All @@ -85,20 +88,27 @@ def auto_propose(dune: DuneFetcher, slack_client: WebClient, dry_run: bool) -> N
f"To sign and execute, visit:\n{SAFE_URL}\n"
f"More details in thread"
),
sub_messages=dune.log_saver.get_values(),
sub_messages=log_saver.get_values(),
)


if __name__ == "__main__":
args = generic_script_init(description="Fetch Complete Reimbursement")
args.dune.log_saver.print(
f"The data aggregated can be visualized at\n"
f"{args.dune.period.dashboard_url()}",
dune = args.dune
dune.log_saver.print(
f"The data aggregated can be visualized at\n" f"{dune.period.dashboard_url()}",
category=Category.GENERAL,
)

payout_transfers = dune.get_transfers()
Transfer.sort_list(payout_transfers)
if args.consolidate_transfers:
payout_transfers = Transfer.consolidate(payout_transfers)

if args.post_tx:
auto_propose(
dune=args.dune,
transfers=payout_transfers,
log_saver=dune.log_saver,
slack_client=WebClient(
token=os.environ["SLACK_TOKEN"],
# https://stackoverflow.com/questions/59808346/python-3-slack-client-ssl-sslcertverificationerror
Expand All @@ -107,4 +117,4 @@ def auto_propose(dune: DuneFetcher, slack_client: WebClient, dry_run: bool) -> N
dry_run=args.dry_run,
)
else:
manual_propose(args.dune)
manual_propose(transfers=payout_transfers, period=dune.period)
4 changes: 2 additions & 2 deletions src/models/overdraft.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def from_objects(
cls, transfer: Transfer, slippage: SolverSlippage, period: AccountingPeriod
) -> Overdraft:
"""Constructs an overdraft instance based on Transfer & Slippage"""
assert transfer.receiver == slippage.solver_address
assert transfer.solver == slippage.solver_address
assert transfer.token_type == TokenType.NATIVE
overdraft = transfer.amount_wei + slippage.amount_wei
assert overdraft < 0, "This is why we are here."
Expand All @@ -46,6 +46,6 @@ def eth(self) -> float:

def __str__(self) -> str:
return (
f"Overdraft(solver={self.account}({self.name}),"
f"Overdraft(solver={self.account} ({self.name}),"
f"period={self.period},owed={self.eth} ETH)"
)
10 changes: 5 additions & 5 deletions src/models/split_transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def _process_native_transfers(
penalty_total = 0
while self.unprocessed_native:
transfer = self.unprocessed_native.pop(0)
solver = transfer.receiver
solver = transfer.solver
slippage: Optional[SolverSlippage] = indexed_slippage.get(solver)
if slippage is not None:
assert (
Expand All @@ -73,7 +73,7 @@ def _process_native_transfers(
except ValueError as err:
name, address = slippage.solver_name, slippage.solver_address
self.log_saver.print(
f"Slippage for {address}({name}) exceeds reimbursement: {err}\n"
f"Slippage for {address} ({name}) exceeds reimbursement: {err}\n"
f"Excluding payout and appending excess to overdraft",
category=Category.OVERDRAFT,
)
Expand All @@ -99,7 +99,7 @@ def _process_rewards(
price_day = self.period.end - timedelta(days=1)
while self.unprocessed_cow:
transfer = self.unprocessed_cow.pop(0)
solver = transfer.receiver
solver = transfer.solver
# Remove the element if it exists (assuming it won't have to be reinserted)
overdraft = self.overdrafts.pop(solver, None)
if overdraft is not None:
Expand All @@ -121,7 +121,7 @@ def _process_rewards(
# Reinsert since there is still an amount owed.
self.overdrafts[solver] = overdraft
continue
transfer.redirect_to(redirect_map, self.log_saver)
transfer.try_redirect(redirect_map, self.log_saver)
self.cow_transfers.append(transfer)
# We do not need to worry about any controversy between overdraft
# and positive slippage adjustments, because positive/negative slippage
Expand All @@ -134,7 +134,7 @@ def _process_rewards(
), f"Expected positive slippage got {slippage.amount_wei}"
total_positive_slippage += slippage.amount_wei
slippage_transfer = Transfer.from_slippage(slippage)
slippage_transfer.redirect_to(redirect_map, self.log_saver)
slippage_transfer.try_redirect(redirect_map, self.log_saver)
self.eth_transfers.append(slippage_transfer)
return total_positive_slippage

Expand Down
79 changes: 58 additions & 21 deletions src/models/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def from_transfer(cls, transfer: Transfer) -> CSVTransfer:
return cls(
token_type=transfer.token_type,
token_address=transfer.token.address if transfer.token else None,
receiver=transfer.receiver,
receiver=transfer.recipient,
# The primary purpose for this class is to convert amount_wei to amount
amount=transfer.amount,
)
Expand All @@ -46,16 +46,32 @@ class Transfer:
"""Total amount reimbursed for accounting period"""

token: Optional[Token]
receiver: Address
_solver: Address
amount_wei: int
_redirect_target: Optional[Address] = None

def __init__(self, token: Optional[Token], solver: Address, amount_wei: int):
self.token = token
self._solver = solver
self.amount_wei = amount_wei

@property
def solver(self) -> Address:
"""Read access to the solver field"""
return self._solver

@property
def redirect_target(self) -> Optional[Address]:
"""Read access to the redirect_target field"""
return self._redirect_target

@classmethod
def from_dict(cls, obj: dict[str, str]) -> Transfer:
"""Converts Dune data dict to object with types"""
token_address = obj.get("token_address", None)
return cls(
token=Token(token_address) if token_address else None,
receiver=Address(obj["receiver"]),
solver=Address(obj["receiver"]),
amount_wei=int(obj["amount"]),
)

Expand All @@ -65,7 +81,7 @@ def from_dataframe(cls, pdf: pd.DataFrame) -> list[Transfer]:
return [
cls(
token=Token(row["token_address"]) if row["token_address"] else None,
receiver=Address(row["receiver"]),
solver=Address(row["receiver"]),
amount_wei=int(row["amount"]),
)
for _, row in pdf.iterrows()
Expand All @@ -85,6 +101,11 @@ def summarize(transfers: list[Transfer]) -> str:
f"Total COW Funds needed: {cow_total / 10 ** 18:.4f}\n"
)

@property
def recipient(self) -> Address:
"""The correct way to access the true recipient of a transfer"""
return self.redirect_target if self.redirect_target is not None else self.solver

@staticmethod
def consolidate(transfer_list: list[Transfer]) -> list[Transfer]:
"""
Expand All @@ -95,14 +116,14 @@ def consolidate(transfer_list: list[Transfer]) -> list[Transfer]:

transfer_dict: dict[tuple, Transfer] = {}
for transfer in transfer_list:
key = (transfer.receiver, transfer.token)
key = (transfer.recipient, transfer.token)
if key in transfer_dict:
transfer_dict[key] = transfer_dict[key].merge(transfer)
else:
transfer_dict[key] = transfer
return sorted(
transfer_dict.values(),
key=lambda t: (-t.amount, t.receiver, t.token),
key=lambda t: (-t.amount, t.recipient, t.token),
)

@property
Expand All @@ -123,10 +144,10 @@ def amount(self) -> float:

def add_slippage(self, slippage: SolverSlippage, log_saver: PrintStore) -> None:
"""Adds Adjusts Transfer amount by Slippage amount"""
assert self.receiver == slippage.solver_address, "receiver != solver"
assert self.solver == slippage.solver_address, "receiver != solver"
adjustment = slippage.amount_wei
log_saver.print(
f"Deducting slippage for solver {self.receiver}"
f"Deducting slippage for solver {self.solver}"
f"by {adjustment / 10 ** 18:.5f} ({slippage.solver_name})",
category=Category.SLIPPAGE,
)
Expand All @@ -137,30 +158,34 @@ def add_slippage(self, slippage: SolverSlippage, log_saver: PrintStore) -> None:

def merge(self, other: Transfer) -> Transfer:
"""
Merge two transfers (acts like addition)
if all fields except amount are equal, returns a transfer who amount is the sum
Merge two transfers (acts like addition) if all fields except amount are equal,
returns a transfer who amount is the sum.
Merges incur information loss (particularly in the category of redirects).
Note that two transfers of the same token with different receivers,
but redirected to the same address, will get merged and original receivers will be forgotten
"""
merge_requirements = [
self.receiver == other.receiver,
self.recipient == other.recipient,
self.token == other.token,
]
if all(merge_requirements):
return Transfer(
token=self.token,
receiver=self.receiver,
solver=self.recipient,
amount_wei=self.amount_wei + other.amount_wei,
)
raise ValueError(
f"Can't merge tokens {self}, {other}. "
f"Can't merge transfers {self}, {other}. "
f"Requirements met {merge_requirements}"
)

def as_multisend_tx(self) -> MultiSendTx:
"""Converts Transfer into encoded MultiSendTx bytes"""
receiver = self.recipient.address
if self.token_type == TokenType.NATIVE:
return MultiSendTx(
operation=MultiSendOperation.CALL,
to=self.receiver.address,
to=receiver,
value=self.amount_wei,
data=HexStr("0x"),
)
Expand All @@ -171,31 +196,31 @@ def as_multisend_tx(self) -> MultiSendTx:
to=str(self.token.address),
value=0,
data=erc20().encodeABI(
fn_name="transfer", args=[self.receiver.address, self.amount_wei]
fn_name="transfer", args=[receiver, self.amount_wei]
),
)
raise ValueError(f"Unsupported type {self.token_type}")

def __str__(self) -> str:
if self.token_type == TokenType.NATIVE:
return f"TransferETH(receiver={self.receiver}, amount_wei={self.amount})"
return f"TransferETH(receiver={self.recipient}, amount_wei={self.amount})"
if self.token_type == TokenType.ERC20:
return (
f"Transfer("
f"token_address={self.token}, "
f"receiver={self.receiver}, "
f"recipient={self.recipient}, "
f"amount_wei={self.amount})"
)
raise ValueError(f"Invalid Token Type {self.token_type}")

def redirect_to(
def try_redirect(
self, redirects: dict[Address, Vouch], log_saver: PrintStore
) -> None:
"""
Redirects Transfers via Address => Vouch.reward_target
This function modifies self!
"""
recipient = self.receiver
recipient = self.recipient
if recipient in redirects:
# Redirect COW rewards to reward target specific by VouchRegistry
redirect_address = redirects[recipient].reward_target
Expand All @@ -205,7 +230,7 @@ def redirect_to(
if self.token is None
else Category.COW_REDIRECT,
)
self.receiver = redirect_address
self._redirect_target = redirect_address

@classmethod
def from_slippage(cls, slippage: SolverSlippage) -> Transfer:
Expand All @@ -215,6 +240,18 @@ def from_slippage(cls, slippage: SolverSlippage) -> Transfer:
"""
return cls(
token=None,
receiver=slippage.solver_address,
solver=slippage.solver_address,
amount_wei=slippage.amount_wei,
)

@staticmethod
def sort_list(transfer_list: list[Transfer]) -> None:
"""
This is the preferred and tested sort order we use for generating the transfer file.
It ensures that transfers are grouped
1. first by initial recipient (i.e. solvers),
2. then by token address (with native ETH last)
and finally order by amount descending (so that largest in category occur first).
Note that this method mutates the input data and nothing in returned.
"""
transfer_list.sort(key=lambda t: (t.solver, str(t.token), -t.amount))
Loading

0 comments on commit 30a6ad3

Please sign in to comment.