Newer
Older
/* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau, Florian Uhlig [committer] */
/**
* CbmMQTsaMultiSampler.cpp
*
* @since 2017-11-17
* @author F. Uhlig
*/
#include "CbmMQTsaMultiSampler.h"
#include "CbmFlesCanvasTools.h"

Pierre-Alain Loizeau
committed
#include "CbmFormatDecHexPrintout.h"
#include "TimesliceInputArchive.hpp"
#include "TimesliceMultiInputArchive.hpp"
#include "TimesliceMultiSubscriber.hpp"
#include "TimesliceSubscriber.hpp"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include <TCanvas.h>
#include <TH1F.h>
#include <TH1I.h>
#include <TProfile.h>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/filesystem.hpp>
#include <boost/regex.hpp>
#include <boost/serialization/utility.hpp>
namespace filesys = boost::filesystem;
#include <thread> // this_thread::sleep_for
#include <algorithm>
#include <chrono>
#include <ctime>
using namespace std;
#include <stdexcept>
/// Exception type thrown inside InitTask() to report a configuration problem.
/// It adds nothing to std::runtime_error except a distinct type, so that the
/// handler of InitTask() can catch exactly these errors.
struct InitTaskError : public std::runtime_error {
  /// Build the error from a std::string message.
  explicit InitTaskError(const std::string& what_arg) : std::runtime_error(what_arg) {}
  /// Build the error from a C-string message.
  explicit InitTaskError(const char* what_arg) : std::runtime_error(what_arg) {}
};
CbmMQTsaMultiSampler::CbmMQTsaMultiSampler()
: FairMQDevice()
, fMaxTimeslices(0)
, fFileName("")
, fDirName("")
, fInputFileList()
, fFileCounter(0)
, fHost("")
, fPort(0)
, fHighWaterMark(1)
, fTSCounter(0)
, fMessageCounter(0)
, fSource(nullptr)
, fTime()
, fLastPublishTime {std::chrono::system_clock::now()}
/// Initialise the device from its configuration.
///
/// Reads all options from fConfig, warns about contradictory TS-splitting options,
/// parses the "sysid-chan" pairs, validates the input selection (single file,
/// file + directory, or host with or without port), checks the names of the defined
/// channels with IsChannelNameAllowed() and creates the timeslice source (fSource).
/// Configuration problems are raised as InitTaskError; the handler of the
/// function-try-block requests the ErrorFound transition.
///
/// NOTE(review): several statements seem to be missing from this copy of the function
/// (unbalanced braces, `fileList` and `obj` used without a visible declaration) -
/// compare with the version in the repository before changing the code.
void CbmMQTsaMultiSampler::InitTask()
try {
// Get the values from the command line options (via fConfig)
fFileName = fConfig->GetValue<string>("filename");
fDirName = fConfig->GetValue<string>("dirname");
fHost = fConfig->GetValue<string>("flib-host");
fPort = fConfig->GetValue<uint64_t>("flib-port");
fHighWaterMark = fConfig->GetValue<uint64_t>("high-water-mark");
fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
fbNoSplitTs = fConfig->GetValue<bool>("no-split-ts");
fbSendTsPerSysId = fConfig->GetValue<bool>("send-ts-per-sysid");
fbSendTsPerChannel = fConfig->GetValue<bool>("send-ts-per-channel");
fsChannelNameMissedTs = fConfig->GetValue<std::string>("ChNameMissTs");
fsChannelNameCommands = fConfig->GetValue<std::string>("ChNameCmds");
fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
/// The three sending modes exclude each other: only warn, the priority is
/// no-split-ts, then send-ts-per-sysid, then send-ts-per-channel
if (fbNoSplitTs) {
if (fbSendTsPerSysId) {
if (fbSendTsPerChannel) {
LOG(warning) << "Both no-split-ts, send-ts-per-sysid and "
"send-ts-per-channel options used => "
<< " second and third one will be ignored!!!!";
} // if( fbSendTsPerSysId )
else
LOG(warning) << "Both no-split-ts and send-ts-per-sysid options used => "
<< " second one will be ignored!!!!";
} // if( fbSendTsPerSysId )
else if (fbSendTsPerChannel) {
LOG(warning) << "Both no-split-ts and send-ts-per-channel options used => "
<< " second one will be ignored!!!!";
} // else if( fbSendTsPerSysId ) of if( fbSendTsPerSysId )
} // if( fbNoSplitTs )
else if (fbSendTsPerSysId && fbSendTsPerChannel) {
LOG(warning) << "Both send-ts-per-sysid and send-ts-per-channel options used => "
<< " second one will be ignored!!!!";
} // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs )
/// Extract SysId and channel information if provided in the binary options
/// Each entry is expected in the form "<SysId>:<ChannelName>"
std::vector<std::string> vSysIdChanPairs = fConfig->GetValue<std::vector<std::string>>("sysid-chan");
for (uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair) {
const size_t sep = vSysIdChanPairs[uPair].find(':');
/// NOTE(review): find() returns either npos or an index below size(), so the
/// last comparison can never be true and an empty channel name after the ':'
/// is not rejected here
if (string::npos == sep || 0 == sep || vSysIdChanPairs[uPair].size() == sep) {
throw InitTaskError("Provided pair of SysId + Channel name is missing a : or an argument.");
} // if( string::npos == sep || 0 == sep || vSysIdChanPairs[ uPair ].size() == sep )
/// Extract SysId
/// Parsed as hexadecimal if it contains "0x", as decimal otherwise
std::string sSysId = vSysIdChanPairs[uPair].substr(0, sep);
const size_t hexPos = sSysId.find("0x");
int iSysId;
if (string::npos == hexPos) iSysId = std::stoi(sSysId);
else
iSysId = std::stoi(sSysId.substr(hexPos + 2), nullptr, 16);
/// Extract Channel name
std::string sChannelName = vSysIdChanPairs[uPair].substr(sep + 1);
/// Look if SysId is already defined
const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), iSysId);
if (fSysId.end() != pos) {
/// SysId already there, redefine the corresponding channel name
const vector<std::string>::size_type idx = pos - fSysId.begin();
fAllowedChannels[idx] = sChannelName;
} // if( fSysId.end() != pos )
else {
/// SysId unknown yet, add both SysId and channel name at end of respective vectors
fSysId.push_back(iSysId);
fAllowedChannels.push_back(sChannelName);
} // else of if( fSysId.end() != pos )
LOG(info) << vSysIdChanPairs[uPair] << " " << iSysId << " " << sChannelName;
} // for( uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair )
/// A value of 0 for max-timeslices means "no limit"
if (0 == fMaxTimeslices) fMaxTimeslices = UINT_MAX;
// Check which input is defined
// Possibilities
// filename && ! dirname : single file
// filename with wildcards && dirname : all files with filename regex in the directory
// host && port : connect to the flim server
bool isGoodInputCombi {false};
if (0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size() && 0 == fPort) {
isGoodInputCombi = true;
fInputFileList.push_back(fFileName);
}
else if (0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size() && 0 == fPort) {
isGoodInputCombi = true;
fInputFileList.push_back(fFileName);
}
else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 != fPort) {
isGoodInputCombi = true;
LOG(info) << "Host: " << fHost;
LOG(info) << "Port: " << fPort;
}
else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 == fPort) {
isGoodInputCombi = true;
LOG(info) << "Host string: " << fHost;
/// NOTE(review): as shown here the host-only branch throws unconditionally and
/// isGoodInputCombi is never read; the end of this branch and the test of the
/// flag appear to be missing from this copy - confirm against the repository
throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory "
"or host + port are allowed combination.");

Pierre-Alain Loizeau
committed
LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initialization if the devices are
// properly connected. For the time being this is done with a
// naming convention. It is not avoided that someone sends other
// data on this channel.
int noChannel = fChannels.size();
LOG(info) << "Number of defined output channels: " << noChannel;
for (auto const& entry : fChannels) {
/// Catches and ignores the channels for missing TS indices and commands
/// Same for the histogram channels
/// NOTE(review): the body of this test is empty in this copy, so the channels
/// it matches still reach the name check below
if (entry.first == fsChannelNameMissedTs || entry.first == fsChannelNameCommands
|| (0 < fuPublishFreqTs
&& (entry.first == fsChannelNameHistosInput || entry.first == fsChannelNameHistosConfig
|| entry.first == fsChannelNameCanvasConfig))) {
} // if( entry.first == fsChannelNameMissedTs || entry.first == fsChannelNameCommands || histo channels name)
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
/// Each entry of fComponentsToSend is logged and rejected if it is above 1
for (auto const& value : fComponentsToSend) {
LOG(info) << "Value : " << value;
if (value > 1) {
throw InitTaskError("Sending same data to more than one output channel "
"not implemented yet.");
/// Create the timeslice source matching the selected input
if (0 == fFileName.size() && 0 != fHost.size() && 0 != fPort) {
// Don't add the protocol since this is done now in the TimesliceMultiSubscriber
//std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
std::string connector = fHost + ":" + std::to_string(fPort);
LOG(info) << "Open TSPublisher at " << connector;
fSource = new fles::TimesliceMultiSubscriber(connector);
}
else if (0 == fFileName.size() && 0 != fHost.size()) {
std::string connector = fHost;
LOG(info) << "Open TSPublisher with host string: " << connector;
fSource = new fles::TimesliceMultiSubscriber(connector, fHighWaterMark);
// Create a ";" separated string with all file names
/// NOTE(review): `fileList` and `obj` have no visible declaration; presumably a
/// loop over fInputFileList was lost from this copy - TODO confirm
std::string fileName = obj;
fileList += fileName;
fileList += ";";
}
LOG(info) << "Input File String: " << fileList;
fSource = new fles::TimesliceMultiInputArchive(fileList, fDirName);
if (!fSource) { throw InitTaskError("Could open files from file list."); }
LOG(info) << "High-Water Mark: " << fHighWaterMark;
LOG(info) << "Max. Timeslices: " << fMaxTimeslices;
if (fbNoSplitTs) { LOG(info) << "Sending TS copies in no-split mode"; } // if( fbNoSplitTs )
LOG(info) << "Sending components in separate TS per SysId";
} // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs
else if (fbSendTsPerChannel) {
LOG(info) << "Sending components in separate TS per channel";
} // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs )

Pierre-Alain Loizeau
committed
/// Record the time at the end of the initialisation
fTime = std::chrono::steady_clock::now();
}
/// Any configuration error raised above ends here: request the error transition
catch (InitTaskError& e) {
ChangeState(fair::mq::Transition::ErrorFound);
bool CbmMQTsaMultiSampler::IsChannelNameAllowed(std::string channelName)
{

Pierre-Alain Loizeau
committed
/// If sending full TS, accept any name!

Pierre-Alain Loizeau
committed
fComponentsToSend[0]++;
fChannelsToSend[0].push_back(channelName);
return true;

Pierre-Alain Loizeau
committed
// for(auto const &entry : fAllowedChannels) {
for (uint32_t idx = 0; idx < fAllowedChannels.size(); ++idx) {
auto const& entry = fAllowedChannels[idx];
LOG(info) << "Looking for name " << channelName << " in " << entry;
std::size_t pos1 = channelName.find(entry);
const vector<std::string>::const_iterator pos =
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos-fAllowedChannels.begin();
*/
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx
<< " (SysId 0x" << std::hex << fSysId[idx] << std::dec << ")";
fComponentsToSend[idx]++;
fChannelsToSend[idx].push_back(channelName);
/// If sending per channel, do not stop the loop as we allow more than 1 comp type per channel
if (fbSendTsPerChannel) bFoundMatch = true;
else
return true;
} // if (pos1!=std::string::npos)
}
/// If sending per channel, do not stop the loop but still check if at least 1 match found
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}

Pierre-Alain Loizeau
committed
bool CbmMQTsaMultiSampler::InitHistograms()
{
LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
/// Vector of pointers on each histo (+ optionally desired folder)
std::vector<std::pair<TNamed*, std::string>> vHistos = {};
/// Vector of pointers on each canvas (+ optionally desired folder)
std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
/// Histos creation and obtain pointer on them
fhTsRate = new TH1I("TsRate", "TS rate; t [s]", 1800, 0., 1800.);
fhTsSize = new TH1I("TsSize", "Size of TS; Size [MB]", 15000, 0., 15000.);
fhTsSizeEvo = new TProfile("TsSizeEvo", "Evolution of the TS Size; t [s]; Mean size [MB]", 1800, 0., 1800.);
fhTsMaxSizeEvo = new TH1F("TsMaxSizeEvo", "Evolution of maximal TS Size; t [s]; Max size [MB]", 1800, 0., 1800.);

Pierre-Alain Loizeau
committed
fhMissedTS = new TH1I("Missed_TS", "Missed TS", 2, -0.5, 1.5);

Pierre-Alain Loizeau
committed
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
fhMissedTSEvo = new TProfile("Missed_TS_Evo", "Missed TS evolution; t [s]", 1800, 0., 1800.);
/// Add histo pointers to the histo vector
vHistos.push_back(std::pair<TNamed*, std::string>(fhTsRate, "Sampler"));
vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSize, "Sampler"));
vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSizeEvo, "Sampler"));
vHistos.push_back(std::pair<TNamed*, std::string>(fhTsMaxSizeEvo, "Sampler"));
vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTS, "Sampler"));
vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTSEvo, "Sampler"));
/// Canvases creation
Double_t w = 10;
Double_t h = 10;
fcSummary = new TCanvas("cSampSummary", "Sampler monitoring plots", w, h);
fcSummary->Divide(2, 3);
fcSummary->cd(1);
gPad->SetGridx();
gPad->SetGridy();
fhTsRate->Draw("hist");
fcSummary->cd(2);
gPad->SetGridx();
gPad->SetGridy();
gPad->SetLogx();
gPad->SetLogy();
fhTsSize->Draw("hist");
fcSummary->cd(3);
gPad->SetGridx();
gPad->SetGridy();
fhTsSizeEvo->Draw("hist");
fcSummary->cd(4);
gPad->SetGridx();
gPad->SetGridy();
fhTsMaxSizeEvo->Draw("hist");
fcSummary->cd(5);
gPad->SetGridx();
gPad->SetGridy();
fhMissedTS->Draw("hist");
fcSummary->cd(6);
gPad->SetGridx();
gPad->SetGridy();
fhMissedTSEvo->Draw("el");
/// Add canvas pointers to the canvas vector
vCanvases.push_back(std::pair<TCanvas*, std::string>(fcSummary, "canvases"));
/// Add pointers to each histo in the histo array
/// Create histo config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
// << " in " << vHistos[ uHisto ].second.data()
// ;
fArrayHisto.Add(vHistos[uHisto].first);
std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
fvpsHistosFolder.push_back(psHistoConfig);

Pierre-Alain Loizeau
committed
/// Serialize the vector of histo config into a single MQ message
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, psHistoConfig);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, psHistoConfig);

Pierre-Alain Loizeau
committed
/// Send message to the common histogram config messages queue
if (Send(messageHist, fsChannelNameHistosConfig) < 0) {

Pierre-Alain Loizeau
committed
LOG(fatal) << "Problem sending histo config";
} // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )

Pierre-Alain Loizeau
committed
LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();

Pierre-Alain Loizeau
committed
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Create canvas config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
// << " in " << vCanvases[ uCanv ].second.data();
std::string sCanvName = (vCanvases[uCanv].first)->GetName();
std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);

Pierre-Alain Loizeau
committed
std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);

Pierre-Alain Loizeau
committed

Pierre-Alain Loizeau
committed
/// Serialize the vector of canvas config into a single MQ message
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, psCanvConfig);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, psCanvConfig);

Pierre-Alain Loizeau
committed
/// Send message to the common canvas config messages queue
if (Send(messageCan, fsChannelNameCanvasConfig) < 0) {

Pierre-Alain Loizeau
committed
LOG(fatal) << "Problem sending canvas config";
} // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )

Pierre-Alain Loizeau
committed
LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();

Pierre-Alain Loizeau
committed
} // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
return true;
}
/// One iteration of the device run loop: fetch the next timeslice from fSource,
/// update the monitoring histograms, detect gaps in the timeslice index and forward
/// the data according to the configured mode (full TS, per SysId, per channel, or
/// one message per component).
///
/// On the first call (fTSCounter == 0) the histograms are created if publication is
/// enabled (fuPublishFreqTs > 0) and a TimesliceMultiSubscriber source is initialised.
/// The visible `return false` paths are failed sends; each is preceded by a "STOP"
/// command on the command channel when that channel name is set. When the timeslice
/// limit is reached or no timeslice is obtained, an "EOF" command is built.
///
/// NOTE(review): this copy of the function looks incomplete (for example `uTsIndex`
/// is used without a visible declaration, the calls selecting the sending mode and
/// several return statements are not shown, braces are unbalanced) - compare with
/// the version in the repository before changing the code.
bool CbmMQTsaMultiSampler::ConditionalRun()
{
if (0 < fuPublishFreqTs && 0 == fTSCounter) { InitHistograms(); } // if( 0 < fuPublishFreqTs )

Pierre-Alain Loizeau
committed
/// initialize the source (connect to emitter, ...)
if (0 == fTSCounter && nullptr != dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)) {
dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)->InitTimesliceSubscriber();
} // if( 0 == fTSCounter && nullptr != dynamic_cast< fles::TimesliceMultiSubscriber >(fSource) )

Pierre-Alain Loizeau
committed
auto timeslice = fSource->get();
if (timeslice) {
if (fTSCounter < fMaxTimeslices) {
fTSCounter++;
const fles::Timeslice& ts = *timeslice;

Pierre-Alain Loizeau
committed
/// Time of this TS relative to the first one seen, taken from the index field of
/// the first microslice descriptor and divided by 1e9
uint64_t uTsTime = ts.descriptor(0, 0).idx;
if (0 == fuStartTime) { fuStartTime = uTsTime; } // if( 0 == fuStartTime )
fdTimeToStart = static_cast<double_t>(uTsTime - fuStartTime) / 1e9;
/// Sum of the component sizes, each one truncated to whole MB before adding
uint64_t uSizeMb = 0;
for (uint64_t uComp = 0; uComp < ts.num_components(); ++uComp) {
uSizeMb += ts.size_component(uComp) / (1024 * 1024);
} // for( uint_t uComp = 0; uComp < ts.num_components(); ++uComp )
fhTsRate->Fill(fdTimeToStart);
fhTsSize->Fill(uSizeMb);
fhTsSizeEvo->Fill(fdTimeToStart, uSizeMb);
/// Fill max size per s (assumes the histo binning is 1 second!)
fdLastMaxTime = fdTimeToStart;
fdTsMaxSize = uSizeMb;
} // if( 0. == fdLastMaxTime )
else if (1. <= fdTimeToStart - fdLastMaxTime) {
/// At least 1 s since the start of the current interval: store its maximum
/// and start a new interval with the current TS
fhTsMaxSizeEvo->Fill(fdLastMaxTime, fdTsMaxSize);
fdLastMaxTime = fdTimeToStart;
fdTsMaxSize = uSizeMb;
} // else if if( 1 <= fdTimeToStart - fdLastMaxTime )
else if (fdTsMaxSize < uSizeMb) {
fdTsMaxSize = uSizeMb;
} // else if( fdTsMaxSize < uSizeMb )
} // if( 0 < fuPublishFreqTs )

Pierre-Alain Loizeau
committed
/// Missed TS detection (only if output channel name defined by user)

Pierre-Alain Loizeau
committed
/// A gap is any index which is not the previous one + 1, except when both the
/// previous and the current index are 0
if ((uTsIndex != (fuPrevTsIndex + 1)) && !(0 == fuPrevTsIndex && 0 == uTsIndex)) {

Pierre-Alain Loizeau
committed
LOG(info) << "Missed Timeslices. Old TS Index was " << fuPrevTsIndex << " New TS Index is " << uTsIndex
<< " diff is " << uTsIndex - fuPrevTsIndex << " Missing are " << uTsIndex - fuPrevTsIndex - 1;

Pierre-Alain Loizeau
committed
/// Add missing TS indices to a vector and send it in appropriate channel
std::vector<uint64_t> vulMissedIndices;
for (uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) {
vulMissedIndices.emplace_back(ulMiss);
} // for( uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss )
if (!SendMissedTsIdx(vulMissedIndices)) {
/// If command channel defined, send command to all "slaves"
if ("" != fsChannelNameCommands) {
/// Wait 1 s before sending a STOP to let all slaves finish processing previous data
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
SendCommand("STOP");
} // if( "" != fsChannelNameCommands )
return false;
} // if( !SendMissedTsIdx( vulMissedIndices ) )

Pierre-Alain Loizeau
committed
/// Bin 1 of the missed TS histograms is filled with the number of missing indices
fhMissedTS->Fill(1, uTsIndex - fuPrevTsIndex - 1);
fhMissedTSEvo->Fill(fdTimeToStart, 1, uTsIndex - fuPrevTsIndex - 1);

Pierre-Alain Loizeau
committed
} // if( ( uTsIndex != ( fuPrevTsIndex + 1 ) ) && !( 0 == fuPrevTsIndex && 0 == uTsIndex ) )

Pierre-Alain Loizeau
committed
/// Bin 0 of the missed TS histograms gets one entry for the TS in hand
if (0 < fuPublishFreqTs) {
fhMissedTS->Fill(0);
fhMissedTSEvo->Fill(fdTimeToStart, 0, 1);

Pierre-Alain Loizeau
committed
fuPrevTsIndex = uTsIndex;
if (fTSCounter % 10000 == 0) { LOG(info) << "Received TS " << fTSCounter << " with index " << uTsIndex; }
LOG(debug) << "Found " << ts.num_components() << " different components in timeslice";
/// => Inefficient as copy the TS as many times as need!

Pierre-Alain Loizeau
committed
/// If command channel defined, send command to all "slaves"

Pierre-Alain Loizeau
committed
/// Wait 1 s before sending a STOP to let all slaves finish processing previous data
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
SendCommand("STOP");
} // if( "" != fsChannelNameCommands )

Pierre-Alain Loizeau
committed
} // if( !CreateAndSendFullTs( ts ) )
} // if( fbNoSplitTs )
else if (fbSendTsPerSysId) {
/// This assumes that the order of the components does NOT change after the first TS
/// That should be the case as the component index correspond to a physical link idx

Pierre-Alain Loizeau
committed
/// If command channel defined, send command to all "slaves"

Pierre-Alain Loizeau
committed
/// Wait 1 s before sending a STOP to let all slaves finish processing previous data
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
SendCommand("STOP");
} // if( "" != fsChannelNameCommands )

Pierre-Alain Loizeau
committed
} // if( !CreateAndCombineComponentsPerSysId( ts ) )
} // else if( fbSendTsPerSysId ) of if( fbNoSplitTs )
else if (fbSendTsPerChannel) {
/// This assumes that the order of the components does NOT change after the first TS
/// That should be the case as the component index correspond to a physical link idx

Pierre-Alain Loizeau
committed
/// If command channel defined, send command to all "slaves"

Pierre-Alain Loizeau
committed
/// Wait 1 s before sending a STOP to let all slaves finish processing previous data
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
SendCommand("STOP");
} // if( "" != fsChannelNameCommands )

Pierre-Alain Loizeau
committed
} // if( !CreateAndCombineComponentsPerChannel( ts ) )
} // else if( fbSendTsPerChannel ) of if( fbSendTsPerSysId )
else {
/// Default mode: one message per component, stop at the first failed send
for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
if (!CreateAndSendComponent(ts, nrComp)) {
/// If command channel defined, send command to all "slaves"
if ("" != fsChannelNameCommands) {
/// Wait 1 s before sending a STOP to let all slaves finish processing previous data
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
SendCommand("STOP");
} // if( "" != fsChannelNameCommands )
return false;
} // if( !CreateAndSendComponent(ts, nrComp) )
} // for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp)
} // else of if( fbSendTsPerSysId )
/// Send histograms periodically.
/// Use also runtime checker to trigger sending after M s if
/// processing too slow or delay sending if processing too fast
/// NOTE(review): the modulo below divides by fuPublishFreqTs; no guard against a
/// value of 0 is visible around this statement in this copy - TODO confirm
std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
if ((fdMaxPublishTime < elapsedSeconds.count())
|| (0 == fTSCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
SendHistograms();
fLastPublishTime = std::chrono::system_clock::now();
} // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fTSCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
else {

Pierre-Alain Loizeau
committed
/// If command channel defined, send command to all "slaves"

Pierre-Alain Loizeau
committed
/// Wait 1 s before sending an EOF to let all slaves finish processing previous data

Pierre-Alain Loizeau
committed
/// The visible parts of the command are "EOF ", a blank and the number of
/// timeslices handled so far
std::string sCmd = "EOF ";

Pierre-Alain Loizeau
committed
sCmd += " ";
sCmd += FormatDecPrintout(fTSCounter);
SendCommand(sCmd);
} // if( "" != fsChannelNameCommands )

Pierre-Alain Loizeau
committed
} // else of if (fTSCounter < fMaxTimeslices)
} // if (timeslice)
else {

Pierre-Alain Loizeau
committed
/// If command channel defined, send command to all "slaves"

Pierre-Alain Loizeau
committed
/// Wait 1 s before sending an EOF to let all slaves finish processing previous data

Pierre-Alain Loizeau
committed
/// Same command as in the branch for the reached timeslice limit
std::string sCmd = "EOF ";

Pierre-Alain Loizeau
committed
sCmd += " ";
sCmd += FormatDecPrintout(fTSCounter);
SendCommand(sCmd);
} // if( "" != fsChannelNameCommands )

Pierre-Alain Loizeau
committed
/// Send one component of a timeslice as a stand-alone timeslice, if a channel is
/// enabled for the SysId of that component.
///
/// The SysId is read from the first microslice descriptor of the component and
/// looked up in fSysId. If found and fComponentsToSend is positive for it, a new
/// timeslice holding only this component is built and handed to SendData().
///
/// @param ts     timeslice the component is taken from
/// @param nrComp index of the component in ts
/// @return false only if SendData() failed; true otherwise, including when the
///         component does not have to be sent
bool CbmMQTsaMultiSampler::CreateAndSendComponent(const fles::Timeslice& ts, int nrComp)
{
  // Check if component has to be send. If the corresponding channel
  // is connected create the new timeslice and send it to the
  // correct channel
  LOG(debug) << "SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);

  const std::vector<int>::const_iterator pos =
    std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
  if (pos != fSysId.end()) {
    const std::vector<std::string>::size_type idx = pos - fSysId.begin();
    if (fComponentsToSend[idx] > 0) {
      LOG(debug) << "Create timeslice component for link " << nrComp;

      fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};

      // Size the single output component from the component which is copied, so
      // that it always matches the number of microslices appended below
      const size_t uNumMsInComp = ts.num_microslices(nrComp);
      component.append_component(uNumMsInComp);
      for (size_t m = 0; m < uNumMsInComp; ++m) {
        component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
      }  // for( size_t m = 0; m < uNumMsInComp; ++m )

      // Debug printout, disabled:
      // LOG(info) << "Number of core microslices before: " << ts.num_core_microslices();
      // LOG(info) << "Number of core microslices after : " << component.num_core_microslices();
      // LOG(info) << "Number of microslices: " << component.num_microslices(0);

      if (!SendData(component, static_cast<int>(idx))) return false;
      return true;
    }  // if( fComponentsToSend[ idx ] > 0 )
  }    // if( pos != fSysId.end() )
  return true;
}
/// Send, for each enabled SysId, one timeslice holding all components of that SysId.
///
/// On the first call the component indices of `ts` are sorted into fvvCompPerSysId
/// according to the SysId of their first microslice and the result is logged; later
/// calls reuse that list. Then, for every SysId slot with a positive
/// fComponentsToSend and at least one component, a new timeslice is built from
/// these components and handed to SendData().
///
/// @param ts timeslice to split
/// @return false as soon as one SendData() call fails, true otherwise
bool CbmMQTsaMultiSampler::CreateAndCombineComponentsPerSysId(const fles::Timeslice& ts)
{
  /// First build the list of components for each SysId if it was not already done
  if (false == fbListCompPerSysIdReady) {
    for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
      uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;

      const auto pos = std::find(fSysId.begin(), fSysId.end(), usSysId);
      if (fSysId.end() != pos) {
        const std::vector<std::string>::size_type idx = pos - fSysId.begin();
        fvvCompPerSysId[idx].push_back(uCompIdx);
      }  // if( fSysId.end() != pos )
    }    // for( uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx )

    for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) {
      std::stringstream ss;
      ss << "Found " << std::setw(2) << fvvCompPerSysId[uSysIdx].size() << " components for SysId 0x" << std::hex
         << std::setw(2) << fSysId[uSysIdx] << std::dec << " :";
      for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uSysIdx].size(); ++uComp) {
        ss << " " << std::setw(3) << fvvCompPerSysId[uSysIdx][uComp];
      }  // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uSysIdx ].size(); ++uComp )
      LOG(info) << ss.str();
    }  // for( uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx )

    fbListCompPerSysIdReady = true;
  }  // if( false == fbListCompPerSysIdReady )

  /// Then loop on all possible SysId and send TS with their respective components if needed
  for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) {
    if (0 < fComponentsToSend[uSysIdx]) {
      LOG(debug) << "Create timeslice with components for SysId " << std::hex << fSysId[uSysIdx] << std::dec;

      if (0 < fvvCompPerSysId[uSysIdx].size()) {
        fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};

        for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uSysIdx].size(); ++uComp) {
          /// Each source component becomes output component number uComp, sized
          /// from its own number of microslices
          uint32_t uNumMsInComp = ts.num_microslices(fvvCompPerSysId[uSysIdx][uComp]);
          component.append_component(uNumMsInComp);

          LOG(debug) << "Add components to TS for SysId " << std::hex << fSysId[uSysIdx] << std::dec << " TS "
                     << ts.index() << " Comp " << fvvCompPerSysId[uSysIdx][uComp];

          for (size_t m = 0; m < uNumMsInComp; ++m) {
            component.append_microslice(uComp, m, ts.descriptor(fvvCompPerSysId[uSysIdx][uComp], m),
                                        ts.content(fvvCompPerSysId[uSysIdx][uComp], m));
          }  // for( size_t m = 0; m < uNumMsInComp; ++m )
        }    // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uSysIdx ].size(); ++uComp )

        LOG(debug) << "Prepared timeslice for SysId " << std::hex << fSysId[uSysIdx] << std::dec << " with "
                   << component.num_components() << " components";

        if (!SendData(component, uSysIdx)) return false;
      }  // if( 0 < fvvCompPerSysId[ uSysIdx ].size() )
    }    // if( 0 < fComponentsToSend[ uSysIdx ] )
  }      // for( uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx )

  return true;
}
/// Send, for each enabled output channel, one timeslice holding all components
/// which have to go to that channel.
///
/// On the first call the list of distinct channel names enabled for sending
/// (fvChannelsToSend) is built, and each component of `ts` is assigned to every
/// channel registered for its SysId (fvvCompPerChannel); the result is logged.
/// Later calls reuse these lists. Then, for every channel with at least one
/// component, a new timeslice is built from these components and handed to
/// SendData() with the channel name.
///
/// @param ts timeslice to split
/// @return false as soon as one SendData() call fails, true otherwise
bool CbmMQTsaMultiSampler::CreateAndCombineComponentsPerChannel(const fles::Timeslice& ts)
{
  /// First build the list of components for each channel name if it was not already done
  if (false == fbListCompPerChannelReady) {
    /// First add each channel enabled for sending to the list of channels we will use
    for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) {
      if (0 < fComponentsToSend[uSysIdx]) {
        for (uint32_t uChan = 0; uChan < fChannelsToSend[uSysIdx].size(); ++uChan) {
          const auto pos =
            std::find(fvChannelsToSend.begin(), fvChannelsToSend.end(), fChannelsToSend[uSysIdx][uChan]);
          if (fvChannelsToSend.end() == pos) {
            fvChannelsToSend.push_back(fChannelsToSend[uSysIdx][uChan]);
            fvvCompPerChannel.push_back(std::vector<uint32_t>());
          }  // if( fvChannelsToSend.end() == pos )
        }    // for( uChan = 0; uChan < fChannelsToSend[ uSysIdx ].size(); ++ uChan )
      }      // if( 0 < fComponentsToSend[ uSysIdx ] )
    }        // for( uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx )

    /// Now resize the vector in which we will store for each sending channel the list of components
    fvvCompPerChannel.resize(fvChannelsToSend.size());

    /// Check for each component if its system is enabled and if the name of its channel(s) is in the list
    /// If yes, add it to the vector of the corresponding channel
    for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
      uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;

      const auto pos = std::find(fSysId.begin(), fSysId.end(), usSysId);
      if (fSysId.end() != pos) {
        const std::vector<std::string>::size_type idxSys = pos - fSysId.begin();

        if (0 < fComponentsToSend[idxSys]) {
          for (uint32_t uChan = 0; uChan < fChannelsToSend[idxSys].size(); ++uChan) {
            const auto posCh =
              std::find(fvChannelsToSend.begin(), fvChannelsToSend.end(), fChannelsToSend[idxSys][uChan]);
            if (fvChannelsToSend.end() != posCh) {
              const std::vector<std::string>::size_type idxChan = posCh - fvChannelsToSend.begin();
              fvvCompPerChannel[idxChan].push_back(uCompIdx);
            }  // if( fvChannelsToSend.end() != posCh )
          }    // for( uChan = 0; uChan < fChannelsToSend[ idxSys ].size(); ++ uChan )
        }      // if( 0 < fComponentsToSend[ idxSys ] )
      }        // if( fSysId.end() != pos )
    }          // for( uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx )

    for (uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx) {
      std::stringstream ss;
      ss << "Found " << std::setw(2) << fvvCompPerChannel[uChanIdx].size() << " components for channel "
         << fvChannelsToSend[uChanIdx] << " :";
      for (uint32_t uComp = 0; uComp < fvvCompPerChannel[uChanIdx].size(); ++uComp) {
        ss << " " << std::setw(3) << fvvCompPerChannel[uChanIdx][uComp];
      }  // for( uint32_t uComp = 0; uComp < fvvCompPerChannel[ uChanIdx ].size(); ++uComp )
      LOG(info) << ss.str();
    }  // for( uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx )

    fbListCompPerChannelReady = true;
  }  // if( false == fbListCompPerChannelReady )

  /// Then loop on all possible channels and send TS with their respective components if needed
  for (uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx) {
    LOG(debug) << "Create timeslice with components for channel " << fvChannelsToSend[uChanIdx];

    if (0 < fvvCompPerChannel[uChanIdx].size()) {
      fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};

      for (uint32_t uComp = 0; uComp < fvvCompPerChannel[uChanIdx].size(); ++uComp) {
        /// Each source component becomes output component number uComp, sized
        /// from its own number of microslices
        uint32_t uNumMsInComp = ts.num_microslices(fvvCompPerChannel[uChanIdx][uComp]);
        component.append_component(uNumMsInComp);

        LOG(debug) << "Add components to TS for SysId " << std::hex
                   << static_cast<uint16_t>(ts.descriptor(fvvCompPerChannel[uChanIdx][uComp], 0).sys_id) << std::dec
                   << " TS " << ts.index() << " Comp " << fvvCompPerChannel[uChanIdx][uComp];

        for (size_t m = 0; m < uNumMsInComp; ++m) {
          component.append_microslice(uComp, m, ts.descriptor(fvvCompPerChannel[uChanIdx][uComp], m),
                                      ts.content(fvvCompPerChannel[uChanIdx][uComp], m));
        }  // for( size_t m = 0; m < uNumMsInComp; ++m )
      }    // for( uint32_t uComp = 0; uComp < fvvCompPerChannel[ uChanIdx ].size(); ++uComp )

      LOG(debug) << "Prepared timeslice for channel " << fvChannelsToSend[uChanIdx] << " with "
                 << component.num_components() << " components";

      if (!SendData(component, fvChannelsToSend[uChanIdx])) return false;
    }  // if( 0 < fvvCompPerChannel[ uChanIdx ].size() )
  }    // for( uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx )

  return true;
}
/// Send a copy of the complete timeslice to every enabled slot.
///
/// For each index with a positive fComponentsToSend, the timeslice is copied into a
/// StorableTimeslice and handed to SendData() with that index.
///
/// @param ts timeslice to copy and send
/// @return false as soon as one SendData() call fails, true otherwise
bool CbmMQTsaMultiSampler::CreateAndSendFullTs(const fles::Timeslice& ts)
{
  /// Send full TS to all enabled channels
  for (uint32_t uChanIdx = 0; uChanIdx < fChannelsToSend.size(); ++uChanIdx) {
    if (0 < fComponentsToSend[uChanIdx]) {
      LOG(debug) << "Copy timeslice component for channel " << fChannelsToSend[uChanIdx][0];

      /// One copy per receiver, as the serialization needs a StorableTimeslice
      fles::StorableTimeslice fullTs {ts};
      if (!SendData(fullTs, uChanIdx)) return false;
    }  // if( 0 < fComponentsToSend[ uChanIdx ] )
  }    // for( uint32_t uChanIdx = 0; uChanIdx < fChannelsToSend.size(); ++uChanIdx )

  return true;
}
/// Serialize a timeslice and send it on the first channel name registered
/// under the given index of fChannelsToSend.
///
/// @param component  Timeslice to serialize with a boost binary archive.
/// @param idx        Index into fChannelsToSend / fComponentsToSend.
/// @return           false if Send reports an error (negative return value),
///                   true otherwise.
bool CbmMQTsaMultiSampler::SendData(const fles::StorableTimeslice& component, int idx)
{
  // serialize the timeslice and create the message
  std::stringstream oss;
  boost::archive::binary_oarchive oa(oss);
  oa << component;

  // The serialized payload lives in a heap string which is handed over to the
  // message as "hint" object: the callback below deletes it once it is invoked.
  std::string* strMsg = new std::string(oss.str());

  FairMQMessagePtr msg(NewMessage(
    const_cast<char*>(strMsg->c_str()),  // data
    strMsg->length(),                    // size
    [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
    strMsg));  // object that manages the data

  // TODO: Implement sending same data to more than one channel
  //       Need to create new message (copy message??)
  if (fComponentsToSend[idx] > 1) { LOG(info) << "Need to copy FairMessage"; }

  // in case of error or transfer interruption,
  // return false to go to IDLE state
  // successful transfer will return number of bytes
  // transferred (can be 0 if sending an empty message).
  LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0];
  if (Send(msg, fChannelsToSend[idx][0]) < 0) {
    LOG(error) << "Problem sending data";
    return false;
  }

  fMessageCounter++;
  LOG(debug) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize();

  return true;
}
/// Serialize a timeslice and send it on the channel with the given name.
///
/// @param component  Timeslice to serialize with a boost binary archive.
/// @param sChannel   Name of the output channel passed to Send.
/// @return           false if Send reports an error (negative return value),
///                   true otherwise.
bool CbmMQTsaMultiSampler::SendData(const fles::StorableTimeslice& component, std::string sChannel)
{
  // serialize the timeslice and create the message
  std::stringstream oss;
  boost::archive::binary_oarchive oa(oss);
  oa << component;

  // The serialized payload lives in a heap string which is handed over to the
  // message as "hint" object: the callback below deletes it once it is invoked.
  std::string* strMsg = new std::string(oss.str());

  FairMQMessagePtr msg(NewMessage(
    const_cast<char*>(strMsg->c_str()),  // data
    strMsg->length(),                    // size
    [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
    strMsg));  // object that manages the data

  // in case of error or transfer interruption,
  // return false to go to IDLE state
  // successful transfer will return number of bytes
  // transferred (can be 0 if sending an empty message).
  LOG(debug) << "Send data to channel " << sChannel;
  if (Send(msg, sChannel) < 0) {
    LOG(error) << "Problem sending data";
    return false;
  }

  fMessageCounter++;
  LOG(debug) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize();

  return true;
}
bool CbmMQTsaMultiSampler::SendMissedTsIdx(std::vector<uint64_t> vIndices)
{

Pierre-Alain Loizeau
committed
std::stringstream oss;
boost::archive::binary_oarchive oa(oss);
oa << vIndices;
std::string* strMsg = new std::string(oss.str());
FairMQMessagePtr msg(NewMessage(
const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },

Pierre-Alain Loizeau
committed
// in case of error or transfer interruption,
// return false to go to IDLE state
// successfull transfer will return number of bytes
// transfered (can be 0 if sending an empty message).
LOG(debug) << "Send data to channel " << fsChannelNameMissedTs;
LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameMissedTs;

Pierre-Alain Loizeau
committed
return false;

Pierre-Alain Loizeau
committed
return true;
}
bool CbmMQTsaMultiSampler::SendCommand(std::string sCommand)
{

Pierre-Alain Loizeau
committed
std::stringstream oss;
boost::archive::binary_oarchive oa(oss);
oa << sCommand;
std::string* strMsg = new std::string(oss.str());
FairMQMessagePtr msg(NewMessage(
const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },

Pierre-Alain Loizeau
committed
// FairMQMessagePtr msg( NewMessage( const_cast<char*>( sCommand.c_str() ), // data
// sCommand.length(), // size
// []( void* /*data*/, void* object ){ delete static_cast< std::string * >( object ); },
// &sCommand ) ); // object that manages the data

Pierre-Alain Loizeau
committed
// in case of error or transfer interruption,
// return false to go to IDLE state
// successfull transfer will return number of bytes
// transfered (can be 0 if sending an empty message).
LOG(debug) << "Send data to channel " << fsChannelNameCommands;
LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameCommands;

Pierre-Alain Loizeau
committed
return false;

Pierre-Alain Loizeau
committed
return true;
}
bool CbmMQTsaMultiSampler::SendHistograms()
{
/// Serialize the array of histos into a single MQ message
// Serialize<RootSerializer>(*message, &fArrayHisto);
RootSerializer().Serialize(*message, &fArrayHisto);
/// Send message to the common histogram messages queue
if (Send(message, fsChannelNameHistosInput) < 0) {
LOG(error) << "Problem sending data";
return false;
} // if( Send( message, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
ResetHistograms();

Pierre-Alain Loizeau
committed
return true;
}

Pierre-Alain Loizeau
committed
/// Reset the content of all monitoring histograms handled by this device.
///
/// @return always true.
bool CbmMQTsaMultiSampler::ResetHistograms()
{
  fhTsRate->Reset();
  fhTsSize->Reset();
  fhTsSizeEvo->Reset();
  fhTsMaxSizeEvo->Reset();
  fhMissedTS->Reset();
  fhMissedTSEvo->Reset();

  return true;
}
void CbmMQTsaMultiSampler::CalcRuntime()
{
std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
LOG(info) << "Runtime: " << run_time.count();
LOG(info) << "No more input data";
}
void CbmMQTsaMultiSampler::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)