diff --git a/services/histserv/app/Application.cxx b/services/histserv/app/Application.cxx index f3caeb2a77b25db91a7b8129c5ab695a5af63171..ca6ab005ff6994f46741d2ec5e820941ffdde993 100644 --- a/services/histserv/app/Application.cxx +++ b/services/histserv/app/Application.cxx @@ -27,6 +27,7 @@ #include <boost/serialization/vector.hpp> #include <mutex> +#include <zmq_addon.hpp> #include "Histo1D.h" #include "ui_callbacks.h" @@ -54,6 +55,7 @@ namespace cbm::services::histserv /// FIXME: Initialize communication channels of SOMETHING_To_Replace_FairMQ /// FIXME: Link channel to method in order to process received messages // fZmqSocket.set(zmq::sockopt::rcvhwm, int(hwm)); // FIXME: need for HWM? + fZmqSocket.set(zmq::sockopt::rcvtimeo, fOpt.ComChanZmqRcvTo()); // Timeout in ms to avoid stuck in loop! fZmqSocket.bind(fOpt.ComChan().c_str()); // This side "binds" the socket => Other side should connect!!!! fServer = new THttpServer(Form("http:%u", fOpt.HttpPort())); @@ -86,86 +88,25 @@ namespace cbm::services::histserv { fStopThread = false; fThread = std::thread(&Application::UpdateHttpServer, this); - - const std::chrono::milliseconds timeout {10}; - /* - * Flesnet way (also cppzmq tour https://brettviren.github.io/cppzmq-tour) - * ZMQ Draft API! Need custom built ZMQ + #include <zmq_addon.hpp>! - zmq::poller_t in_poller; - in_poller.add(fZmqSocket, zmq::event_flags::pollin); - std::vector<decltype(poller)::poller_event> in_events(1); - */ - /* Alternative taken from FairMQ Poller and zmq.hpp/monitor::check_event() */ - // http://api.zeromq.org/2-1:zmq-poll: void *socket, int fd, short events, short revents - zmq_pollitem_t pollit {fZmqSocket.handle(), 0, ZMQ_POLLIN, 0}; - while (!bHistoServerStop) { // /// Infinite loop, this is a service which should survive until told otherwise after all /// FIXME: Start listening to <SOMETHING?!?> to receive histograms and configuration - /* - * Flesnet way (also cppzmq tour https://brettviren.github.io/cppzmq-tour) - * ZMQ Draft API! Need custom built ZMQ + #include <zmq_addon.hpp>! - if (in_poller.wait_all(in_events, timeout)) { - zmq::message_t msg; - zmq::recv_result_t rres = fZmqSocket.recv(msg, zmq::recv_flags::none); + /// FIXME: handle signals from OS/console + /* Jan suggestion with zmq_addon CPP interface */ + std::vector<zmq::message_t> vMsg; + const auto ret = zmq::recv_multipart(fZmqSocket, std::back_inserter(vMsg)); + if (!ret) continue; + + if (*ret > 3) { // + ReceiveConfigAndData(vMsg); } - */ - /* Alternative taken from FairMQ Poller and zmq.hpp/monitor::check_event() */ - zmq::poll(&pollit, 1, timeout); - if (pollit.revents & ZMQ_POLLIN) { - std::vector<zmq::message_t> vMsg; - int more = 1; - size_t moreSize = sizeof(more); - while (more) { - vMsg.push_back(zmq::message_t()); - int rc = zmq_msg_recv(vMsg.back().handle(), fZmqSocket.handle(), 0); - if (rc == -1) { - switch (zmq_errno()) { - case ETERM: { - LOG(debug) << "polling exited, reason: socket was terminated (" << zmq_strerror(errno) << ")"; - more = 0; - bHistoServerStop = true; - break; - } - case EINTR: { - LOG(debug) << "polling interrupted by system call"; - /// FIXME: not sure if should stop or go-on at this point - more = 0; - bHistoServerStop = true; - break; - } - default: { - LOG(error) << "polling failed, reason: " << zmq_strerror(errno); - more = 0; - bHistoServerStop = true; - break; - } - } - continue; - } - else { - zmq_getsockopt(fZmqSocket, ZMQ_RCVMORE, &more, &moreSize); - } - } - - /// Check if multi-parts message - if (3 < vMsg.size()) { // - ReceiveConfigAndData(vMsg); - } - else if (1 == vMsg.size()) { - ReceiveData(vMsg[0]); - } - else if (0 < vMsg.size()) { - LOG(error) << "Invalid number of message parts received: should be either 1 or more than 3 vs " - << vMsg.size(); - } - else { - LOG(error) << "polling told some data to expect but none successfully readout!"; - } + else if (*ret == 1) { // + ReceiveData(vMsg[0]); + } + else { // + LOG(error) << "Invalid number of message parts received: should be either 1 or more than 3 vs " << *ret; } - - //std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } // ------------------------------------------------------------------------------------------------------------------- diff --git a/services/histserv/app/ProgramOptions.cxx b/services/histserv/app/ProgramOptions.cxx index e8b4e545ececb4e02c5f70a5d92c5a65fba1a928..54522eaf284fd70ef0afada29d348d134e7b4f29 100644 --- a/services/histserv/app/ProgramOptions.cxx +++ b/services/histserv/app/ProgramOptions.cxx @@ -36,6 +36,8 @@ namespace cbm::services::histserv // --- Define configuration options: Optional po::options_description config(" Configuration (optional or with default)"); auto config_add = config.add_options(); + config_add("timeout,zto", po::value<int32_t>(&fiHistosInZmqRcvToMs)->default_value(10), + "ZMQ input channel timeout in ms: -1 = block (term. handling!), 0 = instant (CPU load!), val = nb ms"); config_add("port,p", po::value<uint32_t>(&fuHttpServerPort)->default_value(8080), "port on which the http ROOT server (JSroot) will be available (mind default value!)"); config_add("output,o", po::value<string>(&fsHistoFileName)->value_name("<file name>"), diff --git a/services/histserv/app/ProgramOptions.h b/services/histserv/app/ProgramOptions.h index 6e7a526bc265ac074b1fd4832382f4c705ac7d2a..47a913878b89576415f0610db3ef7d98286f8050 100644 --- a/services/histserv/app/ProgramOptions.h +++ b/services/histserv/app/ProgramOptions.h @@ -33,6 +33,9 @@ namespace cbm::services::histserv /** @brief Get interface channel name or hostname + port or whatever or ????? (FIXME: replacement of FairMQ) **/ [[nodiscard]] const std::string& ComChan() const { return fsChanHistosIn; } + /** @brief Get receive timeout for interface channel **/ + [[nodiscard]] const int32_t& ComChanZmqRcvTo() const { return fiHistosInZmqRcvToMs; } + /** @brief Get histo server http port **/ [[nodiscard]] const uint32_t& HttpPort() const { return fuHttpServerPort; } @@ -51,11 +54,12 @@ namespace cbm::services::histserv private: // members - std::string fsChanHistosIn = "histogram-in"; - uint32_t fuHttpServerPort = 8080; ///< HTTP port of the ROOT web server - std::string fsHistoFileName = "histos_dump.root"; ///< Output file name (ROOT format) - bool fOverwrite = false; ///< Enable overwriting of existing output file - //std::string fConfig = ""; ///< Configuration file name (YAML format) + std::string fsChanHistosIn = "tcp://127.0.0.1:56800"; + int32_t fiHistosInZmqRcvToMs = 10; ///< Timeout in ms: -1 = block, 0 = instant, val = nb ms + uint32_t fuHttpServerPort = 8080; ///< HTTP port of the ROOT web server + std::string fsHistoFileName = "histos_dump.root"; ///< Output file name (ROOT format) + bool fOverwrite = false; ///< Enable overwriting of existing output file + //std::string fConfig = ""; ///< Configuration file name (YAML format) }; } // namespace cbm::services::histserv