From d214378f668e648316817cb3c09ce045a05e48e7 Mon Sep 17 00:00:00 2001
From: Felix Weiglhofer <weiglhofer@fias.uni-frankfurt.de>
Date: Tue, 27 Jun 2023 13:42:09 +0000
Subject: [PATCH] algo::Unpack: Parallelize STS Unpacker.

---
 algo/unpack/Unpack.cxx | 78 ++++++++++++++++++++++++++++++++++++++++--
 algo/unpack/Unpack.h   | 19 ++++++++++
 2 files changed, 94 insertions(+), 3 deletions(-)

diff --git a/algo/unpack/Unpack.cxx b/algo/unpack/Unpack.cxx
index 9d9780bfd5..67fb08580d 100644
--- a/algo/unpack/Unpack.cxx
+++ b/algo/unpack/Unpack.cxx
@@ -22,6 +22,12 @@ namespace cbm::algo
     CbmDigiTimeslice& digiTs   = result.first;
     UnpackMonitorData& monitor = result.second;
 
+    ParallelInit(*timeslice);
+
+    if (DetectorEnabled(fles::SubsystemIdentifier::STS)) {
+      ParallelMsLoop(digiTs.fData.fSts.fDigis, monitor.fSts, *timeslice, 0x20);
+    }
+
     // ---  Component loop
     for (uint64_t comp = 0; comp < timeslice->num_components(); comp++) {
 
@@ -38,9 +44,9 @@ namespace cbm::algo
       // In the future, different data formats will be supported by instantiating different
       // algorithms depending on the version.
 
-      if (systemId == fles::SubsystemIdentifier::STS) {
-        MsLoop(timeslice, fAlgoSts, comp, equipmentId, &digiTs.fData.fSts.fDigis, monitor, &monitor.fSts, 0x20);
-      }
+      // if (systemId == fles::SubsystemIdentifier::STS) {
+      //   MsLoop(timeslice, fAlgoSts, comp, equipmentId, &digiTs.fData.fSts.fDigis, monitor, &monitor.fSts, 0x20);
+      // }
       if (systemId == fles::SubsystemIdentifier::MUCH) {
         MsLoop(timeslice, fAlgoMuch, comp, equipmentId, &digiTs.fData.fMuch.fDigis, monitor, &monitor.fMuch, 0x20);
       }
@@ -136,6 +142,7 @@ namespace cbm::algo
   }
   // ----------------------------------------------------------------------------
 
+
   // -----   Initialisation   ---------------------------------------------------
   void Unpack::Init(std::vector<fles::SubsystemIdentifier> subIds)
   {
@@ -295,4 +302,69 @@ namespace cbm::algo
   }
   // ----------------------------------------------------------------------------
 
+  // ----------------------------------------------------------------------------
+  /** @brief Gather descriptor, payload pointer and equipment id of every STS microslice in the timeslice.
+   ** Resets fParallelStsSetup and sizes the per-microslice output slots that ParallelMsLoop fills. **/
+  void Unpack::ParallelInit(const fles::Timeslice& timeslice)
+  {
+    fParallelStsSetup = {};
+    auto& setup       = fParallelStsSetup;
+
+    for (uint64_t comp = 0; comp < timeslice.num_components(); comp++) {
+      const uint64_t numMsInComp = timeslice.num_microslices(comp);
+      if (numMsInComp == 0) continue;  // No first descriptor to read the subsystem / equipment id from
+
+      const auto& firstDesc = timeslice.descriptor(comp, 0);
+      if (static_cast<fles::SubsystemIdentifier>(firstDesc.sys_id) != fles::SubsystemIdentifier::STS) continue;
+
+      const u16 componentId = firstDesc.eq_id;
+      for (uint64_t mslice = 0; mslice < numMsInComp; mslice++) {
+        setup.msEquipmentIds.push_back(componentId);
+        setup.msDescriptors.push_back(timeslice.descriptor(comp, mslice));
+        setup.msContent.push_back(timeslice.content(comp, mslice));
+      }
+    }
+    // One output slot per collected microslice; ParallelMsLoop writes slot i for microslice i
+    setup.msDigis.resize(setup.msDescriptors.size());
+    setup.msMonitorData.resize(setup.msDescriptors.size());
+  }
+  // ----------------------------------------------------------------------------
+
+  // ----------------------------------------------------------------------------
+  /** @brief Unpack all STS microslices collected by ParallelInit in parallel and concatenate their digis.
+   ** One monitor entry per microslice is moved into monitorOut; sys_ver is currently unused. **/
+  void Unpack::ParallelMsLoop(std::vector<CbmStsDigi>& digisOut, std::vector<UnpackStsMonitorData>& monitorOut,
+                              const fles::Timeslice& ts, u8 sys_ver)
+  {
+    const auto& msContent = fParallelStsSetup.msContent;
+    const auto& msDesc    = fParallelStsSetup.msDescriptors;
+    const auto& msEqIds   = fParallelStsSetup.msEquipmentIds;
+    auto& monitor         = fParallelStsSetup.msMonitorData;
+    auto& msDigis         = fParallelStsSetup.msDigis;
+    const size_t numMs    = msDigis.size();
+
+    // NOTE(review): at() may throw on an unknown equipment id, which must not escape the OpenMP region
+#pragma omp parallel for schedule(dynamic)
+    for (size_t i = 0; i < numMs; i++) {
+      auto result = fAlgoSts.at(msEqIds[i])(msContent[i], msDesc[i], ts.start_time());
+      msDigis[i]  = std::move(result.first);
+      monitor[i]  = std::move(result.second);
+    }
+
+    // Exclusive prefix sum, computed once, so every copy below knows its start position without re-summing
+    std::vector<size_t> offsets(numMs);
+    size_t nDigisTotal = 0;
+    for (size_t i = 0; i < numMs; i++) {
+      offsets[i] = nDigisTotal;
+      nDigisTotal += msDigis[i].size();
+    }
+    digisOut.resize(nDigisTotal);
+#pragma omp parallel for schedule(dynamic)
+    for (size_t i = 0; i < numMs; i++) {
+      std::copy(msDigis[i].begin(), msDigis[i].end(), digisOut.begin() + offsets[i]);
+    }
+
+    monitorOut = std::move(monitor);  // Todo: Combine monitor Data
+  }
+
 } /* namespace cbm::algo */
diff --git a/algo/unpack/Unpack.h b/algo/unpack/Unpack.h
index a4955939c3..26189dd835 100644
--- a/algo/unpack/Unpack.h
+++ b/algo/unpack/Unpack.h
@@ -25,6 +25,7 @@
 #include "much/UnpackMuch.h"
 #include "rich/RichReadoutConfig.h"
 #include "rich/UnpackRich.h"
+#include "sts/Digi.h"
 #include "sts/StsReadoutConfigLegacy.h"
 #include "sts/UnpackSts.h"
 
@@ -129,6 +130,15 @@ namespace cbm::algo
       return std::find(fSubIds.begin(), fSubIds.end(), subId) != fSubIds.end();
     }
 
+  private:  // types
+    struct ParallelSetup {  // Per-microslice inputs and outputs of the parallel STS unpacker; index i = microslice i
+      std::vector<u16> msEquipmentIds;                        ///< Equipment id of each collected microslice
+      std::vector<fles::MicrosliceDescriptor> msDescriptors;  ///< Descriptor of each collected microslice
+      std::vector<const u8*> msContent;                       ///< Payload pointer as returned by Timeslice::content()
+      std::vector<std::vector<sts::Digi>> msDigis;            ///< Unpacked digis, one vector per microslice
+      std::vector<UnpackStsMonitorData> msMonitorData;        ///< Monitoring data, one entry per microslice
+    };
+
   private:  // methods
     /** @brief Microslice loop **/
     template<class Digi, class UnpackAlgo, class MonitorData>
@@ -136,6 +146,13 @@ namespace cbm::algo
                 const uint16_t eqId, std::vector<Digi>* digis, UnpackMonitorData& monitor,
                 std::vector<MonitorData>* monitorMs, uint8_t sys_ver);
 
+    /** @brief Collect descriptors, content pointers and equipment ids of all STS microslices of a timeslice **/
+    void ParallelInit(const fles::Timeslice& timeslice);
+
+    /** @brief Unpack the STS microslices gathered by ParallelInit in parallel and concatenate their digis **/
+    void ParallelMsLoop(std::vector<CbmStsDigi>& digisOut, std::vector<UnpackStsMonitorData>& monitor,
+                        const fles::Timeslice& ts, u8 sys_ver);
+
 
   private:                                                  // members
     bool fApplyWalkCorrection                      = true;  ///< Apply walk correction
@@ -168,6 +185,8 @@ namespace cbm::algo
       {fles::SubsystemIdentifier::RICH, 100},  {fles::SubsystemIdentifier::RPC, 40},
       {fles::SubsystemIdentifier::T0, 0},      {fles::SubsystemIdentifier::TRD, 1300},
       {fles::SubsystemIdentifier::TRD2D, -510}};
+    /** @brief Parallel STS Setup */
+    ParallelSetup fParallelStsSetup = {};
   };
 }  // namespace cbm::algo
 
-- 
GitLab