Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fullnode interactive rescan. #856

Merged
merged 8 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@
**When upgrading to this version of hsd, you must pass `--wallet-migrate=3` when
you run it for the first time.**

### Primitives
- TX Changes:
- tx.test no longer updates the filter.
- Introduce TX.testAndMaybeUpdate method for potentially updating filter while
testing. (old tx.test)

### Node Changes
Add support for the interactive rescan, that allows more control over rescan
process and allows parallel rescans.

#### Node HTTP API
- `GET /` or `getInfo()` now has more properties:
- `treeRootHeight` - height at which the block txns are accumulated
Expand All @@ -22,6 +31,13 @@ you run it for the first time.**
- `compactInterval` - what is the current compaction interval config.
- `nextCompaction` - when will the next compaction trigger after restart.
- `lastCompaction` - when was the last compaction run.
- Introduce `scan interactive` hook (start, filter)

### Node HTTP Client:
- Introduce `scanInteractive` method that starts interactive rescan.
- expects ws hook for `block rescan interactive` params `rawEntry, rawTXs`
that returns scanAction object.
- expects ws hook for `block rescan interactive abort` param `message`.

### Wallet Changes
#### Configuration
Expand Down
75 changes: 74 additions & 1 deletion lib/blockchain/chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const {OwnershipProof} = require('../covenants/ownership');
const AirdropProof = require('../primitives/airdropproof');
const {CriticalError} = require('../errors');
const thresholdStates = common.thresholdStates;
const scanActions = common.scanActions;
const {states} = NameState;

const {
Expand Down Expand Up @@ -2250,7 +2251,7 @@ class Chain extends AsyncEmitter {

/**
* Scan the blockchain for transactions containing specified address hashes.
* @param {Hash} start - Block hash to start at.
* @param {Hash|Number} start - Block hash or height to start at.
* @param {Bloom} filter - Bloom filter containing tx and address hashes.
* @param {Function} iter - Iterator.
* @returns {Promise}
Expand All @@ -2265,6 +2266,78 @@ class Chain extends AsyncEmitter {
}
}

/**
* Interactive scan the blockchain for transactions containing specified
* address hashes. Allows repeat and abort.
* @param {Hash|Number} start - Block hash or height to start at.
* @param {BloomFilter} filter - Starting bloom filter containing tx,
* address and name hashes.
* @param {Function} iter - Iterator.
*/

async scanInteractive(start, filter, iter) {
if (start == null)
start = this.network.genesis.hash;

if (typeof start === 'number')
this.logger.info('Scanning(interactive) from height %d.', start);
else
this.logger.info('Scanning(interactive) from block %x.', start);

let hash = start;

while (hash != null) {
const unlock = await this.locker.lock();

try {
const {entry, txs} = await this.db.scanBlock(hash, filter);

const action = await iter(entry, txs);

if (!action || typeof action !== 'object')
throw new Error('Did not get proper action');

switch (action.type) {
case scanActions.REPEAT: {
break;
}
case scanActions.REPEAT_SET: {
// try again with updated filter.
filter = action.filter;
break;
}
case scanActions.REPEAT_ADD: {
if (!filter)
throw new Error('No filter set.');

for (const chunk of action.chunks)
filter.add(chunk);
break;
}
case scanActions.NEXT: {
const next = await this.getNext(entry);
hash = next && next.hash;
break;
}
case scanActions.ABORT: {
this.logger.info('Scan(interactive) aborted at %x (%d).',
entry.hash, entry.height);
throw new Error('scan request aborted.');
}
default:
this.logger.debug('Scan(interactive) aborting. Unknown action: %d',
action.type);
throw new Error('Unknown action.');
}
} catch (e) {
this.logger.debug('Scan(interactive) errored. Error: %s', e.message);
throw e;
} finally {
unlock();
}
}
}

/**
* Add a block to the chain, perform all necessary verification.
* @param {Block} block
Expand Down
47 changes: 46 additions & 1 deletion lib/blockchain/chaindb.js
Original file line number Diff line number Diff line change
Expand Up @@ -1600,7 +1600,7 @@ class ChainDB {
entry.hash, entry.height);

for (const tx of block.txs) {
if (tx.test(filter))
if (tx.testAndMaybeUpdate(filter))
txs.push(tx);
}

Expand All @@ -1612,6 +1612,51 @@ class ChainDB {
this.logger.info('Finished scanning %d blocks.', total);
}

/**
* Interactive scans block checks.
* @param {Hash|Number} blockID - Block hash or height to start at.
* @param {BloomFilter} [filter] - Starting bloom filter containing tx,
* address and name hashes.
* @returns {Promise}
*/

async scanBlock(blockID, filter) {
assert(blockID != null);

const entry = await this.getEntry(blockID);

if (!entry)
throw new Error('Could not find entry.');

if (!await this.isMainChain(entry))
throw new Error('Cannot rescan an alternate chain.');

const block = await this.getBlock(entry.hash);

if (!block)
throw new Error('Block not found.');

this.logger.info(
'Scanning block %x (%d)',
entry.hash, entry.height);

let txs = [];

if (!filter) {
txs = block.txs;
} else {
for (const tx of block.txs) {
if (tx.testAndMaybeUpdate(filter))
txs.push(tx);
}
}

return {
entry,
txs
};
}

/**
* Save an entry to the database and optionally
* connect it as the tip. Note that this method
Expand Down
15 changes: 15 additions & 0 deletions lib/blockchain/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,18 @@ exports.flags = {
exports.flags.DEFAULT_FLAGS = 0
| exports.flags.VERIFY_POW
| exports.flags.VERIFY_BODY;

/**
* Interactive scan actions.
* @enum {Number}
* @default
*/

exports.scanActions = {
NONE: 0,
ABORT: 1,
NEXT: 2,
REPEAT_SET: 3,
REPEAT_ADD: 4,
REPEAT: 5
};
16 changes: 16 additions & 0 deletions lib/client/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,22 @@ class NodeClient extends Client {

return this.call('rescan', start);
}

/**
* Rescan for any missed transactions. (Interactive)
* @param {Number|Hash} start - Start block.
* @param {BloomFilter} [filter]
* @returns {Promise}
*/

rescanInteractive(start, filter = null) {
if (start == null)
start = 0;

assert(typeof start === 'number' || Buffer.isBuffer(start));

return this.call('rescan interactive', start, filter);
}
}

/*
Expand Down
2 changes: 1 addition & 1 deletion lib/net/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ class Peer extends EventEmitter {
// Check the peer's bloom
// filter if they're using spv.
if (this.spvFilter) {
if (!tx.test(this.spvFilter))
if (!tx.testAndMaybeUpdate(this.spvFilter))
continue;
}

Expand Down
14 changes: 13 additions & 1 deletion lib/node/fullnode.js
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ class FullNode extends Node {
/**
* Rescan for any missed transactions.
* @param {Number|Hash} start - Start block.
* @param {Bloom} filter
* @param {BloomFilter} filter
* @param {Function} iter - Iterator.
* @returns {Promise}
*/
Expand All @@ -364,6 +364,18 @@ class FullNode extends Node {
return this.chain.scan(start, filter, iter);
}

/**
* Interactive rescan for any missed transactions.
* @param {Number|Hash} start - Start block.
* @param {BloomFilter} filter
* @param {Function} iter - Iterator.
* @returns {Promise}
*/

scanInteractive(start, filter, iter) {
return this.chain.scanInteractive(start, filter, iter);
}

/**
* Broadcast a transaction.
* @param {TX|Block|Claim|AirdropProof} item
Expand Down
94 changes: 93 additions & 1 deletion lib/node/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const TX = require('../primitives/tx');
const Claim = require('../primitives/claim');
const Address = require('../primitives/address');
const Network = require('../protocol/network');
const scanActions = require('../blockchain/common').scanActions;
const pkg = require('../pkg');

/**
Expand Down Expand Up @@ -706,6 +707,21 @@ class HTTP extends Server {

return this.scan(socket, start);
});

socket.hook('rescan interactive', (...args) => {
const valid = new Validator(args);
const start = valid.uintbhash(0);
const rawFilter = valid.buf(1);
let filter = socket.filter;

if (start == null)
throw new Error('Invalid parameter.');

if (rawFilter)
filter = BloomFilter.fromRaw(rawFilter);

return this.scanInteractive(socket, start, filter);
});
}

/**
Expand Down Expand Up @@ -813,7 +829,7 @@ class HTTP extends Server {
if (!socket.filter)
return false;

return tx.test(socket.filter);
return tx.testAndMaybeUpdate(socket.filter);
}

/**
Expand All @@ -834,6 +850,82 @@ class HTTP extends Server {

return socket.call('block rescan', block, raw);
});

return null;
}

/**
* Scan using a socket's filter (interactive).
* @param {WebSocket} socket
* @param {Hash} start
* @param {BloomFilter} filter
* @returns {Promise}
*/

async scanInteractive(socket, start, filter) {
const iter = async (entry, txs) => {
const block = entry.encode();
const raw = [];

for (const tx of txs)
raw.push(tx.encode());

const action = await socket.call('block rescan interactive', block, raw);
const valid = new Validator(action);
const actionType = valid.i32('type');

switch (actionType) {
case scanActions.NEXT:
case scanActions.ABORT:
case scanActions.REPEAT: {
return {
type: actionType
};
}
case scanActions.REPEAT_SET: {
// NOTE: This is operation is on the heavier side,
// because it sends the whole Filter that can be quite
// big depending on the situation.
// NOTE: In HTTP Context REPEAT_SET wont modify socket.filter
// but instead setup new one for the rescan. Further REPEAT_ADDs will
// modify this filter instead of the socket.filter.
const rawFilter = valid.buf('filter');
let filter = null;

if (rawFilter != null)
filter = BloomFilter.fromRaw(rawFilter);

return {
type: scanActions.REPEAT_SET,
filter: filter
};
}
case scanActions.REPEAT_ADD: {
// NOTE: This operation depending on the filter
// that was provided can be either modifying the
// socket.filter or the filter provided by REPEAT_SET.
const chunks = valid.array('chunks');

if (!chunks)
throw new Error('Invalid parameter.');

return {
type: scanActions.REPEAT_ADD,
chunks: chunks
};
}

default:
throw new Error('Unknown action.');
}
};

try {
await this.node.scanInteractive(start, filter, iter);
} catch (err) {
return socket.call('block rescan interactive abort', err.message);
}

return null;
}
}
Expand Down
Loading