Skip to content
Snippets Groups Projects
Commit 7d6e5ce5 authored by Dominik Smith's avatar Dominik Smith
Browse files

Applied OpenMP parallelization to cbm::algo::tof::Hitfind.

parent b9a9f2a0
No related branches found
No related tags found
1 merge request!1494OpenMP parallelization of cbm::algo::tof::Hitfind.
Pipeline #25638 failed
......@@ -6,6 +6,7 @@
#include <chrono>
#include "compat/OpenMP.h"
#include "log.hpp"
#include "util/TimingsFormat.h"
......@@ -57,7 +58,10 @@ namespace cbm::algo::tof
par->fChanPar[Ch].cell.sizeX = rpcPar.cell.sizeX;
par->fChanPar[Ch].cell.sizeY = rpcPar.cell.sizeY;
}
fAlgo[SmType].emplace(std::make_pair(Sm * NbRpc + Rpc, tof::Clusterizer(std::move(*par))));
fAlgo.emplace_back(std::move(*par));
// fill unique rpc pointer vectors for parallelization
fStorDigiPtr.push_back(&fStorDigi[SmType][Sm * NbRpc + Rpc]);
}
}
}
......@@ -70,18 +74,13 @@ namespace cbm::algo::tof
// ----- Execution -------------------------------------------------------
Hitfind::resultType Hitfind::operator()(gsl::span<CbmTofDigi> digiIn)
{
xpu::push_timer("TofHitfind");
xpu::t_add_bytes(digiIn.size_bytes());
// --- Output data
resultType result = {};
auto& clusterTs = std::get<0>(result);
auto& monitor = std::get<1>(result);
auto& digiInd = std::get<2>(result); // digi indices
resultType result = {};
auto& [clusterTs, monitor, digiInd] = result;
// Loop over the digis array and store the Digis in separate vectors for
// each RPC modules
xpu::push_timer("TofHitfindChanSort");
for (size_t idigi = 0; idigi < digiIn.size(); idigi++) {
CbmTofDigi* pDigi = &(digiIn[idigi]);
......@@ -96,48 +95,91 @@ namespace cbm::algo::tof
}
fStorDigi[SmType][Sm * NbRpc + Rpc].emplace_back(*pDigi, idigi);
}
monitor.fSortTime = xpu::pop_timer();
std::vector<Hit> clustersFlat;
std::vector<Hit> clustersFlat; // cluster storage
std::vector<size_t> chanSizes; // nClusters per channel
std::vector<u32> chanAddresses; // channel addresses
// --- RPC loop
for (uint32_t SmType = 0; SmType < fNbSm.size(); SmType++) {
const uint32_t NbRpc = fNbRpc[SmType];
const uint32_t NbSm = fNbSm[SmType];
for (uint32_t Sm = 0; Sm < NbSm; Sm++) {
for (uint32_t Rpc = 0; Rpc < NbRpc; Rpc++) {
// Get digis
std::vector<std::pair<CbmTofDigi, int32_t>>& digiExp = fStorDigi[SmType][Sm * NbRpc + Rpc];
// Build clusters
auto rpcresult = fAlgo[SmType][Sm * NbRpc + Rpc](digiExp);
std::vector<Hit>& clusters = std::get<0>(rpcresult); // Hits
std::vector<size_t>& sizes = std::get<1>(rpcresult); // nClusters per channel
std::vector<u32>& addresses = std::get<2>(rpcresult); // channel addresses
std::vector<i32>& indices = std::get<3>(rpcresult); // digi indices
// Append clusters to output
clustersFlat.insert(clustersFlat.end(), std::make_move_iterator(clusters.begin()),
std::make_move_iterator(clusters.end()));
// Store hw address of partition
chanAddresses.insert(chanAddresses.end(), std::make_move_iterator(addresses.begin()),
std::make_move_iterator(addresses.end()));
// store partition size
chanSizes.insert(chanSizes.end(), std::make_move_iterator(sizes.begin()),
std::make_move_iterator(sizes.end()));
// store digi indices
digiInd.insert(digiInd.end(), std::make_move_iterator(indices.begin()),
std::make_move_iterator(indices.end()));
// Clear digi storage
digiExp.clear();
// Prefix arrays for parallelization
std::vector<size_t> cluPrefix;
std::vector<size_t> sizePrefix;
std::vector<size_t> addrPrefix;
std::vector<size_t> indPrefix;
xpu::push_timer("TofHitfind");
xpu::t_add_bytes(digiIn.size_bytes());
#pragma omp parallel
{
#ifdef _OPENMP
int ithread = omp_get_thread_num();
int nthreads = omp_get_num_threads();
#else
int ithread = 0;
int nthreads = 1;
#endif
#pragma omp single
{
cluPrefix.resize(nthreads + 1);
sizePrefix.resize(nthreads + 1);
addrPrefix.resize(nthreads + 1);
indPrefix.resize(nthreads + 1);
}
auto [clusters, sizes, addresses, indices] = Clusterizer::resultType();
#pragma omp for schedule(dynamic) nowait
for (uint32_t iRpc = 0; iRpc < fAlgo.size(); iRpc++) {
// Get digis
std::vector<std::pair<CbmTofDigi, int32_t>>& digiExp = *fStorDigiPtr[iRpc];
// Build clusters
auto [rpc_clu, rpc_size, rpc_addr, rpc_ind] = fAlgo[iRpc](digiExp);
// Append clusters to output
clusters.insert(clusters.end(), std::make_move_iterator(rpc_clu.begin()),
std::make_move_iterator(rpc_clu.end()));
// store partition size
sizes.insert(sizes.end(), std::make_move_iterator(rpc_size.begin()), std::make_move_iterator(rpc_size.end()));
// Store hw address of partition
addresses.insert(addresses.end(), std::make_move_iterator(rpc_addr.begin()),
std::make_move_iterator(rpc_addr.end()));
// store digi indices
indices.insert(indices.end(), std::make_move_iterator(rpc_ind.begin()), std::make_move_iterator(rpc_ind.end()));
// Clear digi storage
digiExp.clear();
}
cluPrefix[ithread + 1] = clusters.size();
sizePrefix[ithread + 1] = sizes.size();
addrPrefix[ithread + 1] = addresses.size();
indPrefix[ithread + 1] = indices.size();
#pragma omp barrier
#pragma omp single
{
for (int i = 1; i < (nthreads + 1); i++) {
cluPrefix[i] += cluPrefix[i - 1];
sizePrefix[i] += sizePrefix[i - 1];
addrPrefix[i] += addrPrefix[i - 1];
indPrefix[i] += indPrefix[i - 1];
}
clustersFlat.resize(cluPrefix[nthreads]);
chanSizes.resize(sizePrefix[nthreads]);
chanAddresses.resize(addrPrefix[nthreads]);
digiInd.resize(indPrefix[nthreads]);
}
std::move(clusters.begin(), clusters.end(), clustersFlat.begin() + cluPrefix[ithread]);
std::move(sizes.begin(), sizes.end(), chanSizes.begin() + sizePrefix[ithread]);
std::move(addresses.begin(), addresses.end(), chanAddresses.begin() + addrPrefix[ithread]);
std::move(indices.begin(), indices.end(), digiInd.begin() + indPrefix[ithread]);
}
// Monitoring
......
......@@ -30,6 +30,7 @@ namespace cbm::algo::tof
struct HitfindMonitorData {
//std::vector<tof::ClusterizerMonitorData> fMonitor; //Per RPC monitoring data, to be implemented
xpu::timings fTime;
xpu::timings fSortTime;
size_t fNumDigis = 0;
size_t fNumHits = 0;
......@@ -37,7 +38,7 @@ namespace cbm::algo::tof
{
std::stringstream ss;
ss << "Hitfind stats: num digis " << fNumDigis << ", time " << fTime.wall() << " ms ( " << fTime.throughput()
<< " GB/s ), num hits " << fNumHits << std::endl;
<< " GB/s ), sort time " << fSortTime.wall() << " ms, num hits " << fNumHits << std::endl;
return ss.str();
}
};
......@@ -65,8 +66,8 @@ namespace cbm::algo::tof
explicit Hitfind(tof::HitfindSetup);
private: // members
/** @brief TOF hitfinders **/
std::map<uint32_t, std::map<uint32_t, tof::Clusterizer>> fAlgo; //[nbType][nbSm*nbRpc]
/** @brief TOF hitfinders (with unique RPC index for OpenMP) **/
std::vector<tof::Clusterizer> fAlgo; //[rpcUnique]
/** @brief Number of SMs per super module type **/
std::vector<int32_t> fNbSm;
......@@ -76,6 +77,9 @@ namespace cbm::algo::tof
/** @brief Intermediate storage variables (digi, index) **/
std::vector<std::vector<std::vector<std::pair<CbmTofDigi, int32_t>>>> fStorDigi; //[nbType][nbSm*nbRpc][nDigis]
/** @brief Pointer to storage variables with unique RPC index (for OpenMP) **/
std::vector<std::vector<std::pair<CbmTofDigi, int32_t>>*> fStorDigiPtr; //[rpcUnique][nDigis]
};
} // namespace cbm::algo::tof
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment