From 043446e8d6f407f6a4ec50b1d7c1296b1c0a2eee Mon Sep 17 00:00:00 2001
From: "P.-A. Loizeau" <p.-a.loizeau@gsi.de>
Date: Tue, 14 May 2024 16:35:34 +0200
Subject: [PATCH] Add support for stream compression in online hist-sender and
 hist-server

---
 algo/base/HistogramSender.h              | 27 ++++++++-
 algo/base/Options.cxx                    |  4 ++
 algo/base/Options.h                      |  2 +
 algo/global/Reco.cxx                     |  3 +-
 services/histserv/app/Application.cxx    | 74 +++++++++++++++++++++---
 services/histserv/app/ProgramOptions.cxx |  4 ++
 services/histserv/app/ProgramOptions.h   |  4 ++
 7 files changed, 106 insertions(+), 12 deletions(-)

diff --git a/algo/base/HistogramSender.h b/algo/base/HistogramSender.h
index 343a8e4a93..d4bb0597a5 100644
--- a/algo/base/HistogramSender.h
+++ b/algo/base/HistogramSender.h
@@ -7,6 +7,10 @@
 #include <boost/archive/binary_oarchive.hpp>
 #include <boost/iostreams/device/array.hpp>
 #include <boost/iostreams/device/back_inserter.hpp>
+#ifdef BOOST_IOS_HAS_ZSTD
+#include <boost/iostreams/filter/zstd.hpp>
+#include <boost/iostreams/filtering_stream.hpp>
+#endif
 #include <boost/iostreams/stream.hpp>
 #include <boost/serialization/utility.hpp>
 #include <boost/serialization/vector.hpp>
@@ -20,9 +24,10 @@ namespace cbm::algo
 
   class HistogramSender {
    public:
-    HistogramSender(std::string_view address, int32_t hwm = 1)
+    HistogramSender(std::string_view address, int32_t hwm = 1, bool compression = false)
       : fHistComChan(address)
       , fHistHighWaterMark(hwm)
+      , fbCompression(compression)
       , fZmqContext(1)
       , fZmqSocket(fZmqContext, zmq::socket_type::push)
     {
@@ -44,10 +49,25 @@ namespace cbm::algo
       std::string serial_str;
       b_io::back_insert_device<std::string> inserter(serial_str);
       b_io::stream<b_io::back_insert_device<std::string>> bstream(inserter);
-      b_ar::binary_oarchive oa(bstream);
 
       serial_str.clear();
-      oa << obj;
+
+      if (fbCompression) {
+#ifdef BOOST_IOS_HAS_ZSTD
+        std::unique_ptr<b_io::filtering_ostream> out_ = std::make_unique<b_io::filtering_ostream>();
+        out_->push(b_io::zstd_compressor(b_io::zstd::best_speed));
+        out_->push(bstream);
+        std::unique_ptr<b_ar::binary_oarchive> oarchive_ =
+          std::make_unique<b_ar::binary_oarchive>(*out_, b_ar::no_header);
+        *oarchive_ << obj;
+#else
+        throw std::runtime_error("Unsupported ZSTD compression (boost) for histograms emissions channel");
+#endif
+      }
+      else {
+        b_ar::binary_oarchive oa(bstream);
+        oa << obj;
+      }
       bstream.flush();
 
       zmq::message_t msg(serial_str.size());
@@ -59,6 +79,7 @@ namespace cbm::algo
    private:
     std::string fHistComChan   = "tcp://127.0.0.1:56800";
     int32_t fHistHighWaterMark = 1;
+    bool fbCompression         = false;
     zmq::context_t fZmqContext;  ///< ZMQ context FIXME: should be only one context per binary!
     zmq::socket_t fZmqSocket;    ///< ZMQ socket to histogram server
   };
diff --git a/algo/base/Options.cxx b/algo/base/Options.cxx
index 55dca18c13..85b256b6c9 100644
--- a/algo/base/Options.cxx
+++ b/algo/base/Options.cxx
@@ -79,6 +79,10 @@ Options::Options(int argc, char** argv)
     ("aux-data", po::value(&fCollectAuxData)->implicit_value(true), "Enables collecting of auxiliary data from algorithms")
     ("qa", po::value(&fQaSteps)->multitoken()->default_value({QaStep::UnpackSts, QaStep::EventBuilding, QaStep::Tracking})->value_name("<qa steps>"),
       "space separated list of QA Steps to enable (BeamBmon, UnpackSts, EventBuilding, Tracking, ...)")
+#ifdef BOOST_IOS_HAS_ZSTD
+    ("hist-compr", po::bool_switch(&fCompressHistograms)->default_value(false),
+      "enables ZSTD compression of the outgoing histograms stream (decompression needed in target server!)")
+#endif
     ("log-file,L", po::value(&fLogFile)->value_name("<file>"),
       "write log messages to file")
     ("output-types,O", po::value(&fOutputTypes)->multitoken()->value_name("<types>"),
diff --git a/algo/base/Options.h b/algo/base/Options.h
index 51a8bb6fe0..b5247796c6 100644
--- a/algo/base/Options.h
+++ b/algo/base/Options.h
@@ -32,6 +32,7 @@ namespace cbm::algo
     const std::string& HistogramUri() const { return fHistogramUri; }
     const int32_t& HistogramHwm() const { return fHistogramHwm; }
     bool CollectAuxData() const { return fCollectAuxData; }
+    const bool& CompressHistograms() const { return fCompressHistograms; }
     bool CollectKernelTimes() const { return fProfilingLevel != ProfilingNone; }
     ProfilingLevel Profiling() const { return fProfilingLevel; }
     fs::path TimingsFile() const { return fTimingsFile; }
@@ -75,6 +76,7 @@ namespace cbm::algo
     std::string fMonitorUri;
     std::string fHistogramUri;
     int32_t fHistogramHwm;
+    bool fCompressHistograms = false;
     std::vector<QaStep> fQaSteps;
     bool fDumpArchive              = false;
     bool fReleaseMode              = false;
diff --git a/algo/global/Reco.cxx b/algo/global/Reco.cxx
index a5a3696f06..a58d419693 100644
--- a/algo/global/Reco.cxx
+++ b/algo/global/Reco.cxx
@@ -86,7 +86,8 @@ void Reco::Init(const Options& opts)
   SetContext(&fContext);
 
   if (Opts().HistogramUri() != "") {
-    fSender = std::make_shared<HistogramSender>(Opts().HistogramUri(), Opts().HistogramHwm());
+    fSender =
+      std::make_shared<HistogramSender>(Opts().HistogramUri(), Opts().HistogramHwm(), Opts().CompressHistograms());
     // fContext.sender = fSender;
 
     fRunStartTimeNs = Opts().RunStart();
diff --git a/services/histserv/app/Application.cxx b/services/histserv/app/Application.cxx
index 14c5ee7aee..39597c6832 100644
--- a/services/histserv/app/Application.cxx
+++ b/services/histserv/app/Application.cxx
@@ -24,6 +24,10 @@
 
 #include <boost/archive/binary_iarchive.hpp>
 #include <boost/iostreams/device/array.hpp>
+#ifdef BOOST_IOS_HAS_ZSTD
+#include <boost/iostreams/filter/zstd.hpp>
+#include <boost/iostreams/filtering_stream.hpp>
+#endif
 #include <boost/iostreams/stream.hpp>
 #include <boost/serialization/utility.hpp>
 #include <boost/serialization/vector.hpp>
@@ -174,10 +178,24 @@ bool Application::ReceiveData(zmq::message_t& msg)
   /// FIXME: Need something to replace the TObjArray which allowed to have a mix of of TH1x, TH2x, TH3x or TProfile
   b_io::basic_array_source<char> device(static_cast<char*>(msg.data()), msg.size());
   b_io::stream<b_io::basic_array_source<char>> s(device);
-  b_ar::binary_iarchive iarch(s);
 
   cbm::algo::qa::HistogramContainer vHist;
-  iarch >> vHist;
+  if (fOpt.CompressedInput()) {
+#ifdef BOOST_IOS_HAS_ZSTD
+    std::unique_ptr<b_io::filtering_istream> in_ = std::make_unique<b_io::filtering_istream>();
+    in_->push(b_io::zstd_decompressor());
+    in_->push(s);
+    std::unique_ptr<b_ar::binary_iarchive> iarchive_ = std::make_unique<b_ar::binary_iarchive>(*in_, b_ar::no_header);
+    *iarchive_ >> vHist;
+#else
+    throw std::runtime_error("Unsupported ZSTD decompression (boost) for histograms input channel");
+#endif
+  }
+  else {
+    b_ar::binary_iarchive iarch(s);
+    iarch >> vHist;
+  }
+
 
   /// copied from CbmTaskDigiEventQa::ToTH1D
   /// FIXME: Should be placed in a tools/interface/whatever library with all similar functions!!
@@ -307,10 +325,23 @@ bool Application::ReceiveHistoConfig(zmq::message_t& msg)
   //  BoostSerializer<std::pair<std::string, std::string>>().Deserialize(msg, tempObject);
   b_io::basic_array_source<char> device(static_cast<char*>(msg.data()), msg.size());
   b_io::stream<b_io::basic_array_source<char>> s(device);
-  b_ar::binary_iarchive iarch(s);
 
   std::pair<std::string, std::string> tempObject("", "");
-  iarch >> tempObject;
+  if (fOpt.CompressedInput()) {
+#ifdef BOOST_IOS_HAS_ZSTD
+    std::unique_ptr<b_io::filtering_istream> in_ = std::make_unique<b_io::filtering_istream>();
+    in_->push(b_io::zstd_decompressor());
+    in_->push(s);
+    std::unique_ptr<b_ar::binary_iarchive> iarchive_ = std::make_unique<b_ar::binary_iarchive>(*in_, b_ar::no_header);
+    *iarchive_ >> tempObject;
+#else
+    throw std::runtime_error("Unsupported ZSTD decompression (boost) for histograms config input channel");
+#endif
+  }
+  else {
+    b_ar::binary_iarchive iarch(s);
+    iarch >> tempObject;
+  }
 
   LOG(debug) << " Received configuration for histo " << tempObject.first << " : " << tempObject.second;
 
@@ -347,10 +378,23 @@ bool Application::ReceiveCanvasConfig(zmq::message_t& msg)
   //  BoostSerializer<std::pair<std::string, std::string>>().Deserialize(msg, tempObject);
   b_io::basic_array_source<char> device(static_cast<char*>(msg.data()), msg.size());
   b_io::stream<b_io::basic_array_source<char>> s(device);
-  b_ar::binary_iarchive iarch(s);
 
   std::pair<std::string, std::string> tempObject("", "");
-  iarch >> tempObject;
+  if (fOpt.CompressedInput()) {
+#ifdef BOOST_IOS_HAS_ZSTD
+    std::unique_ptr<b_io::filtering_istream> in_ = std::make_unique<b_io::filtering_istream>();
+    in_->push(b_io::zstd_decompressor());
+    in_->push(s);
+    std::unique_ptr<b_ar::binary_iarchive> iarchive_ = std::make_unique<b_ar::binary_iarchive>(*in_, b_ar::no_header);
+    *iarchive_ >> tempObject;
+#else
+    throw std::runtime_error("Unsupported ZSTD decompression (boost) for canvas config input channel");
+#endif
+  }
+  else {
+    b_ar::binary_iarchive iarch(s);
+    iarch >> tempObject;
+  }
 
   LOG(debug) << " Received configuration for canvas " << tempObject.first << " : " << tempObject.second;
 
@@ -393,10 +437,24 @@ bool Application::ReceiveConfigAndData(std::vector<zmq::message_t>& vMsg)
   // BoostSerializer<std::pair<uint32_t, uint32_t>>().Deserialize(vMsg.at(0), pairHeader);
   b_io::basic_array_source<char> device_header(static_cast<char*>(vMsg.at(0).data()), vMsg.at(0).size());
   b_io::stream<b_io::basic_array_source<char>> s_header(device_header);
-  b_ar::binary_iarchive iarch_header(s_header);
 
   std::pair<uint32_t, uint32_t> pairHeader;
-  iarch_header >> pairHeader;
+  if (fOpt.CompressedInput()) {
+#ifdef BOOST_IOS_HAS_ZSTD
+    std::unique_ptr<b_io::filtering_istream> in_ = std::make_unique<b_io::filtering_istream>();
+    in_->push(b_io::zstd_decompressor());
+    in_->push(s_header);
+    std::unique_ptr<b_ar::binary_iarchive> iarchive_ = std::make_unique<b_ar::binary_iarchive>(*in_, b_ar::no_header);
+    *iarchive_ >> pairHeader;
+#else
+    throw std::runtime_error("Unsupported ZSTD decompression (boost) for Config + Histos input channel");
+#endif
+  }
+  else {
+    b_ar::binary_iarchive iarch_header(s_header);
+    iarch_header >> pairHeader;
+  }
+
   LOG(debug) << "Application::ReceiveConfigAndData => Received configuration for " << pairHeader.first << " histos and "
              << pairHeader.second << " canvases";
 
diff --git a/services/histserv/app/ProgramOptions.cxx b/services/histserv/app/ProgramOptions.cxx
index a730a8fda9..e0c00f4b82 100644
--- a/services/histserv/app/ProgramOptions.cxx
+++ b/services/histserv/app/ProgramOptions.cxx
@@ -49,6 +49,10 @@ namespace cbm::services::histserv
                "allow to overwite an existing output file");
     config_add("hideguicmds", po::bool_switch(&fHideGuiCommands)->default_value(false),
                "allow to hides (disable) the GUI commands for Reset/Save/Close");
+#ifdef BOOST_IOS_HAS_ZSTD
+    config_add("compressed,c", po::bool_switch(&fCompressedInput)->default_value(false),
+               "enables ZSTD decompression of the input stream (compression needed in source!)");
+#endif
 
     // --- Allowed options
     po::options_description cmdline_options("Allowed options");
diff --git a/services/histserv/app/ProgramOptions.h b/services/histserv/app/ProgramOptions.h
index 8247c7513f..f55b4fbc12 100644
--- a/services/histserv/app/ProgramOptions.h
+++ b/services/histserv/app/ProgramOptions.h
@@ -52,6 +52,9 @@ namespace cbm::services::histserv
     /** @brief Get overwrite option **/
     [[nodiscard]] bool HideGuiCommands() const { return fHideGuiCommands; }
 
+    /** @brief Get compressed input option **/
+    [[nodiscard]] bool CompressedInput() const { return fCompressedInput; }
+
     // /** @brief Get configuration file name (YAML format) **/
     // [[nodiscard]] const std::string& ConfigFile() const { return fConfig; }
 
@@ -68,6 +71,7 @@ namespace cbm::services::histserv
     std::string fsHistoFileName  = "histos_dump.root";  ///< Output file name (ROOT format)
     bool fOverwrite              = false;               ///< Enable overwriting of existing output file
     bool fHideGuiCommands        = false;               ///< Hides (disables) the GUI commands for Reset/Save/Close
+    bool fCompressedInput        = false;               ///< Enables ZSTD stream decompression is available
     //std::string fConfig        = "";         ///< Configuration file name (YAML format)
   };
 
-- 
GitLab