Skip to content
Snippets Groups Projects
Commit 9020f1b5 authored by Pierre-Alain Loizeau's avatar Pierre-Alain Loizeau Committed by Volker Friese
Browse files

[algo] in EventBuildChain, add publication of Histos to server

DISCLAIMERS:
1) emission happens on every iteration of the main loop over TS!
=> No memory between iterations to accumulate statistics
=> No limitation (time interval or nb iterations interval) on emission, lots of small TS will lead to lots of messages emission
2) ZMQ addressing hardcoded in header, would need to be added to binary arguments and maybe also to YAML config!
parent 4807efde
No related branches found
No related tags found
1 merge request!1212Add services main folder + histogram server binary w/o FairMQ + tester binary example
Pipeline #24845 passed
...@@ -98,6 +98,8 @@ target_link_libraries(Algo ...@@ -98,6 +98,8 @@ target_link_libraries(Algo
external::fles_logging external::fles_logging
external::fles_ipc external::fles_ipc
external::fles_monitoring external::fles_monitoring
libzmq
cppzmq
) )
target_compile_definitions(Algo PUBLIC NO_ROOT) target_compile_definitions(Algo PUBLIC NO_ROOT)
xpu_attach(Algo ${DEVICE_SRCS}) xpu_attach(Algo ${DEVICE_SRCS})
......
...@@ -6,15 +6,25 @@ ...@@ -6,15 +6,25 @@
#include "CbmDigiTimeslice.h" #include "CbmDigiTimeslice.h"
#include <boost/archive/binary_oarchive.hpp>
#include <boost/iostreams/device/array.hpp>
#include <boost/iostreams/device/back_inserter.hpp>
#include <boost/iostreams/stream.hpp>
#include <boost/serialization/utility.hpp>
#include <boost/serialization/vector.hpp>
#include <sstream> #include <sstream>
#include <string> #include <string>
#include "DigiData.h" #include "DigiData.h"
#include "Histo1D.h"
#include "evbuild/Config.h" #include "evbuild/Config.h"
using namespace cbm::algo; using namespace cbm::algo;
using namespace cbm::algo::evbuild; using namespace cbm::algo::evbuild;
namespace b_io = boost::iostreams;
namespace b_ar = boost::archive;
// ----- Constructor ------------------------------------------------------ // ----- Constructor ------------------------------------------------------
EventbuildChain::EventbuildChain(const Config& config) EventbuildChain::EventbuildChain(const Config& config)
...@@ -25,9 +35,53 @@ EventbuildChain::EventbuildChain(const Config& config) ...@@ -25,9 +35,53 @@ EventbuildChain::EventbuildChain(const Config& config)
, fQa(DigiEventQaConfig(config.fBuilder, 10., 100)) , fQa(DigiEventQaConfig(config.fBuilder, 10., 100))
{ {
Status(); Status();
if ("" != fHistComChan) {
fZmqContext = new zmq::context_t(1);
fZmqSocket = new zmq::socket_t(*fZmqContext, ZMQ_PUSH);
fZmqSocket->connect(fHistComChan); // This side "connects" to socket => Other side should have "bind"!!!!
/// FIXME: based on JdC question, decide whether config re-emitted on each iteration instead of only at startup?
/// => Header for multi-part message with Configuration + data
/// => Format: std::pair< Nb histogram configs, Nb canvas configs >
std::vector<std::pair<std::string, std::string>> histsCfg = fQa.GetConfig().GetHistosConfigs();
std::vector<std::pair<std::string, std::string>> canvsCfg = fQa.GetConfig().GetCanvasConfigs();
PrepareAndSendMsg(std::pair<uint32_t, uint32_t>(histsCfg.size(), canvsCfg.size()), zmq::send_flags::sndmore);
/// => Histograms configuration = destination folder in http browser, mandatory but can be empty (= root folder)
/// => 1 ZMQ message per histogram (= 1 part)
/// => If no (new) histograms declared (e.g. new canvas declaration), has to be en empty message + `0` in the header
for (const auto& cfg : histsCfg) {
PrepareAndSendMsg(cfg, zmq::send_flags::sndmore);
}
/// => Canvas configuration
/// => 1 ZMQ message per canvas (= 1 part)
/// => If no (new) canvas declared (e.g. only histos declaration), has to be en empty message + `0` in the header
for (const auto& cfg : canvsCfg) {
PrepareAndSendMsg(cfg, zmq::send_flags::sndmore);
}
/// => (empty) Histograms serialization and emission to close multi-part message
PrepareAndSendMsg(std::vector<Histo1D> {}, zmq::send_flags::none);
}
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// ----- Constructor ------------------------------------------------------
EventbuildChain::~EventbuildChain()
{
if ("" != fHistComChan) {
if (fZmqSocket) {
fZmqSocket->close();
delete fZmqSocket;
}
if (fZmqContext) { //
delete fZmqContext;
}
}
}
// ----------------------------------------------------------------------------
// ----- Run event building on a timeslice -------------------------------- // ----- Run event building on a timeslice --------------------------------
EventbuildChain::ResultType EventbuildChain::Run(const DigiData& timeslice) EventbuildChain::ResultType EventbuildChain::Run(const DigiData& timeslice)
...@@ -42,8 +96,14 @@ EventbuildChain::ResultType EventbuildChain::Run(const DigiData& timeslice) ...@@ -42,8 +96,14 @@ EventbuildChain::ResultType EventbuildChain::Run(const DigiData& timeslice)
// --- Perform event building // --- Perform event building
auto [events, evbuildMon] = fBuilder(timeslice, triggers, fSelector); auto [events, evbuildMon] = fBuilder(timeslice, triggers, fSelector);
// --- Run event QA /// => Histograms serialization and emission
DigiEventQaData qaData = fQa(events); if ("" != fHistComChan) {
// --- Run event QA
DigiEventQaData qaData = fQa(events);
PrepareAndSendMsg(qaData.fVectHistos, zmq::send_flags::none);
L_(info) << "Published histograms, nb: " << qaData.fVectHistos.size();
}
// --- Some log // --- Some log
L_(info) << "Triggers: " << triggers.size() << ", events " << events.size(); L_(info) << "Triggers: " << triggers.size() << ", events " << events.size();
...@@ -142,3 +202,24 @@ std::vector<double> EventbuildChain::GetDigiTimes(const DigiData& timeslice, ECb ...@@ -142,3 +202,24 @@ std::vector<double> EventbuildChain::GetDigiTimes(const DigiData& timeslice, ECb
return result; return result;
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// ----- Send a message to the histogram server ----------------------------
template<class Object>
void EventbuildChain::PrepareAndSendMsg(const Object& obj, zmq::send_flags flags)
{
/// Needed ressources (serializd string, boost inserter, boost stream, boost binary output archive)
std::string serial_str;
b_io::back_insert_device<std::string> inserter(serial_str);
b_io::stream<b_io::back_insert_device<std::string>> bstream(inserter);
b_ar::binary_oarchive oa(bstream);
serial_str.clear();
oa << obj;
bstream.flush();
zmq::message_t msg(serial_str.size());
std::copy_n(static_cast<const char*>(serial_str.data()), msg.size(), static_cast<char*>(msg.data()));
fZmqSocket->send(msg, flags);
}
// ----------------------------------------------------------------------------
...@@ -9,12 +9,13 @@ ...@@ -9,12 +9,13 @@
#include "TimeClusterTrigger.h" #include "TimeClusterTrigger.h"
#include <zmq.hpp>
#include "DigiEventQa.h" #include "DigiEventQa.h"
#include "DigiEventSelector.h" #include "DigiEventSelector.h"
#include "EventBuilder.h" #include "EventBuilder.h"
#include "SubChain.h" #include "SubChain.h"
namespace cbm::algo namespace cbm::algo
{ {
class DigiData; class DigiData;
...@@ -48,6 +49,9 @@ namespace cbm::algo::evbuild ...@@ -48,6 +49,9 @@ namespace cbm::algo::evbuild
/** @brief Constructor **/ /** @brief Constructor **/
EventbuildChain(const Config& config); EventbuildChain(const Config& config);
/** @brief Destructor **/
~EventbuildChain();
/** @brief Execution **/ /** @brief Execution **/
ResultType Run(const DigiData&); ResultType Run(const DigiData&);
...@@ -62,6 +66,12 @@ namespace cbm::algo::evbuild ...@@ -62,6 +66,12 @@ namespace cbm::algo::evbuild
DigiEventSelector fSelector; ///< Event selector algorithm DigiEventSelector fSelector; ///< Event selector algorithm
DigiEventQa fQa; ///< Event QA algorithm DigiEventQa fQa; ///< Event QA algorithm
/// FIXME: decide if the address and port of the histogram server should come from command line or YAML
//std::string fHistComChan = "";
std::string fHistComChan = "tcp://127.0.0.1:56800";
zmq::context_t* fZmqContext; ///< ZMQ context FIXME: should be only one context per binary!
zmq::socket_t* fZmqSocket; ///< ZMQ socket to histogram server
private: // methods private: // methods
/** @brief Extract digi times from CbmDigiTimeslice /** @brief Extract digi times from CbmDigiTimeslice
...@@ -69,6 +79,13 @@ namespace cbm::algo::evbuild ...@@ -69,6 +79,13 @@ namespace cbm::algo::evbuild
** @return Vector of digi times for the specified system ** @return Vector of digi times for the specified system
**/ **/
std::vector<double> GetDigiTimes(const DigiData& timeslice, ECbmModuleId system); std::vector<double> GetDigiTimes(const DigiData& timeslice, ECbmModuleId system);
/** @brief Serialize object and send it to the histogram server
** @param obj: object to be serialized in the message, e.g. config pairs of strings or Histo1D
** @param flags: or'ed values from zmq::send_flags, typ. zmq::send_flags::sndmore to indicate multi-parts message
**/
template<class Object>
void PrepareAndSendMsg(const Object& obj, zmq::send_flags flags);
}; };
} // namespace cbm::algo::evbuild } // namespace cbm::algo::evbuild
......
...@@ -12,6 +12,7 @@ eventbuilder: ...@@ -12,6 +12,7 @@ eventbuilder:
TRd2D: [-100, 350] TRd2D: [-100, 350]
TOF: [-10, 70] TOF: [-10, 70]
RICH: [-20, 120] RICH: [-20, 120]
PSD: [-50, 150]
FSD: [-50, 150] FSD: [-50, 150]
selector: selector:
minDigis: minDigis:
......
...@@ -118,6 +118,8 @@ Tested on cbmfles01 in folder `/scratch/loizeau/cbmroot/` ...@@ -118,6 +118,8 @@ Tested on cbmfles01 in folder `/scratch/loizeau/cbmroot/`
### Adding histogram source client features to a binary `XYZ` ### Adding histogram source client features to a binary `XYZ`
For an example of this, please have a look at `algo/qa/DigiEventQa` and `algo/evbuild/EventbuildChain`
1. Copy the [`Application::PrepareAndSendMsg`](services/histserv/tester/Application.cxx#L146) method from the tester 1. Copy the [`Application::PrepareAndSendMsg`](services/histserv/tester/Application.cxx#L146) method from the tester
binary binary
1. Copy the `output` argument + related code from the tester binary 1. Copy the `output` argument + related code from the tester binary
...@@ -132,5 +134,16 @@ Tested on cbmfles01 in folder `/scratch/loizeau/cbmroot/` ...@@ -132,5 +134,16 @@ Tested on cbmfles01 in folder `/scratch/loizeau/cbmroot/`
**Typically to reduce traffic and the cost of the publication, this emission will have a lower frequency than the **Typically to reduce traffic and the cost of the publication, this emission will have a lower frequency than the
execution of the main loop. For example one could use std::chrono times to emit only every `> n seconds` or emit execution of the main loop. For example one could use std::chrono times to emit only every `> n seconds` or emit
every `N iterations` (or a combination of both as in the FairMQ device implementations).** \ every `N iterations` (or a combination of both as in the FairMQ device implementations).** \
This timed behavior is not present in the tester binary (04/10/2023) as it was meant only as a proof of concept for This timed behavior is present neither in the tester binary (04/10/2023) as it was meant only as a proof of concept
the protocol itself. for the protocol itself nor in the `DigiEventQa` example as its logic creates new histograms on each iteration.
### Usage with the `cbmreco` binary
1. Controlling the histogram emission behavior in `cbmreco` binary \
Comment/Uncomment one of the two lines defining `fHistComChan` in `algo/evbuild/EventbuildChain.h` then recompile.\
=> An empty string disables the histogram emission
2. Start the server exactly as in the tested app example
3. Start `cbmreco` as usual (no CLI option/parameter for this feature for now)
```
./cbmreco -p <path to algo params> -i <file://path-to-tsa-file or tcp://url-of-ts-server> --steps Unpack DigiTrigger Localreco
```
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment