Skip to content

Commit

Permalink
Multi threaded cluster finder. (#115)
Browse files Browse the repository at this point in the history
Added a prototype for the multi threaded cluster finder including python
bindings
  • Loading branch information
erikfrojdh authored Jan 9, 2025
2 parents 21ce7a3 + cc95561 commit 72d10b7
Show file tree
Hide file tree
Showing 19 changed files with 861 additions and 77 deletions.
2 changes: 1 addition & 1 deletion conda-recipe/meta.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package:
name: aare
version: 2025.1.7.dev0 #TODO! how to not duplicate this?
version: 2025.1.9.dev0 #TODO! how to not duplicate this?


source:
Expand Down
97 changes: 97 additions & 0 deletions include/aare/CircularFifo.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#pragma once

#include <chrono>
#include <fmt/color.h>
#include <fmt/format.h>
#include <memory>
#include <thread>

#include "aare/ProducerConsumerQueue.hpp"

namespace aare {

template <class ItemType> class CircularFifo {
uint32_t fifo_size;
aare::ProducerConsumerQueue<ItemType> free_slots;
aare::ProducerConsumerQueue<ItemType> filled_slots;

public:
CircularFifo() : CircularFifo(100){};
CircularFifo(uint32_t size) : fifo_size(size), free_slots(size + 1), filled_slots(size + 1) {

// TODO! how do we deal with alignment for writing? alignas???
// Do we give the user a chance to provide memory locations?
// Templated allocator?
for (size_t i = 0; i < fifo_size; ++i) {
free_slots.write(ItemType{});
}
}

bool next() {
// TODO! avoid default constructing ItemType
ItemType it;
if (!filled_slots.read(it))
return false;
if (!free_slots.write(std::move(it)))
return false;
return true;
}

~CircularFifo() {}

using value_type = ItemType;

auto numFilledSlots() const noexcept { return filled_slots.sizeGuess(); }
auto numFreeSlots() const noexcept { return free_slots.sizeGuess(); }
auto isFull() const noexcept { return filled_slots.isFull(); }

ItemType pop_free() {
ItemType v;
while (!free_slots.read(v))
;
return std::move(v);
// return v;
}

bool try_pop_free(ItemType &v) { return free_slots.read(v); }

ItemType pop_value(std::chrono::nanoseconds wait, std::atomic<bool> &stopped) {
ItemType v;
while (!filled_slots.read(v) && !stopped) {
std::this_thread::sleep_for(wait);
}
return std::move(v);
}

ItemType pop_value() {
ItemType v;
while (!filled_slots.read(v))
;
return std::move(v);
}

ItemType *frontPtr() { return filled_slots.frontPtr(); }

// TODO! Add function to move item from filled to free to be used
// with the frontPtr function

template <class... Args> void push_value(Args &&...recordArgs) {
while (!filled_slots.write(std::forward<Args>(recordArgs)...))
;
}

template <class... Args> bool try_push_value(Args &&...recordArgs) {
return filled_slots.write(std::forward<Args>(recordArgs)...);
}

template <class... Args> void push_free(Args &&...recordArgs) {
while (!free_slots.write(std::forward<Args>(recordArgs)...))
;
}

template <class... Args> bool try_push_free(Args &&...recordArgs) {
return free_slots.write(std::forward<Args>(recordArgs)...);
}
};

} // namespace aare
52 changes: 52 additions & 0 deletions include/aare/ClusterCollector.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once
#include <atomic>
#include <thread>

#include "aare/ProducerConsumerQueue.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/ClusterFinderMT.hpp"

namespace aare {

class ClusterCollector{
ProducerConsumerQueue<ClusterVector<int>>* m_source;
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_stopped{true};
std::chrono::milliseconds m_default_wait{1};
std::thread m_thread;
std::vector<ClusterVector<int>> m_clusters;

void process(){
m_stopped = false;
fmt::print("ClusterCollector started\n");
while (!m_stop_requested || !m_source->isEmpty()) {
if (ClusterVector<int> *clusters = m_source->frontPtr();
clusters != nullptr) {
m_clusters.push_back(std::move(*clusters));
m_source->popFront();
}else{
std::this_thread::sleep_for(m_default_wait);
}
}
fmt::print("ClusterCollector stopped\n");
m_stopped = true;
}

public:
ClusterCollector(ClusterFinderMT<uint16_t, double, int32_t>* source){
m_source = source->sink();
m_thread = std::thread(&ClusterCollector::process, this);
}
void stop(){
m_stop_requested = true;
m_thread.join();
}
std::vector<ClusterVector<int>> steal_clusters(){
if(!m_stopped){
throw std::runtime_error("ClusterCollector is still running");
}
return std::move(m_clusters);
}
};

} // namespace aare
7 changes: 3 additions & 4 deletions include/aare/ClusterFile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ class ClusterFile {
ClusterFile(const std::filesystem::path &fname, size_t chunk_size = 1000,
const std::string &mode = "r");
~ClusterFile();
std::vector<Cluster3x3> read_clusters(size_t n_clusters);
std::vector<Cluster3x3> read_frame(int32_t &out_fnum);
void write_frame(int32_t frame_number,
const ClusterVector<int32_t> &clusters);
ClusterVector<int32_t> read_clusters(size_t n_clusters);
ClusterVector<int32_t> read_frame();
void write_frame(const ClusterVector<int32_t> &clusters);
std::vector<Cluster3x3>
read_cluster_with_cut(size_t n_clusters, double *noise_map, int nx, int ny);

Expand Down
56 changes: 56 additions & 0 deletions include/aare/ClusterFileSink.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once
#include <atomic>
#include <filesystem>
#include <thread>

#include "aare/ProducerConsumerQueue.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/ClusterFinderMT.hpp"

namespace aare{

class ClusterFileSink{
ProducerConsumerQueue<ClusterVector<int>>* m_source;
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_stopped{true};
std::chrono::milliseconds m_default_wait{1};
std::thread m_thread;
std::ofstream m_file;


void process(){
m_stopped = false;
fmt::print("ClusterFileSink started\n");
while (!m_stop_requested || !m_source->isEmpty()) {
if (ClusterVector<int> *clusters = m_source->frontPtr();
clusters != nullptr) {
// Write clusters to file
int32_t frame_number = clusters->frame_number(); //TODO! Should we store frame number already as int?
uint32_t num_clusters = clusters->size();
m_file.write(reinterpret_cast<const char*>(&frame_number), sizeof(frame_number));
m_file.write(reinterpret_cast<const char*>(&num_clusters), sizeof(num_clusters));
m_file.write(reinterpret_cast<const char*>(clusters->data()), clusters->size() * clusters->item_size());
m_source->popFront();
}else{
std::this_thread::sleep_for(m_default_wait);
}
}
fmt::print("ClusterFileSink stopped\n");
m_stopped = true;
}

public:
ClusterFileSink(ClusterFinderMT<uint16_t, double, int32_t>* source, const std::filesystem::path& fname){
m_source = source->sink();
m_thread = std::thread(&ClusterFileSink::process, this);
m_file.open(fname, std::ios::binary);
}
void stop(){
m_stop_requested = true;
m_thread.join();
m_file.close();
}
};


} // namespace aare
18 changes: 2 additions & 16 deletions include/aare/ClusterFinder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,12 @@

namespace aare {

/** enum to define the event types */
enum class eventType {
PEDESTAL, /** pedestal */
NEIGHBOUR, /** neighbour i.e. below threshold, but in the cluster of a
photon */
PHOTON, /** photon i.e. above threshold */
PHOTON_MAX, /** maximum of a cluster satisfying the photon conditions */
NEGATIVE_PEDESTAL, /** negative value, will not be accounted for as pedestal
in order to avoid drift of the pedestal towards
negative values */
UNDEFINED_EVENT = -1 /** undefined */
};

template <typename FRAME_TYPE = uint16_t, typename PEDESTAL_TYPE = double,
typename CT = int32_t>
class ClusterFinder {
Shape<2> m_image_size;
const int m_cluster_sizeX;
const int m_cluster_sizeY;
// const PEDESTAL_TYPE m_threshold;
const PEDESTAL_TYPE m_nSigma;
const PEDESTAL_TYPE c2;
const PEDESTAL_TYPE c3;
Expand Down Expand Up @@ -78,13 +64,13 @@ class ClusterFinder {
m_clusters = ClusterVector<CT>(m_cluster_sizeX, m_cluster_sizeY);
return tmp;
}
void find_clusters(NDView<FRAME_TYPE, 2> frame) {
void find_clusters(NDView<FRAME_TYPE, 2> frame, uint64_t frame_number = 0) {
// // TODO! deal with even size clusters
// // currently 3,3 -> +/- 1
// // 4,4 -> +/- 2
int dy = m_cluster_sizeY / 2;
int dx = m_cluster_sizeX / 2;

m_clusters.set_frame_number(frame_number);
std::vector<CT> cluster_data(m_cluster_sizeX * m_cluster_sizeY);
for (int iy = 0; iy < frame.shape(0); iy++) {
for (int ix = 0; ix < frame.shape(1); ix++) {
Expand Down
Loading

0 comments on commit 72d10b7

Please sign in to comment.