Skip to content

Commit

Permalink
fix: Fix incorrect link message padding (#1994)
Browse files Browse the repository at this point in the history
## Motivation

During the rust migration, link messages didn't have the full 0 padding
for add/remove keys. Fix that for future messages and add checks to
check for the incorrect padding and treat them the same as the correct
link messages everywhere.

Thank you @FrederikBolding for finding the bug in
#1983

## Change Summary

Describe the changes being made in 1-2 concise sentences.

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.
- [x] All [commits have been
signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits)

## Additional Context

If this is a relatively large or complex change, provide more details
here that will help reviewers


<!-- start pr-codex -->

---

## PR-Codex overview
This PR fixes a padding bug in the link messages and refactors key
generation functions in the `LinkStore`.

### Detailed summary
- Fixed incorrect link message padding bug
- Refactored key generation functions in `LinkStore`
- Added padding with zero bytes for link type keys

> The following files were skipped due to too many changes:
`apps/hubble/src/addon/src/store/link_store.rs`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->

---------

Co-authored-by: Frederik Bolding <[email protected]>
  • Loading branch information
sanjayprabhu and FrederikBolding authored May 12, 2024
1 parent 7486e4b commit 8e7dec1
Show file tree
Hide file tree
Showing 8 changed files with 409 additions and 41 deletions.
5 changes: 5 additions & 0 deletions .changeset/fast-emus-own.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix: Fix incorrect link message padding
8 changes: 8 additions & 0 deletions apps/hubble/src/addon/src/store/cast_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ impl StoreDef for CastStoreDef {
Ok(())
}

fn delete_remove_secondary_indices(
&self,
_txn: &mut RocksDbTransactionBatch,
_message: &Message,
) -> Result<(), HubError> {
Ok(())
}

fn make_add_key(&self, message: &protos::Message) -> Result<Vec<u8>, HubError> {
let hash = match message.data.as_ref().unwrap().body.as_ref() {
Some(message_data::Body::CastAddBody(_)) => message.hash.as_ref(),
Expand Down
256 changes: 216 additions & 40 deletions apps/hubble/src/addon/src/store/link_store.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::{borrow::Borrow, convert::TryInto, sync::Arc};

use crate::db::{RocksDB, RocksDbTransactionBatch};
use crate::logger::LOGGER;
use crate::protos::link_body::Target;
use crate::protos::message_data::Body;
use crate::protos::{message_data, LinkBody, Message, MessageData, MessageType};
use crate::store::{
get_page_options, get_store, hub_error_to_js_throw, make_fid_key, make_user_key, message,
HubError, IntoI32, IntoU8, MessagesPage, PageOptions, RootPrefix, Store, StoreDef,
StoreEventHandler, UserPostfix, PAGE_SIZE_MAX, TS_HASH_LENGTH,
get_message, get_page_options, get_store, hub_error_to_js_throw, make_fid_key, make_user_key,
message, utils, HubError, IntoI32, IntoU8, MessagesPage, PageOptions, RootPrefix, Store,
StoreDef, StoreEventHandler, UserPostfix, PAGE_SIZE_MAX, TS_HASH_LENGTH,
};
use crate::{protos, THREAD_POOL};
use neon::prelude::{JsPromise, JsString};
Expand All @@ -18,6 +19,7 @@ use neon::{
types::{JsBox, JsNumber},
};
use prost::Message as _;
use slog::{o, warn};

use super::deferred_settle_messages;

Expand Down Expand Up @@ -117,7 +119,22 @@ impl LinkStore {
..Default::default()
};

store.get_add(&partial_message)
let result = store.get_add(&partial_message);
if let Ok(None) = result {
// Check for incorrectly padded keys
let unpadded_key = Self::make_add_key_padded(&partial_message, false)?;
let message_ts_hash = store.db().get(&unpadded_key)?;
if message_ts_hash.is_none() {
return Ok(None);
}
return get_message(
store.db().borrow(),
partial_message.data.as_ref().unwrap().fid as u32,
store.store_def().postfix(),
&utils::vec_to_u8_24(&message_ts_hash)?,
);
}
result
}

pub fn get_link_adds_by_fid(
Expand Down Expand Up @@ -232,8 +249,22 @@ impl LinkStore {
..Default::default()
};

let r = store.get_remove(&partial_message);
r
let result = store.get_remove(&partial_message);
if let Ok(None) = result {
// Check for incorrectly padded keys
let unpadded_key = Self::make_remove_key_padded(&partial_message, false)?;
let message_ts_hash = store.db().get(&unpadded_key)?;
if message_ts_hash.is_none() {
return Ok(None);
}
return get_message(
store.db().borrow(),
partial_message.data.as_ref().unwrap().fid as u32,
store.store_def().postfix(),
&utils::vec_to_u8_24(&message_ts_hash)?,
);
}
result
}

// Generates a unique key used to store a LinkCompactState message key in the store
Expand All @@ -244,7 +275,10 @@ impl LinkStore {

key.extend_from_slice(&make_user_key(fid));
key.push(UserPostfix::LinkCompactStateMessage.as_u8());
key.extend_from_slice(&link_type.as_bytes());
let type_bytes = &mut link_type.as_bytes().to_vec();
// Pad with zero bytes
type_bytes.resize(Self::LINK_TYPE_BYTE_SIZE, 0);
key.extend_from_slice(&type_bytes);

Ok(key)
}
Expand All @@ -256,7 +290,7 @@ impl LinkStore {
/// * `fid` - farcaster id of the user who created the link
/// * `link_body` - body of link that contains type of link created and target ID of the object
/// being reacted to
fn link_add_key(fid: u32, link_body: &LinkBody) -> Result<Vec<u8>, HubError> {
fn link_add_key(fid: u32, link_body: &LinkBody, padded: bool) -> Result<Vec<u8>, HubError> {
if link_body.target.is_some()
&& (link_body.r#type.is_empty() || link_body.r#type.len() == 0)
{
Expand All @@ -280,10 +314,14 @@ impl LinkStore {
+ Self::TARGET_ID_BYTE_SIZE,
);

// TODO: does the fid and rtype need to be padded? Is it okay not the check their lengths?
key.extend_from_slice(&make_user_key(fid));
key.push(UserPostfix::LinkAdds.as_u8());
key.extend_from_slice(&link_body.r#type.as_bytes());
let type_bytes = &mut link_body.r#type.as_bytes().to_vec();
if padded {
// Pad with zero bytes
type_bytes.resize(Self::LINK_TYPE_BYTE_SIZE, 0);
}
key.extend_from_slice(&type_bytes);
match link_body.target {
None => {}
Some(Target::TargetFid(fid)) => {
Expand All @@ -301,7 +339,7 @@ impl LinkStore {
/// * `fid` - farcaster id of the user who created the link
/// * `link_body` - body of link that contains type of link created and target ID of the object
/// being reacted to
fn link_remove_key(fid: u32, link_body: &LinkBody) -> Result<Vec<u8>, HubError> {
fn link_remove_key(fid: u32, link_body: &LinkBody, padded: bool) -> Result<Vec<u8>, HubError> {
if link_body.target.is_some()
&& (link_body.r#type.is_empty() || link_body.r#type.len() == 0)
{
Expand All @@ -328,7 +366,12 @@ impl LinkStore {
// TODO: does the fid and rtype need to be padded? Is it okay not the check their lengths?
key.extend_from_slice(&make_user_key(fid));
key.push(UserPostfix::LinkRemoves.as_u8());
key.extend_from_slice(&link_body.r#type.as_bytes());
let type_bytes = &mut link_body.r#type.as_bytes().to_vec();
if padded {
// Pad with zero bytes
type_bytes.resize(Self::LINK_TYPE_BYTE_SIZE, 0);
}
key.extend_from_slice(&type_bytes);
match link_body.target {
None => {}
Some(Target::TargetFid(fid)) => {
Expand All @@ -339,6 +382,42 @@ impl LinkStore {
Ok(key)
}

pub fn make_add_key_padded(message: &Message, padded: bool) -> Result<Vec<u8>, HubError> {
message
.data
.as_ref()
.ok_or(HubError::invalid_parameter("invalid message data"))
.and_then(|data| {
data.body
.as_ref()
.ok_or(HubError::invalid_parameter("invalid message data body"))
.and_then(|body_option| match body_option {
Body::LinkBody(link_body) => {
Self::link_add_key(data.fid as u32, link_body, padded)
}
_ => Err(HubError::invalid_parameter("link body not specified")),
})
})
}

pub fn make_remove_key_padded(message: &Message, padded: bool) -> Result<Vec<u8>, HubError> {
message
.data
.as_ref()
.ok_or(HubError::invalid_parameter("invalid message data"))
.and_then(|data| {
data.body
.as_ref()
.ok_or(HubError::invalid_parameter("invalid message data body"))
.and_then(|body_option| match body_option {
Body::LinkBody(link_body) => {
Self::link_remove_key(data.fid as u32, link_body, padded)
}
_ => Err(HubError::invalid_parameter("link body not specified")),
})
})
}

/// Generates a unique key used to store a LinkAdd Message in the LinksByTargetAndType index.
/// Returns RocksDB index key of the form <RootPrefix>:<target_key>:<fid?>:<tsHash?>
///
Expand Down Expand Up @@ -670,12 +749,133 @@ impl StoreDef for LinkStore {
message: &Message,
) -> Result<(), HubError> {
let (by_target_key, _) = self.secondary_index_key(ts_hash, message)?;
if self.is_add_type(message) {
let incorrectly_padded_key = Self::make_add_key_padded(message, false)?;
txn.delete(incorrectly_padded_key);
} else if self.is_remove_type(message) {
let incorrectly_padded_key = Self::make_remove_key_padded(message, false)?;
txn.delete(incorrectly_padded_key);
};

txn.delete(by_target_key);

Ok(())
}

fn delete_remove_secondary_indices(
&self,
txn: &mut RocksDbTransactionBatch,
message: &Message,
) -> Result<(), HubError> {
if self.is_add_type(message) {
let incorrectly_padded_key = Self::make_add_key_padded(message, false)?;
txn.delete(incorrectly_padded_key);
} else if self.is_remove_type(message) {
let incorrectly_padded_key = Self::make_remove_key_padded(message, false)?;
txn.delete(incorrectly_padded_key);
};

Ok(())
}

// During the initial rust migration, we were not padding the type field to 8 bytes, so we still
// have some links that don't have the right padding. Override the default merge conflict resolution
// to check for the presence of incorrectly padded links as well
fn get_merge_conflicts(
&self,
db: &RocksDB,
message: &Message,
ts_hash: &[u8; TS_HASH_LENGTH],
) -> Result<Vec<Message>, HubError> {
// First, call the default implementation to get the default merge conflicts
let mut conflicts = Self::get_default_merge_conflicts(self, db, message, ts_hash)?;

let remove_key = Self::make_remove_key_padded(message, false)?;
let remove_ts_hash = db.get(&remove_key)?;

if remove_ts_hash.is_some() {
let remove_compare = self.message_compare(
self.remove_message_type(),
&remove_ts_hash.clone().unwrap(),
message.data.as_ref().unwrap().r#type as u8,
&ts_hash.to_vec(),
);

if remove_compare > 0 {
return Err(HubError {
code: "bad_request.conflict".to_string(),
message: "message conflicts with a more recent remove".to_string(),
});
}
if remove_compare == 0 {
return Err(HubError {
code: "bad_request.duplicate".to_string(),
message: "message has already been merged".to_string(),
});
}

// If the existing remove has a lower order than the new message, retrieve the full
// Remove message and delete it as part of the RocksDB transaction
let maybe_existing_remove = get_message(
&db,
message.data.as_ref().unwrap().fid as u32,
self.postfix(),
&utils::vec_to_u8_24(&remove_ts_hash)?,
)?;

if maybe_existing_remove.is_some() {
conflicts.push(maybe_existing_remove.unwrap());
} else {
warn!(LOGGER, "Message's ts_hash exists but message not found in store";
o!("remove_ts_hash" => format!("{:x?}", remove_ts_hash.unwrap())));
}
}

// Check if there is an add timestamp hash for this
let add_key = Self::make_add_key_padded(message, false)?;
let add_ts_hash = db.get(&add_key)?;

if add_ts_hash.is_some() {
let add_compare = self.message_compare(
self.add_message_type(),
&add_ts_hash.clone().unwrap(),
message.data.as_ref().unwrap().r#type as u8,
&ts_hash.to_vec(),
);

if add_compare > 0 {
return Err(HubError {
code: "bad_request.conflict".to_string(),
message: "message conflicts with a more recent add".to_string(),
});
}
if add_compare == 0 {
return Err(HubError {
code: "bad_request.duplicate".to_string(),
message: "message has already been merged".to_string(),
});
}

// If the existing add has a lower order than the new message, retrieve the full
// Add message and delete it as part of the RocksDB transaction
let maybe_existing_add = get_message(
&db,
message.data.as_ref().unwrap().fid as u32,
self.postfix(),
&utils::vec_to_u8_24(&add_ts_hash)?,
)?;

if maybe_existing_add.is_none() {
warn!(LOGGER, "Message's ts_hash exists but message not found in store";
o!("add_ts_hash" => format!("{:x?}", add_ts_hash.unwrap())));
} else {
conflicts.push(maybe_existing_add.unwrap());
}
}

return Ok(conflicts);
}

fn find_merge_add_conflicts(&self, _db: &RocksDB, _message: &Message) -> Result<(), HubError> {
// For links, there will be no additional conflict logic
Ok(())
Expand Down Expand Up @@ -717,37 +917,13 @@ impl StoreDef for LinkStore {
}

fn make_add_key(&self, message: &Message) -> Result<Vec<u8>, HubError> {
message
.data
.as_ref()
.ok_or(HubError::invalid_parameter("invalid message data"))
.and_then(|data| {
data.body
.as_ref()
.ok_or(HubError::invalid_parameter("invalid message data body"))
.and_then(|body_option| match body_option {
Body::LinkBody(link_body) => Self::link_add_key(data.fid as u32, link_body),
_ => Err(HubError::invalid_parameter("link body not specified")),
})
})
// Type bytes must be padded to 8 bytes, but we had a bug which allowed unpadded types,
// so this function allows access to both types of keys
return Self::make_add_key_padded(message, true);
}

fn make_remove_key(&self, message: &Message) -> Result<Vec<u8>, HubError> {
message
.data
.as_ref()
.ok_or(HubError::invalid_parameter("invalid message data"))
.and_then(|data| {
data.body
.as_ref()
.ok_or(HubError::invalid_parameter("invalid message data body"))
.and_then(|body_option| match body_option {
Body::LinkBody(link_body) => {
Self::link_remove_key(data.fid as u32, link_body)
}
_ => Err(HubError::invalid_parameter("link body not specified")),
})
})
return Self::make_remove_key_padded(message, true);
}

fn get_prune_size_limit(&self) -> u32 {
Expand Down
8 changes: 8 additions & 0 deletions apps/hubble/src/addon/src/store/reaction_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ impl StoreDef for ReactionStoreDef {
Ok(())
}

fn delete_remove_secondary_indices(
&self,
_txn: &mut RocksDbTransactionBatch,
_message: &Message,
) -> Result<(), HubError> {
Ok(())
}

fn find_merge_add_conflicts(
&self,
_db: &RocksDB,
Expand Down
Loading

0 comments on commit 8e7dec1

Please sign in to comment.