From 8c44af55e8bb8dbfffe3bf72abe3216ee3450002 Mon Sep 17 00:00:00 2001
From: Felix Weiglhofer <weiglhofer@fias.uni-frankfurt.de>
Date: Tue, 11 Jul 2023 15:04:52 +0000
Subject: [PATCH] cbmreco: Add unpack monitoring.

---
 algo/global/Reco.cxx   | 23 +++++++++++++++++------
 algo/global/Reco.h     |  2 ++
 algo/unpack/Unpack.cxx | 14 +++++++++-----
 algo/unpack/Unpack.h   | 13 ++++++++++---
 4 files changed, 38 insertions(+), 14 deletions(-)

diff --git a/algo/global/Reco.cxx b/algo/global/Reco.cxx
index a15bc7fd26..8fb3e2d622 100644
--- a/algo/global/Reco.cxx
+++ b/algo/global/Reco.cxx
@@ -97,7 +97,9 @@ RecoResults Reco::Run(const fles::Timeslice& ts)
   L_(info) << ">>> Processing TS " << ts.index();
   xpu::set<cbm::algo::Params>(Params());
 
-  CbmDigiTimeslice digiTs;
+  std::vector<CbmStsDigi> digis;
+  UnpackMonitorData unpackMonitor;
+
   if (Opts().HasStep(Step::Unpack)) {
     switch (Params().sts.unpackMode) {
       case RecoParams::UnpackMode::XPU:
@@ -105,7 +107,11 @@ RecoResults Reco::Run(const fles::Timeslice& ts)
         throw std::runtime_error("XPU unpacker currently not implemented");
         break;
       default:
-      case RecoParams::UnpackMode::CPU: digiTs = fUnpack.Run(ts).first; break;
+      case RecoParams::UnpackMode::CPU:
+        auto result   = fUnpack.Run(ts);
+        digis         = result.first.fData.fSts.fDigis;
+        unpackMonitor = result.second;
+        break;
     }
   }
 
@@ -119,10 +125,7 @@ RecoResults Reco::Run(const fles::Timeslice& ts)
 
   xpu::timings ts_times = xpu::pop_timer();
 
-  if (HasMonitor())
-    GetMonitor().QueueMetric("cbm_reco", {{"hostname", fles::system::current_hostname()}},
-                             {{"tsProcessed", 1}, {"bytesProcessed", ts_utils::SizeBytes(ts)}});
-
+  QueueUnpackerMetrics(ts, unpackMonitor);
 
   PrintTimings(ts_times);
 
@@ -158,3 +161,11 @@ void Reco::PrintTimings(xpu::timings& timings)
     L_(info) << "TS Processing time (Wall): " << timings.wall() << " ms";
   }
 }
+
+void Reco::QueueUnpackerMetrics(const fles::Timeslice& ts, const UnpackMonitorData& monitor)
+{
+  if (!HasMonitor()) return;
+
+  GetMonitor().QueueMetric("cbmreco", {{"hostname", fles::system::current_hostname()}},
+                           {{"bytesInSts", monitor.fNumBytesInSts}, {"bytesInTof", monitor.fNumBytesInTof}});
+}
diff --git a/algo/global/Reco.h b/algo/global/Reco.h
index 2eef46610d..ed30d43d1c 100644
--- a/algo/global/Reco.h
+++ b/algo/global/Reco.h
@@ -48,6 +48,8 @@ namespace cbm::algo
     EventbuildChain fEventBuild;
 
     void Validate(const Options& opts);
+
+    void QueueUnpackerMetrics(const fles::Timeslice&, const UnpackMonitorData&);
   };
 }  // namespace cbm::algo
 
diff --git a/algo/unpack/Unpack.cxx b/algo/unpack/Unpack.cxx
index f9cf7e11a8..f8fbe902a5 100644
--- a/algo/unpack/Unpack.cxx
+++ b/algo/unpack/Unpack.cxx
@@ -28,11 +28,13 @@ namespace cbm::algo
     UnpackMonitorData& monitor = result.second;
 
     if (DetectorEnabled(Subsystem::STS)) {
-      ParallelMsLoop(Subsystem::STS, monitor, digiTs.fData.fSts.fDigis, monitor.fSts, *timeslice, fAlgoSts, 0x20);
+      monitor.fNumBytesInSts +=
+        ParallelMsLoop(Subsystem::STS, monitor, digiTs.fData.fSts.fDigis, monitor.fSts, *timeslice, fAlgoSts, 0x20);
     }
 
     if (DetectorEnabled(Subsystem::TOF)) {
-      ParallelMsLoop(Subsystem::TOF, monitor, digiTs.fData.fTof.fDigis, monitor.fTof, *timeslice, fAlgoTof, 0x00);
+      monitor.fNumBytesInTof +=
+        ParallelMsLoop(Subsystem::TOF, monitor, digiTs.fData.fTof.fDigis, monitor.fTof, *timeslice, fAlgoTof, 0x00);
     }
 
     if (DetectorEnabled(Subsystem::BMON)) {
@@ -147,9 +149,9 @@ namespace cbm::algo
 
   // ----------------------------------------------------------------------------
   template<class Digi, class UnpackAlgo, class Monitor>
-  void Unpack::ParallelMsLoop(const Subsystem subsystem, UnpackMonitorData& genericMonitor, std::vector<Digi>& digisOut,
-                              std::vector<Monitor>& monitorOut, const fles::Timeslice& ts,
-                              const std::map<u16, UnpackAlgo>& algos, u8 sys_ver)
+  size_t Unpack::ParallelMsLoop(const Subsystem subsystem, UnpackMonitorData& genericMonitor,
+                                std::vector<Digi>& digisOut, std::vector<Monitor>& monitorOut,
+                                const fles::Timeslice& ts, const std::map<u16, UnpackAlgo>& algos, u8 sys_ver)
   {
     xpu::scoped_timer t_(fles::to_string(subsystem));
 
@@ -193,6 +195,8 @@ namespace cbm::algo
     xpu::pop_timer();
 
     monitorOut = std::move(monitor);
+
+    return sizeBytes;
   }
   // ----------------------------------------------------------------------------
 
diff --git a/algo/unpack/Unpack.h b/algo/unpack/Unpack.h
index 6804b37a6e..13cb0b7d92 100644
--- a/algo/unpack/Unpack.h
+++ b/algo/unpack/Unpack.h
@@ -48,6 +48,13 @@ namespace cbm::algo
     std::vector<UnpackRichMonitorData> fRich;    ///< Monitoring data for RICH
     size_t fNumMs       = 0;
     size_t fNumBytes    = 0;
+    size_t fNumBytesInSts       = 0;
+    size_t fNumBytesInMuch      = 0;
+    size_t fNumBytesInTof       = 0;
+    size_t fNumBytesInBmon      = 0;
+    size_t fNumBytesInTrd       = 0;
+    size_t fNumBytesInTrd2d     = 0;
+    size_t fNumBytesInRich      = 0;
     size_t fNumDigis    = 0;
     size_t fNumCompUsed = 0;
     size_t fNumErrInvalidEqId   = 0;
@@ -141,9 +148,9 @@ namespace cbm::algo
 
     /** @brief Parallel microslice loop **/
     template<class Digi, class UnpackAlgo, class Monitor>
-    void ParallelMsLoop(const Subsystem subsystem, UnpackMonitorData& monitor, std::vector<Digi>& digisOut,
-                        std::vector<Monitor>& monitorOut, const fles::Timeslice& ts,
-                        const std::map<u16, UnpackAlgo>& algos, u8 sys_ver);
+    size_t ParallelMsLoop(const Subsystem subsystem, UnpackMonitorData& monitor, std::vector<Digi>& digisOut,
+                          std::vector<Monitor>& monitorOut, const fles::Timeslice& ts,
+                          const std::map<u16, UnpackAlgo>& algos, u8 sys_ver);
 
     std::pair<size_t, size_t> ParallelInit(const fles::Timeslice& ts, Subsystem subsystem,
                                            gsl::span<const uint16_t> legalEqIds, uint8_t sys_ver,
-- 
GitLab