Backend and aliasing
ross-spencer committed Nov 12, 2024
1 parent a2ac599 commit 7922978
Showing 5 changed files with 556 additions and 8 deletions.
1 change: 1 addition & 0 deletions requirements/local.txt
@@ -6,4 +6,5 @@ pre-commit==3.7.0
pip-upgrader==1.4.15
pylint==3.1.0
pytest==8.1.1
pytest-mock==3.14.0
tox==4.14.2
266 changes: 266 additions & 0 deletions src/simple_sign/backend.py
@@ -0,0 +1,266 @@
"""Cardano handlers.
This is very much a work in progress aimed at a very small dApp where
the anticipated amount of data to be returned for a query is very
small. The concept of a backend is not really fleshed out either and
so remains unexported until an interface is implemented or some other
useful/interesting concept emerges.
"""

import logging
from dataclasses import dataclass
from typing import Callable, Final

import cachetools.func
import pycardano as pyc
import pydantic
import requests


@dataclass
class ValidTx:
slot: int
tx_id: str
address: str
staking: str


logger = logging.getLogger(__name__)

BACKENDS: Final[list] = ["kupo"]


def _sum_dict(key: str, value: int, accumulator: dict) -> dict:
    """Increment the value stored against the given key in the accumulator."""
if key not in accumulator:
accumulator[key] = value
return accumulator
count = accumulator[key]
count = count + value
accumulator[key] = count
return accumulator


def _get_staking_from_addr(addr: str) -> str:
"""Return a staking address if possible from a given address,
otherwise, return the original address string.
"""
try:
address = pyc.Address.from_primitive(addr)
return str(
pyc.Address(staking_part=address.staking_part, network=pyc.Network.MAINNET)
)
    except pyc.exception.InvalidAddressInputException:
        return str(addr)
except TypeError as err:
logger.error("cannot convert '%s' (%s)", addr, err)
return str(addr)


class BackendContext:
"""Backend interfaces.
NB. this will probably prove to be a naive implementation of this
    sort of thing, but let's see. Learning from PyCardano.
"""

def _retrieve_unspent_utxos(self) -> dict:
"""Retrieve unspent utxos from the backend."""
raise NotImplementedError()

    def retrieve_staked_holders(
        self, token_policy: str, seek_addr: str = None
    ) -> dict:
        """Retrieve holders staking a given CNT as a mapping of
        staking address to amount held.
        """
        raise NotImplementedError()

    def retrieve_nft_holders(
        self, policy: str, deny_list: list = None, seek_addr: str = None
    ) -> dict:
        """Retrieve NFT holders, e.g. holders of a license to operate
        a decentralized node, as a mapping of asset ID to staking
        address.
        """
raise NotImplementedError()

def retrieve_metadata(
self, value: int, policy: str, tag: str, callback: Callable = None
) -> list:
"""Retrieve metadata from the backend."""
raise NotImplementedError()


class KupoContext(BackendContext):
"""Kupo backend."""

def __init__(
self,
base_url: str,
port: int,
):
"""Initialize this thing..."""
self._base_url = base_url
self._port = port

@cachetools.func.ttl_cache(ttl=60)
def _retrieve_unspent_utxos(self, addr: str = "") -> dict:
"""Retrieve unspent utxos from Kupo.
NB. Kupo must be configured to capture sparingly.
"""
if not addr:
resp = requests.get(
f"{self._base_url}:{self._port}/matches?unspent", timeout=30
)
return resp.json()
resp = requests.get(
f"{self._base_url}:{self._port}/matches/{addr}?unspent", timeout=30
)
return resp.json()

def _retrieve_metadata(self, tag: str, tx_list: list[ValidTx]):
"""Return metadata based on slot and transaction ID. This is
very much a Kupo-centric approach. Metadata is not indexed
locally and instead needs to be retrieved directly from
a node.
IMPORTANT: The metadata is modified here to provide information
about the source address. This is so that the data remains
accurately coupled with what is retrieved. We can't do this
with Kupo easily otherwise.
"""
md_list = []
for tx in tx_list:
resp = requests.get(
f"{self._base_url}:{self._port}/metadata/{tx.slot}?transaction_id={tx.tx_id}",
timeout=30,
)
            if not resp.json():
                # no metadata for this tx; skip it and keep processing.
                continue
md_dict = resp.json()
try:
_ = md_dict[0]["schema"][tag]
            except (IndexError, KeyError):
                # tag not present for this tx; skip it.
                continue
md_dict[0]["address"] = tx.address
md_dict[0]["staking"] = tx.staking
md_dict[0]["transaction"] = tx.tx_id
md_list.append(md_dict[0])
return md_list

    def retrieve_staked_holders(
        self, token_policy: str, seek_addr: str = None
    ) -> dict:
        """Retrieve holders staking a given CNT as a mapping of
        staking address to amount held.
        """
unspent = self._retrieve_unspent_utxos()
addresses_with_fact = {}
for item in unspent:
addr = item["address"]
if seek_addr and addr != seek_addr:
# don't process further than we have to if we're only
# looking for a single address.
continue
staking = _get_staking_from_addr(addr)
assets = item["value"]["assets"]
for key, value in assets.items():
if token_policy in key:
addresses_with_fact = _sum_dict(staking, value, addresses_with_fact)
return addresses_with_fact

    def retrieve_nft_holders(
        self, policy: str, deny_list: list = None, seek_addr: str = None
    ) -> dict:
        """Retrieve NFT holders, e.g. holders of a license to operate
        a decentralized node, as a mapping of asset ID to staking
        address.
        Filtering can be performed elsewhere, but a deny_list can be
        used to remove unhelpful results, e.g. the minting address.
        """
unspent = self._retrieve_unspent_utxos()
holders = {}
for item in unspent:
addr = item["address"]
if seek_addr and addr != seek_addr:
# don't process further than we have to if we're only
# looking for a single address.
continue
staking = _get_staking_from_addr(addr)
            if deny_list and addr in deny_list:
continue
assets = item["value"]["assets"]
for key, _ in assets.items():
if not key.startswith(policy):
continue
holders[key] = staking
return holders

@staticmethod
def _get_valid_txs(unspent: list[dict], value: int, policy: str) -> list[ValidTx]:
"""Retrieve a list of valid transactions according to our
policy rules.
"""
valid_txs = []
if not unspent:
return valid_txs
for item in unspent:
coins = item["value"]["coins"]
if coins != value:
continue
assets = item["value"]["assets"]
for asset in assets:
if policy not in asset:
continue
                logger.debug("matched policy: %s", policy)
slot = item["created_at"]["slot_no"]
tx_id = item["transaction_id"]
address = item["address"]
valid_tx = ValidTx(
slot=slot,
tx_id=tx_id,
address=address,
staking=_get_staking_from_addr(address),
)
valid_txs.append(valid_tx)
return valid_txs

@pydantic.validate_call()
def retrieve_metadata(
self,
value: int,
policy: str,
tag: str,
callback: Callable = None,
) -> list:
"""Retrieve a list of aliased signing addresses. An aliased
        signing address is an address that has been set up using a
protocol that allows NFT holders to participate in a network
        without having the keys to their primary wallets hot/live on the
decentralized node that they are operating.
Kupo queries involved:
```sh
curl -s "http://0.0.0.0:1442/matches?unspent"
curl -s "http://0.0.0.0:1442/metadata/{slot_id}?transaction_id={}"
```
Strategy 1: Retrieve all aliased keys for a policy ID.
Capture all values that match.
Capture all slots and tx ids for those values.
Retrieve metadata for all those txs.
Augment metadata with address and staking address.
Optionally, use the callback to process the data
according to a set of rules.
Return the metadata or a list of processed values to
the caller.
NB. the callback must return a list to satisfy the output of the
primary function.
NB. this function is not as generic as it could be.
"""
unspent = self._retrieve_unspent_utxos()
valid_txs = self._get_valid_txs(unspent, value, policy)
if not valid_txs:
return valid_txs
md = self._retrieve_metadata(tag, valid_txs)
if not callback:
return md
return callback(md)
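
For orientation, here is a minimal usage sketch of the new KupoContext (not part of the diff). It assumes the package is importable as `simple_sign`, a Kupo instance reachable at the URL/port shown in the docstring above, and placeholder policy/value figures; the callback is a simplified stand-in for the kind of reducer exercised in tests/test_kupo_backend.py.

```python
from simple_sign.backend import KupoContext
from simple_sign.types import Alias


def alias_callback(md: list[dict]) -> list[Alias]:
    """Reduce the augmented metadata to Alias records.

    Placeholder logic: a real callback would validate the metadata
    payload (see my_callable in tests/test_kupo_backend.py).
    """
    return [
        Alias(
            alias=item["hash"],  # stand-in for a derived signing address.
            address=item["address"],
            staking=item["staking"],
            tx=item["transaction"],
        )
        for item in md
    ]


# Hypothetical endpoint and policy values, for illustration only.
context = KupoContext("http://0.0.0.0", 1442)
aliases = context.retrieve_metadata(
    value=1233712,
    policy="5e43f3c51d80434a0be558da4272f189ea1a36f4d8b5165da7ca1e60",
    tag="674",
    callback=alias_callback,
)
for alias in aliases:
    print(alias.alias, alias.staking)
```

The callback receives metadata already augmented with the address, staking, and transaction fields, so it only has to map those onto Alias records.
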
36 changes: 28 additions & 8 deletions src/simple_sign/sign.py
@@ -1,5 +1,7 @@
"""Orcfax simple sign."""

+# pylint: disable=W0613
+
import argparse
import logging
import os
@@ -35,29 +37,47 @@


KNOWN_SIGNERS_CONFIG: Final[str] = "CIP8_NOTARIES"
+KUPO_URL: Final[str] = "KUPO_URL_NOTARIES"


class UnknownSigningKey(Exception):
"""Exception to raise when the signing key is unknown."""


-def signature_belongs_to_stake_pool():
-    """Validate whether the signing key belongs to a stake pool
-    associated with the dApp and return True if so.
+def retrieve_aliased(pkey: str) -> str:
+    """Retrieve another public key aliased by the given lookup.
+    The result might then be used to verify using one of the other
+    methods in this library, e.g.
+    1. lookup aliased staking key.
+    2. lookup staking key in license pool.
+    3. if not exists, raise exception, else, pass.
+    We want to do this on an address-by-address basis. The difficulty
+    is parsing the metadata consistently enough for this approach to
+    be broadly applicable across functions.
    """
-    raise NotImplementedError(
-        "verifying signature in stake pool is not yet implemented"
-    )
+    raise NotImplementedError("retrieving aliased addresses is not yet implemented")


+def signature_belongs_to_staked_pool(
+    pkey: str, token_policy_id: str, min_stake: int
+) -> bool:
+    """Validate whether the signing key belongs to someone who has
+    enough stake in a given token.
+    """
+    raise NotImplementedError("reading staked values is not yet implemented")


-def signature_in_license_pool():
+def signature_in_license_pool(pkey: str, policy_id: str) -> bool:
"""Validate whether signing key matches one of those in a pool of
licenses associated with the project and return True if so.
"""
raise NotImplementedError("reading from license pool is not yet implemented")


-def signature_in_constitution_datum_utxo():
+def signature_in_constitution_datum_utxo(pkey: str) -> bool:
"""Validate whether signing key matches one of those a list of
addresses in a given constitution UTxO.
"""
11 changes: 11 additions & 0 deletions src/simple_sign/types.py
@@ -0,0 +1,11 @@
"""Simple-sign data types"""

from dataclasses import dataclass


@dataclass
class Alias:
alias: str
address: str
staking: str
tx: str
250 changes: 250 additions & 0 deletions tests/test_kupo_backend.py
@@ -0,0 +1,250 @@
"""Test the functions looking at what's on-chain."""

# pylint: disable=W0212,R0903,C0103

import json
from typing import Final

import pycardano as pyc

from src.simple_sign.backend import KupoContext, ValidTx, _get_staking_from_addr
from src.simple_sign.types import Alias

tx_data: Final[str] = """
[
{
"transaction_index": 19,
"transaction_id": "791c3d699f1236a227edd611dc6408034523b98858cd15b4b495415c2835a242",
"output_index": 0,
"address": "addr1qy0pwlnkznxhq50fhwh8ac90c0yt54n6y9krg054daehxzqgp4e92w5qwg3jlp6xqyfh7hxrwv74gu3t6awu0v84vhmszrx6vt",
"value": {
"coins": 1233712,
"assets": {
"5e43f3c51d80434a0be558da4272f189ea1a36f4d8b5165da7ca1e60.427569646c657246657374303235": 1
}
},
"datum_hash": null,
"script_hash": null,
"created_at": {
"slot_no": 139250846,
"header_hash": "af3249be9a3bc21b6a98cc57e693c17ec2afbe50b7bc5e1da5ece75312f83d87"
},
"spent_at": null
},
{
"transaction_index": 16,
"transaction_id": "2c53ca6f6848870d9a872f7fdbfb56ea9fd05d57d92445f460b62167ea3cca7f",
"output_index": 0,
"address": "addr1q983mnp8yhmga4h0wz8ha9eaxdpusx7dp36ca6y3jd2nyly00f3ztshzchqy4rt5tcqtprvr38f56u9h46wlthvd9a2s6rw6vc",
"value": {
"coins": 1611940,
"assets": {
"84063e20b788729b48e6455b1f97062d54b15114c37aeda24fd27c0e.454d5552474f2078204e4d4b522053756d6d69742023343233": 1,
"5e43f3c51d80434a0be558da4272f189ea1a36f4d8b5165da7ca1e60.427569646c657246657374313139": 1,
"5964c3813d1abae676a8d88547d73a842f62576befc7b93753c12c2c.4353595032343239": 1
}
},
"datum_hash": null,
"script_hash": null,
"created_at": {
"slot_no": 139161268,
"header_hash": "a29abdc22c9f89f891709330bd6965b4c647de73a996c517952366c9c95e71bf"
},
"spent_at": null
}
]
"""

md_data = """
[
{
"address": "addr123",
"staking": "stake123",
"transaction": "tx123",
"hash": "2e8dde3ec1e295abb4ff18491658c04b1ade5329f13967d391f91e42eca07047",
"raw": "a11902a2a1636d7367836852454749535445526349544e7839303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303020",
"schema": {
"674": {
"map": [
{
"k": {
"string": "msg"
},
"v": {
"list": [
{
"string": "REGISTER"
},
{
"string": "ITN"
},
{
"string": "00000000000000000000000000000000000000000000000000000000 "
}
]
}
}
]
}
}
}
]
"""


def my_callable(md: list[dict]) -> list[Alias]:
"""A function to use to test callbacks in the backend."""
addresses = []
for item in md:
assert (
"address" in item
), "metadata dictionary should have been augmented with address"
assert (
"staking" in item
), "metadata dictionary should have been augmented with staking"
try:
value = item["schema"]["674"]["map"][0]["v"]["list"]
except KeyError:
continue
try:
action = value[0]["string"]
project = value[1]["string"]
vkh = value[2]["string"]
except IndexError:
continue
try:
if (
action.upper().strip() != "REGISTER"
and project.upper().strip() != "ITN"
):
continue
except ValueError:
continue
try:
network = pyc.Network.MAINNET
verification_key_hash = pyc.VerificationKeyHash.from_primitive(vkh)
address = pyc.Address(verification_key_hash, network=network)
addresses.append(
Alias(
alias=str(address),
address=item["address"],
staking=item["staking"],
tx=item["transaction"],
)
)
except ValueError:
continue
return addresses


class MockMetadataResponse:
"""Mock requests response for our requests functions."""

@staticmethod
def json():
"""Return a dictionary representation of our data."""
return json.loads(md_data)


def test_get_valid_tx_private():
"""Ensure that our processing of valid transactions is doing
something sensible.
"""
value = 1233712
policy = "5e43f3c51d80434a0be558da4272f189ea1a36f4d8b5165da7ca1e60"
context = KupoContext("mock_address", 9999)
valid_txs = context._get_valid_txs(json.loads(tx_data), value, policy)
assert len(valid_txs) == 1
assert valid_txs == [
ValidTx(
slot=139250846,
tx_id="791c3d699f1236a227edd611dc6408034523b98858cd15b4b495415c2835a242",
address="addr1qy0pwlnkznxhq50fhwh8ac90c0yt54n6y9krg054daehxzqgp4e92w5qwg3jlp6xqyfh7hxrwv74gu3t6awu0v84vhmszrx6vt",
staking="stake1uyyq6uj482q8yge0sarqzymltnphx025wg4awhw8kr6ktacaez36t",
)
]


def test_retrieve_metadata_private(mocker):
"""Test our mocked function below to ensure that it is being
sensible.
"""
context = KupoContext("mock_address", 9999)
mocker.patch("requests.get", return_value=MockMetadataResponse())
resp = context._retrieve_metadata(
"674",
[
ValidTx(
123,
"tx123",
"addr123",
"stake123",
)
],
)
assert isinstance(resp, list)
assert resp[0]["address"] == "addr123"
assert resp[0]["staking"] == "stake123"


def test_retrieve_metadata_private_fail(mocker):
"""Test our mocked function below to ensure that it is being
sensible. In this test the metadata label is incorrect.
"""
context = KupoContext("mock_address", 9999)
mocker.patch("requests.get", return_value=MockMetadataResponse())
resp = context._retrieve_metadata(
"675",
[
ValidTx(
123,
"tx123",
"addr123",
"stake123",
)
],
)
assert len(resp) == 0 and isinstance(resp, list)


def test_aliased_signing_addresses(mocker):
"""Ensure we can trace aliased addresses. This test provides
somewhat of an integration test and so other smaller units need
testing independently, e.g. metadata retrieval.
"""
context = KupoContext("mock_address", 9999)
mocker.patch(
"src.simple_sign.backend.KupoContext._retrieve_unspent_utxos",
return_value=json.loads(tx_data),
)
mocker.patch(
"src.simple_sign.backend.KupoContext._retrieve_metadata",
return_value=json.loads(md_data),
)
md_output = context.retrieve_metadata(
value=1233712,
policy="5e43f3c51d80434a0be558da4272f189ea1a36f4d8b5165da7ca1e60",
tag="674",
callback=my_callable,
)
assert md_output == [
Alias(
alias="addr1vyqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqkdl5mw",
address="addr123",
staking="stake123",
tx="tx123",
)
]


def test_get_staking_addr_behavior():
"""Make sure we are handling exceptions if we search for a staking
address instead of a spending or enterprise address.
"""
test_staking: Final[
str
] = "stake1uyyq6uj482q8yge0sarqzymltnphx025wg4awhw8kr6ktacaez36t"
res = _get_staking_from_addr(test_staking)
assert res == test_staking
