From 8acc53dcae05f12ec3494159baf67632a21e8a04 Mon Sep 17 00:00:00 2001 From: "P.-A. Loizeau" <p.-a.loizeau@gsi.de> Date: Fri, 14 Jul 2023 18:42:57 +0200 Subject: [PATCH] Add tester client app for histserv_nofairmq --- services/histserv/CMakeLists.txt | 1 + services/histserv/Roadmap.md | 9 ++ services/histserv/tester/Application.cxx | 163 ++++++++++++++++++++ services/histserv/tester/Application.h | 49 ++++++ services/histserv/tester/CMakeLists.txt | 45 ++++++ services/histserv/tester/ProgramOptions.cxx | 63 ++++++++ services/histserv/tester/ProgramOptions.h | 58 +++++++ services/histserv/tester/main.cxx | 26 ++++ 8 files changed, 414 insertions(+) create mode 100644 services/histserv/tester/Application.cxx create mode 100644 services/histserv/tester/Application.h create mode 100644 services/histserv/tester/CMakeLists.txt create mode 100644 services/histserv/tester/ProgramOptions.cxx create mode 100644 services/histserv/tester/ProgramOptions.h create mode 100644 services/histserv/tester/main.cxx diff --git a/services/histserv/CMakeLists.txt b/services/histserv/CMakeLists.txt index 9bd62df144..9be8e800ea 100644 --- a/services/histserv/CMakeLists.txt +++ b/services/histserv/CMakeLists.txt @@ -1,2 +1,3 @@ add_subdirectory(app) add_subdirectory(lib) +add_subdirectory(tester) diff --git a/services/histserv/Roadmap.md b/services/histserv/Roadmap.md index 70b4731337..83dafbe9be 100644 --- a/services/histserv/Roadmap.md +++ b/services/histserv/Roadmap.md @@ -24,3 +24,12 @@ Histo consummer: histserv - Config deserialization: boost? - Histos deserialization: boost? - Message processing: parts + calls to deserialization methods for Config and histo depending on size + + +TODO: +1) Deserialization of configs in histserv +2) Creation of configs in Tester +3) Serialization of configs in Tester +4) Multi-part message in Tester +5) Test +6) 2-4 in binary diff --git a/services/histserv/tester/Application.cxx b/services/histserv/tester/Application.cxx new file mode 100644 index 0000000000..9c2b9fbdbc --- /dev/null +++ b/services/histserv/tester/Application.cxx @@ -0,0 +1,163 @@ +/* Copyright (C) 2023 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau [committer] */ + +#include "Application.h" + +#include "CbmFlesCanvasTools.h" + +#include <Logger.h> + +#include <boost/archive/binary_oarchive.hpp> +#include <boost/iostreams/device/array.hpp> +#include <boost/iostreams/device/back_inserter.hpp> +#include <boost/iostreams/stream.hpp> +#include <boost/serialization/utility.hpp> +#include <boost/serialization/vector.hpp> + +#include <thread> + +#include "Histo1D.h" +#include "ui_callbacks.h" + +std::mutex mtx; + +namespace b_io = boost::iostreams; +namespace b_ar = boost::archive; +using sctp = std::chrono::time_point<std::chrono::system_clock>; +using scsc = std::chrono::system_clock; + +namespace cbm::services::histserv_tester +{ + + // ----- Constructor --------------------------------------------------------------------------------------------- + Application::Application(ProgramOptions const& opt) : fOpt(opt) + { + /// Read options from executable + LOG(info) << "Options for Application."; + LOG(info) << " Output ZMQ channel: " << fOpt.ComChan(); + LOG(info) << " Run time duration: " << fOpt.Runtime() << " s"; + + /// FIXME: SOMETHING_To_Replace_FairMQ!!!!!!!!!!!!! + /// FIXME: Initialize communication channels of SOMETHING_To_Replace_FairMQ + /// FIXME: Link channel to method in order to process received messages + // fZmqSocket.set(zmq::sockopt::rcvhwm, int(hwm)); // FIXME: need for HWM? + fZmqSocket.connect(fOpt.ComChan().c_str()); // This side "connects" to socket => Other side should have "bind"!!!! + } + // ------------------------------------------------------------------------------------------------------------------- + + + // ----- Main Loop ----------------------------------------------------------------------------------------------- + void Application::Exec() + { + const std::chrono::milliseconds interval {250}; + const std::chrono::seconds runtime(fOpt.Runtime()); + const std::chrono::seconds pubint(fOpt.PubInterval()); + sctp startTime = scsc::now(); + sctp stopTime = startTime + runtime; + sctp lastPubTime = startTime; + + std::vector<cbm::algo::Histo1D> vHist; + vHist.emplace_back((fOpt.Runtime() + 2), -1.0, fOpt.Runtime() + 1.0, "testHist", + "Tester source; Runtime [s]; Entries to histo [iterations]"); + vHist.emplace_back(1001, -0.025, 50.025, "transHist", + "Tester histos transmission time; Trans. time [ms]; Messages []"); + + /// Initial emission, including generation and serialization of configs + /// => Try to evaluate "time cost" of the histo transmission, including serialization + sctp transStartTime = scsc::now(); + + /// => Header for multi-part message with Configuration + data + /// => Format: std::pair< Nb histogram configs, Nb canvas configs > + std::pair<uint32_t, uint32_t> pHeader(vHist.size(), 1); + PrepareAndSendMsg(pHeader, zmq::send_flags::sndmore); + + /// => Histograms configuration = destination folder in http browser, mandatory but can be empty (= root folder) + /// => 1 ZMQ message per histogram (= 1 part) + /// => If no (new) histograms declared (e.g. new canvas declaration), has to be en empty message + `0` in the header + std::pair<std::string, std::string> pFirstHist(vHist[0].Name(), "TestFolder"); + std::pair<std::string, std::string> pSecondHist(vHist[1].Name(), "TestFolder/TestSubFolder"); + PrepareAndSendMsg(pFirstHist, zmq::send_flags::sndmore); + PrepareAndSendMsg(pSecondHist, zmq::send_flags::sndmore); + + /// => Canvas configuration = in old code extracted from canvas object, here need to be made by hand + /// => 1 ZMQ message per canvas (= 1 part) + /// => If no (new) canvas declared (e.g. only histos declaration), has to be en empty message + `0` in the header + /// => Format is "CanvasName;Canvas Title;NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)" + /// => Format of Pad config is + /// "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),(HistoName1,DrawOptions1),...,(HistoNameZ,DrawOptionsZ)" + /// => See core/base/utils/fles/CbmFlesCanvasTools for the full code, especially GenerateCanvasConfigString + /// (CanvasConfig class not used here as library loading would need binding to ROOT library) + std::pair<std::string, std::string> pCanvConfig("TestCanvas", "TestCanvas;Canvas configs testing;2;2;"); + pCanvConfig.second += "0,0,0,0,0,(" + vHist[0].Name() + ",hist);"; // Pad 0 + pCanvConfig.second += "1,1,0,0,0,(" + vHist[0].Name() + ",hist);"; // Pad 1 + pCanvConfig.second += "1,1,1,0,0,(" + vHist[1].Name() + ",hist);"; // Pad 2 + pCanvConfig.second += "1,1,1,1,0,(" + vHist[1].Name() + ",hist),(" + vHist[0].Name() + ",hist same);"; // Pad 3 + PrepareAndSendMsg(pCanvConfig, zmq::send_flags::sndmore); + + /// => (empty) Histograms serialization and emission + PrepareAndSendMsg(vHist, zmq::send_flags::none); + lastPubTime = scsc::now(); + + /// No general references as member/variable bec. simple example, use directly hardcoded vector "array access" + vHist[1].Add(std::chrono::duration_cast<std::chrono::microseconds>(lastPubTime - transStartTime).count() / 1e3); + + while (scsc::now() < stopTime) { // + /// No general references as member/variable bec. simple example, use directly hardcoded vector "array access" + vHist[0].Add(std::chrono::duration_cast<std::chrono::milliseconds>(scsc::now() - startTime).count() / 1e3); + + if (pubint < scsc::now() - lastPubTime) { + LOG(info) << "Pub-hist: " << scsc::now().time_since_epoch().count() << " " << vHist[0].NumEntries(); + /// Try to evaluate "time cost" of the histo transmission, including serialization + transStartTime = scsc::now(); + + /// => Histograms serialization and emission + PrepareAndSendMsg(vHist, zmq::send_flags::none); + + /// => Reset all histograms as they are added on the other end! + for (std::vector<cbm::algo::Histo1D>::iterator itHist = vHist.begin(); itHist != vHist.end(); ++itHist) { + + itHist->Clear(); + } + lastPubTime = scsc::now(); + + /// No general references as member/variable bec. simple example, use directly hardcoded vector "array access" + vHist[1].Add(std::chrono::duration_cast<std::chrono::microseconds>(lastPubTime - transStartTime).count() / 1e3); + } + + std::this_thread::sleep_for(interval); + LOG(info) << "test: " << (scsc::now() < stopTime) << " (" << scsc::now().time_since_epoch().count() << " vs " + << stopTime.time_since_epoch().count() << ") => " + << (std::chrono::duration_cast<std::chrono::milliseconds>(scsc::now() - startTime).count() / 1e3); + } + + /// Final publications + LOG(info) << "Pub-hist: " << scsc::now().time_since_epoch().count() << " " << vHist[0].NumEntries(); + /// => Histograms serialization and emission + PrepareAndSendMsg(vHist, zmq::send_flags::none); + /// => Reset all histograms as they are added on the other end! + for (std::vector<cbm::algo::Histo1D>::iterator itHist = vHist.begin(); itHist != vHist.end(); ++itHist) { + itHist->Clear(); + } + } + // ------------------------------------------------------------------------------------------------------------------- + + template<class Object> + void Application::PrepareAndSendMsg(Object& obj, zmq::send_flags flags) + { + /// Needed ressources (serializd string, boost inserter, boost stream, boost binary output archive) + std::string serial_str; + b_io::back_insert_device<std::string> inserter(serial_str); + b_io::stream<b_io::back_insert_device<std::string>> bstream(inserter); + b_ar::binary_oarchive oa(bstream); + + serial_str.clear(); + oa << obj; + bstream.flush(); + + zmq::message_t msg(serial_str.size()); + std::copy_n(static_cast<const char*>(serial_str.data()), msg.size(), static_cast<char*>(msg.data())); + fZmqSocket.send(msg, flags); + } + +} // namespace cbm::services::histserv_tester diff --git a/services/histserv/tester/Application.h b/services/histserv/tester/Application.h new file mode 100644 index 0000000000..6188785213 --- /dev/null +++ b/services/histserv/tester/Application.h @@ -0,0 +1,49 @@ +/* Copyright (C) 2023 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau [committer] */ + +#ifndef CBM_SERVICES_HISTSERV_TESTER_APPLICATION_H +#define CBM_SERVICES_HISTSERV_TESTER_APPLICATION_H 1 + +#include <zmq.hpp> + +#include "ProgramOptions.h" + +namespace cbm::services::histserv_tester +{ + + class Application { + + + public: + /** @brief Standard constructor, initialises the application + ** @param opt **/ + explicit Application(ProgramOptions const& opt); + + /** @brief Copy constructor forbidden **/ + Application(const Application&) = delete; + + /** @brief Assignment operator forbidden **/ + void operator=(const Application&) = delete; + + /** @brief Destructor **/ + ~Application() = default; + + /** @brief Run the application **/ + void Exec(); + + private: + template<class Object> + void PrepareAndSendMsg(Object& obj, zmq::send_flags flags); + + private: + ProgramOptions const& fOpt; ///< Program options object + + /// Interface + zmq::context_t fZmqContext {1}; + zmq::socket_t fZmqSocket {fZmqContext, ZMQ_PUSH}; + }; + +} // namespace cbm::services::histserv_tester + +#endif /* CBM_SERVICES_HISTSERV_TESTER_APPLICATION_H */ diff --git a/services/histserv/tester/CMakeLists.txt b/services/histserv/tester/CMakeLists.txt new file mode 100644 index 0000000000..474297514c --- /dev/null +++ b/services/histserv/tester/CMakeLists.txt @@ -0,0 +1,45 @@ +# CMakeList file for binary histserv_nofairmq +# P.-A. Loizeau, 14 July 2023 + +set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_CURRENT_SOURCE_DIR} + ${CBMROOT_SOURCE_DIR}/algo/qa/ + ) + +set(SRCS + Application.cxx + ProgramOptions.cxx + main.cxx + ) + +set(HEADERS + Application.h + ProgramOptions.h + ) + +set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CBMROOT_SOURCE_DIR} + ) + +add_executable(histserv_nofairmq_tester ${SRCS} ${HEADERS}) + +target_link_libraries(histserv_nofairmq_tester + PUBLIC + Algo + CbmFlibFlesTools + CbmServicesHistServ + PRIVATE + Boost::program_options + FairLogger::FairLogger + ROOT::Core + ROOT::Gpad + ROOT::Hist + ROOT::RIO + ROOT::RHTTP + libzmq + cppzmq + ) + +install(TARGETS histserv_nofairmq_tester DESTINATION bin) diff --git a/services/histserv/tester/ProgramOptions.cxx b/services/histserv/tester/ProgramOptions.cxx new file mode 100644 index 0000000000..8843997193 --- /dev/null +++ b/services/histserv/tester/ProgramOptions.cxx @@ -0,0 +1,63 @@ +/* Copyright (C) 2023 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau [committer] */ + +#include "ProgramOptions.h" + +#include <Logger.h> + +#include <boost/program_options.hpp> + +#include <iostream> + +namespace po = boost::program_options; + +using std::string; + +namespace cbm::services::histserv_tester +{ + + // ----- Parse command line --------------------------------------------- + void ProgramOptions::ParseOptions(int argc, char* argv[]) + { + + // --- Define generic options + po::options_description generic(" Generic options"); + auto generic_add = generic.add_options(); + generic_add("help,h", "display this help and exit"); + + // --- Define configuration options: Mandatory + po::options_description config_req(" Configuration (required)"); + auto config_req_add = config_req.add_options(); + config_req_add("output,o", po::value<string>(&fsChanHistosIn)->required()->value_name("<protocol://xxxxxx>"), + "name or host:port or whatever is needed for output channel (histos/canvases config and data), " + "cf http://api.zeromq.org/2-1:zmq-connect"); + + // --- Define configuration options: Optional + po::options_description config(" Configuration (optional)"); + auto config_add = config.add_options(); + config_add("runtime,r", po::value<int64_t>(&fRunTime)->default_value(90), + "duration of test run in seconds ~ x4 entries in test histogram "); + config_add("pubint,p", po::value<int64_t>(&fPubInterval)->default_value(5), + "publication interval in seconds (accumulate statistics between message emissions)"); + + // --- Allowed options + po::options_description cmdline_options("Allowed options"); + cmdline_options.add(generic).add(config_req).add(config); + + // --- Parse command line + po::variables_map vars; + po::store(po::parse_command_line(argc, argv, cmdline_options), vars); + + // --- Help: print help information and exit program + if (vars.count("help") != 0u) { + std::cout << cmdline_options << std::endl; + exit(EXIT_SUCCESS); + } + + // --- Run notify after processing the help to avoid it being blocked by missing arguments + po::notify(vars); + } + // -------------------------------------------------------------------------- + +} // namespace cbm::services::histserv_tester diff --git a/services/histserv/tester/ProgramOptions.h b/services/histserv/tester/ProgramOptions.h new file mode 100644 index 0000000000..4bcd5478bb --- /dev/null +++ b/services/histserv/tester/ProgramOptions.h @@ -0,0 +1,58 @@ +/* Copyright (C) 2023 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau [committer] */ + +#ifndef CBM_SERVICES_HISTSERV_TESTER_PROGRAMOPTIONS_H +#define CBM_SERVICES_HISTSERV_TESTER_PROGRAMOPTIONS_H 1 + +#include <string> + +namespace cbm::services::histserv_tester +{ + + /** @class ProgramOptions + ** @author Pierre-Alain Loizeau <p.-a.loizeau@gsi.de> + ** @since 26 June 2023 + ** + ** Program option class for the application histserv_nofairmq + **/ + class ProgramOptions { + public: + /** @brief Standard constructor with command line arguments **/ + ProgramOptions(int argc, char* argv[]) { ParseOptions(argc, argv); } + + /** @brief Copy constructor forbidden **/ + ProgramOptions(const ProgramOptions&) = default; + + /** @brief Assignment operator forbidden **/ + ProgramOptions& operator=(const ProgramOptions&) = default; + + /** @brief Destructor **/ + ~ProgramOptions() = default; + + /** @brief Get interface channel name or hostname + port or whatever or ????? (FIXME: replacement of FairMQ) **/ + [[nodiscard]] const std::string& ComChan() const { return fsChanHistosIn; } + + /** @brief Get run duration **/ + [[nodiscard]] const int64_t& Runtime() const { return fRunTime; } + + /** @brief Get histos publication interval **/ + [[nodiscard]] const int64_t& PubInterval() const { return fPubInterval; } + + // /** @brief Get configuration file name (YAML format) **/ + // [[nodiscard]] const std::string& ConfigFile() const { return fConfig; } + + private: + /** @brief Parse command line arguments using boost program_options **/ + void ParseOptions(int argc, char* argv[]); + + + private: // members + std::string fsChanHistosIn = "histogram-in"; + int64_t fRunTime = 90; + int64_t fPubInterval = 5; + }; + +} // namespace cbm::services::histserv_tester + +#endif /* CBM_SERVICES_HISTSERV_TESTER_PROGRAMOPTIONS_H */ diff --git a/services/histserv/tester/main.cxx b/services/histserv/tester/main.cxx new file mode 100644 index 0000000000..cbb73ebc23 --- /dev/null +++ b/services/histserv/tester/main.cxx @@ -0,0 +1,26 @@ +/* Copyright (C) 2023 Facility for Antiproton and Ion Research in Europe, Darmstadt + SPDX-License-Identifier: GPL-3.0-only + Authors: Pierre-Alain Loizeau [committer] */ +#include <Logger.h> + +#include "Application.h" +#include "ProgramOptions.h" + +using namespace cbm::services::histserv_tester; + +int main(int argc, char* argv[]) +{ + LOG(info) << "***** Tester client for Histogram server without FairMQ *****"; + try { + ProgramOptions opt(argc, argv); + Application app(opt); + app.Exec(); + } + catch (std::exception const& e) { + LOG(error) << e.what() << "; terminating."; + return EXIT_FAILURE; + } + + LOG(info) << "Histogram server without FairMQ: Tester client Program completed successfully; exiting."; + return EXIT_SUCCESS; +} -- GitLab