From 9020f1b54932d65f27a105c0e148b282a89a3ee9 Mon Sep 17 00:00:00 2001 From: P-A Loizeau <p.-a.loizeau@gsi.de> Date: Thu, 12 Oct 2023 17:09:39 +0200 Subject: [PATCH] [algo] in EventBuildChain, add publication of Histos to server DISCLAIMERS: 1) emission happens on every iteration of the main loop over TS! => No memory between iterations to accumulate statistics => No limitation (time interval or nb iterations interval) on emission, lots of small TS will lead to lots of messages emission 2) ZMQ addressing hardcoded in header, would need to be added to binary arguments and maybe also to YAML config! --- algo/CMakeLists.txt | 2 + algo/evbuild/EventbuildChain.cxx | 85 ++++++++++++++++++++++++++++++- algo/evbuild/EventbuildChain.h | 19 ++++++- algo/params/EventbuildConfig.yaml | 1 + docs/histservs/HowTo.md | 17 ++++++- 5 files changed, 119 insertions(+), 5 deletions(-) diff --git a/algo/CMakeLists.txt b/algo/CMakeLists.txt index 7e7b4c6abb..6f425a623a 100644 --- a/algo/CMakeLists.txt +++ b/algo/CMakeLists.txt @@ -98,6 +98,8 @@ target_link_libraries(Algo external::fles_logging external::fles_ipc external::fles_monitoring + libzmq + cppzmq ) target_compile_definitions(Algo PUBLIC NO_ROOT) xpu_attach(Algo ${DEVICE_SRCS}) diff --git a/algo/evbuild/EventbuildChain.cxx b/algo/evbuild/EventbuildChain.cxx index 5b9df9737b..fe8cc5b599 100644 --- a/algo/evbuild/EventbuildChain.cxx +++ b/algo/evbuild/EventbuildChain.cxx @@ -6,15 +6,25 @@ #include "CbmDigiTimeslice.h" +#include <boost/archive/binary_oarchive.hpp> +#include <boost/iostreams/device/array.hpp> +#include <boost/iostreams/device/back_inserter.hpp> +#include <boost/iostreams/stream.hpp> +#include <boost/serialization/utility.hpp> +#include <boost/serialization/vector.hpp> + #include <sstream> #include <string> #include "DigiData.h" +#include "Histo1D.h" #include "evbuild/Config.h" using namespace cbm::algo; using namespace cbm::algo::evbuild; +namespace b_io = boost::iostreams; +namespace b_ar = boost::archive; // ----- Constructor ------------------------------------------------------ EventbuildChain::EventbuildChain(const Config& config) @@ -25,9 +35,53 @@ EventbuildChain::EventbuildChain(const Config& config) , fQa(DigiEventQaConfig(config.fBuilder, 10., 100)) { Status(); + + if ("" != fHistComChan) { + fZmqContext = new zmq::context_t(1); + fZmqSocket = new zmq::socket_t(*fZmqContext, ZMQ_PUSH); + fZmqSocket->connect(fHistComChan); // This side "connects" to socket => Other side should have "bind"!!!! + + /// FIXME: based on JdC question, decide whether config re-emitted on each iteration instead of only at startup? + /// => Header for multi-part message with Configuration + data + /// => Format: std::pair< Nb histogram configs, Nb canvas configs > + std::vector<std::pair<std::string, std::string>> histsCfg = fQa.GetConfig().GetHistosConfigs(); + std::vector<std::pair<std::string, std::string>> canvsCfg = fQa.GetConfig().GetCanvasConfigs(); + PrepareAndSendMsg(std::pair<uint32_t, uint32_t>(histsCfg.size(), canvsCfg.size()), zmq::send_flags::sndmore); + + /// => Histograms configuration = destination folder in http browser, mandatory but can be empty (= root folder) + /// => 1 ZMQ message per histogram (= 1 part) + /// => If no (new) histograms declared (e.g. new canvas declaration), has to be en empty message + `0` in the header + for (const auto& cfg : histsCfg) { + PrepareAndSendMsg(cfg, zmq::send_flags::sndmore); + } + + /// => Canvas configuration + /// => 1 ZMQ message per canvas (= 1 part) + /// => If no (new) canvas declared (e.g. only histos declaration), has to be en empty message + `0` in the header + for (const auto& cfg : canvsCfg) { + PrepareAndSendMsg(cfg, zmq::send_flags::sndmore); + } + + /// => (empty) Histograms serialization and emission to close multi-part message + PrepareAndSendMsg(std::vector<Histo1D> {}, zmq::send_flags::none); + } } // ---------------------------------------------------------------------------- +// ----- Constructor ------------------------------------------------------ +EventbuildChain::~EventbuildChain() +{ + if ("" != fHistComChan) { + if (fZmqSocket) { + fZmqSocket->close(); + delete fZmqSocket; + } + if (fZmqContext) { // + delete fZmqContext; + } + } +} +// ---------------------------------------------------------------------------- // ----- Run event building on a timeslice -------------------------------- EventbuildChain::ResultType EventbuildChain::Run(const DigiData& timeslice) @@ -42,8 +96,14 @@ EventbuildChain::ResultType EventbuildChain::Run(const DigiData& timeslice) // --- Perform event building auto [events, evbuildMon] = fBuilder(timeslice, triggers, fSelector); - // --- Run event QA - DigiEventQaData qaData = fQa(events); + /// => Histograms serialization and emission + if ("" != fHistComChan) { + // --- Run event QA + DigiEventQaData qaData = fQa(events); + + PrepareAndSendMsg(qaData.fVectHistos, zmq::send_flags::none); + L_(info) << "Published histograms, nb: " << qaData.fVectHistos.size(); + } // --- Some log L_(info) << "Triggers: " << triggers.size() << ", events " << events.size(); @@ -142,3 +202,24 @@ std::vector<double> EventbuildChain::GetDigiTimes(const DigiData& timeslice, ECb return result; } // ---------------------------------------------------------------------------- + + +// ----- Send a message to the histogram server ---------------------------- +template<class Object> +void EventbuildChain::PrepareAndSendMsg(const Object& obj, zmq::send_flags flags) +{ + /// Needed ressources (serializd string, boost inserter, boost stream, boost binary output archive) + std::string serial_str; + b_io::back_insert_device<std::string> inserter(serial_str); + b_io::stream<b_io::back_insert_device<std::string>> bstream(inserter); + b_ar::binary_oarchive oa(bstream); + + serial_str.clear(); + oa << obj; + bstream.flush(); + + zmq::message_t msg(serial_str.size()); + std::copy_n(static_cast<const char*>(serial_str.data()), msg.size(), static_cast<char*>(msg.data())); + fZmqSocket->send(msg, flags); +} +// ---------------------------------------------------------------------------- diff --git a/algo/evbuild/EventbuildChain.h b/algo/evbuild/EventbuildChain.h index 9b81a556ed..05ada402d4 100644 --- a/algo/evbuild/EventbuildChain.h +++ b/algo/evbuild/EventbuildChain.h @@ -9,12 +9,13 @@ #include "TimeClusterTrigger.h" +#include <zmq.hpp> + #include "DigiEventQa.h" #include "DigiEventSelector.h" #include "EventBuilder.h" #include "SubChain.h" - namespace cbm::algo { class DigiData; @@ -48,6 +49,9 @@ namespace cbm::algo::evbuild /** @brief Constructor **/ EventbuildChain(const Config& config); + /** @brief Destructor **/ + ~EventbuildChain(); + /** @brief Execution **/ ResultType Run(const DigiData&); @@ -62,6 +66,12 @@ namespace cbm::algo::evbuild DigiEventSelector fSelector; ///< Event selector algorithm DigiEventQa fQa; ///< Event QA algorithm + /// FIXME: decide if the address and port of the histogram server should come from command line or YAML + //std::string fHistComChan = ""; + std::string fHistComChan = "tcp://127.0.0.1:56800"; + zmq::context_t* fZmqContext; ///< ZMQ context FIXME: should be only one context per binary! + zmq::socket_t* fZmqSocket; ///< ZMQ socket to histogram server + private: // methods /** @brief Extract digi times from CbmDigiTimeslice @@ -69,6 +79,13 @@ namespace cbm::algo::evbuild ** @return Vector of digi times for the specified system **/ std::vector<double> GetDigiTimes(const DigiData& timeslice, ECbmModuleId system); + + /** @brief Serialize object and send it to the histogram server + ** @param obj: object to be serialized in the message, e.g. config pairs of strings or Histo1D + ** @param flags: or'ed values from zmq::send_flags, typ. zmq::send_flags::sndmore to indicate multi-parts message + **/ + template<class Object> + void PrepareAndSendMsg(const Object& obj, zmq::send_flags flags); }; } // namespace cbm::algo::evbuild diff --git a/algo/params/EventbuildConfig.yaml b/algo/params/EventbuildConfig.yaml index 3a37fbccd8..ba8faab96b 100644 --- a/algo/params/EventbuildConfig.yaml +++ b/algo/params/EventbuildConfig.yaml @@ -12,6 +12,7 @@ eventbuilder: TRd2D: [-100, 350] TOF: [-10, 70] RICH: [-20, 120] + PSD: [-50, 150] FSD: [-50, 150] selector: minDigis: diff --git a/docs/histservs/HowTo.md b/docs/histservs/HowTo.md index eb096f3451..4fd68698d8 100644 --- a/docs/histservs/HowTo.md +++ b/docs/histservs/HowTo.md @@ -118,6 +118,8 @@ Tested on cbmfles01 in folder `/scratch/loizeau/cbmroot/` ### Adding histogram source client features to a binary `XYZ` +For an example of this, please have a look at `algo/qa/DigiEventQa` and `algo/evbuild/EventbuildChain` + 1. Copy the [`Application::PrepareAndSendMsg`](services/histserv/tester/Application.cxx#L146) method from the tester binary 1. Copy the `output` argument + related code from the tester binary @@ -132,5 +134,16 @@ Tested on cbmfles01 in folder `/scratch/loizeau/cbmroot/` **Typically to reduce traffic and the cost of the publication, this emission will have a lower frequency than the execution of the main loop. For example one could use std::chrono times to emit only every `> n seconds` or emit every `N iterations` (or a combination of both as in the FairMQ device implementations).** \ - This timed behavior is not present in the tester binary (04/10/2023) as it was meant only as a proof of concept for - the protocol itself. + This timed behavior is present neither in the tester binary (04/10/2023) as it was meant only as a proof of concept + for the protocol itself nor in the `DigiEventQa` example as its logic creates new histograms on each iteration. + +### Usage with the `cbmreco` binary + +1. Controlling the histogram emission behavior in `cbmreco` binary \ + Comment/Uncomment one of the two lines defining `fHistComChan` in `algo/evbuild/EventbuildChain.h` then recompile.\ + => An empty string disables the histogram emission +2. Start the server exactly as in the tested app example +3. Start `cbmreco` as usual (no CLI option/parameter for this feature for now) + ``` + ./cbmreco -p <path to algo params> -i <file://path-to-tsa-file or tcp://url-of-ts-server> --steps Unpack DigiTrigger Localreco + ``` -- GitLab