Skip to content

Commit

Permalink
feat: send metadata (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
wedancedalot authored Aug 17, 2023
1 parent a88d83b commit 2e17e21
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 25 deletions.
2 changes: 1 addition & 1 deletion config/geyser-plugin-config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"libpath": "../target/release/libsolana_geyser_plugin_scaffold.so",
"tcp_port": 2000,
"tcp_buffer_size": 500,
"tcp_buffer_size": 5000,
"tcp_batch_max_bytes": 2097152,
"send_transactions": true,
"send_accounts": true,
Expand Down
2 changes: 1 addition & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use thiserror::Error;
#[derive(Error, Debug)]
pub enum GeyserError {
#[error("tcp send error")]
TcpSend(usize),
TcpSend(u64),

#[error("cannot acquire sender lock")]
SenderLockError,
Expand Down
7 changes: 7 additions & 0 deletions src/flatbuffer/metadata.fbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Metadata;

table Metadata {
send_errors: ulong;
}

root_type Metadata;
195 changes: 195 additions & 0 deletions src/flatbuffer/metadata_generated.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// automatically generated by the FlatBuffers compiler, do not modify

// @generated

extern crate flatbuffers;

#[allow(unused_imports, dead_code)]
pub mod metadata {

use core::cmp::Ordering;
use core::mem;

extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};

pub enum MetadataOffset {}
#[derive(Copy, Clone, PartialEq)]

pub struct Metadata<'a> {
pub _tab: flatbuffers::Table<'a>,
}

impl<'a> flatbuffers::Follow<'a> for Metadata<'a> {
type Inner = Metadata<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table::new(buf, loc),
}
}
}

impl<'a> Metadata<'a> {
pub const VT_SEND_ERRORS: flatbuffers::VOffsetT = 4;

#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
Metadata { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>,
args: &'args MetadataArgs,
) -> flatbuffers::WIPOffset<Metadata<'bldr>> {
let mut builder = MetadataBuilder::new(_fbb);
builder.add_send_errors(args.send_errors);
builder.finish()
}

#[inline]
pub fn send_errors(&self) -> u64 {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe {
self._tab
.get::<u64>(Metadata::VT_SEND_ERRORS, Some(0))
.unwrap()
}
}
}

impl flatbuffers::Verifiable for Metadata<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<u64>("send_errors", Self::VT_SEND_ERRORS, false)?
.finish();
Ok(())
}
}
pub struct MetadataArgs {
pub send_errors: u64,
}
impl<'a> Default for MetadataArgs {
#[inline]
fn default() -> Self {
MetadataArgs { send_errors: 0 }
}
}

pub struct MetadataBuilder<'a: 'b, 'b> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b> MetadataBuilder<'a, 'b> {
#[inline]
pub fn add_send_errors(&mut self, send_errors: u64) {
self.fbb_
.push_slot::<u64>(Metadata::VT_SEND_ERRORS, send_errors, 0);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> MetadataBuilder<'a, 'b> {
let start = _fbb.start_table();
MetadataBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<Metadata<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}

impl core::fmt::Debug for Metadata<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("Metadata");
ds.field("send_errors", &self.send_errors());
ds.finish()
}
}
#[inline]
/// Verifies that a buffer of bytes contains a `Metadata`
/// and returns it.
/// Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `root_as_metadata_unchecked`.
pub fn root_as_metadata(buf: &[u8]) -> Result<Metadata, flatbuffers::InvalidFlatbuffer> {
flatbuffers::root::<Metadata>(buf)
}
#[inline]
/// Verifies that a buffer of bytes contains a size prefixed
/// `Metadata` and returns it.
/// Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `size_prefixed_root_as_metadata_unchecked`.
pub fn size_prefixed_root_as_metadata(
buf: &[u8],
) -> Result<Metadata, flatbuffers::InvalidFlatbuffer> {
flatbuffers::size_prefixed_root::<Metadata>(buf)
}
#[inline]
/// Verifies, with the given options, that a buffer of bytes
/// contains a `Metadata` and returns it.
/// Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `root_as_metadata_unchecked`.
pub fn root_as_metadata_with_opts<'b, 'o>(
opts: &'o flatbuffers::VerifierOptions,
buf: &'b [u8],
) -> Result<Metadata<'b>, flatbuffers::InvalidFlatbuffer> {
flatbuffers::root_with_opts::<Metadata<'b>>(opts, buf)
}
#[inline]
/// Verifies, with the given verifier options, that a buffer of
/// bytes contains a size prefixed `Metadata` and returns
/// it. Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `root_as_metadata_unchecked`.
pub fn size_prefixed_root_as_metadata_with_opts<'b, 'o>(
opts: &'o flatbuffers::VerifierOptions,
buf: &'b [u8],
) -> Result<Metadata<'b>, flatbuffers::InvalidFlatbuffer> {
flatbuffers::size_prefixed_root_with_opts::<Metadata<'b>>(opts, buf)
}
#[inline]
/// Assumes, without verification, that a buffer of bytes contains a Metadata and returns it.
/// # Safety
/// Callers must trust the given bytes do indeed contain a valid `Metadata`.
pub unsafe fn root_as_metadata_unchecked(buf: &[u8]) -> Metadata {
flatbuffers::root_unchecked::<Metadata>(buf)
}
#[inline]
/// Assumes, without verification, that a buffer of bytes contains a size prefixed Metadata and returns it.
/// # Safety
/// Callers must trust the given bytes do indeed contain a valid size prefixed `Metadata`.
pub unsafe fn size_prefixed_root_as_metadata_unchecked(buf: &[u8]) -> Metadata {
flatbuffers::size_prefixed_root_unchecked::<Metadata>(buf)
}
#[inline]
pub fn finish_metadata_buffer<'a, 'b>(
fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>,
root: flatbuffers::WIPOffset<Metadata<'a>>,
) {
fbb.finish(root, None);
}

#[inline]
pub fn finish_size_prefixed_metadata_buffer<'a, 'b>(
fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>,
root: flatbuffers::WIPOffset<Metadata<'a>>,
) {
fbb.finish_size_prefixed(root, None);
}
} // pub mod Metadata
16 changes: 16 additions & 0 deletions src/flatbuffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use solana_program::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use solana_transaction_status::{UiReturnDataEncoding, UiTransactionReturnData};

use self::metadata_generated::metadata::{Metadata, MetadataArgs};
use self::{
block_info_generated::block_info::{BlockInfo, BlockInfoArgs},
common_generated::common::{Reward, RewardArgs, RewardType},
Expand All @@ -36,6 +37,8 @@ mod block_info_generated;
#[allow(dead_code, clippy::all)]
mod common_generated;
#[allow(dead_code, clippy::all)]
mod metadata_generated;
#[allow(dead_code, clippy::all)]
mod slot_generated;
#[allow(dead_code, clippy::all)]
mod transaction_info_generated;
Expand All @@ -47,6 +50,7 @@ const BYTE_PREFIX_ACCOUNT: u8 = 0;
const BYTE_PREFIX_SLOT: u8 = 1;
const BYTE_PREFIX_TX: u8 = 2;
const BYTE_PREFIX_BLOCK: u8 = 3;
const BYTE_PREFIX_METADATA: u8 = 4;

pub struct AccountUpdate {
/// The account's public key
Expand Down Expand Up @@ -1341,3 +1345,15 @@ pub fn serialize_transaction(transaction: &TransactionUpdate) -> Result<Vec<u8>,

Ok(output)
}

pub fn serialize_metadata(send_errors: u64) -> Vec<u8> {
let mut builder = FlatBufferBuilder::new();

let obj = Metadata::create(&mut builder, &MetadataArgs { send_errors });
builder.finish(obj, None);

let mut output = vec![BYTE_PREFIX_METADATA];
output.extend(builder.finished_data().to_vec());

output
}
16 changes: 12 additions & 4 deletions src/geyser_plugin_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,22 @@ impl GeyserPlugin for GeyserPluginHook {

info!("[on_load] - socket created");

self.0 = Some(Arc::new(Inner {
let plugin = Arc::new(Inner {
socket,
metrics: metrics.clone(),
config: cfg,
}));
});

self.0 = Some(plugin.clone());

thread::spawn(move || loop {
let data = flatbuffer::serialize_metadata(metrics.send_errs.load(Ordering::Relaxed));
if let Err(e) = plugin.socket.publish(data) {
info!("{}", e);
}

thread::spawn(move || {
metrics.spin(Duration::from_secs(10));
info!("{}", metrics);
thread::sleep(Duration::from_secs(10));
});

Ok(())
Expand Down
30 changes: 11 additions & 19 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,23 @@
use log::info;
use std::{fmt, sync::Arc, thread, time};
use std::{fmt, sync::Arc};

pub struct Metrics {
pub send_errs: std::sync::atomic::AtomicUsize,
pub serialize_errs: std::sync::atomic::AtomicUsize,
pub sender_lock_errs: std::sync::atomic::AtomicUsize,
pub conn_lock_errs: std::sync::atomic::AtomicUsize,
pub untyped_errs: std::sync::atomic::AtomicUsize,
pub send_errs: std::sync::atomic::AtomicU64,
pub serialize_errs: std::sync::atomic::AtomicU64,
pub sender_lock_errs: std::sync::atomic::AtomicU64,
pub conn_lock_errs: std::sync::atomic::AtomicU64,
pub untyped_errs: std::sync::atomic::AtomicU64,
}

impl Metrics {
pub fn new_rc() -> Arc<Self> {
Arc::new(Self {
send_errs: std::sync::atomic::AtomicUsize::new(0),
serialize_errs: std::sync::atomic::AtomicUsize::new(0),
sender_lock_errs: std::sync::atomic::AtomicUsize::new(0),
conn_lock_errs: std::sync::atomic::AtomicUsize::new(0),
untyped_errs: std::sync::atomic::AtomicUsize::new(0),
send_errs: std::sync::atomic::AtomicU64::new(0),
serialize_errs: std::sync::atomic::AtomicU64::new(0),
sender_lock_errs: std::sync::atomic::AtomicU64::new(0),
conn_lock_errs: std::sync::atomic::AtomicU64::new(0),
untyped_errs: std::sync::atomic::AtomicU64::new(0),
})
}

pub fn spin(&self, interval: time::Duration) {
loop {
info!("{}", self);
thread::sleep(interval)
}
}
}

impl std::fmt::Display for Metrics {
Expand Down

0 comments on commit 2e17e21

Please sign in to comment.