Skip to content
Snippets Groups Projects
Commit 328627f2 authored by Pierre-Alain Loizeau's avatar Pierre-Alain Loizeau
Browse files

In hist serv w/o FairMQ, add support for input High-Water Mark + reduce logs

- Add High-water Mark setting on the ZMQ input socket with default value of 1
- Add binary option to set the HWM + help info
- Reduce the log info linked to the Histos and Canvases updates now that we send them on every update
- Add new logs when first/initial config is received
- Add periodic log to give indication that server is not stuck
parent c988a4ae
No related branches found
No related tags found
1 merge request!1801Add support for High-Water Mark in HistogramSender and histserv_nofairmq + reduce logs
......@@ -55,7 +55,7 @@ Application::Application(ProgramOptions const& opt) : fOpt(opt)
/// FIXME: SOMETHING_To_Replace_FairMQ!!!!!!!!!!!!!
/// FIXME: Initialize communication channels of SOMETHING_To_Replace_FairMQ
/// FIXME: Link channel to method in order to process received messages
// fZmqSocket.set(zmq::sockopt::rcvhwm, int(hwm)); // FIXME: need for HWM?
fZmqSocket.set(zmq::sockopt::rcvhwm, fOpt.ComChanZmqRcvHwm()); // High-Water Mark, nb updates kept in buffer
fZmqSocket.set(zmq::sockopt::rcvtimeo, fOpt.ComChanZmqRcvTo()); // Timeout in ms to avoid stuck in loop!
fZmqSocket.bind(fOpt.ComChan().c_str()); // This side "binds" the socket => Other side should connect!!!!
......@@ -298,6 +298,12 @@ bool Application::ReceiveData(zmq::message_t& msg)
LOG(debug) << "Application::ReceiveData => Finished processing histograms update";
/// FIXME: make the log frequency configurable?
if (0 == fNMessages % 100) {
LOG(info) << "HistServ::Application::ReceiveData => Finished processing histograms update #" << fNMessages
<< ", still alive!";
}
return true;
}
......@@ -314,7 +320,7 @@ bool Application::ReceiveHistoConfig(zmq::message_t& msg)
std::pair<std::string, std::string> tempObject("", "");
iarch >> tempObject;
LOG(info) << " Received configuration for histo " << tempObject.first << " : " << tempObject.second;
LOG(debug) << " Received configuration for histo " << tempObject.first << " : " << tempObject.second;
/// Check if histo name already received in previous messages
/// Linear search should be ok as config is shared only at startup
......@@ -326,7 +332,7 @@ bool Application::ReceiveHistoConfig(zmq::message_t& msg)
} // for( UInt_t uPrevHist = 0; uPrevHist < fvpsHistosFolder.size(); ++uPrevHist )
if (uPrevHist < fvpsHistosFolder.size()) {
LOG(info) << " Ignored new configuration for histo " << tempObject.first
LOG(debug) << " Ignored new configuration for histo " << tempObject.first
<< " due to previously received one: " << tempObject.second;
/// Not sure if we should return false here...
} // if( uPrevHist < fvpsHistosFolder.size() )
......@@ -335,6 +341,7 @@ bool Application::ReceiveHistoConfig(zmq::message_t& msg)
fvHistos.push_back(std::pair<TNamed*, std::string>(nullptr, ""));
fvbHistoRegistered.push_back(false);
fbAllHistosRegistered = false;
LOG(info) << " Stored configuration for histo " << tempObject.first << " : " << tempObject.second;
} // else of if( uPrevHist < fvpsHistosFolder.size() )
return true;
......@@ -353,7 +360,7 @@ bool Application::ReceiveCanvasConfig(zmq::message_t& msg)
std::pair<std::string, std::string> tempObject("", "");
iarch >> tempObject;
LOG(info) << " Received configuration for canvas " << tempObject.first << " : " << tempObject.second;
LOG(debug) << " Received configuration for canvas " << tempObject.first << " : " << tempObject.second;
/// Check if canvas name already received in previous messages
/// Linear search should be ok as config is shared only at startup
......@@ -365,7 +372,7 @@ bool Application::ReceiveCanvasConfig(zmq::message_t& msg)
} // for( UInt_t uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv )
if (uPrevCanv < fvpsCanvasConfig.size()) {
LOG(warning) << " Ignored new configuration for Canvas " << tempObject.first
LOG(debug) << " Ignored new configuration for Canvas " << tempObject.first
<< " due to previously received one: " << tempObject.second;
/// Not sure if we should return false here...
} // if( uPrevCanv < fvpsCanvasConfig.size() )
......@@ -377,6 +384,7 @@ bool Application::ReceiveCanvasConfig(zmq::message_t& msg)
fvCanvas.push_back(std::pair<TCanvas*, std::string>(nullptr, ""));
fvbCanvasRegistered.push_back(false);
fbAllCanvasRegistered = false;
LOG(info) << " Stored configuration for canvas " << tempObject.first << " : " << tempObject.second;
} // else of if( uPrevCanv < fvpsCanvasConfig.size() )
return true;
}
......
......@@ -36,6 +36,9 @@ namespace cbm::services::histserv
// --- Define configuration options: Optional
po::options_description config(" Configuration (optional or with default)");
auto config_add = config.add_options();
config_add("highwater,hwm", po::value<int32_t>(&fiHistosInZmqHwm)->default_value(1),
"ZMQ input channel high-water mark in messages: 0 = no buffering, val = nb updates kept in buffer \n"
"Tune to match nb of update emitters but also to avoid extrem memory usage!");
config_add("timeout,zto", po::value<int32_t>(&fiHistosInZmqRcvToMs)->default_value(10),
"ZMQ input channel timeout in ms: -1 = block (term. handling!), 0 = instant (CPU load!), val = nb ms");
config_add("port,p", po::value<uint32_t>(&fuHttpServerPort)->default_value(8080),
......
......@@ -33,6 +33,9 @@ namespace cbm::services::histserv
/** @brief Get interface channel name or hostname + port or whatever or ????? (FIXME: replacement of FairMQ) **/
[[nodiscard]] const std::string& ComChan() const { return fsChanHistosIn; }
/** @brief Get receive High-Water Mark for interface channel **/
[[nodiscard]] const int32_t& ComChanZmqRcvHwm() const { return fiHistosInZmqHwm; }
/** @brief Get receive timeout for interface channel **/
[[nodiscard]] const int32_t& ComChanZmqRcvTo() const { return fiHistosInZmqRcvToMs; }
......@@ -58,6 +61,7 @@ namespace cbm::services::histserv
private: // members
std::string fsChanHistosIn = "tcp://127.0.0.1:56800";
int32_t fiHistosInZmqHwm = 1; ///< High-Water Mark, default keep only 1 update in buffer
int32_t fiHistosInZmqRcvToMs = 10; ///< Timeout in ms: -1 = block, 0 = instant, val = nb ms
uint32_t fuHttpServerPort = 8080; ///< HTTP port of the ROOT web server
std::string fsHistoFileName = "histos_dump.root"; ///< Output file name (ROOT format)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment