Skip to content

Commit

Permalink
Add cross-platform code for semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
ben committed Aug 11, 2024
1 parent e648ca6 commit 161b5b9
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 23 deletions.
116 changes: 104 additions & 12 deletions sh_mem/ProcCommunicator.cpp
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
#include "ProcCommunicator.h"
#include <iostream>
#ifndef _WIN32
#include <unistd.h>
#include <fcntl.h>
#endif

static constexpr int SEMAPHORE_DISABLED = 0;
static constexpr int SEMAPHORE_ENABLED = 1;

ProcCommunicator::ProcCommunicator(const bool isMasterMode,
const bool isMultipleMasters,
const std::string &shMemName) : m_master_mode(isMasterMode),
m_multiple_master(isMultipleMasters),
m_master_received((sem_t *)-1),
m_slave_received((sem_t *)-1),
m_master_sent((sem_t *)-1),
m_slave_sent((sem_t *)-1)
m_multiple_master(isMultipleMasters)
{
const std::string master_mem_name = shMemName + "_master";
const std::string slave_mem_name = shMemName + "_slave";

m_sender = std::make_unique<SharedMemorySender>(master_mem_name.c_str());
m_receiver = std::make_unique<SharedMemoryReceiver>(slave_mem_name.c_str());
#ifndef _WIN32
if (isMasterMode)
{
m_sender = std::make_unique<SharedMemorySender>(master_mem_name.c_str());
m_receiver = std::make_unique<SharedMemoryReceiver>(slave_mem_name.c_str());

m_master_received = sem_open((shMemName + "_m_rsem").c_str(), O_CREAT, 0666, SEMAPHORE_DISABLED);
m_slave_received = sem_open((shMemName + "_s_rsem").c_str(), O_CREAT, 0666, SEMAPHORE_DISABLED);
m_master_sent = sem_open((shMemName + "_m_sent").c_str(), O_CREAT, 0666, SEMAPHORE_DISABLED);
Expand All @@ -31,16 +30,13 @@ ProcCommunicator::ProcCommunicator(const bool isMasterMode,
m_slave_ready = sem_open((shMemName + "_s_ready").c_str(), O_CREAT, 0666, SEMAPHORE_ENABLED);

if (m_master_received == SEM_FAILED || m_slave_received == SEM_FAILED ||
m_master_sent == SEM_FAILED || m_slave_sent == SEM_FAILED || m_slave_ready == SEM_FAILED)
m_master_sent == SEM_FAILED || m_slave_sent == SEM_FAILED || (isMultipleMasters && m_slave_ready == SEM_FAILED))
{
perror("ProcCommunicator sem_open failure.");
}
}
else
{
m_sender = std::make_unique<SharedMemorySender>(slave_mem_name.c_str());
m_receiver = std::make_unique<SharedMemoryReceiver>(master_mem_name.c_str());

while (m_master_received == SEM_FAILED || m_slave_received == SEM_FAILED ||
m_master_sent == SEM_FAILED || m_slave_sent == SEM_FAILED || m_slave_ready == SEM_FAILED)
{
Expand All @@ -53,9 +49,42 @@ ProcCommunicator::ProcCommunicator(const bool isMasterMode,
m_slave_ready = sem_open((shMemName + "_s_ready").c_str(), O_RDWR, 0666, SEMAPHORE_ENABLED);
}
}
#else
m_sender = std::make_unique<SharedMemorySender>(slave_mem_name.c_str());
m_receiver = std::make_unique<SharedMemoryReceiver>(master_mem_name.c_str());

std::wstring wshMemName(shMemName.begin(), shMemName.end());

if (!(m_master_received = CreateSemaphoreW(NULL, SEMAPHORE_DISABLED, MAXLONG, (wshMemName + L"_m_rsem").c_str())))
m_master_received = OpenSemaphoreW(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, 0, (wshMemName + L"_m_rsem").c_str());

if (!(m_slave_received = CreateSemaphoreW(NULL, SEMAPHORE_DISABLED, MAXLONG, (wshMemName + L"_s_rsem").c_str())))
m_slave_received = OpenSemaphoreW(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, 0, (wshMemName + L"_s_rsem").c_str());

if (!(m_master_sent = CreateSemaphoreW(NULL, SEMAPHORE_DISABLED, MAXLONG, (wshMemName + L"_m_sent").c_str())))
m_master_sent = OpenSemaphoreW(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, 0, (wshMemName + L"_m_sent").c_str());

if (!(m_slave_sent = CreateSemaphoreW(NULL, SEMAPHORE_DISABLED, MAXLONG, (wshMemName + L"_s_ready").c_str())))
m_slave_sent = OpenSemaphoreW(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, 0, (wshMemName + L"_s_ready").c_str());

if (isMultipleMasters)
{
if (!(m_slave_ready = CreateSemaphoreW(NULL, SEMAPHORE_DISABLED, MAXLONG, (wshMemName + L"_s_ready").c_str())))
m_slave_ready = OpenSemaphoreW(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, 0, (wshMemName + L"_s_ready").c_str());
}

if (m_master_received == NULL || m_slave_received == NULL ||
m_master_sent == NULL || m_slave_sent == NULL || m_slave_ready == NULL || (isMultipleMasters && m_slave_ready == NULL))
{
perror("ProcCommunicator sem_open failure.");
}

#endif
}

ProcCommunicator::~ProcCommunicator()
{
#ifndef _WIN32
if (sem_close(m_master_received) == -1)
{
perror("Failed to destroy m_master_received semaphore");
Expand All @@ -76,10 +105,33 @@ ProcCommunicator::~ProcCommunicator()
{
perror("Failed to destroy m_slave_ready semaphore");
}
#else
if (m_master_received && CloseHandle(m_master_received))
{
perror("Failed to destroy m_master_received semaphore");
}
if (m_slave_received && CloseHandle(m_slave_received))
{
perror("Failed to destroy m_slave_received semaphore");
}
if (m_master_sent && CloseHandle(m_master_sent))
{
perror("Failed to destroy m_master_sent semaphore");
}
if (m_slave_sent && CloseHandle(m_slave_sent))
{
perror("Failed to destroy m_slave_sent semaphore");
}
if (m_multiple_master && m_slave_ready && CloseHandle(m_slave_ready))
{
perror("Failed to destroy m_slave_ready semaphore");
}
#endif
}

void ProcCommunicator::send(const Message *msg)
{
#ifndef _WIN32
if (m_multiple_master && m_master_mode)
sem_wait(m_slave_ready);

Expand All @@ -89,15 +141,55 @@ void ProcCommunicator::send(const Message *msg)

if (m_multiple_master && !m_master_mode)
sem_post(m_slave_ready);
#else
// Wait for the semaphore to be signaled
if (m_multiple_master && m_master_mode) {
WaitForSingleObject(m_slave_ready, INFINITE); // INFINITE timeout to wait indefinitely
}

// Send the message
m_sender->sendMessage(msg);

// Post (release) the semaphore
ReleaseSemaphore(m_master_mode ? m_master_sent : m_slave_sent, 1, NULL);

// Wait for the semaphore to be signaled
WaitForSingleObject(m_master_mode ? m_slave_received : m_master_received, INFINITE);

// Post (release) the semaphore if needed
if (m_multiple_master && !m_master_mode) {
ReleaseSemaphore(m_slave_ready, 1, NULL);
}
#endif
}

Message *ProcCommunicator::receive()
{
#ifndef _WIN32
sem_wait(m_master_mode ? m_slave_sent : m_master_sent);
Message *response = m_receiver->receiveMessage();
sem_post(m_master_mode ? m_master_received : m_slave_received);

return response;
#else
DWORD waitResult = WaitForSingleObject(m_master_mode ? m_slave_sent : m_master_sent, INFINITE);
if (waitResult != WAIT_OBJECT_0) {
// Handle error
return nullptr;
}

// Receive the message
Message *response = m_receiver->receiveMessage();

// Post (release) the semaphore
BOOL releaseResult = ReleaseSemaphore(m_master_mode ? m_master_received : m_slave_received, 1, NULL);
if (!releaseResult) {
// Handle error
return nullptr;
}

return response;
#endif
}

Message *ProcCommunicator::sendAndGetResponse(const Message *msg)
Expand Down
13 changes: 13 additions & 0 deletions sh_mem/ProcCommunicator.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
#pragma once

#include <memory>
#ifndef _WIN32
#include <semaphore.h>
#else
#include <windows.h>
#endif
#include "SharedMemorySender.h"
#include "SharedMemoryReceiver.h"
#include "Message.hpp"
Expand All @@ -20,9 +25,17 @@ class ProcCommunicator
std::unique_ptr<SharedMemoryReceiver> m_receiver;
bool m_master_mode;
bool m_multiple_master;
#ifndef _WIN32
sem_t *m_master_received;
sem_t *m_slave_received;
sem_t *m_master_sent;
sem_t *m_slave_sent;
sem_t *m_slave_ready;
#else
HANDLE m_master_received;
HANDLE m_slave_received;
HANDLE m_master_sent;
HANDLE m_slave_sent;
HANDLE m_slave_ready;
#endif
};
62 changes: 60 additions & 2 deletions sh_mem/SharedMemoryReceiver.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,39 @@
#include <iostream>
#include <cstdlib>
#include <cstring>
#ifndef _WIN32
#include <unistd.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <semaphore.h>
#else
#include <windows.h>
#include <string>
#include <iostream>
#endif
#include "SharedMemoryReceiver.h"
#include <vector>

const int SHARED_MEMORY_SIZE = 4096; // 4KB

SharedMemoryReceiver::SharedMemoryReceiver(const char *shMemName) : m_name(shMemName)
{
init();
#ifdef _WIN32
initWindowsSharedMemory();
#else
initUnixSharedMemory();
#endif
}
SharedMemoryReceiver::~SharedMemoryReceiver()
{
#ifdef _WIN32
finishWindows();
#else
finish();
#endif
}
void SharedMemoryReceiver::init()
#ifndef _WIN32
void SharedMemoryReceiver::initUnixSharedMemory()
{
// Try to create the shared memory segment
m_shm_fd = shm_open(m_name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666);
Expand Down Expand Up @@ -66,6 +85,45 @@ void SharedMemoryReceiver::finish()
std::cerr << "close failed" << std::endl;
}
}
#else
void SharedMemoryReceiver::initWindowsSharedMemory()
{
std::wstring wshMemName(m_name.begin(), m_name.end());
m_shm_fd = OpenFileMapping(
FILE_MAP_ALL_ACCESS, // read/write access
FALSE, // do not inherit the name
wshMemName.c_str()); // name of mapping object

if (m_shm_fd == NULL)
{
printf(("Could not open file mapping object (%d).\n"),
GetLastError());
return;
}

m_ptr = (void *)MapViewOfFile(m_shm_fd, // handle to map object
FILE_MAP_ALL_ACCESS, // read/write permission
0,
0,
SHARED_MEMORY_SIZE);

if (m_ptr == NULL)
{
printf("Could not map view of file (%d).\n", GetLastError());

CloseHandle(m_shm_fd);

return;
}
}

void SharedMemoryReceiver::finishWindows()
{
UnmapViewOfFile(m_ptr);

CloseHandle(m_shm_fd);
}
#endif

Message *SharedMemoryReceiver::receiveMessage()
{
Expand Down
16 changes: 12 additions & 4 deletions sh_mem/SharedMemoryReceiver.h
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
#pragma once
#include <semaphore.h>
#include <string>
#include "Message.hpp"

class SharedMemoryReceiver
{
public:
SharedMemoryReceiver(const char *shMemName);
void init();
~SharedMemoryReceiver();

#ifdef WIN32
void initWindowsSharedMemory();
void finishWindows();
#else
void initUnixSharedMemory();
void finish();
#endif
Message *receiveMessage();

private:
#ifndef _WIN32
int m_shm_fd;
#else
HANDLE m_shm_fd;
#endif
void *m_ptr;
sem_t *m_sem;
sem_t *m_rec_sem;
std::string m_name;
};
Loading

0 comments on commit 161b5b9

Please sign in to comment.