From f73626c59027b754b4775ed7dd5677194435c9db Mon Sep 17 00:00:00 2001
From: Felix Weiglhofer <weiglhofer@fias.uni-frankfurt.de>
Date: Tue, 7 May 2024 16:45:04 +0000
Subject: [PATCH] online: Monitor idle time and time spent writing output.

---
 algo/CMakeLists.txt                 |  1 +
 algo/base/PartitionedVector.h       |  5 +++++
 algo/global/Reco.cxx                | 15 +++++++++++++
 algo/global/Reco.h                  | 12 ++++++++++
 algo/global/StorableRecoResults.cxx | 35 +++++++++++++++++++++++++++++
 algo/global/StorableRecoResults.h   |  5 +++++
 core/data/base/CbmDigiData.h        | 18 ++++++++++++++-
 reco/app/cbmreco/main.cxx           | 15 +++++++++++++
 8 files changed, 105 insertions(+), 1 deletion(-)
 create mode 100644 algo/global/StorableRecoResults.cxx

diff --git a/algo/CMakeLists.txt b/algo/CMakeLists.txt
index 7992e1d36a..bc7874f883 100644
--- a/algo/CMakeLists.txt
+++ b/algo/CMakeLists.txt
@@ -144,6 +144,7 @@ set(SRCS
   detectors/rich/Unpack.cxx
   detectors/rich/UnpackMS.cxx
   global/ParFiles.cxx
+  global/StorableRecoResults.cxx
   global/Reco.cxx
   global/RecoResultsInputArchive.cxx
   global/RecoResultsOutputArchive.cxx
diff --git a/algo/base/PartitionedVector.h b/algo/base/PartitionedVector.h
index 0e1c83636c..119c2518ad 100644
--- a/algo/base/PartitionedVector.h
+++ b/algo/base/PartitionedVector.h
@@ -142,6 +142,11 @@ namespace cbm::algo
      */
     size_t NElements() const { return fData.size(); }
 
+    /**
+     * @brief Size in bytes of the stored elements, i.e. NElements() * sizeof(T).
+     */
+    size_t SizeBytes() const { return NElements() * sizeof(T); }
+
     /**
      * @brief Get the underlying data.
      */
diff --git a/algo/global/Reco.cxx b/algo/global/Reco.cxx
index 3d5d594361..d922ba9ebd 100644
--- a/algo/global/Reco.cxx
+++ b/algo/global/Reco.cxx
@@ -573,3 +573,18 @@ void Reco::QueueProcessingMetrics(const ProcessingMonitor& mon)
   GetMonitor().QueueMetric("cbmreco", {{"hostname", fles::system::current_hostname()}, {"child", Opts().ChildId()}},
                            std::move(fields));
 }
+
+void Reco::QueueProcessingExtraMetrics(const ProcessingExtraMonitor& mon)
+{
+  // Queue the idle-time and archive-writing figures as a "cbmreco" metric; does nothing without a monitor.
+  if (!HasMonitor()) return;
+
+  const auto& tWrite = mon.timeWriteArchive;
+  MetricFieldSet fields = {{"processingTimeIdle", FilterNan(mon.timeIdle)},
+                           {"processingTimeWriteArchive", tWrite.wall()},
+                           {"processingThroughputWriteArchive", FilterNan(tWrite.throughput())},
+                           {"processingBytesWritten", FilterNan(mon.bytesWritten)}};
+
+  GetMonitor().QueueMetric("cbmreco", {{"hostname", fles::system::current_hostname()}, {"child", Opts().ChildId()}},
+                           std::move(fields));
+}
diff --git a/algo/global/Reco.h b/algo/global/Reco.h
index 0496e3b3cf..803d770759 100644
--- a/algo/global/Reco.h
+++ b/algo/global/Reco.h
@@ -97,6 +97,16 @@ namespace cbm::algo
     std::optional<i64> tsDelta;  //< id difference between current and previous timeslice
   };
 
+  /**
+   * @brief Monitor for additional processing steps (idle time and archive writing)
+   * @note Used in the main function. This should eventually be merged with ProcessingMonitor, so that a single class handles the full processing loop.
+   */
+  struct ProcessingExtraMonitor {
+    xpu::timings timeWriteArchive;  //< time spent writing archive
+    size_t bytesWritten = 0;        //< bytes written to archive (estimated), 0 until an archive write sets it
+    double timeIdle     = 0.;       //< time spent idle (waiting for next timeslice) [ms]
+  };
+
   class Reco : SubChain {
    public:
     Reco();
@@ -112,6 +122,8 @@ namespace cbm::algo
     void Finalize();
     void PrintTimings(xpu::timings&);
 
+    void QueueProcessingExtraMetrics(const ProcessingExtraMonitor&);
+
    private:
     bool fInitialized = false;
     ChainContext fContext;
diff --git a/algo/global/StorableRecoResults.cxx b/algo/global/StorableRecoResults.cxx
new file mode 100644
index 0000000000..5df67c5a68
--- /dev/null
+++ b/algo/global/StorableRecoResults.cxx
@@ -0,0 +1,35 @@
+/* Copyright (C) 2024 FIAS Frankfurt Institute for Advanced Studies, Frankfurt / Main
+   SPDX-License-Identifier: GPL-3.0-only
+   Authors: Felix Weiglhofer [committer], P.-A. Loizeau */
+#include "StorableRecoResults.h"
+
+using namespace cbm::algo;
+
+size_t StorableRecoResults::SizeBytes() const
+{
+  size_t size = 0;
+  size += fBmonDigis.size() * sizeof(CbmBmonDigi);
+  size += fStsDigis.size() * sizeof(CbmStsDigi);
+  size += fMuchDigis.size() * sizeof(CbmMuchDigi);
+  size += fTrd2dDigis.size() * sizeof(CbmTrdDigi);
+  size += fTrdDigis.size() * sizeof(CbmTrdDigi);
+  size += fTofDigis.size() * sizeof(CbmTofDigi);
+  size += fRichDigis.size() * sizeof(CbmRichDigi);
+
+  for (const auto& ev : fDigiEvents) {
+    size += ev.fData.SizeBytes();
+  }
+
+  size += fStsClusters.SizeBytes();
+  size += fStsHits.SizeBytes();
+  size += fTofHits.SizeBytes();
+  size += fTrdHits.SizeBytes();
+
+  size += fTracks.size() * sizeof(ca::Track);
+
+  // NOTE: The result is an estimate based on container sizes. TrackHitIndexContainers are excluded for now to avoid
+  // looping over all tracks. TODO: Query the size of the written archive from boost instead; this requires changes
+  // to the archive classes in flesnet.
+
+  return size;
+}
diff --git a/algo/global/StorableRecoResults.h b/algo/global/StorableRecoResults.h
index 962f5b28f5..5560b7d1ad 100644
--- a/algo/global/StorableRecoResults.h
+++ b/algo/global/StorableRecoResults.h
@@ -44,6 +44,11 @@ namespace cbm::algo
      */
     uint64_t TsStartTime() const { return fTsStartTime; }
 
+    /**
+     * @brief Estimated size in bytes of the stored digis, digi events, clusters, hits and tracks (track hit indices excluded)
+     */
+    size_t SizeBytes() const;
+
     std::vector<CbmBmonDigi>& BmonDigis() { return fBmonDigis; }
     const std::vector<CbmBmonDigi>& BmonDigis() const { return fBmonDigis; }
 
diff --git a/core/data/base/CbmDigiData.h b/core/data/base/CbmDigiData.h
index 5d802e0625..c864e8ba6f 100644
--- a/core/data/base/CbmDigiData.h
+++ b/core/data/base/CbmDigiData.h
@@ -31,7 +31,7 @@
  **/
 class CbmDigiData {
 
-public:
+ public:
   CbmBmonDigiData fBmon;  ///< Beam monitor data
   CbmStsDigiData fSts;    ///< STS data
   CbmMuchDigiData fMuch;  ///< MUCH data
@@ -94,6 +94,22 @@ public:
       default: return 0; break;
     }
   }
+
+  /** @brief Return total size in bytes: sum of Size() * sizeof(digi class) over all detector members */
+  size_t SizeBytes() const
+  {
+    size_t size = 0;
+    size += fBmon.Size() * sizeof(CbmBmonDigi);
+    size += fSts.Size() * sizeof(CbmStsDigi);
+    size += fMuch.Size() * sizeof(CbmMuchDigi);
+    size += fTrd.Size() * sizeof(CbmTrdDigi);
+    size += fTrd2d.Size() * sizeof(CbmTrdDigi);
+    size += fTof.Size() * sizeof(CbmTofDigi);
+    size += fPsd.Size() * sizeof(CbmPsdDigi);
+    size += fFsd.Size() * sizeof(CbmFsdDigi);
+    size += fRich.Size() * sizeof(CbmRichDigi);
+    return size;
+  }
 };
 
 BOOST_CLASS_VERSION(CbmDigiData, 5)
diff --git a/reco/app/cbmreco/main.cxx b/reco/app/cbmreco/main.cxx
index cf1f46e520..524141472e 100644
--- a/reco/app/cbmreco/main.cxx
+++ b/reco/app/cbmreco/main.cxx
@@ -172,6 +172,8 @@ int main(int argc, char** argv)
 
   fles::TimesliceAutoSource source(opts.InputLocator());
 
+  ProcessingExtraMonitor extraMonitor;
+
   std::optional<RecoResultsOutputArchive> archive;
   if (!opts.OutputFile().empty()) {
     L_(info) << "Writing results to file: " << opts.OutputFile();
@@ -186,16 +188,26 @@ int main(int argc, char** argv)
   int num_ts = opts.NumTimeslices();
   if (num_ts > 0) num_ts += opts.SkipTimeslices();
   L_(debug) << "Starting to fetch timeslices from source...";
+
+  auto startFetchTS = std::chrono::high_resolution_clock::now();
   while (auto ts = source.get()) {
     if (tsIdx < opts.SkipTimeslices()) {
       tsIdx++;
       continue;
     }
 
+    auto endFetchTS      = std::chrono::high_resolution_clock::now();
+    auto durationFetchTS = endFetchTS - startFetchTS;
+    extraMonitor.timeIdle +=
+      std::chrono::duration_cast<std::chrono::duration<double, std::milli>>(durationFetchTS).count();
+
     try {
       RecoResults result = reco.Run(*ts);
       if (archive) {
+        xpu::scoped_timer t_{"Write Archive", &extraMonitor.timeWriteArchive};
         auto storable = makeStorableRecoResults(*ts, result);
+        extraMonitor.bytesWritten = storable->SizeBytes();
+        xpu::t_add_bytes(extraMonitor.bytesWritten);
         archive->put(storable);
       }
     }
@@ -203,6 +215,7 @@ int main(int argc, char** argv)
       // TODO: Add flag if we want to abort on exception or continue with next timeslice
       L_(error) << "Caught ProcessingError while processing timeslice " << tsIdx << ": " << e.what();
     }
+    reco.QueueProcessingExtraMetrics(extraMonitor);
 
     // Release memory after each timeslice and log memory usage
     // This is useful to detect memory leaks as the memory usage should be constant between timeslices
@@ -212,6 +225,8 @@ int main(int argc, char** argv)
     tsIdx++;
 
     if (num_ts > 0 && tsIdx >= num_ts) break;
+
+    startFetchTS = std::chrono::high_resolution_clock::now();
   }
 
   if (archive) archive->end_stream();
-- 
GitLab