diff --git a/test/functional/feature_taproot.py b/test/functional/feature_taproot.py new file mode 100755 index 0000000000000..9d5e3299f6ad9 --- /dev/null +++ b/test/functional/feature_taproot.py @@ -0,0 +1,443 @@ +#!/usr/bin/env python3 +# Copyright (c) 2019 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +# Test taproot softfork. + +from test_framework.blocktools import create_coinbase, create_block, create_transaction, add_witness_commitment +from test_framework.messages import CTransaction, CTxIn, CTxOut, COutPoint, CTxInWitness +from test_framework.script import CScript, TaprootSignatureHash, taproot_construct, GetP2SH, OP_0, OP_1, OP_CHECKSIG, OP_CHECKSIGVERIFY, OP_CHECKSIGADD, OP_IF, OP_CODESEPARATOR, OP_ELSE, OP_ENDIF, OP_DROP, DEFAULT_TAPSCRIPT_VER, SIGHASH_SINGLE, is_op_success, CScriptOp, OP_RETURN, OP_VERIF, OP_RESERVED, OP_1NEGATE, OP_EQUAL, MAX_SCRIPT_ELEMENT_SIZE, LOCKTIME_THRESHOLD, ANNEX_TAG +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import assert_equal, assert_raises_rpc_error, hex_str_to_bytes +from test_framework.key import ECKey +from test_framework.address import program_to_witness, script_to_p2sh +from binascii import hexlify +from hashlib import sha256 +from io import BytesIO +import random +import struct + +EMPTYWITNESS_ERROR = "non-mandatory-script-verify-flag (Witness program was passed an empty witness) (code 64)" +INVALIDKEYPATHSIG_ERROR = "non-mandatory-script-verify-flag (Invalid signature for taproot key path spending) (code 64)" +UNKNOWNWITNESS_ERROR = "non-mandatory-script-verify-flag (Witness version reserved for soft-fork upgrades) (code 64)" + +DUST_LIMIT = 600 +MIN_FEE = 5000 + +def tx_from_hex(hexstring): + tx = CTransaction() + f = BytesIO(hex_str_to_bytes(hexstring)) + tx.deserialize(f) + return tx + +def get_taproot_bech32(info): + if isinstance(info, tuple): + info = info[0] + return program_to_witness(1, info[2:]) + +def get_taproot_p2sh(info): + return script_to_p2sh(info[0]) + +def random_op_success(): + ret = 0 + while (not is_op_success(ret)): + ret = random.randint(0x50, 0xfe) + return CScriptOp(ret) + +def random_unknown_tapscript_ver(no_annex_tag=True): + ret = DEFAULT_TAPSCRIPT_VER + while (ret == DEFAULT_TAPSCRIPT_VER or (no_annex_tag and ret == (ANNEX_TAG & 0xfe))): + ret = random.randrange(128) * 2 + return ret + +def random_bytes(n): + return bytes(random.getrandbits(8) for i in range(n)) + +def random_script(size, no_success = True): + ret = bytes() + while (len(ret) < size): + remain = size - len(ret) + opcode = random.randrange(256) + while (no_success and is_op_success(opcode)): + opcode = random.randrange(256) + if opcode == 0 or opcode >= OP_1NEGATE: + ret += bytes([opcode]) + elif opcode <= 75 and opcode <= remain - 1: + ret += bytes([opcode]) + random_bytes(opcode) + elif opcode == 76 and remain >= 2: + pushsize = random.randint(0, min(0xff, remain - 2)) + ret += bytes([opcode]) + bytes([pushsize]) + random_bytes(pushsize) + elif opcode == 77 and remain >= 3: + pushsize = random.randint(0, min(0xffff, remain - 3)) + ret += bytes([opcode]) + struct.pack(b'= 5: + pushsize = random.randint(0, min(0xffffffff, remain - 5)) + ret += bytes([opcode]) + struct.pack(b' 0 + ret = bytes() + opcode = 78 + if size <= 75: + opcode = random.randint(75, 78) + elif size <= 255: + opcode = random.randint(76, 78) + elif size <= 0xffff: + opcode = random.randint(77, 78) + if opcode == 75: + ret = bytes([size]) + random_bytes(size - 1) + elif opcode == 76: + ret = bytes([opcode]) + bytes([size]) + random_bytes(size - 2) + elif opcode == 77: + ret = bytes([opcode]) + struct.pack(b'= size + return ret[:size] + +def random_checksig_style(pubkey): + opcode = random.choice([OP_CHECKSIG, OP_CHECKSIGVERIFY, OP_CHECKSIGADD]) + if (opcode == OP_CHECKSIGVERIFY): + ret = CScript([pubkey, opcode, OP_1]) + elif (opcode == OP_CHECKSIGADD): + num = random.choice([0, 0x7fffffff, -0x7fffffff]) + ret = CScript([num, pubkey, opcode, num+1, OP_EQUAL]) + else: + ret = CScript([pubkey, opcode]) + return bytes(ret) + +def damage_bytes(b): + return (int.from_bytes(b, 'big') ^ (1 << random.randrange(len(b)*8))).to_bytes(len(b), 'big') + +def spend_single_sig(tx, input_index, spent_utxos, info, p2sh, key, annex=None, hashtype=0, prefix=[], suffix=[], script=None, pos=-1, damage=False): + ht = hashtype + + damage_type = random.randrange(5) if damage else -1 + ''' + * 0. bit flip the sighash + * 1. bit flip the signature + * If the expected hashtype is 0: + -- 2. append a 0 to the signature + -- 3. append a random value of 1-255 to the signature + * If the expected hashtype is not 0: + -- 2. do not append hashtype to the signature + -- 3. append a random incorrect value of 0-255 to the signature + * 4. extra witness element + ''' + + # Taproot key path spend: tweak key + if script is None: + key = key.tweak_add(info[1]) + assert(key is not None) + # Change SIGHASH_SINGLE into SIGHASH_ALL if no corresponding output + if (ht & 3 == SIGHASH_SINGLE and input_index >= len(tx.vout)): + ht ^= 2 + # Compute sighash + if script: + sighash = TaprootSignatureHash(tx, spent_utxos, ht, input_index, scriptpath = True, tapscript = script, codeseparator_pos = pos, annex = annex) + else: + sighash = TaprootSignatureHash(tx, spent_utxos, ht, input_index, scriptpath = False, annex = annex) + if damage_type == 0: + sighash = damage_bytes(sighash) + # Compute signature + sig = key.sign_schnorr(sighash) + if damage_type == 1: + sig = damage_bytes(sig) + if damage_type == 2: + if ht == 0: + sig += bytes([0]) + elif damage_type == 3: + random_ht = ht + while random_ht == ht: + random_ht = random.randrange(256) + sig += bytes([random_ht]) + elif ht > 0: + sig += bytes([ht]) + # Construct witness + ret = prefix + [sig] + suffix + if script is not None: + ret += [script, info[2][script]] + if annex is not None: + ret += [annex] + if damage_type == 4: + ret = [random_bytes(random.randrange(5))] + ret + tx.wit.vtxinwit[input_index].scriptWitness.stack = ret + # Construct P2SH redeemscript + if p2sh: + tx.vin[input_index].scriptSig = CScript([info[0]]) + +def spend_alwaysvalid(tx, input_index, info, p2sh, script, annex=None, damage=False): + if isinstance(script, tuple): + version, script = script + ret = [script, info[2][script]] + if damage: + # With 50% chance, we bit flip the script (unless the script is an empty vector) + # With 50% chance, we bit flip the control block + if random.choice([True, False]) or len(ret[0]) == 0: + # Annex is always required for tapscript version 0x50 + # Unless the original version is 0x50, we couldn't convert it to 0x50 without using annex + tmp = damage_bytes(ret[1]) + while annex is None and tmp[0] == ANNEX_TAG and ret[1][0] != ANNEX_TAG: + tmp = damage_bytes(ret[1]) + ret[1] = tmp + else: + ret[0] = damage_bytes(ret[0]) + if annex is not None: + ret += [annex] + # Randomly add input witness + if random.choice([True, False]): + for i in range(random.randint(1, 10)): + ret = [random_bytes(random.randint(0, MAX_SCRIPT_ELEMENT_SIZE*2))] + ret + tx.wit.vtxinwit[input_index].scriptWitness.stack = ret + # Construct P2SH redeemscript + if p2sh: + tx.vin[input_index].scriptSig = CScript([info[0]]) + +def spender_sighash_mutation(spenders, info, p2sh, comment, standard=True, **kwargs): + spk = info[0] + addr = get_taproot_bech32(info) + if p2sh: + spk = GetP2SH(spk) + addr = get_taproot_p2sh(info) + def fn(t, i, u, v): + return spend_single_sig(t, i, u, damage=not v, info=info, p2sh=p2sh, **kwargs) + spenders.append((spk, addr, comment, standard, fn)) + +def spender_alwaysvalid(spenders, info, p2sh, comment, **kwargs): + spk = info[0] + addr = get_taproot_bech32(info) + if p2sh: + spk = GetP2SH(spk) + addr = get_taproot_p2sh(info) + def fn(t, i, u, v): + return spend_alwaysvalid(t, i, damage=not v, info=info, p2sh=p2sh, **kwargs) + spenders.append((spk, addr, comment, False, fn)) + +class TAPROOTTest(BitcoinTestFramework): + + def set_test_params(self): + self.num_nodes = 1 + self.setup_clean_chain = True + self.extra_args = [["-whitelist=127.0.0.1", "-acceptnonstdtxn=0", "-par=1"]] + + def block_submit(self, node, txs, msg, cb_pubkey=None, fees=0, witness=False, accept=False): + block = create_block(self.tip, create_coinbase(self.lastblockheight + 1, pubkey=cb_pubkey, fees=fees), self.lastblocktime + 1) + block.nVersion = 4 + for tx in txs: + tx.rehash() + block.vtx.append(tx) + block.hashMerkleRoot = block.calc_merkle_root() + witness and add_witness_commitment(block) + block.rehash() + block.solve() + node.submitblock(block.serialize(True).hex()) + if (accept): + assert node.getbestblockhash() == block.hash, "Failed to accept: " + msg + self.tip = block.sha256 + self.lastblockhash = block.hash + self.lastblocktime += 1 + self.lastblockheight += 1 + else: + assert node.getbestblockhash() == self.lastblockhash, "Failed to reject: " + msg + + def test_spenders(self, spenders, input_counts): + """Run randomized tests with a number of "spenders". + + Each spender is a tuple of: + - A scriptPubKey (CScript) + - An address for that scriptPubKey (string) + - A comment describing the test (string) + - Whether the spending (on itself) is expected to be standard (bool) + - A lambda taking as inputs: + - A transaction to sign (CTransaction) + - An input position (int) + - The spent UTXOs by this transaction (list of CTxOut) + - Whether to produce a valid spend (bool) + + Each spender embodies a test; in a large randomized test, it is verified + that toggling the valid argument to each lambda toggles the validity of + the transaction. This is accomplished by constructing transactions consisting + of all valid inputs, except one invalid one. + """ + + # Construct a UTXO to spend for each of the spenders + self.nodes[0].generate(110) + bal = self.nodes[0].getbalance() * 3 / (4*len(spenders)) + random.shuffle(spenders) + num_spenders = len(spenders) + utxos = [] + while len(spenders): + # Create the necessary outputs in multiple transactions, as sPKs may be repeated (which sendmany does not support) + outputs = {} + new_spenders = [] + batch = [] + for spender in spenders: + addr = spender[1] + if addr in outputs: + new_spenders.append(spender) + else: + amount = random.randrange(int(bal * 95000000), int(bal * 105000000)) + outputs[addr] = amount / 100000000 + batch.append(spender) + self.log.info("Constructing %i UTXOs for spending tests" % len(batch)) + tx = tx_from_hex(self.nodes[0].getrawtransaction(self.nodes[0].sendmany("", outputs))) + tx.rehash() + spenders = new_spenders + random.shuffle(spenders) + + # Map created UTXOs back to the spenders they were created for + for n, out in enumerate(tx.vout): + for spender in batch: + if out.scriptPubKey == spender[0]: + utxos.append((COutPoint(tx.sha256, n), out, spender)) + break + assert(len(utxos) == num_spenders) + random.shuffle(utxos) + self.nodes[0].generate(1) + + # Construct a bunch of sPKs that send coins back to the host wallet + self.log.info("Constructing 100 addresses for returning coins") + host_spks = [] + host_pubkeys = [] + for i in range(100): + addr = self.nodes[0].getnewaddress(address_type=random.choice(["legacy", "p2sh-segwit", "bech32"])) + info = self.nodes[0].getaddressinfo(addr) + spk = hex_str_to_bytes(info['scriptPubKey']) + host_spks.append(spk) + host_pubkeys.append(hex_str_to_bytes(info['pubkey'])) + + # Pick random subsets of UTXOs to construct transactions with + self.lastblockhash = self.nodes[0].getbestblockhash() + self.tip = int("0x" + self.lastblockhash, 0) + block = self.nodes[0].getblock(self.lastblockhash) + self.lastblockheight = block['height'] + self.lastblocktime = block['time'] + while len(utxos): + tx = CTransaction() + tx.nVersion = random.choice([1, 2, random.randint(-0x80000000,0x7fffffff)]) + min_sequence = (tx.nVersion != 1 and tx.nVersion != 0) * 0x80000000 # The minimum sequence number to disable relative locktime + if random.choice([True, False]): + tx.nLockTime = random.randrange(LOCKTIME_THRESHOLD, self.lastblocktime - 7200) # all absolute locktimes in the past + else: + tx.nLockTime = random.randrange(self.lastblockheight+1) # all block heights in the past + + # Pick 1 to 4 UTXOs to construct transaction inputs + acceptable_input_counts = [cnt for cnt in input_counts if cnt <= len(utxos)] + while True: + inputs = random.choice(acceptable_input_counts) + remaining = len(utxos) - inputs + if remaining == 0 or remaining >= max(input_counts) or remaining in input_counts: + break + input_utxos = utxos[-inputs:] + utxos = utxos[:-inputs] + fee = random.randrange(MIN_FEE * 2, MIN_FEE * 4) # 10000-20000 sat fee + in_value = sum(utxo[1].nValue for utxo in input_utxos) - fee + tx.vin = [CTxIn(outpoint = input_utxos[i][0], nSequence = random.randint(min_sequence, 0xffffffff)) for i in range(inputs)] + tx.wit.vtxinwit = [CTxInWitness() for i in range(inputs)] + self.log.info("Test: %s" % (", ".join(utxo[2][2] for utxo in input_utxos))) + + # Add 1 to 4 outputs + outputs = random.choice([1,2,3,4]) + assert in_value >= 0 and fee - outputs * DUST_LIMIT >= MIN_FEE + for i in range(outputs): + tx.vout.append(CTxOut()) + if in_value <= DUST_LIMIT: + tx.vout[-1].nValue = DUST_LIMIT + elif i < outputs - 1: + tx.vout[-1].nValue = in_value + else: + tx.vout[-1].nValue = random.randint(DUST_LIMIT, in_value) + in_value -= tx.vout[-1].nValue + tx.vout[-1].scriptPubKey = random.choice(host_spks) + fee += in_value + assert(fee >= 0) + + # For each inputs, make it fail once; then succeed once + for fail_input in range(inputs + 1): + # Wipe scriptSig/witness + for i in range(inputs): + tx.vin[i].scriptSig = CScript() + tx.wit.vtxinwit[i] = CTxInWitness() + # Fill inputs/witnesses + for i in range(inputs): + fn = input_utxos[i][2][4] + fn(tx, i, [utxo[1] for utxo in input_utxos], i != fail_input) + # Submit to mempool to check standardness + standard = fail_input == inputs and all(utxo[2][3] for utxo in input_utxos) and tx.nVersion >= 1 and tx.nVersion <= 2 + if standard: + self.nodes[0].sendrawtransaction(tx.serialize().hex(), 0) + assert(self.nodes[0].getmempoolentry(tx.hash) is not None) + else: + assert_raises_rpc_error(-26, None, self.nodes[0].sendrawtransaction, tx.serialize().hex(), 0) + # Submit in a block + tx.rehash() + msg = ','.join(utxo[2][2] + ("*" if n == fail_input else "") for n, utxo in enumerate(input_utxos)) + self.block_submit(self.nodes[0], [tx], msg, witness=True, accept=fail_input == inputs, cb_pubkey=random.choice(host_pubkeys), fees=fee) + + def run_test(self): + VALID_SIGHASHES = [0,1,2,3,0x81,0x82,0x83] + spenders = [] + + for p2sh in [False, True]: + random_annex = bytes([ANNEX_TAG]) + random_bytes(random.randrange(0, 5)) + for annex in [None, random_annex]: + standard = annex is None + sec1, sec2 = ECKey(), ECKey() + sec1.generate() + sec2.generate() + pub1, pub2 = sec1.get_pubkey(), sec2.get_pubkey() + + # Sighash mutation tests + for hashtype in VALID_SIGHASHES: + # Pure pubkey + info = taproot_construct(pub1, []) + spender_sighash_mutation(spenders, info, p2sh, "sighash/pk#pk", key=sec1, hashtype=hashtype, annex=annex, standard=standard) + # Pubkey/P2PK script combination + scripts = [CScript(random_checksig_style(pub2.get_bytes()))] + info = taproot_construct(pub1, scripts) + spender_sighash_mutation(spenders, info, p2sh, "sighash/p2pk#pk", key=sec1, hashtype=hashtype, annex=annex, standard=standard) + spender_sighash_mutation(spenders, info, p2sh, "sighash/p2pk#s0", script=scripts[0], key=sec2, hashtype=hashtype, annex=annex, standard=standard) + # More complex script structure + scripts = [ + CScript(random_checksig_style(pub2.get_bytes()) + bytes([OP_CODESEPARATOR])), # codesep after checksig + CScript(bytes([OP_CODESEPARATOR]) + random_checksig_style(pub2.get_bytes())), # codesep before checksig + CScript([bytes([1,2,3]), OP_DROP, OP_IF, OP_CODESEPARATOR, pub1.get_bytes(), OP_ELSE, OP_CODESEPARATOR, pub2.get_bytes(), OP_ENDIF, OP_CHECKSIG]), # branch dependent codesep + ] + info = taproot_construct(pub1, scripts) + spender_sighash_mutation(spenders, info, p2sh, "sighash/codesep#pk", key=sec1, hashtype=hashtype, annex=annex, standard=standard) + spender_sighash_mutation(spenders, info, p2sh, "sighash/codesep#s0", script=scripts[0], key=sec2, hashtype=hashtype, annex=annex, standard=standard) + spender_sighash_mutation(spenders, info, p2sh, "sighash/codesep#s1", script=scripts[1], key=sec2, hashtype=hashtype, annex=annex, pos=0, standard=standard) + spender_sighash_mutation(spenders, info, p2sh, "sighash/codesep#s2a", script=scripts[2], key=sec1, hashtype=hashtype, annex=annex, pos=3, suffix=[bytes([1])], standard=standard) + spender_sighash_mutation(spenders, info, p2sh, "sighash/codesep#s2b", script=scripts[2], key=sec2, hashtype=hashtype, annex=annex, pos=6, suffix=[bytes([])], standard=standard) + + # OP_SUCCESSx and unknown tapscript versions + scripts = [ + CScript([random_op_success()]), + CScript([OP_0, OP_IF, random_op_success(), OP_ENDIF, OP_RETURN]), + CScript([random_op_success(), OP_VERIF]), + CScript(random_script(10000) + bytes([random_op_success()]) + random_invalid_push(random.randint(1,10))), + (random_unknown_tapscript_ver(), CScript([OP_RETURN])), + (random_unknown_tapscript_ver(), CScript(random_script(10000) + random_invalid_push(random.randint(1,10)))), + (ANNEX_TAG & 0xfe, CScript()), + ] + info = taproot_construct(pub1, scripts) + spender_sighash_mutation(spenders, info, p2sh, "alwaysvalid/pk", key=sec1, hashtype=random.choice(VALID_SIGHASHES), annex=annex, standard=standard) + spender_alwaysvalid(spenders, info, p2sh, "alwaysvalid/success", script=scripts[0], annex=annex) + spender_alwaysvalid(spenders, info, p2sh, "alwaysvalid/success#if", script=scripts[1], annex=annex) + spender_alwaysvalid(spenders, info, p2sh, "alwaysvalid/success#verif", script=scripts[2], annex=annex) + spender_alwaysvalid(spenders, info, p2sh, "alwaysvalid/success#10000+", script=scripts[3], annex=annex) + spender_alwaysvalid(spenders, info, p2sh, "alwaysvalid/unknownversion#return", script=scripts[4], annex=annex) + spender_alwaysvalid(spenders, info, p2sh, "alwaysvalid/unknownversion#10000+", script=scripts[5], annex=annex) + if (info[2][scripts[6][1]][0] != ANNEX_TAG or annex is not None): + # Annex is mandatory for control block with version 0x50 + spender_alwaysvalid(spenders, info, p2sh, "alwaysvalid/unknownversion#fe", script=scripts[6], annex=annex) + + # Run all tests once with individual inputs, once with groups of inputs + self.test_spenders(spenders, input_counts=[1]) + self.test_spenders(spenders, input_counts=[2,3,4]) + + +if __name__ == '__main__': + TAPROOTTest().main() diff --git a/test/functional/test_framework/blocktools.py b/test/functional/test_framework/blocktools.py index d741b00ba0a71..2c1a73a607e05 100644 --- a/test/functional/test_framework/blocktools.py +++ b/test/functional/test_framework/blocktools.py @@ -99,8 +99,8 @@ def script_BIP34_coinbase_height(height): return CScript([CScriptNum(height)]) -def create_coinbase(height, pubkey=None): - """Create a coinbase transaction, assuming no miner fees. +def create_coinbase(height, pubkey=None, fees=0): + """Create a coinbase transaction. If pubkey is passed in, the coinbase output will be a P2PK output; otherwise an anyone-can-spend output.""" @@ -110,6 +110,7 @@ def create_coinbase(height, pubkey=None): coinbaseoutput.nValue = 50 * COIN halvings = int(height / 150) # regtest coinbaseoutput.nValue >>= halvings + coinbaseoutput.nValue += fees if (pubkey is not None): coinbaseoutput.scriptPubKey = CScript([pubkey, OP_CHECKSIG]) else: diff --git a/test/functional/test_framework/key.py b/test/functional/test_framework/key.py index 912c0ca978e31..b3c283623692e 100644 --- a/test/functional/test_framework/key.py +++ b/test/functional/test_framework/key.py @@ -7,6 +7,7 @@ keys, and is trivially vulnerable to side channel attacks. Do not use for anything but tests.""" import random +import hashlib def modinv(a, n): """Compute the modular inverse of a modulo n @@ -212,7 +213,8 @@ def mul(self, ps): r = self.add(r, p) return r -SECP256K1 = EllipticCurve(2**256 - 2**32 - 977, 0, 7) +SECP256K1_FIELD_SIZE = 2**256 - 2**32 - 977 +SECP256K1 = EllipticCurve(SECP256K1_FIELD_SIZE, 0, 7) SECP256K1_G = (0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, 1) SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2 @@ -322,6 +324,38 @@ def verify_ecdsa(self, sig, msg, low_s=True): return False return True + def verify_schnorr(self, sig, msg): + assert(len(msg) == 32) + assert(len(sig) == 64) + assert(self.valid) + assert(self.compressed) + r = int.from_bytes(sig[0:32], 'big') + if r >= SECP256K1_FIELD_SIZE: + return False + s = int.from_bytes(sig[32:64], 'big') + if s >= SECP256K1_ORDER: + return False + e = int.from_bytes(hashlib.sha256(sig[0:32] + self.get_bytes() + msg).digest(), 'big') % SECP256K1_ORDER + R = SECP256K1.mul([(SECP256K1_G, s), (self.p, SECP256K1_ORDER - e)]) + if R[2] == 0 or jacobi_symbol(R[1] * R[2], SECP256K1_FIELD_SIZE) != 1 or ((r * R[2] * R[2]) % SECP256K1_FIELD_SIZE) != R[0]: + return False + return True + + def tweak_add(self, tweak): + assert(self.valid) + assert(len(tweak) == 32) + t = int.from_bytes(tweak, 'big') + if t >= SECP256K1_ORDER: + return None + tweaked = SECP256K1.affine(SECP256K1.mul([(self.p, 1), (SECP256K1_G, t)])) + if tweaked is None: + return None + ret = ECPubKey() + ret.p = tweaked + ret.valid = True + ret.compressed = self.compressed + return ret + class ECKey(): """A secp256k1 private key""" @@ -384,3 +418,29 @@ def sign_ecdsa(self, msg, low_s=True): rb = r.to_bytes((r.bit_length() + 8) // 8, 'big') sb = s.to_bytes((s.bit_length() + 8) // 8, 'big') return b'\x30' + bytes([4 + len(rb) + len(sb), 2, len(rb)]) + rb + bytes([2, len(sb)]) + sb + + def sign_schnorr(self, msg): + """Construct a bip-schnorr compatible signature with this key.""" + assert(self.valid) + assert(self.compressed) + assert(len(msg) == 32) + kp = int.from_bytes(hashlib.sha256(self.get_bytes() + msg).digest(), 'big') % SECP256K1_ORDER + assert(kp != 0) + R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, kp)])) + k = kp if jacobi_symbol(R[1], SECP256K1_FIELD_SIZE) == 1 else SECP256K1_ORDER - kp + e = int.from_bytes(hashlib.sha256(R[0].to_bytes(32, 'big') + self.get_pubkey().get_bytes() + msg).digest(), 'big') % SECP256K1_ORDER + return R[0].to_bytes(32, 'big') + ((k + e*self.secret) % SECP256K1_ORDER).to_bytes(32, 'big') + + def tweak_add(self, tweak): + """Return a tweaked version of this private key.""" + assert(self.valid) + assert(len(tweak) == 32) + t = int.from_bytes(tweak, 'big') + if t >= SECP256K1_ORDER: + return None + tweaked = (self.secret + t) % SECP256K1_ORDER + if tweaked == 0: + return None + ret = ECKey() + ret.set(tweaked.to_bytes(32, 'big'), self.compressed) + return ret diff --git a/test/functional/test_framework/script.py b/test/functional/test_framework/script.py index 384062b8084fe..962d2687f4883 100644 --- a/test/functional/test_framework/script.py +++ b/test/functional/test_framework/script.py @@ -7,7 +7,8 @@ This file is modified from python-bitcoinlib. """ -from .messages import CTransaction, CTxOut, sha256, hash256, uint256_from_str, ser_uint256, ser_string +from .messages import CTransaction, CTxOut, sha256, hash256, uint256_from_str, ser_uint256, ser_string, CTxInWitness +from .key import ECKey, ECPubKey import hashlib import struct @@ -15,9 +16,14 @@ from .bignum import bn2vch MAX_SCRIPT_ELEMENT_SIZE = 520 +LOCKTIME_THRESHOLD = 500000000 +ANNEX_TAG = 0x50 OPCODE_NAMES = {} +DEFAULT_TAPSCRIPT_VER = 0xc0 +TAPROOT_VER = 0 + def hash160(s): return hashlib.new('ripemd160', sha256(s)).digest() @@ -223,11 +229,8 @@ def __new__(cls, n): OP_NOP9 = CScriptOp(0xb8) OP_NOP10 = CScriptOp(0xb9) -# template matching params -OP_SMALLINTEGER = CScriptOp(0xfa) -OP_PUBKEYS = CScriptOp(0xfb) -OP_PUBKEYHASH = CScriptOp(0xfd) -OP_PUBKEY = CScriptOp(0xfe) +# tapscript +OP_CHECKSIGADD = CScriptOp(0xba) OP_INVALIDOPCODE = CScriptOp(0xff) @@ -343,10 +346,7 @@ def __new__(cls, n): OP_NOP8 : 'OP_NOP8', OP_NOP9 : 'OP_NOP9', OP_NOP10 : 'OP_NOP10', - OP_SMALLINTEGER : 'OP_SMALLINTEGER', - OP_PUBKEYS : 'OP_PUBKEYS', - OP_PUBKEYHASH : 'OP_PUBKEYHASH', - OP_PUBKEY : 'OP_PUBKEY', + OP_CHECKSIGADD : 'OP_CHECKSIGADD', OP_INVALIDOPCODE : 'OP_INVALIDOPCODE', }) @@ -607,6 +607,20 @@ def FindAndDelete(script, sig): r += script[last_sop_idx:] return CScript(r) +def IsPayToScriptHash(script): + return len(script) == 23 and script[0] == OP_HASH160 and script[1] == 20 and script[22] == OP_EQUAL + +def IsPayToTaproot(script): + return len(script) == 35 and script[0] == OP_1 and script[1] == 33 and script[2] >= 0 and script[2] <= 1 + +def TaggedHash(tag, data): + ss = sha256(tag.encode('utf-8')) + ss += ss + ss += data + return sha256(ss) + +def GetP2SH(script): + return CScript([OP_HASH160, hash160(script), OP_EQUAL]) def SignatureHash(script, txTo, inIdx, hashtype): """Consensus-correct SignatureHash @@ -702,3 +716,102 @@ def SegwitVersion1SignatureHash(script, txTo, inIdx, hashtype, amount): ss += struct.pack("= 0 and hash_type <= 3) or (hash_type >= 0x81 and hash_type <= 0x83)) + assert (input_index < len(txTo.vin)) + spk = spent_utxos[input_index].scriptPubKey + ss = bytes([0, hash_type]) # epoch, hash_type + ss += struct.pack(" 0) + assert (codeseparator_pos >= -1) + spend_type |= 4 + ss += bytes([spend_type]) + ss += ser_string(spk) + if (hash_type & SIGHASH_ANYONECANPAY): + ss += txTo.vin[input_index].prevout.serialize() + ss += struct.pack("= 0 and version < 0xff and not (version & 1) + data = pubkey.get_bytes() + return bytes([data[0] & 1 | version]) + data[1:] + +def taproot_tree_helper(scripts): + if len(scripts) == 1: + script = scripts[0] + if isinstance(script, list): + return taproot_tree_helper(script) + version = DEFAULT_TAPSCRIPT_VER + if isinstance(script, tuple): + version, script = script + assert isinstance(script, bytes) + h = TaggedHash("TapLeaf", bytes([version & 0xfe]) + ser_string(script)) + return ([(version, script, bytes())], h) + split_pos = len(scripts) // 2 + left, left_h = taproot_tree_helper(scripts[0:split_pos]) + right, right_h = taproot_tree_helper(scripts[split_pos:]) + left = [(version, script, control + right_h) for version, script, control in left] + right = [(version, script, control + left_h) for version, script, control in right] + if right_h < left_h: + right_h, left_h = left_h, right_h + h = TaggedHash("TapBranch", left_h + right_h) + return (left + right, h) + +def taproot_construct(pubkey, scripts=[]): + """Construct a tree of taproot spending conditions + + pubkey: an ECPubKey object for the root pubkey + scripts: a list of items; each item is either: + - a CScript + - a (version, CScript) tuple + - another list of items (with the same structure) + + Returns: script (sPK or redeemScript), tweak, {script:control, ...} + """ + if len(scripts) == 0: + return (CScript([OP_1, GetVersionTaggedPubKey(pubkey, TAPROOT_VER)]), bytes([0 for i in range(32)]), {}) + + ret, h = taproot_tree_helper(scripts) + control_map = dict((script, GetVersionTaggedPubKey(pubkey, version) + control) for version, script, control in ret) + tweak = TaggedHash("TapTweak", pubkey.get_bytes() + h) + tweaked = pubkey.tweak_add(tweak) + return (CScript([OP_1, GetVersionTaggedPubKey(tweaked, TAPROOT_VER)]), tweak, control_map) + +def is_op_success(o): + return o == 0x50 or o == 0x62 or o == 0x89 or o == 0x8a or o == 0x8d or o == 0x8e or (o >= 0x7e and o <= 0x81) or (o >= 0x83 and o <= 0x86) or (o >= 0x95 and o <= 0x99) or (o >= 0xbb and o <= 0xfe) diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index ad5673e03a232..f8a35dc680816 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -148,6 +148,7 @@ 'p2p_mempool.py', 'rpc_setban.py', 'p2p_blocksonly.py', + 'feature_taproot.py', 'mining_prioritisetransaction.py', 'p2p_invalid_locator.py', 'p2p_invalid_block.py',