Skip to content

Commit

Permalink
Port Procummunicator to Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
ben committed Aug 18, 2024
1 parent c6ac8f4 commit cc0ad58
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 31 deletions.
136 changes: 117 additions & 19 deletions sh_mem/ProcCommunicator.cpp
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
#include "ProcCommunicator.h"
#include <iostream>
#ifndef _WIN32
#include <unistd.h>
#include <fcntl.h>
#endif

static constexpr int SEMAPHORE_DISABLED = 0;
static constexpr int SEMAPHORE_ENABLED = 1;

ProcCommunicator::ProcCommunicator(const bool isMasterMode,
const bool isMultipleMasters,
const std::string &shMemName) : m_master_mode(isMasterMode),
m_multiple_master(isMultipleMasters),
m_multiple_master(isMultipleMasters)/*,
m_master_received((sem_t *)-1),
m_slave_received((sem_t *)-1),
m_master_sent((sem_t *)-1),
m_slave_sent((sem_t *)-1)
m_slave_sent((sem_t *)-1)*/
{
const std::string master_mem_name = shMemName + "_master";
const std::string slave_mem_name = shMemName + "_slave";
#ifndef _WIN32
if (isMasterMode)
{
m_sender = std::make_unique<SharedMemorySender>(master_mem_name.c_str());
Expand All @@ -29,33 +32,62 @@ ProcCommunicator::ProcCommunicator(const bool isMasterMode,

if (isMultipleMasters)
m_slave_ready = sem_open((shMemName + "_s_ready").c_str(), O_CREAT, 0666, SEMAPHORE_ENABLED);

if (m_master_received == SEM_FAILED || m_slave_received == SEM_FAILED ||
m_master_sent == SEM_FAILED || m_slave_sent == SEM_FAILED || m_slave_ready == SEM_FAILED)
{
perror("ProcCommunicator sem_open failure.");
}
}
else
{
m_sender = std::make_unique<SharedMemorySender>(slave_mem_name.c_str());
m_receiver = std::make_unique<SharedMemoryReceiver>(master_mem_name.c_str());

while (m_master_received == SEM_FAILED || m_slave_received == SEM_FAILED ||
m_master_sent == SEM_FAILED || m_slave_sent == SEM_FAILED || m_slave_ready == SEM_FAILED)
{
m_master_received = sem_open((shMemName + "_m_rsem").c_str(), O_RDWR, 0666, SEMAPHORE_DISABLED);
m_slave_received = sem_open((shMemName + "_s_rsem").c_str(), O_RDWR, 0666, SEMAPHORE_DISABLED);
m_master_sent = sem_open((shMemName + "_m_sent").c_str(), O_RDWR, 0666, SEMAPHORE_DISABLED);
m_slave_sent = sem_open((shMemName + "_s_sent").c_str(), O_RDWR, 0666, SEMAPHORE_DISABLED);
m_master_received = sem_open((shMemName + "_m_rsem").c_str(), O_RDWR, 0666, SEMAPHORE_DISABLED);
m_slave_received = sem_open((shMemName + "_s_rsem").c_str(), O_RDWR, 0666, SEMAPHORE_DISABLED);
m_master_sent = sem_open((shMemName + "_m_sent").c_str(), O_RDWR, 0666, SEMAPHORE_DISABLED);
m_slave_sent = sem_open((shMemName + "_s_sent").c_str(), O_RDWR, 0666, SEMAPHORE_DISABLED);

if (isMultipleMasters)
m_slave_ready = sem_open((shMemName + "_s_ready").c_str(), O_RDWR, 0666, SEMAPHORE_ENABLED);

}
if (m_master_received == SEM_FAILED || m_slave_received == SEM_FAILED ||
m_master_sent == SEM_FAILED || m_slave_sent == SEM_FAILED || m_slave_ready == SEM_FAILED)
{
perror("ProcCommunicator sem_open failure.");
}
#else
m_sender = std::make_unique<SharedMemorySender>(slave_mem_name.c_str());
m_receiver = std::make_unique<SharedMemoryReceiver>(master_mem_name.c_str());

std::wstring wshMemName(shMemName.begin(), shMemName.end());

if (!(m_master_received = CreateSemaphoreW(NULL, SEMAPHORE_DISABLED, MAXLONG, (wshMemName + L"_m_rsem").c_str())))
m_master_received = OpenSemaphoreW(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, 0, (wshMemName + L"_m_rsem").c_str());

if (!(m_slave_received = CreateSemaphoreW(NULL, SEMAPHORE_DISABLED, MAXLONG, (wshMemName + L"_s_rsem").c_str())))
m_slave_received = OpenSemaphoreW(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, 0, (wshMemName + L"_s_rsem").c_str());

if (!(m_master_sent = CreateSemaphoreW(NULL, SEMAPHORE_DISABLED, MAXLONG, (wshMemName + L"_m_sent").c_str())))
m_master_sent = OpenSemaphoreW(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, 0, (wshMemName + L"_m_sent").c_str());

if (isMultipleMasters)
m_slave_ready = sem_open((shMemName + "_s_ready").c_str(), O_RDWR, 0666, SEMAPHORE_ENABLED);
}
if (!(m_slave_sent = CreateSemaphoreW(NULL, SEMAPHORE_DISABLED, MAXLONG, (wshMemName + L"_s_ready").c_str())))
m_slave_sent = OpenSemaphoreW(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, 0, (wshMemName + L"_s_ready").c_str());

if (isMultipleMasters)
{
if (!(m_slave_ready = CreateSemaphoreW(NULL, SEMAPHORE_DISABLED, MAXLONG, (wshMemName + L"_s_ready").c_str())))
m_slave_ready = OpenSemaphoreW(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, 0, (wshMemName + L"_s_ready").c_str());
}

if (m_master_received == NULL || m_slave_received == NULL ||
m_master_sent == NULL || m_slave_sent == NULL || m_slave_ready == NULL )
{
perror("ProcCommunicator sem_open failure.");
}

#endif
}

ProcCommunicator::~ProcCommunicator()
{
#ifndef _WIN32
if (sem_close(m_master_received) == -1)
{
perror("Failed to destroy m_master_received semaphore");
Expand All @@ -76,8 +108,30 @@ ProcCommunicator::~ProcCommunicator()
{
perror("Failed to destroy m_slave_ready semaphore");
}
#else
if (m_master_received && CloseHandle(m_master_received))
{
perror("Failed to destroy m_master_received semaphore");
}
if (m_slave_received && CloseHandle(m_slave_received))
{
perror("Failed to destroy m_slave_received semaphore");
}
if (m_master_sent && CloseHandle(m_master_sent))
{
perror("Failed to destroy m_master_sent semaphore");
}
if (m_slave_sent && CloseHandle(m_slave_sent))
{
perror("Failed to destroy m_slave_sent semaphore");
}
if (m_multiple_master && m_slave_ready && CloseHandle(m_slave_ready))
{
perror("Failed to destroy m_slave_ready semaphore");
}
#endif
}

#ifndef _WIN32
void ProcCommunicator::send(const Message *msg)
{
if (m_multiple_master && m_master_mode)
Expand All @@ -99,6 +153,50 @@ Message *ProcCommunicator::receive()

return response;
}
#else
void ProcCommunicator::send(const Message *msg)
{
// Wait for the semaphore to be signaled
if (m_multiple_master && m_master_mode) {
WaitForSingleObject(m_slave_ready, INFINITE); // INFINITE timeout to wait indefinitely
}

// Send the message
m_sender->sendMessage(msg);

// Post (release) the semaphore
ReleaseSemaphore(m_master_mode ? m_master_sent : m_slave_sent, 1, NULL);

// Wait for the semaphore to be signaled
WaitForSingleObject(m_master_mode ? m_slave_received : m_master_received, INFINITE);

// Post (release) the semaphore if needed
if (m_multiple_master && !m_master_mode) {
ReleaseSemaphore(m_slave_ready, 1, NULL);
}
}

Message *ProcCommunicator::receive()
{
DWORD waitResult = WaitForSingleObject(m_master_mode ? m_slave_sent : m_master_sent, INFINITE);
if (waitResult != WAIT_OBJECT_0) {
// Handle error
return nullptr;
}

// Receive the message
Message *response = m_receiver->receiveMessage();

// Post (release) the semaphore
BOOL releaseResult = ReleaseSemaphore(m_master_mode ? m_master_received : m_slave_received, 1, NULL);
if (!releaseResult) {
// Handle error
return nullptr;
}

return response;
}
#endif

Message *ProcCommunicator::sendAndGetResponse(const Message *msg)
{
Expand Down
14 changes: 13 additions & 1 deletion sh_mem/ProcCommunicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
#include "SharedMemorySender.h"
#include "SharedMemoryReceiver.h"
#include "Message.hpp"

#ifndef _WIN32
#include <semaphore.h>
#else
#include <windows.h>
#endif
class ProcCommunicator
{
public:
Expand All @@ -20,9 +24,17 @@ class ProcCommunicator
std::unique_ptr<SharedMemoryReceiver> m_receiver;
bool m_master_mode;
bool m_multiple_master;
#ifndef _WIN32
sem_t *m_master_received;
sem_t *m_slave_received;
sem_t *m_master_sent;
sem_t *m_slave_sent;
sem_t *m_slave_ready;
#else
HANDLE m_master_received;
HANDLE m_slave_received;
HANDLE m_master_sent;
HANDLE m_slave_sent;
HANDLE m_slave_ready;
#endif
};
46 changes: 45 additions & 1 deletion sh_mem/SharedMemoryReceiver.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
#include <iostream>
#include <cstdlib>
#include <cstring>
#ifndef _WIN32
#include <unistd.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <semaphore.h>
#else
#include <windows.h>
#include <string>
#include <iostream>
#endif
#include "SharedMemoryReceiver.h"
#include <vector>

Expand All @@ -14,6 +19,7 @@ SharedMemoryReceiver::SharedMemoryReceiver(const char *shMemName) : m_name(shMem
{
init();
}
#ifndef _WIN32
void SharedMemoryReceiver::init()
{
// Try to create the shared memory segment
Expand Down Expand Up @@ -66,6 +72,44 @@ void SharedMemoryReceiver::finish()
std::cerr << "close failed" << std::endl;
}
}
#else
void SharedMemoryReceiver::init()
{
std::wstring wshMemName(m_name.begin(), m_name.end());
m_shm_fd = OpenFileMappingW(
FILE_MAP_ALL_ACCESS, // read/write access
FALSE, // do not inherit the name
wshMemName.c_str()); // name of mapping object

if (m_shm_fd == NULL)
{
printf(("Could not open file mapping object (%d).\n"),
GetLastError());
return;
}

m_ptr = (void *)MapViewOfFile(m_shm_fd, // handle to map object
FILE_MAP_ALL_ACCESS, // read/write permission
0,
0,
SHARED_MEMORY_SIZE);

if (m_ptr == NULL)
{
printf("Could not map view of file (%d).\n", GetLastError());

CloseHandle(m_shm_fd);

return;
}
}

void SharedMemoryReceiver::finish()
{
UnmapViewOfFile(m_ptr);
CloseHandle(m_shm_fd);
}
#endif

Message *SharedMemoryReceiver::receiveMessage()
{
Expand Down
9 changes: 6 additions & 3 deletions sh_mem/SharedMemoryReceiver.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#pragma once
#include <semaphore.h>
#include <string>
#include "Message.hpp"

Expand All @@ -12,9 +11,13 @@ class SharedMemoryReceiver
Message *receiveMessage();

private:
#ifndef _WIN32
int m_shm_fd;
#else
HANDLE m_shm_fd;
#endif

void *m_ptr;
sem_t *m_sem;
sem_t *m_rec_sem;

std::string m_name;
};
Loading

0 comments on commit cc0ad58

Please sign in to comment.