From 52c0538765822e270f5dc76bf1b63d77216f7c26 Mon Sep 17 00:00:00 2001 From: Geoffrey Hayes Date: Sat, 1 Dec 2018 12:15:46 -0800 Subject: [PATCH 1/5] Fix Frame Size Issue When packets come in, sometimes they are incomplete. We previously handled packets that had a valid header but insufficient body size. This patch adds support for packets that don't even have complete headers. We queue the packet into queued data and wait for more data before processing. --- apps/ex_wire/lib/ex_wire/framing/frame.ex | 141 ++++++++++++---------- apps/ex_wire/lib/ex_wire/p2p/manager.ex | 6 +- 2 files changed, 79 insertions(+), 68 deletions(-) diff --git a/apps/ex_wire/lib/ex_wire/framing/frame.ex b/apps/ex_wire/lib/ex_wire/framing/frame.ex index 7b85939d1..3c3fc5552 100644 --- a/apps/ex_wire/lib/ex_wire/framing/frame.ex +++ b/apps/ex_wire/lib/ex_wire/framing/frame.ex @@ -16,6 +16,7 @@ defmodule ExWire.Framing.Frame do @type frame :: binary() @padding_size 16 + @min_frame_size 16 + 16 @spec frame(integer(), ExRLP.t(), Secrets.t()) :: {frame, Secrets.t()} def frame( @@ -97,7 +98,11 @@ defmodule ExWire.Framing.Frame do end @spec unframe(binary(), Secrets.t()) :: - {:ok, integer(), binary(), binary(), Secrets.t()} | {:error, String.t()} + {:ok, integer(), binary(), binary(), Secrets.t()} + | {:error, + :insufficient_data + | :failed_to_match_header_ingress_mac + | :failed_to_match_frame_ingress_mac} def unframe( frame, frame_secrets = %Secrets{ @@ -107,80 +112,84 @@ defmodule ExWire.Framing.Frame do mac_secret: mac_secret } ) do - << - # is header always 128 bits? - header_enc::binary-size(16), - header_mac::binary-size(16), - frame_rest::binary() - >> = frame - - # verify header mac - ingress_mac = update_mac(ingress_mac, mac_encoder, mac_secret, header_enc) - expected_header_mac = ingress_mac |> MAC.final() |> Binary.take(16) - - if expected_header_mac != header_mac do - :ok = - Logger.error( - "Failed to match ingress header mac, expected: #{inspect(expected_header_mac)}, got #{ - inspect(header_mac) - }" - ) - - {:error, "Failed to match header ingress mac"} + if byte_size(frame) < @min_frame_size do + {:error, :insufficient_data} else - {decoder_stream, header} = AES.stream_decrypt(header_enc, decoder_stream) - << - frame_size::integer-size(24), - _header_data_and_padding::binary() - >> = header - - # TODO: We should read the header? But, it's unused by all clients. - # header_rlp = header_data_and_padding |> ExRLP.decode - # protocol_id = Enum.at(header_rlp, 0) |> ExRLP.decode - - frame_padding_bytes = padding_size(frame_size, @padding_size) - - if byte_size(frame_rest) < frame_size + frame_padding_bytes + 16 do - {:error, "Insufficent data"} + # is header always 128 bits? + header_enc::binary-size(16), + header_mac::binary-size(16), + frame_rest::binary() + >> = frame + + # verify header mac + ingress_mac = update_mac(ingress_mac, mac_encoder, mac_secret, header_enc) + expected_header_mac = ingress_mac |> MAC.final() |> Binary.take(16) + + if expected_header_mac != header_mac do + :ok = + Logger.error( + "Failed to match ingress header mac, expected: #{inspect(expected_header_mac)}, got #{ + inspect(header_mac) + }" + ) + + {:error, :failed_to_match_header_ingress_mac} else - # let's go and ignore the entire header data.... + {decoder_stream, header} = AES.stream_decrypt(header_enc, decoder_stream) + << - frame_enc::binary-size(frame_size), - frame_padding::binary-size(frame_padding_bytes), - frame_mac::binary-size(16), - frame_rest::binary() - >> = frame_rest + frame_size::integer-size(24), + _header_data_and_padding::binary() + >> = header - frame_enc_with_padding = frame_enc <> frame_padding + # TODO: We should read the header? But, it's unused by all clients. + # header_rlp = header_data_and_padding |> ExRLP.decode + # protocol_id = Enum.at(header_rlp, 0) |> ExRLP.decode - ingress_mac = MAC.update(ingress_mac, frame_enc_with_padding) - ingress_mac = update_mac(ingress_mac, mac_encoder, mac_secret, nil) - expected_frame_mac = ingress_mac |> MAC.final() |> Binary.take(16) + frame_padding_bytes = padding_size(frame_size, @padding_size) - if expected_frame_mac != frame_mac do - {:error, "Failed to match frame ingress mac"} + if byte_size(frame_rest) < frame_size + frame_padding_bytes + 16 do + {:error, :insufficient_data} else - {decoder_stream, frame_with_padding} = - AES.stream_decrypt(frame_enc_with_padding, decoder_stream) - - << - frame::binary-size(frame_size), - _frame_padding::binary() - >> = frame_with_padding - + # let's go and ignore the entire header data.... << - packet_type_rlp::binary-size(1), - packet_data_rlp::binary() - >> = frame - - { - :ok, - packet_type_rlp |> ExRLP.decode() |> :binary.decode_unsigned(), - packet_data_rlp |> ExRLP.decode(), - frame_rest, - %{frame_secrets | ingress_mac: ingress_mac, decoder_stream: decoder_stream} - } + frame_enc::binary-size(frame_size), + frame_padding::binary-size(frame_padding_bytes), + frame_mac::binary-size(16), + frame_rest::binary() + >> = frame_rest + + frame_enc_with_padding = frame_enc <> frame_padding + + ingress_mac = MAC.update(ingress_mac, frame_enc_with_padding) + ingress_mac = update_mac(ingress_mac, mac_encoder, mac_secret, nil) + expected_frame_mac = ingress_mac |> MAC.final() |> Binary.take(16) + + if expected_frame_mac != frame_mac do + {:error, :failed_to_match_frame_ingress_mac} + else + {decoder_stream, frame_with_padding} = + AES.stream_decrypt(frame_enc_with_padding, decoder_stream) + + << + frame::binary-size(frame_size), + _frame_padding::binary() + >> = frame_with_padding + + << + packet_type_rlp::binary-size(1), + packet_data_rlp::binary() + >> = frame + + { + :ok, + packet_type_rlp |> ExRLP.decode() |> :binary.decode_unsigned(), + packet_data_rlp |> ExRLP.decode(), + frame_rest, + %{frame_secrets | ingress_mac: ingress_mac, decoder_stream: decoder_stream} + } + end end end end diff --git a/apps/ex_wire/lib/ex_wire/p2p/manager.ex b/apps/ex_wire/lib/ex_wire/p2p/manager.ex index 4e2948e2a..c24c81342 100644 --- a/apps/ex_wire/lib/ex_wire/p2p/manager.ex +++ b/apps/ex_wire/lib/ex_wire/p2p/manager.ex @@ -133,13 +133,15 @@ defmodule ExWire.P2P.Manager do # TOOD: How does this work exactly? Is this for multiple frames? handle_packet_data(frame_rest, conn_after_handle) - {:error, "Insufficent data"} -> + {:error, :insufficient_data} -> %{conn | queued_data: total_data} {:error, reason} -> _ = Logger.error( - "[Network] [#{peer}] Failed to read incoming packet from #{peer.host_name} `#{reason}`)" + "[Network] [#{peer}] Failed to read incoming packet from #{peer.host_name} `#{ + to_string(reason) + }`)" ) %{conn | last_error: reason} From 0ee08a962695ee34befefa0a84569d44e0a23a73 Mon Sep 17 00:00:00 2001 From: Geoffrey Hayes Date: Fri, 30 Nov 2018 20:02:09 -0800 Subject: [PATCH 2/5] Connect Warp Sync to Proper Packets This patch connects Warp Sync to the correct packets that come back from Parity when warp syncing. There were a few issues with the algorithm for determining packet ids. It appears that each capability has a size (to allow for protocol upgrades) and we skip that many before the next protocol. These aren't documented from come by Parity's source code. Additionally, it appears that once you negotiate a protocol, you still use the offests from Pv62 but from the higher negotiated ids. It's all a bit weird, but this seems to get everything working. Finally, we had an issue with default map getting called *a lot*, so we instead opt to hard-code the value and try to pass nil around instead of default map. --- apps/ex_wire/config/config.exs | 4 ++-- apps/ex_wire/lib/ex_wire/dev_p2p/session.ex | 4 ++-- .../lib/ex_wire/packet/capability/eth.ex | 11 ++++++++++- .../packet/capability/eth/new_block.ex | 17 ++++++++--------- .../lib/ex_wire/packet/capability/mana.ex | 4 +++- .../lib/ex_wire/packet/capability/par.ex | 19 ++++++++++++++++++- .../lib/ex_wire/packet/packet_id_map.ex | 11 ++++++++--- .../packet/capability/eth/new_block_test.exs | 5 +++++ 8 files changed, 56 insertions(+), 19 deletions(-) create mode 100644 apps/ex_wire/test/ex_wire/packet/capability/eth/new_block_test.exs diff --git a/apps/ex_wire/config/config.exs b/apps/ex_wire/config/config.exs index a07eb315b..c071c19cd 100644 --- a/apps/ex_wire/config/config.exs +++ b/apps/ex_wire/config/config.exs @@ -26,8 +26,8 @@ mana_version = config :ex_wire, p2p_version: 0x04, - protocol_version: 63, - caps: [{"eth", 62}, {"eth", 63}, {"par", 1}], + protocol_version: 62, + caps: [{"eth", 62}, {"par", 1}], # TODO: This should be set and stored in a file private_key: :random, bootnodes: :from_chain, diff --git a/apps/ex_wire/lib/ex_wire/dev_p2p/session.ex b/apps/ex_wire/lib/ex_wire/dev_p2p/session.ex index 9fa719010..bccd4c300 100644 --- a/apps/ex_wire/lib/ex_wire/dev_p2p/session.ex +++ b/apps/ex_wire/lib/ex_wire/dev_p2p/session.ex @@ -65,7 +65,7 @@ defmodule ExWire.DEVp2p.Session do """ @spec disconnect(t) :: t def disconnect(session = %__MODULE__{}) do - %{session | hello_sent: nil, hello_received: nil, packet_id_map: PacketIdMap.default_map()} + %{session | hello_sent: nil, hello_received: nil, packet_id_map: nil} end @doc """ @@ -100,6 +100,6 @@ defmodule ExWire.DEVp2p.Session do """ @spec compatible_capabilities?(t) :: boolean() def compatible_capabilities?(%__MODULE__{packet_id_map: packet_id_map}) do - packet_id_map != PacketIdMap.default_map() + Map.has_key?(packet_id_map.ids_to_modules, 0x10) end end diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth.ex index e5dcaaf79..daddeb6bf 100644 --- a/apps/ex_wire/lib/ex_wire/packet/capability/eth.ex +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth.ex @@ -16,7 +16,16 @@ defmodule ExWire.Packet.Capability.Eth do Eth.BlockHeaders, Eth.GetBlockBodies, Eth.BlockBodies, - Eth.NewBlock + Eth.NewBlock, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved ] } diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/new_block.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/new_block.ex index 6e580dbb9..6fa636812 100644 --- a/apps/ex_wire/lib/ex_wire/packet/capability/eth/new_block.ex +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/new_block.ex @@ -15,14 +15,13 @@ defmodule ExWire.Packet.Capability.Eth.NewBlock do alias Block.Header alias Blockchain.Block - alias ExWire.Packet.Capability.Eth.Transactions + alias Blockchain.Transaction @behaviour ExWire.Packet - # TODO: fill in Transactions typespec when that packet is figured out @type t :: %__MODULE__{ header: Header.t(), - transactions: [any()], + transactions: [Transaction.t()], uncles: [Block.t()], total_difficulty: integer() | nil } @@ -53,8 +52,8 @@ defmodule ExWire.Packet.Capability.Eth.NewBlock do [ [ Header.serialize(packet.header), - Transactions.serialize(%Transactions{transactions: packet.transactions}), - packet.uncles |> Block.serialize() |> Enum.to_list() + Enum.map(packet.transactions, &Transaction.serialize/1), + Enum.map(packet.uncles, &Header.serialize/1) ], packet.total_difficulty ] @@ -70,16 +69,16 @@ defmodule ExWire.Packet.Capability.Eth.NewBlock do [ [ header, - transactions, - uncles + transaction_rlp, + uncles_rlp ], total_difficulty ] = rlp %__MODULE__{ header: Header.deserialize(header), - transactions: Transactions.deserialize(transactions).transactions, - uncles: uncles |> Block.deserialize() |> Enum.to_list(), + transactions: Enum.map(transaction_rlp, &Transaction.deserialize/1), + uncles: Enum.map(uncles_rlp, &Header.deserialize/1), total_difficulty: total_difficulty |> :binary.decode_unsigned() } end diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/mana.ex b/apps/ex_wire/lib/ex_wire/packet/capability/mana.ex index 4c968d44b..b9089a3d2 100644 --- a/apps/ex_wire/lib/ex_wire/packet/capability/mana.ex +++ b/apps/ex_wire/lib/ex_wire/packet/capability/mana.ex @@ -1,9 +1,11 @@ defmodule ExWire.Packet.Capability.Mana do alias ExWire.Packet.Capability alias ExWire.Packet.Capability.Eth + alias ExWire.Packet.Capability.Par @our_capabilities_map %{ - Eth.get_name() => Eth + Eth.get_name() => Eth, + Par.get_name() => Par } @our_capabilities @our_capabilities_map diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/par.ex b/apps/ex_wire/lib/ex_wire/packet/capability/par.ex index 8cd0da994..c505fa8b9 100644 --- a/apps/ex_wire/lib/ex_wire/packet/capability/par.ex +++ b/apps/ex_wire/lib/ex_wire/packet/capability/par.ex @@ -1,6 +1,7 @@ defmodule ExWire.Packet.Capability.Par do alias ExWire.Config alias ExWire.Packet.Capability + alias ExWire.Packet.Capability.Eth alias ExWire.Packet.Capability.Par @behaviour Capability @@ -10,10 +11,26 @@ defmodule ExWire.Packet.Capability.Par do @version_to_packet_types %{ 1 => [ Par.WarpStatus, + Eth.NewBlockHashes, + Eth.Transactions, + Eth.GetBlockHeaders, + Eth.BlockHeaders, + Eth.GetBlockBodies, + Eth.BlockBodies, + Eth.NewBlock, Par.GetSnapshotManifest, Par.SnapshotManifest, Par.GetSnapshotData, - Par.SnapshotData + Par.SnapshotData, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved ] } diff --git a/apps/ex_wire/lib/ex_wire/packet/packet_id_map.ex b/apps/ex_wire/lib/ex_wire/packet/packet_id_map.ex index fdea5c22f..40bde637a 100644 --- a/apps/ex_wire/lib/ex_wire/packet/packet_id_map.ex +++ b/apps/ex_wire/lib/ex_wire/packet/packet_id_map.ex @@ -48,7 +48,7 @@ defmodule ExWire.Packet.PacketIdMap do |> Enum.filter(fn cap -> Capability.are_we_capable?(cap, Mana.get_our_capabilities_map()) end) - |> Enum.sort(fn {name1, _v1}, {name2, _v2} -> name1 < name2 end) + |> Enum.sort(fn %Capability{name: name1}, %Capability{name: name2} -> name1 < name2 end) |> Enum.map(fn cap -> Capability.get_packets_for_capability(cap, Mana.get_our_capabilities_map()) end) @@ -116,8 +116,13 @@ defmodule ExWire.Packet.PacketIdMap do defp build_capability_ids_to_modules_map(capability_packet_types, {base_id, starting_map}) do capability_packet_types |> Enum.reduce({base_id, starting_map}, fn packet_type, {next_base_id, updated_map} -> - packet_id = base_id + apply(packet_type, :message_id_offset, []) - {Kernel.max(next_base_id, packet_id + 1), Map.put(updated_map, packet_id, packet_type)} + if packet_type == :reserved do + {next_base_id + 1, updated_map} + else + packet_id = base_id + apply(packet_type, :message_id_offset, []) + + {Kernel.max(next_base_id, packet_id + 1), Map.put(updated_map, packet_id, packet_type)} + end end) end end diff --git a/apps/ex_wire/test/ex_wire/packet/capability/eth/new_block_test.exs b/apps/ex_wire/test/ex_wire/packet/capability/eth/new_block_test.exs new file mode 100644 index 000000000..d7d44839b --- /dev/null +++ b/apps/ex_wire/test/ex_wire/packet/capability/eth/new_block_test.exs @@ -0,0 +1,5 @@ +defmodule ExWire.Packet.Capability.Eth.NewBlockTest do + use ExUnit.Case, async: true + alias ExWire.Packet.Capability.Eth.NewBlock + doctest NewBlock +end From 6a01be5afe6027031b74ea726ae968d2c88f1da1 Mon Sep 17 00:00:00 2001 From: Geoffrey Hayes Date: Sat, 1 Dec 2018 18:55:38 -0800 Subject: [PATCH 3/5] Add new packet types for Fast Sync --- .../packet/capability/eth/get_node_data.ex | 120 ++++++++++++++++ .../packet/capability/eth/get_receipts.ex | 128 ++++++++++++++++++ .../packet/capability/eth/node_data.ex | 83 ++++++++++++ .../ex_wire/packet/capability/eth/receipts.ex | 87 ++++++++++++ .../capability/eth/get_node_data_test.exs | 25 ++++ .../capability/eth/get_receipts_test.exs | 39 ++++++ .../packet/capability/eth/node_data_test.exs | 4 + .../packet/capability/eth/receipts_test.exs | 4 + 8 files changed, 490 insertions(+) create mode 100644 apps/ex_wire/lib/ex_wire/packet/capability/eth/get_node_data.ex create mode 100644 apps/ex_wire/lib/ex_wire/packet/capability/eth/get_receipts.ex create mode 100644 apps/ex_wire/lib/ex_wire/packet/capability/eth/node_data.ex create mode 100644 apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex create mode 100644 apps/ex_wire/test/ex_wire/packet/capability/eth/get_node_data_test.exs create mode 100644 apps/ex_wire/test/ex_wire/packet/capability/eth/get_receipts_test.exs create mode 100644 apps/ex_wire/test/ex_wire/packet/capability/eth/node_data_test.exs create mode 100644 apps/ex_wire/test/ex_wire/packet/capability/eth/receipts_test.exs diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/get_node_data.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/get_node_data.ex new file mode 100644 index 000000000..018023b2e --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/get_node_data.ex @@ -0,0 +1,120 @@ +defmodule ExWire.Packet.Capability.Eth.GetNodeData do + @moduledoc """ + TODO + + ``` + **GetNodeData** [`+0x0d`, `hash_0`: `B_32`, `hash_1`: `B_32`, `...`] + Require peer to return a NodeData message. Hint that useful values in it are those which correspond to given hashes. + ``` + """ + + alias ExWire.Bridge.Sync + alias ExWire.Packet + alias ExWire.Packet.Capability.Eth.NodeData + alias MerklePatriciaTree.TrieStorage + require Logger + + @behaviour Packet + + @sync Application.get_env(:ex_wire, :sync_mock, Sync) + @max_hashes_supported 100 + + @type t :: %__MODULE__{ + hashes: list(EVM.hash()) + } + + defstruct hashes: [] + + @doc """ + Returns the relative message id offset for this message. + + This will help determine what its message ID is relative to other Packets in the same Capability. + """ + @impl true + @spec message_id_offset() :: 0x0D + def message_id_offset do + 0x0D + end + + @doc """ + Given a GetNodeData packet, serializes for transport over Eth Wire Protocol. + + ## Examples + + iex> %ExWire.Packet.Capability.Eth.GetNodeData{hashes: [<<1::256>>, <<2::256>>]} + ...> |> ExWire.Packet.Capability.Eth.GetNodeData.serialize() + [<<1::256>>, <<2::256>>] + """ + @impl true + @spec serialize(t()) :: ExRLP.t() + def serialize(packet = %__MODULE__{}) do + packet.hashes + end + + @doc """ + Given an RLP-encoded GetNodeData packet from Eth Wire Protocol, + decodes into a GetNodeData struct. + + ## Examples + + iex> ExWire.Packet.Capability.Eth.GetNodeData.deserialize([<<1::256>>, <<2::256>>]) + %ExWire.Packet.Capability.Eth.GetNodeData{hashes: [<<1::256>>, <<2::256>>]} + """ + @impl true + @spec deserialize(ExRLP.t()) :: t() + def deserialize(rlp) do + hashes = rlp + + %__MODULE__{ + hashes: hashes + } + end + + @doc """ + Handles a GetNodeData message. We should send the node data for the given + keys if we have that data. + """ + @impl true + @spec handle(t()) :: ExWire.Packet.handle_response() + def handle(packet = %__MODULE__{}) do + values = + case @sync.get_current_trie() do + {:ok, trie} -> + get_node_data( + trie, + Enum.take(packet.hashes, @max_hashes_supported) + ) + + {:error, error} -> + :ok = + Logger.warn(fn -> + "#{__MODULE__} Error calling Sync.get_current_trie(): #{error}. Returning empty values." + end) + + [] + end + + {:send, %NodeData{values: values}} + end + + @spec get_node_data(Trie.t(), list(EVM.hash())) :: list(binary()) + defp get_node_data(trie, hashes) do + do_get_node_data(trie, hashes, []) + end + + @spec do_get_node_data(Trie.t(), list(EVM.hash()), list(binary())) :: list(binary()) + defp do_get_node_data(_trie, [], acc_values), do: Enum.reverse(acc_values) + + defp do_get_node_data(trie, [hash | rest_hashes], acc_values) do + new_acc = + case TrieStorage.get_raw_key(trie, hash) do + :not_found -> + acc_values + + {:ok, value} -> + [value | acc_values] + end + + do_get_node_data(trie, rest_hashes, new_acc) + end +end diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/get_receipts.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/get_receipts.ex new file mode 100644 index 000000000..62f2148b6 --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/get_receipts.ex @@ -0,0 +1,128 @@ +defmodule ExWire.Packet.Capability.Eth.GetReceipts do + @moduledoc """ + TODO + + ``` + **GetReceipts** [`+0x0d`, `hash_0`: `B_32`, `hash_1`: `B_32`, `...`] + Require peer to return a `Receipts` message. Hint that useful values in it + are those which correspond to blocks of the given hashes. + ``` + """ + require Logger + + alias Blockchain.Transaction.Receipt + alias ExWire.Bridge.Sync + alias ExWire.Packet + alias ExWire.Packet.Capability.Eth.Receipts + alias MerklePatriciaTree.TrieStorage + + @behaviour Packet + + @sync Application.get_env(:ex_wire, :sync_mock, Sync) + @max_hashes_supported 100 + + @type t :: %__MODULE__{ + hashes: list(EVM.hash()) + } + + defstruct hashes: [] + + @doc """ + Returns the relative message id offset for this message. + + This will help determine what its message ID is relative to other Packets in the same Capability. + """ + @impl true + @spec message_id_offset() :: 0x0D + def message_id_offset do + 0x0D + end + + @doc """ + Given a GetReceipts packet, serializes for transport over Eth Wire Protocol. + + ## Examples + + iex> %ExWire.Packet.Capability.Eth.GetReceipts{hashes: [<<1::256>>, <<2::256>>]} + ...> |> ExWire.Packet.Capability.Eth.GetReceipts.serialize() + [<<1::256>>, <<2::256>>] + """ + @impl true + @spec serialize(t()) :: ExRLP.t() + def serialize(packet = %__MODULE__{}) do + packet.hashes + end + + @doc """ + Given an RLP-encoded GetReceipts packet from Eth Wire Protocol, + decodes into a GetReceipts struct. + + ## Examples + + iex> ExWire.Packet.Capability.Eth.GetReceipts.deserialize([<<1::256>>, <<2::256>>]) + %ExWire.Packet.Capability.Eth.GetReceipts{hashes: [<<1::256>>, <<2::256>>]} + """ + @impl true + @spec deserialize(ExRLP.t()) :: t() + def deserialize(rlp) do + hashes = rlp + + %__MODULE__{ + hashes: hashes + } + end + + @doc """ + Handles a GetReceipts message. We should send the node data for the given + keys if we have that data. + """ + @impl true + @spec handle(t()) :: ExWire.Packet.handle_response() + def handle(packet = %__MODULE__{}) do + receipts = + case @sync.get_current_trie() do + {:ok, trie} -> + get_receipts( + trie, + Enum.take(packet.hashes, @max_hashes_supported) + ) + + {:error, error} -> + :ok = + Logger.warn(fn -> + "#{__MODULE__} Error calling Sync.get_current_trie(): #{error}. Returning empty receipts." + end) + + [] + end + + {:send, %Receipts{receipts: receipts}} + end + + @spec get_receipts(Trie.t(), list(EVM.hash())) :: list(Receipt.t()) + defp get_receipts(trie, hashes) do + do_get_receipts(trie, hashes, []) + end + + @spec do_get_receipts(Trie.t(), list(EVM.hash()), list(Receipt.t())) :: list(Receipt.t()) + defp do_get_receipts(_trie, [], acc_receipts), do: Enum.reverse(acc_receipts) + + defp do_get_receipts(trie, [hash | rest_hashes], acc_receipts) do + # TODO: Get receipts correctly or whatever. + new_acc = + case TrieStorage.get_raw_key(trie, hash) do + :not_found -> + acc_receipts + + {:ok, receipt_rlp_bin} -> + receipt = + receipt_rlp_bin + |> ExRLP.decode() + |> Receipt.deserialize() + + [receipt | acc_receipts] + end + + do_get_receipts(trie, rest_hashes, new_acc) + end +end diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/node_data.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/node_data.ex new file mode 100644 index 000000000..8492fd95d --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/node_data.ex @@ -0,0 +1,83 @@ +defmodule ExWire.Packet.Capability.Eth.NodeData do + @moduledoc """ + TODO + + ``` + **NodeData** [`+0x0e`, `value_0`: `B`, `value_1`: `B`, `...`] + Provide a set of values which correspond to previously asked node data hashes + from GetNodeData. Does not need to contain all; best effort is fine. If it + contains none, then has no information for previous GetNodeData hashes. + ``` + """ + + alias ExWire.Packet + require Logger + + @behaviour Packet + + @type t :: %__MODULE__{ + values: list(binary()) + } + + defstruct values: [] + + @doc """ + Returns the relative message id offset for this message. + + This will help determine what its message ID is relative to other Packets in the same Capability. + """ + @impl true + @spec message_id_offset() :: 0x0E + def message_id_offset do + 0x0E + end + + @doc """ + Given a NodeData packet, serializes for transport over Eth Wire Protocol. + + ## Examples + + iex> %ExWire.Packet.Capability.Eth.NodeData{values: [<<1::256>>, <<2::256>>]} + ...> |> ExWire.Packet.Capability.Eth.NodeData.serialize() + [<<1::256>>, <<2::256>>] + """ + @impl true + @spec serialize(t()) :: ExRLP.t() + def serialize(packet = %__MODULE__{}) do + packet.values + end + + @doc """ + Given an RLP-encoded NodeData packet from Eth Wire Protocol, + decodes into a NodeData struct. + + ## Examples + + iex> ExWire.Packet.Capability.Eth.NodeData.deserialize([<<1::256>>, <<2::256>>]) + %ExWire.Packet.Capability.Eth.NodeData{values: [<<1::256>>, <<2::256>>]} + """ + @impl true + @spec deserialize(ExRLP.t()) :: t() + def deserialize(rlp) do + values = rlp + + %__MODULE__{ + values: values + } + end + + @doc """ + Handles a NodeData message. We do not respond. + + ## Examples + + iex> %ExWire.Packet.Capability.Eth.NodeData{values: [<<1::256>>, <<2::256>>]} + ...> |> ExWire.Packet.Capability.Eth.NodeData.handle() + :ok + """ + @impl true + @spec handle(t()) :: ExWire.Packet.handle_response() + def handle(_packet = %__MODULE__{}) do + :ok + end +end diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex new file mode 100644 index 000000000..ac1a37ffd --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex @@ -0,0 +1,87 @@ +defmodule ExWire.Packet.Capability.Eth.Receipts do + @moduledoc """ + TODO + + ``` + **Receipts** [`+0x10`, [`receipt_0`, `receipt_1`], ...] + Provide a set of receipts which correspond to previously asked in + `GetReceipts`. + ``` + """ + require Logger + + alias Blockchain.Transaction.Receipt + alias ExWire.Packet + + @behaviour Packet + + @type t :: %__MODULE__{ + receipts: list(Receipt.t()) + } + + defstruct receipts: [] + + @doc """ + Returns the relative message id offset for this message. + + This will help determine what its message ID is relative to other Packets in the same Capability. + """ + @impl true + @spec message_id_offset() :: 0x10 + def message_id_offset do + 0x10 + end + + @doc """ + Given a Receipts packet, serializes for transport over Eth Wire Protocol. + + ## Examples + + iex> %ExWire.Packet.Capability.Eth.Receipts{receipts: [ + ...> %Blockchain.Transaction.Receipt{state: <<1,2,3>>, cumulative_gas: 5, bloom_filter: <<2,3,4>>, logs: []} + ...> ]} + ...> |> ExWire.Packet.Capability.Eth.Receipts.serialize() + [[<<1, 2, 3>>, 5, <<2, 3, 4>>, []]] + """ + @impl true + @spec serialize(t()) :: ExRLP.t() + def serialize(packet = %__MODULE__{}) do + Enum.map(packet.receipts, &Receipt.serialize/1) + end + + @doc """ + Given an RLP-encoded Receipts packet from Eth Wire Protocol, + decodes into a Receipts struct. + + ## Examples + + iex> ExWire.Packet.Capability.Eth.Receipts.deserialize([[<<1, 2, 3>>, 5, <<2, 3, 4>>, []]]) + %ExWire.Packet.Capability.Eth.Receipts{receipts: [ + %Blockchain.Transaction.Receipt{state: <<1,2,3>>, cumulative_gas: 5, bloom_filter: <<2,3,4>>, logs: []} + ]} + """ + @impl true + @spec deserialize(ExRLP.t()) :: t() + def deserialize(rlp) do + receipts_rlp = rlp + + %__MODULE__{ + receipts: Enum.map(receipts_rlp, &Receipt.deserialize/1) + } + end + + @doc """ + Handles a Receipts message. We do not respond. + + ## Examples + + iex> %ExWire.Packet.Capability.Eth.Receipts{receipts: []} + ...> |> ExWire.Packet.Capability.Eth.Receipts.handle() + :ok + """ + @impl true + @spec handle(t()) :: ExWire.Packet.handle_response() + def handle(_packet = %__MODULE__{}) do + :ok + end +end diff --git a/apps/ex_wire/test/ex_wire/packet/capability/eth/get_node_data_test.exs b/apps/ex_wire/test/ex_wire/packet/capability/eth/get_node_data_test.exs new file mode 100644 index 000000000..dd81b8c3c --- /dev/null +++ b/apps/ex_wire/test/ex_wire/packet/capability/eth/get_node_data_test.exs @@ -0,0 +1,25 @@ +defmodule ExWire.Packet.Capability.Eth.GetNodeDataTest do + use ExUnit.Case, async: true + doctest ExWire.Packet.Capability.Eth.GetNodeData + + describe "handle/1" do + test "responds to request" do + ExWire.BridgeSyncMock.start_link(%{}) + + MerklePatriciaTree.Test.random_ets_db() + |> MerklePatriciaTree.Trie.new() + |> MerklePatriciaTree.TrieStorage.put_raw_key!(<<2::256>>, "mana") + |> ExWire.BridgeSyncMock.set_current_trie() + + handle_response = + %ExWire.Packet.Capability.Eth.GetNodeData{hashes: [<<1::256>>, <<2::256>>]} + |> ExWire.Packet.Capability.Eth.GetNodeData.handle() + + assert handle_response == + {:send, + %ExWire.Packet.Capability.Eth.NodeData{ + values: ["mana"] + }} + end + end +end diff --git a/apps/ex_wire/test/ex_wire/packet/capability/eth/get_receipts_test.exs b/apps/ex_wire/test/ex_wire/packet/capability/eth/get_receipts_test.exs new file mode 100644 index 000000000..f27000025 --- /dev/null +++ b/apps/ex_wire/test/ex_wire/packet/capability/eth/get_receipts_test.exs @@ -0,0 +1,39 @@ +defmodule ExWire.Packet.Capability.Eth.GetReceiptsTest do + use ExUnit.Case, async: true + doctest ExWire.Packet.Capability.Eth.GetReceipts + + alias Blockchain.Transaction.Receipt + + describe "handle/1" do + test "respond to request" do + ExWire.BridgeSyncMock.start_link(%{}) + + receipt = %Receipt{ + state: <<1, 2, 3>>, + cumulative_gas: 5, + bloom_filter: <<2, 3, 4>>, + logs: [] + } + + receipt_rlp_bin = + receipt + |> Receipt.serialize() + |> ExRLP.encode() + + MerklePatriciaTree.Test.random_ets_db() + |> MerklePatriciaTree.Trie.new() + |> MerklePatriciaTree.TrieStorage.put_raw_key!(<<2::256>>, receipt_rlp_bin) + |> ExWire.BridgeSyncMock.set_current_trie() + + handle_response = + %ExWire.Packet.Capability.Eth.GetReceipts{hashes: [<<1::256>>, <<2::256>>]} + |> ExWire.Packet.Capability.Eth.GetReceipts.handle() + + assert handle_response == + {:send, + %ExWire.Packet.Capability.Eth.Receipts{ + receipts: [receipt] + }} + end + end +end diff --git a/apps/ex_wire/test/ex_wire/packet/capability/eth/node_data_test.exs b/apps/ex_wire/test/ex_wire/packet/capability/eth/node_data_test.exs new file mode 100644 index 000000000..71f0b3bf6 --- /dev/null +++ b/apps/ex_wire/test/ex_wire/packet/capability/eth/node_data_test.exs @@ -0,0 +1,4 @@ +defmodule ExWire.Packet.Capability.Eth.NodeDataTest do + use ExUnit.Case, async: true + doctest ExWire.Packet.Capability.Eth.NodeData +end diff --git a/apps/ex_wire/test/ex_wire/packet/capability/eth/receipts_test.exs b/apps/ex_wire/test/ex_wire/packet/capability/eth/receipts_test.exs new file mode 100644 index 000000000..63100d8d4 --- /dev/null +++ b/apps/ex_wire/test/ex_wire/packet/capability/eth/receipts_test.exs @@ -0,0 +1,4 @@ +defmodule ExWire.Packet.Capability.Eth.ReceiptsTest do + use ExUnit.Case, async: true + doctest ExWire.Packet.Capability.Eth.Receipts +end From 132f08486d90e21cffacd05799158278988274bd Mon Sep 17 00:00:00 2001 From: Geoffrey Hayes Date: Sat, 1 Dec 2018 19:00:48 -0800 Subject: [PATCH 4/5] Add fast options --- apps/cli/lib/mix/tasks/mana.ex | 8 +++++--- apps/cli/lib/parser/mana_parser.ex | 20 ++++++++++++++++++- apps/ex_wire/config/config.exs | 4 ++-- apps/ex_wire/lib/ex_wire.ex | 3 ++- apps/ex_wire/lib/ex_wire/config.ex | 6 ++++++ .../ex_wire/lib/ex_wire/struct/block_queue.ex | 4 ++-- apps/ex_wire/lib/ex_wire/sync.ex | 14 +++++++------ 7 files changed, 44 insertions(+), 15 deletions(-) diff --git a/apps/cli/lib/mix/tasks/mana.ex b/apps/cli/lib/mix/tasks/mana.ex index 65033d8d8..b97691be5 100644 --- a/apps/cli/lib/mix/tasks/mana.ex +++ b/apps/cli/lib/mix/tasks/mana.ex @@ -36,6 +36,7 @@ defmodule Mix.Tasks.Mana do sync: sync, bootnodes: bootnodes, warp: warp, + fast: fast, debug: debug }} -> :ok = Logger.warn("Starting mana chain #{Atom.to_string(chain_name)}...") @@ -48,13 +49,14 @@ defmodule Mix.Tasks.Mana do sync: sync, discovery: discovery, bootnodes: bootnodes, - warp: warp + warp: warp, + fast: fast ) {:ok, _} = Application.ensure_all_started(:ex_wire) - # No Halt - Process.sleep(:infinity) + # No Halt + # Process.sleep(:infinity) {:error, error} -> _ = Logger.error("Error: #{error}") diff --git a/apps/cli/lib/parser/mana_parser.ex b/apps/cli/lib/parser/mana_parser.ex index 44f37f254..65ad61d75 100644 --- a/apps/cli/lib/parser/mana_parser.ex +++ b/apps/cli/lib/parser/mana_parser.ex @@ -8,6 +8,7 @@ defmodule CLI.Parser.ManaParser do @default_no_sync false @default_bootnodes "from_chain" @default_warp false + @default_fast false @default_debug false @doc """ @@ -19,6 +20,7 @@ defmodule CLI.Parser.ManaParser do * `--no-sync` - Perform syncing (default: false) * `--bootnodes` - Comma separated list of bootnodes (default: from_chain) * `--warp` - Perform warp sync (default: false) + * `--fast` - Perform fast sync (default: false) * `--debug` - Add remote debugging (default: false) ## Examples @@ -30,16 +32,18 @@ defmodule CLI.Parser.ManaParser do sync: false, bootnodes: :from_chain, warp: false, + fast: false, debug: false }} - iex> CLI.Parser.ManaParser.mana_args(["--chain", "foundation", "--bootnodes", "enode://google.com,enode://apple.com", "--warp", "--debug"]) + iex> CLI.Parser.ManaParser.mana_args(["--chain", "foundation", "--bootnodes", "enode://google.com,enode://apple.com", "--warp", "--fast", "--debug"]) {:ok, %{ chain_name: :foundation, discovery: true, sync: true, bootnodes: ["enode://google.com", "enode://apple.com"], warp: true, + fast: true, debug: true }} @@ -50,6 +54,7 @@ defmodule CLI.Parser.ManaParser do sync: true, bootnodes: :from_chain, warp: false, + fast: false, debug: false }} @@ -64,6 +69,7 @@ defmodule CLI.Parser.ManaParser do sync: boolean(), bootnodes: :from_chain | list(String.t()), warp: boolean(), + fast: boolean(), debug: boolean() }} | {:error, String.t()} @@ -76,6 +82,7 @@ defmodule CLI.Parser.ManaParser do no_sync: :boolean, bootnodes: :string, warp: :boolean, + fast: :boolean, debug: :boolean ] ) @@ -85,6 +92,7 @@ defmodule CLI.Parser.ManaParser do {:ok, sync} <- get_sync(kw_args), {:ok, bootnodes} <- get_bootnodes(kw_args), {:ok, warp} <- get_warp(kw_args), + {:ok, fast} <- get_fast(kw_args), {:ok, debug} <- get_debug(kw_args) do {:ok, %{ @@ -93,6 +101,7 @@ defmodule CLI.Parser.ManaParser do sync: sync, bootnodes: bootnodes, warp: warp, + fast: fast, debug: debug }} end @@ -158,6 +167,15 @@ defmodule CLI.Parser.ManaParser do {:ok, given_warp} end + @spec get_fast(fast: boolean()) :: {:ok, boolean()} | {:error, String.t()} + defp get_fast(kw_args) do + given_fast = + kw_args + |> Keyword.get(:fast, @default_fast) + + {:ok, given_fast} + end + @spec get_debug(debug: boolean()) :: {:ok, boolean()} | {:error, String.t()} defp get_debug(kw_args) do given_debug = diff --git a/apps/ex_wire/config/config.exs b/apps/ex_wire/config/config.exs index c071c19cd..a07eb315b 100644 --- a/apps/ex_wire/config/config.exs +++ b/apps/ex_wire/config/config.exs @@ -26,8 +26,8 @@ mana_version = config :ex_wire, p2p_version: 0x04, - protocol_version: 62, - caps: [{"eth", 62}, {"par", 1}], + protocol_version: 63, + caps: [{"eth", 62}, {"eth", 63}, {"par", 1}], # TODO: This should be set and stored in a file private_key: :random, bootnodes: :from_chain, diff --git a/apps/ex_wire/lib/ex_wire.ex b/apps/ex_wire/lib/ex_wire.ex index 9e5517b49..226164990 100644 --- a/apps/ex_wire/lib/ex_wire.ex +++ b/apps/ex_wire/lib/ex_wire.ex @@ -31,6 +31,7 @@ defmodule ExWire do perform_discovery = Config.perform_discovery?() warp = Config.warp?() + fast = Config.fast?() db = RocksDB.init(Config.db_name(chain)) @@ -65,7 +66,7 @@ defmodule ExWire do child_spec({PeerSupervisor, start_nodes}, []), # Sync coordinates asking peers for new blocks - child_spec({Sync, {trie, chain, warp, warp_queue}}, []) + child_spec({Sync, {trie, chain, warp, warp_queue, fast}}, []) ] else [] diff --git a/apps/ex_wire/lib/ex_wire/config.ex b/apps/ex_wire/lib/ex_wire/config.ex index e6e727d35..6498d7d6d 100644 --- a/apps/ex_wire/lib/ex_wire/config.ex +++ b/apps/ex_wire/lib/ex_wire/config.ex @@ -28,6 +28,7 @@ defmodule ExWire.Config do | :public_ip | :sync | :warp + | :fast @doc """ Allows application to configure ExWire before it starts. @@ -156,6 +157,11 @@ defmodule ExWire.Config do get_env(given_params, :warp) end + @spec fast?(Keyword.t()) :: boolean() + def fast?(given_params \\ []) do + get_env(given_params, :fast) + end + @spec bootnodes(Keyword.t()) :: [String.t()] def bootnodes(given_params \\ []) do if conf_ip = System.get_env("BOOTNODES") do diff --git a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex index 3d884c817..d467df06b 100644 --- a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex +++ b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex @@ -15,13 +15,13 @@ defmodule ExWire.Struct.BlockQueue do 3. We may be waiting on a parent block as we received the child first. We add these blocks to a backlog map keyed by the parent hash. """ + require Logger + alias Block.Header alias ExWire.Struct.Block, as: BlockStruct alias Blockchain.{Block, Blocktree, Chain} alias MerklePatriciaTree.Trie - require Logger - # These will be used to help us determine if a block is empty @empty_trie MerklePatriciaTree.Trie.empty_trie_root_hash() @empty_hash [] |> ExRLP.encode() |> ExthCrypto.Hash.Keccak.kec() diff --git a/apps/ex_wire/lib/ex_wire/sync.ex b/apps/ex_wire/lib/ex_wire/sync.ex index 69f4d3bff..42b271897 100644 --- a/apps/ex_wire/lib/ex_wire/sync.ex +++ b/apps/ex_wire/lib/ex_wire/sync.ex @@ -57,7 +57,8 @@ defmodule ExWire.Sync do starting_block_number: non_neg_integer() | nil, highest_block_number: non_neg_integer() | nil, warp: boolean(), - warp_processor: GenServer.server() + warp_processor: GenServer.server(), + fast: boolean() } @spec get_state(GenServer.server()) :: state @@ -68,12 +69,12 @@ defmodule ExWire.Sync do @doc """ Starts a sync process for a given chain. """ - @spec start_link({Trie.t(), Chain.t(), boolean(), WarpQueue.t() | nil}, Keyword.t()) :: + @spec start_link({Trie.t(), Chain.t(), boolean(), WarpQueue.t() | nil, boolean()}, Keyword.t()) :: GenServer.on_start() - def start_link({trie, chain, warp, warp_queue}, opts \\ []) do + def start_link({trie, chain, warp, warp_queue, fast}, opts \\ []) do warp_processor = Keyword.get(opts, :warp_processor, WarpProcessor) - GenServer.start_link(__MODULE__, {trie, chain, warp, warp_queue, warp_processor}, + GenServer.start_link(__MODULE__, {trie, chain, warp, warp_queue, warp_processor, fast}, name: Keyword.get(opts, :name, __MODULE__) ) end @@ -87,7 +88,7 @@ defmodule ExWire.Sync do We should handle this case more gracefully. """ @impl true - def init({trie, chain, warp, warp_queue, warp_processor}) do + def init({trie, chain, warp, warp_queue, warp_processor, fast}) do block_tree = load_sync_state(TrieStorage.permanent_db(trie)) block_queue = %BlockQueue{} @@ -103,7 +104,8 @@ defmodule ExWire.Sync do starting_block_number: block.header.number, highest_block_number: block.header.number, warp: warp, - warp_processor: warp_processor + warp_processor: warp_processor, + fast: fast } next_state = From 4e81268764f32719ba8b6af58b93807e781d3cd8 Mon Sep 17 00:00:00 2001 From: Geoffrey Hayes Date: Mon, 3 Dec 2018 10:43:24 -0800 Subject: [PATCH 5/5] Upgrade block syncing to use new processor model --- apps/blockchain/lib/blockchain/block.ex | 2 +- apps/ex_wire/lib/ex_wire.ex | 10 +- apps/ex_wire/lib/ex_wire/p2p/server.ex | 9 + .../lib/ex_wire/packet/capability/eth.ex | 19 + apps/ex_wire/lib/ex_wire/peer_supervisor.ex | 16 +- .../ex_wire/lib/ex_wire/struct/block_queue.ex | 517 ++++++++++-------- apps/ex_wire/lib/ex_wire/sync.ex | 420 +++++++------- .../lib/ex_wire/sync/block_processor.ex | 187 +++++++ .../block_processor/standard_processor.ex | 122 +++++ apps/ex_wire/lib/ex_wire/sync/block_state.ex | 46 ++ 10 files changed, 921 insertions(+), 427 deletions(-) create mode 100644 apps/ex_wire/lib/ex_wire/sync/block_processor.ex create mode 100644 apps/ex_wire/lib/ex_wire/sync/block_processor/standard_processor.ex create mode 100644 apps/ex_wire/lib/ex_wire/sync/block_state.ex diff --git a/apps/blockchain/lib/blockchain/block.ex b/apps/blockchain/lib/blockchain/block.ex index eca38aebf..2dc4dc510 100644 --- a/apps/blockchain/lib/blockchain/block.ex +++ b/apps/blockchain/lib/blockchain/block.ex @@ -286,7 +286,7 @@ defmodule Blockchain.Block do iex> trie = MerklePatriciaTree.Trie.new(MerklePatriciaTree.Test.random_ets_db()) iex> {updated_block, _new_trie} = Blockchain.Block.put_receipt(%Blockchain.Block{}, 6, %Blockchain.Transaction.Receipt{state: <<1, 2, 3>>, cumulative_gas: 10, bloom_filter: <<2, 3, 4>>, logs: []}, trie) iex> {updated_block, _new_trie} = Blockchain.Block.put_receipt(updated_block, 7, %Blockchain.Transaction.Receipt{state: <<4, 5, 6>>, cumulative_gas: 11, bloom_filter: <<5, 6, 7>>, logs: []}, trie) - iex> Blockchain.Block.get_receipt(updated_block, 6, trie.db) + iex> Blockchain.Block.x(updated_block, 6, trie.db) %Blockchain.Transaction.Receipt{state: <<1, 2, 3>>, cumulative_gas: 10, bloom_filter: <<2, 3, 4>>, logs: []} iex> trie = MerklePatriciaTree.Trie.new(MerklePatriciaTree.Test.random_ets_db()) diff --git a/apps/ex_wire/lib/ex_wire.ex b/apps/ex_wire/lib/ex_wire.ex index 226164990..41e0a2bd9 100644 --- a/apps/ex_wire/lib/ex_wire.ex +++ b/apps/ex_wire/lib/ex_wire.ex @@ -13,8 +13,8 @@ defmodule ExWire do alias ExWire.NodeDiscoverySupervisor alias ExWire.PeerSupervisor alias ExWire.Sync + alias ExWire.Sync.{BlockState, WarpState} alias ExWire.Sync.WarpProcessor.PowProcessor - alias ExWire.Sync.WarpState alias ExWire.TCPListeningSupervisor alias MerklePatriciaTree.{CachingTrie, DB.RocksDB, Trie} @@ -31,7 +31,6 @@ defmodule ExWire do perform_discovery = Config.perform_discovery?() warp = Config.warp?() - fast = Config.fast?() db = RocksDB.init(Config.db_name(chain)) @@ -40,6 +39,8 @@ defmodule ExWire do |> Trie.new() |> CachingTrie.new() + block_queue = BlockState.load_block_queue(db) + warp_queue = if warp do WarpState.load_warp_queue(db) @@ -65,8 +66,11 @@ defmodule ExWire do # Peer supervisor maintains a pool of outbound peers child_spec({PeerSupervisor, start_nodes}, []), + # Processes blocks + {ExWire.Sync.BlockProcessor, {trie}}, + # Sync coordinates asking peers for new blocks - child_spec({Sync, {trie, chain, warp, warp_queue, fast}}, []) + child_spec({Sync, {trie, chain, block_queue, warp, warp_queue}}, []) ] else [] diff --git a/apps/ex_wire/lib/ex_wire/p2p/server.ex b/apps/ex_wire/lib/ex_wire/p2p/server.ex index 10f9922df..88c4580ac 100644 --- a/apps/ex_wire/lib/ex_wire/p2p/server.ex +++ b/apps/ex_wire/lib/ex_wire/p2p/server.ex @@ -74,6 +74,11 @@ defmodule ExWire.P2P.Server do }) end + @spec get_state(pid()) :: Connection.t() + def get_state(pid) do + GenServer.call(pid, :get_state, :infinity) + end + @doc """ Client function for sending a packet over to a peer. """ @@ -132,6 +137,10 @@ defmodule ExWire.P2P.Server do {:ok, state} end + def handle_call(:get_state, _from, state) do + {:reply, state, state} + end + def handle_call(:get_peer, _from, state = %{peer: peer}) do {:reply, peer, state} end diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth.ex index daddeb6bf..bae4c1e43 100644 --- a/apps/ex_wire/lib/ex_wire/packet/capability/eth.ex +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth.ex @@ -26,6 +26,25 @@ defmodule ExWire.Packet.Capability.Eth do :reserved, :reserved, :reserved + ], + 63 => [ + Eth.Status, + Eth.NewBlockHashes, + Eth.Transactions, + Eth.GetBlockHeaders, + Eth.BlockHeaders, + Eth.GetBlockBodies, + Eth.BlockBodies, + Eth.NewBlock, + Eth.GetNodeData, + Eth.NodeData, + Eth.GetReceipts, + Eth.Receipts, + :reserved, + :reserved, + :reserved, + :reserved, + :reserved ] } diff --git a/apps/ex_wire/lib/ex_wire/peer_supervisor.ex b/apps/ex_wire/lib/ex_wire/peer_supervisor.ex index c5e661fbd..f32bc12f3 100644 --- a/apps/ex_wire/lib/ex_wire/peer_supervisor.ex +++ b/apps/ex_wire/lib/ex_wire/peer_supervisor.ex @@ -92,28 +92,36 @@ defmodule ExWire.PeerSupervisor do @spec do_find_children(node_selector()) :: list(any()) defp do_find_children(:all) do - DynamicSupervisor.which_children(@name) + compatible_children(@name) end defp do_find_children(:last) do @name - |> DynamicSupervisor.which_children() + |> compatible_children() |> List.last() |> List.wrap() end defp do_find_children(:random) do @name - |> DynamicSupervisor.which_children() + |> compatible_children() |> Enum.shuffle() |> Enum.take(1) |> List.wrap() end + defp compatible_children(name) do + name + |> DynamicSupervisor.which_children() + |> Enum.filter(fn {_id, child, _type, _modules} -> + Server.get_state(child).session != nil + end) + end + @spec connected_peer_count() :: non_neg_integer() def connected_peer_count() do @name - |> DynamicSupervisor.which_children() + |> compatible_children() |> Enum.count() end diff --git a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex index d467df06b..77cff8346 100644 --- a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex +++ b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex @@ -19,17 +19,29 @@ defmodule ExWire.Struct.BlockQueue do alias Block.Header alias ExWire.Struct.Block, as: BlockStruct - alias Blockchain.{Block, Blocktree, Chain} + alias ExWire.Struct.Peer + alias Blockchain.{Block, Blocktree} alias MerklePatriciaTree.Trie + alias ExWire.Packet.Capability.Eth.{ + BlockBodies, + BlockHeaders, + NodeData, + Receipts + } + # These will be used to help us determine if a block is empty @empty_trie MerklePatriciaTree.Trie.empty_trie_root_hash() @empty_hash [] |> ExRLP.encode() |> ExthCrypto.Hash.Keccak.kec() defstruct queue: %{}, - backlog: %{}, - do_validation: true, - block_numbers: MapSet.new() + block_numbers: MapSet.new(), + needed_block_hashes: [], + max_header_request: nil, + header_requests: MapSet.new(), + block_requests: MapSet.new(), + block_tree: Blocktree.new_tree(), + processing_blocks: MapSet.new() @type block_item :: %{ commitments: list(binary()), @@ -41,206 +53,20 @@ defmodule ExWire.Struct.BlockQueue do EVM.hash() => block_item } + @type request :: {:header, integer()} | {:block, list(EVM.hash())} + @type t :: %__MODULE__{ queue: %{integer() => block_map}, - backlog: %{EVM.hash() => list(Block.t())}, - do_validation: boolean(), - block_numbers: MapSet.t() + block_numbers: MapSet.t(integer()), + needed_block_hashes: list(EVM.hash()), + max_header_request: nil | integer(), + header_requests: MapSet.t(integer()), + block_requests: MapSet.t(EVM.hash()), + block_tree: Blocktree.t(), + processing_blocks: %{EVM.hash() => Block.t()} } - @doc """ - Adds a given header received by a peer to a block queue. Returns whether or - not we should request the block body. - - Note: we will process it if the block is empty (i.e. has neither transactions - nor ommers). - """ - @spec add_header( - t, - Blocktree.t(), - Header.t(), - EVM.hash(), - binary(), - Chain.t(), - Trie.t() - ) :: {t, Blocktree.t(), Trie.t(), boolean()} - def add_header( - block_queue = %__MODULE__{queue: queue}, - block_tree, - header, - header_hash, - remote_id, - chain, - trie - ) do - block_map = Map.get(queue, header.number, %{}) - - {block_map, should_request_body} = - case Map.get(block_map, header_hash) do - nil -> - # may already be ready, already. - is_empty = is_block_empty?(header) - - block_map = - Map.put(block_map, header_hash, %{ - commitments: MapSet.new([remote_id]), - block: %Block{header: header}, - ready: is_empty - }) - - {block_map, not is_empty} - - block_item -> - {Map.put(block_map, header_hash, %{ - block_item - | commitments: MapSet.put(block_item.commitments, remote_id) - }), false} - end - - updated_block_queue = %{ - block_queue - | queue: Map.put(queue, header.number, block_map), - block_numbers: MapSet.put(block_queue.block_numbers, header.number) - } - - {new_block_queue, new_block_tree, new_trie} = - process_block_queue(updated_block_queue, block_tree, chain, trie) - - {new_block_queue, new_block_tree, new_trie, should_request_body} - end - - @doc """ - Adds a given block struct received by a peer to a block queue. - - Since we don't really know which block this belongs to, we're going to just - need to look at every block and try and guess. - - To guess, we'll compute the transactions root and ommers hash, and then try - and find a header that matches it. For empty blocks (ones with no transactions - and no ommers, there may be several matches. Otherwise, each block body should - pretty much be unique). - """ - @spec add_block_struct( - t(), - Blocktree.t(), - BlockStruct.t(), - Chain.t(), - Trie.t() - ) :: {t(), Blocktree.t(), Trie.t()} - def add_block_struct( - block_queue = %__MODULE__{queue: queue}, - block_tree, - block_struct, - chain, - trie - ) do - transactions_root = get_transactions_root(block_struct.transactions_rlp) - ommers_hash = get_ommers_hash(block_struct.ommers_rlp) - - updated_queue = - Enum.reduce(queue, queue, fn {number, block_map}, queue -> - updated_block_map = - Enum.reduce(block_map, block_map, fn {hash, block_item}, block_map -> - if block_item.block.header.transactions_root == transactions_root and - block_item.block.header.ommers_hash == ommers_hash do - # This is now ready! (though, it may not still have enough commitments) - block = %{ - block_item.block - | transactions: block_struct.transactions, - ommers: block_struct.ommers - } - - Map.put(block_map, hash, %{block_item | block: block, ready: true}) - else - block_map - end - end) - - Map.put(queue, number, updated_block_map) - end) - - updated_block_queue = %{block_queue | queue: updated_queue} - - process_block_queue(updated_block_queue, block_tree, chain, trie) - end - - @doc """ - Processes a the block queue, adding any blocks which are complete and pass - the number of confirmations to the block tree. These blocks are then removed - from the queue. Note: they may end up in the backlog, nonetheless, if we are - waiting still for the parent block. - """ - @spec process_block_queue(t(), Blocktree.t(), Chain.t(), Trie.t()) :: - {t(), Blocktree.t(), Trie.t()} - def process_block_queue( - block_queue = %__MODULE__{}, - block_tree, - chain, - trie - ) do - # First get ready to process blocks - {remaining_block_queue, blocks} = get_complete_blocks(block_queue) - - # Then recursively process them - do_process_blocks(blocks, remaining_block_queue, block_tree, chain, trie) - end - - @spec do_process_blocks(list(Block.t()), t(), Blocktree.t(), Chain.t(), Trie.t()) :: - {t(), Blocktree.t(), Trie.t()} - - defp do_process_blocks([], block_queue, block_tree, _chain, trie), - do: {block_queue, block_tree, trie} - - defp do_process_blocks([block | rest], block_queue, block_tree, chain, trie) do - {new_block_tree, new_trie, new_backlog, extra_blocks} = - case Blocktree.verify_and_add_block( - block_tree, - chain, - block, - trie, - block_queue.do_validation - ) do - {:invalid, [:non_genesis_block_requires_parent]} -> - # Note: this is probably too slow since we see a lot of blocks without - # parents and, I think, we're running the full validity check. - - # :ok = Logger.debug("[Block Queue] Failed to verify block due to missing parent") - - updated_backlog = - Map.update( - block_queue.backlog, - block.header.parent_hash, - [block], - fn blocks -> [block | blocks] end - ) - - {block_tree, trie, updated_backlog, []} - - {:invalid, reasons} -> - :ok = - Logger.debug(fn -> - "[Block Queue] Failed to verify block due to #{inspect(reasons)}" - end) - - {block_tree, trie, block_queue.backlog, []} - - {:ok, {new_block_tree, new_trie, block_hash}} -> - :ok = - Logger.debug(fn -> - "[Block Queue] Verified block #{block.header.number} (0x#{ - Base.encode16(block_hash, case: :lower) - }) and added to new block tree" - end) - - {backlogged_blocks, new_backlog} = Map.pop(block_queue.backlog, block_hash, []) - - {new_block_tree, new_trie, new_backlog, backlogged_blocks} - end - - new_block_queue = %{block_queue | backlog: new_backlog} - - do_process_blocks(extra_blocks ++ rest, new_block_queue, new_block_tree, chain, new_trie) - end + @headers_per_request 15 @doc """ Returns the set of blocks which are complete in the block queue, returning a @@ -321,29 +147,39 @@ defmodule ExWire.Struct.BlockQueue do } """ @spec get_complete_blocks(t) :: {t, [Block.t()]} - def get_complete_blocks(block_queue = %__MODULE__{queue: queue}) do - {queue, blocks} = - Enum.reduce(queue, {queue, []}, fn {number, block_map}, {queue, blocks} -> - {final_block_map, new_blocks} = - Enum.reduce(block_map, {block_map, []}, fn {hash, block_item}, {block_map, blocks} -> + def get_complete_blocks( + block_queue = %__MODULE__{queue: queue, processing_blocks: processing_blocks} + ) do + {next_queue, next_processing_blocks, blocks} = + Enum.reduce(queue, {queue, processing_blocks, []}, fn {number, block_map}, + {curr_queue, curr_processing_blocks, + blocks} -> + {final_block_map, new_blocks, next_processing_blocks} = + Enum.reduce(block_map, {block_map, [], curr_processing_blocks}, fn {hash, block_item}, + {block_map, blocks, + inner_curr_processing_blocks} -> if block_item.ready and MapSet.size(block_item.commitments) >= ExWire.Config.commitment_count() do - {Map.delete(block_map, hash), [block_item.block | blocks]} + {Map.delete(block_map, hash), [block_item.block | blocks], + Map.put(inner_curr_processing_blocks, hash, block_item.block)} else - {block_map, blocks} + {block_map, blocks, inner_curr_processing_blocks} end end) total_blocks = blocks ++ new_blocks - if final_block_map == %{} do - {Map.delete(queue, number), total_blocks} - else - {Map.put(queue, number, final_block_map), total_blocks} - end + next_queue = + if final_block_map == %{} do + Map.delete(curr_queue, number) + else + Map.put(curr_queue, number, final_block_map) + end + + {next_queue, next_processing_blocks, total_blocks} end) - {%{block_queue | queue: queue}, blocks} + {%{block_queue | queue: next_queue, processing_blocks: next_processing_blocks}, blocks} end @doc """ @@ -378,6 +214,261 @@ defmodule ExWire.Struct.BlockQueue do header.transactions_root == @empty_trie and header.ommers_hash == @empty_hash end + @doc """ + Determines the next block we don't yet have in our blocktree and + dispatches a request to all connected peers for that block and the + next `n` blocks after it. + """ + @spec get_requests(BlockQueue.t()) :: list(request()) + def get_requests(block_queue) do + requests = [] + + # TODO: Consider this conditional logic + {next_block_queue, requests} = + if MapSet.size(block_queue.header_requests) > 5 || + Enum.count(block_queue.needed_block_hashes) > 5 do + {block_queue, requests} + else + highest_request = + if is_nil(block_queue.max_header_request) do + if is_nil(block_queue.block_tree.best_block) do + 0 + else + 0 + end + else + 0 + end + + { + %{ + block_queue + | header_requests: + MapSet.union( + block_queue.header_requests, + MapSet.new(highest_request..(highest_request + @headers_per_request)) + ), + max_header_request: highest_request + @headers_per_request + }, + [{:headers, highest_request, @headers_per_request} | requests] + } + end + + {next_block_queue_2, requests} = + if Enum.count(next_block_queue.needed_block_hashes) == 0 do + {next_block_queue, requests} + else + { + %{ + next_block_queue + | needed_block_hashes: [], + block_requests: + MapSet.union( + next_block_queue.block_requests, + MapSet.new(next_block_queue.needed_block_hashes) + ) + }, + [{:bodies, next_block_queue.needed_block_hashes} | requests] + } + end + + { + next_block_queue_2, + Enum.reverse(requests) + } + end + + @doc """ + Adds new block headers to the block queue. + """ + @spec new_block_headers(t(), BlockHeaders.t(), Peer.t()) :: t() + def new_block_headers( + block_queue = %__MODULE__{}, + %BlockHeaders{headers: headers}, + peer + ) do + Enum.reduce(headers, block_queue, fn header, curr_block_queue -> + header_hash = Header.hash(header) + bq = add_header(curr_block_queue, header, header_hash, peer.remote_id) + IO.inspect(["Queue Size", Enum.count(bq.queue)]) + bq + end) + end + + @doc """ + Adds new block bodies to the block queue. + """ + @spec new_block_bodies(t(), BlockBodies.t()) :: t() + def new_block_bodies( + block_queue = %__MODULE__{}, + %BlockBodies{blocks: blocks} + ) do + Enum.reduce(blocks, block_queue, fn block_body, curr_block_queue -> + add_block_struct(curr_block_queue, block_body) + end) + end + + @doc """ + Adds new node data to the block queue. + """ + @spec new_node_data(t(), NodeData.t()) :: t() + def new_node_data( + block_queue = %__MODULE__{}, + %NodeData{values: values} + ) do + :ok = + Exth.trace(fn -> + "#{__MODULE__} Got and ignoring #{Enum.count(values)} node data value(s)." + end) + + block_queue + end + + @doc """ + Adds new receipts to the block queue. + """ + @spec new_receipts(t(), Receipts.t()) :: t() + def new_receipts( + block_queue = %__MODULE__{}, + %Receipts{receipts: receipts} + ) do + :ok = + Exth.trace(fn -> "#{__MODULE__} Got and ignoring #{Enum.count(receipts)} receipt(s)." end) + + block_queue + end + + # Adds a given header received by a peer to a block queue. Returns whether or + # not we should request the block body. + + # Note: we will process it if the block is empty (i.e. has neither transactions + # nor ommers). + @spec add_header(t(), Header.t(), EVM.hash(), binary()) :: t() + def add_header( + block_queue = %__MODULE__{ + queue: queue, + needed_block_hashes: needed_block_hashes + }, + header, + header_hash, + remote_id + ) do + block_map = Map.get(queue, header.number, %{}) + + {next_block_map, next_needed_block_hashes} = + case Map.get(block_map, header_hash) do + nil -> + # may already be ready, already. + is_empty = is_block_empty?(header) + + next_block_map_inner = + Map.put(block_map, header_hash, %{ + commitments: MapSet.new([remote_id]), + block: %Block{header: header}, + ready: is_empty + }) + + next_needed_block_hashes = + if is_empty do + needed_block_hashes + else + [header_hash | needed_block_hashes] + end + + {next_block_map_inner, next_needed_block_hashes} + + block_item -> + {Map.put(block_map, header_hash, %{ + block_item + | commitments: MapSet.put(block_item.commitments, remote_id) + }), needed_block_hashes} + end + + %{ + block_queue + | queue: Map.put(queue, header.number, next_block_map |> IO.inspect()), + block_numbers: MapSet.put(block_queue.block_numbers, header.number), + needed_block_hashes: next_needed_block_hashes, + header_requests: MapSet.delete(block_queue.header_requests, header.number) + } + end + + @doc """ + Adds a given block struct received by a peer to a block queue. + + Since we don't really know which block this belongs to, we're going to just + need to look at every block and try and guess. + + To guess, we'll compute the transactions root and ommers hash, and then try + and find a header that matches it. For empty blocks (ones with no transactions + and no ommers, there may be several matches. Otherwise, each block body should + pretty much be unique). + """ + @spec add_block_struct( + t(), + BlockStruct.t() + ) :: t() + def add_block_struct( + block_queue = %__MODULE__{ + queue: queue + }, + block_struct + ) do + transactions_root = get_transactions_root(block_struct.transactions_rlp) + ommers_hash = get_ommers_hash(block_struct.ommers_rlp) + Exth.inspect(queue, "queue") + + updated_queue = + Enum.reduce(queue, queue, fn {number, block_map}, curr_queue -> + updated_block_map = + Enum.reduce(block_map, block_map, fn {hash, block_item}, curr_block_map -> + if block_item.block.header.transactions_root == transactions_root and + block_item.block.header.ommers_hash == ommers_hash do + IO.inspect("yes") + # This is now ready! (though, it may not still have enough commitments) + block = %{ + block_item.block + | transactions: block_struct.transactions, + ommers: block_struct.ommers + } + + Map.put(curr_block_map, hash, %{block_item | block: block, ready: true}) + else + IO.inspect("no") + curr_block_map + end + end) + + Map.put(curr_queue, number, updated_block_map) + end) + + %{block_queue | queue: updated_queue |> Exth.inspect("new queue")} + end + + @spec processed_blocks(t(), list(EVM.hash()), Block.t()) :: t() + def processed_blocks( + block_queue = %{ + block_tree: block_tree, + processing_blocks: processing_blocks + }, + block_hashes, + best_block + ) do + next_processing_blocks = + Enum.reduce(block_hashes, processing_blocks, fn block_hash, curr_processing_blocks -> + Map.delete(curr_processing_blocks, block_hash) + end) + + next_block_tree = + if best_block do + Blocktree.update_best_block(block_tree, best_block) + else + block_tree + end + + %{block_queue | processing_blocks: next_processing_blocks, block_tree: next_block_tree} + end + # Tries to get the transaction root by encoding the transaction trie @spec get_transactions_root([ExRLP.t()]) :: MerklePatriciaTree.Trie.root_hash() defp get_transactions_root(transactions_rlp) do diff --git a/apps/ex_wire/lib/ex_wire/sync.ex b/apps/ex_wire/lib/ex_wire/sync.ex index 42b271897..5a1923255 100644 --- a/apps/ex_wire/lib/ex_wire/sync.ex +++ b/apps/ex_wire/lib/ex_wire/sync.ex @@ -15,20 +15,21 @@ defmodule ExWire.Sync do require Logger - alias Block.Header - alias Blockchain.{Block, Blocktree, Blocktree.State, Chain} + alias Blockchain.{Blocktree, Chain} alias Exth.Time alias ExWire.Packet alias ExWire.PeerSupervisor alias ExWire.Struct.{BlockQueue, Peer, WarpQueue} - alias ExWire.Sync.{WarpProcessor, WarpState} - alias MerklePatriciaTree.{DB, Trie, TrieStorage} + alias ExWire.Sync.{BlockProcessor, BlockState, WarpProcessor, WarpState} + alias MerklePatriciaTree.{Trie, TrieStorage} alias ExWire.Packet.Capability.Eth.{ BlockBodies, BlockHeaders, GetBlockBodies, - GetBlockHeaders + GetBlockHeaders, + NodeData, + Receipts } alias ExWire.Packet.Capability.Par.{ @@ -40,8 +41,6 @@ defmodule ExWire.Sync do alias ExWire.Packet.Capability.Par.SnapshotData.{BlockChunk, StateChunk} - @save_block_interval 100 - @blocks_per_request 100 @startup_delay 10_000 @retry_delay 5_000 @request_limit 5 @@ -57,8 +56,8 @@ defmodule ExWire.Sync do starting_block_number: non_neg_integer() | nil, highest_block_number: non_neg_integer() | nil, warp: boolean(), - warp_processor: GenServer.server(), - fast: boolean() + block_processor: GenServer.server(), + warp_processor: GenServer.server() } @spec get_state(GenServer.server()) :: state @@ -69,12 +68,17 @@ defmodule ExWire.Sync do @doc """ Starts a sync process for a given chain. """ - @spec start_link({Trie.t(), Chain.t(), boolean(), WarpQueue.t() | nil, boolean()}, Keyword.t()) :: - GenServer.on_start() - def start_link({trie, chain, warp, warp_queue, fast}, opts \\ []) do + @spec start_link( + {Trie.t(), Chain.t(), BlockQueue.t(), boolean(), WarpQueue.t() | nil, boolean()}, + Keyword.t() + ) :: GenServer.on_start() + def start_link({trie, chain, block_queue, warp, warp_queue}, opts \\ []) do + block_processor = Keyword.get(opts, :block_processor, BlockProcessor) warp_processor = Keyword.get(opts, :warp_processor, WarpProcessor) - GenServer.start_link(__MODULE__, {trie, chain, warp, warp_queue, warp_processor, fast}, + GenServer.start_link( + __MODULE__, + {trie, chain, block_queue, block_processor, warp, warp_queue, warp_processor}, name: Keyword.get(opts, :name, __MODULE__) ) end @@ -88,24 +92,20 @@ defmodule ExWire.Sync do We should handle this case more gracefully. """ @impl true - def init({trie, chain, warp, warp_queue, warp_processor, fast}) do - block_tree = load_sync_state(TrieStorage.permanent_db(trie)) - block_queue = %BlockQueue{} - - {:ok, {block, _caching_trie}} = Blocktree.get_best_block(block_tree, chain, trie) + def init({trie, chain, block_queue, block_processor, warp, warp_queue, warp_processor}) do + {:ok, {block, _caching_trie}} = Blocktree.get_best_block(block_queue.block_tree, chain, trie) state = %{ chain: chain, block_queue: block_queue, warp_queue: warp_queue, - block_tree: block_tree, trie: trie, last_requested_block: nil, starting_block_number: block.header.number, highest_block_number: block.header.number, warp: warp, - warp_processor: warp_processor, - fast: fast + block_processor: block_processor, + warp_processor: warp_processor } next_state = @@ -120,9 +120,10 @@ defmodule ExWire.Sync do state end else - request_next_block(@startup_delay) - - state + %{ + state + | block_queue: dispatch_new_block_queue_requests(block_queue) + } end {:ok, next_state} @@ -133,6 +134,19 @@ defmodule ExWire.Sync do end @impl true + def handle_cast( + {:processed_blocks, processed_blocks, best_block}, + state = %{block_queue: block_queue} + ) do + next_state = + block_queue + |> BlockQueue.processed_blocks(processed_blocks, best_block) + |> dispatch_new_block_queue_requests() + |> save_and_check_block_state(state) + + {:noreply, next_state} + end + def handle_cast( {:processed_block_chunk, chunk_hash, processed_blocks, block}, state = %{warp_queue: warp_queue} @@ -166,11 +180,52 @@ defmodule ExWire.Sync do @impl true def handle_info( :request_next_block, - state = %{block_queue: block_queue, block_tree: block_tree} + state = %{block_queue: block_queue} ) do - new_state = handle_request_next_block(block_queue, block_tree, state) + next_block_queue = dispatch_new_block_queue_requests(block_queue) - {:noreply, new_state} + {:noreply, + %{ + state + | block_queue: next_block_queue + }} + end + + def handle_info({:get_header_hashes, header_hashes}, state) do + if send_with_retry( + %GetBlockBodies{ + hashes: header_hashes + }, + :random, + {:get_header_hashes, header_hashes} + ) do + :ok = + Logger.debug(fn -> + "[Sync] Requested #{Enum.count(header_hashes)} block bodies on retry" + end) + end + + {:noreply, state} + end + + def handle_info({:get_block_headers, block_number, count}, state) do + if send_with_retry( + %GetBlockHeaders{ + block_identifier: block_number, + max_headers: count, + skip: 0, + reverse: false + }, + :random, + {:get_block_headers, block_number, count} + ) do + :ok = + Logger.debug(fn -> + "[Sync] Requested block headers #{block_number}..#{block_number + count} on retry" + end) + end + + {:noreply, state} end def handle_info(:request_manifest, state) do @@ -193,14 +248,19 @@ defmodule ExWire.Sync do {:noreply, handle_block_bodies(block_bodies, state)} end + def handle_info({:packet, %NodeData{} = node_data, _peer}, state) do + {:noreply, handle_node_data(node_data, state)} + end + + def handle_info({:packet, %Receipts{} = receipts, _peer}, state) do + {:noreply, handle_receipts(receipts, state)} + end + def handle_info({:packet, %SnapshotManifest{} = snapshot_manifest, peer}, state) do {:noreply, handle_snapshot_manifest(snapshot_manifest, peer, state)} end - def handle_info( - {:packet, %SnapshotData{} = snapshot_data, peer}, - state - ) do + def handle_info({:packet, %SnapshotData{} = snapshot_data, peer}, state) do {:noreply, handle_snapshot_data(snapshot_data, peer, state)} end @@ -247,32 +307,6 @@ defmodule ExWire.Sync do state end - @doc """ - Dispatches a packet of `GetBlockHeaders` to a peer for the next block - number that we don't have in our block queue or state tree. - """ - @spec handle_request_next_block(BlockQueue.t(), Blocktree.t(), state()) :: state() - def handle_request_next_block(block_queue, block_tree, state) do - next_block_to_request = get_next_block_to_request(block_queue, block_tree) - - if send_with_retry( - %GetBlockHeaders{ - block_identifier: next_block_to_request, - max_headers: @blocks_per_request, - skip: 0, - reverse: false - }, - :random, - :request_next_block - ) do - :ok = Logger.debug(fn -> "[Sync] Requested block #{next_block_to_request}" end) - - Map.put(state, :last_requested_block, next_block_to_request + @blocks_per_request) - else - state - end - end - @doc """ When we receive a new snapshot manifest, we add it to our warp queue. We may have new blocks to fetch, so we ask the warp queue for more blocks to @@ -300,22 +334,6 @@ defmodule ExWire.Sync do next_state end - @spec dispatch_new_warp_queue_requests(WarpQueue.t(), integer(), integer()) :: WarpQueue.t() - defp dispatch_new_warp_queue_requests( - warp_queue, - request_limit \\ @request_limit, - queue_limit \\ @queue_limit - ) do - {new_warp_queue, hashes_to_request} = - WarpQueue.get_hashes_to_request(warp_queue, request_limit, queue_limit) - - for hash <- hashes_to_request do - request_chunk(hash) - end - - new_warp_queue - end - @doc """ When we receive a SnapshotData, let's try to add the received block to the warp queue. We may decide to request new blocks at this time. @@ -376,62 +394,14 @@ defmodule ExWire.Sync do peer, state = %{ block_queue: block_queue, - block_tree: block_tree, - chain: chain, - trie: trie, - highest_block_number: highest_block_number + block_processor: block_processor, + chain: chain } ) do - {next_highest_block_number, next_block_queue, next_block_tree, next_trie, header_hashes} = - Enum.reduce( - block_headers.headers, - {highest_block_number, block_queue, block_tree, trie, []}, - fn header, {highest_block_number, block_queue, block_tree, trie, header_hashes} -> - header_hash = header |> Header.hash() - - {next_block_queue, next_block_tree, next_trie, should_request_block} = - BlockQueue.add_header( - block_queue, - block_tree, - header, - header_hash, - peer.remote_id, - chain, - trie - ) - - next_header_hashes = - if should_request_block do - :ok = Logger.debug(fn -> "[Sync] Requesting block body #{header.number}" end) - - [header_hash | header_hashes] - else - header_hashes - end - - next_highest_block_number = Kernel.max(highest_block_number, header.number) - - {next_highest_block_number, next_block_queue, next_block_tree, next_trie, - next_header_hashes} - end - ) - - :ok = - PeerSupervisor.send_packet( - %GetBlockBodies{ - hashes: header_hashes - }, - :random - ) - - next_maybe_saved_trie = maybe_save(block_tree, next_block_tree, next_trie) - :ok = maybe_request_next_block(next_block_queue) - - state - |> Map.put(:block_queue, next_block_queue) - |> Map.put(:block_tree, next_block_tree) - |> Map.put(:trie, next_maybe_saved_trie) - |> Map.put(:highest_block_number, next_highest_block_number) + BlockQueue.new_block_headers(block_queue, block_headers, peer) + |> process_completed_blocks(block_processor, chain) + |> dispatch_new_block_queue_requests() + |> save_and_check_block_state(state) end @doc """ @@ -447,78 +417,133 @@ defmodule ExWire.Sync do block_bodies, state = %{ block_queue: block_queue, - block_tree: block_tree, - chain: chain, - trie: trie + block_processor: block_processor, + chain: chain } ) do - {next_block_queue, next_block_tree, next_trie} = - Enum.reduce(block_bodies.blocks, {block_queue, block_tree, trie}, fn block_body, - {block_queue, - block_tree, trie} -> - BlockQueue.add_block_struct(block_queue, block_tree, block_body, chain, trie) - end) - - next_maybe_saved_trie = maybe_save(block_tree, next_block_tree, next_trie) - :ok = maybe_request_next_block(next_block_queue) + BlockQueue.new_block_bodies(block_queue, block_bodies) + |> process_completed_blocks(block_processor, chain) + |> dispatch_new_block_queue_requests() + |> save_and_check_block_state(state) + end - state - |> Map.put(:block_queue, next_block_queue) - |> Map.put(:block_tree, next_block_tree) - |> Map.put(:trie, next_maybe_saved_trie) - end - - # Determines the next block we don't yet have in our blocktree and - # dispatches a request to all connected peers for that block and the - # next `n` blocks after it. - @spec get_next_block_to_request(BlockQueue.t(), Blocktree.t()) :: integer() - defp get_next_block_to_request(block_queue, block_tree) do - # This is the best we know about - next_number = - case block_tree.best_block do - nil -> 0 - %Block{header: %Header{number: number}} -> number + 1 - end + @doc """ + We are not currently doing anything with PV63 node data + """ + @spec handle_node_data(NodeData.t(), state()) :: state() + def handle_node_data( + node_data, + state = %{ + block_queue: block_queue, + block_processor: block_processor, + chain: chain + } + ) do + BlockQueue.new_node_data(block_queue, node_data) + |> process_completed_blocks(block_processor, chain) + |> dispatch_new_block_queue_requests() + |> save_and_check_block_state(state) + end - # But we may have it queued up already in the block queue, let's - # start from the first we *don't* know about. It's possible there's - # holes in block queue, so it's not `max(best_block.number, max(keys(queue)))`, - # though it could be... - next_number - |> Stream.iterate(fn n -> n + 1 end) - |> Stream.reject(fn n -> MapSet.member?(block_queue.block_numbers, n) end) - |> Enum.at(0) + @doc """ + We are not currently doing anything with PV63 receipts + """ + @spec handle_receipts(Receipts.t(), state()) :: state() + def handle_receipts( + receipts, + state = %{ + block_queue: block_queue, + block_processor: block_processor, + chain: chain + } + ) do + BlockQueue.new_receipts(block_queue, receipts) + |> process_completed_blocks(block_processor, chain) + |> dispatch_new_block_queue_requests() + |> save_and_check_block_state(state) end - @spec maybe_save(Blocktree.t(), Blocktree.t(), Trie.t()) :: Trie.t() - defp maybe_save(block_tree, next_block_tree, trie) do - if block_tree != next_block_tree do - block_number = next_block_tree.best_block.header.number + @spec dispatch_new_warp_queue_requests(WarpQueue.t(), integer(), integer()) :: WarpQueue.t() + defp dispatch_new_warp_queue_requests( + warp_queue, + request_limit \\ @request_limit, + queue_limit \\ @queue_limit + ) do + {new_warp_queue, hashes_to_request} = + WarpQueue.get_hashes_to_request(warp_queue, request_limit, queue_limit) - if rem(block_number, @save_block_interval) == 0 do - save_sync_state(next_block_tree, trie) - else - trie - end - else - trie + for hash <- hashes_to_request do + Process.send_after(self(), {:request_chunk, hash}, 0) end - end - @spec request_chunk(EVM.hash()) :: reference() - defp request_chunk(chunk_hash) do - Process.send_after(self(), {:request_chunk, chunk_hash}, 0) + new_warp_queue end - @spec maybe_request_next_block(BlockQueue.t()) :: :ok - defp maybe_request_next_block(block_queue) do - # Let's pull a new block if we have none left - _ = - if block_queue.queue == %{} do - request_next_block() + @spec process_completed_blocks(BlockQueue.t(), GenServer.server(), Chain.t()) :: BlockQueue.t() + defp process_completed_blocks(block_queue, block_processor, chain) do + {next_block_queue, blocks} = BlockQueue.get_complete_blocks(block_queue) + + BlockProcessor.process_completed_blocks(block_processor, blocks, chain) + + next_block_queue + end + + # Dispatches new requests for headers or block data based on what is + # required in the block queue. + @spec dispatch_new_block_queue_requests(BlockQueue.t()) :: BlockQueue.t() + defp dispatch_new_block_queue_requests(block_queue) do + {next_block_queue, requests} = BlockQueue.get_requests(block_queue) + + for request <- requests do + case request do + {:headers, block_number, count} -> + :ok = + Logger.debug(fn -> + "[Sync] Requesting new block headers from #{block_number}..#{block_number + count}." + end) + + unless send_with_retry( + %GetBlockHeaders{ + block_identifier: block_number, + max_headers: count, + skip: 0, + reverse: false + }, + :random, + {:get_block_headers, block_number, count} + ) do + :ok = Logger.debug(fn -> "[Sync] Failed to request block headers" end) + end + + {:bodies, header_hashes} -> + :ok = + Logger.debug(fn -> + "[Sync] Requesting new block bodies for #{Enum.count(header_hashes)} header hash(es)." + end) + + unless send_with_retry( + %GetBlockBodies{ + hashes: header_hashes + }, + :random, + {:get_header_hashes, header_hashes} + ) do + :ok = Logger.debug(fn -> "[Sync] Failed to request header hashes" end) + end end + end - :ok + next_block_queue + end + + @spec save_and_check_block_state(BlockQueue.t(), state(), boolean()) :: state() + def save_and_check_block_state(block_queue, state = %{trie: trie}, save \\ true) do + if save do + :ok = BlockState.save_block_queue(TrieStorage.permanent_db(trie), block_queue) + end + + # TODO: What is check block state for here? + state end @spec save_and_check_warp_complete(WarpQueue.t(), state(), boolean()) :: state() @@ -543,7 +568,8 @@ defmodule ExWire.Sync do Logger.info("[Warp] Warp Completed in #{Time.elapsed(warp_queue.warp_start, :second)}") # Save our process - saved_tried = save_sync_state(warp_queue.block_tree, trie) + # TODO... + # saved_trie = save_sync_state(warp_queue.block_tree, trie) # Request a normal sync to start request_next_block() @@ -554,30 +580,12 @@ defmodule ExWire.Sync do %{ state | warp_queue: warp_queue, - trie: saved_tried, + trie: trie, warp: false } end end - # Loads sync state from our backing database - @spec load_sync_state(DB.db()) :: Blocktree.t() - defp load_sync_state(db) do - State.load_tree(db) - end - - # Save sync state from our backing database. - @spec save_sync_state(Blocktree.t(), Trie.t()) :: Trie.t() - defp save_sync_state(blocktree, trie) do - committed_trie = TrieStorage.commit!(trie) - - committed_trie - |> TrieStorage.permanent_db() - |> State.save_tree(blocktree) - - committed_trie - end - @spec send_with_retry( Packet.packet(), PeerSupervisor.node_selector(), diff --git a/apps/ex_wire/lib/ex_wire/sync/block_processor.ex b/apps/ex_wire/lib/ex_wire/sync/block_processor.ex new file mode 100644 index 000000000..c9fe8565f --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/sync/block_processor.ex @@ -0,0 +1,187 @@ +defmodule ExWire.Sync.BlockProcessor do + @moduledoc """ + + """ + use GenServer + + require Logger + + alias Blockchain.{Block, Blocktree, Chain} + alias Exth.Time + alias ExWire.Sync.BlockProcessor.StandardProcessor + alias MerklePatriciaTree.{Trie, TrieStorage} + + @callback process_blocks( + list(Block.t()), + Blocktree.t(), + backlog(), + Chain.t(), + Trie.t(), + boolean() + ) :: {list(EVM.hash()), Blocktree.t(), backlog(), Trie.t()} + + @type backlog :: %{EVM.hash() => list(Block.t())} + + @type state :: %{ + sup: pid(), + block_processing_task: Task.t(), + queue_blocks_messages: list(Block.t()), + backlog: backlog(), + trie: Trie.t() + } + + @name __MODULE__ + + @doc """ + Initializes a new BlockProcessor server. + """ + @spec start_link({Trie.t()}, Keyword.t()) :: GenServer.on_start() + def start_link({trie}, opts \\ []) do + GenServer.start_link( + __MODULE__, + [trie: trie], + name: Keyword.get(opts, :name, @name) + ) + end + + @doc """ + Initializes gen server with options from `start_link`. + """ + @impl true + def init(trie: trie) do + {:ok, sup} = Task.Supervisor.start_link() + + {:ok, + %{ + sup: sup, + block_processing_task: nil, + queue_blocks_messages: [], + backlog: %{}, + trie: trie + }} + end + + # When a task completes, we try to pull a new task from the queue. + @spec handle_task_complete(term(), term(), state()) :: state() + defp handle_task_complete( + ref, + status, + state = %{ + sup: sup, + trie: trie, + backlog: backlog, + block_processing_task: %Task{ref: task_ref}, + queue_blocks_messages: queue_blocks_messages + } + ) + when ref == task_ref do + case status do + {:ok, {:ok, next_backlog, next_trie}} -> + %{state | backlog: next_backlog, trie: next_trie} + + :down -> + {next_task, next_queue} = + case queue_blocks_messages do + [{:process_completed_blocks, pid, blocks, chain} | next_queue] -> + task = run_task(sup, blocks, chain, pid, backlog, trie) + + {task, next_queue} + + [] -> + {nil, []} + end + + %{state | block_processing_task: next_task, queue_blocks_messages: next_queue} + end + end + + @impl true + # Called when a task completes successfully with the return + # value of that task. + def handle_info({ref, msg}, state) do + {:noreply, handle_task_complete(ref, {:ok, msg}, state)} + end + + # Called when a task completes informing the supervisor that a child + # has terminated normally. + def handle_info({:DOWN, ref, :process, _pid, :normal}, state) do + {:noreply, handle_task_complete(ref, :down, state)} + end + + @impl true + def handle_cast( + {:process_completed_blocks, _pid, [], _chain}, + state + ) do + {:noreply, state} + end + + def handle_cast( + blocks_message = {:process_completed_blocks, pid, blocks, chain}, + state = %{ + sup: sup, + trie: trie, + backlog: backlog, + block_processing_task: task, + queue_blocks_messages: queue_blocks_messages + } + ) do + {next_task, next_queue_blocks_messages} = + if is_nil(task) do + {run_task(sup, blocks, chain, pid, backlog, trie), queue_blocks_messages} + else + {task, [blocks_message | queue_blocks_messages]} + end + |> Exth.inspect("Process completed blocks") + + {:noreply, + %{ + state + | block_processing_task: next_task, + queue_blocks_messages: next_queue_blocks_messages + }} + end + + @spec run_task(pid(), list(Block.t()), Chain.t(), pid(), backlog(), Trie.t()) :: Task.t() + defp run_task(sup, blocks, chain, pid, backlog, trie) do + Task.Supervisor.async(sup, fn -> + start = Time.time_start() + + :ok = + Logger.debug(fn -> + "[BlockProcessor] Starting to process #{Enum.count(blocks)} block(s)." + end) + + {processed_blocks, next_block_tree, next_backlog, next_trie} = + StandardProcessor.process_blocks( + blocks, + Blocktree.new_tree(), + backlog, + chain, + trie, + false + ) + + trie_elapsed = + Time.elapsed(fn -> + TrieStorage.commit!(next_trie) + end) + + :ok = + Logger.debug(fn -> + "[BlockProcessor] Processed #{Enum.count(processed_blocks)} block(s) in #{ + Time.elapsed(start) + } (trie commit time: #{trie_elapsed})." + end) + + :ok = GenServer.cast(pid, {:processed_blocks, processed_blocks, next_block_tree.best_block}) + + {:ok, next_backlog, next_trie} + end) + end + + @spec process_completed_blocks(pid(), list(Block.t()), Chain.t()) :: :ok + def process_completed_blocks(pid, blocks, chain) do + GenServer.cast(pid, {:process_completed_blocks, self(), blocks, chain}) + end +end diff --git a/apps/ex_wire/lib/ex_wire/sync/block_processor/standard_processor.ex b/apps/ex_wire/lib/ex_wire/sync/block_processor/standard_processor.ex new file mode 100644 index 000000000..6847e88fa --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/sync/block_processor/standard_processor.ex @@ -0,0 +1,122 @@ +defmodule ExWire.Sync.BlockProcessor.StandardProcessor do + @moduledoc """ + + """ + require Logger + + alias Blockchain.{Block, Blocktree, Chain} + alias ExWire.Sync.BlockProcessor + alias MerklePatriciaTree.Trie + + @behaviour BlockProcessor + + @doc """ + Processes a the block queue, adding any blocks which are complete and pass + the number of confirmations to the block tree. These blocks are then removed + from the queue. Note: they may end up in the backlog, nonetheless, if we are + waiting still for the parent block. + """ + @spec process_blocks( + list(Block.t()), + Blocktree.t(), + BlockProcessor.backlog(), + Chain.t(), + Trie.t(), + boolean() + ) :: {list(EVM.hash()), Blocktree.t(), BlockProcessor.backlog(), Trie.t()} + def process_blocks( + blocks, + block_tree, + backlog, + chain, + trie, + do_validation + ) do + do_process_blocks(blocks, [], block_tree, backlog, chain, trie, do_validation) + end + + @spec do_process_blocks( + list(Block.t()), + list(EVM.hash()), + Blocktree.t(), + BlockProcessor.backlog(), + Chain.t(), + Trie.t(), + boolean() + ) :: {list(EVM.hash()), Blocktree.t(), BlockProcessor.backlog(), Trie.t()} + defp do_process_blocks([], processed_blocks, block_tree, backlog, _chain, trie, _do_validation), + do: {processed_blocks, block_tree, backlog, trie} + + defp do_process_blocks( + [block | rest], + processed_blocks, + block_tree, + backlog, + chain, + trie, + do_validation + ) do + {processed_block_hash, new_block_tree, new_trie, new_backlog, extra_blocks} = + case Blocktree.verify_and_add_block( + block_tree, + chain, + block, + trie, + do_validation + ) do + {:invalid, [:non_genesis_block_requires_parent]} -> + # Note: this is probably too slow since we see a lot of blocks without + # parents and, I think, we're running the full validity check. + + :ok = + Logger.debug(fn -> "[Block Queue] Failed to verify block due to missing parent" end) + + updated_backlog = + Map.update( + backlog, + block.header.parent_hash, + [block], + fn blocks -> [block | blocks] end + ) + + {nil, block_tree, trie, updated_backlog, []} + + {:invalid, reasons} -> + :ok = + Logger.debug(fn -> + "[Block Queue] Failed to verify block ##{block.header.number} due to #{ + inspect(reasons) + }" + end) + + {nil, block_tree, trie, backlog, []} + + {:ok, {new_block_tree, new_trie, block_hash}} -> + # Weird that we can't verify block 0.... + + :ok = + Logger.debug(fn -> + "[Block Queue] Verified block ##{block.header.number} (0x#{ + Base.encode16(block_hash, case: :lower) + })" + end) + + {backlogged_blocks, new_backlog} = Map.pop(backlog, block_hash, []) + + {block_hash, new_block_tree, new_trie, new_backlog, backlogged_blocks} + end + + do_process_blocks( + extra_blocks ++ rest, + if(processed_block_hash, + do: [processed_block_hash | processed_blocks], + else: processed_blocks + ), + new_block_tree, + new_backlog, + chain, + new_trie, + do_validation + ) + end +end diff --git a/apps/ex_wire/lib/ex_wire/sync/block_state.ex b/apps/ex_wire/lib/ex_wire/sync/block_state.ex new file mode 100644 index 000000000..35df744b9 --- /dev/null +++ b/apps/ex_wire/lib/ex_wire/sync/block_state.ex @@ -0,0 +1,46 @@ +defmodule ExWire.Sync.BlockState do + @moduledoc """ + This module exposes functions to store and load the current state of + a block sync in the database. + """ + require Logger + + alias ExWire.Struct.BlockQueue + alias MerklePatriciaTree.DB + + @key "current_block_queue_9" + + @doc """ + Loads the current block queue from database. + """ + @spec load_block_queue(DB.db()) :: WarpQueue.t() + def load_block_queue(db) do + case DB.get(db, @key) do + {:ok, current_block_queue} -> + :erlang.binary_to_term(current_block_queue) + + :not_found -> + %BlockQueue{} + end + end + + @doc """ + Stores the current block queue into the database. + """ + @spec save_block_queue(DB.db(), BlockQueue.t()) :: :ok + def save_block_queue(db, block_queue) do + :ok = Logger.debug(fn -> "Saving block queue..." end) + + DB.put!( + db, + @key, + :erlang.term_to_binary(%{ + block_queue + | header_requests: MapSet.new(), + block_requests: MapSet.new() + }) + ) + + :ok + end +end