Skip to content

Commit

Permalink
Make the accountsdb plugin sensitive to is_startup. (#166)
Browse files Browse the repository at this point in the history
* Initial commit for accountsdb startup types.

* Fix sample_config.json

* Tweak semantics of exchange and queue formatting.
  • Loading branch information
ray-kast authored Feb 21, 2022
1 parent b7d3a18 commit 640dd79
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 69 deletions.
30 changes: 16 additions & 14 deletions crates/accountsdb-rabbitmq/sample_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
"network": "devnet",
"address": "amqp://"
},
"accountOwners": [
"metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s",
"vau1zxA2LbssAUEF7Gpw91zMM1LvXrvpzJtmZ58rPsn",
"auctxRXPeJoc4817jDhf4HbjnhEcr1cCXenosMhK5R8",
"p1exdMJcjVao65QdewkaZRUnU6VPSXhus9n2GzWfh98",
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
"hausS13jsjafwWwGqZTUQRmWyvyxn9EQpqMwV1PBBmk"

],
"instruction_programs": [

]

}
"jobs": {
"limit": 16
},
"accounts": {
"owners": [
"metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s",
"vau1zxA2LbssAUEF7Gpw91zMM1LvXrvpzJtmZ58rPsn",
"auctxRXPeJoc4817jDhf4HbjnhEcr1cCXenosMhK5R8",
"p1exdMJcjVao65QdewkaZRUnU6VPSXhus9n2GzWfh98",
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
"hausS13jsjafwWwGqZTUQRmWyvyxn9EQpqMwV1PBBmk"
],
"startup": false
},
"instructionPrograms": []
}
24 changes: 20 additions & 4 deletions crates/accountsdb-rabbitmq/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct Config {
jobs: Jobs,

#[serde(default)]
account_owners: HashSet<String>,
accounts: Accounts,

#[serde(default)]
instruction_programs: HashSet<String>,
Expand All @@ -37,6 +37,22 @@ pub struct Jobs {
pub limit: usize,
}

#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Accounts {
#[serde(default)]
pub owners: HashSet<String>,

/// Filter for changing how to interpret the `is_startup` flag.
///
/// This option has three states:
/// - `None`: Ignore the `is_startup` flag and send all updates.
/// - `Some(true)`: Only send updates when `is_startup` is `true`.
/// - `Some(false)`: Only send updates when `is_startup` is `false`.
#[serde(default)]
pub startup: Option<bool>,
}

impl Config {
pub fn read(path: &str) -> Result<Self> {
let f = std::fs::File::open(path).context("Failed to open config file")?;
Expand All @@ -49,12 +65,12 @@ impl Config {
let Self {
amqp,
jobs,
account_owners,
accounts,
instruction_programs,
} = self;

let acct = AccountSelector::from_config(account_owners)
.context("Failed to create account selector")?;
let acct =
AccountSelector::from_config(accounts).context("Failed to create account selector")?;
let ins = InstructionSelector::from_config(instruction_programs)
.context("Failed to create instruction selector")?;

Expand Down
74 changes: 43 additions & 31 deletions crates/accountsdb-rabbitmq/src/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashSet, io::Read};
use std::collections::HashSet;

use indexer_rabbitmq::{
accountsdb::{AccountUpdate, Message, Producer, QueueType},
Expand Down Expand Up @@ -42,7 +42,7 @@ pub struct AccountsDbPluginRabbitMq {
producer: Option<Sender<Producer>>,
acct_sel: Option<AccountSelector>,
ins_sel: Option<InstructionSelector>,
token_addresses: HashSet<String>,
token_addresses: HashSet<Pubkey>,
}

#[derive(Deserialize)]
Expand All @@ -55,6 +55,25 @@ struct TokenList {
tokens: Vec<TokenItem>,
}

impl AccountsDbPluginRabbitMq {
const TOKEN_REG_URL: &'static str = "https://raw.githubusercontent.com/solana-labs/token-list/main/src/tokens/solana.tokenlist.json";

async fn load_token_reg() -> Result<HashSet<Pubkey>> {
let res: TokenList = reqwest::get(Self::TOKEN_REG_URL)
.await
.map_err(custom_err)?
.json()
.await
.map_err(custom_err)?;

res.tokens
.into_iter()
.map(|TokenItem { address }| address.parse())
.collect::<StdResult<_, _>>()
.map_err(custom_err)
}
}

impl AccountsDbPlugin for AccountsDbPluginRabbitMq {
fn name(&self) -> &'static str {
"AccountsDbPluginRabbitMq"
Expand All @@ -67,38 +86,27 @@ impl AccountsDbPlugin for AccountsDbPluginRabbitMq {
.and_then(Config::into_parts)
.map_err(custom_err)?;

let startup_type = acct.startup();

self.acct_sel = Some(acct);
self.ins_sel = Some(ins);

let mut res = reqwest::blocking::get("https://raw.githubusercontent.com/solana-labs/token-list/main/src/tokens/solana.tokenlist.json")
.expect("couldn't fetch token list");
let mut body = String::new();
res.read_to_string(&mut body)?;

let json: TokenList = serde_json::from_str(&body).expect("file should be proper JSON");

let mut token_addresses: HashSet<String> = HashSet::new();
for token_data in json.tokens {
let address = token_data.address;
token_addresses.insert(address);
}

self.token_addresses = token_addresses;

smol::block_on(async {
let conn =
Connection::connect(&amqp.address, ConnectionProperties::default().with_smol())
.await
.map_err(custom_err)?;

self.producer = Some(Sender::new(
QueueType::new(amqp.network, None)
QueueType::new(amqp.network, startup_type, None)
.producer(&conn)
.await
.map_err(custom_err)?,
jobs.limit,
));

self.token_addresses = Self::load_token_reg().await?;

Ok(())
})
}
Expand All @@ -118,7 +126,12 @@ impl AccountsDbPlugin for AccountsDbPluginRabbitMq {
smol::block_on(async {
match account {
ReplicaAccountInfoVersions::V0_0_1(acct) => {
if !self.acct_sel.as_ref().ok_or_else(uninit)?.is_selected(acct) {
if !self
.acct_sel
.as_ref()
.ok_or_else(uninit)?
.is_selected(acct, is_startup)
{
return Ok(());
}

Expand All @@ -132,25 +145,24 @@ impl AccountsDbPlugin for AccountsDbPluginRabbitMq {
write_version,
} = *acct;

let key = Pubkey::new_from_array(pubkey.try_into().map_err(custom_err)?);
let owner = Pubkey::new_from_array(owner.try_into().map_err(custom_err)?);
let data = data.to_owned();

if owner == ids::token() && data.len() == TokenAccount::get_packed_len() {
let token_account = TokenAccount::unpack_from_slice(&data);
if owner == ids::token().as_ref()
&& data.len() == TokenAccount::get_packed_len()
{
let token_account = TokenAccount::unpack_from_slice(data);

if let Ok(token_account) = token_account {
if token_account.amount > 1 {
return Ok(());
}
let mint = token_account.mint.to_string();
let is_token = self.token_addresses.contains(&mint);
if is_token {
if token_account.amount > 1
|| self.token_addresses.contains(&token_account.mint)
{
return Ok(());
}
}
}

let key = Pubkey::new_from_array(pubkey.try_into().map_err(custom_err)?);
let owner = Pubkey::new_from_array(owner.try_into().map_err(custom_err)?);
let data = data.to_owned();

self.producer
.as_ref()
.ok_or_else(uninit)?
Expand Down
18 changes: 14 additions & 4 deletions crates/accountsdb-rabbitmq/src/selectors.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
use std::collections::HashSet;

use indexer_rabbitmq::accountsdb::StartupType;
use solana_program::instruction::CompiledInstruction;

use super::config::Accounts;
use crate::{interface::ReplicaAccountInfo, prelude::*};

#[derive(Debug)]
pub struct AccountSelector {
owners: HashSet<Box<[u8]>>,
startup: Option<bool>,
}

impl AccountSelector {
pub fn from_config(owners: HashSet<String>) -> Result<Self> {
pub fn from_config(config: Accounts) -> Result<Self> {
let Accounts { owners, startup } = config;

let owners = owners
.into_iter()
.map(|s| {
Expand All @@ -20,12 +25,17 @@ impl AccountSelector {
.collect::<Result<_, _>>()
.context("Failed to parse account owner keys")?;

Ok(Self { owners })
Ok(Self { owners, startup })
}

#[inline]
pub fn startup(&self) -> StartupType {
StartupType::new(self.startup)
}

#[inline]
pub fn is_selected(&self, acct: &ReplicaAccountInfo) -> bool {
self.owners.contains(acct.owner)
pub fn is_selected(&self, acct: &ReplicaAccountInfo, is_startup: bool) -> bool {
self.startup.map_or(true, |s| is_startup == s) && self.owners.contains(acct.owner)
}
}

Expand Down
13 changes: 7 additions & 6 deletions crates/cli/src/repl/rmq.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
use anyhow::Context;
use docbot::prelude::*;
use indexer_rabbitmq::{
accountsdb::{Network, QueueType},
accountsdb::{Network, QueueType, StartupType},
lapin::{Connection, ConnectionProperties},
prelude::*,
};
use lapinou::LapinSmolExt;

#[derive(Docbot)]
pub enum RmqCommand {
/// `listen <network> <address> <suffix>`
/// `listen <address> <network> <startup> <suffix>`
/// Open an AMQP connection to the specified address
///
/// # Arguments
/// network: The network identifier of the server to listen for
/// address: The address to connect to
/// network: The network identifier of the server to listen for
/// startup: The startup-type identifier of the server to listen for
/// suffix: A unique identifier to suffix a new queue with
Listen(Network, String, String),
Listen(String, Network, StartupType, String),
}

pub fn handle(cmd: RmqCommand) -> super::Result {
match cmd {
RmqCommand::Listen(network, addr, suffix) => smol::block_on(async {
RmqCommand::Listen(addr, network, startup, suffix) => smol::block_on(async {
let conn = Connection::connect(&addr, ConnectionProperties::default().with_smol())
.await
.context("Failed to connect to the AMQP server")?;
let mut consumer = QueueType::new(network, Some(&suffix))
let mut consumer = QueueType::new(network, startup, Some(&suffix))
.consumer(&conn)
.await
.context("Failed to create a consumer")?;
Expand Down
7 changes: 6 additions & 1 deletion crates/indexer/src/bin/metaplex-indexer-accountsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ struct Args {
#[clap(long, env)]
network: accountsdb::Network,

/// The startup type of events to listen for
#[clap(long, env, default_value_t = accountsdb::StartupType::Normal)]
startup: accountsdb::StartupType,

/// An optional suffix for the AMQP queue ID
///
/// For debug builds a value must be provided here to avoid interfering with
Expand All @@ -24,6 +28,7 @@ fn main() {
|Args {
amqp_url,
network,
startup,
queue_suffix,
},
db| async move {
Expand All @@ -45,7 +50,7 @@ fn main() {

let mut consumer = accountsdb::Consumer::new(
&conn,
accountsdb::QueueType::new(network, queue_suffix.as_deref()),
accountsdb::QueueType::new(network, startup, queue_suffix.as_deref()),
)
.await
.context("Failed to create queue consumer")?;
Expand Down
Loading

0 comments on commit 640dd79

Please sign in to comment.