diff --git a/algo/base/HistogramSender.h b/algo/base/HistogramSender.h index ec9847312cc0dcaa2c930d8d24207c5a896ee169..f3dd22ae9155c627b7d95bf3f51be50f685dd070 100644 --- a/algo/base/HistogramSender.h +++ b/algo/base/HistogramSender.h @@ -20,11 +20,13 @@ namespace cbm::algo class HistogramSender { public: - HistogramSender(std::string_view address) + HistogramSender(std::string_view address, int32_t hwm = 1) : fHistComChan(address) + , fHistHighWaterMark(hwm) , fZmqContext(1) , fZmqSocket(fZmqContext, zmq::socket_type::push) { + fZmqSocket.set(zmq::sockopt::sndhwm, fHistHighWaterMark); // High-Water Mark, max nb updates kept in out buffer fZmqSocket.connect(fHistComChan); // This side "connects" to socket => Other side should have "bind"!!!! } @@ -54,7 +56,8 @@ namespace cbm::algo } private: - std::string fHistComChan = "tcp://127.0.0.1:56800"; + std::string fHistComChan = "tcp://127.0.0.1:56800"; + int32_t fHistHighWaterMark = 1; zmq::context_t fZmqContext; ///< ZMQ context FIXME: should be only one context per binary! zmq::socket_t fZmqSocket; ///< ZMQ socket to histogram server }; diff --git a/algo/base/Options.cxx b/algo/base/Options.cxx index 838430e279c51fdf246afab2178dc0eea8bcd646..25285bceef5306bec982a75c564e38a1bce4a2d9 100644 --- a/algo/base/Options.cxx +++ b/algo/base/Options.cxx @@ -72,6 +72,10 @@ Options::Options(int argc, char** argv) ("monitor,m", po::value(&fMonitorUri)->value_name("<uri>")->implicit_value("file:cout"), "URI specifying monitor output (e.g. file:/tmp/monitor.txt, influx1:login:8086:cbmreco_status). Prints to cout when no argument is given. Monitor is disabled when flag is not set.") ("histogram", po::value(&fHistogramUri)->value_name("<uri>"), "URI to specify histogram server") + ("histoshwm", po::value(&fHistogramHwm)->default_value(1)->value_name("<num>"), + "High-Water Mark for ZMQ socket to histogram server in messages:\n" + " 0 = no buffering, num = nb updates kept in buffer if not pulled by server \n" + " Tune to avoid too high memory usage but also adapt to server load!") ("log-file,L", po::value(&fLogFile)->value_name("<file>"), "write log messages to file") ("output-types,O", po::value(&fOutputTypes)->multitoken()->value_name("<types>"), diff --git a/algo/base/Options.h b/algo/base/Options.h index 06eb0c2fac39f853a86b0ac4dc7a9a0c281e9926..61b6516675e6da0570d68c0081521d210215f6d5 100644 --- a/algo/base/Options.h +++ b/algo/base/Options.h @@ -30,6 +30,7 @@ namespace cbm::algo const std::string& Device() const { return fDevice; } const std::string& MonitorUri() const { return fMonitorUri; } const std::string& HistogramUri() const { return fHistogramUri; } + const int32_t& HistogramHwm() const { return fHistogramHwm; } bool CollectKernelTimes() const { return fProfilingLevel != ProfilingNone; } ProfilingLevel Profiling() const { return fProfilingLevel; } fs::path TimingsFile() const { return fTimingsFile; } @@ -68,6 +69,7 @@ namespace cbm::algo std::string fDevice; std::string fMonitorUri; std::string fHistogramUri; + int32_t fHistogramHwm; bool fDumpArchive = false; ProfilingLevel fProfilingLevel = ProfilingNone; std::string fTimingsFile; diff --git a/algo/global/Reco.cxx b/algo/global/Reco.cxx index 94cf218a6aad04a66398d15c75ef971ab19af24c..56884d0d06eaf5a0a8dc43e52dffb317ee47abdd 100644 --- a/algo/global/Reco.cxx +++ b/algo/global/Reco.cxx @@ -82,7 +82,7 @@ void Reco::Init(const Options& opts) SetContext(&fContext); if (Opts().HistogramUri() != "") { - fSender = std::make_shared<HistogramSender>(Opts().HistogramUri()); + fSender = std::make_shared<HistogramSender>(Opts().HistogramUri(), Opts().HistogramHwm()); // fContext.sender = fSender; }