Skip to content

Commit

Permalink
Make shared control block agnostic to 32-bit/64-bit.
Browse files Browse the repository at this point in the history
We need to support cross-architecture host and child processes.
Use a futex on Linux, which is a fixed 32-bit lock mechanism, unlike
pthread mutexes.
  • Loading branch information
HansKristian-Work committed May 15, 2019
1 parent 3b058d5 commit a229ffd
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 59 deletions.
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,18 @@ if (WIN32)
target_sources(fossilize PRIVATE fossilize_external_replayer_windows.hpp)
else()
target_sources(fossilize PRIVATE fossilize_external_replayer_linux.hpp)
if (APPLE)
target_sources(fossilize PRIVATE platform/gcc_clang_spinlock.hpp)
else()
target_sources(fossilize PRIVATE platform/futex_wrapper_linux.hpp)
endif()
endif()

target_include_directories(fossilize PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_compile_options(fossilize PRIVATE ${FOSSILIZE_CXX_FLAGS})
if (NOT WIN32)
target_link_libraries(fossilize -pthread)
target_compile_options(fossilize PUBLIC -pthread)
if (NOT APPLE)
if (ANDROID)
target_link_libraries(fossilize android log)
Expand Down
13 changes: 7 additions & 6 deletions cli/fossilize_replay_linux.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <limits.h>
#include <errno.h>
#include "fossilize_external_replayer.hpp"
#include "platform/futex_wrapper_linux.hpp"

static bool write_all(int fd, const char *str)
{
Expand Down Expand Up @@ -133,9 +134,9 @@ void ProcessProgress::parse(const char *cmd)
char buffer[ControlBlockMessageSize] = {};
strcpy(buffer, cmd);

pthread_mutex_lock(&Global::control_block->lock);
futex_wrapper_lock(&Global::control_block->futex_lock);
shared_control_block_write(Global::control_block, buffer, sizeof(buffer));
pthread_mutex_unlock(&Global::control_block->lock);
futex_wrapper_unlock(&Global::control_block->futex_lock);
}
}
else
Expand Down Expand Up @@ -405,7 +406,7 @@ static int run_master_process(const VulkanDevice::Options &opts,
}

if (Global::control_block)
Global::control_block->progress_started.store(true, std::memory_order_release);
Global::control_block->progress_started.store(1, std::memory_order_release);

Global::active_processes = 0;

Expand Down Expand Up @@ -570,7 +571,7 @@ static int run_master_process(const VulkanDevice::Options &opts,
}

if (Global::control_block)
Global::control_block->progress_complete.store(true, std::memory_order_release);
Global::control_block->progress_complete.store(1, std::memory_order_release);

return EXIT_SUCCESS;
}
Expand Down Expand Up @@ -723,11 +724,11 @@ static int run_slave_process(const VulkanDevice::Options &opts,
#if 0
if (Global::control_block)
{
pthread_mutex_lock(&Global::control_block->lock);
futex_wrapper_lock(&Global::control_block->futex_lock);
char msg[ControlBlockMessageSize] = {};
sprintf(msg, "SLAVE_FINISHED\n");
shared_control_block_write(Global::control_block, msg, sizeof(msg));
pthread_mutex_unlock(&Global::control_block->lock);
futex_wrapper_unlock(&Global::control_block->futex_lock);
}
#endif

Expand Down
6 changes: 5 additions & 1 deletion cli/fossilize_replay_windows.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ static int run_master_process(const VulkanDevice::Options &opts,
}

if (Global::control_block)
Global::control_block->progress_started.store(true, std::memory_order_release);
Global::control_block->progress_started.store(1, std::memory_order_release);

Global::active_processes = 0;
vector<ProcessProgress> child_processes(processes);
Expand Down Expand Up @@ -700,6 +700,10 @@ static int run_master_process(const VulkanDevice::Options &opts,

if (Global::job_handle)
CloseHandle(Global::job_handle);

if (Global::control_block)
Global::control_block->progress_complete.store(1, std::memory_order_release);

return EXIT_SUCCESS;
}

Expand Down
57 changes: 24 additions & 33 deletions fossilize_external_replayer_control_block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@
#pragma once

#include <string.h>
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#else
#include <pthread.h>
#endif

#include <atomic>
static_assert(sizeof(std::atomic<uint32_t>) == sizeof(uint32_t), "Atomic size mismatch. This type likely requires a lock to work.");

Expand All @@ -39,17 +32,16 @@ static_assert(sizeof(std::atomic<uint32_t>) == sizeof(uint32_t), "Atomic size mi
namespace Fossilize
{
enum { ControlBlockMessageSize = 32 };
enum { ControlBlockMagic = 0x19bcde14 };
enum { ControlBlockMagic = 0x19bcde15 };

struct SharedControlBlock
{
uint32_t version_cookie;

#ifndef _WIN32
pthread_mutex_t lock;
#endif
// Used to implement a lock (or spinlock).
int futex_lock;

// Progress. Just need atomics to implements this.
// Progress. Just need atomics to implement this.
std::atomic<uint32_t> successful_modules;
std::atomic<uint32_t> successful_graphics;
std::atomic<uint32_t> successful_compute;
Expand All @@ -64,30 +56,29 @@ struct SharedControlBlock
std::atomic<uint32_t> total_modules;
std::atomic<uint32_t> banned_modules;
std::atomic<uint32_t> module_validation_failures;
std::atomic<bool> progress_started;
std::atomic<bool> progress_complete;
std::atomic<uint32_t> progress_started;
std::atomic<uint32_t> progress_complete;

// Ring buffer. Needs lock.
uint64_t write_count;
uint64_t read_count;

size_t read_offset;
size_t write_offset;
size_t ring_buffer_offset;
size_t ring_buffer_size;
uint32_t write_count;
uint32_t read_count;
uint32_t read_offset;
uint32_t write_offset;
uint32_t ring_buffer_offset;
uint32_t ring_buffer_size;
};

// These are not thread-safe. Need to lock them by external means.
static inline size_t shared_control_block_read_avail(SharedControlBlock *control_block)
static inline uint32_t shared_control_block_read_avail(SharedControlBlock *control_block)
{
size_t ret = control_block->write_count - control_block->read_count;
uint32_t ret = control_block->write_count - control_block->read_count;
return ret;
}

static inline size_t shared_control_block_write_avail(SharedControlBlock *control_block)
static inline uint32_t shared_control_block_write_avail(SharedControlBlock *control_block)
{
size_t ret = 0;
size_t max_capacity_write_count = control_block->read_count + control_block->ring_buffer_size;
uint32_t ret = 0;
uint32_t max_capacity_write_count = control_block->read_count + control_block->ring_buffer_size;
if (control_block->write_count >= max_capacity_write_count)
ret = 0;
else
Expand All @@ -96,7 +87,7 @@ static inline size_t shared_control_block_write_avail(SharedControlBlock *contro
}

static inline bool shared_control_block_read(SharedControlBlock *control_block,
void *data_, size_t size)
void *data_, uint32_t size)
{
auto *data = static_cast<uint8_t *>(data_);
const uint8_t *ring = reinterpret_cast<const uint8_t *>(control_block) + control_block->ring_buffer_offset;
Expand All @@ -107,8 +98,8 @@ static inline bool shared_control_block_read(SharedControlBlock *control_block,
if (size > (control_block->write_count - control_block->read_count))
return false;

size_t read_first = control_block->ring_buffer_size - control_block->read_offset;
size_t read_second = 0;
uint32_t read_first = control_block->ring_buffer_size - control_block->read_offset;
uint32_t read_second = 0;
if (read_first > size)
read_first = size;
read_second = size - read_first;
Expand All @@ -123,20 +114,20 @@ static inline bool shared_control_block_read(SharedControlBlock *control_block,
}

static inline bool shared_control_block_write(SharedControlBlock *control_block,
const void *data_, size_t size)
const void *data_, uint32_t size)
{
auto *data = static_cast<const uint8_t *>(data_);
uint8_t *ring = reinterpret_cast<uint8_t *>(control_block) + control_block->ring_buffer_offset;

if (size > control_block->ring_buffer_size)
return false;

size_t max_capacity_write_count = control_block->read_count + control_block->ring_buffer_size;
uint32_t max_capacity_write_count = control_block->read_count + control_block->ring_buffer_size;
if (control_block->write_count + size > max_capacity_write_count)
return false;

size_t write_first = control_block->ring_buffer_size - control_block->write_offset;
size_t write_second = 0;
uint32_t write_first = control_block->ring_buffer_size - control_block->write_offset;
uint32_t write_second = 0;
if (write_first > size)
write_first = size;
write_second = size - write_first;
Expand Down
27 changes: 10 additions & 17 deletions fossilize_external_replayer_linux.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,17 @@
#include <vector>
#include <unordered_set>
#include <string>
#include <pthread.h>
#include <signal.h>
#include <limits.h>
#include "path.hpp"
#include "fossilize_external_replayer_control_block.hpp"

#ifdef __linux__
#include "platform/futex_wrapper_linux.hpp"
#else
#include "platform/gcc_clang_spinlock.hpp"
#endif

namespace Fossilize
{
static std::atomic<int32_t> shm_index;
Expand Down Expand Up @@ -74,10 +79,7 @@ ExternalReplayer::Impl::~Impl()
close(fd);

if (shm_block)
{
pthread_mutex_destroy(&shm_block->lock);
munmap(shm_block, shm_block_size);
}
}

uintptr_t ExternalReplayer::Impl::get_process_handle() const
Expand All @@ -90,9 +92,9 @@ ExternalReplayer::PollResult ExternalReplayer::Impl::poll_progress(ExternalRepla
if (pid < 0)
return ExternalReplayer::PollResult::Error;

bool complete = shm_block->progress_complete.load(std::memory_order_acquire);
bool complete = shm_block->progress_complete.load(std::memory_order_acquire) != 0;

if (!shm_block->progress_started.load(std::memory_order_acquire))
if (shm_block->progress_started.load(std::memory_order_acquire) == 0)
return ExternalReplayer::PollResult::ResultNotReady;

progress.compute.total = shm_block->total_compute.load(std::memory_order_relaxed);
Expand All @@ -110,15 +112,15 @@ ExternalReplayer::PollResult ExternalReplayer::Impl::poll_progress(ExternalRepla
progress.clean_crashes = shm_block->clean_process_deaths.load(std::memory_order_relaxed);
progress.dirty_crashes = shm_block->dirty_process_deaths.load(std::memory_order_relaxed);

pthread_mutex_lock(&shm_block->lock);
futex_wrapper_lock(&shm_block->futex_lock);
size_t read_avail = shared_control_block_read_avail(shm_block);
for (size_t i = ControlBlockMessageSize; i <= read_avail; i += ControlBlockMessageSize)
{
char buf[ControlBlockMessageSize] = {};
shared_control_block_read(shm_block, buf, sizeof(buf));
parse_message(buf);
}
pthread_mutex_unlock(&shm_block->lock);
futex_wrapper_unlock(&shm_block->futex_lock);
return complete ? ExternalReplayer::PollResult::Complete : ExternalReplayer::PollResult::Running;
}

Expand Down Expand Up @@ -241,15 +243,6 @@ bool ExternalReplayer::Impl::start(const ExternalReplayer::Options &options)
shm_block->ring_buffer_size = 64 * 1024;
shm_block->ring_buffer_offset = 4 * 1024;

pthread_mutexattr_t attr;
if (pthread_mutexattr_init(&attr) < 0)
return false;
if (pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) < 0)
return false;
if (pthread_mutex_init(&shm_block->lock, &attr) < 0)
return false;
pthread_mutexattr_destroy(&attr);

// We need to let our child inherit the shared FD.
int current_flags = fcntl(fd, F_GETFD);
if (current_flags < 0)
Expand Down
4 changes: 2 additions & 2 deletions fossilize_external_replayer_windows.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ ExternalReplayer::PollResult ExternalReplayer::Impl::poll_progress(ExternalRepla
if (!process)
return ExternalReplayer::PollResult::Error;

bool complete = shm_block->progress_complete.load(std::memory_order_acquire);
bool complete = shm_block->progress_complete.load(std::memory_order_acquire) != 0;

if (!shm_block->progress_started.load(std::memory_order_acquire))
if (shm_block->progress_started.load(std::memory_order_acquire) == 0)
return ExternalReplayer::PollResult::ResultNotReady;

progress.compute.total = shm_block->total_compute.load(std::memory_order_relaxed);
Expand Down
74 changes: 74 additions & 0 deletions platform/futex_wrapper_linux.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/* Copyright (c) 2019 Hans-Kristian Arntzen
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

#pragma once

#include <unistd.h>
#include <sys/syscall.h>
#include <linux/futex.h>

// Implementation based on https://eli.thegreenplace.net/2018/basics-of-futexes/ and
// "Futexes are Tricky" by Ulrich Drepper.
// Kind of overkill, but we need a mutex which can work cross-process and cross-architecture via shared memory (32-bit and 64-bit).
// Alternative is full kernel semaphores or raw spinlocks.

namespace Fossilize
{
// The futex syscall operates on a 32-bit word, and the lock lives in memory shared
// between 32-bit and 64-bit processes, so the lock type must be exactly 32 bits wide.
static_assert(sizeof(int) == 4, "Futex lock word must be exactly 32 bits.");

// Lock word protocol used by futex_wrapper_lock() / futex_wrapper_unlock():
//   0 - unlocked
//   1 - locked, no waiters
//   2 - locked, there may be waiters sleeping in the kernel

// Atomic compare-and-swap with a full barrier.
// If *value equals expected_value, new_value is stored into *value.
// Returns the contents of *value from before the operation, so the swap
// succeeded if and only if the return value equals expected_value.
static inline int cmpxchg(int *value, int expected_value, int new_value)
{
	return __sync_val_compare_and_swap(value, expected_value, new_value);
}

// Acquires the lock pointed to by lock, sleeping in the kernel while it is contended.
// *lock must start out as 0 (unlocked) and must only be modified through
// futex_wrapper_lock() / futex_wrapper_unlock().
static inline void futex_wrapper_lock(int *lock)
{
	// Fast path: uncontended 0 -> 1 transition, no syscall needed.
	int c = cmpxchg(lock, 0, 1);
	if (c == 0)
		return;

	// Contention.
	do
	{
		// Need to wait. Force *lock to be 2 so the owner knows it must wake someone.
		// If we already observed 2 we can skip the atomic operation.
		if (c == 2 || cmpxchg(lock, 1, 2) != 0)
		{
			// If *lock is still 2 (it was not unlocked by another thread in the meantime),
			// sleep until we are woken up. The kernel re-checks the value atomically,
			// and a spurious wakeup is harmless since the loop re-attempts the lock.
			// Pointer arguments are passed as nullptr rather than int 0 so that a
			// full-width null pointer goes through the variadic call.
			syscall(SYS_futex, lock, FUTEX_WAIT, 2, nullptr, nullptr, 0);
		}

		// Take the lock as 2 rather than 1: we cannot know whether
		// other waiters remain, so err on the side of waking.
	} while ((c = cmpxchg(lock, 0, 2)) != 0);
}

// Releases a lock previously acquired with futex_wrapper_lock() and wakes one waiter
// if the lock was marked as contended.
static inline void futex_wrapper_unlock(int *lock)
{
	// 1 -> 0 means nobody was waiting and we are done.
	// 2 -> 1 means there may be waiters which need to be woken up.
	int c = __sync_sub_and_fetch(lock, 1);
	if (c == 1)
	{
		// We have some waiters to wake up.

		// Atomic store of 0, really, but there's no __sync variant for that.
		__sync_fetch_and_and(lock, 0);

		syscall(SYS_futex, lock, FUTEX_WAKE, 1, nullptr, nullptr, 0);
	}
}
}
Loading

0 comments on commit a229ffd

Please sign in to comment.