Skip to content

Commit

Permalink
[C++] Fix epoch scan range (#484)
Browse files Browse the repository at this point in the history
* [C++] Fix epoch scan range

* updates

* fix testcase
  • Loading branch information
badrishc authored May 29, 2021
1 parent 1f8b5b0 commit 32b3f6e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
21 changes: 8 additions & 13 deletions cc/src/core/light_epoch.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ class LightEpoch {
static constexpr uint32_t kDrainListSize = 256;
/// Epoch table
Entry* table_;
/// Number of entries in epoch table.
uint32_t num_entries_;

/// List of action, epoch pairs containing actions to performed when an epoch becomes
/// safe to reclaim.
Expand All @@ -137,25 +135,23 @@ class LightEpoch {
/// Cached value of epoch that is safe to reclaim
std::atomic<uint64_t> safe_to_reclaim_epoch;

LightEpoch(uint32_t size = kTableSize)
LightEpoch()
: table_{ nullptr }
, num_entries_{ 0 }
, drain_count_{ 0 }
, drain_list_{} {
Initialize(size);
Initialize();
}

~LightEpoch() {
Uninitialize();
}

private:
void Initialize(uint32_t size) {
num_entries_ = size;
void Initialize() {
// do cache-line alignment
table_ = reinterpret_cast<Entry*>(aligned_alloc(Constants::kCacheLineBytes,
(size + 2) * sizeof(Entry)));
new(table_) Entry[size + 2];
(kTableSize + 2) * sizeof(Entry)));
new(table_) Entry[kTableSize + 2];
current_epoch = 1;
safe_to_reclaim_epoch = 0;
for(uint32_t idx = 0; idx < kDrainListSize; ++idx) {
Expand All @@ -167,7 +163,6 @@ class LightEpoch {
void Uninitialize() {
aligned_free(table_);
table_ = nullptr;
num_entries_ = 0;
current_epoch = 1;
safe_to_reclaim_epoch = 0;
}
Expand Down Expand Up @@ -272,7 +267,7 @@ class LightEpoch {
/// Compute latest epoch that is safe to reclaim, by scanning the epoch table
uint64_t ComputeNewSafeToReclaimEpoch(uint64_t current_epoch_) {
uint64_t oldest_ongoing_call = current_epoch_;
for(uint32_t index = 1; index <= num_entries_; ++index) {
for(uint32_t index = 0; index < kTableSize; ++index) {
uint64_t entry_epoch = table_[index].local_current_epoch;
if(entry_epoch != kUnprotected && entry_epoch < oldest_ongoing_call) {
oldest_ongoing_call = entry_epoch;
Expand All @@ -294,7 +289,7 @@ class LightEpoch {

/// CPR checkpoint functions.
inline void ResetPhaseFinished() {
for(uint32_t idx = 1; idx <= num_entries_; ++idx) {
for(uint32_t idx = 0; idx < kTableSize; ++idx) {
assert(table_[idx].phase_finished.load() == Phase::REST ||
table_[idx].phase_finished.load() == Phase::INDEX_CHKPT ||
table_[idx].phase_finished.load() == Phase::PERSISTENCE_CALLBACK ||
Expand All @@ -308,7 +303,7 @@ class LightEpoch {
uint32_t entry = Thread::id();
table_[entry].phase_finished = phase;
// Check if other threads have reported complete.
for(uint32_t idx = 1; idx <= num_entries_; ++idx) {
for(uint32_t idx = 0; idx < kTableSize; ++idx) {
Phase entry_phase = table_[idx].phase_finished.load();
uint64_t entry_epoch = table_[idx].local_current_epoch;
if(entry_epoch != 0 && entry_phase != phase) {
Expand Down
6 changes: 6 additions & 0 deletions cc/test/paging_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,9 @@ TEST(CLASS, UpsertRead_Concurrent) {
ASSERT_TRUE(result);
ASSERT_EQ(kNumRecords, records_read.load());

// Stop session as we are going to wait for threads
store.StopSession();

//// Update.
num_writes = 0;
threads.clear();
Expand All @@ -550,6 +553,9 @@ TEST(CLASS, UpsertRead_Concurrent) {

ASSERT_EQ(kNumRecords, num_writes.load());

// Restart session
store.StartSession();

// Delete some old copies of records (160 MB) that we no longer need.
static constexpr uint64_t kNewBeginAddress{ 167772160L };
static std::atomic<bool> truncated{ false };
Expand Down

0 comments on commit 32b3f6e

Please sign in to comment.