Skip to content
Snippets Groups Projects
Commit 197cb9a7 authored by Administrator's avatar Administrator Committed by Florian Uhlig
Browse files

Update flesnet version

The current flesnet version contain all changes/patches which we did so far
when compiling CbmRoot.
The new flesnet version also needs un updated version of cppzmq.
refs. #2068, #2069
parent c896d5c7
No related branches found
No related tags found
1 merge request!292Update flesnet version
Pipeline #9077 passed
......@@ -273,7 +273,7 @@ else(CBMROOT_MINIMAL)
find_package(TBB)
find_package(SSE)
# find_package(IWYU)
# find_package(ZeroMQ)
find_package(ZeroMQ)
Set(Boost_NO_SYSTEM_PATHS TRUE)
Set(Boost_NO_BOOST_CMAKE TRUE)
......
......@@ -45,6 +45,7 @@ include_directories(${INCLUDE_DIRECTORIES})
include_directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
set(LINK_DIRECTORIES
${Vc_LIB_DIR}
${ROOT_LIBRARY_DIR}
${FAIRROOT_LIBRARY_DIR}
${KFParticle_LIB_DIR}
......
......@@ -9,7 +9,7 @@ Option(DOWNLOAD_EXTERNALS "Download the code from the external repositories." ON
if(DOWNLOAD_EXTERNALS)
download_project_if_needed(PROJECT cppzmq
GIT_REPOSITORY "https://github.com/zeromq/cppzmq/"
GIT_TAG "05a0256d0eeea8063690fde6a156e14b70ed2280"
GIT_TAG "4f111562e7ce23d53bda53748d934ca523d650d7"
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/cppzmq
TEST_FILE zmq.hpp
)
......@@ -34,7 +34,7 @@ else()
# Define targets which are needed by CbmRoot but are not available
# whithout the external packages
add_library(ANALYSISTREE SHARED IMPORTED GLOBAL)
# add_library(ANALYSISTREEQA SHARED IMPORTED GLOBAL)
add_library(ANALYSISTREEQA SHARED IMPORTED GLOBAL)
add_library(NICAFEMTO SHARED IMPORTED GLOBAL)
add_library(KFPARTICLE SHARED IMPORTED GLOBAL)
endif()
......@@ -5,21 +5,12 @@
download_project_if_needed(PROJECT fles_ipc
GIT_REPOSITORY "https://github.com/cbm-fles/flesnet"
GIT_TAG "e2d20813a74561cf58661b077c046c0da1f28288"
GIT_TAG "92ff50ead204d0acb4fccd9cbb9876817d077528"
GIT_STASH TRUE
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/ipc
PATCH_COMMAND "patch -p1 < ${CMAKE_CURRENT_SOURCE_DIR}/ipc.patch"
TEST_FILE CMakeLists.txt
)
configure_file(Timeslice.hpp ${CMAKE_CURRENT_SOURCE_DIR}/ipc/lib/fles_ipc/Timeslice.hpp COPYONLY)
configure_file(TimesliceMultiInputArchive.hpp ${CMAKE_CURRENT_SOURCE_DIR}/ipc/lib/fles_ipc/TimesliceMultiInputArchive.hpp COPYONLY)
configure_file(TimesliceMultiInputArchive.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ipc/lib/fles_ipc/TimesliceMultiInputArchive.cpp COPYONLY)
configure_file(TimesliceMultiSubscriber.hpp ${CMAKE_CURRENT_SOURCE_DIR}/ipc/lib/fles_ipc/TimesliceMultiSubscriber.hpp COPYONLY)
configure_file(TimesliceMultiSubscriber.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ipc/lib/fles_ipc/TimesliceMultiSubscriber.cpp COPYONLY)
# Create the logging library
add_library(fles_logging SHARED ipc/lib/logging/log.cpp ipc/lib/logging/log.hpp)
......
// Copyright 2013 Jan de Cuveland <cmail@cuveland.de>
/// \file
/// \brief Defines the fles::Timeslice abstract base class.
#pragma once
#include "MicrosliceDescriptor.hpp"
#include "MicrosliceView.hpp"
#include "TimesliceComponentDescriptor.hpp"
#include "TimesliceDescriptor.hpp"
#include <fstream>
#include <vector>
#include <boost/serialization/access.hpp>
// Note: <fstream> has to precede boost/serialization includes for non-obvious
// reasons to avoid segfault similar to
// http://lists.debian.org/debian-hppa/2009/11/msg00069.html
namespace fles
{
/**
* \brief The Timeslice class provides read access to the data of a timeslice.
*
* This class is an abstract base class for all classes providing access to the
* contents of a single timeslice.
*/
class Timeslice
{
public:
virtual ~Timeslice() = 0;
/// Retrieve the timeslice index.
uint64_t index() const { return timeslice_descriptor_.index; }
/// Retrieve the number of core microslices.
uint64_t num_core_microslices() const
{
return timeslice_descriptor_.num_core_microslices;
}
/// Retrieve the total number of microslices.
uint64_t num_microslices(uint64_t component) const
{
return desc_ptr_[component]->num_microslices;
}
/// Retrieve the number of components (contributing input channels).
uint64_t num_components() const
{
return timeslice_descriptor_.num_components;
}
/// Retrieve the size of component.
uint64_t size_component(uint64_t component) const
{
return desc_ptr_[component]->size;
}
/// Retrieve a pointer to the data content of a given microslice
const uint8_t* content(uint64_t component, uint64_t microslice) const
{
return data_ptr_[component] +
desc_ptr_[component]->num_microslices *
sizeof(MicrosliceDescriptor) +
descriptor(component, microslice).offset -
descriptor(component, 0).offset;
}
/// Retrieve the descriptor of a given microslice
const MicrosliceDescriptor& descriptor(uint64_t component,
uint64_t microslice) const
{
return reinterpret_cast<const MicrosliceDescriptor*>(
data_ptr_[component])[microslice];
}
/// Retrieve the descriptor and pointer to the data of a given microslice
const MicrosliceView get_microslice(uint64_t component,
uint64_t microslice_index) const
{
uint8_t* component_data_ptr = data_ptr_[component];
MicrosliceDescriptor& dd = reinterpret_cast<MicrosliceDescriptor*>(
component_data_ptr)[microslice_index];
MicrosliceDescriptor& dd0 =
reinterpret_cast<MicrosliceDescriptor*>(component_data_ptr)[0];
uint8_t* cc = component_data_ptr +
desc_ptr_[component]->num_microslices *
sizeof(MicrosliceDescriptor) +
dd.offset - dd0.offset;
return MicrosliceView(dd, cc);
}
protected:
Timeslice(){};
friend class StorableTimeslice;
/// The timeslice descriptor.
TimesliceDescriptor timeslice_descriptor_;
/// A vector of pointers to the data content, one per timeslice component.
std::vector<uint8_t*> data_ptr_;
/// \brief A vector of pointers to the microslice descriptors, one per
/// timeslice component.
std::vector<TimesliceComponentDescriptor*> desc_ptr_;
};
} // namespace fles
// Copyright 2019 Florian Uhlig <f.uhlig@gsi.de>
#include "TimesliceMultiInputArchive.hpp"
#include "TimesliceInputArchive.hpp"
#include "StorableTimeslice.hpp"
#include <boost/regex.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/filesystem.hpp>
namespace filesys = boost::filesystem;
namespace fles {
TimesliceMultiInputArchive::TimesliceMultiInputArchive(const std::string& inputString, const std::string& inputDirectory)
{
std::string newInputString{""};
if (!inputDirectory.empty()) {
// split the input string at the character ";" which divides the string
// into different files/filelists for the different streams
std::vector<std::string> inputStreams;
boost::split(inputStreams, inputString, [](char c){return c == ';';});
for (auto& string: inputStreams) {
std::string fullFile = inputDirectory + "/" + string;
newInputString += fullFile;
newInputString += ";";
}
newInputString.pop_back(); // Remove the last ;
} else {
newInputString = inputString;
}
if (!newInputString.empty()) {
CreateInputFileList(newInputString);
for (auto& stream: InputFileList) {
std::string file = stream.at(0);
stream.erase(stream.begin());
source_.push_back(
std::unique_ptr<TimesliceInputArchive>(
new TimesliceInputArchive(file)));
L_(info) << " Open file: " << file;
}
} else {
L_(fatal) << "No input files defined";
exit(1);
}
InitTimesliceArchive();
}
void TimesliceMultiInputArchive::CreateInputFileList(std::string inputString)
{
// split the input string at the character ";" which divides the string
// into different files/filelists for the different streams
std::vector<std::string> inputStreams;
boost::split(inputStreams, inputString, [](char c){return c == ';';});
// loop over the inputs and extract the file List for
// the orresponding input
// The filename should contain the full path to the file
// The filename can contain the wildcard "*"
for (auto& string: inputStreams) {
filesys::path p{string};
std::string dir = p.parent_path().string();
std::string filename = p.filename().string();
std::vector<std::string> v;
// escape "." which have a special meaning in regex
// change "*" to ".*" to find any number
// e.g. tofget4_hd2018.*.tsa => tofget4_hd2018\..*\.tsa
boost::replace_all(filename, ".", "\\.");
boost::replace_all(filename, "*", ".*");
// create regex
const boost::regex my_filter(filename);
// loop over all files in input directory
for ( auto&& x : filesys::directory_iterator( p.parent_path() ) ) {
// Skip if not a file
if( !boost::filesystem::is_regular_file( x ) ) continue;
// Skip if no match
// x.path().leaf().string() means get from directory iterator the
// current entry as filesys::path, from this extract the leaf
// filename or directory name and convert it to a string to be
// used in the regex:match
boost::smatch what;
if( !boost::regex_match( x.path().leaf().string(), what, my_filter ) ) continue;
v.push_back(x.path().string());
}
// sort the files which match the regex in increasing order
// (hopefully)
std::sort(v.begin(), v.end());
InputFileList.push_back(v);
}
// some dubug output
L_(info) << "Number of input streams: " << InputFileList.size();
for (auto& streamList : InputFileList) {
L_(info) << "Number of files in Stream: " << streamList.size();
for (auto& fileList : streamList) {
L_(info) << "File: " << fileList;
}
}
}
void TimesliceMultiInputArchive::InitTimesliceArchive()
{
timesliceCont.resize(source_.size());
int element = 0;
for (auto& sourceNr: source_) {
if(auto timeslice = sourceNr->get()) {
sortedSource_.insert({timeslice->index(), element});
timesliceCont.at(element) = std::move(timeslice);
element++;
} else {
L_(fatal) << "Could not read a timeslice from input stream " << element;
exit(1);
}
}
}
Timeslice* TimesliceMultiInputArchive::do_get()
{
return GetNextTimeslice().release();
}
std::unique_ptr<Timeslice> TimesliceMultiInputArchive::GetNextTimeslice()
{
if (sortedSource_.size()>0) {
// take the first element from the set which is the one with the smallest
// ts number
// (*(sortedSource_.begin())) dereference the std::set iterator to get access to the
// contained pair
// afterwards erase the first element of the set
int currentSource = (*(sortedSource_.begin())).second;
sortedSource_.erase(sortedSource_.begin());
std::unique_ptr<Timeslice> retTimeslice = std::move(timesliceCont.at(currentSource));
if (auto timeslice = source_.at(currentSource)->get()) {
sortedSource_.insert({timeslice->index(), currentSource});
timesliceCont.at(currentSource) = std::move(timeslice);
} else {
if (!OpenNextFile(currentSource)) {
// if the first file reaches the end stop reading
//return std::unique_ptr<const Timeslice>(nullptr);
} else {
if ( (timeslice = source_.at(currentSource)->get()) ) {
sortedSource_.insert({timeslice->index(), currentSource});
timesliceCont.at(currentSource) = std::move(timeslice);
}
}
}
return retTimeslice;
} else {
return std::unique_ptr<Timeslice>(nullptr);
}
}
bool TimesliceMultiInputArchive::OpenNextFile(int element)
{
// First Close and delete existing source
if( nullptr != source_.at(element) ) {
delete source_.at(element).release();
}
if (InputFileList.at(element).size() > 0) {
std::string file = InputFileList.at(element).at(0);
InputFileList.at(element).erase(InputFileList.at(element).begin());
source_.at(element) = std::unique_ptr<TimesliceInputArchive>(
new TimesliceInputArchive(file));
L_(info) << " Open file: " << file;
} else {
L_(info) << "End of files list reached.";
return false;
}
return true;
}
}
// Copyright 2019 Florian Uhlig <f.uhlig@gsi.de>
/// \file
/// \brief Defines the fles::TimesliceMultiInputArchive class.
#pragma once
#include "TimesliceSource.hpp"
#include "StorableTimeslice.hpp"
#include "log.hpp"
#include <chrono>
#include <memory>
#include <vector>
#include <string>
#include <set>
namespace fles {
/**
* \brief The TimesliceMultiInputArchive class reads timeslice data from
* several TimesliceInputArchives and returns the timslice with the
* smallest index.
*/
class TimesliceMultiInputArchive : public TimesliceSource {
public:
// Construct an input archive object for each of the files passed in the input string
// open the archive files for reading, and read the archive descriptors
// If a directory is passed as second parameter build first a list of filenames which
// contains the full path
explicit TimesliceMultiInputArchive(const std::string&, const std::string& ="");
/// Delete copy constructor (non-copyable).
TimesliceMultiInputArchive(const TimesliceMultiInputArchive&) = delete;
/// Delete assignment operator (non-copyable).
void operator=(const TimesliceMultiInputArchive&) = delete;
~TimesliceMultiInputArchive() override = default;
/**
* \brief Retrieve the next item.
*
* \return pointer to the item, or nullptr if no more
* timeslices available in the input archives
*/
std::unique_ptr<Timeslice> get() {
return (GetNextTimeslice());
};
bool eos() const override { return sortedSource_.size() == 0; }
private:
Timeslice* do_get() override;
void InitTimesliceArchive();
void CreateInputFileList(std::string);
bool OpenNextFile(int);
std::unique_ptr<Timeslice> GetNextTimeslice();
std::vector<std::unique_ptr<TimesliceSource>> source_;
std::vector<std::vector<std::string>> InputFileList;
std::vector<std::unique_ptr<Timeslice>> timesliceCont;
std::set<std::pair<uint64_t,int>> sortedSource_;
logging::OstreamLog status_log_{status};
logging::OstreamLog debug_log_{debug};
};
} // namespace fles
// Copyright 2019 Florian Uhlig <f.uhlig@gsi.de>
#include "TimesliceMultiSubscriber.hpp"
#include "TimesliceSubscriber.hpp"
#include "StorableTimeslice.hpp"
#include <boost/regex.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/filesystem.hpp>
#include <boost/format.hpp>
namespace filesys = boost::filesystem;
namespace fles {
TimesliceMultiSubscriber::TimesliceMultiSubscriber(const std::string& inputString,
uint32_t hwm)
{
if (!inputString.empty()) {
CreateHostPortFileList(inputString);
for (auto& stream: InputHostPortList) {
std::string server = stream;
source_.push_back(
std::unique_ptr<TimesliceSubscriber>(
new TimesliceSubscriber(server, hwm)));
L_(info) << " Open server: " << server << " with ZMQ HW mark " << hwm;
}
} else {
L_(fatal) << "No server defined";
exit(1);
}
InitTimesliceSubscriber();
}
void TimesliceMultiSubscriber::CreateHostPortFileList(std::string inputString)
{
// split the input string at the character ";" which devides the string
// into different file/filelists for the different streams
std::vector<std::string> inputStreams;
boost::split(inputStreams, inputString, [](char c){return c == ';';});
// loop over the inputs and extract for each input the host address including eventual port
// if not port is defined, add the default port
// The hostname cannot contain the wildcard "*"
for (auto& string: inputStreams) {
if( 0 == string.size() )
L_(error) << " Empty hostname string, ignoring it";
std::vector<std::string> stringsHostnamePort;
boost::split(stringsHostnamePort, string, [](char c){return c == ':';});
switch( stringsHostnamePort.size() ) {
case 1:
string += DefaultPort;
break;
case 2:
break;
default:
// Bad hostname, ignore it
L_(error) << " Bad hostname string: " << string;
continue;
}
std::string fullpath = "tcp://";
fullpath += string;
InputHostPortList.push_back(fullpath);
}
// some dubug output
L_(info) << "Number of input streams: " << InputHostPortList.size();
for (auto& streamList : InputHostPortList) {
L_(info) << "Host and port: " << streamList;
}
}
void TimesliceMultiSubscriber::InitTimesliceSubscriber()
{
timesliceCont.resize(source_.size());
int element = 0;
for (auto& sourceNr: source_) {
if(auto timeslice = sourceNr->get()) {
sortedSource_.insert({timeslice->index(), element});
timesliceCont.at(element) = std::move(timeslice);
element++;
} else {
L_(fatal) << "Could not read a timeslice from input stream " << element;
exit(1);
}
}
}
Timeslice* TimesliceMultiSubscriber::do_get()
{
return GetNextTimeslice().release();
}
std::unique_ptr<Timeslice> TimesliceMultiSubscriber::GetNextTimeslice()
{
if (sortedSource_.size()>0) {
// take the first element from the set which is the one with the smallest
// ts number
// (*(sortedSource_.begin())) dereference the std::set iterator to get access to the
// contained pair
// afterwards erase the first element of the set
int currentSource = (*(sortedSource_.begin())).second;
sortedSource_.erase(sortedSource_.begin());
std::unique_ptr<Timeslice> retTimeslice = std::move(timesliceCont.at(currentSource));
if (auto timeslice = source_.at(currentSource)->get()) {
sortedSource_.insert({timeslice->index(), currentSource});
timesliceCont.at(currentSource) = std::move(timeslice);
} else {
// When any server stopped sending, stop reading
//return std::unique_ptr<const Timeslice>(nullptr);
}
return retTimeslice;
} else {
return std::unique_ptr<Timeslice>(nullptr);
}
}
} // end of namespace fles
// Copyright 2019 Florian Uhlig <f.uhlig@gsi.de>
/// \file
/// \brief Defines the fles::TimesliceMultiSubscriber class.
#pragma once
#include "TimesliceSource.hpp"
#include "StorableTimeslice.hpp"
#include "log.hpp"
#include <chrono>
#include <memory>
#include <vector>
#include <string>
#include <set>
namespace fles {
/**
* \brief The TimesliceMultiSubscriber class reads timeslice data from
* several TimesliceInputArchives and returns the timslice with the
* smallest index.
*/
class TimesliceMultiSubscriber : public TimesliceSource {
public:
/// Construct timeslice subscriber receiving from given ZMQ address.
explicit TimesliceMultiSubscriber(const std::string&, uint32_t hwm = 1);
/// Delete copy constructor (non-copyable).
TimesliceMultiSubscriber(const TimesliceMultiSubscriber&) = delete;
/// Delete assignment operator (non-copyable).
void operator=(const TimesliceMultiSubscriber&) = delete;
~TimesliceMultiSubscriber() override = default;
/**
* \brief Retrieve the next item.
*
* \return pointer to the item, or nullptr if no more
* timeslices available in the input archives
*/
std::unique_ptr<Timeslice> get() {
return (GetNextTimeslice());
};
bool eos() const override { return sortedSource_.size() == 0; }
private:
Timeslice* do_get() override;
void InitTimesliceSubscriber();
void CreateHostPortFileList(std::string);
std::unique_ptr<Timeslice> GetNextTimeslice();
std::vector<std::unique_ptr<TimesliceSource>> source_;
std::string DefaultPort = ":5556";
std::vector<std::string> InputHostPortList;
std::vector<std::unique_ptr<Timeslice>> timesliceCont;
std::set<std::pair<uint64_t,int>> sortedSource_;
logging::OstreamLog status_log_{status};
logging::OstreamLog debug_log_{debug};
};
} // namespace fles
diff --git a/lib/fles_ipc/StorableTimeslice.cpp b/lib/fles_ipc/StorableTimeslice.cpp
index 8d7ca72..15a3e88 100644
--- a/lib/fles_ipc/StorableTimeslice.cpp
+++ b/lib/fles_ipc/StorableTimeslice.cpp
@@ -2,6 +2,8 @@
#include "StorableTimeslice.hpp"
+#include <algorithm>
+
namespace fles {
StorableTimeslice::StorableTimeslice(const StorableTimeslice& ts)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment