Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
olehnikolaiev committed Sep 30, 2024
1 parent 0b67a7f commit e426ae7
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 47 deletions.
39 changes: 19 additions & 20 deletions libbatched-io/BatchedRotatingHistoricDbIO.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,44 @@ BatchedRotatingHistoricDbIO::BatchedRotatingHistoricDbIO( const boost::filesyste
// initialize timestamps
if ( boost::filesystem::exists( basePath ) )
for ( const auto& f : boost::filesystem::directory_iterator( basePath ) ) {
piecesByTimestamp.push_back( std::stoull( boost::filesystem::basename( f ) ) );
piecesByTimestamp[std::stoull( boost::filesystem::basename( f ) )] =
std::unique_ptr< dev::db::DatabaseFace >(
new LevelDB( basePath / boost::filesystem::basename( f ) ) );
}
if ( piecesByTimestamp.empty() )
piecesByTimestamp.push_back( 0 );
std::sort( piecesByTimestamp.begin(), piecesByTimestamp.end() );

// open current
current.reset( new LevelDB( basePath / std::to_string( piecesByTimestamp.back() ) ) );
piecesByTimestamp[0] =
std::unique_ptr< dev::db::DatabaseFace >( new LevelDB( basePath / "0" ) );

test_crash_before_commit( "after_recover" );
}

void BatchedRotatingHistoricDbIO::rotate( uint64_t timestamp ) {
piecesByTimestamp.push_back( timestamp );
auto storageUsed = currentPiece()->lookup( dev::db::Slice( "storageUsed" ) );
currentPiece()->kill( dev::db::Slice( "storageUsed" ) );

auto storageUsed = current->lookup( dev::db::Slice( "storageUsed" ) );
current->kill( dev::db::Slice( "storageUsed" ) );
piecesByTimestamp[timestamp] = std::unique_ptr< dev::db::DatabaseFace >(
new LevelDB( basePath / std::to_string( timestamp ) ) );

current.reset( new LevelDB( basePath / std::to_string( timestamp ) ) );
current->insert( dev::db::Slice( "storageUsed" ), storageUsed );
piecesByTimestamp[timestamp]->insert( dev::db::Slice( "storageUsed" ), storageUsed );

test_crash_before_commit( "after_open_leveldb" );
}

void BatchedRotatingHistoricDbIO::recover() {}

dev::db::DatabaseFace* BatchedRotatingHistoricDbIO::getPieceByTimestamp( uint64_t timestamp ) {
if ( timestamp < piecesByTimestamp.front() )
return nullptr;
// dev::db::DatabaseFace* BatchedRotatingHistoricDbIO::getPieceByTimestamp( uint64_t timestamp ) {
// if ( timestamp < piecesByTimestamp.front() )
// return nullptr;

if ( timestamp >= piecesByTimestamp.back() )
return currentPiece();
// if ( timestamp >= piecesByTimestamp.back() )
// return currentPiece();

auto it = std::upper_bound( piecesByTimestamp.begin(), piecesByTimestamp.end(), timestamp );
// auto it = std::upper_bound( piecesByTimestamp.begin(), piecesByTimestamp.end(), timestamp );

dev::db::DatabaseFace* ret = new LevelDB( basePath / std::to_string( *( --it ) ) );
// dev::db::DatabaseFace* ret = new LevelDB( basePath / std::to_string( *( --it ) ) );

return ret;
}
// return ret;
//}

BatchedRotatingHistoricDbIO::~BatchedRotatingHistoricDbIO() {}

Expand Down
13 changes: 8 additions & 5 deletions libbatched-io/BatchedRotatingHistoricDbIO.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ class BatchedRotatingHistoricDbIO : public batched_face {
private:
const boost::filesystem::path basePath;

std::vector< uint64_t > piecesByTimestamp;
std::unique_ptr< dev::db::DatabaseFace > current;
std::map< uint64_t, std::unique_ptr< dev::db::DatabaseFace > > piecesByTimestamp;

public:
using constIterator =
std::map< uint64_t, std::unique_ptr< dev::db::DatabaseFace > >::const_iterator;

BatchedRotatingHistoricDbIO( const boost::filesystem::path& _path );
dev::db::DatabaseFace* currentPiece() const { return current.get(); }
dev::db::DatabaseFace* getPieceByTimestamp( uint64_t timestamp );
std::vector< uint64_t > getPiecesByTimestamp() const { return piecesByTimestamp; }
dev::db::DatabaseFace* currentPiece() const { return piecesByTimestamp.rbegin()->second.get(); }
// dev::db::DatabaseFace* getPieceByTimestamp( uint64_t timestamp );
constIterator begin() const { return piecesByTimestamp.begin(); }
constIterator end() const { return piecesByTimestamp.end(); }
void rotate( uint64_t timestamp );
virtual void revert() { /* no need - as all write is in rotate() */
}
Expand Down
30 changes: 12 additions & 18 deletions libdevcore/RotatingHistoricState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ std::string RotatingHistoricState::lookup( Slice _key ) const {
if ( _key.toString() == std::string( "storageUsed" ) )
return currentPiece()->lookup( _key );

for ( auto timestamp : getPiecesByTimestamp() ) {
auto db = ioBackend->getPieceByTimestamp( timestamp );
auto v = db->lookup( _key );
for ( const auto& item : *ioBackend ) {
auto v = item.second->lookup( _key );
if ( !v.empty() )
return v;
}
Expand All @@ -45,9 +44,8 @@ std::string RotatingHistoricState::lookup( Slice _key ) const {
bool RotatingHistoricState::exists( Slice _key ) const {
std::shared_lock< std::shared_mutex > lock( m_mutex );

for ( auto timestamp : getPiecesByTimestamp() ) {
auto db = ioBackend->getPieceByTimestamp( timestamp );
if ( db->exists( _key ) )
for ( const auto& item : *ioBackend ) {
if ( item.second->exists( _key ) )
return true;
}

Expand All @@ -62,9 +60,8 @@ void RotatingHistoricState::insert( Slice _key, Slice _value ) {
void RotatingHistoricState::kill( Slice _key ) {
std::shared_lock< std::shared_mutex > lock( m_mutex );

for ( auto timestamp : getPiecesByTimestamp() ) {
auto db = ioBackend->getPieceByTimestamp( timestamp );
db->kill( _key );
for ( const auto& item : *ioBackend ) {
item.second->kill( _key );
}
}

Expand All @@ -84,19 +81,17 @@ void RotatingHistoricState::commit( std::unique_ptr< WriteBatchFace > _batch ) {
void RotatingHistoricState::forEach( std::function< bool( Slice, Slice ) > f ) const {
std::shared_lock< std::shared_mutex > lock( m_mutex );

for ( auto timestamp : getPiecesByTimestamp() ) {
auto db = ioBackend->getPieceByTimestamp( timestamp );
db->forEach( f );
for ( const auto& item : *ioBackend ) {
item.second->forEach( f );
}
}

void RotatingHistoricState::forEachWithPrefix(
std::string& _prefix, std::function< bool( Slice, Slice ) > f ) const {
std::shared_lock< std::shared_mutex > lock( m_mutex );

for ( auto timestamp : getPiecesByTimestamp() ) {
auto db = ioBackend->getPieceByTimestamp( timestamp );
db->forEachWithPrefix( _prefix, f );
for ( const auto& item : *ioBackend ) {
item.second->forEachWithPrefix( _prefix, f );
}
}

Expand All @@ -105,9 +100,8 @@ h256 RotatingHistoricState::hashBase() const {
secp256k1_sha256_t ctx;
secp256k1_sha256_initialize( &ctx );

for ( auto timestamp : getPiecesByTimestamp() ) {
auto db = ioBackend->getPieceByTimestamp( timestamp );
h256 h = db->hashBase();
for ( const auto& item : *ioBackend ) {
h256 h = item.second->hashBase();
secp256k1_sha256_write( &ctx, h.data(), h.size );
}

Expand Down
4 changes: 0 additions & 4 deletions libdevcore/RotatingHistoricState.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ class RotatingHistoricState : public DatabaseFace {
mutable std::set< WriteBatchFace* > batch_cache;
mutable std::shared_mutex m_mutex;

std::vector< uint64_t > getPiecesByTimestamp() const {
return ioBackend->getPiecesByTimestamp();
}

public:
RotatingHistoricState( std::shared_ptr< batched_io::BatchedRotatingHistoricDbIO > ioBackend );
void rotate( uint64_t timestamp );
Expand Down

0 comments on commit e426ae7

Please sign in to comment.