Skip to content

Commit

Permalink
refactor: Pass Block as a reference to BlockStore::put_block (#272)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Jul 31, 2024
1 parent b2a91ec commit 008a388
Show file tree
Hide file tree
Showing 16 changed files with 62 additions and 50 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- refactor: Remove sessions and redundant code. [PR 255](https://github.com/dariusc93/rust-ipfs/pull/255)
- refactor: Move from libipld to ipld-core. [PR 257](https://github.com/dariusc93/rust-ipfs/pull/257)
- chore: Use `Bytes` when creating or using `Block` within bitswap. [PR 264](https://github.com/dariusc93/rust-ipfs/pull/264)
- refactor: Reference `Block` in `BlockStore::put_block`. [PR 272](https://github.com/dariusc93/rust-ipfs/pull/272)
- feat: Passthrough timeout to WantSession::new. [PR 265](https://github.com/dariusc93/rust-ipfs/pull/265)

# 0.11.20
Expand Down
4 changes: 4 additions & 0 deletions src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ impl Block {
&self.data
}

pub fn inner_data(&self) -> &Bytes {
&self.data
}

pub fn into_inner(self) -> (Cid, Bytes) {
(self.cid, self.data)
}
Expand Down
8 changes: 4 additions & 4 deletions src/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ impl std::future::IntoFuture for DagPut {
};
let cid = Cid::new(version, self.codec.into(), hash)?;
let block = Block::new(cid, bytes)?;
let cid = self.dag_ipld.repo.put_block(block).await?;
let cid = self.dag_ipld.repo.put_block(&block).await?;

if let Some(opt) = self.pinned {
if !self.dag_ipld.repo.is_pinned(&cid).await? {
Expand Down Expand Up @@ -1250,7 +1250,7 @@ mod tests {
let (cid, data) = blocks.next().unwrap();
assert_eq!(blocks.next(), None);

ipfs.put_block(Block::new(cid, data).unwrap())
ipfs.put_block(&Block::new(cid, data).unwrap())
.await
.unwrap();

Expand Down Expand Up @@ -1283,7 +1283,7 @@ mod tests {

let total_size = data.len();

ipfs.put_block(Block::new(cid, data).unwrap())
ipfs.put_block(&Block::new(cid, data).unwrap())
.await
.unwrap();

Expand All @@ -1301,7 +1301,7 @@ mod tests {
let node = node.unwrap();
let block = Block::new(node.cid.to_owned(), node.block.to_vec()).unwrap();

ipfs.put_block(block).await.unwrap();
ipfs.put_block(&block).await.unwrap();

cids.push(node.cid.to_owned());
}
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1128,7 +1128,7 @@ impl Ipfs {
}

/// Puts a block into the ipfs repo.
pub async fn put_block(&self, block: Block) -> Result<Cid, Error> {
pub async fn put_block(&self, block: &Block) -> Result<Cid, Error> {
self.repo.put_block(block).span(self.span.clone()).await
}

Expand Down Expand Up @@ -2752,7 +2752,7 @@ mod tests {
let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data));
let block = Block::new(cid, data).unwrap();

let cid: Cid = ipfs.put_block(block.clone()).await.unwrap();
let cid: Cid = ipfs.put_block(&block).await.unwrap();
let new_block = ipfs.get_block(&cid).await.unwrap();
assert_eq!(block, new_block);
}
Expand Down
6 changes: 3 additions & 3 deletions src/p2p/bitswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ mod test {

let cid = *block.cid();

repo.put_block(block.clone()).await?;
repo.put_block(&block).await?;

let opt = DialOpts::peer_id(peer2)
.addresses(vec![addr2.clone()])
Expand Down Expand Up @@ -759,7 +759,7 @@ mod test {

let cid = *block.cid();

repo.put_block(block.clone()).await?;
repo.put_block(&block).await?;

let opt = DialOpts::peer_id(peer2)
.addresses(vec![addr2.clone()])
Expand Down Expand Up @@ -816,7 +816,7 @@ mod test {

let cid = *block.cid();

repo.put_block(block.clone()).await?;
repo.put_block(&block).await?;

let opt = DialOpts::peer_id(peer2)
.addresses(vec![addr2.clone()])
Expand Down
2 changes: 1 addition & 1 deletion src/p2p/bitswap/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl WantSession {
tracing::warn!(session = %self.cid, %peer_id, cid = %block.cid(), name = "want_session", "state already putting block into store");
} else {
tracing::info!(%peer_id, cid = %block.cid(), name = "want_session", "storing block");
let fut = self.repo.put_block(block).into_future();
let fut = self.repo.put_block(&block).into_future();
self.state = WantSessionState::PutBlock {
from_peer_id: peer_id,
fut,
Expand Down
2 changes: 1 addition & 1 deletion src/refs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ mod tests {
let cid = Cid::try_from(*cid_str).unwrap();
let block = Block::new(cid, data.to_vec()).unwrap();
block.to_ipld().unwrap();
ipfs.put_block(block).await.unwrap();
ipfs.put_block(&block).await.unwrap();
}

ipfs
Expand Down
15 changes: 8 additions & 7 deletions src/repo/blockstore/flatfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl BlockStore for FsBlockStore {

//TODO: Allow multiple puts without holding a lock. We could probably hold a read lock instead
// and revert back to using a broadcast
async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> {
let inner = &mut *self.inner.write().await;
inner.put(block).await
}
Expand Down Expand Up @@ -138,7 +138,8 @@ impl FsBlockStoreInner {
.await?
}

async fn put(&mut self, block: Block) -> Result<(Cid, BlockPut), Error> {
async fn put(&mut self, block: &Block) -> Result<(Cid, BlockPut), Error> {
let block = block.clone();
let target_path = block_path(self.path.clone(), block.cid());
let cid = *block.cid();

Expand Down Expand Up @@ -344,7 +345,7 @@ mod tests {
panic!("block should not be found")
}

let put = store.put(block.clone()).await.unwrap();
let put = store.put(&block).await.unwrap();
assert_eq!(put.0, cid.to_owned());
let contains = store.contains(&cid);
assert!(contains.await.unwrap());
Expand Down Expand Up @@ -375,7 +376,7 @@ mod tests {
block_store.open().await.unwrap();

assert!(!block_store.contains(block.cid()).await.unwrap());
block_store.put(block.clone()).await.unwrap();
block_store.put(&block).await.unwrap();

let block_store = FsBlockStore::new(tmp.clone());
block_store.open().await.unwrap();
Expand All @@ -399,7 +400,7 @@ mod tests {
let data_slice = data.to_vec();
let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data_slice));
let block = Block::new(cid, data_slice).unwrap();
block_store.put(block.clone()).await.unwrap();
block_store.put(&block).await.unwrap();
}

let cids = block_store.list().await.collect::<Vec<_>>().await;
Expand Down Expand Up @@ -449,7 +450,7 @@ mod tests {
let block = block.clone();
async move {
barrier.wait().await;
bs.put(block).await
bs.put(&block).await
}
})
})
Expand Down Expand Up @@ -490,7 +491,7 @@ mod tests {

assert_eq!(single.list().await.collect::<Vec<_>>().await.len(), 0);

single.put(block).await.unwrap();
single.put(&block).await.unwrap();

// compare the multihash since we store the block named as cidv1
assert_eq!(
Expand Down
3 changes: 2 additions & 1 deletion src/repo/blockstore/idb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ impl BlockStore for IdbBlockStore {
rx.await?
}

async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> {
let block = block.clone();
if self.contains(block.cid()).await? {
return Ok((*block.cid(), BlockPut::Existed));
}
Expand Down
23 changes: 14 additions & 9 deletions src/repo/blockstore/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::StreamExt;
use ipld_core::cid::Cid;
use tokio::sync::RwLock;

use bytes::Bytes;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
Expand All @@ -26,7 +27,7 @@ impl std::fmt::Debug for MemBlockStore {
}

struct MemBlockInner {
blocks: HashMap<Cid, Block>,
blocks: HashMap<Cid, Bytes>,
}

impl MemBlockStore {
Expand Down Expand Up @@ -58,8 +59,12 @@ impl BlockStore for MemBlockStore {

async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error> {
let inner = &*self.inner.read().await;
let block = inner.blocks.get(cid).cloned();
Ok(block)
let block = match inner.blocks.get(cid) {
Some(bytes) => Block::new(*cid, bytes.clone())?,
None => return Ok(None),
};

Ok(Some(block))
}

async fn size(&self, cid: &[Cid]) -> Result<Option<usize>, Error> {
Expand All @@ -69,17 +74,17 @@ impl BlockStore for MemBlockStore {
.blocks
.iter()
.filter(|(id, _)| cid.contains(id))
.map(|(_, b)| b.data().len())
.map(|(_, b)| b.len())
.sum(),
))
}

async fn total_size(&self) -> Result<usize, Error> {
let inner = &*self.inner.read().await;
Ok(inner.blocks.values().map(|b| b.data().len()).sum())
Ok(inner.blocks.values().map(|b| b.len()).sum())
}

async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> {
use std::collections::hash_map::Entry;

let inner = &mut *self.inner.write().await;
Expand All @@ -92,7 +97,7 @@ impl BlockStore for MemBlockStore {
Entry::Vacant(ve) => {
trace!("new block");
let cid = *ve.key();
ve.insert(block);
ve.insert(block.inner_data().clone());
Ok((cid, BlockPut::NewBlock))
}
}
Expand Down Expand Up @@ -154,7 +159,7 @@ mod tests {
panic!("block should not be found")
}

let put = store.put(block.clone());
let put = store.put(&block);
assert_eq!(put.await.unwrap().0, cid.to_owned());
let contains = store.contains(&cid);
assert!(contains.await.unwrap());
Expand All @@ -180,7 +185,7 @@ mod tests {
let data_slice = data.to_vec();
let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data_slice));
let block = Block::new(cid, data_slice).unwrap();
mem_store.put(block.clone()).await.unwrap();
mem_store.put(&block).await.unwrap();
assert!(mem_store.contains(block.cid()).await.unwrap());
}

Expand Down
20 changes: 10 additions & 10 deletions src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub trait BlockStore: Debug + Send + Sync + 'static {
/// Get a total size of the block store
async fn total_size(&self) -> Result<usize, Error>;
/// Inserts a block in the blockstore.
async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error>;
async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error>;
/// Removes a block from the blockstore.
async fn remove(&self, cid: &Cid) -> Result<(), Error>;
/// Remove multiple blocks from the blockstore
Expand Down Expand Up @@ -451,7 +451,7 @@ impl Repo {
let mut stream = self.list_blocks().await;
while let Some(cid) = stream.next().await {
match self.get_block_now(&cid).await {
Ok(Some(block)) => match repo.inner.block_store.put(block).await {
Ok(Some(block)) => match repo.inner.block_store.put(&block).await {
Ok(_) => {}
Err(e) => error!("Error migrating {cid}: {e}"),
},
Expand Down Expand Up @@ -607,7 +607,7 @@ impl Repo {
}

/// Puts a block into the block store.
pub fn put_block(&self, block: Block) -> RepoPutBlock {
pub fn put_block<'a>(&self, block: &'a Block) -> RepoPutBlock<'a> {
RepoPutBlock::new(self, block).broadcast_on_new_block(true)
}

Expand Down Expand Up @@ -998,15 +998,15 @@ impl Repo {
}
}

pub struct RepoPutBlock {
pub struct RepoPutBlock<'a> {
repo: Repo,
block: Block,
block: &'a Block,
span: Option<Span>,
broadcast_on_new_block: bool,
}

impl RepoPutBlock {
fn new(repo: &Repo, block: Block) -> Self {
impl<'a> RepoPutBlock<'a> {
fn new(repo: &Repo, block: &'a Block) -> Self {
Self {
repo: repo.clone(),
block,
Expand All @@ -1026,16 +1026,16 @@ impl RepoPutBlock {
}
}

impl IntoFuture for RepoPutBlock {
impl<'a> IntoFuture for RepoPutBlock<'a> {
type IntoFuture = BoxFuture<'static, Self::Output>;
type Output = Result<Cid, Error>;
fn into_future(self) -> Self::IntoFuture {
let block = self.block;
let block = self.block.clone();
let span = self.span.unwrap_or(Span::current());
let span = debug_span!(parent: &span, "put_block", cid = %block.cid());
async move {
let _guard = self.repo.inner.gclock.read().await;
let (cid, res) = self.repo.inner.block_store.put(block.clone()).await?;
let (cid, res) = self.repo.inner.block_store.put(&block).await?;

if let BlockPut::NewBlock = res {
if self.broadcast_on_new_block {
Expand Down
6 changes: 3 additions & 3 deletions src/unixfs/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl Stream for UnixfsAdd {
return;
}
};
let _cid = match repo.put_block(block).await {
let _cid = match repo.put_block(&block).await {
Ok(cid) => cid,
Err(e) => {
yield UnixfsStatus::FailedStatus { written, total_size, error: Some(e) };
Expand All @@ -209,7 +209,7 @@ impl Stream for UnixfsAdd {
return;
}
};
let _cid = match repo.put_block(block).await {
let _cid = match repo.put_block(&block).await {
Ok(cid) => cid,
Err(e) => {
yield UnixfsStatus::FailedStatus { written, total_size, error: Some(e) };
Expand Down Expand Up @@ -249,7 +249,7 @@ impl Stream for UnixfsAdd {
let cid = node.cid.to_owned();
let block = Block::new(cid, node.block.to_vec())?;

repo.put_block(block).await?;
repo.put_block(&block).await?;

cids.push(cid);
}
Expand Down
6 changes: 3 additions & 3 deletions tests/bitswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ async fn bitswap_stress_test() {

let nodes = spawn_nodes::<5>(Topology::Mesh).await;

let block = Block::new(cid, data.clone()).unwrap();

for (i, node) in nodes.iter().enumerate() {
if filter(i) {
node.put_block(Block::new(cid, data.clone()).unwrap())
.await
.unwrap();
node.put_block(&block).await.unwrap();
}
}

Expand Down
Loading

0 comments on commit 008a388

Please sign in to comment.