Skip to content

Commit

Permalink
Merge pull request #1466 from CounterpartyXCP/kickstart
Browse files Browse the repository at this point in the history
Kickstart optimization
  • Loading branch information
adamkrellenstein authored Mar 4, 2024
2 parents 01e25bd + e3980da commit a5a0967
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 219 deletions.
8 changes: 5 additions & 3 deletions counterparty-cli/counterpartycli/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
APP_NAME = 'counterparty-server'

CONFIG_ARGS = [
[('-v', '--verbose'), {'dest': 'verbose', 'action': 'store_true', 'default': False, 'help': 'sets log level to DEBUG instead of WARNING'}],
[('-v', '--verbose'), {'dest': 'verbose', 'action': 'store_true', 'default': False, 'help': 'sets log level to DEBUG or INFO if --quiet is set'}],
[('--quiet',), {'dest': 'quiet', 'action': 'store_true', 'default': True, 'help': 'sets log level to ERROR or INFO if --verbose is set'}],
[('--testnet',), {'action': 'store_true', 'default': False, 'help': 'use {} testnet addresses and block numbers'.format(config.BTC_NAME)}],
[('--testcoin',), {'action': 'store_true', 'default': False, 'help': 'use the test {} network on every blockchain'.format(config.XCP_NAME)}],
[('--regtest',), {'action': 'store_true', 'default': False, 'help': 'use {} regtest addresses and block numbers'.format(config.BTC_NAME)}],
Expand Down Expand Up @@ -102,7 +103,7 @@ def main():

args = parser.parse_args()

log.set_up(log.ROOT_LOGGER, verbose=args.verbose, console_logfilter=os.environ.get('COUNTERPARTY_LOGGING', None))
log.set_up(log.ROOT_LOGGER, verbose=args.verbose, quiet=args.quiet, console_logfilter=os.environ.get('COUNTERPARTY_LOGGING', None))

logger.info('Running v{} of {}.'.format(APP_VERSION, APP_NAME))

Expand Down Expand Up @@ -148,7 +149,8 @@ def init_with_catch(fn, init_args):
requests_timeout=args.requests_timeout,
rpc_batch_size=args.rpc_batch_size,
check_asset_conservation=not args.no_check_asset_conservation,
force=args.force, verbose=args.verbose, console_logfilter=os.environ.get('COUNTERPARTY_LOGGING', None),
force=args.force, verbose=args.verbose, quiet=args.quiet,
console_logfilter=os.environ.get('COUNTERPARTY_LOGGING', None),
p2sh_dust_return_pubkey=args.p2sh_dust_return_pubkey,
utxo_locks_max_addresses=args.utxo_locks_max_addresses,
utxo_locks_max_age=args.utxo_locks_max_age,
Expand Down
12 changes: 10 additions & 2 deletions counterparty-lib/counterpartylib/lib/backend/addrindexrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,7 @@ def search_raw_transactions(address, unconfirmed=True, only_tx_hashes=False):

return batch

def get_oldest_tx(address):
print("get_oldest_tx old")
def get_oldest_tx_legacy(address):
hsh = _address_to_hash(address)
call_result = Indexer_Thread.send({
"method": "blockchain.scripthash.get_oldest_tx",
Expand Down Expand Up @@ -622,3 +621,12 @@ def get_oldest_tx(self, address, timeout=SOCKET_TIMEOUT):
"params": [hsh]
}
return self.send(query, timeout=timeout)


# Shared socket client, created lazily on the first call to get_oldest_tx().
ADDRINDEXRS_CLIENT = None

def get_oldest_tx(address):
    """Return the oldest transaction touching `address`.

    A single module-level `AddrindexrsSocket` is opened on first use and
    reused for every subsequent call, so repeated lookups share one socket.
    """
    global ADDRINDEXRS_CLIENT
    client = ADDRINDEXRS_CLIENT
    if client is None:
        client = ADDRINDEXRS_CLIENT = AddrindexrsSocket()
    return client.get_oldest_tx(address)
84 changes: 71 additions & 13 deletions counterparty-lib/counterpartylib/lib/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import bitcoin as bitcoinlib
from bitcoin.core.script import CScriptInvalidError

from halo import Halo
from termcolor import colored

from counterpartylib import server
from counterpartylib.lib import config
from counterpartylib.lib import exceptions
Expand Down Expand Up @@ -69,6 +72,8 @@
for line in mainnet_burns_reader:
MAINNET_BURNS[line['tx_hash']] = line

OK_GREEN = colored("[OK]", "green")
SPINNER_STYLE = "bouncingBar"

def parse_tx(db, tx):
"""Parse the transaction, return True for success."""
Expand Down Expand Up @@ -538,32 +543,85 @@ def list_tx(db, block_hash, block_index, block_time, tx_hash, tx_index, tx_hex=N


def clean_table_from(cursor, table, block_index):
    """Delete every row of `table` whose block_index is greater than `block_index`.

    NOTE(review): `table` is interpolated into the SQL text (it cannot be a
    bound parameter); callers must only pass trusted, internal table names.
    """
    logger.info(f'Cleaning table {table} from block_index {block_index}...')
    sql = f'''DELETE FROM {table} WHERE block_index > ?'''
    cursor.execute(sql, (block_index,))


def rollback(db, block_index=0):
# clean all tables
cursor = db.cursor()
for table in TABLES + ['transaction_outputs', 'transactions', 'blocks']:
def clean_messages_tables(cursor, block_index=0):
# clean all tables except 'assets', 'blocks', 'transaction_outputs' and 'transactions'
cursor.execute('''PRAGMA foreign_keys=OFF''')
for table in TABLES:
clean_table_from(cursor, table, block_index)
cursor.close()
logger.info('Database rolled back to block_index {}'.format(block_index))
cursor.execute('''PRAGMA foreign_keys=ON''')


def clean_messages_tables(cursor, block_index=0):
def clean_transactions_tables(cursor, block_index=0):
# clean the 'transaction_outputs', 'transactions' and 'blocks' tables
for table in TABLES:
cursor.execute('''PRAGMA foreign_keys=OFF''')
for table in ['transaction_outputs', 'transactions', 'blocks']:
clean_table_from(cursor, table, block_index)
cursor.execute('''PRAGMA foreign_keys=ON''')


def rollback(db, block_index=0):
    """Roll the database back so that `block_index` is the last kept block.

    Deletes all rows above `block_index` from the messages tables first
    (clean_messages_tables) and then from 'transaction_outputs',
    'transactions' and 'blocks' (clean_transactions_tables), showing a
    console spinner while the cleanup runs and printing the total duration.
    """
    # clean all tables
    start_time = time.time()
    step = f"Cleaning database from block {block_index}..."
    with Halo(text=step, spinner=SPINNER_STYLE):
        cursor = db.cursor()
        clean_messages_tables(cursor, block_index=block_index)
        clean_transactions_tables(cursor, block_index=block_index)
        cursor.close()
    logger.info('Database rolled back to block_index {}'.format(block_index))
    print(f'{OK_GREEN} {step}')
    print('Rollback done in {:.2f}s'.format(time.time() - start_time))


def generate_progression_message(block, start_time_block_parse, start_time_all_blocks_parse, block_parsed_count, block_count, tx_index=None):
    """Build a one-line progress status for block (re)parsing.

    Reports the duration of the current block parse, overall progress
    (`block_parsed_count`/`block_count`), optionally the running
    `tx_index`, and an elapsed/projected/remaining time estimate
    extrapolated linearly from the average per-block duration.
    """
    parse_secs = time.time() - start_time_block_parse
    elapsed = time.time() - start_time_all_blocks_parse
    projected = (elapsed / block_parsed_count) * block_count
    left = projected - elapsed
    head = f"Block {block['block_index']} parsed in {parse_secs:.3f}s"
    progress = f"{block_parsed_count}/{block_count} blocks parsed"
    tx_part = f" - tx_index: {tx_index} - " if tx_index is not None else " - "
    timing = f"{elapsed:.3f}s/{projected:.3f}s ({left:.3f}s)"
    return f"{head} [{progress}{tx_part}{timing}]"


def reparse(db, block_index=0):
server.connect_to_addrindexrs()

cursor = db.cursor()
clean_messages_tables(cursor, block_index=0)
# clean all tables except assets' blocks', 'transaction_outputs' and 'transactions'
step = f"Cleaning database from block {block_index}..."
with Halo(text=step, spinner=SPINNER_STYLE):
clean_messages_tables(cursor, block_index=block_index)
print(f'{OK_GREEN} {step}')

# reparse blocks
ledger.CURRENT_BLOCK_INDEX = block_index if block_index !=0 else config.BLOCK_FIRST
cursor.execute('''SELECT * FROM blocks WHERE block_index > ?''', (block_index,))
for block in cursor:
parse_block(db, block['block_index'], block['block_time'])
start_time_all_blocks_parse = time.time()
block_parsed_count = 0
count_query = "SELECT COUNT(*) AS cnt FROM blocks WHERE block_index > ?"
block_count = cursor.execute(count_query, (block_index,)).fetchone()['cnt']
step = f"Reparsing blocks from block {block_index}..."
with Halo(text=step, spinner=SPINNER_STYLE) as spinner:
cursor.execute('''SELECT * FROM blocks WHERE block_index > ? ORDER BY block_index''', (block_index,))
for block in cursor:
start_time_block_parse = time.time()
ledger.CURRENT_BLOCK_INDEX = block['block_index']
parse_block(db, block['block_index'], block['block_time'])
block_parsed_count += 1
message = generate_progression_message(
block,
start_time_block_parse, start_time_all_blocks_parse,
block_parsed_count, block_count
)
spinner.text = message
print(f'{OK_GREEN} {message}')
print('All blocks reparsed in {:.2f}s'.format(time.time() - start_time_all_blocks_parse))


def last_db_index(db):
Expand Down
14 changes: 9 additions & 5 deletions counterparty-lib/counterpartylib/lib/gettxinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def get_dispensers_tx_info(sources, dispensers_outputs):
return source, destination, btc_amount, fee, data, outs


def parse_transaction_vouts(decoded_tx, p2sh_support):
def parse_transaction_vouts(decoded_tx):
# Get destinations and data outputs.
destinations, btc_amount, fee, data, potential_dispensers = [], 0, 0, b'', []

Expand All @@ -272,7 +272,7 @@ def parse_transaction_vouts(decoded_tx, p2sh_support):
potential_dispensers[-1] = (new_destination, output_value)
except:
raise DecodeError('unrecognised output type')
elif p2sh_support and asm[0] == OP_HASH160 and asm[-1] == OP_EQUAL and len(asm) == 3:
elif ledger.enabled('p2sh_addresses') and asm[0] == OP_HASH160 and asm[-1] == OP_EQUAL and len(asm) == 3:
new_destination, new_data = decode_scripthash(asm)
if ledger.enabled('p2sh_dispensers_support'):
potential_dispensers[-1] = (new_destination, output_value)
Expand Down Expand Up @@ -305,7 +305,7 @@ def parse_transaction_vouts(decoded_tx, p2sh_support):
return destinations, btc_amount, fee, data, potential_dispensers


def get_tx_info_new(db, decoded_tx, block_index, block_parser=None, p2sh_support=False, p2sh_is_segwit=False):
def get_tx_info_new(db, decoded_tx, block_index, block_parser=None, p2sh_is_segwit=False):
"""Get multisig transaction info.
The destinations, if they exists, always comes before the data output; the
change, if it exists, always comes after.
Expand All @@ -316,7 +316,12 @@ def get_tx_info_new(db, decoded_tx, block_index, block_parser=None, p2sh_support
raise DecodeError('coinbase transaction')

# Get destinations and data outputs.
destinations, btc_amount, fee, data, potential_dispensers = parse_transaction_vouts(decoded_tx, p2sh_support)
if 'parsed_vouts' in decoded_tx:
if decoded_tx['parsed_vouts'] == "DecodeError":
raise DecodeError('unrecognised output type')
destinations, btc_amount, fee, data, potential_dispensers = decoded_tx['parsed_vouts']
else:
destinations, btc_amount, fee, data, potential_dispensers = parse_transaction_vouts(decoded_tx)

# source can be determined by parsing the p2sh_data transaction
# or from the first spent output
Expand Down Expand Up @@ -474,7 +479,6 @@ def _get_tx_info(db, decoded_tx, block_index, block_parser=None, p2sh_is_segwit=
decoded_tx,
block_index,
block_parser=block_parser,
p2sh_support=True,
p2sh_is_segwit=p2sh_is_segwit,
)
elif ledger.enabled('multisig_addresses', block_index=block_index): # Protocol change.
Expand Down
74 changes: 19 additions & 55 deletions counterparty-lib/counterpartylib/lib/kickstart/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from counterpartylib import server
from counterpartylib.lib import config, blocks, ledger, backend, database, log
from counterpartylib.lib.kickstart.blocks_parser import BlockchainParser, ChainstateParser
from counterpartylib.lib.kickstart.utils import remove_shm_from_resource_tracker
from counterpartylib.lib.backend.addrindexrs import AddrindexrsSocket

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -79,7 +80,7 @@ def fetch_blocks(cursor, bitcoind_dir, last_known_hash, first_block, spinner):
cursor.execute(f'''INSERT INTO kickstart_blocks (block_index, block_hash, block_time, previous_block_hash, difficulty, tx_count)
VALUES {', '.join(bindings_place)}''',
bindings_lot)
spinner.text = f"Block {bindings_lot[0]} to {bindings_lot[-6]} inserted."
spinner.text = f"Initialising database: block {bindings_lot[0]} to {bindings_lot[-6]} inserted."
if block['block_index'] == first_block:
break
block_parser.close()
Expand Down Expand Up @@ -124,21 +125,6 @@ def prepare_db_for_resume(cursor):
return block_count, tx_index, last_parsed_block


def connect_to_addrindexrs():
    """Block until the `addrindexrs` backend is reachable and answering queries.

    Initialises the backend, then polls it with a fixed, known address
    (testnet or mainnet depending on config) once per second until
    get_oldest_tx returns a non-empty result.  Shows a spinner while
    waiting and prints a green [OK] line when done.
    """
    step = 'Connecting to `addrindexrs`...'
    with Halo(text=step, spinner=SPINNER_STYLE):
        ledger.CURRENT_BLOCK_INDEX = 0
        backend.BACKEND()
        check_addrindexrs = {}
        # an empty dict means the indexer has not answered yet
        while check_addrindexrs == {}:
            check_address = "tb1qurdetpdk8zg2thzx3g77qkgr7a89cp2m429t9c" if config.TESTNET else "1GsjsKKT4nH4GPmDnaxaZEDWgoBpmexwMA"
            check_addrindexrs = backend.get_oldest_tx(check_address)
            if check_addrindexrs == {}:
                logger.info('`addrindexrs` is not ready. Waiting one second.')
                time.sleep(1)
    print(f'{OK_GREEN} {step}')


def get_bitcoind_dir(bitcoind_dir=None):
if bitcoind_dir is None:
if platform.system() == 'Darwin':
Expand Down Expand Up @@ -180,9 +166,8 @@ def intialize_kickstart_db(bitcoind_dir, last_known_hash, resuming, new_database
if not resuming:
first_block = config.BLOCK_FIRST
if not new_database:
first_block = cursor.execute('SELECT block_index FROM blocks ORDER BY block_index ASC LIMIT 1').fetchone()['block_index']
first_block = cursor.execute('SELECT block_index FROM blocks ORDER BY block_index DESC LIMIT 1').fetchone()['block_index']
fetch_blocks(cursor, bitcoind_dir, last_known_hash, first_block, spinner)

# get last block index
spinner.text = step
block_count, tx_index, last_parsed_block = prepare_db_for_resume(cursor)
Expand Down Expand Up @@ -244,10 +229,12 @@ def backup_if_needed(new_database, resuming):
return
# move old database
backup_db(move=True)
return True
else:
backup_db()
elif not new_database:
backup_db()
return new_database


def parse_block(kickstart_db, cursor, block, block_parser, tx_index):
Expand Down Expand Up @@ -284,27 +271,14 @@ def parse_block(kickstart_db, cursor, block, block_parser, tx_index):
return tx_index


def generate_progression_message(block, tx_index, start_time_block_parse, start_time_all_blocks_parse, block_parsed_count, block_count):
    """Assemble a progress sentence for kickstart block parsing.

    Includes the current block's parse duration, the number of indexed
    transactions, the cumulated elapsed time, and a total-duration
    estimate extrapolated from the average per-block time.
    """
    parse_duration = time.time() - start_time_block_parse
    elapsed = time.time() - start_time_all_blocks_parse
    projected = (elapsed / block_parsed_count) * block_count
    parts = [
        f"Block {block['block_index']} parsed in {parse_duration:.3f}s.",
        f" {tx_index} transactions indexed.",
        f" Cumulated duration: {elapsed:.3f}s.",
        f" Expected duration: {projected:.3f}s.",
    ]
    return "".join(parts)


def cleanup(kickstart_db, block_parser):
remove_shm_from_resource_tracker()
step = 'Cleaning up...'
with Halo(text=step, spinner=SPINNER_STYLE):
# empty the queue to clean up shared memory
try:
block_parser.block_parsed()
block_parser.close()
#block = block_parser.next_block(timeout=1)
#while block is not None:
# block = block_parser.next_block(timeout=1)
except (Empty, FileNotFoundError):
pass
backend.stop()
Expand All @@ -314,12 +288,6 @@ def cleanup(kickstart_db, block_parser):


def run(bitcoind_dir, force=False, max_queue_size=None, debug_block=None):
# set log level
if not config.VERBOSE and debug_block is None:
log.ROOT_LOGGER.setLevel(logging.ERROR)
else:
log.ROOT_LOGGER.setLevel(logging.INFO)

# default signal handlers
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.default_int_handler)
Expand All @@ -328,12 +296,8 @@ def run(bitcoind_dir, force=False, max_queue_size=None, debug_block=None):
if not force:
confirm_kickstart()

# override backend get_oldest_tx
addrindexrs = AddrindexrsSocket()
backend.get_oldest_tx = addrindexrs.get_oldest_tx

# check addrindexrs
connect_to_addrindexrs()
server.connect_to_addrindexrs()

# determine bitcoincore data directory
bitcoind_dir = get_bitcoind_dir(bitcoind_dir)
Expand All @@ -349,7 +313,7 @@ def run(bitcoind_dir, force=False, max_queue_size=None, debug_block=None):

# backup old database
if not force:
backup_if_needed(new_database, resuming)
new_database = backup_if_needed(new_database, resuming)

# initialize main timer
start_time_total = time.time()
Expand Down Expand Up @@ -378,29 +342,29 @@ def run(bitcoind_dir, force=False, max_queue_size=None, debug_block=None):
start_time_block_parse = time.time()
# parse block
tx_index = parse_block(kickstart_db, cursor, block, block_parser, tx_index)
# check if we are done
if block['block_hash'] == last_known_hash:
break
# update last parsed block
last_parsed_block = block['block_index']
# update block parsed count
block_parsed_count += 1
# let's have a nice message
spinner.text = generate_progression_message(
block, tx_index,
message = blocks.generate_progression_message(
block,
start_time_block_parse, start_time_all_blocks_parse,
block_parsed_count, block_count
block_parsed_count, block_count,
tx_index
)
spinner.text = message
# notify block parsed
block_parser.block_parsed()
# get next block
# check if we are done
if block['block_hash'] == last_known_hash:
break
if debug_block is not None and block['block_index'] == int(debug_block):
block = None
else:
block = block_parser.next_block()
break
# get next block
block = block_parser.next_block()

spinner.stop()
print('All blocks parsed in: {:.3f}s'.format(time.time() - start_time_all_blocks_parse))
except FileNotFoundError:
pass # block file not found on stopping
except KeyboardInterrupt:
Expand Down
Loading

0 comments on commit a5a0967

Please sign in to comment.