Skip to content

Commit

Permalink
Cpp multi example (#84)
Browse files Browse the repository at this point in the history
- Small modifications to the ProducerConsumerQueue to allow for storage
in a vector
- Example showing multi threading in C++ using queues
- fixes for python bindings

---------

Co-authored-by: Bechir <[email protected]>
Co-authored-by: Bechir Braham <[email protected]>
  • Loading branch information
3 people authored Jul 16, 2024
1 parent 246ac90 commit 403d10b
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 9 deletions.
2 changes: 1 addition & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_examp
set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example;zmq_sender_example;")
set(EXAMPLE_LIST "${EXAMPLE_LIST};cluster_example;zmq_multi_receiver;zmq_worker;zmq_sink")
set(EXAMPLE_LIST "${EXAMPLE_LIST};zmq_task_ventilator;zmq_restream_example;zmq_receiver_example")
set(EXAMPLE_LIST "${EXAMPLE_LIST};testing_pedestal;cluster_finder_example;reorder_moench_example")
set(EXAMPLE_LIST "${EXAMPLE_LIST};testing_pedestal;cluster_finder_example;reorder_moench_example;clustering")

foreach(example ${EXAMPLE_LIST})
add_executable(${example} ${example}.cpp)
Expand Down
151 changes: 151 additions & 0 deletions examples/clustering.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@

#include "aare/file_io.hpp"
#include "aare/processing/ClusterFinder.hpp"
#include "aare/processing/Pedestal.hpp"

#include <chrono>
#include <fmt/format.h>
using namespace std::chrono;
#include "aare/core/ProducerConsumerQueue.hpp"
#include <memory>
#include <thread>

using Queue = folly::ProducerConsumerQueue<aare::Frame>;

//Global constants, for easy tweaking
constexpr size_t print_interval = 100;
constexpr std::chrono::milliseconds default_wait(1);
constexpr uint32_t queue_size = 1000;
constexpr int n_frames = 8000;

// Small wrapper to do cluster finding in a thread
// helps with keeping track of stopping tokens etc.
class ThreadedClusterFinder {
std::atomic<bool> m_stop_requested = false;
std::atomic<size_t> m_frames_processed = 0;
Queue *m_queue = nullptr;
aare::Pedestal<double> m_pedestal;
int m_object_id;

public:
ThreadedClusterFinder(Queue &q, aare::Pedestal<double> pd, int id) : m_queue(&q), m_pedestal(pd), m_object_id(id) {}

size_t frames_processed() const { return m_frames_processed; }
void request_stop() {
fmt::print("{}:Stop requested\n", m_object_id);
m_stop_requested = true;
}

void find_clusters() {
aare::ClusterFinder cf(3, 3, 5, 0);
while (!m_stop_requested) {
aare::Frame frame(1, 1, aare::Dtype("u4"));
if (m_queue->read(frame)) {
auto clusters = cf.find_clusters_without_threshold(frame.view<uint16_t>(), m_pedestal, false);
m_frames_processed++;
if (m_frames_processed % print_interval == 0) {
fmt::print("{}:Found {} clusters\n", m_object_id, clusters.size());
}
} else {
// fmt::print("{}:Queue empty\n", m_object_id);
std::this_thread::sleep_for(default_wait);
}
}
fmt::print("{}:Done\n", m_object_id);
}
};

int main(int argc, char **argv) {
//Rudimentary argument parsing
if (argc != 3) {
fmt::print("Usage: {} <file> <n_threads>\n", argv[0]);
return 1;
}

std::filesystem::path fname(argv[1]);
fmt::print("Loading {}\n", fname.string());
const int n_threads = std::stoi(argv[2]);

aare::Pedestal<double> pd(400, 400, 1000);
aare::File f(fname, "r");

// Use the first 1000 frames to calculate the pedestal
// we can then copy this pedestal to each thread
auto t0 = high_resolution_clock::now();
for (int i = 0; i < 1000; ++i) {
aare::Frame frame = f.iread(i);
pd.push<uint16_t>(frame);
}
auto t1 = high_resolution_clock::now();
fmt::print("Pedestal run took: {}s\n", duration_cast<microseconds>(t1 - t0).count() / 1e6);

//---------------------------------------------------------------------------------------------
//---------------------- Now lets start with the setup for the threaded cluster finding

// We need one queue per thread...
std::vector<Queue> queues;
for (int i = 0; i < n_threads; ++i) {
queues.emplace_back(queue_size);
}

// and also one cluster finder per thread
std::vector<std::unique_ptr<ThreadedClusterFinder>> cluster_finders;
for (int i = 0; i < n_threads; ++i) {
cluster_finders.push_back(std::make_unique<ThreadedClusterFinder>(queues[i], pd, i));
}

// next we start the threads
std::vector<std::thread> threads;
for (int i = 0; i < n_threads; ++i) {
threads.emplace_back(&ThreadedClusterFinder::find_clusters, cluster_finders[i].get());
}

// Push frames to the queues

for (int i = 0; i < n_frames; ++i) {
// if the Queue is full, wait, there are better ways to do this =)
while (queues[i % n_threads].isFull()) {
// fmt::print("Queue {} is full, waiting\n", i % n_threads);
std::this_thread::sleep_for(default_wait);
}
queues[i % n_threads].write(f.iread(i+1000));
if (i % 100 == 0) {
fmt::print("Pushed frame {}\n", i);
}
}


// wait for all queues to be empty
for (auto &q : queues) {
while (!q.isEmpty()) {
// fmt::print("Finish Queue not empty, waiting\n");
std::this_thread::sleep_for(default_wait);
}
}

// and once empty we stop the cluster finders
for (auto &cf : cluster_finders) {
cf->request_stop();
}
for (auto &t : threads) {
t.join();
}

size_t total_frames = 0;
for (auto &cf : cluster_finders) {
total_frames += cf->frames_processed();
}
auto t2 = high_resolution_clock::now();
fmt::print("Processed {} frames in {}s\n", total_frames, duration_cast<microseconds>(t2 - t1).count() / 1e6);

// auto start = high_resolution_clock::now();
// aare::ClusterFinder cf(3, 3, 5, 0);
// for (int i = 1000; i<2000; ++i){
// aare::Frame frame = f.iread(i);
// auto clusters = cf.find_clusters_without_threshold(frame.view<uint16_t>(), pd, true);
// }

// auto stop = high_resolution_clock::now();
// auto duration = duration_cast<microseconds>(stop - start);
// fmt::print("Run took: {}s\n", duration.count()/1e6);
}
30 changes: 30 additions & 0 deletions src/core/include/aare/core/MultiThread.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// class that takes std::function as a constructor argument
// and run each of them in different threads

#pragma once

#include <functional>
#include <thread>
#include <vector>

namespace aare {

class MultiThread {
public:
explicit MultiThread(std::vector<std::function<void()>> const &functions) : functions_(functions) {}

void run() {
std::vector<std::thread> threads;
for (auto const &f : functions_) {
threads.emplace_back(f);
}
for (auto &t : threads) {
t.join();
}
}

private:
std::vector<std::function<void()>> functions_;
};

} // namespace aare
25 changes: 23 additions & 2 deletions src/core/include/aare/core/ProducerConsumerQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,25 @@ template <class T> struct ProducerConsumerQueue {
ProducerConsumerQueue(const ProducerConsumerQueue &) = delete;
ProducerConsumerQueue &operator=(const ProducerConsumerQueue &) = delete;


ProducerConsumerQueue(ProducerConsumerQueue &&other){
size_ = other.size_;
records_ = other.records_;
other.records_ = nullptr;
readIndex_ = other.readIndex_.load(std::memory_order_acquire);
writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
}
ProducerConsumerQueue &operator=(ProducerConsumerQueue &&other){
size_ = other.size_;
records_ = other.records_;
other.records_ = nullptr;
readIndex_ = other.readIndex_.load(std::memory_order_acquire);
writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
return *this;
}


ProducerConsumerQueue():ProducerConsumerQueue(2){};
// size must be >= 2.
//
// Also, note that the number of usable slots in the queue at any
Expand Down Expand Up @@ -169,8 +188,10 @@ template <class T> struct ProducerConsumerQueue {
using AtomicIndex = std::atomic<unsigned int>;

char pad0_[hardware_destructive_interference_size];
const uint32_t size_;
T *const records_;
// const uint32_t size_;
uint32_t size_;
// T *const records_;
T* records_;

alignas(hardware_destructive_interference_size) AtomicIndex readIndex_;
alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_;
Expand Down
19 changes: 19 additions & 0 deletions src/python/cpp/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "aare/core/Frame.hpp"
#include "aare/core/Transforms.hpp"
#include "aare/core/defs.hpp"
#include "aare/core/MultiThread.hpp"

template <typename T> void define_to_frame(py::module &m) {
m.def("to_frame", [](py::array_t<T> &np_array) {
Expand Down Expand Up @@ -156,4 +157,22 @@ void define_core_bindings(py::module &m) {
define_to_frame<int64_t>(m);
define_to_frame<float>(m);
define_to_frame<double>(m);


py::class_<MultiThread>(m, "MultiThread")
// .def(py::init([](std::vector<std::function<void()>> const &python_functions) {
// std::vector<std::function<void()>> functions;
// for (auto const &python_function : python_functions) {
// std::function<void()> tmp = [python_function]() { python_function(); };
// functions.emplace_back(tmp);
// }
// return std::unique_ptr<MultiThread>(new MultiThread(functions));
// }

// ))
.def(py::init<std::vector<std::function<void()>> const &>())
.def("run", [](MultiThread &self) {
py::gil_scoped_release release;
return self.run();
});
}
2 changes: 1 addition & 1 deletion src/python/cpp/file_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void define_file_io_bindings(py::module &m) {
.def(py::init<const std::filesystem::path &, const std::string &, const FileConfig &>())
.def("read", py::overload_cast<>(&File::read))
.def("read", py::overload_cast<size_t>(&File::read))
.def("iread", py::overload_cast<size_t>(&File::iread))
.def("iread", py::overload_cast<size_t>(&File::iread),py::call_guard<py::gil_scoped_release>())
.def("frame_number", &File::frame_number)
.def_property_readonly("bytes_per_frame", &File::bytes_per_frame)
.def_property_readonly("pixels_per_frame", &File::pixels_per_frame)
Expand Down
40 changes: 35 additions & 5 deletions src/python/cpp/processing.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ template <typename SUM_TYPE> void define_pedestal_bindings(py::module &m) {
.def(py::init<int, int>())
.def("set_freeze", &Pedestal<SUM_TYPE>::set_freeze)
.def("mean", py::overload_cast<>(&Pedestal<SUM_TYPE>::mean))
.def("mean", [](Pedestal<SUM_TYPE> &pedestal, const uint32_t row, const uint32_t col) { return pedestal.mean(row, col); })
.def("mean", [](Pedestal<SUM_TYPE> &pedestal, const uint32_t row,
const uint32_t col) { return pedestal.mean(row, col); })
.def("variance", py::overload_cast<>(&Pedestal<SUM_TYPE>::variance))
.def("variance",
[](Pedestal<SUM_TYPE> &pedestal, const uint32_t row, const uint32_t col) { return pedestal.variance(row, col); })
.def("variance", [](Pedestal<SUM_TYPE> &pedestal, const uint32_t row,
const uint32_t col) { return pedestal.variance(row, col); })
.def("standard_deviation", py::overload_cast<>(&Pedestal<SUM_TYPE>::standard_deviation))
.def("standard_deviation", [](Pedestal<SUM_TYPE> &pedestal, const int row,
const int col) { return pedestal.standard_deviation(row, col); })
Expand All @@ -57,7 +58,10 @@ template <typename SUM_TYPE> void define_pedestal_bindings(py::module &m) {
.def_property_readonly("n_samples", &Pedestal<SUM_TYPE>::n_samples)
.def_property_readonly("index", &Pedestal<SUM_TYPE>::index)
.def_property_readonly("sum", &Pedestal<SUM_TYPE>::get_sum)
.def_property_readonly("sum2", &Pedestal<SUM_TYPE>::get_sum2);
.def_property_readonly("sum2", &Pedestal<SUM_TYPE>::get_sum2)
.def("copy",[&](Pedestal<SUM_TYPE> &pedestal) {
return Pedestal<SUM_TYPE>(pedestal);
});
p.def("push", [](Pedestal<SUM_TYPE> &pedestal, Frame &f) {
if (f.bitdepth() == 8) {
pedestal.template push<uint8_t>(f);
Expand Down Expand Up @@ -138,11 +142,37 @@ void define_cluster_finder_bindings(py::module &m) {
define_cluster_finder_template_bindings<int64_t>(cf);
define_cluster_finder_template_bindings<float>(cf);
define_cluster_finder_template_bindings<double>(cf);
cf.def("find_clusters_without_threshold",
[](ClusterFinder &self, Frame &f, Pedestal<double> &pedestal, bool late_update) {
if (f.dtype() == Dtype::INT8) {
return self.find_clusters_without_threshold(f.view<int8_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::INT16) {
return self.find_clusters_without_threshold(f.view<int16_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::INT32) {
return self.find_clusters_without_threshold(f.view<int32_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::INT64) {
return self.find_clusters_without_threshold(f.view<int64_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::UINT8) {
return self.find_clusters_without_threshold(f.view<uint8_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::UINT16) {
return self.find_clusters_without_threshold(f.view<uint16_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::UINT32) {
return self.find_clusters_without_threshold(f.view<uint32_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::UINT64) {
return self.find_clusters_without_threshold(f.view<uint64_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::FLOAT) {
return self.find_clusters_without_threshold(f.view<float>(), pedestal, late_update);
} else if (f.dtype() == Dtype::DOUBLE) {
return self.find_clusters_without_threshold(f.view<double>(), pedestal, late_update);
} else {
throw std::runtime_error("Unsupported dtype");
}
},py::call_guard<py::gil_scoped_release>());
}
void define_processing_bindings(py::module &m) {
define_pedestal_bindings<double>(m);

py::class_<Cluster>(m, "Cluster",py::buffer_protocol())
py::class_<Cluster>(m, "Cluster", py::buffer_protocol())
.def(py::init<int, int, Dtype>())
.def("size", &Cluster::size)
.def("begin", &Cluster::begin)
Expand Down
40 changes: 40 additions & 0 deletions src/python/example/mt_clusterFinder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from pathlib import Path
import sys
PROJECT_ROOT_DIR=(Path(__file__) / "../../../../").resolve()
print(PROJECT_ROOT_DIR)
sys.path.append(str((PROJECT_ROOT_DIR / 'build').resolve()))
from threading import Thread

from _aare import *

file_path = None
N_THREADS = None
if len(sys.argv) <= 2:
raise Exception("Usage: mt_clusterFinder.py <file> <n_threads>")
else:
file_path = sys.argv[1]
N_THREADS = int(sys.argv[2])

file = File(file_path)
pedestal=Pedestal(400,400,1000)
for i in range(1000):
frame = file.iread(i)
pedestal.push(frame)
print("Pedestal done")

def f(idx,n):
def g():
print("Hello from thread",idx)
f = File(file_path)
p = pedestal.copy()
cf = ClusterFinder(3,3,5.0,0)
for i in range(idx,1000,n):
frame = f.iread(i)
clusters=cf.find_clusters_without_threshold(frame,p,False)

print("Goodbye from thread",idx)
return g


mt = MultiThread([f(i,N_THREADS) for i in range(N_THREADS)])
mt.run()

0 comments on commit 403d10b

Please sign in to comment.