diff --git a/services/histserv/app/Application.cxx b/services/histserv/app/Application.cxx index 1668612b1ac2ccb2799b42d71157fa6e6f21e2fc..942dda4f8dd35f05acacaca9c97f4c1b64194983 100644 --- a/services/histserv/app/Application.cxx +++ b/services/histserv/app/Application.cxx @@ -55,7 +55,7 @@ Application::Application(ProgramOptions const& opt) : fOpt(opt) /// FIXME: SOMETHING_To_Replace_FairMQ!!!!!!!!!!!!! /// FIXME: Initialize communication channels of SOMETHING_To_Replace_FairMQ /// FIXME: Link channel to method in order to process received messages - // fZmqSocket.set(zmq::sockopt::rcvhwm, int(hwm)); // FIXME: need for HWM? + fZmqSocket.set(zmq::sockopt::rcvhwm, fOpt.ComChanZmqRcvHwm()); // High-Water Mark, nb updates kept in buffer fZmqSocket.set(zmq::sockopt::rcvtimeo, fOpt.ComChanZmqRcvTo()); // Timeout in ms to avoid stuck in loop! fZmqSocket.bind(fOpt.ComChan().c_str()); // This side "binds" the socket => Other side should connect!!!! @@ -298,6 +298,12 @@ bool Application::ReceiveData(zmq::message_t& msg) LOG(debug) << "Application::ReceiveData => Finished processing histograms update"; + /// FIXME: make the log frequency configurable? + if (0 == fNMessages % 100) { + LOG(info) << "HistServ::Application::ReceiveData => Finished processing histograms update #" << fNMessages + << ", still alive!"; + } + return true; } @@ -314,7 +320,7 @@ bool Application::ReceiveHistoConfig(zmq::message_t& msg) std::pair<std::string, std::string> tempObject("", ""); iarch >> tempObject; - LOG(info) << " Received configuration for histo " << tempObject.first << " : " << tempObject.second; + LOG(debug) << " Received configuration for histo " << tempObject.first << " : " << tempObject.second; /// Check if histo name already received in previous messages /// Linear search should be ok as config is shared only at startup @@ -326,8 +332,8 @@ bool Application::ReceiveHistoConfig(zmq::message_t& msg) } // for( UInt_t uPrevHist = 0; uPrevHist < fvpsHistosFolder.size(); ++uPrevHist ) if (uPrevHist < fvpsHistosFolder.size()) { - LOG(info) << " Ignored new configuration for histo " << tempObject.first - << " due to previously received one: " << tempObject.second; + LOG(debug) << " Ignored new configuration for histo " << tempObject.first + << " due to previously received one: " << tempObject.second; /// Not sure if we should return false here... } // if( uPrevHist < fvpsHistosFolder.size() ) else { @@ -335,6 +341,7 @@ bool Application::ReceiveHistoConfig(zmq::message_t& msg) fvHistos.push_back(std::pair<TNamed*, std::string>(nullptr, "")); fvbHistoRegistered.push_back(false); fbAllHistosRegistered = false; + LOG(info) << " Stored configuration for histo " << tempObject.first << " : " << tempObject.second; } // else of if( uPrevHist < fvpsHistosFolder.size() ) return true; @@ -353,7 +360,7 @@ bool Application::ReceiveCanvasConfig(zmq::message_t& msg) std::pair<std::string, std::string> tempObject("", ""); iarch >> tempObject; - LOG(info) << " Received configuration for canvas " << tempObject.first << " : " << tempObject.second; + LOG(debug) << " Received configuration for canvas " << tempObject.first << " : " << tempObject.second; /// Check if canvas name already received in previous messages /// Linear search should be ok as config is shared only at startup @@ -365,8 +372,8 @@ bool Application::ReceiveCanvasConfig(zmq::message_t& msg) } // for( UInt_t uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv ) if (uPrevCanv < fvpsCanvasConfig.size()) { - LOG(warning) << " Ignored new configuration for Canvas " << tempObject.first - << " due to previously received one: " << tempObject.second; + LOG(debug) << " Ignored new configuration for Canvas " << tempObject.first + << " due to previously received one: " << tempObject.second; /// Not sure if we should return false here... } // if( uPrevCanv < fvpsCanvasConfig.size() ) else { @@ -377,6 +384,7 @@ bool Application::ReceiveCanvasConfig(zmq::message_t& msg) fvCanvas.push_back(std::pair<TCanvas*, std::string>(nullptr, "")); fvbCanvasRegistered.push_back(false); fbAllCanvasRegistered = false; + LOG(info) << " Stored configuration for canvas " << tempObject.first << " : " << tempObject.second; } // else of if( uPrevCanv < fvpsCanvasConfig.size() ) return true; } diff --git a/services/histserv/app/ProgramOptions.cxx b/services/histserv/app/ProgramOptions.cxx index 94058260e682570bbfb22007027417a9dba9fca1..a730a8fda99510300c3a53010169d21271e7b42b 100644 --- a/services/histserv/app/ProgramOptions.cxx +++ b/services/histserv/app/ProgramOptions.cxx @@ -36,6 +36,9 @@ namespace cbm::services::histserv // --- Define configuration options: Optional po::options_description config(" Configuration (optional or with default)"); auto config_add = config.add_options(); + config_add("highwater,hwm", po::value<int32_t>(&fiHistosInZmqHwm)->default_value(1), + "ZMQ input channel high-water mark in messages: 0 = no buffering, val = nb updates kept in buffer \n" + "Tune to match nb of update emitters but also to avoid extrem memory usage!"); config_add("timeout,zto", po::value<int32_t>(&fiHistosInZmqRcvToMs)->default_value(10), "ZMQ input channel timeout in ms: -1 = block (term. handling!), 0 = instant (CPU load!), val = nb ms"); config_add("port,p", po::value<uint32_t>(&fuHttpServerPort)->default_value(8080), diff --git a/services/histserv/app/ProgramOptions.h b/services/histserv/app/ProgramOptions.h index 7f8f1372d89ff608f4aecbde4133c8bc54b6ee47..18a3fde0f64c1c89f538d4c43263661eeb8db398 100644 --- a/services/histserv/app/ProgramOptions.h +++ b/services/histserv/app/ProgramOptions.h @@ -33,6 +33,9 @@ namespace cbm::services::histserv /** @brief Get interface channel name or hostname + port or whatever or ????? (FIXME: replacement of FairMQ) **/ [[nodiscard]] const std::string& ComChan() const { return fsChanHistosIn; } + /** @brief Get receive High-Water Mark for interface channel **/ + [[nodiscard]] const int32_t& ComChanZmqRcvHwm() const { return fiHistosInZmqHwm; } + /** @brief Get receive timeout for interface channel **/ [[nodiscard]] const int32_t& ComChanZmqRcvTo() const { return fiHistosInZmqRcvToMs; } @@ -58,6 +61,7 @@ namespace cbm::services::histserv private: // members std::string fsChanHistosIn = "tcp://127.0.0.1:56800"; + int32_t fiHistosInZmqHwm = 1; ///< High-Water Mark, default keep only 1 update in buffer int32_t fiHistosInZmqRcvToMs = 10; ///< Timeout in ms: -1 = block, 0 = instant, val = nb ms uint32_t fuHttpServerPort = 8080; ///< HTTP port of the ROOT web server std::string fsHistoFileName = "histos_dump.root"; ///< Output file name (ROOT format)