diff --git a/src/abycore/aby/abyparty.cpp b/src/abycore/aby/abyparty.cpp index f11770bc..45b46202 100644 --- a/src/abycore/aby/abyparty.cpp +++ b/src/abycore/aby/abyparty.cpp @@ -51,6 +51,9 @@ class ABYParty::CPartyWorkerThread: public CThread { m_evt.Set(); } + CEvent* GetEvent() { + return &m_evt; + } private: void ThreadMain(); uint32_t threadid; @@ -211,7 +214,7 @@ void ABYParty::ExecCircuit() { std::cout << "Performing setup phase for " << m_vSharings[i]->sharing_type() << " sharing" << std::endl; #endif if(i == S_YAO) { - StartWatch("Starting Circuit Garbling", P_GARBLE); + StartRecording("Starting Circuit Garbling", P_GARBLE, m_vSockets); if(m_eRole == SERVER) { m_vSharings[S_YAO]->PerformSetupPhase(m_pSetup.get()); m_vSharings[S_YAO_REV]->PerformSetupPhase(m_pSetup.get()); @@ -223,7 +226,7 @@ void ABYParty::ExecCircuit() { m_vSharings[S_YAO_REV]->PerformSetupPhase(m_pSetup.get());*/ m_vSharings[S_YAO]->FinishSetupPhase(m_pSetup.get()); m_vSharings[S_YAO_REV]->FinishSetupPhase(m_pSetup.get()); - StopWatch("Time for Circuit garbling: ", P_GARBLE); + StopRecording("Time for Circuit garbling: ", P_GARBLE, m_vSockets); } else if (i == S_YAO_REV) { //Do nothing, was done in parallel to Yao } else { @@ -409,7 +412,7 @@ BOOL ABYParty::PerformInteraction() { return success; } -BOOL ABYParty::ThreadSendValues() { +BOOL ABYParty::ThreadSendValues(uint32_t id) { std::vector >sendbuf(m_vSharings.size()); std::vector >sndbytes(m_vSharings.size()); @@ -440,7 +443,7 @@ BOOL ABYParty::ThreadSendValues() { //gettimeofday(&tstart, NULL); if(snd_buf_size_total > 0) { //m_vSockets[2]->Send(snd_buf_total, snd_buf_size_total); - m_tPartyChan->send(snd_buf_total, snd_buf_size_total); + m_tPartyChan->blocking_send(m_vThreads[id]->GetEvent(), snd_buf_total, snd_buf_size_total); } free(snd_buf_total); @@ -697,7 +700,7 @@ void ABYParty::CPartyWorkerThread::ThreadMain() { return; case e_Party_Comm: if (threadid == 0){ - bSuccess = m_pCallback->ThreadSendValues(); + bSuccess = m_pCallback->ThreadSendValues(threadid); } else{ bSuccess = m_pCallback->ThreadReceiveValues(); diff --git a/src/abycore/aby/abyparty.h b/src/abycore/aby/abyparty.h index 9017d0ed..7f9f711a 100644 --- a/src/abycore/aby/abyparty.h +++ b/src/abycore/aby/abyparty.h @@ -83,7 +83,7 @@ class ABYParty { void UsedGate(uint32_t gateid); BOOL PerformInteraction(); - BOOL ThreadSendValues(); + BOOL ThreadSendValues(uint32_t id); BOOL ThreadReceiveValues(); void PrintPerformanceStatistics(); diff --git a/src/abycore/aby/abysetup.cpp b/src/abycore/aby/abysetup.cpp index e157b05d..43f4ac77 100644 --- a/src/abycore/aby/abysetup.cpp +++ b/src/abycore/aby/abysetup.cpp @@ -484,11 +484,13 @@ void ABYSetup::AddReceiveTask(BYTE* rcvbuf, uint64_t rcvbytes) { WakeupWorkerThreads(e_Receive); } -BOOL ABYSetup::ThreadSendData() { - m_tSetupChan->send(m_tsndtask.sndbuf, m_tsndtask.sndbytes); + +BOOL ABYSetup::ThreadSendData(uint32_t threadid) { + m_tSetupChan->blocking_send(m_vThreads[threadid]->GetEvent(), m_tsndtask.sndbuf, m_tsndtask.sndbytes); return true; } + BOOL ABYSetup::ThreadReceiveData() { m_tSetupChan->blocking_receive(m_trcvtask.rcvbuf, m_trcvtask.rcvbytes); return true; diff --git a/src/abycore/aby/abysetup.h b/src/abycore/aby/abysetup.h index f6196168..bdafab47 100644 --- a/src/abycore/aby/abysetup.h +++ b/src/abycore/aby/abysetup.h @@ -161,8 +161,8 @@ class ABYSetup { BOOL ThreadRunKKSnd(uint32_t exec); BOOL ThreadRunKKRcv(uint32_t exec); - BOOL ThreadSendData(); - BOOL ThreadReceiveData(); + BOOL ThreadSendData(uint32_t id); + BOOL ThreadReceiveData(uint32_t id); BOOL ThreadRunPaillierMTGen(uint32_t exec); BOOL ThreadRunDGKMTGen(uint32_t threadid); @@ -220,6 +220,9 @@ class ABYSetup { m_eJob = e; m_evt.Set(); } + CEvent* GetEvent() { + return &m_evt; + } private: void ThreadMain(); uint32_t threadid;