Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • le.koch/cbmroot
  • patrick.pfistner_AT_kit.edu/cbmroot
  • lena.rossel_AT_stud.uni-frankfurt.de/cbmroot
  • i.deppner/cbmroot
  • fweig/cbmroot
  • karpushkin_AT_inr.ru/cbmroot
  • v.akishina/cbmroot
  • rishat.sultanov_AT_cern.ch/cbmroot
  • l_fabe01_AT_uni-muenster.de/cbmroot
  • pwg-c2f/cbmroot
  • j.decuveland/cbmroot
  • a.toia/cbmroot
  • i.vassiliev/cbmroot
  • n.herrmann/cbmroot
  • o.lubynets/cbmroot
  • se.gorbunov/cbmroot
  • cornelius.riesen_AT_physik.uni-giessen.de/cbmroot
  • zhangqn17_AT_mails.tsinghua.edu.cn/cbmroot
  • bartosz.sobol/cbmroot
  • ajit.kumar/cbmroot
  • computing/cbmroot
  • a.agarwal_AT_vecc.gov.in/cbmroot
  • osingh/cbmroot
  • wielanek_AT_if.pw.edu.pl/cbmroot
  • malgorzata.karabowicz.stud_AT_pw.edu.pl/cbmroot
  • m.shiroya/cbmroot
  • s.roy/cbmroot
  • p.-a.loizeau/cbmroot
  • a.weber/cbmroot
  • ma.beyer/cbmroot
  • d.klein/cbmroot
  • d.smith/cbmroot
  • mvdsoft/cbmroot
  • d.spicker/cbmroot
  • y.h.leung/cbmroot
  • m.deveaux/cbmroot
  • mkunold/cbmroot
  • h.darwish/cbmroot
  • f_fido01_AT_uni-muenster.de/cbmroot
  • g.kozlov/cbmroot
  • d.emschermann/cbmroot
  • evgeny.lavrik/cbmroot
  • v.friese/cbmroot
  • f.uhlig/cbmroot
  • ebechtel_AT_ikf.uni-frankfurt.de/cbmroot
  • a.senger/cbmroot
  • praisig/cbmroot
  • s.lebedev/cbmroot
  • redelbach_AT_compeng.uni-frankfurt.de/cbmroot
  • p.subramani/cbmroot
  • a_meye37_AT_uni-muenster.de/cbmroot
  • om/cbmroot
  • o.golosov/cbmroot
  • l.chlad/cbmroot
  • a.bercuci/cbmroot
  • d.ramirez/cbmroot
  • v.singhal/cbmroot
  • h.schiller/cbmroot
  • apuntke/cbmroot
  • f.zorn/cbmroot
  • rubio_AT_physi.uni-heidelberg.de/cbmroot
  • p.chudoba/cbmroot
  • apuntke/mcbmroot
  • r.karabowicz/cbmroot
64 results
Show changes
Showing
with 1331 additions and 813 deletions
/* Copyright (C) 2017-2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
/**
* CbmMQTsaSampler.cpp
*
......@@ -7,235 +11,235 @@
#include "CbmMQTsaSampler.h"
#include "CbmMQDefs.h"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "CbmMQDefs.h"
#include "TimesliceSubscriber.hpp"
#include "TimesliceInputArchive.hpp"
#include "TimesliceSubscriber.hpp"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include <boost/algorithm/string.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/filesystem.hpp>
#include <boost/regex.hpp>
#include <boost/algorithm/string.hpp>
namespace filesys = boost::filesystem;
#include <stdio.h>
#include <ctime>
#include <thread> // this_thread::sleep_for
#include <chrono>
#include <thread> // this_thread::sleep_for
#include <algorithm>
#include <chrono>
#include <ctime>
#include <string>
#include <stdio.h>
using namespace std;
#include <stdexcept>
struct InitTaskError : std::runtime_error { using std::runtime_error::runtime_error; };
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
};
CbmMQTsaSampler::CbmMQTsaSampler()
: FairMQDevice()
, fMaxTimeslices(0)
, fFileName("")
, fDirName("")
, fInputFileList()
, fFileCounter(0)
, fHost("")
, fPort(0)
, fTSNumber(0)
, fTSCounter(0)
, fMessageCounter(0)
, fSource(nullptr)
, fTime()
: FairMQDevice()
, fMaxTimeslices(0)
, fFileName("")
, fDirName("")
, fInputFileList()
, fFileCounter(0)
, fHost("")
, fPort(0)
, fTSNumber(0)
, fTSCounter(0)
, fMessageCounter(0)
, fSource(nullptr)
, fTime()
{
}
void CbmMQTsaSampler::InitTask()
try
{
// Get the values from the command line options (via fConfig)
fFileName = fConfig->GetValue<string>("filename");
fDirName = fConfig->GetValue<string>("dirname");
fHost = fConfig->GetValue<string>("flib-host");
fPort = fConfig->GetValue<uint64_t>("flib-port");
fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
// Check which input is defined
// Posibilities
// filename && ! dirname : single file
// filename with wildcards && diranme : all files with filename regex in the directory
// host && port : connect to the flim server
bool isGoodInputCombi{false};
if ( 0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size() && 0 == fPort ) {
isGoodInputCombi=true;
// Create a Path object from given path string
filesys::path pathObj(fFileName);
if ( ! filesys::is_regular_file(pathObj) ) {
throw InitTaskError("Passed file name is no valid file");
}
fInputFileList.push_back(fFileName);
LOG(info) << "Filename: " << fFileName;
} else if ( 0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size() && 0 == fPort) {
isGoodInputCombi=true;
filesys::path pathObj = fDirName;
if ( ! filesys::is_directory(pathObj) ) {
throw InitTaskError("Passed directory name is no valid directory");
}
if (fFileName.find("*") == std::string::npos) {
// Normal file without wildcards
pathObj += fFileName;
if ( ! filesys::is_regular_file(pathObj) ) {
throw InitTaskError("Passed file name is no valid file");
}
fInputFileList.push_back(pathObj.string());
LOG(info) << "Filename: " << fInputFileList[0];
} else {
std::vector<filesys::path> v;
// escape "." which have a special meaning in regex
// change "*" to ".*" to find any number
// e.g. tofget4_hd2018.*.tsa => tofget4_hd2018\..*\.tsa
boost::replace_all(fFileName, ".", "\\.");
boost::replace_all(fFileName, "*", ".*");
// create regex
const boost::regex my_filter(fFileName);
// loop over all files in input directory
for (auto&& x : filesys::directory_iterator(pathObj)) {
// Skip if not a file
if( !boost::filesystem::is_regular_file( x ) ) continue;
// Skip if no match
// x.path().leaf().string() means get from directory iterator the
// current entry as filesys::path, from this extract the leaf
// filename or directory name and convert it to a string to be
// used in the regex:match
boost::smatch what;
if( !boost::regex_match( x.path().leaf().string(), what, my_filter ) ) continue;
v.push_back(x.path());
}
// sort the files which match the regex in increasing order
// (hopefully)
std::sort(v.begin(), v.end());
for (auto&& x : v)
fInputFileList.push_back(x.string());
LOG(info) << "The following files will be used in this order.";
for (auto&& x : v)
LOG(info) << " " << x;
try {
// Get the values from the command line options (via fConfig)
fFileName = fConfig->GetValue<string>("filename");
fDirName = fConfig->GetValue<string>("dirname");
fHost = fConfig->GetValue<string>("flib-host");
fPort = fConfig->GetValue<uint64_t>("flib-port");
fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
// Check which input is defined
// Posibilities
// filename && ! dirname : single file
// filename with wildcards && diranme : all files with filename regex in the directory
// host && port : connect to the flim server
bool isGoodInputCombi {false};
if (0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size() && 0 == fPort) {
isGoodInputCombi = true;
// Create a Path object from given path string
filesys::path pathObj(fFileName);
if (!filesys::is_regular_file(pathObj)) { throw InitTaskError("Passed file name is no valid file"); }
fInputFileList.push_back(fFileName);
LOG(info) << "Filename: " << fFileName;
}
else if (0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size() && 0 == fPort) {
isGoodInputCombi = true;
filesys::path pathObj = fDirName;
if (!filesys::is_directory(pathObj)) { throw InitTaskError("Passed directory name is no valid directory"); }
if (fFileName.find("*") == std::string::npos) {
// Normal file without wildcards
pathObj += fFileName;
if (!filesys::is_regular_file(pathObj)) { throw InitTaskError("Passed file name is no valid file"); }
fInputFileList.push_back(pathObj.string());
LOG(info) << "Filename: " << fInputFileList[0];
}
else {
std::vector<filesys::path> v;
// escape "." which have a special meaning in regex
// change "*" to ".*" to find any number
// e.g. tofget4_hd2018.*.tsa => tofget4_hd2018\..*\.tsa
boost::replace_all(fFileName, ".", "\\.");
boost::replace_all(fFileName, "*", ".*");
// create regex
const boost::regex my_filter(fFileName);
// loop over all files in input directory
for (auto&& x : filesys::directory_iterator(pathObj)) {
// Skip if not a file
if (!boost::filesystem::is_regular_file(x)) continue;
// Skip if no match
// x.path().leaf().string() means get from directory iterator the
// current entry as filesys::path, from this extract the leaf
// filename or directory name and convert it to a string to be
// used in the regex:match
boost::smatch what;
if (!boost::regex_match(x.path().leaf().string(), what, my_filter)) continue;
v.push_back(x.path());
}
} else if ( 0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0!= fPort) {
isGoodInputCombi=true;
LOG(info) << "Host: " << fHost;
LOG(info) << "Port: " << fPort;
} else {
isGoodInputCombi=false;
}
// sort the files which match the regex in increasing order
// (hopefully)
std::sort(v.begin(), v.end());
for (auto&& x : v)
fInputFileList.push_back(x.string());
if (!isGoodInputCombi) {
throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory or host + port are allowed combination.");
LOG(info) << "The following files will be used in this order.";
for (auto&& x : v)
LOG(info) << " " << x;
}
}
else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 != fPort) {
isGoodInputCombi = true;
LOG(info) << "Host: " << fHost;
LOG(info) << "Port: " << fPort;
}
else {
isGoodInputCombi = false;
}
if (!isGoodInputCombi) {
throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory "
"or host + port are allowed combination.");
}
LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
int noChannel = fChannels.size();
LOG(info) << "Number of defined output channels: " << noChannel;
for(auto const &entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
}
for(auto const& value: fComponentsToSend) {
LOG(info) << "Value : " << value;
if (value > 1) {
throw InitTaskError("Sending same data to more than one output channel not implemented yet.");
}
LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
int noChannel = fChannels.size();
LOG(info) << "Number of defined output channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
}
for (auto const& value : fComponentsToSend) {
LOG(info) << "Value : " << value;
if (value > 1) {
throw InitTaskError("Sending same data to more than one output channel "
"not implemented yet.");
}
}
if ( 0 == fFileName.size() && 0 != fHost.size() ) {
if (0 == fFileName.size() && 0 != fHost.size()) {
std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
LOG(info) << "Open TSPublisher at " << connector;
fSource = new fles::TimesliceSubscriber(connector);
if ( !fSource) {
throw InitTaskError("Could not connect to publisher.");
}
} else {
if( false == OpenNextFile() )
{
fSource = new fles::TimesliceSubscriber(connector, 1);
if (!fSource) { throw InitTaskError("Could not connect to publisher."); }
}
else {
if (false == OpenNextFile()) {
throw InitTaskError("Could not open the first input file in the list, Doing nothing!");
}
}
fTime = std::chrono::steady_clock::now();
} catch (InitTaskError& e) {
}
catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}
bool CbmMQTsaSampler::OpenNextFile()
{
bool CbmMQTsaSampler::OpenNextFile()
{
// First Close and delete existing source
if( nullptr != fSource )
delete fSource;
if (nullptr != fSource) delete fSource;
if (fInputFileList.size() > 0) {
fFileName = fInputFileList[0];
fInputFileList.erase(fInputFileList.begin());
LOG(info) << "Open the Flib input file " << fFileName;
filesys::path pathObj(fFileName);
if ( ! filesys::is_regular_file(pathObj) ) {
if (!filesys::is_regular_file(pathObj)) {
LOG(error) << "Input file " << fFileName << " doesn't exist.";
return false;
}
fSource = new fles::TimesliceInputArchive(fFileName);
if ( !fSource) {
if (!fSource) {
LOG(error) << "Could not open input file.";
return false;
}
} else {
}
else {
LOG(info) << "End of files list reached.";
return false;
}
}
return true;
}
bool CbmMQTsaSampler::IsChannelNameAllowed(std::string channelName)
{
for(auto const &entry : fAllowedChannels) {
for (auto const& entry : fAllowedChannels) {
std::size_t pos1 = channelName.find(entry);
if (pos1!=std::string::npos) {
if (pos1 != std::string::npos) {
const vector<std::string>::const_iterator pos =
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos-fAllowedChannels.begin();
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName
<< " found in list of allowed channel names at position " << idx;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
fComponentsToSend[idx]++;
fChannelsToSend[idx].push_back(channelName);
return true;
}
}
LOG(info) << "Channel name " << channelName
<< " not found in list of allowed channel names.";
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
......@@ -248,15 +252,14 @@ bool CbmMQTsaSampler::ConditionalRun()
if (timeslice) {
if (fTSCounter < fMaxTimeslices) {
fTSCounter++;
if (fTSCounter % 10000 == 0) LOG(info) << "Analyse Event " << fTSCounter;
if (fTSCounter % 10000 == 0) LOG(info) << "Analyse Event " << fTSCounter;
const fles::Timeslice& ts = *timeslice;
// auto tsIndex = ts.index();
// auto tsIndex = ts.index();
LOG(info) << "Found " << ts.num_components()
<< " different components in timeslice";
LOG(info) << "Found " << ts.num_components() << " different components in timeslice";
CheckTimeslice(ts);
......@@ -265,23 +268,26 @@ bool CbmMQTsaSampler::ConditionalRun()
CreateAndSendComponent(ts, nrComp);
}
return true;
} else {
if ( false == OpenNextFile() ) {
}
else {
if (false == OpenNextFile()) {
CalcRuntime();
return false;
} else {
}
else {
return true;
}
}
} else {
if ( false == OpenNextFile() ) {
}
else {
if (false == OpenNextFile()) {
CalcRuntime();
return false;
} else {
}
else {
return true;
}
}
}
bool CbmMQTsaSampler::CreateAndSendComponent(const fles::Timeslice& ts, int nrComp)
......@@ -291,21 +297,21 @@ bool CbmMQTsaSampler::CreateAndSendComponent(const fles::Timeslice& ts, int nrCo
// is connected create the new timeslice and send it to the
// correct channel
LOG(info) << "SysID: " << static_cast<int>(ts.descriptor(nrComp,0).sys_id);
LOG(info) << "SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
const vector<int>::const_iterator pos =
std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp,0).sys_id));
if (pos != fSysId.end() ) {
const vector<std::string>::size_type idx = pos-fSysId.begin();
if (fComponentsToSend[idx]>0) {
std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
if (pos != fSysId.end()) {
const vector<std::string>::size_type idx = pos - fSysId.begin();
if (fComponentsToSend[idx] > 0) {
LOG(info) << "Create timeslice component for link " << nrComp;
fles::StorableTimeslice component{static_cast<uint32_t>(ts.num_microslices(nrComp), ts.index())};
fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
component.append_component(ts.num_microslices(0));
for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
component.append_microslice( 0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m) );
component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
}
if ( ! SendData(component, idx) ) return false;
if (!SendData(component, idx)) return false;
return true;
}
}
......@@ -313,23 +319,22 @@ bool CbmMQTsaSampler::CreateAndSendComponent(const fles::Timeslice& ts, int nrCo
}
bool CbmMQTsaSampler::SendData(const fles::StorableTimeslice& component, int idx)
{
{
// serialize the timeslice and create the message
std::stringstream oss;
boost::archive::binary_oarchive oa(oss);
oa << component;
std::string* strMsg = new std::string(oss.str());
FairMQMessagePtr msg(NewMessage(const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void* /*data*/, void* object){ delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
FairMQMessagePtr msg(NewMessage(
const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
// TODO: Implement sending same data to more than one channel
// Need to create new message (copy message??)
if (fComponentsToSend[idx]>1) {
LOG(info) << "Need to copy FairMessage";
}
if (fComponentsToSend[idx] > 1) { LOG(info) << "Need to copy FairMessage"; }
// in case of error or transfer interruption,
// return false to go to IDLE state
......@@ -343,40 +348,31 @@ bool CbmMQTsaSampler::SendData(const fles::StorableTimeslice& component, int idx
}
fMessageCounter++;
LOG(info) << "Send message " << fMessageCounter << " with a size of "
<< msg->GetSize();
LOG(info) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize();
return true;
}
CbmMQTsaSampler::~ CbmMQTsaSampler()
{
}
CbmMQTsaSampler::~CbmMQTsaSampler() {}
void CbmMQTsaSampler::CalcRuntime()
{
std::chrono::duration<double> run_time =
std::chrono::steady_clock::now() - fTime;
std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
LOG(info) << "Runtime: " << run_time.count();
LOG(info) << "No more input data";
}
void CbmMQTsaSampler::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
{
LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id)
<< std::dec;
LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver)
<< std::dec;
LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
LOG(info) << "Equipement ID: " << mdsc.eq_id;
LOG(info) << "Flags: " << mdsc.flags;
LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id)
<< std::dec;
LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver)
<< std::dec;
LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
LOG(info) << "Microslice Idx: " << mdsc.idx;
LOG(info) << "Checksum: " << mdsc.crc;
LOG(info) << "Size: " << mdsc.size;
......@@ -385,25 +381,20 @@ void CbmMQTsaSampler::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor
bool CbmMQTsaSampler::CheckTimeslice(const fles::Timeslice& ts)
{
if ( 0 == ts.num_components() ) {
if (0 == ts.num_components()) {
LOG(error) << "No Component in TS " << ts.index();
return 1;
}
LOG(info) << "Found " << ts.num_components()
<< " different components in timeslice";
LOG(info) << "Found " << ts.num_components() << " different components in timeslice";
for (size_t c = 0; c < ts.num_components(); ++c) {
LOG(info) << "Found " << ts.num_microslices(c)
<< " microslices in component " << c;
LOG(info) << "Component " << c << " has a size of "
<< ts.size_component(c) << " bytes";
LOG(info) << "Component " << c << " has the system id 0x"
<< std::hex << static_cast<int>(ts.descriptor(c,0).sys_id)
<< std::dec;
LOG(info) << "Component " << c << " has the system id 0x"
<< static_cast<int>(ts.descriptor(c,0).sys_id);
/*
LOG(info) << "Found " << ts.num_microslices(c) << " microslices in component " << c;
LOG(info) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes";
LOG(info) << "Component " << c << " has the system id 0x" << std::hex
<< static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
LOG(info) << "Component " << c << " has the system id 0x" << static_cast<int>(ts.descriptor(c, 0).sys_id);
/*
for (size_t m = 0; m < ts.num_microslices(c); ++m) {
PrintMicroSliceDescriptor(ts.descriptor(c,m));
}
......
/* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
/**
* CbmMQTsaSampler.h
*
......@@ -9,77 +13,75 @@
#define CBMMQTSASAMPLER_H_
#include "TimesliceSource.hpp"
#include "Timeslice.hpp"
#include "StorableTimeslice.hpp"
#include "MicrosliceDescriptor.hpp"
#include "StorableTimeslice.hpp"
#include "Timeslice.hpp"
#include "TimesliceSource.hpp"
//#include "Message.hpp"
#include "FairMQDevice.h"
#include <ctime>
#include <string>
#include <vector>
#include <ctime>
class CbmMQTsaSampler : public FairMQDevice
{
public:
CbmMQTsaSampler();
virtual ~CbmMQTsaSampler();
class CbmMQTsaSampler : public FairMQDevice {
public:
CbmMQTsaSampler();
virtual ~CbmMQTsaSampler();
protected:
uint64_t fMaxTimeslices;
protected:
uint64_t fMaxTimeslices;
std::string fFileName;
std::string fDirName;
std::string fFileName;
std::string fDirName;
std::vector<std::string> fInputFileList; ///< List of input files
uint64_t fFileCounter;
std::string fHost;
uint64_t fPort;
std::vector<std::string> fInputFileList; ///< List of input files
uint64_t fFileCounter;
std::string fHost;
uint64_t fPort;
uint64_t fTSNumber;
uint64_t fTSCounter;
uint64_t fMessageCounter;
uint64_t fTSNumber;
uint64_t fTSCounter;
uint64_t fMessageCounter;
int fMaxMemory = 0;
int fMaxMemory = 0;
virtual void InitTask();
virtual bool ConditionalRun();
virtual void InitTask();
virtual bool ConditionalRun();
private:
bool OpenNextFile();
private:
bool OpenNextFile();
bool CheckTimeslice(const fles::Timeslice& ts);
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc);
bool SendData(const fles::StorableTimeslice& component);
void CalcRuntime();
bool IsChannelNameAllowed(std::string);
bool CreateAndSendComponent(const fles::Timeslice&, int);
bool SendData(const fles::StorableTimeslice&, int);
bool CheckTimeslice(const fles::Timeslice& ts);
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc);
bool SendData(const fles::StorableTimeslice& component);
void CalcRuntime();
bool IsChannelNameAllowed(std::string);
bool CreateAndSendComponent(const fles::Timeslice&, int);
bool SendData(const fles::StorableTimeslice&, int);
fles::TimesliceSource* fSource; //!
std::chrono::steady_clock::time_point fTime;
fles::TimesliceSource* fSource; //!
std::chrono::steady_clock::time_point fTime;
// The vector fAllowedChannels contain the list of defined channel names
// which are used for connecting the different devices. For the time
// being the correct connection are done checking the names. A connection
// using the name stscomponent will receive timeslices containing the
// sts component only. The corresponding system ids are defined in the
// vector fSysId. At startup it is checked which channels are defined
// in the startup script such that later on only timeslices whith the
// corresponding data are send to the correct channels.
// TODO: Up to now we have three disconnected vectors which is very
// error prone. Find a better solution
// The vector fAllowedChannels contain the list of defined channel names
// which are used for connecting the different devices. For the time
// being the correct connection are done checking the names. A connection
// using the name stscomponent will receive timeslices containing the
// sts component only. The corresponding system ids are defined in the
// vector fSysId. At startup it is checked which channels are defined
// in the startup script such that later on only timeslices whith the
// corresponding data are send to the correct channels.
// TODO: Up to now we have three disconnected vectors which is very
// error prone. Find a better solution
std::vector<std::string> fAllowedChannels
= {"stscomponent","trdcomponent","tofcomponent"};
std::vector<int> fSysId = {16, 64, 96};
std::vector<std::string> fAllowedChannels = {"stscomponent", "trdcomponent", "tofcomponent"};
std::vector<int> fSysId = {16, 64, 96};
std::vector<int> fComponentsToSend = {0, 0, 0};
std::vector<std::vector<std::string>> fChannelsToSend = { {},{},{} };
std::vector<int> fComponentsToSend = {0, 0, 0};
std::vector<std::vector<std::string>> fChannelsToSend = {{}, {}, {}};
};
#endif /* CBMMQTSASAMPLER_H_ */
/* Copyright (C) 2018-2019 PI-UHd, GSI
SPDX-License-Identifier: GPL-3.0-only
Authors: Norbert Herrmann [committer], Florian Uhlig */
/**
* CbmMQTsaSamplerTof.cpp
*
......@@ -7,257 +11,255 @@
#include "CbmMQTsaSamplerTof.h"
#include "CbmMQDefs.h"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "CbmMQDefs.h"
#include "TimesliceSubscriber.hpp"
#include "TimesliceInputArchive.hpp"
#include "TimesliceSubscriber.hpp"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include <boost/algorithm/string.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/filesystem.hpp>
#include <boost/regex.hpp>
#include <boost/algorithm/string.hpp>
namespace filesys = boost::filesystem;
#include <stdio.h>
#include <ctime>
#include <thread> // this_thread::sleep_for
#include <chrono>
#include <thread> // this_thread::sleep_for
#include <algorithm>
#include <chrono>
#include <ctime>
#include <string>
#include <stdio.h>
using namespace std;
#include <stdexcept>
struct InitTaskError : std::runtime_error { using std::runtime_error::runtime_error; };
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
};
CbmMQTsaSamplerTof::CbmMQTsaSamplerTof()
: FairMQDevice()
, fMaxTimeslices(0)
, fFileName("")
, fDirName("")
, fInputFileList()
, fFileCounter(0)
, fHost("")
, fPort(0)
, fTSNumber(0)
, fTSCounter(0)
, fMessageCounter(0)
, fSource(nullptr)
, fTime()
: FairMQDevice()
, fMaxTimeslices(0)
, fFileName("")
, fDirName("")
, fInputFileList()
, fFileCounter(0)
, fHost("")
, fPort(0)
, fTSNumber(0)
, fTSCounter(0)
, fMessageCounter(0)
, fSource(nullptr)
, fTime()
{
}
void CbmMQTsaSamplerTof::InitTask()
try
{
// Get the values from the command line options (via fConfig)
fFileName = fConfig->GetValue<string>("filename");
fDirName = fConfig->GetValue<string>("dirname");
fHost = fConfig->GetValue<string>("flib-host");
fPort = fConfig->GetValue<uint64_t>("flib-port");
fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
// Check which input is defined
// Posibilities
// filename && ! dirname : single file
// filename with wildcards && diranme : all files with filename regex in the directory
// host && port : connect to the flim server
bool isGoodInputCombi{false};
if ( 0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size() && 0 == fPort ) {
isGoodInputCombi=true;
// Create a Path object from given path string
filesys::path pathObj(fFileName);
if ( ! filesys::is_regular_file(pathObj) ) {
throw InitTaskError("Passed file name is no valid file");
}
fInputFileList.push_back(fFileName);
LOG(info) << "Filename: " << fFileName;
} else if ( 0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size() && 0 == fPort) {
isGoodInputCombi=true;
filesys::path pathObj = fDirName;
if ( ! filesys::is_directory(pathObj) ) {
throw InitTaskError("Passed directory name is no valid directory");
try {
// Get the values from the command line options (via fConfig)
fFileName = fConfig->GetValue<string>("filename");
fDirName = fConfig->GetValue<string>("dirname");
fHost = fConfig->GetValue<string>("flib-host");
fPort = fConfig->GetValue<uint64_t>("flib-port");
fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
// Check which input is defined
// Posibilities
// filename && ! dirname : single file
// filename with wildcards && diranme : all files with filename regex in the directory
// host && port : connect to the flim server
bool isGoodInputCombi {false};
if (0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size() && 0 == fPort) {
isGoodInputCombi = true;
// Create a Path object from given path string
filesys::path pathObj(fFileName);
if (!filesys::is_regular_file(pathObj)) { throw InitTaskError("Passed file name is no valid file"); }
fInputFileList.push_back(fFileName);
LOG(info) << "Filename: " << fFileName;
}
else if (0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size() && 0 == fPort) {
isGoodInputCombi = true;
filesys::path pathObj = fDirName;
if (!filesys::is_directory(pathObj)) { throw InitTaskError("Passed directory name is no valid directory"); }
if (fFileName.find("*") == std::string::npos) {
// Normal file without wildcards
pathObj += fFileName;
if (!filesys::is_regular_file(pathObj)) { throw InitTaskError("Passed file name is no valid file"); }
fInputFileList.push_back(pathObj.string());
LOG(info) << "Filename: " << fInputFileList[0];
}
else {
std::vector<filesys::path> v;
// escape "." which have a special meaning in regex
// change "*" to ".*" to find any number
// e.g. tofget4_hd2018.*.tsa => tofget4_hd2018\..*\.tsa
boost::replace_all(fFileName, ".", "\\.");
boost::replace_all(fFileName, "*", ".*");
// create regex
const boost::regex my_filter(fFileName);
// loop over all files in input directory
for (auto&& x : filesys::directory_iterator(pathObj)) {
// Skip if not a file
if (!boost::filesystem::is_regular_file(x)) continue;
// Skip if no match
// x.path().leaf().string() means get from directory iterator the
// current entry as filesys::path, from this extract the leaf
// filename or directory name and convert it to a string to be
// used in the regex:match
boost::smatch what;
if (!boost::regex_match(x.path().leaf().string(), what, my_filter)) continue;
v.push_back(x.path());
}
if (fFileName.find("*") == std::string::npos) {
// Normal file without wildcards
pathObj += fFileName;
if ( ! filesys::is_regular_file(pathObj) ) {
throw InitTaskError("Passed file name is no valid file");
}
fInputFileList.push_back(pathObj.string());
LOG(info) << "Filename: " << fInputFileList[0];
} else {
std::vector<filesys::path> v;
// escape "." which have a special meaning in regex
// change "*" to ".*" to find any number
// e.g. tofget4_hd2018.*.tsa => tofget4_hd2018\..*\.tsa
boost::replace_all(fFileName, ".", "\\.");
boost::replace_all(fFileName, "*", ".*");
// create regex
const boost::regex my_filter(fFileName);
// loop over all files in input directory
for (auto&& x : filesys::directory_iterator(pathObj)) {
// Skip if not a file
if( !boost::filesystem::is_regular_file( x ) ) continue;
// Skip if no match
// x.path().leaf().string() means get from directory iterator the
// current entry as filesys::path, from this extract the leaf
// filename or directory name and convert it to a string to be
// used in the regex:match
boost::smatch what;
if( !boost::regex_match( x.path().leaf().string(), what, my_filter ) ) continue;
v.push_back(x.path());
}
// sort the files which match the regex in increasing order
// (hopefully)
std::sort(v.begin(), v.end());
for (auto&& x : v)
fInputFileList.push_back(x.string());
// sort the files which match the regex in increasing order
// (hopefully)
std::sort(v.begin(), v.end());
LOG(info) << "The following files will be used in this order.";
for (auto&& x : v)
LOG(info) << " " << x;
}
// throw InitTaskError("Input is a directory");
} else if ( 0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0!= fPort) {
isGoodInputCombi=true;
LOG(info) << "Host: " << fHost;
LOG(info) << "Port: " << fPort;
} else {
isGoodInputCombi=false;
}
for (auto&& x : v)
fInputFileList.push_back(x.string());
if (!isGoodInputCombi) {
throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory or host + port are allowed combination.");
LOG(info) << "The following files will be used in this order.";
for (auto&& x : v)
LOG(info) << " " << x;
}
// throw InitTaskError("Input is a directory");
}
else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 != fPort) {
isGoodInputCombi = true;
LOG(info) << "Host: " << fHost;
LOG(info) << "Port: " << fPort;
}
else {
isGoodInputCombi = false;
}
if (!isGoodInputCombi) {
throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory "
"or host + port are allowed combination.");
}
LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
int noChannel = fChannels.size();
LOG(info) << "Number of defined output channels: " << noChannel;
for(auto const &entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
}
for(auto const& value: fComponentsToSend) {
LOG(info) << "Value : " << value;
if (value > 1) {
throw InitTaskError("Sending same data to more than one output channel not implemented yet.");
}
LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
int noChannel = fChannels.size();
LOG(info) << "Number of defined output channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
}
for (auto const& value : fComponentsToSend) {
LOG(info) << "Value : " << value;
if (value > 1) {
throw InitTaskError("Sending same data to more than one output channel "
"not implemented yet.");
}
}
if ( 0 == fFileName.size() && 0 != fHost.size() ) {
if (0 == fFileName.size() && 0 != fHost.size()) {
std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
LOG(info) << "Open TSPublisher at " << connector;
fSource = new fles::TimesliceSubscriber(connector);
if ( !fSource) {
throw InitTaskError("Could not connect to publisher.");
}
} else {
if( false == OpenNextFile() )
{
fSource = new fles::TimesliceSubscriber(connector, 1);
if (!fSource) { throw InitTaskError("Could not connect to publisher."); }
}
else {
if (false == OpenNextFile()) {
throw InitTaskError("Could not open the first input file in the list, Doing nothing!");
}
}
fTime = std::chrono::steady_clock::now();
} catch (InitTaskError& e) {
}
catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}
bool CbmMQTsaSamplerTof::OpenNextFile()
{
bool CbmMQTsaSamplerTof::OpenNextFile()
{
// First Close and delete existing source
if( nullptr != fSource )
delete fSource;
if (nullptr != fSource) delete fSource;
if (fInputFileList.size() > 0) {
fFileName = fInputFileList[0];
fInputFileList.erase(fInputFileList.begin());
LOG(info) << "Open the Flib input file " << fFileName;
filesys::path pathObj(fFileName);
if ( ! filesys::is_regular_file(pathObj) ) {
if (!filesys::is_regular_file(pathObj)) {
LOG(error) << "Input file " << fFileName << " doesn't exist.";
return false;
}
fSource = new fles::TimesliceInputArchive(fFileName);
if ( !fSource) {
if (!fSource) {
LOG(error) << "Could not open input file.";
return false;
}
} else {
}
else {
LOG(info) << "End of files list reached.";
return false;
}
}
return true;
}
bool CbmMQTsaSamplerTof::IsChannelNameAllowed(std::string channelName)
{
LOG(info) << "Number of allowed channels: " << fAllowedChannels.size();
for(auto const &entry : fAllowedChannels) {
for (auto const& entry : fAllowedChannels) {
LOG(info) << "Inspect " << entry;
std::size_t pos1 = channelName.find(entry);
if (pos1!=std::string::npos) {
if (pos1 != std::string::npos) {
const vector<std::string>::const_iterator pos =
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos-fAllowedChannels.begin();
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName
<< " found in list of allowed channel names at position " << idx;
if(idx<3) { //FIXME, hardwired constant!!!
fComponentsToSend[idx]++;
fChannelsToSend[idx].push_back(channelName);
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
if (idx < 3) { //FIXME, hardwired constant!!!
fComponentsToSend[idx]++;
fChannelsToSend[idx].push_back(channelName);
}
return true;
}
}
LOG(info) << "Channel name " << channelName
<< " not found in list of allowed channel names.";
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
bool CbmMQTsaSamplerTof::IsChannelUp(std::string channelName)
{
for(auto const &entry : fChannels) {
for (auto const& entry : fChannels) {
LOG(info) << "Inspect " << entry.first;
std::size_t pos1 = channelName.find(entry.first);
if (pos1!=std::string::npos) {
LOG(info) << "Channel name " << channelName
<< " found in list of defined channel names ";
if (pos1 != std::string::npos) {
LOG(info) << "Channel name " << channelName << " found in list of defined channel names ";
return true;
}
}
LOG(info) << "Channel name " << channelName
<< " not found in list of defined channel names.";
LOG(info) << "Channel name " << channelName << " not found in list of defined channel names.";
LOG(error) << "Stop device.";
return false;
}
......@@ -271,14 +273,12 @@ bool CbmMQTsaSamplerTof::ConditionalRun()
fTSCounter++;
const fles::Timeslice& ts = *timeslice;
auto tsIndex = ts.index();
auto tsIndex = ts.index();
if (fTSCounter % 10000 == 0)
LOG(info) << "Sample TimeSlice " << fTSCounter<<", Index "<< tsIndex;
if (fTSCounter % 10000 == 0) LOG(info) << "Sample TimeSlice " << fTSCounter << ", Index " << tsIndex;
LOG(debug) << "Found " << ts.num_components()
<< " different components in timeslice "
<< fTSCounter << ", index "<< tsIndex;
LOG(debug) << "Found " << ts.num_components() << " different components in timeslice " << fTSCounter << ", index "
<< tsIndex;
CheckTimeslice(ts);
......@@ -292,86 +292,88 @@ bool CbmMQTsaSamplerTof::ConditionalRun()
std::vector<bool> bparts;
parts.resize(fComponentsToSend.size());
bparts.resize(parts.size());
for(uint i=0; i<bparts.size(); i++) bparts[i]=false;
LOG(debug) << "parts with size "<<parts.size();
for (uint i = 0; i < bparts.size(); i++)
bparts[i] = false;
LOG(debug) << "parts with size " << parts.size();
for (uint nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
// CreateAndCombineComponents(ts, nrComp);
LOG(debug) <<"nrComp "<< nrComp<< ", SysID: " << static_cast<int>(ts.descriptor(nrComp,0).sys_id);
const vector<int>::const_iterator pos =
std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp,0).sys_id));
if (pos != fSysId.end() ) {
const vector<std::string>::size_type idx = pos-fSysId.begin();
if (fComponentsToSend[idx]>0) {
LOG(debug) << "Append timeslice component of link " << nrComp<< " to idx "<<idx;
fles::StorableTimeslice component{static_cast<uint32_t>(ts.num_microslices(nrComp), ts.index())};
component.append_component(ts.num_microslices(0));
for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
component.append_microslice( 0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m) );
}
//LOG(debug)<<"Parts size available for "<<idx<<": "<<parts.size();
//if(idx > parts.size()-1) parts.resize(idx+1);
//if ( !AppendData(component, idx) ) return false;
// serialize the timeslice and create the message
std::stringstream oss;
boost::archive::binary_oarchive oa(oss);
oa << component;
std::string* strMsg = new std::string(oss.str());
LOG(debug)<<"AddParts to "<<idx<<": current size "<<parts[idx].Size();
parts[idx].AddPart(NewMessage(const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void* /*data*/, void* object){ delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
bparts[idx]=true;
}
}
// CreateAndCombineComponents(ts, nrComp);
LOG(debug) << "nrComp " << nrComp << ", SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
const vector<int>::const_iterator pos =
std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
if (pos != fSysId.end()) {
const vector<std::string>::size_type idx = pos - fSysId.begin();
if (fComponentsToSend[idx] > 0) {
LOG(debug) << "Append timeslice component of link " << nrComp << " to idx " << idx;
fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
component.append_component(ts.num_microslices(0));
for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
}
//LOG(debug)<<"Parts size available for "<<idx<<": "<<parts.size();
//if(idx > parts.size()-1) parts.resize(idx+1);
//if ( !AppendData(component, idx) ) return false;
// serialize the timeslice and create the message
std::stringstream oss;
boost::archive::binary_oarchive oa(oss);
oa << component;
std::string* strMsg = new std::string(oss.str());
LOG(debug) << "AddParts to " << idx << ": current size " << parts[idx].Size();
parts[idx].AddPart(NewMessage(
const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
bparts[idx] = true;
}
}
}
for (uint idx=0; idx<parts.size(); idx++)
if(bparts[idx]){
LOG(debug) << "Send parts with size "<< parts[idx].Size()
<<" to channel " << fChannelsToSend[idx][0];
if (Send(parts[idx], fChannelsToSend[idx][0]) < 0) {
LOG(error) << "Problem sending data";
return false;
}
LOG(debug) << "Sent message " << fMessageCounter << " with a size of "
<< parts[idx].Size();
fMessageCounter++;
}
//if(!SendTs()) return false;
for (uint idx = 0; idx < parts.size(); idx++)
if (bparts[idx]) {
LOG(debug) << "Send parts with size " << parts[idx].Size() << " to channel " << fChannelsToSend[idx][0];
if (Send(parts[idx], fChannelsToSend[idx][0]) < 0) {
LOG(error) << "Problem sending data";
return false;
}
LOG(debug) << "Sent message " << fMessageCounter << " with a size of " << parts[idx].Size();
fMessageCounter++;
}
//if(!SendTs()) return false;
return true;
} else {
}
else {
LOG(info) << " Number of requested time slices reached, exiting ";
if ( false == OpenNextFile() ) {
if (false == OpenNextFile()) {
CalcRuntime();
SendSysCmdStop();
SendSysCmdStop();
return false;
} else {
}
else {
CalcRuntime();
SendSysCmdStop();
SendSysCmdStop();
return false;
}
}
} else {
if ( false == OpenNextFile() ) {
}
else {
if (false == OpenNextFile()) {
CalcRuntime();
SendSysCmdStop();
return false;
} else {
}
else {
return true;
}
}
}
bool CbmMQTsaSamplerTof::CreateAndCombineComponents(const fles::Timeslice& /*ts*/, int /*nrComp*/)
......@@ -388,7 +390,7 @@ bool CbmMQTsaSamplerTof::CreateAndCombineComponents(const fles::Timeslice& /*ts*
if (fComponentsToSend[idx]>0) {
LOG(debug) << "Append timeslice component of link " << nrComp<< " to idx "<<idx;
fles::StorableTimeslice component{static_cast<uint32_t>(ts.num_microslices(nrComp), ts.index())};
fles::StorableTimeslice component{static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
component.append_component(ts.num_microslices(0));
for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
......@@ -408,7 +410,7 @@ bool CbmMQTsaSamplerTof::CreateAndCombineComponents(const fles::Timeslice& /*ts*
}
bool CbmMQTsaSamplerTof::AppendData(const fles::StorableTimeslice& /*component*/, int /*idx*/)
{
{
// serialize the timeslice and create the message
/*
std::stringstream oss;
......@@ -427,9 +429,9 @@ bool CbmMQTsaSamplerTof::AppendData(const fles::StorableTimeslice& /*component*/
}
bool CbmMQTsaSamplerTof::SendTs()
{
{
/*
for (int idx=0; idx<parts.size(); idx++)
for (int idx=0; idx<parts.size(); idx++)
if(bparts[idx]){
LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0];
if (Send(parts[idx], fChannelsToSend[idx][0]) < 0) {
......@@ -452,21 +454,21 @@ bool CbmMQTsaSamplerTof::CreateAndSendComponent(const fles::Timeslice& ts, int n
// is connected create the new timeslice and send it to the
// correct channel
LOG(debug) << "SysID: " << static_cast<int>(ts.descriptor(nrComp,0).sys_id);
LOG(debug) << "SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
const vector<int>::const_iterator pos =
std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp,0).sys_id));
if (pos != fSysId.end() ) {
const vector<std::string>::size_type idx = pos-fSysId.begin();
if (fComponentsToSend[idx]>0) {
std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
if (pos != fSysId.end()) {
const vector<std::string>::size_type idx = pos - fSysId.begin();
if (fComponentsToSend[idx] > 0) {
LOG(debug) << "Create timeslice component for link " << nrComp;
fles::StorableTimeslice component{static_cast<uint32_t>(ts.num_microslices(nrComp), ts.index())};
fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
component.append_component(ts.num_microslices(0));
for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
component.append_microslice( 0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m) );
component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
}
if ( ! SendData(component, idx) ) return false;
if (!SendData(component, idx)) return false;
return true;
}
}
......@@ -474,23 +476,22 @@ bool CbmMQTsaSamplerTof::CreateAndSendComponent(const fles::Timeslice& ts, int n
}
bool CbmMQTsaSamplerTof::SendData(const fles::StorableTimeslice& component, int idx)
{
{
// serialize the timeslice and create the message
std::stringstream oss;
boost::archive::binary_oarchive oa(oss);
oa << component;
std::string* strMsg = new std::string(oss.str());
FairMQMessagePtr msg(NewMessage(const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void* /*data*/, void* object){ delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
FairMQMessagePtr msg(NewMessage(
const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
// TODO: Implement sending same data to more than one channel
// Need to create new message (copy message??)
if (fComponentsToSend[idx]>1) {
LOG(debug) << "Need to copy FairMessage";
}
if (fComponentsToSend[idx] > 1) { LOG(debug) << "Need to copy FairMessage"; }
// in case of error or transfer interruption,
// return false to go to IDLE state
......@@ -504,40 +505,31 @@ bool CbmMQTsaSamplerTof::SendData(const fles::StorableTimeslice& component, int
}
fMessageCounter++;
LOG(debug) << "Send message " << fMessageCounter << " with a size of "
<< msg->GetSize();
LOG(debug) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize();
return true;
}
CbmMQTsaSamplerTof::~ CbmMQTsaSamplerTof()
{
}
CbmMQTsaSamplerTof::~CbmMQTsaSamplerTof() {}
void CbmMQTsaSamplerTof::CalcRuntime()
{
std::chrono::duration<double> run_time =
std::chrono::steady_clock::now() - fTime;
std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
LOG(info) << "Runtime: " << run_time.count();
LOG(info) << "No more input data";
}
void CbmMQTsaSamplerTof::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
{
LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id)
<< std::dec;
LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver)
<< std::dec;
LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
LOG(info) << "Equipement ID: " << mdsc.eq_id;
LOG(info) << "Flags: " << mdsc.flags;
LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id)
<< std::dec;
LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver)
<< std::dec;
LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
LOG(info) << "Microslice Idx: " << mdsc.idx;
LOG(info) << "Checksum: " << mdsc.crc;
LOG(info) << "Size: " << mdsc.size;
......@@ -546,26 +538,22 @@ void CbmMQTsaSamplerTof::PrintMicroSliceDescriptor(const fles::MicrosliceDescrip
bool CbmMQTsaSamplerTof::CheckTimeslice(const fles::Timeslice& ts)
{
if ( 0 == ts.num_components() ) {
if (0 == ts.num_components()) {
LOG(error) << "No Component in TS " << ts.index();
return 1;
}
LOG(debug) << "Found " << ts.num_components()
<< " different components in timeslice";
LOG(debug) << "Found " << ts.num_components() << " different components in timeslice";
for (size_t c = 0; c < ts.num_components(); ++c) {
LOG(debug) << "Found " << ts.num_microslices(c)
<< " microslices in component " << c;
LOG(debug) << "Component " << c << " has a size of "
<< ts.size_component(c) << " bytes";
LOG(debug) << "Component " << c << " has the system id 0x"
<< std::hex << static_cast<int>(ts.descriptor(c,0).sys_id)
<< std::dec;
LOG(debug) << "Found " << ts.num_microslices(c) << " microslices in component " << c;
LOG(debug) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes";
LOG(debug) << "Component " << c << " has the system id 0x" << std::hex
<< static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
/*
LOG(debug) << "Component " << c << " has the system id 0x"
<< static_cast<int>(ts.descriptor(c,0).sys_id);
*/
/*
/*
for (size_t m = 0; m < ts.num_microslices(c); ++m) {
PrintMicroSliceDescriptor(ts.descriptor(c,m));
}
......@@ -577,24 +565,18 @@ bool CbmMQTsaSamplerTof::CheckTimeslice(const fles::Timeslice& ts)
void CbmMQTsaSamplerTof::SendSysCmdStop()
{
if(IsChannelUp("syscmd")){
if (IsChannelUp("syscmd")) {
LOG(info) << "stop subscribers in 100 sec";
std::this_thread::sleep_for(std::chrono::milliseconds(100000));
FairMQMessagePtr pub(NewSimpleMessage("STOP"));
if (Send(pub, "syscmd") < 0) {
LOG(error) << "Sending STOP message failed";
}
if (Send(pub, "syscmd") < 0) { LOG(error) << "Sending STOP message failed"; }
LOG(info) << "task reset subscribers in 1 sec";
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
FairMQMessagePtr task_reset(NewSimpleMessage("TASK_RESET"));
if (Send(task_reset, "syscmd") < 0) {
LOG(error) << "Sending Task_Reset message failed";
}
if (Send(task_reset, "syscmd") < 0) { LOG(error) << "Sending Task_Reset message failed"; }
}
// FairMQStateMachine::ChangeState(STOP);
}
/* Copyright (C) 2018 PI-UHd, GSI
SPDX-License-Identifier: GPL-3.0-only
Authors: Norbert Herrmann [committer] */
/**
* CbmMQTsaSamplerTof.h
*
......@@ -9,82 +13,79 @@
#define CBMMQTSASAMPLERTOF_H_
#include "TimesliceSource.hpp"
#include "Timeslice.hpp"
#include "StorableTimeslice.hpp"
#include "MicrosliceDescriptor.hpp"
#include "StorableTimeslice.hpp"
#include "Timeslice.hpp"
#include "TimesliceSource.hpp"
//#include "Message.hpp"
#include "FairMQDevice.h"
#include <ctime>
#include <string>
#include <vector>
#include <ctime>
class CbmMQTsaSamplerTof : public FairMQDevice
{
public:
CbmMQTsaSamplerTof();
virtual ~CbmMQTsaSamplerTof();
protected:
uint64_t fMaxTimeslices;
std::string fFileName;
std::string fDirName;
std::vector<std::string> fInputFileList; ///< List of input files
uint64_t fFileCounter;
std::string fHost;
uint64_t fPort;
uint64_t fTSNumber;
uint64_t fTSCounter;
uint64_t fMessageCounter;
int fMaxMemory = 0;
virtual void InitTask();
virtual bool ConditionalRun();
private:
bool OpenNextFile();
bool CheckTimeslice(const fles::Timeslice& ts);
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc);
bool SendData(const fles::StorableTimeslice& component);
void CalcRuntime();
bool IsChannelNameAllowed(std::string);
bool IsChannelUp(std::string);
bool CreateAndSendComponent(const fles::Timeslice&, int);
bool SendData(const fles::StorableTimeslice&, int);
bool CreateAndCombineComponents(const fles::Timeslice&, int);
bool AppendData(const fles::StorableTimeslice&, int);
bool SendTs();
void SendSysCmdStop();
fles::TimesliceSource* fSource; //!
std::chrono::steady_clock::time_point fTime;
// The vector fAllowedChannels contain the list of defined channel names
// which are used for connecting the different devices. For the time
// being the correct connection are done checking the names. A connection
// using the name stscomponent will receive timeslices containing the
// sts component only. The corresponding system ids are defined in the
// vector fSysId. At startup it is checked which channels are defined
// in the startup script such that later on only timeslices whith the
// corresponding data are send to the correct channels.
// TODO: Up to now we have three disconnected vectors which is very
// error prone. Find a better solution
std::vector<std::string> fAllowedChannels
= {"stscomponent","trdcomponent","tofcomponent","syscmd","syscmdin"};
std::vector<int> fSysId = {16, 64, 96};
std::vector<int> fComponentsToSend = {0, 0, 0};
std::vector<std::vector<std::string>> fChannelsToSend = { {},{},{} };
class CbmMQTsaSamplerTof : public FairMQDevice {
public:
CbmMQTsaSamplerTof();
virtual ~CbmMQTsaSamplerTof();
protected:
uint64_t fMaxTimeslices;
std::string fFileName;
std::string fDirName;
std::vector<std::string> fInputFileList; ///< List of input files
uint64_t fFileCounter;
std::string fHost;
uint64_t fPort;
uint64_t fTSNumber;
uint64_t fTSCounter;
uint64_t fMessageCounter;
int fMaxMemory = 0;
virtual void InitTask();
virtual bool ConditionalRun();
private:
bool OpenNextFile();
bool CheckTimeslice(const fles::Timeslice& ts);
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc);
bool SendData(const fles::StorableTimeslice& component);
void CalcRuntime();
bool IsChannelNameAllowed(std::string);
bool IsChannelUp(std::string);
bool CreateAndSendComponent(const fles::Timeslice&, int);
bool SendData(const fles::StorableTimeslice&, int);
bool CreateAndCombineComponents(const fles::Timeslice&, int);
bool AppendData(const fles::StorableTimeslice&, int);
bool SendTs();
void SendSysCmdStop();
fles::TimesliceSource* fSource; //!
std::chrono::steady_clock::time_point fTime;
// The vector fAllowedChannels contain the list of defined channel names
// which are used for connecting the different devices. For the time
// being the correct connection are done checking the names. A connection
// using the name stscomponent will receive timeslices containing the
// sts component only. The corresponding system ids are defined in the
// vector fSysId. At startup it is checked which channels are defined
// in the startup script such that later on only timeslices whith the
// corresponding data are send to the correct channels.
// TODO: Up to now we have three disconnected vectors which is very
// error prone. Find a better solution
std::vector<std::string> fAllowedChannels = {"stscomponent", "trdcomponent", "tofcomponent", "syscmd", "syscmdin"};
std::vector<int> fSysId = {16, 64, 96};
std::vector<int> fComponentsToSend = {0, 0, 0};
std::vector<std::vector<std::string>> fChannelsToSend = {{}, {}, {}};
};
#endif /* CBMMQTSASAMPLERTOF_H_ */
/* Copyright (C) 2019-2020 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
/**
* CbmStsDigiSource.cpp
*
......@@ -7,30 +11,32 @@
#include "CbmStsDigiSource.h"
#include "CbmMQDefs.h"
#include "CbmDigiManager.h"
#include "CbmMQDefs.h"
#include "CbmStsDigi.h"
#include "FairFileSource.h"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairMQProgOptions.h" // device->fConfig
#include "FairRootManager.h"
#include "FairRunAna.h"
#include "FairFileSource.h"
#include <thread> // this_thread::sleep_for
#include <boost/archive/binary_oarchive.hpp>
#include <stdio.h>
#include <ctime>
#include <thread> // this_thread::sleep_for
#include <chrono>
#include <ctime>
#include <stdexcept>
#include <stdio.h>
using namespace std;
struct InitTaskError : std::runtime_error { using std::runtime_error::runtime_error; };
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
};
CbmStsDigiSource::CbmStsDigiSource()
......@@ -47,16 +53,15 @@ CbmStsDigiSource::CbmStsDigiSource()
}
void CbmStsDigiSource::InitTask()
try
{
try {
// Get the values from the command line options (via fConfig)
fFileName = fConfig->GetValue<string>("filename");
fFileName = fConfig->GetValue<string>("filename");
fMaxEvents = fConfig->GetValue<uint64_t>("max-events");
LOG(info) << "Filename: " << fFileName;
LOG(info) << "MaxEvents: " << fMaxEvents;
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
......@@ -67,61 +72,58 @@ try
// data on this channel.
int noChannel = fChannels.size();
LOG(info) << "Number of defined output channels: " << noChannel;
for(auto const &entry : fChannels) {
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
}
FairRootManager* rootman = FairRootManager::Instance();
if ( 0 != fFileName.size() ) {
if (0 != fFileName.size()) {
LOG(info) << "Open the ROOT input file " << fFileName;
// Check if the input file exist
FILE* inputFile = fopen(fFileName.c_str(), "r");
if ( ! inputFile ) {
throw InitTaskError("Input file doesn't exist.");
}
if (!inputFile) { throw InitTaskError("Input file doesn't exist."); }
fclose(inputFile);
FairFileSource* source = new FairFileSource(fFileName);
if ( !source) {
throw InitTaskError("Could not open input file.");
}
FairFileSource* source = new FairFileSource(fFileName);
if (!source) { throw InitTaskError("Could not open input file."); }
rootman->SetSource(source);
rootman->InitSource();
CbmDigiManager* digiMan = CbmDigiManager::Instance();
digiMan->Init();
if ( ! digiMan->IsPresent(ECbmModuleId::kSts) ) {
throw InitTaskError("No StsDigi branch in input!");
}
} else {
throw InitTaskError("No input file specified");
if (!digiMan->IsPresent(ECbmModuleId::kSts)) { throw InitTaskError("No StsDigi branch in input!"); }
}
else {
throw InitTaskError("No input file specified");
}
Int_t MaxAllowed=FairRootManager::Instance()->CheckMaxEventNo(fMaxEvents);
if ( MaxAllowed != -1 ) {
if (fMaxEvents == 0) {
fMaxEvents = MaxAllowed;
} else {
Int_t MaxAllowed = FairRootManager::Instance()->CheckMaxEventNo(fMaxEvents);
if (MaxAllowed != -1) {
if (fMaxEvents == 0) { fMaxEvents = MaxAllowed; }
else {
if (static_cast<Int_t>(fMaxEvents) > MaxAllowed) {
LOG(warn) << "-------------------Warning---------------------------";
LOG(warn) << " File has less events than requested!!";
LOG(warn) << " File contains : " << MaxAllowed << " Events";
LOG(warn) << " Requested number of events = " << fMaxEvents << " Events";
LOG(warn) << " File contains : " << MaxAllowed << " Events";
LOG(warn) << " Requested number of events = " << fMaxEvents << " Events";
LOG(warn) << " The number of events is set to " << MaxAllowed << " Events";
LOG(warn) << "-----------------------------------------------------";
fMaxEvents = MaxAllowed;
}
}
LOG(info) << "After checking, the run will run from event 0 " << " to " << fMaxEvents << ".";
} else {
LOG(info) << "continue running without stop";
LOG(info) << "After checking, the run will run from event 0 "
<< " to " << fMaxEvents << ".";
}
else {
LOG(info) << "continue running without stop";
}
fTime = std::chrono::steady_clock::now();
} catch (InitTaskError& e) {
}
catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
......@@ -129,14 +131,12 @@ try
bool CbmStsDigiSource::IsChannelNameAllowed(std::string channelName)
{
if ( std::find(fAllowedChannels.begin(), fAllowedChannels.end(),
channelName) != fAllowedChannels.end() ) {
LOG(info) << "Channel name " << channelName
<< " found in list of allowed channel names.";
if (std::find(fAllowedChannels.begin(), fAllowedChannels.end(), channelName) != fAllowedChannels.end()) {
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names.";
return true;
} else {
LOG(info) << "Channel name " << channelName
<< " not found in list of allowed channel names.";
}
else {
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
......@@ -145,50 +145,43 @@ bool CbmStsDigiSource::IsChannelNameAllowed(std::string channelName)
bool CbmStsDigiSource::ConditionalRun()
{
Int_t readEventReturn = FairRootManager::Instance()->ReadEvent(fEventCounter);
LOG(info) <<"Return value: " << readEventReturn;
Int_t readEventReturn = FairRootManager::Instance()->ReadEvent(fEventCounter);
LOG(info) << "Return value: " << readEventReturn;
if ( readEventReturn != 0 ) {
LOG(warn) << "FairRootManager::Instance()->ReadEvent(" << fEventCounter << ") returned " << readEventReturn << ". Breaking the event loop";
CalcRuntime();
return false;
}
if (readEventReturn != 0) {
LOG(warn) << "FairRootManager::Instance()->ReadEvent(" << fEventCounter << ") returned " << readEventReturn
<< ". Breaking the event loop";
CalcRuntime();
return false;
}
for (Int_t index = 0; index < CbmDigiManager::Instance()->GetNofDigis(ECbmModuleId::kSts); index++) {
const CbmStsDigi* stsDigi = CbmDigiManager::Instance()->Get<CbmStsDigi>(index);
PrintStsDigi(stsDigi);
}
for (Int_t index = 0; index < CbmDigiManager::Instance()->GetNofDigis(ECbmModuleId::kSts); index++) {
const CbmStsDigi* stsDigi = CbmDigiManager::Instance()->Get<CbmStsDigi>(index);
PrintStsDigi(stsDigi);
}
if (fEventCounter % 10000 == 0) LOG(info) << "Analyse Event " << fEventCounter;
fEventCounter++;
if (fEventCounter % 10000 == 0) LOG(info) << "Analyse Event " << fEventCounter;
fEventCounter++;
LOG(info) << "Counter: " << fEventCounter << " Events: " << fMaxEvents;
if (fEventCounter < fMaxEvents) {
return true;
} else {
CalcRuntime();
return false;
}
LOG(info) << "Counter: " << fEventCounter << " Events: " << fMaxEvents;
if (fEventCounter < fMaxEvents) { return true; }
else {
CalcRuntime();
return false;
}
}
CbmStsDigiSource::~ CbmStsDigiSource()
{
}
CbmStsDigiSource::~CbmStsDigiSource() {}
void CbmStsDigiSource::CalcRuntime()
{
std::chrono::duration<double> run_time =
std::chrono::steady_clock::now() - fTime;
std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
LOG(info) << "Runtime: " << run_time.count();
LOG(info) << "No more input data";
}
void CbmStsDigiSource::PrintStsDigi(const CbmStsDigi* digi)
{
LOG(info) << digi->ToString();
}
void CbmStsDigiSource::PrintStsDigi(const CbmStsDigi* digi) { LOG(info) << digi->ToString(); }
/* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
/**
* CbmStsDigiSource.h
*
......@@ -9,45 +13,42 @@
#define CBMSTSDIGISOURCE_H_
#include "FairMQDevice.h"
#include <ctime>
#include <string>
#include <vector>
#include <ctime>
class CbmStsDigi;
class CbmStsDigiSource : public FairMQDevice
{
public:
CbmStsDigiSource();
virtual ~CbmStsDigiSource();
protected:
uint64_t fMaxEvents;
class CbmStsDigiSource : public FairMQDevice {
public:
CbmStsDigiSource();
virtual ~CbmStsDigiSource();
std::string fFileName;
std::vector<std::string> fInputFileList; ///< List of input files
uint64_t fFileCounter;
protected:
uint64_t fMaxEvents;
uint64_t fEventNumber;
uint64_t fEventCounter;
uint64_t fMessageCounter;
std::string fFileName;
std::vector<std::string> fInputFileList; ///< List of input files
uint64_t fFileCounter;
int fMaxMemory = 0;
uint64_t fEventNumber;
uint64_t fEventCounter;
uint64_t fMessageCounter;
virtual void InitTask();
virtual bool ConditionalRun();
int fMaxMemory = 0;
private:
void PrintStsDigi(const CbmStsDigi*);
bool SendData();
void CalcRuntime();
bool IsChannelNameAllowed(std::string);
virtual void InitTask();
virtual bool ConditionalRun();
std::chrono::steady_clock::time_point fTime;
private:
void PrintStsDigi(const CbmStsDigi*);
bool SendData();
void CalcRuntime();
bool IsChannelNameAllowed(std::string);
std::vector<std::string> fAllowedChannels
= {"stsdigi"};
std::chrono::steady_clock::time_point fTime;
std::vector<std::string> fAllowedChannels = {"stsdigi"};
};
#endif /* CBMSTSDIGISOURCE_H_ */
/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmTsConsumerReqDevExample.h"
#include "CbmFlesCanvasTools.h"
#include "StorableTimeslice.hpp"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairParGenericSet.h"
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"
#include "TList.h"
#include "TNamed.h"
#include <thread>
#include "BoostSerializer.h"
#include <boost/archive/binary_iarchive.hpp>
#include <boost/serialization/utility.hpp>
#include <array>
#include <iomanip>
#include <stdexcept>
#include <string>
#include "RootSerializer.h"
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
};
using namespace std;
CbmTsConsumerReqDevExample::CbmTsConsumerReqDevExample()
// ALGO: : fMonitorAlgo {new CbmMcbm2018MonitorAlgoBmon()}
{
}
void CbmTsConsumerReqDevExample::InitTask()
try {
/// Read options from executable
LOG(info) << "Init options for CbmMqStarHistoServer.";
fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs");
fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
fsTsBlockName = fConfig->GetValue<std::string>("TsBlockName");
fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
}
catch (InitTaskError& e) {
LOG(error) << e.what();
ChangeState(fair::mq::Transition::ErrorFound);
}
bool CbmTsConsumerReqDevExample::InitContainers()
{
LOG(info) << "Init parameter containers for CbmTsConsumerReqDevExample.";
// ALGO: fParCList = fMonitorAlgo->GetParList();
fParCList = new TList();
for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
fParCList->Remove(tempObj);
std::string paramName {tempObj->GetName()};
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
// Her must come the proper Runid
std::string message = paramName + ",111";
LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
FairMQMessagePtr req(NewSimpleMessage(message));
FairMQMessagePtr rep(NewMessage());
FairParGenericSet* newObj = nullptr;
if (Send(req, "parameters") > 0) {
if (Receive(rep, "parameters") >= 0) {
if (rep->GetSize() != 0) {
CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
LOG(info) << "Received unpack parameter from the server:";
newObj->print();
}
else {
LOG(error) << "Received empty reply. Parameter not available";
} // if (rep->GetSize() != 0)
} // if (Receive(rep, "parameters") >= 0)
} // if (Send(req, "parameters") > 0)
fParCList->AddAt(newObj, iparC);
delete tempObj;
} // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
/// Apply options to the processing algo
// ALGO: fMonitorAlgo->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
// fMonitorAlgo->AddMsComponentToList(0, 0x90);
// ALGO: Bool_t initOK = fMonitorAlgo->InitContainers();
bool initOK = true;
return initOK;
}
bool CbmTsConsumerReqDevExample::InitHistograms()
{
/// Histos creation and obtain pointer on them
/// Trigger histo creation on all associated algos
// ALGO: bool initOK = fMonitorAlgo->CreateHistograms();
bool initOK = true;
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
// ALGO: std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector();
std::vector<std::pair<TNamed*, std::string>> vHistos = {};
/// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
// ALGO: std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector();
std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
/// Add pointers to each histo in the histo array
/// Create histo config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
// << " in " << vHistos[ uHisto ].second.data()
// ;
fArrayHisto.Add(vHistos[uHisto].first);
std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
fvpsHistosFolder.push_back(psHistoConfig);
LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Create canvas config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
// << " in " << vCanvases[ uCanv ].second.data();
std::string sCanvName = (vCanvases[uCanv].first)->GetName();
std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
fvpsCanvasConfig.push_back(psCanvConfig);
LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
} // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
return initOK;
}
bool CbmTsConsumerReqDevExample::ConditionalRun()
{
/// First request a new TS (full or single system components or multi-syst components block)
std::string message = fsTsBlockName;
if ("" == message) message = std::to_string(kusSysId);
LOG(debug) << "Requesting new TS by sending message: " << message;
FairMQMessagePtr req(NewSimpleMessage(message));
FairMQMessagePtr rep(NewMessage());
if (Send(req, fsChannelNameDataInput) <= 0) {
LOG(error) << "Failed to send the request! message was " << message;
return false;
} // if (Send(req, fsChannelNameDataInput) <= 0)
else if (Receive(rep, fsChannelNameDataInput) < 0) {
LOG(error) << "Failed to receive a reply to the request! message was " << message;
return false;
} // else if (Receive(rep, fsChannelNameDataInput) < 0)
else if (rep->GetSize() == 0) {
LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message;
return false;
} // else if (rep->GetSize() == 0)
/// Message received, do Algo related Initialization steps if needed
if (0 == fulNumMessages) {
try {
InitContainers();
}
catch (InitTaskError& e) {
LOG(error) << e.what();
ChangeState(fair::mq::Transition::ErrorFound);
}
} // if( 0 == fulNumMessages)
if (0 == fulNumMessages) InitHistograms();
fulNumMessages++;
LOG(debug) << "Received message number " << fulNumMessages << " with size " << rep->GetSize();
if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
std::string msgStr(static_cast<char*>(rep->GetData()), rep->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
/// Create an empty TS and fill it with the incoming message
fles::StorableTimeslice component {0};
inputArchive >> component;
/// Process the Timeslice
DoUnpack(component, 0);
/// Send histograms each 100 time slices. Should be each ~1s
/// Use also runtime checker to trigger sending after M s if
/// processing too slow or delay sending if processing too fast
std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
if ((fdMaxPublishTime < elapsedSeconds.count())
|| (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
if (!fbConfigSent) {
// Send the configuration only once per run!
fbConfigSent = SendHistoConfAndData();
} // if( !fbConfigSent )
else
SendHistograms();
fLastPublishTime = std::chrono::system_clock::now();
} // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
return true;
}
bool CbmTsConsumerReqDevExample::SendHistoConfAndData()
{
/// Prepare multiparts message and header
std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
FairMQMessagePtr messageHeader(NewMessage());
// Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
FairMQParts partsOut;
partsOut.AddPart(std::move(messageHeader));
for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist(NewMessage());
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
partsOut.AddPart(std::move(messageHist));
} // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan(NewMessage());
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
partsOut.AddPart(std::move(messageCan));
} // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr msgHistos(NewMessage());
// Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
RootSerializer().Serialize(*msgHistos, &fArrayHisto);
partsOut.AddPart(std::move(msgHistos));
/// Send the multi-parts message to the common histogram messages queue
if (Send(partsOut, fsChannelNameHistosInput) < 0) {
LOG(error) << "CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data";
return false;
} // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
// ALGO: fMonitorAlgo->ResetHistograms(kFALSE);
return true;
}
bool CbmTsConsumerReqDevExample::SendHistograms()
{
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message(NewMessage());
// Serialize<RootSerializer>(*message, &fArrayHisto);
RootSerializer().Serialize(*message, &fArrayHisto);
/// Send message to the common histogram messages queue
if (Send(message, fsChannelNameHistosInput) < 0) {
LOG(error) << "Problem sending data";
return false;
} // if( Send( message, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
// ALGO: fMonitorAlgo->ResetHistograms(kFALSE);
return true;
}
CbmTsConsumerReqDevExample::~CbmTsConsumerReqDevExample() {}
Bool_t CbmTsConsumerReqDevExample::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
{
fulTsCounter++;
if (kFALSE == fbComponentsAddedToList) {
for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
if (kusSysId == ts.descriptor(uCompIdx, 0).sys_id) {
/// Do something here
// ALGO:
std::this_thread::sleep_for(std::chrono::milliseconds(500));
} // if( kusSysId == ts.descriptor( uCompIdx, 0 ).sys_id )
} // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
fbComponentsAddedToList = kTRUE;
} // if( kFALSE == fbComponentsAddedToList )
// ALGO:
/*
if (kFALSE == fMonitorAlgo->ProcessTs(ts)) {
LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class";
return kTRUE;
} // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
*/
/// Clear the digis vector in case it was filled
// ALGO: fMonitorAlgo->ClearVector();
if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices";
return kTRUE;
}
void CbmTsConsumerReqDevExample::Finish() {}
/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#ifndef CBMTSCONSUMERREQDEVEXPL_H_
#define CBMTSCONSUMERREQDEVEXPL_H_
#include "CbmMqTMessage.h"
#include "Timeslice.hpp"
#include "FairMQDevice.h"
#include "Rtypes.h"
#include "TObjArray.h"
#include <chrono>
#include <map>
#include <vector>
class TList;
class CbmTsConsumerReqDevExample : public FairMQDevice {
public:
CbmTsConsumerReqDevExample();
virtual ~CbmTsConsumerReqDevExample();
protected:
virtual void InitTask();
virtual bool ConditionalRun();
private:
/// Constants
static const uint16_t kusSysId = 0xFF;
/// Control flags
Bool_t fbIgnoreOverlapMs = kFALSE; //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice
Bool_t fbComponentsAddedToList = kFALSE;
/// User settings parameters
std::string fsChannelNameDataInput = "ts-request";
std::string fsTsBlockName = "exampleblock";
std::string fsChannelNameHistosInput = "histogram-in";
uint32_t fuPublishFreqTs = 100;
double_t fdMinPublishTime = 0.5;
double_t fdMaxPublishTime = 5.0;
/// Parameters management
TList* fParCList = nullptr;
/// Statistics & first TS rejection
uint64_t fulNumMessages = 0;
uint64_t fulTsCounter = 0;
std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now();
/// Processing algo
// ALGO: CbmMcbm2018MonitorAlgoBmon* fMonitorAlgo;
/// Array of histograms to send to the histogram server
TObjArray fArrayHisto = {};
/// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server
std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {};
/// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server
/// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)"
/// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)"
std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {};
/// Flag indicating whether the histograms and canvases configurations were already published
bool fbConfigSent = false;
bool InitContainers();
bool InitHistograms();
bool DoUnpack(const fles::Timeslice& ts, size_t component);
void Finish();
bool SendHistoConfAndData();
bool SendHistograms();
};
#endif /* CBMTSCONSUMERREQDEVEXPL_H_ */
#include "runFairMQDevice.h"
/* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
#include "CbmMCPointSource.h"
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")
("max-events", bpo::value<uint64_t>()->default_value(0), "Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 - infinite)");
options.add_options()("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")(
"max-events", bpo::value<uint64_t>()->default_value(0),
"Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 "
"- infinite)");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new CbmMCPointSource();
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmMCPointSource(); }
#include "runFairMQDevice.h"
/* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
#include "CbmStsDigiSource.h"
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")
("max-events", bpo::value<uint64_t>()->default_value(0), "Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 - infinite)");
options.add_options()("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")(
"max-events", bpo::value<uint64_t>()->default_value(0),
"Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 "
"- infinite)");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new CbmStsDigiSource();
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmStsDigiSource(); }
/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmTsConsumerReqDevExample.h"
#include <iomanip>
#include <string>
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
using namespace std;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true");
options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS");
options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
"Minimal time between two publishing");
options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
"Maximal time between two publishing");
options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("ts-request"),
"MQ channel name for TS data");
options.add_options()("TsBlockName", bpo::value<std::string>()->default_value("exampleblock"),
"Block name for requesting TS data, TOF SysId request if empty");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmTsConsumerReqDevExample(); }
/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmMQTsSamplerRepReq.h"
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file");
options.add_options()("dirname", bpo::value<std::string>()->default_value(""),
"Directory name where to find the input files");
options.add_options()("fles-host", bpo::value<std::string>()->default_value(""),
"Host where the timeslice server is running");
options.add_options()("fles-port", bpo::value<uint16_t>()->default_value(0),
"Port where the timeslice server is running");
options.add_options()("max-timeslices", bpo::value<uint64_t>()->default_value(0),
"Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 - infinite)");
options.add_options()("high-water-mark", bpo::value<uint64_t>()->default_value(1), "High water mark for ZeroMQ");
options.add_options()("ChNameTsReq", bpo::value<std::string>()->default_value("ts-request"),
"MQ channel name for TS requests");
options.add_options()("no-split-ts", bpo::value<bool>()->default_value(0),
"Send a copy of the full TS to single consummer");
options.add_options()("send-ts-per-sysid", bpo::value<bool>()->default_value(0),
"Send a single TS upon request of a SysId with all matching components");
options.add_options()("send-ts-per-block", bpo::value<bool>()->default_value(0),
"Send a single TS upon request of a block name with all matching components");
options.add_options()("block-sysid", bpo::value<std::vector<std::string>>(),
"Pair a block name and SysId in hex, separated by :, unique use of SysId for all blocks!");
options.add_options()("ChNameMissTs", bpo::value<std::string>()->default_value(""),
"MQ channel name for missed TS indices");
options.add_options()("ChNameCmds", bpo::value<std::string>()->default_value(""),
"MQ channel name for commands to slaves");
options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(0), "Histo publishing frequency in TS");
options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
"Minimal time between two publishing");
options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
"Maximal time between two publishing");
options.add_options()("HistosSuffix", bpo::value<std::string>()->default_value(""),
"Suffix added to folders, histos and canvases names, e.g. for multiple nodes usages");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmMQTsSamplerRepReq(); }
#include "runFairMQDevice.h"
/* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
#include "CbmMQTsaInfo.h"
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")
("flib-host", bpo::value<std::string>()->default_value(""), "Host where the timeslice server is running")
("max-timeslices", bpo::value<uint64_t>()->default_value(0), "Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 - infinite)")
("flib-port", bpo::value<uint64_t>()->default_value(0), "Port where the timeslice server is running");
options.add_options()("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")(
"flib-host", bpo::value<std::string>()->default_value(""), "Host where the timeslice server is running")(
"max-timeslices", bpo::value<uint64_t>()->default_value(0),
"Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 "
"- infinite)")("flib-port", bpo::value<uint64_t>()->default_value(0), "Port where the timeslice server is running");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new CbmMQTsaInfo();
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmMQTsaInfo(); }
#include "runFairMQDevice.h"
/* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer], Pierre-Alain Loizeau */
#include "CbmMQTsaMultiSampler.h"
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")
("dirname", bpo::value<std::string>()->default_value(""), "Directory name where to find the input files")
("flib-host", bpo::value<std::string>()->default_value(""), "Host where the timeslice server is running")
("max-timeslices", bpo::value<uint64_t>()->default_value(0), "Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 - infinite)")
("high-water-mark", bpo::value<uint64_t>()->default_value(1), "High water mark for ZeroMQ")
("no-split-ts", bpo::value<bool>()->default_value(0), "Send a copy of the full TS to all enabled channels")
("send-ts-per-sysid", bpo::value<bool>()->default_value(0), "Send a single TS per SysId with all matching components")
("send-ts-per-channel", bpo::value<bool>()->default_value(0), "Send a single TS per channel with all matching components")
("sysid-chan", bpo::value< std::vector< std::string > >(), "Pair a SysId in hex + channel name, separated by :, unique SysId!")
("flib-port", bpo::value<uint64_t>()->default_value(0), "Port where the timeslice server is running");
}
options.add_options()("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")(
"dirname", bpo::value<std::string>()->default_value(""), "Directory name where to find the input files")(
"flib-host", bpo::value<std::string>()->default_value(""), "Host where the timeslice server is running")(
"max-timeslices", bpo::value<uint64_t>()->default_value(0),
"Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 "
"- infinite)")("high-water-mark", bpo::value<uint64_t>()->default_value(1), "High water mark for ZeroMQ")(
"no-split-ts", bpo::value<bool>()->default_value(0),
"Send a copy of the full TS to all enabled channels")("send-ts-per-sysid", bpo::value<bool>()->default_value(0),
"Send a single TS per SysId with all matching components")(
"send-ts-per-channel", bpo::value<bool>()->default_value(0),
"Send a single TS per channel with all matching components")(
"sysid-chan", bpo::value<std::vector<std::string>>(),
"Pair a SysId in hex + channel name, separated by :, unique SysId!")(
"flib-port", bpo::value<uint64_t>()->default_value(0), "Port where the timeslice server is running");
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new CbmMQTsaMultiSampler();
options.add_options()("ChNameMissTs", bpo::value<std::string>()->default_value(""),
"MQ channel name for missed TS indices");
options.add_options()("ChNameCmds", bpo::value<std::string>()->default_value(""),
"MQ channel name for commands to slaves");
options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(0), "Histo publishing frequency in TS");
options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
"Minimal time between two publishing");
options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
"Maximal time between two publishing");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
options.add_options()("ChNameHistCfg", bpo::value<std::string>()->default_value("histo-conf"),
"MQ channel name for histos config");
options.add_options()("ChNameCanvCfg", bpo::value<std::string>()->default_value("canvas-conf"),
"MQ channel name for canvases config");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmMQTsaMultiSampler(); }
#include "runFairMQDevice.h"
/* Copyright (C) 2019-2020 PI-UHd, GSI
SPDX-License-Identifier: GPL-3.0-only
Authors: Norbert Herrmann [committer] */
#include "CbmMQTsaMultiSamplerTof.h"
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")
("dirname", bpo::value<std::string>()->default_value(""), "Directory name where to find the input files")
("flib-host", bpo::value<std::string>()->default_value(""), "Host where the timeslice server is running")
("max-timeslices", bpo::value<uint64_t>()->default_value(0), "Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 - infinite)")
("flib-port", bpo::value<uint64_t>()->default_value(0), "Port where the timeslice server is running")
("SelectComponents", bpo::value<uint64_t>()->default_value(0), "Select components for transport");
options.add_options()("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")(
"dirname", bpo::value<std::string>()->default_value(""), "Directory name where to find the input files")(
"flib-host", bpo::value<std::string>()->default_value(""), "Host where the timeslice server is running")(
"max-timeslices", bpo::value<uint64_t>()->default_value(0),
"Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 "
"- infinite)")("flib-port", bpo::value<uint64_t>()->default_value(0), "Port where the timeslice server is running")(
"SelectComponents", bpo::value<uint64_t>()->default_value(0), "Select components for transport");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new CbmMQTsaMultiSamplerTof();
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmMQTsaMultiSamplerTof(); }
#include "runFairMQDevice.h"
/* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
#include "CbmMQTsaSampler.h"
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")
("dirname", bpo::value<std::string>()->default_value(""), "Directory name where to find the input files")
("flib-host", bpo::value<std::string>()->default_value(""), "Host where the timeslice server is running")
("max-timeslices", bpo::value<uint64_t>()->default_value(0), "Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 - infinite)")
("flib-port", bpo::value<uint64_t>()->default_value(0), "Port where the timeslice server is running");
options.add_options()("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")(
"dirname", bpo::value<std::string>()->default_value(""), "Directory name where to find the input files")(
"flib-host", bpo::value<std::string>()->default_value(""), "Host where the timeslice server is running")(
"max-timeslices", bpo::value<uint64_t>()->default_value(0),
"Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 "
"- infinite)")("flib-port", bpo::value<uint64_t>()->default_value(0), "Port where the timeslice server is running");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new CbmMQTsaSampler();
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmMQTsaSampler(); }
#include "runFairMQDevice.h"
/* Copyright (C) 2018 PI-UHd, GSI
SPDX-License-Identifier: GPL-3.0-only
Authors: Norbert Herrmann [committer] */
#include "CbmMQTsaSamplerTof.h"
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")
("dirname", bpo::value<std::string>()->default_value(""), "Directory name where to find the input files")
("flib-host", bpo::value<std::string>()->default_value(""), "Host where the timeslice server is running")
("max-timeslices", bpo::value<uint64_t>()->default_value(0), "Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 - infinite)")
("flib-port", bpo::value<uint64_t>()->default_value(0), "Port where the timeslice server is running");
options.add_options()("filename", bpo::value<std::string>()->default_value(""), "Filename of the input file")(
"dirname", bpo::value<std::string>()->default_value(""), "Directory name where to find the input files")(
"flib-host", bpo::value<std::string>()->default_value(""), "Host where the timeslice server is running")(
"max-timeslices", bpo::value<uint64_t>()->default_value(0),
"Maximum number of timeslices to process for Run/ConditionalRun/OnData (0 "
"- infinite)")("flib-port", bpo::value<uint64_t>()->default_value(0), "Port where the timeslice server is running");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new CbmMQTsaSamplerTof();
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmMQTsaSamplerTof(); }
#!/bin/bash
$FAIRROOTPATH/bin/shmmonitor --cleanup
if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
@SIMPATH@/bin/fairmq-shmmonitor --cleanup
fi
if [ -z "$1" ]; then
if [ -z "$1" ]; then
# _filename=@VMCWORKDIR@/input/stsxyter_cosy2018.tsa
_filename=@VMCWORKDIR@/input/tofget4_hd2018.tsa
else
else
_filename=$1
fi
......
#!/bin/bash
if [ -e $FAIRROOTPATH/bin/shmmonitor ]; then
$FAIRROOTPATH/bin/shmmonitor --cleanup
fi
if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
@SIMPATH@/bin/fairmq-shmmonitor --cleanup
fi
if [ -z "$1" ]; then
if [ -z "$1" ]; then
# _filename=@VMCWORKDIR@/input/stsxyter_cosy2018.tsa
_filename=tofget4_hd2018.tsa
_dirname=@VMCWORKDIR@/input/
else
else
_filename=$1
_dirname=$2
fi
......
#!/bin/bash
# Copyright (C) 2020 PI-UHd,GSI
# SPDX-License-Identifier: GPL-3.0-only
# First commited by Norbert Herrmann
pkill -SIGINT Hit
pkill -SIGINT Unp
sleep 30
pkill -9 Tsa
pkill -9 Hit
pkill -9 Unp
pkill -9 parmq