Skip to content
Snippets Groups Projects

Add support for stream compression in online hist-sender and hist-server

Merged Pierre-Alain Loizeau requested to merge p.-a.loizeau/cbmroot:histserv_compr into master
Files
7
@@ -7,6 +7,10 @@
#include <boost/archive/binary_oarchive.hpp>
#include <boost/iostreams/device/array.hpp>
#include <boost/iostreams/device/back_inserter.hpp>
#ifdef BOOST_IOS_HAS_ZSTD
#include <boost/iostreams/filter/zstd.hpp>
#include <boost/iostreams/filtering_stream.hpp>
#endif
#include <boost/iostreams/stream.hpp>
#include <boost/serialization/utility.hpp>
#include <boost/serialization/vector.hpp>
@@ -20,9 +24,10 @@ namespace cbm::algo
class HistogramSender {
public:
HistogramSender(std::string_view address, int32_t hwm = 1)
HistogramSender(std::string_view address, int32_t hwm = 1, bool compression = false)
: fHistComChan(address)
, fHistHighWaterMark(hwm)
, fbCompression(compression)
, fZmqContext(1)
, fZmqSocket(fZmqContext, zmq::socket_type::push)
{
@@ -44,10 +49,25 @@ namespace cbm::algo
std::string serial_str;
b_io::back_insert_device<std::string> inserter(serial_str);
b_io::stream<b_io::back_insert_device<std::string>> bstream(inserter);
b_ar::binary_oarchive oa(bstream);
serial_str.clear();
oa << obj;
if (fbCompression) {
#ifdef BOOST_IOS_HAS_ZSTD
std::unique_ptr<b_io::filtering_ostream> out_ = std::make_unique<b_io::filtering_ostream>();
out_->push(b_io::zstd_compressor(b_io::zstd::best_speed));
out_->push(bstream);
std::unique_ptr<b_ar::binary_oarchive> oarchive_ =
std::make_unique<b_ar::binary_oarchive>(*out_, b_ar::no_header);
*oarchive_ << obj;
#else
throw std::runtime_error("Unsupported ZSTD compression (boost) for histograms emissions channel");
#endif
}
else {
b_ar::binary_oarchive oa(bstream);
oa << obj;
}
bstream.flush();
zmq::message_t msg(serial_str.size());
@@ -59,6 +79,7 @@ namespace cbm::algo
private:
std::string fHistComChan = "tcp://127.0.0.1:56800";
int32_t fHistHighWaterMark = 1;
bool fbCompression = false;
zmq::context_t fZmqContext; ///< ZMQ context FIXME: should be only one context per binary!
zmq::socket_t fZmqSocket; ///< ZMQ socket to histogram server
};
Loading